Skip to content

Commit 51a682e

Browse files
authored
feat: allow fallback store to be eagerly initialized (#170)
1 parent da584a5 commit 51a682e

File tree

8 files changed

+106
-9
lines changed

8 files changed

+106
-9
lines changed

core/src/main/resources/reference.conf

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ akka.persistence.dynamodb {
7979
# Additionally, when reading events, at most this many events (for one query) will have in-flight
8080
# retrievals from the fallback store
8181
batch-size = 16
82+
83+
# Whether to start the fallback store at startup (eager = on) or defer until needed (eager = off)
84+
eager = off
8285
}
8386
}
8487
}
@@ -129,6 +132,9 @@ akka.persistence.dynamodb {
129132
# above), then the snapshot will be written to the fallback store and a "breadcrumb"
130133
# describing how to retrieve the snapshot from the fallback store will be written.
131134
threshold = 300 KiB
135+
136+
# Whether to start the fallback store at startup (eager = on) or defer until needed (eager = off)
137+
eager = off
132138
}
133139
}
134140
}

core/src/main/scala/akka/persistence/dynamodb/DynamoDBSettings.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ final class ClockSkewSettings(config: Config) {
339339

340340
/** Not for user extension */
341341
@DoNotInherit
342-
sealed abstract class FallbackSettings(val plugin: String, val threshold: Int) {
342+
sealed abstract class FallbackSettings(val plugin: String, val threshold: Int, val eager: Boolean) {
343343
require(threshold > 0, "threshold must be positive")
344344
require(threshold <= (400 * 1000), "threshold must be at most 400 KB")
345345

@@ -350,7 +350,8 @@ sealed abstract class FallbackSettings(val plugin: String, val threshold: Int) {
350350
}
351351

352352
@ApiMayChange
353-
final class SnapshotFallbackSettings(plugin: String, threshold: Int) extends FallbackSettings(plugin, threshold) {
353+
final class SnapshotFallbackSettings(plugin: String, threshold: Int, eager: Boolean)
354+
extends FallbackSettings(plugin, threshold, eager) {
354355
override def toString: String =
355356
if (isEnabled)
356357
s"SnapshotFallbackSettings(plugin=$plugin, threshold=${threshold}B)"
@@ -361,14 +362,15 @@ object SnapshotFallbackSettings {
361362
def apply(config: Config): SnapshotFallbackSettings = {
362363
val plugin = config.getString("plugin")
363364
val threshold = java.lang.Long.min(config.getBytes("threshold"), 400 * 1000).toInt
365+
val eager = config.getBoolean("eager")
364366

365-
new SnapshotFallbackSettings(plugin, threshold)
367+
new SnapshotFallbackSettings(plugin, threshold, eager)
366368
}
367369
}
368370

369371
@ApiMayChange
370372
final class JournalFallbackSettings(commonSettings: SnapshotFallbackSettings, val batchSize: Int)
371-
extends FallbackSettings(commonSettings.plugin, commonSettings.threshold) {
373+
extends FallbackSettings(commonSettings.plugin, commonSettings.threshold, commonSettings.eager) {
372374
require(!commonSettings.isEnabled || batchSize > 0, "batch size must be positive")
373375

374376
override def toString: String =

core/src/main/scala/akka/persistence/dynamodb/internal/Fallback.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ class FallbackStoreProvider(system: ActorSystem[_]) extends Extension {
159159
.toList)
160160
.map { _ => Done }(ExecutionContext.parasitic)
161161
}
162+
162163
def eventFallbackStoreFor(configLocation: String): EventFallbackStore[AnyRef] = {
163164
val storePair =
164165
fallbackStores.computeIfAbsent(configLocation, configLocation => pair(constructFallbackStore(configLocation)))
@@ -227,6 +228,15 @@ class FallbackStoreProvider(system: ActorSystem[_]) extends Extension {
227228
}
228229
}
229230

231+
/** INTERNAL API */
232+
@InternalApi
233+
final private[akka] def getFallbackStore(id: String): Option[CommonFallbackStore[AnyRef]] =
234+
fallbackStores.get(id) match {
235+
case null | (null, null) => None
236+
case (null, snap) => Some(snap)
237+
case (evt, _) => Some(evt)
238+
}
239+
230240
private val fallbackStores =
231241
new ConcurrentHashMap[String, (EventFallbackStore[AnyRef], SnapshotFallbackStore[AnyRef])]
232242
}

core/src/main/scala/akka/persistence/dynamodb/internal/JournalDao.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -550,4 +550,8 @@ import software.amazon.awssdk.services.dynamodb.model.Update
550550
Future.failed(c.getCause)
551551
}(ExecutionContext.parasitic)
552552
}
553+
554+
if (settings.journalFallbackSettings.isEnabled && settings.journalFallbackSettings.eager) {
555+
fallbackStoreProvider.eventFallbackStoreFor(settings.journalFallbackSettings.plugin)
556+
}
553557
}

core/src/main/scala/akka/persistence/dynamodb/internal/SnapshotDao.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,4 +563,8 @@ import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest
563563
(conditions.result().mkString(" AND "), attributes)
564564
}
565565

566+
if (settings.snapshotFallbackSettings.isEnabled && settings.snapshotFallbackSettings.eager) {
567+
// constructs and saves in a concurrent hashmap for later use
568+
fallbackStoreProvider.snapshotFallbackStoreFor(settings.snapshotFallbackSettings.plugin)
569+
}
566570
}

core/src/test/scala/akka/persistence/dynamodb/cleanup/EventSourcedCleanupSpec.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ class EventSourcedCleanupSpec
102102
val from = (iteration * batchSize) + 1
103103
iteration = iteration + 1
104104
val to = Math.min(maxSeqNumber, from + batchSize - 1)
105-
val expectedMsg = s"Deleted events from [$from] to [$to] for persistenceId [$pid], consumed [8.0] WCU"
105+
val expectedMsg = s"Deleted events from [$from] to [$to] for persistenceId [$pid], consumed [4.0] WCU"
106106
event.message == expectedMsg
107107
}
108108
.expect {
@@ -168,7 +168,7 @@ class EventSourcedCleanupSpec
168168
val from = (iteration * batchSize) + 1
169169
iteration = iteration + 1
170170
val to = Math.min(maxSeqNumber, from + batchSize - 1)
171-
val expectedMsg = s"Deleted events from [$from] to [$to] for persistenceId [$pid], consumed [8.0] WCU"
171+
val expectedMsg = s"Deleted events from [$from] to [$to] for persistenceId [$pid], consumed [4.0] WCU"
172172
event.message == expectedMsg
173173
}
174174
.expect {
@@ -547,7 +547,7 @@ class EventSourcedCleanupSpec
547547
val to = Math.min(x, from + batchSize - 1)
548548
val expectedMessage =
549549
s"Updated expiry of events for persistenceId [$pid], for sequence numbers [$from] to [$to]," +
550-
s" expiring at [$expiryTimestamp], consumed [8.0] WCU"
550+
s" expiring at [$expiryTimestamp], consumed [4.0] WCU"
551551
event.message == expectedMessage
552552
}
553553
.expect {
@@ -686,7 +686,7 @@ class EventSourcedCleanupSpec
686686
val to = Math.min(n, from + batchSize - 1)
687687
val expectedMessage =
688688
s"Updated expiry of events for persistenceId [$pid], for sequence numbers [$from] to [$to]," +
689-
s" expiring at [$expiryTimestamp], consumed [8.0] WCU"
689+
s" expiring at [$expiryTimestamp], consumed [4.0] WCU"
690690
event.message == expectedMessage
691691
}
692692
.expect {
@@ -828,7 +828,7 @@ class EventSourcedCleanupSpec
828828
val to = Math.min(n, from + batchSize - 1)
829829
val expectedMessage =
830830
s"Updated expiry of events for persistenceId [$pid], for sequence numbers [$from] to [$to]," +
831-
s" expiring at [$expiryTimestamp], consumed [8.0] WCU"
831+
s" expiring at [$expiryTimestamp], consumed [4.0] WCU"
832832
event.message == expectedMessage
833833
}
834834
.expect {

core/src/test/scala/akka/persistence/dynamodb/journal/DynamoDBJournalSpec.scala

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,11 @@ object DynamoDBJournalSpec {
109109
}
110110
""")
111111
.withFallback(TestConfig.config)
112+
113+
def configWithEagerFallback: Config =
114+
ConfigFactory
115+
.parseString("akka.persistence.dynamodb.journal.fallback-store.eager = on")
116+
.withFallback(configWithSmallFallback)
112117
}
113118

114119
abstract class DynamoDBJournalBaseSpec(config: Config)
@@ -298,6 +303,11 @@ class DynamoDBJournalWithFallbackSpec
298303

299304
"A DynamoDB journal with fallback enabled" should {
300305

306+
"not eagerly initialize the fallback store when eager = off" in new Setup {
307+
FallbackStoreProvider(system).getFallbackStore(
308+
system.settings.config.getString("akka.persistence.dynamodb.journal.fallback-store.plugin")) shouldBe empty
309+
}
310+
301311
"not interact with the fallback store for small events" in withFallbackStoreProbe { (fallbackStore, invocations) =>
302312
new Setup {
303313
val probe = TypedTestProbe[Any]()
@@ -412,3 +422,29 @@ class DynamoDBJournalWithFallbackSpec
412422
invocationsProbe.cancel()
413423
}
414424
}
425+
426+
class DynamoDBJournalWithEagerFallbackSpec
427+
extends ScalaTestWithActorTestKit(DynamoDBJournalSpec.configWithEagerFallback)
428+
with AnyWordSpecLike
429+
with TestData
430+
with TestDbLifecycle
431+
with LogCapturing {
432+
433+
def typedSystem: ActorSystem[_] = testKit.system
434+
435+
class Setup {
436+
val journal = persistenceExt.journalFor("akka.persistence.dynamodb.journal")
437+
}
438+
439+
"A DynamoDB journal with fallback enabled" should {
440+
441+
"eagerly initialize the fallback store when eager = on" in new Setup {
442+
val cfg = system.settings.config.getConfig("akka.persistence.dynamodb.journal.fallback-store")
443+
val plugin = cfg.getString("plugin")
444+
445+
eventually {
446+
FallbackStoreProvider(system).getFallbackStore(plugin) shouldNot be(empty)
447+
}
448+
}
449+
}
450+
}

core/src/test/scala/akka/persistence/dynamodb/snapshot/DynamoDBSnapshotStoreSpec.scala

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ object DynamoDBSnapshotStoreSpec {
9191
}
9292
""")
9393
.withFallback(config)
94+
95+
def configWithEagerFallback: Config =
96+
ConfigFactory
97+
.parseString("akka.persistence.dynamodb.snapshot.fallback-store.eager = on")
98+
.withFallback(configWithSmallFallback)
9499
}
95100

96101
abstract class DynamoDBSnapshotStoreBaseSpec(config: Config)
@@ -212,6 +217,11 @@ class DynamoDBSnapshotFallbackSpec
212217
}
213218

214219
"A DynamoDB snapshot store with fallback enabled" should {
220+
"not eagerly initialize the fallback store when eager = off" in new Setup {
221+
FallbackStoreProvider(system).getFallbackStore(
222+
system.settings.config.getString("akka.persistence.dynamodb.journal.fallback-store.plugin")) shouldBe empty
223+
}
224+
215225
"not interact with the fallback store for small snapshots" in withFallbackStoreProbe {
216226
(fallbackStore, invocations) =>
217227
new Setup {
@@ -319,6 +329,31 @@ class DynamoDBSnapshotFallbackSpec
319329
}
320330
}
321331

332+
class DynamoDBSnapshotWithEagerFallbackSpec
333+
extends ScalaTestWithActorTestKit(DynamoDBSnapshotStoreSpec.configWithEagerFallback)
334+
with AnyWordSpecLike
335+
with TestData
336+
with TestDbLifecycle
337+
with LogCapturing {
338+
339+
def typedSystem: ActorSystem[_] = testKit.system
340+
341+
class Setup {
342+
val snapshotStore = persistenceExt.snapshotStoreFor("akka.persistence.dynamodb.snapshot")
343+
}
344+
345+
"A DynamoDB snapshot store with fallback enabled" should {
346+
"eagerly initialize the fallback store when eager = on" in new Setup {
347+
val cfg = system.settings.config.getConfig("akka.persistence.dynamodb.snapshot.fallback-store")
348+
val plugin = cfg.getString("plugin")
349+
350+
eventually {
351+
FallbackStoreProvider(system).getFallbackStore(plugin) shouldNot be(empty)
352+
}
353+
}
354+
}
355+
}
356+
322357
private def withFallbackStoreProbe(
323358
test: (InMemFallbackStore, TestSubscriber.Probe[InMemFallbackStore.Invocation]) => Unit): Unit = {
324359

0 commit comments

Comments
 (0)