@@ -359,7 +359,10 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do
359359 end
360360 end
361361
362+ @ tag capture_log: true
362363 test "a full pipeline run there are database errors" do
364+ FiggyTestSupport . make_broadway_parallel ( )
365+
363366 with_mocks [
364367 { FiggyRepo , [ :passthrough ] ,
365368 all: [
@@ -377,42 +380,47 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do
377380 ] ,
378381 insert: [
379382 in_series ( [ :_ , :_ ] , [
380- fn _ -> raise ( DBConnection.ConnectionError , "closed" ) end ,
383+ fn _ , _ -> raise ( DBConnection.ConnectionError , "closed" ) end ,
381384 fn changeset , ops -> passthrough ( [ changeset , ops ] ) end
382385 ] )
383386 ] }
384387 ] do
388+ { :ok , tracker_pid } = GenServer . start_link ( AckTracker , self ( ) )
389+ AckTracker . reset_count! ( tracker_pid )
385390 # Start the figgy pipeline in a way that mimics how it is started in
386391 # dev and prod (slightly simplified)
387392 cache_version = 1
388393
389394 children = [
390395 { Figgy.IndexingConsumer ,
391- cache_version: cache_version , batch_size: 50 , write_collection : active_collection ( ) } ,
396+ cache_version: cache_version , batch_size: 50 , solr_index : active_collection ( ) } ,
392397 { Figgy.TransformationConsumer , cache_version: cache_version , batch_size: 50 } ,
393398 { Figgy.HydrationConsumer , cache_version: cache_version , batch_size: 50 }
394399 ]
395400
401+ AckTracker . reset_count! ( tracker_pid )
402+
403+ on_exit ( fn -> Supervisor . stop ( DpulCollections.TestSupervisor , :normal ) end )
404+
396405 Supervisor . start_link ( children ,
397406 strategy: :one_for_one ,
398407 name: DpulCollections.TestSupervisor
399408 )
400409
401- task =
402- Task . async ( fn -> wait_for_index_completion ( ) end )
403-
404- Task . await ( task , 15000 )
410+ # Need a little sleep so the first query isn't from AckTracker.
411+ :timer . sleep ( 50 )
412+ AckTracker . wait_for_pipeline_finished ( tracker_pid )
405413
406414 # the hydrator pulled all ephemera folders and terms
407415 entry_count = Repo . aggregate ( Figgy.HydrationCacheEntry , :count )
408- assert FiggyTestSupport . total_resource_count ( ) == entry_count
416+ assert FiggyTestSupport . total_resource_count ( ) == entry_count + 2
409417
410418 # the transformer only processes ephemera folders
411419 transformation_cache_entry_count = Repo . aggregate ( Figgy.TransformationCacheEntry , :count )
412- assert FiggyTestSupport . ephemera_folder_count ( ) == transformation_cache_entry_count
420+ assert FiggyTestSupport . ephemera_folder_count ( ) + 1 == transformation_cache_entry_count
413421
414422 # indexed all the documents
415- assert Solr . document_count ( ) == transformation_cache_entry_count
423+ assert Solr . document_count ( ) + 1 == transformation_cache_entry_count
416424
417425 # Ensure that the processor markers have the correct cache version
418426 hydration_processor_marker = IndexingPipeline . get_processor_marker! ( "figgy_hydrator" , 1 )
@@ -424,8 +432,6 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do
424432 assert hydration_processor_marker . cache_version == 1
425433 assert transformation_processor_marker . cache_version == 1
426434 assert indexing_processor_marker . cache_version == 1
427-
428- Supervisor . stop ( DpulCollections.TestSupervisor , :normal )
429435 end
430436 end
431437end
0 commit comments