Skip to content

kibae/pg-logical-replication

Repository files navigation

pg-logical-replication

NPM Version License

PostgreSQL Versions on Node.js 16, 18, 20, 22, 24
PostgreSQL 14 Node.js(16, 18, 20, 22, 24) w/Postgres 14
PostgreSQL 15 Node.js(16, 18, 20, 22, 24) w/Postgres 15
PostgreSQL 16 Node.js(16, 18, 20, 22, 24) w/Postgres 16
PostgreSQL 17 Node.js(16, 18, 20, 22, 24) w/Postgres 17
PostgreSQL 18 Node.js(16, 18, 20, 22, 24) w/Postgres 18

1. Install

$ npm install pg-logical-replication

2. Usage

  • This is an example using wal2json. A replication slot(test_slot_wal2json) must be created on the PostgreSQL server.
    • SELECT * FROM pg_create_logical_replication_slot('test_slot_wal2json', 'wal2json')
const slotName = 'test_slot_wal2json';

const service = new LogicalReplicationService(
  /**
   * node-postgres Client options for connection
   * https://github.com/DefinitelyTyped/DefinitelyTyped/blob/master/types/pg/index.d.ts#L16
   */
  {
    database: 'playground',
    // ...
  },
  /**
   * Logical replication service config
   * https://github.com/kibae/pg-logical-replication/blob/main/src/logical-replication-service.ts#L9
   */
  {
    acknowledge: {
      auto: true,
      timeoutSeconds: 10
    }
  }
)

// `TestDecodingPlugin` for test_decoding and `ProtocolBuffersPlugin` for decoderbufs are also available.
const plugin = new Wal2JsonPlugin({
  /**
   * Plugin options for wal2json
   * https://github.com/kibae/pg-logical-replication/blob/main/src/output-plugins/wal2json/wal2json-plugin-options.type.ts
   */
  //...
});

/**
 * Wal2Json.Output
 * https://github.com/kibae/pg-logical-replication/blob/main/src/output-plugins/wal2json/wal2json-plugin-output.type.ts
 */
service.on('data', (lsn: string, log: Wal2Json.Output) => {
  // Do something what you want.
  // log.change.filter((change) => change.kind === 'insert').length;
});

// Start subscribing to data change events.
(function proc() {
  service.subscribe(plugin, slotName)
    .catch((e) => {
      console.error(e);
    })
    .then(() => {
      setTimeout(proc, 100);
    });
})();

3. LogicalReplicationService

3-1. Constructor(clientConfig: ClientConfig, config?: Partial<LogicalReplicationConfig>)

const service = new LogicalReplicationService({
  /**
   * node-postgres Client options for connection
   * https://github.com/DefinitelyTyped/DefinitelyTyped/blob/master/types/pg/index.d.ts#L16
   */
  clientConfig: {
    user? : string | undefined;
    database? : string | undefined;
    password? : string | (() => string | Promise<string>) | undefined;
    port? : number | undefined;
    host? : string | undefined;
    connectionString? : string | undefined;
    keepAlive? : boolean | undefined;
    stream? : stream.Duplex | undefined;
    statement_timeout? : false | number | undefined;
    parseInputDatesAsUTC? : boolean | undefined;
    ssl? : boolean | ConnectionOptions | undefined;
    query_timeout? : number | undefined;
    keepAliveInitialDelayMillis? : number | undefined;
    idle_in_transaction_session_timeout? : number | undefined;
    application_name? : string | undefined;
    connectionTimeoutMillis? : number | undefined;
    types? : CustomTypesConfig | undefined;
    options? : string | undefined;
  },
  /**
   * Logical replication service config
   * https://github.com/kibae/pg-logical-replication/blob/main/src/logical-replication-service.ts#L9
   */
  config? : Partial<{
    acknowledge?: {
      /**
       * If the value is false, acknowledge must be done manually.
       * Default: true
       */
      auto: boolean;
      /**
       * Periodically sends standby status (keepalive) to PostgreSQL.
       * When auto is true, also advances the WAL flush position.
       * Set to 0 to disable. Default: 10
       */
      timeoutSeconds: 0 | 10 | number;
    };
    flowControl?: {
      /**
       * If true, pause the stream until the data handler completes.
       * This enables backpressure support for async handlers.
       * Default: false
       */
      enabled: boolean;
    };
  }>
})

3-2. subscribe(plugin: AbstractPlugin, slotName: string, uptoLsn?: string): Promise<this>

3-3. acknowledge(lsn: string): Promise<boolean>

  • After processing the data, it signals the PostgreSQL server that it is OK to clear the WAL log.
  • Usually this is done automatically.
  • Manually use only when new LogicalReplicationService({}, {acknowledge: {auto: false}}).

Manual acknowledge example:

const service = new LogicalReplicationService(clientConfig, {
  acknowledge: {
    auto: false,      // Disable automatic acknowledgement
    timeoutSeconds: 0 // Disable periodic standby status as well
  }
});

service.on('data', async (lsn: string, log: Wal2Json.Output) => {
  try {
    // Process the change
    await processChange(log);

    // Manually acknowledge only after successful processing.
    // PostgreSQL will not advance the replication slot until this is called.
    await service.acknowledge(lsn);
  } catch (err) {
    // If you don't acknowledge, PostgreSQL will re-send this change on reconnect.
    console.error('Failed to process, skipping ack:', err);
  }
});

Note: When auto: false, timeoutSeconds still sends periodic standby status (keepalive) to PostgreSQL to maintain the connection, but does not advance the WAL flush position. Set timeoutSeconds: 0 to disable keepalive entirely.

3-4. Flow Control (Backpressure)

When processing messages takes longer than the rate at which PostgreSQL sends them, the internal buffer can grow indefinitely, leading to memory issues (OOM). The flowControl option enables backpressure support to prevent this.

const service = new LogicalReplicationService(clientConfig, {
  acknowledge: { auto: true, timeoutSeconds: 10 },
  flowControl: { enabled: true }  // Enable backpressure support
});

// Now async handlers are fully supported - the stream pauses until processing completes
service.on('data', async (lsn: string, log: Pgoutput.Message) => {
  await someSlowAsyncOperation(log);  // Safe: next message waits for this to complete
});

How it works:

  • When flowControl.enabled is true, the stream is paused while processing each message
  • Messages are queued and processed sequentially
  • The stream resumes only after the handler (including async operations) completes
  • This prevents memory overflow when handlers are slower than the incoming message rate

Default behavior:

  • flowControl.enabled defaults to false for backward compatibility
  • When disabled, messages are emitted immediately without waiting for handler completion

3-5. Event

  • on(event: 'start', listener: () => Promise<void> | void)
    • Emitted when replication starts.
  • on(event: 'data', listener: (lsn: string, log: any) => Promise<void> | void)
    • Emitted when PostgreSQL data changes. The log value type varies depending on the plugin.
  • on(event: 'error', listener: (err: Error) => void)
  • on(event: 'acknowledge', listener: (lsn: string) => Promise<void> | void)
    • Emitted when acknowledging automatically.
  • on(event: 'heartbeat', listener: (lsn: string, timestamp: number, shouldRespond: boolean) => Promise<void> | void)
    • A heartbeat check signal has been received from the server. You may need to run service.acknowledge().

3-6. Misc. method

  • stop(): Promise<this>
    • Terminate the server's connection and stop replication.
    • Event listeners registered on the service are preserved, so you can re-subscribe and receive events again.
  • destroy(): Promise<this>
    • Calls stop() and then removes all event listeners via removeAllListeners().
    • Use this when you are done with the service entirely and want to ensure clean shutdown (e.g. at the end of a test or application lifecycle).
  • isStop(): boolean
    • Returns false when replication starts from the server.
  • lastLsn(): string

4. Output Plugins

4-1. PgoutputPlugin for pgoutput (Native to PostgreSQL)

  • Use the pgoutput plugin to process large-scale transactions.

4-2. Wal2JsonPlugin for wal2json

4-3. ProtocolBuffersPlugin for decoderbufs

4-4. TestDecodingPlugin for test_decoding (Not recommended)

Contributors