Kiner
Python AWS Kinesis Producer with error handling and thread support.
Install / Use
/learn @bufferapp/KinerREADME
Features
- Error handling and retrying with exponential backoff
- Automatic batching and flush callbacks
- Threaded execution
Inspired by the AWS blog post Implementing Efficient and Reliable Producers with the Amazon Kinesis Producer Library.
Installation
You can use pip to install Kiner.
pip install kiner
Usage
To use Kiner, you'll need to have AWS authentication credentials configured
as stated in the boto3 documentation
from kiner.producer import KinesisProducer
p = KinesisProducer('stream-name', batch_size=500, max_retries=5, threads=10)
for i in range(10000):
p.put_record(i)
p.close()
To be notified when data is flushed to AWS Kinesis, provide a flush_callback
from uuid import uuid4
from kiner.producer import KinesisProducer
def on_flush(count, last_flushed_at, Data=b'', PartitionKey='', Metadata=()):
print(f"""
Flushed {count} messages at timestamp {last_flushed_at}
Last message was {Metadata['id']} paritioned by {PartitionKey} ({len(Data)} bytes)
""")
p = KinesisProducer('stream-name', flush_callback=on_flush)
for i in range(10000):
p.put_record(i, metadata={'id': uuid4()}, partition_key=f"{i % 2}")
p.close()
Contributions
- Logo design by @area55git
Related Skills
tmux
342.0kRemote-control tmux sessions for interactive CLIs by sending keystrokes and scraping pane output.
claude-opus-4-5-migration
84.7kMigrate prompts and code from Claude Sonnet 4.0, Sonnet 4.5, or Opus 4.1 to Opus 4.5
blogwatcher
342.0kMonitor blogs and RSS/Atom feeds for updates using the blogwatcher CLI.
LibreChat
35.1kEnhanced ChatGPT Clone: Features Agents, MCP, DeepSeek, Anthropic, AWS, OpenAI, Responses API, Azure, Groq, o1, GPT-5, Mistral, OpenRouter, Vertex AI, Gemini, Artifacts, AI model switching, message search, Code Interpreter, langchain, DALL-E-3, OpenAPI Actions, Functions, Secure Multi-User Auth, Presets, open-source for self-hosting. Active.
