Conversation

@mwbayley
Contributor

@mwbayley mwbayley commented Dec 1, 2025

What

Introduce basic CDC functionality for source-postgres on the bulk CDK. Does not include replication slot validation or management.

Can this PR be safely reverted and rolled back?

  • YES 💚
  • NO ❌


Contributor Author

@mwbayley mwbayley Dec 1, 2025

This was taken verbatim from the old connector, with some inlining of constants from otherwise unneeded objects and classes. It's a plugin for Debezium at runtime, and my understanding is that it shouldn't need to be changed substantially.

Contributor

Convert to Kotlin - you can do a one-click machine conversion from the IDE.

Contributor Author

I assumed that it had to be Java, but Claude thinks Kotlin will work too. I'll give it a try.

Contributor Author

The generated code isn't compiling, and it looks like it'll take more than a few minutes to fix. I'm going to leave it as-is for the moment. We can revisit this later, if you'd like.

@github-actions
Contributor

github-actions bot commented Dec 1, 2025

source-postgres Connector Test Results

0 tests  ±0   0 ✅ ±0   0s ⏱️ ±0s
0 suites ±0   0 💤 ±0 
0 files   ±0   0 ❌ ±0 

Results for commit a7c2ccd. ± Comparison against base commit cb7cc14.

♻️ This comment has been updated with latest results.

@mwbayley mwbayley force-pushed the mbayley/source-postgres/cdc-on-bulk-cdk branch 3 times, most recently from 2fb6bba to b0ba5d3 Compare December 1, 2025 23:38
@mwbayley mwbayley force-pushed the mbayley/source-postgres/cdc-on-bulk-cdk branch from b0ba5d3 to c20d070 Compare December 1, 2025 23:40
Comment on lines +70 to +75
// In snapshot mode, debezium produces a java.sql.Timestamp object for the TIMESTAMPTZ
// type. Conceptually, a timestamp with timezone is an Instant. But t.toInstant()
// actually mangles the value for ancient dates, because leap years weren't applied
// consistently in ye olden days. Additionally, toInstant() (and toLocalDateTime())
// actually lose the era indicator, so we can't rely on their getEra() methods.
// So we have special handling for this case, which sidesteps the toInstant conversion.
Contributor Author

This comes from the previous implementation, including the comment. The bit about snapshot mode confuses me a bit - are there any cases where we let Debezium take snapshots? Or is this a relic of a bygone era?

Contributor

This code carries a lot of legacy. For example, snapshot mode is no longer valid since we took over snapshotting.
It doesn't need to happen on the first pass, but we might want to clean up this file.
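
For readers unfamiliar with the issue the quoted code comment describes, here is a minimal sketch of the mismatch. It is not the converter code from this PR. It assumes the root cause is that java.sql.Timestamp builds on java.util.Date, which uses a hybrid Julian/Gregorian calendar for old dates, while java.time uses the proleptic ISO calendar. The helper names viaFields and viaInstant are made up for illustration.

```kotlin
import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.util.TimeZone

// Reads the calendar fields exactly as java.sql.Timestamp reports them,
// without going through an epoch-based Instant.
fun viaFields(ts: Timestamp): LocalDateTime = ts.toLocalDateTime()

// Goes through the epoch-based Instant and renders it on the proleptic ISO calendar.
fun viaInstant(ts: Timestamp): LocalDateTime =
    LocalDateTime.ofInstant(ts.toInstant(), ZoneOffset.UTC)

fun main() {
    // Pin the JVM default zone so Timestamp.valueOf interprets the literals as UTC.
    TimeZone.setDefault(TimeZone.getTimeZone("UTC"))

    val modern = Timestamp.valueOf("2020-01-01 00:00:00")
    val ancient = Timestamp.valueOf("0001-01-01 00:00:00")

    // For the modern value both routes agree; for the ancient one they differ.
    println("modern : fields=${viaFields(modern)} instant=${viaInstant(modern)}")
    println("ancient: fields=${viaFields(ancient)} instant=${viaInstant(ancient)}")
}
```

Note that the field-based route does not carry an era either, so BC values would still need separate handling, as the code comment points out.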

Contributor Author

This also comes from the old implementation. The only addition here is the @JvmField val ... bits on lines 40-49. These came from another constants class which was otherwise unused.

Contributor

Convert to Kotlin - you can do a one-click machine conversion from the IDE.

val lsn: Lsn,
val lsnProc: Lsn,
) : Comparable<PostgresSourceCdcPosition> {
override fun compareTo(other: PostgresSourceCdcPosition): Int =
Contributor

Is this the equivalent of what we do in the legacy connector?
#35939
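
As a reference point for this question, here is a rough sketch of what a position comparison over two LSNs could look like. The SketchLsn and SketchPosition types are hypothetical stand-ins, since the PR's Lsn class and the body of compareTo are not shown here. The ordering (lsn first, lsnProc as a tie-breaker) is an assumption, not a statement of what either connector does. The parsing relies only on PostgreSQL's textual LSN format: two hexadecimal numbers separated by a slash, holding the high and low 32 bits of a 64-bit WAL position.

```kotlin
// Hypothetical stand-in for the PR's Lsn type.
data class SketchLsn(val value: Long) : Comparable<SketchLsn> {
    // LSNs are unsigned 64-bit positions, so compare them as unsigned.
    override fun compareTo(other: SketchLsn): Int =
        java.lang.Long.compareUnsigned(value, other.value)

    companion object {
        // Parses the textual form, e.g. "16/B374D848".
        fun parse(text: String): SketchLsn {
            val parts = text.split("/")
            require(parts.size == 2) { "bad LSN: $text" }
            val hi = parts[0].toLong(16)
            val lo = parts[1].toLong(16)
            return SketchLsn((hi shl 32) or lo)
        }
    }
}

// Hypothetical stand-in for PostgresSourceCdcPosition.
data class SketchPosition(
    val lsn: SketchLsn,
    val lsnProc: SketchLsn,
) : Comparable<SketchPosition> {
    // Assumption: order by lsn, then by lsnProc when the lsn values are equal.
    override fun compareTo(other: SketchPosition): Int =
        compareValuesBy(this, other, { it.lsn }, { it.lsnProc })
}

fun main() {
    val a = SketchPosition(SketchLsn.parse("1/0"), SketchLsn.parse("0/5"))
    val b = SketchPosition(SketchLsn.parse("1/0"), SketchLsn.parse("0/6"))
    println(a < b)
}
```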

val txId: Long
val lsn: Long
// TODO: Does it matter if we take timestamp before or after fetching lsn and txid?
val time: Instant = Instant.now()
Contributor

Maybe this is what you're referring to in the // TODO above:
Since startState is lazy, the time will only be set when it is lazily wired. Is that expected?

Contributor Author

@mwbayley mwbayley Dec 3, 2025

Yeah, that's expected. I meant within this block. I don't think this is the case, but if Debezium filtered out records with a lower timestamp than the state, for example, we could lose records if we took the timestamp after the current LSN. They're not in the same transaction, you know? Now that I think about it, we should probably take the timestamp from the DB rather than the application. We weren't doing that previously, though.
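
One way the "take the timestamp from the DB" idea could be sketched is to read all three values in a single statement, so that the database clock and the WAL position come from the same round trip. This is not what the PR does. WalSnapshot, WAL_SNAPSHOT_QUERY, and readWalSnapshot are hypothetical names, and the sketch assumes PostgreSQL 10 or later (for pg_current_wal_lsn()) and a plain JDBC connection.

```kotlin
import java.sql.Connection
import java.time.Instant

// Hypothetical holder for the values read from the server.
data class WalSnapshot(val lsn: String, val txId: Long, val time: Instant)

// now() is the server-side transaction start time, so the timestamp no longer
// depends on the application clock.
const val WAL_SNAPSHOT_QUERY: String =
    "SELECT pg_current_wal_lsn()::text AS lsn, txid_current() AS txid, now() AS ts"

// Assumption: the caller supplies an open connection to the source database.
fun readWalSnapshot(conn: Connection): WalSnapshot =
    conn.createStatement().use { stmt ->
        stmt.executeQuery(WAL_SNAPSHOT_QUERY).use { rs ->
            check(rs.next()) { "snapshot query returned no rows" }
            WalSnapshot(
                lsn = rs.getString("lsn"),
                txId = rs.getLong("txid"),
                time = rs.getTimestamp("ts").toInstant(),
            )
        }
    }
```

Whether the ordering matters at all still depends on how Debezium uses the timestamp in the state, which this thread leaves open.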

val resultRow: NativeRecordPayload = mutableMapOf()
for (field in stream.schema) {
data[field.id] ?: continue
when (data[field.id]) {
Contributor

Make sure the types received from Debezium are working as expected.
There were some rough edges here that needed minor conversions in MySQL CDC.

Contributor Author

This is probably easier handled with an integration test. We can use all the same test cases we already defined for the JDBC type support. I filed a sub-issue for this: https://github.com/airbytehq/airbyte-internal-issues/issues/13822?issue=airbytehq%7Cairbyte-internal-issues%7C15470

@@ -93,6 +95,13 @@ data class CdcIncrementalConfiguration(
val initialLoadTimeout: Duration,
val invalidCdcCursorPositionBehavior: InvalidCdcCursorPositionBehavior,
val shutdownTimeout: Duration,
val replicationSlot: String,
val publication: String,
Contributor

I think publication is optional in legacy.
Should this be String? instead?

Contributor Author

Looks like it is/was required when using CDC:
[two screenshots]

Contributor Author

Debezium has a default value (dbz_publication), but we don't use it. Our docs describe it as required, as well.

"A Postgres publication used for consuming changes. Read about <a href=\\\"https://docs.airbyte.com/integrations/sources/postgres#step-4-create-publications-and-replication-identities-for-tables\\\">publications and replication identities</a>."
)
@JsonSchemaInject(json = """{"order":5,"always_show":true, "minLength":1}""")
lateinit var publication: String
Contributor

I don't think publication is mandatory.
It's a logical filter on top of a replication slot. No publication is equivalent to all tables publication.

Contributor Author

A publication is necessary for CDC. That can be for all tables, or a subset. Debezium has the feature to provision one for you if it doesn't exist, but this requires superuser permissions, so most users provision it themselves. We don't use it either.

@mwbayley mwbayley marked this pull request as ready for review December 3, 2025 23:48
@mwbayley mwbayley merged commit fd5bfb5 into source-postgres/bulk-cdk Dec 3, 2025
25 of 29 checks passed
@mwbayley mwbayley deleted the mbayley/source-postgres/cdc-on-bulk-cdk branch December 3, 2025 23:54
4 participants