Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,32 @@ new fusion-move / proposal-based / contraction-based solver:
- `detail/threading.hxx::parallel_for_chunks` — the only threading primitive
we use. New parallel solvers should not introduce alternatives.

### Freeze graphs before parallel fan-out (thread-safety contract)

`UndirectedGraph` (and its subclasses) build the CSR adjacency *lazily*: the
first `node_adjacency` read on a graph built incrementally rebuilds it through
a `mutable` write inside a `const` method, and that rebuild is **not
thread-safe**. Graphs are "dirty" (not yet built) after `insert_edge`-based
construction — this includes the `from_edges` binding, `region_adjacency_graph`,
and `GridGraph` (which defers the build on purpose). `from_sorted_unique_edges`
returns an already-frozen graph.

Rule: **any algorithm that reads `node_adjacency` from `parallel_for_chunks`
(or other threads) MUST call `graph.freeze()` on the calling thread before the
fan-out.** "Reads `node_adjacency`" includes the indirect readers
`breadth_first_search`, `extract_subgraph_from_nodes`, and the multicut
sub-solver `greedy_additive` (via `DynamicGraph::reset`, which sizes per-node
adjacency from `graph.node_adjacency(node)`). Edge-only iteration
(`uv`, `uv_ids`, `number_of_edges`) and `find_edge` (edge-lookup hashmap) do
*not* trigger the rebuild and need no freeze. Symptoms when this is missed:
nondeterministic results for fixed input and intermittent segfaults.

Canonical fixed call sites that already follow this (copy the pattern):
`lifted_multicut/lifted_from_node_labels.hxx` and both
`{multicut,lifted_multicut}/fusion_move.hxx::FusionMoveSolver::optimize`
(the lifted driver freezes the *base* graph — its proposal generators read base
adjacency, while its warm-start only touches the lifted graph).

When porting fusion moves to a new objective (e.g. lifted multicut):

1. The driver loop in `multicut/fusion_move.hxx::FusionMoveSolver::optimize`
Expand Down
15 changes: 11 additions & 4 deletions MIGRATION_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,17 @@ Important differences:
- `graph.clone()` returns an independent deep copy. The C++ class is
move-only (it owns a CSR adjacency buffer), so prefer this over
reassignment-by-value.
- `graph.freeze()` eagerly builds the internal adjacency. Call it after a
batch of `insert_edge` calls if you intend to hand the graph to multiple
reader threads, or if you want to ensure subsequent `node_adjacency`
reads carry no first-call rebuild cost.
- The internal adjacency is built *lazily* on the first `node_adjacency`
read, and that lazy build is **not thread-safe**. The built-in
multi-threaded algorithms freeze the graph internally before fanning out, so
passing a graph straight into them is safe. But if you build a graph and then
share it across **your own** threads (concurrent `node_adjacency` reads, a
BFS, etc.), call `graph.freeze()` once on the construction thread first —
racing the first read across threads corrupts the adjacency (nondeterministic
results, possible crashes). `freeze()` eagerly builds the adjacency and is a
no-op once built; it also removes the first-call rebuild cost from later
`node_adjacency` reads. This applies to all graph types (`GridGraph2D`,
`GridGraph3D`, `RegionAdjacencyGraph`).

Common method/property mapping:

Expand Down
154 changes: 154 additions & 0 deletions development/graph/lifted_multicut/repro_lifted_edges_segfault.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
#!/usr/bin/env python
"""Self-contained reproduction of the bug in
``bioimage_cpp.graph.lifted_multicut.lifted_edges_from_node_labels``.

The function misbehaves once the graph grows past a few hundred nodes. The bug is
*nondeterministic* and shows up in two ways on the very same (deterministic) input:

1. It intermittently SIGSEGVs.
2. When it does return, the number of lifted edges varies from run to run and disagrees
with the reference ``nifty.distributed.liftedNeighborhoodFromNodeLabels`` (which is
stable). E.g. for a 2000-node chain at depth 3 we have seen 3288, 3837, ... while nifty
consistently returns 3993.

A varying result for fixed input plus occasional crashes is the classic signature of a memory
error (out-of-bounds read/write) in the C++ implementation. It reproduces with a trivial chain
graph -- no RAG or production-scale data required -- and is independent of node-label values,
``graph_depth`` and ``mode``. The RegionAdjacencyGraph path tends to crash the most reliably.

Minimal trigger (run on its own a few times: some runs crash, others print a different count):

import numpy as np
import bioimage_cpp as bic

n = 2000
uv = np.array([(i, i + 1) for i in range(n - 1)], dtype="uint64") # a simple chain
g = bic.graph.UndirectedGraph.from_edges(n, uv)
out = bic.graph.lifted_multicut.lifted_edges_from_node_labels(
g, np.zeros(n, "uint64"), graph_depth=3, mode="all")
print(len(out)) # -> Segmentation fault, or a different number each run

Each configuration below is run several times, each in its own child process, so a crash does
not abort the sweep and the run-to-run variation is visible.

Run it with:

python repro_lifted_edges_segfault.py
"""
import multiprocessing as mp
import queue as _queue
import signal

import numpy as np
import bioimage_cpp as bic

try:
import nifty.distributed as ndist
except ImportError:
ndist = None

GRAPH_DEPTH = 3
MODE = "all"
NODE_LADDER = (100, 500, 1000, 2000)
REPS = 5


def _chain_edges(n_nodes):
"""A simple connected chain 0-1-2-...-(n-1); enough to trigger the bug."""
return np.array([(i, i + 1) for i in range(n_nodes - 1)], dtype="uint64")


def _chain_segmentation(n_nodes):
"""A labeled volume whose region adjacency graph is the same chain of ``n_nodes`` nodes."""
return np.repeat(np.arange(n_nodes, dtype="uint32"), 16).reshape(n_nodes, 4, 4)


# --- workers (each invocation runs in its own process) -------------------------------------
def _bic_undirected(n_nodes, q):
g = bic.graph.UndirectedGraph.from_edges(n_nodes, _chain_edges(n_nodes))
node_labels = np.ones(n_nodes, dtype="uint64") # label values are irrelevant to the bug
out = bic.graph.lifted_multicut.lifted_edges_from_node_labels(
g, node_labels, graph_depth=GRAPH_DEPTH, mode=MODE)
q.put(len(out))


def _bic_rag(n_nodes, q):
rag = bic.graph.region_adjacency_graph(_chain_segmentation(n_nodes))
node_labels = np.ones(rag.numberOfNodes, dtype="uint64")
out = bic.graph.lifted_multicut.lifted_edges_from_node_labels(
rag, node_labels, graph_depth=GRAPH_DEPTH, mode=MODE)
q.put(len(out))


def _nifty_undirected(n_nodes, q):
g = ndist.Graph(_chain_edges(n_nodes))
node_labels = np.ones(n_nodes, dtype="uint64") # non-zero so ignoreLabel=0 keeps every pair
out = ndist.liftedNeighborhoodFromNodeLabels(
g, node_labels, GRAPH_DEPTH, mode=MODE, numberOfThreads=1, ignoreLabel=0)
q.put(len(out))


def _run_once(worker, n_nodes, timeout=120):
"""Run ``worker(n_nodes, queue)`` in a child process; return its lifted count or a status."""
q = mp.Queue()
p = mp.Process(target=worker, args=(n_nodes, q))
p.start()
p.join(timeout)
if p.is_alive():
p.terminate()
p.join()
return "timeout"
if p.exitcode == -signal.SIGSEGV:
return "segfault"
if p.exitcode == -signal.SIGABRT:
return "abort" # glibc "double free or corruption"
if p.exitcode != 0:
return f"exit {p.exitcode}"
try:
return q.get(timeout=5)
except _queue.Empty:
return "no-result"


def _summary(worker, n_nodes):
"""Run ``worker`` REPS times and summarise crashes and the distinct lifted-edge counts."""
results = [_run_once(worker, n_nodes) for _ in range(REPS)]
crash_labels = ("segfault", "abort")
crashes = sum(1 for r in results if r in crash_labels)
counts = sorted({r for r in results if isinstance(r, int)})
others = [r for r in results if not isinstance(r, int) and r not in crash_labels]

parts = []
if crashes:
kinds = "/".join(sorted({r.upper() for r in results if r in crash_labels}))
parts.append(f"{crashes}/{REPS} {kinds}")
if counts:
flag = " <- NONDETERMINISTIC" if len(counts) > 1 else ""
parts.append("lifted=" + ",".join(map(str, counts)) + flag)
parts.extend(sorted(set(others)))
return "; ".join(parts) if parts else "no output"


def main():
print("Reproducing the lifted_edges_from_node_labels bug "
f"(graph_depth={GRAPH_DEPTH}, mode={MODE!r}, {REPS} runs per case).\n")

workers = [("bic UndirectedGraph ", _bic_undirected),
("bic RegionAdjacencyGraph", _bic_rag)]
if ndist is not None:
workers.append(("nifty (reference) ", _nifty_undirected))

for n in NODE_LADDER:
print(f"=== {n} nodes ===")
for name, worker in workers:
print(f" {name} : {_summary(worker, n)}")
print()

print("Reference (nifty) returns a single stable count; bic varies and/or crashes -> "
"memory corruption in lifted_edges_from_node_labels.")


if __name__ == "__main__":
# "spawn" gives each case a fresh interpreter, so a crash is cleanly attributed.
mp.set_start_method("spawn", force=True)
main()
8 changes: 8 additions & 0 deletions include/bioimage_cpp/graph/lifted_multicut/fusion_move.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ public:
return objective.labels();
}

// Proposal generators read base_graph.node_adjacency() concurrently in the
// stage-1 parallel region (the greedy-additive generator does, via
// DynamicGraph::reset). The lazy CSR rebuild is not thread-safe, and the
// warm-start below freezes the *lifted* graph, not the base graph, so freeze
// the base graph on this thread before fan-out. See UndirectedGraph
// thread-safety. (The lifted graph is only read by edge iteration here.)
base_graph.freeze();

const auto effective_threads = ::bioimage_cpp::detail::normalize_thread_count(
number_of_threads_, number_of_parallel_proposals_
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ std::vector<bioimage_cpp::detail::Edge> lifted_edges_from_node_labels(
return {};
}

// The CSR adjacency is rebuilt lazily on the first node_adjacency() read and
// that rebuild is not thread-safe. Freeze it on this thread before the
// parallel BFS fan-out below so worker threads only ever do const reads of
// an already-built adjacency (see the UndirectedGraph thread-safety notes).
graph.freeze();

const auto n_threads = bioimage_cpp::detail::normalize_thread_count(
number_of_threads, n_nodes
);
Expand Down
7 changes: 7 additions & 0 deletions include/bioimage_cpp/graph/multicut/fusion_move.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ public:
return objective.labels();
}

// Proposal generators may read graph.node_adjacency() concurrently in the
// stage-1 parallel region (the greedy-additive generator does, via
// DynamicGraph::reset). The lazy CSR rebuild is not thread-safe, and the
// warm-start below only freezes the graph for a singleton initial labeling,
// so freeze on this thread before fan-out. See UndirectedGraph thread-safety.
graph.freeze();

// One workspace per worker thread; reused across the warm-start, every
// pairwise fuse, and the stage-2 joint fuse.
const auto effective_threads = ::bioimage_cpp::detail::normalize_thread_count(
Expand Down
26 changes: 20 additions & 6 deletions include/bioimage_cpp/graph/undirected_graph.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,22 @@ struct Adjacency {
// `rebuild_adjacency_from_edges()` explicitly, which keeps reads cheap and
// thread-safe.
//
// Thread safety: as long as a graph is "frozen" before being shared with
// reader threads (no concurrent inserts, no first-read-from-dirty-state
// race), reads of `node_adjacency` are safe to share across threads. The
// lazy rebuild is not internally synchronized — call
// `rebuild_adjacency_from_edges()` once on the construction thread before
// fan-out if you built the graph via `insert_edge*`.
// Thread safety: the lazy rebuild is not internally synchronized. If two
// threads each take the first `node_adjacency` read on a still-dirty graph
// they race on the rebuild — concurrently reallocating `adjacency_data_` and
// overwriting `adjacency_offsets_` — which corrupts the CSR (garbage neighbor
// ids, out-of-bounds reads) and intermittently segfaults. The rule:
//
// Any algorithm that reads `node_adjacency` (directly, or via
// `breadth_first_search`, `extract_subgraph_from_nodes`, or a sub-solver
// such as `multicut::greedy_additive`'s `DynamicGraph::reset`) from
// `parallel_for_chunks` or other threads MUST `freeze()` the graph on the
// calling thread *before* the fan-out.
//
// Once frozen (or built via `from_sorted_unique_edges`, which rebuilds the CSR
// eagerly), the graph has no mutable read path and is safe to share by
// `const&` across reader threads. Graphs built incrementally via `insert_edge*`
// (including the `from_edges` binding and `region_adjacency_graph`) start dirty.
class UndirectedGraph {
public:
using NodeId = std::uint64_t;
Expand Down Expand Up @@ -139,6 +149,10 @@ public:
return edges_;
}

// Adjacency slice of `node`. The first call on a dirty graph triggers a
// non-thread-safe lazy CSR rebuild (mutable write through this `const`
// method); call `freeze()` on the construction thread before sharing the
// graph with concurrent readers. See the class-level thread-safety note.
[[nodiscard]] AdjacencyList node_adjacency(const NodeId node) const {
validate_node(node);
ensure_adjacency_built();
Expand Down
10 changes: 9 additions & 1 deletion src/bindings/graph.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1599,7 +1599,15 @@ void bind_graph(nb::module_ &m) {
nb::arg("nodes")
)
.def("edges_from_node_list", &graph_edges_from_node_list, nb::arg("nodes"))
.def("freeze", &Graph::freeze)
.def(
"freeze",
&Graph::freeze,
"Build the internal adjacency representation now (it is otherwise "
"built lazily on first use). Call this on the construction thread "
"before sharing the graph with concurrent reader threads: the lazy "
"build is not thread-safe. No-op if already built; safe to call "
"repeatedly."
)
.def("clone", &Graph::clone)
.def_static(
"from_edges",
Expand Down
19 changes: 19 additions & 0 deletions src/bioimage_cpp/graph/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,20 @@
(with and without semantic constraints).
- :mod:`bioimage_cpp.graph.features` — edge-feature accumulation on RAGs and
grid graphs.

Thread safety
-------------
All graph types (:class:`UndirectedGraph`, :class:`GridGraph2D`,
:class:`GridGraph3D`, :class:`RegionAdjacencyGraph`) build their internal
adjacency representation *lazily*, on the first call that reads it. The
built-in multi-threaded algorithms freeze the graph internally before fanning
out, so passing a graph straight into them is safe and needs no extra step.

If you build a graph yourself and then share it across **your own** threads
(reading adjacency, running a BFS, etc. concurrently), call ``graph.freeze()``
once on the construction thread first: the lazy build is not thread-safe, and
racing the first read across threads corrupts the adjacency. ``freeze()`` is a
no-op on an already-built graph.
"""

from __future__ import annotations
Expand Down Expand Up @@ -46,6 +60,11 @@ class UndirectedGraph(_core.UndirectedGraph):
``0 .. number_of_nodes - 1``. Edges are inserted lazily and receive
consecutive ids in insertion order. Re-inserting an existing undirected edge
returns the existing edge id.

The adjacency representation is built lazily on first use. Before sharing a
freshly built graph across threads of your own, call :meth:`freeze` once on
the construction thread — see the module-level "Thread safety" note. The
built-in multi-threaded algorithms already freeze internally.
"""

def insert_edges(self, uvs):
Expand Down
44 changes: 44 additions & 0 deletions tests/graph/lifted_multicut/test_fusion_move.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,3 +309,47 @@ def test_fusion_move_default_parallel_proposals_tracks_threads():
)
assert one_thread.number_of_parallel_proposals == 2
assert four_threads.number_of_parallel_proposals == 4


def test_greedy_proposals_parallel_is_deterministic_on_dirty_base_graph():
# Regression guard for the lazy-CSR-adjacency data race on the *base* graph.
# The greedy-additive proposal generator reads base_graph.node_adjacency()
# (via DynamicGraph::reset); with T>1 the parallel proposal slots used to
# race on the first rebuild of a not-yet-frozen base graph. Unlike the
# multicut driver, here the singleton warm-start only freezes the *lifted*
# graph, so the race is reachable from the default start. The solver now
# freezes the base graph before fan-out; the multi-threaded result must equal
# the single-threaded reference on every run.
#
# Note: a regression here can surface as a process crash (it is a data race),
# not just a value mismatch.
n = 2000
base_edges = np.array([[i, i + 1] for i in range(n - 1)], dtype=np.uint64)
base_costs = np.array(
[1.0 if i % 3 else -2.0 for i in range(n - 1)], dtype=np.float64
)
# A handful of lifted edges keeps the lifted graph small (fast warm-start)
# while the large base graph drives the parallel proposal generation.
lifted_uvs = np.array(
[[i, i + 5] for i in range(0, n - 5, 250)], dtype=np.uint64
)
lifted_costs = np.array([-3.0] * len(lifted_uvs), dtype=np.float64)
parallel_proposals = 4

def run(threads):
# Fresh base graph per run so each multi-threaded run starts dirty.
base = bic.graph.UndirectedGraph.from_edges(n, base_edges)
objective = bic.graph.lifted_multicut.LiftedMulticutObjective(
base, base_costs, lifted_uvs=lifted_uvs, lifted_costs=lifted_costs
)
solver = bic.graph.lifted_multicut.FusionMoveLiftedMulticut(
proposal_generator=bic.graph.lifted_multicut.GreedyAdditiveProposalGenerator(seed=0),
number_of_threads=threads,
number_of_parallel_proposals=parallel_proposals,
number_of_iterations=3,
)
return solver.optimize(objective)

reference = run(1)
for _ in range(15):
np.testing.assert_array_equal(run(4), reference)
Loading
Loading