Skip to content
This repository was archived by the owner on Nov 25, 2024. It is now read-only.

Commit 236b16a

Browse files
authored
Begin adding syncapi component tests (#2442)
* Add very basic syncapi tests * Add a way to inject jetstream messages * implement add_state_ids * bugfixes * Unbreak tests * Remove now un-needed API call * Linting
1 parent a443d1e commit 236b16a

File tree

9 files changed

+337
-11
lines changed

9 files changed

+337
-11
lines changed

federationapi/federationapi_keys_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func TestMain(m *testing.M) {
102102
)
103103

104104
// Finally, build the server key APIs.
105-
sbase := base.NewBaseDendrite(cfg, "Monolith", base.NoCacheMetrics)
105+
sbase := base.NewBaseDendrite(cfg, "Monolith", base.DisableMetrics)
106106
s.api = NewInternalAPI(sbase, s.fedclient, nil, s.cache, nil, true)
107107
}
108108

setup/base/base.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ type BaseDendrite struct {
8686
DNSCache *gomatrixserverlib.DNSCache
8787
Database *sql.DB
8888
DatabaseWriter sqlutil.Writer
89+
EnableMetrics bool
8990
}
9091

9192
const NoListener = ""
@@ -96,7 +97,7 @@ const HTTPClientTimeout = time.Second * 30
9697
type BaseDendriteOptions int
9798

9899
const (
99-
NoCacheMetrics BaseDendriteOptions = iota
100+
DisableMetrics BaseDendriteOptions = iota
100101
UseHTTPAPIs
101102
PolylithMode
102103
)
@@ -107,12 +108,12 @@ const (
107108
func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...BaseDendriteOptions) *BaseDendrite {
108109
platformSanityChecks()
109110
useHTTPAPIs := false
110-
cacheMetrics := true
111+
enableMetrics := true
111112
isMonolith := true
112113
for _, opt := range options {
113114
switch opt {
114-
case NoCacheMetrics:
115-
cacheMetrics = false
115+
case DisableMetrics:
116+
enableMetrics = false
116117
case UseHTTPAPIs:
117118
useHTTPAPIs = true
118119
case PolylithMode:
@@ -160,7 +161,7 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base
160161
}
161162
}
162163

163-
cache, err := caching.NewInMemoryLRUCache(cacheMetrics)
164+
cache, err := caching.NewInMemoryLRUCache(enableMetrics)
164165
if err != nil {
165166
logrus.WithError(err).Warnf("Failed to create cache")
166167
}
@@ -246,6 +247,7 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base
246247
apiHttpClient: &apiClient,
247248
Database: db, // set if monolith with global connection pool only
248249
DatabaseWriter: writer, // set if monolith with global connection pool only
250+
EnableMetrics: enableMetrics,
249251
}
250252
}
251253

setup/jetstream/nats.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/sirupsen/logrus"
1414

1515
natsserver "github.com/nats-io/nats-server/v2/server"
16+
"github.com/nats-io/nats.go"
1617
natsclient "github.com/nats-io/nats.go"
1718
)
1819

@@ -21,6 +22,13 @@ type NATSInstance struct {
2122
sync.Mutex
2223
}
2324

25+
func DeleteAllStreams(js nats.JetStreamContext, cfg *config.JetStream) {
26+
for _, stream := range streams { // streams are defined in streams.go
27+
name := cfg.Prefixed(stream.Name)
28+
_ = js.DeleteStream(name)
29+
}
30+
}
31+
2432
func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
2533
// check if we need an in-process NATS Server
2634
if len(cfg.Addresses) != 0 {

syncapi/sync/requestpool.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,13 @@ func NewRequestPool(
6565
userAPI userapi.SyncUserAPI, keyAPI keyapi.SyncKeyAPI,
6666
rsAPI roomserverAPI.SyncRoomserverAPI,
6767
streams *streams.Streams, notifier *notifier.Notifier,
68-
producer PresencePublisher,
68+
producer PresencePublisher, enableMetrics bool,
6969
) *RequestPool {
70-
prometheus.MustRegister(
71-
activeSyncRequests, waitingSyncRequests,
72-
)
70+
if enableMetrics {
71+
prometheus.MustRegister(
72+
activeSyncRequests, waitingSyncRequests,
73+
)
74+
}
7375
rp := &RequestPool{
7476
db: db,
7577
cfg: cfg,

syncapi/syncapi.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func AddPublicRoutes(
6565
JetStream: js,
6666
}
6767

68-
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer)
68+
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer, base.EnableMetrics)
6969

7070
userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{
7171
JetStream: js,

syncapi/syncapi_test.go

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package syncapi
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"net/http"
7+
"net/http/httptest"
8+
"testing"
9+
"time"
10+
11+
keyapi "github.com/matrix-org/dendrite/keyserver/api"
12+
"github.com/matrix-org/dendrite/roomserver/api"
13+
rsapi "github.com/matrix-org/dendrite/roomserver/api"
14+
"github.com/matrix-org/dendrite/setup/jetstream"
15+
"github.com/matrix-org/dendrite/syncapi/types"
16+
"github.com/matrix-org/dendrite/test"
17+
userapi "github.com/matrix-org/dendrite/userapi/api"
18+
"github.com/nats-io/nats.go"
19+
)
20+
21+
type syncRoomserverAPI struct {
22+
rsapi.SyncRoomserverAPI
23+
rooms []*test.Room
24+
}
25+
26+
func (s *syncRoomserverAPI) QueryLatestEventsAndState(ctx context.Context, req *rsapi.QueryLatestEventsAndStateRequest, res *rsapi.QueryLatestEventsAndStateResponse) error {
27+
var room *test.Room
28+
for _, r := range s.rooms {
29+
if r.ID == req.RoomID {
30+
room = r
31+
break
32+
}
33+
}
34+
if room == nil {
35+
res.RoomExists = false
36+
return nil
37+
}
38+
res.RoomVersion = room.Version
39+
return nil // TODO: return state
40+
}
41+
42+
type syncUserAPI struct {
43+
userapi.SyncUserAPI
44+
accounts []userapi.Device
45+
}
46+
47+
func (s *syncUserAPI) QueryAccessToken(ctx context.Context, req *userapi.QueryAccessTokenRequest, res *userapi.QueryAccessTokenResponse) error {
48+
for _, acc := range s.accounts {
49+
if acc.AccessToken == req.AccessToken {
50+
res.Device = &acc
51+
return nil
52+
}
53+
}
54+
res.Err = "unknown user"
55+
return nil
56+
}
57+
58+
func (s *syncUserAPI) PerformLastSeenUpdate(ctx context.Context, req *userapi.PerformLastSeenUpdateRequest, res *userapi.PerformLastSeenUpdateResponse) error {
59+
return nil
60+
}
61+
62+
type syncKeyAPI struct {
63+
keyapi.KeyInternalAPI
64+
}
65+
66+
func TestSyncAPI(t *testing.T) {
67+
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
68+
testSync(t, dbType)
69+
})
70+
}
71+
72+
func testSync(t *testing.T, dbType test.DBType) {
73+
user := test.NewUser()
74+
room := test.NewRoom(t, user)
75+
alice := userapi.Device{
76+
ID: "ALICEID",
77+
UserID: user.ID,
78+
AccessToken: "ALICE_BEARER_TOKEN",
79+
DisplayName: "Alice",
80+
AccountType: userapi.AccountTypeUser,
81+
}
82+
83+
base, close := test.CreateBaseDendrite(t, dbType)
84+
defer close()
85+
86+
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
87+
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
88+
var msgs []*nats.Msg
89+
for _, ev := range room.Events() {
90+
var addsStateIDs []string
91+
if ev.StateKey() != nil {
92+
addsStateIDs = append(addsStateIDs, ev.EventID())
93+
}
94+
msgs = append(msgs, test.NewOutputEventMsg(t, base, room.ID, api.OutputEvent{
95+
Type: rsapi.OutputTypeNewRoomEvent,
96+
NewRoomEvent: &rsapi.OutputNewRoomEvent{
97+
Event: ev,
98+
AddsStateEventIDs: addsStateIDs,
99+
},
100+
}))
101+
}
102+
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, &syncKeyAPI{})
103+
test.MustPublishMsgs(t, jsctx, msgs...)
104+
105+
testCases := []struct {
106+
name string
107+
req *http.Request
108+
wantCode int
109+
wantJoinedRooms []string
110+
}{
111+
{
112+
name: "missing access token",
113+
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
114+
"timeout": "0",
115+
})),
116+
wantCode: 401,
117+
},
118+
{
119+
name: "unknown access token",
120+
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
121+
"access_token": "foo",
122+
"timeout": "0",
123+
})),
124+
wantCode: 401,
125+
},
126+
{
127+
name: "valid access token",
128+
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
129+
"access_token": alice.AccessToken,
130+
"timeout": "0",
131+
})),
132+
wantCode: 200,
133+
wantJoinedRooms: []string{room.ID},
134+
},
135+
}
136+
// TODO: find a better way
137+
time.Sleep(500 * time.Millisecond)
138+
139+
for _, tc := range testCases {
140+
w := httptest.NewRecorder()
141+
base.PublicClientAPIMux.ServeHTTP(w, tc.req)
142+
if w.Code != tc.wantCode {
143+
t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode)
144+
}
145+
if tc.wantJoinedRooms != nil {
146+
var res types.Response
147+
if err := json.NewDecoder(w.Body).Decode(&res); err != nil {
148+
t.Fatalf("%s: failed to decode response body: %s", tc.name, err)
149+
}
150+
if len(res.Rooms.Join) != len(tc.wantJoinedRooms) {
151+
t.Errorf("%s: got %v joined rooms, want %v.\nResponse: %+v", tc.name, len(res.Rooms.Join), len(tc.wantJoinedRooms), res)
152+
}
153+
t.Logf("res: %+v", res.Rooms.Join[room.ID])
154+
155+
gotEventIDs := make([]string, len(res.Rooms.Join[room.ID].Timeline.Events))
156+
for i, ev := range res.Rooms.Join[room.ID].Timeline.Events {
157+
gotEventIDs[i] = ev.EventID
158+
}
159+
test.AssertEventIDsEqual(t, gotEventIDs, room.Events())
160+
}
161+
}
162+
}

test/base.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,83 @@
1+
// Copyright 2022 The Matrix.org Foundation C.I.C.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
115
package test
216

317
import (
18+
"errors"
19+
"fmt"
20+
"io/fs"
21+
"os"
22+
"strings"
23+
"testing"
24+
425
"github.com/matrix-org/dendrite/setup/base"
526
"github.com/matrix-org/dendrite/setup/config"
627
"github.com/nats-io/nats.go"
728
)
829

30+
func CreateBaseDendrite(t *testing.T, dbType DBType) (*base.BaseDendrite, func()) {
31+
var cfg config.Dendrite
32+
cfg.Defaults(false)
33+
cfg.Global.JetStream.InMemory = true
34+
35+
switch dbType {
36+
case DBTypePostgres:
37+
cfg.Global.Defaults(true) // autogen a signing key
38+
cfg.MediaAPI.Defaults(true) // autogen a media path
39+
// use a distinct prefix else concurrent postgres/sqlite runs will clash since NATS will use
40+
// the file system event with InMemory=true :(
41+
cfg.Global.JetStream.TopicPrefix = fmt.Sprintf("Test_%d_", dbType)
42+
connStr, close := PrepareDBConnectionString(t, dbType)
43+
cfg.Global.DatabaseOptions = config.DatabaseOptions{
44+
ConnectionString: config.DataSource(connStr),
45+
MaxOpenConnections: 10,
46+
MaxIdleConnections: 2,
47+
ConnMaxLifetimeSeconds: 60,
48+
}
49+
return base.NewBaseDendrite(&cfg, "Test", base.DisableMetrics), close
50+
case DBTypeSQLite:
51+
cfg.Defaults(true) // sets a sqlite db per component
52+
// use a distinct prefix else concurrent postgres/sqlite runs will clash since NATS will use
53+
// the file system event with InMemory=true :(
54+
cfg.Global.JetStream.TopicPrefix = fmt.Sprintf("Test_%d_", dbType)
55+
return base.NewBaseDendrite(&cfg, "Test", base.DisableMetrics), func() {
56+
// cleanup db files. This risks getting out of sync as we add more database strings :(
57+
dbFiles := []config.DataSource{
58+
cfg.AppServiceAPI.Database.ConnectionString,
59+
cfg.FederationAPI.Database.ConnectionString,
60+
cfg.KeyServer.Database.ConnectionString,
61+
cfg.MSCs.Database.ConnectionString,
62+
cfg.MediaAPI.Database.ConnectionString,
63+
cfg.RoomServer.Database.ConnectionString,
64+
cfg.SyncAPI.Database.ConnectionString,
65+
cfg.UserAPI.AccountDatabase.ConnectionString,
66+
}
67+
for _, fileURI := range dbFiles {
68+
path := strings.TrimPrefix(string(fileURI), "file:")
69+
err := os.Remove(path)
70+
if err != nil && !errors.Is(err, fs.ErrNotExist) {
71+
t.Fatalf("failed to cleanup sqlite db '%s': %s", fileURI, err)
72+
}
73+
}
74+
}
75+
default:
76+
t.Fatalf("unknown db type: %v", dbType)
77+
}
78+
return nil, nil
79+
}
80+
981
func Base(cfg *config.Dendrite) (*base.BaseDendrite, nats.JetStreamContext, *nats.Conn) {
1082
if cfg == nil {
1183
cfg = &config.Dendrite{}

0 commit comments

Comments
 (0)