Tuesday, June 21, 2016

pyspark: Transformer that generates a random number always generates the same number

I am trying to measure the performance impact of having to copy a DataFrame from Scala to Python and back in a large pipeline. For that purpose I have created this rather artificial transformer:

from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.util import keyword_only
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

import random

class RandomColAdderTransformer(Transformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(RandomColAdderTransformer, self).__init__()
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        cur_col = self.getInputCol()

        def randGet(col):  # a zero-argument UDF crashes, so take the column value
            # Multiply by a Python-side random draw to ensure the value is
            # actually read into and produced in Python space
            return col * random.random()

        sparktype = FloatType()
        return dataset.withColumn("randFloat", udf(randGet, sparktype)(cur_col))

The goal of this transformer is to ensure that some numbers are generated from Python: it reads the DataFrame, performs a multiplication in Python, and then hands the DataFrame, with the new column added, to the next stage of the pipeline.
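For context, this is roughly how I intend to use it inside a pipeline (a minimal sketch; the VectorAssembler stage and the df variable are illustrative, assuming a DataFrame like the one in the test below). The Python transformer forces a JVM -> Python -> JVM round trip before the JVM-side stage runs:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

# Two-stage pipeline: Python-side random column, then a JVM-side assembler
pipeline = Pipeline(stages=[
    RandomColAdderTransformer(inputCol="x3", outputCol="randFloat"),
    VectorAssembler(inputCols=["x3", "randFloat"], outputCol="features"),
])
pipeline.fit(df).transform(df).show()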

However, I am seeing some weirdness. When testing my code, the same random number is generated for every row (both rows reduce to the same factor: 0.95878977 / 23.0 = -0.95878977 / -23.0 ≈ 0.0417):

df = sqlContext.createDataFrame([(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
myTestTransformer = RandomColAdderTransformer()
myTestTransformer.setInputCol("x3")
transformedDF = myTestTransformer.transform(df)
transformedDF.show()

+---+---+-----+-----------+
| x1| x2|   x3|  randFloat|
+---+---+-----+-----------+
|  1|  a| 23.0| 0.95878977|
|  3|  B|-23.0|-0.95878977|
+---+---+-----+-----------+

And then consecutive invocations of transformedDF.show() actually change the values!?

transformedDF.show()
+---+---+-----+-----------+
| x1| x2|   x3|  randFloat|
+---+---+-----+-----------+
|  1|  a| 23.0| 0.95878977|
|  3|  B|-23.0|-0.95878977|
+---+---+-----+-----------+


In [3]: transformedDF.show()
+---+---+-----+-----------+
| x1| x2|   x3|  randFloat|
+---+---+-----+-----------+
|  1|  a| 23.0|  2.9191132|
|  3|  B|-23.0|-0.95878977|
+---+---+-----+-----------+


In [4]: transformedDF.show()
+---+---+-----+-----------+
| x1| x2|   x3|  randFloat|
+---+---+-----+-----------+
|  1|  a| 23.0| 0.95878977|
|  3|  B|-23.0|-0.95878977|
+---+---+-----+-----------+


In [5]: transformedDF.show()
+---+---+-----+----------+
| x1| x2|   x3| randFloat|
+---+---+-----+----------+
|  1|  a| 23.0| 16.033003|
|  3|  B|-23.0|-2.9191132|
+---+---+-----+----------+
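As a sanity check on the lazy-evaluation theory, caching the result should run the UDF once and then serve the same values on every subsequent show() (a minimal sketch, reusing df and the transformer from above):

transformedDF = myTestTransformer.transform(df)
transformedDF.cache()
transformedDF.count()  # force one evaluation; the UDF runs here
transformedDF.show()   # served from the cache from now on
transformedDF.show()   # same values as the previous call (barring eviction)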

Is this behavior expected? Does .show() actually trigger the computation? AFAIK I am running on a single node; surely the tasks would then run in a single thread and share the random seed? I know a built-in pyspark RNG exists, but it is not suitable for my purpose, as it would not actually be generating the data from Python space.
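In case the duplicates come from workers sharing inherited random state (just my suspicion at this point), one workaround sketch would be to draw from the OS entropy pool instead of the module-level, seeded Mersenne Twister:

from random import SystemRandom

def randGet(col):
    # SystemRandom reads from the OS entropy source on every call, so forked
    # or reused workers cannot replay the same sequence.
    # (Assumption: the repeated values come from shared RNG state.)
    return col * SystemRandom().random()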



