Skip to content

Commit 2dd8aec

Browse files
committed
enable ShardedCluster + better code sharing for validation
1 parent b212cf9 commit 2dd8aec

File tree

3 files changed

+184
-67
lines changed

3 files changed

+184
-67
lines changed

flow/connectors/mongo/validate.go

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,20 @@ package connmongo
22

33
import (
44
"context"
5-
"errors"
6-
"fmt"
75

86
"github.com/PeerDB-io/peerdb/flow/generated/protos"
97
shared_mongo "github.com/PeerDB-io/peerdb/flow/shared/mongo"
108
)
119

1210
func (c *MongoConnector) ValidateCheck(ctx context.Context) error {
13-
version, err := c.GetVersion(ctx)
14-
if err != nil {
11+
if err := shared_mongo.ValidateServerCompatibility(ctx, c.client); err != nil {
1512
return err
1613
}
17-
cmp, err := shared_mongo.CompareServerVersions(version, shared_mongo.MinSupportedVersion)
18-
if err != nil {
14+
15+
if err := shared_mongo.ValidateUserRoles(ctx, c.client); err != nil {
1916
return err
2017
}
21-
if cmp == -1 {
22-
return fmt.Errorf("require minimum mongo version %s", shared_mongo.MinSupportedVersion)
23-
}
18+
2419
return nil
2520
}
2621

@@ -29,21 +24,9 @@ func (c *MongoConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.F
2924
return nil
3025
}
3126

32-
if _, err := shared_mongo.GetReplSetGetStatus(ctx, c.client); err != nil {
33-
return err
34-
}
35-
36-
serverStatus, err := shared_mongo.GetServerStatus(ctx, c.client)
37-
if err != nil {
27+
if err := shared_mongo.ValidateOplogRetention(ctx, c.client); err != nil {
3828
return err
3929
}
40-
if serverStatus.StorageEngine.Name != "wiredTiger" {
41-
return errors.New("storage engine must be 'wiredTiger'")
42-
}
43-
if serverStatus.OplogTruncation.OplogMinRetentionHours == 0 ||
44-
serverStatus.OplogTruncation.OplogMinRetentionHours < shared_mongo.MinOplogRetentionHours {
45-
return errors.New("oplog retention must be set to >= 24 hours")
46-
}
4730

4831
return nil
4932
}

flow/shared/mongo/commands.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package mongo
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"go.mongodb.org/mongo-driver/v2/bson"
8+
"go.mongodb.org/mongo-driver/v2/mongo"
9+
)
10+
11+
type BuildInfo struct {
12+
Version string `bson:"version"`
13+
}
14+
15+
func GetBuildInfo(ctx context.Context, client *mongo.Client) (*BuildInfo, error) {
16+
return runCommand[BuildInfo](ctx, client, "buildInfo")
17+
}
18+
19+
type ReplSetGetStatus struct {
20+
Set string `bson:"set"`
21+
MyState int `bson:"myState"`
22+
}
23+
24+
func GetReplSetGetStatus(ctx context.Context, client *mongo.Client) (*ReplSetGetStatus, error) {
25+
return runCommand[ReplSetGetStatus](ctx, client, "replSetGetStatus")
26+
}
27+
28+
type OplogTruncation struct {
29+
OplogMinRetentionHours float64 `bson:"oplogMinRetentionHours"`
30+
}
31+
32+
type StorageEngine struct {
33+
Name string `bson:"name"`
34+
}
35+
36+
type ServerStatus struct {
37+
StorageEngine StorageEngine `bson:"storageEngine"`
38+
OplogTruncation OplogTruncation `bson:"oplogTruncation"`
39+
}
40+
41+
func GetServerStatus(ctx context.Context, client *mongo.Client) (*ServerStatus, error) {
42+
return runCommand[ServerStatus](ctx, client, "serverStatus")
43+
}
44+
45+
type ConnectionStatus struct {
46+
AuthInfo AuthInfo `bson:"authInfo"`
47+
}
48+
49+
type AuthInfo struct {
50+
AuthenticatedUserRoles []Role `bson:"authenticatedUserRoles"`
51+
}
52+
53+
type Role struct {
54+
Role string `bson:"role"`
55+
DB string `bson:"db"`
56+
}
57+
58+
func GetConnectionStatus(ctx context.Context, client *mongo.Client) (*ConnectionStatus, error) {
59+
return runCommand[ConnectionStatus](ctx, client, "connectionStatus")
60+
}
61+
62+
type HelloResponse struct {
63+
Msg string `bson:"msg,omitempty"`
64+
Hosts []string `bson:"hosts,omitempty"`
65+
}
66+
67+
func GetHelloResponse(ctx context.Context, client *mongo.Client) (*HelloResponse, error) {
68+
return runCommand[HelloResponse](ctx, client, "hello")
69+
}
70+
71+
func runCommand[T any](ctx context.Context, client *mongo.Client, command string) (*T, error) {
72+
singleResult := client.Database("admin").RunCommand(ctx, bson.D{
73+
bson.E{Key: command, Value: 1},
74+
})
75+
if singleResult.Err() != nil {
76+
return nil, fmt.Errorf("'%s' failed: %v", command, singleResult.Err())
77+
}
78+
79+
var result T
80+
if err := singleResult.Decode(&result); err != nil {
81+
return nil, fmt.Errorf("'%s' failed: %v", command, err)
82+
}
83+
return &result, nil
84+
}

flow/shared/mongo/validation.go

Lines changed: 95 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -2,75 +2,125 @@ package mongo
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67

7-
"go.mongodb.org/mongo-driver/v2/bson"
88
"go.mongodb.org/mongo-driver/v2/mongo"
99
)
1010

1111
const (
1212
MinSupportedVersion = "5.1.0"
1313
MinOplogRetentionHours = 24
14+
15+
ReplicaSet = "ReplicaSet"
16+
ShardedCluster = "ShardedCluster"
1417
)
1518

16-
type BuildInfo struct {
17-
Version string `bson:"version"`
18-
}
19+
func ValidateServerCompatibility(ctx context.Context, client *mongo.Client) error {
20+
buildInfo, err := GetBuildInfo(ctx, client)
21+
if err != nil {
22+
return err
23+
}
1924

20-
type ReplSetGetStatus struct {
21-
Set string `bson:"set"`
22-
MyState int `bson:"myState"`
23-
}
25+
if cmp, err := CompareServerVersions(buildInfo.Version, MinSupportedVersion); err != nil {
26+
return err
27+
} else if cmp < 0 {
28+
return fmt.Errorf("require minimum mongo version %s", MinSupportedVersion)
29+
}
2430

25-
type OplogTruncation struct {
26-
OplogMinRetentionHours float64 `bson:"oplogMinRetentionHours"`
27-
}
31+
validateStorageEngine := func(instanceCtx context.Context, instanceClient *mongo.Client) error {
32+
ss, err := GetServerStatus(instanceCtx, instanceClient)
33+
if err != nil {
34+
return err
35+
}
2836

29-
type StorageEngine struct {
30-
Name string `bson:"name"`
31-
}
37+
if ss.StorageEngine.Name != "wiredTiger" {
38+
return errors.New("only wiredTiger storage engine is supported")
39+
}
40+
return nil
41+
}
3242

33-
type ServerStatus struct {
34-
StorageEngine StorageEngine `bson:"storageEngine"`
35-
OplogTruncation OplogTruncation `bson:"oplogTruncation"`
43+
topologyType, err := GetTopologyType(ctx, client)
44+
if err != nil {
45+
return err
46+
}
47+
48+
if topologyType == ReplicaSet {
49+
return validateStorageEngine(ctx, client)
50+
} else {
51+
// TODO: run validation on shard
52+
return nil
53+
}
3654
}
3755

38-
func GetBuildInfo(ctx context.Context, client *mongo.Client) (*BuildInfo, error) {
39-
singleResult := client.Database("admin").RunCommand(ctx, bson.D{bson.E{Key: "buildInfo", Value: 1}})
40-
if singleResult.Err() != nil {
41-
return nil, fmt.Errorf("failed to run 'buildInfo' command: %w", singleResult.Err())
56+
func ValidateUserRoles(ctx context.Context, client *mongo.Client) error {
57+
RequiredRoles := []string{"readAnyDatabase", "clusterMonitor"}
58+
59+
connectionStatus, err := GetConnectionStatus(ctx, client)
60+
if err != nil {
61+
return err
4262
}
43-
var info BuildInfo
44-
if err := singleResult.Decode(&info); err != nil {
45-
return nil, fmt.Errorf("failed to decode BuildInfo: %w", err)
63+
64+
hasRole := func(roles []Role, targetRole string) bool {
65+
for _, role := range roles {
66+
if role.Role == targetRole {
67+
return true
68+
}
69+
}
70+
return false
4671
}
47-
return &info, nil
72+
73+
for _, requiredRole := range RequiredRoles {
74+
if !hasRole(connectionStatus.AuthInfo.AuthenticatedUserRoles, requiredRole) {
75+
return fmt.Errorf("missing required role: %s", requiredRole)
76+
}
77+
}
78+
79+
return nil
4880
}
4981

50-
func GetReplSetGetStatus(ctx context.Context, client *mongo.Client) (*ReplSetGetStatus, error) {
51-
singleResult := client.Database("admin").RunCommand(ctx, bson.D{
52-
bson.E{Key: "replSetGetStatus", Value: 1},
53-
})
54-
if singleResult.Err() != nil {
55-
return nil, fmt.Errorf("failed to run 'replSetGetStatus' command: %w", singleResult.Err())
82+
func ValidateOplogRetention(ctx context.Context, client *mongo.Client) error {
83+
validateOplogRetention := func(instanceCtx context.Context, instanceClient *mongo.Client) error {
84+
ss, err := GetServerStatus(instanceCtx, instanceClient)
85+
if err != nil {
86+
return err
87+
}
88+
if ss.OplogTruncation.OplogMinRetentionHours == 0 ||
89+
ss.OplogTruncation.OplogMinRetentionHours < MinOplogRetentionHours {
90+
return fmt.Errorf("oplog retention must be set to >= 24 hours, but got %f",
91+
ss.OplogTruncation.OplogMinRetentionHours)
92+
}
93+
return nil
5694
}
57-
var status ReplSetGetStatus
58-
if err := singleResult.Decode(&status); err != nil {
59-
return nil, fmt.Errorf("failed to decode ReplSetGetStatus: %w", err)
95+
96+
topology, err := GetTopologyType(ctx, client)
97+
if err != nil {
98+
return err
99+
}
100+
if topology == ReplicaSet {
101+
return validateOplogRetention(ctx, client)
102+
} else {
103+
// TODO: run validation on shard
104+
return nil
60105
}
61-
return &status, nil
62106
}
63107

64-
func GetServerStatus(ctx context.Context, client *mongo.Client) (*ServerStatus, error) {
65-
singleResult := client.Database("admin").RunCommand(ctx, bson.D{
66-
bson.E{Key: "serverStatus", Value: 1},
67-
})
68-
if singleResult.Err() != nil {
69-
return nil, fmt.Errorf("failed to run 'serverStatus' command: %w", singleResult.Err())
108+
func GetTopologyType(ctx context.Context, client *mongo.Client) (string, error) {
109+
hello, err := GetHelloResponse(ctx, client)
110+
if err != nil {
111+
return "", err
70112
}
71-
var status ServerStatus
72-
if err := singleResult.Decode(&status); err != nil {
73-
return nil, fmt.Errorf("failed to decode ServerStatus: %w", err)
113+
114+
// Only replica set has 'hosts' field
115+
// https://www.mongodb.com/docs/manual/reference/command/hello/#mongodb-data-hello.hosts
116+
if len(hello.Hosts) > 0 {
117+
return ReplicaSet, nil
118+
}
119+
120+
// Only sharded cluster has 'msg' field, and equals to 'isdbgrid'
121+
// https://www.mongodb.com/docs/manual/reference/command/hello/#mongodb-data-hello.msg
122+
if hello.Msg == "isdbgrid" {
123+
return ShardedCluster, nil
74124
}
75-
return &status, nil
125+
return "", errors.New("topology type must be ReplicaSet or ShardedCluster")
76126
}

0 commit comments

Comments
 (0)