Parallel and Distributed Computing on Dagster with Dask

Ibrahim Halil Koyuncu
Sep 2, 2023


Hi, in this article I am going to focus on parallel computing with the Dask library and on automating Dask computations with Dagster. You can find the main topics and questions covered below.

  • Quick introduction to Dask Distributed
  • Quick introduction to Dagster
  • How to orchestrate Dask computations and integrate Dask with Dagster?
  • Building a machine learning pipeline example

Quick Introduction to Dask Distributed

Dask.distributed is a Python library for distributed computing. It uses a central task scheduler to manage dynamic tasks across various machines, while an asynchronous, event-driven approach handles multiple client requests and worker progress. This setup allows flexible handling of diverse workloads from multiple users and can easily adapt to changing worker numbers due to failures or additions.

Dask Distributed General Architecture

As the architecture above shows, workers can be distributed across multiple machines in a cluster or run on a single machine. They communicate with the scheduler to receive tasks, report task progress, and return results.

The scheduler, on the other hand, coordinates the distribution of tasks to the available workers. It decides which tasks to execute next, handles task dependencies, and ensures that computations are performed efficiently. It also tracks the progress of tasks and collects results from workers; it is the brain behind the orchestration of a Dask computation.

The scheduler exposes an interface (usually through the Client object) for users to submit tasks and manage computations. Let's walk through an example.

from dask.distributed import Client

client = Client()  # set up a local cluster on your machine

def square(x):
    return x ** 2

def neg(x):
    return -x

A = client.map(square, range(10))  # apply square to each element remotely
B = client.map(neg, A)             # chain another computation on the futures
total = client.submit(sum, B)      # reduce on the cluster
total.result()  # wait until the computation completes, gather the result locally

In the example above, the map and submit methods are used to launch computations on the cluster. They send the function and its arguments to remote workers for processing and return Future objects that refer to data living on the cluster. These calls return immediately, while the computations run remotely in the background.
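
When you launch many tasks at once, client.gather is a convenient way to block until all of them finish and pull the results back to the local process. Here is a minimal sketch; the function and sizes are illustrative, not part of the original example:

from dask.distributed import Client

client = Client()  # local cluster

def times_ten(x):
    return x * 10

futures = client.map(times_ten, range(5))  # returns Future objects immediately
results = client.gather(futures)           # blocks until all results are available
print(results)  # [0, 10, 20, 30, 40]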

Quick Introduction to Dagster

Dagster is a powerful data orchestrator that simplifies the development, testing, and deployment of complex data workflows. It provides a unified framework for defining and managing data pipelines, ensuring reliable and transparent data processing. I will dive deeper into Dagster in another article, but I would like to explain a few of its components here to show how Dask can be integrated for data processing.

Asset: An asset is a fundamental unit representing a significant piece of data within a data ecosystem. Assets help manage and track data lineage, quality, and metadata. They provide a structured way to understand, document, and govern key data elements that flow through data pipelines and workflows, ensuring data reliability and traceability.
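
To make this concrete, here is a minimal sketch of two assets (the names and data are hypothetical, not part of the tutorial project); Dagster infers the dependency between them from the parameter name:

from dagster import asset

@asset
def raw_orders():
    # Hypothetical upstream asset; in practice this might read from a file or database
    return [{"id": 1, "amount": 42.0}, {"id": 2, "amount": 13.5}]

@asset
def order_totals(raw_orders):
    # Downstream asset; the parameter name "raw_orders" declares the dependency
    return sum(order["amount"] for order in raw_orders)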

Ops: Ops (formerly known as solids) are the core unit of computation in Dagster. An op represents a single, self-contained task or transformation within a data pipeline. Ops are modular and reusable, allowing you to compose complex workflows by connecting them together.
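
A hedged sketch of two ops (hypothetical names, only to illustrate the concept) could look like this:

from dagster import op

@op
def fetch_numbers():
    # A single, self-contained unit of computation
    return [1, 2, 3, 4]

@op
def sum_numbers(numbers):
    # Ops are wired together inside a job or graph
    return sum(numbers)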

Jobs: Jobs are the main unit of execution and monitoring in Dagster. A job represents a set of computations defined by a pipeline that can run on a defined schedule, be triggered manually from the Dagster UI, or run when an external change occurs using a sensor (another Dagster component). Jobs encapsulate the pipeline logic, runtime configuration, and execution context, making it easy to automate and manage data workflows.
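
Continuing the hypothetical ops from the sketch above (repeated here so the snippet stands on its own), a job wires them together and can be attached to a schedule:

from dagster import job, op, ScheduleDefinition

@op
def fetch_numbers():
    return [1, 2, 3, 4]

@op
def sum_numbers(numbers):
    return sum(numbers)

@job
def numbers_job():
    # A job composes ops into a runnable, schedulable unit
    sum_numbers(fetch_numbers())

# Optionally run the job every day at 06:00
daily_numbers_schedule = ScheduleDefinition(job=numbers_job, cron_schedule="0 6 * * *")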

Resource: A resource is an object that provides external dependencies or services to the ops within a pipeline. Resources can represent various external systems or services, such as database connections, cloud storage, APIs, or anything your ops need to perform their tasks. Resources let you encapsulate and manage these dependencies, making it easier to share and reuse them across multiple ops and pipelines. They improve modularity, testability, and maintainability in your data workflows by decoupling the logic of your ops from their external dependencies.
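
As an illustration (the resource and its fields here are hypothetical), a resource can expose configuration fields and helper methods to the ops and assets that use it:

from dagster import ConfigurableResource

class warehouse_resource(ConfigurableResource):
    # Hypothetical resource: a connection string provided through configuration
    connection_url: str

    def fetch_rows(self, table: str) -> list:
        # In a real project this would query the external system
        return [f"row from {table} at {self.connection_url}"]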

That's all! Now let's create a data pipeline in Dagster and compute its tasks on a Dask cluster.

How to orchestrate Dask computations and integrate Dask with Dagster

To use Dask with Dagster, we define a Dask resource that sets up a Dask client, so the Dask configuration can be shared across Dagster components. Let's define a local cluster.

from dagster import ConfigurableResource
from distributed import Client, LocalCluster

class dask_resource(ConfigurableResource):

    def make_dask_cluster(self, n_workers) -> Client:
        # Start a local cluster with the requested number of workers
        client = Client(LocalCluster(n_workers=n_workers))
        return client

We have completed the setup of our resource with the example above. Now let's make it available to Dagster.

from dagster import Definitions, load_assets_from_modules

from . import assets
from .resources import dask_resource

all_assets = load_assets_from_modules([assets])  # collect all assets

defs = Definitions(  # load components
    assets=all_assets,
    resources={
        "my_dask_resource": dask_resource(),
    },
)

Resources and assets are loaded into Dagster by passing them to the Definitions object, which can include assets, jobs, resources, schedules, and sensors. Now, let's use the Dask resource in an asset to parallelize a computation.

from dagster import asset

from tutorial.resources import dask_resource

@asset
def simple_dask_asset(my_dask_resource: dask_resource):  # the parameter name must match the resource key in the Definitions block
    client = my_dask_resource.make_dask_cluster(n_workers=5)

    def square(x):
        return x ** 2

    def neg(x):
        return -x

    A = client.map(square, range(10000))
    B = client.map(neg, A)
    total = client.submit(sum, B)  # send the computation to the cluster
    return total.result()  # get the result from the cluster

In the asset above, we receive the local cluster client through the resource whose key name was previously defined in the Definitions block, and we added two functions whose computation runs on the local Dask cluster.
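
For quick local testing (an approach I am adding here, not covered in the original walkthrough, and assuming the project layout above with the asset defined in tutorial/assets.py), you can also materialize the asset directly from Python with Dagster's materialize helper instead of going through the UI:

from dagster import materialize

from tutorial.assets import simple_dask_asset
from tutorial.resources import dask_resource

# Runs the asset in-process and returns a result object you can inspect
result = materialize(
    [simple_dask_asset],
    resources={"my_dask_resource": dask_resource()},
)
print(result.success)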

That's all! We are ready to run some computations on the Dask cluster :) When you run the “dagster dev” command and open the Dagster UI, you can see your asset as shown below.

When you click the “Materialize selected” button, you manually start the computation. Once the run starts, the Dask dashboard is also available at its default address, “http://localhost:8787/status”, where you can monitor the computation.
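
If that port is already taken on your machine, you can pin the dashboard address yourself when creating the cluster (a small assumption about your local setup; the dask_resource defined earlier could be adjusted the same way):

from distributed import Client, LocalCluster

# Pin the dashboard to a specific port so the link stays predictable
cluster = LocalCluster(n_workers=5, dashboard_address=":8787")
client = Client(cluster)
print(client.dashboard_link)  # e.g. http://127.0.0.1:8787/status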

Note that we submitted three computations, the “square”, “neg”, and “sum” functions, and configured five workers. You can verify both in the Dask dashboard and follow the progress of the data processing there.

Building a Machine Learning Pipeline

In the previous section, we learned how to orchestrate a Dask computation from Dagster. Let's move one step further by creating a machine learning pipeline in Dagster and training a model with Dask in parallel.

from dagster import AssetOut, asset, multi_asset
from dask_ml import model_selection
from dask_ml.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

from tutorial.resources import dask_resource


def train(model, X, y):
    # Helper executed on the Dask cluster
    model.fit(X, y)
    return model


@asset
def my_classification_data():
    # Generate a synthetic classification dataset as dask arrays
    X, y = make_classification(chunks=200, n_samples=10000)
    return X, y


@multi_asset(outs={"training_data": AssetOut(), "test_data": AssetOut()})
def train_test_data(my_classification_data):
    # Split the dataset into training and test sets
    X, y = my_classification_data
    X_train, X_test, y_train, y_test = model_selection.train_test_split(X, y)
    return (X_train, y_train), (X_test, y_test)


@asset
def my_model(context, my_dask_resource: dask_resource, training_data):
    # Train the model on the Dask cluster
    client = my_dask_resource.make_dask_cluster(n_workers=5)
    X_train, y_train = training_data
    model = RandomForestClassifier(verbose=1)
    result = client.submit(train, model, X_train, y_train)
    return result.result()


@asset
def my_score(context, test_data, my_model):
    # Evaluate the trained model on the test set
    X_test, y_test = test_data
    my_score = my_model.predict(X_test)
    context.add_output_metadata({"score": str(my_score.tolist())})
    return my_score

Let’s take a closer look at this example.

  • We create synthetic data in the “my_classification_data” asset using the dask_ml library and load it into Dagster as an asset.
  • After creating the sample dataset, we split it into training and test sets in the “train_test_data” multi-asset and load these outputs into Dagster under the “training_data” and “test_data” keys.
  • In the “my_model” asset, we train a model on the Dask cluster in parallel.
  • Finally, we make predictions with the model on “test_data” and attach the result to the output metadata. Note that every asset represents a piece of data, and we can use it in other assets by passing its key name as a parameter. You can find the pipeline below.
Sample Dagster Pipeline
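
If you want to run this pipeline on a schedule rather than materializing it by hand, one option (a hedged sketch, not part of the original code) is to wrap the assets in an asset job, attach a schedule, and register both in Definitions:

from dagster import define_asset_job, ScheduleDefinition

# Materialize my_score together with everything upstream of it
ml_pipeline_job = define_asset_job(name="ml_pipeline_job", selection="*my_score")

# Hypothetical weekly retraining schedule (Mondays at 03:00)
weekly_training_schedule = ScheduleDefinition(job=ml_pipeline_job, cron_schedule="0 3 * * 1")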

To summarize, we created a machine learning pipeline step by step: first we created a sample dataset, split it into training and test sets, trained a model with the training set, and finally used the model on the test set. Assets create relationships with each other through their names, so the pipeline is executed according to these relationships.
