大数据算法之空间亚线性算法(一)

水库抽样算法

Posted by Jason Lee on 2019-05-23

问题描述

  • 输入:一组数据,大小未知
  • 输出:这组数据的K个均匀抽取
  • 要求:仅扫描一次
  • 总体要求:从N个元素中随机的抽取k个元素,其中N无法确定,保证每个元素抽到的概率相同

问题难点

首先明确什么叫均匀抽样,就是每个数据被等概率抽取呗。即当样本总体为n 时候,每个样本被抽取的概率为1/n,如果要抽取k个元素,那么每个样本被抽到的概率为k/n。但是我们实现不知道总体的数量n。如果就只有n=K个元素,可用空间正好也是K,那我们正好把K个数据都保存起来,但是如果n>K呢?要想均匀抽样,概率当然是

问题解析

思路分析

假设我们有一个数据流,分别是1,2,3,起初我们并不知道这个数据流的大小。 我们要从中随机抽取一个数字,如果是均匀的抽取,每个数字被抽到的概率为1/3,首先我们需要新建一个数组,由于我们只需要取一个数据,所以数组的长度为1,现在我们有

  • 未知数据流 data_stream = [1,2,3]
  • 样本数组 sample = [] (长度为1)

我们开始读取数据1

  1. sample : [1], 假设 这个时候结束,那么样本被抽取的概率为 1,既100%被抽到
  2. 再次读取数据2 sample=[1,2],由于我们只需要一个数据,所以1 和 2 必须淘汰一个,那么如何淘汰呢?假设读到2就已经结束了,那么总体就变成了2个,2个这个时候,那么每个样本被抽取的概率就要变成1/2 才符合要求。这个时候,我们需要让2 以1/2的概率被淘汰,这样1和2被留下的概率就相等,都是1/2 满足要求。
  3. 再次读取数据3 sample=[?,3] 因为在第二步,我们已经淘汰了1和2 所以这里用?表示被留下的那个数 即 1 或者 2。 这个时候,我们应该如何淘汰3 才能让1,2,3 等概率被淘汰呢?
    现在样本变成了3个,那么如果要满足提议,那必须让3 以1/3的概率留下,让3以2/3概率淘汰,那么这样来看
    • ?(1或者2)被留下的概率为(?留下,3淘汰)1/2 * 2/3
    • 3 被留下的 概率为 1/3
    • 这样刚好满足题意
  • 依次类推。

来看证明:

这里做个说明:1 - 1/(i+1) = i/(i+1) 表示没有被选中的概率.
总结起来就是一句话每个数取到的概率等于取到该数且取不到该数后面所有数的概率。
如:取到第10个数的概率等于取到第十个数且取不到第11到第n个数的概率
现在我们回到较复杂的情况,也就是如何在一个N个数(开始不知道N是几)中随机取M个数。其实思想是一样的,就是先取出前M个,然后对后面的开始每个以(k/(i))的概率进行替换,这样我们得到的就是所要的结果,证明如下:

思路推广

  • 从一个数据流中获取数据,保存在k个数组当中。
  • 数据流中前K个数值直接保存在数组中
  • 后面的在来的数据i 以 k/n 的概率 替换数组中的随机一个

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import random
import numpy as np
import matplotlib.pyplot as plt

class ReservoirSampling(object):
def __init__(self, k):
self.sample = []
self.k = k
self.time = 0

def addItem(self, item):
i = len(self.sample)
if i < self.k:
self.sample.append(item)
else:
M = int(random.uniform(0, self.time + 1))
if M < self.k :
self.sample[M] = item
self.time +=1
return self

def getSample(self):
return self.sample

def count(self,count_arrray):
return [self.sample.count(i) for i in count_arrray]



def test(array_item_total, k):
a = np.array([[i]*array_item_total for i in range(3)])#生成等量的0,1,2
L0 = a[0]
L1 = a[1]
L2 = a[2]
rs = ReservoirSampling(k)
for i in L0:
rs.addItem(i)
print("finsh 1")

for i in L1:
rs.addItem(i)
print("finsh 2")
for i in L2:
rs.addItem(i)
print("finsh 3")
l1 = ['value=%d'% x for x in range(3)]
plt.pie(rs.count([0,1,2]),labels=l1,labeldistance=0.1,autopct='%1.2f%%')
plt.title("Reservoir sampling")
plt.show()
if __name__ == '__main__':
test(10000,5000)

看一下结果

总结



支付宝打赏 微信打赏

赞赏一下