Thursday, July 23, 2015

Reservoir Sampling on Large Streams

I am trying to implement a reservoir sampling algorithm in Java. I have N streams of data (readings from sensors arriving at a sink node), each of unknown size. For the sake of simplicity, let's assume I have one stream of unknown size.

One of the reservoir sampling algorithms suggests creating a reservoir of size reservoirSize; let's say it is 5. The first five readings you get are stored in the reservoir. Then, as more and more readings arrive, for each reading you generate a random number between 0 and the reading number (inclusive), and if that random number is smaller than reservoirSize, you store the reading in reservoir[randomNumber].

So let's say I have reservoirSize = 5 and I have just received my 10th reading. I generate a random number from 0 to 10, and if that number is smaller than 5, I store the reading at the position the random number points to. Say the random number is 3: then I store reading number 10 in reservoir[3].
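As a worked check of this rule (notation is mine, not from the algorithm description: $k$ = reservoirSize, readings numbered from 0 so that reading $i$ draws uniformly from $\{0,\dots,i\}$, and $n$ readings seen in total), the probability that a reading $i \ge k$ is stored, and then survives every later draw, is:

```latex
\begin{aligned}
P(\text{reading } i \text{ stored}) &= \frac{k}{i+1}, \qquad \text{e.g. } k=5,\ i=10:\ \frac{5}{11} \\
P(\text{reading } j > i \text{ overwrites its slot}) &= \frac{k}{j+1}\cdot\frac{1}{k} = \frac{1}{j+1} \\
P(\text{reading } i \text{ in reservoir after } n \text{ readings}) &= \frac{k}{i+1}\prod_{j=i+1}^{n-1}\frac{j}{j+1} = \frac{k}{i+1}\cdot\frac{i+1}{n} = \frac{k}{n}
\end{aligned}
```

For $i < k$ the reading is stored with probability 1 and the same telescoping product, starting at $j = k$, again gives $k/n$. So every reading, early or late, ends up in the reservoir with the same probability $k/n$.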

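public void sample(Vector<String> pool, double measurement, int streamIndex) {
    if (streamIndex < ReservoirSize) {
        // Fill phase: the first ReservoirSize readings go straight into the pool
        pool.addElement(Double.toString(measurement));
    }
    else if ((randomIndex = ranNum.nextInt(streamIndex + 1)) < ReservoirSize) {
        // Draw uniformly from 0..streamIndex; only draws below ReservoirSize
        // replace a slot, so this reading is kept with probability
        // ReservoirSize / (streamIndex + 1)
        pool.setElementAt(Double.toString(measurement), randomIndex);
    }
}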

The problem with this code is that once streamIndex gets large enough (above 4,000, for example), I rarely sample any readings. And this makes sense: the probability of drawing a random number from 0 to 4000 that is smaller than 5 (5/4001, about 0.12%) is much smaller than the probability of drawing a random number from 0 to, say, 100 that is smaller than 5 (5/101, about 5%).
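One way to check whether the falling acceptance rate is intended is to repeat the same replacement rule many times over a short stream and count how often each reading ends up in the reservoir. The sketch below is an illustration under my own assumptions (hypothetical class `InclusionCheck`, integer reading indices standing in for measurements, a fixed seed); it is not the code from this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class InclusionCheck {

    /** Same rule as sample(): reading i overwrites slot r when
     *  r = nextInt(i + 1) falls below k. Returns the final reservoir. */
    static List<Integer> reservoir(int n, int k, Random rng) {
        List<Integer> pool = new ArrayList<>(k);
        for (int i = 0; i < n; i++) {
            if (i < k) {
                pool.add(i);                 // fill phase
            } else {
                int r = rng.nextInt(i + 1);  // uniform in 0..i
                if (r < k) {
                    pool.set(r, i);          // kept with probability k / (i + 1)
                }
            }
        }
        return pool;
    }

    /** Fraction of trials in which each reading index ends up in the reservoir. */
    static double[] inclusionRates(int n, int k, int trials, long seed) {
        Random rng = new Random(seed);
        int[] counts = new int[n];
        for (int t = 0; t < trials; t++) {
            for (int idx : reservoir(n, k, rng)) {
                counts[idx]++;
            }
        }
        double[] rates = new double[n];
        for (int i = 0; i < n; i++) {
            rates[i] = (double) counts[i] / trials;
        }
        return rates;
    }

    public static void main(String[] args) {
        int n = 100, k = 5;
        double[] rates = inclusionRates(n, k, 200_000, 1L);
        // Compare an early, a middle and a late reading against k / n
        for (int i : new int[] {0, 50, 99}) {
            System.out.printf("reading %d: %.4f (k/n = %.4f)%n", i, rates[i], (double) k / n);
        }
    }
}
```

All three rates should come out close to k/n = 0.05: late readings are rarely admitted, while early readings are admitted for sure but then face many more chances of being overwritten.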

I also implemented Algorithm R from Vitter's paper, and another approach described here:
Gregable ReservoirSampling

But all implementations have the same problem: the larger the stream gets, the lower the sampling frequency becomes. So with a sampling interval of 0.5 s, one hour after I start sampling (which means that about 7,000 readings have been forwarded to the sink node), a change in the measured quantity can go undetected for another good half hour, i.e. the readings indicating the change are discarded instead of being stored in the reservoir.
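To put a number on that half-hour delay, here is a small sketch (class and method names are mine) that sums the acceptance probabilities k / (i + 1) over the readings of the following half hour. It assumes a 0.5 s interval, so 7,200 readings in the first hour and 3,600 in the next half hour, and reservoirSize = 5.

```java
class ExpectedAdmissions {

    /** Expected number of readings with 0-based indices in [from, to)
     *  that are stored in a reservoir of size k, i.e. the sum of k / (i + 1). */
    static double expectedAdmissions(int k, long from, long to) {
        double sum = 0.0;
        for (long i = from; i < to; i++) {
            sum += (double) k / (i + 1);
        }
        return sum;
    }

    public static void main(String[] args) {
        int k = 5;
        long firstHour = 7200;   // 3600 s / 0.5 s per reading
        long halfHour = 3600;    // 1800 s / 0.5 s per reading
        double e = expectedAdmissions(k, firstHour, firstHour + halfHour);
        System.out.printf("Expected readings stored in the next half hour: %.2f%n", e);
    }
}
```

The sum is close to 5 · ln(10800 / 7200) = 5 · ln 1.5, so only about two readings from that whole half hour are expected to be written into the five-slot reservoir.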

Algorithm R implementation

public RSAlgorithmR() {
    this.currentPool = null;
    this.randomStoreatIndex = 0;
    this.randomIndex = 0;
    this.ranNum = new Random();
}

public void sample(LLNode cNode, double Measurement) {
    int streamIndex = cNode.getStreamIndex();
    int storeatIndex = cNode.getStoreatIndex();

    if (streamIndex < ReservoirSize) {
        // Fill phase: store the first ReservoirSize readings directly
        cNode.data.addElement(Double.toString(Measurement));

        if (streamIndex == ReservoirSize - 1) {
            randomStoreatIndex = ranNum.nextInt(ReservoirSize);
            cNode.setStoreatIndex(randomStoreatIndex);
        }
    }
    else {
        if (storeatIndex == streamIndex) {
            // Overwrite a uniformly chosen slot with the current reading
            randomIndex = ranNum.nextInt(ReservoirSize);
            cNode.data.setElementAt(Double.toString(Measurement), randomIndex);

            randomStoreatIndex = ranNum.nextInt(streamIndex - ReservoirSize) + ReservoirSize;
            cNode.setStoreatIndex(randomStoreatIndex);

            System.out.println("Index:: " + streamIndex);
            System.out.println("randomIndex:: " + randomIndex);
        }
    }
    cNode.setStreamIndex();
}
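The implementation above works by precomputing storeatIndex, the stream index of the next reading to store. A skip-based version of that idea has to draw each such index from the readings still to come, with the right distribution. Below is a minimal sketch of the sequential-search skip method that, as I understand it, Vitter's paper presents as Algorithm X; it is a different algorithm from Algorithm R that yields the same sample distribution. The class `SkipReservoir` and its methods are hypothetical and do not use the `LLNode` type from this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Hypothetical skip-based reservoir (sequential search in the style of Algorithm X). */
class SkipReservoir {
    private final int k;
    private final Random rng;
    private final List<Double> pool = new ArrayList<>();
    private long seen = 0;        // readings processed so far
    private long nextStore = -1;  // 0-based index of the next reading to store

    SkipReservoir(int k, Random rng) {
        this.k = k;
        this.rng = rng;
    }

    /** Number of readings to skip once t >= k readings have been seen. */
    private long skip(long t) {
        double v = 1.0 - rng.nextDouble();  // uniform in (0, 1], never 0
        long s = 0;
        // quot = P(S >= s + 1) = product over j = 0..s of (t + j + 1 - k) / (t + j + 1)
        double quot = (double) (t + 1 - k) / (t + 1);
        while (quot > v) {
            s++;
            quot *= (double) (t + s + 1 - k) / (t + s + 1);
        }
        return s;
    }

    void sample(double measurement) {
        if (seen < k) {
            pool.add(measurement);                   // fill phase
            if (seen == k - 1) {
                nextStore = k + skip(k);             // first index >= k to store
            }
        } else if (seen == nextStore) {
            pool.set(rng.nextInt(k), measurement);   // overwrite a uniform slot
            nextStore = seen + 1 + skip(seen + 1);   // always a future index
        }
        seen++;
    }

    List<Double> pool() {
        return pool;
    }
}
```

This version calls the random generator only when a reading is actually stored, but it keeps the same inclusion probability k/n for every reading as Algorithm R, so it does not make late readings any more likely to be kept.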

Gregable Implementation

public ReservoirSampler() {
    this.currentPool = null;
    this.randomIndex = 0;
    this.ranProp = new Random();
    this.ranInd = new Random();
}

public void sample(LLNode currentSpot, double humidityRead,
        double temperatureRead, int streamIndex) {

    if (streamIndex < ReservoirSize) {
        currentSpot.humidityData.addElement(Double.toString(humidityRead));
        currentSpot.temperatureData.addElement(Double.toString(temperatureRead));
    }
    else {
        // streamIndex is 0-based (the fill phase covers 0..ReservoirSize-1),
        // so this reading is kept with probability ReservoirSize / (streamIndex + 1)
        double acceptanceProbability = (double) ReservoirSize / (streamIndex + 1);

        // Draw from each generator's ongoing sequence; no per-call reseeding,
        // which would give both generators the same clock-based seed
        double randomProbability = ranProp.nextDouble();

        if (randomProbability < acceptanceProbability) {
            // Same slot for both series so humidity and temperature stay paired
            randomIndex = ranInd.nextInt(ReservoirSize);
            currentSpot.humidityData.setElementAt(Double.toString(humidityRead), randomIndex);
            currentSpot.temperatureData.setElementAt(Double.toString(temperatureRead), randomIndex);
        }
    }
}

Is that the normal behaviour of the algorithm, or am I missing something here? And if it is the normal behaviour, is there a way to make it work more "accurately"?



