Skip to content

Commit eebed22

Browse files
committed
initial commit
1 parent 50a5cd2 commit eebed22

File tree

14 files changed

+72
-37
lines changed

14 files changed

+72
-37
lines changed

internal/api/api.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ func New(
6262
wholeConf any,
6363
log log.Modular,
6464
stats metrics.Type,
65+
count *int,
6566
opts ...OptFunc,
6667
) (*Type, error) {
6768
gMux := mux.NewRouter()
@@ -146,6 +147,10 @@ func New(
146147
}
147148
}
148149

150+
handleConfigAcknowledgement := func(w http.ResponseWriter, r *http.Request) {
151+
fmt.Fprintf(w, "{\"success_reload_count\":\"%v\"}", *count)
152+
}
153+
149154
if t.conf.DebugEndpoints {
150155
t.RegisterEndpoint(
151156
"/debug/config/json", "DEBUG: Returns the loaded config as JSON.",
@@ -200,6 +205,7 @@ func New(
200205

201206
t.RegisterEndpoint("/ping", "Ping me.", handlePing)
202207
t.RegisterEndpoint("/version", "Returns the service version.", handleVersion)
208+
t.RegisterEndpoint("/config/ack", "Returns the count of success watcher", handleConfigAcknowledgement)
203209
t.RegisterEndpoint("/endpoints", "Returns this map of endpoints.", handleEndpoints)
204210

205211
// If we want to expose a stats endpoint we register the endpoints.

internal/api/api_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ func TestAPIEnableCORS(t *testing.T) {
2020
conf.CORS.Enabled = true
2121
conf.CORS.AllowedOrigins = []string{"*"}
2222

23-
s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop())
23+
s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil)
2424
require.NoError(t, err)
2525

2626
handler := s.Handler()
@@ -41,7 +41,7 @@ func TestAPIEnableCORSOrigins(t *testing.T) {
4141
conf.CORS.Enabled = true
4242
conf.CORS.AllowedOrigins = []string{"foo", "bar"}
4343

44-
s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop())
44+
s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil)
4545
require.NoError(t, err)
4646

4747
handler := s.Handler()
@@ -81,7 +81,7 @@ func TestAPIEnableCORSNoHeaders(t *testing.T) {
8181
conf := api.NewConfig()
8282
conf.CORS.Enabled = true
8383

84-
_, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop())
84+
_, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil)
8585
require.Error(t, err)
8686
assert.Contains(t, err.Error(), "must specify at least one allowed origin")
8787
}
@@ -164,7 +164,7 @@ func TestAPIBasicAuth(t *testing.T) {
164164
conf.BasicAuth.PasswordHash = tc.correctPass
165165
conf.BasicAuth.Salt = "EzrwNJYw2wkErVVV1P36FQ=="
166166

167-
s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop())
167+
s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil)
168168
if ok := tc.expectedErr(t, err); !(ok && err == nil) {
169169
return
170170
}

internal/cli/common/manager.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ func CreateManager(
3030
logger log.Modular,
3131
streamsMode bool,
3232
conf config.Type,
33+
count *int,
3334
mgrOpts ...manager.OptFunc,
3435
) (stoppableMgr *StoppableManager, err error) {
3536
var stats *metrics.Namespaced
@@ -88,7 +89,7 @@ func CreateManager(
8889
}
8990

9091
var httpServer *api.Type
91-
if httpServer, err = api.New(cliOpts.Version, cliOpts.DateBuilt, conf.HTTP, sanitNode, logger, stats); err != nil {
92+
if httpServer, err = api.New(cliOpts.Version, cliOpts.DateBuilt, conf.HTTP, sanitNode, logger, stats, count); err != nil {
9293
err = fmt.Errorf("failed to initialise API: %w", err)
9394
return
9495
}

internal/cli/common/service.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error {
5454
}
5555

5656
verLogger := logger.With("benthos_version", cliOpts.Version)
57+
5758
if mainPath == "" {
5859
verLogger.Info("Running without a main config file")
5960
} else if inferredMainPath {
@@ -73,8 +74,9 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error {
7374
if strict && len(lints) > 0 {
7475
return errors.New(cliOpts.ExecTemplate("shutting down due to linter errors, to prevent shutdown run {{.ProductName}} with --chilled"))
7576
}
76-
77-
stoppableManager, err := CreateManager(c, cliOpts, logger, streamsMode, conf)
77+
//Success Watcher Count Is Used to for to get count of the config which was updated with the watcher flag.
78+
success_reload_count := 0
79+
stoppableManager, err := CreateManager(c, cliOpts, logger, streamsMode, conf, &success_reload_count)
7880
if err != nil {
7981
return err
8082
}
@@ -90,9 +92,10 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error {
9092
watching := cliOpts.RootFlags.GetWatcher(c)
9193
if streamsMode {
9294
enableStreamsAPI := !c.Bool("no-api")
93-
stoppableStream, err = initStreamsMode(cliOpts, strict, watching, enableStreamsAPI, confReader, stoppableManager.Manager())
95+
stoppableStream, err = initStreamsMode(cliOpts, strict, watching, enableStreamsAPI, confReader, stoppableManager.Manager(), &success_reload_count)
9496
} else {
95-
stoppableStream, dataStreamClosedChan, err = initNormalMode(cliOpts, conf, strict, watching, confReader, stoppableManager.Manager())
97+
logger.Info("InitMode Get Initiated... strict:%v", strict)
98+
stoppableStream, dataStreamClosedChan, err = initNormalMode(cliOpts, conf, strict, watching, confReader, stoppableManager.Manager(), &success_reload_count)
9699
}
97100
if err != nil {
98101
return err
@@ -133,6 +136,7 @@ func initStreamsMode(
133136
strict, watching, enableAPI bool,
134137
confReader *config.Reader,
135138
mgr *manager.Type,
139+
success_reload_count *int,
136140
) (RunningStream, error) {
137141
logger := mgr.Logger()
138142
streamMgr := strmmgr.New(mgr, strmmgr.OptAPIEnabled(enableAPI))
@@ -181,7 +185,7 @@ func initStreamsMode(
181185
}
182186

183187
if watching {
184-
if err := confReader.BeginFileWatching(mgr, strict); err != nil {
188+
if err := confReader.BeginFileWatching(mgr, strict, success_reload_count); err != nil {
185189
return nil, fmt.Errorf("failed to create stream config watcher: %w", err)
186190
}
187191
}
@@ -194,6 +198,7 @@ func initNormalMode(
194198
strict, watching bool,
195199
confReader *config.Reader,
196200
mgr *manager.Type,
201+
success_reload_count *int,
197202
) (newStream RunningStream, stoppedChan chan struct{}, err error) {
198203
logger := mgr.Logger()
199204

@@ -231,7 +236,7 @@ func initNormalMode(
231236
}
232237

233238
if watching {
234-
if err := confReader.BeginFileWatching(mgr, strict); err != nil {
239+
if err := confReader.BeginFileWatching(mgr, strict, success_reload_count); err != nil {
235240
return nil, nil, fmt.Errorf("failed to create config file watcher: %w", err)
236241
}
237242
}

internal/cli/studio/pull_runner.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ func (r *PullRunner) bootstrapConfigReader(ctx context.Context) (bootstrapErr er
297297
tmpTracingSummary.SetEnabled(false)
298298

299299
stopMgrTmp, err := common.CreateManager(
300-
r.cliContext, r.cliOpts, r.logger, false, conf,
300+
r.cliContext, r.cliOpts, r.logger, false, conf, nil,
301301
manager.OptSetEnvironment(tmpEnv),
302302
manager.OptSetBloblangEnvironment(bloblEnv),
303303
manager.OptSetFS(sessFS))
@@ -413,13 +413,13 @@ func (r *PullRunner) Sync(ctx context.Context) {
413413
}
414414
}
415415
for _, res := range diff.AddResources {
416-
if err := r.confReader.TriggerResourceUpdate(r.mgr, r.strictMode, res.Name); err != nil {
416+
if err := r.confReader.TriggerResourceUpdate(r.mgr, r.strictMode, res.Name, nil); err != nil {
417417
r.logger.Error("Failed to reflect resource file '%v' update: %v", res.Name, err)
418418
runErr = err
419419
}
420420
}
421421
if diff.MainConfig != nil {
422-
if err := r.confReader.TriggerMainUpdate(r.mgr, r.strictMode, diff.MainConfig.Name); err != nil {
422+
if err := r.confReader.TriggerMainUpdate(r.mgr, r.strictMode, diff.MainConfig.Name, nil); err != nil {
423423
r.logger.Error("Failed to reflect main config file '%v' update: %v", diff.MainConfig.Name, err)
424424
runErr = err
425425
}

internal/config/reader.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ func (r *Reader) readMain(mainPath string) (conf Type, pConf *docs.ParsedConfig,
357357
// TriggerMainUpdate attempts to re-read the main configuration file, trigger
358358
// the provided main update func, and apply changes to resources to the provided
359359
// manager as appropriate.
360-
func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPath string) error {
360+
func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPath string, success_reload_count *int) error {
361361
conf, _, lints, err := r.readMain(newPath)
362362
if errors.Is(err, fs.ErrNotExist) {
363363
if r.mainPath != newPath {
@@ -416,6 +416,13 @@ func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPat
416416
mgr.Logger().Error("Failed to apply updated config: %v", err)
417417
return err
418418
}
419+
420+
// Success Watcher Count denotes if the configuration in the benthos gets updated with the watcher
421+
// flag then success watcher count gets increased
422+
if success_reload_count != nil {
423+
*success_reload_count = *success_reload_count + 1
424+
mgr.Logger().Info("Success Reload Count: %v, For Normal Config", *success_reload_count)
425+
}
419426
mgr.Logger().Info("Updated main config")
420427
}
421428
return nil

internal/config/reader_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ processor_resources:
161161
assert.False(t, testMgr.ProbeProcessor("c"))
162162
assert.False(t, testMgr.ProbeProcessor("d"))
163163

164-
require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "bar_main.yaml"))
164+
require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "bar_main.yaml", nil))
165165

166166
// Wait for the config watcher to reload the config
167167
select {
@@ -226,10 +226,10 @@ processor_resources:
226226
return nil
227227
}))
228228

229-
require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "a.yaml"))
230-
require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "b.yaml"))
229+
require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "a.yaml", nil))
230+
require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "b.yaml", nil))
231231

232-
require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "foo_main.yaml"))
232+
require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "foo_main.yaml", nil))
233233

234234
assert.Equal(t, "fooin", conf.Input.Label)
235235
assert.Equal(t, "fooout", conf.Output.Label)

internal/config/resource_reader.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ func (r *Reader) readResource(path string) (conf manager.ResourceConfig, lints [
240240

241241
// TriggerResourceUpdate attempts to re-read a resource configuration file and
242242
// apply changes to the provided manager as appropriate.
243-
func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, path string) error {
243+
func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, path string, success_reload_count *int) error {
244244
newResConf, lints, err := r.readResource(path)
245245
if errors.Is(err, fs.ErrNotExist) {
246246
return r.TriggerResourceDelete(mgr, path)
@@ -273,6 +273,11 @@ func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, pa
273273
}
274274

275275
r.resourceFileInfo[path] = newInfo
276+
277+
if success_reload_count != nil {
278+
*success_reload_count = *success_reload_count + 1
279+
mgr.Logger().Info("Success Reload Count: %v, For Stream Config", *success_reload_count)
280+
}
276281
return nil
277282
}
278283

internal/config/resource_reader_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ processor_resources:
5959
// Watch for configuration changes.
6060
testMgr, err := manager.New(conf.ResourceConfig)
6161
require.NoError(t, err)
62-
require.NoError(t, rdr.BeginFileWatching(testMgr, true))
62+
require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil))
6363

6464
tCtx, done := context.WithTimeout(context.Background(), time.Second*30)
6565
defer done()
@@ -175,7 +175,7 @@ processor_resources:
175175
// Watch for configuration changes.
176176
testMgr, err := manager.New(conf.ResourceConfig)
177177
require.NoError(t, err)
178-
require.NoError(t, rdr.BeginFileWatching(testMgr, true))
178+
require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil))
179179

180180
tCtx, done := context.WithTimeout(context.Background(), time.Second*30)
181181
defer done()

internal/config/stream_reader.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ func (r *Reader) findStreamPathWalkedDir(streamPath string) (dir string) {
184184

185185
// TriggerStreamUpdate attempts to re-read a stream configuration file, and
186186
// trigger the provided stream update func.
187-
func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path string) error {
187+
func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path string, success_reload_count *int) error {
188188
if r.streamUpdateFn == nil {
189189
return nil
190190
}
@@ -236,5 +236,10 @@ func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path
236236
return err
237237
}
238238
mgr.Logger().Info("Updated stream %v config from file.", info.id)
239+
240+
if success_reload_count != nil {
241+
*success_reload_count = *success_reload_count + 1
242+
mgr.Logger().Info("Success Reload Count: %v, For Stream Config", *success_reload_count)
243+
}
239244
return nil
240245
}

0 commit comments

Comments
 (0)