Skip to content

Commit e0e105b

Browse files
committed
moved load_last_outputs() to base issuer
1 parent e2957b4 commit e0e105b

File tree

1 file changed

+47
-39
lines changed

1 file changed

+47
-39
lines changed

shub_workflow/issuer/__init__.py

Lines changed: 47 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ class IssuerScript(BaseLoopScript, Generic[ITEMTYPE, PROCESS_INPUT_ARGS_TYPE]):
8181
dedupe = True
8282
min_wait_time_secs_to_flush_stopped_spiders: int = 0
8383
max_inputs_per_loop = -1
84+
# how many days back in time is the limit of output and other downstream files to read for filling
85+
# the deduplication cache. If not set, the deduplication performed will be limited to the cache
86+
# in memory during the life of the job. True deduplication requires this to be set.
87+
# You must also call load_last_outputs() on __init__() with the proper arguments for your application.
88+
LOAD_DELIVERED_IDS_DAYS: int
8489

8590
def __init__(self):
8691
super().__init__()
@@ -97,6 +102,7 @@ def __init__(self):
97102
self.dupescounters: Dict[str, int] = Counter()
98103
self.totalcounters: Dict[str, int] = Counter()
99104
self.running_spiders_check_time: Dict[Source, int] = {}
105+
self.__loaded_delivered: bool = False
100106

101107
def add_argparser_options(self):
102108
super().add_argparser_options()
@@ -240,9 +246,50 @@ def _issuer_workflow_loop(self) -> int:
240246
break
241247
return new_inputs_count
242248

249+
def load_last_outputs(self, output_folders: Tuple[str, ...], prefix: str = "", basename_re: Optional[str] = None):
    """
    Load previously delivered ids from output files of the last LOAD_DELIVERED_IDS_DAYS days
    into the in-memory deduplication cache (``self.seen``).

    Must be called during __init__() when LOAD_DELIVERED_IDS_DAYS is set; on_start()
    asserts that it was (it flips the private ``__loaded_delivered`` flag).

    output_folders: a tuple containing all output folders from where to read the output files. Typically
                    it is only the issuer output folder, but in some applications with additional post
                    processing components (other issuers downstream), it can also be the output of them.
    prefix: only select files with the given prefix inside the target folders.
    basename_re: only select file names that matches the given regular expression.
    """
    # Reference "now" is captured once so every file is aged against the same instant.
    dtnow = datetime.utcnow()
    count = 0
    for output_folder in output_folders:
        output_folder = os.path.join(output_folder, prefix)
        LOGGER.info(f"Reading output folder {output_folder!r}...")
        for fname in self.fshelper.list_path(output_folder):
            basename = os.path.basename(fname)
            # basename_re filters on the file name only, not the full path.
            if basename_re is None or re.match(basename_re, basename) is not None:
                # Files without a recognizable timestamp in their name are skipped entirely.
                if m := TSTAMP_RE.search(basename):
                    tstamp = m.group()
                    dt = dateparser.parse(tstamp, date_formats=("_%Y%m%dT%H%M%S",))
                    # Skip unparseable timestamps and files older than the dedupe window.
                    if dt is None or dtnow - dt > timedelta(days=self.LOAD_DELIVERED_IDS_DAYS):
                        continue
                    try:
                        self.fshelper.download_file(fname)
                    except Exception:
                        # Best effort: the remote file may have been removed between
                        # listing and download; just log and move on.
                        LOGGER.info(f"{fname} is not anymore there.")
                        continue
                    # Output files are gzipped JSON-lines; each record is expected to
                    # carry at least "id" and "source" keys.
                    with gzip.open(basename) as r:
                        for line in r:
                            rec = json.loads(line)
                            uid = rec["id"]
                            self.stats.inc_value(f"urls/seen/{rec['source']}")
                            self.seen.add(uid)
                            self.stats.inc_value("urls/seen")
                            count += 1
                    # Remove the local downloaded copy once consumed.
                    self.fshelper.rm_file(basename)
    # Mark the cache as primed so on_start() does not raise.
    self.__loaded_delivered = True
    LOGGER.info(f"Loaded {count} seen ids.")
287+
243288
def on_start(self):
    """Validate issuer configuration before the workflow loop starts.

    Requires an ``output_folder`` attribute on the instance, and — when a
    LOAD_DELIVERED_IDS_DAYS dedupe window is configured — that
    load_last_outputs() was actually invoked during initialization.
    """
    assert hasattr(self, "output_folder"), "'output_folder' attribute is required."
    super().on_start()
    if hasattr(self, "LOAD_DELIVERED_IDS_DAYS"):
        # A dedupe window was configured, so the seen-ids cache must have
        # been primed during __init__(); otherwise deduplication is silently broken.
        if not self.__loaded_delivered:
            raise AssertionError(
                "You set LOAD_DELIVERED_IDS_DAYS but load_last_outputs() was never called on init."
            )
246293

247294
def workflow_loop(self) -> bool:
248295
new_inputs = self._issuer_workflow_loop()
@@ -306,7 +353,6 @@ class IssuerScriptWithFileSystemInput(IssuerScript[ITEMTYPE, Tuple[()]]):
306353
# The folder where to move processed input files.
307354
# If None (default), input files will be removed instead.
308355
processed_folder: Union[None, str] = None
309-
LOAD_DELIVERED_IDS_DAYS: int
310356

311357
def get_new_inputs(self) -> Iterable[Tuple[InputSource, Tuple[()]]]:
312358
if self.input_slot is not None:
@@ -339,44 +385,6 @@ def remove_inputs(self, inputs: List[InputSource]):
339385
self.fshelper.mv_file(iname, os.path.join(self.processed_folder, os.path.basename(iname)))
340386
self.stats.inc_value("inputs/moved")
341387

342-
def load_last_outputs(self, output_folders: Tuple[str, ...], prefix: str = "", basename_re: Optional[str] = None):
343-
"""
344-
Load ids from the last LOAD_DELIVERED_IDS_DAYS days
345-
output_folders: a tuple containing all output folders from where to read the output files. Typically
346-
it is only the issuer output folder, but in some applications with additional post
347-
processing components (other issuers downstream), it can also be the output of them.
348-
prefix: only select files with the given prefix inside the target folders.
349-
basename_re: only select file names that matches the given regular expression.
350-
"""
351-
dtnow = datetime.utcnow()
352-
count = 0
353-
for output_folder in output_folders:
354-
output_folder = os.path.join(output_folder, prefix)
355-
LOGGER.info(f"Reading output folder {output_folder!r}...")
356-
for fname in self.fshelper.list_path(output_folder):
357-
basename = os.path.basename(fname)
358-
if basename_re is None or re.match(basename_re, basename) is not None:
359-
if m := TSTAMP_RE.search(basename):
360-
tstamp = m.group()
361-
dt = dateparser.parse(tstamp, date_formats=("_%Y%m%dT%H%M%S",))
362-
if dt is None or dtnow - dt > timedelta(days=self.LOAD_DELIVERED_IDS_DAYS):
363-
continue
364-
try:
365-
self.fshelper.download_file(fname)
366-
except Exception:
367-
LOGGER.info(f"{fname} is not anymore there.")
368-
continue
369-
with gzip.open(basename) as r:
370-
for line in r:
371-
rec = json.loads(line)
372-
uid = rec["id"]
373-
self.stats.inc_value(f"urls/seen/{rec['source']}")
374-
self.seen.add(uid)
375-
self.stats.inc_value("urls/seen")
376-
count += 1
377-
self.fshelper.rm_file(basename)
378-
LOGGER.info(f"Loaded {count} seen ids.")
379-
380388

381389
class IssuerScriptWithSCJobInput(IssuerScript[ITEMTYPE, Tuple[JobDict, SpiderName, SpiderName, Type[Spider]]]):
382390

0 commit comments

Comments
 (0)