Production-grade communication via AMQP for distributed, eventually consistent systems running on Node.js.
- Dynamic topology
- Request-reply (RPC)
- Events (pub/sub)
- Tasks
- Pipelines
- Reply streams
- Content encoding
- Flow control and back pressure handling
- Consumer acknowledgments and publisher confirms
- Poison message handling
- Connection tolerance and broker restart resilience
- Sharded connection 🚀
- Singleton connection
- Graceful shutdown
CommonJS, ECMAScript, and TypeScript compatible (types included).
npm i comq
async connect(url: string): IO
Returns an instance of IO
once a successful connection to the broker is
established.
url
is passed
to amqplib.connect
.
import { connect } from 'comq'
const url = 'amqp://developer:secret@localhost'
const io = await connect(url)
// ...
await io.close()
The following documentation refers to a few terms:
Request is an AMQP message that is sent to a queue and has the replyTo
and correlationId
properties set.
Reply is an AMQP message sent in response to a Request and sent to the queue specified in
the replyTo
property of the Request. The correlationId
property of the Reply is set to the same
value as in the Request.
Event is an AMQP message published to an exchange.
Task is an AMQP message sent to a queue without a replyTo
property set.
Producer is an application role that receives Requests and Tasks, and produces Replies and Events.
Consumer is an application role that sends Requests and Tasks, and consumes Replies and Events.
async IO.reply(queue: string, producer): void
producer
function's signature is async? (message: any): any
Assert a queue
and start consuming Requests. Received messages are decoded and the resulting
content is passed to the producer
. The result returned by the producer
is then encoded and sent
back to the queue specified in the replyTo
property of the Request, along with a correlationId
that has the same value as in the Request.
The Reply message is encoded using the same encoding format as the Request message, unless the
producer
function returns a Buffer
. In that case, the encoding format will be set to
application/octet-stream
. If the encoding format of the Request message is set to
application/octet-stream
and the producer
function returns something other than a Buffer
, an
exception will be thrown.
The
replyTo
queue is not asserted, as it is expected to be done by the Consumer.
If the incoming message does not have a
replyTo
property, the result of theproducer
is ignored.
await io.reply('add_numbers', ({ a, b }) => (a + b))
async IO.request(queue: string, payload: any, encoding?: string): any
Send encoded Request message with replyTo
and correlationId
properties set and
return decoded Reply content.
On the initial call, queues for Requests and Replies are asserted.
const sum = await io.request('add_numbers', { a: 1, b: 2 })
async IO.consume(exchange: string, group?: string, consumer): void
consumer
function's signature is async? (payload: any): void
Start consuming decoded Events.
Asserts fanout exchange
(once per unique exchange
) and the queue for the Consumer group
(once
per unique exchange
and group
pair), and then binds the queue to the exchange. That is, one
Event message is delivered to a single Consumer within each group.
Typically, the value of
group
refers to the name of a microservice running in multiple instances.
If the group
is undefined
or omitted, a queue for the Consumer is asserted as exclusive with
auto-generated name.
// with a consumer function
await io.consume('numbers_added', 'logger',
({ a, b }) => console.log(`${a} was added to ${b}`))
async IO.emit(exchange: string, payload: any, encoding?: string): void
Publish encoded Event to the exchange
.
On the initial call, a fanout exchange is asserted.
await io.emit('numbers_added', { a: 1, b: 2 })
async IO.enqueue(queue: string, payload: any, encoding?: string): void
Publish encoded Task to the queue
.
On the initial call, the queue
is asserted on the Events channel using Event topology.
async IO.process(queue: string, processor): void
processor
function's signature is async? (payload: any): void
Process decoded Task from the queue
.
The queue
is asserted on the Events channel using Event topology.
Payloads for Requests, Events and Tasks can be passed as a readable stream in object mode, enabling the handling of large amounts of data with the benefits of RabbitMQ back pressure and flow control.
async IO.request(queue: string, stream: Readable, encoding?: string): Readable
Returns a readable stream of replies.
async IO.emit(exchange: string, stream: Readable, encoding?: string): void
async IO.enqueue(queue: string, stream: Readable, encoding?: string): void
function * generate () {
yield { a: 1, b: 2 };
yield { a: 3, b: 4 };
}
const events = Readable.from(generate())
await io.emit('numbers_added', events)
const tasks = Readable.from(generate())
await io.enqueue('add_numbers', tasks)
const requests = Readable.from(generate())
for await (const reply of io.request('add_numbers', requests))
console.log(reply)
The producer
function of IO.reply
may return a non-array
(Async)Iterator.
In this case, the yielded values will be sent to the replyTo
queue until the iterator is finished,
or a cancellation message is received, or the replyTo
queue is deleted.
await io.reply('get_numbers', function * ({ amount }) {
for (let i = 0; i < amount; i++) yield i
})
The Reply stream may be consumed by using the IO.request
function:
const stream = await io.request('get_numbers', { amount: 10 })
for await (const number of stream)
console.log(number)
When the producer function of IO.reply
returns an Iterator for the first time across all request queues,
a control queue is asserted on the Reply channel
using the reply topology.
When the Consumer destroys the Reply stream, a stream cancellation message is sent to the Producer's control queue.
Upon receiving a request, the Producer sends a confirmation message to the replyTo
queue.
If the underlying connection is lost before the Consumer receives the confirmation message,
the request will be retransmitted upon reconnection.
A heartbeat message is sent to the replyTo
queue whenever a Reply stream idles for 5 seconds.
If the Consumer of the reply stream doesn't receive a reply or a heartbeat message for 12 seconds, the stream returned
by IO.request
is destroyed.
These intervals are not configurable.
An "end stream" message is sent to the replyTo
queue when the Reply stream is finished.
See also Reply stream shutdown.
While consuming the Reply stream if the broker connection is lost,
or if the Consumer crashes or destroys the stream returned by IO.request
,
some of the values yielded by the Reply stream may be lost.
To avoid inconsistency, it is strongly recommended to use the Reply stream only with safe Producers, which do not change the application state.
The reply topology guarantees that the order of yielded values is preserved. At the same time, there is no guarantee that the stream will be transmitted to the end.
When using the Sharded connection, the order of yielded values is maintained through buffering. However, there is a scenario in which some of the yielded values may be lost if a broker crashes. In this case, the Reply stream will be destroyed once the buffer's maximum size is exceeded. Also, buffered control messages can result in stream idling.
By default, outgoing message contents are encoded with JSON and
the contentType
property is set to application/json
.
If the encoding format is specified (request, emit), contents are encoded accordingly.
Exceptions are Buffers, which are sent without encoding and the contentType
property set
to specified encoding format or application/octet-stream
by default.
Incoming messages are decoded based on the presence and value of the contentType
property. If the
property is present, the message is decoded. If the header is missing or its value
is application/octet-stream
, the message is passed as a raw Buffer object.
If the specified encoding format is not supported, an exception will be thrown.
The following encoding formats are supported:
application/json
application/msgpack
application/octet-stream
text/plain
When back pressure is applied to a channel or the
underlying broker connection is lost, any current and future outgoing messages will be paused.
Corresponding returned promises will remain in a pending
state until the pressure is removed or
the connection is restored.
When the established connection is lost, it will be automatically restored. Reconnection attempts will be made indefinitely, with intervals increasing up to 30 seconds. If the broker rejects the connection, for example, due to access being denied, an exception will be thrown. Once reconnected, the topology will be recovered, and any unanswered requests and unconfirmed events will be retransmitted.
Send to one, receive from all.
A sharded connection is a mechanism that uses multiple connections simultaneously to achieve load balancing and mitigate failover scenarios, utilizing a set of broker instances that are not combined into a cluster.
Outgoing messages are sent to a single connection chosen at random from the shard pool. Shards that lose their underlying connection or experience channel back pressure on a corresponding channel are removed from the pool until the issue is resolved. Pending messages meeting these conditions are immediately routed among the remaining shards in the pool. If no shards are available, messages will wait until a shard's connection is re-established.
Incoming messages are consumed from all shards.
async connect(...shards: string[]): IO
Returns an instance of IO
once a successful connection to one of the shards is established.
const shard0 = 'amqp://developer:secret@localhost:5673'
const shard1 = 'amqp://developer:secret@localhost:5674'
const io = await connect(shard0, shard1)
// ...
await io.close()
async assert(url: string): IO
Similar to connect
, but it utilizes shared underlying connections.
The connection is established once per unique url
among instances of IO
created with assert
,
and it will be closed when the last instance of IO
using that connection is disconnected.
Sharded connections are also supported.
async assert(...shards: string[]): IO
Topology is designed to deliver maximum performance while ensuring that the at least once guarantee provided by RabbitMQ is maintained.
Static topology refers to the process of defining the complete topology declaration along with the code that uses it. While this approach may provide a clear and comprehensive view of the system's architecture, it can be prone to duplication of effort. Moreover, some topologies are inherently dynamic, such as those that depend on runtime data like incoming messages, making static topology impossible or hard to maintain. The tradeoff of potentially encountering runtime topology declaration exceptions, which are more likely to happen during development, is deemed acceptable.
IO
lazy creates individual channels for Requests, Replies, and Events.
- Prefetch count for incoming
Requests and Events are separated. Each is set to
300
(currently non-configurable). - Incoming Replies have no prefetch limit.
- Outgoing Events are transmitted using confirmation mechanism.
Channel segregation addresses the potential issue of a prefetch deadlock1, which may take place when using a single channel or channel pool.
- Exchanges and queues for Events, and queues for Requests are durable.
- Queues for Replies are exclusive and auto deleted.
- Events are persistent (delivery mode 2), while Requests and Replies are not (mode 1).
- Events and Requests are consumed using manual acknowledgment mode, and Replies are consumed using automatic mode.
If an incoming message causes an exception, the corresponding channel is sealed, the message is republished, and the exception is thrown. If the message causes exceptions five times in a row, it is discarded.
It is highly recommended to set up a dead letter exchange policy to analyze messages that caused exceptions. Note that in some cases, if the problematic message is a Request, a Consumer will never receive a Reply, and this can result in a prefetch deadlock of a Consumer.
See:
- Consumer Acknowledgments and Publisher Confirms
- Negative Acknowledgment and Requeuing of Deliveries
- Dead Letter Exchanges
Message | Prefetch | Confirms | Queue | Acknowledgment | Persistent |
---|---|---|---|---|---|
Request | limited | no | durable | manual | no |
Reply | unlimited | no | exclusive | automatic | no |
Event | limited | yes | durable | manual | yes |
async IO.seal(): void
Stop receiving new Events and Requests. Sending Requests, receiving Replies, and emitting Events will still be available.
async IO.close(): void
- Call
IO.seal()
. - Wait for any outstanding messages to be processed2 and acknowledged.
- Close the connection.
IO.close()
tracks the completion of producer
and consumer
function
calls, by waiting for their returned promises to be settled. However, it is possible for an attempt
to be made to send an outgoing message after the connection has been closed, resulting in the
Channel ended, no reply will be forthcoming
exception. This may occur at least in the following
scenarios:
- The
producer
orconsumer
function spawns a new asynchronous context that attempts to send an outgoing message after the returned promise has been settled. - An application has other incoming communication channels, such as an HTTP API, that may lead to
an attempt to send an outgoing message after
IO.close()
has closed the connection.
In these or other similar scenarios, it is recommended to call IO.seal()
to stop receiving new
messages, ensure that any code execution that may send outgoing messages is completed before
calling IO.close()
.
All current Reply streams of the corresponding Producer or Consumer instance are destroyed when:
- the
IO.seal
function is called on the Consumer - the
IO.close
function is called on the Producer
IO
emits events for testing, diagnostics, or logging purposes.
IO.diagnose(event: string, listener: Function): void
Subscribe to one of the diagnostic events:
open
: connection is opened3.close
: connection is closed. Optionalerror
is passed as an argument.flow
: back pressure is applied to a channel. Channel type is passed as an argument.drain
: back pressure is removed from a channel. Channel type is passed.remove
: channel is removed from the pool.recover
: channel's topology is recovered. Channel type is passed.discard
: message is discarded as it repeatedly caused exceptions. Channel type, raw amqp message object and the exception are passed as arguments.pause
: channel is paused. Channel type is passed.resume
: channel is resumed. Channel type is passed.
In the case of a sharded connection, an additional argument specifying the
shard number will be passed to listeners.
This is applicable except for the pause
and resume
events,
which are emitted when the associated channels are paused or resumed across all shards.
The shard number corresponds to the position of the argument used in the connect
function call.
io.diagnose('flow', (type) => console.log(`Back pressure was applied to the ${type} channel`))
I want to express my deep appreciation to @mzabolotko for his generous contribution of time and expertise.
Footnotes
-
The maximum number of messages has been consumed while handlers of those messages have sent requests and are expecting replies. ↩
-
Therefore, if the underlying connection is lost,
.close()
will only be completed once the connection is recovered. ↩ -
As the
connect
function returns an instance ofIO
after the connection has been established, there is no way to capture the initialopen
event. ↩