Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,4 @@ __marimo__/

# Streamlit
.streamlit/secrets.toml
recidivism.ini
44 changes: 43 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,43 @@
# recidivism
# recidivism

Utilities for downloading OSV data, enriching vulnerabilities with a recidivism
metric, and cloning referenced source repositories locally.

## Configuration

Copy the default config and edit your local paths:

```bash
cp recidivism.default.ini recidivism.ini
```

Both scripts read settings from `recidivism.ini`. If that file is missing, the
scripts print guidance and fall back to `recidivism.default.ini`.

## Scripts

### 1) Download + enrich OSV vulnerabilities

```bash
python scripts/enrich_osv_recidivism.py \
--output data/osv_recidivism.jsonl
```

This script:
- downloads the OSV dump (`OSV-all.zip` by default),
- extracts all vulnerabilities,
- computes a recidivism metric using CWE recurrence and repository/fix history,
- appends recidivism details to each vulnerability and writes JSONL output.

### 2) Clone OSV referenced repositories

```bash
python scripts/clone_osv_repositories.py \
--osv-dir data/osv_dump \
--target-dir data/repos \
--update-existing
```

This script scans OSV vulnerabilities for GitHub source references and
clones/updates local copies for research workflows (organized as
`<target-dir>/<owner>/<repo>`).
13 changes: 13 additions & 0 deletions recidivism.default.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[enrich]
dump_url = https://osv-vulnerabilities.storage.googleapis.com/OSV-all.zip
archive_path = data/OSV-all.zip
extract_dir = data/osv_dump
output = data/osv_recidivism.jsonl
force_download = false
force_extract = false

[clone]
osv_dir = data/osv_dump
target_dir = data/repos
max_repos =
update_existing = false
102 changes: 102 additions & 0 deletions scripts/clone_osv_repositories.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#!/usr/bin/env python3
import argparse
import subprocess
from pathlib import Path
from urllib.parse import urlparse

from osv_common import extract_repo_urls, iter_vulnerability_files, load_vulnerability
from recidivism_config import get_required_value, load_config_with_source, resolve_config_path


def clone_or_update(repo_url: str, target_dir: Path, update_existing: bool) -> None:
    """Clone ``repo_url`` below ``target_dir`` or refresh an existing clone.

    The checkout lives at ``target_dir/<owner>/<repo>``, where ``owner`` and
    ``repo`` are the last two path segments of the URL and a trailing ``.git``
    is dropped from the repository name.

    Args:
        repo_url: Repository URL to clone.
        target_dir: Root directory that holds all local clones.
        update_existing: When the checkout already exists, run
            ``git pull --ff-only`` in it; otherwise existing checkouts are
            left untouched.

    Git failures and unusable URLs are reported as warnings on stdout; they
    do not raise.
    """
    parsed = urlparse(repo_url)
    parts = [part for part in parsed.path.split("/") if part]
    if len(parts) < 2:
        print(f"Warning: skipping malformed repository URL: {repo_url}")
        return
    owner = parts[-2]
    repo_name = parts[-1].removesuffix(".git")

    # The URL comes from vulnerability data, so its segments must not be able
    # to steer the checkout outside target_dir ("..") or collapse the
    # destination onto the owner directory (empty name from a bare ".git").
    if owner in {".", ".."} or repo_name in {"", ".", ".."}:
        print(f"Warning: skipping malformed repository URL: {repo_url}")
        return

    destination = target_dir / owner / repo_name
    destination.parent.mkdir(parents=True, exist_ok=True)
    if destination.exists():
        if update_existing:
            result = subprocess.run(
                ["git", "-C", str(destination), "pull", "--ff-only"],
                check=False,
                capture_output=True,
                text=True,
            )
            if result.returncode != 0:
                stderr = result.stderr.strip() if result.stderr else f"git pull exited with code {result.returncode}"
                print(f"Warning: failed to update {destination} ({repo_url}): {stderr}")
        return

    # "--" ends option parsing so a URL beginning with "-" can never be
    # interpreted by git as a command-line option.
    result = subprocess.run(
        ["git", "clone", "--", repo_url, str(destination)],
        check=False,
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        stderr = result.stderr.strip() if result.stderr else f"git clone exited with code {result.returncode}"
        print(f"Warning: failed to clone {repo_url}: {stderr}")


def main() -> None:
    """Clone (or update) every repository referenced by the extracted OSV dump.

    Settings are taken from the ``clone`` configuration returned by
    ``load_config_with_source``; each command-line option, when given,
    overrides the corresponding config value. Invalid or missing settings end
    the program through ``parser.error``.
    """
    config, config_source = load_config_with_source("clone")

    parser = argparse.ArgumentParser(description="Clone all repositories referenced by OSV vulnerabilities.")
    parser.add_argument(
        "--osv-dir",
        help="Directory containing extracted OSV JSON files (overrides clone.osv_dir in recidivism.ini)",
    )
    parser.add_argument(
        "--target-dir",
        help="Directory to place local repository clones (overrides clone.target_dir in recidivism.ini)",
    )
    parser.add_argument(
        "--max-repos",
        type=int,
        default=None,
        help="Optional limit for number of repositories",
    )
    parser.add_argument(
        "--update-existing",
        action=argparse.BooleanOptionalAction,
        default=config.getboolean("update_existing", fallback=False),
        help="Run git pull on existing clones",
    )
    args = parser.parse_args()

    try:
        osv_dir = resolve_config_path(args.osv_dir or get_required_value(config, "clone", "osv_dir"))
        target_dir = resolve_config_path(args.target_dir or get_required_value(config, "clone", "target_dir"))
    except ValueError as error:
        parser.error(f"{error} (config: {config_source})")

    # The command line wins; an empty config value means "no limit".
    max_repos = args.max_repos
    if max_repos is None:
        max_repos_str = config.get("max_repos", fallback="").strip()
        if max_repos_str:
            try:
                max_repos = int(max_repos_str)
            except ValueError as error:
                parser.error(f"Invalid clone.max_repos value '{max_repos_str}' in {config_source}: {error}")
    # A negative limit would make the slice below drop repositories from the
    # end of the list instead of capping it, so reject it up front.
    if max_repos is not None and max_repos < 0:
        parser.error(f"max_repos must be a non-negative integer, got {max_repos}")
    target_dir.mkdir(parents=True, exist_ok=True)

    # A set de-duplicates repositories referenced by several vulnerabilities.
    repo_urls = set()
    for path in iter_vulnerability_files(osv_dir):
        vulnerability = load_vulnerability(path)
        repo_urls.update(extract_repo_urls(vulnerability))

    # Sorting makes the processing order (and the effect of the limit) stable.
    ordered_repos = sorted(repo_urls)
    if max_repos is not None:
        ordered_repos = ordered_repos[:max_repos]

    for repo_url in ordered_repos:
        clone_or_update(repo_url, target_dir, args.update_existing)

    print(f"Processed {len(ordered_repos)} repositories into {target_dir}")


if __name__ == "__main__":
main()
100 changes: 100 additions & 0 deletions scripts/enrich_osv_recidivism.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#!/usr/bin/env python3
import argparse
import json
import shutil
import tarfile
import zipfile
from pathlib import Path
from urllib.request import urlretrieve

from osv_common import collect_history, iter_vulnerability_files, load_vulnerability, recidivism_for_vulnerability
from recidivism_config import get_required_value, load_config_with_source, resolve_config_path


def download_dump(url: str, destination: Path, force: bool) -> None:
    """Download ``url`` to ``destination`` unless it is already present.

    Args:
        url: Location of the archive to fetch.
        destination: Local file path for the downloaded archive; missing
            parent directories are created.
        force: Download even when ``destination`` already exists.

    Raises:
        Whatever ``urllib.request.urlretrieve`` raises for a failed transfer.
        In that case ``destination`` is left as it was before the call.
    """
    if destination.exists() and not force:
        return
    destination.parent.mkdir(parents=True, exist_ok=True)

    # Download next to the final path and rename afterwards: an interrupted
    # transfer must not leave a truncated file at ``destination``, because the
    # existence check above would treat it as a finished download next time.
    partial = destination.with_name(destination.name + ".part")
    try:
        urlretrieve(url, partial)
        partial.replace(destination)
    finally:
        # After a successful rename the partial file is already gone.
        partial.unlink(missing_ok=True)


def extract_dump(archive: Path, extract_dir: Path, force: bool) -> None:
    """Unpack ``archive`` (``.zip``, ``.tar.gz`` or ``.tgz``) into ``extract_dir``.

    Args:
        archive: Path of the archive file; the format is chosen from its name.
        extract_dir: Directory that receives the contents; created if missing.
        force: Delete an existing ``extract_dir`` before extracting. Without
            it, files are extracted over whatever is already there.

    Raises:
        ValueError: If the archive name has an unsupported extension, or (on
            Python builds without tar extraction filters) a tar member would
            be written outside ``extract_dir``.
    """
    is_zip = archive.suffix == ".zip"
    is_gzipped_tar = archive.name.endswith(".tar.gz") or archive.suffix == ".tgz"
    # Validate the format before touching the filesystem so an unsupported
    # archive cannot cause a forced run to wipe a good extraction directory.
    if not (is_zip or is_gzipped_tar):
        raise ValueError(f"Unsupported archive format: {archive}")

    if extract_dir.exists() and force:
        shutil.rmtree(extract_dir)
    extract_dir.mkdir(parents=True, exist_ok=True)

    if is_zip:
        with zipfile.ZipFile(archive, "r") as zf:
            zf.extractall(extract_dir)
        return

    with tarfile.open(archive, "r:gz") as tf:
        if hasattr(tarfile, "data_filter"):
            # The "data" filter refuses members that would land outside the
            # destination (absolute paths, "..", escaping links).
            tf.extractall(extract_dir, filter="data")
        else:
            # Older Python without extraction filters: reject path traversal
            # by name before extracting anything.
            root = extract_dir.resolve()
            for member in tf.getmembers():
                resolved = (root / member.name).resolve()
                if resolved != root and root not in resolved.parents:
                    raise ValueError(f"Unsafe path in archive {archive}: {member.name}")
            tf.extractall(extract_dir)


def main() -> None:
    """Download the OSV dump, score each vulnerability, and write JSONL output.

    Values come from the ``enrich`` configuration returned by
    ``load_config_with_source``; any command-line option that is supplied
    takes precedence. Every vulnerability file is loaded twice: once to build
    the CWE/repository history and once to attach its metric and write it out.
    """
    config, config_source = load_config_with_source("enrich")

    parser = argparse.ArgumentParser(description="Download OSV dump and enrich with recidivism metrics.")
    for option in ("dump_url", "archive_path", "extract_dir", "output"):
        parser.add_argument(
            "--" + option.replace("_", "-"),
            help=f"Override enrich.{option} from recidivism.ini",
        )
    for option in ("force_download", "force_extract"):
        parser.add_argument(
            "--" + option.replace("_", "-"),
            action=argparse.BooleanOptionalAction,
            default=config.getboolean(option, fallback=False),
        )
    args = parser.parse_args()

    def setting(cli_value, key):
        # Prefer the command-line value; otherwise the config must provide it.
        if cli_value:
            return cli_value
        return get_required_value(config, "enrich", key)

    try:
        dump_url = setting(args.dump_url, "dump_url")
        archive_path = resolve_config_path(setting(args.archive_path, "archive_path"))
        extract_dir = resolve_config_path(setting(args.extract_dir, "extract_dir"))
        output_path = resolve_config_path(setting(args.output, "output"))
    except ValueError as error:
        parser.error(f"{error} (config: {config_source})")

    download_dump(dump_url, archive_path, args.force_download)
    extract_dump(archive_path, extract_dir, args.force_extract)

    # First pass: gather the history counts used by the metric.
    vulnerability_files = list(iter_vulnerability_files(extract_dir))
    history_input = (load_vulnerability(path) for path in vulnerability_files)
    cwe_counts, repo_counts = collect_history(history_input)

    output_path.parent.mkdir(parents=True, exist_ok=True)
    recidivism_types = {"RECIDIVISM", "RECIDIVISM_ADJUSTED"}
    enriched_count = 0
    # Second pass: attach the metric to each record and emit one JSON line.
    with output_path.open("w", encoding="utf-8") as handle:
        for enriched_count, path in enumerate(vulnerability_files, start=1):
            record = load_vulnerability(path)
            metric = recidivism_for_vulnerability(record, cwe_counts, repo_counts)

            extras = record.setdefault("database_specific", {})
            if "recidivism" in extras:
                print(f"Overwriting existing recidivism metric for vulnerability {record.get('id', 'UNKNOWN')}")
            extras["recidivism"] = metric

            # Drop severity entries from an earlier enrichment run before
            # adding the freshly computed ones.
            kept = []
            for entry in record.setdefault("severity", []):
                if entry.get("type") in recidivism_types:
                    continue
                kept.append(entry)
            kept.append({"type": "RECIDIVISM", "score": f"{metric['score']:.2f}"})
            adjusted = metric["adjusted_severity_score"]
            if adjusted is not None:
                kept.append({"type": "RECIDIVISM_ADJUSTED", "score": f"{adjusted:.2f}"})
            record["severity"] = kept

            handle.write(json.dumps(record, sort_keys=True) + "\n")

    print(f"Enriched {enriched_count} vulnerabilities -> {output_path}")


if __name__ == "__main__":
main()
Loading
Loading