MultiTasking: Non-blocking Python methods using decorators
==========================================================
|Python version| |PyPi version| |PyPi status| |PyPi downloads| |CodeFactor| |Star this repo| |Follow me on twitter|
MultiTasking is a lightweight Python library that lets you convert your Python methods into asynchronous, non-blocking methods simply by using a decorator. Perfect for I/O-bound tasks, API calls, web scraping, and any scenario where you want to run multiple operations concurrently without the complexity of manual thread or process management.
✨ What's New in v0.0.12
-------------------------
- 🎯 Full Type Hint Support: Complete type annotations for better IDE support and code safety
- 📚 Enhanced Documentation: Comprehensive docstrings and inline comments for better maintainability
- 🔧 Improved Error Handling: More robust exception handling with specific error types
- 🚀 Better Performance: Optimized task creation and management logic
- 🛡️ Code Quality: PEP8 compliant, linter-friendly codebase
Quick Start
-----------

.. code:: python

    import multitasking
    import time

    @multitasking.task
    def fetch_data(url_id):
        # Simulate an API call or I/O operation
        time.sleep(1)
        return f"Data from {url_id}"

    # These run concurrently, not sequentially!
    for i in range(5):
        fetch_data(i)

    # Wait for all tasks to complete
    multitasking.wait_for_tasks()
    print("All data fetched!")
Basic Example
-------------

.. code:: python

    # example.py
    import multitasking
    import time
    import random
    import signal

    # Kill all tasks on ctrl-c (recommended for development)
    signal.signal(signal.SIGINT, multitasking.killall)

    # Or, wait for tasks to finish gracefully on ctrl-c:
    # signal.signal(signal.SIGINT, multitasking.wait_for_tasks)

    @multitasking.task  # <== this is all it takes! 🎉
    def hello(count):
        sleep_time = random.randint(1, 10) / 2
        print(f"Hello {count} (sleeping for {sleep_time}s)")
        time.sleep(sleep_time)
        print(f"Goodbye {count} (slept for {sleep_time}s)")

    if __name__ == "__main__":
        # Launch 10 concurrent tasks
        for i in range(10):
            hello(i + 1)

        # Wait for all tasks to complete
        multitasking.wait_for_tasks()
        print("All tasks completed!")
Output:
.. code:: bash

    $ python example.py

    Hello 1 (sleeping for 0.5s)
    Hello 2 (sleeping for 1.0s)
    Hello 3 (sleeping for 5.0s)
    Hello 4 (sleeping for 0.5s)
    Hello 5 (sleeping for 2.5s)
    Hello 6 (sleeping for 3.0s)
    Hello 7 (sleeping for 0.5s)
    Hello 8 (sleeping for 4.0s)
    Hello 9 (sleeping for 3.0s)
    Hello 10 (sleeping for 1.0s)
    Goodbye 1 (slept for 0.5s)
    Goodbye 4 (slept for 0.5s)
    Goodbye 7 (slept for 0.5s)
    Goodbye 2 (slept for 1.0s)
    Goodbye 10 (slept for 1.0s)
    Goodbye 5 (slept for 2.5s)
    Goodbye 6 (slept for 3.0s)
    Goodbye 9 (slept for 3.0s)
    Goodbye 8 (slept for 4.0s)
    Goodbye 3 (slept for 5.0s)
    All tasks completed!
Advanced Usage
--------------

Real-World Examples
~~~~~~~~~~~~~~~~~~~
Web Scraping with Concurrent Requests:
.. code:: python

    import multitasking
    import requests
    import signal

    signal.signal(signal.SIGINT, multitasking.killall)

    @multitasking.task
    def fetch_url(url):
        try:
            response = requests.get(url, timeout=10)
            print(f"✅ {url}: {response.status_code}")
            return response.text
        except Exception as e:
            print(f"❌ {url}: {str(e)}")
            return None

    # Fetch multiple URLs concurrently
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/status/200",
        "https://httpbin.org/json",
    ]

    for url in urls:
        fetch_url(url)

    multitasking.wait_for_tasks()
    print(f"Processed {len(urls)} URLs concurrently!")
Database Operations:
.. code:: python

    import multitasking
    import sqlite3
    import time

    @multitasking.task
    def process_batch(batch_id, data_batch):
        # Simulate database processing
        conn = sqlite3.connect(f"batch_{batch_id}.db")
        # ... database operations ...
        conn.close()
        print(f"Processed batch {batch_id} with {len(data_batch)} records")

    # Process multiple data batches concurrently
    large_dataset = list(range(1000))
    batch_size = 100

    for i in range(0, len(large_dataset), batch_size):
        batch = large_dataset[i:i + batch_size]
        process_batch(i // batch_size, batch)

    multitasking.wait_for_tasks()
Pool Management
~~~~~~~~~~~~~~~
MultiTasking uses execution pools to manage concurrent tasks. You can create and configure multiple pools for different types of operations:
.. code:: python

    import multitasking

    # Create a pool for API calls (higher concurrency)
    multitasking.createPool("api_pool", threads=20, engine="thread")

    # Create a pool for CPU-intensive tasks (lower concurrency)
    multitasking.createPool("cpu_pool", threads=4, engine="process")

    # Switch between pools
    multitasking.use_tag("api_pool")  # Future tasks use this pool

    @multitasking.task
    def api_call(endpoint):
        # This will use the api_pool
        pass

    # Get pool information
    pool_info = multitasking.getPool("api_pool")
    print(f"Pool: {pool_info}")  # {'engine': 'thread', 'name': 'api_pool', 'threads': 20}
Task Monitoring
~~~~~~~~~~~~~~~
Monitor and control your tasks with built-in functions:
.. code:: python

    import multitasking
    import time

    @multitasking.task
    def long_running_task(task_id):
        time.sleep(2)
        print(f"Task {task_id} completed")

    # Start some tasks
    for i in range(5):
        long_running_task(i)

    # Monitor active tasks
    while multitasking.get_active_tasks():
        active_count = len(multitasking.get_active_tasks())
        total_count = len(multitasking.get_list_of_tasks())
        print(f"Progress: {total_count - active_count}/{total_count} completed")
        time.sleep(0.5)

    print("All tasks finished!")
Configuration & Settings
------------------------

Thread/Process Limits
~~~~~~~~~~~~~~~~~~~~~
The default maximum threads equals the number of CPU cores. You can customize this:
.. code:: python

    import multitasking

    # Set maximum concurrent tasks
    multitasking.set_max_threads(10)

    # Scale based on CPU cores (a good rule of thumb for I/O-bound tasks)
    multitasking.set_max_threads(multitasking.config["CPU_CORES"] * 5)

    # Unlimited concurrent tasks (use carefully!)
    multitasking.set_max_threads(0)
Execution Engine Selection
~~~~~~~~~~~~~~~~~~~~~~~~~~
Choose between threading and multiprocessing based on your use case:
.. code:: python

    import multitasking

    # For I/O-bound tasks (default, recommended for most cases)
    multitasking.set_engine("thread")

    # For CPU-bound tasks (avoids GIL limitations)
    multitasking.set_engine("process")
When to use threads vs processes:
- Threads (default): Best for I/O-bound tasks like file operations, network requests, database queries
- Processes: Best for CPU-intensive tasks like mathematical computations, image processing, data analysis
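The trade-off is easy to see with a quick timing sketch. This one uses only the standard library's ``ThreadPoolExecutor`` (so it runs without any third-party package installed); the same reasoning applies when choosing an engine here:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def io_task(i):
    time.sleep(0.2)  # stands in for a network call or disk read
    return i

start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(io_task, range(4)))
elapsed = time.time() - start

# The four 0.2s waits overlap, so the batch takes roughly 0.2s
# instead of 0.8s. CPU-bound loops would NOT overlap like this
# under the GIL -- that's when the "process" engine pays off.
print(f"{len(results)} tasks in {elapsed:.2f}s")
```

Because the tasks spend their time waiting rather than computing, threads deliver near-linear speedup here; swap the sleep for a tight numeric loop and the wall time barely improves until you move to processes.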
Advanced Pool Configuration
~~~~~~~~~~~~~~~~~~~~~~~~~~~
Create specialized pools for different workloads:
.. code:: python

    import multitasking

    # Fast pool for quick API calls
    multitasking.createPool("fast_api", threads=50, engine="thread")

    # CPU pool for heavy computation
    multitasking.createPool("compute", threads=2, engine="process")

    # Unlimited pool for lightweight tasks
    multitasking.createPool("unlimited", threads=0, engine="thread")

    # Get current pool info
    current_pool = multitasking.getPool()
    print(f"Using pool: {current_pool['name']}")
Best Practices
--------------

Performance Tips
~~~~~~~~~~~~~~~~
- Choose the right engine: Use threads for I/O-bound tasks, processes for CPU-bound tasks
- Tune thread counts: Start with CPU cores × 2-5 for I/O tasks, CPU cores for CPU tasks
- Use pools wisely: Create separate pools for different types of operations
- Monitor memory usage: Each thread/process consumes memory
- Handle exceptions: Always wrap risky operations in try/except blocks
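For the thread-count tip, a concrete starting point is to derive worker counts from the machine's core count. The multipliers below are the rule-of-thumb values from the list above, not hard rules; profile your own workload before settling on them:

```python
import os

cpu_cores = os.cpu_count() or 1

# I/O-bound work tolerates heavy oversubscription (cores x 2-5),
# since most workers are idle waiting on the network or disk.
io_workers = cpu_cores * 4

# CPU-bound work rarely benefits from more workers than cores.
cpu_workers = cpu_cores

print(f"I/O pool: {io_workers} workers, CPU pool: {cpu_workers} workers")
```

These values can then be passed as the ``threads=`` argument when creating pools or calling ``set_max_threads()``.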
Error Handling
~~~~~~~~~~~~~~
.. code:: python

    import multitasking
    import requests

    @multitasking.task
    def robust_fetch(url):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.Timeout:
            print(f"⏰ Timeout fetching {url}")
        except requests.exceptions.RequestException as e:
            print(f"❌ Error fetching {url}: {e}")
        except Exception as e:
            print(f"💥 Unexpected error: {e}")
        return None
Resource Management
~~~~~~~~~~~~~~~~~~~
.. code:: python

    import multitasking
    import signal

    # Graceful shutdown on interrupt
    def cleanup_handler(signum, frame):
        print("🛑 Shutting down gracefully...")
        multitasking.wait_for_tasks()
        print("✅ All tasks completed")
        exit(0)

    signal.signal(signal.SIGINT, cleanup_handler)

    # Your application code here...
Troubleshooting
---------------

Common Issues
~~~~~~~~~~~~~
Tasks not running concurrently? Check whether you're calling ``wait_for_tasks()`` inside your task loop instead of after it.
High memory usage? Reduce the number of concurrent threads or switch to a process-based engine.
Tasks hanging? Ensure your tasks can complete (avoid infinite loops) and handle exceptions properly.
Import errors? Make sure you’re using Python 3.6+ and have installed the latest version.
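The first pitfall above is worth seeing concretely. This sketch uses plain ``threading`` so it runs standalone, but the rule is identical for ``multitasking.wait_for_tasks()``: launch everything first, then wait once after the loop.

```python
import time
import threading

done = []

def worker(i):
    time.sleep(0.1)  # simulated work
    done.append(i)   # list.append is thread-safe in CPython

threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]

# Right: start all tasks first...
for t in threads:
    t.start()

# ...then wait once, after the launch loop. Joining (or calling
# wait_for_tasks()) inside the launch loop would serialize the
# work into ~0.4s instead of ~0.1s.
start = time.time()
for t in threads:
    t.join()
elapsed = time.time() - start

print(f"finished {len(done)} tasks in {elapsed:.2f}s")
```

If your per-iteration timings look suspiciously like the sum of each task's duration, the wait call has almost certainly crept inside the loop.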
Debugging
~~~~~~~~~
.. code:: python

    import multitasking

    # Inspect task state at any point
    active_tasks = multitasking.get_active_tasks()
    all_tasks = multitasking.get_list_of_tasks()

    print(f"{len(active_tasks)} of {len(all_tasks)} tasks still running")
