File tree Expand file tree Collapse file tree 1 file changed +6
-4
lines changed
Expand file tree Collapse file tree 1 file changed +6
-4
lines changed Original file line number Diff line number Diff line change @@ -219,8 +219,13 @@ def flush_files(self):
219219 for source in list (sources .keys ()):
220220 if len (sources [source ]) > 0 :
221221 self .send_file (slot , source )
222+ self ._remove_pending ()
223+
224+ def _remove_pending (self ):
222225 to_remove = [k for k , v in self .pending_inputs_to_remove .items () if not v ]
223226 self .remove_inputs (to_remove )
227+ for iname in to_remove :
228+ self .pending_inputs_to_remove .pop (iname )
224229
225230 @abc .abstractmethod
226231 def get_new_inputs (self ) -> Iterable [Tuple [InputSource , PROCESS_INPUT_ARGS_TYPE ]]:
@@ -326,10 +331,7 @@ def workflow_loop(self) -> bool:
326331 total_pending += pending_for_source
327332 LOGGER .info (f"Total urls pending to be wrote: { total_pending } " )
328333
329- to_remove = [k for k , v in self .pending_inputs_to_remove .items () if not v ]
330- self .remove_inputs (to_remove )
331- for iname in to_remove :
332- self .pending_inputs_to_remove .pop (iname )
334+ self ._remove_pending ()
333335
334336 now = int (time .time ())
335337 for slot , sources_items_dict in self .items_queue .items ():
You can’t perform that action at this time.
0 commit comments