Dask is great, yet when moving to use distributed, there are some things to take care of. However, the following example shows use of a random generator wrapper for numpy that breaks with dask distributed.
import dask
import subprocess
import dask.distributed
import os
import numpy
try:
os.mkdir('tmp')
except:
print 'tmp dir already exists'
DaskClient = None
# commenting the next line, makes things work
DaskClient = dask.distributed.Client('127.0.0.1:8786')
WorkingDir = os.getcwd()
# Remove this class to make the program work with distributed
class NumpyRandomWrapper(numpy.random.RandomState):
def __init__(self, seed=None):
super(NumpyRandomWrapper, self).__init__(seed)
def sample(self, population, k):
return self.choice(population, k, replace=False)
def random(self):
return self.random_sample()
@dask.delayed
def MapElement(RunNumber):
"Compute Element"
os.chdir(WorkingDir)
Random = 1
# To make this work with distributed, comment the next two lines
RandomGeneratorToUse = NumpyRandomWrapper(None)
Random = RandomGeneratorToUse.random()
FileNameOut = 'tmp\\Out%i.txt'%RunNumber
FileNameErr = 'tmp\\Err%i.txt'%RunNumber
FileNameBatch = 'tmp\\Bacht%i.bat'%RunNumber
FileOut = open (FileNameOut,'wb')
FileErr = open (FileNameErr,'wb')
FileBatch = open (FileNameBatch,'w')
Script = ''
for Entry in range(int(10000*Random)):
Script = Script + 'echo ' + str(Entry) +' \n'
FileBatch.write(Script)
FileBatch.close()
subprocess.call(args = FileNameBatch, stdout = FileOut, stderr = FileErr, shell=True)
FileOut.close()
FileErr.close()
return FileNameOut
@dask.delayed
def DelayedReduce(Array):
"Reduce the array"
OutStr = ''
for Entry in Array:
print Entry
OutStr = OutStr + str(Entry) + ' , '
FileName = 'tmp\\Summary.txt'
FileOut = open (FileName,'w')
FileOut.write(OutStr)
FileOut.close()
return FileName
ComputeArray = [ MapElement(Entry) for Entry in range(10)]
Out = DelayedReduce(ComputeArray)
Value = Out.compute()
print Value
if DaskClient != None:
DaskClient.close()
This code works nice without distributed - comment the link defining DaskClient This code also works fine if not using the numpy random generator wrapper - comment the lines declaring RandomGeneratorToUse and Random inside the MapElement function.
The random wrapper is necessary to my code, as well as dask distributed. Does anyone have a workaround that will make this code work in distributed environment.
Below is the error generated:
(base) C:\Users\Work\Desktop>python DaskTestDistrib.py
tmp dir already exists
distributed.protocol.pickle - INFO - Failed to serialize <function MapElement at 0x0000000005098F28>. Exception: No module named mtrand
Traceback (most recent call last):
File "DaskTestDistrib.py", line 67, in <module>
Value = Out.compute()
File "C:\Users\Work\Anaconda2\lib\site-packages\dask\base.py", line 155, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "C:\Users\Work\Anaconda2\lib\site-packages\dask\base.py", line 404, in compute
results = get(dsk, keys, **kwargs)
File "C:\Users\Work\Anaconda2\lib\site-packages\distributed\client.py", line 2079, in get
user_priority=priority,
File "C:\Users\Work\Anaconda2\lib\site-packages\distributed\client.py", line 2028, in _graph_to_futures
'tasks': valmap(dumps_task, dsk3),
File "cytoolz/dicttoolz.pyx", line 165, in cytoolz.dicttoolz.valmap (cytoolz/dicttoolz.c:3277)
cpdef object valmap(object func, object d, object factory=dict):
File "cytoolz/dicttoolz.pyx", line 190, in cytoolz.dicttoolz.valmap (cytoolz/dicttoolz.c:3124)
rv[<object>pkey] = func(<object>pval)
File "C:\Users\Work\Anaconda2\lib\site-packages\distributed\worker.py", line 718, in dumps_task
return {'function': dumps_function(task[0]),
File "C:\Users\Work\Anaconda2\lib\site-packages\distributed\worker.py", line 682, in dumps_function
result = pickle.dumps(func)
File "C:\Users\Work\Anaconda2\lib\site-packages\distributed\protocol\pickle.py", line 51, in dumps
return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
File "C:\Users\Work\Anaconda2\lib\site-packages\cloudpickle\cloudpickle.py", line 900, in dumps
cp.dump(obj)
File "C:\Users\Work\Anaconda2\lib\site-packages\cloudpickle\cloudpickle.py", line 234, in dump
return Pickler.dump(self, obj)
File "C:\Users\Work\Anaconda2\lib\pickle.py", line 224, in dump
self.save(obj)
File "C:\Users\Work\Anaconda2\lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\Work\Anaconda2\lib\site-packages\cloudpickle\cloudpickle.py", line 360, in save_function
self.save_function_tuple(obj)
File "C:\Users\Work\Anaconda2\lib\site-packages\cloudpickle\cloudpickle.py", line 508, in save_function_tuple
save(f_globals)
File "C:\Users\Work\Anaconda2\lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\Work\Anaconda2\lib\pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "C:\Users\Work\Anaconda2\lib\pickle.py", line 687, in _batch_setitems
save(v)
File "C:\Users\Work\Anaconda2\lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\Work\Anaconda2\lib\site-packages\cloudpickle\cloudpickle.py", line 621, in save_global
self.save_dynamic_class(obj)
File "C:\Users\Work\Anaconda2\lib\site-packages\cloudpickle\cloudpickle.py", line 452, in save_dynamic_class
doc_dict,
File "C:\Users\Work\Anaconda2\lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\Work\Anaconda2\lib\pickle.py", line 554, in save_tuple
save(element)
File "C:\Users\Work\Anaconda2\lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\Work\Anaconda2\lib\pickle.py", line 554, in save_tuple
save(element)
File "C:\Users\Work\Anaconda2\lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\Work\Anaconda2\lib\site-packages\cloudpickle\cloudpickle.py", line 612, in save_global
__import__(modname)
ImportError: No module named mtrand
Aucun commentaire:
Enregistrer un commentaire