Skip to content

Commit 03a31da

Browse files
committed
chore: move off experimental changefeed query for CRDB
1 parent a38bbea commit 03a31da

File tree

8 files changed

+33
-15
lines changed

8 files changed

+33
-15
lines changed

internal/datastore/crdb/crdb.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,10 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
119119
log.Info().Object("version", version).Msg("using changefeed query for CRDB version < 22")
120120
changefeedQuery = queryChangefeedPreV22
121121
}
122+
if version.Major < 25 {
123+
log.Info().Object("version", version).Msg("using changefeed query for CRDB version < 25")
124+
changefeedQuery = queryChangefeedPreV25
125+
}
122126

123127
transactionNowQuery := queryTransactionNow
124128
if version.Major < 23 {

internal/datastore/crdb/crdb_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ func TestWatchFeatureDetection(t *testing.T) {
251251
{
252252
name: "rangefeeds disabled",
253253
postInit: func(ctx context.Context, adminConn *pgx.Conn) {
254-
_, err = adminConn.Exec(ctx, `SET CLUSTER SETTING kv.rangefeed.enabled = false;`)
254+
_, err := adminConn.Exec(ctx, `SET CLUSTER SETTING kv.rangefeed.enabled = false;`)
255255
require.NoError(t, err)
256256
},
257257
expectEnabled: false,
@@ -260,7 +260,7 @@ func TestWatchFeatureDetection(t *testing.T) {
260260
{
261261
name: "rangefeeds enabled, user doesn't have permission",
262262
postInit: func(ctx context.Context, adminConn *pgx.Conn) {
263-
_, err = adminConn.Exec(ctx, `SET CLUSTER SETTING kv.rangefeed.enabled = true;`)
263+
_, err := adminConn.Exec(ctx, `SET CLUSTER SETTING kv.rangefeed.enabled = true;`)
264264
require.NoError(t, err)
265265
},
266266
expectEnabled: false,
@@ -269,7 +269,7 @@ func TestWatchFeatureDetection(t *testing.T) {
269269
{
270270
name: "rangefeeds enabled, user has permission",
271271
postInit: func(ctx context.Context, adminConn *pgx.Conn) {
272-
_, err = adminConn.Exec(ctx, `SET CLUSTER SETTING kv.rangefeed.enabled = true;`)
272+
_, err := adminConn.Exec(ctx, `SET CLUSTER SETTING kv.rangefeed.enabled = true;`)
273273
require.NoError(t, err)
274274

275275
_, err = adminConn.Exec(ctx, fmt.Sprintf(`GRANT CHANGEFEED ON TABLE testspicedb.%s TO unprivileged;`, schema.TableTuple))

internal/datastore/crdb/pool_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ func TestTxReset(t *testing.T) {
117117

118118
r, err := ds.ReadyState(ctx)
119119
require.NoError(err)
120-
require.True(r.IsReady)
120+
require.True(r.IsReady, "datastore not ready: %s", r.Message)
121121

122122
// WriteNamespace utilizes execute so we'll use it
123123
i := 0

internal/datastore/crdb/version/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ const MinimumSupportedCockroachDBVersion = "23.1.30"
88
// LatestTestedCockroachDBVersion is the latest version of CockroachDB that has been tested with this driver.
99
//
1010
// NOTE: must match a tag on DockerHub for the `cockroachdb/cockroach` image, without the `v` prefix.
11-
const LatestTestedCockroachDBVersion = "24.2.9"
11+
const LatestTestedCockroachDBVersion = "25.3.2"

internal/datastore/crdb/watch.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ import (
2727
)
2828

2929
const (
30-
queryChangefeed = "EXPERIMENTAL CHANGEFEED FOR %s WITH updated, cursor = '%s', resolved = '%s', min_checkpoint_frequency = '0';"
30+
queryChangefeed = "CREATE CHANGEFEED FOR %s WITH updated, cursor = '%s', resolved = '%s', min_checkpoint_frequency = '0';"
31+
queryChangefeedPreV25 = "EXPERIMENTAL CHANGEFEED FOR %s WITH updated, cursor = '%s', resolved = '%s', min_checkpoint_frequency = '0';"
3132
queryChangefeedPreV22 = "EXPERIMENTAL CHANGEFEED FOR %s WITH updated, cursor = '%s', resolved = '%s';"
3233
)
3334

internal/testserver/datastore/crdb.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package datastore
66
import (
77
"context"
88
"fmt"
9+
"sync"
910
"testing"
1011

1112
"github.com/google/uuid"
@@ -24,15 +25,19 @@ const (
2425
enableRangefeeds = `SET CLUSTER SETTING kv.rangefeed.enabled = true;`
2526
)
2627

28+
// crdbTester is safe for concurrent use by tests.
2729
type crdbTester struct {
28-
conn *pgx.Conn
29-
hostname string
30-
creds string
31-
port string
30+
conn *pgx.Conn // GUARDED_BY(connMutex)
31+
connMutex sync.Mutex
32+
hostname string
33+
creds string
34+
port string
3235
}
3336

37+
var _ RunningEngineForTest = (*crdbTester)(nil)
38+
3439
// RunCRDBForTesting returns a RunningEngineForTest for CRDB
35-
func RunCRDBForTesting(t testing.TB, bridgeNetworkName string, crdbVersion string) RunningEngineForTest {
40+
func RunCRDBForTesting(t testing.TB, bridgeNetworkName string, crdbVersion string) *crdbTester {
3641
pool, err := dockertest.NewPool("")
3742
require.NoError(t, err)
3843

@@ -55,6 +60,8 @@ func RunCRDBForTesting(t testing.TB, bridgeNetworkName string, crdbVersion strin
5560
creds: "root:fake",
5661
}
5762
t.Cleanup(func() {
63+
builder.connMutex.Lock()
64+
defer builder.connMutex.Unlock()
5865
if builder.conn != nil {
5966
require.NoError(t, builder.conn.Close(context.Background()))
6067
}
@@ -74,26 +81,31 @@ func RunCRDBForTesting(t testing.TB, bridgeNetworkName string, crdbVersion strin
7481
var err error
7582
ctx, cancelConnect := context.WithTimeout(context.Background(), dockerBootTimeout)
7683
defer cancelConnect()
77-
builder.conn, err = pgx.Connect(ctx, uri)
84+
conn, err := pgx.Connect(ctx, uri)
7885
if err != nil {
7986
return err
8087
}
88+
builder.connMutex.Lock()
89+
builder.conn = conn
8190
ctx, cancelRangeFeeds := context.WithTimeout(context.Background(), dockerBootTimeout)
8291
defer cancelRangeFeeds()
8392
_, err = builder.conn.Exec(ctx, enableRangefeeds)
93+
builder.connMutex.Unlock()
8494
return err
8595
}))
8696

8797
return builder
8898
}
8999

90-
// NewDatabase creates a database. It is NOT safe for concurrent use.
100+
// NewDatabase creates a database.
91101
func (r *crdbTester) NewDatabase(t testing.TB) string {
92102
uniquePortion, err := secrets.TokenHex(4)
93103
require.NoError(t, err)
94104

95105
newDBName := "db" + uniquePortion
96106

107+
r.connMutex.Lock()
108+
defer r.connMutex.Unlock()
97109
_, err = r.conn.Exec(context.Background(), "CREATE DATABASE "+newDBName)
98110
require.NoError(t, err)
99111

@@ -107,7 +119,7 @@ func (r *crdbTester) NewDatabase(t testing.TB) string {
107119
return connectStr
108120
}
109121

110-
// NewDatastore creates a database and runs migrations on it. It is NOT safe for concurrent use.
122+
// NewDatastore creates a database and runs migrations on it.
111123
func (r *crdbTester) NewDatastore(t testing.TB, initFunc InitFunc) datastore.Datastore {
112124
connectStr := r.NewDatabase(t)
113125

magefiles/util.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ func testWithArgs(ctx context.Context, args ...string) ([]string, error) {
4444
"test",
4545
"-failfast",
4646
"-count=1",
47+
"-race",
4748
}, args...)
4849

4950
return testArgs, nil

pkg/datastore/test/relationships.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func SimpleTest(t *testing.T, tester DatastoreTester) {
5454

5555
ok, err := ds.ReadyState(ctx)
5656
require.NoError(err)
57-
require.True(ok.IsReady)
57+
require.True(ok.IsReady, "datastore not ready: %s", ok.Message)
5858

5959
setupDatastore(ds, require)
6060

0 commit comments

Comments
 (0)