Skip to content

Commit b1c5b28

Browse files
committed
Filter out change events of logical decoding messages in user.source.streaming connector
1 parent 8be6c29 commit b1c5b28

File tree

4 files changed

+6
-1
lines changed

4 files changed

+6
-1
lines changed

compose.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,13 @@ services:
122122
OFFSET_STORAGE_TOPIC: kafka-connect.offset
123123
STATUS_STORAGE_TOPIC: kafka-connect.status
124124
ENABLE_APICURIO_CONVERTERS: true
125+
ENABLE_DEBEZIUM_SCRIPTING: true
125126
CONNECT_EXACTLY_ONCE_SOURCE_SUPPORT: enabled
126127
CONNECT_CONFIG_PROVIDERS: "file"
127128
CONNECT_CONFIG_PROVIDERS_FILE_CLASS: "org.apache.kafka.common.config.provider.FileConfigProvider"
128129
CONNECT_LOG4J_LOGGER_org.apache.kafka.clients: ERROR
129130
volumes:
131+
- ./kafka-connect/filtering/groovy:/kafka/connect/debezium-connector-postgres/filtering/groovy
130132
- ./kafka-connect/postgres.properties:/secrets/postgres.properties:ro
131133
- ./kafka-connect/logs/:/kafka/logs/
132134

kafka-connect/connectors/user.source.streaming.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
"column.exclude.list": "public.library_user.created_at,public.library_user.updated_at",
1919
"heartbeat.interval.ms": "5000",
2020
"publication.autocreate.mode": "filtered",
21-
"transforms": "renameTopic",
21+
"transforms": "filter,renameTopic",
22+
"transforms.filter.type": "io.debezium.transforms.Filter",
23+
"transforms.filter.language": "jsr223.groovy",
24+
"transforms.filter.condition": "valueSchema.field('op') != null && value.op != 'm'",
2225
"transforms.renameTopic.type": "io.debezium.transforms.ByLogicalTableRouter",
2326
"transforms.renameTopic.topic.regex": "(.*)public.(.*)",
2427
"transforms.renameTopic.topic.replacement": "$1users",
7.25 MB
Binary file not shown.
16.5 KB
Binary file not shown.

0 commit comments

Comments
 (0)