Skip to content

Commit e444db6

Browse files
authored
chore: Internal feature to intercept and emit additional events (#32823)
1 parent de7e385 commit e444db6

File tree

7 files changed

+283
-32
lines changed

7 files changed

+283
-32
lines changed

akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventPublishingSpec.scala

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -444,8 +444,8 @@ class ReplicatedEventPublishingSpec
444444
MyReplicatedBehavior.Command,
445445
String,
446446
Set[String]] =
447-
_.withReplicatedEventTransformation { (_, eventWithMeta) =>
448-
EventWithMetadata(eventWithMeta.event.toUpperCase, Nil)
447+
_.withReplicatedEventsTransformation { (_, eventWithMeta) =>
448+
EventWithMetadata(eventWithMeta.event.toUpperCase, Nil) :: Nil
449449
}
450450
val actor = spawn(MyReplicatedBehavior(id, DCA, Set(DCA, DCB), modifyBehavior = addTransformation))
451451
actor ! MyReplicatedBehavior.Add("one", probe.ref)
@@ -466,6 +466,40 @@ class ReplicatedEventPublishingSpec
466466
probe.expectMessage(Set("one", "TWO", "three"))
467467
}
468468

469+
"transform replicated events and emit additional events" in {
470+
val id = nextEntityId()
471+
val probe = createTestProbe[Any]()
472+
case class Intercepted(origin: ReplicaId, seqNr: Long, event: String)
473+
val addTransformation
474+
: EventSourcedBehavior[MyReplicatedBehavior.Command, String, Set[String]] => EventSourcedBehavior[
475+
MyReplicatedBehavior.Command,
476+
String,
477+
Set[String]] =
478+
_.withReplicatedEventsTransformation { (_, eventWithMeta) =>
479+
EventWithMetadata(eventWithMeta.event.toUpperCase + "-1", Nil) ::
480+
EventWithMetadata(eventWithMeta.event.toUpperCase + "-2", Nil) ::
481+
EventWithMetadata(eventWithMeta.event.toUpperCase + "-3", Nil) ::
482+
Nil
483+
}
484+
val actor = spawn(MyReplicatedBehavior(id, DCA, Set(DCA, DCB), modifyBehavior = addTransformation))
485+
actor ! MyReplicatedBehavior.Add("one", probe.ref)
486+
probe.expectMessage(Done)
487+
488+
// simulate a published event from another replica
489+
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
490+
ReplicationId(EntityType, id, DCB).persistenceId,
491+
1L,
492+
"two",
493+
System.currentTimeMillis(),
494+
Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty, None)),
495+
None)
496+
actor ! MyReplicatedBehavior.Add("three", probe.ref)
497+
probe.expectMessage(Done)
498+
499+
actor ! MyReplicatedBehavior.Get(probe.ref)
500+
probe.expectMessage(Set("one", "TWO-1", "TWO-2", "TWO-3", "three"))
501+
}
502+
469503
}
470504

471505
}

akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala

Lines changed: 136 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -774,14 +774,14 @@ class ReplicatedEventSourcingSpec
774774
val addTransformation: (
775775
EventSourcedBehavior[Command, String, State],
776776
ActorContext[Command]) => EventSourcedBehavior[Command, String, State] = { (behv, context) =>
777-
behv.withReplicatedEventTransformation { (_, eventWithMeta) =>
777+
behv.withReplicatedEventsTransformation { (_, eventWithMeta) =>
778778
val resMeta1 = EventSourcedBehavior.currentMetadata[ReplicatedEventMetadata](context)
779779
val resMeta2 = eventWithMeta.metadata[ReplicatedEventMetadata]
780780
if (resMeta1 != resMeta2)
781781
throw new IllegalStateException(s"Expected RES metadata to be the same, $resMeta1 != $resMeta2")
782782

783783
val newMeta = eventWithMeta.metadata[Meta].map(m => m.copy(m.value.toUpperCase)).toList
784-
EventWithMetadata(eventWithMeta.event.toUpperCase, newMeta)
784+
EventWithMetadata(eventWithMeta.event.toUpperCase, newMeta) :: Nil
785785
}
786786
}
787787
val r1 = spawn(
@@ -830,6 +830,140 @@ class ReplicatedEventSourcingSpec
830830
r2 ! GetState(stateProbe.ref)
831831
stateProbe.expectMessage(State(Vector("FROM R1", "from r2")))
832832
}
833+
834+
"transform replicated events and emit additional events" in {
835+
val entityId = nextEntityId
836+
val probe = createTestProbe[Done]()
837+
val eventProbe1 = createTestProbe[EventAndContext]()
838+
val eventProbe2 = createTestProbe[EventAndContext]()
839+
val addTransformation: (
840+
EventSourcedBehavior[Command, String, State],
841+
ActorContext[Command]) => EventSourcedBehavior[Command, String, State] = { (behv, context) =>
842+
behv.withReplicatedEventsTransformation { (_, eventWithMeta) =>
843+
val resMeta1 = EventSourcedBehavior.currentMetadata[ReplicatedEventMetadata](context)
844+
val resMeta2 = eventWithMeta.metadata[ReplicatedEventMetadata]
845+
if (resMeta1 != resMeta2)
846+
throw new IllegalStateException(s"Expected RES metadata to be the same, $resMeta1 != $resMeta2")
847+
848+
if (eventWithMeta.event.startsWith("transformed")) {
849+
// break the loop
850+
eventWithMeta :: Nil
851+
} else {
852+
EventWithMetadata("transformed-1: " + eventWithMeta.event.toUpperCase, Meta("meta-1")) ::
853+
EventWithMetadata("transformed-2: " + eventWithMeta.event.toUpperCase, Meta("meta-2")) ::
854+
EventWithMetadata("transformed-3: " + eventWithMeta.event.toUpperCase, Meta("meta-3")) ::
855+
Nil
856+
}
857+
}
858+
}
859+
val r1 = spawn(
860+
testBehaviorWithContext(entityId, "R1", probe = Some(eventProbe1.ref), modifyBehavior = addTransformation))
861+
val r2 = spawn(
862+
testBehaviorWithContext(entityId, "R2", probe = Some(eventProbe2.ref), modifyBehavior = addTransformation))
863+
864+
r1 ! StoreMeWithMeta("from r1", probe.ref, Meta("meta from r1"))
865+
eventProbe1.expectMessage(
866+
EventAndContext(
867+
"from r1",
868+
ReplicaId("R1"),
869+
recoveryRunning = false,
870+
concurrent = false,
871+
Some(Meta("meta from r1"))))
872+
// replicated to r2, and transformed
873+
eventProbe2.expectMessage(
874+
EventAndContext(
875+
"transformed-1: FROM R1",
876+
ReplicaId("R1"),
877+
recoveryRunning = false,
878+
concurrent = false,
879+
meta = Some(Meta("meta-1"))))
880+
eventProbe2.expectMessage(
881+
EventAndContext(
882+
"transformed-2: FROM R1",
883+
ReplicaId("R2"), // this is R2 because it was R2 that emitted it
884+
recoveryRunning = false,
885+
concurrent = false,
886+
meta = Some(Meta("meta-2"))))
887+
eventProbe2.expectMessage(
888+
EventAndContext(
889+
"transformed-3: FROM R1",
890+
ReplicaId("R2"),
891+
recoveryRunning = false,
892+
concurrent = false,
893+
meta = Some(Meta("meta-3"))))
894+
895+
// R2 emitted two additional events, and those are replicated back to R1
896+
eventProbe1.expectMessage(
897+
EventAndContext(
898+
"transformed-2: FROM R1",
899+
ReplicaId("R2"),
900+
recoveryRunning = false,
901+
concurrent = false,
902+
meta = Some(Meta("meta-2"))))
903+
904+
eventProbe1.expectMessage(
905+
EventAndContext(
906+
"transformed-3: FROM R1",
907+
ReplicaId("R2"),
908+
recoveryRunning = false,
909+
concurrent = false,
910+
meta = Some(Meta("meta-3"))))
911+
912+
eventProbe1.expectNoMessage()
913+
eventProbe2.expectNoMessage()
914+
915+
r2 ! StoreMeWithMeta("from r2", probe.ref, Meta("meta from r2"))
916+
eventProbe2.expectMessage(
917+
EventAndContext(
918+
"from r2",
919+
ReplicaId("R2"),
920+
recoveryRunning = false,
921+
concurrent = false,
922+
Some(Meta("meta from r2"))))
923+
// replicated to r1, and transformed
924+
eventProbe1.expectMessage(
925+
EventAndContext(
926+
"transformed-1: FROM R2",
927+
ReplicaId("R2"),
928+
recoveryRunning = false,
929+
concurrent = false,
930+
meta = Some(Meta("meta-1"))))
931+
eventProbe1.expectMessage(
932+
EventAndContext(
933+
"transformed-2: FROM R2",
934+
ReplicaId("R1"),
935+
recoveryRunning = false,
936+
concurrent = false,
937+
meta = Some(Meta("meta-2"))))
938+
eventProbe1.expectMessage(
939+
EventAndContext(
940+
"transformed-3: FROM R2",
941+
ReplicaId("R1"),
942+
recoveryRunning = false,
943+
concurrent = false,
944+
meta = Some(Meta("meta-3"))))
945+
946+
// R1 emitted two additional events, and those are replicated back to R2
947+
eventProbe2.expectMessage(
948+
EventAndContext(
949+
"transformed-2: FROM R2",
950+
ReplicaId("R1"),
951+
recoveryRunning = false,
952+
concurrent = false,
953+
meta = Some(Meta("meta-2"))))
954+
955+
eventProbe2.expectMessage(
956+
EventAndContext(
957+
"transformed-3: FROM R2",
958+
ReplicaId("R1"),
959+
recoveryRunning = false,
960+
concurrent = false,
961+
meta = Some(Meta("meta-3"))))
962+
963+
eventProbe1.expectNoMessage()
964+
eventProbe2.expectNoMessage()
965+
966+
}
833967
}
834968

835969
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.scaladsl.EventSourcedBehavior.withReplicatedEventsTransformation")

akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ private[akka] final class BehaviorSetup[C, E, S](
6666
private var retentionInProgress: Boolean,
6767
val instrumentation: EventSourcedBehaviorInstrumentation,
6868
val replicationInterceptor: Option[ReplicationInterceptor[S, E]],
69-
val replicatedEventTransformation: Option[(S, EventWithMetadata[E]) => EventWithMetadata[E]]) {
69+
val replicatedEventTransformation: Option[(S, EventWithMetadata[E]) => Seq[EventWithMetadata[E]]]) {
7070

7171
import BehaviorSetup._
7272
import InternalProtocol.RecoveryTickEvent

akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
121121
publishEvents: Boolean = true,
122122
customStashCapacity: Option[Int] = None,
123123
replicatedEventInterceptor: Option[ReplicationInterceptor[State, Event]] = None,
124-
replicatedEventTransformation: Option[(State, EventWithMetadata[Event]) => EventWithMetadata[Event]] = None)
124+
replicatedEventTransformation: Option[(State, EventWithMetadata[Event]) => Seq[EventWithMetadata[Event]]] = None)
125125
extends EventSourcedBehavior[Command, Event, State] {
126126

127127
import EventSourcedBehaviorImpl.WriterIdentity
@@ -345,6 +345,10 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
345345

346346
override def withReplicatedEventTransformation(
347347
f: (State, EventWithMetadata[Event]) => EventWithMetadata[Event]): EventSourcedBehavior[Command, Event, State] =
348+
copy(replicatedEventTransformation = Some((s: State, e: EventWithMetadata[Event]) => f(s, e) :: Nil))
349+
350+
override def withReplicatedEventsTransformation(f: (State, EventWithMetadata[Event]) => Seq[EventWithMetadata[Event]])
351+
: EventSourcedBehavior[Command, Event, State] =
348352
copy(replicatedEventTransformation = Some(f))
349353

350354
}

0 commit comments

Comments
 (0)