Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,052 changes: 1,052 additions & 0 deletions donor_completeness.patch

Large diffs are not rendered by default.

112 changes: 111 additions & 1 deletion src/clinical_etl/CSVConvert.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,101 @@ def load_manifest(manifest_file):
return result


def summarize_completeness(donor_completeness):
    """Collapse donor-keyed completeness records into anonymous tallies.

    Every donor is counted exactly once per completeness level:

    * minimal level: ``tier_a_min_clinical_complete``,
      ``tier_b_min_clinical_complete`` or ``incomplete_min_donors``
    * fulsome level: ``tier_a_full_clinical_complete``,
      ``tier_b_full_clinical_complete`` or ``incomplete_full_donors``

    A donor lands in a tier bucket only when its ``tier`` is "A" or "B" and
    the flag for that level is truthy. Any other tier value, or a falsy flag,
    is counted in the matching ``incomplete_*`` bucket. No donor IDs appear in
    the result; only the record values are read.

    :param donor_completeness: mapping of donor ID -> record holding
        ``tier``, ``minimal_complete`` and ``fulsome_complete``
    :return: dict with ``total_donors`` plus the six counters above
    """
    counts = {
        "total_donors": len(donor_completeness),
        "tier_a_min_clinical_complete": 0,
        "tier_b_min_clinical_complete": 0,
        "incomplete_min_donors": 0,
        "tier_a_full_clinical_complete": 0,
        "tier_b_full_clinical_complete": 0,
        "incomplete_full_donors": 0,
    }
    # (record flag, infix used in the counter names) for each level
    levels = (("minimal_complete", "min"), ("fulsome_complete", "full"))

    for record in donor_completeness.values():
        tier = record["tier"]
        if tier == "A":
            prefix = "tier_a"
        elif tier == "B":
            prefix = "tier_b"
        else:
            prefix = None

        for flag, infix in levels:
            # the flag is only consulted for tiered donors
            if prefix is not None and record[flag]:
                bucket = f"{prefix}_{infix}_clinical_complete"
            else:
                bucket = f"incomplete_{infix}_donors"
            counts[bucket] += 1

    return counts


def build_completeness_failures(donor_completeness, tier_criteria=None):
    """Report every donor that is not both tiered and fulsome complete.

    A donor passes (and is left out of the report) only when its ``tier`` is
    "A" or "B" and ``fulsome_complete`` is truthy. Every other donor gets an
    entry carrying its tier, both completeness flags, its sample counts, the
    unmet minimal/fulsome field lists and a list of human-readable reasons.

    :param donor_completeness: mapping of donor ID -> completeness record
    :param tier_criteria: optional mapping of tier name -> {sample kind: count},
        used only to word the "sample composition" reason
    :return: dict with ``total_donors``, ``failing_donors`` and ``donors``
        (the list of per-donor failure entries)
    """
    def _describe_tiers():
        # Human-readable summary of what would have satisfied a tier.
        if not tier_criteria:
            return "any tier"
        return " or ".join(
            "Tier {} ({})".format(
                name,
                ", ".join(f"{count} {kind}" for kind, count in needs.items()),
            )
            for name, needs in tier_criteria.items()
        )

    def _explain(record, has_tier):
        # Collect the reasons in a fixed order: tier, minimal, fulsome.
        why = []
        if not has_tier:
            required = _describe_tiers()
            found = record["sample_counts"] or "no classifiable tumour/normal DNA/RNA samples"
            why.append(f"Sample composition does not satisfy {required}; found {found}")
        if not record["minimal_complete"]:
            missing = len(record["minimal_unmet"])
            why.append(f"Fails minimal clinical completeness: {missing} field(s) missing")
        if not record["fulsome_complete"]:
            missing = len(record["fulsome_unmet"])
            why.append(
                f"Fails fulsome clinical completeness: {missing} "
                f"required/conditionally-required field(s) missing"
            )
        return why

    report = []
    for donor_id, record in donor_completeness.items():
        has_tier = record["tier"] in ("A", "B")
        if has_tier and record["fulsome_complete"]:
            # tiered and fulsome complete: nothing to report
            continue
        report.append({
            "donor_id": donor_id,
            "tier": record["tier"],
            "minimal_complete": record["minimal_complete"],
            "fulsome_complete": record["fulsome_complete"],
            "reasons": _explain(record, has_tier),
            "sample_counts": record["sample_counts"],
            "minimal_unmet": record["minimal_unmet"],
            "fulsome_unmet": record["fulsome_unmet"],
        })

    return {
        "total_donors": len(donor_completeness),
        "failing_donors": len(report),
        "donors": report,
    }


def csv_convert(input_path, manifest_file, minify=False, index_output=False, verbose=False):
mappings.VERBOSE = verbose
# read manifest data
Expand Down Expand Up @@ -756,9 +851,24 @@ def csv_convert(input_path, manifest_file, minify=False, index_output=False, ver
schema.validate_ingest_map(result)
validation_results = {"validation_errors": schema.validation_errors,
"validation_warnings": schema.validation_warnings,
"cases_missing_data": schema.statistics["cases_missing_data"]}
"cases_missing_data": schema.statistics["cases_missing_data"],
"donor_completeness": schema.statistics.get("donor_completeness", {})}
result["statistics"] = schema.statistics
result["statistics"].pop("cases_missing_data") # remove donor IDs from _map.json file
# per-donor completeness is keyed by donor ID: keep it out of _map.json too,
# but retain an aggregate tier/level summary (no IDs) in the statistics.
donor_completeness = result["statistics"].pop("donor_completeness", {})
result["statistics"]["completeness_summary"] = summarize_completeness(donor_completeness)
# write a detailed per-donor completeness failure report (contains donor IDs,
# so it is kept out of _map.json, like the validation results)
if donor_completeness:
completeness_failures = build_completeness_failures(
donor_completeness, getattr(schema, "tier_criteria", None))
with open(f"{input_path}_completeness_failures.json", 'w') as f:
json.dump(completeness_failures, f, indent=4)
print(f"{Bcolors.OKGREEN}Completeness failure report ("
f"{completeness_failures['failing_donors']}/{completeness_failures['total_donors']} "
f"donors) written to {input_path}_completeness_failures.json{Bcolors.ENDC}")

# write ingestion and validation json files
print(f"{Bcolors.OKGREEN}Saving packets to file.{Bcolors.ENDC}")
Expand Down
36 changes: 34 additions & 2 deletions src/clinical_etl/completeness_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,39 @@ def generate_csv(input_path):
out.write(f"{k},{field},{total},{missing},{round(missing_percent)}\n")


def generate_donor_completeness_csv(input_path):
    """Write a per-donor tier/level completeness table as CSV.

    Reads the ``donor_completeness`` mapping (donor ID -> record) from the
    JSON file at ``input_path`` and writes one row per donor.

    The output path is ``input_path`` with ``_validation_results.json``
    replaced by ``_donor_completeness.csv``. If the input name does not
    contain that suffix, the output is written next to it as
    ``<name without extension>_donor_completeness.csv`` so the input file is
    never overwritten.

    Rows are emitted with the ``csv`` module so that donor IDs or unmet-field
    descriptions containing commas, quotes or newlines are quoted instead of
    shifting columns.

    :param input_path: path to a JSON file holding ``donor_completeness``
    """
    # Local imports: only this function needs them.
    import csv
    import os

    in_suffix = "_validation_results.json"
    out_suffix = "_donor_completeness.csv"
    output_path = input_path.replace(in_suffix, out_suffix)
    if output_path == input_path:
        # Unconventional file name: derive a distinct path rather than
        # clobbering the JSON we are about to read.
        output_path = os.path.splitext(input_path)[0] + out_suffix

    print(f"Converting {input_path} to {output_path}")
    with open(input_path) as f:
        donors = json.load(f).get("donor_completeness", {})

    # newline="" lets the csv module control line endings; "\n" matches the
    # line terminator the table was previously written with.
    with open(output_path, "w", newline="") as out:
        writer = csv.writer(out, lineterminator="\n")
        writer.writerow([
            "Donor", "Tier", "Level", "Type",
            "Minimal Complete", "Fulsome Complete", "Unmet (fulsome)",
        ])
        for donor_id, rec in donors.items():
            row = [
                donor_id,
                rec["tier"] or "",
                rec["level"],
                rec["type"],
                rec["minimal_complete"],
                rec["fulsome_complete"],
                "|".join(rec["fulsome_unmet"]),
            ]
            # format() keeps the same text an f-string would have produced
            # (e.g. "True", "None"); the writer only adds quoting.
            writer.writerow([format(cell) for cell in row])


def main(input_path):
    """Pick the CSV generator that matches the content of ``input_path``.

    The JSON file is loaded once to inspect its top-level keys:

    * ``donor_completeness`` present -> per-donor tier/level table
      (``generate_donor_completeness_csv``); this key is checked first
    * otherwise ``statistics`` present -> aggregate field statistics
      (``generate_csv``)

    :param input_path: path to the JSON file to convert
    :raises SystemExit: when neither key is present
    """
    with open(input_path) as handle:
        payload = json.load(handle)

    if "donor_completeness" in payload:
        generate_donor_completeness_csv(input_path)
        return
    if "statistics" in payload:
        generate_csv(input_path)
        return

    raise SystemExit(
        "Input json has neither 'statistics' (a _map.json) nor "
        "'donor_completeness' (a _validation_results.json)."
    )


if __name__ == "__main__":
args = parse_args()
input_path = args.input
generate_csv(input_path)
main(args.input)
69 changes: 64 additions & 5 deletions src/clinical_etl/mohschemav3.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,64 @@ class MoHSchemaV3(BaseSchema):
}
}

# ------------------------------------------------------------------ #
# Per-donor completeness criteria (consumed by BaseSchema engine) #
# ------------------------------------------------------------------ #

# Tier = sample_registration composition. Ordered strongest-first so that
# a donor satisfying both is assigned the higher tier (A) and is therefore
# NOT also counted in the Tier B total. Criteria are cumulative: Tier A's
# required samples are a superset of Tier B's.
# Inner keys use the same '<tumour|normal>_<dna|rna>' labels that
# _sample_kind returns. NOTE(review): the integer values are presumably the
# minimum number of samples of that kind -- confirm against the BaseSchema
# engine that consumes this mapping.
tier_criteria = {
"A": {"tumour_dna": 1, "tumour_rna": 1, "normal_dna": 1},
"B": {"tumour_dna": 1, "normal_dna": 1},
}

# Minimal completeness: reduced field set that must hold valid values on
# every existing instance of each object type.
# Shape: object type name -> list of field names checked on that object type.
# NOTE(review): what counts as a "valid value" is decided by the consuming
# engine, which is not visible here -- confirm before extending the lists.
minimal_criteria = {
"donors": ["gender", "sex_at_birth", "date_of_birth", "date_resolution"],
"primary_diagnoses": ["date_of_diagnosis", "cancer_type_code", "primary_site", "basis_of_diagnosis"],
"specimens": ["specimen_collection_date", "specimen_anatomic_location"],
"sample_registrations": ["specimen_tissue_source", "tumour_normal_designation", "specimen_type", "sample_type"],
}

# Nested objects every donor must have for 'fulsome' completeness. Counted
# anywhere in the donor tree (e.g. treatments live under primary_diagnoses).
# Each entry names the nested object type under "key".
# NOTE(review): "min" is presumably the minimum number of instances required
# per donor -- confirm against the engine that reads this list.
required_instances = [
{"key": "treatments", "min": 1},
]

# Conditionally-required fields are NOT re-listed here. 'fulsome' completeness
# is derived directly from the validation pass: every conditional requirement
# in the validate_* methods raises warn(..., conditional_required=True), and
# those warnings are attributed per-donor and fed into the fulsome check
# (see BaseSchema._evaluate_fulsome). Soft notes / consistency warnings are
# marked conditional_required=False so they don't affect completeness.

@staticmethod
def _sample_kind(sample):
"""Classify a sample_registration as e.g. 'tumour_dna' / 'normal_dna'.

ASSUMPTION: the molecule (DNA vs RNA) is read from `sample_type`.
If the MoH model encodes it in a different field, change ONLY this
method (e.g. read 'specimen_type' or an analyte field instead)."""
designation = (sample.get("tumour_normal_designation") or "").lower()
sample_type = (sample.get("sample_type") or "").lower()
if "rna" in sample_type:
molecule = "rna"
elif "dna" in sample_type:
molecule = "dna"
else:
molecule = None
if "tumour" in designation or "tumor" in designation:
tn = "tumour"
elif "normal" in designation:
tn = "normal"
else:
tn = None
return f"{tn}_{molecule}" if (tn and molecule) else None

def validate_donors(self, map_json):
for prop in map_json:
match prop:
Expand All @@ -181,7 +239,8 @@ def validate_donors(self, map_json):
if map_json["lost_to_followup_reason"] is not None:
if "lost_to_followup_after_clinical_event_identifier" not in map_json:
self.warn(
"lost_to_followup_reason should only be submitted if lost_to_followup_after_clinical_event_identifier is submitted")
"lost_to_followup_reason should only be submitted if lost_to_followup_after_clinical_event_identifier is submitted",
conditional_required=False)
case "date_alive_after_lost_to_followup":
if map_json["date_alive_after_lost_to_followup"] is not None:
if "lost_to_followup_after_clinical_event_identifier" not in map_json:
Expand Down Expand Up @@ -239,20 +298,20 @@ def validate_donors(self, map_json):
if ('diagnosis_date' in locals() and diagnosis_date not in [None, ''] and
treatment_end not in [None, ''] and 'treatment_end' in locals() and
treatment_end < diagnosis_date):
self.warn(f"{diagnosis['submitter_primary_diagnosis_id']} > {treatment['submitter_treatment_id']}: date_of_diagnosis should be earlier than treatment_end_date ")
self.warn(f"{diagnosis['submitter_primary_diagnosis_id']} > {treatment['submitter_treatment_id']}: date_of_diagnosis should be earlier than treatment_end_date ", conditional_required=False)
if 'treatment_start' in locals() and treatment_start not in [None, '']:
if 'death' in locals() and death not in [None, ''] and treatment_start > death:
self.fail(
f"{diagnosis['submitter_primary_diagnosis_id']} > {treatment['submitter_treatment_id']}: treatment_start_date cannot be after date_of_death ")
if 'birth' in locals() and birth not in [None, ''] and treatment_start < birth and treatment_start is not None:
self.fail(f"{diagnosis['submitter_primary_diagnosis_id']} > {treatment['submitter_treatment_id']}: treatment_start_date cannot be before date_of_birth")
if 'diagnosis_date' in locals() and diagnosis_date not in [None, ''] and treatment_start < diagnosis_date:
self.warn(f"{diagnosis['submitter_primary_diagnosis_id']} > {treatment['submitter_treatment_id']}: treatment_start_date should not be before date_of_diagnosis")
self.warn(f"{diagnosis['submitter_primary_diagnosis_id']} > {treatment['submitter_treatment_id']}: treatment_start_date should not be before date_of_diagnosis", conditional_required=False)
diagnosis_values_list = list(diagnoses_dates.values())
if (len(diagnosis_values_list) > 0 and "int" in str(type(diagnosis_values_list[0])) and
0 not in diagnosis_values_list):
self.warn(f"Earliest primary_diagnosis.date_of_diagnosis.month_interval should be 0, current "
f"month_intervals: {diagnoses_dates}")
f"month_intervals: {diagnoses_dates}", conditional_required=False)
case "date_of_death":
if map_json["date_of_death"] is not None:
if map_json["is_deceased"] in ["No", "Not available"]:
Expand Down Expand Up @@ -286,7 +345,7 @@ def validate_donors(self, map_json):

def validate_primary_diagnoses(self, map_json):
if map_json["date_of_diagnosis"] is None:
self.warn("NOTE: cannot calculate any date intervals for this patient without date_of_diagnosis")
self.warn("NOTE: cannot calculate any date intervals for this patient without date_of_diagnosis", conditional_required=False)
if "clinical_tumour_staging_system" not in map_json and "pathological_tumour_staging_system" not in map_json:
self.warn("Either clinical_tumour_staging_system or pathological_staging_system is required")
for prop in map_json:
Expand Down
Loading