DataEngineeringCapstoneProject
😈Complete End-to-End ETL Pipeline with Spark, Airflow, & AWS
Data Engineering Capstone Project for Udacity
Objective
In this project we work with US immigration data from the year 1994. The data covers dimensions such as visa types, transport modes, landing ports, US state codes, and country codes. Apart from the sas7bdat formatted immigration data, we have US airport information and US demographics data. We parse SAS descriptor files for all the dimensions and sas7bdat files for all the facts. The tools we use are Apache Spark, Apache Airflow, Amazon Redshift, and Amazon S3.
We read, parse, and clean the data from the local file system and Amazon S3, then transfer it to Redshift tables in AWS. The flow of data is orchestrated with Apache Airflow DAGs.
Finally, we use SQL queries to extract useful statistics and graphs from the data.
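The descriptor parsing mentioned above can be sketched in a few lines. This is not the project's own parser: it assumes the descriptor file lists each dimension as a `value <name>` block of `code = 'label'` lines closed by `;`, and `parse_sas_descriptor` is a hypothetical helper name.

```python
import re

# Start of a format block, e.g. "value i94cntyl" or "value $i94prtl" (assumed layout)
VALUE_RE = re.compile(r"^\s*value\s+\$?(\w+)", re.IGNORECASE)
# One mapping line, e.g. "582 =  'MEXICO ...'" or "'ALC' = 'ALCAN, AK'"
PAIR_RE = re.compile(r"^\s*'?([^'=]+?)'?\s*=\s*'([^']*)'")


def parse_sas_descriptor(text):
    """Return {format_name: {code: label}} for every 'value' block in text."""
    formats = {}
    current = None
    for line in text.splitlines():
        start = VALUE_RE.match(line)
        if start:
            # Begin collecting pairs for a new dimension
            current = formats.setdefault(start.group(1).lower(), {})
            continue
        if current is None:
            continue  # outside any block (comments, headers)
        pair = PAIR_RE.match(line)
        if pair:
            current[pair.group(1).strip()] = pair.group(2).strip()
        if line.strip().endswith(";"):
            current = None  # ';' closes the current block
    return formats
```

Each returned mapping (for example country codes to country names) can then be loaded as a small dimension table.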
Data Model
Data Pipeline

Installing and starting
Installing Python Dependencies
Install the Python dependencies from a Terminal/Command Prompt.
Without Anaconda:
$ python3 -m venv virtual-env-name
$ source virtual-env-name/bin/activate
$ pip install -r requirements.txt
With Anaconda (on Windows):
$ conda env create -f env.yml
$ conda activate <conda-env-name>
Or (on other platforms):
$ conda create -y -n <conda-env-name> python=3.6
$ conda install -f -y -q -n <conda-env-name> -c conda-forge --file requirements.txt
$ conda activate <conda-env-name>   (older conda versions: source activate <conda-env-name>)
Fixing/Configuring Airflow
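$ pip install --upgrade Flask
$ pip install zappa
$ mkdir airflow_home
$ export AIRFLOW_HOME=$(pwd)/airflow_home
$ cd airflow_home
$ airflow initdb
$ airflow webserver
The webserver keeps running in the foreground, so start the scheduler in a second terminal with the same AIRFLOW_HOME exported:
$ airflow scheduler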
More Airflow commands
To list the existing DAGs registered with Airflow:
$ airflow list_dags
Secure/Encrypt your connections and hooks
Run:
$ python cryptosetup.py
Copy the printed key and paste it into airflow.cfg after:
fernet_key = ************
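cryptosetup.py itself isn't shown here. Assuming it simply generates a Fernet key for Airflow's `fernet_key` setting, a stdlib-only equivalent is sketched below; it builds the key the same way `cryptography.fernet.Fernet.generate_key()` does (32 random bytes, URL-safe base64). The function name is hypothetical.

```python
import base64
import os


def generate_fernet_key() -> str:
    """Return a new Fernet key: 32 random bytes, URL-safe base64 encoded."""
    return base64.urlsafe_b64encode(os.urandom(32)).decode("ascii")


if __name__ == "__main__":
    # Paste the printed value after "fernet_key = " in airflow.cfg
    print(generate_fernet_key())
```

Keep the key private: anyone holding it can decrypt the connection passwords stored in the Airflow metadata database.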
Setting up connections and variables in Airflow UI for AWS
No code changes are needed for this step; you only create connections and variables in the Airflow UI.
S3
- Open your browser to localhost:8080 and open Admin->Variables
- Click "Create"
- Set "Key" to "s3_bucket" and "Val" to "udacity-dend", then click save
- Click "Create" again, set "Key" to "s3_prefix" and "Val" to "data-pipelines"
- Click save
AWS
- Open Admin->Connections
- Click "Create"
- Set "Conn Id" to "aws_credentials", "Conn Type" to "Amazon Web Services"
- Set "Login" to your aws_access_key_id and "Password" to your aws_secret_access_key
- Click save
- If it doesn't work, put the following in the "Extra" field:
{"region_name": "your_aws_region", "aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key", "aws_iam_user": "your_created_iam_user"}
The "Extra" field also accepts these keys:
- aws_account_id: AWS account ID for the connection
- aws_iam_role: AWS IAM role for the connection
- external_id: AWS external ID for the connection
- host: Endpoint URL for the connection
- region_name: AWS region for the connection
- role_arn: AWS role ARN for the connection
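Hand-typed JSON in the "Extra" field is easy to break (a missing quote or a trailing comma makes it invalid). A small sketch that emits it with `json.dumps` instead; `build_aws_extra` is a hypothetical helper, and the key names are the ones listed above.

```python
import json


def build_aws_extra(region_name, aws_access_key_id=None,
                    aws_secret_access_key=None, **other_keys):
    """Return a JSON string for the Airflow AWS connection's "Extra" field.

    Keys whose value is None are left out, so optional settings such as
    role_arn or external_id can be passed as keyword arguments.
    """
    extra = {"region_name": region_name}
    if aws_access_key_id is not None:
        extra["aws_access_key_id"] = aws_access_key_id
    if aws_secret_access_key is not None:
        extra["aws_secret_access_key"] = aws_secret_access_key
    extra.update({k: v for k, v in other_keys.items() if v is not None})
    return json.dumps(extra)


if __name__ == "__main__":
    # Example: region plus a (placeholder) role ARN, no keys in the Extra field
    print(build_aws_extra("us-west-2", role_arn="arn:aws:iam::123456789012:role/example"))
```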
Redshift
- Open Admin->Connections
- Click "Create"
- Set "Conn Id" to "redshift", "Conn Type" to "postgres"
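- Set "Login" to your master_username for your cluster and "Password" to your master_password for your cluster
- Set "Host" to your cluster endpoint, "Schema" to your database name, and "Port" to your cluster port (5439 by default)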
- Click save
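As an alternative to the UI, Airflow can also read a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` that holds a URI. A minimal sketch that builds such a URI for the "redshift" connection, assuming the default Redshift port 5439; `redshift_conn_uri` is a hypothetical helper.

```python
from urllib.parse import quote


def redshift_conn_uri(user, password, host, database, port=5439):
    """Build a postgres:// URI usable as AIRFLOW_CONN_REDSHIFT.

    User and password are percent-encoded so characters such as '@' or '/'
    in the password do not break the URI.
    """
    return "postgres://{}:{}@{}:{}/{}".format(
        quote(user, safe=""), quote(password, safe=""), host, port, database
    )


if __name__ == "__main__":
    # Placeholder values; use your cluster's endpoint and master credentials
    print(redshift_conn_uri("awsuser", "p@ss/word",
                            "<cluster_endpoint_address>", "dev"))
```

The printed value can then be exported, e.g. `export AIRFLOW_CONN_REDSHIFT='postgres://...'`, before starting the webserver and scheduler.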
Optional
If you haven't set up your AWS Redshift cluster yet (or don't want to create one manually), use the scripts inside the 'aws' folder:
- To create the cluster and IAM role: run the command below in a terminal from the 'aws' folder. It creates your Redshift database and an IAM role in AWS that has read access to Amazon S3 and is attached to the created cluster. From the printed output, copy DWH_ENDPOINT for <cluster_endpoint_address> and DWH_ROLE_ARN for <iam_role>.
$ python aws_operate.py --action start
- To create tables: run the command below in a terminal from the project directory to create the tables in your Redshift database in AWS.
$ python create_table.py --host <cluster_endpoint_address>
- To stop: run the command below in a terminal from the 'aws' directory to destroy your Redshift database and detach the IAM role from the cluster.
$ python aws_operate.py --action stop
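If the printed endpoint and role ARN get lost, they can be looked up again. A minimal sketch, assuming boto3 (not part of the original instructions) and a hypothetical cluster identifier; it reads the `Endpoint` and `IamRoles` fields of the Redshift `describe_clusters` response. `find_cluster_endpoint` is a hypothetical helper that takes the client as a parameter.

```python
def find_cluster_endpoint(redshift_client, cluster_identifier):
    """Return (endpoint_address, port, iam_role_arns) for an available cluster.

    redshift_client is expected to be boto3.client("redshift", ...).
    Returns None while the cluster is not yet available (no endpoint yet).
    """
    response = redshift_client.describe_clusters(ClusterIdentifier=cluster_identifier)
    cluster = response["Clusters"][0]
    if cluster.get("ClusterStatus") != "available" or "Endpoint" not in cluster:
        return None
    role_arns = [role["IamRoleArn"] for role in cluster.get("IamRoles", [])]
    return cluster["Endpoint"]["Address"], cluster["Endpoint"]["Port"], role_arns
```

For example: `find_cluster_endpoint(boto3.client("redshift", region_name="us-west-2"), "dwh-cluster")`, where "dwh-cluster" stands in for whatever identifier aws_operate.py used.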
About the data
I94 Immigration Data:
This data comes from the US National Tourism and Trade Office. There's a sample file so you can take a look at the data in csv format before reading it all in. The report contains international visitor arrival statistics by world regions and selected countries (including top 20), type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry (for select countries).
World Temperature Data:
This dataset came from Kaggle. You can read more about it here.
U.S. City Demographic Data:
This data comes from OpenSoft. You can read more about it here.
Airport Code Table:
This is a simple table of airport codes and corresponding cities. It comes from here.
Run the project
- Follow all the setup steps mentioned above
- Create a bucket in region 'us-west-2' in Amazon S3
- Set up all the connections and variables in the Airflow admin:
i. Set up the AWS connection with your user credentials (access key as Login and secret key as Password). Make sure the region is 'us-west-2'
ii. Set up the Redshift connection with user, password, host, port, schema, and database
iii. Set up the 'iam_role' variable with the IAM role ARN for your AWS account
iv. Set up variables for 'temp_input', 'temp_output', 'spark_path' (Spark working path for parquet files), and 'sas_file' (path to the SAS descriptor file)
v. Place all the csv inputs inside the temp_output directory
vi. Create a folder called 'spark_path' inside airflow/dags/
vii. Create a variable called 's3_bucket' (make sure the bucket in AWS is in region 'us-west-2')
Example:
| variable | example value |
|:-------------|-------------:|
| iam_role | #### |
| s3_bucket | #### |
| sas_file | /home/workspace/airflow/dags/temp_input/I94_SAS_Labels_Descriptions.SAS |
| spark_path | /home/workspace/airflow/dags/spark_path |
| temp_input | /home/workspace/airflow/dags/temp_input/ |
| temp_output | /home/workspace/airflow/dags/temp_output/ |
- Data location for input files:
i. Put all your .sas7bdat files in the temp_input directory whenever you want to process and insert them into the database. When you are done, remove the processed .sas7bdat files before dropping in new ones.
ii. Put the SAS descriptor file in the temp_input directory
iii. Put the airport-codes_csv.csv file in the temp_output directory
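Instead of creating each variable from the example table by hand, the values can be collected into one JSON file and imported in one step. A sketch, assuming the Airflow 1.x CLI used elsewhere in this README, whose `airflow variables -i <file>` imports a flat JSON object of keys and values; the helper names and the file name variables.json are hypothetical, and the "####" placeholders still need your own values.

```python
import json
import sys

# Example values from the variable table; "####" entries are placeholders.
EXAMPLE_VARIABLES = {
    "iam_role": "####",
    "s3_bucket": "####",
    "sas_file": "/home/workspace/airflow/dags/temp_input/I94_SAS_Labels_Descriptions.SAS",
    "spark_path": "/home/workspace/airflow/dags/spark_path",
    "temp_input": "/home/workspace/airflow/dags/temp_input/",
    "temp_output": "/home/workspace/airflow/dags/temp_output/",
}


def unfilled_keys(variables):
    """Return the sorted keys whose value is empty or still a placeholder."""
    return sorted(k for k, v in variables.items() if v.strip() in ("", "####"))


def variables_json(variables):
    """Serialize variables as the flat JSON object the import expects."""
    return json.dumps(variables, indent=2, sort_keys=True)


if __name__ == "__main__":
    missing = unfilled_keys(EXAMPLE_VARIABLES)
    if missing:
        print("Fill in before importing: " + ", ".join(missing), file=sys.stderr)
    print(variables_json(EXAMPLE_VARIABLES))
```

Redirect the output to a file (`python make_variables.py > variables.json`) and import it with `airflow variables -i variables.json`.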
Test it Yourself!
Here are some example queries we use to check the results uploaded into the Redshift schema:
Example Queries
Cities where immigrants arrived
SELECT TOP 10 b.port_city, b.port_state_or_country, COUNT(cicid) AS count
FROM project.immigration a INNER JOIN project.i94ports b ON a.i94port=b.port_code
GROUP BY b.port_city, b.port_state_or_country
ORDER BY COUNT(cicid) DESC
Different kinds of airports
SELECT TOP 10 type, count(*) AS count_type
FROM project.airport_codes
WHERE iso_country = 'US'
GROUP BY type
ORDER BY count_type DESC
Immigrants from different countries
SELECT TOP 10 SUBSTRING(b.country_name, 1, 15) AS country_name, COUNT(cicid) AS count
FROM project.immigration a INNER JOIN project.i94res b ON a.i94res=b.country_code
GROUP BY b.country_name
ORDER BY COUNT(cicid) DESC
Small airports from different states
SELECT TOP 10 a.state_name AS State, airports.airport_count AS Count_of_Airports
FROM
(SELECT substring(iso_region, 4, length(iso_region)) AS state, count(*) AS airport_count
FROM project.airport_codes
WHERE iso_country = 'US' AND type='small_airport'
GROUP BY iso_region) airports INNER JOIN project.i94addr a ON airports.state=a.state_code
ORDER BY airports.airport_count DESC
Small airport locations
SELECT a.longitude_deg, a.latitude_deg
FROM project.airport_codes a
WHERE a.iso_country = 'US' AND a.type = 'small_airport'
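To run these queries from Python instead of a SQL client, a small helper over any DB-API 2.0 connection is enough. This is a sketch, not part of the project: `run_query` and `format_rows` are hypothetical names, and connecting to Redshift assumes the psycopg2 driver, which uses `%s` placeholders (sqlite3 uses `?`).

```python
def run_query(conn, sql, params=None):
    """Execute sql on a DB-API 2.0 connection; return (column_names, rows)."""
    cursor = conn.cursor()
    try:
        # Only pass params when given, so literal '%' in SQL is left alone
        if params is None:
            cursor.execute(sql)
        else:
            cursor.execute(sql, params)
        columns = [desc[0] for desc in cursor.description]
        rows = cursor.fetchall()
    finally:
        cursor.close()
    return columns, rows


def format_rows(columns, rows):
    """Render a result set as pipe-separated text lines, header first."""
    lines = [" | ".join(columns)]
    lines.extend(" | ".join(str(value) for value in row) for row in rows)
    return "\n".join(lines)
```

For example, with psycopg2: `conn = psycopg2.connect(host="<cluster_endpoint_address>", port=5439, dbname="<db>", user="<user>", password="<password>")`, then `print(format_rows(*run_query(conn, sql)))` with one of the queries above as `sql`.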
Stats and Graphs
Cities where immigrants arrived

Different kinds of airports

Immigrants from different countries

Small airports from different states

Small airport locations in different states

Scoping the Project
The purpose is to produce interesting statistics from the US immigration data, airports around the world, and dimensions such as visa type, transport mode, and nationality.
Steps Taken:
The steps taken are in the following order:
Gather the data:
This took a while because the data came in different formats, and I had to settle on which data I would actually use for my analysis and queries. I chose the .sas7bdat formatted immigration data, which meets the minimum row-count requirement, the cleaned airport data for dimensions, and the SAS descriptor file for
