Pipeline is a local, YAML-driven record processing system for messy CSV exports.
It is designed to help an operator take inconsistent source files, normalize them into canonical fields, compare them against a reference dataset, score likely matches, and export clean review-ready outputs without relying on a remote backend.
The project includes:
- a Python CLI for repeatable jobs
- a local web UI for non-terminal workflows
- configurable normalization profiles
- matching, enrichment, scoring, and export stages
- job specs that describe input files, mappings, stages, and outputs
- custom workflow-builder jobs for advanced composition
Many CSV-heavy workflows break down because source files are inconsistent:
- headers change across exports
- addresses are split or malformed
- duplicate detection depends on brittle exact matches
- operators need review buckets, not just pass/fail output
This project turns those problems into a repeatable local workflow.
This project started from real scripts and YAML workflows I originally wrote for CSV-heavy EDA operations: cleaning files, reconciling records, matching against reference data, and exporting workable outputs.
Over time, I turned that original workflow set into a more reusable local tool with a shared engine, shipped demos, tests, and a browser UI so future operators would not need to work entirely from the terminal.
I also used Codex CLI to help finish and harden the project. It sped up refactoring, UI work, demo/test coverage, and documentation, while the design stayed grounded in the original workflows and real use cases.
Pipeline can:
- normalize source files into canonical field names
- split or clean address data before matching
- compare primary records against a reference dataset
- classify rows into confident matches, review matches, and unmatched records
- enrich rows with reference-side fields
- score priority for follow-up workflows
- export result files and a run_summary.json for each run
The main entrypoint is:
python3 pipeline_runner.py
You can:
- list available workflows
- describe workflow inputs and thresholds
- inspect headers from source files
- suggest canonical field mappings
- validate job specs
- run complete jobs
Examples:
python3 pipeline_runner.py list
python3 pipeline_runner.py describe match_records_to_reference
python3 pipeline_runner.py validate-job jobs/demo_match_job.yaml
python3 pipeline_runner.py run-job jobs/demo_match_job.yaml
python3 pipeline_runner.py run profiles/demo_split.yaml
The project also includes a browser-based local UI for operators who do not want to manage YAML by hand.
Start it with:
./run_webapp.sh
Then open:
http://127.0.0.1:8765
The web app supports:
- workflow selection grouped by intent
- richer workflow detail panes under the workflow picker
- file inspection
- grouped header-family inspection
- suggested field mappings
- ordered fallback mappings per canonical field
- clickable header chips that can populate mapping slots
- normalization profile selection
- a visual custom_job workflow builder with grouped stage cards
- a zoomable workflow canvas with arrows between steps
- live builder templates for common workflow shapes
- quick run-control presets plus collapsible advanced settings
- preset saving
- asynchronous background job runs with status polling
- output review previews
The top-level UI is now organized as:
- Prep: one-file cleanup and normalization
- Match: compare, match, and custom reconciliation flows
- Utilities: one-off operational jobs such as split, projection, and reference enrichment
At a high level, the system works like this:
- Load source datasets from CSV into pandas dataframes.
- Apply optional normalization profiles and text cleanup.
- Rename or coalesce source-specific headers into canonical fields.
- Build a preset or custom workflow in the visual builder.
- Run matching, enrichment, classification, scoring, and export stages through the shared execution path.
- Write output CSVs plus a run_summary.json.
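The load, cleanup, and canonical-rename steps above can be sketched roughly as follows. This is a minimal standard-library illustration, not the project's code: the real engine loads into pandas dataframes, and the header names in CANONICAL_MAP and the helper names are assumptions made up for the example.

```python
import csv
import io

# Hypothetical mapping from source-specific headers to canonical field names.
CANONICAL_MAP = {"E-mail Address": "email", "Full Name": "name", "Phone #": "phone"}


def load_rows(csv_text):
    """Parse CSV text into a list of dicts, one per data row."""
    return list(csv.DictReader(io.StringIO(csv_text)))


def clean_text(value):
    """Collapse runs of whitespace and trim both ends; None becomes ''."""
    return " ".join((value or "").split())


def to_canonical(rows, mapping):
    """Rename mapped headers to canonical names and clean every value.

    Headers missing from the mapping keep their original name.
    """
    return [
        {
            mapping.get(header, header): clean_text(value)
            for header, value in row.items()
            if header is not None  # skip overflow cells from over-long rows
        }
        for row in rows
    ]


sample = "Full Name,E-mail Address,Phone #\n  Ada   Lovelace ,ada@example.com , 555-0100\n"
canonical_rows = to_canonical(load_rows(sample), CANONICAL_MAP)
print(canonical_rows)
```

Later stages such as matching and scoring would then only need to know the canonical names, which is the point of renaming early.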
In the web app, runs are now backgrounded rather than kept on one long blocking request:
- POST /api/run-job-async starts the run
- GET /api/job-status?id=... reports queued, running, completed, or failed state
- the browser polls status until results are ready
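A script could follow the same polling pattern as the browser. The sketch below is a hedged illustration: the endpoint path and the four states come from the list above, but the JSON response shape (a "status" key) and the helper names are assumptions, so check API.md for the actual contract.

```python
import json
import time
import urllib.parse
import urllib.request

BASE_URL = "http://127.0.0.1:8765"
TERMINAL_STATES = {"completed", "failed"}


def fetch_status(job_id, base_url=BASE_URL):
    """GET /api/job-status?id=<job_id> and decode the body as JSON.

    Assumption: the server answers with a JSON object containing "status".
    """
    query = urllib.parse.urlencode({"id": job_id})
    with urllib.request.urlopen(f"{base_url}/api/job-status?{query}") as response:
        return json.load(response)


def poll_until_done(job_id, fetch=fetch_status, interval=1.0, timeout=300.0,
                    sleep=time.sleep):
    """Poll until the job reaches a terminal state or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        payload = fetch(job_id)
        if payload.get("status") in TERMINAL_STATES:
            return payload
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job {job_id} did not finish within {timeout}s")
        sleep(interval)


# Usage, with the web app running locally and a job id returned by
# POST /api/run-job-async (how that id is returned is not shown here):
#   final = poll_until_done(job_id)
```

The fetch and sleep parameters are injectable so the loop can be exercised without a running server.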
The orchestration layer is now partly engine-driven:
- shared runtime loading
- shared stage registry
- shared reporting
- preset adapters for shipped workflows
- direct custom jobs through stage_sequence
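To make the idea of a shared stage registry driven by stage_sequence concrete, here is a minimal sketch. It is not the code in core/stages.py: the decorator, the stage signatures, and the "stage"/"options" keys are hypothetical, chosen only to show how a list of named steps can be dispatched through one execution path.

```python
# Hypothetical registry: stage name -> callable(rows, options) -> rows.
STAGE_REGISTRY = {}


def register_stage(name):
    """Decorator that records a stage function under a lookup name."""
    def decorator(func):
        STAGE_REGISTRY[name] = func
        return func
    return decorator


@register_stage("normalize")
def normalize(rows, options):
    """Trim and lowercase every value in every row."""
    return [{key: value.strip().lower() for key, value in row.items()} for row in rows]


@register_stage("dedupe")
def dedupe(rows, options):
    """Keep the first row seen for each value of options['key']."""
    seen = set()
    unique = []
    for row in rows:
        if row[options["key"]] not in seen:
            seen.add(row[options["key"]])
            unique.append(row)
    return unique


def run_stage_sequence(rows, stage_sequence):
    """Run each named stage in order, feeding its output to the next one."""
    for step in stage_sequence:
        name = step["stage"]
        if name not in STAGE_REGISTRY:
            raise ValueError(f"unknown stage: {name}")
        rows = STAGE_REGISTRY[name](rows, step.get("options", {}))
    return rows


rows = [{"email": " Ada@Example.com "}, {"email": "ada@example.com"}]
sequence = [{"stage": "normalize"}, {"stage": "dedupe", "options": {"key": "email"}}]
result = run_stage_sequence(rows, sequence)
print(result)
```

With a registry like this, a preset is just a prebuilt sequence, while a custom job supplies its own, which matches the preset-adapter and custom-job split described above.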
Important code areas:
- pipeline_runner.py: CLI entrypoint and workflow execution
- core/runtime_loader.py: shared dataset loading and canonical mapping
- core/stages.py: reusable stage registry and stage runner
- core/preset_plans.py: preset stage-plan builders
- core/runtime_reporting.py: shared run summary helpers
- webapp/server.py: local HTTP UI
- core/jobs.py: runtime config and job loading
- services/workflow_service.py: workflow metadata, validation, and mapping suggestions
- normalize: normalization helpers
- reconcile: matching and reconciliation logic
- enrich: field enrichment
- score: priority scoring
- export: output projection/export logic
One common use case is comparing a new incoming file against an existing reference file to determine:
- which rows are strong matches
- which rows need manual review
- which rows are likely truly new
The fastest shipped sample job file is:
There is also a shipped synthetic demo pack for safe public walkthroughs:
- demo_data
- jobs/demo_match_job.yaml
- jobs/demo_custom_match_job.yaml
- jobs/demo_random_custom_job.yaml
- jobs/demo_full_custom_job.yaml
- jobs/demo_profiled_custom_job.yaml
- profiles/demo_enrich.yaml
- profiles/demo_extract.yaml
- profiles/demo_full_process.yaml
- profiles/demo_split.yaml
Typical operator flow:
- Inspect headers from the source files.
- Apply canonical mappings.
- When a file spreads address data across multiple families, set fallback mappings for the same canonical field. The engine will use the first non-empty mapped source.
- Use grouped header families in the Match tab to spot email, phone, identity, address, date, and money fields quickly.
- Optionally normalize messy addresses or source fields first.
- Validate the job spec.
- Run the workflow.
- Review the generated outputs and run_summary.json.
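The fallback-mapping rule in the flow above (use the first non-empty mapped source) can be sketched in a few lines. The header names and helper functions are hypothetical; only the first-non-empty behavior is taken from the description.

```python
def coalesce_field(row, sources):
    """Return the first non-empty value among the ordered source headers."""
    for header in sources:
        value = (row.get(header) or "").strip()
        if value:
            return value
    return ""


def apply_fallbacks(row, fallbacks):
    """Build canonical fields from ordered fallback lists of source headers."""
    return {canonical: coalesce_field(row, sources) for canonical, sources in fallbacks.items()}


# Hypothetical ordered fallbacks for one canonical field.
FALLBACKS = {"address": ["Mailing Address", "Street", "Addr1"]}

row = {"Mailing Address": "  ", "Street": "12 Main St", "Addr1": "PO Box 9"}
print(apply_fallbacks(row, FALLBACKS))  # {'address': '12 Main St'}
```

Whitespace-only values count as empty here, so a blank "Mailing Address" falls through to "Street". Whether the engine treats blanks the same way is an assumption.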
The shipped tests now cover both matching and utility demos, including:
- custom and preset matching
- broader custom workflow-builder demos
- normalization-profile-driven imports
- full-process preset execution
- reference enrichment
- projection/extract jobs
- alternating split jobs
- address normalization and mapping regressions
Use the synthetic demo pack to learn the system by capability instead of by implementation file. The shipped match demos now come in a clear primary/reference pair and use roughly 1,000-row synthetic fixtures so the outputs feel more realistic, including a deliberate review bucket for ambiguous cases.
- jobs/demo_match_job.yaml Teaches the simplest preset-style compare and match flow.
- jobs/demo_custom_match_job.yaml Teaches how the same match logic can run through a custom_job workflow builder.
- jobs/demo_random_custom_job.yaml Teaches canonical mapping from non-standard source headers.
- jobs/demo_profiled_custom_job.yaml Teaches how a normalization profile can reshape an awkward import before matching.
- jobs/demo_full_custom_job.yaml Teaches a broader custom workflow with date normalization, address normalization, dedupe, matching, address classification, contact aggregation, scoring, and multiple outputs.
- profiles/demo_enrich.yaml Teaches exact-key reference enrichment.
- profiles/demo_extract.yaml Teaches projection and simple output formatting.
- profiles/demo_split.yaml Teaches one-time utility splitting.
- profiles/demo_full_process.yaml Teaches the heavier preset path that chains normalization, dedupe, reconcile, address status, contact aggregation, and scoring.
Together these demos cover Prep, Match, Utilities, normalization profiles, presets, and custom workflow-builder jobs.
For compare and match workflows:
- primary is the incoming working file you want to evaluate
- reference is the existing system-of-record file you trust as the comparison baseline
In other words, if you are checking whether a normalized upload file already exists in a system of record:
- the normalized upload file should be primary
- the system export should be reference
The output buckets should be read like this:
- matched_records: rows in the new working file that already appear to exist in the reference export
- review_records: rows that need manual verification
- new_records: rows in the new working file that do not appear to exist in the reference export
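One way to picture how rows land in these three buckets is a similarity score compared against two thresholds. The sketch below is only an illustration: it uses difflib from the standard library as a stand-in similarity measure, and the threshold values are hypothetical. The project's actual matching logic and thresholds are described by the workflow itself (see the describe command).

```python
from difflib import SequenceMatcher

# Hypothetical thresholds; real jobs define their own.
CONFIDENT_THRESHOLD = 0.90
REVIEW_THRESHOLD = 0.70


def similarity(a, b):
    """Case-insensitive similarity ratio in the range [0, 1]."""
    return SequenceMatcher(None, a.lower(), b.lower()).ratio()


def best_score(value, reference_values):
    """Highest similarity between one primary value and any reference value."""
    return max((similarity(value, ref) for ref in reference_values), default=0.0)


def classify(primary_values, reference_values):
    """Sort primary values into matched, review, and new buckets by score."""
    buckets = {"matched_records": [], "review_records": [], "new_records": []}
    for value in primary_values:
        score = best_score(value, reference_values)
        if score >= CONFIDENT_THRESHOLD:
            buckets["matched_records"].append(value)
        elif score >= REVIEW_THRESHOLD:
            buckets["review_records"].append(value)
        else:
            buckets["new_records"].append(value)
    return buckets


reference = ["ada lovelace", "john smith"]
primary = ["Ada Lovelace", "jon smyth", "Grace Hopper"]
buckets = classify(primary, reference)
print(buckets)
```

In this toy data the exact name lands in matched_records, the near-miss spelling lands in review_records, and the unrelated name lands in new_records. The middle band is what gives operators a review bucket instead of a pass/fail split.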
From the repo root:
cd <repo-root>
./setup_venv.sh
./run_local.sh list
Why this is preferred:
- the repo already has environment-aware launchers
- bare python3 on this machine may not have the required packages
There is a launcher that prefers .venv, then python3:
./setup_venv.sh
./run_local.sh list
If you need to call Python directly, prefer:
./.venv/bin/python pipeline_runner.py list
The repo now has a first unit-test layer under tests.
Current coverage includes:
- address splitting behavior
- mapping suggestion regressions
- header-family classification regressions
- custom_job validation around stage_sequence
Run the suite with the repo environment:
./.venv/bin/python -m unittest discover -s tests -v
Do not rely on bare system Python for the suite unless your global environment already has the required dependencies installed.
For the full day-to-day usage guide, see OPERATOR_MANUAL.md.
- DOCS_INDEX.md: documentation map
- API.md: local web API reference
- GETTING_STARTED.md: shortest operator path through the app
- OPERATOR_MANUAL.md: detailed day-to-day usage manual
- TECHNICAL_DESIGN.md: deeper architecture notes
- CUSTOM_MATCHING_GUIDE.md: practical guide for custom matching jobs
- CSV_HEADERS.md: canonical field reference
- normalization_profiles: reusable normalization definitions
- demo_data/README.md: shipped synthetic demo fixtures
This public repo ships only synthetic demo CSVs. Treat generated outputs as disposable runtime artifacts, not source data.

