SkillAgentSearch skills...

StravaDataPipline

:arrows_counterclockwise: :running: EtLT of my own Strava data using the Strava API, MySQL, Python, S3, Redshift, and Airflow

Install / Use

/learn @jackmleitch/StravaDataPipline
About this skill

Quality Score

0/100

Category

Operations

Supported Platforms

Universal

README

Strava ELT Data Pipline

EtLT of my own Strava data using the Strava API, MySQL, Python, S3, Redshift, and Airflow

system_diagram

I build an EtLT pipeline to ingest my Strava data from the Strava API and load it into a Redshift data warehouse. This pipeline is then run once a week using Airflow to extract any new activity data. The end goal is then to use this data warehouse to build an automatically updating dashboard in Tableau and also to trigger automatic re-training of my Strava Kudos Prediction model.

<!--truncate-->

Data Extraction

My Strava activity data is first ingested incrementally using the Strava API and loaded into an S3 bucket. On each ingestion run, we query a MySQL database to get the date of the last extraction:

def get_date_of_last_warehouse_update() -> Tuple[datetime, str]:
    """
    Get the datetime of last time data was extracted from Strava API
    by querying MySQL database and also return current datetime.
    """
    mysql_conn = connect_mysql()
    get_last_updated_query = """
    	SELECT COALESCE(MAX(LastUpdated), '1900-01-01')
        FROM last_extracted;"""
    mysql_cursor = mysql_conn.cursor()
    mysql_cursor.execute(get_last_updated_query)
    result = mysql_cursor.fetchone()
    last_updated_warehouse = datetime.strptime(result[0], "%Y-%m-%d %H:%M:%S")
    current_datetime = datetime.today().strftime("%Y-%m-%d %H:%M:%S")
    return last_updated_warehouse, current_datetime

We then make repeated calls to the REST API using the requests library until we have all activity data between now and last_updated_warehouse. We include a time.sleep() command to comply with Strava's set rate limit of 100 requests/15 minutes. We also include try: except: blocks to combat missing data on certain activities.

def make_strava_api_request(
    header: Dict[str, str], activity_num: int = 1
) -> Dict[str, str]:
    """Use Strava API to get recent page of new data."""
    param = {"per_page": 1, "page": activity_num}
    api_response = requests.get(
        "https://www.strava.com/api/v3/athlete/activities", headers=header, params=param
    ).json()
    response_json = api_response[0]
    return response_json

def extract_strava_activities(last_updated_warehouse: datetime) -> List[List]:
    """Connect to Strava API and get data up until last_updated_warehouse datetime."""
    header = connect_strava()
    all_activities = []
    activity_num = 1
    # while activity has not been extracted yet
    while True:
        # Strava has a rate limit of 100 requests every 15 mins
        if activity_num % 75 == 0:
            print("Rate limit hit, sleeping for 15 minutes...")
            time.sleep(15 * 60)
        try:
            response_json = make_strava_api_request(header, activity_num)
        # rate limit has exceeded, wait 15 minutes
        except KeyError:
            print("Rate limit hit, sleeping for 15 minutes...")
            time.sleep(15 * 60)
            response_json = make_strava_api_request(header, activity_num)
        date = response_json["start_date"]
        if date > last_updated_warehouse:
            activity = parse_api_output(response_json)
            all_activities.append(activity)
            activity_num += 1
        else:
            break
    return all_activities

Before exporting the data locally into a flat pipe-delimited .csv file, we perform a few minor transformations such as formatting dates and timezone columns. Hence the little 't' in EtLT! After we save the data, it is then uploaded to an S3 bucket for later loading into the data warehouse.

def save_data_to_csv(all_activities: List[List]) -> str:
    """Save extracted data to .csv file."""
    todays_date = datetime.today().strftime("%Y_%m_%d")
    export_file_path = f"strava_data/{todays_date}_export_file.csv"
    with open(export_file_path, "w") as fp:
        csvw = csv.writer(fp, delimiter="|")
        csvw.writerows(all_activities)
    return export_file_path

def upload_csv_to_s3(export_file_path: str) -> None:
    """Upload extracted .csv file to s3 bucket."""
    s3 = connect_s3()
    s3.upload_file(export_file_path, "strava-data-pipeline", export_file_path)

Finally, we execute a query to update the MySQL database on the last date of extraction.

def save_extraction_date_to_database(current_datetime: datetime) -> None:
    """Update last extraction date in MySQL database to todays datetime."""
    mysql_conn = connect_mysql()
    update_last_updated_query = """
        INSERT INTO last_extracted (LastUpdated)
        VALUES (%s);"""
    mysql_cursor = mysql_conn.cursor()
    mysql_cursor.execute(update_last_updated_query, current_datetime)
    mysql_conn.commit()

Data Loading

Once the data is loaded into the S3 data lake it is then loaded into our Redshift data warehouse. We load the data in two parts:

  • We first load the data from the S3 bucket into a staging table with the same schema as our production table
  • We then perform validation tests between the staging table and the production table (see here). If all critical tests pass we then remove all duplicates between the two tables by first deleting them from the production table. The data from the staging table is then fully inserted into the production table.
def copy_to_redshift_staging(table_name: str, rs_conn, s3_file_path: str, role_string: str) -> None:
    """Copy data from s3 into Redshift staging table."""
    # write queries to execute on redshift
    create_temp_table = f"CREATE TABLE staging_table (LIKE {table_name});"
    sql_copy_to_temp = f"COPY staging_table FROM {s3_file_path} iam_role {role_string};"

    # execute queries
    cur = rs_conn.cursor()
    cur.execute(create_temp_table)
    cur.execute(sql_copy_to_temp)
    rs_conn.commit()

def redshift_staging_to_production(table_name: str, rs_conn) -> None:
    """Copy data from Redshift staging table to production table."""
    # if id already exists in table, we remove it and add new id record during load
    delete_from_table = f"DELETE FROM {table_name} USING staging_table WHERE '{table_name}'.id = staging_table.id;"
    insert_into_table = f"INSERT INTO {table_name} SELECT * FROM staging_table;"
    drop_temp_table = "DROP TABLE staging_table;"
    # execute queries
    cur = rs_conn.cursor()
    cur.execute(delete_from_table)
    cur.execute(insert_into_table)
    cur.execute(drop_temp_table)
    rs_conn.commit()

Data Validation

We implement a simple framework in python that is used to execute SQL-based data validation checks in our data pipeline. Although it lacks many features we would expect to see in a production environment, it is a good start and provides some insight into how we can improve our infrastructure.

The validatior.py script executes a pair of SQL scripts on Redshift and compares the two based on a comparison operator (>, <, =). The test then passes or fails based on the outcome of the two executed scripts. We execute this validation step after we upload our newly ingested data to the staging table but before we insert this table into the production table.

def execute_test(db_conn, script_1: str, script_2: str, comp_operator: str) -> bool:
    """
    Execute test made up of two scripts and a comparison operator
    :param comp_operator: comparison operator to compare script outcome
        (equals, greater_equals, greater, less_equals, less, not_equals)
    :return: True/False for test pass/fail
    """
    # execute the 1st script and store the result
    cursor = db_conn.cursor()
    sql_file = open(script_1, "r")
    cursor.execute(sql_file.read())
    record = cursor.fetchone()
    result_1 = record[0]
    db_conn.commit()
    cursor.close()

    # execute the 2nd script and store the result
    ...

    print("Result 1 = " + str(result_1))
    print("Result 2 = " + str(result_2))

    # compare values based on the comp_operator
    if comp_operator == "equals": return result_1 == result_2
    elif comp_operator == "greater_equals": return result_1 >= result_2
    ...

    # tests have failed if we make it here
    return False

As a starting point, I implemented checks that check for duplicates, compare the distribution of the total activities in the staging table (Airflow is set to execute at the end of each week) to the average historical weekly activity count, and compare the distribution of the Kudos Count metric to the historical distribution using the z-score. In other words, the last two queries check if the values are within a 90% confidence interval in either direction of what's expected based on history. For example, the following query computes the z-score for the total activities uploaded in a given week (found in the staging table).

with activities_by_week AS (
  SELECT
  	date_trunc('week', start_date::date) AS activity_week,
  	COUNT(*) AS activity_count
  FROM public.strava_activity_data
  GROUP BY activity_week
  ORDER BY activity_week
),

activities_by_week_statistics AS (
  SELECT
  	AVG(activity_count) AS avg_activities_per_week,
  	STDDEV(activity_count) AS std_activities_per_week
  FROM activities_by_week
),

staging_table_weekly_count AS (
  SELECT COUNT(*) AS staging_weekly_count
  FROM staging_table
),

activity_count_zscore AS (
  SELECT
  	s.staging_weekly_count AS staging_table_count,
  	p.avg_activities_per_week A
View on GitHub
GitHub Stars34
CategoryOperations
Updated2mo ago
Forks3

Languages

Python

Security Score

80/100

Audited on Jan 14, 2026

No findings