Skip to content

Commit dfe503d

Browse files
committed
wip env manager with kafka
1 parent 3bae015 commit dfe503d

File tree

5 files changed

+237
-8
lines changed

5 files changed

+237
-8
lines changed
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package k8sclient
2+
3+
import (
4+
"context"
5+
"sync"
6+
)
7+
8+
// ComponentManager is the contract EnvManager relies on for each environment
// component it tracks.
type ComponentManager interface {
9+
// Name returns the key under which NewEnvManager stores the component.
Name() ComponentName
10+
// Snapshot is invoked for every component by EnvManager.SnapshotAll.
Snapshot(ctx context.Context) error
11+
// Restore is invoked for every component by EnvManager.RestoreAll.
Restore(ctx context.Context) error
12+
}
13+
14+
// EnvManager holds a set of ComponentManagers keyed by ComponentName and fans
// Snapshot/Restore calls out to all of them.
type EnvManager struct {
15+
// mu is held by SnapshotAll and RestoreAll for the duration of each call.
mu sync.Mutex
16+
// components is populated once by NewEnvManager, keyed by each component's Name().
components map[ComponentName]ComponentManager
17+
}
18+
19+
// ComponentName is the string key used to identify a ComponentManager inside
// an EnvManager.
type ComponentName string
20+
21+
func NewEnvManager(components ...ComponentManager) *EnvManager {
22+
comps := make(map[ComponentName]ComponentManager, len(components))
23+
for _, component := range components {
24+
comps[component.Name()] = component
25+
}
26+
return &EnvManager{components: comps}
27+
}
28+
29+
func (e *EnvManager) SnapshotAll(ctx context.Context) error {
30+
e.mu.Lock()
31+
defer e.mu.Unlock()
32+
33+
for _, c := range e.components {
34+
if err := c.Snapshot(ctx); err != nil {
35+
return err
36+
}
37+
}
38+
return nil
39+
}
40+
41+
func (e *EnvManager) RestoreAll(ctx context.Context) error {
42+
e.mu.Lock()
43+
defer e.mu.Unlock()
44+
45+
for _, c := range e.components {
46+
if err := c.Restore(ctx); err != nil {
47+
return err
48+
}
49+
}
50+
return nil
51+
}
52+
53+
func (e *EnvManager) Kafka() *KafkaComponent {
54+
for _, component := range e.components {
55+
if k, ok := component.(*KafkaComponent); ok {
56+
return k
57+
}
58+
}
59+
return nil
60+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package k8sclient
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
appsv1 "k8s.io/api/apps/v1"
8+
"k8s.io/apimachinery/pkg/types"
9+
)
10+
11+
// KafkaComponent tracks the replica count of a single StatefulSet so it can be
// scaled to zero and later scaled back to a recorded baseline.
type KafkaComponent struct {
12+
// name is the value returned by Name; NewKafkaComponent sets it to kafkaComponentName.
name ComponentName
13+
// namespace is stored by NewKafkaComponent; the methods in this file address
// the StatefulSet through resource instead.
namespace string
14+
// resource is the namespace/name key passed to KubeClient.Get.
resource types.NamespacedName
15+
16+
// k8s provides the KubeClient used for Get and Update calls.
k8s *K8sClient
17+
// base is nil until Snapshot records the StatefulSet's .spec.replicas.
base *int32 // baseline replicas
18+
// dirty is set by MakeUnavailable and cleared by Restore and MakeAvailable.
dirty bool
19+
}
20+
21+
// kafkaComponentName is the ComponentName NewKafkaComponent assigns to every
// KafkaComponent.
const kafkaComponentName ComponentName = "kafka"
22+
23+
func NewKafkaComponent(k8s *K8sClient, namespace, statefulSetName string) *KafkaComponent {
24+
return &KafkaComponent{
25+
name: kafkaComponentName,
26+
namespace: namespace,
27+
resource: types.NamespacedName{Namespace: namespace, Name: statefulSetName},
28+
k8s: k8s,
29+
}
30+
}
31+
32+
// Name returns the component name stored on the KafkaComponent.
func (k *KafkaComponent) Name() ComponentName {
	return k.name
}
33+
34+
// Snapshot reads the current replicas once and stores them as baseline.
35+
func (k *KafkaComponent) Snapshot(ctx context.Context) error {
36+
if k.base != nil {
37+
// already snapshotted
38+
return nil
39+
}
40+
41+
var sts appsv1.StatefulSet
42+
if err := k.k8s.KubeClient.Get(ctx, k.resource, &sts); err != nil {
43+
return fmt.Errorf("kafka snapshot: get %s/%s: %w", k.resource.Namespace, k.resource.Name, err)
44+
}
45+
46+
if sts.Spec.Replicas == nil {
47+
return fmt.Errorf("kafka snapshot: statefulset %s/%s has nil .spec.replicas", k.resource.Namespace, k.resource.Name)
48+
}
49+
50+
replicas := *sts.Spec.Replicas
51+
k.base = &replicas
52+
return nil
53+
}
54+
55+
// Restore only acts if we've modified Kafka in this scenario (dirty=true).
56+
func (k *KafkaComponent) Restore(ctx context.Context) error {
57+
if !k.dirty || k.base == nil {
58+
return nil
59+
}
60+
if err := k.scale(ctx, *k.base); err != nil {
61+
return err
62+
}
63+
k.dirty = false
64+
return nil
65+
}
66+
67+
// Public helpers used from steps:
68+
69+
func (k *KafkaComponent) MakeUnavailable(ctx context.Context) error {
70+
if err := k.Snapshot(ctx); err != nil {
71+
return err
72+
}
73+
if err := k.scale(ctx, 0); err != nil {
74+
return err
75+
}
76+
k.dirty = true
77+
return nil
78+
}
79+
80+
func (k *KafkaComponent) MakeAvailable(ctx context.Context) error {
81+
if k.base == nil {
82+
// nothing to restore to; safest is to no-op or return error
83+
return nil
84+
}
85+
if err := k.scale(ctx, *k.base); err != nil {
86+
return err
87+
}
88+
k.dirty = false
89+
return nil
90+
}
91+
92+
func (k *KafkaComponent) scale(ctx context.Context, replicas int32) error {
93+
var sts appsv1.StatefulSet
94+
if err := k.k8s.KubeClient.Get(ctx, k.resource, &sts); err != nil {
95+
return fmt.Errorf("kafka scale: get %s/%s: %w", k.resource.Namespace, k.resource.Name, err)
96+
}
97+
sts.Spec.Replicas = &replicas
98+
if err := k.k8s.KubeClient.Update(ctx, &sts); err != nil {
99+
return fmt.Errorf("kafka scale: update %s/%s: %w", k.resource.Namespace, k.resource.Name, err)
100+
}
101+
return nil
102+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package steps
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/cucumber/godog"
8+
"github.com/seldonio/seldon-core/tests/integration/godog/k8sclient"
9+
)
10+
11+
// Env wraps an EnvManager.
// NOTE(review): LoadEnvSteps reads the manager from World.env, not from this
// type — confirm whether Env is still needed.
type Env struct {
12+
env *k8sclient.EnvManager
13+
}
14+
15+
func LoadEnvSteps(scenario *godog.ScenarioContext, w *World) {
16+
scenario.Step(`^(Kafka) is unavailable for Core 2 with timeout "([^"]+)"`, func(kind, timeout string) error {
17+
return withTimeoutCtx(timeout, func(ctx context.Context) error {
18+
switch kind {
19+
case "Kafka":
20+
k := w.env.Kafka()
21+
if k != nil {
22+
return nil
23+
}
24+
25+
return k.MakeUnavailable(ctx)
26+
default:
27+
return fmt.Errorf("unknown target type: %s", kind)
28+
}
29+
})
30+
})
31+
32+
scenario.Step(`^(Kafka) is available for Core 2 with timeout "([^"]+)"`, func(kind, timeout string) error {
33+
return withTimeoutCtx(timeout, func(ctx context.Context) error {
34+
switch kind {
35+
case "Kafka":
36+
k := w.env.Kafka()
37+
if k != nil {
38+
return nil
39+
}
40+
41+
return k.MakeAvailable(ctx)
42+
default:
43+
return fmt.Errorf("unknown target type: %s", kind)
44+
}
45+
})
46+
})
47+
}

tests/integration/godog/steps/world.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,11 @@ import (
1919
)
2020

2121
type World struct {
22-
namespace string
23-
kubeClient *k8sclient.K8sClient
24-
corek8sClient v.Interface
25-
watcherStorage k8sclient.WatcherStorage
26-
StartingClusterState string //todo: this will be a combination of starting state awareness of core 2 such as the
27-
//todo: server config,seldon config and seldon runtime to be able to reconcile to starting state should we change
28-
//todo: the state such as reducing replicas to 0 of scheduler to test unavailability
22+
namespace string
23+
kubeClient *k8sclient.K8sClient
24+
corek8sClient v.Interface
25+
watcherStorage k8sclient.WatcherStorage
26+
env *k8sclient.EnvManager //this is a combination of components for the cluster
2927
currentModel *Model
3028
currentPipeline *Pipeline
3129
currentExperiment *Experiment
@@ -41,6 +39,7 @@ type Config struct {
4139
KubeClient *k8sclient.K8sClient
4240
K8sClient v.Interface
4341
WatcherStorage k8sclient.WatcherStorage
42+
Env *k8sclient.EnvManager
4443
GRPC v2_dataplane.GRPCInferenceServiceClient
4544
IngressHost string
4645
HTTPPort uint
@@ -62,6 +61,7 @@ func NewWorld(c Config) (*World, error) {
6261
namespace: c.Namespace,
6362
kubeClient: c.KubeClient,
6463
watcherStorage: c.WatcherStorage,
64+
env: c.Env,
6565
currentModel: newModel(label, c.Namespace, c.K8sClient, c.Logger, c.WatcherStorage),
6666
currentExperiment: newExperiment(label, c.Namespace, c.K8sClient, c.Logger, c.WatcherStorage),
6767
currentPipeline: newPipeline(label, c.Namespace, c.K8sClient, c.Logger, c.WatcherStorage),

tests/integration/godog/suite/suite.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"context"
1414
"crypto/tls"
1515
"fmt"
16+
"time"
1617

1718
"github.com/cucumber/godog"
1819
v2dataplane "github.com/seldonio/seldon-core/apis/go/v2/mlops/v2_dataplane"
@@ -32,6 +33,7 @@ type dependencies struct {
3233
mlopsClient *v.Clientset
3334
watcherStore k8sclient.WatcherStorage
3435
inferenceClient v2dataplane.GRPCInferenceServiceClient
36+
env *k8sclient.EnvManager
3537
Config *GodogConfig
3638
}
3739

@@ -53,6 +55,8 @@ type dependencies struct {
5355
var suiteDeps dependencies
5456

5557
func InitializeTestSuite(ctx *godog.TestSuiteContext) {
58+
realContext, cancel := context.WithTimeout(context.Background(), 60*time.Second)
59+
defer cancel()
5660
// Load configuration from JSON file
5761
config, err := LoadConfig()
5862
if err != nil {
@@ -101,12 +105,23 @@ func InitializeTestSuite(ctx *godog.TestSuiteContext) {
101105
}
102106
grpcClient := v2dataplane.NewGRPCInferenceServiceClient(conn)
103107

108+
// Kafka is expected to run as a StatefulSet named "kafka" in the configured namespace (config.Namespace).
109+
kafka := k8sclient.NewKafkaComponent(k8sClient, config.Namespace, "kafka")
110+
111+
env := k8sclient.NewEnvManager(kafka)
112+
113+
// Snapshot baseline state once per suite
114+
if err := env.SnapshotAll(realContext); err != nil {
115+
panic(fmt.Errorf("failed to snapshot environment: %w", err))
116+
}
117+
104118
suiteDeps.logger = log
105119
suiteDeps.k8sClient = k8sClient
106120
suiteDeps.mlopsClient = clientSet // todo: this clientSet might get use for get requests or for the mlops interface and could be passed to the world might be split up by type
107121
suiteDeps.watcherStore = watchStore
108122
suiteDeps.Config = config
109123
suiteDeps.inferenceClient = grpcClient
124+
suiteDeps.env = env
110125

111126
ctx.BeforeSuite(func() {
112127
suiteDeps.watcherStore.Start()
@@ -118,6 +133,11 @@ func InitializeTestSuite(ctx *godog.TestSuiteContext) {
118133

119134
ctx.AfterSuite(func() {
120135
suiteDeps.watcherStore.Stop()
136+
137+
err := suiteDeps.env.RestoreAll(context.Background())
138+
if err != nil {
139+
return
140+
}
121141
// e.g. clean namespace, close clients if needed delete servers
122142
})
123143
}
@@ -172,5 +192,5 @@ func InitializeScenario(scenarioCtx *godog.ScenarioContext) {
172192
steps.LoadServerSteps(scenarioCtx, world)
173193
steps.LoadCustomPipelineSteps(scenarioCtx, world)
174194
steps.LoadExperimentSteps(scenarioCtx, world)
175-
// TODO: load other steps, e.g. pipeline, experiment, etc.
195+
steps.LoadEnvSteps(scenarioCtx, world)
176196
}

0 commit comments

Comments
 (0)