Rascal - A RabbitMQ / AMQP Client

Rascal is an advanced RabbitMQ / AMQP client built on amqplib.

About

Rascal is an advanced RabbitMQ / AMQP client built on amqplib. One of the best things about amqplib is that it doesn't make assumptions about how you use it. Another is that it doesn't attempt to abstract away AMQP concepts. As a result the library offers a great deal of control and flexibility, but the onus is on you to adopt appropriate patterns and configuration. You need to be aware that:

  • Messages are not persistent by default and will be lost if your broker restarts
  • Messages that crash your app will be infinitely retried
  • Without prefetch a sudden flood of messages may bust your event loop
  • Dropped connections and borked channels will not be automatically recovered
  • Any connection or channel errors are emitted as "error" events. Unless you handle them or use domains these will cause your application to crash
  • If a message is published using a confirm channel, and the broker fails to acknowledge, the flow of execution may be blocked indefinitely

Rascal seeks to solve these problems, make them easier to deal with, or bring them to your attention by adding the following to amqplib:

  • Config driven vhosts, exchanges, queues, bindings, producers and consumers
  • Cluster connection support
  • Transparent content parsing
  • Transparent encryption / decryption
  • Automatic reconnection and resubscription
  • Advanced error handling including delayed, limited retries
  • RPC Support
  • Redelivery protection
  • Channel pooling
  • Flow control
  • Publication timeouts
  • Safe defaults
  • Promise and callback support
  • TDD support

Concepts

Rascal extends the existing RabbitMQ concepts of Brokers, Vhosts, Exchanges, Queues, Channels and Connections with two new ones:

  1. Publications
  2. Subscriptions

A publication is a named configuration for publishing a message, including the destination queue or exchange, routing configuration, encryption profile and reliability guarantees, message options, etc. A subscription is a named configuration for consuming messages, including the source queue, encryption profile, content encoding, delivery options (e.g. acknowledgement handling and prefetch), etc. These must be configured and supplied when creating the Rascal broker. After the broker has been created the subscriptions and publications can be retrieved from the broker and used to publish and consume messages.
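As a rough illustration of how a publication and a subscription might be declared, here is a minimal configuration sketch. The exchange, queue and routing key names (demo_ex, demo_q, demo.key), the prefetch value and the connection URL are assumptions made for this example; only demo_publication and demo_subscription correspond to names used in the Examples section. Check the property names against the Rascal configuration documentation before relying on them.

```javascript
// Hypothetical config sketch. All exchange, queue and routing key names are
// illustrative assumptions, as is the local connection URL.
const config = {
  vhosts: {
    '/': {
      // Assumed local broker with default guest credentials
      connection: { url: 'amqp://guest:guest@localhost:5672/' },
      exchanges: ['demo_ex'],
      queues: ['demo_q'],
      // Route messages published with key 'demo.key' from demo_ex to demo_q
      bindings: ['demo_ex[demo.key] -> demo_q'],
      publications: {
        // Named configuration for publishing
        demo_publication: { exchange: 'demo_ex', routingKey: 'demo.key' },
      },
      subscriptions: {
        // Named configuration for consuming, one unacknowledged message at a time
        demo_subscription: { queue: 'demo_q', prefetch: 1 },
      },
    },
  },
};
```

An object like this could be exported from a config module and passed to Broker.create, after which the names demo_publication and demo_subscription are what broker.publish and broker.subscribe refer to.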

Breaking Changes

Please refer to the Change Log

Special Note

RabbitMQ 3.8.0 introduced quorum queues. Although quorum queues may not be suitable in all situations, they provide poison message handling without the need for an external redelivery counter and offer better data safety in the event of a network partition. You can read more about them here and here.
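For readers who want to try a quorum queue, the sketch below shows one way the queue definition might look. The x-queue-type and x-delivery-limit arguments are standard RabbitMQ queue arguments; the queue name demo_q, the limit of 5, and the assumption that Rascal passes options.arguments through to the queue declaration unchanged are illustrative and should be verified.

```javascript
// Hypothetical queue definition; 'demo_q' and the limit of 5 are illustrative.
const queues = {
  demo_q: {
    options: {
      arguments: {
        // Ask RabbitMQ to declare this queue as a quorum queue
        'x-queue-type': 'quorum',
        // Broker-side cap on redeliveries (poison message handling)
        'x-delivery-limit': 5,
      },
    },
  },
};
```

With a delivery limit set on the broker, a message that keeps being requeued is eventually dropped or dead-lettered by RabbitMQ itself, which is why no external redelivery counter is needed.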

Examples

Async/Await

const Broker = require('rascal').BrokerAsPromised;
const config = require('./config');

(async () => {
  try {
    const broker = await Broker.create(config);
    broker.on('error', console.error);

    // Publish a message
    const publication = await broker.publish('demo_publication', 'Hello World!');
    publication.on('error', console.error);

    // Consume a message
    const subscription = await broker.subscribe('demo_subscription');
    subscription
      .on('message', (message, content, ackOrNack) => {
        console.log(content);
        ackOrNack();
      })
      .on('error', console.error);
  } catch (err) {
    console.error(err);
  }
})();

Callbacks

const Broker = require('rascal').Broker;
const config = require('./config');

Broker.create(config, (err, broker) => {
  if (err) throw err;

  broker.on('error', console.error);

  // Publish a message
  broker.publish('demo_publication', 'Hello World!', (err, publication) => {
    if (err) throw err;
    publication.on('error', console.error);
  });

  // Consume a message
  broker.subscribe('demo_subscription', (err, subscription) => {
    if (err) throw err;
    subscription
      .on('message', (message, content, ackOrNack) => {
        console.log(content);
        ackOrNack();
      })
      .on('error', console.error);
  });
});

See here for more examples.

Avoiding Potential Message Loss

There are three situations when Rascal will nack a message without requeue, leading to potential data loss.

  1. When it is unable to parse the message content and the subscriber has no 'invalid_content' listener
  2. When the subscriber's (optional) redelivery limit has been exceeded and the subscriber has neither a 'redeliveries_error' nor a 'redeliveries_exceeded' listener
  3. When attempting to recover by republishing or forwarding, but the recovery operation fails.

Rascal nacks the message because the alternatives are to leave the message unacknowledged indefinitely, or to roll back and retry the message in an infinite tight loop, which can DDoS your application and cause problems for your infrastructure. Provided you have correctly configured dead letter queues and/or listen to the "invalid_content" and "redeliveries_exceeded" subscriber events, your messages should be safe.
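The listeners mentioned above could be attached along the following lines. This is a sketch under stated assumptions: the helper attachSafetyListeners is hypothetical, the (err, message, ackOrNack) handler signature is assumed to match Rascal's subscriber events, and a plain Node.js EventEmitter stands in for a real subscription so the snippet runs without a broker.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: attaches listeners for the two events named above.
// `subscription` is anything with an EventEmitter-style `on` method.
function attachSafetyListeners(subscription, log = console.error) {
  return subscription
    .on('invalid_content', (err, message, ackOrNack) => {
      log('Invalid content', err.message);
      // Nack with the error; assumes a dead letter queue retains the message
      ackOrNack(err);
    })
    .on('redeliveries_exceeded', (err, message, ackOrNack) => {
      log('Redeliveries exceeded', err.message);
      ackOrNack(err);
    });
}

// Stand-in for a real subscription (assumption for demonstration only)
const fakeSubscription = new EventEmitter();
attachSafetyListeners(fakeSubscription, () => {});

// Simulate an unparseable message and record what the handler nacks with
const nacked = [];
fakeSubscription.emit('invalid_content', new Error('bad json'), {}, (err) => {
  nacked.push(err.message);
});
```

In a real application the object returned by broker.subscribe would take the place of fakeSubscription, and the handlers would decide whether to dead-letter, republish or discard the message.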

Very Important Section About Event Handling

amqplib emits error events when a connection or channel encounters a problem. Rascal listens for these and, provided you use the default configuration, will attempt automatic recovery (reconnection etc). However, these events can also indicate errors in your code, so it is important to bring them to your attention. Rascal does this by re-emitting the error event, which means that if you don't handle them, they will bubble up to the uncaught error handler and crash your application. Registering a global uncaughtException handler is not sufficient: doing so without registering individual handlers will prevent your application from crashing, but will also prevent Rascal from recovering.
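The underlying Node.js behaviour can be seen with a plain EventEmitter, used here as a stand-in for a broker, subscription or publication. The sketch does not use Rascal itself; it only shows why an 'error' event with no listener is fatal unless something catches it.

```javascript
const { EventEmitter } = require('events');

// Stand-in for a broker, subscription or publication
const emitter = new EventEmitter();

// With no 'error' listener registered, emit('error') throws the error
let thrown = null;
try {
  emitter.emit('error', new Error('channel closed'));
} catch (err) {
  thrown = err;
}

// With a listener registered, the same kind of event reaches the handler
const handled = [];
emitter.on('error', (err) => handled.push(err.message));
emitter.emit('error', new Error('channel closed again'));
```

Outside a try/catch, the first emit would surface as an uncaught exception, which is the crash described above.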

There are four places where you need to register error handlers.

  1. Immediately after obtaining a broker instance

    broker.on('error', (err, { vhost, connectionUrl }) => {
      console.error('Broker error', err, vhost, connectionUrl);
    });
    
  2. After subscribing to a channel

    // Async/Await
    try {
      const subscription = await broker.subscribe('s1');
      subscription
        .on('message', (message, content, ackOrNack) => {
          // Do stuff with message
        })
        .on('error', (err) => {
          console.error('Subscriber error', err);
        });
    } catch (err) {
      throw new Error(`Rascal config error: ${err.message}`);
    }
    
    // Callbacks
    broker.subscribe('s1', (err, subscription) => {
      if (err) throw new Error(`Rascal config error: ${err.message}`);
      subscription
        .on('message', (message, content, ackOrNack) => {
          // Do stuff with message
        })
        .on('error', (err) => {
          console.error('Subscriber error', err);
        });
    });
    
  3. After publishing a message

    // Async/Await
    try {
      const publication = await broker.publish('p1', 'some text');
      publication.on('error', (err, messageId) => {
        console.error('Publisher error', err, messageId);
      });
    } catch (err) {
      throw new Error(`Rascal config error: ${err.message}`);
    }
    
    // Callbacks
    broker.publish('p1', 'some text', (err, publication) => {
      if (err) throw new Error(`Rascal config error: ${err.message}`);
      publication.on('error', (err, messageId) => {
        console.error('Publisher error', err, messageId);
      });
    });
    
  4. After forwarding a message

    // Async/Await
    try {
      const publication = await broker.forward('p1', message);
      publication.on('error', (err, messageId) => {
        console.error('Publisher error', err, messageId);
      });
    } catch (err) {
      throw new Error(`Rascal config error: ${err.message}`);
    }
    
    // Callbacks
    broker.forward('p1', message, (err, publication) => {
      if (err) throw new Error(`Rascal config error: ${err.message}`);
      publication.on('error', (err, messageId) => {
        console.error('Publisher error', err, messageId);
      });
    });
    

Other Broker Events

vhost_initialised

The broker emits the vhost_initialised event after recovering from a connection error. An object containing the vhost name and connection url (with obfuscated password) is passed to the event handler, e.g.

broker.on('vhost_initialised', ({ vhost, connectionUrl }) => {
  console.log(`Vhost: ${vhost} was initialised using connection: ${connectionUrl}`);
});

blocked / unblocked

RabbitMQ notifies clients of [blocked and unblocked](https://www.rabbitmq.com/connection-blocked
