Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
0244859
Adds red tests for version stuck
lmac-1 Mar 24, 2026
a6b2abf
adds second test
lmac-1 Mar 24, 2026
31c88d4
Rename provisioner_version_conflict_test to external_workflow_update_…
lmac-1 Mar 24, 2026
01dd46a
Fix nobody-online path: provisioner invalidates stale DocumentState o…
lmac-1 Mar 24, 2026
80fa40d
Update tests to reflect provisioner-based stale state invalidation
lmac-1 Mar 24, 2026
03a1055
Reconcile SharedDocs via Collaborate.start after provisioner import
lmac-1 Mar 25, 2026
a1f041b
Update collaboration and provisioner tests for unified reconciler path
lmac-1 Mar 25, 2026
3f29830
updates changelog
lmac-1 Mar 25, 2026
c7e1f06
Guard against nil index in phantom delete ops
lmac-1 Mar 27, 2026
9f2111e
Adds clarifying comment following self review
lmac-1 Mar 27, 2026
e333b09
Add test coverage for WorkflowChannel handle_info and WorkflowReconci…
lmac-1 Mar 27, 2026
8d42984
Merge branch 'main' into version-stuck-provisioner-2
lmac-1 Mar 27, 2026
4bab51a
Merge remote-tracking branch 'origin/main' into version-stuck-provisi…
stuartc May 4, 2026
8d3bc3e
Fix flaky collaboration tests: isolated supervisor per test, Applicat…
stuartc May 5, 2026
532db29
Decouple Mox from CollaborationCase
stuartc May 5, 2026
a23ae75
Thread `base` explicitly through collaboration spawned-process paths …
stuartc May 5, 2026
d769533
Remove Mox.set_mox_global from collaboration tests (Blocker 2)
stuartc May 5, 2026
566b12b
Reconcile only changed workflows after provisioner import
stuartc May 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ and this project adheres to

### Fixed

- Fix version-stuck bug where the collaborative editor shows stale state after a
sandbox merge or CLI deploy.
[#4535](https://github.com/OpenFn/lightning/issues/4535)
- Collection storage on Project Settings, Collections no longer shows `0` for
collections holding less than one megabyte of data. The underlying counter was
always correct, the rendering now reflects values at any scale.
Expand Down
102 changes: 69 additions & 33 deletions lib/lightning/collaboration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,33 @@ defmodule Lightning.Collaborate do

Collaborate.start(user: user, workflow: workflow)
"""
alias Lightning.Accounts.User
alias Lightning.Collaboration.DocumentSupervisor
alias Lightning.Collaboration.Registry
alias Lightning.Collaboration.Session
alias Lightning.Collaboration.Supervisor, as: SessionSupervisor
alias Lightning.Collaboration.Topology

require Logger

@pg_scope :workflow_collaboration

@spec start(opts :: Keyword.t()) :: GenServer.on_start()
def start(opts) do
base = Keyword.get(opts, :base, Topology.base())

case do_start(opts, base) do
{:error, {:error, :shared_doc_not_found}} ->
# A SharedDoc was registered in :pg but died before the Session could
# observe it (0ms auto_exit race). Yield one ms — enough for the timer
# to fire and clear :pg — then try once more from scratch.
Process.sleep(1)
do_start(opts, base)

result ->
result
end
end

defp do_start(opts, base) do
session_id = Ecto.UUID.generate()
parent_pid = Keyword.get(opts, :parent_pid, self())

Expand All @@ -52,45 +68,65 @@ defmodule Lightning.Collaborate do
"Starting collaboration for document: #{document_name} (workflow: #{workflow.id})"
)

# Ensure document supervisor exists for this document
case lookup_shared_doc(document_name) do
nil ->
Logger.info("Starting document for #{document_name}")
{:ok, _doc_supervisor_pid} = start_document(workflow, document_name)

_shared_doc_pid ->
Logger.info("Found existing document for #{document_name}")
:ok
# Ensure document supervisor exists for this document.
# Returns {:error, reason} on failure rather than raising, so callers
# such as WorkflowReconciler can handle failures gracefully.
doc_result =
case lookup_shared_doc(document_name, base) do
nil ->
Logger.info("Starting document for #{document_name}")

case start_document(workflow, document_name, base) do
{:ok, _} -> :ok
error -> error
end

_shared_doc_pid ->
Logger.info("Found existing document for #{document_name}")
:ok
end

case doc_result do
:ok ->
# Start session for this user
user_id = if is_struct(user, User), do: user.id, else: nil

SessionSupervisor.start_child(base, {
Session,
workflow: workflow,
user: user,
parent_pid: parent_pid,
document_name: document_name,
base: base,
name:
Topology.via(
base,
{:session, "#{document_name}:#{session_id}", user_id}
)
})

error ->
error
end

# Start session for this user
SessionSupervisor.start_child({
Session,
workflow: workflow,
user: user,
parent_pid: parent_pid,
document_name: document_name,
name: Registry.via({:session, "#{document_name}:#{session_id}", user.id})
})
end

def start_document(
%Lightning.Workflows.Workflow{} = workflow,
document_name
document_name,
base \\ Topology.base()
) do
{:ok, doc_supervisor_pid} =
SessionSupervisor.start_child(
{DocumentSupervisor,
workflow: workflow,
document_name: document_name,
name: Registry.via({:doc_supervisor, document_name})}
)

{:ok, doc_supervisor_pid}
SessionSupervisor.start_child(
base,
{DocumentSupervisor,
workflow: workflow,
document_name: document_name,
base: base,
name: Topology.via(base, {:doc_supervisor, document_name})}
)
end

defp lookup_shared_doc(document_name) do
case :pg.get_members(@pg_scope, document_name) do
defp lookup_shared_doc(document_name, base) do
case :pg.get_members(Topology.pg_scope(base), document_name) do
[] -> nil
[shared_doc_pid | _] -> shared_doc_pid
end
Expand Down
23 changes: 11 additions & 12 deletions lib/lightning/collaboration/document_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,12 @@ defmodule Lightning.Collaboration.DocumentSupervisor do
"""
use GenServer

import Lightning.Collaboration.Registry, only: [via: 1]

alias Lightning.Collaboration.Persistence
alias Lightning.Collaboration.PersistenceWriter
alias Lightning.Collaboration.Topology

require Logger

@pg_scope :workflow_collaboration

def start_link(args, opts \\ []) do
GenServer.start_link(__MODULE__, args, opts)
end
Expand All @@ -31,12 +28,18 @@ defmodule Lightning.Collaboration.DocumentSupervisor do
def init(opts) do
workflow = Keyword.fetch!(opts, :workflow)
document_name = Keyword.fetch!(opts, :document_name)
base = Keyword.fetch!(opts, :base)

# Resolve topology references once at init time so subsequent callbacks
# don't need to re-read the Mox stub or Application env.
pg_scope = Topology.pg_scope(base)

{:ok, persistence_writer_pid} =
PersistenceWriter.start_link(
document_name: document_name,
workflow_id: workflow.id,
name: via({:persistence_writer, document_name})
base: base,
name: Topology.via(base, {:persistence_writer, document_name})
)

persistence_writer_ref = Process.monitor(persistence_writer_pid)
Expand All @@ -53,17 +56,18 @@ defmodule Lightning.Collaboration.DocumentSupervisor do
persistence_writer: persistence_writer_pid
}}
],
name: via({:shared_doc, document_name})
name: Topology.via(base, {:shared_doc, document_name})
)

# Register with :pg using document_name so versioned rooms are isolated
:ok = register_shared_doc_with_pg(document_name, shared_doc_pid)
:ok = :pg.join(pg_scope, document_name, shared_doc_pid)

shared_doc_ref = Process.monitor(shared_doc_pid)

{:ok,
%{
workflow: workflow,
pg_scope: pg_scope,
persistence_writer_pid: persistence_writer_pid,
persistence_writer_ref: persistence_writer_ref,
shared_doc_pid: shared_doc_pid,
Expand Down Expand Up @@ -137,9 +141,4 @@ defmodule Lightning.Collaboration.DocumentSupervisor do

{:stop, :normal, state |> Map.put(key, nil)}
end

# Supervisor.start_link(children, strategy: :one_for_all)
defp register_shared_doc_with_pg(document_name, shared_doc_pid) do
:pg.join(@pg_scope, document_name, shared_doc_pid)
end
end
108 changes: 7 additions & 101 deletions lib/lightning/collaboration/persistence.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ defmodule Lightning.Collaboration.Persistence do
@behaviour Yex.Sync.SharedDoc.PersistenceBehaviour

alias Lightning.Collaboration.DocumentState
alias Lightning.Collaboration.PersistenceWriter
alias Lightning.Collaboration.Session
alias Lightning.Collaboration.WorkflowSerializer

require Logger

Expand All @@ -30,7 +28,6 @@ defmodule Lightning.Collaboration.Persistence do
case DocumentState.get_checkpoint_and_updates(doc_name) do
{:ok, checkpoint, updates} ->
apply_persisted_state(doc, doc_name, checkpoint, updates)
reconcile_or_reset(doc, doc_name, workflow)

{:error, :not_found} ->
Logger.info(
Expand All @@ -46,16 +43,17 @@ defmodule Lightning.Collaboration.Persistence do

@impl true
def update_v1(state, update, doc_name, _doc) do
case PersistenceWriter.add_update(doc_name, update) do
:ok ->
state

{:error, reason} ->
case state[:persistence_writer] do
nil ->
Logger.error(
"Failed to add update to PersistenceWriter: #{inspect(reason)}"
"PersistenceWriter pid not in persistence state. document=#{doc_name}"
)

state

pid ->
GenServer.cast(pid, {:add_update, update})
state
end
end

Expand Down Expand Up @@ -89,96 +87,4 @@ defmodule Lightning.Collaboration.Persistence do
DocumentState.apply_to_doc(doc, checkpoint, updates)
Logger.debug("Loaded #{length(updates)} updates. document=#{doc_name}")
end

defp reconcile_or_reset(doc, doc_name, workflow) do
workflow_map = Yex.Doc.get_map(doc, "workflow")
persisted_lock_version = extract_lock_version(workflow_map)
current_lock_version = workflow.lock_version

if stale?(persisted_lock_version, current_lock_version) do
Logger.warning("""
Persisted Y.Doc is stale (persisted: #{inspect(persisted_lock_version)}, \
current: #{current_lock_version})
Discarding persisted state and reloading from database.
document=#{doc_name}
""")

clear_and_reset_workflow(doc, workflow)
else
Logger.debug(
"Persisted Y.Doc is current (lock_version: #{current_lock_version}). document=#{doc_name}"
)

reconcile_workflow_metadata(doc, workflow)
end
end

defp extract_lock_version(workflow_map) do
case Yex.Map.fetch(workflow_map, "lock_version") do
{:ok, version} when is_float(version) -> trunc(version)
{:ok, version} when is_integer(version) -> version
{:ok, nil} -> nil
:error -> nil
end
end

defp stale?(nil, current_version), do: not is_nil(current_version)

defp stale?(persisted_version, current_version),
do: persisted_version != current_version

defp clear_and_reset_workflow(doc, workflow) do
# Same pattern as Session.clear_and_reset_doc
# Get all Yex collections BEFORE transaction to avoid VM deadlock
jobs_array = Yex.Doc.get_array(doc, "jobs")
edges_array = Yex.Doc.get_array(doc, "edges")
triggers_array = Yex.Doc.get_array(doc, "triggers")

# Transaction 1: Clear all arrays
Yex.Doc.transaction(doc, "clear_stale_workflow", fn ->
clear_array(jobs_array)
clear_array(edges_array)
clear_array(triggers_array)
end)

# Transaction 2: Re-serialize workflow from database
Session.initialize_workflow_document(doc, workflow)

:ok
end

defp clear_array(array) do
length = Yex.Array.length(array)

if length > 0 do
Yex.Array.delete_range(array, 0, length)
end
end

defp reconcile_workflow_metadata(doc, workflow) do
# Update workflow metadata fields to match current database state
# This is critical when loading persisted Y.Doc state that may be stale
workflow_map = Yex.Doc.get_map(doc, "workflow")

Yex.Doc.transaction(doc, "reconcile_metadata", fn ->
# Update lock_version to current database value
Yex.Map.set(workflow_map, "lock_version", workflow.lock_version)

# Update name in case it changed
Yex.Map.set(workflow_map, "name", workflow.name)

# Update deleted_at if present
Yex.Map.set(
workflow_map,
"deleted_at",
WorkflowSerializer.datetime_to_string(workflow.deleted_at)
)
end)

Logger.debug(
"Reconciled workflow metadata: lock_version=#{workflow.lock_version}, name=#{workflow.name}"
)

:ok
end
end
Loading