@probitas/client-rabbitmq

RabbitMQ client for the Probitas scenario testing framework.

This package provides a RabbitMQ client designed for integration testing of message-driven applications.

Features

  • Queue Operations: Declare, bind, purge, and delete queues
  • Exchange Operations: Declare and delete exchanges (direct, topic, fanout, headers)
  • Publishing: Publish messages with routing keys and headers
  • Consuming: Consume messages with acknowledgment support
  • Resource Management: Implements AsyncDisposable for proper cleanup

Installation

deno add jsr:@probitas/client-rabbitmq

Quick Start

import { createRabbitMqClient } from "@probitas/client-rabbitmq";

// Using string URL
const client = await createRabbitMqClient({
  url: "amqp://localhost:5672",
});

// Or using connection config object
const client2 = await createRabbitMqClient({
  url: {
    host: "localhost",
    port: 5672,
    username: "guest",
    password: "guest",
    vhost: "/",
  },
});

// Create a channel
const channel = await client.channel();

// Declare a queue
await channel.assertQueue("test-queue", { durable: true });

// Publish a message
const content = new TextEncoder().encode("Hello, World!");
await channel.sendToQueue("test-queue", content, {
  contentType: "text/plain",
});

// Consume messages
for await (const msg of channel.consume("test-queue")) {
  console.log("Received:", new TextDecoder().decode(msg.content));
  await channel.ack(msg);
  break;
}

await client.close();
await client2.close();

Exchange and Binding

import { createRabbitMqClient } from "@probitas/client-rabbitmq";

const client = await createRabbitMqClient({ url: "amqp://localhost:5672" });
const channel = await client.channel();

// Declare an exchange
await channel.assertExchange("events", "topic", { durable: true });

// Declare a queue and bind to exchange
await channel.assertQueue("user-events");
await channel.bindQueue("user-events", "events", "user.*");

// Publish to exchange with routing key
const content = new TextEncoder().encode(JSON.stringify({ id: 1, name: "Alice" }));
await channel.publish("events", "user.created", content, {
  contentType: "application/json",
});

await client.close();
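
The binding above uses the topic pattern "user.*". As a rough illustration of how AMQP topic patterns behave, the sketch below matches a routing key against a binding pattern, where "*" stands for exactly one dot-separated word and "#" for zero or more words. The helper topicMatches is hypothetical and not part of this package; the broker performs the real matching.

```typescript
// Hypothetical helper: approximates AMQP topic-exchange matching for illustration only.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split(".");
  const k = routingKey.split(".");

  const match = (i: number, j: number): boolean => {
    // Pattern exhausted: succeed only if the routing key is exhausted too.
    if (i === p.length) return j === k.length;
    if (p[i] === "#") {
      // "#" may absorb zero or more words.
      for (let n = j; n <= k.length; n++) {
        if (match(i + 1, n)) return true;
      }
      return false;
    }
    // Any other pattern word needs a routing-key word to compare against.
    if (j === k.length) return false;
    // "*" matches exactly one word; anything else must match literally.
    if (p[i] === "*" || p[i] === k[j]) return match(i + 1, j + 1);
    return false;
  };

  return match(0, 0);
}
```

Under this sketch, "user.created" matches "user.*", while "user.created.v2" would need a pattern such as "user.#".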

Using with the `using` Statement

import { createRabbitMqClient } from "@probitas/client-rabbitmq";

await using client = await createRabbitMqClient({ url: "amqp://localhost:5672" });
const channel = await client.channel();

await channel.assertQueue("test");
// Client automatically closed when scope exits

Related Packages

  • `@probitas/client`: Core utilities and types
  • `@probitas/client-sqs`: AWS SQS client

Classes

class

#RabbitMqChannelError

class RabbitMqChannelError extends RabbitMqError

Error thrown when a RabbitMQ channel operation fails.

Constructor
new RabbitMqChannelError(message: string, options?: RabbitMqChannelErrorOptions)
Properties
  • readonly name: string
  • readonly kind: "channel"
  • readonly channelId?: number

class

#RabbitMqConnectionError

class RabbitMqConnectionError extends RabbitMqError

Error thrown when a RabbitMQ connection cannot be established.

Constructor
new RabbitMqConnectionError(message: string, options?: RabbitMqErrorOptions)
Properties
  • readonly name: string
  • readonly kind: "connection"

class

#RabbitMqError

class RabbitMqError extends ClientError

Base error class for RabbitMQ client errors.

Constructor
new RabbitMqError(message: string, _: unknown, options?: RabbitMqErrorOptions)
Properties
  • readonly name: string
  • readonly code?: number

class

#RabbitMqNotFoundError

class RabbitMqNotFoundError extends RabbitMqError

Error thrown when a RabbitMQ resource (queue or exchange) is not found.

Constructor
new RabbitMqNotFoundError(
  message: string,
  options: RabbitMqNotFoundErrorOptions,
)
Properties
  • readonly name: string
  • readonly kind: "not_found"
  • readonly resource: string

class

#RabbitMqPreconditionFailedError

class RabbitMqPreconditionFailedError extends RabbitMqError

Error thrown when a RabbitMQ precondition check fails.

Constructor
new RabbitMqPreconditionFailedError(
  message: string,
  options: RabbitMqPreconditionFailedErrorOptions,
)
Properties
  • readonly name: string
  • readonly kind: "precondition_failed"
  • readonly reason: string
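
Because each specific error class above carries a literal kind value, a caller can branch on it. The sketch below uses a hypothetical ErrorLike union that only mirrors the documented property shapes; it does not import the real classes, and describeError is an illustrative name rather than part of the package.

```typescript
// Hypothetical stand-ins mirroring the documented `kind` discriminants.
type ErrorLike =
  | { kind: "channel"; message: string; channelId?: number }
  | { kind: "connection"; message: string }
  | { kind: "not_found"; message: string; resource: string }
  | { kind: "precondition_failed"; message: string; reason: string };

// Produce a short human-readable summary by switching on `kind`.
function describeError(error: ErrorLike): string {
  switch (error.kind) {
    case "channel":
      return `channel ${error.channelId ?? "unknown"}: ${error.message}`;
    case "connection":
      return `connection: ${error.message}`;
    case "not_found":
      return `not found (${error.resource}): ${error.message}`;
    case "precondition_failed":
      return `precondition failed (${error.reason}): ${error.message}`;
  }
}
```

With the real classes, the same branching could presumably be done with instanceof checks as well; which style the package recommends is not stated here.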

Interfaces

interface

#RabbitMqAckResultError

interface RabbitMqAckResultError extends RabbitMqAckResultBase

Ack result with RabbitMQ error.

Properties
  • processed
  • ok
  • error

interface

#RabbitMqAckResultFailure

interface RabbitMqAckResultFailure extends RabbitMqAckResultBase

Ack result with connection failure.

Properties
  • processed
  • ok
  • error

interface

#RabbitMqAckResultSuccess

interface RabbitMqAckResultSuccess extends RabbitMqAckResultBase

Successful ack result.

Properties
  • readonly processed: true
  • readonly ok: true
  • readonly error: null

interface

#RabbitMqChannel

interface RabbitMqChannel extends AsyncDisposable

RabbitMQ channel interface.

Methods
assertExchange(
  name: string,
  type: RabbitMqExchangeType,
  options?: RabbitMqExchangeOptions,
): Promise<RabbitMqExchangeResult>
deleteExchange(
  name: string,
  options?: RabbitMqOptions,
): Promise<RabbitMqExchangeResult>
assertQueue(
  name: string,
  options?: RabbitMqQueueOptions,
): Promise<RabbitMqQueueResult>
deleteQueue(
  name: string,
  options?: RabbitMqOptions,
): Promise<RabbitMqQueueResult>
purgeQueue(
  name: string,
  options?: RabbitMqOptions,
): Promise<RabbitMqQueueResult>
bindQueue(
  queue: string,
  exchange: string,
  routingKey: string,
  options?: RabbitMqOptions,
): Promise<RabbitMqExchangeResult>
unbindQueue(
  queue: string,
  exchange: string,
  routingKey: string,
  options?: RabbitMqOptions,
): Promise<RabbitMqExchangeResult>
publish(
  exchange: string,
  routingKey: string,
  content: Uint8Array,
  options?: RabbitMqPublishOptions,
): Promise<RabbitMqPublishResult>
sendToQueue(
  queue: string,
  content: Uint8Array,
  options?: RabbitMqPublishOptions,
): Promise<RabbitMqPublishResult>
get(queue: string, options?: RabbitMqOptions): Promise<RabbitMqConsumeResult>
consume(
  queue: string,
  options?: RabbitMqConsumeOptions,
): AsyncIterable<RabbitMqMessage>
ack(
  message: RabbitMqMessage,
  options?: RabbitMqOptions,
): Promise<RabbitMqAckResult>
nack(
  message: RabbitMqMessage,
  options?: RabbitMqNackOptions,
): Promise<RabbitMqAckResult>
reject(
  message: RabbitMqMessage,
  options?: RabbitMqRejectOptions,
): Promise<RabbitMqAckResult>
prefetch(count: number): Promise<void>
Parameters
  • count: number
close(): Promise<void>

interface

#RabbitMqChannelErrorOptions

interface RabbitMqChannelErrorOptions extends RabbitMqErrorOptions

Options for RabbitMQ channel errors.

Properties
  • readonly channelId?: number

interface

#RabbitMqClient

interface RabbitMqClient extends AsyncDisposable

RabbitMQ client interface.

Properties
  • config
Methods
channel(): Promise<RabbitMqChannel>
close(): Promise<void>

interface

#RabbitMqClientConfig

interface RabbitMqClientConfig extends CommonOptions

RabbitMQ client configuration.

Properties
  • readonly url: string | RabbitMqConnectionConfig

    RabbitMQ connection URL or configuration object.

  • readonly heartbeat?: number

    Heartbeat interval in seconds.

  • readonly prefetch?: number

    Default prefetch count for channels.

  • readonly throwOnError?: boolean

    Whether to throw errors instead of returning them in results.

interface

#RabbitMqConnectionConfig

interface RabbitMqConnectionConfig extends CommonConnectionConfig

RabbitMQ connection configuration.

Extends CommonConnectionConfig with RabbitMQ-specific options.

Properties
  • readonly vhost?: string

    Virtual host.
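
To make the relationship between the two accepted url forms concrete, the sketch below converts a connection config object into an AMQP URL string. It is an illustration only, not the package's internal logic: toAmqpUrl and ConnectionConfigLike are hypothetical names, the field names are taken from the Quick Start example, and the defaults (localhost, port 5672) as well as the optionality of each field are assumptions. The vhost is percent-encoded, so "/" becomes "%2F", following the usual AMQP URI convention.

```typescript
// Hypothetical shape based on the fields used in the Quick Start example.
interface ConnectionConfigLike {
  readonly host?: string;
  readonly port?: number;
  readonly username?: string;
  readonly password?: string;
  readonly vhost?: string;
}

// Build an amqp:// URL; defaults here are assumptions, not documented behavior.
function toAmqpUrl(config: ConnectionConfigLike): string {
  const host = config.host ?? "localhost";
  const port = config.port ?? 5672;

  let auth = "";
  if (config.username !== undefined) {
    auth = encodeURIComponent(config.username);
    if (config.password !== undefined) {
      auth += `:${encodeURIComponent(config.password)}`;
    }
    auth += "@";
  }

  // The vhost is a single path segment and must be percent-encoded.
  const vhost = config.vhost !== undefined
    ? `/${encodeURIComponent(config.vhost)}`
    : "";

  return `amqp://${auth}${host}:${port}${vhost}`;
}
```

For the Quick Start config (guest/guest on localhost with vhost "/"), this sketch yields "amqp://guest:guest@localhost:5672/%2F".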

interface

#RabbitMqConsumeOptions

interface RabbitMqConsumeOptions extends RabbitMqOptions

Consume options.

Properties
  • readonly noAck?: boolean
  • readonly exclusive?: boolean
  • readonly priority?: number

interface

#RabbitMqConsumeResultError

interface RabbitMqConsumeResultError extends RabbitMqConsumeResultBase

Consume result with RabbitMQ error.

Properties
  • processed
  • ok
  • error
  • message

interface

#RabbitMqConsumeResultFailure

interface RabbitMqConsumeResultFailure extends RabbitMqConsumeResultBase

Consume result with connection failure.

Properties
  • processed
  • ok
  • error
  • message

interface

#RabbitMqConsumeResultSuccess

interface RabbitMqConsumeResultSuccess extends RabbitMqConsumeResultBase

Successful consume result.

Properties
  • readonly processed: true
  • readonly ok: true
  • readonly error: null
  • readonly message: RabbitMqMessage | null

interface

#RabbitMqErrorOptions

interface RabbitMqErrorOptions extends ErrorOptions

Options for RabbitMQ errors.

Properties
  • readonly code?: number

interface

#RabbitMqExchangeOptions

interface RabbitMqExchangeOptions extends RabbitMqOptions

Exchange options.

Properties
  • readonly durable?: boolean
  • readonly autoDelete?: boolean
  • readonly internal?: boolean
  • readonly arguments?: Record<string, unknown>

interface

#RabbitMqExchangeResultError

interface RabbitMqExchangeResultError extends RabbitMqExchangeResultBase

Exchange result with RabbitMQ error.

Properties
  • processed
  • ok
  • error

interface

#RabbitMqExchangeResultFailure

interface RabbitMqExchangeResultFailure extends RabbitMqExchangeResultBase

Exchange result with connection failure.

Properties
  • processed
  • ok
  • error

interface

#RabbitMqExchangeResultSuccess

interface RabbitMqExchangeResultSuccess extends RabbitMqExchangeResultBase

Successful exchange result.

Properties
  • readonly processed: true
  • readonly ok: true
  • readonly error: null

interface

#RabbitMqMessage

interface RabbitMqMessage

RabbitMQ message.

Properties
  • content
  • properties
  • fields

interface

#RabbitMqMessageFields

interface RabbitMqMessageFields

RabbitMQ message fields.

Properties
  • readonly deliveryTag: bigint
  • readonly redelivered: boolean
  • readonly exchange: string
  • readonly routingKey: string

interface

#RabbitMqMessageProperties

interface RabbitMqMessageProperties

RabbitMQ message properties.

Properties
  • readonly contentType?: string
  • readonly contentEncoding?: string
  • readonly headers?: Record<string, unknown>
  • readonly deliveryMode?: 1 | 2

    1: non-persistent, 2: persistent

  • readonly priority?: number
  • readonly correlationId?: string
  • readonly replyTo?: string
  • readonly expiration?: string
  • readonly messageId?: string
  • readonly timestamp?: number
  • readonly type?: string
  • readonly userId?: string
  • readonly appId?: string
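
The contentType property can drive how a consumer decodes a message body. The sketch below is one possible approach, assuming that content is a Uint8Array (as the TextDecoder usage in the examples suggests). MessageLike and decodeBody are hypothetical names that only mirror the documented shapes; they are not exported by the package.

```typescript
// Hypothetical stand-in for the parts of a message this sketch needs.
interface MessageLike {
  readonly content: Uint8Array;
  readonly properties: { readonly contentType?: string };
}

// Decode the body as JSON when the content type says so, otherwise as text.
function decodeBody(message: MessageLike): unknown {
  const text = new TextDecoder().decode(message.content);
  // Ignore parameters such as "; charset=utf-8" when comparing media types.
  const [mediaType = ""] = (message.properties.contentType ?? "").split(";");
  if (mediaType.trim().toLowerCase() === "application/json") {
    return JSON.parse(text);
  }
  return text;
}
```

A message published with contentType "application/json" would come back as a parsed value, while anything else is returned as a plain string.
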
interface

#RabbitMqNackOptions

interface RabbitMqNackOptions extends RabbitMqOptions

Nack options.

Properties
  • readonly requeue?: boolean
  • readonly allUpTo?: boolean

interface

#RabbitMqNotFoundErrorOptions

interface RabbitMqNotFoundErrorOptions extends RabbitMqErrorOptions

Options for RabbitMQ not found errors.

Properties
  • readonly resource: string

interface

#RabbitMqOptions

interface RabbitMqOptions extends CommonOptions

Base options for RabbitMQ operations.

Properties
  • readonly throwOnError?: boolean

    Whether to throw errors instead of returning them in results. Overrides the client-level throwOnError setting.
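
The override rule described above can be pictured as a simple fallback chain: the per-operation value wins, then the client-level value. The sketch below is an illustration, not the package's implementation; resolveThrowOnError is a hypothetical helper, and the final default of false is an assumption, since the default is not documented here.

```typescript
// Minimal shape shared by the client config and per-operation options.
interface ThrowOnErrorOption {
  readonly throwOnError?: boolean;
}

// Operation-level setting overrides the client-level one.
// The trailing `false` default is an assumption for this sketch.
function resolveThrowOnError(
  clientConfig: ThrowOnErrorOption,
  operationOptions?: ThrowOnErrorOption,
): boolean {
  return operationOptions?.throwOnError ?? clientConfig.throwOnError ?? false;
}
```

Using ?? rather than || matters here: an explicit false on the operation must still override a client-level true.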

interface

#RabbitMqPreconditionFailedErrorOptions

interface RabbitMqPreconditionFailedErrorOptions extends RabbitMqErrorOptions

Options for RabbitMQ precondition failed errors.

Properties
  • readonly reason: string

interface

#RabbitMqPublishOptions

interface RabbitMqPublishOptions extends RabbitMqOptions

Publish options.

Properties
  • readonly persistent?: boolean
  • readonly contentType?: string
  • readonly contentEncoding?: string
  • readonly headers?: Record<string, unknown>
  • readonly correlationId?: string
  • readonly replyTo?: string
  • readonly expiration?: string
  • readonly messageId?: string
  • readonly priority?: number

interface

#RabbitMqPublishResultError

interface RabbitMqPublishResultError extends RabbitMqPublishResultBase

Publish result with RabbitMQ error.

Properties
  • processed
  • ok
  • error

interface

#RabbitMqPublishResultFailure

interface RabbitMqPublishResultFailure extends RabbitMqPublishResultBase

Publish result with connection failure.

Properties
  • processed
  • ok
  • error

interface

#RabbitMqPublishResultSuccess

interface RabbitMqPublishResultSuccess extends RabbitMqPublishResultBase

Successful publish result.

Properties
  • readonly processed: true
  • readonly ok: true
  • readonly error: null

interface

#RabbitMqQueueOptions

interface RabbitMqQueueOptions extends RabbitMqOptions

Queue options.

Properties
  • readonly durable?: boolean
  • readonly exclusive?: boolean
  • readonly autoDelete?: boolean
  • readonly arguments?: Record<string, unknown>
  • readonly messageTtl?: number
  • readonly maxLength?: number
  • readonly deadLetterExchange?: string
  • readonly deadLetterRoutingKey?: string

interface

#RabbitMqQueueResultError

interface RabbitMqQueueResultError extends RabbitMqQueueResultBase

Queue result with RabbitMQ error.

Properties
  • readonly processed: true
  • readonly ok: false
  • error
  • readonly queue: null
  • readonly messageCount: null
  • readonly consumerCount: null

interface

#RabbitMqQueueResultFailure

interface RabbitMqQueueResultFailure extends RabbitMqQueueResultBase

Queue result with connection failure.

Properties
  • readonly processed: false
  • readonly ok: false
  • error
  • readonly queue: null
  • readonly messageCount: null
  • readonly consumerCount: null

interface

#RabbitMqQueueResultSuccess

interface RabbitMqQueueResultSuccess extends RabbitMqQueueResultBase

Successful queue result.

Properties
  • readonly processed: true
  • readonly ok: true
  • readonly error: null
  • readonly queue: string
  • readonly messageCount: number
  • readonly consumerCount: number

interface

#RabbitMqRejectOptions

interface RabbitMqRejectOptions extends RabbitMqOptions

Reject options.

Properties
  • readonly requeue?: boolean

Functions

function

#createRabbitMqClient

async function createRabbitMqClient(
  config: RabbitMqClientConfig,
): Promise<RabbitMqClient>

Create a new RabbitMQ client instance.

The client provides queue and exchange management, message publishing and consumption, and acknowledgment handling via the AMQP protocol.

Parameters
  • config: RabbitMqClientConfig
Returns

Promise<RabbitMqClient>: A promise resolving to a new RabbitMQ client instance

Examples

Basic usage with string URL

import { createRabbitMqClient } from "@probitas/client-rabbitmq";

const rabbit = await createRabbitMqClient({
  url: "amqp://guest:guest@localhost:5672",
});

const channel = await rabbit.channel();
await channel.assertQueue("my-queue", { durable: true });

const content = new TextEncoder().encode(JSON.stringify({ type: "ORDER" }));
await channel.sendToQueue("my-queue", content, { persistent: true });

await channel.close();
await rabbit.close();

With connection config object

import { createRabbitMqClient } from "@probitas/client-rabbitmq";

const rabbit = await createRabbitMqClient({
  url: {
    host: "localhost",
    port: 5672,
    username: "guest",
    password: "guest",
    vhost: "/",
  },
});

await rabbit.close();

Exchange and binding

import { createRabbitMqClient } from "@probitas/client-rabbitmq";

const rabbit = await createRabbitMqClient({ url: "amqp://localhost:5672" });
const channel = await rabbit.channel();

// Create exchange and queue
await channel.assertExchange("events", "topic", { durable: true });
await channel.assertQueue("user-events");
await channel.bindQueue("user-events", "events", "user.*");

// Publish to exchange
const content = new TextEncoder().encode(JSON.stringify({ id: 1 }));
await channel.publish("events", "user.created", content);

await rabbit.close();

Consuming messages

import { createRabbitMqClient } from "@probitas/client-rabbitmq";

const rabbit = await createRabbitMqClient({ url: "amqp://localhost:5672" });
const channel = await rabbit.channel();
await channel.assertQueue("my-queue");

for await (const message of channel.consume("my-queue")) {
  console.log("Received:", new TextDecoder().decode(message.content));
  await channel.ack(message);
  break;
}

await rabbit.close();

Get single message (polling)

import { createRabbitMqClient } from "@probitas/client-rabbitmq";

const rabbit = await createRabbitMqClient({ url: "amqp://localhost:5672" });
const channel = await rabbit.channel();
await channel.assertQueue("my-queue");

const result = await channel.get("my-queue");
if (result.message) {
  await channel.ack(result.message);
}

await rabbit.close();

Using await using for automatic cleanup

import { createRabbitMqClient } from "@probitas/client-rabbitmq";

await using rabbit = await createRabbitMqClient({
  url: "amqp://localhost:5672",
});

const channel = await rabbit.channel();
await channel.assertQueue("test");
// Client automatically closed when scope exits

Types

type

#RabbitMqAckResult

type RabbitMqAckResult = RabbitMqAckResultSuccess | RabbitMqAckResultError | RabbitMqAckResultFailure

Ack/Nack result.

type

#RabbitMqConsumeResult

type RabbitMqConsumeResult = RabbitMqConsumeResultSuccess
  | RabbitMqConsumeResultError
  | RabbitMqConsumeResultFailure

Consume result (single message retrieval).

type

#RabbitMqExchangeResult

type RabbitMqExchangeResult = RabbitMqExchangeResultSuccess
  | RabbitMqExchangeResultError
  | RabbitMqExchangeResultFailure

Exchange declaration result.

type

#RabbitMqExchangeType

type RabbitMqExchangeType = "direct" | "topic" | "fanout" | "headers"

Exchange type.

type

#RabbitMqFailureError

type RabbitMqFailureError = RabbitMqConnectionError | AbortError | TimeoutError

Error types that indicate the operation was not processed. These are errors that occur before the operation reaches the RabbitMQ server.

type

#RabbitMqOperationError

type RabbitMqOperationError = RabbitMqChannelError
  | RabbitMqNotFoundError
  | RabbitMqPreconditionFailedError
  | RabbitMqError

Error types that indicate a RabbitMQ operation error. These are errors where the operation reached the server but failed.
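
The split between failure errors (operation not processed) and operation errors (processed but failed) lines up with the processed and ok flags shown on the queue result interfaces. The sketch below classifies a result on that basis. ResultLike and classifyResult are hypothetical names mirroring the documented flags; applying the same flag semantics to every result type is an assumption drawn from the queue results.

```typescript
// Hypothetical minimal view of a result: only the two documented flags.
interface ResultLike {
  readonly processed: boolean;
  readonly ok: boolean;
}

type Outcome = "success" | "operation_error" | "not_processed";

// processed=false -> never reached the server (connection, abort, timeout).
// processed=true, ok=false -> the server rejected the operation.
function classifyResult(result: ResultLike): Outcome {
  if (!result.processed) return "not_processed";
  return result.ok ? "success" : "operation_error";
}
```

A test might treat "not_processed" as an environment problem worth retrying and "operation_error" as a genuine assertion failure, though that policy is a design choice rather than something the package prescribes.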

type

#RabbitMqPublishResult

type RabbitMqPublishResult = RabbitMqPublishResultSuccess
  | RabbitMqPublishResultError
  | RabbitMqPublishResultFailure

Publish result.

type

#RabbitMqQueueResult

type RabbitMqQueueResult = RabbitMqQueueResultSuccess
  | RabbitMqQueueResultError
  | RabbitMqQueueResultFailure

Queue declaration result.

type

#RabbitMqResult

type RabbitMqResult = RabbitMqPublishResult
  | RabbitMqConsumeResult
  | RabbitMqAckResult
  | RabbitMqQueueResult
  | RabbitMqExchangeResult

Union of all RabbitMQ result types.
