Skip to content

Commit 28f10b0

Browse files
authored
detect when binlog_row_metadata becomes minimal (#3698)
And add a unit test to make sure exceptions are always using pointer receiver for consistency
1 parent aa7ac99 commit 28f10b0

File tree

4 files changed

+82
-1
lines changed

4 files changed

+82
-1
lines changed

flow/alerting/classifier.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ var (
115115
ErrorNotifyBinlogInvalid = ErrorClass{
116116
Class: "NOTIFY_BINLOG_INVALID", action: NotifyUser,
117117
}
118+
ErrorNotifyBinlogRowMetadataInvalid = ErrorClass{
119+
Class: "NOTIFY_BINLOG_ROW_METADATA_INVALID", action: NotifyUser,
120+
}
118121
ErrorNotifyBadGTIDSetup = ErrorClass{
119122
Class: "NOTIFY_BAD_MULTISOURCE_GTID_SETUP", action: NotifyUser,
120123
}
@@ -831,6 +834,14 @@ func GetErrorClass(ctx context.Context, err error) (ErrorClass, ErrorInfo) {
831834
}
832835
}
833836

837+
var unsupportedBinlogRowMetadataError *exceptions.MySQLUnsupportedBinlogRowMetadataError
838+
if errors.As(err, &unsupportedBinlogRowMetadataError) {
839+
return ErrorNotifyBinlogRowMetadataInvalid, ErrorInfo{
840+
Source: ErrorSourceMySQL,
841+
Code: "UNSUPPORTED_BINLOG_ROW_METADATA",
842+
}
843+
}
844+
834845
var postgresPrimaryKeyModifiedError *exceptions.PrimaryKeyModifiedError
835846
if errors.As(err, &postgresPrimaryKeyModifiedError) {
836847
return ErrorUnsupportedSchemaChange, ErrorInfo{

flow/connectors/mysql/cdc.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/PeerDB-io/peerdb/flow/otel_metrics"
2929
"github.com/PeerDB-io/peerdb/flow/shared"
3030
"github.com/PeerDB-io/peerdb/flow/shared/datatypes"
31+
"github.com/PeerDB-io/peerdb/flow/shared/exceptions"
3132
"github.com/PeerDB-io/peerdb/flow/shared/types"
3233
)
3334

@@ -488,6 +489,11 @@ func (c *MySqlConnector) PullRecords(
488489
}
489490
}
490491
case *replication.RowsEvent:
492+
if len(ev.Table.ColumnName) == 0 && len(ev.Table.ColumnType) > 0 {
493+
e := exceptions.NewMySQLUnsupportedBinlogRowMetadataError(string(ev.Table.Schema), string(ev.Table.Table))
494+
c.logger.Error(e.Error())
495+
return e
496+
}
491497
sourceTableName := string(ev.Table.Schema) + "." + string(ev.Table.Table) // TODO this is fragile
492498
destinationTableName := req.TableNameMapping[sourceTableName].Name
493499
exclusion := req.TableNameMapping[sourceTableName].Exclude

flow/shared/exceptions/constructor_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,53 @@ func TestErrorConstructorsShouldReturnPointers(t *testing.T) {
4949
})
5050
}
5151
}
52+
53+
// For consistency/convention, tests that all error types implement Error() method with pointer receiver
54+
func TestErrorMethodsShouldHavePointerReceivers(t *testing.T) {
55+
cfg := &packages.Config{
56+
Mode: packages.NeedFiles | packages.NeedSyntax | packages.NeedTypes | packages.NeedTypesInfo,
57+
Dir: ".",
58+
}
59+
60+
pkgs, err := packages.Load(cfg, ".")
61+
require.NoError(t, err)
62+
require.Len(t, pkgs, 1, "Expected exactly one package")
63+
64+
pkg := pkgs[0]
65+
66+
// Track error type names (structs that end with "Error")
67+
errorTypes := make(map[string]bool)
68+
for _, file := range pkg.Syntax {
69+
ast.Inspect(file, func(n ast.Node) bool {
70+
// Find all struct types that end with "Error"
71+
if typeSpec, ok := n.(*ast.TypeSpec); ok {
72+
if _, isStruct := typeSpec.Type.(*ast.StructType); isStruct {
73+
if strings.HasSuffix(typeSpec.Name.Name, "Error") {
74+
errorTypes[typeSpec.Name.Name] = true
75+
}
76+
}
77+
}
78+
return true
79+
})
80+
}
81+
82+
for _, file := range pkg.Syntax {
83+
ast.Inspect(file, func(n ast.Node) bool {
84+
if fn, ok := n.(*ast.FuncDecl); ok {
85+
// Look for receiver methods
86+
if fn.Recv != nil && len(fn.Recv.List) > 0 {
87+
recvType := fn.Recv.List[0].Type
88+
// Fail if found value receivers in error types
89+
if recv, ok := recvType.(*ast.Ident); ok && errorTypes[recv.Name] {
90+
pos := pkg.Fset.Position(fn.Pos())
91+
assert.Fail(
92+
t, "Error() method should have pointer receiver",
93+
"%s: func (%s) Error() should be func (*%s) Error()",
94+
pos.Filename, recv.Name, recv.Name)
95+
}
96+
}
97+
}
98+
return true
99+
})
100+
}
101+
}

flow/shared/exceptions/mysql.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,21 @@ func NewMySQLIncompatibleColumnTypeError(
2424
}
2525
}
2626

27-
func (e MySQLIncompatibleColumnTypeError) Error() string {
27+
func (e *MySQLIncompatibleColumnTypeError) Error() string {
2828
return fmt.Sprintf("Incompatible type for column %s in table %s, expect qkind %s but data is %s (mysql type %d)",
2929
e.ColumnName, e.TableName, e.qkind, e.dataType, e.columnType)
3030
}
31+
32+
type MySQLUnsupportedBinlogRowMetadataError struct {
33+
SchemaName string
34+
TableName string
35+
}
36+
37+
func NewMySQLUnsupportedBinlogRowMetadataError(schema string, table string) *MySQLUnsupportedBinlogRowMetadataError {
38+
return &MySQLUnsupportedBinlogRowMetadataError{SchemaName: schema, TableName: table}
39+
}
40+
41+
func (e *MySQLUnsupportedBinlogRowMetadataError) Error() string {
42+
return fmt.Sprintf("Detected binlog_row_metadata change from FULL to MINIMAL while processing %s.%s",
43+
e.SchemaName, e.TableName)
44+
}

0 commit comments

Comments
 (0)