ThomasRoyProjects/csv-record-pipeline

Pipeline

Pipeline is a local, YAML-driven record processing system for messy CSV exports.

It is designed to help an operator take inconsistent source files, normalize them into canonical fields, compare them against a reference dataset, score likely matches, and export clean review-ready outputs without relying on a remote backend.

[Screenshot: Pipeline web app overview]

[Screenshot: Pipeline workflow builder view]

The project includes:

  • a Python CLI for repeatable jobs
  • a local web UI for non-terminal workflows
  • configurable normalization profiles
  • matching, enrichment, scoring, and export stages
  • job specs that describe input files, mappings, stages, and outputs
  • custom workflow-builder jobs for advanced composition

Why This Project Exists

Many CSV-heavy workflows break down because source files are inconsistent:

  • headers change across exports
  • addresses are split or malformed
  • duplicate detection depends on brittle exact matches
  • operators need review buckets, not just pass/fail output

This project turns those problems into a repeatable local workflow.

About This Project

This project started from real scripts and YAML workflows I originally wrote for CSV-heavy EDA operations: cleaning files, reconciling records, matching against reference data, and exporting workable outputs.

Over time, I turned that original workflow set into a more reusable local tool with a shared engine, shipped demos, tests, and a browser UI so future operators would not need to work entirely from the terminal.

I also used Codex CLI to help finish and harden the project. I leveraged it to speed up refactoring, UI work, demo/test coverage, and documentation, while building from the original workflow design and real use cases.

What It Does

Pipeline can:

  • normalize source files into canonical field names
  • split or clean address data before matching
  • compare primary records against a reference dataset
  • classify rows into confident matches, review matches, and unmatched records
  • enrich rows with reference-side fields
  • score priority for follow-up workflows
  • export result files and a run_summary.json for each run

Main Interfaces

CLI

The main entrypoint is:

python3 pipeline_runner.py

You can:

  • list available workflows
  • describe workflow inputs and thresholds
  • inspect headers from source files
  • suggest canonical field mappings
  • validate job specs
  • run complete jobs

Examples:

python3 pipeline_runner.py list
python3 pipeline_runner.py describe match_records_to_reference
python3 pipeline_runner.py validate-job jobs/demo_match_job.yaml
python3 pipeline_runner.py run-job jobs/demo_match_job.yaml
python3 pipeline_runner.py run profiles/demo_split.yaml

Local Web App

The project also includes a browser-based local UI for operators who do not want to manage YAML by hand.

Start it with:

./run_webapp.sh

Then open:

http://127.0.0.1:8765

The web app supports:

  • workflow selection grouped by intent
  • richer workflow detail panes under the workflow picker
  • file inspection
  • grouped header-family inspection
  • suggested field mappings
  • ordered fallback mappings per canonical field
  • clickable header chips that can populate mapping slots
  • normalization profile selection
  • a visual custom_job workflow builder with grouped stage cards
  • a zoomable workflow canvas with arrows between steps
  • live builder templates for common workflow shapes
  • quick run-control presets plus collapsible advanced settings
  • preset saving
  • asynchronous background job runs with status polling
  • output review previews

The top-level UI is now organized as:

  • Prep: one-file cleanup and normalization
  • Match: compare, match, and custom reconciliation flows
  • Utilities: one-off operational jobs such as split, projection, and reference enrichment

Architecture

At a high level, the system works like this:

  1. Load source datasets from CSV into pandas dataframes.
  2. Apply optional normalization profiles and text cleanup.
  3. Rename or coalesce source-specific headers into canonical fields.
  4. Build a preset or custom workflow in the visual builder.
  5. Run matching, enrichment, classification, scoring, and export stages through the shared execution path.
  6. Write output CSVs plus a run_summary.json.
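
Steps 1 to 3 can be sketched roughly as follows. This is a minimal illustration, not the project's code: the real engine loads data into pandas dataframes, while this sketch uses the standard-library csv module to stay dependency-free. The alias table and the function names `normalize_header`, `suggest_mappings`, and `load_canonical_rows` are assumptions made for the example.

```python
import csv
import io
import re

# Hypothetical alias table: canonical field -> header spellings seen in exports.
HEADER_ALIASES = {
    "email": ["email", "e-mail", "email address"],
    "phone": ["phone", "phone number", "tel"],
    "postal_code": ["zip", "zip code", "postal code"],
}


def normalize_header(header: str) -> str:
    """Lowercase a header and collapse punctuation/whitespace runs to one space."""
    return re.sub(r"[^a-z0-9]+", " ", header.lower()).strip()


def suggest_mappings(headers):
    """Return {source_header: canonical_field} for headers matching a known alias."""
    lookup = {
        normalize_header(alias): canonical
        for canonical, aliases in HEADER_ALIASES.items()
        for alias in aliases
    }
    suggestions = {}
    for header in headers:
        canonical = lookup.get(normalize_header(header))
        if canonical is not None:
            suggestions[header] = canonical
    return suggestions


def load_canonical_rows(csv_text: str):
    """Read CSV text, trim cell values, and rename recognized headers."""
    reader = csv.DictReader(io.StringIO(csv_text))
    mapping = suggest_mappings(reader.fieldnames or [])
    return [
        # Unrecognized headers keep their original name.
        {mapping.get(key, key): (value or "").strip() for key, value in row.items()}
        for row in reader
    ]
```

Normalizing both the alias and the incoming header before lookup is what lets "E-Mail" and "email" resolve to the same canonical field without listing every spelling.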

In the web app, runs execute in the background instead of holding one long blocking request open:

  • POST /api/run-job-async starts the run
  • GET /api/job-status?id=... reports queued, running, completed, or failed state
  • the browser polls status until results are ready
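
The polling half of this flow could look something like the sketch below, written as a Python client rather than browser code. The endpoint path and the four state values come from the list above; the JSON response body and the `state` key are assumptions, as are the function names `fetch_status` and `wait_for_job`.

```python
import json
import time
import urllib.parse
import urllib.request

BASE_URL = "http://127.0.0.1:8765"


def fetch_status(job_id: str) -> dict:
    """GET /api/job-status?id=... and decode the body (assumed to be JSON)."""
    query = urllib.parse.urlencode({"id": job_id})
    with urllib.request.urlopen(f"{BASE_URL}/api/job-status?{query}") as response:
        return json.load(response)


def wait_for_job(job_id, fetch=fetch_status, interval=1.0, max_polls=600,
                 sleep=time.sleep):
    """Poll until the job reports completed or failed, then return that payload."""
    for _ in range(max_polls):
        payload = fetch(job_id)
        # "state" is an assumed key name; the README only lists the state values.
        if payload.get("state") in ("completed", "failed"):
            return payload
        sleep(interval)
    raise TimeoutError(f"job {job_id} did not finish after {max_polls} polls")
```

Passing `fetch` and `sleep` as parameters keeps the loop testable without a running server, and the poll cap prevents a stuck job from blocking the caller forever.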

The orchestration layer is now partly engine-driven:

  • shared runtime loading
  • shared stage registry
  • shared reporting
  • preset adapters for shipped workflows
  • direct custom jobs through stage_sequence
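
To make the idea of a shared stage registry driven by a stage_sequence concrete, here is a small sketch. It is not the project's implementation: the decorator, the two stage names, and `run_stage_sequence` are all hypothetical, and rows are plain dicts rather than dataframes.

```python
from typing import Callable, Dict, List

Row = Dict[str, str]
Stage = Callable[[List[Row]], List[Row]]

# Hypothetical registry: stage name -> function that transforms a list of rows.
STAGE_REGISTRY: Dict[str, Stage] = {}


def register_stage(name: str):
    """Decorator that records a stage function under a name."""
    def decorator(func: Stage) -> Stage:
        STAGE_REGISTRY[name] = func
        return func
    return decorator


@register_stage("strip_whitespace")
def strip_whitespace(rows):
    return [{key: value.strip() for key, value in row.items()} for row in rows]


@register_stage("drop_empty_rows")
def drop_empty_rows(rows):
    return [row for row in rows if any(row.values())]


def run_stage_sequence(rows, stage_sequence):
    """Validate every stage name first, then apply the stages in order."""
    unknown = [name for name in stage_sequence if name not in STAGE_REGISTRY]
    if unknown:
        raise ValueError(f"unknown stages: {unknown}")
    for name in stage_sequence:
        rows = STAGE_REGISTRY[name](rows)
    return rows
```

Checking all names before running anything means a typo in a custom job fails at validation time instead of halfway through a run.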

Important code areas:

Example Workflow

One common use case is comparing a new incoming file against an existing reference file to determine:

  • which rows are strong matches
  • which rows need manual review
  • which rows are likely truly new

The fastest shipped sample job file is:

There is also a shipped synthetic demo pack for safe public walkthroughs:

Typical operator flow:

  1. Inspect headers from the source files.
  2. Apply canonical mappings.
  3. When a file spreads address data across multiple families, set fallback mappings for the same canonical field. The engine will use the first non-empty mapped source.
  4. Use grouped header families in the Match tab to spot email, phone, identity, address, date, and money fields quickly.
  5. Optionally normalize messy addresses or source fields first.
  6. Validate the job spec.
  7. Run the workflow.
  8. Review the generated outputs and run_summary.json.
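
The fallback behavior in step 3 (use the first non-empty mapped source) can be illustrated with a short sketch. The header names and the functions `coalesce_field` and `apply_fallback_mappings` are made up for the example, and whitespace-only values are treated as empty, which is an assumption about how the engine defines "non-empty".

```python
def coalesce_field(row, sources):
    """Return the first non-empty value among the ordered source headers."""
    for header in sources:
        value = (row.get(header) or "").strip()
        if value:
            return value
    return ""


def apply_fallback_mappings(row, fallback_mappings):
    """Build a canonical row from {canonical_field: [source headers in priority order]}."""
    return {
        canonical: coalesce_field(row, sources)
        for canonical, sources in fallback_mappings.items()
    }


# Hypothetical mapping: two address families feed one canonical field.
mappings = {
    "address": ["Mailing Address", "Home Address"],
    "email": ["Email"],
}
source_row = {"Mailing Address": "  ", "Home Address": "12 Oak St", "Email": "a@x.org"}
canonical_row = apply_fallback_mappings(source_row, mappings)
```

Here the blank mailing address is skipped, so the canonical address is taken from the home address column.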

The shipped tests now cover both matching and utility demos, including:

  • custom and preset matching
  • broader custom workflow-builder demos
  • normalization-profile-driven imports
  • full-process preset execution
  • reference enrichment
  • projection/extract jobs
  • alternating split jobs
  • address normalization and mapping regressions

What The Shipped Demos Teach

Use the synthetic demo pack to learn the system by capability rather than by implementation file. The shipped match demos come as a primary/reference pair built on synthetic fixtures of roughly 1,000 rows, so the outputs feel realistic and include a deliberate review bucket for ambiguous cases.

Together these demos cover Prep, Match, Utilities, normalization profiles, presets, and custom workflow-builder jobs.

Primary Vs Reference

For compare and match workflows:

  • primary is the incoming working file you want to evaluate
  • reference is the existing system-of-record file you trust as the comparison baseline

In other words, if you are checking whether a normalized upload file already exists in a system of record:

  • the normalized upload file should be primary
  • the system export should be reference

The output buckets should be read like this:

  • matched_records: rows in the new working file that already appear to exist in the reference export
  • review_records: rows that need manual verification
  • new_records: rows in the new working file that do not appear to exist in the reference export
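
One way to picture how rows end up in these three buckets is a score with two thresholds. The sketch below is only an illustration: the threshold values are invented, and the similarity measure is difflib's SequenceMatcher from the standard library, swapped in here because the README does not describe the project's actual scoring.

```python
from difflib import SequenceMatcher

# Hypothetical thresholds; real workflows expose their own.
CONFIDENT_THRESHOLD = 0.90
REVIEW_THRESHOLD = 0.70


def similarity(a: str, b: str) -> float:
    """Case-insensitive similarity in [0, 1] between two trimmed strings."""
    return SequenceMatcher(None, a.lower().strip(), b.lower().strip()).ratio()


def classify(score: float) -> str:
    """Map a match score to one of the three output buckets."""
    if score >= CONFIDENT_THRESHOLD:
        return "matched_records"
    if score >= REVIEW_THRESHOLD:
        return "review_records"
    return "new_records"


def bucket_rows(primary, reference, key):
    """Score each primary row against its best reference candidate and bucket it."""
    buckets = {"matched_records": [], "review_records": [], "new_records": []}
    for row in primary:
        best = max(
            (similarity(row[key], ref[key]) for ref in reference),
            default=0.0,  # an empty reference file means every row is new
        )
        buckets[classify(best)].append(row)
    return buckets
```

The middle band is what produces a review bucket: scores that are too high to call new but too low to trust automatically go to an operator instead of being forced into pass or fail.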

Setup

From the repo root:

cd <repo-root>
./setup_venv.sh
./run_local.sh list

Why this is preferred:

  • the repo already has environment-aware launchers
  • bare python3 on your machine may not have the required packages

The launcher (run_local.sh above) prefers .venv and falls back to python3.

If you need to call Python directly, prefer:

./.venv/bin/python pipeline_runner.py list

Tests

The repo now has a first unit-test layer under tests.

Current coverage includes:

  • address splitting behavior
  • mapping suggestion regressions
  • header-family classification regressions
  • custom_job validation around stage_sequence

Run the suite with the repo environment:

./.venv/bin/python -m unittest discover -s tests -v

Do not rely on bare system Python for the suite unless your global environment already has the required dependencies installed.
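
For readers new to the suite, a test in this layer might be shaped like the sketch below. Everything in it is hypothetical: `split_address` is a toy splitter written for the example, not the project's address logic, and the test names are invented. It only shows the unittest pattern that the discover command above picks up.

```python
import re
import unittest

# Hypothetical splitter: a leading house number followed by the street text.
ADDRESS_PATTERN = re.compile(r"^\s*(?P<number>\d+)\s+(?P<street>.+?)\s*$")


def split_address(raw: str) -> dict:
    """Split 'NUMBER STREET' into parts; leave unnumbered addresses intact."""
    match = ADDRESS_PATTERN.match(raw)
    if match is None:
        return {"number": "", "street": raw.strip()}
    return {"number": match.group("number"), "street": match.group("street")}


class SplitAddressTests(unittest.TestCase):
    def test_splits_leading_house_number(self):
        self.assertEqual(
            split_address(" 12  Oak St "),
            {"number": "12", "street": "Oak St"},
        )

    def test_keeps_unnumbered_address_intact(self):
        self.assertEqual(
            split_address("PO Box 7"),
            {"number": "", "street": "PO Box 7"},
        )
```

Saved as a file matching test*.py under tests, a case like this would be collected by unittest discovery.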

For the full day-to-day usage guide, see OPERATOR_MANUAL.md.

Useful Files

Rule

This public repo ships only synthetic demo CSVs. Treat generated outputs as disposable runtime artifacts, not source data.
