Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,18 @@
#
# These 2 envs are used to enable local adaptors mode. OPENFN_ADAPTORS_REPO points
# to the repo directory which must have a `packages` subdir. LOCAL_ADAPTORS env is
# the flag used to enable/disable this mode
# the flag used to enable/disable this mode.
#
# OPENFN_ADAPTORS_REPO accepts a comma-separated list of paths so that a private
# adaptor repo can be loaded alongside the canonical OpenFn adaptors monorepo.
# Order is precedence: when two repos ship a package with the same dirname, the
# entry from the earlier path wins and a warning is logged for the shadowed one.
# Both the registry (picker UI, metadata) and the bundled ws-worker resolve
# `@local` adaptors against the same list, so a workflow run picks up the same
# package the picker shows.
# LOCAL_ADAPTORS=true
# OPENFN_ADAPTORS_REPO=/path/to/repo/
# OPENFN_ADAPTORS_REPO=/path/to/private,/path/to/canonical
#
# Control whether metrics reported by the Workflow editor or Job editor are
# written to the Lightning logs.
Expand Down
15 changes: 12 additions & 3 deletions RUNNINGLOCAL.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,21 @@ To start, set up the following environment variables:
- `LOCAL_ADAPTORS`: Used to enable or disable the local adaptors mode. Set it to
`true` to enable.
- `OPENFN_ADAPTORS_REPO`: This should point to the adaptors monorepo. This is
the same variable used when you pass `-m` to the CLI.
the same variable used when you pass `-m` to the CLI. It also accepts a
comma-separated list of paths to merge multiple repos into the registry; the
first path wins on dirname collisions, with a warning logged for shadowed
entries. Both the registry view and the bundled `ws-worker` resolve `@local`
adaptors against the same list, so a workflow run picks up the same package
the picker shows.

Example configuration:

```sh
export LOCAL_ADAPTORS=true
export OPENFN_ADAPTORS_REPO=/path/to/repo/

# Or, merge a private adaptor repo with the canonical one (first wins):
export OPENFN_ADAPTORS_REPO=/path/to/private,/path/to/canonical
```

You can also run the server directly in local mode with:
Expand All @@ -174,8 +182,9 @@ You can also run the server directly in local mode with:
LOCAL_ADAPTORS=true mix phx.server
```

Ensure that the `OPENFN_ADAPTORS_REPO` directory is correctly set up with the
necessary `packages` subdirectory, otherwise the app wont start
Each path in `OPENFN_ADAPTORS_REPO` must contain a `packages` subdirectory.
Paths that are missing or unreadable are logged and skipped, so the rest of
the list still loads.

### Problems with Apple Silicon

Expand Down
105 changes: 85 additions & 20 deletions lib/lightning/adaptor_registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ defmodule Lightning.AdaptorRegistry do
def handle_continue(opts, _state) do
adaptors =
case Enum.into(opts, %{}) do
%{local_adaptors_repo: repo_path} when is_binary(repo_path) ->
read_adaptors_from_local_repo(repo_path)
%{local_adaptors_repos: repo_paths}
when is_list(repo_paths) and repo_paths != [] ->
read_adaptors_from_local_repos(repo_paths)

%{use_cache: use_cache}
when use_cache === true or is_binary(use_cache) ->
Expand Down Expand Up @@ -150,10 +151,21 @@ defmodule Lightning.AdaptorRegistry do
and uses the cached file for every subsequent start.
It can either be a boolean, or a string - the latter being a file path
to set where the cache file is located.
- `:local_adaptors_repos` - an ordered list of paths to local adaptor
monorepos (each containing a `packages/` subdirectory). When set, the
registry skips NPM and lists adaptors from these directories instead.
Earlier paths win on dirname collisions; shadowed entries are summarised
in a single warning log.
- `:name` (defaults to AdaptorRegistry) - the name of the process, useful
for testing and/or running multiple versions of the registry
"""
@spec start_link(opts :: [use_cache: boolean() | binary(), name: term()]) ::
@spec start_link(
opts :: [
use_cache: boolean() | binary(),
local_adaptors_repos: [binary()],
name: term()
]
) ::
{:error, any} | {:ok, pid}
def start_link(opts \\ [use_cache: true]) do
Logger.info("Starting AdaptorRegistry")
Expand Down Expand Up @@ -270,20 +282,72 @@ defmodule Lightning.AdaptorRegistry do
}
end

defp read_adaptors_from_local_repo(repo_path) do
Logger.debug("Using local adaptors repo at #{repo_path}")

repo_path
|> Path.join("packages")
|> File.ls!()
|> Enum.map(fn package ->
%{
name: "@openfn/language-" <> package,
repo: "file://" <> Path.join([repo_path, "packages", package]),
latest: "local",
versions: []
}
end)
defp read_adaptors_from_local_repos(repo_paths) when is_list(repo_paths) do
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah this is super unfortunate timing - @stuartc is just at the tail end of re-writing this code.

@stuartc I hate to make your project more complicated, but what do you make of this change?

We can merge this to main and have you rebase to unblock Jeremi; or we can have you port this solution int your branch (which blocks Jeremi and increases the risk of this work)

Tricky one. We should talk about this tomorrow!

Logger.debug("Using local adaptors repos at #{inspect(repo_paths)}")

repo_paths
|> Enum.flat_map(&adaptors_in_repo/1)
|> dedupe_first_wins()
end

defp adaptors_in_repo(repo_path) do
packages_path = Path.join(repo_path, "packages")

case File.ls(packages_path) do
{:ok, entries} ->
Enum.map(entries, fn package ->
%{
name: "@openfn/language-" <> package,
repo: "file://" <> Path.join([repo_path, "packages", package]),
latest: "local",
versions: []
}
end)

{:error, reason} ->
Logger.error(
"Skipping local adaptors repo #{inspect(repo_path)}: " <>
"cannot list #{inspect(packages_path)} (#{:file.format_error(reason)})"
)

[]
end
end

# First-occurrence wins: when two roots ship a package with the same
# `@openfn/language-<dirname>` name, the entry from the earlier root is kept.
# Listing your private repo before the canonical one therefore lets you
# override individual adaptors locally without forking the whole canonical
# tree. Shadowed entries are summarised in a single warning so the override
# case (the intended use of ordering) does not flood logs with one line per
# package.
defp dedupe_first_wins(adaptors) do
{kept_reversed, _seen, shadowed} =
Enum.reduce(adaptors, {[], MapSet.new(), []}, fn adaptor,
{kept, seen, shadowed} ->
if MapSet.member?(seen, adaptor.name) do
{kept, seen, [adaptor | shadowed]}
else
{[adaptor | kept], MapSet.put(seen, adaptor.name), shadowed}
end
end)

log_shadowed(shadowed)
Enum.reverse(kept_reversed)
end

defp log_shadowed([]), do: :ok

defp log_shadowed(shadowed) do
names =
shadowed
|> Enum.reverse()
|> Enum.map_join(", ", & &1.name)

Logger.warning(
"AdaptorRegistry: #{length(shadowed)} adaptor(s) shadowed by earlier " <>
"local-adaptors repo entries: #{names}"
)
end

@doc """
Expand Down Expand Up @@ -362,8 +426,9 @@ defmodule Lightning.AdaptorRegistry do
end

def local_adaptors_enabled? do
config = Lightning.Config.adaptor_registry()

if config[:local_adaptors_repo], do: true, else: false
case Lightning.Config.adaptor_registry()[:local_adaptors_repos] do
[_ | _] -> true
_ -> false
end
end
end
37 changes: 23 additions & 14 deletions lib/lightning/config/bootstrap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -203,21 +203,30 @@ defmodule Lightning.Config.Bootstrap do
config :lightning, :adaptor_service,
adaptors_path: env!("ADAPTORS_PATH", :string, "./priv/openfn")

local_adaptors_repo =
env!(
"OPENFN_ADAPTORS_REPO",
:string,
Utils.get_env([
:lightning,
Lightning.AdaptorRegistry,
:local_adaptors_repo
])
)
# OPENFN_ADAPTORS_REPO accepts a comma-separated list of paths so that a
# private adaptor repo can be loaded alongside the canonical OpenFn
# adaptors monorepo. A single path is still valid; it just becomes a
# one-element list. Order is precedence: earlier entries shadow later ones
# when two repos ship a package with the same name. Comma (rather than
# ':') keeps Windows drive-letter paths like `c:/repo` usable.
local_adaptors_repos =
env!("OPENFN_ADAPTORS_REPO", :string, nil)
|> case do
nil ->
[]

value when is_binary(value) ->
value
|> String.split(",", trim: true)
|> Enum.map(&String.trim/1)
|> Enum.reject(&(&1 == ""))
|> Enum.map(&Path.expand/1)
end

use_local_adaptors_repo? =
use_local_adaptors_repos? =
env!("LOCAL_ADAPTORS", &Utils.ensure_boolean/1, false)
|> tap(fn v ->
if v && !is_binary(local_adaptors_repo) do
if v && local_adaptors_repos == [] do
raise """
LOCAL_ADAPTORS is set to true, but OPENFN_ADAPTORS_REPO is not set.
"""
Expand All @@ -231,8 +240,8 @@ defmodule Lightning.Config.Bootstrap do
:string,
Utils.get_env([:lightning, Lightning.AdaptorRegistry, :use_cache])
),
local_adaptors_repo:
use_local_adaptors_repo? && Path.expand(local_adaptors_repo)
local_adaptors_repos:
if(use_local_adaptors_repos?, do: local_adaptors_repos, else: [])

config :lightning,
schemas_path:
Expand Down
111 changes: 104 additions & 7 deletions test/lightning/adaptor_registry_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -131,18 +131,16 @@ defmodule Lightning.AdaptorRegistryTest do
end

@tag :tmp_dir
test "lists directory names of the when local_adaptors_repo is set", %{
tmp_dir: tmp_dir,
test: test
} do
test "lists directory names from a single-element local_adaptors_repos list",
%{tmp_dir: tmp_dir, test: test} do
expected_adaptors = ["foo", "bar", "baz"]

Enum.each(expected_adaptors, fn adaptor ->
[tmp_dir, "packages", adaptor] |> Path.join() |> File.mkdir_p!()
end)

start_supervised!(
{AdaptorRegistry, [name: test, local_adaptors_repo: tmp_dir]}
{AdaptorRegistry, [name: test, local_adaptors_repos: [tmp_dir]]}
)

results = AdaptorRegistry.all(test)
Expand All @@ -158,6 +156,105 @@ defmodule Lightning.AdaptorRegistryTest do
assert expected_result in results
end
end

@tag :tmp_dir
test "merges adaptors from multiple local_adaptors_repos", %{
tmp_dir: tmp_dir,
test: test
} do
repo_a = Path.join(tmp_dir, "a")
repo_b = Path.join(tmp_dir, "b")
[repo_a, "packages", "alpha"] |> Path.join() |> File.mkdir_p!()
[repo_b, "packages", "beta"] |> Path.join() |> File.mkdir_p!()

start_supervised!(
{AdaptorRegistry, [name: test, local_adaptors_repos: [repo_a, repo_b]]}
)

names = AdaptorRegistry.all(test) |> Enum.map(& &1.name) |> Enum.sort()

assert names == ["@openfn/language-alpha", "@openfn/language-beta"]
end

@tag :tmp_dir
test "first repo wins on collision and emits a warning", %{
tmp_dir: tmp_dir,
test: test
} do
repo_a = Path.join(tmp_dir, "a")
repo_b = Path.join(tmp_dir, "b")
[repo_a, "packages", "http"] |> Path.join() |> File.mkdir_p!()
[repo_b, "packages", "http"] |> Path.join() |> File.mkdir_p!()

log =
ExUnit.CaptureLog.capture_log(fn ->
start_supervised!(
{AdaptorRegistry,
[name: test, local_adaptors_repos: [repo_a, repo_b]]}
)

# force the GenServer to finish handle_continue
AdaptorRegistry.all(test)
end)

results = AdaptorRegistry.all(test)
assert length(results) == 1

assert hd(results).repo ==
"file://" <> Path.join([repo_a, "packages", "http"])

assert log =~ "@openfn/language-http"
assert log =~ "shadowed"
end

@tag :tmp_dir
test "soft-fails when a repo path is missing or unreadable", %{
tmp_dir: tmp_dir,
test: test
} do
good_repo = Path.join(tmp_dir, "good")
missing_repo = Path.join(tmp_dir, "does-not-exist")
[good_repo, "packages", "alpha"] |> Path.join() |> File.mkdir_p!()

log =
ExUnit.CaptureLog.capture_log(fn ->
start_supervised!(
{AdaptorRegistry,
[name: test, local_adaptors_repos: [missing_repo, good_repo]]}
)

AdaptorRegistry.all(test)
end)

names = AdaptorRegistry.all(test) |> Enum.map(& &1.name)
assert names == ["@openfn/language-alpha"]
assert log =~ "Skipping local adaptors repo"
assert log =~ missing_repo
end
end

describe "local_adaptors_enabled?/0" do
test "returns true when a non-empty plural list is configured" do
Mox.stub(Lightning.MockConfig, :adaptor_registry, fn ->
[local_adaptors_repos: ["/some/path"]]
end)

assert AdaptorRegistry.local_adaptors_enabled?()
end

test "returns false when the list is empty" do
Mox.stub(Lightning.MockConfig, :adaptor_registry, fn ->
[local_adaptors_repos: []]
end)

refute AdaptorRegistry.local_adaptors_enabled?()
end

test "returns false when the key is absent" do
Mox.stub(Lightning.MockConfig, :adaptor_registry, fn -> [] end)

refute AdaptorRegistry.local_adaptors_enabled?()
end
end

describe "resolve_package_name/1" do
Expand All @@ -178,10 +275,10 @@ defmodule Lightning.AdaptorRegistryTest do
end

@tag :tmp_dir
test "returns local as the version when local_adaptors_repo config is set",
test "returns local as the version when local_adaptors_repos config is set",
%{tmp_dir: tmp_dir} do
Mox.stub(Lightning.MockConfig, :adaptor_registry, fn ->
[local_adaptors_repo: tmp_dir]
[local_adaptors_repos: [tmp_dir]]
end)

assert AdaptorRegistry.resolve_package_name("@openfn/language-foo@1.2.3") ==
Expand Down
Loading