Skip to content

Commit fde8e6b

Browse files
Merge branch 'next-minor'
2 parents c94e524 + fba1316 commit fde8e6b

File tree

26 files changed

+1192
-317
lines changed

26 files changed

+1192
-317
lines changed

core/agent/README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,10 @@ Exit codes and their meaning:
155155
- `9` - OS `exec()` error, like "file is not executable" and similar
156156
- `10` - default exit code once `validation-failed` status is set. It
157157
can be overridden by terminating the step with a non-zero exit code.
158-
- `11-31` - reserved to the agent implementation
158+
- `11` - error "Agent is busy" is returned if the number of running
159+
actions or events has reached `MAX_CONCURRENCY` (32) and after waiting
160+
`OVERLOAD_SLEEP` (500ms) that number did not decrease.
161+
- `12-31` - reserved for the agent implementation
159162
- `32-255` - **available to use** for action-specific error numbers
160163

161164
## File descriptors

core/agent/agent.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"os"
2828
"os/signal"
2929
"syscall"
30+
"strconv"
3031
"time"
3132

3233
"github.com/go-redis/redis/v8"
@@ -52,6 +53,9 @@ var eventPaths flagStringSlice
5253
var pollingDuration = 5000 * time.Millisecond
5354
var taskExpireDuration = 8 * time.Hour
5455

56+
var maxConcurrency = 32 // default limit of spawned concurrent processes
57+
var overloadSleep = 500 * time.Millisecond // wait time before rejecting new processes
58+
5559
// Command arguments --actionsdir and --eventsdir can be repeated multiple
5660
// times. Each item is inserted into a []string.
5761
type flagStringSlice []string
@@ -98,6 +102,20 @@ func main() {
98102
}
99103
}
100104

105+
// Override max number of concurrent tasks and event handlers
106+
if v := os.Getenv("MAX_CONCURRENCY"); v != "" {
107+
if n, err := strconv.Atoi(v); err == nil && n > 0 {
108+
maxConcurrency = n
109+
}
110+
}
111+
// Override the wait time before rejecting tasks and events
112+
if v := os.Getenv("OVERLOAD_SLEEP"); v != "" {
113+
n, convError := time.ParseDuration(v)
114+
if convError == nil {
115+
overloadSleep = n
116+
}
117+
}
118+
101119
var signalChannel = make(chan os.Signal, 1)
102120
signal.Notify(signalChannel, syscall.SIGUSR1)
103121
var actionsCtx, cancelActions = context.WithCancel(ctx)

core/agent/hbuiltin.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"encoding/json"
2626
"log"
2727
"sync"
28+
"fmt"
2829

2930
"github.com/NethServer/ns8-core/core/agent/models"
3031
"github.com/go-redis/redis/v8"
@@ -137,3 +138,26 @@ func runCancelTask(rdb *redis.Client, task *models.Task, cancelFuncMap map[strin
137138
}
138139
log.Printf("task/%s/%s: action \"%s\" status is \"%s\" (%d) at step %s", agentPrefix, task.ID, task.Action, actionDescriptor.Status, exitCode, lastStep)
139140
}
141+
142+
func rejectAction(rdb *redis.Client, actionCtx context.Context, task *models.Task) {
143+
progressChannel := "progress/" + agentPrefix + "/task/" + task.ID
144+
outputKey := "task/" + agentPrefix + "/" + task.ID + "/output"
145+
errorKey := "task/" + agentPrefix + "/" + task.ID + "/error"
146+
exitCodeKey := "task/" + agentPrefix + "/" + task.ID + "/exit_code"
147+
actionDescriptor := models.Processor{Status: "pending"}
148+
publishStatus(rdb, progressChannel, actionDescriptor) // pending status
149+
actionOutput := ""
150+
actionError := fmt.Sprintf("Agent is busy. Action %s rejected!\n", task.Action)
151+
exitCode := 11
152+
actionDescriptor.Status = "aborted"
153+
log.Printf(SD_ERR+"Agent is busy. Action %s rejected!", task.Action)
154+
rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
155+
// Publish the action response
156+
pipe.Set(ctx, outputKey, actionOutput, taskExpireDuration)
157+
pipe.Set(ctx, errorKey, actionError, taskExpireDuration)
158+
pipe.Set(ctx, exitCodeKey, exitCode, taskExpireDuration)
159+
pipe.Expire(ctx, "task/" + agentPrefix + "/" + task.ID + "/context", taskExpireDuration)
160+
publishStatus(pipe, progressChannel, actionDescriptor) // aborted status
161+
return nil
162+
})
163+
}

core/agent/hevent.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"os"
2727
"os/exec"
2828
"strings"
29-
"sync"
3029
"time"
3130

3231
"github.com/NethServer/ns8-core/core/agent/models"
@@ -58,13 +57,17 @@ func listenEventsAsync(ctx context.Context, complete chan int) {
5857

5958
pubsub := rdb.PSubscribe(ctx, "*/event/*")
6059

61-
var wg sync.WaitGroup
60+
wg := NewWorkersLimiter(maxConcurrency, overloadSleep)
6261
csyn := make(chan int, 1)
6362

6463
go func() {
6564
for msg := range pubsub.Channel(redis.WithChannelHealthCheckInterval(pollingDuration)) {
6665
if before, after, found := strings.Cut(msg.Channel, "/event/"); found {
67-
go runEvent(&wg, &models.Event{Source: before, Payload: msg.Payload, Name: after})
66+
if wg.ObserveOverload() {
67+
log.Printf(SD_ERR + "Agent is busy. Event %s rejected!", msg.Channel)
68+
} else {
69+
go runEvent(wg, &models.Event{Source: before, Payload: msg.Payload, Name: after})
70+
}
6871
}
6972
}
7073
csyn <- 1
@@ -80,7 +83,7 @@ func listenEventsAsync(ctx context.Context, complete chan int) {
8083
complete <- 1
8184
}
8285

83-
func runEvent(wg *sync.WaitGroup, event *models.Event) {
86+
func runEvent(wg *workersLimiter, event *models.Event) {
8487
wg.Add(1)
8588
defer wg.Done()
8689

core/agent/htask.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,6 @@ func runAction(rdb *redis.Client, actionCtx context.Context, task *models.Task)
289289

290290
func listenActionsAsync(actionsCtx context.Context, complete chan int) {
291291
defer func() { complete <- 1 }()
292-
var workersRegistry sync.WaitGroup
293292
brpopCtx, cancelBrpop := context.WithCancel(ctx)
294293
taskCancelFunctions := make(map[string]context.CancelFunc)
295294

@@ -318,6 +317,7 @@ func listenActionsAsync(actionsCtx context.Context, complete chan int) {
318317
MaxRetryBackoff: 5000 * time.Millisecond,
319318
})
320319

320+
workersRegistry := NewWorkersLimiter(maxConcurrency, overloadSleep)
321321
var tcMu sync.Mutex
322322
taskCh := make(chan models.Task)
323323
go readTasks(rdb, brpopCtx, taskCh)
@@ -338,6 +338,12 @@ func listenActionsAsync(actionsCtx context.Context, complete chan int) {
338338
workersRegistry.Wait()
339339
break MAINLOOP
340340
}
341+
342+
if workersRegistry.ObserveOverload() {
343+
rejectAction(rdb, ctx, &task)
344+
continue
345+
}
346+
341347
// Create a cancelable context for the task and
342348
// store its cancel function in a safe map
343349
taskCtx, taskCancelFunction := context.WithCancel(ctx)

core/agent/worklimit.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright (C) 2025 Nethesis S.r.l.
3+
* http://www.nethesis.it - [email protected]
4+
*
5+
* This script is part of NethServer.
6+
*
7+
* NethServer is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License,
10+
* or any later version.
11+
*
12+
* NethServer is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with NethServer. If not, see COPYING.
19+
*/
20+
21+
package main
22+
23+
import (
24+
"sync"
25+
"sync/atomic"
26+
"time"
27+
)
28+
29+
// workersLimiter couples a sync.WaitGroup with an atomic counter of the
// currently running workers, so callers can both wait for completion
// (Wait) and check the running count against a configured soft limit
// (ObserveOverload).
type workersLimiter struct {
	wg      sync.WaitGroup
	counter atomic.Int64  // number of workers currently registered via Add/Done
	limit   int64         // soft cap on concurrently running workers
	delay   time.Duration // grace period before declaring overload
}

// NewWorkersLimiter returns a limiter that reports overload when the
// number of running workers has reached limit and does not drop below
// it within delay.
func NewWorkersLimiter(limit int, delay time.Duration) *workersLimiter {
	// counter starts at its zero value; the typed atomic.Int64 needs no
	// explicit initialization.
	return &workersLimiter{
		limit: int64(limit),
		delay: delay,
	}
}

// Add registers delta additional running workers.
func (w *workersLimiter) Add(delta int) {
	// call wg.Add first so it panics like the standard WaitGroup if delta causes it
	w.wg.Add(delta)
	w.counter.Add(int64(delta))
}

// Done marks one worker as finished.
func (w *workersLimiter) Done() {
	// call wg.Done first to preserve panic semantics
	w.wg.Done()
	w.counter.Add(-1)
}

// Wait blocks until every worker registered with Add has called Done.
func (w *workersLimiter) Wait() {
	w.wg.Wait()
}

// ObserveOverload checks if the current number of running workers exceeds
// the configured limit. If the limit is reached, it waits for a short
// delay before re-checking. Returns true if the system is still
// overloaded after the delay, signaling that new work should be rejected.
func (w *workersLimiter) ObserveOverload() bool {
	if w.counter.Load() < w.limit {
		return false
	}
	// At or over the limit: give running workers a chance to finish
	// before rejecting new work.
	time.Sleep(w.delay)
	return w.counter.Load() >= w.limit
}

0 commit comments

Comments
 (0)