ewah

Ewah: ELT With Airflow Helper - Classes and functions to make Apache Airflow life easier.

Functions to create all DAGs required for ELT using only a simple config file.

DWHs Implemented

  • Snowflake
  • PostgreSQL
  • BigQuery

Operators

EWAH currently supports the following operators:

  • Aircall
  • BigQuery
  • DynamoDB
  • Facebook (partially, so far: ads insights; incremental only)
  • FX Rates (from Yahoo Finance)
  • Google Ads
  • Google Analytics (incremental only)
  • Google Maps (location data from an address)
  • Google Sheets
  • Hubspot
  • Mailchimp
  • Mailingwork
  • MongoDB
  • MySQL
  • OracleSQL
  • Pipedrive
  • PostgreSQL / Redshift
  • Recurly
  • S3 (for CSV or JSON files stored in an S3 bucket, e.g. from Kinesis Firehose)
  • Salesforce
  • Shopify
  • Stripe
  • Zendesk

Universal operator arguments

The following arguments are accepted by all operators, unless explicitly stated otherwise:

| argument | required | type | default | description |
| --- | --- | --- | --- | --- |
| source_conn_id | yes | string | n.a. | name of the Airflow connection with the source credentials |
| dwh_engine | yes | string | n.a. | DWH type, e.g. postgres |
| dwh_conn_id | yes | string | n.a. | name of the Airflow connection of the DWH |
| target_table_name | implicit | string | n.a. | name of the table in the DWH; the target table name is the name given in the table config |
| target_schema_name | yes | string | n.a. | name of the schema in the DWH where the table will live |
| target_schema_name_suffix | no | string | _next | when loading new data, how to suffix the schema name during the loading process |
| target_database_name | yes for Snowflake DWH | string | n.a. | name of the database (only for Snowflake; illegal argument for non-Snowflake DWHs) |
| drop_and_replace | no | boolean | same as DAG-level setting | whether the table is loaded as a full refresh or incrementally; normally set by the DAG-level config. Incremental loads can overwrite this setting to fully refresh individual tables (e.g. if they are small and have no updated_at column) |
| primary_key | operator-dependent | string or list of strings | n.a. | name of the primary key column(s); if given, EWAH will set the column as primary key in the DWH and use it when applicable during upsert operations |
| add_metadata | no | boolean | True | some operators may add metadata to the tables (e.g. the shop name for the Shopify operator); this behavior can be turned off |
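
As an illustration of how some of these universal arguments appear in practice, a table-level configuration could look like the following sketch. It is hypothetical (the table name orders and the connection ID production_postgres are placeholders); the general_config/tables layout mirrors the full refresh example further below.

operator_config = {
    'general_config': {
        'source_conn_id': 'production_postgres',  # Airflow connection holding the source credentials
    },
    'tables': {
        'orders': {                    # the table key doubles as target_table_name
            'primary_key': 'id',       # set as primary key in the DWH, used for upserts
            'drop_and_replace': True,  # fully refresh this table even within an incremental DAG
            'add_metadata': False,     # suppress any metadata columns the operator would add
        },
    },
}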

Operator: Google Ads

These arguments are specific to the Google Ads operator.

| argument | required | type | default | description |
| --- | --- | --- | --- | --- |
| fields | yes | dict | n.a. | most important argument; excludes metrics; detailed below |
| metrics | yes | list of strings | n.a. | list of all metrics to load; at least one metric must be loaded |
| resource | yes | string | n.a. | name of the report, e.g. keyword_view |
| client_id | yes | string | n.a. | 10-digit number, often written with hyphens, e.g. 123-123-1234 (accepted with or without hyphens) |
| conditions | no | list of strings | n.a. | list of condition strings to include in the query; all conditions are combined using the AND operator |
| data_from | no | datetime, timedelta or airflow-template-string | data_interval_start of the task instance | start date of the particular Airflow task instance, OR a timedelta to calculate the delta from data_until |
| data_until | no | datetime or airflow-template-string | data_interval_end of the task instance | load data from Google Ads up to this point |

Arguments: fields and metrics

The fields and metrics arguments are the most important ones for this operator. The metrics are separated from the fields because the fields simultaneously serve as the updated_on_columns; when creating the Google Ads query, they are combined. The metrics argument is simply a list of metrics to be requested from Google Ads. The fields argument is a bit more complex, due to the nature of the Google Ads API: it is essentially a nested JSON structure.

A resulting query may look something like this:

SELECT
    campaign.id
  , campaign.name
  , ad_group_criterion.criterion_id
  , ad_group_criterion.keyword.text
  , ad_group_criterion.keyword.match_type
  , segments.date
  , metrics.impressions
  , metrics.clicks
  , metrics.cost_micros
FROM keyword_view
WHERE segments.date BETWEEN '2020-08-01' AND '2020-08-08'

Since the query contains nested object structures, the fields structure must reflect the same nesting. Take a look at the example config below for the correct configuration of the above Google Ads query. Note, in addition, that the fields will be uploaded to the DWH under the same names, except that the periods are replaced by underscores; i.e., the table keyword_view_data in the example below will have the columns campaign_id, ad_group_criterion_keyword_text, etc.

Finally, note that segments.date is always required in the fields argument.
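
For illustration, the mapping from the nested fields structure plus metrics to the final query and to the DWH column names can be sketched in plain Python. This is a hypothetical, minimal sketch (the flatten_fields helper is not part of EWAH) that only mirrors the field/metric combination and the period-to-underscore renaming described above.

def flatten_fields(node, prefix=()):
    # Walk the nested fields structure and yield dotted GAQL field names.
    if isinstance(node, dict):
        for key, value in node.items():
            yield from flatten_fields(value, prefix + (key,))
    elif isinstance(node, list):
        for item in node:
            yield from flatten_fields(item, prefix)
    else:  # leaf: a plain field name
        yield ".".join(prefix + (str(node),))

fields = {
    "campaign": ["id", "name"],
    "ad_group_criterion": [{"keyword": ["text", "match_type"]}, "criterion_id"],
    "segments": ["date"],
}
metrics = ["impressions", "clicks", "cost_micros"]

gaql_fields = list(flatten_fields(fields)) + ["metrics." + m for m in metrics]
query = (
    "SELECT " + ", ".join(gaql_fields)
    + " FROM keyword_view"
    + " WHERE segments.date BETWEEN '2020-08-01' AND '2020-08-08'"
)
dwh_columns = [f.replace(".", "_") for f in gaql_fields]
# gaql_fields  -> ['campaign.id', 'campaign.name', 'ad_group_criterion.keyword.text', ...]
# dwh_columns  -> ['campaign_id', 'campaign_name', 'ad_group_criterion_keyword_text', ...]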

Oracle operator particularities

The Oracle operator utilizes the cx_Oracle Python library. To make it work, you need to install additional packages; see the cx_Oracle installation documentation for details.

Example

Sample configuration in a dags.yaml file:

EL_Google_Ads:
  incremental: True
  el_operator: google_ads
  target_schema_name: raw_google_ads
  operator_config:
    general_config:
      source_conn_id: google_ads
      client_id: "123-123-1234"
      # conditions: none
    incremental_config:
      data_from: !!python/object/apply:datetime.timedelta
        - 3 # days as time interval before the execution date of the DAG
        # only use this for normal runs! backfills: use execution date context instead
    tables:
      keyword_view_data:
        resource: keyword_view
        fields:
          campaign:
            - id
            - name
          ad_group_criterion:
            - keyword:
              - text
              - match_type
            - criterion_id
          segments:
            - date
        metrics:
          - impressions
          - clicks
          - cost_micros

Philosophy

This package strictly follows an ELT Philosophy:

  • Business value is created by infusing business logic into the data and making great analyses and usable data available to stakeholders, not by building data pipelines
  • Airflow solely orchestrates loading raw data into a central DWH
  • Data is either loaded as full refresh (all data at every load) or incrementally, exploiting Airflow's catchup and execution logic
  • The only additional DAGs are dbt DAGs and utility DAGs
  • Within that DWH, each data source lives in its own schema (e.g. raw_salesforce)
  • Irrespective of full refresh or incremental loading, DAGs always load into a separate schema (e.g. raw_salesforce_next) and, at the end, replace the schema holding the old data with the schema holding the new data, to avoid data corruption due to errors in DAG execution (a minimal sketch of this swap follows the list)
  • Any data transformation is defined using SQL, ideally using dbt
  • Seriously, dbt is awesome, give it a shot!
  • (Non-SQL) Code contains no transformations
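
To make the schema-swap bullet above concrete, the end of a load on a PostgreSQL DWH could conceptually look like the sketch below. This is a hypothetical illustration, not EWAH's actual implementation; it assumes both the production schema and the suffixed loading schema already exist.

import psycopg2

def swap_schemas(dsn, schema='raw_salesforce', suffix='_next'):
    # Promote the freshly loaded schema (e.g. raw_salesforce_next) to production in one
    # transaction, keeping the previously live data around as <schema>_old.
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(f'DROP SCHEMA IF EXISTS {schema}_old CASCADE;')
        cur.execute(f'ALTER SCHEMA {schema} RENAME TO {schema}_old;')
        cur.execute(f'ALTER SCHEMA {schema}{suffix} RENAME TO {schema};')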

Usage

In your Airflow DAGs folder, define the DAGs by invoking either the incremental loading or the full refresh DAG factory. The incremental loading DAG factory returns three DAGs in a tuple; make sure to call it like so: dag1, dag2, dag3 = dag_factory_incremental_loading() or add the DAG IDs to your namespace like so:

dags = dag_factory_incremental_loading()
for dag in dags:
  globals()[dag._dag_id] = dag

Otherwise, Airflow will not recognize the DAGs. Most arguments should be self-explanatory. The two noteworthy arguments are el_operator and operator_config. The former must be a child class of ewah.operators.base.EWAHBaseOperator. Ideally, the required operator is already available for your use. Please feel free to fork and commit your own operators to this project! The latter is a dictionary containing the entire configuration of the operator. This is where you define which tables to load, how to load them, whether to load only specific columns, and any other detail related to your EL job.

Full refresh factory

A filename.py file in your airflow/dags folder may look something like this:

from ewah.utils.dag_factory_full_refresh import dag_factory_drop_and_replace
from ewah.constants import EWAHConstants as EC
from ewah.operators.postgres import EWAHPostgresOperator

from datetime import datetime, timedelta

dag = dag_factory_drop_and_replace(
    dag_name='EL_production_postgres_database', # Name of the DAG
    dwh_engine=EC.DWH_ENGINE_POSTGRES, # Implemented DWH Engine
    dwh_conn_id='dwh', # Airflow connection ID with connection details to the DWH
    el_operator=EWAHPostgresOperator, # Ewah Operator (or custom child class of EWAHBaseOperator)
    target_schema_name='raw_production', # Name of the raw schema where data will end up in the DWH
    target_schema_suffix='_next', # suffix of the schema containing the data before replacing the production data schema with the temporary loading schema
    # target_database_name='raw', # Only Snowflake
    start_date=datetime(2019, 10, 23), # As per airflow standard
    schedule_interval=timedelta(hours=1), # Only timedelta is allowed!
    default_args={ # Default args for DAG as per airflow standard
        'owner': 'Data Engineering',
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        'email_on_retry': False,
        'email_on_failure': True,
        'email': ['email@address.com'],
    },
    operator_config={
        'general_config': {
            'source_conn_id': 'production_postgres',
            'source_schema_name': 'public',
        },
        'tables': {
            'table_name':{},
            # ...
            # Additional optional kwargs at the table level:
            #   primary_key
            #   + any operator specific arguments
        },
    },
)