Support for notification counting over sync with threaded/unthreaded receipts. Limitations:
- no push rules (yet), fixed rule eval (message = notif, m.mention = highlight)
- compaction of notification keys implemented, but disabled
- individual notification keys are required for accurate counts as receipts move
- in future we can compact versions >X old, accepting loss of accuracy
Force-pushed from 03c1bd6 to c459bfb
This handles the expansion of device IDs from "*" for to-device events destined for local users and simplifies all the calls.
There's no real use-case for running these subsets of functionality, and supporting them adds complexity. The multiple-workers setup is designed to function without needing such config.
Force-pushed from 175dd9d to f8fb22b
Adds all the CSAPI push rules endpoints, sync updates and evaluation when counting room notifications. Limitations:
- no actual pushes yet (need pushers APIs)
- unoptimized push rule fetch for many users
- no display name matching yet
Force-pushed from f8fb22b to 699ca8f
Summary by CodeRabbit

📝 Walkthrough

Adds per-user per-room notification counting with versioned deltas and thread-aware aggregation, introduces persistent push rules and pushers, threads push-rule evaluation into event send paths via a Databases wrapper, and adds background iterators for compaction and push delivery; removes many feature-enabled guards so routes/workers run unconditionally.
Sequence Diagram(s)

sequenceDiagram
actor Client
participant Routes as Client Routes
participant DB as Databases
participant Accounts as AccountsDB
participant Rooms as RoomsDB
participant FDB as FoundationDB
Client->>Routes: Send event
Routes->>DB: SendLocalEvents(roomID, events)
DB->>Rooms: GetRoom (memberCount)
Rooms->>FDB: read room state/members
FDB-->>Rooms: members
DB->>Accounts: GetPushRulesForUser (per member)
Accounts->>FDB: read push rules
FDB-->>Accounts: push rules
DB->>Rooms: SendLocalEvents(events, userPushRules, roomContext)
Rooms->>Rooms: txnEvaluateNotificationsForEvents (per-user)
Rooms->>Rooms: txnStoreEvents (store events + notification deltas)
Rooms->>FDB: write events + notification keys
FDB-->>Rooms: committed
Rooms-->>DB: result
DB-->>Routes: response
Routes-->>Client: ack
sequenceDiagram
actor Client
participant Routes as Sync Route
participant DB as Databases
participant Accounts as AccountsDB
participant Rooms as RoomsDB
participant UsersDir as UsersDirectory
participant FDB as FoundationDB
Client->>Routes: GET /sync
Routes->>DB: SyncForUser(userID, from)
DB->>Accounts: SyncAccountsForuser(userID, from)
Accounts->>FDB: txn get push rules/version
FDB-->>Accounts: push rules + version
Accounts-->>DB: pushRules
DB->>Rooms: For each joined room -> SumNotificationsByThread(userID, roomID)
Rooms->>UsersDir: TxnSumNotificationsByThread
UsersDir->>FDB: read notification entries
FDB-->>UsersDir: entries
UsersDir-->>Rooms: aggregated counts
Rooms-->>DB: SyncRoom with unread counts
DB-->>Routes: assembled sync JSON (includes m.push_rules if present)
Routes-->>Client: JSON
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

🚥 Pre-merge checks: ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 12
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
internal/databases/transient/todevicesend.go (2)
40-55: Update logger and panic message to reflect renamed method.

The method was renamed to `SendRawToDeviceEvents`, but the logger (line 53) and panic message (line 83) still reference the old name `SendToDeviceEvents`. This creates confusion when debugging.

🔧 Proposed fix
 log := zerolog.Ctx(ctx).With().
 	Str("component", "database").
 	Str("database", "todevice").
-	Str("transaction", "SendToDeviceEvents").
+	Str("transaction", "SendRawToDeviceEvents").
 	Int("events", len(tds)).
 	Logger()

And at line 83:
- panic("cannot user * DeviceID in SendToDeviceEvents")
+ panic("cannot use * DeviceID in SendRawToDeviceEvents")
78-84: Typo in panic message.

Line 83 has "cannot user" which should be "cannot use".
🤖 Fix all issues with AI agents
In `@docs/matrix-spec-compatibility.md`:
- Line 47: Fix the typo in the documentation sentence for the /relations API:
change "evens" to "events" in the sentence that currently reads "The
`/relations` API will not return `m.annotation` evens unless the `rel_type` is
explicitly specified (and only `m.annotation` events are returned)." Ensure the
corrected text reads "events" and keep the rest of the wording unchanged,
referencing the `/relations` API and `m.annotation` mention.
In `@internal/databases/accounts/sync.go`:
- Around line 37-38: The loop currently uses iter.MustGet(), which can panic;
change it to call iter.Get() and handle the returned error explicitly inside the
for iter.Advance() loop (the same pattern used elsewhere), e.g., call kv, err :=
iter.Get() and on error return or log/propagate it from the surrounding function
in sync.go so iteration failures are handled consistently rather than panicking;
update any surrounding control flow in the function that iterates (the iter
variable and its Advance() loop) to propagate the error appropriately.
In `@internal/databases/eventsend.go`:
- Around line 63-75: When fetching per-user push rules in the loop over
memberships, if d.Accounts.GetPushRulesForUser(ctx, userID) returns an error, do
not skip adding the user to userPushRules; instead set userPushRules[userID] =
nil so downstream evaluation uses fallback/default notification behavior, and
optionally log the error; also ensure userRoomContext[userID] is still set (as
with &types.PushRuleRoom{MemberCount: memberCount, OwnDisplayname: ""}) so the
user remains in both maps for later processing.
In `@internal/databases/rooms/events/events.go`:
- Around line 442-443: The KeyForRoomThread function is using tuple.Pack but
must use PackWithVersionstamp to correctly handle incomplete versionstamps;
change the implementation in EventsDirectory.KeyForRoomThread to call
e.roomThreadVersionToID.PackWithVersionstamp with tuple.Tuple{roomID.String(),
version} and propagate/handle the returned error (matching the pattern used by
KeyForRoomRelation), ensuring you check for and surface any PackWithVersionstamp
error instead of assuming packing always succeeds.
In `@internal/databases/rooms/eventsend.go`:
- Around line 658-661: Update the stale/confusing comment to accurately describe
how federated events are evaluated: state whether
txnEvaluateNotificationsForEvents internally falls back to a default evaluation
when push rules are absent for federated events, or whether federated events
bypass push rules entirely; reference the parameters passed
(txnEvaluateNotificationsForEvents, userPushRules, userRoomContext,
eventsProvider, evs) and explicitly mention the fallback behavior (e.g.,
"function uses userPushRules when present, otherwise falls back to X") so
readers know why userPushRules and userRoomContext are still being supplied.
In `@internal/databases/rooms/eventsendutil.go`:
- Around line 275-286: The StateMember branch updates room.MemberCount but never
sets the changed/roomChanged flag, so membership changes aren't persisted;
modify the event.StateMember case in the handling logic (where ev.Membership(),
ev.PrevStateEvent and room.MemberCount are used) to set the changed flag (or
return true) whenever room.MemberCount is incremented or decremented so
txnStoreStateEvent sees roomChanged and the caller will call
txn.Set(r.KeyForRoom(room.ID), room.ToMsgpack()) to persist the updated room.
In `@internal/databases/rooms/notifications.go`:
- Around line 134-147: The loop that walks thread relations starting from
relEventID uses currentID and eventsProvider.MustGet and calls
parentEv.RelatesTo() but has no cycle protection; add a visited set (e.g.,
map[string]struct{}) keyed by event IDs or enforce a reasonable max depth to
break the loop and return currentID.String() (or an error/placeholder) when a
cycle or excessive depth is detected, ensuring checks are applied before
reassigning currentID to parentRelID and using event.RelThread as the relation
type identifier.
In `@internal/databases/sync.go`:
- Line 26: Call site uses incorrect casing: replace the invocation of
d.Accounts.SyncAccountsForuser with d.Accounts.SyncAccountsForUser and update
the method declaration/receiver name to match (rename SyncAccountsForuser →
SyncAccountsForUser) so it follows the ForUser naming pattern used by
SyncRoomsForUser, SyncTransientForUser, GetPushRulesForUser; also update any
other references, imports, or tests that call the old name to the corrected
SyncAccountsForUser to keep the API consistent.
In `@internal/notifier/notifiers.go`:
- Around line 20-24: The Transient notifier is incorrectly named "accounts" due
to a copy/paste error: in the Notifiers initializer (return &Notifiers{...})
change the Transient NewNotifier call to use the correct name "transient"
instead of "accounts" (i.e., update the third NewNotifier invocation for
Transient to NewNotifier("transient", cfg.Transient.Notifier, log)); ensure the
Rooms and Accounts calls remain unchanged.
In `@internal/routes/client/client.go`:
- Around line 69-74: Update the comment near the route registrations in
internal/routes/client/client.go: replace the typo "witb" with "with" in the
line describing the Legacy sync route so the comment reads "Legacy (v2/3) sync
with init and increment variants and basic filters, all rooms"; this change is
purely in the comment surrounding the rtr.MethodFunc registration for
c.SyncLegacy.
In `@internal/workers/compactnotificationiterator.go`:
- Around line 97-105: The loop advances currentVersion even when
compactNotificationsForEvents(newEventTups) fails, causing those events to be
skipped; change the flow so that if compactNotificationsForEvents returns an
error you log it (using n.log.Err) and return that error (or break/exit the
function) instead of setting currentVersion =
newEventTups[len(newEventTups)-1].Version and continuing; ensure currentVersion
and newEventTups are only updated when compaction succeeds so failed batches can
be retried (consider returning the error from the surrounding iterator function
so the caller can retry).
In `@internal/workers/pushnotificationiterator.go`:
- Line 38: The PushNotificationIterator worker has its n.handler assignment
commented out and lacks the required handleNotificationsLoop implementation;
implement a method named handleNotificationsLoop on PushNotificationIterator
that mirrors the control flow and lifecycle of CompactNotificationIterator's
loop (context/cancellation handling, iterator.Next(ctx) polling, processing of
items, error handling, and graceful shutdown/cleanup) so the worker can process
notifications, then restore the handler assignment by uncommenting n.handler =
n.handleNotificationsLoop so the worker uses the new method.
🧹 Nitpick comments (7)
internal/databases/rooms/roomstate.go (1)
35-46: Consider optimizing the map pre-allocation size.

The map is pre-allocated with `len(memberships)` but will typically contain far fewer entries after filtering to only local joined members. This is a minor memory inefficiency.

Optional: Use a smaller initial capacity
 func (r *RoomsDatabase) GetCurrentRoomLocalJoinedMemberships(ctx context.Context, roomID id.RoomID) (types.RoomMemberships, error) {
 	return util.DoReadTransaction(ctx, r.db, func(txn fdb.ReadTransaction) (types.RoomMemberships, error) {
 		memberships := r.events.TxnLookupCurrentRoomMemberships(txn, roomID, nil)
-		localMemberships := make(types.RoomMemberships, len(memberships))
+		localMemberships := make(types.RoomMemberships)
 		for userID, mTup := range memberships {
 			if userID.Homeserver() == r.config.ServerName && mTup.Membership == event.MembershipJoin {
 				localMemberships[userID] = mTup
 			}
 		}
 		return localMemberships, nil
 	})
 }

internal/databases/databases.go (1)
66-81: Consider removing redundant nil checks.

Since `Rooms`, `Accounts`, and `Transient` are now initialized unconditionally, the nil checks on lines 69-77 are redundant. Only the `Media` nil check (line 78) remains necessary.

♻️ Optional cleanup

 func (d *Databases) Stop() {
 	d.log.Info().Msg("Stopping databases...")
-	if d.Rooms != nil {
-		d.Rooms.Stop()
-	}
-	if d.Accounts != nil {
-		d.Accounts.Stop()
-	}
-	if d.Transient != nil {
-		d.Transient.Stop()
-	}
+	d.Rooms.Stop()
+	d.Accounts.Stop()
+	d.Transient.Stop()
 	if d.Media != nil {
 		d.Media.Stop()
 	}
 }

internal/databases/todevice.go (1)
25-29: Variable `d` shadows the receiver.

The loop variable `d` shadows the outer receiver `d *Databases`, which can be confusing. Consider renaming to `device` or `dev`.

Proposed fix

-	for _, d := range devices {
+	for _, device := range devices {
 		tdCopy := *td
-		tdCopy.DeviceID = d.ID
+		tdCopy.DeviceID = device.ID
 		expandedTDs = append(expandedTDs, &tdCopy)
 	}

internal/workers/pushnotificationiterator.go (1)
13-19: Unused constants due to missing handler implementation.

`pushNotificationIteratorPositionsKey` and `pushNotificationIteratorBatchSize` are defined but will remain unused until the handler is implemented.

internal/routes/client/pushrules.go (3)
155-190: Consider potential TOCTOU window in delete operation.

The existence check (line 174) and delete (line 184) are separate database calls. While unlikely to cause issues in practice (worst case: a benign 404 or double-delete), this could be consolidated into a single transaction for consistency.
225-273: Non-atomic read-modify-write for `SetPushRuleEnabled`.

The get (line 253) and put (line 267) operations are separate database transactions. If another request modifies the rule concurrently, changes could be lost. Consider combining the read and write into a single transaction if atomicity is required.
308-356: Non-atomic read-modify-write for `SetPushRuleActions`.

Same pattern as `SetPushRuleEnabled`: the get (line 336) and put (line 350) are separate transactions. Consider consolidating if concurrent modifications are a concern.
## No Reactions in Relations API

The `/relations` API will not return `m.annotation` evens unless the `rel_type` is explicitly specified (and only `m.annotation` events are returned).
Typo: "evens" should be "events".
-The `/relations` API will not return `m.annotation` evens unless the `rel_type` is explicitly specified (and only `m.annotation` events are returned).
+The `/relations` API will not return `m.annotation` events unless the `rel_type` is explicitly specified (and only `m.annotation` events are returned).
 for iter.Advance() {
 	kv, err := iter.Get()
 	if err != nil {
 		return nil, err
 	}
 	kv := iter.MustGet()
🧩 Analysis chain
🏁 Script executed:
# First, let's explore the repository structure and find the file
find . -type f -name "sync.go" | head -20

Repository: beeper/babbleserv
Length of output: 259
🏁 Script executed:
# Look at the specific file and lines in question
if [ -f "internal/databases/accounts/sync.go" ]; then
wc -l internal/databases/accounts/sync.go
fi

Repository: beeper/babbleserv
Length of output: 99
🏁 Script executed:
# Read the context around lines 37-38
if [ -f "internal/databases/accounts/sync.go" ]; then
sed -n '25,50p' internal/databases/accounts/sync.go
fi

Repository: beeper/babbleserv
Length of output: 759
🏁 Script executed:
# Search for other uses of iterator in the codebase to understand patterns
rg "iter\.(MustGet|Get|Advance)" --type go -B 2 -A 2 | head -100

Repository: beeper/babbleserv
Length of output: 5878
🌐 Web query:
FoundationDB Go client iterator MustGet Get panic error handling documentation
💡 Result:
In the FoundationDB Go bindings, the iterator you get from a range read is *fdb.RangeIterator (from RangeResult.Iterator()), and it has both error-returning and panic-based accessors:
- `ri.Get() (fdb.KeyValue, error)`: returns the next `KeyValue`, or an `error` if an underlying async operation failed. You must call `ri.Advance()` and get `true` before calling `Get()`. [2]
- `ri.MustGet() fdb.KeyValue`: same, but panics (rather than returning an error) if an underlying async operation failed. You must call `ri.Advance()` and get `true` before calling `MustGet()`. [2]

For panic/error handling, the Go bindings explicitly document the pattern:

- Every Future type (and helpers like `MustGet`) exists to avoid repetitive `if err != nil` checks; `MustGet` exposes FoundationDB failures via a panic (typically an `fdb.Error`). [1]
- `db.Transact(...)` recovers panics from inside the transaction function:
  - if the panic value is an FDB Error, `Transact` will either retry (if retryable) or return it as `err`,
  - if it's any other panic, `Transact` will re-panic it. [1]

So, in practice:

- Use `Get()` when you want explicit `error` handling. [2]
- Use `MustGet()` / iterator `MustGet()` inside `db.Transact`/`db.ReadTransact` when you want the bindings to catch `fdb.Error` panics and handle retries/return cleanly. [1]
Use iter.Get() with explicit error handling instead of MustGet().
The codebase consistently uses iter.Get() with error handling across all other iterator usage (util/database.go, devices.go, pushrules.go, transient/sync.go, etc.). Using MustGet() panics on iterator errors and is inconsistent with this pattern. Align with the established convention:
Proposed fix
- kv := iter.MustGet()
+ kv, err := iter.Get()
+ if err != nil {
+ return nil, err
+ }
 for userID := range memberships {
 	// TODO: GetRulesForUsers
 	ruleset, err := d.Accounts.GetPushRulesForUser(ctx, userID)
 	if err != nil {
 		// Log but continue - we'll use fallback notification behavior for this user
 		continue
 	}
 	userPushRules[userID] = ruleset
 	userRoomContext[userID] = &types.PushRuleRoom{
 		MemberCount: memberCount,
 		// TODO: use GetSpecificMemberEventsForRoom
 		OwnDisplayname: "", // Can be enhanced later to fetch user's displayname
 	}
Ensure fallback notifications still run when rules fetch fails.
Right now an error drops the user from userPushRules, so downstream evaluation skips them entirely. Add the user with a nil ruleset so the default notification behavior applies (and optionally log the failure).
🐛 Proposed fix
for userID := range memberships {
// TODO: GetRulesForUsers
ruleset, err := d.Accounts.GetPushRulesForUser(ctx, userID)
if err != nil {
// Log but continue - we'll use fallback notification behavior for this user
+ userPushRules[userID] = nil
continue
}
userPushRules[userID] = ruleset
userRoomContext[userID] = &types.PushRuleRoom{
MemberCount: memberCount,
// TODO: use GetSpecificMemberEventsForRoom
OwnDisplayname: "", // Can be enhanced later to fetch user's displayname
}
}🤖 Prompt for AI Agents
In `@internal/databases/eventsend.go` around lines 63 - 75, When fetching per-user
push rules in the loop over memberships, if d.Accounts.GetPushRulesForUser(ctx,
userID) returns an error, do not skip adding the user to userPushRules; instead
set userPushRules[userID] = nil so downstream evaluation uses fallback/default
notification behavior, and optionally log the error; also ensure
userRoomContext[userID] is still set (as with &types.PushRuleRoom{MemberCount:
memberCount, OwnDisplayname: ""}) so the user remains in both maps for later
processing.
 func (e *EventsDirectory) KeyForRoomThread(roomID id.RoomID, version tuple.Versionstamp) fdb.Key {
-	if key, err := e.roomThreadVersionToID.PackWithVersionstamp(tuple.Tuple{
-		roomID.String(), version,
-	}); err != nil {
-		panic(err)
-	} else {
-		return key
-	}
+	return e.roomThreadVersionToID.Pack(tuple.Tuple{roomID.String(), version})
🧩 Analysis chain
🏁 Script executed:
head -n 450 internal/databases/rooms/events/events.go | tail -n 20

Repository: beeper/babbleserv
Length of output: 797
🏁 Script executed:
# Check the full context of the KeyForRoomThread method
sed -n '435,450p' internal/databases/rooms/events/events.go

Repository: beeper/babbleserv
Length of output: 469
🏁 Script executed:
# Search for callers of KeyForRoomThread
rg "KeyForRoomThread" --type go -B 2 -A 2

Repository: beeper/babbleserv
Length of output: 936
🏁 Script executed:
# Check git history to see what was changed
git log --oneline -n 20

Repository: beeper/babbleserv
Length of output: 89
🏁 Script executed:
# Look for PackWithVersionstamp usage in the file
rg "PackWithVersionstamp" --type go internal/databases/rooms/events/

Repository: beeper/babbleserv
Length of output: 679
🏁 Script executed:
# Find TxnLookupVersionForEventID to understand what kind of versionstamp is returned
rg "TxnLookupVersionForEventID" --type go -B 2 -A 5

Repository: beeper/babbleserv
Length of output: 6289
🏁 Script executed:
# Search for roomThreadVersionToID usage - particularly reads/Gets
rg "roomThreadVersionToID" --type go -B 2 -A 2

Repository: beeper/babbleserv
Length of output: 1070
🏁 Script executed:
# Get git diff for this specific change
git diff HEAD~1 internal/databases/rooms/events/events.go | head -100

Repository: beeper/babbleserv
Length of output: 247
🏁 Script executed:
# Search for Pack vs PackWithVersionstamp patterns in the codebase
rg "\.Pack\(" --type go | grep -i "version" | head -20

Repository: beeper/babbleserv
Length of output: 1911
🏁 Script executed:
# Find the PackWithVersionstamp implementation or documentation
rg "PackWithVersionstamp" --type go -B 1 -A 5 | head -40

Repository: beeper/babbleserv
Length of output: 2062
Use PackWithVersionstamp to handle incomplete versionstamps correctly.
This method receives a tuple.Versionstamp parameter but uses Pack instead of PackWithVersionstamp. Every other method in the codebase that packs a versionstamp—including KeyForRoomRelation in the same file—uses PackWithVersionstamp with proper error handling. Since TxnLookupVersionForEventID can return incomplete versionstamps (as evidenced by checks for types.ZeroVersionstamp elsewhere), using Pack may encode incorrectly or panic. Revert to the established pattern:
Fix
- return e.roomThreadVersionToID.Pack(tuple.Tuple{roomID.String(), version})
+ if key, err := e.roomThreadVersionToID.PackWithVersionstamp(tuple.Tuple{roomID.String(), version}); err != nil {
+ panic(err)
+ } else {
+ return key
+ }
 // Note: federated events use fallback notification evaluation (no push rules)
 eventNotifications := txnEvaluateNotificationsForEvents(
 	txn, eventsProvider, evs, userPushRules, userRoomContext,
 )
Clarify or correct the comment about fallback notification evaluation.
The comment states "federated events use fallback notification evaluation (no push rules)" but the code still passes userPushRules and userRoomContext to txnEvaluateNotificationsForEvents. This is confusing:
- If federated events truly bypass push rules, the comment should explain how (e.g., the function internally falls back when rules are empty).
- If the parameters are used, the comment appears stale.
Consider updating the comment to accurately describe the behavior, or clarify whether the fallback logic is internal to txnEvaluateNotificationsForEvents.
 }

-nextAccountsVersion, accounts, err := d.Accounts.SyncAccountsForuser(ctx, userID, versions[types.AccountsVersionKey], options)
+nextAccountsVersion, accounts, pushRules, err := d.Accounts.SyncAccountsForuser(ctx, userID, versions[types.AccountsVersionKey], options)
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check for SyncAccountsForuser and look at surrounding methods for naming pattern
rg -n 'SyncAccountsForuser|SyncAccountsForUser' --type go -A 3 -B 3

Repository: beeper/babbleserv
Length of output: 958
🏁 Script executed:
#!/bin/bash
# Look at the Accounts interface/type definition to understand method naming conventions
rg -n 'func.*SyncAccounts' --type go -A 2

Repository: beeper/babbleserv
Length of output: 266
🏁 Script executed:
#!/bin/bash
# Search for method definitions with "Foruser" or "ForUser" patterns
rg -n 'Foruser|ForUser' --type go

Repository: beeper/babbleserv
Length of output: 10073
Fix method name casing: SyncAccountsForuser should be SyncAccountsForUser.
The method name uses inconsistent casing compared to the rest of the codebase. All similar methods follow the For[Subject]User pattern with capital 'U': SyncRoomsForUser, SyncTransientForUser, GetPushRulesForUser, etc.
 return &Notifiers{
 	Rooms:     NewNotifier("rooms", cfg.Rooms.Notifier, log),
 	Accounts:  NewNotifier("accounts", cfg.Accounts.Notifier, log),
 	Transient: NewNotifier("accounts", cfg.Transient.Notifier, log),
 }
Typo: Transient notifier is named "accounts" instead of "transient".
Line 23 passes "accounts" as the name for the Transient notifier. This appears to be a copy-paste error from line 22.
Proposed fix
return &Notifiers{
Rooms: NewNotifier("rooms", cfg.Rooms.Notifier, log),
Accounts: NewNotifier("accounts", cfg.Accounts.Notifier, log),
- Transient: NewNotifier("accounts", cfg.Transient.Notifier, log),
+ Transient: NewNotifier("transient", cfg.Transient.Notifier, log),
}
```go
// Legacy (v2/3) sync witb init and increment variants and basic filters, all rooms
rtr.MethodFunc(http.MethodGet, "/v3/sync", middleware.RequireUserAuth(c.SyncLegacy))
// Simplified sliding "native" sync MSC4186, same as v2 with room filters, roughly
rtr.MethodFunc(http.MethodGet, "/unstable/org.matrix.simplified_msc3575/sync", middleware.RequireUserAuth(c.SyncSliding))
// Beeper's streaming sync, no gaps, firehose style
rtr.MethodFunc(http.MethodGet, "/unstable/com.beeper.streaming/sync", middleware.RequireUserAuth(c.SyncStreaming))
```
Fix typo in comment.
Line 69 has "witb" which should be "with".
✏️ Proposed fix
```diff
-// Legacy (v2/3) sync witb init and increment variants and basic filters, all rooms
+// Legacy (v2/3) sync with init and increment variants and basic filters, all rooms
```
🤖 Prompt for AI Agents
In `@internal/routes/client/client.go` around lines 69 - 74, Update the comment
near the route registrations in internal/routes/client/client.go: replace the
typo "witb" with "with" in the line describing the Legacy sync route so the
comment reads "Legacy (v2/3) sync with init and increment variants and basic
filters, all rooms"; this change is purely in the comment surrounding the
rtr.MethodFunc registration for c.SyncLegacy.
```go
if err := n.compactNotificationsForEvents(newEventTups); err != nil {
	n.log.Err(err).Msg("Failed to compact notifications")
}

currentVersion = newEventTups[len(newEventTups)-1].Version

if len(newEventTups) < compactNotificationIteratorBatchSize {
	break
}
```
Don’t advance the iterator on compaction errors.
Right now a failed batch still advances currentVersion, which permanently skips compaction for those events. Consider returning early so the batch can retry later.
🐛 Proposed fix
```diff
 if err := n.compactNotificationsForEvents(newEventTups); err != nil {
 	n.log.Err(err).Msg("Failed to compact notifications")
+	return
 }

 currentVersion = newEventTups[len(newEventTups)-1].Version
```
In `@internal/workers/compactnotificationiterator.go` around lines 97 - 105, The
loop advances currentVersion even when
compactNotificationsForEvents(newEventTups) fails, causing those events to be
skipped; change the flow so that if compactNotificationsForEvents returns an
error you log it (using n.log.Err) and return that error (or break/exit the
function) instead of setting currentVersion =
newEventTups[len(newEventTups)-1].Version and continuing; ensure currentVersion
and newEventTups are only updated when compaction succeeds so failed batches can
be retried (consider returning the error from the surrounding iterator function
so the caller can retry).
This means we get type safety on the channels, and can get room IDs from all events subscriptions.
Actionable comments posted: 12
🤖 Fix all issues with AI agents
In `@docs/project-structure.md`:
- Around line 24-40: The bullet points under the module layout (lines describing
"internal/databases/*/" and "internal/databases/*/*/") use unhyphenated compound
adjectives; update them to use hyphenated compounds (e.g., "top-level database
transactions", "domain-specific directories", "key-prefix operations",
"individual-database 'directories'", "common-key-prefix operations") so grammar
is correct and consistent for the entries referencing internal/databases/*/ and
internal/databases/*/*/.
In `@internal/databases/accounts/pushers.go`:
- Around line 34-42: In DeletePusherForUser, the transaction callback currently
ignores the error returned by a.users.TxnDeletePusherForUser; change the
callback to capture and return that error (i.e., return nil, err from the
callback) so util.DoWriteTransaction receives and propagates any failure,
ensuring DeletePusherForUser returns the error from
a.users.TxnDeletePusherForUser rather than always succeeding.
In `@internal/databases/rooms/notifications.go`:
- Around line 51-52: The call to json.Unmarshal(ev.Content,
&mautrixEvt.Content.Raw) ignores its error; update the code that sets
mautrixEvt.Content.Raw (the json.Unmarshal call) to check the returned error and
handle it (e.g., log the error with context or return it) so malformed JSON
won't leave Content.Raw in an unexpected state—use existing logging/error
patterns in this file (or return the error from the surrounding function) to
surface the failure instead of discarding it.
In `@internal/notifier/notifier.go`:
- Line 315: Rename the method unlockedUnusbscribe to unlockedUnsubscribe on the
Notifier receiver (func (n *Notifier) unlockedUnsubscribe(...)) to fix the typo,
and update every call site that invokes unlockedUnusbscribe (including the call
referenced near line 247) to call unlockedUnsubscribe instead so the symbol
names match and the code compiles.
In `@internal/notifier/notifiers.go`:
- Around line 35-36: Fix the typo in the package comment describing the custom
channel subscription: change "chnage" to "change" in the comment that starts
"Similar to subscribe but takes a custom channel..." (near the subscribe-related
notifier logic), ensuring the sentence reads "...if the channel is full the
change will be dropped.".
In `@internal/routes/client/pushers.go`:
- Around line 41-55: The handler currently treats only nil req.Kind as deletion
and allows empty-string kinds and deletes by pushkey only; update validation to
reject empty-string kind (treat "" as invalid and return HTTP 400 via
util.ResponseErrorMessageJSON) before storage in the non-delete branch, and
change deletion semantics so DeletePusherForUser is called with both app_id and
pushkey (use req.AppID and req.PushKey) when req.Kind == nil to match the spec;
ensure SetPusherForUser path still requires req.AppDisplayName, req.AppID and
req.Data as before.
In `@internal/workers/compactnotificationiterator.go`:
- Around line 83-84: The calls to compactNotificationsForRoom are ignoring
returned errors at the two call sites; update both call locations so they
capture the error (err := n.compactNotificationsForRoom(roomID)) and log
failures instead of discarding them (e.g., using the component's logger or
n.logger/errorf) with contextual info including roomID and the error; keep
returning/continuing behavior unchanged except ensure errors are not silently
dropped.
- Around line 76-85: The unlockedHandleChange loop increments
n.roomIDToCount[roomID] and compacts when it exceeds
n.config.Rooms.MaxNotificationsPerUserRoom but then returns, which stops
processing the rest of change.RoomIDs; change the control flow in
unlockedHandleChange so that after calling n.compactNotificationsForRoom(roomID)
it does not return but continues iterating over the remaining roomIDs (i.e.,
remove the early return and let the loop proceed), ensuring each room in
change.RoomIDs is checked and compacted as needed.
- Around line 69-104: unlockedHandleChange currently reads/writes the maps
roomIDToCount and roomIDToCancel without synchronization and spawns goroutines
that capture roomID; wrap all accesses to roomIDToCount and roomIDToCancel
inside n.mu.Lock()/n.mu.Unlock() in unlockedHandleChange (increment, lookup,
store, cancel) and store the cancel under lock, and avoid closure capture by
doing localRoomID := roomID before launching the goroutine. In the goroutine,
reference localRoomID and ensure the call to n.compactNotificationsForRoom is
safe by either acquiring n.mu around any map operations there or by making
compactNotificationsForRoom itself concurrency-safe (documented as thread-safe).
- Around line 90-102: The timeout goroutine spawned in
compactnotificationiterator.go captures roomID and never removes its state,
causing roomIDToCount and roomIDToCancel to grow; modify the goroutine so that
on both ctx.Done() and after calling compactNotificationsForRoom(roomID) it
acquires the iterator's mutex and deletes the entries from n.roomIDToCount and
n.roomIDToCancel (and if you store a cancel func in n.roomIDToCancel ensure you
call it/clear it only once), ensuring any stored cancel is invoked/cleared and
the maps are cleaned up before the goroutine returns.
In `@internal/workers/pushnotificationiterator.go`:
- Around line 100-108: The loop advances currentVersion and may skip retries
when sendPushNotificationsForEvents fails; modify the logic in the loop that
calls sendPushNotificationsForEvents(newEventTups) so that on error you do NOT
update currentVersion or advance the iterator: log the error
(n.log.Err(err).Msg(...)) and return/stop retrying this run (or break) so the
same batch can be retried later, only updating currentVersion =
newEventTups[len(newEventTups)-1].Version and checking len(newEventTups) <
pushNotificationIteratorBatchSize when sendPushNotificationsForEvents succeeds;
key symbols: sendPushNotificationsForEvents, currentVersion, newEventTups,
pushNotificationIteratorBatchSize, n.log.Err.
- Around line 224-226: The Unread field is being double-counted by adding
notifCount + highlightCount; notifCount already includes highlighted messages.
In pushnotificationiterator.go where pushgateway.NotificationCounts is
populated, set Unread to notifCount (not notifCount + highlightCount) and keep
Highlight set to highlightCount (i.e., Unread: notifCount, Highlight:
highlightCount) so counts reflect SumNotifications semantics; update the struct
initialization that currently references Unread, notifCount and highlightCount
accordingly.
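The Unread/Highlight relationship described in the last prompt can be made concrete with a small sketch (illustrative types only, not the repo's real `SumNotifications`): every notification increments the notification count, and highlights additionally increment the highlight count, so adding the two together would double-count highlights.

```go
package main

import "fmt"

// Notification is a minimal stand-in: a highlight is itself a notification.
type Notification struct{ Highlight bool }

// sumNotifications mirrors the semantics the review describes: notifCount
// counts every entry, highlightCount counts the highlighted subset.
func sumNotifications(notifs []Notification) (notifCount, highlightCount int) {
	for _, n := range notifs {
		notifCount++
		if n.Highlight {
			highlightCount++
		}
	}
	return notifCount, highlightCount
}

func main() {
	notifs := []Notification{{false}, {true}, {false}}
	notifCount, highlightCount := sumNotifications(notifs)
	// Correct: Unread = notifCount; Unread = notifCount + highlightCount
	// would report 4 unread for 3 notifications.
	fmt.Println("unread:", notifCount, "highlight:", highlightCount)
}
```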
🧹 Nitpick comments (5)
internal/databases/accounts/users/pushers.go (1)

48-51: Consider returning an error for consistency with other Txn methods.

`TxnDeletePusherForUser` returns nothing, while `TxnSetPusherForUser` returns `error`. Although `txn.Clear` doesn't return an error, maintaining a consistent signature across transactional helpers improves API predictability.

internal/workers/pushnotificationiterator.go (1)

140-156: Unbounded goroutine spawning could overwhelm resources.

For rooms with many local members and many events, this spawns `len(memberships) * len(eventTups)` goroutines without any throttling. Consider using a semaphore or worker pool to limit concurrency.

♻️ Example using a semaphore

```diff
+const maxConcurrentPushes = 50
+
 func (n *PushNotificationIterator) sendPushNotificationsForEvents(tups []types.EventTupWithVersion) error {
 	// Group events by room
 	eventsByRoom := make(map[id.RoomID][]types.EventTupWithVersion)
 	for _, tup := range tups {
 		eventsByRoom[tup.RoomID] = append(eventsByRoom[tup.RoomID], tup)
 	}

 	var wg sync.WaitGroup
+	sem := make(chan struct{}, maxConcurrentPushes)
 	for roomID, eventTups := range eventsByRoom {
 		// ...
 		for userID := range memberships {
 			for _, eventTup := range eventTups {
 				// ...
 				if notif != nil {
 					wg.Add(1)
+					sem <- struct{}{}
 					go func(userID id.UserID, eventTup types.EventTupWithVersion, notif types.Notifications) {
 						defer wg.Done()
+						defer func() { <-sem }()
 						n.sendPushForUser(userID, eventTup, notif)
 					}(userID, eventTup, *notif)
 				}
 			}
 		}
 	}
```

internal/databases/eventsend.go (1)

66-66: TODO comments indicate missing parallelization.

Lines 66 and 77 have TODO comments for parallel fetching (`GetRulesForUsers`, `GetProfilesForUsers`). For rooms with many local members, sequential fetching could become a bottleneck.

Would you like me to help implement parallel fetching using `errgroup` or a similar pattern? I can open an issue to track this optimization.

Also applies to: 78-78

internal/databases/rooms/users/notifications.go (2)

21-24: Panic on versionstamp packing error may cause unrecoverable crashes.

Using `panic` for error handling in database operations can crash the entire process. Consider returning an error or using a more graceful fallback.

♻️ Proposed refactor to return error

```diff
-func (u *UsersDirectory) keyForNotificationVersion(
+func (u *UsersDirectory) keyForNotificationVersion(
 	userID id.UserID,
 	roomID id.RoomID,
 	version tuple.Versionstamp,
-) fdb.Key {
+) (fdb.Key, error) {
 	tup := tuple.Tuple{userID.String(), roomID.String(), version}
 	if types.IsIncompleteVersionstamp(version) {
 		key, err := u.notificationVersions.PackWithVersionstamp(tup)
 		if err != nil {
-			panic(err)
+			return nil, err
 		}
-		return key
+		return key, nil
 	}
-	return u.notificationVersions.Pack(tup)
+	return u.notificationVersions.Pack(tup), nil
 }
```

This would require updating callers to handle the error.

73-100: Thread-specific clearing iterates all notifications.

`TxnClearThreadNotificationsUpTo` reads all notifications up to the version and filters by thread. For users with many notifications, this could be inefficient. Consider if a secondary index by thread would be beneficial for frequent thread-specific operations.
Babbleserv is roughly divided in three:

- databases talk to FoundationDB, no access to network/federation, implements Matrix spec on top of the databases (event auth, state res) using FDB transactions
- routes implement the client/federation Matrix APIs, pre-fetch anything before passing to relevant database call
- workers handle asynchronous tasks after database changes (federation outgoing, profile updates, presence, push notifications)

### `internal/databases/*/`

- each represents a FDB cluster containing a logical group of sub-databases
- top level database transactions called by routes
- call through to the domain specific directories nested modules
- each database lives under a key prefix

#### `internal/databases/*/*/`

```diff
-- individual database "directories" (FDB thing)
+- individual database "directories" (key prefix)
 - group together common key prefix operations (ie events, users)
```
Hyphenate compound adjectives in module layout bullets.
Minor grammar polish per static analysis.
✏️ Suggested edit
```diff
-- top level database transactions called by routes
-- call through to the domain specific directories nested modules
+- top-level database transactions called by routes
+- call through to the domain-specific directories nested modules
```
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| Babbleserv is roughly divided in three: | |
| - databases talk to FoundationDB, no access to network/federation, implements Matrix spec on top of the databases (event auth, state res) using FDB transactions | |
| - routes implement the client/federation Matrix APIs, pre-fetch anything before passing to relevant database call | |
| - workers handle asynchronous tasks after database changes (federation outgoing, profile updates, presence, push notifications) | |
| ### `internal/databases/*/` | |
| - each represents a FDB cluster containing a logical group of sub-databases | |
| - top level database transactions called by routes | |
| - call through to the domain specific directories nested modules | |
| - each database lives under a key prefix | |
| #### `internal/databases/*/*/` | |
| - individual database "directories" (FDB thing) | |
| - individual database "directories" (key prefix) | |
| - group together common key prefix operations (ie events, users) | |
| Babbleserv is roughly divided in three: | |
| - databases talk to FoundationDB, no access to network/federation, implements Matrix spec on top of the databases (event auth, state res) using FDB transactions | |
| - routes implement the client/federation Matrix APIs, pre-fetch anything before passing to relevant database call | |
| - workers handle asynchronous tasks after database changes (federation outgoing, profile updates, presence, push notifications) | |
| ### `internal/databases/*/` | |
| - each represents a FDB cluster containing a logical group of sub-databases | |
| - top-level database transactions called by routes | |
| - call through to the domain-specific directories nested modules | |
| - each database lives under a key prefix | |
| #### `internal/databases/*/*/` | |
| - individual database "directories" (key prefix) | |
| - group together common key prefix operations (ie events, users) |
🧰 Tools
🪛 LanguageTool
[uncategorized] ~33-~33: If this is a compound adjective that modifies the following noun, use a hyphen.
Context: ...ning a logical group of sub-databases - top level database transactions called by routes ...
(EN_COMPOUND_ADJECTIVE_INTERNAL)
[grammar] ~34-~34: Use a hyphen to join words.
Context: ...d by routes - call through to the domain specific directories nested modules - ea...
(QB_NEW_EN_HYPHEN)
🤖 Prompt for AI Agents
In `@docs/project-structure.md` around lines 24 - 40, The bullet points under the
module layout (lines describing "internal/databases/*/" and
"internal/databases/*/*/") use unhyphenated compound adjectives; update them to
use hyphenated compounds (e.g., "top-level database transactions",
"domain-specific directories", "key-prefix operations", "individual-database
'directories'", "common-key-prefix operations") so grammar is correct and
consistent for the entries referencing internal/databases/*/ and
internal/databases/*/*/.
```go
func (a *AccountsDatabase) DeletePusherForUser(
	ctx context.Context,
	userID id.UserID,
	pushKey string,
) error {
	_, err := util.DoWriteTransaction(ctx, a.db, func(txn fdb.Transaction) (types.Nil, error) {
		a.users.TxnDeletePusherForUser(txn, userID, pushKey)
		return nil, nil
	})
```
Propagate delete errors from TxnDeletePusherForUser.
Line 40 drops any error returned by the helper. If deletion can fail (e.g., validation or transactional errors), the caller will see success incorrectly. Return the helper error from the transaction callback.
🛠️ Suggested fix
```diff
 _, err := util.DoWriteTransaction(ctx, a.db, func(txn fdb.Transaction) (types.Nil, error) {
-	a.users.TxnDeletePusherForUser(txn, userID, pushKey)
-	return nil, nil
+	return nil, a.users.TxnDeletePusherForUser(txn, userID, pushKey)
 })
```
+ })🤖 Prompt for AI Agents
In `@internal/databases/accounts/pushers.go` around lines 34 - 42, In
DeletePusherForUser, the transaction callback currently ignores the error
returned by a.users.TxnDeletePusherForUser; change the callback to capture and
return that error (i.e., return nil, err from the callback) so
util.DoWriteTransaction receives and propagates any failure, ensuring
DeletePusherForUser returns the error from a.users.TxnDeletePusherForUser rather
than always succeeding.
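The fix pattern here — returning the inner helper's error from the transaction callback so the wrapper can propagate it — can be shown with a toy wrapper. `doWriteTransaction` and `txnDelete` below are stand-ins, not the real FDB API.

```go
package main

import (
	"errors"
	"fmt"
)

// doWriteTransaction is a toy transactional wrapper: it runs the callback and
// surfaces the callback's error to the caller (no real database involved).
func doWriteTransaction(fn func() error) error {
	return fn()
}

// txnDelete is a hypothetical helper that can fail.
func txnDelete(key string) error {
	if key == "" {
		return errors.New("empty key")
	}
	return nil
}

func main() {
	// Bug shape from the review: the helper's error is dropped, so callers
	// always observe success.
	dropped := doWriteTransaction(func() error {
		_ = txnDelete("") // error ignored
		return nil
	})
	// Fixed shape: return the helper's error from the callback.
	propagated := doWriteTransaction(func() error {
		return txnDelete("")
	})
	fmt.Println(dropped, propagated)
}
```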
```go
// Ensure .Raw is populated as is needed for push rule eval
json.Unmarshal(ev.Content, &mautrixEvt.Content.Raw)
```
Unchecked error from json.Unmarshal.
The error from json.Unmarshal is discarded. While the VeryRaw field is already set, malformed JSON could leave Content.Raw in an unexpected state, potentially affecting push rule evaluation.
🐛 Proposed fix to handle the error
```diff
 // Ensure .Raw is populated as is needed for push rule eval
-json.Unmarshal(ev.Content, &mautrixEvt.Content.Raw)
+if err := json.Unmarshal(ev.Content, &mautrixEvt.Content.Raw); err != nil {
+	// Log but continue with VeryRaw only - push rules may not work correctly
+	// but this shouldn't block notifications entirely
+}
```
🤖 Prompt for AI Agents
In `@internal/databases/rooms/notifications.go` around lines 51 - 52, The call to
json.Unmarshal(ev.Content, &mautrixEvt.Content.Raw) ignores its error; update
the code that sets mautrixEvt.Content.Raw (the json.Unmarshal call) to check the
returned error and handle it (e.g., log the error with context or return it) so
malformed JSON won't leave Content.Raw in an unexpected state—use existing
logging/error patterns in this file (or return the error from the surrounding
function) to surface the failure instead of discarding it.
```diff
 }

-func (n *Notifier) unlockedUnusbscribe(ch chan any) {
+func (n *Notifier) unlockedUnusbscribe(ch chan Change) {
```
Typo in function name: unlockedUnusbscribe should be unlockedUnsubscribe.
Proposed fix
```diff
-func (n *Notifier) unlockedUnusbscribe(ch chan Change) {
+func (n *Notifier) unlockedUnsubscribe(ch chan Change) {
```

Also update the call site at line 247:

```diff
-	n.unlockedUnusbscribe(ch)
+	n.unlockedUnsubscribe(ch)
```
In `@internal/notifier/notifier.go` at line 315, Rename the method
unlockedUnusbscribe to unlockedUnsubscribe on the Notifier receiver (func (n
*Notifier) unlockedUnsubscribe(...)) to fix the typo, and update every call site
that invokes unlockedUnusbscribe (including the call referenced near line 247)
to call unlockedUnsubscribe instead so the symbol names match and the code
compiles.
```go
// Similar to subscribe but takes a custom channel, which can have a greater buffer than default 1,
// but note the delivery is still lossy - if the channel is full the chnage will be dropped.
```
Typo in comment: "chnage" should be "change".
Proposed fix
```diff
 // Similar to subscribe but takes a custom channel, which can have a greater buffer than default 1,
-// but note the delivery is still lossy - if the channel is full the chnage will be dropped.
+// but note the delivery is still lossy - if the channel is full the change will be dropped.
```
🤖 Prompt for AI Agents
In `@internal/notifier/notifiers.go` around lines 35 - 36, Fix the typo in the
package comment describing the custom channel subscription: change "chnage" to
"change" in the comment that starts "Similar to subscribe but takes a custom
channel..." (near the subscribe-related notifier logic), ensuring the sentence
reads "...if the channel is full the change will be dropped.".
```go
// Bump counter, if > the notification limit, compact room and exit
n.roomIDToCount[roomID]++
if n.roomIDToCount[roomID] > n.config.Rooms.MaxNotificationsPerUserRoom {
	n.log.Debug().
		Stringer("room_id", roomID).
		Int("max_notifications", n.config.Rooms.MaxNotificationsPerUserRoom).
		Msg("Compacting room notifications after sufficient traffic")
	n.compactNotificationsForRoom(roomID)
	return
}
```
Early return only processes first room exceeding threshold.
The return at line 84 exits the entire unlockedHandleChange function after processing the first room that exceeds MaxNotificationsPerUserRoom. If multiple rooms are in change.RoomIDs, the remaining rooms are skipped entirely.
🐛 Proposed fix to continue processing other rooms
```diff
 n.roomIDToCount[roomID]++
 if n.roomIDToCount[roomID] > n.config.Rooms.MaxNotificationsPerUserRoom {
 	n.log.Debug().
 		Stringer("room_id", roomID).
 		Int("max_notifications", n.config.Rooms.MaxNotificationsPerUserRoom).
 		Msg("Compacting room notifications after sufficient traffic")
 	n.compactNotificationsForRoom(roomID)
-	return
+	delete(n.roomIDToCount, roomID)
+	delete(n.roomIDToCancel, roomID)
+	continue
 }
```
In `@internal/workers/compactnotificationiterator.go` around lines 76 - 85, The
unlockedHandleChange loop increments n.roomIDToCount[roomID] and compacts when
it exceeds n.config.Rooms.MaxNotificationsPerUserRoom but then returns, which
stops processing the rest of change.RoomIDs; change the control flow in
unlockedHandleChange so that after calling n.compactNotificationsForRoom(roomID)
it does not return but continues iterating over the remaining roomIDs (i.e.,
remove the early return and let the loop proceed), ensuring each room in
change.RoomIDs is checked and compacted as needed.
```go
n.compactNotificationsForRoom(roomID)
return
```
Compaction errors are silently ignored.
compactNotificationsForRoom returns an error, but callers at lines 83 and 100 discard it. Failed compactions should at least be logged.
🐛 Proposed fix to log errors
```diff
-n.compactNotificationsForRoom(roomID)
+if err := n.compactNotificationsForRoom(roomID); err != nil {
+	n.log.Err(err).Stringer("room_id", roomID).Msg("Failed to compact room notifications")
+}
```
+ }Also applies to: 100-100
🤖 Prompt for AI Agents
In `@internal/workers/compactnotificationiterator.go` around lines 83 - 84, The
calls to compactNotificationsForRoom are ignoring returned errors at the two
call sites; update both call locations so they capture the error (err :=
n.compactNotificationsForRoom(roomID)) and log failures instead of discarding
them (e.g., using the component's logger or n.logger/errorf) with contextual
info including roomID and the error; keep returning/continuing behavior
unchanged except ensure errors are not silently dropped.
```go
go func() {
	select {
	case <-ctx.Done():
		n.log.Trace().Msg("Room compact timeout canceled")
		return
	case <-time.After(n.config.Rooms.CompactRoomNotificationsTimeout):
		n.log.Debug().
			Stringer("room_id", roomID).
			Dur("timeout", n.config.Rooms.CompactRoomNotificationsTimeout).
			Msg("Compacting room notifications after timeout")
		n.compactNotificationsForRoom(roomID)
	}
}()
```
Timeout goroutine captures loop variable and lacks cleanup.
The goroutine captures roomID from the loop but since Go 1.22+ this is safe. However, when the timeout fires, it doesn't clean up roomIDToCount and roomIDToCancel entries, causing the maps to grow indefinitely.
🐛 Proposed fix to clean up state after compaction
```diff
 go func() {
 	select {
 	case <-ctx.Done():
 		n.log.Trace().Msg("Room compact timeout canceled")
 		return
 	case <-time.After(n.config.Rooms.CompactRoomNotificationsTimeout):
 		n.log.Debug().
 			Stringer("room_id", roomID).
 			Dur("timeout", n.config.Rooms.CompactRoomNotificationsTimeout).
 			Msg("Compacting room notifications after timeout")
-		n.compactNotificationsForRoom(roomID)
+		if err := n.compactNotificationsForRoom(roomID); err != nil {
+			n.log.Err(err).Stringer("room_id", roomID).Msg("Failed to compact room notifications")
+		}
+		// Clean up state (requires mutex protection)
+		n.mu.Lock()
+		delete(n.roomIDToCount, roomID)
+		delete(n.roomIDToCancel, roomID)
+		n.mu.Unlock()
 	}
 }()
```

🤖 Prompt for AI Agents
In `@internal/workers/compactnotificationiterator.go` around lines 90 - 102, The
timeout goroutine spawned in compactnotificationiterator.go captures roomID and
never removes its state, causing roomIDToCount and roomIDToCancel to grow;
modify the goroutine so that on both ctx.Done() and after calling
compactNotificationsForRoom(roomID) it acquires the iterator's mutex and deletes
the entries from n.roomIDToCount and n.roomIDToCancel (and if you store a cancel
func in n.roomIDToCancel ensure you call it/clear it only once), ensuring any
stored cancel is invoked/cleared and the maps are cleaned up before the
goroutine returns.
```go
if err := n.sendPushNotificationsForEvents(newEventTups); err != nil {
	n.log.Err(err).Msg("Failed to send push notifications")
}

currentVersion = newEventTups[len(newEventTups)-1].Version

if len(newEventTups) < pushNotificationIteratorBatchSize {
	break
}
```
Batch processing continues despite errors, potentially skipping notifications.
When sendPushNotificationsForEvents fails, the error is logged but currentVersion is still advanced, meaning those events' push notifications may never be retried. This mirrors the issue flagged in compactnotificationiterator.go.
🐛 Proposed fix to retry failed batches
```diff
 if err := n.sendPushNotificationsForEvents(newEventTups); err != nil {
 	n.log.Err(err).Msg("Failed to send push notifications")
+	return
 }

 currentVersion = newEventTups[len(newEventTups)-1].Version
```
🤖 Prompt for AI Agents

In `internal/workers/pushnotificationiterator.go` around lines 100-108, the loop advances `currentVersion` even when `sendPushNotificationsForEvents` fails, so failed batches may be skipped rather than retried. On error, log it with `n.log.Err(err).Msg(...)` and return (or break) without advancing the iterator, so the same batch is picked up on a later run; only update `currentVersion = newEventTups[len(newEventTups)-1].Version` and check `len(newEventTups) < pushNotificationIteratorBatchSize` when the send succeeds. Key symbols: `sendPushNotificationsForEvents`, `currentVersion`, `newEventTups`, `pushNotificationIteratorBatchSize`, `n.log.Err`.
```go
Counts: &pushgateway.NotificationCounts{
	Unread: notifCount + highlightCount,
},
```
🧩 Analysis chain
🏁 Scripts executed against repository beeper/babbleserv (command, then length of output):

- `cd /root && find . -name "pushnotificationiterator.go" -type f` (116)
- `find . -name "pushnotificationiterator.go" -type f` (107)
- `cat -n ./internal/workers/pushnotificationiterator.go | head -250 | tail -50` (1922)
- `cat -n ./internal/workers/pushnotificationiterator.go | head -200 | tail -100` (3561)
- `find . -name "*.go" -type f | xargs grep -l "SumNotifications" | head -20` (236)
- `cat -n ./internal/databases/rooms/notifications.go` (7403)
- `cat -n ./internal/databases/rooms/users/notifications.go` (7882)
- `grep -n "SumNotifications" ./internal/databases/rooms/sync.go -A 10 -B 2` (1081)
- `rg "type NotificationCounts" -A 5` (43)
- `find . -name "*.go" -type f | xargs grep -l "NotificationCounts" | head -10` (217)
- `cat -n ./internal/types/sync.go | grep -A 10 "NotificationCounts"` (1375)
- `find . -name "*.go" -type f | xargs grep -l "pushgateway" | head -10` (276)
- `grep -r "import.*pushgateway" . --include="*.go" | head -5` (43)
- `grep -n "pushgateway" ./internal/workers/pushnotificationiterator.go | head -20` (584)
- `cat -n ./internal/workers/pushnotificationiterator.go | sed -n '160,230p'` (2401)
Fix double-counting of highlights in unread notification count.
The Unread count is incorrectly set to notifCount + highlightCount. However, notifCount already includes all unread messages (including highlighted ones), and highlightCount is a subset representing which of those are highlighted. Adding them inflates the total.
Based on the SumNotifications implementation, each event generates a single notification entry with both Count and Highlight fields set. When summed, notifCount represents the total of all unread messages, while highlightCount represents how many of those are highlighted—they are not independent counts.
The fix:
```diff
 Counts: &pushgateway.NotificationCounts{
-	Unread: notifCount + highlightCount,
+	Unread: notifCount,
 },
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
Counts: &pushgateway.NotificationCounts{
	Unread: notifCount,
},
```
🤖 Prompt for AI Agents

In `internal/workers/pushnotificationiterator.go` around lines 224-226, the `Unread` field is double-counted as `notifCount + highlightCount`, but `notifCount` already includes highlighted messages. Where `pushgateway.NotificationCounts` is populated, set `Unread: notifCount` (keeping `Highlight: highlightCount` if highlights are reported) so the counts reflect the `SumNotifications` semantics: `notifCount` is the total of unread messages and `highlightCount` is the subset of those that are highlights.