Skip to content

Commit 0544beb

Browse files
committed
Support Key-based batcher for producer (#418)
(cherry picked from commit d3174fa)
1 parent 295dfec commit 0544beb

File tree

5 files changed

+147
-1
lines changed

5 files changed

+147
-1
lines changed

index.d.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ export interface ProducerConfig {
6767
chunkingEnabled?: boolean;
6868
schema?: SchemaInfo;
6969
accessMode?: ProducerAccessMode;
70+
batchingType?: ProducerBatchType;
7071
}
7172

7273
export class Producer {
@@ -163,6 +164,7 @@ export class Message {
163164
getEventTimestamp(): number;
164165
getRedeliveryCount(): number;
165166
getPartitionKey(): string;
167+
getOrderingKey(): string;
166168
}
167169

168170
export class MessageId {
@@ -271,6 +273,10 @@ export type CompressionType =
271273
'ZSTD' |
272274
'SNAPPY';
273275

276+
export type ProducerBatchType =
277+
'DefaultBatching' |
278+
'KeyBasedBatching';
279+
274280
export type ProducerCryptoFailureAction =
275281
'FAIL' |
276282
'SEND';

src/Message.cc

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ Napi::Object Message::Init(Napi::Env env, Napi::Object exports) {
4545
InstanceMethod("getPublishTimestamp", &Message::GetPublishTimestamp),
4646
InstanceMethod("getEventTimestamp", &Message::GetEventTimestamp),
4747
InstanceMethod("getRedeliveryCount", &Message::GetRedeliveryCount),
48-
InstanceMethod("getPartitionKey", &Message::GetPartitionKey)});
48+
InstanceMethod("getPartitionKey", &Message::GetPartitionKey),
49+
InstanceMethod("getOrderingKey", &Message::GetOrderingKey)});
4950

5051
constructor = Napi::Persistent(func);
5152
constructor.SuppressDestruct();
@@ -138,6 +139,14 @@ Napi::Value Message::GetPartitionKey(const Napi::CallbackInfo &info) {
138139
return Napi::String::New(env, pulsar_message_get_partitionKey(this->cMessage.get()));
139140
}
140141

142+
Napi::Value Message::GetOrderingKey(const Napi::CallbackInfo &info) {
143+
Napi::Env env = info.Env();
144+
if (!ValidateCMessage(env)) {
145+
return env.Null();
146+
}
147+
return Napi::String::New(env, pulsar_message_get_orderingKey(this->cMessage.get()));
148+
}
149+
141150
bool Message::ValidateCMessage(Napi::Env env) {
142151
if (this->cMessage.get()) {
143152
return true;

src/Message.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class Message : public Napi::ObjectWrap<Message> {
4444
Napi::Value GetPublishTimestamp(const Napi::CallbackInfo &info);
4545
Napi::Value GetEventTimestamp(const Napi::CallbackInfo &info);
4646
Napi::Value GetPartitionKey(const Napi::CallbackInfo &info);
47+
Napi::Value GetOrderingKey(const Napi::CallbackInfo &info);
4748
Napi::Value GetRedeliveryCount(const Napi::CallbackInfo &info);
4849
bool ValidateCMessage(Napi::Env env);
4950

src/ProducerConfig.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "SchemaInfo.h"
2020
#include "ProducerConfig.h"
2121
#include <map>
22+
#include "pulsar/ProducerConfiguration.h"
2223

2324
static const std::string CFG_TOPIC = "topic";
2425
static const std::string CFG_PRODUCER_NAME = "producerName";
@@ -40,6 +41,11 @@ static const std::string CFG_ENCRYPTION_KEY = "encryptionKey";
4041
static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction";
4142
static const std::string CFG_CHUNK_ENABLED = "chunkingEnabled";
4243
static const std::string CFG_ACCESS_MODE = "accessMode";
44+
static const std::string CFG_BATCHING_TYPE = "batchingType";
45+
46+
struct _pulsar_producer_configuration {
47+
pulsar::ProducerConfiguration conf;
48+
};
4349

4450
static const std::map<std::string, pulsar_partitions_routing_mode> MESSAGE_ROUTING_MODE = {
4551
{"UseSinglePartition", pulsar_UseSinglePartition},
@@ -71,6 +77,11 @@ static std::map<std::string, pulsar_producer_access_mode> PRODUCER_ACCESS_MODE =
7177
{"ExclusiveWithFencing", pulsar_ProducerAccessModeExclusiveWithFencing},
7278
};
7379

80+
static std::map<std::string, pulsar::ProducerConfiguration::BatchingType> PRODUCER_BATCHING_TYPE = {
81+
{"DefaultBatching", pulsar::ProducerConfiguration::DefaultBatching},
82+
{"KeyBasedBatching", pulsar::ProducerConfiguration::KeyBasedBatching},
83+
};
84+
7485
ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
7586
this->cProducerConfig = std::shared_ptr<pulsar_producer_configuration_t>(
7687
pulsar_producer_configuration_create(), pulsar_producer_configuration_free);
@@ -208,6 +219,11 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
208219
pulsar_producer_configuration_set_access_mode(this->cProducerConfig.get(),
209220
PRODUCER_ACCESS_MODE.at(accessMode));
210221
}
222+
223+
std::string batchingType = producerConfig.Get(CFG_BATCHING_TYPE).ToString().Utf8Value();
224+
if (PRODUCER_BATCHING_TYPE.count(batchingType)) {
225+
this->cProducerConfig.get()->conf.setBatchingType(PRODUCER_BATCHING_TYPE.at(batchingType));
226+
}
211227
}
212228

213229
ProducerConfig::~ProducerConfig() {}

tests/end_to_end.test.js

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1330,4 +1330,118 @@ const Pulsar = require('../index');
13301330
await client.close();
13311331
});
13321332
});
1333+
describe('KeyBasedBatchingTest', () => {
1334+
let client;
1335+
let producer;
1336+
let consumer;
1337+
let topicName;
1338+
1339+
beforeAll(async () => {
1340+
client = new Pulsar.Client({
1341+
serviceUrl: 'pulsar://localhost:6650',
1342+
});
1343+
});
1344+
1345+
afterAll(async () => {
1346+
await client.close();
1347+
});
1348+
1349+
beforeEach(async () => {
1350+
topicName = `KeyBasedBatchingTest-${Date.now()}`;
1351+
});
1352+
1353+
afterEach(async () => {
1354+
if (producer) await producer.close();
1355+
if (consumer) await consumer.close();
1356+
});
1357+
1358+
const initProducer = async (maxMessages) => {
1359+
producer = await client.createProducer({
1360+
topic: topicName,
1361+
batchingEnabled: true,
1362+
batchingMaxMessages: maxMessages,
1363+
batchingType: 'KeyBasedBatching',
1364+
batchingMaxPublishDelayMs: 3600 * 1000,
1365+
});
1366+
};
1367+
1368+
const initConsumer = async () => {
1369+
consumer = await client.subscribe({
1370+
topic: topicName,
1371+
subscription: 'SubscriptionName',
1372+
subscriptionType: 'Exclusive',
1373+
});
1374+
};
1375+
1376+
const receiveAndAck = async () => {
1377+
const msg = await consumer.receive();
1378+
await consumer.acknowledge(msg);
1379+
return msg;
1380+
};
1381+
1382+
test('testSequenceId', async () => {
1383+
await initProducer(6);
1384+
await initConsumer();
1385+
1386+
// 0. Send 6 messages, use different keys and order
1387+
producer.send({ data: Buffer.from('0'), partitionKey: 'A' });
1388+
producer.send({ data: Buffer.from('1'), partitionKey: 'B' });
1389+
producer.send({ data: Buffer.from('2'), partitionKey: 'C' });
1390+
producer.send({ data: Buffer.from('3'), partitionKey: 'B' });
1391+
producer.send({ data: Buffer.from('4'), partitionKey: 'C' });
1392+
producer.send({ data: Buffer.from('5'), partitionKey: 'A' });
1393+
await producer.flush();
1394+
1395+
// 1. Receive all messages
1396+
const received = [];
1397+
for (let i = 0; i < 6; i += 1) {
1398+
const msg = await receiveAndAck();
1399+
received.push({
1400+
key: msg.getPartitionKey().toString(),
1401+
value: msg.getData().toString(),
1402+
});
1403+
}
1404+
1405+
// 2. Verify message order (based on key dictionary order)
1406+
const expected = [
1407+
{ key: 'B', value: '1' },
1408+
{ key: 'B', value: '3' },
1409+
{ key: 'C', value: '2' },
1410+
{ key: 'C', value: '4' },
1411+
{ key: 'A', value: '0' },
1412+
{ key: 'A', value: '5' },
1413+
];
1414+
1415+
expect(received).toEqual(expected);
1416+
});
1417+
1418+
test('testOrderingKeyPriority', async () => {
1419+
await initProducer(3);
1420+
await initConsumer();
1421+
1422+
// 1. Send 3 messages to verify orderingKey takes precedence over partitionKey
1423+
producer.send({
1424+
data: Buffer.from('0'),
1425+
orderingKey: 'A',
1426+
partitionKey: 'B',
1427+
});
1428+
producer.send({ data: Buffer.from('2'), orderingKey: 'B' });
1429+
producer.send({ data: Buffer.from('1'), orderingKey: 'A' });
1430+
await producer.flush();
1431+
1432+
// 2. Receive messages and verify their order and keys
1433+
const msg1 = await receiveAndAck();
1434+
expect(msg1.getData().toString()).toBe('2');
1435+
expect(msg1.getOrderingKey().toString()).toBe('B');
1436+
1437+
const msg2 = await receiveAndAck();
1438+
expect(msg2.getData().toString()).toBe('0');
1439+
expect(msg2.getOrderingKey()).toBe('A');
1440+
expect(msg2.getPartitionKey()).toBe('B');
1441+
1442+
const msg3 = await receiveAndAck();
1443+
expect(msg3.getData().toString()).toBe('1');
1444+
expect(msg3.getOrderingKey().toString()).toBe('A');
1445+
});
1446+
});
13331447
})();

0 commit comments

Comments
 (0)