Fastscheduler

Decorator-first Python scheduler — cron/interval/at jobs with simple persistence and built-in run history.

Install / Use

/learn @MichielMe/Fastscheduler

FastScheduler

Simple, lightweight task scheduler for Python with async support, timezone handling, cron expressions, and a beautiful real-time dashboard.

If this saves you time, ⭐️ the repo and open an issue for ideas — I'm actively improving it.

License: MIT. Requires Python 3.10+.

Features

  • 🎯 Simple decorator-based API - Schedule tasks in one line
  • Async/await support - Native async function support
  • 🕐 Timezone support - Schedule jobs in any timezone
  • 📅 Cron expressions - Complex schedules with cron syntax
  • 💾 Persistent state - Survives restarts, handles missed jobs
  • 🗄️ Database support - SQLite, PostgreSQL, MySQL via SQLModel
  • 🎨 FastAPI dashboard - Beautiful real-time monitoring UI
  • 🔄 Automatic retries - Configurable retry with exponential backoff
  • ⏱️ Job timeouts - Kill long-running jobs automatically
  • ⏸️ Pause/Resume - Control jobs without removing them
  • 📋 Dead Letter Queue - Track and debug failed jobs

Installation

# Basic installation
pip install fastscheduler

# With FastAPI dashboard
pip install fastscheduler[fastapi]

# With cron expression support
pip install fastscheduler[cron]

# With database support (SQLite, PostgreSQL, MySQL)
pip install fastscheduler[database]

# All features
pip install fastscheduler[all]

Quick Start

from fastscheduler import FastScheduler

scheduler = FastScheduler(quiet=True)

@scheduler.every(10).seconds
def task():
    print("Task executed")

@scheduler.daily.at("14:30")
async def daily_task():
    print("Daily task at 2:30 PM")

scheduler.start()

Scheduling Options

Interval-based

@scheduler.every(10).seconds
@scheduler.every(5).minutes
@scheduler.every(2).hours
@scheduler.every(1).days

Time-based

@scheduler.daily.at("09:00")              # Daily at 9 AM
@scheduler.hourly.at(":30")               # Every hour at :30
@scheduler.weekly.monday.at("10:00")      # Every Monday at 10 AM
@scheduler.weekly.weekdays.at("09:00")    # Weekdays at 9 AM
@scheduler.weekly.weekends.at("12:00")    # Weekends at noon

Cron Expressions

Requires: pip install fastscheduler[cron]

@scheduler.cron("0 9 * * MON-FRI")        # 9 AM on weekdays
def market_open():
    ...

@scheduler.cron("*/15 * * * *")           # Every 15 minutes
def frequent_check():
    ...

@scheduler.cron("0 0 1 * *")              # First day of each month
def monthly_report():
    ...
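
For readers new to cron syntax, the five fields in the expressions above are, in order: minute, hour, day of month, month, and day of week. The helper below is a hypothetical illustration (`split_cron` is not part of FastScheduler). It only labels the fields and does not validate or evaluate them.

```python
# Hypothetical helper for illustration only; not part of FastScheduler.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expression: str) -> dict[str, str]:
    """Label the five whitespace-separated fields of a standard cron expression."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expression!r}")
    return dict(zip(CRON_FIELDS, parts))


if __name__ == "__main__":
    for expr in ("0 9 * * MON-FRI", "*/15 * * * *", "0 0 1 * *"):
        print(expr, "->", split_cron(expr))
```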

One-time Jobs

@scheduler.once(60)                        # Run once after 60 seconds
def delayed_task():
    ...

@scheduler.at("2024-12-25 00:00:00")      # Run at specific datetime
def christmas_task():
    ...

Timezone Support

Schedule jobs in any timezone:

# Using the tz parameter
@scheduler.daily.at("09:00", tz="America/New_York")
def nyc_morning():
    print("Good morning, New York!")

# Using the .tz() method (chainable)
@scheduler.weekly.monday.tz("Europe/London").at("09:00")
def london_standup():
    print("Monday standup")

# With cron expressions
@scheduler.cron("0 9 * * MON-FRI").tz("Asia/Tokyo")
def tokyo_market():
    print("Tokyo market open")

Common timezones: UTC, America/New_York, America/Los_Angeles, Europe/London, Europe/Paris, Asia/Tokyo, Asia/Shanghai, Australia/Sydney
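
A named timezone matters because the same wall-clock time maps to different UTC instants as daylight saving time changes. The sketch below uses only the standard library (`zoneinfo`) to show this for 09:00 in New York. `to_utc` is a hypothetical helper, and nothing here reflects how FastScheduler computes run times internally.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def to_utc(local_date: str, local_time: str, tz_name: str) -> datetime:
    """Interpret a wall-clock time in tz_name and return the matching UTC instant."""
    naive = datetime.strptime(f"{local_date} {local_time}", "%Y-%m-%d %H:%M")
    return naive.replace(tzinfo=ZoneInfo(tz_name)).astimezone(timezone.utc)


if __name__ == "__main__":
    # 09:00 in New York falls at a different UTC hour in winter (EST) and summer (EDT).
    print(to_utc("2024-01-15", "09:00", "America/New_York"))
    print(to_utc("2024-07-15", "09:00", "America/New_York"))
```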

Job Control

Timeouts

Kill jobs that run too long:

@scheduler.every(1).minutes.timeout(30)   # Kill if it runs longer than 30 seconds
def quick_task():
    ...

@scheduler.daily.at("02:00").timeout(3600)  # 1 hour max
def nightly_backup():
    ...
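
The README does not say how timeouts are enforced. As a rough illustration of the idea for async jobs only, the standard library's `asyncio.wait_for` cancels a coroutine that exceeds its time budget. `run_with_timeout`, `slow_job`, and `fast_job` are hypothetical names, and this is not necessarily how FastScheduler implements `.timeout()`.

```python
import asyncio


async def run_with_timeout(job, timeout_seconds: float) -> str:
    """Await job() and report whether it finished or was cancelled on timeout."""
    try:
        await asyncio.wait_for(job(), timeout=timeout_seconds)
        return "completed"
    except asyncio.TimeoutError:
        # wait_for cancels the inner task before raising.
        return "timed out"


async def slow_job() -> None:
    await asyncio.sleep(1.0)


async def fast_job() -> None:
    await asyncio.sleep(0)


if __name__ == "__main__":
    print(asyncio.run(run_with_timeout(slow_job, 0.05)))
    print(asyncio.run(run_with_timeout(fast_job, 0.05)))
```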

Retries

Configure automatic retries on failure:

@scheduler.every(5).minutes.retries(5)    # Retry up to 5 times
def flaky_api_call():
    ...

Retries use exponential backoff (2s, 4s, 8s, 16s, ...).
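
To estimate how long a job may keep retrying, the delays can be reproduced with a doubling formula. This is a minimal sketch that assumes the pattern above simply continues (2 seconds, doubling each time, no upper cap). The README does not state whether a cap exists, and `backoff_delays` is a hypothetical helper.

```python
def backoff_delays(retries: int, base_seconds: int = 2) -> list[int]:
    """Delay before each retry attempt: base, 2*base, 4*base, ..."""
    # ASSUMPTION: pure doubling with no maximum delay.
    return [base_seconds * (2 ** attempt) for attempt in range(retries)]


if __name__ == "__main__":
    delays = backoff_delays(5)
    print(delays, "total wait:", sum(delays), "seconds")
```

Under that assumption, `retries(5)` would add roughly a minute of waiting in the worst case, on top of the time each attempt itself takes.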

Skip Catch-up

Don't run missed jobs after restart:

@scheduler.every(1).hours.no_catch_up()
def hourly_stats():
    ...
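
The README does not spell out the exact catch-up rules, so the following is only one plausible reading for interval jobs: with catch-up, a missed job runs once at startup; without it, the scheduler skips ahead to the next future slot. `due_after_restart` is a hypothetical function, not FastScheduler code.

```python
from datetime import datetime, timedelta


def due_after_restart(next_run: datetime, now: datetime, interval: timedelta,
                      catch_up: bool) -> tuple[bool, datetime]:
    """Return (run_now, new_next_run) for an interval job when the scheduler starts."""
    if next_run >= now:
        return False, next_run            # nothing was missed
    if catch_up:
        # ASSUMPTION: missed runs collapse into a single immediate run.
        return True, now + interval
    # Skip the missed run(s) and move to the first slot after `now`.
    missed = (now - next_run) // interval + 1
    return False, next_run + missed * interval


if __name__ == "__main__":
    stored_next_run = datetime(2024, 1, 1, 10, 0)
    restart_time = datetime(2024, 1, 1, 12, 30)
    hour = timedelta(hours=1)
    print(due_after_restart(stored_next_run, restart_time, hour, catch_up=True))
    print(due_after_restart(stored_next_run, restart_time, hour, catch_up=False))
```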

Pause, Resume, and Cancel

# Pause a job (stays in queue but won't execute)
scheduler.pause_job("job_0")

# Resume a paused job
scheduler.resume_job("job_0")

# Cancel and remove a job
scheduler.cancel_job("job_0")

# Cancel all jobs with a specific function name
scheduler.cancel_job_by_name("my_task")

FastAPI Integration

Add a beautiful real-time dashboard to your FastAPI app:

from fastapi import FastAPI
from fastscheduler import FastScheduler
from fastscheduler.fastapi_integration import create_scheduler_routes

app = FastAPI()
scheduler = FastScheduler(quiet=True)

# Add dashboard at /scheduler/
app.include_router(create_scheduler_routes(scheduler))

@scheduler.every(30).seconds
def background_task():
    print("Background work")

scheduler.start()

Dashboard Features

Access at http://localhost:8000/scheduler/

  • Real-time updates via Server-Sent Events (SSE)
  • Job table with status indicators, last 5 run results, and countdown timers
  • Quick actions - Run/Pause/Resume/Cancel directly from the UI
  • Execution history tab with filtering and search
  • Dead letter queue tab - view failed jobs with error details
  • Statistics - Success rate, uptime, active jobs count
  • Toast notifications - Alerts for job completions and failures

API Endpoints

| Endpoint | Method | Description |
| ------------------------------------- | ------ | ------------------------------- |
| /scheduler/ | GET | Dashboard UI |
| /scheduler/api/status | GET | Scheduler status |
| /scheduler/api/jobs | GET | List all jobs |
| /scheduler/api/jobs/{job_id} | GET | Get specific job |
| /scheduler/api/jobs/{job_id}/pause | POST | Pause a job |
| /scheduler/api/jobs/{job_id}/resume | POST | Resume a job |
| /scheduler/api/jobs/{job_id}/run | POST | Trigger immediate execution |
| /scheduler/api/jobs/{job_id}/cancel | POST | Cancel a job |
| /scheduler/api/history | GET | Execution history |
| /scheduler/api/dead-letters | GET | Dead letter queue (failed jobs) |
| /scheduler/api/dead-letters | DELETE | Clear dead letter queue |
| /scheduler/events | GET | SSE event stream |
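
These endpoints can also be called from scripts. The sketch below builds requests for the four per-job POST actions using only the standard library. It assumes the app is served at `http://localhost:8000` as in the dashboard URL above. `job_action_request` is a hypothetical helper, and the response body format is not documented here, so the example only prints whatever the server returns.

```python
from urllib.request import Request

BASE_URL = "http://localhost:8000/scheduler"  # dashboard location given in the README


def job_action_request(job_id: str, action: str, base_url: str = BASE_URL) -> Request:
    """Build a POST request for one of the per-job action endpoints."""
    if action not in {"pause", "resume", "run", "cancel"}:
        raise ValueError(f"unknown action: {action!r}")
    return Request(f"{base_url}/api/jobs/{job_id}/{action}", method="POST")


if __name__ == "__main__":
    req = job_action_request("job_0", "pause")
    print(req.get_method(), req.full_url)
    # To actually send it (requires the FastAPI app to be running):
    # from urllib.request import urlopen
    # with urlopen(req) as resp:
    #     print(resp.status, resp.read().decode())
```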

Configuration

scheduler = FastScheduler(
    state_file="scheduler.json",    # Persistence file for JSON backend (default: fastscheduler_state.json)
    storage="json",                 # Storage backend: "json" (default) or "sqlmodel"
    database_url=None,              # Database URL for sqlmodel backend
    quiet=True,                     # Suppress log messages (default: False)
    auto_start=False,               # Start immediately (default: False)
    max_history=5000,               # Max history entries to keep (default: 10000)
    max_workers=20,                 # Concurrent job threads (default: 10)
    history_retention_days=8,       # Delete history older than X days (default: 7)
    max_dead_letters=500,           # Max failed jobs in dead letter queue (default: 500)
)

History Retention

History is automatically cleaned up based on two limits (both are enforced):

  • Count limit: max_history - maximum number of entries
  • Time limit: history_retention_days - maximum age in days

Set history_retention_days=0 to disable time-based cleanup (only count limit applies).
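
The interaction of the two limits can be sketched as follows. This is an illustration of the rules described above, not FastScheduler's code: `prune_history` and the `"timestamp"` key are hypothetical, and the order in which the library applies the limits is an assumption (the result is the same either way when the newest entries are kept).

```python
from datetime import datetime, timedelta


def prune_history(entries: list[dict], now: datetime,
                  max_history: int, history_retention_days: int) -> list[dict]:
    """Apply the time limit, then the count limit, keeping the newest entries."""
    kept = sorted(entries, key=lambda e: e["timestamp"])
    if history_retention_days > 0:  # 0 disables time-based cleanup
        cutoff = now - timedelta(days=history_retention_days)
        kept = [e for e in kept if e["timestamp"] >= cutoff]
    # Guard against max_history == 0, since kept[-0:] would return everything.
    return kept[-max_history:] if max_history > 0 else []


if __name__ == "__main__":
    now = datetime(2024, 1, 10, 12, 0)
    entries = [{"timestamp": now - timedelta(days=d)} for d in range(10)]
    print(len(prune_history(entries, now, max_history=5, history_retention_days=7)))
    print(len(prune_history(entries, now, max_history=100, history_retention_days=0)))
```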

Dead Letter Queue

Failed job executions are automatically stored in a separate dead letter queue for debugging:

# Get failed jobs
dead_letters = scheduler.get_dead_letters(limit=100)

# Clear the queue
scheduler.clear_dead_letters()

The dead letter queue:

  • Stores the last max_dead_letters failed jobs (default: 500)
  • Persists to a separate JSON file (*_dead_letters.json) or database table
  • Includes error messages, timestamps, run counts, and execution times
  • Viewable in the dashboard "Failed" tab
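
"Stores the last max_dead_letters failed jobs" describes a bounded queue in which the oldest record is dropped once the limit is reached. A minimal standard-library sketch of that behavior, with hypothetical record fields and no claim about the library's internal data structure:

```python
from collections import deque

max_dead_letters = 3  # small value for the demo; the README default is 500
dead_letters: deque[dict] = deque(maxlen=max_dead_letters)

# Record five failures; only the most recent three are retained.
for run in range(1, 6):
    dead_letters.append({"job_id": "job_0", "run": run, "error": "boom"})

print([entry["run"] for entry in dead_letters])
```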

Database Storage

For production workloads requiring transactional integrity and concurrency, use database storage instead of JSON files.

Requires: pip install fastscheduler[database]

SQLite (Recommended for Single-Server)

scheduler = FastScheduler(
    storage="sqlmodel",
    database_url="sqlite:///scheduler.db"
)

PostgreSQL (Recommended for Production)

scheduler = FastScheduler(
    storage="sqlmodel",
    database_url="postgresql://user:password@localhost:5432/mydb"
)

MySQL

scheduler = FastScheduler(
    storage="sqlmodel",
    database_url="mysql://user:password@localhost:3306/mydb"
)

Custom Storage Backend

Implement your own storage by subclassing StorageBackend:

from fastscheduler.storage import StorageBackend

class MyCustomBackend(StorageBackend):
    def save_state(self, jobs, history, statistics, job_counter, scheduler_running):
        # Your implementation
        ...
    
    def load_state(self):
        # Your implementation
        ...
    
    # Implement other required methods...

scheduler = FastScheduler(storage=MyCustomBackend())
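
To make the save/load contract concrete, here is a standalone in-memory stand-in that mirrors the two method signatures shown above. It deliberately does not import or subclass `StorageBackend`, so it runs without FastScheduler installed. The return shape of `load_state` (a dict, or `None` when nothing was saved) is an assumption, and a real backend would need the other required methods as well.

```python
import copy


class InMemoryState:
    """Illustrative stand-in; a real backend would subclass
    fastscheduler.storage.StorageBackend and implement every required method."""

    def __init__(self):
        self._state = None

    def save_state(self, jobs, history, statistics, job_counter, scheduler_running):
        # Deep-copy so later mutation by the caller cannot alter the snapshot.
        self._state = copy.deepcopy({
            "jobs": jobs,
            "history": history,
            "statistics": statistics,
            "job_counter": job_counter,
            "scheduler_running": scheduler_running,
        })

    def load_state(self):
        # ASSUMPTION: returns the last snapshot as a dict, or None if nothing was saved.
        return copy.deepcopy(self._state)


if __name__ == "__main__":
    backend = InMemoryState()
    backend.save_state(jobs=[{"id": "job_0"}], history=[], statistics={},
                       job_counter=1, scheduler_running=True)
    print(backend.load_state())
```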

Database Tables

When using SQLModel storage, the following tables are created automatically:

| Table | Purpose |
|-------|---------|
| scheduler_jobs | Active job definitions |
| scheduler_history | Execution history |
| scheduler_dead_letters | Failed job records |
| `scheduler_m
