Skip to content

Commit 2824646

Browse files
authored
Immediate Payload Offloads OLAP Wiring (#2492)
* feat: payload store updates for immediate offloads * feat: handle immediate offloads * feat: start wiring up immediate offloads * fix: get rid of payload store return * feat: start immediate offloads work * fix: event trigger put call * fix: dynamic payload put depending on if offload worked * fix: rm put * fix: write event payload from the right place * fix: dummy id for task events to prevent duplication issues with the tasks themselves * fix: rm comments * fix: rm unused struct * fix: enabled wal * fix: rm `RETURNING` * fix: small cleanup * fix: wal issue
1 parent c549618 commit 2824646

File tree

4 files changed

+144
-105
lines changed

4 files changed

+144
-105
lines changed

pkg/repository/v1/olap.go

Lines changed: 124 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"fmt"
88
"log"
9+
"math/rand"
910
"sort"
1011
"sync"
1112
"time"
@@ -1459,9 +1460,17 @@ func (r *OLAPRepositoryImpl) writeTaskEventBatch(ctx context.Context, tenantId s
14591460
}
14601461

14611462
if event.ExternalID.Valid {
1463+
// generating a dummy id + inserted at to use for creating the external keys for the task events
1464+
// we do this since we don't have the id + inserted at of the events themselves on the opts, and we don't
1465+
// actually need those for anything once the keys are created.
1466+
dummyId := rand.Int63()
1467+
// randomly jitter the inserted at time by +/- 300ms to make collisions virtually impossible
1468+
dummyInsertedAt := time.Now().Add(time.Duration(rand.Intn(2*300+1)-300) * time.Millisecond)
1469+
14621470
payloadsToWrite = append(payloadsToWrite, StoreOLAPPayloadOpts{
1471+
Id: dummyId,
14631472
ExternalId: event.ExternalID,
1464-
InsertedAt: event.TaskInsertedAt,
1473+
InsertedAt: sqlchelpers.TimestamptzFromTime(dummyInsertedAt),
14651474
Payload: event.Output,
14661475
})
14671476
}
@@ -1772,6 +1781,7 @@ func (r *OLAPRepositoryImpl) writeTaskBatch(ctx context.Context, tenantId string
17721781
})
17731782

17741783
putPayloadOpts = append(putPayloadOpts, StoreOLAPPayloadOpts{
1784+
Id: task.ID,
17751785
ExternalId: task.ExternalID,
17761786
InsertedAt: task.InsertedAt,
17771787
Payload: payload,
@@ -1833,6 +1843,7 @@ func (r *OLAPRepositoryImpl) writeDAGBatch(ctx context.Context, tenantId string,
18331843
})
18341844

18351845
putPayloadOpts = append(putPayloadOpts, StoreOLAPPayloadOpts{
1846+
Id: dag.ID,
18361847
ExternalId: dag.ExternalID,
18371848
InsertedAt: dag.InsertedAt,
18381849
Payload: dag.Input,
@@ -2049,29 +2060,6 @@ func (r *OLAPRepositoryImpl) BulkCreateEventsAndTriggers(ctx context.Context, ev
20492060
return fmt.Errorf("error creating events: %v", err)
20502061
}
20512062

2052-
tenantIdToPutPayloadOpts := make(map[string][]StoreOLAPPayloadOpts)
2053-
2054-
for _, event := range insertedEvents {
2055-
if event == nil {
2056-
continue
2057-
}
2058-
2059-
tenantIdToPutPayloadOpts[event.TenantID.String()] = append(tenantIdToPutPayloadOpts[event.TenantID.String()], StoreOLAPPayloadOpts{
2060-
ExternalId: event.ExternalID,
2061-
InsertedAt: event.SeenAt,
2062-
Payload: event.Payload,
2063-
})
2064-
2065-
}
2066-
2067-
for tenantId, putPayloadOpts := range tenantIdToPutPayloadOpts {
2068-
err = r.PutPayloads(ctx, tx, tenantId, putPayloadOpts)
2069-
2070-
if err != nil {
2071-
return fmt.Errorf("error putting event payloads: %v", err)
2072-
}
2073-
}
2074-
20752063
eventExternalIdToId := make(map[pgtype.UUID]int64)
20762064

20772065
for _, event := range insertedEvents {
@@ -2102,73 +2090,33 @@ func (r *OLAPRepositoryImpl) BulkCreateEventsAndTriggers(ctx context.Context, ev
21022090
return fmt.Errorf("error creating event triggers: %v", err)
21032091
}
21042092

2105-
if err := commit(ctx); err != nil {
2106-
return fmt.Errorf("error committing transaction: %v", err)
2107-
}
2108-
2109-
if !r.payloadStore.ExternalStoreEnabled() {
2110-
return nil
2111-
}
2112-
2113-
offloadToExternalOpts := make([]OffloadToExternalStoreOpts, 0)
2114-
idInsertedAtToExternalId := make(map[IdInsertedAt]pgtype.UUID)
2093+
tenantIdToPutPayloadOpts := make(map[string][]StoreOLAPPayloadOpts)
21152094

21162095
for _, event := range insertedEvents {
2117-
id := event.ID
2118-
insertedAt := event.SeenAt
2119-
idInsertedAtToExternalId[IdInsertedAt{
2120-
ID: id,
2121-
InsertedAt: insertedAt,
2122-
}] = event.ExternalID
2123-
payload := eventExternalIdToPayload[event.ExternalID]
2124-
2125-
offloadToExternalOpts = append(offloadToExternalOpts, OffloadToExternalStoreOpts{
2126-
StorePayloadOpts: &StorePayloadOpts{
2127-
Id: event.ID,
2128-
InsertedAt: event.SeenAt,
2129-
ExternalId: event.ExternalID,
2130-
Type: sqlcv1.V1PayloadTypeTASKINPUT,
2131-
Payload: payload,
2132-
TenantId: event.TenantID.String(),
2133-
},
2134-
OffloadAt: time.Now(),
2135-
})
2136-
}
2137-
2138-
if len(offloadToExternalOpts) == 0 {
2139-
return nil
2140-
}
2141-
2142-
retrieveOptsToKey, err := r.PayloadStore().ExternalStore().Store(ctx, offloadToExternalOpts...)
2143-
2144-
if err != nil {
2145-
return err
2146-
}
2147-
2148-
tenantIdToffloadOpts := make(map[string][]OffloadPayloadOpts)
2096+
if event == nil {
2097+
continue
2098+
}
21492099

2150-
for opt, key := range retrieveOptsToKey {
2151-
externalId := idInsertedAtToExternalId[IdInsertedAt{
2152-
ID: opt.Id,
2153-
InsertedAt: opt.InsertedAt,
2154-
}]
2100+
payload := eventExternalIdToPayload[event.ExternalID]
21552101

2156-
tenantIdToffloadOpts[opt.TenantId.String()] = append(tenantIdToffloadOpts[opt.TenantId.String()], OffloadPayloadOpts{
2157-
ExternalId: externalId,
2158-
ExternalLocationKey: string(key),
2102+
tenantIdToPutPayloadOpts[event.TenantID.String()] = append(tenantIdToPutPayloadOpts[event.TenantID.String()], StoreOLAPPayloadOpts{
2103+
Id: event.ID,
2104+
ExternalId: event.ExternalID,
2105+
InsertedAt: event.SeenAt,
2106+
Payload: payload,
21592107
})
21602108
}
21612109

2162-
for tenantId, opts := range tenantIdToffloadOpts {
2163-
err = r.OffloadPayloads(ctx, tenantId, opts)
2110+
for tenantId, putPayloadOpts := range tenantIdToPutPayloadOpts {
2111+
err = r.PutPayloads(ctx, tx, tenantId, putPayloadOpts)
21642112

21652113
if err != nil {
2166-
return fmt.Errorf("error offloading payloads: %v", err)
2114+
return fmt.Errorf("error putting event payloads: %v", err)
21672115
}
21682116
}
21692117

2170-
if len(offloadToExternalOpts) == 0 {
2171-
return nil
2118+
if err := commit(ctx); err != nil {
2119+
return fmt.Errorf("error committing transaction: %v", err)
21722120
}
21732121

21742122
return nil
@@ -2426,27 +2374,107 @@ type OffloadPayloadOpts struct {
24262374
}
24272375

24282376
func (r *OLAPRepositoryImpl) PutPayloads(ctx context.Context, tx sqlcv1.DBTX, tenantId string, putPayloadOpts []StoreOLAPPayloadOpts) error {
2429-
insertedAts := make([]pgtype.Timestamptz, len(putPayloadOpts))
2430-
tenantIds := make([]pgtype.UUID, len(putPayloadOpts))
2431-
externalIds := make([]pgtype.UUID, len(putPayloadOpts))
2432-
payloads := make([][]byte, len(putPayloadOpts))
2433-
locations := make([]string, len(putPayloadOpts))
2377+
localTx := false
2378+
var (
2379+
commit func(context.Context) error
2380+
rollback func()
2381+
err error
2382+
)
24342383

2435-
for i, opt := range putPayloadOpts {
2436-
externalIds[i] = opt.ExternalId
2437-
insertedAts[i] = opt.InsertedAt
2438-
tenantIds[i] = sqlchelpers.UUIDFromStr(tenantId)
2439-
payloads[i] = opt.Payload
2440-
locations[i] = string(sqlcv1.V1PayloadLocationOlapINLINE)
2384+
if tx == nil {
2385+
localTx = true
2386+
tx, commit, rollback, err = sqlchelpers.PrepareTx(ctx, r.pool, r.l, 5000)
2387+
2388+
if err != nil {
2389+
return fmt.Errorf("error beginning transaction in `PutPayload`: %v", err)
2390+
}
2391+
2392+
defer rollback()
24412393
}
24422394

2443-
return r.queries.PutPayloads(ctx, tx, sqlcv1.PutPayloadsParams{
2444-
Externalids: externalIds,
2445-
Insertedats: insertedAts,
2446-
Tenantids: tenantIds,
2447-
Payloads: payloads,
2448-
Locations: locations,
2395+
placeholderPayloadType := sqlcv1.V1PayloadTypeTASKEVENTDATA // placeholder, not used in OLAP
2396+
retrieveOptsToKey := make(map[RetrievePayloadOpts]ExternalPayloadLocationKey)
2397+
2398+
if r.payloadStore.ExternalStoreEnabled() {
2399+
storeExternalPayloadOpts := make([]OffloadToExternalStoreOpts, len(putPayloadOpts))
2400+
2401+
for i, opt := range putPayloadOpts {
2402+
storeOpts := OffloadToExternalStoreOpts{
2403+
StorePayloadOpts: &StorePayloadOpts{
2404+
Id: opt.Id,
2405+
InsertedAt: opt.InsertedAt,
2406+
ExternalId: opt.ExternalId,
2407+
Type: placeholderPayloadType,
2408+
Payload: opt.Payload,
2409+
TenantId: tenantId,
2410+
},
2411+
OffloadAt: opt.InsertedAt.Time, // placeholder, offloaded immediately
2412+
}
2413+
2414+
storeExternalPayloadOpts[i] = storeOpts
2415+
}
2416+
2417+
retrieveOptsToKey, err = r.payloadStore.ExternalStore().Store(ctx, storeExternalPayloadOpts...)
2418+
2419+
if err != nil {
2420+
return fmt.Errorf("error offloading payloads to external store: %v", err)
2421+
}
2422+
}
2423+
2424+
insertedAts := make([]pgtype.Timestamptz, 0, len(putPayloadOpts))
2425+
tenantIds := make([]pgtype.UUID, 0, len(putPayloadOpts))
2426+
externalIds := make([]pgtype.UUID, 0, len(putPayloadOpts))
2427+
payloads := make([][]byte, 0, len(putPayloadOpts))
2428+
locations := make([]string, 0, len(putPayloadOpts))
2429+
externalKeys := make([]string, 0, len(putPayloadOpts))
2430+
2431+
tenantIdUUID := sqlchelpers.UUIDFromStr(tenantId)
2432+
2433+
for _, opt := range putPayloadOpts {
2434+
retrieveOpts := RetrievePayloadOpts{
2435+
Id: opt.Id,
2436+
InsertedAt: opt.InsertedAt,
2437+
Type: placeholderPayloadType,
2438+
TenantId: tenantIdUUID,
2439+
}
2440+
2441+
key, ok := retrieveOptsToKey[retrieveOpts]
2442+
2443+
externalIds = append(externalIds, opt.ExternalId)
2444+
insertedAts = append(insertedAts, opt.InsertedAt)
2445+
tenantIds = append(tenantIds, tenantIdUUID)
2446+
2447+
if ok {
2448+
payloads = append(payloads, nil)
2449+
locations = append(locations, string(sqlcv1.V1PayloadLocationOlapEXTERNAL))
2450+
externalKeys = append(externalKeys, string(key))
2451+
} else {
2452+
payloads = append(payloads, opt.Payload)
2453+
locations = append(locations, string(sqlcv1.V1PayloadLocationOlapINLINE))
2454+
externalKeys = append(externalKeys, "")
2455+
}
2456+
}
2457+
2458+
err = r.queries.PutPayloads(ctx, tx, sqlcv1.PutPayloadsParams{
2459+
Externalids: externalIds,
2460+
Insertedats: insertedAts,
2461+
Tenantids: tenantIds,
2462+
Payloads: payloads,
2463+
Locations: locations,
2464+
Externallocationkeys: externalKeys,
24492465
})
2466+
2467+
if err != nil {
2468+
return fmt.Errorf("error putting payloads: %v", err)
2469+
}
2470+
2471+
if localTx {
2472+
if err := commit(ctx); err != nil {
2473+
return fmt.Errorf("error committing transaction in `PutPayload`: %v", err)
2474+
}
2475+
}
2476+
2477+
return nil
24502478
}
24512479

24522480
func (r *OLAPRepositoryImpl) ReadPayload(ctx context.Context, tenantId string, externalId pgtype.UUID) ([]byte, error) {

pkg/repository/v1/payloadstore.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type StorePayloadOpts struct {
2626
}
2727

2828
type StoreOLAPPayloadOpts struct {
29+
Id int64
2930
ExternalId pgtype.UUID
3031
InsertedAt pgtype.Timestamptz
3132
Payload []byte
@@ -45,6 +46,7 @@ type RetrievePayloadOpts struct {
4546

4647
type PayloadLocation string
4748
type ExternalPayloadLocationKey string
49+
type TenantID string
4850

4951
type ExternalStore interface {
5052
Store(ctx context.Context, payloads ...OffloadToExternalStoreOpts) (map[RetrievePayloadOpts]ExternalPayloadLocationKey, error)
@@ -64,6 +66,7 @@ type PayloadStoreRepository interface {
6466
OLAPDualWritesEnabled() bool
6567
WALPollLimit() int
6668
WALProcessInterval() time.Duration
69+
WALEnabled() bool
6770
ExternalCutoverProcessInterval() time.Duration
6871
ExternalStoreEnabled() bool
6972
ExternalStore() ExternalStore
@@ -697,6 +700,10 @@ func (p *payloadStoreRepositoryImpl) ExternalStore() ExternalStore {
697700
return p.externalStore
698701
}
699702

703+
func (p *payloadStoreRepositoryImpl) WALEnabled() bool {
704+
return p.walEnabled
705+
}
706+
700707
type NoOpExternalStore struct{}
701708

702709
func (n *NoOpExternalStore) Store(ctx context.Context, payloads ...OffloadToExternalStoreOpts) (map[RetrievePayloadOpts]ExternalPayloadLocationKey, error) {

pkg/repository/v1/sqlcv1/olap.sql

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1626,7 +1626,8 @@ WITH inputs AS (
16261626
UNNEST(@insertedAts::TIMESTAMPTZ[]) AS inserted_at,
16271627
UNNEST(@payloads::JSONB[]) AS payload,
16281628
UNNEST(@tenantIds::UUID[]) AS tenant_id,
1629-
UNNEST(CAST(@locations::TEXT[] AS v1_payload_location_olap[])) AS location
1629+
UNNEST(CAST(@locations::TEXT[] AS v1_payload_location_olap[])) AS location,
1630+
UNNEST(@externalLocationKeys::TEXT[]) AS external_location_key
16301631
)
16311632

16321633
INSERT INTO v1_payloads_olap (
@@ -1644,7 +1645,7 @@ SELECT
16441645
i.inserted_at,
16451646
i.location,
16461647
CASE
1647-
WHEN i.location = 'EXTERNAL' THEN i.payload
1648+
WHEN i.location = 'EXTERNAL' THEN i.external_location_key
16481649
ELSE NULL
16491650
END,
16501651
CASE

pkg/repository/v1/sqlcv1/olap.sql.go

Lines changed: 10 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)