Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,105 changes: 974 additions & 131 deletions package-lock.json

Large diffs are not rendered by default.

25 changes: 25 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,33 @@
"url": "https://github.com/cloudamqp/amqp-client.js/issues"
},
"homepage": "https://github.com/cloudamqp/amqp-client.js#readme",
"peerDependencies": {
"lz4-napi": "^2.9.0",
"snappy": "^7.0.0",
"@mongodb-js/zstd": "^1.2.0",
"pako": "^2.0.0"
},
"peerDependenciesMeta": {
"lz4-napi": {
"optional": true
},
"snappy": {
"optional": true
},
"@mongodb-js/zstd": {
"optional": true
},
"pako": {
"optional": true
}
},
"devDependencies": {
"@mongodb-js/zstd": "^1.2.0",
"@types/node": "*",
"@types/pako": "^2.0.0",
"lz4-napi": "^2.9.0",
"pako": "^2.0.0",
"snappy": "^7.0.0",
"@vitest/browser": "*",
"@vitest/coverage-v8": "*",
"eslint": "*",
Expand Down
22 changes: 21 additions & 1 deletion src/amqp-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { AMQPConsumer, AMQPGeneratorConsumer } from "./amqp-consumer.js"
import type { AMQPMessage } from "./amqp-message.js"
import type { AMQPBaseClient } from "./amqp-base-client.js"
import type { AMQPProperties } from "./amqp-properties.js"
import { compressionRegistry, CompressionError, type PublishOptions } from "./amqp-compression.js"

/**
* Represents an AMQP Channel. Almost all actions in AMQP are performed on a Channel.
Expand Down Expand Up @@ -327,6 +328,7 @@ export class AMQPChannel {
* @param properties - properties to be published
* @param [mandatory] - if the message should be returned if there's no queue to be delivered to
* @param [immediate] - if the message should be returned if it can't be delivered to a consumer immediately (not supported in RabbitMQ)
* @param options - publish options including compression settings
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These methods should map closely to the protocol. Add compression to a high level client.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, we should build out BaseClient to have high level features like auto-reconnect, message coding, rpc client/server etc. Just like the ruby client. This is a WIP in that direction: #180

* @return - fulfilled when the message is enqueue on the socket, or if publish confirm is enabled when the message is confirmed by the server
*/
async basicPublish(
Expand All @@ -336,11 +338,15 @@ export class AMQPChannel {
properties: AMQPProperties = {},
mandatory = false,
immediate = false,
options?: PublishOptions,
): Promise<number> {
if (this.closed) return this.rejectClosed()
if (this.connection.blocked)
return Promise.reject(new AMQPError(`Connection blocked by server: ${this.connection.blocked}`, this.connection))

const compression = options?.compression
const compressionThreshold = options?.compressionThreshold ?? 0

let body: Uint8Array
if (typeof Buffer !== "undefined" && data instanceof Buffer) {
body = data
Expand All @@ -356,6 +362,20 @@ export class AMQPChannel {
throw new TypeError(`Invalid type ${typeof data} for parameter data`)
}

// Apply compression if requested and body exceeds threshold
const actualProperties = { ...properties }
if (compression && body.byteLength > compressionThreshold) {
const codec = await compressionRegistry.getCodec(compression)
if (!codec) {
throw new CompressionError(
`Compression codec '${compression}' is not available. Install the required peer dependency.`,
compression,
)
}
body = codec.compress(body)
actualProperties.contentEncoding = codec.contentEncoding
}

let j = 0
// get a buffer from the pool or create a new, it will later be returned to the pool for reuse
let buffer = this.connection.bufferPool.pop() || new AMQPView(new ArrayBuffer(this.connection.frameMax))
Expand Down Expand Up @@ -395,7 +415,7 @@ export class AMQPChannel {
j += 4 // bodysize (upper 32 of 64 bits)
buffer.setUint32(j, body.byteLength)
j += 4 // bodysize
j += buffer.setProperties(j, properties)
j += buffer.setProperties(j, actualProperties)
buffer.setUint8(j, AMQPFrame.End.CODE)
j += 1
buffer.setUint32(headerStart + 3, j - headerStart - 8) // update frameSize
Expand Down
163 changes: 163 additions & 0 deletions src/amqp-compression.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/**
* Supported compression algorithms
*/
export type CompressionAlgorithm = "lz4" | "snappy" | "zstd" | "gzip"

/**
* Codec interface for compression/decompression
*/
export interface CompressionCodec {
/** Compress data */
compress(data: Uint8Array): Uint8Array
/** Decompress data */
decompress(data: Uint8Array): Uint8Array
/** Content-Encoding header value */
readonly contentEncoding: string
}

/**
* Options for publish with compression
*/
export interface PublishOptions {
/** Compression algorithm to use */
compression?: CompressionAlgorithm
/** Minimum body size in bytes before compression is applied (default: 0) */
compressionThreshold?: number
}

/**
* Error thrown when a required compression codec is not available
*/
export class CompressionError extends Error {
constructor(
message: string,
public readonly algorithm?: CompressionAlgorithm,
) {
super(message)
this.name = "CompressionError"
}
}

/**
* Registry for compression codecs with lazy loading
*/
class CompressionRegistry {
private codecs = new Map<string, CompressionCodec | null>()
private loadPromises = new Map<string, Promise<CompressionCodec | null>>()

/**
* Get a codec by algorithm name, loading it if necessary
* Returns null if the codec library is not installed
*/
async getCodec(algorithm: CompressionAlgorithm): Promise<CompressionCodec | null> {
// Check cache first
if (this.codecs.has(algorithm)) {
return this.codecs.get(algorithm) || null
}

// Check if already loading
if (this.loadPromises.has(algorithm)) {
return this.loadPromises.get(algorithm)!
}

// Start loading
const loadPromise = this.loadCodec(algorithm)
this.loadPromises.set(algorithm, loadPromise)

const codec = await loadPromise
this.codecs.set(algorithm, codec)
this.loadPromises.delete(algorithm)

return codec
}

/**
* Register a custom codec
*/
registerCodec(contentEncoding: string, codec: CompressionCodec): void {
this.codecs.set(contentEncoding, codec)
}

/**
* Get a codec synchronously (only returns already-loaded codecs)
* Returns null if the codec is not loaded yet
*/
getCodecSync(algorithm: string): CompressionCodec | null {
return this.codecs.get(algorithm) || null
}

private async loadCodec(algorithm: CompressionAlgorithm): Promise<CompressionCodec | null> {
try {
switch (algorithm) {
case "gzip":
return await this.loadGzip()
case "lz4":
return await this.loadLz4()
case "snappy":
return await this.loadSnappy()
case "zstd":
return await this.loadZstd()
default:
return null
}
} catch {
return null
}
}

private async loadGzip(): Promise<CompressionCodec> {
// Use built-in zlib in Node.js, pako in browser
if (typeof process !== "undefined" && process.versions?.node) {
const zlib = await import("zlib")
return {
contentEncoding: "gzip",
compress: (data) => new Uint8Array(zlib.gzipSync(data)),
decompress: (data) => new Uint8Array(zlib.gunzipSync(data)),
}
} else {
// Browser: try pako
const pako = await import("pako")
return {
contentEncoding: "gzip",
compress: (data) => pako.gzip(data),
decompress: (data) => pako.ungzip(data),
}
}
}

private async loadLz4(): Promise<CompressionCodec | null> {
try {
// Using lz4-napi for native bindings
const lz4 = await import("lz4-napi")
return {
contentEncoding: "lz4",
compress: (data) => new Uint8Array(lz4.compressSync(Buffer.from(data))),
decompress: (data) => new Uint8Array(lz4.uncompressSync(Buffer.from(data))),
}
} catch {
return null
}
}

private async loadSnappy(): Promise<CompressionCodec | null> {
try {
const snappy = await import("snappy")
return {
contentEncoding: "snappy",
compress: (data) => new Uint8Array(snappy.compressSync(data)),
decompress: (data) => new Uint8Array(snappy.uncompressSync(data)),
}
} catch {
return null
}
}

private async loadZstd(): Promise<CompressionCodec | null> {
// @mongodb-js/zstd only has async methods, not supported for sync compression
// TODO: Consider using a different zstd library with sync support
return null
}
}

// Singleton instance
export const compressionRegistry = new CompressionRegistry()
52 changes: 47 additions & 5 deletions src/amqp-message.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { AMQPChannel } from "./amqp-channel.js"
import type { AMQPProperties } from "./amqp-properties.js"
import { compressionRegistry, CompressionError } from "./amqp-compression.js"

/**
* AMQP message
Expand Down Expand Up @@ -31,22 +32,63 @@ export class AMQPMessage {
replyCode?: number
replyText?: string

/** Cached decompressed body */
private decompressedBody: Uint8Array | null = null

/**
* @param channel - Channel this message was delivered on
*/
constructor(channel: AMQPChannel) {
this.channel = channel
}

/** Known compression encodings */
private static readonly COMPRESSION_ENCODINGS: readonly string[] = ["gzip", "lz4", "snappy", "zstd"]

/**
* Get the decompressed body if contentEncoding indicates compression,
* otherwise returns the raw body.
* @throws CompressionError if the required codec is not available/loaded
*/
bodyDecompressed(): Uint8Array | null {
if (!this.body) return null

// Return cached if available
if (this.decompressedBody) return this.decompressedBody

const encoding = this.properties.contentEncoding
if (!encoding) return this.body

// Check if this is a known compression encoding
if (!AMQPMessage.COMPRESSION_ENCODINGS.includes(encoding)) {
// Unknown encoding, return raw body
return this.body
}

const codec = compressionRegistry.getCodecSync(encoding)
if (!codec) {
throw new CompressionError(
`Cannot decompress message: codec '${encoding}' is not loaded. Ensure the codec was used for compression first.`,
encoding as "gzip" | "lz4" | "snappy" | "zstd",
)
}

this.decompressedBody = codec.decompress(this.body)
return this.decompressedBody
}

/**
* Converts the message (which is deliviered as an uint8array) to a string
* Converts the message (which is delivered as an uint8array) to a string,
* auto-decompressing if the contentEncoding indicates compression.
*/
bodyToString(): string | null {
if (this.body) {
if (typeof Buffer !== "undefined") return Buffer.from(this.body).toString()
else return new TextDecoder().decode(this.body)
const body = this.bodyDecompressed()
if (!body) return null

if (typeof Buffer !== "undefined") {
return Buffer.from(body).toString()
} else {
return null
return new TextDecoder().decode(body)
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/amqp-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { AMQPMessage } from "./amqp-message.js"
import type { AMQPChannel, ConsumeParams } from "./amqp-channel.js"
import type { AMQPProperties } from "./amqp-properties.js"
import type { AMQPConsumer, AMQPGeneratorConsumer } from "./amqp-consumer.js"
import type { PublishOptions } from "./amqp-compression.js"

/**
* Convenience class for queues
Expand Down Expand Up @@ -46,15 +47,17 @@ export class AMQPQueue {
* Publish a message directly to the queue
* @param body - the data to be published, can be a string or an uint8array
* @param properties - publish properties
* @param options - publish options including compression settings
* @return fulfilled when the message is enqueue on the socket, or if publish confirm is enabled when the message is confirmed by the server
*/
publish(
body: string | Uint8Array | ArrayBuffer | Buffer | null,
properties: AMQPProperties = {},
options?: PublishOptions,
): Promise<AMQPQueue> {
return new Promise<AMQPQueue>((resolve, reject) => {
this.channel
.basicPublish("", this.name, body, properties)
.basicPublish("", this.name, body, properties, false, false, options)
.then(() => resolve(this))
.catch(reject)
})
Expand Down
25 changes: 25 additions & 0 deletions src/compression-codecs.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Type declarations for optional compression peer dependencies

declare module "lz4-napi" {
export function compress(data: Buffer): Promise<Buffer>
export function compressSync(data: Buffer): Buffer
export function uncompress(data: Buffer): Promise<Buffer>
export function uncompressSync(data: Buffer): Buffer
}

declare module "snappy" {
export function compress(data: Uint8Array | Buffer): Promise<Buffer>
export function compressSync(data: Uint8Array | Buffer): Buffer
export function uncompress(data: Uint8Array | Buffer): Promise<Buffer>
export function uncompressSync(data: Uint8Array | Buffer): Buffer
}

declare module "@mongodb-js/zstd" {
export function compress(data: Buffer): Promise<Buffer>
export function decompress(data: Buffer): Promise<Buffer>
}

declare module "pako" {
export function gzip(data: Uint8Array): Uint8Array
export function ungzip(data: Uint8Array): Uint8Array
}
7 changes: 7 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,10 @@ export { AMQPError } from "./amqp-error.js"
export { AMQPMessage } from "./amqp-message.js"
export { AMQPProperties, Field } from "./amqp-properties.js"
export { AMQPTlsOptions } from "./amqp-tls-options.js"
export {
compressionRegistry,
CompressionError,
type CompressionAlgorithm,
type CompressionCodec,
type PublishOptions,
} from "./amqp-compression.js"
Loading
Loading