diff --git a/TIMING_RESULTS.md b/TIMING_RESULTS.md new file mode 100644 index 0000000..cf6339a --- /dev/null +++ b/TIMING_RESULTS.md @@ -0,0 +1,307 @@ +# ADSReferencePipeline Timing Results Guide + +This document explains how to read the Markdown timing report produced by the ADSReferencePipeline benchmark tooling. It is written as a general reference for any future benchmark run, not for one specific run. + +The benchmark report is intended to answer three questions: + +- How fast did the pipeline process references overall? +- Which source formats, parsers, or raw subfamilies were slower or faster? +- What was happening on the host while the benchmark was running? + +## Artifact Files + +An attached benchmark run normally produces several artifacts. + +The most useful files are: + +- `attached_benchmark_manifest.json`: Host-side wrapper manifest. This is the canonical attached-run record. It includes host context, container exit status, and paths to in-container benchmark artifacts. +- `attached_benchmark_manifest.md`: Short host-side summary intended for quick inspection. +- `attached_*.json`: Full in-container benchmark summary. This contains all detailed timing data used to render the Markdown report. +- `attached_*.md`: Full timing report for humans. This is the primary report described by this guide. +- `attached_*.source_types.csv`: Flat source-type table for spreadsheet comparison across runs. +- `perf_events.jsonl`: Raw event stream emitted during the run. This is useful for debugging aggregation logic or building custom analyses. + +## Run Config + +The `Run Config` section records the benchmark inputs and execution options. + +Common fields include: + +- `run_id`: Unique benchmark run identifier. +- `context_id`: Unique metrics context used to isolate events for this run. +- `input_path`: File or directory benchmarked inside the container. +- `extensions`: File patterns used to select benchmark inputs. 
+- `max_files`: Optional cap on the number of files processed. +- `mode`: Benchmark mode. `mock` replaces resolver calls with deterministic in-process responses; `real` uses the configured resolver service. +- `group_by`: Preferred grouping mode requested by the benchmark command. +- `git_commit`: Git commit detected inside the benchmark environment, when available. +- `timestamp_utc`: UTC timestamp for the benchmark run. +- `system_load_enabled`: Whether in-container system-load sampling was enabled. +- `system_sample_interval_s`: Sampling interval for system-load metrics. +- `warmup`: Whether a warmup pass was attempted before the measured run. + +Use this section first when comparing runs. Differences in input set, mode, file cap, or warmup can make timing numbers non-comparable. + +## Top-Line Results + +The `Top-Line Results` section summarizes the overall run. + +Important fields: + +- `Status`: `complete` means the benchmark completed its expected processing path. `incomplete` means the run should be inspected before comparing performance. +- `Files Processed`: Number of source files that emitted timing events. +- `Records Processed`: Number of reference records observed in the per-record timing events. +- `Throughput`: Overall processed records per minute, based on wall-clock benchmark duration. +- `Load-Adjusted Throughput`: Throughput multiplied by a host-load adjustment factor. This is useful as a rough comparison signal, not a replacement for raw throughput. +- `Wall Duration`: Total measured benchmark wall-clock duration in seconds. + +Throughput is record-based, not file-based. A single input file can produce many records. + +## Per-Record Metrics + +The `Per-Record Metrics (ms)` table reports timing distributions in milliseconds per processed record. + +Columns: + +- `Metric`: Timing category. +- `Count`: Number of timing samples in that category. +- `p50 / record`: Median per-record latency. +- `p95 / record`: 95th percentile per-record latency. 
+- `p99 / record`: 99th percentile per-record latency. +- `Mean / record`: Average per-record latency. + +Common metrics: + +- `wall_time`: End-to-end time for one record through the measured per-record task path. +- `parse_stage`: Per-record normalized parsing time. File-level parse time is divided by records produced by that file or block. +- `resolver_stage`: Time spent in resolver call handling for one record. In `mock` mode this should be very small because no real network resolver call is made. +- `db_stage`: Per-record normalized database update or insertion time. + +These are per-record values. They are not per file and not totals for the run. + +## Stage Latency + +The `Stage Latency (ms)` section lists lower-level benchmark stages emitted by instrumentation points. + +Unless otherwise noted, stage timings are normalized to per-record milliseconds when a stage handles multiple records. + +Common stages: + +- `file_wall`: Time spent processing one source file. This stage may appear in broader latency summaries but is not the same as per-record wall time. +- `parser_lookup`: Time spent selecting the parser for a source filename. +- `parser_init`: Time spent constructing the parser object. +- `parse_dispatch`: Time spent parsing and dispatching references from a source file. +- `pre_resolved_db`: Time spent creating initial database rows before resolution. +- `queue_references`: Time spent iterating through references and invoking the task path. +- `record_wall`: End-to-end time for one record in the task path. +- `resolver_http`: Time spent resolving one record. In `mock` mode this measures the mock resolver path; in `real` mode it includes the configured resolver request. +- `post_resolved_db`: Time spent updating resolved-reference rows after resolution. + +Use this section to identify which pipeline phase is driving latency. + +## Slowest Source Types + +The `Slowest Source Types` table ranks source types by `Wall p95 / record`. 
+ +Columns: + +- `Source Type`: Normalized input type, such as `.raw`, `.jats.xml`, `.iop.xml`, `.html`, or `.ocr.txt`. +- `Files`: Number of source files in that source-type group. +- `Records`: Number of processed references in that source-type group. +- `Wall p95 / record`: 95th percentile per-record wall time for that source type. +- `Wall Mean / record`: Mean per-record wall time for that source type. +- `Throughput (records/min)`: Estimated throughput for records in that source-type group. + +This table is useful for quickly identifying the source formats with the slowest tail latency. + +## Source Types + +The `Source Types` table provides a full source-type breakdown. + +All mean values are per-record milliseconds. + +Columns: + +- `Source Type`: Normalized file/source type. +- `Files`: Number of input source files in the group. +- `Records`: Number of extracted and processed references in the group. +- `Wall Mean / record`: Mean end-to-end per-record wall time. +- `Parse Mean / record`: Mean per-record normalized parsing time. +- `Resolver Mean / record`: Mean per-record resolver time. +- `DB Mean / record`: Mean per-record database time. +- `Throughput (records/min)`: Estimated records-per-minute throughput for that source type. + +Important interpretation: + +- `Files` and `Records` are different units. +- `Wall Mean / record` is not the time to process one file. +- A file type with high total runtime may still have a low per-record wall mean if it produced many references. +- A source type with only one file or very few records may have unstable percentile estimates. + +## Parsers + +The `Parsers` section groups records by parser name. + +Columns: + +- `Parser`: Parser selected by ADSReferencePipeline. +- `Files`: Number of source files handled by that parser. +- `Records`: Number of processed references handled by that parser. +- `Wall Mean / record`: Mean per-record wall time for records handled by that parser. 
+ +Use parser breakdowns when multiple source types map to related parser behavior, or when a single source type can be processed differently based on journal/path matching. + +## Raw Subfamilies + +The `.raw` source type includes several materially different input families. The `Raw Subfamilies` section preserves these distinctions without relying on noisy filename artifacts such as `.z.raw`. + +Common raw subfamilies: + +- `raw_arxiv`: arXiv-style raw input. +- `raw_adstxt`: generic ADS text raw input. +- `raw_ref_raw`: `*.ref.raw` input. +- `raw_pasj_html`: PASJ HTML-derived raw input. +- `raw_pasp_html`: PASP HTML-derived raw input. +- `raw_jlven_html`: JLVEn HTML-derived raw input. +- `raw_aas`: AAS raw fixture family. +- `raw_icarus`: Icarus raw fixture family. +- `raw_pthph`: PThPh raw input. +- `raw_pthps`: PThPS raw input. +- `raw_other`: Fallback for raw inputs that do not match a known subfamily. + +Columns: + +- `Raw Subfamily`: Normalized raw subtype. +- `Files`: Number of raw source files in that subfamily. +- `Records`: Number of processed references in that subfamily. +- `Wall Mean / record`: Mean per-record wall time. +- `Wall p95 / record`: 95th percentile per-record wall time. +- `Throughput (records/min)`: Estimated records-per-minute throughput. + +Use this section when `.raw` looks unusually fast or slow. The overall `.raw` source-type row can hide meaningful differences between raw formats. + +## System Load + +The `System Load` section describes the benchmark host while the run was executing. + +Collection fields: + +- `Enabled`: Whether system sampling was enabled. +- `Sample Count`: Number of samples collected. +- `Sample Interval`: Requested seconds between samples. +- `Platform`: Operating system reported by Python. +- `CPU Count`: CPU count visible to the benchmark process. +- `Memory Probe`: Method used to collect memory data, such as `linux_meminfo` or `macos_vm_stat`. 
+ +Raw CPU load fields: + +- `Mean Raw Load (1m / 5m / 15m)`: Mean host load average across collected samples. +- `Peak Raw Load (1m)`: Maximum observed 1-minute load average. + +Normalized CPU load fields: + +- `Mean Normalized Load (1m)`: Mean 1-minute load divided by CPU count. +- `Peak Normalized Load (1m)`: Maximum normalized 1-minute load. + +Normalized load interpretation: + +- Around `0.0` to `1.0`: Load is at or below visible CPU capacity. +- Greater than `1.0`: More runnable work than visible CPU capacity. +- This is a coarse indicator. It is not the same as CPU utilization percentage. + +Memory fields: + +- `Mean Memory Total`: Mean total memory visible to the sampler. +- `Mean Memory Available`: Mean available memory. +- `Minimum Memory Available`: Lowest observed available memory. +- `Mean Memory Used`: Mean estimated used memory. +- `Peak Memory Used`: Highest estimated used memory. +- `Mean Memory Available Ratio`: Available memory divided by total memory. +- `Minimum Memory Available Ratio`: Lowest observed available-memory ratio. +- `Mean Memory Used Ratio`: Used memory divided by total memory. +- `Peak Memory Used Ratio`: Highest observed used-memory ratio. + +The `System Load Samples` table gives mean, min, max, p50, and p95 for raw load, normalized load, and memory metrics. + +Use this section when comparing runs. A slower run under higher load or lower available memory may reflect host contention rather than a pipeline regression. + +## CSV Output + +The `attached_*.source_types.csv` file contains a flat source-type summary for comparison across benchmark runs. + +Typical columns: + +- `source_type` +- `file_count` +- `record_count` +- `wall_mean_ms` +- `wall_p95_ms` +- `parse_mean_ms` +- `resolver_mean_ms` +- `db_mean_ms` +- `throughput_records_per_minute` + +This CSV is best for trend tracking or spreadsheet comparison. It does not include every nested metric from the JSON summary. 
+ +## JSON Output + +The full `attached_*.json` benchmark summary is the most complete machine-readable output. + +Important top-level keys: + +- `counts`: File and record counts. +- `duration_s`: Overall wall-clock duration. +- `throughput`: Raw and load-adjusted throughput. +- `latency_ms`: Stage latency statistics. +- `task_timing_ms`: Task-level timing statistics. +- `app_timing_ms`: Application-level timing statistics. +- `per_record_metrics_ms`: Main per-record timing summary. +- `source_type_breakdown`: Metrics grouped by normalized source type. +- `parser_breakdown`: Metrics grouped by parser. +- `raw_subfamily_breakdown`: Metrics grouped by raw subfamily. +- `system_load`: Raw samples and aggregated host-load/memory statistics. +- `run_metadata`: Configuration and identifying metadata. +- `errors`: Stage-level error counts. + +Use JSON for automated analysis and Markdown for human review. + +## Interpreting Mock vs Real Mode + +Benchmark mode strongly affects interpretation. + +In `mock` mode: + +- Resolver calls are replaced by deterministic in-process responses. +- `Resolver Mean / record` should be near zero. +- Results are useful for measuring parser, database, task orchestration, and benchmark overhead. +- Results do not measure real resolver network/service latency. + +In `real` mode: + +- Resolver calls use the configured resolver service. +- `Resolver Mean / record` includes real service request behavior. +- Results are closer to production end-to-end performance. +- Results may vary more due to network, service, and external load. + +Do not compare `mock` and `real` runs as if they measure the same workload. + +## Comparison Tips + +For meaningful comparisons: + +- Compare runs with the same `mode`. +- Compare runs with the same input path and extension selection. +- Check `Files Processed` and `Records Processed` before comparing throughput. +- Inspect `System Load` to identify host contention. 
+- Prefer p95 latency for tail-performance regressions. +- Use source-type and raw-subfamily breakdowns to avoid hiding format-specific behavior. +- Treat very small groups with caution; percentile values are less reliable when record counts are low. + +## Common Misreadings + +- `Wall Mean / record` is not per file. +- `Throughput` is records per minute, not files per minute. +- `Parse Mean / record` is often normalized from file-level parse timing. +- In `mock` mode, resolver timing is intentionally tiny. +- `Load-Adjusted Throughput` is a rough comparison aid, not a direct measurement. +- A source type with more files is not necessarily slower; compare record counts and per-record latency. diff --git a/adsrefpipe/app.py b/adsrefpipe/app.py index 5ca8c33..ac99573 100755 --- a/adsrefpipe/app.py +++ b/adsrefpipe/app.py @@ -10,6 +10,7 @@ from datetime import datetime, timedelta from typing import List, Dict +from adsrefpipe import perf_metrics from adsrefpipe.models import Action, Parser, ReferenceSource, ProcessedHistory, ResolvedReference, CompareClassic from adsrefpipe.utils import get_date_created, get_date_modified, get_date_now, get_resolved_filename, \ compare_classic_and_service, ReprocessQueryType @@ -116,40 +117,45 @@ def get_parser(self, source_filename: str) -> Dict: :param source_filename: filename of the source reference :return: parser details as a dictionary """ - if not self.default_parsers: - self.init_default_parsers() - - journal, volume, basefile = source_filename.split('/')[-3:] - if journal and volume and basefile: - match = self.RE_MATCH_EXT.search(basefile) - if match: - # with multiple extensions - extension = match.group(1) - else: - # with single extension - extension = ".%s"%basefile.rsplit('.', 1)[-1] + event_extra = perf_metrics.build_event_extra(source_filename=source_filename) + with perf_metrics.timed_profile( + category='app_timing', + name='get_parser', + extra=event_extra, + ): + if not self.default_parsers: + 
self.init_default_parsers() + + journal, volume, basefile = source_filename.split('/')[-3:] + if journal and volume and basefile: + match = self.RE_MATCH_EXT.search(basefile) + if match: + # with multiple extensions + extension = match.group(1) + else: + # with single extension + extension = ".%s"%basefile.rsplit('.', 1)[-1] - # if one of the default ones - if self.default_parsers.get(extension, None): - return self.default_parsers[extension] + # if one of the default ones + if self.default_parsers.get(extension, None): + return self.default_parsers[extension] - with self.session_scope() as session: - # start_time = time.time() - rows = session.query(Parser).filter(and_(Parser.extension_pattern == extension, - Parser.matches.contains([{"journal": journal}]))).all() - # if no records, try with single extension, if possible - if not rows and extension.count('.') >= 2: - rows = session.query(Parser).filter(and_(Parser.extension_pattern == extension[extension.rfind('.'):], + with self.session_scope() as session: + rows = session.query(Parser).filter(and_(Parser.extension_pattern == extension, Parser.matches.contains([{"journal": journal}]))).all() - if len(rows) == 1: - return rows[0].toJSON() - if len(rows) > 1: - match = self.match_parser(rows, journal, volume) - if match: - return match - else: - self.logger.error("Unrecognizable source file %s."%source_filename) - return {} + # if no records, try with single extension, if possible + if not rows and extension.count('.') >= 2: + rows = session.query(Parser).filter(and_(Parser.extension_pattern == extension[extension.rfind('.'):], + Parser.matches.contains([{"journal": journal}]))).all() + if len(rows) == 1: + return rows[0].toJSON() + if len(rows) > 1: + match = self.match_parser(rows, journal, volume) + if match: + return match + else: + self.logger.error("Unrecognizable source file %s."%source_filename) + return {} def get_reference_service_endpoint(self, parsername: str) -> str: """ @@ -496,30 +502,41 @@ def 
populate_tables_pre_resolved_initial_status(self, source_bibcode: str, sourc :param references: List of references :return: List of processed references """ - with self.session_scope() as session: - try: - reference_record = ReferenceSource(bibcode=source_bibcode, - source_filename=source_filename, - resolved_filename=get_resolved_filename(source_filename), - parser_name=parsername) - bibcode, filename = self.insert_reference_source_record(session, reference_record) - if bibcode and filename: - history_record = ProcessedHistory(bibcode=bibcode, - source_filename=source_filename, - source_modified=get_date_modified(source_filename), - status=Action().get_status_new(), - date=get_date_now(), - total_ref=len(references)) - history_id = self.insert_history_record(session, history_record) - resolved_records, references = self.populate_resolved_reference_records_pre_resolved(references, history_id) - self.insert_resolved_reference_records(session, resolved_records) - session.commit() - self.logger.info("Source file %s for bibcode %s with %d references, processed successfully." % (source_filename, source_bibcode, len(references))) - return references - except SQLAlchemyError as e: - session.rollback() - self.logger.error("Source file %s information failed to get added to database. 
Error: %s" % (source_filename, str(e))) - return [] + event_extra = perf_metrics.build_event_extra( + source_filename=source_filename, + parser_name=parsername, + source_bibcode=source_bibcode, + record_count=len(references), + ) + with perf_metrics.timed_profile( + category='app_timing', + name='populate_tables_pre_resolved_initial_status', + extra=event_extra, + ): + with self.session_scope() as session: + try: + reference_record = ReferenceSource(bibcode=source_bibcode, + source_filename=source_filename, + resolved_filename=get_resolved_filename(source_filename), + parser_name=parsername) + bibcode, filename = self.insert_reference_source_record(session, reference_record) + if bibcode and filename: + history_record = ProcessedHistory(bibcode=bibcode, + source_filename=source_filename, + source_modified=get_date_modified(source_filename), + status=Action().get_status_new(), + date=get_date_now(), + total_ref=len(references)) + history_id = self.insert_history_record(session, history_record) + resolved_records, references = self.populate_resolved_reference_records_pre_resolved(references, history_id) + self.insert_resolved_reference_records(session, resolved_records) + session.commit() + self.logger.info("Source file %s for bibcode %s with %d references, processed successfully." % (source_filename, source_bibcode, len(references))) + return references + except SQLAlchemyError as e: + session.rollback() + self.logger.error("Source file %s information failed to get added to database. 
Error: %s" % (source_filename, str(e))) + return [] def populate_tables_pre_resolved_retry_status(self, source_bibcode: str, source_filename: str, source_modified: str, retry_records: List[Dict]) -> List[Dict]: """ @@ -531,24 +548,34 @@ def populate_tables_pre_resolved_retry_status(self, source_bibcode: str, source_ :param retry_records: List of references to be reprocessed :return: List of processed references """ - with self.session_scope() as session: - try: - history_record = ProcessedHistory(bibcode=source_bibcode, - source_filename=source_filename, - source_modified=source_modified, - status=Action().get_status_retry(), - date=get_date_now(), - total_ref=len(retry_records)) - history_id = self.insert_history_record(session, history_record) - resolved_records, references = self.populate_resolved_reference_records_pre_resolved(retry_records, history_id) - self.insert_resolved_reference_records(session, resolved_records) - session.commit() - self.logger.info("Source file %s for bibcode %s with %d references, for reprocessing added successfully." % (source_filename, source_bibcode, len(references))) - return references - except SQLAlchemyError as e: - session.rollback() - self.logger.error("Source file %s information for reprocessing failed to get added to database. 
Error: %s" % (source_filename, str(e))) - return [] + event_extra = perf_metrics.build_event_extra( + source_filename=source_filename, + source_bibcode=source_bibcode, + record_count=len(retry_records), + ) + with perf_metrics.timed_profile( + category='app_timing', + name='populate_tables_pre_resolved_retry_status', + extra=event_extra, + ): + with self.session_scope() as session: + try: + history_record = ProcessedHistory(bibcode=source_bibcode, + source_filename=source_filename, + source_modified=source_modified, + status=Action().get_status_retry(), + date=get_date_now(), + total_ref=len(retry_records)) + history_id = self.insert_history_record(session, history_record) + resolved_records, references = self.populate_resolved_reference_records_pre_resolved(retry_records, history_id) + self.insert_resolved_reference_records(session, resolved_records) + session.commit() + self.logger.info("Source file %s for bibcode %s with %d references, for reprocessing added successfully." % (source_filename, source_bibcode, len(references))) + return references + except SQLAlchemyError as e: + session.rollback() + self.logger.error("Source file %s information for reprocessing failed to get added to database. 
Error: %s" % (source_filename, str(e))) + return [] def populate_tables_post_resolved(self, resolved_reference: List, source_bibcode: str, classic_resolved_filename: str) -> bool: """ @@ -559,15 +586,25 @@ def populate_tables_post_resolved(self, resolved_reference: List, source_bibcode :param classic_resolved_filename: filename of classic resolved references :return: True if successful """ - with self.session_scope() as session: - try: - # if the filename for classic resolver output is supplied, read the resolved information - # make sure that the length matches resolved, classic does some breaking a reference into two - # and hence messes up the order if we want to compare one-to-one, if that is the case, just - # ignore the result - resolved_classic = None - if classic_resolved_filename: - resolved_classic = compare_classic_and_service(resolved_reference, source_bibcode, classic_resolved_filename) + event_extra = perf_metrics.build_event_extra( + source_bibcode=source_bibcode, + record_count=len(resolved_reference), + source_filename=classic_resolved_filename, + ) + with perf_metrics.timed_profile( + category='app_timing', + name='populate_tables_post_resolved', + extra=event_extra, + ): + with self.session_scope() as session: + try: + # if the filename for classic resolver output is supplied, read the resolved information + # make sure that the length matches resolved, classic does some breaking a reference into two + # and hence messes up the order if we want to compare one-to-one, if that is the case, just + # ignore the result + resolved_classic = None + if classic_resolved_filename: + resolved_classic = compare_classic_and_service(resolved_reference, source_bibcode, classic_resolved_filename) resolved_records = [] compare_records = [] @@ -594,18 +631,16 @@ def populate_tables_post_resolved(self, resolved_reference: List, source_bibcode score=int(resolved_classic[i][2]), state=resolved_classic[i][3]) compare_records.append(compare_record) + 
self.update_resolved_reference_records(session, resolved_records) if resolved_classic: - self.update_resolved_reference_records(session, resolved_records) self.insert_compare_records(session, compare_records) - else: - self.update_resolved_reference_records(session, resolved_records) session.commit() self.logger.info("Updated %d resolved reference records successfully." % len(resolved_reference)) return True - except SQLAlchemyError as e: - session.rollback() - self.logger.error("Failed to update %d resolved reference records successfully. Error %s" % (len(resolved_reference), str(e))) - return False + except SQLAlchemyError as e: + session.rollback() + self.logger.error("Failed to update %d resolved reference records successfully. Error %s" % (len(resolved_reference), str(e))) + return False def get_count_reference_source_records(self, session: object) -> int: """ diff --git a/adsrefpipe/benchmark.py b/adsrefpipe/benchmark.py new file mode 100644 index 0000000..3f21c34 --- /dev/null +++ b/adsrefpipe/benchmark.py @@ -0,0 +1,362 @@ +"""Benchmark CLI for ADSReferencePipeline throughput profiling.""" + +from __future__ import annotations + +import argparse +import json +import logging +import os +import threading +import time +import uuid +from contextlib import contextmanager +from datetime import datetime, timezone +from typing import Any, Dict, Iterable, List, Optional, TypedDict + +try: + from adsputils import load_config +except ImportError: # pragma: no cover + def load_config(*args, **kwargs): + return {} + +import adsrefpipe.perf_metrics as perf_metrics +import adsrefpipe.utils as utils + + +DEFAULT_EXTENSIONS = "*.raw,*.xml,*.txt,*.html,*.tex,*.refs,*.pairs" +LOGGER = logging.getLogger(__name__) + + +class SourceFileClassification(TypedDict): + source_filename: str + parser_name: Optional[str] + input_extension: Optional[str] + source_type: Optional[str] + + +def _pipeline_run_module(): + import run as pipeline_run + + return pipeline_run + + +def 
_utc_timestamp() -> str: + return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + + +def _parse_csv_list(value: str) -> List[str]: + if not value: + return [] + return [item.strip() for item in str(value).split(",") if item.strip()] + + +def _sample_interval_arg(value: str) -> float: + parsed = float(value) + if parsed < 0.1: + raise argparse.ArgumentTypeError("--system-sample-interval must be >= 0.1 seconds") + return parsed + + +def _safe_git_commit() -> Optional[str]: + try: + import subprocess + + proc = subprocess.run( + ["git", "rev-parse", "HEAD"], + capture_output=True, + text=True, + check=False, + ) + if proc.returncode == 0: + return proc.stdout.strip() or None + except Exception: + return None + return None + + +def collect_candidate_files(input_path: str, extensions: Iterable[str]) -> List[str]: + if os.path.isfile(input_path): + return [input_path] + + patterns = [pattern.strip() for pattern in extensions if pattern.strip()] + if not patterns: + patterns = ["*"] + + matched = [] + for root, _, files in os.walk(input_path): + for basename in files: + if basename.endswith(".result") or basename.endswith(".pyc") or basename == ".DS_Store": + continue + full_path = os.path.join(root, basename) + for pattern in patterns: + if __import__("fnmatch").fnmatch(basename, pattern): + matched.append(full_path) + break + return sorted(set(matched)) + + +def classify_source_file( + filename: str, + parser_info: Optional[Dict[str, object]] = None, +) -> SourceFileClassification: + parser_info = parser_info or {} + input_extension = parser_info.get("extension_pattern") or perf_metrics.source_type_from_filename(filename) + source_type = input_extension or perf_metrics.source_type_from_filename(filename) + return { + "source_filename": filename, + "parser_name": parser_info.get("name"), + "input_extension": input_extension, + "source_type": source_type, + } + + +def _mock_resolved_reference(reference: dict, service_url: str) -> list: + reference_id = 
str(reference.get("id") or "mock-record") + refstring = reference.get("refstr") or reference.get("refplaintext") or reference.get("refraw") or "" + return [{ + "id": reference_id, + "refstring": refstring, + "bibcode": "2000mock........A", + "scix_id": "mock:%s" % reference_id, + "score": 1.0, + "external_identifier": ["mock:%s" % reference_id], + "publication_year": reference.get("publication_year"), + "refereed_status": reference.get("refereed_status"), + "service_url": service_url, + }] + + +@contextmanager +def mock_resolver(enabled: bool): + original = utils.post_request_resolved_reference + if enabled: + utils.post_request_resolved_reference = _mock_resolved_reference + try: + yield + finally: + if enabled: + utils.post_request_resolved_reference = original + + +@contextmanager +def benchmark_environment( + run_id: str, + context_id: str, + events_path: str, + mode: str, + config: Optional[dict] = None, +): + previous = {} + updates = { + "PERF_METRICS_ENABLED": "true", + "PERF_METRICS_RUN_ID": str(run_id), + "PERF_METRICS_CONTEXT_ID": str(context_id), + "PERF_METRICS_PATH": str(events_path), + "PERF_BENCHMARK_MODE": str(mode), + "PERF_BENCHMARK_CONTINUE_ON_ERROR": "true", + } + context_dir = perf_metrics.metrics_context_dir(config=config) + if context_dir: + updates["PERF_METRICS_CONTEXT_DIR"] = context_dir + + for key, value in updates.items(): + previous[key] = os.environ.get(key) + os.environ[key] = value + + perf_metrics.register_run_metrics_context( + run_id=run_id, + enabled=True, + path=events_path, + context_id=context_id, + config=config, + context_dir=context_dir, + ) + try: + yield + finally: + for key, old_value in previous.items(): + if old_value is None: + os.environ.pop(key, None) + else: + os.environ[key] = old_value + + +def _write_run_artifacts(summary: Dict[str, object], output_dir: str) -> Dict[str, str]: + os.makedirs(output_dir, exist_ok=True) + run_id = ((summary.get("run_metadata") or {}).get("run_id") or "unknown") + stem = 
"ads_reference_benchmark_%s_run%s" % (_utc_timestamp(), run_id) + json_path = os.path.join(output_dir, "%s.json" % stem) + md_path = os.path.join(output_dir, "%s.md" % stem) + csv_path = os.path.join(output_dir, "%s.source_types.csv" % stem) + perf_metrics.write_json(json_path, summary) + perf_metrics.render_markdown(summary, md_path) + perf_metrics.write_source_type_csv(summary, csv_path) + return {"json": json_path, "markdown": md_path, "source_type_csv": csv_path} + + +def _run_warmup(files: List[str], mode: str) -> None: + if not files: + return + try: + with mock_resolver(mode == "mock"): + _pipeline_run_module().process_files(files[:1]) + except Exception: + # Warmup is best-effort and should not prevent the measured run from + # executing, especially in real mode where a resolver can rate-limit. + return + + +def _run_case( + input_path: str, + extensions: List[str], + max_files: Optional[int], + mode: str, + events_path: str, + system_sample_interval_s: float, + system_load_enabled: bool, + warmup: bool, + group_by: str, +) -> Dict[str, Any]: + config = load_config(proj_home=os.path.realpath(os.path.join(os.path.dirname(__file__), "../"))) + all_files = collect_candidate_files(input_path, extensions) + selected_files = all_files[:max_files] if max_files else all_files + + if not selected_files: + raise RuntimeError("No benchmark candidate files found under %s" % input_path) + + if warmup: + _run_warmup(selected_files, mode=mode) + + run_id = uuid.uuid4().hex + context_id = uuid.uuid4().hex + system_samples = [] + sampler_stop = threading.Event() + + def _sample_loop() -> None: + while not sampler_stop.wait(system_sample_interval_s): + system_samples.append(perf_metrics.collect_system_sample()) + + sampler_thread = None + start_wall = time.time() + with benchmark_environment(run_id=run_id, context_id=context_id, events_path=events_path, mode=mode, config=config): + try: + if system_load_enabled: + system_samples.append(perf_metrics.collect_system_sample()) + 
def _run_case(
    input_path: str,
    extensions: List[str],
    max_files: Optional[int],
    mode: str,
    events_path: str,
    system_sample_interval_s: float,
    system_load_enabled: bool,
    warmup: bool,
    group_by: str,
) -> Dict[str, Any]:
    """Run one benchmark configuration end-to-end and return its summary dict.

    Selects candidate files, optionally warms up, processes the files inside
    an isolated metrics context (unique run_id/context_id), samples host load
    in a background thread while processing, then aggregates the emitted
    events into a summary augmented with run metadata and system-load stats.

    Raises:
        RuntimeError: when no candidate files are found under input_path.
    """
    config = load_config(proj_home=os.path.realpath(os.path.join(os.path.dirname(__file__), "../")))
    all_files = collect_candidate_files(input_path, extensions)
    # NOTE: a max_files of 0 is falsy and therefore means "no cap" here.
    selected_files = all_files[:max_files] if max_files else all_files

    if not selected_files:
        raise RuntimeError("No benchmark candidate files found under %s" % input_path)

    if warmup:
        _run_warmup(selected_files, mode=mode)

    # Fresh ids isolate this run's events from anything else in the JSONL file.
    run_id = uuid.uuid4().hex
    context_id = uuid.uuid4().hex
    system_samples = []
    sampler_stop = threading.Event()

    def _sample_loop() -> None:
        # wait() doubles as the sampling sleep; returns True once stop is set.
        while not sampler_stop.wait(system_sample_interval_s):
            system_samples.append(perf_metrics.collect_system_sample())

    sampler_thread = None
    start_wall = time.time()
    with benchmark_environment(run_id=run_id, context_id=context_id, events_path=events_path, mode=mode, config=config):
        try:
            if system_load_enabled:
                # Take one sample up front so short runs still have data.
                system_samples.append(perf_metrics.collect_system_sample())

                sampler_thread = threading.Thread(target=_sample_loop, daemon=True)
                sampler_thread.start()

            with mock_resolver(mode == "mock"):
                _pipeline_run_module().process_files(selected_files)
        finally:
            if system_load_enabled:
                sampler_stop.set()
                if sampler_thread is not None:
                    # Bounded join: a stuck sampler must not hang aggregation.
                    sampler_thread.join(timeout=max(0.1, system_sample_interval_s))
                    if sampler_thread.is_alive():
                        LOGGER.warning(
                            "System load sampler thread did not terminate before aggregation",
                            extra={
                                "sample_interval_s": system_sample_interval_s,
                                "run_id": run_id,
                                "context_id": context_id,
                            },
                        )
                # Closing sample so the window brackets the whole run.
                system_samples.append(perf_metrics.collect_system_sample())
    end_wall = time.time()

    events = perf_metrics.load_events(events_path, run_id=run_id, context_id=context_id)
    summary = perf_metrics.aggregate_ads_events(
        events,
        started_at=start_wall,
        ended_at=end_wall,
        expected_files=len(selected_files),
    )
    summary["run_metadata"] = {
        "run_id": run_id,
        "context_id": context_id,
        "input_path": input_path,
        "extensions": extensions,
        "max_files": max_files,
        "mode": mode,
        "group_by": group_by,
        "git_commit": _safe_git_commit(),
        "timestamp_utc": _utc_timestamp(),
        "system_sample_interval_s": system_sample_interval_s,
        "system_load_enabled": system_load_enabled,
        "warmup": bool(warmup),
    }
    # Overrides the aggregate's "selected_files" (processed files) with the
    # actual selection so incomplete runs still list what was attempted.
    summary["selected_files"] = selected_files
    summary["counts"]["files_selected"] = len(selected_files)
    summary["system_load"] = perf_metrics.aggregate_system_samples(
        system_samples if system_load_enabled else [],
        enabled=system_load_enabled,
        sample_interval_s=system_sample_interval_s,
    )
    perf_metrics.apply_system_load_adjustment(summary)
    return summary
def cmd_run(args) -> int:
    """Handle the `run` subcommand: execute one benchmark case, write artifacts,
    print a compact JSON status to stdout.

    Returns 0 when the run completed, 2 otherwise (shell-friendly exit code).
    """
    config = load_config(proj_home=os.path.realpath(os.path.join(os.path.dirname(__file__), "../")))
    output_dir = args.output_dir or config.get("PERF_METRICS_OUTPUT_DIR", os.path.join("logs", "benchmarks"))
    os.makedirs(output_dir, exist_ok=True)
    events_path = args.events_path or os.path.join(output_dir, "perf_events.jsonl")
    extensions = _parse_csv_list(args.extensions) or _parse_csv_list(DEFAULT_EXTENSIONS)

    summary = _run_case(
        input_path=args.input_path,
        extensions=extensions,
        max_files=args.max_files,
        mode=args.mode,
        events_path=events_path,
        system_sample_interval_s=args.system_sample_interval,
        system_load_enabled=not bool(args.disable_system_load),
        warmup=bool(args.warmup),
        group_by=args.group_by,
    )

    artifacts = _write_run_artifacts(summary, output_dir=output_dir)
    throughput_section = summary.get("throughput") or {}
    console_payload = {
        "status": summary.get("status"),
        "throughput": throughput_section.get("overall_records_per_minute"),
        "load_adjusted_throughput": throughput_section.get("load_adjusted_records_per_minute"),
        "json": artifacts["json"],
        "markdown": artifacts["markdown"],
        "source_type_csv": artifacts["source_type_csv"],
    }
    print(json.dumps(console_payload, indent=2, sort_keys=True))
    return 0 if summary.get("status") == "complete" else 2
action="store_true", default=False) + run_parser.add_argument("--group-by", choices=["source_type", "parser", "none"], default="source_type") + run_parser.add_argument("--no-warmup", dest="warmup", action="store_false") + run_parser.set_defaults(warmup=True) + run_parser.set_defaults(func=cmd_run) + return parser + + +def main(argv: Optional[List[str]] = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + return int(args.func(args)) + + +if __name__ == "__main__": # pragma: no cover + raise SystemExit(main()) diff --git a/adsrefpipe/perf_metrics.py b/adsrefpipe/perf_metrics.py new file mode 100644 index 0000000..9802f91 --- /dev/null +++ b/adsrefpipe/perf_metrics.py @@ -0,0 +1,1304 @@ +"""Performance metrics helpers for ADSReferencePipeline benchmarks. + +This module stays stdlib-only and uses file-backed JSONL events so benchmark +instrumentation remains lightweight and safe to leave disabled by default. +""" + +from __future__ import annotations + +import json +import logging +import os +import platform +import re +import threading +import time +from contextlib import contextmanager +from functools import wraps +from typing import Any, Dict, Iterable, List, Optional, TypedDict + +LOGGER = logging.getLogger(__name__) +_EVENT_WRITE_LOCK = threading.Lock() + + +class AdsBenchmarkSummary(TypedDict): + counts: Dict[str, Any] + throughput: Dict[str, Any] + latency_ms: Dict[str, Any] + task_timing_ms: Dict[str, Any] + app_timing_ms: Dict[str, Any] + duration_s: Dict[str, Any] + per_record_metrics_ms: Dict[str, Any] + source_type_breakdown: Dict[str, Any] + parser_breakdown: Dict[str, Any] + raw_subfamily_breakdown: Dict[str, Any] + errors: Dict[str, Any] + status: str + selected_files: List[str] + file_wall_ms: Dict[str, Any] + + +_PROGRESS_MESSAGE_RE = re.compile( + r"^Source file (?P.+?) for bibcode .+? 
with " + r"(?P\d+) references, processed successfully\.$" +) + + +def format_benchmark_progress_line_from_log_line(line: str) -> Optional[str]: + """Return a compact benchmark progress line for one structured log line.""" + try: + payload = json.loads(line) + except (TypeError, json.JSONDecodeError): + return None + if not isinstance(payload, dict): + return None + + message = payload.get("message") + if not isinstance(message, str): + return None + match = _PROGRESS_MESSAGE_RE.match(message) + if not match: + return None + + timestamp = payload.get("timestamp") or payload.get("asctime") + if not isinstance(timestamp, str): + return None + if "T" in timestamp: + time_part = timestamp.split("T", 1)[1] + else: + time_part = timestamp + time_part = time_part.split(".", 1)[0].replace("Z", "") + + return "%s %s with %d references" % ( + time_part, + os.path.basename(match.group("source_filename")), + int(match.group("record_count")), + ) + + +def _as_bool(value: Any) -> bool: + if isinstance(value, bool): + return value + if value is None: + return False + return str(value).strip().lower() in {"1", "true", "yes", "on", "active"} + + +def _metrics_debug(message: str, **context: Any) -> None: + if context: + suffix = ", ".join("%s=%r" % (key, value) for key, value in sorted(context.items())) + LOGGER.debug("%s (%s)", message, suffix) + return + LOGGER.debug("%s", message) + + +def _deep_get(data: Optional[Dict[str, Any]], *keys: str, default: Any = None) -> Any: + current: Any = data + for key in keys: + if not isinstance(current, dict): + return default + current = current.get(key) + if current is None: + return default + return current + + +def metrics_enabled(config: Optional[dict] = None) -> bool: + env_value = os.getenv("PERF_METRICS_ENABLED") + if env_value is not None: + return _as_bool(env_value) + if config is None: + return False + return _as_bool(config.get("PERF_METRICS_ENABLED", False)) + + +def metrics_path(config: Optional[dict] = None) -> Optional[str]: + 
def metrics_path(config: Optional[dict] = None) -> Optional[str]:
    """Resolve the JSONL events path: env var, then config path, then output dir."""
    from_env = os.getenv("PERF_METRICS_PATH")
    if from_env:
        return from_env
    if config is None:
        return None
    explicit = config.get("PERF_METRICS_PATH")
    if explicit:
        return explicit
    base_dir = config.get("PERF_METRICS_OUTPUT_DIR")
    return os.path.join(base_dir, "perf_events.jsonl") if base_dir else None


def metrics_context_dir(config: Optional[dict] = None) -> Optional[str]:
    """Resolve the run-context directory, deriving it from the events path if needed."""
    from_env = os.getenv("PERF_METRICS_CONTEXT_DIR")
    if from_env:
        return from_env
    if config is not None:
        configured = config.get("PERF_METRICS_CONTEXT_DIR")
        if configured:
            return configured
    events_file = metrics_path(config=config)
    if not events_file:
        return None
    return os.path.join(os.path.dirname(events_file), "perf_run_context")


def current_run_id() -> Optional[str]:
    """Run id propagated through the environment, if any."""
    value = os.getenv("PERF_METRICS_RUN_ID")
    return str(value) if value else None


def current_context_id() -> Optional[str]:
    """Context id propagated through the environment, if any."""
    value = os.getenv("PERF_METRICS_CONTEXT_ID")
    return str(value) if value else None


def current_run_mode() -> Optional[str]:
    """Benchmark mode propagated through the environment, if any."""
    mode = os.getenv("PERF_BENCHMARK_MODE") or os.getenv("PERF_RUN_MODE")
    return str(mode) if mode else None
def source_type_from_filename(filename: Optional[str]) -> Optional[str]:
    """Derive a dotted source-type label from a filename.

    Examples: "a.xml" -> ".xml", "a.jats.xml" -> ".jats.xml",
    "x.ocr.txt" -> ".ocr.txt", "p.raw" -> ".raw".

    Returns None when no filename is given or the basename has no dot.
    """
    if not filename:
        return None
    basename = os.path.basename(str(filename))
    if "." not in basename:
        return None

    parts = basename.split(".")
    last = parts[-1].lower()
    prev = parts[-2].lower() if len(parts) >= 2 else ""

    if last == "xml":
        # Keep a meaningful penultimate component ("a.jats.xml" -> ".jats.xml");
        # single-letter middles collapse to plain ".xml".
        if len(parts) >= 3 and len(prev) > 1:
            return ".%s.xml" % prev
        return ".xml"
    if last == "raw":
        return ".raw"
    if last == "txt":
        return ".ocr.txt" if prev == "ocr" else ".txt"
    # Every other extension maps to its plain dotted form. (The original
    # special-cased {"html", "tex", "refs", "pairs"} with an identical
    # return value — a redundant branch, removed without behavior change.)
    return ".%s" % last


def journal_from_filename(filename: Optional[str]) -> Optional[str]:
    """Return the third-from-last path component (the journal directory).

    Example: "/data/ApJ/0001/file.raw" -> "ApJ". Returns None when the path
    has fewer than three components or the component is empty.
    """
    if not filename:
        return None
    parts = str(filename).replace("\\", "/").split("/")
    if len(parts) < 3:
        return None
    return parts[-3] or None


def raw_subfamily_from_metadata(
    filename: Optional[str] = None,
    parser_name: Optional[str] = None,
    input_extension: Optional[str] = None,
    source_type: Optional[str] = None,
) -> Optional[str]:
    """Classify a ".raw" source into a named subfamily; None for non-raw sources.

    Classification prefers the parser name, falling back to the journal
    directory or a basename suffix; unmatched raw sources get "raw_other".
    """
    effective_source_type = source_type or input_extension or source_type_from_filename(filename)
    if effective_source_type != ".raw":
        return None

    parser = str(parser_name or "").strip()
    journal = str(journal_from_filename(filename) or "").strip()
    basename = os.path.basename(str(filename or "")).lower()

    if parser == "arXiv" or journal == "arXiv":
        return "raw_arxiv"
    if parser == "ThreeBibsTxt" or basename.endswith(".ref.raw"):
        return "raw_ref_raw"
    if parser == "JLVEnHTML" or journal == "JLVEn":
        return "raw_jlven_html"
    if parser == "PASJhtml" or journal == "PASJ":
        return "raw_pasj_html"
    if parser == "PASPhtml" or journal == "PASP":
        return "raw_pasp_html"
    if parser == "AAS" or basename.endswith(".aas.raw"):
        return "raw_aas"
    if parser == "ICARUS" or basename.endswith(".icarus.raw"):
        return "raw_icarus"
    if parser == "PThPhTXT" or journal == "PThPh":
        return "raw_pthph"
    if journal == "PThPS":
        return "raw_pthps"
    if parser == "ADStxt" or journal == "ADS":
        return "raw_adstxt"
    return "raw_other"
def build_event_extra(
    source_filename: Optional[str] = None,
    parser_name: Optional[str] = None,
    source_bibcode: Optional[str] = None,
    input_extension: Optional[str] = None,
    source_type: Optional[str] = None,
    record_count: Optional[int] = None,
    extra: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Assemble the standard `extra` payload attached to benchmark events.

    Caller-supplied keys in `extra` always win; derived fields fill gaps only.
    """
    merged = dict(extra or {})
    merged.setdefault("source_filename", source_filename)
    merged.setdefault("parser_name", parser_name)
    merged.setdefault("source_bibcode", source_bibcode)
    merged.setdefault("input_extension", input_extension or source_type_from_filename(source_filename))
    # source_type falls back to whatever input_extension ended up being.
    merged.setdefault(
        "source_type",
        source_type or merged.get("input_extension") or source_type_from_filename(source_filename),
    )
    merged.setdefault(
        "raw_subfamily",
        raw_subfamily_from_metadata(
            filename=source_filename,
            parser_name=parser_name,
            input_extension=merged.get("input_extension"),
            source_type=merged.get("source_type"),
        ),
    )
    merged.setdefault("run_mode", current_run_mode())
    if record_count is not None:
        merged["record_count"] = int(record_count)
    return merged


def _run_context_path(
    run_id: Any,
    config: Optional[dict] = None,
    context_dir: Optional[str] = None,
    context_id: Optional[str] = None,
) -> Optional[str]:
    """Path of the per-run context file; None when run_id or directory is missing."""
    if run_id is None:
        return None
    directory = context_dir or metrics_context_dir(config=config)
    if not directory:
        return None
    if context_id:
        return os.path.join(directory, "run_%s_%s.json" % (run_id, context_id))
    return os.path.join(directory, "run_%s.json" % run_id)
def register_run_metrics_context(
    run_id: Any,
    enabled: bool,
    path: Optional[str],
    context_id: Optional[str] = None,
    config: Optional[dict] = None,
    context_dir: Optional[str] = None,
) -> None:
    """Persist a run's metrics context (enabled flag, events path) to disk.

    Writes the payload to the context-specific file and, when different, also
    to the generic run file so lookups without a context_id still resolve.
    Best-effort: any failure is logged at debug level and swallowed.
    """
    try:
        targets = []
        target = _run_context_path(run_id, config=config, context_dir=context_dir, context_id=context_id)
        if target:
            targets.append(target)
        # Also register under the context-less name as a fallback target.
        generic_target = _run_context_path(run_id, config=config, context_dir=context_dir)
        if generic_target and generic_target not in targets:
            targets.append(generic_target)
        if not targets:
            return
        payload = {
            "enabled": bool(enabled),
            "path": path,
            "context_id": context_id,
            "updated_at": time.time(),
        }
        for current_target in targets:
            directory = os.path.dirname(current_target)
            if directory:
                os.makedirs(directory, exist_ok=True)
            with open(current_target, "w") as handle:
                json.dump(payload, handle, sort_keys=True)
    except Exception as exc:
        _metrics_debug(
            "Failed to register metrics context",
            run_id=run_id,
            context_id=context_id,
            path=path,
            context_dir=context_dir,
            error=str(exc),
        )
        return


def resolve_run_metrics_context(
    run_id: Any,
    config: Optional[dict] = None,
    context_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Read back a run's registered metrics context.

    Tries the context-specific file first, then the generic run file; returns
    a dict with "enabled", "path", "context_id" — all None when nothing is
    registered or the file cannot be read.
    """
    target = _run_context_path(run_id, config=config, context_id=context_id)
    if (not target or not os.path.exists(target)) and context_id is not None:
        # Fall back to the context-less registration.
        target = _run_context_path(run_id, config=config)
    if not target or not os.path.exists(target):
        return {"enabled": None, "path": None, "context_id": None}
    try:
        with open(target, "r") as handle:
            payload = json.load(handle)
        return {
            "enabled": payload.get("enabled"),
            "path": payload.get("path"),
            "context_id": payload.get("context_id"),
        }
    except Exception as exc:
        _metrics_debug(
            "Failed to resolve metrics context",
            run_id=run_id,
            context_id=context_id,
            target=target,
            error=str(exc),
        )
        return {"enabled": None, "path": None, "context_id": None}
def _append_jsonl_record(target_path: str, payload: Dict[str, Any]) -> None:
    """Append one JSON line to target_path under both in-process and file locks.

    The module-level lock serializes writers within this process; flock (when
    available) additionally guards against concurrent writers in other
    processes. Lock failures degrade to the in-process lock only.
    """
    serialized_line = json.dumps(payload, sort_keys=True) + "\n"
    with _EVENT_WRITE_LOCK:
        with open(target_path, "a") as handle:
            try:
                import fcntl  # POSIX-only best effort.
            except ImportError:
                fcntl = None
            if fcntl is not None:
                try:
                    fcntl.flock(handle.fileno(), fcntl.LOCK_EX)
                except OSError as exc:
                    _metrics_debug(
                        "Failed to acquire POSIX metrics file lock; continuing with in-process lock only",
                        target_path=target_path,
                        error=str(exc),
                    )
            try:
                handle.write(serialized_line)
            finally:
                if fcntl is not None:
                    try:
                        fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
                    except OSError:
                        pass


def emit_event(
    stage: str,
    run_id: Optional[Any] = None,
    context_id: Optional[str] = None,
    record_id: Optional[str] = None,
    duration_ms: Optional[float] = None,
    status: str = "ok",
    extra: Optional[dict] = None,
    config: Optional[dict] = None,
    path: Optional[str] = None,
) -> None:
    """Emit one metrics event to the JSONL stream, if metrics are enabled.

    Run id/context id fall back to the environment; a registered run context
    (see register_run_metrics_context) overrides the global enabled flag and
    supplies the target path when not given explicitly. Never raises: all
    failures are logged at debug level and swallowed.
    """
    try:
        resolved_run_id = run_id if run_id is not None else current_run_id()
        resolved_context_id = context_id or current_context_id()
        run_context = (
            resolve_run_metrics_context(resolved_run_id, config=config, context_id=resolved_context_id)
            if resolved_run_id is not None
            else {"enabled": None, "path": None, "context_id": None}
        )
        enabled = metrics_enabled(config=config)
        # A per-run registration (True or False) wins over the global flag.
        if run_context.get("enabled") is not None:
            enabled = bool(run_context.get("enabled"))
        if not enabled:
            return

        target_path = path or run_context.get("path") or metrics_path(config=config)
        if not target_path:
            return

        payload = {
            "ts": time.time(),
            "stage": stage,
            "run_id": str(resolved_run_id) if resolved_run_id is not None else None,
            "context_id": resolved_context_id or run_context.get("context_id"),
            "record_id": record_id,
            "duration_ms": float(duration_ms) if duration_ms is not None else None,
            "status": status,
            "extra": extra or {},
        }

        directory = os.path.dirname(target_path)
        if directory:
            os.makedirs(directory, exist_ok=True)

        _append_jsonl_record(target_path, payload)
    except Exception as exc:
        _metrics_debug(
            "Failed to emit metrics event",
            stage=stage,
            run_id=run_id,
            context_id=context_id,
            path=path,
            error=str(exc),
        )
        return
@contextmanager
def timed_stage(
    stage: str,
    run_id: Optional[Any] = None,
    context_id: Optional[str] = None,
    record_id: Optional[str] = None,
    status: str = "ok",
    extra: Optional[dict] = None,
    config: Optional[dict] = None,
    path: Optional[str] = None,
):
    """Context manager that emits one timed event for `stage` on exit.

    The event is emitted even when the body raises, with status "error";
    the exception itself is re-raised untouched.
    """
    began = time.perf_counter()
    final_status = status
    try:
        yield
    except Exception:
        final_status = "error"
        raise
    finally:
        elapsed_ms = (time.perf_counter() - began) * 1000.0
        emit_event(
            stage=stage,
            run_id=run_id,
            context_id=context_id,
            record_id=record_id,
            duration_ms=elapsed_ms,
            status=final_status,
            extra=extra,
            config=config,
            path=path,
        )


@contextmanager
def timed_profile(
    category: str,
    name: str,
    run_id: Optional[Any] = None,
    context_id: Optional[str] = None,
    record_id: Optional[str] = None,
    status: str = "ok",
    extra: Optional[dict] = None,
    config: Optional[dict] = None,
    path: Optional[str] = None,
):
    """timed_stage variant that tags the event's extra payload with a `name`.

    Caller-provided extra keys override the injected name on collision.
    """
    profile_extra = {"name": name}
    profile_extra.update(extra or {})
    with timed_stage(
        stage=category,
        run_id=run_id,
        context_id=context_id,
        record_id=record_id,
        status=status,
        extra=profile_extra,
        config=config,
        path=path,
    ):
        yield
def profiled_function(
    category: str,
    name: Optional[str] = None,
    run_id_getter=None,
    context_id_getter=None,
    record_id_getter=None,
    extra_getter=None,
    config_getter=None,
):
    """Decorator factory wrapping each call of the target in a timed_profile.

    Each *_getter, when provided, is invoked with the call's own arguments to
    derive the corresponding event field lazily at call time.
    """
    def decorator(func):
        label = name or func.__name__

        @wraps(func)
        def wrapper(*args, **kwargs):
            def _derive(getter):
                return getter(*args, **kwargs) if getter else None

            with timed_profile(
                category=category,
                name=label,
                run_id=_derive(run_id_getter),
                context_id=_derive(context_id_getter),
                record_id=_derive(record_id_getter),
                extra=_derive(extra_getter),
                config=_derive(config_getter),
            ):
                return func(*args, **kwargs)

        return wrapper

    return decorator


def load_events(path: str, run_id: Optional[Any] = None, context_id: Optional[str] = None) -> List[Dict[str, Any]]:
    """Read JSONL events from `path`, optionally filtered by run/context id.

    Unparseable lines are skipped (logged at debug level); a missing file or
    an unreadable file yields an empty list.
    """
    if not path or not os.path.exists(path):
        return []

    wanted_run = str(run_id) if run_id is not None else None
    wanted_context = str(context_id) if context_id is not None else None
    matched = []
    try:
        with open(path, "r") as handle:
            for line_number, raw_line in enumerate(handle, start=1):
                text = raw_line.strip()
                if not text:
                    continue
                try:
                    event = json.loads(text)
                except Exception as exc:
                    _metrics_debug(
                        "Failed to parse metrics event line",
                        path=path,
                        line_number=line_number,
                        error=str(exc),
                    )
                    continue
                if wanted_run is not None and event.get("run_id") != wanted_run:
                    continue
                if wanted_context is not None and event.get("context_id") != wanted_context:
                    continue
                matched.append(event)
    except Exception as exc:
        _metrics_debug(
            "Failed to load metrics events",
            path=path,
            run_id=wanted_run,
            context_id=wanted_context,
            error=str(exc),
        )
        return []
    return matched


def percentile(values: Iterable[float], pct: float) -> Optional[float]:
    """Linearly interpolated percentile of `values`; None for empty input."""
    ordered = sorted(float(v) for v in values)
    if not ordered:
        return None
    if pct <= 0:
        return ordered[0]
    if pct >= 100:
        return ordered[-1]
    position = (len(ordered) - 1) * (pct / 100.0)
    low = int(position)
    high = min(low + 1, len(ordered) - 1)
    fraction = position - low
    return ordered[low] * (1.0 - fraction) + ordered[high] * fraction
def _numeric_stats(values: List[float], include_p99: bool = True) -> Dict[str, Any]:
    """Min/max/mean plus p50/p95 (optionally p99) for a list of numbers.

    An empty input yields the same key set with all values None and count 0.
    """
    if values:
        stats = {
            "count": len(values),
            "min": min(values),
            "max": max(values),
            "mean": sum(values) / float(len(values)),
            "p50": percentile(values, 50),
            "p95": percentile(values, 95),
        }
    else:
        stats = {
            "count": 0,
            "min": None,
            "max": None,
            "mean": None,
            "p50": None,
            "p95": None,
        }
    if include_p99:
        stats["p99"] = percentile(values, 99) if values else None
    return stats


def _read_linux_meminfo(path: str = "/proc/meminfo") -> Optional[Dict[str, float]]:
    """Parse /proc/meminfo into total/available byte figures.

    Returns None when the file is unreadable or MemTotal/MemAvailable are
    missing or non-positive; failures are logged at debug level only.
    """
    try:
        parsed = {}
        with open(path, "r") as handle:
            for line in handle:
                pieces = line.split(":")
                if len(pieces) != 2:
                    continue
                digits = re.search(r"(\d+)", pieces[1])
                if digits:
                    parsed[pieces[0].strip()] = int(digits.group(1))
        total_kib = parsed.get("MemTotal")
        available_kib = parsed.get("MemAvailable")
        if total_kib is None or available_kib is None or total_kib <= 0:
            return None
        # /proc/meminfo reports KiB.
        total_bytes = int(total_kib) * 1024
        available_bytes = int(available_kib) * 1024
        return {
            "memory_total_bytes": total_bytes,
            "memory_available_bytes": available_bytes,
            "memory_available_ratio": float(available_bytes) / float(total_bytes),
            "memory_probe": "linux_meminfo",
        }
    except Exception as exc:
        _metrics_debug("Failed to read Linux memory info", path=path, error=str(exc))
        return None
def _read_macos_memory() -> Optional[Dict[str, float]]:
    """Probe macOS memory via `sysctl hw.memsize` and `vm_stat`.

    "Available" is approximated as free + inactive + speculative pages times
    the reported page size. Returns None on any failure (logged at debug).
    """
    try:
        import subprocess

        total_proc = subprocess.run(
            ["sysctl", "-n", "hw.memsize"],
            capture_output=True,
            text=True,
            check=False,
        )
        if total_proc.returncode != 0:
            return None
        total_bytes = int((total_proc.stdout or "").strip())
        vm_proc = subprocess.run(
            ["vm_stat"],
            capture_output=True,
            text=True,
            check=False,
        )
        if vm_proc.returncode != 0:
            return None
        # Default page size unless vm_stat's header states otherwise.
        page_size = 4096
        page_size_match = re.search(r"page size of (\d+) bytes", vm_proc.stdout or "")
        if page_size_match:
            page_size = int(page_size_match.group(1))
        pages = {}
        for line in (vm_proc.stdout or "").splitlines():
            # vm_stat rows look like "Pages free:   12345."
            match = re.match(r"([^:]+):\s+(\d+)\.", line.strip())
            if match:
                pages[match.group(1)] = int(match.group(2))
        available_pages = pages.get("Pages free", 0) + pages.get("Pages inactive", 0) + pages.get("Pages speculative", 0)
        available_bytes = int(available_pages) * int(page_size)
        return {
            "memory_total_bytes": int(total_bytes),
            "memory_available_bytes": available_bytes,
            "memory_available_ratio": float(available_bytes) / float(total_bytes) if total_bytes else None,
            "memory_probe": "macos_vm_stat",
        }
    except Exception as exc:
        _metrics_debug("Failed to read macOS memory info", error=str(exc))
        return None


def _host_memory_snapshot() -> Dict[str, Optional[float]]:
    """Memory snapshot for the current platform, or an all-None placeholder.

    The "memory_probe" field records which probe ran (or why none could).
    """
    system = platform.system().lower()
    if system == "linux":
        snapshot = _read_linux_meminfo()
        if snapshot:
            return snapshot
        return {
            "memory_total_bytes": None,
            "memory_available_bytes": None,
            "memory_available_ratio": None,
            "memory_probe": "linux_meminfo_unavailable",
        }
    if system == "darwin":
        snapshot = _read_macos_memory()
        if snapshot:
            return snapshot
        return {
            "memory_total_bytes": None,
            "memory_available_bytes": None,
            "memory_available_ratio": None,
            "memory_probe": "macos_vm_stat_unavailable",
        }
    return {
        "memory_total_bytes": None,
        "memory_available_bytes": None,
        "memory_available_ratio": None,
        "memory_probe": "unsupported",
    }


def _host_load_snapshot() -> Dict[str, Optional[float]]:
    """Load-average snapshot, with values normalized by CPU count when known.

    On platforms without os.getloadavg the load fields are None.
    """
    cpu_count = os.cpu_count()
    try:
        load1, load5, load15 = os.getloadavg()
    except Exception as exc:
        _metrics_debug("Failed to read host load average", error=str(exc))
        load1 = load5 = load15 = None

    def _normalize(value: Optional[float]) -> Optional[float]:
        # Normalization is undefined without both a load value and a CPU count.
        if value is None or not cpu_count:
            return None
        return float(value) / float(cpu_count)

    return {
        "platform": platform.system().lower(),
        "cpu_count": cpu_count,
        "loadavg_1m": float(load1) if load1 is not None else None,
        "loadavg_5m": float(load5) if load5 is not None else None,
        "loadavg_15m": float(load15) if load15 is not None else None,
        "normalized_load_1m": _normalize(load1),
        "normalized_load_5m": _normalize(load5),
        "normalized_load_15m": _normalize(load15),
    }
sample = {"ts": time.time()} + sample.update(_host_load_snapshot()) + sample.update(_host_memory_snapshot()) + total_bytes = sample.get("memory_total_bytes") + available_bytes = sample.get("memory_available_bytes") + if total_bytes is not None and available_bytes is not None: + used_bytes = max(0.0, float(total_bytes) - float(available_bytes)) + sample["memory_used_bytes"] = used_bytes + sample["memory_used_ratio"] = used_bytes / float(total_bytes) if float(total_bytes) > 0 else None + else: + sample["memory_used_bytes"] = None + sample["memory_used_ratio"] = None + return sample + + +def aggregate_system_samples(samples: List[Dict[str, Any]], enabled: bool = True, sample_interval_s: float = 1.0) -> Dict[str, Any]: + platform_name = platform.system().lower() + cpu_count = os.cpu_count() + memory_probe = "unsupported" + if samples: + platform_name = str(samples[0].get("platform") or platform_name) + cpu_count = samples[0].get("cpu_count") + memory_probe = str(samples[0].get("memory_probe") or memory_probe) + + summary = {} + for key in ( + "loadavg_1m", + "loadavg_5m", + "loadavg_15m", + "normalized_load_1m", + "normalized_load_5m", + "normalized_load_15m", + "memory_total_bytes", + "memory_available_bytes", + "memory_used_bytes", + "memory_available_ratio", + "memory_used_ratio", + ): + values = [float(sample[key]) for sample in samples if sample.get(key) is not None] + summary[key] = _numeric_stats(values, include_p99=False) + + return { + "collection": { + "enabled": bool(enabled), + "sample_interval_s": float(sample_interval_s), + "sample_count": len(samples), + "platform": platform_name, + "cpu_count": cpu_count, + "memory_probe": memory_probe, + }, + "samples": samples, + "summary": summary, + } + + +def apply_system_load_adjustment(summary: Dict[str, Any]) -> Dict[str, Any]: + throughput = summary.setdefault("throughput", {}) + raw = throughput.get("overall_records_per_minute") + mean_load = _deep_get(summary, "system_load", "summary", "normalized_load_1m", 
"mean") + factor = max(1.0, float(mean_load)) if mean_load is not None else 1.0 + throughput["host_load_adjustment_factor"] = factor + throughput["load_adjusted_records_per_minute"] = (float(raw) * factor) if raw is not None else None + return summary + + +def aggregate_ads_events( + events: List[Dict[str, Any]], + started_at: Optional[float] = None, + ended_at: Optional[float] = None, + expected_files: Optional[int] = None, +) -> AdsBenchmarkSummary: + stage_timings = {} + task_timings = {} + app_timings = {} + errors_by_stage = {} + file_wall = [] + record_wall = [] + resolver_wall = [] + db_wall = [] + + source_type_groups = {} + parser_groups = {} + raw_subfamily_groups = {} + file_names = set() + record_ids = set() + records_submitted = 0 + failure_count = 0 + + event_timestamps = [event.get("ts") for event in events if event.get("ts") is not None] + + def _group_entry(groups: Dict[str, Dict[str, Any]], key: Optional[str]) -> Dict[str, Any]: + group_key = str(key or "unknown") + groups.setdefault(group_key, { + "file_names": set(), + "record_ids": set(), + "wall": [], + "parse": [], + "resolver": [], + "db": [], + }) + return groups[group_key] + + for event in events: + stage = str(event.get("stage") or "unknown") + status = str(event.get("status") or "ok") + duration = event.get("duration_ms") + extra = event.get("extra", {}) or {} + record_count = int(extra.get("record_count", 1) or 1) + source_filename = extra.get("source_filename") + source_type = extra.get("source_type") or extra.get("input_extension") or source_type_from_filename(source_filename) + parser_name = extra.get("parser_name") + raw_subfamily = extra.get("raw_subfamily") or raw_subfamily_from_metadata( + filename=source_filename, + parser_name=parser_name, + input_extension=extra.get("input_extension"), + source_type=source_type, + ) + record_id = event.get("record_id") + + if source_filename: + file_names.add(source_filename) + if record_id: + record_ids.add(record_id) + + if status != "ok": + 
def aggregate_ads_events(
    events: List[Dict[str, Any]],
    started_at: Optional[float] = None,
    ended_at: Optional[float] = None,
    expected_files: Optional[int] = None,
) -> AdsBenchmarkSummary:
    """Aggregate raw benchmark events into the summary structure.

    Walks the event stream once, bucketing durations by stage and by
    source-type / parser / raw-subfamily group, then serializes numeric
    stats, throughput, and a complete/incomplete status. Durations for
    file-level and DB stages are normalized per record via the event's
    record_count; record_wall and resolver_http are kept as-is.
    """
    stage_timings = {}
    task_timings = {}
    app_timings = {}
    errors_by_stage = {}
    file_wall = []
    record_wall = []
    resolver_wall = []
    db_wall = []

    source_type_groups = {}
    parser_groups = {}
    raw_subfamily_groups = {}
    file_names = set()
    record_ids = set()
    records_submitted = 0
    failure_count = 0

    event_timestamps = [event.get("ts") for event in events if event.get("ts") is not None]

    def _group_entry(groups: Dict[str, Dict[str, Any]], key: Optional[str]) -> Dict[str, Any]:
        # Lazily create one accumulator bucket per group key.
        group_key = str(key or "unknown")
        groups.setdefault(group_key, {
            "file_names": set(),
            "record_ids": set(),
            "wall": [],
            "parse": [],
            "resolver": [],
            "db": [],
        })
        return groups[group_key]

    for event in events:
        stage = str(event.get("stage") or "unknown")
        status = str(event.get("status") or "ok")
        duration = event.get("duration_ms")
        extra = event.get("extra", {}) or {}
        record_count = int(extra.get("record_count", 1) or 1)
        source_filename = extra.get("source_filename")
        # Derive grouping keys, falling back to filename-based classification.
        source_type = extra.get("source_type") or extra.get("input_extension") or source_type_from_filename(source_filename)
        parser_name = extra.get("parser_name")
        raw_subfamily = extra.get("raw_subfamily") or raw_subfamily_from_metadata(
            filename=source_filename,
            parser_name=parser_name,
            input_extension=extra.get("input_extension"),
            source_type=source_type,
        )
        record_id = event.get("record_id")

        if source_filename:
            file_names.add(source_filename)
        if record_id:
            record_ids.add(record_id)

        if status != "ok":
            failure_count += 1
            errors_by_stage[stage] = errors_by_stage.get(stage, 0) + 1

        if stage == "ingest_enqueue":
            records_submitted += int(extra.get("record_count", 1) or 1)

        # Events without a duration only contribute to counts above.
        if duration is None:
            continue

        duration_value = float(duration)
        normalized_value = duration_value / float(record_count) if record_count > 0 else duration_value
        stage_timings.setdefault(stage, []).append(normalized_value)

        # task_timing / app_timing events are keyed by their "name" and do not
        # feed the per-group breakdowns below.
        if stage == "task_timing":
            task_timings.setdefault(str(extra.get("name") or "unknown"), []).append(duration_value)
            continue
        if stage == "app_timing":
            app_timings.setdefault(str(extra.get("name") or "unknown"), []).append(duration_value)
            continue

        if stage == "file_wall":
            file_wall.append(normalized_value)
        elif stage == "record_wall":
            record_wall.append(duration_value)
        elif stage == "resolver_http":
            resolver_wall.append(duration_value)
        elif stage in {"pre_resolved_db", "post_resolved_db"}:
            db_wall.append(normalized_value)

        # Fan the same measurement into each applicable grouping dimension;
        # the raw-subfamily group only exists for ".raw" sources.
        type_group = _group_entry(source_type_groups, source_type)
        parser_group = _group_entry(parser_groups, parser_name)
        raw_group = _group_entry(raw_subfamily_groups, raw_subfamily) if raw_subfamily else None
        for group in tuple(group for group in (type_group, parser_group, raw_group) if group is not None):
            if source_filename:
                group["file_names"].add(source_filename)
            if record_id:
                group["record_ids"].add(record_id)
            if stage == "record_wall":
                group["wall"].append(duration_value)
            elif stage == "parse_dispatch":
                group["parse"].append(normalized_value)
            elif stage == "resolver_http":
                group["resolver"].append(duration_value)
            elif stage in {"pre_resolved_db", "post_resolved_db"}:
                group["db"].append(normalized_value)

    # Fall back to the observed event window when no explicit bounds given.
    if started_at is None:
        started_at = min(event_timestamps) if event_timestamps else None
    if ended_at is None:
        ended_at = max(event_timestamps) if event_timestamps else None
    wall_duration_s = None if started_at is None or ended_at is None else max(0.0, float(ended_at) - float(started_at))

    throughput = None
    if wall_duration_s and wall_duration_s > 0:
        # Prefer processed records; fall back to submitted when none finished.
        throughput = (float(len(record_ids) or records_submitted) / float(wall_duration_s)) * 60.0

    def _serialize_group(groups: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
        # Convert accumulator buckets into stable, JSON-serializable stats.
        output = {}
        for key, payload in groups.items():
            record_total = len(payload["record_ids"])
            throughput_value = None
            if wall_duration_s and wall_duration_s > 0 and record_total > 0:
                throughput_value = (float(record_total) / float(wall_duration_s)) * 60.0
            output[key] = {
                "file_count": len(payload["file_names"]),
                "record_count": record_total,
                "wall_time_ms": _numeric_stats(payload["wall"], include_p99=True),
                "parse_stage_ms": _numeric_stats(payload["parse"], include_p99=True),
                "resolver_stage_ms": _numeric_stats(payload["resolver"], include_p99=True),
                "db_stage_ms": _numeric_stats(payload["db"], include_p99=True),
                "throughput_records_per_minute": throughput_value,
            }
        return output

    # A run is complete only when every expected file emitted events.
    status = "complete"
    if expected_files is not None and len(file_names) < int(expected_files):
        status = "incomplete"
    if expected_files == 0:
        status = "incomplete"

    return {
        "counts": {
            "files_selected": int(expected_files or 0),
            "files_processed": len(file_names),
            "records_submitted": records_submitted,
            "records_processed": len(record_ids),
            "failures": failure_count,
        },
        "throughput": {
            "overall_records_per_minute": throughput,
        },
        "latency_ms": {
            stage: _numeric_stats(values, include_p99=True)
            for stage, values in stage_timings.items()
            if stage not in {"task_timing", "app_timing"}
        },
        "task_timing_ms": {name: _numeric_stats(values, include_p99=True) for name, values in task_timings.items()},
        "app_timing_ms": {name: _numeric_stats(values, include_p99=True) for name, values in app_timings.items()},
        "duration_s": {
            "wall_clock": wall_duration_s,
        },
        "per_record_metrics_ms": {
            "wall_time": _numeric_stats(record_wall, include_p99=True),
            "parse_stage": _numeric_stats(stage_timings.get("parse_dispatch", []), include_p99=True),
            "resolver_stage": _numeric_stats(resolver_wall, include_p99=True),
            "db_stage": _numeric_stats(db_wall, include_p99=True),
        },
        "source_type_breakdown": _serialize_group(source_type_groups),
        "parser_breakdown": _serialize_group(parser_groups),
        "raw_subfamily_breakdown": _serialize_group(raw_subfamily_groups),
        "errors": {
            "by_stage": errors_by_stage,
        },
        "status": status,
        "selected_files": sorted(file_names),
        "file_wall_ms": _numeric_stats(file_wall, include_p99=True),
    }
_numeric_stats(stage_timings.get("parse_dispatch", []), include_p99=True), + "resolver_stage": _numeric_stats(resolver_wall, include_p99=True), + "db_stage": _numeric_stats(db_wall, include_p99=True), + }, + "source_type_breakdown": _serialize_group(source_type_groups), + "parser_breakdown": _serialize_group(parser_groups), + "raw_subfamily_breakdown": _serialize_group(raw_subfamily_groups), + "errors": { + "by_stage": errors_by_stage, + }, + "status": status, + "selected_files": sorted(file_names), + "file_wall_ms": _numeric_stats(file_wall, include_p99=True), + } + + +def write_json(path: str, payload: Dict[str, Any]) -> None: + directory = os.path.dirname(path) + if directory: + os.makedirs(directory, exist_ok=True) + with open(path, "w") as handle: + json.dump(payload, handle, indent=2, sort_keys=True) + handle.write("\n") + + +def _fmt(value: Optional[float], places: int = 2) -> str: + if value is None: + return "n/a" + return ("%0." + str(places) + "f") % float(value) + + +def _fmt_bytes(value: Optional[float]) -> str: + if value is None: + return "n/a" + units = ["B", "KiB", "MiB", "GiB", "TiB"] + current = float(value) + for unit in units: + if abs(current) < 1024.0 or unit == units[-1]: + return "%0.2f %s" % (current, unit) + current /= 1024.0 + + +def _blank_if_none(value: Any) -> Any: + return "" if value is None else value + + +def render_markdown(summary: Dict[str, Any], output_path: str) -> None: + counts = summary.get("counts", {}) or {} + throughput = summary.get("throughput", {}) or {} + latency = summary.get("latency_ms", {}) or {} + per_record = summary.get("per_record_metrics_ms", {}) or {} + source_type_breakdown = summary.get("source_type_breakdown", {}) or {} + parser_breakdown = summary.get("parser_breakdown", {}) or {} + raw_subfamily_breakdown = summary.get("raw_subfamily_breakdown", {}) or {} + system_load = summary.get("system_load", {}) or {} + run_metadata = summary.get("run_metadata", {}) or {} + + lines = [ + "# ADS Reference 
Benchmark Report", + "", + "## Run Config", + "", + ] + for key in sorted(run_metadata.keys()): + lines.append("- **%s**: `%s`" % (key, run_metadata[key])) + + lines.extend([ + "", + "## Top-Line Results", + "", + "- **Status**: `%s`" % summary.get("status", "unknown"), + "- **Files Processed**: `%s`" % counts.get("files_processed", 0), + "- **Records Processed**: `%s`" % counts.get("records_processed", 0), + "- **Throughput**: `%s records/min`" % _fmt(throughput.get("overall_records_per_minute")), + "- **Load-Adjusted Throughput**: `%s records/min`" % _fmt(throughput.get("load_adjusted_records_per_minute")), + "- **Wall Duration**: `%s s`" % _fmt(_deep_get(summary, "duration_s", "wall_clock")), + "", + "## Per-Record Metrics (ms)", + "", + "All values in this section are per processed record, in milliseconds.", + "", + "| Metric | Count | p50 / record | p95 / record | p99 / record | Mean / record |", + "|---|---:|---:|---:|---:|---:|", + ]) + + for key in ("wall_time", "parse_stage", "resolver_stage", "db_stage"): + stats = per_record.get(key, {}) or {} + lines.append( + "| {name} | {count} | {p50} | {p95} | {p99} | {mean} |".format( + name=key, + count=stats.get("count", 0), + p50=_fmt(stats.get("p50")), + p95=_fmt(stats.get("p95")), + p99=_fmt(stats.get("p99")), + mean=_fmt(stats.get("mean")), + ) + ) + + if latency: + lines.extend([ + "", + "## Stage Latency (ms)", + "", + "Unless noted otherwise, stage latency values are normalized to per-record milliseconds.", + "", + "| Stage | Count | p50 / record | p95 / record | Mean / record |", + "|---|---:|---:|---:|---:|", + ]) + for stage in sorted(latency.keys()): + stats = latency[stage] + lines.append( + "| {stage} | {count} | {p50} | {p95} | {mean} |".format( + stage=stage, + count=stats.get("count", 0), + p50=_fmt(stats.get("p50")), + p95=_fmt(stats.get("p95")), + mean=_fmt(stats.get("mean")), + ) + ) + + if source_type_breakdown: + ranked_source_types = sorted( + source_type_breakdown.items(), + key=lambda 
item: (((item[1].get("wall_time_ms") or {}).get("p95")) or 0.0), + reverse=True, + ) + lines.extend([ + "", + "## Slowest Source Types", + "", + "Wall values below are per-record milliseconds aggregated across all records in the source-type group.", + "", + "| Source Type | Files | Records | Wall p95 / record | Wall Mean / record | Throughput (records/min) |", + "|---|---:|---:|---:|---:|---:|", + ]) + for source_type, stats in ranked_source_types[:10]: + lines.append( + "| {source_type} | {files} | {records} | {wall_p95} | {wall_mean} | {throughput} |".format( + source_type=source_type, + files=stats.get("file_count", 0), + records=stats.get("record_count", 0), + wall_p95=_fmt(_deep_get(stats, "wall_time_ms", "p95")), + wall_mean=_fmt(_deep_get(stats, "wall_time_ms", "mean")), + throughput=_fmt(stats.get("throughput_records_per_minute")), + ) + ) + + lines.extend([ + "", + "## Source Types", + "", + "`Files` counts source files in the group. `Records` counts extracted/processed references. 
All mean values below are per-record milliseconds.", + "", + "| Source Type | Files | Records | Wall Mean / record | Parse Mean / record | Resolver Mean / record | DB Mean / record | Throughput (records/min) |", + "|---|---:|---:|---:|---:|---:|---:|---:|", + ]) + for source_type, stats in ranked_source_types: + lines.append( + "| {source_type} | {files} | {records} | {wall} | {parse} | {resolver} | {db} | {throughput} |".format( + source_type=source_type, + files=stats.get("file_count", 0), + records=stats.get("record_count", 0), + wall=_fmt(_deep_get(stats, "wall_time_ms", "mean")), + parse=_fmt(_deep_get(stats, "parse_stage_ms", "mean")), + resolver=_fmt(_deep_get(stats, "resolver_stage_ms", "mean")), + db=_fmt(_deep_get(stats, "db_stage_ms", "mean")), + throughput=_fmt(stats.get("throughput_records_per_minute")), + ) + ) + + if parser_breakdown: + lines.extend([ + "", + "## Parsers", + "", + "Wall Mean below is per-record milliseconds for records handled by each parser group.", + "", + "| Parser | Files | Records | Wall Mean / record |", + "|---|---:|---:|---:|", + ]) + for parser_name in sorted(parser_breakdown.keys()): + stats = parser_breakdown[parser_name] + lines.append( + "| {parser_name} | {files} | {records} | {wall} |".format( + parser_name=parser_name, + files=stats.get("file_count", 0), + records=stats.get("record_count", 0), + wall=_fmt(_deep_get(stats, "wall_time_ms", "mean")), + ) + ) + + if raw_subfamily_breakdown: + lines.extend([ + "", + "## Raw Subfamilies", + "", + "Wall values below are per-record milliseconds within each raw subfamily.", + "", + "| Raw Subfamily | Files | Records | Wall Mean / record | Wall p95 / record | Throughput (records/min) |", + "|---|---:|---:|---:|---:|---:|", + ]) + for raw_subfamily, stats in sorted( + raw_subfamily_breakdown.items(), + key=lambda item: (((item[1].get("wall_time_ms") or {}).get("p95")) or 0.0), + reverse=True, + ): + lines.append( + "| {raw_subfamily} | {files} | {records} | {wall_mean} | 
{wall_p95} | {throughput} |".format( + raw_subfamily=raw_subfamily, + files=stats.get("file_count", 0), + records=stats.get("record_count", 0), + wall_mean=_fmt(_deep_get(stats, "wall_time_ms", "mean")), + wall_p95=_fmt(_deep_get(stats, "wall_time_ms", "p95")), + throughput=_fmt(stats.get("throughput_records_per_minute")), + ) + ) + + if system_load: + collection = system_load.get("collection", {}) or {} + load_summary = system_load.get("summary", {}) or {} + mean_load_1m = _deep_get(load_summary, "loadavg_1m", "mean") + mean_load_5m = _deep_get(load_summary, "loadavg_5m", "mean") + mean_load_15m = _deep_get(load_summary, "loadavg_15m", "mean") + max_load_1m = _deep_get(load_summary, "loadavg_1m", "max") + mean_norm_1m = _deep_get(load_summary, "normalized_load_1m", "mean") + max_norm_1m = _deep_get(load_summary, "normalized_load_1m", "max") + mean_mem_total = _deep_get(load_summary, "memory_total_bytes", "mean") + mean_mem_available = _deep_get(load_summary, "memory_available_bytes", "mean") + min_mem_available = _deep_get(load_summary, "memory_available_bytes", "min") + mean_mem_used = _deep_get(load_summary, "memory_used_bytes", "mean") + max_mem_used = _deep_get(load_summary, "memory_used_bytes", "max") + mean_mem_available_ratio = _deep_get(load_summary, "memory_available_ratio", "mean") + min_mem_available_ratio = _deep_get(load_summary, "memory_available_ratio", "min") + mean_mem_used_ratio = _deep_get(load_summary, "memory_used_ratio", "mean") + max_mem_used_ratio = _deep_get(load_summary, "memory_used_ratio", "max") + lines.extend([ + "", + "## System Load", + "", + "- **Enabled**: `%s`" % collection.get("enabled"), + "- **Sample Count**: `%s`" % collection.get("sample_count"), + "- **Sample Interval**: `%s s`" % _fmt(collection.get("sample_interval_s")), + "- **Platform**: `%s`" % collection.get("platform"), + "- **CPU Count**: `%s`" % collection.get("cpu_count"), + "- **Memory Probe**: `%s`" % collection.get("memory_probe"), + "- **Mean Raw Load (1m / 5m 
/ 15m)**: `%s / %s / %s`" % (_fmt(mean_load_1m), _fmt(mean_load_5m), _fmt(mean_load_15m)), + "- **Peak Raw Load (1m)**: `%s`" % _fmt(max_load_1m), + "- **Mean Normalized Load (1m)**: `%s`" % _fmt(mean_norm_1m), + "- **Peak Normalized Load (1m)**: `%s`" % _fmt(max_norm_1m), + "- **Mean Memory Total**: `%s`" % _fmt_bytes(mean_mem_total), + "- **Mean Memory Available**: `%s`" % _fmt_bytes(mean_mem_available), + "- **Minimum Memory Available**: `%s`" % _fmt_bytes(min_mem_available), + "- **Mean Memory Used**: `%s`" % _fmt_bytes(mean_mem_used), + "- **Peak Memory Used**: `%s`" % _fmt_bytes(max_mem_used), + "- **Mean Memory Available Ratio**: `%s`" % _fmt(mean_mem_available_ratio), + "- **Minimum Memory Available Ratio**: `%s`" % _fmt(min_mem_available_ratio), + "- **Mean Memory Used Ratio**: `%s`" % _fmt(mean_mem_used_ratio), + "- **Peak Memory Used Ratio**: `%s`" % _fmt(max_mem_used_ratio), + "", + "### System Load Samples", + "", + "| Metric | Mean | Min | Max | p50 | p95 |", + "|---|---:|---:|---:|---:|---:|", + ]) + metric_rows = [ + ("Raw load 1m", "loadavg_1m", "number"), + ("Raw load 5m", "loadavg_5m", "number"), + ("Raw load 15m", "loadavg_15m", "number"), + ("Normalized load 1m", "normalized_load_1m", "number"), + ("Memory total", "memory_total_bytes", "bytes"), + ("Memory available", "memory_available_bytes", "bytes"), + ("Memory used", "memory_used_bytes", "bytes"), + ("Memory available ratio", "memory_available_ratio", "number"), + ("Memory used ratio", "memory_used_ratio", "number"), + ] + for label, key, value_type in metric_rows: + stats = load_summary.get(key) or {} + formatter = _fmt_bytes if value_type == "bytes" else _fmt + lines.append( + "| {label} | {mean} | {min_v} | {max_v} | {p50} | {p95} |".format( + label=label, + mean=formatter(stats.get("mean")), + min_v=formatter(stats.get("min")), + max_v=formatter(stats.get("max")), + p50=formatter(stats.get("p50")), + p95=formatter(stats.get("p95")), + ) + ) + + directory = os.path.dirname(output_path) + 
if directory: + os.makedirs(directory, exist_ok=True) + with open(output_path, "w") as handle: + handle.write("\n".join(lines) + "\n") + + +def write_source_type_csv(summary: Dict[str, Any], output_path: str) -> None: + import csv + + directory = os.path.dirname(output_path) + if directory: + os.makedirs(directory, exist_ok=True) + + rows = [] + for source_type, stats in sorted( + (summary.get("source_type_breakdown", {}) or {}).items(), + key=lambda item: (((item[1].get("wall_time_ms") or {}).get("p95")) or 0.0), + reverse=True, + ): + rows.append({ + "source_type": source_type, + "file_count": stats.get("file_count", 0), + "record_count": stats.get("record_count", 0), + "wall_mean_ms": _blank_if_none(_deep_get(stats, "wall_time_ms", "mean")), + "wall_p95_ms": _blank_if_none(_deep_get(stats, "wall_time_ms", "p95")), + "parse_mean_ms": _blank_if_none(_deep_get(stats, "parse_stage_ms", "mean")), + "resolver_mean_ms": _blank_if_none(_deep_get(stats, "resolver_stage_ms", "mean")), + "db_mean_ms": _blank_if_none(_deep_get(stats, "db_stage_ms", "mean")), + "throughput_records_per_minute": _blank_if_none(stats.get("throughput_records_per_minute")), + }) + + fieldnames = [ + "source_type", + "file_count", + "record_count", + "wall_mean_ms", + "wall_p95_ms", + "parse_mean_ms", + "resolver_mean_ms", + "db_mean_ms", + "throughput_records_per_minute", + ] + with open(output_path, "w", newline="") as handle: + writer = csv.DictWriter(handle, fieldnames=fieldnames) + writer.writeheader() + for row in rows: + writer.writerow(row) diff --git a/adsrefpipe/tasks.py b/adsrefpipe/tasks.py index 32775c5..51cdef4 100755 --- a/adsrefpipe/tasks.py +++ b/adsrefpipe/tasks.py @@ -3,6 +3,7 @@ import os +import adsrefpipe.perf_metrics as perf_metrics import adsrefpipe.utils as utils from adsputils import load_config @@ -35,20 +36,53 @@ def task_process_reference(reference_task: dict) -> bool: :param reference_task: dictionary containing reference details and service url :return: True if 
processing is successful, False otherwise """ + reference_payload = reference_task.get('reference', {}) or {} + # Older tests and some call sites provide a single-item reference list, + # while the task instrumentation only needs one dict for metadata lookup. + if isinstance(reference_payload, dict): + reference_record = reference_payload + elif isinstance(reference_payload, list) and reference_payload and isinstance(reference_payload[0], dict): + reference_record = reference_payload[0] + else: + reference_record = {} + event_extra = perf_metrics.build_event_extra( + source_filename=reference_task.get('source_filename'), + parser_name=reference_task.get('parser_name'), + source_bibcode=reference_task.get('source_bibcode'), + input_extension=reference_task.get('input_extension'), + source_type=reference_task.get('source_type'), + record_count=1, + ) + record_id = reference_record.get('id') try: - resolved = utils.post_request_resolved_reference(reference_task['reference'], reference_task['resolver_service_url']) - # if failed to connect to reference service, raise a exception to requeue, for max_retries times - if not resolved: - raise FailedRequest - - # TODO: remove comparing to classic before going to production - classic_resolved_filename = reference_task['source_filename'].replace('sources', 'resolved') + '.result' if config['COMPARE_CLASSIC'] else None - - status = app.populate_tables_post_resolved(resolved, reference_task['source_bibcode'], classic_resolved_filename) - if not status: - return False - - return True + with perf_metrics.timed_stage( + stage='record_wall', + record_id=record_id, + extra=event_extra, + ): + with perf_metrics.timed_stage( + stage='resolver_http', + record_id=record_id, + extra=event_extra, + ): + resolved = utils.post_request_resolved_reference(reference_payload, reference_task['resolver_service_url']) + # if failed to connect to reference service, raise a exception to requeue, for max_retries times + if not resolved: + raise 
FailedRequest + + # TODO: remove comparing to classic before going to production + classic_resolved_filename = reference_task['source_filename'].replace('sources', 'resolved') + '.result' if config['COMPARE_CLASSIC'] else None + + with perf_metrics.timed_stage( + stage='post_resolved_db', + record_id=record_id, + extra=event_extra, + ): + status = app.populate_tables_post_resolved(resolved, reference_task['source_bibcode'], classic_resolved_filename) + if not status: + return False + + return True except KeyError: return False diff --git a/adsrefpipe/tests/unittests/test_benchmark.py b/adsrefpipe/tests/unittests/test_benchmark.py new file mode 100644 index 0000000..8116a2d --- /dev/null +++ b/adsrefpipe/tests/unittests/test_benchmark.py @@ -0,0 +1,281 @@ +import types +import json +import os +import tempfile +import unittest +from unittest.mock import patch + +import sys + +fake_requests = types.SimpleNamespace( + post=lambda *args, **kwargs: None, + get=lambda *args, **kwargs: None, + exceptions=types.SimpleNamespace(RequestException=Exception), +) +sys.modules.setdefault("requests", fake_requests) +sys.modules.setdefault( + "adsputils", + types.SimpleNamespace( + load_config=lambda *args, **kwargs: {}, + setup_logging=lambda *args, **kwargs: types.SimpleNamespace(info=lambda *a, **k: None, error=lambda *a, **k: None, debug=lambda *a, **k: None), + ), +) + +from adsrefpipe import benchmark +from adsrefpipe import perf_metrics + + +class TestBenchmark(unittest.TestCase): + + def test_collect_candidate_files_filters_results(self): + with tempfile.TemporaryDirectory() as tmpdir: + raw_path = os.path.join(tmpdir, "one.raw") + xml_path = os.path.join(tmpdir, "two.jats.xml") + ignored_path = os.path.join(tmpdir, "three.raw.result") + with open(raw_path, "w") as handle: + handle.write("raw") + with open(xml_path, "w") as handle: + handle.write("xml") + with open(ignored_path, "w") as handle: + handle.write("ignored") + + files = benchmark.collect_candidate_files(tmpdir, 
["*.raw", "*.xml"]) + self.assertEqual(files, sorted([raw_path, xml_path])) + + def test_classify_source_file_prefers_parser_extension(self): + payload = benchmark.classify_source_file( + "/tmp/foo.iop.xml", + {"name": "IOP", "extension_pattern": ".iop.xml"}, + ) + self.assertEqual(payload["parser_name"], "IOP") + self.assertEqual(payload["source_type"], ".iop.xml") + + def test_mock_resolver_returns_deterministic_payload(self): + resolved = benchmark._mock_resolved_reference({"id": "H1I1", "refstr": "A ref"}, "http://example/text") + self.assertEqual(resolved[0]["id"], "H1I1") + self.assertEqual(resolved[0]["bibcode"], "2000mock........A") + + def test_progress_line_formatter_delegates_to_perf_metrics(self): + self.assertIsNone(perf_metrics.format_benchmark_progress_line_from_log_line("not-json")) + self.assertIsNone(perf_metrics.format_benchmark_progress_line_from_log_line(json.dumps({ + "timestamp": "2026-04-10T18:29:31.196Z", + "message": "Updated 1 resolved reference records successfully.", + }))) + self.assertEqual( + perf_metrics.format_benchmark_progress_line_from_log_line(json.dumps({ + "timestamp": "2026-04-10T18:29:31.196Z", + "message": ( + "Source file /app/adsrefpipe/tests/unittests/stubdata/test.jats.xml " + "for bibcode 0000HiA.....Z with 16 references, processed successfully." 
+ ), + })), + "18:29:31 test.jats.xml with 16 references", + ) + + def test_run_case_mock_mode_collects_summary(self): + with tempfile.TemporaryDirectory() as tmpdir: + sample_file = os.path.join(tmpdir, "sample.raw") + with open(sample_file, "w") as handle: + handle.write("content") + events_path = os.path.join(tmpdir, "events.jsonl") + + def fake_process_files(files): + for index, filename in enumerate(files, start=1): + extra = { + "source_filename": filename, + "source_type": ".raw", + "input_extension": ".raw", + "parser_name": "arXiv", + "record_count": 1, + } + benchmark.perf_metrics.emit_event("ingest_enqueue", record_id=None, extra=extra) + benchmark.perf_metrics.emit_event("parse_dispatch", record_id=None, duration_ms=5.0, extra=extra) + benchmark.perf_metrics.emit_event("resolver_http", record_id="rec-%s" % index, duration_ms=4.0, extra=extra) + benchmark.perf_metrics.emit_event("post_resolved_db", record_id="rec-%s" % index, duration_ms=3.0, extra=extra) + benchmark.perf_metrics.emit_event("record_wall", record_id="rec-%s" % index, duration_ms=12.0, extra=extra) + benchmark.perf_metrics.emit_event("file_wall", record_id=None, duration_ms=12.0, extra=extra) + + class FakePipelineRun: + @staticmethod + def process_files(files): + return fake_process_files(files) + + with patch.object(benchmark, "_pipeline_run_module", return_value=FakePipelineRun): + summary = benchmark._run_case( + input_path=tmpdir, + extensions=["*.raw"], + max_files=1, + mode="mock", + events_path=events_path, + system_sample_interval_s=0.01, + system_load_enabled=False, + warmup=False, + group_by="source_type", + ) + + self.assertEqual(summary["status"], "complete") + self.assertEqual(summary["counts"]["files_processed"], 1) + self.assertEqual(summary["counts"]["records_processed"], 1) + self.assertIn(".raw", summary["source_type_breakdown"]) + self.assertGreater(summary["throughput"]["overall_records_per_minute"], 0) + + def test_cmd_run_prints_artifacts(self): + with 
tempfile.TemporaryDirectory() as tmpdir: + input_path = os.path.join(tmpdir, "input") + os.makedirs(input_path) + with open(os.path.join(input_path, "sample.raw"), "w") as handle: + handle.write("content") + + fake_summary = { + "status": "complete", + "counts": {"files_selected": 1, "files_processed": 1, "records_submitted": 1, "records_processed": 1, "failures": 0}, + "throughput": {"overall_records_per_minute": 60.0, "load_adjusted_records_per_minute": 60.0}, + "duration_s": {"wall_clock": 1.0}, + "per_record_metrics_ms": {}, + "source_type_breakdown": {".raw": {"file_count": 1, "record_count": 1, "wall_time_ms": {}, "parse_stage_ms": {}, "resolver_stage_ms": {}, "db_stage_ms": {}, "throughput_records_per_minute": 60.0}}, + "parser_breakdown": {}, + "system_load": {"collection": {"enabled": False}, "summary": {}}, + "run_metadata": {"run_id": "run-1"}, + } + + args = benchmark.build_parser().parse_args([ + "run", + "--input-path", input_path, + "--output-dir", tmpdir, + "--events-path", os.path.join(tmpdir, "events.jsonl"), + "--no-warmup", + "--disable-system-load", + ]) + + with patch.object(benchmark, "_run_case", return_value=fake_summary): + with patch("sys.stdout.write") as mock_write: + rc = benchmark.cmd_run(args) + + self.assertEqual(rc, 0) + rendered = "".join(call.args[0] for call in mock_write.call_args_list) + self.assertIn('"status": "complete"', rendered) + self.assertTrue(any(name.endswith(".json") for name in os.listdir(tmpdir))) + self.assertTrue(any(name.endswith(".source_types.csv") for name in os.listdir(tmpdir))) + + def test_cmd_run_with_fake_pipeline_generates_artifacts(self): + with tempfile.TemporaryDirectory() as tmpdir: + input_path = os.path.join(tmpdir, "input") + os.makedirs(input_path) + sample_path = os.path.join(input_path, "sample.raw") + with open(sample_path, "w") as handle: + handle.write("content") + events_path = os.path.join(tmpdir, "events.jsonl") + + class FakePipelineRun: + @staticmethod + def process_files(files): + 
for index, filename in enumerate(files, start=1): + extra = { + "source_filename": filename, + "source_type": ".raw", + "input_extension": ".raw", + "parser_name": "arXiv", + "record_count": 1, + } + benchmark.perf_metrics.emit_event("ingest_enqueue", record_id=None, extra=extra) + benchmark.perf_metrics.emit_event("parse_dispatch", record_id=None, duration_ms=5.0, extra=extra) + benchmark.perf_metrics.emit_event("resolver_http", record_id="rec-%s" % index, duration_ms=4.0, extra=extra) + benchmark.perf_metrics.emit_event("post_resolved_db", record_id="rec-%s" % index, duration_ms=3.0, extra=extra) + benchmark.perf_metrics.emit_event("record_wall", record_id="rec-%s" % index, duration_ms=12.0, extra=extra) + benchmark.perf_metrics.emit_event("file_wall", record_id=None, duration_ms=12.0, extra=extra) + + args = benchmark.build_parser().parse_args([ + "run", + "--input-path", input_path, + "--output-dir", tmpdir, + "--events-path", events_path, + "--no-warmup", + "--disable-system-load", + ]) + + with patch.object(benchmark, "_pipeline_run_module", return_value=FakePipelineRun): + with patch.object(benchmark, "_safe_git_commit", return_value="deadbeef"): + with patch("sys.stdout.write") as mock_write: + rc = benchmark.cmd_run(args) + + self.assertEqual(rc, 0) + rendered = "".join(call.args[0] for call in mock_write.call_args_list) + self.assertIn('"status": "complete"', rendered) + created_files = os.listdir(tmpdir) + json_artifacts = [name for name in created_files if name.endswith(".json")] + markdown_artifacts = [name for name in created_files if name.endswith(".md")] + csv_artifacts = [name for name in created_files if name.endswith(".source_types.csv")] + self.assertTrue(json_artifacts) + self.assertTrue(markdown_artifacts) + self.assertTrue(csv_artifacts) + benchmark_json = os.path.join(tmpdir, sorted(json_artifacts)[0]) + with open(benchmark_json, "r") as handle: + summary = json.load(handle) + self.assertEqual(summary["status"], "complete") + 
self.assertIn(".raw", summary["source_type_breakdown"]) + + def test_build_parser_rejects_invalid_sample_interval(self): + parser = benchmark.build_parser() + with self.assertRaises(SystemExit): + parser.parse_args([ + "run", + "--system-sample-interval", "0", + ]) + + def test_run_case_warns_when_sampler_thread_stays_alive(self): + with tempfile.TemporaryDirectory() as tmpdir: + sample_file = os.path.join(tmpdir, "sample.raw") + with open(sample_file, "w") as handle: + handle.write("content") + events_path = os.path.join(tmpdir, "events.jsonl") + + def fake_process_files(files): + extra = { + "source_filename": files[0], + "source_type": ".raw", + "input_extension": ".raw", + "parser_name": "arXiv", + "record_count": 1, + } + benchmark.perf_metrics.emit_event("ingest_enqueue", record_id=None, extra=extra) + benchmark.perf_metrics.emit_event("record_wall", record_id="rec-1", duration_ms=5.0, extra=extra) + + class FakePipelineRun: + @staticmethod + def process_files(files): + return fake_process_files(files) + + class FakeSamplerThread: + def __init__(self, *args, **kwargs): + self._alive = True + + def start(self): + return None + + def join(self, timeout=None): + return None + + def is_alive(self): + return self._alive + + with patch.object(benchmark, "_pipeline_run_module", return_value=FakePipelineRun): + with patch.object(benchmark.threading, "Thread", return_value=FakeSamplerThread()): + with patch.object(benchmark, "LOGGER") as mock_logger: + summary = benchmark._run_case( + input_path=tmpdir, + extensions=["*.raw"], + max_files=1, + mode="mock", + events_path=events_path, + system_sample_interval_s=0.1, + system_load_enabled=True, + warmup=False, + group_by="source_type", + ) + + self.assertEqual(summary["status"], "complete") + mock_logger.warning.assert_called_once() + + +if __name__ == "__main__": + unittest.main() diff --git a/adsrefpipe/tests/unittests/test_perf_metrics.py b/adsrefpipe/tests/unittests/test_perf_metrics.py new file mode 100644 index 
0000000..1ebc01b --- /dev/null +++ b/adsrefpipe/tests/unittests/test_perf_metrics.py @@ -0,0 +1,426 @@ +import json +import os +import tempfile +import threading +import unittest +from unittest.mock import patch + +import adsrefpipe.perf_metrics as perf_metrics + + +class TestPerfMetrics(unittest.TestCase): + + def test_source_type_from_filename(self): + self.assertEqual(perf_metrics.source_type_from_filename("foo.raw"), ".raw") + self.assertEqual(perf_metrics.source_type_from_filename("foo.jats.xml"), ".jats.xml") + self.assertEqual(perf_metrics.source_type_from_filename("foo.iop.xml"), ".iop.xml") + self.assertEqual(perf_metrics.source_type_from_filename("foo.living.xml"), ".living.xml") + self.assertEqual(perf_metrics.source_type_from_filename("foo.ref.ocr.txt"), ".ocr.txt") + self.assertEqual(perf_metrics.source_type_from_filename("0000A&A.....0.....Z.raw"), ".raw") + self.assertEqual(perf_metrics.source_type_from_filename("foo.tex"), ".tex") + self.assertEqual( + perf_metrics.raw_subfamily_from_metadata( + filename="/tmp/txt/arXiv/0/00000.raw", + parser_name="arXiv", + source_type=".raw", + ), + "raw_arxiv", + ) + self.assertEqual( + perf_metrics.raw_subfamily_from_metadata( + filename="/tmp/txt/ARA+A/0/0000ADSTEST.0.....Z.ref.raw", + parser_name="ThreeBibsTxt", + source_type=".raw", + ), + "raw_ref_raw", + ) + self.assertEqual( + perf_metrics.raw_subfamily_from_metadata( + filename="/tmp/html/PASJ/0/iss0.raw", + parser_name="PASJhtml", + source_type=".raw", + ), + "raw_pasj_html", + ) + self.assertEqual( + perf_metrics.raw_subfamily_from_metadata( + filename="/tmp/test.aas.raw", + parser_name="AAS", + source_type=".raw", + ), + "raw_aas", + ) + + def test_format_benchmark_progress_line_from_log_line(self): + line = json.dumps({ + "timestamp": "2026-04-10T18:29:31.196Z", + "message": ( + "Source file /app/adsrefpipe/tests/unittests/stubdata/test.jats.xml " + "for bibcode 0000HiA.....Z with 16 references, processed successfully." 
+ ), + }) + + rendered = perf_metrics.format_benchmark_progress_line_from_log_line(line) + + self.assertEqual(rendered, "18:29:31 test.jats.xml with 16 references") + + def test_format_benchmark_progress_line_from_log_line_ignores_non_matches(self): + self.assertIsNone(perf_metrics.format_benchmark_progress_line_from_log_line("not-json")) + self.assertIsNone(perf_metrics.format_benchmark_progress_line_from_log_line(json.dumps({ + "timestamp": "2026-04-10T18:29:31.196Z", + "message": "Updated 1 resolved reference records successfully.", + }))) + + def test_emit_event_uses_registered_context(self): + with tempfile.TemporaryDirectory() as tmpdir: + events_path = os.path.join(tmpdir, "events.jsonl") + context_dir = os.path.join(tmpdir, "context") + config = { + "PERF_METRICS_ENABLED": False, + "PERF_METRICS_CONTEXT_DIR": context_dir, + } + os.environ["PERF_METRICS_CONTEXT_DIR"] = context_dir + try: + perf_metrics.register_run_metrics_context( + run_id="run-1", + enabled=True, + path=events_path, + context_id="ctx-1", + config=config, + context_dir=context_dir, + ) + perf_metrics.emit_event( + stage="record_wall", + run_id="run-1", + context_id="ctx-1", + record_id="rec-1", + duration_ms=5.0, + extra={"source_type": ".raw"}, + config=config, + ) + payloads = perf_metrics.load_events(events_path, run_id="run-1", context_id="ctx-1") + self.assertEqual(len(payloads), 1) + self.assertEqual(payloads[0]["record_id"], "rec-1") + self.assertEqual(payloads[0]["stage"], "record_wall") + finally: + os.environ.pop("PERF_METRICS_CONTEXT_DIR", None) + + def test_emit_event_is_safe_under_concurrent_writes(self): + with tempfile.TemporaryDirectory() as tmpdir: + events_path = os.path.join(tmpdir, "events.jsonl") + context_dir = os.path.join(tmpdir, "context") + config = { + "PERF_METRICS_ENABLED": False, + "PERF_METRICS_CONTEXT_DIR": context_dir, + } + perf_metrics.register_run_metrics_context( + run_id="run-concurrent", + enabled=True, + path=events_path, + 
context_id="ctx-concurrent", + config=config, + context_dir=context_dir, + ) + + def worker(worker_id): + for event_id in range(25): + perf_metrics.emit_event( + stage="record_wall", + run_id="run-concurrent", + context_id="ctx-concurrent", + record_id="worker-%d-event-%d" % (worker_id, event_id), + duration_ms=float(event_id), + extra={"source_type": ".raw"}, + config=config, + ) + + threads = [threading.Thread(target=worker, args=(worker_id,)) for worker_id in range(8)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + with open(events_path, "r") as handle: + lines = [line.strip() for line in handle if line.strip()] + + self.assertEqual(len(lines), 200) + payloads = [json.loads(line) for line in lines] + self.assertEqual(len(payloads), 200) + + def test_register_run_metrics_context_logs_on_failure(self): + with self.assertLogs("adsrefpipe.perf_metrics", level="DEBUG") as logs: + with patch("adsrefpipe.perf_metrics.json.dump", side_effect=OSError("boom")): + perf_metrics.register_run_metrics_context( + run_id="run-1", + enabled=True, + path="/tmp/events.jsonl", + context_id="ctx-1", + context_dir="/tmp/perf-metrics-test-context", + ) + self.assertIn("Failed to register metrics context", "\n".join(logs.output)) + + def test_emit_event_logs_on_failure(self): + with tempfile.TemporaryDirectory() as tmpdir: + events_path = os.path.join(tmpdir, "events.jsonl") + context_dir = os.path.join(tmpdir, "context") + config = { + "PERF_METRICS_ENABLED": False, + "PERF_METRICS_CONTEXT_DIR": context_dir, + } + perf_metrics.register_run_metrics_context( + run_id="run-emit-fail", + enabled=True, + path=events_path, + context_id="ctx-emit-fail", + config=config, + context_dir=context_dir, + ) + with self.assertLogs("adsrefpipe.perf_metrics", level="DEBUG") as logs: + with patch("adsrefpipe.perf_metrics._append_jsonl_record", side_effect=OSError("append failed")): + perf_metrics.emit_event( + stage="record_wall", + run_id="run-emit-fail", + 
context_id="ctx-emit-fail", + record_id="rec-1", + duration_ms=1.0, + config=config, + ) + self.assertIn("Failed to emit metrics event", "\n".join(logs.output)) + + def test_load_events_logs_on_parse_failure(self): + with tempfile.TemporaryDirectory() as tmpdir: + events_path = os.path.join(tmpdir, "events.jsonl") + with open(events_path, "w") as handle: + handle.write("{not-json}\n") + handle.write(json.dumps({"run_id": "run-1", "context_id": "ctx-1", "stage": "ok"}) + "\n") + + with self.assertLogs("adsrefpipe.perf_metrics", level="DEBUG") as logs: + payloads = perf_metrics.load_events(events_path, run_id="run-1", context_id="ctx-1") + + self.assertEqual(len(payloads), 1) + self.assertIn("Failed to parse metrics event line", "\n".join(logs.output)) + + def test_load_events_missing_file_returns_empty_list(self): + with tempfile.TemporaryDirectory() as tmpdir: + missing_path = os.path.join(tmpdir, "missing.jsonl") + payloads = perf_metrics.load_events(missing_path, run_id="run-1", context_id="ctx-1") + self.assertEqual(payloads, []) + + def test_emit_event_file_permission_failure_logs_and_does_not_raise(self): + with tempfile.TemporaryDirectory() as tmpdir: + events_path = os.path.join(tmpdir, "events.jsonl") + context_dir = os.path.join(tmpdir, "context") + config = { + "PERF_METRICS_ENABLED": False, + "PERF_METRICS_CONTEXT_DIR": context_dir, + } + perf_metrics.register_run_metrics_context( + run_id="run-permission-fail", + enabled=True, + path=events_path, + context_id="ctx-permission-fail", + config=config, + context_dir=context_dir, + ) + with self.assertLogs("adsrefpipe.perf_metrics", level="DEBUG") as logs: + with patch("adsrefpipe.perf_metrics._append_jsonl_record", side_effect=PermissionError("denied")): + perf_metrics.emit_event( + stage="record_wall", + run_id="run-permission-fail", + context_id="ctx-permission-fail", + record_id="rec-1", + duration_ms=1.0, + config=config, + ) + self.assertIn("Failed to emit metrics event", "\n".join(logs.output)) + + 
def test_aggregate_ads_events_groups_by_source_type(self): + events = [ + { + "ts": 1.0, + "stage": "ingest_enqueue", + "duration_ms": 2.0, + "status": "ok", + "record_id": None, + "extra": {"record_count": 2, "source_type": ".raw", "parser_name": "arXiv", "source_filename": "a.raw"}, + }, + { + "ts": 2.0, + "stage": "parse_dispatch", + "duration_ms": 20.0, + "status": "ok", + "record_id": None, + "extra": {"record_count": 2, "source_type": ".raw", "parser_name": "arXiv", "source_filename": "a.raw"}, + }, + { + "ts": 3.0, + "stage": "resolver_http", + "duration_ms": 8.0, + "status": "ok", + "record_id": "r1", + "extra": {"record_count": 1, "source_type": ".raw", "parser_name": "arXiv", "source_filename": "a.raw"}, + }, + { + "ts": 4.0, + "stage": "post_resolved_db", + "duration_ms": 6.0, + "status": "ok", + "record_id": "r1", + "extra": {"record_count": 1, "source_type": ".raw", "parser_name": "arXiv", "source_filename": "a.raw"}, + }, + { + "ts": 5.0, + "stage": "record_wall", + "duration_ms": 18.0, + "status": "ok", + "record_id": "r1", + "extra": {"record_count": 1, "source_type": ".raw", "parser_name": "arXiv", "source_filename": "a.raw"}, + }, + { + "ts": 6.0, + "stage": "record_wall", + "duration_ms": 22.0, + "status": "ok", + "record_id": "r2", + "extra": {"record_count": 1, "source_type": ".raw", "parser_name": "arXiv", "source_filename": "a.raw"}, + }, + ] + + summary = perf_metrics.aggregate_ads_events(events, started_at=1.0, ended_at=6.0, expected_files=1) + self.assertEqual(summary["counts"]["records_submitted"], 2) + self.assertEqual(summary["counts"]["records_processed"], 2) + self.assertIn(".raw", summary["source_type_breakdown"]) + self.assertEqual(summary["source_type_breakdown"][".raw"]["record_count"], 2) + self.assertIn("raw_arxiv", summary["raw_subfamily_breakdown"]) + self.assertEqual(summary["raw_subfamily_breakdown"]["raw_arxiv"]["record_count"], 2) + self.assertEqual(summary["per_record_metrics_ms"]["parse_stage"]["p95"], 10.0) + 
self.assertEqual(summary["status"], "complete") + + def test_render_markdown_and_write_json(self): + with tempfile.TemporaryDirectory() as tmpdir: + summary = { + "status": "complete", + "counts": {"files_processed": 1, "records_processed": 2}, + "throughput": {"overall_records_per_minute": 120.0, "load_adjusted_records_per_minute": 140.0}, + "duration_s": {"wall_clock": 1.0}, + "per_record_metrics_ms": { + "wall_time": {"count": 2, "mean": 10.0, "p50": 10.0, "p95": 11.0, "p99": 11.0}, + "parse_stage": {"count": 2, "mean": 3.0, "p50": 3.0, "p95": 4.0, "p99": 4.0}, + "resolver_stage": {"count": 2, "mean": 4.0, "p50": 4.0, "p95": 5.0, "p99": 5.0}, + "db_stage": {"count": 2, "mean": 2.0, "p50": 2.0, "p95": 3.0, "p99": 3.0}, + }, + "source_type_breakdown": {}, + "parser_breakdown": {}, + "system_load": { + "collection": { + "enabled": True, + "sample_count": 2, + "sample_interval_s": 1.0, + "platform": "linux", + "cpu_count": 4, + "memory_probe": "linux_meminfo", + }, + "summary": { + "loadavg_1m": {"mean": 2.0, "min": 1.0, "max": 3.0, "p50": 2.0, "p95": 2.9}, + "loadavg_5m": {"mean": 1.5, "min": 1.0, "max": 2.0, "p50": 1.5, "p95": 1.95}, + "loadavg_15m": {"mean": 1.0, "min": 0.8, "max": 1.2, "p50": 1.0, "p95": 1.18}, + "normalized_load_1m": {"mean": 0.5, "min": 0.25, "max": 0.75, "p50": 0.5, "p95": 0.72}, + "memory_total_bytes": {"mean": 1000.0, "min": 1000.0, "max": 1000.0, "p50": 1000.0, "p95": 1000.0}, + "memory_available_bytes": {"mean": 250.0, "min": 200.0, "max": 300.0, "p50": 250.0, "p95": 295.0}, + "memory_used_bytes": {"mean": 750.0, "min": 700.0, "max": 800.0, "p50": 750.0, "p95": 795.0}, + "memory_available_ratio": {"mean": 0.25, "min": 0.2, "max": 0.3, "p50": 0.25, "p95": 0.295}, + "memory_used_ratio": {"mean": 0.75, "min": 0.7, "max": 0.8, "p50": 0.75, "p95": 0.795}, + }, + }, + "run_metadata": {"mode": "mock"}, + } + json_path = os.path.join(tmpdir, "summary.json") + md_path = os.path.join(tmpdir, "summary.md") + perf_metrics.write_json(json_path, 
summary) + perf_metrics.render_markdown(summary, md_path) + csv_path = os.path.join(tmpdir, "summary.source_types.csv") + perf_metrics.write_source_type_csv(summary, csv_path) + self.assertTrue(os.path.exists(json_path)) + self.assertTrue(os.path.exists(md_path)) + self.assertTrue(os.path.exists(csv_path)) + with open(json_path, "r") as handle: + self.assertEqual(json.loads(handle.read())["status"], "complete") + with open(md_path, "r") as handle: + rendered = handle.read() + self.assertIn("ADS Reference Benchmark Report", rendered) + self.assertIn("Mean Raw Load", rendered) + self.assertIn("Memory used", rendered) + with open(csv_path, "r") as handle: + csv_rendered = handle.read() + self.assertIn("source_type", csv_rendered) + + def test_write_source_type_csv_blanks_missing_values(self): + with tempfile.TemporaryDirectory() as tmpdir: + csv_path = os.path.join(tmpdir, "summary.source_types.csv") + perf_metrics.write_source_type_csv( + { + "source_type_breakdown": { + ".raw": { + "file_count": 1, + "record_count": 2, + "wall_time_ms": {"mean": None, "p95": None}, + "parse_stage_ms": {"mean": None}, + "resolver_stage_ms": {"mean": None}, + "db_stage_ms": {"mean": None}, + "throughput_records_per_minute": None, + } + } + }, + csv_path, + ) + with open(csv_path, "r") as handle: + lines = [line.rstrip("\n") for line in handle] + self.assertEqual( + lines[1], + ".raw,1,2,,,,,,", + ) + + def test_aggregate_system_samples_includes_raw_host_usage(self): + samples = [ + { + "platform": "linux", + "cpu_count": 4, + "memory_probe": "linux_meminfo", + "loadavg_1m": 2.0, + "loadavg_5m": 1.5, + "loadavg_15m": 1.0, + "normalized_load_1m": 0.5, + "normalized_load_5m": 0.375, + "normalized_load_15m": 0.25, + "memory_total_bytes": 1000.0, + "memory_available_bytes": 300.0, + "memory_used_bytes": 700.0, + "memory_available_ratio": 0.3, + "memory_used_ratio": 0.7, + }, + { + "platform": "linux", + "cpu_count": 4, + "memory_probe": "linux_meminfo", + "loadavg_1m": 4.0, + "loadavg_5m": 
3.0, + "loadavg_15m": 2.0, + "normalized_load_1m": 1.0, + "normalized_load_5m": 0.75, + "normalized_load_15m": 0.5, + "memory_total_bytes": 1000.0, + "memory_available_bytes": 200.0, + "memory_used_bytes": 800.0, + "memory_available_ratio": 0.2, + "memory_used_ratio": 0.8, + }, + ] + + result = perf_metrics.aggregate_system_samples(samples, enabled=True, sample_interval_s=2.0) + self.assertEqual(result["summary"]["loadavg_1m"]["mean"], 3.0) + self.assertEqual(result["summary"]["memory_available_bytes"]["min"], 200.0) + self.assertEqual(result["summary"]["memory_used_ratio"]["max"], 0.8) + + +if __name__ == "__main__": + unittest.main() diff --git a/run.py b/run.py index 96fc167..9bf55d9 100644 --- a/run.py +++ b/run.py @@ -9,6 +9,7 @@ import argparse from adsrefpipe import tasks +from adsrefpipe import perf_metrics from adsrefpipe.refparsers.handler import verify from adsrefpipe.utils import get_date_modified_struct_time, ReprocessQueryType @@ -20,6 +21,10 @@ processed_log = setup_logging('processed_subdirectories.py') +def _benchmark_continue_on_error() -> bool: + return os.getenv("PERF_BENCHMARK_CONTINUE_ON_ERROR", "").strip().lower() in {"1", "true", "yes", "on"} + + def positive_float(value: str) -> float: """ argparse type for positive floating point values. 
@@ -100,14 +105,48 @@ def queue_references(references: list, source_filename: str, source_bibcode: str :return: None """ resolver_service_url = config['REFERENCE_PIPELINE_SERVICE_URL'] + app.get_reference_service_endpoint(parsername) - for reference in references: - reference_task = {'reference': reference, - 'source_bibcode': source_bibcode, - 'source_filename': source_filename, - 'resolver_service_url': resolver_service_url} - # tasks.task_process_reference.delay(reference_task) - print('---here') - tasks.task_process_reference(reference_task) + event_extra = perf_metrics.build_event_extra( + source_filename=source_filename, + parser_name=parsername, + source_bibcode=source_bibcode, + record_count=len(references), + ) + perf_metrics.emit_event( + stage='ingest_enqueue', + extra=event_extra, + ) + with perf_metrics.timed_stage( + stage='queue_references', + extra=event_extra, + ): + for reference in references: + reference_task = {'reference': reference, + 'source_bibcode': source_bibcode, + 'source_filename': source_filename, + 'resolver_service_url': resolver_service_url, + 'parser_name': parsername, + 'input_extension': event_extra.get('input_extension'), + 'source_type': event_extra.get('source_type')} + try: + tasks.task_process_reference(reference_task) + except Exception as exc: + if not _benchmark_continue_on_error(): + raise + perf_metrics.emit_event( + stage='record_error', + record_id=reference.get('id'), + status='error', + extra=perf_metrics.build_event_extra( + source_filename=source_filename, + parser_name=parsername, + source_bibcode=source_bibcode, + input_extension=event_extra.get('input_extension'), + source_type=event_extra.get('source_type'), + record_count=1, + extra={'error': str(exc), 'error_type': exc.__class__.__name__}, + ), + ) + logger.error("Benchmark continuing after record failure for %s: %s" % (source_filename, str(exc))) def process_files(filenames: list) -> None: @@ -121,53 +160,69 @@ def process_files(filenames: list) -> None: 
:return: None """ for filename in filenames: - # from filename get the parser info - # file extension, and bibstem and volume directories are used to query database and return the parser info - # ie for filename `adsrefpipe/tests/unittests/stubdata/txt/ARA+A/0/0000ADSTEST.0.....Z.ref.raw` - # extension ref.raw, bibstem directory ARA+A and volume directory 0 is used and the - # parser info is {'name': 'ThreeBibsTxt', - # 'extension_pattern': '.ref.raw', - # 'reference_service_endpoint': '/text', - # 'matches': [[{'journal': 'AnRFM', 'volume_end': 37, 'volume_begin': 34}, - # {'journal': 'ARA+A', 'volume_end': 43, 'volume_begin': 40}, - # {'journal': 'ARNPS', 'volume_end': 56, 'volume_begin': 52}]]} - parser_dict = app.get_parser(filename) - # now map parser name to the class (see adsrefpipe/refparsers/handler.py) - # ie parser name ThreeBibsTxt is mapped to ThreeBibstemsTXTtoREFs - # 'ThreeBibsTxt': ThreeBibstemsTXTtoREFs, - # note that from the class name it is clear which type of parser this is - # (ie, this is a TXT parser implemented in module adsrefpipe/refparsers/ADStxt.py) - parser = verify(parser_dict.get('name')) - if not parser: - logger.error("Unable to detect which parser to use for the file %s." % filename) - continue - - # now read the source file - toREFs = parser(filename=filename, buffer=None) - if toREFs: - # next parse the references - parsed_references = toREFs.process_and_dispatch() - if not parsed_references: - logger.error("Unable to parse %s." 
% toREFs.filename) + current_filename = filename + file_event_extra = perf_metrics.build_event_extra(source_filename=filename) + with perf_metrics.timed_stage(stage='file_wall', extra=file_event_extra): + # from filename get the parser info + # file extension, and bibstem and volume directories are used to query database and return the parser info + # ie for filename `adsrefpipe/tests/unittests/stubdata/txt/ARA+A/0/0000ADSTEST.0.....Z.ref.raw` + # extension ref.raw, bibstem directory ARA+A and volume directory 0 is used and the + # parser info is {'name': 'ThreeBibsTxt', + # 'extension_pattern': '.ref.raw', + # 'reference_service_endpoint': '/text', + # 'matches': [[{'journal': 'AnRFM', 'volume_end': 37, 'volume_begin': 34}, + # {'journal': 'ARA+A', 'volume_end': 43, 'volume_begin': 40}, + # {'journal': 'ARNPS', 'volume_end': 56, 'volume_begin': 52}]]} + with perf_metrics.timed_stage(stage='parser_lookup', extra=file_event_extra): + parser_dict = app.get_parser(filename) + file_event_extra = perf_metrics.build_event_extra( + source_filename=filename, + parser_name=parser_dict.get('name'), + input_extension=parser_dict.get('extension_pattern'), + ) + # now map parser name to the class (see adsrefpipe/refparsers/handler.py) + parser = verify(parser_dict.get('name')) + if not parser: + logger.error("Unable to detect which parser to use for the file %s." % filename) continue - for block_references in parsed_references: - # save the initial records in the database, - # this is going to be useful since it allows us to be able to tell if - # anything went wrong with the service that we did not get back the - # resolved reference - references = app.populate_tables_pre_resolved_initial_status(source_bibcode=block_references['bibcode'], - source_filename=filename, - parsername=parser_dict.get('name'), - references=block_references['references']) - if not references: - logger.error("Unable to insert records from %s to db." 
% toREFs.filename) + with perf_metrics.timed_stage(stage='parser_init', extra=file_event_extra): + toREFs = parser(filename=filename, buffer=None) + current_filename = getattr(toREFs, 'filename', None) or filename + if toREFs: + with perf_metrics.timed_stage(stage='parse_dispatch', extra=file_event_extra): + parsed_references = toREFs.process_and_dispatch() + if not parsed_references: + logger.error("Unable to parse %s." % current_filename) continue - queue_references(references, filename, block_references['bibcode'], parser_dict.get('name')) + total_records = sum(len(block.get('references', [])) for block in parsed_references) + file_event_extra['record_count'] = total_records + for block_references in parsed_references: + block_event_extra = perf_metrics.build_event_extra( + source_filename=filename, + parser_name=parser_dict.get('name'), + source_bibcode=block_references['bibcode'], + input_extension=parser_dict.get('extension_pattern'), + record_count=len(block_references['references']), + ) + # save the initial records in the database, + # this is going to be useful since it allows us to be able to tell if + # anything went wrong with the service that we did not get back the + # resolved reference + with perf_metrics.timed_stage(stage='pre_resolved_db', extra=block_event_extra): + references = app.populate_tables_pre_resolved_initial_status(source_bibcode=block_references['bibcode'], + source_filename=filename, + parsername=parser_dict.get('name'), + references=block_references['references']) + if not references: + logger.error("Unable to insert records from %s to db." % current_filename) + continue + + queue_references(references, filename, block_references['bibcode'], parser_dict.get('name')) - else: - logger.error("Unable to process %s. Skipped!" % toREFs.filename) + else: + logger.error("Unable to process %s. Skipped!" 
% current_filename) def reprocess_references(reprocess_type: str, score_cutoff: float = 0, match_bibcode: str = '', date_cutoff: time.struct_time = None) -> None: diff --git a/scripts/run-attached-reference-benchmark.bash b/scripts/run-attached-reference-benchmark.bash new file mode 100755 index 0000000..045f3ef --- /dev/null +++ b/scripts/run-attached-reference-benchmark.bash @@ -0,0 +1,411 @@ +#!/bin/bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +OUTPUT_DIR_REL="Reference/logs/benchmarks/attached_reference_benchmark_runs" +CONTAINER_APP_ROOT="${CONTAINER_APP_ROOT:-/app}" +INPUT_PATH="${INPUT_PATH:-${CONTAINER_APP_ROOT}/adsrefpipe/tests/unittests/stubdata}" +EXTENSIONS="*.raw,*.xml,*.txt,*.html,*.tex,*.refs,*.pairs" +MODE="mock" +MAX_FILES="" +TIMEOUT="900" +READINESS_TIMEOUT="180" +RUNNER_PATH="${RUNNER_PATH:-${CONTAINER_APP_ROOT}/scripts/run-in-container-benchmark.bash}" +TARGET_CONTAINER="reference_pipeline" +SYSTEM_SAMPLE_INTERVAL="1.0" +GROUP_BY="source_type" +DISABLE_SYSTEM_LOAD="false" +PROGRESS="true" + +usage() { + cat <<'USAGE' +Usage: ./run-attached-reference-benchmark.bash [options] + +Options: + --container NAME Target reference container name + --input-path PATH Input file or directory inside the target container + --extensions CSV Comma-separated file patterns + --max-files N Optional file cap + --mode real|mock Benchmark mode + --timeout N Benchmark timeout in seconds + --readiness-timeout N Container readiness timeout in seconds + --output-dir RELPATH Host output directory relative to wrapper root + --runner-path PATH In-container runner path + --system-sample-interval FLOAT Sampling interval in seconds + --group-by source_type|parser|none + --disable-system-load Disable in-container system sampling + --no-progress Disable live compact progress lines + --help Show this help +USAGE +} + +while [[ $# -gt 0 ]]; do + case "$1" in + --container|--target) + TARGET_CONTAINER="$2" + shift 2 + ;; + --input-path) + 
INPUT_PATH="$2" + shift 2 + ;; + --extensions) + EXTENSIONS="$2" + shift 2 + ;; + --max-files) + MAX_FILES="$2" + shift 2 + ;; + --mode) + MODE="$2" + shift 2 + ;; + --timeout) + TIMEOUT="$2" + shift 2 + ;; + --readiness-timeout) + READINESS_TIMEOUT="$2" + shift 2 + ;; + --output-dir) + OUTPUT_DIR_REL="$2" + shift 2 + ;; + --runner-path) + RUNNER_PATH="$2" + shift 2 + ;; + --system-sample-interval) + SYSTEM_SAMPLE_INTERVAL="$2" + shift 2 + ;; + --group-by) + GROUP_BY="$2" + shift 2 + ;; + --disable-system-load) + DISABLE_SYSTEM_LOAD="true" + shift + ;; + --no-progress) + PROGRESS="false" + shift + ;; + --help) + usage + exit 0 + ;; + *) + echo "Unknown argument: $1" >&2 + usage >&2 + exit 1 + ;; + esac +done + +RUN_STAMP="$(date -u +"%Y%m%dT%H%M%SZ")" +OUTPUT_BASE_DIR="${SCRIPT_DIR}/${OUTPUT_DIR_REL}" +OUTPUT_DIR="${OUTPUT_BASE_DIR}/run_${RUN_STAMP}" +RUN_LOG_DIR="${OUTPUT_DIR}/run_logs" +MANIFEST_PATH="${OUTPUT_DIR}/attached_benchmark_manifest.json" +SUMMARY_PATH="${OUTPUT_DIR}/attached_benchmark_manifest.md" +WRAPPER_LOG="${OUTPUT_DIR}/attached_benchmark.log" +HOST_CONTEXT_PATH="${RUN_LOG_DIR}/host_context.json" +CONTAINER_STDOUT_PATH="${RUN_LOG_DIR}/container_runner.stdout.log" +PROGRESS_LOG_PATH="${RUN_LOG_DIR}/attached_${RUN_STAMP}.stdout.log" +PROGRESS_STOP_PATH="${RUN_LOG_DIR}/progress_display.done" +RESULT_PATH_HOST="${RUN_LOG_DIR}/attached_${RUN_STAMP}.result.json" +mkdir -p "${OUTPUT_DIR}" "${RUN_LOG_DIR}" +: > "${WRAPPER_LOG}" +rm -f "${PROGRESS_STOP_PATH}" + +log() { + local message="$1" + local timestamp + timestamp="$(date -u +"%Y-%m-%dT%H:%M:%SZ")" + echo "[${timestamp}] ${message}" | tee -a "${WRAPPER_LOG}" +} + +check_docker_access() { + docker info >/dev/null 2>&1 +} + +check_target_container() { + docker inspect --format '{{.State.Running}}' "${TARGET_CONTAINER}" 2>/dev/null | grep -q true +} + +wait_for_container_readiness() { + local deadline=$(( $(date +%s) + READINESS_TIMEOUT )) + while [[ $(date +%s) -lt ${deadline} ]]; do + if docker exec 
"${TARGET_CONTAINER}" python3 -c "import os; import sys; target=os.environ.get('INPUT_PATH', '${INPUT_PATH}'); sys.exit(0 if os.path.exists(target) else 1)" >/dev/null 2>&1; then + return 0 + fi + sleep 2 + done + return 1 +} + +capture_host_context() { + local output_path="$1" + python3 - "${output_path}" <<'PY' +import json +import re +import subprocess +import sys +from datetime import datetime, timezone + +output_path = sys.argv[1] + +def run_command(args): + try: + completed = subprocess.run(args, capture_output=True, text=True, check=False) + return { + "returncode": completed.returncode, + "stdout": completed.stdout.strip(), + "stderr": completed.stderr.strip(), + } + except Exception as exc: + return { + "returncode": None, + "stdout": "", + "stderr": str(exc), + } + +def parse_load(stdout): + match = re.search(r"load averages?: ([0-9.]+)[, ]+([0-9.]+)[, ]+([0-9.]+)", stdout) + if not match: + return None + return { + "load_1m": float(match.group(1)), + "load_5m": float(match.group(2)), + "load_15m": float(match.group(3)), + } + +def parse_memory(stdout): + page_size = 4096 + page_match = re.search(r"page size of (\d+) bytes", stdout) + if page_match: + page_size = int(page_match.group(1)) + values = {} + for key in ["Pages free", "Pages inactive", "Pages speculative"]: + match = re.search(rf"{re.escape(key)}:\s+(\d+)\.", stdout) + if match: + values[key] = int(match.group(1)) + if not values: + return None + available_pages = sum(values.values()) + return { + "page_size_bytes": page_size, + "pages_free": values.get("Pages free"), + "pages_inactive": values.get("Pages inactive"), + "pages_speculative": values.get("Pages speculative"), + "available_bytes_estimate": available_pages * page_size, + } + +payload = { + "captured_at_utc": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), +} + +uptime_result = run_command(["uptime"]) +payload["cpu_load"] = { + "raw": uptime_result["stdout"], + "parse": parse_load(uptime_result["stdout"]), +} + 
+vm_stat_result = run_command(["vm_stat"]) +payload["free_memory"] = { + "raw": vm_stat_result["stdout"], + "parse": parse_memory(vm_stat_result["stdout"]), +} + +docker_df_result = run_command(["docker", "system", "df"]) +payload["docker_disk_usage"] = docker_df_result + +with open(output_path, "w") as handle: + json.dump(payload, handle, indent=2, sort_keys=True) + handle.write("\n") +PY +} + +resolve_app_mount() { + docker inspect --format '{{range .Mounts}}{{if eq .Destination "'"${CONTAINER_APP_ROOT}"'"}}{{println .Source}}{{end}}{{end}}' "${TARGET_CONTAINER}" 2>/dev/null | awk 'NF {print; exit}' +} + +APP_HOST_DIR="$(resolve_app_mount || true)" +CONTAINER_OUTPUT_DIR="${BENCHMARK_OUTPUT_DIR:-${CONTAINER_APP_ROOT}/logs/benchmarks/attached_reference_benchmark_runs}" +CONTAINER_LABEL="attached_${RUN_STAMP}" +LOGS_HOST_DIR="${SCRIPT_DIR}/Reference/logs" + +if ! check_docker_access; then + log "Docker daemon is not reachable from this shell" + exit 1 +fi + +if ! check_target_container; then + log "Target container is not running: ${TARGET_CONTAINER}" + exit 1 +fi + +if ! 
wait_for_container_readiness; then + log "Target container did not become benchmark-ready within ${READINESS_TIMEOUT}s" + exit 1 +fi + +capture_host_context "${HOST_CONTEXT_PATH}" + +CONTAINER_COMMAND="${RUNNER_PATH} --input-path '${INPUT_PATH}' --extensions '${EXTENSIONS}' --mode '${MODE}' --timeout '${TIMEOUT}' --output-dir '${CONTAINER_OUTPUT_DIR}' --label '${CONTAINER_LABEL}' --run-stamp '${RUN_STAMP}' --system-sample-interval '${SYSTEM_SAMPLE_INTERVAL}' --group-by '${GROUP_BY}'" +if [[ -n "${MAX_FILES}" ]]; then + CONTAINER_COMMAND+=" --max-files '${MAX_FILES}'" +fi +if [[ "${DISABLE_SYSTEM_LOAD}" == "true" ]]; then + CONTAINER_COMMAND+=" --disable-system-load" +fi +if [[ "${PROGRESS}" == "false" ]]; then + CONTAINER_COMMAND+=" --no-progress" +fi + +PROGRESS_PID="" +if [[ "${PROGRESS}" == "true" && -n "${APP_HOST_DIR}" ]]; then + python3 - "${PROGRESS_LOG_PATH}" "${PROGRESS_STOP_PATH}" "${APP_HOST_DIR}" <<'PY' & +import os +import sys +import time + +log_path = sys.argv[1] +stop_path = sys.argv[2] +app_host_dir = sys.argv[3] +offset = 0 +seen_log_lines = set() + +if app_host_dir: + sys.path.insert(0, app_host_dir) + +from adsrefpipe import perf_metrics + + +def process_available_lines(): + global offset + if not os.path.exists(log_path): + return False + with open(log_path, "r", errors="replace") as handle: + handle.seek(offset) + lines = handle.readlines() + offset = handle.tell() + for line in lines: + progress = perf_metrics.format_benchmark_progress_line_from_log_line(line) + if progress and line not in seen_log_lines: + seen_log_lines.add(line) + print(progress, flush=True) + return bool(lines) + + +while True: + saw_lines = process_available_lines() + if os.path.exists(stop_path): + process_available_lines() + break + if not saw_lines: + time.sleep(0.5) +PY + PROGRESS_PID=$! 
+fi + +log "Running in-container benchmark via ${TARGET_CONTAINER}" +if docker exec "${TARGET_CONTAINER}" bash -lc "${CONTAINER_COMMAND}" > "${CONTAINER_STDOUT_PATH}" 2>> "${WRAPPER_LOG}"; then + CONTAINER_EXIT_CODE=0 +else + CONTAINER_EXIT_CODE=$? +fi + +if [[ -n "${PROGRESS_PID}" ]]; then + : > "${PROGRESS_STOP_PATH}" + wait "${PROGRESS_PID}" || true + rm -f "${PROGRESS_STOP_PATH}" +fi + +if ! python3 - "${RESULT_PATH_HOST}" "${CONTAINER_STDOUT_PATH}" "${MANIFEST_PATH}" "${SUMMARY_PATH}" "${HOST_CONTEXT_PATH}" "${TARGET_CONTAINER}" "${APP_HOST_DIR}" "${LOGS_HOST_DIR}" "${CONTAINER_EXIT_CODE}" "${CONTAINER_APP_ROOT}" <<'PY' +import json +import sys +from datetime import datetime, timezone +from pathlib import Path + +result_path = Path(sys.argv[1]) +stdout_path = Path(sys.argv[2]) +manifest_path = Path(sys.argv[3]) +summary_path = Path(sys.argv[4]) +host_context_path = Path(sys.argv[5]) +target_container = sys.argv[6] +app_host_dir = sys.argv[7] +logs_host_dir = sys.argv[8] +container_exit_code = int(sys.argv[9]) +container_app_root = sys.argv[10] + +result = None +if result_path.exists(): + try: + candidate = json.loads(result_path.read_text()) + if isinstance(candidate, dict): + result = candidate + except json.JSONDecodeError: + result = None + +host_context = None +if host_context_path.exists(): + host_context = json.loads(host_context_path.read_text()) + +payload = { + "generated_at_utc": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), + "target_container": target_container, + "container_exit_code": container_exit_code, + "app_host_dir": app_host_dir or None, + "host_context": host_context, + "result": result, +} + +if result and app_host_dir: + for key in ["artifact_json", "artifact_markdown", "artifact_source_type_csv", "stdout_log", "run_dir"]: + value = result.get(key) + container_logs_prefix = f"{container_app_root}/logs/" + container_app_prefix = f"{container_app_root}/" + if isinstance(value, str) and value.startswith(container_logs_prefix): 
+ result[f"{key}_host"] = str(Path(logs_host_dir) / value[len(container_logs_prefix):]) + elif isinstance(value, str) and value.startswith(container_app_prefix): + result[f"{key}_host"] = str(Path(app_host_dir) / value[len(container_app_prefix):]) + +manifest_path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n") + +host = host_context or {} +cpu_1m = (((host.get("cpu_load") or {}).get("parse") or {}).get("load_1m")) +free_bytes = (((host.get("free_memory") or {}).get("parse") or {}).get("available_bytes_estimate")) +free_gb = round(free_bytes / (1024 ** 3), 2) if isinstance(free_bytes, (int, float)) else "" +lines = [ + "# Attached Reference Benchmark", + "", + f"- Target container: `{target_container}`", + f"- Container exit code: `{container_exit_code}`", + f"- Artifact JSON: `{(result or {}).get('artifact_json', '')}`", + f"- Artifact JSON host path: `{(result or {}).get('artifact_json_host', '')}`", + "", + "| Status | Throughput | Load-Adj | Files | Records | Per-Record Wall ms | CPU 1m | Free GB |", + "| --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: |", + "| {status} | {throughput} | {load_adjusted} | {files} | {records} | {wall} | {cpu_1m} | {free_gb} |".format( + status=(result or {}).get("status", ""), + throughput=(result or {}).get("throughput", ""), + load_adjusted=(result or {}).get("load_adjusted_throughput", ""), + files=(result or {}).get("files_processed", ""), + records=(result or {}).get("records_processed", ""), + wall=(result or {}).get("per_record_wall_mean_ms", ""), + cpu_1m=cpu_1m if cpu_1m is not None else "", + free_gb=free_gb, + ), +] +summary_path.write_text("\n".join(lines) + "\n") +PY +then + exit 1 +fi + +log "Attached benchmark manifest: ${MANIFEST_PATH}" +log "Attached benchmark summary: ${SUMMARY_PATH}" diff --git a/scripts/run-in-container-benchmark.bash b/scripts/run-in-container-benchmark.bash new file mode 100755 index 0000000..a6cd9ea --- /dev/null +++ b/scripts/run-in-container-benchmark.bash @@ -0,0 
+1,300 @@ +#!/bin/bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +APP_DIR="$(cd "${SCRIPT_DIR}/.." && pwd)" +DEFAULT_APP_ROOT="${APP_DIR:-/app}" +APP_ROOT="${CONTAINER_APP_ROOT:-${DEFAULT_APP_ROOT}}" +export PYTHONPATH="${APP_DIR}:${PYTHONPATH:-}" + +INPUT_PATH="${BENCHMARK_INPUT_PATH:-${APP_ROOT}/adsrefpipe/tests/unittests/stubdata}" +EXTENSIONS="*.raw,*.xml,*.txt,*.html,*.tex,*.refs,*.pairs" +MODE="mock" +MAX_FILES="" +TIMEOUT="900" +OUTPUT_DIR="${BENCHMARK_OUTPUT_DIR:-${APP_ROOT}/logs/benchmarks/container_benchmark_runs}" +EVENTS_PATH="${BENCHMARK_EVENTS_PATH:-${APP_ROOT}/logs/benchmarks/perf_events.jsonl}" +LABEL="" +RUN_STAMP="" +SYSTEM_SAMPLE_INTERVAL="1.0" +DISABLE_SYSTEM_LOAD="false" +GROUP_BY="source_type" +WARMUP="true" +PROGRESS="true" + +usage() { + cat <&2 + usage >&2 + exit 1 + ;; + esac +done + +RUN_STAMP="${RUN_STAMP:-$(date -u +"%Y%m%dT%H%M%SZ")}" +RUN_LABEL="${LABEL:-benchmark_${RUN_STAMP}}" +RUN_DIR="${OUTPUT_DIR}/run_${RUN_STAMP}" +RUN_LOG_DIR="${RUN_DIR}/run_logs" +STDOUT_PATH="${RUN_LOG_DIR}/${RUN_LABEL}.stdout.log" +RESULT_PATH="${RUN_LOG_DIR}/${RUN_LABEL}.result.json" +PROGRESS_STOP_PATH="${RUN_LOG_DIR}/${RUN_LABEL}.progress.done" +mkdir -p "${RUN_DIR}" "${RUN_LOG_DIR}" +rm -f "${PROGRESS_STOP_PATH}" + +if [[ ! -e "${INPUT_PATH}" ]]; then + cat > "${RESULT_PATH}" < "${STDOUT_PATH}" 2>&1 & +BENCHMARK_PID=$! 
+ +PROGRESS_PID="" +if [[ "${PROGRESS}" == "true" ]]; then + python3 - "${STDOUT_PATH}" "${PROGRESS_STOP_PATH}" <<'PY' & +import os +import sys +import time + +from adsrefpipe import perf_metrics + +log_path = sys.argv[1] +stop_path = sys.argv[2] +offset = 0 +seen_log_lines = set() + + +def process_available_lines(): + global offset + if not os.path.exists(log_path): + return False + with open(log_path, "r", errors="replace") as handle: + handle.seek(offset) + lines = handle.readlines() + offset = handle.tell() + for line in lines: + progress = perf_metrics.format_benchmark_progress_line_from_log_line(line) + if progress and line not in seen_log_lines: + seen_log_lines.add(line) + print(progress, flush=True) + return bool(lines) + + +while True: + saw_lines = process_available_lines() + if os.path.exists(stop_path): + process_available_lines() + break + if not saw_lines: + time.sleep(0.5) +PY + PROGRESS_PID=$! +fi + +if wait "${BENCHMARK_PID}"; then + BENCHMARK_EXIT_CODE=0 +else + BENCHMARK_EXIT_CODE=$? +fi +if [[ -n "${PROGRESS_PID}" ]]; then + : > "${PROGRESS_STOP_PATH}" + wait "${PROGRESS_PID}" || true + rm -f "${PROGRESS_STOP_PATH}" +fi + +BENCHMARK_EXIT_CODE="${BENCHMARK_EXIT_CODE:-0}" + +if ! 
python3 - "${STDOUT_PATH}" "${RESULT_PATH}" "${RUN_DIR}" "${RUN_LABEL}" "${BENCHMARK_EXIT_CODE}" <<'PY' +import json +import shutil +import sys +from pathlib import Path + +from adsrefpipe import perf_metrics + +stdout_path = Path(sys.argv[1]) +result_path = Path(sys.argv[2]) +run_dir = Path(sys.argv[3]) +run_label = sys.argv[4] +benchmark_exit_code = int(sys.argv[5]) + +payload = { + "status": "failed" if benchmark_exit_code else "complete", + "benchmark_exit_code": benchmark_exit_code, + "run_dir": str(run_dir), + "stdout_log": str(stdout_path), +} + +stdout = stdout_path.read_text() if stdout_path.exists() else "" +decoder = json.JSONDecoder() +benchmark_result = None +for index, char in enumerate(stdout): + if char != "{": + continue + try: + candidate, _ = decoder.raw_decode(stdout[index:]) + except json.JSONDecodeError: + continue + if isinstance(candidate, dict): + benchmark_result = candidate + +if benchmark_result is None: + payload["status"] = "invalid" if benchmark_exit_code == 0 else payload["status"] + payload["error"] = "failed to find final benchmark JSON object in stdout" +else: + payload["status"] = benchmark_result.get("status", payload["status"]) + raw_json_value = benchmark_result.get("json") + raw_md_value = benchmark_result.get("markdown") + raw_csv_value = benchmark_result.get("source_type_csv") + raw_json = Path(raw_json_value) if raw_json_value else None + raw_md = Path(raw_md_value) if raw_md_value else None + raw_csv = Path(raw_csv_value) if raw_csv_value else None + if raw_json is not None and raw_json.is_file(): + summary = json.loads(raw_json.read_text()) + stable_json = run_dir / f"{run_label}.json" + stable_md = run_dir / f"{run_label}.md" + stable_csv = run_dir / f"{run_label}.source_types.csv" + stable_json.write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n") + payload["artifact_json"] = str(stable_json) + payload["throughput"] = ((summary.get("throughput") or {}).get("overall_records_per_minute")) + 
payload["load_adjusted_throughput"] = ((summary.get("throughput") or {}).get("load_adjusted_records_per_minute")) + payload["wall_duration_s"] = ((summary.get("duration_s") or {}).get("wall_clock")) + payload["records_processed"] = ((summary.get("counts") or {}).get("records_processed")) + payload["files_processed"] = ((summary.get("counts") or {}).get("files_processed")) + payload["per_record_wall_mean_ms"] = ((((summary.get("per_record_metrics_ms") or {}).get("wall_time") or {}).get("mean"))) + payload["source_type_breakdown"] = summary.get("source_type_breakdown") + payload["raw_subfamily_breakdown"] = summary.get("raw_subfamily_breakdown") + if raw_md is not None and raw_md.is_file(): + shutil.copy2(raw_md, stable_md) + payload["artifact_markdown"] = str(stable_md) + else: + perf_metrics.render_markdown(summary, str(stable_md)) + payload["artifact_markdown"] = str(stable_md) + payload["artifact_markdown_generated_from_json"] = True + if raw_csv is not None and raw_csv.is_file(): + shutil.copy2(raw_csv, stable_csv) + payload["artifact_source_type_csv"] = str(stable_csv) + +result_path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n") +print("Container benchmark result: %s" % result_path) +if payload.get("artifact_markdown"): + print("Container benchmark summary: %s" % payload["artifact_markdown"]) +if payload.get("artifact_source_type_csv"): + print("Container benchmark source-type CSV: %s" % payload["artifact_source_type_csv"]) +print("Container benchmark stdout log: %s" % stdout_path) +PY +then + exit 1 +fi