Skip to content

Commit 2c7df95

Browse files
authored
feat: Manage lifecycle for multiple runner types (#75)
1 parent 1847dcf commit 2c7df95

File tree

8 files changed

+278
-24
lines changed

8 files changed

+278
-24
lines changed

cmd/launcher/main.go

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"flag"
55
"fmt"
66
"os"
7-
7+
"sync"
88
"task-runner-launcher/internal/commands"
99
"task-runner-launcher/internal/config"
1010
"task-runner-launcher/internal/errorreporting"
@@ -16,19 +16,19 @@ import (
1616

1717
func main() {
1818
flag.Usage = func() {
19-
fmt.Printf("Usage: %s [runner-type]\n", os.Args[0])
19+
fmt.Printf("Usage: %s [runner-type(s)]\n", os.Args[0])
2020
flag.PrintDefaults()
2121
}
2222

2323
if len(os.Args) < 2 {
24-
os.Stderr.WriteString("Missing runner-type argument")
24+
os.Stderr.WriteString("Missing runner-type argument(s)\n")
2525
flag.Usage()
2626
os.Exit(1)
2727
}
2828

29-
runnerType := os.Args[1]
29+
runnerTypes := os.Args[1:]
3030

31-
launcherConfig, err := config.LoadLauncherConfig([]string{runnerType}, envconfig.OsLookuper())
31+
launcherConfig, err := config.LoadLauncherConfig(runnerTypes, envconfig.OsLookuper())
3232
if err != nil {
3333
logs.Errorf("Failed to load config: %v", err)
3434
os.Exit(1)
@@ -39,13 +39,23 @@ func main() {
3939

4040
http.InitHealthCheckServer(launcherConfig.BaseConfig.HealthCheckServerPort)
4141

42-
logLevel := logs.ParseLevel(launcherConfig.BaseConfig.LogLevel)
43-
logPrefix := logs.GetLauncherPrefix(runnerType)
44-
logger := logs.NewLogger(logLevel, logPrefix)
42+
var wg sync.WaitGroup
43+
44+
for _, runnerType := range runnerTypes {
45+
wg.Add(1)
46+
// Run one launch loop per runner type. The runner type is passed in as the
// parameter rt so that every goroutine works on its own copy; the body must
// not read the enclosing loop variable, which is shared across iterations
// before Go 1.22.
go func(rt string) {
	defer wg.Done()

	// Each runner type gets its own logger so that its output carries the
	// prefix of the runner it belongs to.
	logLevel := logs.ParseLevel(launcherConfig.BaseConfig.LogLevel)
	logPrefix := logs.GetLauncherPrefix(rt)
	logger := logs.NewLogger(logLevel, logPrefix)

	cmd := commands.NewLaunchCommand(logger)
	if err := cmd.Execute(launcherConfig, rt); err != nil {
		logger.Errorf("Failed to execute `launch` command: %v", err)
	}
}(runnerType)
5058
}
59+
60+
wg.Wait()
5161
}

docs/lifecycle.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
## Summary
44

5-
The purpose of the launcher is to minimize resource use by launching a runner on demand. To do so, the launcher impersonates a task runner until a task is ready for pickup, and then it launches a runner to run the task, and after the runner has automatically shut down, the launcher will re-launch a runner to pick up the next task.
5+
The purpose of the launcher is to minimize resource use by launching a runner on demand.
6+
7+
To do so, the launcher impersonates a task runner until a task is ready for pickup, then it launches a runner to run the task, and after the runner has automatically shut down, the launcher will re-launch a runner once the next task is ready for pickup. The launcher follows this cycle independently for every runner type (i.e., language) configured to run.
68

79
## Step by step
810

docs/setup.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ To set up the launcher:
2020
- The launcher exposes a health check endpoint at `/healthz` on port `5680`, configurable via `N8N_RUNNERS_LAUNCHER_HEALTH_CHECK_PORT`.
2121
- The task broker exposes a health check endpoint at `/healthz` on port `5679`, configurable via `N8N_RUNNERS_BROKER_PORT`.
2222

23+
<br>
24+
2325
```mermaid
2426
sequenceDiagram
2527
participant k8s
@@ -56,6 +58,7 @@ Example config file at `/etc/n8n-task-runners.json`:
5658
"--disable-proto=delete",
5759
"/usr/local/lib/node_modules/n8n/node_modules/@n8n/task-runner/dist/start.js"
5860
],
61+
"health-check-server-port": "5681",
5962
"allowed-env": ["PATH", "GENERIC_TIMEZONE"],
6063
"env-overrides": {
6164
"N8N_RUNNERS_TASK_TIMEOUT": "80",
@@ -73,6 +76,7 @@ Example config file at `/etc/n8n-task-runners.json`:
7376
"args": [
7477
"/usr/local/lib/python3.13/site-packages/n8n/task-runner-python/main.py"
7578
],
79+
"health-check-server-port": "5682",
7680
"allowed-env": ["PATH", "GENERIC_TIMEZONE"],
7781
"env-overrides": {
7882
"N8N_RUNNERS_TASK_TIMEOUT": "30",
@@ -90,6 +94,7 @@ Example config file at `/etc/n8n-task-runners.json`:
9094
| `workdir` | Path where the task runner's `command` will run. |
9195
| `command` | Command to start the task runner. |
9296
| `args` | Args and flags to use with `command`. |
97+
| `health-check-server-port` | Port for the runner's health check server. When a single runner is configured, this is optional and defaults to `5681`. When multiple runners are configured, this is required and must be unique per runner. |
9398
| `allowed-env` | Env vars that the launcher will pass through from its own environment to the runner. See [environment](environment.md). |
9499
| `env-overrides` | Env vars that the launcher will set directly on the runner. See [environment](environment.md). |
95100

@@ -104,8 +109,9 @@ The launcher can pass env vars to task runners in two ways, as specified in the
104109
| `allowed-env` | Env vars filtered from the launcher's own environment | Passing env vars common to all runner types |
105110
| `env-overrides` | Env vars set by the launcher directly on the runner, with precedence over `allowed-env` | Passing env vars specific to a single runner type |
106111

107-
Exceptionally, these three env vars cannot be disallowed or overridden:
112+
Exceptionally, these four env vars cannot be disallowed or overridden:
108113

109114
- `N8N_RUNNERS_TASK_BROKER_URI`
110115
- `N8N_RUNNERS_GRANT_TOKEN`
111-
- `N8N_RUNNERS_HEALTH_CHECK_SERVER_ENABLED=true`
116+
- `N8N_RUNNERS_HEALTH_CHECK_SERVER_ENABLED=true`
117+
- `N8N_RUNNERS_HEALTH_CHECK_SERVER_PORT`

internal/commands/launch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func (c *LaunchCommand) Execute(launcherConfig *config.LauncherConfig, runnerTyp
4545
// 2. prepare env vars to pass to runner
4646

4747
runnerEnv := env.PrepareRunnerEnv(baseConfig, runnerConfig, c.logger)
48-
runnerServerURI := fmt.Sprintf("http://%s:%s", baseConfig.RunnerHealthCheckServerHost, baseConfig.RunnerHealthCheckServerPort)
48+
runnerServerURI := fmt.Sprintf("http://%s:%s", baseConfig.RunnerHealthCheckServerHost, runnerConfig.HealthCheckServerPort)
4949

5050
for {
5151
// 3. check until task broker is ready

internal/config/config.go

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,9 @@ type BaseConfig struct {
4949
// HealthCheckServerPort is the port for the launcher's health check server.
5050
HealthCheckServerPort string `env:"N8N_RUNNERS_LAUNCHER_HEALTH_CHECK_PORT, default=5680"`
5151

52-
// RunnerHealthCheckServerHost is the host for the runner's health check server.
52+
// RunnerHealthCheckServerHost is the host for all runners' health check servers.
5353
RunnerHealthCheckServerHost string `env:"N8N_RUNNERS_HEALTH_CHECK_SERVER_HOST, default=127.0.0.1"`
5454

55-
// RunnerHealthCheckServerPort is the port for the runner's health check server.
56-
RunnerHealthCheckServerPort string `env:"N8N_RUNNERS_HEALTH_CHECK_SERVER_PORT, default=5681"`
57-
5855
// Sentry is the Sentry config for the launcher, a subset of what is defined in:
5956
// https://docs.sentry.io/platforms/go/configuration/options/
6057
Sentry *SentryConfig
@@ -82,6 +79,11 @@ type RunnerConfig struct {
8279
// Arguments for command, currently path to runner entrypoint.
8380
Args []string `json:"args"`
8481

82+
// Port for the runner's health check server.
83+
// When a single runner is configured, this is optional and defaults to 5681.
84+
// When multiple runners are configured, this is required and must be unique per runner.
85+
HealthCheckServerPort string `json:"health-check-server-port,omitempty"`
86+
8587
// Env vars for the launcher to pass from its own environment to the runner.
8688
AllowedEnv []string `json:"allowed-env"`
8789

@@ -180,6 +182,24 @@ func readLauncherConfigFile(runnerTypes []string) (map[string]*RunnerConfig, err
180182
}
181183
}
182184

185+
if len(runnerConfigs) == 1 {
186+
for _, config := range runnerConfigs {
187+
if config.HealthCheckServerPort == "" {
188+
config.HealthCheckServerPort = "5681"
189+
}
190+
}
191+
} else {
192+
for runnerType, config := range runnerConfigs {
193+
if config.HealthCheckServerPort == "" {
194+
return nil, fmt.Errorf("runner %s: health-check-server-port is required with multiple runners", runnerType)
195+
}
196+
}
197+
}
198+
199+
if err := validateRunnerPorts(runnerConfigs); err != nil {
200+
return nil, err
201+
}
202+
183203
if taskRunnersNum == 1 {
184204
logs.Debug("Loaded config file with a single runner config")
185205
} else {
@@ -188,3 +208,33 @@ func readLauncherConfigFile(runnerTypes []string) (map[string]*RunnerConfig, err
188208

189209
return runnerConfigs, nil
190210
}
211+
212+
func validateRunnerPorts(runnerConfigs map[string]*RunnerConfig) error {
213+
reservedPorts := map[string]string{
214+
"5678": "n8n main server",
215+
"5679": "n8n broker server",
216+
"5680": "launcher health check server",
217+
}
218+
219+
usedPorts := make(map[string]string)
220+
221+
for runnerType, config := range runnerConfigs {
222+
port := config.HealthCheckServerPort
223+
224+
if port, err := strconv.Atoi(port); err != nil || port <= 0 || port >= 65536 {
225+
return fmt.Errorf("runner %s: health-check-server-port must be a valid port number", runnerType)
226+
}
227+
228+
if service, exists := reservedPorts[port]; exists {
229+
return fmt.Errorf("runner %s: health-check-server-port %s conflicts with %s", runnerType, port, service)
230+
}
231+
232+
if existingRunner, exists := usedPorts[port]; exists {
233+
return fmt.Errorf("runners %s and %s cannot use the same health-check-server-port %s", existingRunner, runnerType, port)
234+
}
235+
236+
usedPorts[port] = runnerType
237+
}
238+
239+
return nil
240+
}

internal/config/config_test.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,3 +143,127 @@ func TestConfigFileErrors(t *testing.T) {
143143
})
144144
}
145145
}
146+
147+
func TestValidateRunnerPorts(t *testing.T) {
148+
tests := []struct {
149+
name string
150+
runnerConfigs map[string]*RunnerConfig
151+
expectedError string
152+
}{
153+
{
154+
name: "valid unique ports",
155+
runnerConfigs: map[string]*RunnerConfig{
156+
"javascript": {HealthCheckServerPort: "5681"},
157+
"python": {HealthCheckServerPort: "5682"},
158+
},
159+
expectedError: "",
160+
},
161+
{
162+
name: "duplicate ports",
163+
runnerConfigs: map[string]*RunnerConfig{
164+
"javascript": {HealthCheckServerPort: "5681"},
165+
"python": {HealthCheckServerPort: "5681"},
166+
},
167+
expectedError: "cannot use the same health-check-server-port",
168+
},
169+
{
170+
name: "reserved port conflict",
171+
runnerConfigs: map[string]*RunnerConfig{
172+
"javascript": {HealthCheckServerPort: "5679"},
173+
},
174+
expectedError: "conflicts with n8n broker server",
175+
},
176+
{
177+
name: "invalid port number",
178+
runnerConfigs: map[string]*RunnerConfig{
179+
"javascript": {HealthCheckServerPort: "not-a-port"},
180+
},
181+
expectedError: "must be a valid port number",
182+
},
183+
{
184+
name: "port out of range",
185+
runnerConfigs: map[string]*RunnerConfig{
186+
"javascript": {HealthCheckServerPort: "70000"},
187+
},
188+
expectedError: "must be a valid port number",
189+
},
190+
}
191+
192+
for _, tt := range tests {
193+
t.Run(tt.name, func(t *testing.T) {
194+
err := validateRunnerPorts(tt.runnerConfigs)
195+
if tt.expectedError == "" {
196+
assert.NoError(t, err)
197+
} else {
198+
assert.Error(t, err)
199+
assert.Contains(t, err.Error(), tt.expectedError)
200+
}
201+
})
202+
}
203+
}
204+
205+
func TestBackwardsCompatibilityPortDefaults(t *testing.T) {
206+
tests := []struct {
207+
name string
208+
configContent string
209+
runnerTypes []string
210+
expectError bool
211+
expectedPorts map[string]string
212+
}{
213+
{
214+
name: "single runner gets default port",
215+
configContent: `{
216+
"task-runners": [{
217+
"runner-type": "javascript",
218+
"workdir": "/test",
219+
"command": "node",
220+
"args": ["test.js"]
221+
}]
222+
}`,
223+
runnerTypes: []string{"javascript"},
224+
expectedPorts: map[string]string{
225+
"javascript": "5681",
226+
},
227+
},
228+
{
229+
name: "multiple runners require explicit ports",
230+
configContent: `{
231+
"task-runners": [
232+
{
233+
"runner-type": "javascript",
234+
"workdir": "/test",
235+
"command": "node",
236+
"args": ["test.js"]
237+
},
238+
{
239+
"runner-type": "python",
240+
"workdir": "/test",
241+
"command": "python",
242+
"args": ["test.py"]
243+
}
244+
]
245+
}`,
246+
runnerTypes: []string{"javascript", "python"},
247+
expectError: true,
248+
},
249+
}
250+
251+
for _, tt := range tests {
252+
t.Run(tt.name, func(t *testing.T) {
253+
configPath = filepath.Join(t.TempDir(), "test-config.json")
254+
err := os.WriteFile(configPath, []byte(tt.configContent), 0600)
255+
require.NoError(t, err)
256+
257+
configs, err := readLauncherConfigFile(tt.runnerTypes)
258+
259+
if tt.expectError {
260+
assert.Error(t, err)
261+
} else {
262+
assert.NoError(t, err)
263+
for runnerType, expectedPort := range tt.expectedPorts {
264+
assert.Equal(t, expectedPort, configs[runnerType].HealthCheckServerPort)
265+
}
266+
}
267+
})
268+
}
269+
}

internal/env/env.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ const (
2222
// EnvVarHealthCheckServerEnabled is the env var to enable the runner's health check server.
2323
EnvVarHealthCheckServerEnabled = "N8N_RUNNERS_HEALTH_CHECK_SERVER_ENABLED"
2424

25+
// EnvVarHealthCheckServerPort is the env var for the runner's health check server port.
26+
EnvVarHealthCheckServerPort = "N8N_RUNNERS_HEALTH_CHECK_SERVER_PORT"
27+
2528
// EnvVarAutoShutdownTimeout is the env var for how long (in seconds) a runner
2629
// may be idle for before exit.
2730
EnvVarAutoShutdownTimeout = "N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT"
@@ -103,6 +106,7 @@ var requiredRuntimeEnvVars = []string{
103106
EnvVarTaskBrokerURI,
104107
EnvVarHealthCheckServerEnabled,
105108
EnvVarGrantToken,
109+
EnvVarHealthCheckServerPort,
106110
}
107111

108112
// PrepareRunnerEnv prepares the environment variables to pass to the runner.
@@ -120,6 +124,7 @@ func PrepareRunnerEnv(baseConfig *config.BaseConfig, runnerConfig *config.Runner
120124
}
121125
runnerEnv = append(runnerEnv, fmt.Sprintf("%s=%s", EnvVarTaskBrokerURI, baseConfig.TaskBrokerURI))
122126
runnerEnv = append(runnerEnv, fmt.Sprintf("%s=true", EnvVarHealthCheckServerEnabled))
127+
runnerEnv = append(runnerEnv, fmt.Sprintf("%s=%s", EnvVarHealthCheckServerPort, runnerConfig.HealthCheckServerPort))
123128

124129
// TODO: The next two lines are legacy behavior to remove after deprecation period.
125130
runnerEnv = append(runnerEnv, fmt.Sprintf("%s=%s", EnvVarAutoShutdownTimeout, baseConfig.AutoShutdownTimeout))

0 commit comments

Comments
 (0)