@probitas/client-rabbitmq
RabbitMQ client for Probitas scenario testing framework.
This package provides a RabbitMQ client designed for integration testing of message-driven applications.
Features
- Queue Operations: Declare, bind, purge, and delete queues
- Exchange Operations: Declare and delete exchanges (direct, topic, fanout, headers)
- Publishing: Publish messages with routing keys and headers
- Consuming: Consume messages with acknowledgment support
- Resource Management: Implements
AsyncDisposablefor proper cleanup
Installation
deno add jsr:@probitas/client-rabbitmq
Quick Start
import { createRabbitMqClient } from "@probitas/client-rabbitmq";
// Using string URL
const client = await createRabbitMqClient({
url: "amqp://localhost:5672",
});
// Or using connection config object
const client2 = await createRabbitMqClient({
url: {
host: "localhost",
port: 5672,
username: "guest",
password: "guest",
vhost: "/",
},
});
// Create a channel
const channel = await client.channel();
// Declare a queue
await channel.assertQueue("test-queue", { durable: true });
// Publish a message
const content = new TextEncoder().encode("Hello, World!");
await channel.sendToQueue("test-queue", content, {
contentType: "text/plain",
});
// Consume messages
for await (const msg of channel.consume("test-queue")) {
console.log("Received:", new TextDecoder().decode(msg.content));
await channel.ack(msg);
break;
}
await client.close();
await client2.close();
Exchange and Binding
import { createRabbitMqClient } from "@probitas/client-rabbitmq";
const client = await createRabbitMqClient({ url: "amqp://localhost:5672" });
const channel = await client.channel();
// Declare an exchange
await channel.assertExchange("events", "topic", { durable: true });
// Declare a queue and bind to exchange
await channel.assertQueue("user-events");
await channel.bindQueue("user-events", "events", "user.*");
// Publish to exchange with routing key
const content = new TextEncoder().encode(JSON.stringify({ id: 1, name: "Alice" }));
await channel.publish("events", "user.created", content, {
contentType: "application/json",
});
await client.close();
Using with using Statement
import { createRabbitMqClient } from "@probitas/client-rabbitmq";
await using client = await createRabbitMqClient({ url: "amqp://localhost:5672" });
const channel = await client.channel();
await channel.assertQueue("test");
// Client automatically closed when scope exits
Related Packages
| Package | Description |
|---|---|
| `@probitas/client` | Core utilities and types |
| `@probitas/client-sqs` | AWS SQS client |
Links
Installation
deno add jsr:@probitas/client-rabbitmqClasses
#RabbitMqChannelError
class RabbitMqChannelError extends RabbitMqErrorRabbitMqErrorError thrown when a RabbitMQ channel operation fails.
Constructor
new RabbitMqChannelError(message: string, options?: RabbitMqChannelErrorOptions)Properties
- readonly
namestring - readonly
kind"channel" - readonly
channelId?number
#RabbitMqConnectionError
class RabbitMqConnectionError extends RabbitMqErrorRabbitMqErrorError thrown when a RabbitMQ connection cannot be established.
Constructor
new RabbitMqConnectionError(message: string, options?: RabbitMqErrorOptions)Properties
- readonly
namestring - readonly
kind"connection"
#RabbitMqError
class RabbitMqError extends ClientErrorClientErrorBase error class for RabbitMQ client errors.
Constructor
new RabbitMqError(message: string, _: unknown, options?: RabbitMqErrorOptions)Properties
- readonly
namestring - readonly
code?number
#RabbitMqNotFoundError
class RabbitMqNotFoundError extends RabbitMqErrorRabbitMqErrorError thrown when a RabbitMQ resource (queue or exchange) is not found.
Constructor
new RabbitMqNotFoundError(
message: string,
options: RabbitMqNotFoundErrorOptions,
)Properties
- readonly
namestring - readonly
kind"not_found" - readonly
resourcestring
#RabbitMqPreconditionFailedError
class RabbitMqPreconditionFailedError extends RabbitMqErrorRabbitMqErrorError thrown when a RabbitMQ precondition check fails.
Constructor
new RabbitMqPreconditionFailedError(
message: string,
options: RabbitMqPreconditionFailedErrorOptions,
)Properties
- readonly
namestring - readonly
kind"precondition_failed" - readonly
reasonstring
Interfaces
#RabbitMqAckResultError
interface RabbitMqAckResultError extends RabbitMqAckResultBaseAck result with RabbitMQ error.
Properties
- readonly
processedtrue - readonly
okfalse
#RabbitMqAckResultFailure
interface RabbitMqAckResultFailure extends RabbitMqAckResultBaseAck result with connection failure.
Properties
- readonly
processedfalse - readonly
okfalse
#RabbitMqAckResultSuccess
interface RabbitMqAckResultSuccess extends RabbitMqAckResultBaseSuccessful ack result.
Properties
- readonly
processedtrue - readonly
oktrue - readonly
errornull
#RabbitMqChannel
interface RabbitMqChannel extends AsyncDisposableRabbitMQ channel interface.
| Name | Description |
|---|---|
assertExchange() | — |
deleteExchange() | — |
assertQueue() | — |
deleteQueue() | — |
purgeQueue() | — |
bindQueue() | — |
unbindQueue() | — |
publish() | — |
sendToQueue() | — |
get() | — |
consume() | — |
ack() | — |
nack() | — |
reject() | — |
prefetch() | — |
close() | — |
Methods
assertExchange(
name: string,
type: RabbitMqExchangeType,
options?: RabbitMqExchangeOptions,
): Promise<RabbitMqExchangeResult>Parameters
namestringoptions?RabbitMqExchangeOptions
deleteExchange(
name: string,
options?: RabbitMqOptions,
): Promise<RabbitMqExchangeResult>Parameters
namestringoptions?RabbitMqOptions
assertQueue(
name: string,
options?: RabbitMqQueueOptions,
): Promise<RabbitMqQueueResult>Parameters
namestringoptions?RabbitMqQueueOptions
deleteQueue(
name: string,
options?: RabbitMqOptions,
): Promise<RabbitMqQueueResult>Parameters
namestringoptions?RabbitMqOptions
purgeQueue(
name: string,
options?: RabbitMqOptions,
): Promise<RabbitMqQueueResult>Parameters
namestringoptions?RabbitMqOptions
bindQueue(
queue: string,
exchange: string,
routingKey: string,
options?: RabbitMqOptions,
): Promise<RabbitMqExchangeResult>Parameters
queuestringexchangestringroutingKeystringoptions?RabbitMqOptions
unbindQueue(
queue: string,
exchange: string,
routingKey: string,
options?: RabbitMqOptions,
): Promise<RabbitMqExchangeResult>Parameters
queuestringexchangestringroutingKeystringoptions?RabbitMqOptions
publish(
exchange: string,
routingKey: string,
content: Uint8Array,
options?: RabbitMqPublishOptions,
): Promise<RabbitMqPublishResult>Parameters
exchangestringroutingKeystringcontentUint8Arrayoptions?RabbitMqPublishOptions
sendToQueue(
queue: string,
content: Uint8Array,
options?: RabbitMqPublishOptions,
): Promise<RabbitMqPublishResult>Parameters
queuestringcontentUint8Arrayoptions?RabbitMqPublishOptions
get(queue: string, options?: RabbitMqOptions): Promise<RabbitMqConsumeResult>Parameters
queuestringoptions?RabbitMqOptions
consume(
queue: string,
options?: RabbitMqConsumeOptions,
): AsyncIterable<RabbitMqMessage>Parameters
queuestringoptions?RabbitMqConsumeOptions
ack(
message: RabbitMqMessage,
options?: RabbitMqOptions,
): Promise<RabbitMqAckResult>Parameters
messageRabbitMqMessageoptions?RabbitMqOptions
nack(
message: RabbitMqMessage,
options?: RabbitMqNackOptions,
): Promise<RabbitMqAckResult>Parameters
messageRabbitMqMessageoptions?RabbitMqNackOptions
reject(
message: RabbitMqMessage,
options?: RabbitMqRejectOptions,
): Promise<RabbitMqAckResult>Parameters
messageRabbitMqMessageoptions?RabbitMqRejectOptions
prefetch(count: number): Promise<void>Parameters
countnumber
close(): Promise<void>#RabbitMqChannelErrorOptions
interface RabbitMqChannelErrorOptions extends RabbitMqErrorOptionsOptions for RabbitMQ channel errors.
| Name | Description |
|---|---|
channelId | — |
Properties
- readonly
channelId?number
#RabbitMqClient
interface RabbitMqClient extends AsyncDisposableRabbitMQ client interface.
Properties
Methods
channel(): Promise<RabbitMqChannel>close(): Promise<void>#RabbitMqClientConfig
interface RabbitMqClientConfig extends CommonOptionsRabbitMQ client configuration.
| Name | Description |
|---|---|
url | RabbitMQ connection URL or configuration object. |
heartbeat | Heartbeat interval in seconds |
prefetch | Default prefetch count for channels |
throwOnError | Whether to throw errors instead of returning them in results. |
Properties
RabbitMQ connection URL or configuration object.
- readonly
heartbeat?numberHeartbeat interval in seconds
- readonly
prefetch?numberDefault prefetch count for channels
- readonly
throwOnError?booleanWhether to throw errors instead of returning them in results.
#RabbitMqConnectionConfig
interface RabbitMqConnectionConfig extends CommonConnectionConfigRabbitMQ connection configuration.
Extends CommonConnectionConfig with RabbitMQ-specific options.
| Name | Description |
|---|---|
vhost | Virtual host. |
Properties
- readonly
vhost?stringVirtual host.
#RabbitMqConsumeOptions
interface RabbitMqConsumeOptions extends RabbitMqOptionsConsume options.
Properties
- readonly
noAck?boolean - readonly
exclusive?boolean - readonly
priority?number
#RabbitMqConsumeResultError
interface RabbitMqConsumeResultError extends RabbitMqConsumeResultBaseConsume result with RabbitMQ error.
Properties
- readonly
processedtrue - readonly
okfalse - readonly
messagenull
#RabbitMqConsumeResultFailure
interface RabbitMqConsumeResultFailure extends RabbitMqConsumeResultBaseConsume result with connection failure.
Properties
- readonly
processedfalse - readonly
okfalse - readonly
messagenull
#RabbitMqConsumeResultSuccess
interface RabbitMqConsumeResultSuccess extends RabbitMqConsumeResultBaseSuccessful consume result.
Properties
- readonly
processedtrue - readonly
oktrue - readonly
errornull
#RabbitMqErrorOptions
interface RabbitMqErrorOptions extends ErrorOptionsOptions for RabbitMQ errors.
| Name | Description |
|---|---|
code | — |
Properties
- readonly
code?number
#RabbitMqExchangeOptions
interface RabbitMqExchangeOptions extends RabbitMqOptionsExchange options.
| Name | Description |
|---|---|
durable | — |
autoDelete | — |
internal | — |
arguments | — |
Properties
- readonly
durable?boolean - readonly
autoDelete?boolean - readonly
internal?boolean - readonly
arguments?Record<string, unknown>
#RabbitMqExchangeResultError
interface RabbitMqExchangeResultError extends RabbitMqExchangeResultBaseExchange result with RabbitMQ error.
Properties
- readonly
processedtrue - readonly
okfalse
#RabbitMqExchangeResultFailure
interface RabbitMqExchangeResultFailure extends RabbitMqExchangeResultBaseExchange result with connection failure.
Properties
- readonly
processedfalse - readonly
okfalse
#RabbitMqExchangeResultSuccess
interface RabbitMqExchangeResultSuccess extends RabbitMqExchangeResultBaseSuccessful exchange result.
Properties
- readonly
processedtrue - readonly
oktrue - readonly
errornull
#RabbitMqMessage
interface RabbitMqMessageRabbitMQ message.
| Name | Description |
|---|---|
content | — |
properties | — |
fields | — |
Properties
- readonly
contentUint8Array
#RabbitMqMessageFields
interface RabbitMqMessageFieldsRabbitMQ message fields.
| Name | Description |
|---|---|
deliveryTag | — |
redelivered | — |
exchange | — |
routingKey | — |
Properties
- readonly
deliveryTagbigint - readonly
redeliveredboolean - readonly
exchangestring - readonly
routingKeystring
#RabbitMqMessageProperties
interface RabbitMqMessagePropertiesRabbitMQ message properties.
| Name | Description |
|---|---|
contentType | — |
contentEncoding | — |
headers | — |
deliveryMode | 1: non-persistent, 2: persistent |
priority | — |
correlationId | — |
replyTo | — |
expiration | — |
messageId | — |
timestamp | — |
type | — |
userId | — |
appId | — |
Properties
- readonly
contentType?string - readonly
contentEncoding?string - readonly
headers?Record<string, unknown> - readonly
deliveryMode?1 | 21: non-persistent, 2: persistent
- readonly
priority?number - readonly
correlationId?string - readonly
replyTo?string - readonly
expiration?string - readonly
messageId?string - readonly
timestamp?number - readonly
type?string - readonly
userId?string - readonly
appId?string
#RabbitMqNackOptions
interface RabbitMqNackOptions extends RabbitMqOptionsNack options.
Properties
- readonly
requeue?boolean - readonly
allUpTo?boolean
#RabbitMqNotFoundErrorOptions
interface RabbitMqNotFoundErrorOptions extends RabbitMqErrorOptionsOptions for RabbitMQ not found errors.
| Name | Description |
|---|---|
resource | — |
Properties
- readonly
resourcestring
#RabbitMqOptions
interface RabbitMqOptions extends CommonOptionsBase options for RabbitMQ operations.
| Name | Description |
|---|---|
throwOnError | Whether to throw errors instead of returning them in results. |
Properties
- readonly
throwOnError?booleanWhether to throw errors instead of returning them in results. Overrides the client-level
throwOnErrorsetting.
#RabbitMqPreconditionFailedErrorOptions
interface RabbitMqPreconditionFailedErrorOptions extends RabbitMqErrorOptionsOptions for RabbitMQ precondition failed errors.
| Name | Description |
|---|---|
reason | — |
Properties
- readonly
reasonstring
#RabbitMqPublishOptions
interface RabbitMqPublishOptions extends RabbitMqOptionsPublish options.
| Name | Description |
|---|---|
persistent | — |
contentType | — |
contentEncoding | — |
headers | — |
correlationId | — |
replyTo | — |
expiration | — |
messageId | — |
priority | — |
Properties
- readonly
persistent?boolean - readonly
contentType?string - readonly
contentEncoding?string - readonly
headers?Record<string, unknown> - readonly
correlationId?string - readonly
replyTo?string - readonly
expiration?string - readonly
messageId?string - readonly
priority?number
#RabbitMqPublishResultError
interface RabbitMqPublishResultError extends RabbitMqPublishResultBasePublish result with RabbitMQ error.
Properties
- readonly
processedtrue - readonly
okfalse
#RabbitMqPublishResultFailure
interface RabbitMqPublishResultFailure extends RabbitMqPublishResultBasePublish result with connection failure.
Properties
- readonly
processedfalse - readonly
okfalse
#RabbitMqPublishResultSuccess
interface RabbitMqPublishResultSuccess extends RabbitMqPublishResultBaseSuccessful publish result.
Properties
- readonly
processedtrue - readonly
oktrue - readonly
errornull
#RabbitMqQueueOptions
interface RabbitMqQueueOptions extends RabbitMqOptionsQueue options.
| Name | Description |
|---|---|
durable | — |
exclusive | — |
autoDelete | — |
arguments | — |
messageTtl | — |
maxLength | — |
deadLetterExchange | — |
deadLetterRoutingKey | — |
Properties
- readonly
durable?boolean - readonly
exclusive?boolean - readonly
autoDelete?boolean - readonly
arguments?Record<string, unknown> - readonly
messageTtl?number - readonly
maxLength?number - readonly
deadLetterExchange?string - readonly
deadLetterRoutingKey?string
#RabbitMqQueueResultError
interface RabbitMqQueueResultError extends RabbitMqQueueResultBaseQueue result with RabbitMQ error.
| Name | Description |
|---|---|
processed | — |
ok | — |
error | — |
queue | — |
messageCount | — |
consumerCount | — |
Properties
- readonly
processedtrue - readonly
okfalse - readonly
queuenull - readonly
messageCountnull - readonly
consumerCountnull
#RabbitMqQueueResultFailure
interface RabbitMqQueueResultFailure extends RabbitMqQueueResultBaseQueue result with connection failure.
| Name | Description |
|---|---|
processed | — |
ok | — |
error | — |
queue | — |
messageCount | — |
consumerCount | — |
Properties
- readonly
processedfalse - readonly
okfalse - readonly
queuenull - readonly
messageCountnull - readonly
consumerCountnull
#RabbitMqQueueResultSuccess
interface RabbitMqQueueResultSuccess extends RabbitMqQueueResultBaseSuccessful queue result.
| Name | Description |
|---|---|
processed | — |
ok | — |
error | — |
queue | — |
messageCount | — |
consumerCount | — |
Properties
- readonly
processedtrue - readonly
oktrue - readonly
errornull - readonly
queuestring - readonly
messageCountnumber - readonly
consumerCountnumber
Functions
#createRabbitMqClient
async function createRabbitMqClient(
config: RabbitMqClientConfig,
): Promise<RabbitMqClient>Create a new RabbitMQ client instance.
The client provides queue and exchange management, message publishing and consumption, and acknowledgment handling via AMQP protocol.
Parameters
configRabbitMqClientConfig- RabbitMQ client configuration
Returns
Promise<RabbitMqClient> — A promise resolving to a new RabbitMQ client instance
Examples
Basic usage with string URL
import { createRabbitMqClient } from "@probitas/client-rabbitmq";
const rabbit = await createRabbitMqClient({
url: "amqp://guest:guest@localhost:5672",
});
const channel = await rabbit.channel();
await channel.assertQueue("my-queue", { durable: true });
const content = new TextEncoder().encode(JSON.stringify({ type: "ORDER" }));
await channel.sendToQueue("my-queue", content, { persistent: true });
await channel.close();
await rabbit.close();
With connection config object
import { createRabbitMqClient } from "@probitas/client-rabbitmq";
const rabbit = await createRabbitMqClient({
url: {
host: "localhost",
port: 5672,
username: "guest",
password: "guest",
vhost: "/",
},
});
await rabbit.close();
Exchange and binding
import { createRabbitMqClient } from "@probitas/client-rabbitmq";
const rabbit = await createRabbitMqClient({ url: "amqp://localhost:5672" });
const channel = await rabbit.channel();
// Create exchange and queue
await channel.assertExchange("events", "topic", { durable: true });
await channel.assertQueue("user-events");
await channel.bindQueue("user-events", "events", "user.*");
// Publish to exchange
const content = new TextEncoder().encode(JSON.stringify({ id: 1 }));
await channel.publish("events", "user.created", content);
await rabbit.close();
Consuming messages
import { createRabbitMqClient } from "@probitas/client-rabbitmq";
const rabbit = await createRabbitMqClient({ url: "amqp://localhost:5672" });
const channel = await rabbit.channel();
await channel.assertQueue("my-queue");
for await (const message of channel.consume("my-queue")) {
console.log("Received:", new TextDecoder().decode(message.content));
await channel.ack(message);
break;
}
await rabbit.close();
Get single message (polling)
import { createRabbitMqClient } from "@probitas/client-rabbitmq";
const rabbit = await createRabbitMqClient({ url: "amqp://localhost:5672" });
const channel = await rabbit.channel();
await channel.assertQueue("my-queue");
const result = await channel.get("my-queue");
if (result.message) {
await channel.ack(result.message);
}
await rabbit.close();
Using await using for automatic cleanup
import { createRabbitMqClient } from "@probitas/client-rabbitmq";
await using rabbit = await createRabbitMqClient({
url: "amqp://localhost:5672",
});
const channel = await rabbit.channel();
await channel.assertQueue("test");
// Client automatically closed when scope exits
Types
#RabbitMqAckResult
type RabbitMqAckResult = RabbitMqAckResultSuccess | RabbitMqAckResultError | RabbitMqAckResultFailureAck/Nack result.
#RabbitMqConsumeResult
type RabbitMqConsumeResult = RabbitMqConsumeResultSuccess
| RabbitMqConsumeResultError
| RabbitMqConsumeResultFailureConsume result (single message retrieval).
#RabbitMqExchangeResult
type RabbitMqExchangeResult = RabbitMqExchangeResultSuccess
| RabbitMqExchangeResultError
| RabbitMqExchangeResultFailureExchange declaration result.
#RabbitMqExchangeType
type RabbitMqExchangeType = "direct" | "topic" | "fanout" | "headers"Exchange type.
#RabbitMqFailureError
type RabbitMqFailureError = RabbitMqConnectionError | AbortError | TimeoutErrorError types that indicate the operation was not processed. These are errors that occur before the operation reaches the RabbitMQ server.
#RabbitMqOperationError
type RabbitMqOperationError = RabbitMqChannelError
| RabbitMqNotFoundError
| RabbitMqPreconditionFailedError
| RabbitMqErrorError types that indicate a RabbitMQ operation error. These are errors where the operation reached the server but failed.
#RabbitMqPublishResult
type RabbitMqPublishResult = RabbitMqPublishResultSuccess
| RabbitMqPublishResultError
| RabbitMqPublishResultFailurePublish result.
#RabbitMqQueueResult
type RabbitMqQueueResult = RabbitMqQueueResultSuccess
| RabbitMqQueueResultError
| RabbitMqQueueResultFailureQueue declaration result.
#RabbitMqResult
type RabbitMqResult = RabbitMqPublishResult
| RabbitMqConsumeResult
| RabbitMqAckResult
| RabbitMqQueueResult
| RabbitMqExchangeResultUnion of all RabbitMQ result types.
