In [48]:
import dask
from dask.distributed import Client
import prefect
from prefect.engine.executors import LocalDaskExecutor, DaskExecutor
from sklearn.linear_model import LinearRegression
import dask_ml
from dask_ml.datasets import make_classification, make_regression
import pandas as pd
import numpy as np
In [3]:
# Start a Dask distributed client; called with no arguments, so no existing
# scheduler address is supplied here.
client = Client()
In [4]:
# Display the client/cluster summary (worker count, cores, memory).
client
Out[4]:

Client

Cluster

  • Workers: 4
  • Cores: 8
  • Memory: 42.95 GB
In [11]:
# Build a synthetic regression problem as chunked Dask arrays, then display
# the feature array's summary.
X, y = make_regression(
    n_samples=500_000,
    n_features=100,
    chunks=100_000,
    random_state=42,
)
X
Out[11]:
Array Chunk
Bytes 400.00 MB 80.00 MB
Shape (500000, 100) (100000, 100)
Count 5 Tasks 5 Chunks
Type float64 numpy.ndarray
100 500000
In [18]:
# Materialize both Dask arrays in memory and write them out as headerless,
# index-free, tab-separated text files.
pd.DataFrame(X.compute()).to_csv(
    "X_test",
    sep="\t",
    index=False,
    header=False,
)
pd.DataFrame(y.compute()).to_csv(
    "y_test",
    sep="\t",
    index=False,
    header=False,
)
In [51]:
# Seed NumPy's global random generator so np.random draws in this process are repeatable.
# NOTE(review): presumably this does not seed separate Dask worker processes — confirm.
np.random.seed(42)
In [54]:
@prefect.task
def read_data(file1: str, file2: str):
    """Load the feature matrix and the target from two tab-separated files.

    Parameters
    ----------
    file1 : str
        Path of the headerless, tab-separated file holding X.
    file2 : str
        Path of the headerless, tab-separated file holding y.

    Returns
    -------
    tuple
        ``(X, y)``, each a pandas DataFrame.
    """
    # Read the paths passed in by the caller rather than hard-coded file
    # names, so the flow's X_file / y_file parameters actually take effect.
    X = pd.read_csv(file1, header=None, sep="\t")
    y = pd.read_csv(file2, header=None, sep="\t")
    return X, y

@prefect.task
def make_random_iter(num_batches: int):
    """Return a 1-D array of ``num_batches`` uniform random draws.

    The values come from NumPy's global random state (``np.random.rand``);
    the flow maps over this array to fan out one task per element.
    """
    draws = np.random.rand(num_batches)
    return draws

@prefect.task
def fit_create_model(X_y, rand: float):
    """Fit a linear regression ten times on a progressively shifted target.

    Parameters
    ----------
    X_y : tuple
        ``(X, y)`` pair as produced by ``read_data``.
    rand : float
        Element supplied by the mapped call; it is not used in the body.

    Returns
    -------
    list
        Ten entries, each the ``coef_[0]`` of the model after one fit.
    """
    features, target = X_y

    regressor = LinearRegression()
    collected = []
    for _ in range(10):
        # The shift accumulates: each round adds a fresh random scalar on
        # top of the previous round's target.
        target = target + np.random.rand()
        regressor.fit(features, target)
        collected.append(regressor.coef_[0])
    return collected
In [55]:
# Declare the flow graph: three run-time parameters, one data-loading task,
# one task producing the values to map over, and a mapped model-fitting task.
with prefect.Flow("test") as flow:
    # Values for these parameters are supplied as keyword arguments to flow.run().
    X_file = prefect.Parameter("X_file")
    y_file = prefect.Parameter("y_file")
    num_100_batches = prefect.Parameter("num_100_batches")
    
    # The (X, y) result cannot be unpacked here: at this point the call only
    # registers a task, and its value is evaluated lazily when the flow runs.
    X_y = read_data(X_file, y_file)
    
    rand_list = make_random_iter(num_100_batches)
    # Map over rand_list (one task run per element); X_y is wrapped in
    # unmapped() so the same pair is passed to every mapped run.
    coef_list = fit_create_model.map(prefect.unmapped(X_y), rand_list)
In [56]:
# Run the flow without an explicit executor and keep the returned state so
# the task results can be inspected below.
run_sequential = flow.run(
    X_file="X_test",
    y_file="y_test",
    num_100_batches=10,
)
[2020-04-17 14:24:37,955] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2020-04-17 14:24:37,960] INFO - prefect.FlowRunner | Starting flow run.
[2020-04-17 14:24:37,977] INFO - prefect.TaskRunner | Task 'num_100_batches': Starting task run...
[2020-04-17 14:24:37,982] INFO - prefect.TaskRunner | Task 'num_100_batches': finished task run for task with final state: 'Success'
[2020-04-17 14:24:37,991] INFO - prefect.TaskRunner | Task 'y_file': Starting task run...
[2020-04-17 14:24:37,995] INFO - prefect.TaskRunner | Task 'y_file': finished task run for task with final state: 'Success'
[2020-04-17 14:24:38,010] INFO - prefect.TaskRunner | Task 'make_random_iter': Starting task run...
[2020-04-17 14:24:38,016] INFO - prefect.TaskRunner | Task 'make_random_iter': finished task run for task with final state: 'Success'
[2020-04-17 14:24:38,025] INFO - prefect.TaskRunner | Task 'X_file': Starting task run...
[2020-04-17 14:24:38,029] INFO - prefect.TaskRunner | Task 'X_file': finished task run for task with final state: 'Success'
[2020-04-17 14:24:38,039] INFO - prefect.TaskRunner | Task 'read_data': Starting task run...
[2020-04-17 14:24:45,393] INFO - prefect.TaskRunner | Task 'read_data': finished task run for task with final state: 'Success'
[2020-04-17 14:24:45,401] INFO - prefect.TaskRunner | Task 'fit_create_model': Starting task run...
[2020-04-17 14:24:45,407] INFO - prefect.TaskRunner | Task 'fit_create_model[0]': Starting task run...
[2020-04-17 14:25:21,204] INFO - prefect.TaskRunner | Task 'fit_create_model[0]': finished task run for task with final state: 'Success'
[2020-04-17 14:25:21,217] INFO - prefect.TaskRunner | Task 'fit_create_model[1]': Starting task run...
[2020-04-17 14:25:58,400] INFO - prefect.TaskRunner | Task 'fit_create_model[1]': finished task run for task with final state: 'Success'
[2020-04-17 14:25:58,410] INFO - prefect.TaskRunner | Task 'fit_create_model[2]': Starting task run...
[2020-04-17 14:26:35,581] INFO - prefect.TaskRunner | Task 'fit_create_model[2]': finished task run for task with final state: 'Success'
[2020-04-17 14:26:35,590] INFO - prefect.TaskRunner | Task 'fit_create_model[3]': Starting task run...
[2020-04-17 14:27:14,767] INFO - prefect.TaskRunner | Task 'fit_create_model[3]': finished task run for task with final state: 'Success'
[2020-04-17 14:27:14,778] INFO - prefect.TaskRunner | Task 'fit_create_model[4]': Starting task run...
[2020-04-17 14:27:49,878] INFO - prefect.TaskRunner | Task 'fit_create_model[4]': finished task run for task with final state: 'Success'
[2020-04-17 14:27:49,888] INFO - prefect.TaskRunner | Task 'fit_create_model[5]': Starting task run...
[2020-04-17 14:28:25,352] INFO - prefect.TaskRunner | Task 'fit_create_model[5]': finished task run for task with final state: 'Success'
[2020-04-17 14:28:25,367] INFO - prefect.TaskRunner | Task 'fit_create_model[6]': Starting task run...
[2020-04-17 14:29:05,007] INFO - prefect.TaskRunner | Task 'fit_create_model[6]': finished task run for task with final state: 'Success'
[2020-04-17 14:29:05,023] INFO - prefect.TaskRunner | Task 'fit_create_model[7]': Starting task run...
[2020-04-17 14:29:41,437] INFO - prefect.TaskRunner | Task 'fit_create_model[7]': finished task run for task with final state: 'Success'
[2020-04-17 14:29:41,446] INFO - prefect.TaskRunner | Task 'fit_create_model[8]': Starting task run...
[2020-04-17 14:30:18,685] INFO - prefect.TaskRunner | Task 'fit_create_model[8]': finished task run for task with final state: 'Success'
[2020-04-17 14:30:18,695] INFO - prefect.TaskRunner | Task 'fit_create_model[9]': Starting task run...
[2020-04-17 14:30:55,113] INFO - prefect.TaskRunner | Task 'fit_create_model[9]': finished task run for task with final state: 'Success'
[2020-04-17 14:30:55,119] INFO - prefect.TaskRunner | Task 'fit_create_model': finished task run for task with final state: 'Mapped'
[2020-04-17 14:30:55,122] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
In [58]:
# Number of batches: one result entry per mapped fit_create_model run.
len(run_sequential.result[coef_list].result)
Out[58]:
10
In [59]:
# Number of models per batch: entries in the first batch's coefficient list.
len(run_sequential.result[coef_list].result[0])
Out[59]:
10
In [60]:
# Length of one stored coefficient vector, i.e. the number of features in X.
len(run_sequential.result[coef_list].result[0][0])
Out[60]:
100
In [ ]:
# Point the executor at the scheduler of the `client` created at the top of
# this notebook instead of a hard-coded "tcp://127.0.0.1:<port>" address:
# that port came from one particular session and goes stale once the kernel
# (and its local cluster) is restarted.
executor = DaskExecutor(client.scheduler.address)
# NOTE(review): the recorded run of this cell restarted a worker for exceeding
# its memory budget; presumably shipping the full (X, y) pair to every mapped
# task contributes — confirm before relying on this run.
flow.run(num_100_batches=10, y_file="y_test", X_file="X_test", executor=executor)
[2020-04-17 14:34:43,918] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2020-04-17 14:34:43,921] INFO - prefect.FlowRunner | Starting flow run.
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.scheduler - ERROR - Couldn't gather keys {'fit_create_model-734ac104-cca1-4b93-aca7-f50ddd436ed6': ['tcp://127.0.0.1:64810'], 'num_100_batches-691b4353-54e8-4e70-bcdf-4c7211b3cb40': ['tcp://127.0.0.1:64810'], 'make_random_iter-7d697750-530c-4bc6-b429-60fc81d33182': ['tcp://127.0.0.1:64810']} state: ['processing', 'memory', 'memory'] workers: ['tcp://127.0.0.1:64810']
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://127.0.0.1:64810'], fit_create_model-734ac104-cca1-4b93-aca7-f50ddd436ed6
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://127.0.0.1:64810'], num_100_batches-691b4353-54e8-4e70-bcdf-4c7211b3cb40
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://127.0.0.1:64810'], make_random_iter-7d697750-530c-4bc6-b429-60fc81d33182
NoneType: None
distributed.client - WARNING - Couldn't gather 3 keys, rescheduling {'fit_create_model-734ac104-cca1-4b93-aca7-f50ddd436ed6': ('tcp://127.0.0.1:64810',), 'num_100_batches-691b4353-54e8-4e70-bcdf-4c7211b3cb40': ('tcp://127.0.0.1:64810',), 'make_random_iter-7d697750-530c-4bc6-b429-60fc81d33182': ('tcp://127.0.0.1:64810',)}