diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 2c56b29f18..8ce4b2336a 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -51,14 +51,6 @@ func NewPostgresMetadataFromCatalog(logger log.Logger, pool shared.CatalogPool) } } -func (p *PostgresMetadata) Ping(ctx context.Context) error { - if err := p.pool.Ping(ctx); err != nil { - return fmt.Errorf("metadata db ping failed: %w", err) - } - - return nil -} - func (p *PostgresMetadata) LogFlowInfo(ctx context.Context, flowName string, info string) error { _, err := p.pool.Exec(ctx, "INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)", diff --git a/flow/connectors/mysql/mysql.go b/flow/connectors/mysql/mysql.go index 6aee0ab27c..6cb5dcff0c 100644 --- a/flow/connectors/mysql/mysql.go +++ b/flow/connectors/mysql/mysql.go @@ -143,8 +143,21 @@ func (c *MySqlConnector) Close() error { } func (c *MySqlConnector) ConnectionActive(ctx context.Context) error { - _, err := c.Execute(ctx, "SELECT 1") - return err + // conn, err := c.connect(ctx) + // if err != nil { + // c.logger.Error("failed to connect to MySQL", slog.Any("error", err)) + // return err + // } + // if err := conn.Ping(); err != nil { + // c.logger.Error("failed to ping MySQL connection", slog.Any("error", err)) + // return err + // } + if _, err := c.Execute(ctx, "SELECT 1"); err != nil { + c.logger.Error("failed to execute test query on MySQL connection", slog.Any("error", err)) + return err + } + + return nil } func (c *MySqlConnector) Dialer() client.Dialer { diff --git a/flow/connectors/mysql/rds_test.go b/flow/connectors/mysql/rds_test.go index 360081554b..df6adbae81 100644 --- a/flow/connectors/mysql/rds_test.go +++ b/flow/connectors/mysql/rds_test.go @@ -35,7 +35,7 @@ func TestAwsRDSIAMAuthConnectForMYSQL(t *testing.T) { }, }) require.NoError(t, err) - require.NoError(t, mysqlConnector.Ping(t.Context())) + require.NoError(t, mysqlConnector.ConnectionActive(t.Context())) } func TestAwsRDSIAMAuthConnectForMYSQLViaProxy(t *testing.T) { @@ -66,5 +66,5 @@ func TestAwsRDSIAMAuthConnectForMYSQLViaProxy(t *testing.T) { }, }) require.NoError(t, err) - require.NoError(t, mysqlConnector.Ping(t.Context())) + require.NoError(t, mysqlConnector.ConnectionActive(t.Context())) }