Circuitry
Decouple ruby applications using SNS fanout with SQS processing.
Install / Use
/learn @kapost/CircuitryREADME
Circuitry
Decouple ruby applications using SNS fanout with SQS processing.
A Circuitry publisher application can broadcast events which can be fanned out to any number of SQS queues. This technique is a common approach to implementing an enterprise message bus. For example, applications which care about billing or new user onboarding can react when a user signs up, without the origin web application being concerned with those domains. In this way, new capabilities can be connected to an enterprise system without change proliferation.
Features
What circuitry provides:
- Decoupling: apps can send and receive messages to each other without explicitly coding destinations into your app.
- Fan-out: multiple queues (i.e.: multiple apps) can receive the same message by publishing it a single time.
- Reliability: if your app goes down (intentionally or otherwise), messages will be waiting in the queue whenever it starts up again.
- Speed: because it's built on AWS, message delivery and receipt is fast.
- Duplication: although SQS messages can be delivered multiple times, circuitry safeguards to ensure they're only received by your app once.
- Retries: if a received message fails to be processed, it will be retried (unless otherwise configured).
- Customization: configure your publisher and subscriber to behave the way each app independently expects.
What circuitry does not provide:
- Ordering: messages may not arrive in the order they were sent.
- Scheduling: messages are processed as they're received.
Example
A circuitry-example app is available to quickly try the gem on your own AWS account.
Installation
Add this line to your application's Gemfile:
gem 'circuitry', '~> 3.1'
And then execute:
$ bundle
Or install it yourself as:
$ gem install circuitry
Usage
Circuitry is configured via its configuration object or via circuitry.yml config.
Circuitry.subscriber_config do |c|
c.queue_name = "#{Rails.env}-appname"
c.dead_letter_queue_name = "#{Rails.env}-appname-failures"
c.max_receive_count = 8
c.access_key = 'YOUR_AWS_ACCESS_KEY'
c.secret_key = 'YOUR_AWS_SECRET_KEY'
c.region = 'us-east-1'
c.logger = Rails.logger
c.error_handler = proc do |error|
HoneyBadger.notify(error)
HoneyBadger.flush
end
c.lock_strategy = Circuitry::Locks::Redis.new(url: 'redis://localhost:6379')
c.async_strategy = :thread
c.on_async_exit = proc { Mongoid.disconnect_sessions }
end
Circuitry.publisher_config do |c|
c.access_key = 'YOUR_AWS_ACCESS_KEY'
c.secret_key = 'YOUR_AWS_SECRET_KEY'
c.region = 'us-east-1'
c.logger = Rails.logger
c.error_handler = proc do |error|
HoneyBadger.notify(error)
HoneyBadger.flush
end
c.async_strategy = :batch
end
Many of the advanced options, such as error_handler or async_strategy require this initializer-
style configuration. A simpler option is available via config/circuitry.yml
(or config/circuitry.yml.erb):
---
access_key: "YOUR_AWS_ACCESS_KEY"
secret_key: "YOUR_AWS_SECRET_KEY"
region: "us-east-1"
development:
publisher:
topic_names:
- brandonc-appname-user-create
- brandonc-appname-user-destroy
subscriber:
queue_name: "brandonc-appname"
dead_letter_queue_name: "brandonc-appname-failures"
topic_names:
- brandonc-otherapp-content-create
- brandonc-otherapp-content-destroy
production:
publisher:
topic_names:
- production-appname-user-create
- production-appname-user-destroy
subscriber:
queue_name: "production-appname"
dead_letter_queue_name: "production-appname-failures"
topic_names:
- production-otherapp-content-create
- production-otherapp-content-destroy
Available configuration options for both subscriber and publisher applications include:
access_key: The AWS access key ID that has access to SNS publishing and/or SQS subscribing. (required unless using iam profile)secret_key: The AWS secret access key that has access to SNS publishing and/or SQS subscribing. (required unless using iam profile)use_iam_profile: Whether or not to use an iam profile for authenticating to AWS. This will only work when running your application inside of an AWS instance. AcceptstrueorfalsePlease refer to the AWS Docs for more details about using iam profiles for authentication. (required unless using access and secret keys, default:false)region: The AWS region that your SNS and/or SQS account lives in. (optional, default: "us-east-1")logger: The logger to use for informational output, warnings, and error messages. (optional, default:Logger.new(STDOUT))error_handler: An object that responds tocallwith two arguments: the deserialized message contents and the topic name used when publishing to SNS. (optional, default:nil)async_strategy: One of:fork,:thread, or:batchthat determines how asynchronous publish requests are processed. (optional, default::fork):fork: Forks a detached child process that immediately sends the request.:thread: Creates a new thread that immediately sends the request. Because threads are not guaranteed to complete when the process exits, completion can be ensured by callingCircuitry.flush.:batch: Stores the request in memory to be submitted later. Batched requests must be manually sent by callingCircuitry.flush. Only valid as a publishing strategy
on_async_exit: An object that responds tocall. This is useful for managing shared resources such as database connections that require closing. (optional, default:nil)topic_names: An array of topic names that your application will publish and/or subscribe to. This configuration is only used during provisioning.middleware: A chain of middleware that messages must go through when sent or received. Please refer to the Middleware section for more details regarding this option.aws_options_overrides: A key/value hash of option overrides and additions passed through to theAWS::SQS::Client(for subscriber config) orAWS::SNS::Client(for publisher config)
Available configuration options for subscriber applications include:
queue_name: The name of the SQS queue that your subscriber application will listen to. This queue will be created or configured during provisioning.dead_letter_queue_name: The name of the SQS dead letter queue that will be used after all retries fail. This configuration value is only used during provisioning. (optional, default:<subscriber_queue_name>-failures)lock_strategy- The store used to ensure that no duplicate messages are processed. Please refer to the Lock Strategies section for more details regarding this option. (default:Circuitry::Locks::Memory.new)max_receive_count- The number of times a message will be received by the queue after unsuccessful attempts to process it before it is discarded or added to thedead_letter_queue_namequeue. This configuration value is only used during provisioning. (optional, default: 8)visibility_timeout- A period of time during which Amazon SQS prevents other subscribers from receiving and processing that message (before it is deleted by circuitry after being processed successfully.) This configuration value is only used during provisioning. (optional, default: 1800)
Provisioning
You can automatically provision SQS queues, SNS topics, and the subscriptions between them using
two methods: the circuitry CLI or the rake circuitry:setup task. The rake task will provision the
subscriber queue and publishing topics that are configured within your application.
require 'circuitry/tasks'
Circuitry.subscriber_config do |c|
c.queue_name = 'myapp-production-events'
c.topic_names = ['theirapp-production-stuff-created', 'theirapp-production-stuff-deleted']
end
When provisioning, a dead letter queue is also created using the name "<queue_name>-failures". You can customize the dead letter queue name in your configuration.
Run circuitry help provision for help using CLI provisioning.
Publishing
Publishing is done via the Circuitry.publish method. It accepts a topic name
that represents the SNS topic along with any non-nil object, representing the
data to be serialized. Whatever object is called will have its to_json method
called for serialization.
obj = { foo: 'foo', bar: 'bar' }
Circuitry.publish('any-topic-name', obj)
The publish method also accepts options that impact instantiation of the
Publisher object, which currently includes the following options.
:async- Whether or not publishing should occur in the background. Accepts one of:fork,:thread,:batch,true, orfalse. Passingtrueuses theasync_strategyvalue from the gem configuration. Please refer to the Asynchronous Support section for more details regarding this option. (default:false):timeout- The maximum amount of time in seconds that publishing a message will be attempted before giving up. If the timeout is exceeded, an exception
Related Skills
node-connect
346.8kDiagnose OpenClaw node connection and pairing failures for Android, iOS, and macOS companion apps
frontend-design
107.6kCreate distinctive, production-grade frontend interfaces with high design quality. Use this skill when the user asks to build web components, pages, or applications. Generates creative, polished code that avoids generic AI aesthetics.
openai-whisper-api
346.8kTranscribe audio via OpenAI Audio Transcriptions API (Whisper).
qqbot-media
346.8kQQBot 富媒体收发能力。使用 <qqmedia> 标签,系统根据文件扩展名自动识别类型(图片/语音/视频/文件)。
