Skip to content

Commit 98b9e8e

Browse files
Arta AsadiArta Asadi
authored andcommitted
fix: update task run scheduler migrator
1 parent 4955ede commit 98b9e8e

File tree

4 files changed

+44
-24
lines changed

4 files changed

+44
-24
lines changed

services/tasks/db/db.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"github.com/opengovern/opensecurity/services/tasks/db/models"
88
"gorm.io/gorm"
99
"gorm.io/gorm/clause"
10-
"time"
1110
)
1211

1312
type Database struct {
@@ -136,6 +135,25 @@ func (db Database) FetchLastCreatedTaskRunsByTaskID(taskID string) (*models.Task
136135
return &task, nil
137136
}
138137

138+
// FetchLastTaskRunsByTaskSchedulerID retrieves last task run by scheduler id
139+
func (db Database) FetchLastTaskRunsByTaskSchedulerID(taskID, taskSchedulerID string) (*models.TaskRun, error) {
140+
var task models.TaskRun
141+
tx := db.Orm.Model(&models.TaskRun{}).
142+
Where("task_id = ?", taskID).
143+
Where("trigger_type = ?", models.TriggerTypeScheduled).
144+
Where("triggered_by = ?", taskSchedulerID).
145+
Order("created_at desc").
146+
First(&task)
147+
if tx.Error != nil {
148+
if errors.Is(tx.Error, gorm.ErrRecordNotFound) {
149+
return nil, nil
150+
}
151+
return nil, tx.Error
152+
}
153+
154+
return &task, nil
155+
}
156+
139157
// TimeoutTaskRunsByTaskID Timeout task runs for given task id by given timeout interval
140158
func (db Database) TimeoutTaskRunsByTaskID(taskID string, timeoutInterval uint64) error {
141159
tx := db.Orm.
@@ -241,11 +259,3 @@ func (db Database) GetTaskConfigSecret(taskId string) (*models.TaskConfigSecret,
241259

242260
return &configSecret, nil
243261
}
244-
245-
func (db Database) UpdateTaskRunScheduleLastRun(id uint) error {
246-
tx := db.Orm.Model(&models.TaskRunSchedule{}).Where("id = ?", id).Update("last_run", time.Now())
247-
if tx.Error != nil {
248-
return tx.Error
249-
}
250-
return nil
251-
}

services/tasks/db/models/task.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"github.com/jackc/pgtype"
55
"github.com/lib/pq"
66
"gorm.io/gorm"
7-
"time"
87
)
98

109
type TaskSecretHealthStatus string
@@ -46,9 +45,8 @@ type TaskConfigSecret struct {
4645
}
4746

4847
type TaskRunSchedule struct {
49-
ID uint `gorm:"primarykey"`
50-
TaskID string
51-
LastRun *time.Time
52-
Params pgtype.JSONB
53-
Frequency float64
48+
SchedulerID string `gorm:"primarykey"`
49+
TaskID string `gorm:"primarykey"`
50+
Params pgtype.JSONB
51+
Frequency float64
5452
}

services/tasks/db/models/task_run.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,20 @@ const (
1717
TaskRunStatusCancelled TaskRunStatus = "CANCELLED"
1818
)
1919

20+
type TriggerType string
21+
22+
const (
23+
TriggerTypeManual TriggerType = "MANUAL"
24+
TriggerTypeScheduled TriggerType = "SCHEDULED"
25+
)
26+
2027
type TaskRun struct {
2128
gorm.Model
2229
TaskID string
2330
Params pgtype.JSONB
2431
Status TaskRunStatus
2532
Result pgtype.JSONB
33+
TriggerType TriggerType
34+
TriggeredBy string
2635
FailureMessage string
2736
}

services/tasks/scheduler/create_task.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,24 @@ func (s *MainScheduler) createTasks(ctx context.Context) error {
4040
return err
4141
}
4242
for _, runSchedule := range runSchedules {
43-
if runSchedule.LastRun != nil {
44-
if time.Now().Before(runSchedule.LastRun.Add(time.Duration(runSchedule.Frequency) * time.Second)) {
43+
lastRun, err := s.db.FetchLastTaskRunsByTaskSchedulerID(task.ID, runSchedule.SchedulerID)
44+
if err != nil {
45+
return err
46+
}
47+
if lastRun != nil {
48+
if time.Now().Before(lastRun.CreatedAt.Add(time.Duration(runSchedule.Frequency) * time.Second)) {
4549
continue
4650
}
4751
}
52+
4853
newRun := models.TaskRun{
49-
TaskID: task.ID,
50-
Status: models.TaskRunStatusCreated,
54+
TaskID: task.ID,
55+
Status: models.TaskRunStatusCreated,
56+
TriggerType: models.TriggerTypeScheduled,
57+
TriggeredBy: runSchedule.SchedulerID,
5158
}
5259

53-
err := newRun.Result.Set([]byte("{}"))
60+
err = newRun.Result.Set([]byte("{}"))
5461
if err != nil {
5562
return err
5663
}
@@ -59,10 +66,6 @@ func (s *MainScheduler) createTasks(ctx context.Context) error {
5966
if err = s.db.CreateTaskRun(&newRun); err != nil {
6067
return err
6168
}
62-
63-
if err = s.db.UpdateTaskRunScheduleLastRun(runSchedule.ID); err != nil {
64-
return err
65-
}
6669
}
6770
}
6871

0 commit comments

Comments
 (0)