ATSPM Aggregation


atspm is a cutting-edge, lightweight Python package that transforms raw traffic signal controller event logs into aggregate performance measures and troubleshooting data that help transportation agencies continuously monitor and optimize signal timing, detect issues, and take proactive action, all in real time. atspm may be used on its own, embedded inside an ATMS application, or installed on an edge device.

What Makes ATSPM Different?

Traditional traffic signal optimization tools like Synchro rely on periodic manual data collection and simulation models. In contrast, atspm offers:

  • Real-Time Data: Uses data directly collected from signal controllers at intersections.
  • Continuous Monitoring: Allows agencies to generate performance data for any time range, diagnosing problems before they escalate.
  • Proactive Management: Enables agencies to solve issues before they lead to major traffic disruptions, rather than relying on infrequent manual studies or citizen complaints.
  • Cost Efficiency: With over 330,000 traffic signals in the US, continuous monitoring reduces the need for costly manual interventions (typically $4,500 per intersection every 3-5 years).

The Python atspm project is inspired by UDOT ATSPM, which is a full-stack application for intersection-level visualization. This package focuses on aggregation and analytics, enabling a system-wide monitoring approach. Both projects are complementary and can be deployed together.

This project focuses only on transforming event logs into performance measures and troubleshooting data; it does not include data visualization. Feel free to submit feature requests or bug reports or to reach out with questions or comments. Contributions are welcome!

Features

  • Transforms event logs into aggregate performance measures and troubleshooting metrics
  • Supports incremental processing for real-time data (e.g., every 15 minutes)
  • Runs locally using the powerful DuckDB analytical SQL engine
  • Outputs to a user-defined folder structure and file format (csv/parquet/json), or query DuckDB tables directly
  • Deployed in production by Oregon DOT since July 2024
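The incremental pattern above can be sketched as follows. The helper floors a timestamp to its aggregation bin, which a scheduled job could use to slice raw data and name output files for each run. The scheduling approach and prefix layout are assumptions for illustration, not part of the package:

```python
from datetime import datetime, timedelta

def floor_to_bin(ts: datetime, bin_size_min: int = 15) -> datetime:
    """Floor a timestamp to the start of its aggregation bin."""
    minutes = (ts.minute // bin_size_min) * bin_size_min
    return ts.replace(minute=minutes, second=0, microsecond=0)

# Each scheduled run would process the most recently completed bin:
now = datetime(2024, 7, 1, 10, 7, 30)
bin_end = floor_to_bin(now)                  # start of the current (incomplete) bin
bin_start = bin_end - timedelta(minutes=15)  # start of the last complete bin
prefix = bin_start.strftime("%Y%m%d_%H%M_")  # e.g. pass as output_file_prefix
print(bin_start, bin_end, prefix)
```

A run every 15 minutes with `output_file_prefix=prefix` would then keep each bin's results in distinctly named files.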

Installation

pip install atspm

Or pinned to a specific version:

pip install atspm==2.x.x 

atspm works on Python 3.10-3.12 and is tested on Ubuntu, Windows, and macOS.

Quick Start

The best place to start is with the self-contained examples in Colab (see the "Open In Colab" link in the repository).

Usage Example

The first step in running the tool is to define the parameters that will dictate how the data is processed. The parameters include global settings for input data, output formats, and options to select specific performance measures.

Exhaustive Parameter List

from atspm import SignalDataProcessor, sample_data

params = {
    # --- Global Settings ---
    'raw_data': sample_data.data,           # Path (CSV/Parquet/JSON) or Pandas DataFrame
    'detector_config': sample_data.config,  # Path (CSV/Parquet/JSON) or Pandas DataFrame
    'bin_size': 15,                         # Aggregation interval in minutes
    'output_dir': 'test_folder',            # Directory to save results
    'output_format': 'csv',                 # 'csv', 'parquet', or 'json'
    'output_file_prefix': 'run1_',          # Optional prefix for output files
    'output_to_separate_folders': True,     # Save each measure in its own subfolder
    'remove_incomplete': True,              # Remove bins with insufficient data (requires 'has_data' agg)
    'verbose': 1,                           # 0: Errors only, 1: Performance, 2: Debug
    'to_sql': False,                        # If True, returns SQL strings instead of executing
    'controller_type': 'maxtime',           # Global: '' (default) or 'maxtime' (case-insensitive)
                                            # When 'maxtime': phase_wait uses ActualCycleLength
                                            # When not 'maxtime': splits & coordination are skipped

    # --- Incremental Processing Settings ---
    'unmatched_event_settings': {
        'df_or_path': 'unmatched.csv',           # Track unmatched timeline events
        'split_fail_df_or_path': 'sf_unmatched.csv', # Track unmatched split failures
        'max_days_old': 7                        # Max age for tracking unmatched events
    },

    # --- Performance Measures (Aggregations) ---
    'aggregations': [
        {
            'name': 'has_data', 
            'params': {
                'no_data_min': 5,           # Min minutes of data required per bin
                'min_data_points': 3        # Min events required per sub-bin
            }
        },
        {
            'name': 'actuations', 
            'params': {
                'fill_in_missing': True,    # Zero-fill missing detector intervals
                'known_detectors_df_or_path': 'known_detectors.csv', # For zero-filling
                'known_detectors_max_days_old': 2
            }
        },
        {
            'name': 'arrival_on_green', 
            'params': {
                'latency_offset_seconds': 0 # Adjust for detector-to-controller latency
            }
        },
        {
            'name': 'split_failures', 
            'params': {
                'red_time': 5,              # Min red time to consider a split failure
                'red_occupancy_threshold': 0.80,
                'green_occupancy_threshold': 0.80,
                'by_approach': True         # Aggregate by approach instead of detector
            }
        },
        {
            'name': 'yellow_red', 
            'params': {
                'latency_offset_seconds': 0
            }
        },
        {
            'name': 'timeline', 
            'params': {
                'maxtime': True,            # Include MAXTIME-specific events
                'min_duration': 1,          # Filter out events shorter than n seconds
                'cushion_time': 1,          # Padding for instant events (seconds)
                'live': False               # If True, keep incomplete events as IsValid=False with common EndTime. This is for troubleshooting.
            }
        },
        {
            'name': 'full_ped', 
            'params': {
                'seconds_between_actuations': 15, # Min time between unique peds
                'return_volumes': True            # Estimate pedestrian volumes
            }
        },
        {
            'name': 'phase_wait',
            'params': {
                'preempt_recovery_seconds': 120, # Time after preempt ends to exclude
                'assumed_cycle_length': 140,     # Fallback cycle length (Free mode)
                'skip_multiplier': 1.5           # Threshold for skipped phases
            }
        },
        {'name': 'ped_delay', 'params': {}},
        {'name': 'terminations', 'params': {}},
        {'name': 'splits', 'params': {}},        # MAXTIME-specific (skipped if controller_type != 'maxtime')
        {'name': 'coordination', 'params': {}},  # MAXTIME-specific (skipped if controller_type != 'maxtime')
        {'name': 'coordination_agg', 'params': {}} # General coordination state (Pattern, Cycle, etc.)
    ]
}

# Running the Processor
# Using 'with' ensures the DuckDB connection is closed automatically
with SignalDataProcessor(**params) as processor:
    processor.load()      # Load raw data into DuckDB
    processor.aggregate() # Run performance measures
    processor.save()      # Save to output_dir

# Alternatively, use the .run() method to perform all steps at once
processor = SignalDataProcessor(**params)
processor.run()

Retrieving Results as a DataFrame

You can query the internal DuckDB database directly. Note that the connection must be open to query data:

processor = SignalDataProcessor(**params)
processor.load()
results = processor.conn.query("SELECT * FROM actuations ORDER BY TimeStamp").df()
print(results.head())
processor.conn.close() # Manually close if not using 'with'
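Once results are in a DataFrame, standard pandas operations apply. A minimal sketch, rolling 15-minute bins up to hourly totals; the stand-in DataFrame and its columns beyond TimeStamp (DeviceId, Total) are assumptions, so adjust to the actual output schema:

```python
import pandas as pd

# Stand-in for the `results` DataFrame returned by processor.conn.query(...).df()
results = pd.DataFrame({
    "TimeStamp": pd.date_range("2024-07-01", periods=8, freq="15min"),
    "DeviceId": [101] * 8,
    "Total": [5, 7, 6, 4, 9, 8, 10, 3],
})

# Roll 15-minute bins up to hourly totals per device
hourly = (results
          .set_index("TimeStamp")
          .groupby("DeviceId")
          .resample("1h")["Total"]
          .sum()
          .reset_index())
print(hourly)
```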

Visualization Options

The data produced by atspm can be visualized using Power BI, Plotly, or other platforms. For example, see the Oregon DOT ATSPM Dashboard.
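As one sketch of the Plotly route: assuming an actuations output with TimeStamp, Detector, and Total columns (the column names here are assumptions; adjust to your schema), a per-detector line chart can be built with Plotly Express:

```python
import pandas as pd

# Stand-in for a DataFrame read from atspm's csv/parquet output
df = pd.DataFrame({
    "TimeStamp": pd.to_datetime(["2024-07-01 00:00", "2024-07-01 00:15"] * 2),
    "Detector": [1, 1, 2, 2],
    "Total": [12, 18, 7, 9],
})

try:
    import plotly.express as px
    fig = px.line(df, x="TimeStamp", y="Total", color="Detector",
                  title="Detector actuations per 15-minute bin")
    # fig.show()  # opens the chart in a browser
except ImportError:
    print("plotly not installed; pip install plotly")
```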

Note: Use Parquet format in production for better performance.
