MultiTasking: Non-blocking Python methods using decorators


MultiTasking is a lightweight Python library that turns ordinary Python functions and methods into asynchronous, non-blocking calls with a single decorator. It suits I/O-bound work such as API calls and web scraping, and any scenario where you want to run multiple operations concurrently without managing threads or processes by hand.

What’s New in v0.0.12

  • 🎯 Full Type Hint Support: Complete type annotations for better IDE support and code safety
  • 📚 Enhanced Documentation: Comprehensive docstrings and inline comments for better maintainability
  • 🔧 Improved Error Handling: More robust exception handling with specific error types
  • 🚀 Better Performance: Optimized task creation and management logic
  • 🛡️ Code Quality: PEP8 compliant, linter-friendly codebase

Quick Start

.. code:: python

    import multitasking
    import time

    @multitasking.task
    def fetch_data(url_id):
        # Simulate API call or I/O operation
        time.sleep(1)
        return f"Data from {url_id}"

    # These run concurrently, not sequentially!
    for i in range(5):
        fetch_data(i)

    # Wait for all tasks to complete
    multitasking.wait_for_tasks()
    print("All data fetched!")

Basic Example

.. code:: python

    # example.py
    import multitasking
    import time
    import random
    import signal

    # Kill all tasks on ctrl-c (recommended for development)
    signal.signal(signal.SIGINT, multitasking.killall)

    # Or, wait for tasks to finish gracefully on ctrl-c:
    # signal.signal(signal.SIGINT, multitasking.wait_for_tasks)

    @multitasking.task  # <== this is all it takes! 🎉
    def hello(count):
        sleep_time = random.randint(1, 10) / 2
        print(f"Hello {count} (sleeping for {sleep_time}s)")
        time.sleep(sleep_time)
        print(f"Goodbye {count} (slept for {sleep_time}s)")

    if __name__ == "__main__":
        # Launch 10 concurrent tasks
        for i in range(10):
            hello(i + 1)

        # Wait for all tasks to complete
        multitasking.wait_for_tasks()
        print("All tasks completed!")

Output:

.. code:: bash

    $ python example.py

    Hello 1 (sleeping for 0.5s)
    Hello 2 (sleeping for 1.0s)
    Hello 3 (sleeping for 5.0s)
    Hello 4 (sleeping for 0.5s)
    Hello 5 (sleeping for 2.5s)
    Hello 6 (sleeping for 3.0s)
    Hello 7 (sleeping for 0.5s)
    Hello 8 (sleeping for 4.0s)
    Hello 9 (sleeping for 3.0s)
    Hello 10 (sleeping for 1.0s)
    Goodbye 1 (slept for 0.5s)
    Goodbye 4 (slept for 0.5s)
    Goodbye 7 (slept for 0.5s)
    Goodbye 2 (slept for 1.0s)
    Goodbye 10 (slept for 1.0s)
    Goodbye 5 (slept for 2.5s)
    Goodbye 6 (slept for 3.0s)
    Goodbye 9 (slept for 3.0s)
    Goodbye 8 (slept for 4.0s)
    Goodbye 3 (slept for 5.0s)
    All tasks completed!

Advanced Usage

Real-World Examples

Web Scraping with Concurrent Requests:

.. code:: python

    import multitasking
    import requests
    import signal

    signal.signal(signal.SIGINT, multitasking.killall)

    @multitasking.task
    def fetch_url(url):
        try:
            response = requests.get(url, timeout=10)
            print(f"✅ {url}: {response.status_code}")
            return response.text
        except Exception as e:
            print(f"❌ {url}: {str(e)}")
            return None

    # Fetch multiple URLs concurrently
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/status/200",
        "https://httpbin.org/json"
    ]

    for url in urls:
        fetch_url(url)

    multitasking.wait_for_tasks()
    print(f"Processed {len(urls)} URLs concurrently!")

Database Operations:

.. code:: python

    import multitasking
    import sqlite3
    import time

    @multitasking.task
    def process_batch(batch_id, data_batch):
        # Simulate database processing
        conn = sqlite3.connect(f'batch_{batch_id}.db')
        # ... database operations ...
        conn.close()
        print(f"Processed batch {batch_id} with {len(data_batch)} records")

    # Process multiple data batches concurrently
    large_dataset = list(range(1000))
    batch_size = 100

    for i in range(0, len(large_dataset), batch_size):
        batch = large_dataset[i:i + batch_size]
        process_batch(i // batch_size, batch)

    multitasking.wait_for_tasks()

Pool Management

MultiTasking uses execution pools to manage concurrent tasks. You can create and configure multiple pools for different types of operations:

.. code:: python

    import multitasking

    # Create a pool for API calls (higher concurrency)
    multitasking.createPool("api_pool", threads=20, engine="thread")

    # Create a pool for CPU-intensive tasks (lower concurrency)
    multitasking.createPool("cpu_pool", threads=4, engine="process")

    # Switch between pools
    multitasking.use_tag("api_pool")  # Future tasks use this pool

    @multitasking.task
    def api_call(endpoint):
        # This will use the api_pool
        pass

    # Get pool information
    pool_info = multitasking.getPool("api_pool")
    print(f"Pool: {pool_info}")  # {'engine': 'thread', 'name': 'api_pool', 'threads': 20}
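One way to picture what a pool's ``threads`` limit means is a semaphore that caps how many tasks execute at the same time. The sketch below uses only the standard library. ``run_limited`` is an illustrative name, and the code is a model of the idea under that assumption, not a description of MultiTasking's internals.

```python
import threading
import time

def run_limited(jobs, limit):
    """Run callables in threads with at most `limit` executing at once.

    Returns the highest number of jobs observed running simultaneously.
    """
    gate = threading.Semaphore(limit)  # admits `limit` workers at a time
    lock = threading.Lock()            # protects the counters below
    state = {"running": 0, "peak": 0}

    def worker(job):
        with gate:
            with lock:
                state["running"] += 1
                state["peak"] = max(state["peak"], state["running"])
            try:
                job()
            finally:
                with lock:
                    state["running"] -= 1

    threads = [threading.Thread(target=worker, args=(job,)) for job in jobs]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return state["peak"]

# Eight short I/O-like jobs, but never more than three in flight.
peak = run_limited([lambda: time.sleep(0.05)] * 8, limit=3)
print(f"Peak concurrency: {peak}")
```

All eight threads are created up front, but the semaphore keeps the observed peak at or below the limit, which is the behaviour a per-pool thread cap is meant to give.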

Task Monitoring

Monitor and control your tasks with built-in functions:

.. code:: python

    import multitasking
    import time

    @multitasking.task
    def long_running_task(task_id):
        time.sleep(2)
        print(f"Task {task_id} completed")

    # Start some tasks
    for i in range(5):
        long_running_task(i)

    # Monitor active tasks
    while multitasking.get_active_tasks():
        active_count = len(multitasking.get_active_tasks())
        total_count = len(multitasking.get_list_of_tasks())
        print(f"Progress: {total_count - active_count}/{total_count} completed")
        time.sleep(0.5)

    print("All tasks finished!")

Configuration & Settings

Thread/Process Limits

The default maximum threads equals the number of CPU cores. You can customize this:

.. code:: python

    import multitasking

    # Set maximum concurrent tasks
    multitasking.set_max_threads(10)

    # Scale based on CPU cores (good rule of thumb for I/O-bound tasks)
    multitasking.set_max_threads(multitasking.config["CPU_CORES"] * 5)

    # Unlimited concurrent tasks (use carefully!)
    multitasking.set_max_threads(0)

Execution Engine Selection

Choose between threading and multiprocessing based on your use case:

.. code:: python

    import multitasking

    # For I/O-bound tasks (default, recommended for most cases)
    multitasking.set_engine("thread")

    # For CPU-bound tasks (avoids GIL limitations)
    multitasking.set_engine("process")

When to use threads vs processes:

  • Threads (default): Best for I/O-bound tasks like file operations, network requests, database queries
  • Processes: Best for CPU-intensive tasks like mathematical computations, image processing, data analysis

Advanced Pool Configuration

Create specialized pools for different workloads:

.. code:: python

    import multitasking

    # Fast pool for quick API calls
    multitasking.createPool("fast_api", threads=50, engine="thread")

    # CPU pool for heavy computation
    multitasking.createPool("compute", threads=2, engine="process")

    # Unlimited pool for lightweight tasks
    multitasking.createPool("unlimited", threads=0, engine="thread")

    # Get current pool info
    current_pool = multitasking.getPool()
    print(f"Using pool: {current_pool['name']}")

Best Practices

Performance Tips

  1. Choose the right engine: Use threads for I/O-bound tasks, processes for CPU-bound tasks
  2. Tune thread counts: Start with CPU cores × 2-5 for I/O tasks, CPU cores for CPU tasks
  3. Use pools wisely: Create separate pools for different types of operations
  4. Monitor memory usage: Each thread/process consumes memory
  5. Handle exceptions: Always wrap risky operations in try/except blocks
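The sizing rule in tip 2 can be written as a small helper. ``suggest_max_threads`` is a hypothetical name, not part of MultiTasking; it only does the arithmetic with the standard library, and the default multiplier of 3 is an arbitrary pick from the 2-5 range.

```python
import os

def suggest_max_threads(workload, io_multiplier=3):
    """Suggest a concurrency limit from the CPU core count.

    workload: "io" for I/O-bound tasks, "cpu" for CPU-bound tasks.
    io_multiplier: factor applied for I/O-bound work (2 to 5, per tip 2).
    """
    cores = os.cpu_count() or 1  # os.cpu_count() may return None
    if workload == "cpu":
        return cores
    if workload == "io":
        if not 2 <= io_multiplier <= 5:
            raise ValueError("io_multiplier should be between 2 and 5")
        return cores * io_multiplier
    raise ValueError(f"unknown workload: {workload!r}")

print("I/O-bound limit:", suggest_max_threads("io"))
print("CPU-bound limit:", suggest_max_threads("cpu"))
```

The returned number is meant as a starting point to pass to ``multitasking.set_max_threads()`` and then adjust after measuring.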

Error Handling

.. code:: python

    import multitasking
    import requests

    @multitasking.task
    def robust_fetch(url):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.Timeout:
            print(f"⏰ Timeout fetching {url}")
        except requests.exceptions.RequestException as e:
            print(f"❌ Error fetching {url}: {e}")
        except Exception as e:
            print(f"💥 Unexpected error: {e}")

        return None
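An exception raised inside a background thread does not propagate to the code that started it, so printing alone can leave the main program unaware that anything failed. A possible complement is to record failures in a shared list and inspect them after waiting. The sketch below uses only the standard library. ``capture_errors`` and ``parse_port`` are hypothetical names for illustration, not MultiTasking features.

```python
import functools
import threading

errors = []                    # (function name, args, exception) tuples
errors_lock = threading.Lock()

def capture_errors(func):
    """Record any exception raised by func instead of letting it escape."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            with errors_lock:
                errors.append((func.__name__, args, exc))
            return None
    return wrapper

@capture_errors
def parse_port(text):
    return int(text)  # raises ValueError for non-numeric input

threads = [
    threading.Thread(target=parse_port, args=(value,))
    for value in ("8080", "http", "443")
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

for name, args, exc in errors:
    print(f"{name}{args} failed: {exc!r}")
```

The shared list works because threads share memory; with a process-based engine the failures would need to travel through something like a ``multiprocessing`` queue instead.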

Resource Management

.. code:: python

    import multitasking
    import signal

    # Graceful shutdown on interrupt
    def cleanup_handler(signum, frame):
        print("🛑 Shutting down gracefully...")
        multitasking.wait_for_tasks()
        print("✅ All tasks completed")
        exit(0)

    signal.signal(signal.SIGINT, cleanup_handler)

    # Your application code here...
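A graceful shutdown that waits for tasks only returns promptly if the tasks themselves can stop early. One cooperative approach is a shared ``threading.Event`` that the signal handler sets and that long-running tasks check between units of work. This is a standard-library sketch under that assumption. ``stop_event``, ``worker`` and ``request_stop`` are illustrative names, and the demo calls the handler directly instead of sending a real signal.

```python
import signal
import threading
import time

stop_event = threading.Event()  # set once shutdown has been requested
completed = []                  # units of work that finished

def worker(units):
    """Process units one by one, checking for a stop request between them."""
    for unit in range(units):
        if stop_event.is_set():
            return  # leave early so a waiting shutdown can finish
        time.sleep(0.01)  # simulate one unit of work
        completed.append(unit)

def request_stop(signum=None, frame=None):
    """Signal handler: ask workers to stop instead of killing them."""
    stop_event.set()

# signal.signal() may only be called from the main thread.
if threading.current_thread() is threading.main_thread():
    signal.signal(signal.SIGINT, request_stop)

thread = threading.Thread(target=worker, args=(1000,))
thread.start()
time.sleep(0.05)
request_stop()  # stands in for pressing Ctrl-C in this demo
thread.join()
print(f"Stopped after {len(completed)} of 1000 units")
```

In a real application the handler would typically set the event and then wait for outstanding tasks, as ``cleanup_handler`` does above.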

Troubleshooting

Common Issues

Tasks not running concurrently? Check if you’re calling wait_for_tasks() inside your task loop instead of after it.

High memory usage? Reduce the number of concurrent threads or switch to a process-based engine.

Tasks hanging? Ensure your tasks can complete (avoid infinite loops) and handle exceptions properly.

Import errors? Make sure you’re using Python 3.6+ and have installed the latest version.
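The first issue above, waiting inside the loop, can be reproduced with plain standard-library threads, where ``join()`` plays the role of waiting for a task. The sketch below is an illustration of the pitfall, not MultiTasking code. ``wait_inside_loop`` and ``wait_after_loop`` are hypothetical names.

```python
import threading
import time

def work():
    time.sleep(0.1)  # simulate an I/O wait

def wait_inside_loop(n):
    """Anti-pattern: waiting after each start runs the tasks one at a time."""
    start = time.perf_counter()
    for _ in range(n):
        thread = threading.Thread(target=work)
        thread.start()
        thread.join()  # blocks here before the next task can start
    return time.perf_counter() - start

def wait_after_loop(n):
    """Start every task first, then wait once for all of them."""
    start = time.perf_counter()
    threads = [threading.Thread(target=work) for _ in range(n)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return time.perf_counter() - start

serial = wait_inside_loop(5)
concurrent = wait_after_loop(5)
print(f"wait inside loop: {serial:.2f}s, wait after loop: {concurrent:.2f}s")
```

The first variant takes roughly the sum of the individual waits, while the second takes roughly the longest single wait, which is the difference to look for when tasks appear to run sequentially.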

Debugging

.. code:: python

    import multitasking

    # Enable task monitoring
    active_tasks = multitasking.get_active_tasks()
    all_tasks = multitasking.get_list_of_ta
