Skip to content

Commit 427dd1c

Browse files
authored
Merge pull request #402 from cybertec-postgresql/400_concurrent_map_read_write_panic
[-] protect concurrent read and write of interval chains map, fixes #400
2 parents 0ae08c9 + 209ad5f commit 427dd1c

File tree

3 files changed

+23
-28
lines changed

3 files changed

+23
-28
lines changed

internal/scheduler/chain.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,12 +156,7 @@ func (sch *Scheduler) chainWorker(ctx context.Context, chains <-chan Chain) {
156156
}
157157

158158
func getTimeoutContext(ctx context.Context, t1 int, t2 int) (context.Context, context.CancelFunc) {
159-
var timeout int
160-
if t1 > t2 {
161-
timeout = t1
162-
} else {
163-
timeout = t2
164-
}
159+
timeout := Max(t1, t2)
165160
if timeout > 0 {
166161
return context.WithTimeout(ctx, time.Millisecond*time.Duration(timeout))
167162
}

internal/scheduler/interval_chain.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,16 @@ func (ichain IntervalChain) isListed(ichains []IntervalChain) bool {
2626
// SendIntervalChain sends interval chain to the channel for workers
2727
func (sch *Scheduler) SendIntervalChain(c IntervalChain) {
2828
select {
29-
case sch.intervalChainsChan <- c:
29+
case sch.ichainsChan <- c:
3030
sch.l.WithField("chain", c.ChainID).Debug("Sent interval chain to the execution channel")
3131
default:
3232
sch.l.WithField("chain", c.ChainID).Error("Failed to send interval chain to the execution channel")
3333
}
3434
}
3535

3636
func (sch *Scheduler) isValid(ichain IntervalChain) bool {
37+
sch.intervalChainMutex.Lock()
38+
defer sch.intervalChainMutex.Unlock()
3739
return (IntervalChain{}) != sch.intervalChains[ichain.ChainID]
3840
}
3941

@@ -54,7 +56,6 @@ func (sch *Scheduler) reschedule(ctx context.Context, ichain IntervalChain) {
5456
}
5557

5658
func (sch *Scheduler) retrieveIntervalChainsAndRun(ctx context.Context) {
57-
sch.intervalChainMutex.Lock()
5859
ichains := []IntervalChain{}
5960
err := sch.pgengine.SelectIntervalChains(ctx, &ichains)
6061
if err != nil {
@@ -64,6 +65,7 @@ func (sch *Scheduler) retrieveIntervalChainsAndRun(ctx context.Context) {
6465
}
6566

6667
// delete chains that are not returned from the database
68+
sch.intervalChainMutex.Lock()
6769
for id, ichain := range sch.intervalChains {
6870
if !ichain.isListed(ichains) {
6971
delete(sch.intervalChains, id)

internal/scheduler/scheduler.go

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -30,23 +30,21 @@ const (
3030

3131
// Scheduler is the main class for running the tasks
3232
type Scheduler struct {
33-
l log.LoggerIface
34-
chainsChan chan Chain // channel for passing chains to workers
35-
pgengine *pgengine.PgEngine
33+
pgengine *pgengine.PgEngine
34+
l log.LoggerIface
35+
chainsChan chan Chain // channel for passing chains to workers
36+
ichainsChan chan IntervalChain // channel for passing interval chains to workers
3637

3738
exclusiveMutex sync.RWMutex //read-write mutex for running regular and exclusive chains
3839

39-
// activeChains holds the map of chain ID with context cancel() function, so we can abort chain by request
40-
activeChains map[int]func()
40+
activeChains map[int]func() // map of chain ID with context cancel() function to abort chain by request
4141
activeChainMutex sync.Mutex
4242

43-
// map of active chains, updated every minute
44-
intervalChains map[int]IntervalChain
45-
// create channel for passing interval chains to workers
46-
intervalChainsChan chan IntervalChain
43+
intervalChains map[int]IntervalChain // map of active chains, updated every minute
4744
intervalChainMutex sync.Mutex
48-
shutdown chan struct{} // closed when shutdown is called
49-
status RunStatus
45+
46+
shutdown chan struct{} // closed when shutdown is called
47+
status RunStatus
5048
}
5149

5250
// Max returns the maximum number of two arguments
@@ -60,14 +58,14 @@ func Max(x, y int) int {
6058
// New returns a new instance of Scheduler
6159
func New(pge *pgengine.PgEngine, logger log.LoggerIface) *Scheduler {
6260
return &Scheduler{
63-
l: logger,
64-
pgengine: pge,
65-
chainsChan: make(chan Chain, Max(minChannelCapacity, pge.Resource.CronWorkers*2)),
66-
intervalChainsChan: make(chan IntervalChain, Max(minChannelCapacity, pge.Resource.IntervalWorkers*2)),
67-
activeChains: make(map[int]func()), //holds cancel() functions to stop chains
68-
intervalChains: make(map[int]IntervalChain),
69-
shutdown: make(chan struct{}),
70-
status: RunningStatus,
61+
l: logger,
62+
pgengine: pge,
63+
chainsChan: make(chan Chain, Max(minChannelCapacity, pge.Resource.CronWorkers*2)),
64+
ichainsChan: make(chan IntervalChain, Max(minChannelCapacity, pge.Resource.IntervalWorkers*2)),
65+
activeChains: make(map[int]func()), //holds cancel() functions to stop chains
66+
intervalChains: make(map[int]IntervalChain),
67+
shutdown: make(chan struct{}),
68+
status: RunningStatus,
7169
}
7270
}
7371

@@ -98,7 +96,7 @@ func (sch *Scheduler) Run(ctx context.Context) RunStatus {
9896
for w := 1; w <= sch.Config().Resource.IntervalWorkers; w++ {
9997
workerCtx, cancel := context.WithCancel(ctx)
10098
defer cancel()
101-
go sch.intervalChainWorker(workerCtx, sch.intervalChainsChan)
99+
go sch.intervalChainWorker(workerCtx, sch.ichainsChan)
102100
}
103101
ctx = log.WithLogger(ctx, sch.l)
104102

0 commit comments

Comments
 (0)