Skip to content

Commit af266d4

Browse files
SoulPancakerueian
andauthored
feat: Create and alias index (#813)
* feat: Create and aliax index * feat: create alias inside * feat: check if alias exists * feat: address comment * feat: address from suggestion Co-authored-by: Rueian <[email protected]> * feat: address comment * feat: update test * feat: fix create and alias index hash * feat: fix JSON repo create and alias index --------- Co-authored-by: Rueian <[email protected]>
1 parent a11f1dd commit af266d4

File tree

5 files changed

+237
-0
lines changed

5 files changed

+237
-0
lines changed

om/hash.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ package om
22

33
import (
44
"context"
5+
"fmt"
56
"github.com/oklog/ulid/v2"
67
"reflect"
78
"strconv"
9+
"strings"
810
"time"
911

1012
"github.com/redis/rueidis"
@@ -146,6 +148,78 @@ func (r *HashRepository[T]) CreateIndex(ctx context.Context, cmdFn func(schema F
146148
return r.client.Do(ctx, cmdFn(r.client.B().FtCreate().Index(r.idx).OnHash().Prefix(1).Prefix(r.prefix+":").Schema())).Error()
147149
}
148150

151+
// CreateAndAliasIndex creates a new index, aliases it, and drops the old index if needed.
152+
func (r *HashRepository[T]) CreateAndAliasIndex(ctx context.Context, cmdFn func(schema FtCreateSchema) rueidis.Completed) error {
153+
alias := r.idx
154+
155+
var currentIndex string
156+
aliasExists := false
157+
infoCmd := r.client.B().FtInfo().Index(alias).Build()
158+
infoResp, err := r.client.Do(ctx, infoCmd).ToMap()
159+
if err != nil {
160+
if strings.Contains(err.Error(), "Unknown index name") {
161+
// This is expected when the alias doesn't exist yet
162+
aliasExists = false
163+
} else {
164+
// This is an unexpected error (network, connection, etc.)
165+
return fmt.Errorf("failed to check if index exists: %w", err)
166+
}
167+
} else {
168+
aliasExists = true
169+
}
170+
171+
if aliasExists {
172+
message, ok := infoResp["index_name"]
173+
if !ok {
174+
return fmt.Errorf("index_name not found in FT.INFO response")
175+
}
176+
177+
currentIndex, err = message.ToString()
178+
if err != nil {
179+
return fmt.Errorf("failed to convert index_name to string: %w", err)
180+
}
181+
}
182+
183+
newIndex := alias + "_v1"
184+
if aliasExists && currentIndex != "" {
185+
// Find the last occurrence of "_v" followed by digits
186+
lastVersionIndex := strings.LastIndex(currentIndex, "_v")
187+
if lastVersionIndex != -1 && lastVersionIndex+2 < len(currentIndex) {
188+
versionStr := currentIndex[lastVersionIndex+2:]
189+
if version, err := strconv.Atoi(versionStr); err == nil {
190+
newIndex = fmt.Sprintf("%s_v%d", alias, version+1)
191+
}
192+
}
193+
}
194+
195+
// Create the new index
196+
cmd := r.client.B().FtCreate().Index(newIndex).OnHash().Prefix(1).Prefix(r.prefix + ":")
197+
if err := r.client.Do(ctx, cmdFn(cmd.Schema())).Error(); err != nil {
198+
return err
199+
}
200+
201+
// Update or add the alias
202+
var aliasErr error
203+
if aliasExists {
204+
aliasErr = r.client.Do(ctx, r.client.B().FtAliasupdate().Alias(alias).Index(newIndex).Build()).Error()
205+
} else {
206+
aliasErr = r.client.Do(ctx, r.client.B().FtAliasadd().Alias(alias).Index(newIndex).Build()).Error()
207+
}
208+
209+
if aliasErr != nil {
210+
return fmt.Errorf("failed to update alias: %w", aliasErr)
211+
}
212+
213+
// Drop the old index if it exists and differs from the new one
214+
if aliasExists && currentIndex != "" && currentIndex != newIndex {
215+
if err := r.client.Do(ctx, r.client.B().FtDropindex().Index(currentIndex).Build()).Error(); err != nil {
216+
return fmt.Errorf("failed to drop old index: %w", err)
217+
}
218+
}
219+
220+
return nil
221+
}
222+
149223
// DropIndex uses FT.DROPINDEX from the RediSearch module to drop the index whose name is `hashidx:{prefix}`
150224
func (r *HashRepository[T]) DropIndex(ctx context.Context) error {
151225
return r.client.Do(ctx, r.client.B().FtDropindex().Index(r.idx).Build()).Error()

om/hash_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,3 +479,60 @@ func TestNewHashRepositoryTTL(t *testing.T) {
479479
}
480480
})
481481
}
482+
483+
// TestCreateAndAliasIndex tests the CreateAndAliasIndex method of HashRepository.
484+
func TestCreateAndAliasIndex(t *testing.T) {
485+
ctx := context.Background()
486+
487+
client := setup(t)
488+
client.Do(ctx, client.B().Flushall().Build())
489+
defer client.Close()
490+
491+
repo := NewHashRepository("hashalias", HashTestStruct{}, client)
492+
493+
t.Run("CreateAndAliasIndex", func(t *testing.T) {
494+
err := repo.CreateAndAliasIndex(ctx, func(schema FtCreateSchema) rueidis.Completed {
495+
return schema.FieldName("Val").Text().Build()
496+
})
497+
if err != nil {
498+
t.Fatalf("failed to create and alias index: %v", err)
499+
}
500+
501+
verifyAliasTarget(t, ctx, client, repo.IndexName(), repo.IndexName()+"_v1")
502+
503+
// Step 3: Create new index version and update alias
504+
err = repo.CreateAndAliasIndex(ctx, func(schema FtCreateSchema) rueidis.Completed {
505+
return schema.FieldName("Val").Text().Build()
506+
})
507+
if err != nil {
508+
t.Fatalf("failed to create and alias new index version: %v", err)
509+
}
510+
511+
verifyAliasTarget(t, ctx, client, repo.IndexName(), repo.IndexName()+"_v2")
512+
})
513+
}
514+
515+
// Helper to verify that alias points to the expected index name
516+
func verifyAliasTarget(t *testing.T, ctx context.Context, client rueidis.Client, aliasName string, expectedIndex string) {
517+
t.Helper()
518+
519+
infoCmd := client.B().FtInfo().Index(aliasName).Build()
520+
infoResp, err := client.Do(ctx, infoCmd).ToMap()
521+
if err != nil {
522+
t.Fatalf("failed to fetch index info: %v", err)
523+
}
524+
525+
indexMsg, ok := infoResp["index_name"]
526+
if !ok {
527+
t.Fatalf("FT.INFO response missing index_name field")
528+
}
529+
530+
actualIndex, err := (&indexMsg).ToString()
531+
if err != nil {
532+
t.Fatalf("failed to convert index_name to string: %v", err)
533+
}
534+
535+
if actualIndex != expectedIndex {
536+
t.Fatalf("alias does not point to the expected index. expected=%s got=%s", expectedIndex, actualIndex)
537+
}
538+
}

om/json.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package om
33
import (
44
"context"
55
"encoding/json"
6+
"fmt"
67
"reflect"
78
"strconv"
89
"strings"
@@ -145,6 +146,80 @@ func (r *JSONRepository[T]) CreateIndex(ctx context.Context, cmdFn func(schema F
145146
return r.client.Do(ctx, cmdFn(r.client.B().FtCreate().Index(r.idx).OnJson().Prefix(1).Prefix(r.prefix+":").Schema())).Error()
146147
}
147148

149+
// CreateAndAliasIndex creates a new index, aliases it, and drops the old index if needed.
150+
func (r *JSONRepository[T]) CreateAndAliasIndex(ctx context.Context, cmdFn func(schema FtCreateSchema) rueidis.Completed) error {
151+
alias := r.idx
152+
153+
var currentIndex string
154+
aliasExists := false
155+
infoCmd := r.client.B().FtInfo().Index(alias).Build()
156+
infoResp, err := r.client.Do(ctx, infoCmd).ToMap()
157+
if err != nil {
158+
if strings.Contains(err.Error(), "Unknown index name") {
159+
aliasExists = false
160+
} else {
161+
return fmt.Errorf("failed to check if index exists: %w", err)
162+
}
163+
} else {
164+
aliasExists = true
165+
}
166+
167+
if aliasExists {
168+
message, ok := infoResp["index_name"]
169+
if !ok {
170+
return fmt.Errorf("index_name not found in FT.INFO response")
171+
}
172+
currentIndex, err = message.ToString()
173+
if err != nil {
174+
return fmt.Errorf("failed to convert index_name to string: %w", err)
175+
}
176+
}
177+
178+
// Compute new index version name
179+
newIndex := alias + "_v1"
180+
if aliasExists && currentIndex != "" {
181+
lastVersionIndex := strings.LastIndex(currentIndex, "_v")
182+
if lastVersionIndex != -1 && lastVersionIndex+2 < len(currentIndex) {
183+
versionStr := currentIndex[lastVersionIndex+2:]
184+
if version, err := strconv.Atoi(versionStr); err == nil {
185+
newIndex = fmt.Sprintf("%s_v%d", alias, version+1)
186+
}
187+
}
188+
}
189+
190+
// Create the new index with schema
191+
createCmd := r.client.B().FtCreate().
192+
Index(newIndex).
193+
OnJson().
194+
Prefix(1).
195+
Prefix(r.prefix + ":")
196+
if err := r.client.Do(ctx, cmdFn(createCmd.Schema())).Error(); err != nil {
197+
return fmt.Errorf("failed to create index %s: %w", newIndex, err)
198+
}
199+
200+
// Set alias to point to new index
201+
var aliasErr error
202+
if aliasExists {
203+
aliasErr = r.client.Do(ctx, r.client.B().FtAliasupdate().Alias(alias).Index(newIndex).Build()).Error()
204+
} else {
205+
aliasErr = r.client.Do(ctx, r.client.B().FtAliasadd().Alias(alias).Index(newIndex).Build()).Error()
206+
}
207+
208+
if aliasErr != nil {
209+
return fmt.Errorf("failed to update alias: %w", aliasErr)
210+
}
211+
212+
// Drop old index if it's different from the new one
213+
if aliasExists && currentIndex != "" && currentIndex != newIndex {
214+
if err := r.client.Do(ctx, r.client.B().FtDropindex().Index(currentIndex).Build()).Error(); err != nil {
215+
return fmt.Errorf("failed to drop old index: %w", err)
216+
}
217+
}
218+
219+
return nil
220+
}
221+
222+
148223
// DropIndex uses FT.DROPINDEX from the RediSearch module to drop the index whose name is `jsonidx:{prefix}`
149224
func (r *JSONRepository[T]) DropIndex(ctx context.Context) error {
150225
return r.client.Do(ctx, r.client.B().FtDropindex().Index(r.idx).Build()).Error()

om/json_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,3 +448,33 @@ func TestNewJSONTTLRepository(t *testing.T) {
448448
}
449449
})
450450
}
451+
452+
func TestCreateAndAliasIndex_JSON(t *testing.T) {
453+
ctx := context.Background()
454+
455+
client := setup(t)
456+
client.Do(ctx, client.B().Flushall().Build())
457+
defer client.Close()
458+
459+
repo := NewJSONRepository("jsonalias", JSONTestStruct{}, client)
460+
461+
t.Run("CreateAndAliasIndex_JSON", func(t *testing.T) {
462+
err := repo.CreateAndAliasIndex(ctx, func(schema FtCreateSchema) rueidis.Completed {
463+
return schema.FieldName("$.val").As("val").Text().Build()
464+
})
465+
if err != nil {
466+
t.Fatalf("failed to create and alias JSON index: %v", err)
467+
}
468+
469+
verifyAliasTarget(t, ctx, client, repo.IndexName(), repo.IndexName()+"_v1")
470+
471+
err = repo.CreateAndAliasIndex(ctx, func(schema FtCreateSchema) rueidis.Completed {
472+
return schema.FieldName("$.val").As("val").Text().Build()
473+
})
474+
if err != nil {
475+
t.Fatalf("failed to create and alias new JSON index version: %v", err)
476+
}
477+
478+
verifyAliasTarget(t, ctx, client, repo.IndexName(), repo.IndexName()+"_v2")
479+
})
480+
}

om/repo.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type Repository[T any] interface {
4545
SaveMulti(ctx context.Context, entity ...*T) (errs []error)
4646
Remove(ctx context.Context, id string) error
4747
CreateIndex(ctx context.Context, cmdFn func(schema FtCreateSchema) rueidis.Completed) error
48+
CreateAndAliasIndex(ctx context.Context, cmdFn func(schema FtCreateSchema) rueidis.Completed) error
4849
AlterIndex(ctx context.Context, cmdFn func(alter FtAlterIndex) rueidis.Completed) error
4950
DropIndex(ctx context.Context) error
5051
IndexName() string

0 commit comments

Comments
 (0)