Conversation

@patriknw
Contributor

  • the EventOriginFilter is an optimization to avoid transferring full event payloads from all replicas
  • it may cause events to be delivered in a different order than they were originally written when using more than two replicas
  • disabling the filter would send events from all replicas and thereby preserve causality, since transitively received events are also delivered via the other replicas
  • to still reduce the overhead of sending full payloads from all replicas, acks are sent back to the producer side, which can then filter out already acked events on a best-effort basis (see the sketch after this list)
  • deduplication is already in place in the receiving EventSourcedBehavior (seenPerReplica)
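A minimal sketch of the best-effort producer-side filtering described above. All names here (Outgoing, BestEffortAckFilter, filterPayload) are assumptions for illustration, not the PR's actual types:

// Events are still sent from all replicas (preserving causality), but the
// payload can be omitted when the consumer already received the event via
// another replica. Deduplication (seenPerReplica) remains the safety net.
final case class Outgoing(originPid: String, originSeqNr: Long, payload: Option[AnyRef])

final class BestEffortAckFilter {
  // highest acked seqNr per origin pid, as reported back by the consumer
  private var highestAcked: Map[String, Long] = Map.empty

  def onAck(originPid: String, originSeqNr: Long): Unit =
    highestAcked = highestAcked.updated(
      originPid,
      math.max(originSeqNr, highestAcked.getOrElse(originPid, 0L)))

  def filterPayload(event: Outgoing): Outgoing =
    if (highestAcked.get(event.originPid).exists(_ >= event.originSeqNr))
      event.copy(payload = None) // already acked, skip the full payload
    else event
}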

val streamId: String,
val originPersistenceId: String,
val originSeqNr: Long)
extends SubscriberCommand
Contributor Author


Using the same mechanism as we already have for propagating filter changes from consumer to producer.
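For orientation, a hedged reconstruction of the diff context above (the command name OriginEventAck and the sealed trait are assumptions; only the three fields are from the PR):

sealed trait SubscriberCommand

// consumer-to-producer message acknowledging a transitively received event
final case class OriginEventAck(
    streamId: String,
    originPersistenceId: String,
    originSeqNr: Long)
  extends SubscriberCommand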

Contributor Author


I don't think this is enough. I think it's only published to the gRPC streams on the local node, but there can be other projection instances on other nodes in the consumer cluster. It probably works for filters because for them we use ddata.

Contributor


Wouldn't it (normally) only be about pids in this slice, and therefore not need to be propagated to other gRPC streams, though?

Contributor Author


Correct me if I'm thinking wrong....

scenario:

- r2 writes evt-1
- r2 writes evt-2

- r3 receives evt-1 and evt-2 from r2
- r3 writes evt-3

- r1 receives evt-1 from r3 (originally from r2)
- r1 receives evt-2 from r3 (originally from r2)
- r1 receives evt-3 from r2

Here, the purpose is that r1 then sends acks of evt-1 and evt-2 to r2, so that r2 doesn't have to send the full evt-1 and evt-2 to r1.

In r1 there are two consumer projections: one consuming from r2 and another consuming from r3. Those projections might not be collocated on the same node in the r1 cluster.

Additionally, the incoming evt-1 from r3 has a different pid, and slice, than the incoming evt-1 from r2, because of the different region suffixes, so they come from different slice ranges (but more fundamentally, from different projections).
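To make the pid point concrete, a small sketch (entity and replica names are made up; the replica id being part of the persistence id matches Replicated Event Sourcing):

import akka.actor.typed.ActorSystem
import akka.persistence.Persistence
import akka.persistence.typed.{ ReplicaId, ReplicationId }

def showSlices(system: ActorSystem[_]): Unit = {
  // same logical entity, but each replica writes under its own pid,
  // e.g. "Order|order-1|r2" vs "Order|order-1|r3"
  val pidFromR2 = ReplicationId("Order", "order-1", ReplicaId("r2")).persistenceId
  val pidFromR3 = ReplicationId("Order", "order-1", ReplicaId("r3")).persistenceId

  // different pids generally hash to different slices, so the two incoming
  // copies of evt-1 are handled by different projections, possibly on
  // different consumer nodes
  val sliceR2 = Persistence(system).sliceForPersistenceId(pidFromR2.id)
  val sliceR3 = Persistence(system).sliceForPersistenceId(pidFromR3.id)
  println(s"$pidFromR2 -> slice $sliceR2, $pidFromR3 -> slice $sliceR3")
}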

Contributor


Ah, I was probably thinking about it the wrong way; of course we want it distributed, so that it reaches the filters in producers for the same entity type in the different regions. (I was thinking of ddata within one producing cluster.)

Contributor Author


For filters we have ddata on the consumer side, which distributes to the other nodes in the consumer cluster, which then publish to their local gRPC streams. (if I understand what we did 😄 )

For the acks we should probably use pubsub on the consumer side.
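A minimal sketch of what consumer-side pubsub could look like, using akka.actor.typed.pubsub.Topic (the message type, topic naming, and wiring are placeholders, not the PR's actual design):

import akka.actor.typed.{ ActorRef, ActorSystem }
import akka.actor.typed.pubsub.Topic

// hypothetical ack message, mirroring the SubscriberCommand fields
final case class OriginEventAck(streamId: String, originPersistenceId: String, originSeqNr: Long)

// one topic per stream id, so acks reach the gRPC stream handlers on all
// consumer nodes, not just the node where the projection instance runs
def ackTopic(system: ActorSystem[_], streamId: String): ActorRef[Topic.Command[OriginEventAck]] =
  system.systemActorOf(Topic[OriginEventAck](s"origin-event-ack-$streamId"), s"ackTopic-$streamId")

// a projection instance that received a replicated event would publish:
//   topic ! Topic.Publish(OriginEventAck(streamId, originPid, originSeqNr))
// and each node's gRPC stream handler would subscribe:
//   topic ! Topic.Subscribe(localStreamHandler)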

logPrefix,
ack.getOriginEvent.persistenceId,
ack.getOriginEvent.seqNr)
// FIXME implement a "cache" of acks, which is used onPush
Contributor Author


I'm thinking it would be enough to keep only the most recent acks in memory.
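A sketch of that idea: a small LRU of recently acked (pid, seqNr) pairs, consulted in onPush (names and eviction policy are assumptions; misses are harmless, they only cost a redundant payload transfer that the consumer deduplicates anyway):

import java.util.{ LinkedHashMap, Map => JMap }

final class RecentAcks(capacity: Int = 256) {
  // access-ordered map evicting the least recently used entry beyond capacity
  private val acks = new LinkedHashMap[(String, Long), Unit](16, 0.75f, true) {
    override def removeEldestEntry(eldest: JMap.Entry[(String, Long), Unit]): Boolean =
      size > capacity
  }
  def add(pid: String, seqNr: Long): Unit = acks.put((pid, seqNr), ())
  def contains(pid: String, seqNr: Long): Boolean = acks.containsKey((pid, seqNr))
}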

@patriknw marked this pull request as draft November 20, 2025 15:16
10.seconds,
8,
R2dbcReplication())
.withEventOriginFilterEnabled(false)
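For context, roughly where this setting sits in the replication set-up (the surrounding arguments are placeholders following the usual ReplicationSettings shape; only withEventOriginFilterEnabled(false) is from this diff):

import scala.concurrent.duration._
import akka.actor.typed.ActorSystem
import akka.persistence.typed.ReplicaId
import akka.projection.grpc.producer.EventProducerSettings
import akka.projection.grpc.replication.scaladsl.{ Replica, ReplicationSettings }
import akka.projection.r2dbc.scaladsl.R2dbcReplication

def replicationSettings(otherReplicas: Set[Replica])(implicit system: ActorSystem[_]) =
  ReplicationSettings[AnyRef](
    "order",                        // entity type name (placeholder)
    ReplicaId("r1"),                // self replica id (placeholder)
    EventProducerSettings(system),
    otherReplicas,
    10.seconds,                     // entity event replication timeout
    8,                              // parallel updates
    R2dbcReplication())
    .withEventOriginFilterEnabled(false) // opt out of the origin-filter optimization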
Contributor Author


I have verified in logs that the acks reach the FilterStage on the producer side.
