Skip to content

Commit b0ba5d3

Browse files
committed
basic CDC functionality for Postgres
1 parent 2a0159c commit b0ba5d3

File tree

13 files changed

+1209
-125
lines changed

13 files changed

+1209
-125
lines changed

airbyte-integrations/connectors/source-postgres/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ airbyteBulkConnector {
1515

1616
dependencies {
1717
implementation 'com.azure:azure-identity:1.17.0'
18-
implementation 'io.debezium:debezium-connector-postgres'
18+
implementation 'io.debezium:debezium-connector-postgres:3.0.1.Final'
1919
implementation 'org.postgresql:postgresql:42.7.7'
2020

2121
testImplementation 'org.testcontainers:postgresql:1.21.3'

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresDebeziumConverter.java

Lines changed: 428 additions & 0 deletions
Large diffs are not rendered by default.

airbyte-integrations/connectors/source-postgres/src/main/kotlin/io/airbyte/integrations/source/postgres/PostgresSourceCdcMetaFields.kt

Lines changed: 0 additions & 24 deletions
This file was deleted.

airbyte-integrations/connectors/source-postgres/src/main/kotlin/io/airbyte/integrations/source/postgres/PostgresSourceCdcPosition.kt

Lines changed: 0 additions & 13 deletions
This file was deleted.

airbyte-integrations/connectors/source-postgres/src/main/kotlin/io/airbyte/integrations/source/postgres/PostgresSourceDebeziumOperations.kt

Lines changed: 0 additions & 74 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
/*
2+
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.source.postgres.cdc
6+
7+
import io.github.oshai.kotlinlogging.KotlinLogging
8+
import java.sql.Date
9+
import java.sql.Time
10+
import java.sql.Timestamp
11+
import java.time.Duration
12+
import java.time.Instant
13+
import java.time.LocalDate
14+
import java.time.LocalDateTime
15+
import java.time.LocalTime
16+
import java.time.OffsetDateTime
17+
import java.time.OffsetTime
18+
import java.time.ZoneOffset
19+
import java.time.ZonedDateTime
20+
import java.time.chrono.IsoEra
21+
import java.time.format.DateTimeFormatter
22+
import java.util.concurrent.TimeUnit
23+
import kotlin.math.abs
24+
import kotlin.math.min
25+
26+
// File-private logger used by the DateTimeConverter functions below for their info/debug notices.
private val LOGGER = KotlinLogging.logger {}
27+
28+
object DateTimeConverter {
29+
30+
val TIME_WITH_TIMEZONE_FORMATTER: DateTimeFormatter =
31+
DateTimeFormatter.ofPattern(
32+
"HH:mm:ss[.][SSSSSSSSS][SSSSSSS][SSSSSS][SSSSS][SSSS][SSS][SS][S][''][XXX][XX][X]"
33+
)
34+
private var loggedUnknownTimeWithTimeZoneClass = false
35+
private var loggedUnknownTimeClass = false
36+
private var loggedUnknownTimestampWithTimeZoneClass = false
37+
private var loggedUnknownTimestampClass = false
38+
private var loggedUnknownDateClass = false
39+
40+
@JvmField val TIME_FORMATTER: DateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS")
41+
@JvmField
42+
val TIMESTAMP_FORMATTER: DateTimeFormatter =
43+
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS")
44+
@JvmField
45+
val TIMETZ_FORMATTER: DateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSSXXX")
46+
@JvmField
47+
val TIMESTAMPTZ_FORMATTER: DateTimeFormatter =
48+
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX")
49+
@JvmField val DATE_FORMATTER: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
50+
51+
/**
 * Renders a time-with-time-zone value as text.
 *
 * An [OffsetTime] is used as-is. Any other input is reported once through the logger and its
 * `toString()` is parsed with [TIME_WITH_TIMEZONE_FORMATTER].
 *
 * When both seconds and nanos are zero the value is written with [TIMETZ_FORMATTER]; otherwise
 * `OffsetTime.toString()` is returned.
 *
 * @param time the raw value, expected to be an [OffsetTime] or something whose `toString()` is a
 * parseable time with offset
 * @return the textual representation described above
 * @throws java.time.format.DateTimeParseException if a non-[OffsetTime] input cannot be parsed
 */
@JvmStatic
fun convertToTimeWithTimezone(time: Any): String {
    val offsetTime: OffsetTime =
        when (time) {
            is OffsetTime -> time
            else -> {
                if (!loggedUnknownTimeWithTimeZoneClass) {
                    LOGGER.info { "Unknown class for Time with timezone data type ${time.javaClass}" }
                    loggedUnknownTimeWithTimeZoneClass = true
                }
                OffsetTime.parse(time.toString(), TIME_WITH_TIMEZONE_FORMATTER)
            }
        }

    return if (hasZeroSecondsAndNanos(offsetTime.toLocalTime())) {
        offsetTime.format(TIMETZ_FORMATTER)
    } else {
        offsetTime.toString()
    }
}
66+
67+
/**
 * Renders a timestamp-with-time-zone value as text using [TIMESTAMPTZ_FORMATTER], then appends
 * the era marker via `resolveEra` when the value is BCE.
 *
 * Handled inputs: [Timestamp], [OffsetDateTime], [ZonedDateTime] and [Instant]. Anything else is
 * reported once through the logger and its `toString()` is parsed with [Instant.parse].
 *
 * @param timestamp the raw value
 * @return the formatted value, suffixed with " BC" for BCE values
 * @throws java.time.format.DateTimeParseException if an unrecognised input cannot be parsed as an
 * instant
 */
@JvmStatic
fun convertToTimestampWithTimezone(timestamp: Any): String =
    when (timestamp) {
        is Timestamp -> {
            // In snapshot mode, debezium produces a java.sql.Timestamp object for the TIMESTAMPTZ
            // type. Conceptually that is an Instant, but timestamp.toInstant() mangles ancient
            // dates (leap years weren't applied consistently back then), and both toInstant() and
            // toLocalDateTime() lose the era indicator, so getEra() cannot be trusted.
            // Hence: build the UTC value from the local date-time and let the Timestamp overload
            // of resolveEra decide the era.
            val utcValue: ZonedDateTime = timestamp.toLocalDateTime().atZone(ZoneOffset.UTC)
            resolveEra(timestamp, utcValue.format(TIMESTAMPTZ_FORMATTER))
        }
        is OffsetDateTime ->
            resolveEra(timestamp.toLocalDate(), timestamp.format(TIMESTAMPTZ_FORMATTER))
        is ZonedDateTime ->
            resolveEra(timestamp.toLocalDate(), timestamp.format(TIMESTAMPTZ_FORMATTER))
        else -> {
            // Instants and unrecognised inputs share one path once an Instant is available.
            val instant: Instant =
                if (timestamp is Instant) {
                    timestamp
                } else {
                    if (!loggedUnknownTimestampWithTimeZoneClass) {
                        LOGGER.info {
                            "Unknown class for Timestamp with time zone data type ${timestamp.javaClass}"
                        }
                        loggedUnknownTimestampWithTimeZoneClass = true
                    }
                    Instant.parse(timestamp.toString())
                }
            val utcValue = ZonedDateTime.from(OffsetDateTime.ofInstant(instant, ZoneOffset.UTC))
            resolveEra(utcValue.toLocalDate(), utcValue.format(TIMESTAMPTZ_FORMATTER))
        }
    }
104+
105+
/** See [.convertToTimestampWithTimezone] for explanation of the weird things happening here. */
106+
@JvmStatic
107+
fun convertToTimestamp(timestamp: Any): String {
108+
if (timestamp is Timestamp) {
109+
// Snapshot mode
110+
val localDateTime: LocalDateTime = timestamp.toLocalDateTime()
111+
return resolveEra(
112+
timestamp,
113+
if (hasZeroSecondsAndNanos(localDateTime.toLocalTime()))
114+
localDateTime.format(TIMESTAMP_FORMATTER)
115+
else localDateTime.toString()
116+
)
117+
} else if (timestamp is Instant) {
118+
// Incremental mode
119+
return resolveEra(
120+
timestamp.atZone(ZoneOffset.UTC).toLocalDate(),
121+
timestamp.atOffset(ZoneOffset.UTC).toLocalDateTime().format(TIMESTAMP_FORMATTER)
122+
)
123+
} else if (timestamp is LocalDateTime) {
124+
val date: LocalDate = timestamp.toLocalDate()
125+
return resolveEra(
126+
date,
127+
if (hasZeroSecondsAndNanos(timestamp.toLocalTime()))
128+
timestamp.format(TIMESTAMP_FORMATTER)
129+
else timestamp.toString()
130+
)
131+
} else {
132+
if (!loggedUnknownTimestampClass) {
133+
LOGGER.info { "Unknown class for Timestamp data type ${timestamp.javaClass}" }
134+
loggedUnknownTimestampClass = true
135+
}
136+
val localDateTime = LocalDateTime.parse(timestamp.toString())
137+
val date = localDateTime.toLocalDate()
138+
return resolveEra(
139+
date,
140+
if (hasZeroSecondsAndNanos(localDateTime.toLocalTime()))
141+
localDateTime.format(TIMESTAMP_FORMATTER)
142+
else localDateTime.toString()
143+
)
144+
}
145+
}
146+
147+
/** See [.convertToTimestampWithTimezone] for explanation of the weird things happening here. */
148+
@JvmStatic
149+
fun convertToDate(date: Any): String {
150+
if (date is Date) {
151+
// Snapshot mode
152+
val localDate = date.toLocalDate()
153+
return resolveEra(date, localDate.format(DATE_FORMATTER))
154+
} else if (date is LocalDate) {
155+
// Incremental mode
156+
return resolveEra(date, date.format(DATE_FORMATTER))
157+
} else if (date is Int) {
158+
// Incremental mode
159+
return LocalDate.ofEpochDay(date.toLong()).format(DATE_FORMATTER)
160+
} else {
161+
if (!loggedUnknownDateClass) {
162+
LOGGER.info { "Unknown class for Date data type${date.javaClass}" }
163+
loggedUnknownDateClass = true
164+
}
165+
val localDate = LocalDate.parse(date.toString())
166+
return resolveEra(localDate, localDate.format(DATE_FORMATTER))
167+
}
168+
}
169+
170+
/**
 * Renders a time-of-day value as text via `formatTime`.
 *
 * Handled inputs: [Time], [LocalTime] and [Duration] (read as nanoseconds since midnight).
 * Anything else is reported once through the logger and its `toString()` is parsed with
 * [LocalTime.parse].
 *
 * Out-of-range handling:
 * * a [Duration] outside `[0, 1 day)` is replaced by its absolute value, capped at
 *   [LocalTime.MAX];
 * * unrecognised input whose text starts with "24" yields `LocalTime.MAX.toString()`.
 *
 * @param time the raw value
 * @return the formatted time
 * @throws java.time.format.DateTimeParseException if an unrecognised input cannot be parsed
 */
@JvmStatic
fun convertToTime(time: Any): String =
    when (time) {
        is Time -> formatTime(time.toLocalTime())
        is LocalTime -> formatTime(time)
        is Duration -> {
            val nanos = time.toNanos()
            val outOfRange = nanos < 0 || nanos >= TimeUnit.DAYS.toNanos(1)
            if (outOfRange) {
                val clamped =
                    min(abs(nanos.toDouble()), LocalTime.MAX.toNanoOfDay().toDouble()).toLong()
                LOGGER.debug {
                    "Time values must use number of nanoseconds greater than 0 and less than 86400000000000 but its $nanos, converting to $clamped "
                }
                formatTime(LocalTime.ofNanoOfDay(clamped))
            } else {
                formatTime(LocalTime.ofNanoOfDay(nanos))
            }
        }
        else -> {
            if (!loggedUnknownTimeClass) {
                LOGGER.info { "Unknown class for Time data type ${time.javaClass}" }
                loggedUnknownTimeClass = true
            }

            val text = time.toString()
            if (text.startsWith("24")) {
                LOGGER.debug { "Time value ${text} is above range, converting to 23:59:59" }
                LocalTime.MAX.toString()
            } else {
                formatTime(LocalTime.parse(text))
            }
        }
    }
204+
205+
/**
 * Formats [localTime]: with [TIME_FORMATTER] when seconds and nanos are both zero, otherwise with
 * `LocalTime.toString()`.
 */
@JvmStatic
private fun formatTime(localTime: LocalTime): String =
    when {
        hasZeroSecondsAndNanos(localTime) -> localTime.format(TIME_FORMATTER)
        else -> localTime.toString()
    }
210+
211+
/**
 * Tells whether [localTime] falls exactly on a minute boundary.
 *
 * @return `true` when both the second and nano-of-second fields are zero
 */
@JvmStatic
fun hasZeroSecondsAndNanos(localTime: LocalTime): Boolean =
    localTime.second == 0 && localTime.nano == 0
215+
216+
/**
 * The first day of the Common Era as a [Date]. The [Date] and [Timestamp] overloads of
 * `resolveEra` treat any value strictly before it as BCE.
 */
private val ONE_CE: Date = Date.valueOf("0001-01-01")

/**
 * Normalizes the era indicator in the textual form of a date or timestamp.
 *
 * For a BCE value, one leading "-" is dropped if present and " BC" is appended unless the text
 * already ends with it. For a CE value the text is returned untouched.
 *
 * Callers usually want one of the overloads that take a temporal object and work out the era
 * themselves.
 *
 * @param isBce whether the value lies before the Common Era
 * @param value the already-formatted date or timestamp
 * @return [value] with its era indicator normalized
 */
fun resolveEra(isBce: Boolean, value: String): String {
    if (!isBce) {
        return value
    }
    val unsigned = value.removePrefix("-")
    return if (unsigned.endsWith(" BC")) unsigned else "$unsigned BC"
}

/** Returns `true` when [date]'s ISO era is BCE. */
fun isBce(date: LocalDate): Boolean = date.era == IsoEra.BCE

/** Normalizes the era indicator of [value], taking the era from [date]. */
@JvmStatic
fun resolveEra(date: LocalDate, value: String): String = resolveEra(isBce(date), value)

/**
 * Normalizes the era indicator of [value] for a `java.sql.Date`.
 *
 * java.sql.Date objects don't properly represent their era (for example, toLocalDate() always
 * returns an object in CE), so the era is decided by checking whether [date] is before 1 AD
 * ([ONE_CE]).
 *
 * This is technically kind of sketchy because ancient timestamps are weird (leap years, etc.),
 * but the original author's understanding is that [ONE_CE] has the same weirdness, so it cancels
 * out.
 */
@JvmStatic
fun resolveEra(date: Date, value: String): String {
    val beforeCommonEra = date.before(ONE_CE)
    return resolveEra(beforeCommonEra, value)
}

/**
 * Normalizes the era indicator of [value] for a `java.sql.Timestamp`, using the same
 * "before [ONE_CE]" test as the [Date] overload.
 */
@JvmStatic
fun resolveEra(timestamp: Timestamp, value: String): String {
    val beforeCommonEra = timestamp.before(ONE_CE)
    return resolveEra(beforeCommonEra, value)
}
269+
}

0 commit comments

Comments
 (0)