Skip to content
This repository was archived by the owner on Jun 12, 2024. It is now read-only.

Commit efef3b1

Browse files
authored
Create task retention worker and scheduler (#53)
**What** - Create new generic SQLTask spec and handler. - Create a new task handler that can be used to clean up old tasks matching a specific filter. The caller can specify the queue, task type, status, age, and a custom sqlizer. - Add utility to schedule this new cleanup task once per hour. The exact minute is chosen randomly as a basic attempt to spread out the workload. - Improve the db test utility EqualCount to allow passing an error message and args down to the require methods inside of it. This allows the errors to be more expressive and hopefully easier to read. - Add new db setup utility that uses docker to ensure a db server is ready. This is intended as a dev-mode-only tool which is not left in the final committed code, but can be very useful to speed up test iteration and debugging. Signed-off-by: Lucas Roesler <[email protected]>
1 parent 8dccb68 commit efef3b1

File tree

5 files changed

+610
-22
lines changed

5 files changed

+610
-22
lines changed

pkg/db/test/db.go

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"database/sql"
66
"fmt"
7+
"os/exec"
78
"testing"
89
"time"
910

@@ -28,7 +29,7 @@ type DBInitializer func(context.Context, *sql.DB) error
2829

2930
// EqualCount asserts that the count of rows matches the expected value given the table and WHERE query and args.
3031
// Note that this is a simple COUNT of rows in a single table. More complex queries should be constructed by hand.
31-
func EqualCount(t *testing.T, db *sql.DB, expected int, table string, filter squirrel.Sqlizer) int {
32+
func EqualCount(t *testing.T, db *sql.DB, expected int, table string, filter squirrel.Sqlizer, msgAndArgs ...interface{}) int {
3233
var count int
3334
err := squirrel.StatementBuilder.
3435
PlaceholderFormat(squirrel.Dollar).
@@ -37,8 +38,8 @@ func EqualCount(t *testing.T, db *sql.DB, expected int, table string, filter squ
3738
Where(filter).
3839
RunWith(db).
3940
Scan(&count)
40-
require.NoError(t, err)
41-
require.Equal(t, expected, count)
41+
require.NoError(t, err, msgAndArgs...)
42+
require.Equal(t, expected, count, msgAndArgs...)
4243

4344
return count
4445
}
@@ -112,3 +113,59 @@ func connectDB(name string) (db *sql.DB, err error) {
112113

113114
return sql.Open(cfg.DriverName, connStr+" password="+adminPassword)
114115
}
116+
117+
// cleanupDB attempts to remove the test container created by EnsureDBReady by
// running `docker rm -v -f go-base-postgres`. A failure is only printed to
// stdout; it is never returned to the caller.
func cleanupDB() {
	if err := exec.Command("docker", "rm", "-v", "-f", "go-base-postgres").Run(); err != nil {
		fmt.Println("error during db cleanupdb: ", err)
	}
}
125+
126+
// EnsureDBReady is a helper utility that can be inserted into a test during development to make it simpler
127+
// to run the test in isolation. Docker will be called to start a db that is read to be used with GetDatabase.
128+
// Example usage:
129+
//
130+
// err, cleanup := dbtest.EnsureDBReady(ctx)
131+
// require.NoError(t, err)
132+
// defer cleanup()
133+
//
134+
// _, db := dbtest.GetDatabase(t)
135+
// defer db.Close()
136+
//
137+
// While the code is somewhat robust to having existing dbs running, this is not intended to be left in the test code.
138+
// The goal is that you can run individual tests via your IDE integrations or using the CLI, e.g.
139+
// go test -run ^TestRetentionHandler$`
140+
func EnsureDBReady(ctx context.Context) (error, func()) {
141+
check := exec.CommandContext(ctx, "docker", "container", "inspect", "go-base-postgres", "-f", "{{.ID}}").Run()
142+
if check == nil {
143+
// container is already running
144+
return nil, func() {}
145+
}
146+
147+
if exitError, ok := check.(*exec.ExitError); ok && exitError.ExitCode() != 1 {
148+
return fmt.Errorf("unexpected exit code when checking for running db: %w", check), func() {}
149+
}
150+
151+
db := exec.CommandContext(ctx,
152+
"docker",
153+
"run",
154+
"--rm",
155+
"-d",
156+
"--name", "go-base-postgres",
157+
"-p", "0.0.0.0:5432:5432",
158+
"-e", "POSTGRES_PASSWORD=localdev",
159+
"-e", "POSTGRES_USER=contiamo_test",
160+
"-e", "POSTGRES_DB=postgres",
161+
"postgres:alpine",
162+
"-c", "fsync=off",
163+
"-c", "full_page_writes=off",
164+
"-c", "synchronous_commit=off",
165+
)
166+
167+
err := db.Run()
168+
169+
time.Sleep(3 * time.Second)
170+
return err, cleanupDB
171+
}

pkg/queue/postgres/setup.go

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ import (
1111
)
1212

1313
const (
14+
// TasksTable is the name of the Postgres table that is used for tasks
15+
TasksTable = "tasks"
16+
// SchedulesTable is the name of the Postgres table used for schedules
17+
SchedulesTable = "schedules"
18+
1419
createTableTmpl = `
1520
CREATE EXTENSION IF NOT EXISTS citext;
1621
@@ -80,71 +85,71 @@ var (
8085

8186
// schedules
8287
{
83-
Table: "schedules",
88+
Table: SchedulesTable,
8489
Columns: []string{"next_execution_time DESC"},
8590
},
8691
{
87-
Table: "schedules",
92+
Table: SchedulesTable,
8893
Columns: []string{"task_queue"},
8994
Type: "hash",
9095
},
9196
{
92-
Table: "schedules",
97+
Table: SchedulesTable,
9398
Columns: []string{"task_type"},
9499
Type: "hash",
95100
},
96101
{
97-
Table: "schedules",
102+
Table: SchedulesTable,
98103
Columns: []string{"created_at DESC", "updated_at DESC"},
99104
},
100105

101106
// tasks
102107
{
103-
Table: "tasks",
108+
Table: TasksTable,
104109
Columns: []string{"queue"},
105110
Type: "hash",
106111
},
107112
{
108-
Table: "tasks",
113+
Table: TasksTable,
109114
Columns: []string{"type"},
110115
Type: "hash",
111116
},
112117
{
113-
Table: "tasks",
118+
Table: TasksTable,
114119
Columns: []string{"status"},
115120
Type: "hash",
116121
},
117122
{
118-
Table: "tasks",
123+
Table: TasksTable,
119124
Columns: []string{"schedule_id"},
120125
Type: "hash",
121126
},
122127
{
123-
Table: "tasks",
128+
Table: TasksTable,
124129
Columns: []string{"created_at", "last_heartbeat_at"},
125130
},
126131
{
127-
Table: "tasks",
132+
Table: TasksTable,
128133
Columns: []string{"created_at DESC", "last_heartbeat_at DESC"},
129134
},
130135
{
131-
Table: "tasks",
136+
Table: TasksTable,
132137
Columns: []string{"created_at DESC"},
133138
},
134139
{
135-
Table: "tasks",
140+
Table: TasksTable,
136141
Columns: []string{"last_heartbeat_at DESC"},
137142
},
138143
{
139-
Table: "tasks",
144+
Table: TasksTable,
140145
Columns: []string{"created_at DESC", "updated_at DESC"},
141146
},
142147
{
143-
Table: "tasks",
148+
Table: TasksTable,
144149
Columns: []string{"started_at DESC"},
145150
},
146151
{
147-
Table: "tasks",
152+
Table: TasksTable,
148153
Columns: []string{"finished_at DESC"},
149154
},
150155
}
@@ -170,14 +175,14 @@ func SetupTables(ctx context.Context, db db.SQLDB, references []ForeignReference
170175
logrus.Debug("checking queue-related tables...")
171176

172177
logrus.Debug("checking `schedules` table...")
173-
err = syncTable(ctx, db, "schedules", scheduleColumns, references)
178+
err = syncTable(ctx, db, SchedulesTable, scheduleColumns, references)
174179
if err != nil {
175180
return err
176181
}
177182
logrus.Debug("`schedules` table is up to date")
178183

179184
logrus.Debug("checking `tasks` table...")
180-
err = syncTable(ctx, db, "tasks", taskColumns, references)
185+
err = syncTable(ctx, db, TasksTable, taskColumns, references)
181186
if err != nil {
182187
return err
183188
}
@@ -194,12 +199,12 @@ func SetupTables(ctx context.Context, db db.SQLDB, references []ForeignReference
194199
applyIndexes := make(indexList, 0, len(references)+len(indexes))
195200
for _, ref := range references {
196201
applyIndexes = append(applyIndexes, index{
197-
Table: "schedules",
202+
Table: SchedulesTable,
198203
Columns: []string{ref.ColumnName},
199204
Type: "hash",
200205
})
201206
applyIndexes = append(applyIndexes, index{
202-
Table: "tasks",
207+
Table: TasksTable,
203208
Columns: []string{ref.ColumnName},
204209
Type: "hash",
205210
})

pkg/queue/workers/retention.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package workers
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"encoding/json"
7+
"fmt"
8+
"math/rand"
9+
"time"
10+
11+
"github.com/Masterminds/squirrel"
12+
"github.com/contiamo/go-base/pkg/queue"
13+
"github.com/contiamo/go-base/pkg/queue/postgres"
14+
)
15+
16+
const (
17+
// MaintenanceTaskQueue is the task queue name used for all the periodic maintenance jobs.
18+
// These are tasks internal to the queue itself.
19+
MaintenanceTaskQueue string = "queue-maintenance"
20+
21+
// RetentionTask is the task type used for cleaning up finished tasks
22+
RetentionTask queue.TaskType = "retention"
23+
)
24+
25+
// NewRetentionHandler creates a task handler that will clean up old finished tasks
26+
func NewRetentionHandler(db *sql.DB) TaskHandler {
27+
return NewSQLTaskHandler("RetentionHandler", db)
28+
}
29+
30+
// AssertRetentionSchedule creates a new queue retention tasks for the supplied queue, finished tasks matching
31+
// the supplied parameters will be deleted
32+
func AssertRetentionSchedule(ctx context.Context, scheduler queue.Scheduler, queueName string, taskType queue.TaskType, status queue.TaskStatus, filter squirrel.Sqlizer, age time.Duration) error {
33+
spec := createRetentionSpec(queueName, taskType, status, filter, age)
34+
specBytes, err := json.Marshal(spec)
35+
if err != nil {
36+
return fmt.Errorf("can not build retention task spec: %w", err)
37+
}
38+
// randomly distribute the retention tasks throughout the hour
39+
when := rand.Intn(60)
40+
retentionSchedule := queue.TaskScheduleRequest{
41+
TaskBase: queue.TaskBase{
42+
Queue: MaintenanceTaskQueue,
43+
Type: RetentionTask,
44+
Spec: specBytes,
45+
},
46+
CronSchedule: fmt.Sprintf("%d * * * *", when), // every hour at minute "when"
47+
}
48+
49+
return scheduler.AssertSchedule(ctx, retentionSchedule)
50+
}
51+
52+
//createRetentionSpec builds the task retention job spec. It is split out to simplify test setup
53+
func createRetentionSpec(queueName string, taskType queue.TaskType, status queue.TaskStatus, filter squirrel.Sqlizer, age time.Duration) SQLExecTaskSpec {
54+
spec := SQLExecTaskSpec{
55+
SQL: "",
56+
}
57+
58+
// use separate WHERE statements to make the order deterministic
59+
deletionSQL := squirrel.Delete(postgres.TasksTable).
60+
Where(squirrel.Eq{"status": status}).
61+
Where(
62+
// note that using this comparision allows us to use the index on
63+
// finished_at, if yo use `age(now(), finished_at)`, this can not use the index
64+
fmt.Sprintf("finished_at <= now() - interval '%f minutes'", age.Minutes()),
65+
)
66+
67+
if queueName != "" {
68+
deletionSQL = deletionSQL.Where(squirrel.Eq{"queue": queueName})
69+
}
70+
71+
if taskType != "" {
72+
deletionSQL = deletionSQL.Where(squirrel.Eq{"type": taskType})
73+
}
74+
75+
if filter != nil {
76+
deletionSQL = deletionSQL.Where(filter)
77+
}
78+
79+
spec.SQL = squirrel.DebugSqlizer(deletionSQL)
80+
81+
return spec
82+
}

0 commit comments

Comments
 (0)