diff --git a/.gitignore b/.gitignore index 83972fad..f3798001 100644 --- a/.gitignore +++ b/.gitignore @@ -216,3 +216,4 @@ __marimo__/ # Streamlit .streamlit/secrets.toml +recidivism.ini diff --git a/README.md b/README.md index 3f400d5d..b586be4a 100644 --- a/README.md +++ b/README.md @@ -1 +1,43 @@ -# recidivism \ No newline at end of file +# recidivism + +Utilities for downloading OSV data, enriching vulnerabilities with a recidivism +metric, and cloning referenced source repositories locally. + +## Configuration + +Copy the default config and edit your local paths: + +```bash +cp recidivism.default.ini recidivism.ini +``` + +Both scripts read settings from `recidivism.ini`. If that file is missing, the +scripts print guidance and fall back to `recidivism.default.ini`. + +## Scripts + +### 1) Download + enrich OSV vulnerabilities + +```bash +python scripts/enrich_osv_recidivism.py \ + --output data/osv_recidivism.jsonl +``` + +This script: +- downloads the OSV dump (`OSV-all.zip` by default), +- extracts all vulnerabilities, +- computes a recidivism metric using CWE recurrence and repository/fix history, +- appends recidivism details to each vulnerability and writes JSONL output. + +### 2) Clone OSV referenced repositories + +```bash +python scripts/clone_osv_repositories.py \ + --osv-dir data/osv_dump \ + --target-dir data/repos \ + --update-existing +``` + +This script scans OSV vulnerabilities for GitHub source references and +clones/updates local copies for research workflows (organized as +`<target-dir>/<owner>/<repo>`). 
def _run_git(arguments: "list[str]", fallback: str) -> "str | None":
    """Run ``git`` with ``arguments`` and describe the failure, if any.

    Args:
        arguments: Command-line arguments placed after the ``git`` executable.
        fallback: Command label used in the message when git wrote no stderr.

    Returns:
        ``None`` when git exited with status 0, otherwise a human-readable
        error description.
    """
    try:
        result = subprocess.run(
            ["git", *arguments],
            check=False,
            capture_output=True,
            text=True,
        )
    except OSError as error:
        # Raised when the git executable is missing or cannot be started;
        # report it like any other per-repository failure.
        return f"could not run git: {error}"
    if result.returncode == 0:
        return None
    stderr = result.stderr.strip() if result.stderr else ""
    return stderr or f"{fallback} exited with code {result.returncode}"


def clone_or_update(repo_url: str, target_dir: Path, update_existing: bool) -> None:
    """Clone ``repo_url`` under ``target_dir`` or fast-forward an existing clone.

    The checkout location is ``target_dir / <owner> / <repo>``, where owner and
    repo are the last two path segments of the URL (a trailing ``.git`` is
    dropped from the repo segment).  Every failure is reported as a printed
    warning rather than an exception, so one bad repository does not abort a
    bulk run.

    Args:
        repo_url: Repository URL handed to ``git clone``.
        target_dir: Root directory holding the per-owner clone directories.
        update_existing: When the destination already exists, run
            ``git pull --ff-only`` in it; when false an existing destination
            is left untouched.
    """
    parts = [part for part in urlparse(repo_url).path.split("/") if part]
    if len(parts) < 2:
        print(f"Warning: skipping malformed repository URL: {repo_url}")
        return
    owner = parts[-2]
    repo_name = parts[-1][:-4] if parts[-1].endswith(".git") else parts[-1]
    # "." / ".." segments, or a name that is empty once ".git" is stripped,
    # would place the checkout outside target_dir/<owner>/<repo>.
    if owner in {".", ".."} or repo_name in {"", ".", ".."}:
        print(f"Warning: skipping malformed repository URL: {repo_url}")
        return

    destination = target_dir / owner / repo_name
    destination.parent.mkdir(parents=True, exist_ok=True)

    if destination.exists():
        if update_existing:
            error = _run_git(["-C", str(destination), "pull", "--ff-only"], "git pull")
            if error is not None:
                print(f"Warning: failed to update {destination} ({repo_url}): {error}")
        return

    # "--" ends option parsing so the URL can never be read as a git option.
    error = _run_git(["clone", "--", repo_url, str(destination)], "git clone")
    if error is not None:
        print(f"Warning: failed to clone {repo_url}: {error}")
def download_dump(url: str, destination: Path, force: bool) -> None:
    """Download ``url`` to ``destination`` unless the file is already present.

    The payload is fetched into a sibling ``<name>.part`` file and renamed
    into place only after the transfer finished, so an interrupted download
    never leaves a truncated archive that a later run would treat as complete.

    Args:
        url: Location of the archive to fetch.
        destination: Local file path for the downloaded archive.
        force: Download even when ``destination`` already exists.

    Raises:
        OSError: If the transfer or the final rename fails (``URLError`` is an
            ``OSError`` subclass); an existing ``destination`` is left as is.
    """
    if destination.exists() and not force:
        return
    destination.parent.mkdir(parents=True, exist_ok=True)
    partial = destination.with_name(destination.name + ".part")
    try:
        urlretrieve(url, partial)
        partial.replace(destination)
    finally:
        # No-op after a successful rename; removes the leftover on failure.
        partial.unlink(missing_ok=True)


def _extract_tar_members(tf: tarfile.TarFile, extract_dir: Path) -> None:
    """Extract ``tf`` into ``extract_dir`` while refusing members that escape it."""
    if hasattr(tarfile, "data_filter"):
        # Interpreter supports extraction filters; "data" rejects absolute
        # paths, ".." traversal, links leaving the destination and devices.
        tf.extractall(extract_dir, filter="data")
        return
    # Older interpreters: do the containment check by hand before extracting.
    root = extract_dir.resolve()
    for member in tf.getmembers():
        if member.issym() or member.islnk() or member.isdev():
            raise tarfile.ExtractError(f"Refusing to extract special member: {member.name}")
        target = (root / member.name).resolve()
        if target != root and root not in target.parents:
            raise tarfile.ExtractError(f"Refusing to extract outside {extract_dir}: {member.name}")
    tf.extractall(extract_dir)


def extract_dump(archive: Path, extract_dir: Path, force: bool) -> None:
    """Unpack a ``.zip``, ``.tar.gz`` or ``.tgz`` archive into ``extract_dir``.

    The archive type is decided from the file name (case-insensitively) before
    anything on disk is touched, so an unsupported archive can no longer wipe
    an existing extraction when ``force`` is set.

    Args:
        archive: Path of the archive to unpack.
        extract_dir: Directory receiving the archive contents; created if
            missing.
        force: Delete an existing ``extract_dir`` before unpacking.

    Raises:
        ValueError: If the archive name has an unsupported extension.
        tarfile.TarError: If a tar member would be written outside
            ``extract_dir`` or is otherwise rejected.
    """
    lowered = archive.name.lower()
    is_zip = lowered.endswith(".zip")
    is_tar_gz = lowered.endswith((".tar.gz", ".tgz"))
    if not (is_zip or is_tar_gz):
        raise ValueError(f"Unsupported archive format: {archive}")

    if extract_dir.exists() and force:
        shutil.rmtree(extract_dir)
    extract_dir.mkdir(parents=True, exist_ok=True)

    if is_zip:
        with zipfile.ZipFile(archive, "r") as zf:
            zf.extractall(extract_dir)
    else:
        with tarfile.open(archive, "r:gz") as tf:
            _extract_tar_members(tf, extract_dir)
enrich.output from recidivism.ini") + parser.add_argument( + "--force-download", + action=argparse.BooleanOptionalAction, + default=config.getboolean("force_download", fallback=False), + ) + parser.add_argument( + "--force-extract", + action=argparse.BooleanOptionalAction, + default=config.getboolean("force_extract", fallback=False), + ) + args = parser.parse_args() + + try: + dump_url = args.dump_url or get_required_value(config, "enrich", "dump_url") + archive_path = resolve_config_path(args.archive_path or get_required_value(config, "enrich", "archive_path")) + extract_dir = resolve_config_path(args.extract_dir or get_required_value(config, "enrich", "extract_dir")) + output_path = resolve_config_path(args.output or get_required_value(config, "enrich", "output")) + except ValueError as error: + parser.error(f"{error} (config: {config_source})") + + download_dump(dump_url, archive_path, args.force_download) + extract_dump(archive_path, extract_dir, args.force_extract) + + vulnerability_files = list(iter_vulnerability_files(extract_dir)) + cwe_counts, repo_counts = collect_history(load_vulnerability(path) for path in vulnerability_files) + + output_path.parent.mkdir(parents=True, exist_ok=True) + enriched_count = 0 + with output_path.open("w", encoding="utf-8") as handle: + for path in vulnerability_files: + vulnerability = load_vulnerability(path) + metric = recidivism_for_vulnerability(vulnerability, cwe_counts, repo_counts) + dbs = vulnerability.setdefault("database_specific", {}) + if "recidivism" in dbs: + print(f"Overwriting existing recidivism metric for vulnerability {vulnerability.get('id', 'UNKNOWN')}") + dbs["recidivism"] = metric + + severity = [ + item + for item in vulnerability.setdefault("severity", []) + if item.get("type") not in {"RECIDIVISM", "RECIDIVISM_ADJUSTED"} + ] + severity.append({"type": "RECIDIVISM", "score": f"{metric['score']:.2f}"}) + adjusted = metric["adjusted_severity_score"] + if adjusted is not None: + severity.append({"type": 
"""Helpers for reading OSV vulnerability records and scoring recidivism."""

import json
import math
import re
from collections import Counter
from pathlib import Path
from typing import Dict, Iterable, Iterator, List, Optional, Set, Tuple
from urllib.parse import urlparse


_CWE_RE = re.compile(r"CWE-\d+")
_COMMIT_RE = re.compile(r"/commit/([0-9a-fA-F]{7,40})")
_GITHUB_REPO_RE = re.compile(r"^/([^/]+)/([^/]+)")
_HEX_SHA_RE = re.compile(r"^[0-9a-fA-F]{7,40}$")
MAX_SEVERITY_SCORE = 10.0

_GITHUB_HOSTS = frozenset({"github.com", "www.github.com"})
# First path segments that are GitHub site pages rather than account names,
# e.g. https://github.com/advisories/GHSA-xxxx is an advisory, not a repository.
_GITHUB_NON_REPO_OWNERS = frozenset(
    {"advisories", "apps", "marketplace", "orgs", "sponsors", "topics", "users"}
)
_RECIDIVISM_SEVERITY_TYPES = frozenset({"RECIDIVISM", "RECIDIVISM_ADJUSTED"})


def _dict_items(value: object) -> Iterator[Dict]:
    """Yield the dict elements of ``value`` when it is a list, else nothing."""
    if isinstance(value, list):
        for item in value:
            if isinstance(item, dict):
                yield item


def _dict_field(mapping: Dict, key: str) -> Dict:
    """Return ``mapping[key]`` when it is a dict, otherwise an empty dict."""
    value = mapping.get(key)
    return value if isinstance(value, dict) else {}


def iter_vulnerability_files(root: Path) -> Iterator[Path]:
    """Yield every regular ``*.json`` file found recursively below ``root``."""
    for path in root.rglob("*.json"):
        if path.is_file():
            yield path


def load_vulnerability(path: Path) -> Dict:
    """Parse the UTF-8 JSON document at ``path`` and return the decoded value."""
    with path.open("r", encoding="utf-8") as handle:
        return json.load(handle)


def extract_cwes(vulnerability: Dict) -> Set[str]:
    """Collect the ``CWE-<n>`` identifiers mentioned by a vulnerability.

    Looks at the ``cwe_ids`` and ``cwe`` keys of ``database_specific`` on the
    record itself and on every entry of ``affected``.  Values may be strings
    or (nested) lists of strings; missing, ``null`` or differently typed
    fields are ignored.
    """
    cwes: Set[str] = set()

    def add_candidates(value: object) -> None:
        if isinstance(value, str):
            cwes.update(_CWE_RE.findall(value))
        elif isinstance(value, list):
            for item in value:
                add_candidates(item)

    for container in (vulnerability, *_dict_items(vulnerability.get("affected"))):
        dbs = _dict_field(container, "database_specific")
        add_candidates(dbs.get("cwe_ids"))
        add_candidates(dbs.get("cwe"))

    return cwes


def github_repo_from_url(url: str) -> Optional[str]:
    """Map a GitHub URL to its canonical ``https://github.com/<o>/<r>.git`` form.

    Returns ``None`` when the URL cannot be parsed, is not http(s), is not on
    github.com, has fewer than two path segments, or its first segment is a
    GitHub site page (such as ``/advisories/...``) rather than a repository
    owner.
    """
    try:
        parsed = urlparse(url)
        host = parsed.hostname
    except ValueError:
        # urlparse rejects some malformed inputs, e.g. an unbalanced "[".
        return None
    if parsed.scheme not in {"http", "https"} or host not in _GITHUB_HOSTS:
        return None
    match = _GITHUB_REPO_RE.match(parsed.path)
    if not match:
        return None
    owner, repo = match.groups()
    if repo.endswith(".git"):
        repo = repo[:-4]
    if owner.lower() in _GITHUB_NON_REPO_OWNERS:
        return None
    # Dot segments and empty names cannot name a repository.
    if owner in {".", ".."} or repo in {"", ".", ".."}:
        return None
    return f"https://github.com/{owner}/{repo}.git"


def extract_repo_urls(vulnerability: Dict) -> Set[str]:
    """Return the canonical GitHub repository URLs found in ``references``."""
    repos: Set[str] = set()
    for ref in _dict_items(vulnerability.get("references")):
        url = ref.get("url")
        if not isinstance(url, str):
            continue
        repo = github_repo_from_url(url)
        if repo:
            repos.add(repo)
    return repos


def extract_fix_commits(vulnerability: Dict) -> Set[str]:
    """Return lower-cased fix commit hashes referenced by a vulnerability.

    Hashes come from ``fixed`` range events that are 7-40 hex characters and
    from ``/commit/<sha>`` fragments in reference URLs.
    """
    commits: Set[str] = set()
    for affected in _dict_items(vulnerability.get("affected")):
        for range_entry in _dict_items(affected.get("ranges")):
            for event in _dict_items(range_entry.get("events")):
                fixed = event.get("fixed")
                # fullmatch: "$" alone would also accept a trailing newline.
                if isinstance(fixed, str) and _HEX_SHA_RE.fullmatch(fixed):
                    commits.add(fixed.lower())

    for ref in _dict_items(vulnerability.get("references")):
        url = ref.get("url")
        if not isinstance(url, str):
            continue
        match = _COMMIT_RE.search(url)
        if match:
            commits.add(match.group(1).lower())
    return commits


def parse_base_severity(vulnerability: Dict) -> Optional[float]:
    """Return the first plain numeric severity score of a vulnerability.

    Entries typed ``RECIDIVISM`` or ``RECIDIVISM_ADJUSTED`` are skipped, as
    are scores that do not convert to a finite number (for example CVSS
    vector strings).  Returns ``None`` when no entry qualifies.
    """
    for severity in _dict_items(vulnerability.get("severity")):
        if severity.get("type") in _RECIDIVISM_SEVERITY_TYPES:
            continue
        score = severity.get("score")
        # bool is an int subclass; a True/False score is not a severity.
        if isinstance(score, bool) or not isinstance(score, (str, int, float)):
            continue
        try:
            value = float(score)
        except (ValueError, OverflowError):
            continue
        if math.isfinite(value):
            return value
    return None


def collect_history(
    vulnerabilities: Iterable[Dict],
) -> Tuple[Dict[str, int], Dict[str, int]]:
    """Count in how many vulnerabilities each CWE and each repository appears.

    Returns:
        ``(cwe_counts, repo_counts)``; each maps an identifier to the number
        of vulnerabilities that mention it (at most once per vulnerability).
    """
    cwe_counts: Counter = Counter()
    repo_counts: Counter = Counter()

    for vulnerability in vulnerabilities:
        cwe_counts.update(extract_cwes(vulnerability))
        repo_counts.update(extract_repo_urls(vulnerability))

    return dict(cwe_counts), dict(repo_counts)


def recidivism_for_vulnerability(
    vulnerability: Dict,
    cwe_counts: Dict[str, int],
    repo_counts: Dict[str, int],
) -> Dict[str, object]:
    """Compute the recidivism metric of one vulnerability.

    The score is the number of *other* occurrences of the record's CWEs plus
    the number of other occurrences of its repositories, according to the
    supplied history counts.  When a base severity is available, the adjusted
    severity is ``base + score`` clamped to ``[0, MAX_SEVERITY_SCORE]``.

    Args:
        vulnerability: OSV record to score.
        cwe_counts: Occurrences per CWE, as returned by ``collect_history``.
        repo_counts: Occurrences per repository URL, as returned by
            ``collect_history``.

    Returns:
        Dict with keys ``cwes``, ``repositories``, ``fix_commits``,
        ``cwe_repeat_count``, ``repo_repeat_count``, ``score``,
        ``base_severity_score`` and ``adjusted_severity_score`` (``None``
        when no base severity exists).
    """
    cwes = extract_cwes(vulnerability)
    repos = extract_repo_urls(vulnerability)
    fix_commits = extract_fix_commits(vulnerability)

    # "- 1" discounts the vulnerability's own contribution to the history.
    cwe_repeat_count = sum(max(cwe_counts.get(cwe, 0) - 1, 0) for cwe in cwes)
    repo_repeat_count = sum(max(repo_counts.get(repo, 0) - 1, 0) for repo in repos)

    recidivism_score = float(cwe_repeat_count + repo_repeat_count)
    base_score = parse_base_severity(vulnerability)
    adjusted_score = (
        max(0.0, min(MAX_SEVERITY_SCORE, base_score + recidivism_score))
        if base_score is not None
        else None
    )

    return {
        "cwes": sorted(cwes),
        "repositories": sorted(repos),
        "fix_commits": sorted(fix_commits),
        "cwe_repeat_count": cwe_repeat_count,
        "repo_repeat_count": repo_repeat_count,
        "score": recidivism_score,
        "base_severity_score": base_score,
        "adjusted_severity_score": adjusted_score,
    }
def load_config(section: str) -> configparser.SectionProxy:
    """Backwards-compatible section-only loader.

    Delegates to ``load_config_with_source`` and discards the path of the
    configuration file that was read.
    """
    config_section, _source = load_config_with_source(section)
    return config_section


def resolve_config_path(path_value: str) -> Path:
    """Resolve a configured path value.

    A leading ``~`` is expanded to the user's home directory.  Absolute paths
    are normalized directly; relative paths are interpreted relative to the
    repository root.

    Args:
        path_value: Path string from configuration or CLI.

    Returns:
        Fully resolved filesystem path.
    """
    path = Path(path_value)
    try:
        path = path.expanduser()
    except RuntimeError:
        # Home directory cannot be determined: keep the literal path.
        pass
    if path.is_absolute():
        return path.resolve()
    return (REPO_ROOT / path).resolve()


def get_required_value(config: configparser.SectionProxy, section: str, key: str) -> str:
    """Return a required non-empty configuration value.

    Args:
        config: Configuration section containing script settings.
        section: Section name for diagnostics.
        key: Config key to read.

    Returns:
        Non-empty configuration value with surrounding whitespace removed.

    Raises:
        ValueError: If the key is missing or empty.
    """
    value = config.get(key, fallback=None)
    stripped = value.strip() if value is not None else ""
    if not stripped:
        raise ValueError(f"Missing required config key '{key}' in section [{section}].")
    return stripped
class RecidivismConfigTests(unittest.TestCase):
    """Checks for config loading, path resolution and required-key validation."""

    def test_fallback_to_default_when_local_missing(self) -> None:
        """Without a local config, guidance is printed and defaults are used."""
        captured = io.StringIO()
        with tempfile.TemporaryDirectory() as workdir:
            base = Path(workdir)
            local_file = base / "recidivism.ini"
            fallback_file = base / "recidivism.default.ini"
            fallback_file.write_text("[enrich]\noutput = data/out.jsonl\n", encoding="utf-8")

            with contextlib.ExitStack() as stack:
                stack.enter_context(patch("recidivism_config.LOCAL_CONFIG_FILE", local_file))
                stack.enter_context(patch("recidivism_config.DEFAULT_CONFIG_FILE", fallback_file))
                stack.enter_context(contextlib.redirect_stdout(captured))
                section = load_config("enrich")

            self.assertFalse(local_file.exists())
            self.assertEqual(section.get("output"), "data/out.jsonl")
            self.assertIn("Missing recidivism.ini", captured.getvalue())

    def test_resolve_config_path_uses_repo_root_for_relative(self) -> None:
        """Relative paths are anchored at the (patched) repository root."""
        with tempfile.TemporaryDirectory() as workdir:
            fake_root = Path(workdir)
            with patch("recidivism_config.REPO_ROOT", fake_root):
                actual = resolve_config_path("data/example.json")
            expected = (fake_root / "data/example.json").resolve()
            self.assertEqual(actual, expected)

    def test_required_value_raises_for_empty(self) -> None:
        """An empty value for a required key is rejected with ValueError."""
        with tempfile.TemporaryDirectory() as workdir:
            ini_file = Path(workdir) / "recidivism.ini"
            ini_file.write_text("[clone]\nosv_dir =\n", encoding="utf-8")
            with patch("recidivism_config.LOCAL_CONFIG_FILE", ini_file):
                clone_section = load_config("clone")
            self.assertRaises(ValueError, get_required_value, clone_section, "clone", "osv_dir")


if __name__ == "__main__":
    unittest.main()