feat: Preserve original event order of replicated events #1380
base: main
Conversation
patriknw commented on Nov 20, 2025
- the EventOriginFilter is an optimization to avoid transferring full event payloads from all replicas
- this may cause events to be delivered in a different order than they were originally written when using more than two replicas
- this change sends events from all replicas and thereby preserves causality, since transitively received events are also delivered via other replicas
- to still reduce the overhead of sending full payloads from all replicas, acks are sent back to the producer side, which can filter out already acked events on a best-effort basis
- deduplication is already in place in the receiving EventSourcedBehavior (seenPerReplica; see the sketch after this list)
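For context on the last bullet, here is a minimal sketch of the kind of per-replica deduplication that seenPerReplica refers to. The names and types are illustrative assumptions, not the actual Akka internals:

```scala
// Illustrative sketch: track the highest sequence number seen per origin
// replica and drop anything at or below it as an already-applied duplicate.
final case class ReplicaId(id: String)

final case class SeenPerReplica(seen: Map[ReplicaId, Long] = Map.empty) {
  // Returns the updated state and whether the event is new and should be applied.
  def offer(origin: ReplicaId, seqNr: Long): (SeenPerReplica, Boolean) = {
    val highest = seen.getOrElse(origin, 0L)
    if (seqNr <= highest) (this, false) // duplicate, already applied
    else (copy(seen = seen.updated(origin, seqNr)), true)
  }
}
```

A single high-water mark per origin replica is enough here because events from a given replica carry monotonically increasing sequence numbers, so anything at or below the mark must be a replay.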
val streamId: String,
val originPersistenceId: String,
val originSeqNr: Long)
extends SubscriberCommand
Using the same mechanism as we already have for propagating filter changes from consumer to producer.
I don't think this is enough. I think it's only published to gRPC streams on the local node, but there can be other projection instances on other nodes in the consumer cluster. It probably works for filters because for them we use ddata.
Wouldn't it (normally) only be about pids in this slice, and therefore not need to be propagated to other gRPC streams though?
Correct me if I'm thinking wrong....
scenario:
- r2 writes evt-1
- r2 writes evt-2
- r3 receives evt-1 and evt-2 from r2
- r3 writes evt-3
- r1 receives evt-1 from r3 (originally from r2)
- r1 receives evt-2 from r3 (originally from r2)
- r1 receives evt-3 from r2
Here, the purpose is that r1 then sends acks of evt-1 and evt-2 to r2, so that r2 doesn't have to send the full evt-1 and evt-2 to r1.
In r1 there are two consumer projections: one consuming from r2 and another consuming from r3. Those projections might not be collocated on the same node in the r1 cluster.
Additionally, the incoming evt-1 from r3 has a different pid and slice than the incoming evt-1 from r2, because of the different region suffixes, so they come from different slice ranges (but more fundamentally from different projections).
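To make that last point concrete, here is a hypothetical illustration of why the two copies of evt-1 land in different slices. The pid format with a region suffix and the 1024-slice hash are assumptions for illustration:

```scala
// Hypothetical: the same logical entity has a different pid per origin
// replica, and the slice is derived from the pid, so the slices differ too.
val numberOfSlices = 1024
def slice(pid: String): Int = math.abs(pid.hashCode % numberOfSlices)

// Assumed pid format with a region suffix, for illustration only:
val pidViaR2 = "ShoppingCart|cart-1|r2"
val pidViaR3 = "ShoppingCart|cart-1|r3"

// Almost always different values, so the two copies of evt-1 fall in
// different slice ranges and are handled by different consumer projections:
println(s"via r2: ${slice(pidViaR2)}, via r3: ${slice(pidViaR3)}")
```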
Ah, I was probably thinking about this the wrong way. Of course we would want it distributed to the filters in the producers for the same entity type in the different regions. (I was thinking of ddata within one producing cluster.)
For filters we have ddata on the consumer side, which distributes to other nodes in the consumer cluster, which then publish to the local gRPC streams there. (if I understand what we did 😄 )
For the acks we should probably use pubsub on the consumer side.
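A minimal sketch of what that could look like with Akka's typed pub-sub Topic. The Ack message, topic name, and wiring are assumptions for illustration:

```scala
import akka.actor.typed.{ ActorRef, ActorSystem }
import akka.actor.typed.pubsub.Topic

// Hypothetical ack message, mirroring the fields of the SubscriberCommand above
final case class Ack(streamId: String, originPersistenceId: String, originSeqNr: Long)

object AckDistribution {
  // One topic per consumer cluster; every node subscribes its local outgoing
  // gRPC stream handlers, so an ack published by whichever node applied the
  // event reaches all producer connections.
  def topic(system: ActorSystem[_]): ActorRef[Topic.Command[Ack]] =
    system.systemActorOf(Topic[Ack]("replication-acks"), "replication-acks")

  def subscribeLocalStream(topic: ActorRef[Topic.Command[Ack]], stream: ActorRef[Ack]): Unit =
    topic ! Topic.Subscribe(stream)

  def publishAck(topic: ActorRef[Topic.Command[Ack]], ack: Ack): Unit =
    topic ! Topic.Publish(ack)
}
```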
logPrefix,
ack.getOriginEvent.persistenceId,
ack.getOriginEvent.seqNr)
// FIXME implement a "cache" of acks, which is used onPush
I'm thinking it would be enough to keep the most recent in memory.
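For illustration, a sketch of such a best-effort cache that keeps only the most recent acked sequence number per origin pid. The integration points with the FilterStage are assumptions:

```scala
// Sketch: best-effort producer-side filter state fed by incoming acks.
// Deliberately minimal, since acks are an optimization rather than a
// correctness mechanism (the consumer deduplicates anyway).
final class AckCache {
  private var mostRecentAcked = Map.empty[String, Long] // origin pid -> seqNr

  def onAck(originPid: String, originSeqNr: Long): Unit =
    if (mostRecentAcked.getOrElse(originPid, 0L) < originSeqNr)
      mostRecentAcked = mostRecentAcked.updated(originPid, originSeqNr)

  // Called onPush: true if the event was already acked and can be filtered out
  def alreadyAcked(originPid: String, originSeqNr: Long): Boolean =
    mostRecentAcked.getOrElse(originPid, 0L) >= originSeqNr
}
```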
10.seconds,
8,
R2dbcReplication())
.withEventOriginFilterEnabled(false)
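For what it's worth, a sketch of the behavior this new flag appears to toggle, based on the PR summary. The predicate and names below are assumptions for illustration, not the real EventOriginFilter:

```scala
// With the origin filter enabled, a replica only emits events it originated;
// with it disabled it also forwards transitively received events, which is
// what preserves the original causal order across more than two replicas.
final case class Envelope(originReplica: String, pid: String, seqNr: Long)

def shouldEmit(selfReplica: String, env: Envelope, originFilterEnabled: Boolean): Boolean =
  if (!originFilterEnabled) true // events from all replicas, causality preserved
  else env.originReplica == selfReplica // only locally originated events
```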
I have verified in logs that the acks reach the FilterStage on the producer side.