-
Notifications
You must be signed in to change notification settings - Fork 5k
source-postgres: introduce cdc on bulk cdk #70278
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
source-postgres: introduce cdc on bulk cdk #70278
Conversation
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. Helpful Resources
PR Slash CommandsAirbyte Maintainers (that's you!) can execute the following slash commands on your PR:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was taken verbatim from the old connector, with some inlining of constants from otherwise unneeded objects and classes. It's a plugin for Debezium at runtime, and my understanding is that it shouldn't need to be changed substantially.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Convert to kotlin - you can do one click machine conversion from the IDE
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assumed that it had to be Java, but Claude thinks Kotlin will work too. I'll give it a try.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The generated code isn't compiling, and it looks like it'll take more than a few minutes to fix. I'm going to leave it as-is for the moment. We can revisit this later, if you'd like.
2fb6bba to
b0ba5d3
Compare
b0ba5d3 to
c20d070
Compare
| // In snapshot mode, debezium produces a java.sql.Timestamp object for the TIMESTAMPTZ | ||
| // type. Conceptually, a timestamp with timezone is an Instant. But t.toInstant() | ||
| // actually mangles the value for ancient dates, because leap years weren't applied | ||
| // consistently in ye olden days. Additionally, toInstant() (and toLocalDateTime()) | ||
| // actually lose the era indicator, so we can't rely on their getEra() methods. | ||
| // So we have special handling for this case, which sidesteps the toInstant conversion. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comes from the previous implementation, including the comment. The bit about snapshot mode confuses me a bit - are there any cases where we let Debezium take snapshots? Or is this a relic of a bygone era?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code carries a lot of legacy. For example snapshot mode is no longer valid as we took over snapshotting.
Doesn't need to be on first pass, but we might want to clean up this file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This also comes from the old implementation. The only addition here is the @JvmField val ... bits from line 40-49. These came from another constants class which was otherwise unused.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Convert to kotlin - you can do one click machine conversion from the IDE
| // In snapshot mode, debezium produces a java.sql.Timestamp object for the TIMESTAMPTZ | ||
| // type. Conceptually, a timestamp with timezone is an Instant. But t.toInstant() | ||
| // actually mangles the value for ancient dates, because leap years weren't applied | ||
| // consistently in ye olden days. Additionally, toInstant() (and toLocalDateTime()) | ||
| // actually lose the era indicator, so we can't rely on their getEra() methods. | ||
| // So we have special handling for this case, which sidesteps the toInstant conversion. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code carries a lot of legacy. For example snapshot mode is no longer valid as we took over snapshotting.
Doesn't need to be on first pass, but we might want to clean up this file
| val lsn: Lsn, | ||
| val lsnProc: Lsn, | ||
| ) : Comparable<PostgresSourceCdcPosition> { | ||
| override fun compareTo(other: PostgresSourceCdcPosition): Int = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the equivalent of what we do in the legacy connector?
#35939
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.../main/kotlin/io/airbyte/integrations/source/postgres/cdc/PostgresSourceDebeziumOperations.kt
Outdated
Show resolved
Hide resolved
| val txId: Long | ||
| val lsn: Long | ||
| // TODO: Does it matter if we take timestamp before or after fetching lsn and txid? | ||
| val time: Instant = Instant.now() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe that's what you refer here in the // TODO above:
Since startState is lazy the time will only be set when lazily wired. is that expected?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that's expected. I meant within this block. I don't think this is the case, but if Debezium filtered out records with a lower timestamp than the state, for example, we could lose records if we took the timestamp after the current LSN. They're not in the same transaction, you know? Now that I think about it, we should probably take the timestamp from the DB rather than the application. We weren't doing that previously, though.
| val resultRow: NativeRecordPayload = mutableMapOf() | ||
| for (field in stream.schema) { | ||
| data[field.id] ?: continue | ||
| when (data[field.id]) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sure types received from debezium are working as expected.
There were some rough edged here that needed minor conversions in mysql CDC
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is probably easier handled with an integration test. We can use all the same test cases we already defined for the JDBC type support. I filed a sub-issue for this: https://github.com/airbytehq/airbyte-internal-issues/issues/13822?issue=airbytehq%7Cairbyte-internal-issues%7C15470
| @@ -93,6 +95,13 @@ data class CdcIncrementalConfiguration( | |||
| val initialLoadTimeout: Duration, | |||
| val invalidCdcCursorPositionBehavior: InvalidCdcCursorPositionBehavior, | |||
| val shutdownTimeout: Duration, | |||
| val replicationSlot: String, | |||
| val publication: String, | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
publication is optional I think in legacy
String? ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Debezium has a default value (dbz_publication), but we don't use it. Our docs describe it as required, as well.
...n/io/airbyte/integrations/source/postgres/config/PostgresSourceConfigurationSpecification.kt
Outdated
Show resolved
Hide resolved
| "A Postgres publication used for consuming changes. Read about <a href=\\\"https://docs.airbyte.com/integrations/sources/postgres#step-4-create-publications-and-replication-identities-for-tables\\\">publications and replication identities</a>." | ||
| ) | ||
| @JsonSchemaInject(json = """{"order":5,"always_show":true, "minLength":1}""") | ||
| lateinit var publication: String |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think publication is mandatory.
It's a logical filter on top of a replication slot. No publication is equivalent to all tables publication.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A publication is necessary for CDC. That can be for all tables, or a subset. Debezium has the feature to provision one for you if it doesn't exist, but this requires superuser permissions, so most users provision it themselves. We don't use it either.


What
Introduce basic CDC functionality for source-postgres on the bulk CDK. Does not include replication slot validation or management.
Can this PR be safely reverted and rolled back?