# GRPCQueue - Async gRPC over Message Queues

<p align="center"> <img src="assets/gopher.png" alt="GRPCQueue - Async gRPC over Message Queues" width="600"> </p> <p align="center"> <a href="https://github.com/Aryon-Security/grpcqueue/actions/workflows/ci.yml"><img src="https://github.com/Aryon-Security/grpcqueue/actions/workflows/ci.yml/badge.svg" alt="CI"></a> <a href="https://pkg.go.dev/github.com/Aryon-Security/grpcqueue"><img src="https://pkg.go.dev/badge/github.com/Aryon-Security/grpcqueue.svg" alt="Go Reference"></a> <a href="https://goreportcard.com/report/github.com/Aryon-Security/grpcqueue"><img src="https://goreportcard.com/badge/github.com/Aryon-Security/grpcqueue" alt="Go Report Card"></a> </p>

GRPCQueue transforms synchronous gRPC calls into asynchronous queue-based messages, enabling reliable fire-and-forget RPC patterns with any message queue backend.
## Overview
GRPCQueue allows you to:
- Make gRPC calls that are queued instead of executed immediately
- Use any message queue (SQS, RabbitMQ, Kafka, Redis) as transport
- Maintain standard gRPC interfaces and generated client code
- Add reliability, buffering, and decoupling to microservices
- Process messages concurrently with configurable workers
Inspiration: Based on ppg/grpc-queue
## Features
- Transport Agnostic: Works with any message queue via pluggable Transport interface
- Standard gRPC: Use generated client code without modifications
- FIFO Support: Optional message grouping for ordered processing
- Interceptors: Full support for gRPC client and server interceptors (auth, logging, tracing)
- Error Handling: Automatic Ack/Nack with retry support
- Concurrency: Process multiple messages in parallel with errgroup
- Type Safety: Uses Protocol Buffers for message serialization
## Usage

### Producer (Client Side)

The Producer implements `grpc.ClientConnInterface`, making it a drop-in replacement for `grpc.ClientConn`.
```go
import (
	"github.com/Aryon-Security/grpcqueue"
	pb "your/proto/package"
)

// Create a transport that implements grpcqueue.Transport
transport := myCustomTransport

// Create a producer with options
producer := grpcqueue.NewProducer(
	transport,
	grpcqueue.WithInterceptor(yourClientInterceptor),
	grpcqueue.WithMessageGroupIDExtractor(func(ctx context.Context, msg proto.Message) (string, error) {
		// Extract a grouping key for FIFO queues
		return extractTenantID(msg), nil
	}),
)
defer producer.Close()

// Use with the generated gRPC client (no code changes needed!)
client := pb.NewYourServiceClient(producer)
_, err := client.YourMethod(ctx, &pb.YourRequest{...})
```
### Consumer (Server Side)

The Consumer implements `grpc.ServiceRegistrar`, allowing you to register standard gRPC service implementations.
```go
import (
	"github.com/Aryon-Security/grpcqueue"
	pb "your/proto/package"
)

// Create a transport
transport := myCustomTransport

// Create a consumer with options
consumer := grpcqueue.NewConsumer(
	transport,
	grpcqueue.WithServerInterceptor(yourServerInterceptor),
)

// Register your gRPC service implementation (standard code!)
pb.RegisterYourServiceServer(consumer, &YourServiceImpl{})

// Start consuming messages
ctx := context.Background()
if err := consumer.Serve(ctx); err != nil {
	log.Fatal(err)
}
```
## Transport Interface

GRPCQueue uses a simple, pluggable Transport interface:
```go
type Transport interface {
	Sender
	Receiver
	io.Closer
}

type Sender interface {
	Send(ctx context.Context, msgs ...*Message) error
}

type Receiver interface {
	Receive(ctx context.Context) (<-chan *Message, error)
}
```
### Custom Transports

Implement the Sender and Receiver interfaces for custom message queues:
```go
type MyCustomTransport struct {
	// your queue client
}

func (t *MyCustomTransport) Send(ctx context.Context, msgs ...*grpcqueue.Message) error {
	// Send messages to your queue
	return nil
}

func (t *MyCustomTransport) Receive(ctx context.Context) (<-chan *grpcqueue.Message, error) {
	// Return a channel that emits messages
	ch := make(chan *grpcqueue.Message)
	go t.pollMessages(ctx, ch)
	return ch, nil
}

func (t *MyCustomTransport) Close() error {
	// Cleanup
	return nil
}
```
## Message Format

Messages are serialized as JSON using Protocol Buffers' standard JSON mapping:
```json
{
  "@type": "type.googleapis.com/infra.QueueItem",
  "service": "pkg.YourService",
  "method": "YourMethod",
  "payload": {
    "@type": "type.googleapis.com/pkg.YourRequest",
    "field1": "value1",
    "field2": 42
  }
}
```
The payload uses google.protobuf.Any for type-safe deserialization.
## FIFO Queues and Message Grouping

For FIFO queues (like AWS SQS FIFO), use WithMessageGroupIDExtractor:
```go
producer := grpcqueue.NewProducer(
	transport,
	grpcqueue.WithMessageGroupIDExtractor(func(ctx context.Context, msg proto.Message) (string, error) {
		// Group by tenant for per-tenant ordering
		if req, ok := msg.(*pb.TenantRequest); ok {
			return req.TenantId, nil
		}
		return "", nil
	}),
)
```
The group ID is set as metadata with key x-message-group-id (AWS SQS compatible).
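A FIFO-aware transport can read that key back out of the message metadata before publishing. The helper below is a hypothetical sketch; the metadata shape (`map[string][]string`, as in gRPC metadata) and the fallback behavior are assumptions, only the `x-message-group-id` key comes from the docs above:

```go
package main

import "fmt"

// messageGroupID returns the FIFO group ID from message metadata,
// falling back to a default when no group was set. The metadata shape
// here is an assumption for illustration.
func messageGroupID(md map[string][]string, fallback string) string {
	if vals, ok := md["x-message-group-id"]; ok && len(vals) > 0 && vals[0] != "" {
		return vals[0]
	}
	return fallback
}

func main() {
	md := map[string][]string{"x-message-group-id": {"tenant-42"}}
	fmt.Println(messageGroupID(md, "default")) // tenant-42
	fmt.Println(messageGroupID(nil, "default")) // default
}
```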
## Interceptors

GRPCQueue fully supports gRPC interceptors for cross-cutting concerns:
```go
// Client interceptor (auth, retries, logging)
clientInterceptor := func(ctx context.Context, method string, req, reply interface{},
	cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	// Add authentication, logging, etc.
	log.Printf("Sending %s", method)
	return invoker(ctx, method, req, reply, cc, opts...)
}

producer := grpcqueue.NewProducer(
	transport,
	grpcqueue.WithInterceptor(clientInterceptor),
)

// Server interceptor (auth validation, logging, metrics)
serverInterceptor := func(ctx context.Context, req interface{},
	info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	// Validate auth, log requests, etc.
	log.Printf("Processing %s", info.FullMethod)
	return handler(ctx, req)
}

consumer := grpcqueue.NewConsumer(
	transport,
	grpcqueue.WithServerInterceptor(serverInterceptor),
)
```
Metadata is automatically propagated through the queue.
## Error Handling
- Producer: Returns errors immediately if queue publish fails
- Consumer: Automatically Nacks failed messages for retry
- Panics: Recovered and converted to Nacks
- Context Cancellation: Gracefully stops processing in-flight messages
```go
func (s *YourService) YourMethod(ctx context.Context, req *pb.Request) (*pb.Response, error) {
	// If this returns an error, the message is Nacked for retry.
	// If this panics, the message is Nacked.
	// If the context is canceled, processing stops gracefully.
	return processRequest(req)
}
```
## Testing

Create an in-memory transport for testing:
```go
// Create an in-memory transport
transport := NewInMemoryTransport()

// Use it in tests
producer := grpcqueue.NewProducer(transport)
consumer := grpcqueue.NewConsumer(transport)
```
## Limitations
- Unary RPCs Only: Streaming RPCs are not supported (returns error)
- Fire-and-Forget: Producer doesn't receive responses (replies are ignored)
- No Load Balancing: Use queue consumer groups for horizontal scaling
- Message Size: Limited by the underlying queue's maximum message size (e.g. 256 KB for SQS)
## Performance Considerations
- Concurrency: Consumer uses errgroup to process messages in parallel
- Batching: Some transports support batch sends (check transport docs)
- Latency: Adds queue latency (~10-100ms depending on transport)
- Throughput: Bounded by the queue's own limits (SQS FIFO: ~3000 msg/s with batching; Kafka: much higher)
## Dependencies

- google.golang.org/grpc - gRPC framework
- google.golang.org/protobuf - Protocol Buffers
- golang.org/x/sync/errgroup - concurrent message processing
## Contributing
See CONTRIBUTING.md for guidelines.
