From 14c55504d8a5c81ffc44b32858bd162146f493da Mon Sep 17 00:00:00 2001 From: priyanshu-ctds Date: Tue, 10 Feb 2026 19:11:37 +0530 Subject: [PATCH 1/2] Added event time while publishing message from source connector --- .../datastax/oss/pulsar/source/CassandraSource.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java b/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java index 75b2ecd1..438757f1 100644 --- a/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java +++ b/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java @@ -807,6 +807,11 @@ public Map getProperties() { ? ImmutableMap.of(Constants.WRITETIME, msg.getProperty(Constants.WRITETIME)) : ImmutableMap.of(); } + + @Override + public Optional getEventTime() { + return Optional.of(Long.parseLong(msg.getProperty(Constants.WRITETIME))); + } } @RequiredArgsConstructor @@ -862,5 +867,10 @@ public Schema getValueSchema() { public KeyValueEncodingType getKeyValueEncodingType() { throw new UnsupportedOperationException(); } + + @Override + public Optional getEventTime() { + return kvRecord.getEventTime(); + } } } From bc85f11b6fd5a4d29d6bc19f256873ea8ed8ff11 Mon Sep 17 00:00:00 2001 From: priyanshu-ctds Date: Tue, 10 Feb 2026 20:09:18 +0530 Subject: [PATCH 2/2] Added null check --- .../java/com/datastax/oss/pulsar/source/CassandraSource.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java b/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java index 438757f1..767de5b3 100644 --- a/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java +++ b/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java @@ -810,7 +810,10 @@ public Map getProperties() { @Override public Optional getEventTime() { - return Optional.of(Long.parseLong(msg.getProperty(Constants.WRITETIME))); + if(msg.hasProperty(Constants.WRITETIME) && msg.getProperty(Constants.WRITETIME) != null){ + return Optional.of(Long.parseLong(msg.getProperty(Constants.WRITETIME))); + } + return Optional.empty(); } }