Friday, August 10, 2018

dask distributed error when wrapping a numpy random generator

Dask is great, but moving to dask distributed requires some care. The following example uses a wrapper around numpy's random generator that breaks under dask distributed.

import dask
import subprocess
import dask.distributed
import os
import numpy

try:
    os.mkdir('tmp')
except OSError:
    print 'tmp dir already exists'
DaskClient = None
# commenting out the next line makes things work
DaskClient = dask.distributed.Client('127.0.0.1:8786')
WorkingDir = os.getcwd()

# Remove this class to make the program work with distributed
class NumpyRandomWrapper(numpy.random.RandomState):
    def __init__(self, seed=None):
        super(NumpyRandomWrapper, self).__init__(seed)

    def sample(self, population, k):
        return self.choice(population, k, replace=False)

    def random(self):
        return self.random_sample()

@dask.delayed
def MapElement(RunNumber):
    "Compute Element"
    os.chdir(WorkingDir)
    Random = 1
    # To make this work with distributed, comment out the next two lines
    RandomGeneratorToUse = NumpyRandomWrapper(None) 
    Random = RandomGeneratorToUse.random()
    FileNameOut = 'tmp\\Out%i.txt'%RunNumber
    FileNameErr = 'tmp\\Err%i.txt'%RunNumber
    FileNameBatch = 'tmp\\Batch%i.bat'%RunNumber
    FileOut = open(FileNameOut, 'wb')
    FileErr = open(FileNameErr, 'wb')
    FileBatch = open(FileNameBatch, 'w')
    Script = ''
    for Entry in range(int(10000*Random)):
        Script = Script + 'echo ' + str(Entry) +' \n'  
    FileBatch.write(Script)
    FileBatch.close()
    subprocess.call(args=FileNameBatch, stdout=FileOut, stderr=FileErr, shell=True)
    FileOut.close()  
    FileErr.close()
    return FileNameOut

@dask.delayed
def DelayedReduce(Array):
    "Reduce the array"
    OutStr = ''
    for Entry in Array:
        print Entry
        OutStr = OutStr + str(Entry) + ' , ' 
    FileName = 'tmp\\Summary.txt'
    FileOut = open(FileName, 'w')
    FileOut.write(OutStr)
    FileOut.close()    
    return FileName


ComputeArray = [ MapElement(Entry) for Entry in range(10)]
Out = DelayedReduce(ComputeArray) 
Value = Out.compute()
print Value
if DaskClient is not None:
    DaskClient.close()

This code works fine without distributed: comment out the line defining DaskClient. It also works fine with distributed if the numpy random generator wrapper is not used: comment out the lines setting RandomGeneratorToUse and Random inside the MapElement function.

Both the random wrapper and dask distributed are necessary for my code. Does anyone have a workaround that will make this code work in a distributed environment?
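A possible direction, sketched below but not verified against this setup, would be to wrap the generator by composition instead of inheritance, so that cloudpickle only has to serialize a plain class and never the numpy.random.RandomState base class itself; the attribute name _rng is purely illustrative:

import numpy

class NumpyRandomWrapper(object):
    # Hypothetical variant: hold a RandomState instead of subclassing it.
    def __init__(self, seed=None):
        self._rng = numpy.random.RandomState(seed)

    def sample(self, population, k):
        # Same behaviour as the original wrapper: k items without replacement
        return self._rng.choice(population, k, replace=False)

    def random(self):
        # Uniform float in [0, 1), as random_sample() returns
        return self._rng.random_sample()

MapElement would stay unchanged, since this version keeps the same sample and random methods.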

Below is the error generated:

(base) C:\Users\Work\Desktop>python DaskTestDistrib.py
tmp dir already exists
distributed.protocol.pickle - INFO - Failed to serialize <function MapElement at 0x0000000005098F28>. Exception: No module named mtrand
Traceback (most recent call last):
  File "DaskTestDistrib.py", line 67, in <module>
    Value = Out.compute()
  File "C:\Users\Work\Anaconda2\lib\site-packages\dask\base.py", line 155, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "C:\Users\Work\Anaconda2\lib\site-packages\dask\base.py", line 404, in compute
    results = get(dsk, keys, **kwargs)
  File "C:\Users\Work\Anaconda2\lib\site-packages\distributed\client.py", line 2079, in get
    user_priority=priority,
  File "C:\Users\Work\Anaconda2\lib\site-packages\distributed\client.py", line 2028, in _graph_to_futures
    'tasks': valmap(dumps_task, dsk3),
  File "cytoolz/dicttoolz.pyx", line 165, in cytoolz.dicttoolz.valmap (cytoolz/dicttoolz.c:3277)
    cpdef object valmap(object func, object d, object factory=dict):
  File "cytoolz/dicttoolz.pyx", line 190, in cytoolz.dicttoolz.valmap (cytoolz/dicttoolz.c:3124)
    rv[<object>pkey] = func(<object>pval)
  File "C:\Users\Work\Anaconda2\lib\site-packages\distributed\worker.py", line 718, in dumps_task
    return {'function': dumps_function(task[0]),
  File "C:\Users\Work\Anaconda2\lib\site-packages\distributed\worker.py", line 682, in dumps_function
    result = pickle.dumps(func)
  File "C:\Users\Work\Anaconda2\lib\site-packages\distributed\protocol\pickle.py", line 51, in dumps
    return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
  File "C:\Users\Work\Anaconda2\lib\site-packages\cloudpickle\cloudpickle.py", line 900, in dumps
    cp.dump(obj)
  File "C:\Users\Work\Anaconda2\lib\site-packages\cloudpickle\cloudpickle.py", line 234, in dump
    return Pickler.dump(self, obj)
  File "C:\Users\Work\Anaconda2\lib\pickle.py", line 224, in dump
    self.save(obj)
  File "C:\Users\Work\Anaconda2\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\Work\Anaconda2\lib\site-packages\cloudpickle\cloudpickle.py", line 360, in save_function
    self.save_function_tuple(obj)
  File "C:\Users\Work\Anaconda2\lib\site-packages\cloudpickle\cloudpickle.py", line 508, in save_function_tuple
    save(f_globals)
  File "C:\Users\Work\Anaconda2\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\Work\Anaconda2\lib\pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Users\Work\Anaconda2\lib\pickle.py", line 687, in _batch_setitems
    save(v)
  File "C:\Users\Work\Anaconda2\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\Work\Anaconda2\lib\site-packages\cloudpickle\cloudpickle.py", line 621, in save_global
    self.save_dynamic_class(obj)
  File "C:\Users\Work\Anaconda2\lib\site-packages\cloudpickle\cloudpickle.py", line 452, in save_dynamic_class
    doc_dict,
  File "C:\Users\Work\Anaconda2\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\Work\Anaconda2\lib\pickle.py", line 554, in save_tuple
    save(element)
  File "C:\Users\Work\Anaconda2\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\Work\Anaconda2\lib\pickle.py", line 554, in save_tuple
    save(element)
  File "C:\Users\Work\Anaconda2\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\Work\Anaconda2\lib\site-packages\cloudpickle\cloudpickle.py", line 612, in save_global
    __import__(modname)
ImportError: No module named mtrand
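The last frames show cloudpickle serializing the NumpyRandomWrapper class by value (save_dynamic_class, because the class is defined in the main script) and then failing in save_global when it tries to import 'mtrand', which appears to be the module name recorded for the RandomState base class. Another possible workaround, again only a sketch, is to define the wrapper in its own importable module so it is pickled by reference rather than by value; random_wrapper.py is a hypothetical file name, and the workers would need to be able to import it (for example via Client.upload_file or a shared path):

# random_wrapper.py (hypothetical module)
import numpy

class NumpyRandomWrapper(numpy.random.RandomState):
    def __init__(self, seed=None):
        super(NumpyRandomWrapper, self).__init__(seed)

    def sample(self, population, k):
        return self.choice(population, k, replace=False)

    def random(self):
        return self.random_sample()

In DaskTestDistrib.py the inline class definition would then be replaced by:

from random_wrapper import NumpyRandomWrapper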



