# Datajob

Build and deploy a serverless data pipeline on AWS with no effort.
- Deploy code to python shell / pyspark AWS Glue jobs.
- Use AWS Sagemaker to create ML models.
- Orchestrate the above jobs using AWS Step Functions as simply as `task1 >> task2`.
- Let us know what you want to see next.
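The `task1 >> task2` syntax reads naturally because Python lets a class overload the right-shift operator. A minimal sketch of how such chaining can work (not datajob's actual implementation; `Task`, `extract`, etc. are hypothetical names):

```python
# Sketch: model "a >> b" dependency chaining via operator overloading.
class Task:
    def __init__(self, name):
        self.name = name
        self.downstream = []  # tasks that run after this one

    def __rshift__(self, other):
        # "self >> other" registers other as a downstream task
        self.downstream.append(other)
        return other  # returning other allows chaining: a >> b >> c


extract = Task("extract")
transform = Task("transform")
load = Task("load")

# chain the tasks sequentially
extract >> transform >> load

print([t.name for t in extract.downstream])    # ['transform']
print([t.name for t in transform.downstream])  # ['load']
```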
:rocket: :new: :rocket: Check out our new example of an end-to-end machine learning pipeline with Glue, Sagemaker and Step Functions. :rocket: :new: :rocket:

## Installation
Datajob can be installed using pip. Beware that it depends on the AWS CDK CLI!

```shell
pip install datajob
npm install -g aws-cdk@1.109.0 # the latest version of datajob depends on this version
```
## Quickstart

You can find the full example in `examples/data_pipeline_simple`.
We have a simple data pipeline composed of 2 glue jobs orchestrated sequentially using step functions.
```python
from aws_cdk import core

from datajob.datajob_stack import DataJobStack
from datajob.glue.glue_job import GlueJob
from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow

app = core.App()

# The datajob_stack is the instance that will result in a cloudformation stack.
# We inject the datajob_stack object through all the resources that we want to add.
with DataJobStack(scope=app, id="data-pipeline-simple") as datajob_stack:

    # We define 2 glue jobs with the relative path to the source code.
    task1 = GlueJob(
        datajob_stack=datajob_stack, name="task1", job_path="glue_jobs/task.py"
    )
    task2 = GlueJob(
        datajob_stack=datajob_stack, name="task2", job_path="glue_jobs/task2.py"
    )

    # We instantiate a step functions workflow and orchestrate the glue jobs.
    with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn:
        task1 >> task2

app.synth()
```
We add the above code in a file called `datajob_stack.py` in the root of the project.
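The referenced `glue_jobs/*.py` files are plain Python scripts; a python shell Glue job simply runs its script top to bottom, with no special entry point required. A hypothetical minimal task could look like:

```python
# glue_jobs/task.py - a hypothetical minimal python shell Glue job.
# Glue executes this script top to bottom when the job runs.

def main() -> str:
    # your business logic goes here
    message = "task finished"
    print(message)
    return message

if __name__ == "__main__":
    main()
```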
### Configure CDK

Follow the steps here to configure your credentials.

```shell
export AWS_PROFILE=default
# use the aws cli to get your account number
export AWS_ACCOUNT=$(aws sts get-caller-identity --query Account --output text --profile $AWS_PROFILE)
export AWS_DEFAULT_REGION=eu-west-1

# init cdk
cdk bootstrap aws://$AWS_ACCOUNT/$AWS_DEFAULT_REGION
```
### Deploy

Deploy the pipeline using CDK.

```shell
cd examples/data_pipeline_simple
cdk deploy --app "python datajob_stack.py" --require-approval never
```
### Execute

```shell
datajob execute --state-machine data-pipeline-simple-workflow
```

The terminal will show a link to the step functions page to follow up on your pipeline run.

### Destroy

```shell
cdk destroy --app "python datajob_stack.py"
```
## Examples

- Data pipeline with parallel steps
- Data pipeline for processing big data using PySpark
- Data pipeline where you package and ship your project as a wheel
- Machine Learning pipeline where we combine glue jobs with sagemaker

All our examples are in `./examples`.
## Functionality
<details>
<summary>Deploy to a stage</summary>

Specify a stage to deploy an isolated pipeline.
Typical examples would be `dev`, `prod`, ...

```shell
cdk deploy --app "python datajob_stack.py" --context stage=my-stage
```

</details>
<details>
<summary>Using datajob's S3 data bucket</summary>

Dynamically reference the `datajob_stack` data bucket name in the arguments of your `GlueJob` by calling
`datajob_stack.context.data_bucket_name`.
```python
import pathlib

from aws_cdk import core

from datajob.datajob_stack import DataJobStack
from datajob.glue.glue_job import GlueJob
from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow

current_dir = str(pathlib.Path(__file__).parent.absolute())

app = core.App()

with DataJobStack(
    scope=app, id="datajob-python-pyspark", project_root=current_dir
) as datajob_stack:

    pyspark_job = GlueJob(
        datajob_stack=datajob_stack,
        name="pyspark-job",
        job_path="glue_job/glue_pyspark_example.py",
        job_type="glueetl",
        glue_version="2.0",  # we only support glue 2.0
        python_version="3",
        worker_type="Standard",  # options are Standard / G.1X / G.2X
        number_of_workers=1,
        arguments={
            "--source": f"s3://{datajob_stack.context.data_bucket_name}/raw/iris_dataset.csv",
            "--destination": f"s3://{datajob_stack.context.data_bucket_name}/target/pyspark_job/iris_dataset.parquet",
        },
    )

    with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn:
        pyspark_job >> ...
```
You can find this example here.
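Inside the job script, these arguments arrive as ordinary command-line flags. On Glue you would typically read them with `awsglue.utils.getResolvedOptions`; as a locally runnable sketch, plain `argparse` shows the shape (the bucket names below are made up):

```python
import argparse

def parse_job_arguments(argv):
    # Glue passes job arguments as regular command-line flags, e.g.
    # ["--source", "s3://.../raw/iris_dataset.csv", "--destination", "s3://.../iris.parquet"]
    parser = argparse.ArgumentParser()
    parser.add_argument("--source", required=True)
    parser.add_argument("--destination", required=True)
    args, _ = parser.parse_known_args(argv)
    return args

args = parse_job_arguments(
    [
        "--source", "s3://my-data-bucket/raw/iris_dataset.csv",
        "--destination", "s3://my-data-bucket/target/pyspark_job/iris_dataset.parquet",
    ]
)
print(args.source)  # s3://my-data-bucket/raw/iris_dataset.csv
```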
</details>

<details>
<summary>Deploy files to the datajob's deployment bucket</summary>

Specify the path to the folder you would like to include in the deployment bucket.
```python
from aws_cdk import core

from datajob.datajob_stack import DataJobStack

app = core.App()

with DataJobStack(
    scope=app, id="some-stack-name", include_folder="path/to/folder/"
) as datajob_stack:
    ...
```

</details>
<details>
<summary>Package your project as a wheel and ship it to AWS</summary>

You can find the example here.
```python
# We add the path to the project root in the constructor of DataJobStack.
# By specifying project_root, datajob will look for a .whl in
# the dist/ folder in your project_root.
with DataJobStack(
    scope=app, id="data-pipeline-pkg", project_root=current_dir
) as datajob_stack:
    ...
```
Package your project using Poetry:

```shell
poetry build
cdk deploy --app "python datajob_stack.py"
```

Package your project using setup.py:

```shell
python setup.py bdist_wheel
cdk deploy --app "python datajob_stack.py"
```

You can also use the datajob cli to execute the two commands at once:

```shell
# for poetry
datajob deploy --config datajob_stack.py --package poetry

# for setup.py
datajob deploy --config datajob_stack.py --package setuppy
```
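Both flows end with a wheel in `dist/`. A sketch of how such an artifact can be located (an assumption for illustration, not datajob's actual code):

```python
import pathlib

def find_wheel(project_root):
    # Look for built wheel artifacts under <project_root>/dist,
    # which is where both poetry and setup.py place them.
    dist = pathlib.Path(project_root) / "dist"
    wheels = sorted(dist.glob("*.whl"))
    if not wheels:
        raise FileNotFoundError(f"no wheel found in {dist}; build your project first")
    return wheels[-1]  # lexicographically last, i.e. the highest version
```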
</details>
<details>
<summary>Processing big data using a Glue PySpark job</summary>

```python
import pathlib

from aws_cdk import core

from datajob.datajob_stack import DataJobStack
from datajob.glue.glue_job import GlueJob

current_dir = str(pathlib.Path(__file__).parent.absolute())

app = core.App()

with DataJobStack(
    scope=app, id="datajob-python-pyspark", project_root=current_dir
) as datajob_stack:
    pyspark_job = GlueJob(
        datajob_stack=datajob_stack,
        name="pyspark-job",
        job_path="glue_job/glue_pyspark_example.py",
        job_type="glueetl",
        glue_version="2.0",  # we only support glue 2.0
        python_version="3",
        worker_type="Standard",  # options are Standard / G.1X / G.2X
        number_of_workers=1,
        arguments={
            "--source": f"s3://{datajob_stack.context.data_bucket_name}/raw/iris_dataset.csv",
            "--destination": f"s3://{datajob_stack.context.data_bucket_name}/target/pyspark_job/iris_dataset.parquet",
        },
    )
```

The full example can be found in `examples/data_pipeline_pyspark`.
</details>

<details>
<summary>Orchestrate stepfunctions tasks in parallel</summary>

```python
# task2 comes after task1. task4 comes after task3.
# task5 depends on both task2 and task4 to be finished.
# Therefore task1 and task3 can run in parallel,
# as well as task2 and task4.
with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn:
    task1 >> task2
    task3 >> task4
    task2 >> task5
    task4 >> task5
```

More can be found in `examples/data_pipeline_parallel`.
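The resulting parallelism can be reasoned about as topological levels of the dependency graph: every task in a level depends only on tasks from earlier levels. A plain-Python illustration for the workflow above (not datajob's API):

```python
# Each task maps to the list of tasks it depends on, mirroring the workflow above.
deps = {
    "task1": [],
    "task2": ["task1"],
    "task3": [],
    "task4": ["task3"],
    "task5": ["task2", "task4"],
}

def topological_levels(dependencies):
    # Repeatedly collect every task whose dependencies have all finished.
    levels, done = [], set()
    while len(done) < len(dependencies):
        level = sorted(
            task
            for task, upstream in dependencies.items()
            if task not in done and all(dep in done for dep in upstream)
        )
        levels.append(level)
        done.update(level)
    return levels

print(topological_levels(deps))
# [['task1', 'task3'], ['task2', 'task4'], ['task5']]
```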
</details>

<details>
<summary>Orchestrate 1 stepfunctions task</summary>

Use the Ellipsis object to orchestrate a single job via step functions.

```python
some_task >> ...
```

</details>
<details>
<summary>Notify in case of error/success</summary>

Provide the parameter `notification` in the constructor of a `StepfunctionsWorkflow` object.
This will create an SNS Topic which will be triggered in case of failure or success.
The provided email address subscribes to the topic and receives the notification in its inbox.

```python
with StepfunctionsWorkflow(datajob_stack=datajob_stack,
                           name="workflow",
                           notification="email@domain.com") as sfn:
    task1 >> task2
```

You can provide 1 email or a list of emails `["email1@domain.com", "email2@domain.com"]`.

</details>
## Datajob in depth

The `datajob_stack` is the instance that will result in a cloudformation stack.
The path in `project_root` helps `datajob_stack` locate the root of the project where
the setup.py / pyproject.toml file can be found, as well as the dist/ folder with the wheel of your project.

```python
import pathlib

from aws_cdk import core

from datajob.datajob_stack import DataJobStack

current_dir = str(pathlib.Path(__file__).parent.absolute())
```