Skip to content

Commit 19df391

Browse files
committed
add more spans
Signed-off-by: sallyom <[email protected]>
1 parent 619a81e commit 19df391

File tree

1 file changed

+80
-6
lines changed

1 file changed

+80
-6
lines changed

pkg/kvcache/indexer.go

Lines changed: 80 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,24 @@ type Indexer struct {
6767

6868
// NewKVCacheIndexer creates a KVCacheIndex given a Config.
6969
func NewKVCacheIndexer(ctx context.Context, config *Config) (*Indexer, error) {
70+
tracer := otel.GetTracerProvider().Tracer("llm-d-kv-cache-manager")
71+
ctx, span := tracer.Start(ctx, "llm_d.kv_cache_manager.initialization")
72+
defer span.End()
73+
74+
span.SetAttributes(
75+
attribute.String("component", "llm-d-kv-cache-manager"),
76+
attribute.String("operation", "initialization"),
77+
)
78+
7079
logger := klog.FromContext(ctx)
7180
if config != nil && config.TokenProcessorConfig != nil {
7281
logger.Info("NewKVCacheIndexer config", "blockSize", config.TokenProcessorConfig.BlockSize)
82+
span.SetAttributes(attribute.Int("llm_d.kv_cache_manager.block_size", config.TokenProcessorConfig.BlockSize))
7383
}
7484

7585
tokensIndexer, err := prefixstore.NewLRUTokenStore(config.PrefixStoreConfig)
7686
if err != nil {
87+
span.SetAttributes(attribute.String("operation.outcome", "error"))
7788
return nil, fmt.Errorf("failed to create prefixstore.Indexer: %w", err)
7889
}
7990

@@ -91,9 +102,11 @@ func NewKVCacheIndexer(ctx context.Context, config *Config) (*Indexer, error) {
91102

92103
tokenizersPool, err := tokenization.NewTokenizationPool(config.TokenizersPoolConfig, tokensIndexer)
93104
if err != nil {
105+
span.SetAttributes(attribute.String("operation.outcome", "error"))
94106
return nil, fmt.Errorf("failed to create tokenizers pool: %w", err)
95107
}
96108

109+
span.SetAttributes(attribute.String("operation.outcome", "success"))
97110
return &Indexer{
98111
config: config,
99112
tokensIndexer: tokensIndexer,
@@ -124,47 +137,107 @@ func (k *Indexer) KVBlockIndex() kvblock.Index {
124137
func (k *Indexer) GetPodScores(ctx context.Context, prompt, modelName string,
125138
podIdentifiers []string,
126139
) (map[string]int, error) {
127-
tracer := otel.GetTracerProvider().Tracer("llm-d-epp")
128-
ctx, span := tracer.Start(ctx, "kv-cache-manager.GetPodScores")
140+
tracer := otel.GetTracerProvider().Tracer("llm-d-kv-cache-manager")
141+
ctx, span := tracer.Start(ctx, "llm_d.kv_cache_manager.GetPodScores")
129142
defer span.End()
130143

131144
span.SetAttributes(
132-
attribute.String("component", "llm-d-kv-cache-manager"),
133-
attribute.String("operation", "get_pod_scores"),
134145
attribute.String("gen_ai.request.model", modelName),
135-
attribute.Int("llm_d.kv_cache.pod_count", len(podIdentifiers)),
146+
attribute.Int("llm_d.kv_cache_manager.pod_count", len(podIdentifiers)),
136147
)
137148

138149
traceLogger := klog.FromContext(ctx).V(logging.TRACE).WithName("kvcache.GetPodScores")
139150

140151
// 1. tokenize prompt
152+
// 1. get available tokens of longest prefix
153+
_, tokenSpan := tracer.Start(ctx, "llm_d.kv_cache_manager.find_tokens")
154+
tokenSpan.SetAttributes(
155+
attribute.String("gen_ai.request.model", modelName),
156+
)
141157
tokens := k.tokenizersPool.Tokenize(prompt, modelName)
158+
if len(tokens) == 0 {
159+
tokenSpan.SetAttributes(
160+
attribute.Int("llm_d.kv_cache_manager.tokens_found", 0),
161+
attribute.String("operation.outcome", "success"),
162+
)
163+
tokenSpan.End()
164+
//nolint:nilnil // no need to return an error
165+
return nil, nil
166+
}
167+
tokenSpan.SetAttributes(
168+
attribute.Int("llm_d.kv_cache_manager.tokens_found", len(tokens)),
169+
attribute.String("operation.outcome", "success"),
170+
)
171+
tokenSpan.End()
142172

143173
// 2. get block keys
174+
_, blockSpan := tracer.Start(ctx, "llm_d.kv_cache_manager.tokens_to_block_keys")
175+
blockSpan.SetAttributes(
176+
attribute.String("gen_ai.request.model", modelName),
177+
attribute.Int("llm_d.kv_cache_manager.input_tokens", len(tokens)),
178+
)
144179
blockKeys := k.tokensProcessor.TokensToKVBlockKeys(tokens, modelName)
145180
if len(blockKeys) == 0 {
181+
blockSpan.SetAttributes(
182+
attribute.Int("llm_d.kv_cache_manager.block_keys_generated", 0),
183+
attribute.String("operation.outcome", "success"),
184+
)
185+
blockSpan.End()
146186
traceLogger.Info("no block keys found, returning empty scores")
147187
//nolint:nilnil // no need to return an error
148188
return nil, nil
149189
}
190+
blockSpan.SetAttributes(
191+
attribute.Int("llm_d.kv_cache_manager.block_keys_generated", len(blockKeys)),
192+
attribute.String("operation.outcome", "success"),
193+
)
194+
blockSpan.End()
150195

151196
traceLogger.Info("found tokens", "tokens", tokens, "block-keys", blockKeys)
152197

153198
// 3. query kvblock indexer for pods
199+
_, lookupSpan := tracer.Start(ctx, "llm_d.kv_cache_manager.lookup_pods")
200+
lookupSpan.SetAttributes(
201+
attribute.String("gen_ai.request.model", modelName),
202+
attribute.Int("llm_d.kv_cache_manager.block_keys_count", len(blockKeys)),
203+
)
154204
keyToPods, err := k.kvBlockIndex.Lookup(ctx, blockKeys, sets.New(podIdentifiers...))
155205
if err != nil {
206+
lookupSpan.RecordError(err)
207+
lookupSpan.SetAttributes(attribute.String("operation.outcome", "error"))
208+
lookupSpan.End()
156209
span.RecordError(err)
210+
span.SetAttributes(attribute.String("operation.outcome", "error"))
157211
return nil, fmt.Errorf("failed to query kvblock indexer: %w", err)
158212
}
213+
lookupSpan.SetAttributes(
214+
attribute.Int("llm_d.kv_cache_manager.lookup_results", len(keyToPods)),
215+
attribute.String("operation.outcome", "success"),
216+
)
217+
lookupSpan.End()
159218
traceLogger.Info("found block keys", "block-keys", blockKeys,
160219
"pods", podsPerKeyPrintHelper(keyToPods))
161220

162221
// 4. score pods
222+
_, scoreSpan := tracer.Start(ctx, "llm_d.kv_cache_manager.score_pods")
223+
scoreSpan.SetAttributes(
224+
attribute.String("gen_ai.request.model", modelName),
225+
attribute.Int("llm_d.kv_cache_manager.block_keys_count", len(blockKeys)),
226+
)
163227
podScores, err := k.kvBlockScorer.Score(blockKeys, keyToPods)
164228
if err != nil {
229+
scoreSpan.RecordError(err)
230+
scoreSpan.SetAttributes(attribute.String("operation.outcome", "error"))
231+
scoreSpan.End()
165232
span.RecordError(err)
233+
span.SetAttributes(attribute.String("operation.outcome", "error"))
166234
return nil, fmt.Errorf("failed to query kvblock scorer: %w", err)
167235
}
236+
scoreSpan.SetAttributes(
237+
attribute.Int("llm_d.kv_cache_manager.scored_pods", len(podScores)),
238+
attribute.String("operation.outcome", "success"),
239+
)
240+
scoreSpan.End()
168241
traceLogger.Info("found pod scores", "pod-scores", podScores)
169242

170243
// Calculate hit ratio for observability
@@ -180,7 +253,8 @@ func (k *Indexer) GetPodScores(ctx context.Context, prompt, modelName string,
180253
}
181254

182255
span.SetAttributes(
183-
attribute.Float64("llm_d.kv_cache.hit_ratio", hitRatio),
256+
attribute.Float64("llm_d.kv_cache_manager.hit_ratio", hitRatio),
257+
attribute.String("operation.outcome", "success"),
184258
)
185259

186260
return podScores, nil

0 commit comments

Comments
 (0)