GRPCQueue - Async gRPC over Message Queues

<p align="center"> <img src="assets/gopher.png" alt="GRPCQueue - Async gRPC over Message Queues" width="600"> </p> <p align="center"> <a href="https://github.com/Aryon-Security/grpcqueue/actions/workflows/ci.yml"><img src="https://github.com/Aryon-Security/grpcqueue/actions/workflows/ci.yml/badge.svg" alt="CI"></a> <a href="https://pkg.go.dev/github.com/Aryon-Security/grpcqueue"><img src="https://pkg.go.dev/badge/github.com/Aryon-Security/grpcqueue.svg" alt="Go Reference"></a> <a href="https://goreportcard.com/report/github.com/Aryon-Security/grpcqueue"><img src="https://goreportcard.com/badge/github.com/Aryon-Security/grpcqueue" alt="Go Report Card"></a> </p>

GRPCQueue transforms synchronous gRPC calls into asynchronous queue-based messages, enabling reliable fire-and-forget RPC patterns with any message queue backend.

Overview

GRPCQueue allows you to:

  • Make gRPC calls that are queued instead of executed immediately
  • Use any message queue (SQS, RabbitMQ, Kafka, Redis) as transport
  • Maintain standard gRPC interfaces and generated client code
  • Add reliability, buffering, and decoupling to microservices
  • Process messages concurrently with configurable workers

Inspiration: Based on ppg/grpc-queue

Features

  • Transport Agnostic: Works with any message queue via pluggable Transport interface
  • Standard gRPC: Use generated client code without modifications
  • FIFO Support: Optional message grouping for ordered processing
  • Interceptors: Full support for gRPC client and server interceptors (auth, logging, tracing)
  • Error Handling: Automatic Ack/Nack with retry support
  • Concurrency: Process multiple messages in parallel with errgroup
  • Type Safety: Uses Protocol Buffers for message serialization

Usage

Producer (Client Side)

The Producer implements grpc.ClientConnInterface, making it a drop-in replacement for grpc.ClientConn.

import (
    "context"

    "github.com/Aryon-Security/grpcqueue"
    "google.golang.org/protobuf/proto"

    pb "your/proto/package"
)

// Create transport that implements grpcqueue.Transport
transport := myCustomTransport

// Create producer with options
producer := grpcqueue.NewProducer(
    transport,
    grpcqueue.WithInterceptor(yourClientInterceptor),
    grpcqueue.WithMessageGroupIDExtractor(func(ctx context.Context, msg proto.Message) (string, error) {
        // Extract grouping key for FIFO queues
        return extractTenantID(msg), nil
    }),
)
defer producer.Close()

// Use with generated gRPC client (no code changes needed!)
client := pb.NewYourServiceClient(producer)
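// Generated unary methods return (reply, error). The reply carries no data in
// fire-and-forget mode; err reports a failed publish.
_, err := client.YourMethod(ctx, &pb.YourRequest{...})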

Consumer (Server Side)

The Consumer implements grpc.ServiceRegistrar, allowing you to register standard gRPC service implementations.

import (
    "context"
    "log"

    "github.com/Aryon-Security/grpcqueue"

    pb "your/proto/package"
)

// Create transport
transport := myCustomTransport

// Create consumer with options
consumer := grpcqueue.NewConsumer(
    transport,
    grpcqueue.WithServerInterceptor(yourServerInterceptor),
)

// Register your gRPC service implementation (standard code!)
pb.RegisterYourServiceServer(consumer, &YourServiceImpl{})

// Start consuming messages
ctx := context.Background()
if err := consumer.Serve(ctx); err != nil {
    log.Fatal(err)
}

Transport Interface

GRPCQueue uses a simple, pluggable Transport interface:

type Transport interface {
    Sender
    Receiver
    io.Closer
}

type Sender interface {
    Send(ctx context.Context, msgs ...*Message) error
}

type Receiver interface {
    Receive(ctx context.Context) (<-chan *Message, error)
}

Custom Transports

To support a custom message queue, implement Sender, Receiver, and io.Closer, which together satisfy Transport:

type MyCustomTransport struct {
    // your queue client
}

func (t *MyCustomTransport) Send(ctx context.Context, msgs ...*grpcqueue.Message) error {
    // Send messages to your queue
    return nil
}

func (t *MyCustomTransport) Receive(ctx context.Context) (<-chan *grpcqueue.Message, error) {
    // Return channel that emits messages
    ch := make(chan *grpcqueue.Message)
    go t.pollMessages(ctx, ch)
    return ch, nil
}

func (t *MyCustomTransport) Close() error {
    // Cleanup
    return nil
}

Message Format

Messages are serialized as JSON using Protocol Buffers' standard JSON mapping:

{
  "@type": "type.googleapis.com/infra.QueueItem",
  "service": "pkg.YourService",
  "method": "YourMethod",
  "payload": {
    "@type": "type.googleapis.com/pkg.YourRequest",
    "field1": "value1",
    "field2": 42
  }
}

The payload uses google.protobuf.Any for type-safe deserialization.
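
Because the envelope is plain JSON, it can be inspected without the generated types, for example when debugging messages stuck in a queue. The sketch below uses only `encoding/json`. The `envelope` struct and `inspect` function are illustrative names, not part of grpcqueue. The `/service/method` form it builds is the standard gRPC full method name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope mirrors the JSON fields shown above. The payload is kept as raw
// JSON so no generated message types are needed.
type envelope struct {
	Type    string          `json:"@type"`
	Service string          `json:"service"`
	Method  string          `json:"method"`
	Payload json.RawMessage `json:"payload"`
}

// payloadType extracts only the "@type" URL of the embedded Any.
type payloadType struct {
	Type string `json:"@type"`
}

// inspect returns the gRPC-style full method name and the payload type URL.
func inspect(raw []byte) (fullMethod, typeURL string, err error) {
	var env envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return "", "", fmt.Errorf("decode envelope: %w", err)
	}
	var pt payloadType
	if err := json.Unmarshal(env.Payload, &pt); err != nil {
		return "", "", fmt.Errorf("decode payload type: %w", err)
	}
	return "/" + env.Service + "/" + env.Method, pt.Type, nil
}
```

For the message shown above, `inspect` yields `/pkg.YourService/YourMethod` and `type.googleapis.com/pkg.YourRequest`.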

FIFO Queues and Message Grouping

For FIFO queues (like AWS SQS FIFO), use WithMessageGroupIDExtractor:

producer := grpcqueue.NewProducer(
    transport,
    grpcqueue.WithMessageGroupIDExtractor(func(ctx context.Context, msg proto.Message) (string, error) {
        // Group by tenant for per-tenant ordering
        if req, ok := msg.(*pb.TenantRequest); ok {
            return req.TenantId, nil
        }
        return "", nil
    }),
)

The group ID is set as metadata with key x-message-group-id (AWS SQS compatible).

Interceptors

GRPCQueue fully supports gRPC interceptors for cross-cutting concerns:

// Client interceptor (auth, retries, logging)
clientInterceptor := func(ctx context.Context, method string, req, reply interface{},
    cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
    // Add authentication, logging, etc.
    log.Printf("Sending %s", method)
    return invoker(ctx, method, req, reply, cc, opts...)
}

producer := grpcqueue.NewProducer(
    transport,
    grpcqueue.WithInterceptor(clientInterceptor),
)

// Server interceptor (auth validation, logging, metrics)
serverInterceptor := func(ctx context.Context, req interface{},
    info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    // Validate auth, log requests, etc.
    log.Printf("Processing %s", info.FullMethod)
    return handler(ctx, req)
}

consumer := grpcqueue.NewConsumer(
    transport,
    grpcqueue.WithServerInterceptor(serverInterceptor),
)

Metadata is automatically propagated through the queue.

Error Handling

  • Producer: Returns errors immediately if queue publish fails
  • Consumer: Automatically Nacks failed messages for retry
  • Panics: Recovered and converted to Nacks
  • Context Cancellation: Gracefully stops processing in-flight messages

func (s *YourService) YourMethod(ctx context.Context, req *pb.Request) (*pb.Response, error) {
    // If this returns an error, message is Nacked for retry
    // If this panics, message is Nacked
    // If context is canceled, processing stops gracefully
    return processRequest(req)
}

Testing

Create an in-memory transport for testing:

// Create in-memory transport
transport := NewInMemoryTransport()

// Use in tests
producer := grpcqueue.NewProducer(transport)
consumer := grpcqueue.NewConsumer(transport)

Limitations

  • Unary RPCs Only: Streaming RPCs are not supported; attempting one returns an error
  • Fire-and-Forget: Producer doesn't receive responses (replies are ignored)
  • No Load Balancing: Use queue consumer groups for horizontal scaling
  • Message Size: Limited by the underlying queue

Performance Considerations

  • Concurrency: Consumer uses errgroup to process messages in parallel
  • Batching: Some transports support batch sends (check transport docs)
  • Latency: Adds queue latency (~10-100ms depending on transport)
  • Throughput: Limited by queue throughput (SQS FIFO: about 3,000 msg/s with batching by default; Kafka: much higher)

Dependencies

  • google.golang.org/grpc - gRPC framework
  • google.golang.org/protobuf - Protocol Buffers
  • golang.org/x/sync/errgroup - Concurrent message processing

Contributing

See CONTRIBUTING.md for guidelines.

License

MIT License
