Dagster: A Complete Guide to Deploying Multiple Data Pipelines in a Docker Environment

Ibrahim Halil Koyuncu
Sep 21, 2023

In this article, I am going to provide a step-by-step guide on deploying multiple data pipelines to Dagster within a Docker environment, offering a streamlined approach to managing and executing data workflows.

Pull Sample Dagster Project

Pull the sample project with the command below.

git clone https://github.com/menendes/dagster_deployment_docker.git

I am going to follow this sample project to show you how to deploy Dagster to a Docker environment, so all files mentioned below refer to files in this project.

In this project I created two pipelines, pipeline-x and pipeline-y. Let's take a closer look at what happens in each of them.

Pipeline-X

  • Read data from S3 storage via the “s3_io_manager” resource. This IO manager is provided by Dagster, so I only pass parameters such as aws_access_key and aws_secret_key to configure it.
  • After reading the .csv file, filter the data based on the “Salary” column.
  • Save the filtered data to etl_db. The “postgres_io_manager” resource was created for this purpose; the database connection information is retrieved from environment variables.
  • Create a graph to run these ops sequentially and convert the graph to a job, which is the runnable Dagster unit (a rough sketch follows this list).
  • The logs and result of each compute (op) are stored in AWS S3 storage.
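
To make this flow concrete, here is a minimal sketch of how such a graph and job could be wired in Dagster. The op names, the salary threshold, and the use of fs_io_manager as a local stand-in for the project's real S3 and Postgres IO managers are illustrative assumptions, not code copied from the repository.

import pandas as pd
from dagster import Out, fs_io_manager, graph, op


@op(out=Out(io_manager_key="s3_io_manager"))
def read_employees():
    # In the real pipeline this data is loaded from a .csv file on S3 via the
    # s3_io_manager resource; a small in-memory frame stands in here.
    return pd.DataFrame({"Name": ["a", "b"], "Salary": [1000, 5000]})


@op(out=Out(io_manager_key="postgres_io_manager"))
def filter_by_salary(df):
    # Keep only the rows above an assumed salary threshold.
    return df[df["Salary"] > 2000]


@graph
def pipeline_x_graph():
    filter_by_salary(read_employees())


# Converting the graph into a job makes it a runnable Dagster unit.
# fs_io_manager is only a stand-in for the configured S3/Postgres IO managers.
pipeline_x_job = pipeline_x_graph.to_job(
    resource_defs={
        "s3_io_manager": fs_io_manager,
        "postgres_io_manager": fs_io_manager,
    }
)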

Pipeline-Y

  • Read data from the PostgreSQL database via the “postgres_io_manager” resource.
  • After reading the data from the DB, add a new column and filter on the “Expectation” column to demonstrate a basic transform-data operation (a short sketch follows this list).
  • Finally, write the transformed data to a .csv file and put it into S3 storage using the “s3_io_manager” resource.
  • Create a graph to run these ops sequentially and convert the graph to a job, which is the runnable Dagster unit.
  • The logs and result of each compute (op) are stored in AWS S3 storage.
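
As a hedged illustration of the transform step only, the op below adds a derived column and filters on it; the formula and threshold are assumptions made purely for illustration and may differ from the repository's logic.

import pandas as pd
from dagster import op


@op
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    # Add a new column and filter on the "Expectation" column.
    out = df.copy()
    out["Expectation"] = out["Salary"] * 1.1
    return out[out["Expectation"] > 3000]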

Now let's look at the general architecture and explore the deployment details.

Deploy Multiple Data Pipelines to Dagster on Docker

Let's start with a review of the general architecture.

General Architecture Diagram-1.0 for Pipelines

We have three main components, the Dagster daemon, the Dagster webserver, and the database, plus the code locations described below.

  • Dagster WebServer: Serves the user interface and responds to GraphQL queries.
  • Dagster Daemon: Operates schedules, sensors, and run queuing.
  • Database: Used for run storage, schedule storage, and event log storage.
  • Code Locations: A code location is a collection of Dagster definitions that Dagster can load and access. We can think of each code location as representing one data pipeline. As you can see below, code locations provide an isolated structure for each data pipeline (a small sketch follows the diagram).
Dagster Code Locations
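
For example, each code location can be a plain Python module that exposes a Definitions object for the gRPC server to load. The module layout below is an assumption about how such a package might be organized, not the exact contents of the sample project.

# pipeline_x/__init__.py (illustrative layout)
from dagster import Definitions

from .jobs import pipeline_x_job  # hypothetical submodule holding the job


defs = Definitions(jobs=[pipeline_x_job])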

Now we can continue with configuration files.

  • “workspace.yaml”: Workspace files contain a collection of user-defined code locations and information about where to find them. Code locations are loaded via workspace files. Let's take a closer look at the “workspace.yaml” structure.
load_from:
  - grpc_server:
      host: pipeline_x
      port: 4000
      location_name: "pipeline_x"
  - grpc_server:
      host: pipeline_y
      port: 4047
      location_name: "pipeline_y"

In this structure we define how each data pipeline is served: the access type (gRPC in this case), the host name and port number, and finally the location name (the data pipeline name). In our example we have two data pipelines, “pipeline_x” and “pipeline_y”.

  • “dagster.yaml”: This file is used to load what's known as the Dagster instance. The naming is slightly misleading because the Dagster instance is not a process or a long-running daemon like the dagster-daemon or dagster-webserver. Instead, we can think of the Dagster instance as the root configuration file that contains the storage, run-launcher, run-coordinator, and compute log storage configurations (among others) that the Dagster system, including the dagster-daemon and dagster-webserver, will use. I will give more details about this file in a later section.

You can find both files in the sample project. Now let's continue by examining the Dockerfile that belongs to our code location.

# Base image (assumed here; use the one from the sample project)
FROM python:3.10-slim

RUN pip install \
    dagster \
    dagster-postgres \
    dagster-docker \
    dagster-aws \
    pandas

# Add repository code
WORKDIR /opt/dagster/app
RUN mkdir data
COPY . /opt/dagster/app/pipeline_x

# Run dagster gRPC server on port 4000
EXPOSE 4000

# CMD allows this to be overridden from run launchers or executors that want
# to run other commands against your repository
CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "-m", "pipeline_x"]

In this Dockerfile, we simply copy the data pipeline code into the working directory and start the Dagster API server over gRPC. Notice that the port number matches the workspace.yaml entry that points to this code location. If you have multiple code locations (data pipelines), you should create a Docker image for each pipeline with a different gRPC port and then add them to the workspace.yaml and docker-compose files. To create the Docker images for these pipelines, run the commands below.

cd pipeline_x
docker build -t pipeline-x .
cd ../pipeline_y
docker build -t pipeline-y .

So far we have created the data pipeline Docker images and configured workspace.yaml accordingly. Now let's create the Dagster component image.

FROM python:3.10-slim

RUN pip install \
    dagster \
    dagster-graphql \
    dagster-webserver \
    dagster-postgres \
    dagster-aws \
    dagster-docker

# Set $DAGSTER_HOME and copy dagster instance and workspace YAML there
ENV DAGSTER_HOME=/opt/dagster/dagster_home/
RUN mkdir -p $DAGSTER_HOME
COPY dagster.yaml workspace.yaml $DAGSTER_HOME
WORKDIR $DAGSTER_HOME

Both the dagster-webserver and dagster-daemon services will use the same image, built from the Dockerfile above. Be aware that we copy our configuration files, dagster.yaml and workspace.yaml, into this image, so the dagster-daemon service can read workspace.yaml and trigger the pipelines' jobs according to it.

To create an image for the Dagster components, run the command below.

docker build -t dagster .

The final step is putting these Docker images into the docker-compose.yml file and running all services via docker-compose.

version: "3.9"

services:
  dagster_postgresql:
    image: postgres:11
    container_name: dagster_postgresql
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: "postgres_user"
      POSTGRES_PASSWORD: "postgres_password"
      POSTGRES_DB: "postgres_db"
    networks:
      - dagster_network

  pipeline_x:
    image: pipeline-x
    restart: always
    environment:
      DAGSTER_POSTGRES_USER: "postgres_user"
      DAGSTER_POSTGRES_PASSWORD: "postgres_password"
      DAGSTER_POSTGRES_DB: "postgres_db"
      DAGSTER_CURRENT_IMAGE: "pipeline-x"
      AWS_ACCESS_KEY_ID: "yourAWSKeyID"
      AWS_SECRET_ACCESS_KEY: "yourAWSSecretAccessKey"
      DATABASE_IP: "yourETLDbAddress"
      DATABASE_PORT: "yourETLDbPort"
      DATABASE_USER: "etl_user"
      DATABASE_PASSWORD: "etl_password"
      DATABASE_NAME: "etl_db"
    networks:
      - dagster_network
    volumes:
      - /opt/dagster/dagster_home:/tmp/dagster-data

  pipeline_y:
    image: pipeline-y
    restart: always
    environment:
      DAGSTER_POSTGRES_USER: "postgres_user"
      DAGSTER_POSTGRES_PASSWORD: "postgres_password"
      DAGSTER_POSTGRES_DB: "postgres_db"
      DAGSTER_CURRENT_IMAGE: "pipeline-y"
      AWS_ACCESS_KEY_ID: "yourAWSKeyID"
      AWS_SECRET_ACCESS_KEY: "yourAWSSecretAccessKey"
      DATABASE_IP: "yourETLDbAddress"
      DATABASE_PORT: "yourETLDbPort"
      DATABASE_USER: "etl_user"
      DATABASE_PASSWORD: "etl_password"
      DATABASE_NAME: "etl_db"
    networks:
      - dagster_network

  dagster_webserver:
    image: dagster
    entrypoint:
      - dagster-webserver
      - -h
      - "0.0.0.0"
      - -p
      - "3000"
      - -w
      - workspace.yaml
    container_name: dagster_webserver
    expose:
      - "3000"
    ports:
      - "3000:3000"
    environment:
      DAGSTER_POSTGRES_USER: "postgres_user"
      DAGSTER_POSTGRES_PASSWORD: "postgres_password"
      DAGSTER_POSTGRES_DB: "postgres_db"
    volumes: # Make docker client accessible, so we can terminate containers from the webserver
      - /var/run/docker.sock:/var/run/docker.sock
      - /tmp/io_manager_storage:/tmp/io_manager_storage
    networks:
      - dagster_network
    depends_on:
      - dagster_postgresql
      - pipeline_x
      - pipeline_y

  # This service runs the dagster-daemon process, which is responsible for taking runs
  # off of the queue and launching them, as well as creating runs from schedules or sensors.
  dagster_daemon:
    image: dagster
    entrypoint:
      - dagster-daemon
      - run
    container_name: dagster_daemon
    restart: on-failure
    environment:
      DAGSTER_POSTGRES_USER: "postgres_user"
      DAGSTER_POSTGRES_PASSWORD: "postgres_password"
      DAGSTER_POSTGRES_DB: "postgres_db"
      AWS_ACCESS_KEY_ID: "yourAWSKeyID"
      AWS_SECRET_ACCESS_KEY: "yourAWSSecretAccessKey"
      DATABASE_IP: "yourETLDbAddress"
      DATABASE_PORT: "yourETLDbPort"
      DATABASE_USER: "etl_user"
      DATABASE_PASSWORD: "etl_password"
      DATABASE_NAME: "etl_db"
    volumes: # Make docker client accessible, so we can launch containers using host docker
      - /var/run/docker.sock:/var/run/docker.sock
      - /tmp/io_manager_storage:/tmp/io_manager_storage
    networks:
      - dagster_network
    depends_on:
      - dagster_postgresql
      - pipeline_y
      - pipeline_x

networks:
  dagster_network:
    driver: bridge
    name: dagster_network

As you can see in the docker-compose file above, we added both data pipelines, “pipeline_x” and “pipeline_y”, as services. Each service runs a gRPC server that loads its own data pipeline code. By setting DAGSTER_CURRENT_IMAGE to its own image, we tell the run launcher to use that same image when launching runs in a new container.

The “dagster_webserver” service serves the Dagster user interface and loads your user code from each pipeline container, in this case “pipeline_x” and “pipeline_y”.

The “dagster_daemon” service is responsible for taking runs off the queue and launching them, as well as creating runs from schedules and sensors.

Also be aware that, to configure resources such as “s3_io_manager” and “postgres_io_manager”, the required environment variables are added to both the pipeline containers and the daemon container.
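
As a rough sketch of how such a resource might pick these variables up at runtime, a custom Postgres IO manager could read its connection details straight from the environment. The class below is illustrative only and not the project's actual implementation.

import os

from dagster import IOManager, io_manager


class EnvConfiguredPostgresIOManager(IOManager):
    """Illustrative IO manager that builds its connection settings from the
    environment variables defined in docker-compose.yml."""

    def __init__(self):
        self.conn_info = {
            "host": os.environ["DATABASE_IP"],
            "port": os.environ["DATABASE_PORT"],
            "user": os.environ["DATABASE_USER"],
            "password": os.environ["DATABASE_PASSWORD"],
            "dbname": os.environ["DATABASE_NAME"],
        }

    def handle_output(self, context, obj):
        # Write the DataFrame to etl_db here, e.g. with pandas.DataFrame.to_sql.
        raise NotImplementedError("sketch only")

    def load_input(self, context):
        # Read the table back into a DataFrame here.
        raise NotImplementedError("sketch only")


@io_manager
def postgres_io_manager(_init_context):
    return EnvConfiguredPostgresIOManager()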

Now you are ready to deploy your data pipelines to Dagster in the Docker environment. Just run the command below :)

docker-compose up -d

That's all! Now open the Dagster UI at “localhost:3000” and check your pipeline status. It looks like the screenshot below.

Dagster Dashboard with data pipelines “pipeline_x” and “pipeline_y”

From the dashboard you can check your data pipeline status, runs, and their logs. Let's continue by exploring Dagster storages, daemon types, and the run launcher through the Dagster configuration file.

Dagster YAML File, Storages, Daemon Types, and Run Launchers

scheduler:
  module: dagster.core.scheduler
  class: DagsterDaemonScheduler

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  # config:
  #   max_concurrent_runs:
  #     env: DAGSTER_OVERALL_CONCURRENCY_LIMIT
  #   tag_concurrency_limits:
  #     - key: "database"
  #       value: "redshift"
  #       limit:
  #         env: DAGSTER_REDSHIFT_CONCURRENCY_LIMIT
  #     - key: "dagster/backfill"
  #       limit:
  #         env: DAGSTER_BACKFILL_CONCURRENCY_LIMIT

run_launcher:
  module: dagster_docker
  class: DockerRunLauncher
  config:
    env_vars:
      - DAGSTER_POSTGRES_USER
      - DAGSTER_POSTGRES_PASSWORD
      - DAGSTER_POSTGRES_DB
      - AWS_ACCESS_KEY_ID
      - AWS_SECRET_ACCESS_KEY
      - DATABASE_IP
      - DATABASE_PORT
      - DATABASE_USER
      - DATABASE_PASSWORD
      - DATABASE_NAME
    network: dagster_network
    container_kwargs:
      volumes: # Make docker client accessible to any launched containers as well
        - /var/run/docker.sock:/var/run/docker.sock
        - /tmp/dagster-data:/opt/dagster/dagster_home/storage

run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_db:
      hostname: dagster_postgresql
      username:
        env: DAGSTER_POSTGRES_USER
      password:
        env: DAGSTER_POSTGRES_PASSWORD
      db_name:
        env: DAGSTER_POSTGRES_DB
      port: 5432

schedule_storage:
  module: dagster_postgres.schedule_storage
  class: PostgresScheduleStorage
  config:
    postgres_db:
      hostname: dagster_postgresql
      username:
        env: DAGSTER_POSTGRES_USER
      password:
        env: DAGSTER_POSTGRES_PASSWORD
      db_name:
        env: DAGSTER_POSTGRES_DB
      port: 5432

event_log_storage:
  module: dagster_postgres.event_log
  class: PostgresEventLogStorage
  config:
    postgres_db:
      hostname: dagster_postgresql
      username:
        env: DAGSTER_POSTGRES_USER
      password:
        env: DAGSTER_POSTGRES_PASSWORD
      db_name:
        env: DAGSTER_POSTGRES_DB
      port: 5432

compute_logs:
  module: dagster_aws.s3.compute_log_manager
  class: S3ComputeLogManager
  config:
    bucket: "etl-dagster-data"
    prefix: "compute-logs"
    skip_empty_files: true
    upload_interval: 30

In a previous section I mentioned which configurations live in dagster.yaml, so let's examine them one by one.

scheduler: Creates runs from active schedules, where each run executes a job. It uses the “DagsterDaemonScheduler” class.

run_coordinator: An optional configuration. When the run coordinator is set up with the “QueuedRunCoordinator” class, the run queue daemon launches queued runs while respecting any concurrency limits and prioritization rules.

run_launcher: An optional configuration. Run launchers can be used to launch pipeline runs in a Docker container or on a remote Kubernetes cluster. If you are using a Docker environment and you want to persist the output of runs, you should add volume information under the run_launcher config. You can also pass environment variables to every run container in this config.

run_storage: Used to store metadata about ongoing and past pipeline runs. Three types of run storage are available: SqliteRunStorage, PostgresRunStorage (used in our case), and MySQLRunStorage.

schedule_storage: Stores information and metadata about schedules, which are Dagster definitions used to execute a job at a fixed interval.
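
For reference, a schedule in Dagster code can be as small as the sketch below; the cron expression and the import path for the job are illustrative assumptions, reusing the pipeline_x_job name from the earlier sketch.

from dagster import ScheduleDefinition

from pipeline_x.jobs import pipeline_x_job  # hypothetical import from the code location

# Run pipeline_x_job at the top of every hour (illustrative cron string).
pipeline_x_schedule = ScheduleDefinition(
    job=pipeline_x_job,
    cron_schedule="0 * * * *",
)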

event_log_storage: The event log is a central component that stores information about the events that occur during the execution of your pipelines. Dagster supports different storage backends such as SqliteEventLogStorage, PostgresEventLogStorage, and MysqlEventLogStorage.

compute_logs: The compute log manager handles “stdout” and “stderr” logging for op compute functions. In our case I used the “S3ComputeLogManager” to store op logs in S3 storage.
