Skip to content

Commit 7524d20

Browse files
authored
Merge pull request #8 from Trendyol/feature/topicmapping
fix: fix table topic mapping control to resolving topic name
2 parents 54a77ab + 831e6e7 commit 7524d20

File tree

5 files changed

+25
-18
lines changed

5 files changed

+25
-18
lines changed

benchmark/go-pq-cdc-kafka/go.mod

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ go 1.22.4
55
replace github.com/Trendyol/go-pq-cdc-kafka => ../../
66

77
require (
8-
github.com/Trendyol/go-pq-cdc v0.0.10
8+
github.com/Trendyol/go-pq-cdc v1.0.1
99
github.com/Trendyol/go-pq-cdc-kafka v0.0.1
1010
github.com/segmentio/kafka-go v0.4.47
1111
)
@@ -18,21 +18,21 @@ require (
1818
github.com/jackc/pgpassfile v1.0.0 // indirect
1919
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect
2020
github.com/jackc/pgx/v5 v5.6.0 // indirect
21-
github.com/klauspost/compress v1.17.9 // indirect
21+
github.com/klauspost/compress v1.18.0 // indirect
2222
github.com/lib/pq v1.10.9 // indirect
2323
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
2424
github.com/pierrec/lz4/v4 v4.1.21 // indirect
2525
github.com/pkg/errors v0.9.1 // indirect
26-
github.com/prometheus/client_golang v1.20.5 // indirect
26+
github.com/prometheus/client_golang v1.22.0 // indirect
2727
github.com/prometheus/client_model v0.6.1 // indirect
28-
github.com/prometheus/common v0.55.0 // indirect
28+
github.com/prometheus/common v0.62.0 // indirect
2929
github.com/prometheus/procfs v0.15.1 // indirect
3030
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
3131
github.com/xdg-go/scram v1.1.2 // indirect
3232
github.com/xdg-go/stringprep v1.0.4 // indirect
3333
golang.org/x/crypto v0.24.0 // indirect
34-
golang.org/x/sys v0.22.0 // indirect
35-
golang.org/x/text v0.16.0 // indirect
36-
google.golang.org/protobuf v1.34.2 // indirect
34+
golang.org/x/sys v0.30.0 // indirect
35+
golang.org/x/text v0.21.0 // indirect
36+
google.golang.org/protobuf v1.36.5 // indirect
3737
gopkg.in/yaml.v2 v2.4.0 // indirect
3838
)

benchmark/go-pq-cdc-kafka/go.sum

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
github.com/Trendyol/go-pq-cdc v0.0.5 h1:9bn2vMGxOPrg7bNJnfMMjkBPCGw2IkQRlsXkLx2a/AU=
22
github.com/Trendyol/go-pq-cdc v0.0.5/go.mod h1:RIooS3DPOWkXxq7nhrOuGgkD4x3ondWEYOrOEHAHnxc=
33
github.com/Trendyol/go-pq-cdc v0.0.10/go.mod h1:RIooS3DPOWkXxq7nhrOuGgkD4x3ondWEYOrOEHAHnxc=
4+
github.com/Trendyol/go-pq-cdc v1.0.1/go.mod h1:RIooS3DPOWkXxq7nhrOuGgkD4x3ondWEYOrOEHAHnxc=
45
github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA=
56
github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE=
67
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@@ -25,6 +26,7 @@ github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFr
2526
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
2627
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
2728
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
29+
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
2830
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
2931
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
3032
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@@ -44,10 +46,12 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
4446
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
4547
github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y=
4648
github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
49+
github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0=
4750
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
4851
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
4952
github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc=
5053
github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
54+
github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I=
5155
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
5256
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
5357
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
@@ -99,6 +103,7 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
99103
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
100104
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
101105
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
106+
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
102107
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
103108
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
104109
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
@@ -113,13 +118,15 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
113118
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
114119
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
115120
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
121+
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
116122
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
117123
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
118124
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
119125
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
120126
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
121127
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
122128
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
129+
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
123130
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
124131
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
125132
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=

connector.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ func (c *connector) listener(ctx *replication.ListenerContext) {
125125

126126
fullTableName := c.getFullTableName(msg.TableNamespace, msg.TableName)
127127

128-
topicName := c.resolveTableToTopicName(fullTableName, msg.TableNamespace, msg.TableName)
129-
if topicName == "" {
128+
topicName, ok := c.resolveTableToTopicName(fullTableName, msg.TableNamespace, msg.TableName)
129+
if !ok {
130130
if err := ctx.Ack(); err != nil {
131131
logger.Error("ack", "error", err)
132132
}
@@ -165,31 +165,31 @@ func getTopicName(defaultTopic, messageTopic string) string {
165165
return defaultTopic
166166
}
167167

168-
func (c *connector) resolveTableToTopicName(fullTableName, tableNamespace, tableName string) string {
168+
func (c *connector) resolveTableToTopicName(fullTableName, tableNamespace, tableName string) (string, bool) {
169169
tableTopicMapping := c.cfg.Kafka.TableTopicMapping
170170
if len(tableTopicMapping) == 0 {
171-
return ""
171+
return "", true
172172
}
173173

174174
if topicName, exists := tableTopicMapping[fullTableName]; exists {
175-
return topicName
175+
return topicName, true
176176
}
177177

178178
if t, ok := timescaledb.HyperTables.Load(fullTableName); ok {
179179
parentName := t.(string)
180180
if topicName, exists := tableTopicMapping[parentName]; exists {
181-
return topicName
181+
return topicName, true
182182
}
183183
}
184184

185185
parentTableName := c.getParentTableName(fullTableName, tableNamespace, tableName)
186186
if parentTableName != "" {
187187
if topicName, exists := tableTopicMapping[parentTableName]; exists {
188-
return topicName
188+
return topicName, true
189189
}
190190
}
191191

192-
return ""
192+
return "", false
193193
}
194194

195195
func (c *connector) getParentTableName(fullTableName, tableNamespace, tableName string) string {

example/simple/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ go 1.22.5
55
replace github.com/Trendyol/go-pq-cdc-kafka => ../../
66

77
require (
8-
github.com/Trendyol/go-pq-cdc v0.0.12
8+
github.com/Trendyol/go-pq-cdc v1.0.1
99
github.com/Trendyol/go-pq-cdc-kafka v0.0.0-00010101000000-000000000000
1010
github.com/segmentio/kafka-go v0.4.47
1111
)

example/simple/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
github.com/Trendyol/go-pq-cdc v0.0.12 h1:VE38j8n47ALMJxtHWro67Fd6kyPuuMiDZc7Q3EUbomo=
2-
github.com/Trendyol/go-pq-cdc v0.0.12/go.mod h1:RIooS3DPOWkXxq7nhrOuGgkD4x3ondWEYOrOEHAHnxc=
1+
github.com/Trendyol/go-pq-cdc v1.0.1 h1:Dz1e2pImvWmFG8YbsVI5wd/QnohsJTX//NtzsV22ofY=
2+
github.com/Trendyol/go-pq-cdc v1.0.1/go.mod h1:RIooS3DPOWkXxq7nhrOuGgkD4x3ondWEYOrOEHAHnxc=
33
github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA=
44
github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE=
55
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=

0 commit comments

Comments
 (0)