Tuesday, June 21, 2016

PySpark: a Transformer that generates a random number always generates the same number

I am trying to measure the performance impact of having to copy a DataFrame from Scala to Python and back in a large pipeline. For that purpose I have created this rather artificial transformer:

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

import random

class RandomColAdderTransformer(Transformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, bogusarg=None):
        super(RandomColAdderTransformer, self).__init__()
        self.bogusarg = bogusarg  # plain attribute, not a Spark Param; unused
        self._setDefault(outputCol="randFloat")
        kwargs = dict(self._input_kwargs)
        kwargs.pop("bogusarg", None)  # setParams() does not accept it
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        cur_col = self.getInputCol()

        def randGet(col):  # UDF crashes with no arguments
            a = col * random.random()  # Ensure we are reading and copying to Python space
            return float(a)  # It runs only once?

        sparktype = FloatType()
        return dataset.withColumn(self.getOutputCol(), udf(randGet, sparktype)(cur_col))

The goal of this transformer is to make sure that some values are actually generated in Python: it reads a column of the DataFrame, does a multiplication in Python, and adds the result as a new column, so the next stage of the pipeline has to consume data that came back from Python.

However, I am seeing some weird behavior. When testing my code, the same random number is generated for all rows:

df = sqlContext.createDataFrame([(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
myTestTransformer = RandomColAdderTransformer(inputCol="x3")
transformedDF = myTestTransformer.transform(df)
transformedDF.show()

| x1| x2|   x3|  randFloat|
|  1|  a| 23.0| 0.95878977|
|  3|  B|-23.0|-0.95878977|

And consecutive invocations of transformedDF.show() actually change the values!?
| x1| x2|   x3|  randFloat|
|  1|  a| 23.0| 0.95878977|
|  3|  B|-23.0|-0.95878977|

In [3]:
| x1| x2|   x3|  randFloat|
|  1|  a| 23.0|  2.9191132|
|  3|  B|-23.0|-0.95878977|

In [4]:
| x1| x2|   x3|  randFloat|
|  1|  a| 23.0| 0.95878977|
|  3|  B|-23.0|-0.95878977|

In [5]:
| x1| x2|   x3| randFloat|
|  1|  a| 23.0| 16.033003|
|  3|  B|-23.0|-2.9191132|

Is this behavior expected? Does .show() actually trigger the computation? As far as I know I am running on a single node, so wouldn't everything run in a single thread and therefore share the same random seed? I know PySpark has a built-in random number generator (pyspark.sql.functions.rand), but it is not suitable for my purpose because it would not actually generate the data in Python space.
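The shared-seed question can be explored outside Spark. This sketch rests on an assumption not confirmed by the post: that each partition is handled by a separate Python worker process forked from a common parent, so every worker starts with a copy of the same generator state. first_draws_in_forked_children is a hypothetical helper that uses only the standard library and needs a POSIX system for os.fork(). Since Python 3.7, CPython reseeds the module-level random functions in a forked child, so the sketch uses an explicit random.Random instance to reproduce the effect.

```python
import os
import random


def first_draws_in_forked_children(rng, n_children, reseed=False):
    """Hypothetical helper: fork n_children processes, each inheriting a copy
    of `rng`, and collect the first number each child draws (POSIX only)."""
    draws = []
    for _ in range(n_children):
        read_fd, write_fd = os.pipe()
        pid = os.fork()
        if pid == 0:  # child: its copy of rng starts from the parent's state
            os.close(read_fd)
            if reseed:
                rng.seed()  # reseed from an OS randomness source in this child
            with os.fdopen(write_fd, "w") as out:
                out.write(repr(rng.random()))  # repr round-trips a float exactly
            os._exit(0)  # leave immediately, skipping the parent's cleanup
        os.close(write_fd)  # parent: read the child's draw, then reap it
        with os.fdopen(read_fd) as inp:
            draws.append(float(inp.read()))
        os.waitpid(pid, 0)
    return draws


shared = random.Random(42)  # the parent never draws, so every child sees the same state
print(first_draws_in_forked_children(shared, 3))               # three equal values
print(first_draws_in_forked_children(shared, 3, reseed=True))  # almost surely distinct
```

If this mechanism applies, reseeding inside each worker (the reseed=True path) would be one way to get distinct values per partition. Values that change across consecutive .show() calls would also be consistent with reused workers continuing from wherever their copied generator left off.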
