@@ -56,6 +56,7 @@ const PROPERTY_NAME_RETENTION_TYPE: &str = "retention-type";
5656const PROPERTY_NAME_RETENTION_DAYS : & str = "retention-days" ;
5757const PROPERTY_NAME_RETENTION_BYTES : & str = "retention-bytes" ;
5858const PROPERTY_NAME_RETENTION_MAINTENANCE_INTERVAL_SECONDS : & str = "retention-maintenance-interval-seconds" ;
59+ const PROPERTY_NAME_SIMULATE_FAILURE_AFTER_SEC : & str = "simulate-failure-after-sec" ;
5960
6061#[ derive( Debug , Eq , PartialEq , Ord , PartialOrd , Hash , Clone , Copy , glib:: GEnum ) ]
6162#[ repr( u32 ) ]
@@ -244,6 +245,7 @@ struct Settings {
244245 retention_days : Option < f64 > ,
245246 retention_bytes : Option < u64 > ,
246247 retention_maintenance_interval_seconds : u64 ,
248+ simulate_failure_after_sec : Option < u64 > ,
247249}
248250
249251impl Default for Settings {
@@ -263,6 +265,7 @@ impl Default for Settings {
263265 retention_days : None ,
264266 retention_bytes : None ,
265267 retention_maintenance_interval_seconds : DEFAULT_RETENTION_MAINTENANCE_INTERVAL_SECONDS ,
268+ simulate_failure_after_sec : None ,
266269 }
267270 }
268271}
@@ -284,6 +287,7 @@ enum State {
284287 buffers_written : u64 ,
285288 retention_thread_stop_tx : Sender < ( ) > ,
286289 retention_thread_handle : Option < JoinHandle < ( ) > > ,
290+ simulate_failure_after_sec : Option < u64 > ,
287291 } ,
288292}
289293
@@ -481,6 +485,15 @@ impl ObjectImpl for PravegaSink {
481485 DEFAULT_RETENTION_MAINTENANCE_INTERVAL_SECONDS ,
482486 glib:: ParamFlags :: WRITABLE ,
483487 ) ,
488+ glib:: ParamSpec :: new_uint64(
489+ PROPERTY_NAME_SIMULATE_FAILURE_AFTER_SEC ,
490+ "Simulate failure after seconds" ,
491+ "Simulate raw data writting failure after successful specfied seconds of index wrtting" ,
492+ 0 ,
493+ std:: u64 :: MAX ,
494+ 0 ,
495+ glib:: ParamFlags :: WRITABLE ,
496+ ) ,
484497 ] } ) ;
485498 PROPERTIES . as_ref ( )
486499 }
@@ -664,7 +677,20 @@ impl ObjectImpl for PravegaSink {
664677 if let Err ( err) = res {
665678 gst_error ! ( CAT , obj: obj, "Failed to set property `{}`: {}" , PROPERTY_NAME_RETENTION_MAINTENANCE_INTERVAL_SECONDS , err) ;
666679 }
667- } ,
680+ } ,
681+ PROPERTY_NAME_SIMULATE_FAILURE_AFTER_SEC => {
682+ let res: Result < ( ) , glib:: Error > = match value. get :: < u64 > ( ) {
683+ Ok ( seconds) => {
684+ let mut settings = self . settings . lock ( ) . unwrap ( ) ;
685+ settings. simulate_failure_after_sec = Some ( seconds) ;
686+ Ok ( ( ) )
687+ } ,
688+ Err ( _) => unreachable ! ( "type checked upstream" ) ,
689+ } ;
690+ if let Err ( err) = res {
691+ gst_error ! ( CAT , obj: obj, "Failed to set property `{}`: {}" , PROPERTY_NAME_SIMULATE_FAILURE_AFTER_SEC , err) ;
692+ }
693+ } ,
668694 _ => unimplemented ! ( ) ,
669695 } ;
670696 }
@@ -744,6 +770,7 @@ impl BaseSinkImpl for PravegaSink {
744770 gst_info ! ( CAT , obj: element, "start: controller={}" , controller) ;
745771 let keycloak_file = settings. keycloak_file . clone ( ) ;
746772 gst_info ! ( CAT , obj: element, "start: keycloak_file={:?}" , keycloak_file) ;
773+ gst_info ! ( CAT , obj: element, "start: simulate_failure_after_sec={:?}" , settings. simulate_failure_after_sec) ;
747774 let config = utils:: create_client_config ( controller, keycloak_file) . map_err ( |error| {
748775 gst:: error_msg!( gst:: ResourceError :: Settings , [ "Failed to create Pravega client config: {}" , error] )
749776 } ) ?;
@@ -845,6 +872,7 @@ impl BaseSinkImpl for PravegaSink {
845872 buffers_written : 0 ,
846873 retention_thread_stop_tx,
847874 retention_thread_handle,
875+ simulate_failure_after_sec : settings. simulate_failure_after_sec ,
848876 } ;
849877 gst_info ! ( CAT , obj: element, "start: Started" ) ;
850878 Ok ( ( ) )
@@ -867,7 +895,8 @@ impl BaseSinkImpl for PravegaSink {
867895 last_index_time,
868896 final_timestamp,
869897 final_offset,
870- buffers_written) = match * state {
898+ buffers_written,
899+ simulate_failure_after_sec) = match * state {
871900 State :: Started {
872901 ref mut writer,
873902 ref mut index_writer,
@@ -876,14 +905,16 @@ impl BaseSinkImpl for PravegaSink {
876905 ref mut final_timestamp,
877906 ref mut final_offset,
878907 ref mut buffers_written,
908+ simulate_failure_after_sec,
879909 ..
880910 } => ( writer,
881911 index_writer,
882912 first_valid_time,
883913 last_index_time,
884914 final_timestamp,
885915 final_offset,
886- buffers_written) ,
916+ buffers_written,
917+ simulate_failure_after_sec) ,
887918 State :: Stopped => {
888919 gst:: element_error!( element, gst:: CoreError :: Failed , [ "Not started yet" ] ) ;
889920 return Err ( gst:: FlowError :: Error ) ;
@@ -1114,6 +1145,18 @@ impl BaseSinkImpl for PravegaSink {
11141145 }
11151146 * final_offset = Some ( writer_offset_end) ;
11161147
1148+ if let Some ( seconds) = simulate_failure_after_sec {
1149+ if !first_valid_time. is_none ( ) {
1150+ if ( timestamp - first_valid_time. to_owned ( ) ) . nanoseconds ( ) . unwrap ( ) > ( seconds as i32 * SECOND ) . nanoseconds ( ) . unwrap ( ) {
1151+ // TODO: close pravega writers
1152+ //drop(writer.get_mut().get_mut().get_mut());
1153+ //drop(index_writer);
1154+ gst:: element_error!( element, gst:: CoreError :: Failed , [ "Simulate pravegasink failure" ] ) ;
1155+ return Err ( gst:: FlowError :: Error ) ;
1156+ }
1157+ }
1158+ }
1159+
11171160 Ok ( gst:: FlowSuccess :: Ok )
11181161 } ) ( ) ;
11191162 gst_trace ! ( CAT , obj: element, "render: END: result={:?}" , result) ;
0 commit comments