Skip to content

Commit 44d610d

Browse files
authored
Bp index insight change (#4449)
* pass tenant id Signed-off-by: Xinyuan Lu <[email protected]> cr: https://code.amazon.com/reviews/CR-230791364 * deprecate useless API Signed-off-by: Xinyuan Lu <[email protected]> * fix conflict Signed-off-by: Xinyuan Lu <[email protected]> * fix Aoss class Signed-off-by: Xinyuan Lu <[email protected]> cr: https://code.amazon.com/reviews/CR-233780510 * remove log Signed-off-by: Xinyuan Lu <[email protected]> cr: https://code.amazon.com/reviews/CR-233780965 * use interface to simplify code Signed-off-by: Xinyuan Lu <[email protected]> cr: https://code.amazon.com/reviews/CR-233780965 * remove useless stash Signed-off-by: Xinyuan Lu <[email protected]> cr: https://code.amazon.com/reviews/CR-233139007 * fix conlict Signed-off-by: Xinyuan Lu <[email protected]> * apply spotless Signed-off-by: xinyual <[email protected]> * fix conflict Signed-off-by: xinyual <[email protected]> --------- Signed-off-by: Xinyuan Lu <[email protected]> Signed-off-by: xinyual <[email protected]>
1 parent 74a310e commit 44d610d

28 files changed

+101
-2230
lines changed

common/src/main/java/org/opensearch/ml/common/indexInsight/AbstractIndexInsightTask.java

Lines changed: 41 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.opensearch.action.get.GetResponse;
2828
import org.opensearch.action.index.IndexResponse;
2929
import org.opensearch.action.search.SearchResponse;
30+
import org.opensearch.common.Numbers;
3031
import org.opensearch.common.regex.Regex;
3132
import org.opensearch.common.util.concurrent.ThreadContext;
3233
import org.opensearch.core.action.ActionListener;
@@ -93,37 +94,47 @@ public void execute(String tenantId, ActionListener<IndexInsight> listener) {
9394
handleExistingDoc(getResponse.getSourceAsMap(), tenantId, listener);
9495
} else {
9596
SearchSourceBuilder patternSourceBuilder = buildPatternSourceBuilder(taskType.name());
96-
sdkClient
97-
.searchDataObjectAsync(
98-
SearchDataObjectRequest
99-
.builder()
100-
.tenantId(tenantId)
101-
.indices(ML_INDEX_INSIGHT_STORAGE_INDEX)
102-
.searchSourceBuilder(patternSourceBuilder)
103-
.build()
104-
)
105-
.whenComplete((r, throwable) -> {
106-
if (throwable != null) {
107-
Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable);
108-
listener.onFailure(cause);
109-
} else {
110-
SearchResponse searchResponse = r.searchResponse();
111-
SearchHit[] hits = searchResponse.getHits().getHits();
112-
Map<String, Object> mappedPatternSource = matchPattern(hits, sourceIndex);
113-
if (Objects.isNull(mappedPatternSource)) {
97+
try (ThreadContext.StoredContext searchContext = client.threadPool().getThreadContext().stashContext()) {
98+
sdkClient
99+
.searchDataObjectAsync(
100+
SearchDataObjectRequest
101+
.builder()
102+
.tenantId(tenantId)
103+
.indices(ML_INDEX_INSIGHT_STORAGE_INDEX)
104+
.searchSourceBuilder(patternSourceBuilder)
105+
.build()
106+
)
107+
.whenComplete((r, throwable) -> {
108+
searchContext.restore();
109+
if (throwable != null) {
110+
Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable);
111+
log.error("Failed to get index insight pattern", cause);
114112
beginGeneration(tenantId, listener);
115113
} else {
116-
handlePatternMatchedDoc(mappedPatternSource, tenantId, listener);
114+
SearchResponse searchResponse = r.searchResponse();
115+
SearchHit[] hits = searchResponse.getHits().getHits();
116+
Map<String, Object> mappedPatternSource = matchPattern(hits, sourceIndex);
117+
if (Objects.isNull(mappedPatternSource)) {
118+
beginGeneration(tenantId, listener);
119+
} else {
120+
handlePatternMatchedDoc(mappedPatternSource, tenantId, listener);
121+
}
117122
}
118-
}
119-
});
123+
});
124+
} catch (Exception e) {
125+
listener.onFailure(e);
126+
}
120127
}
121128
}, listener::onFailure));
122129
}
123130

124131
protected void handleExistingDoc(Map<String, Object> source, String tenantId, ActionListener<IndexInsight> listener) {
125132
String currentStatus = (String) source.get(IndexInsight.STATUS_FIELD);
126-
Long lastUpdateTime = (Long) source.get(IndexInsight.LAST_UPDATE_FIELD);
133+
Object v = source.get(IndexInsight.LAST_UPDATE_FIELD);
134+
Long lastUpdateTime = (v == null) ? null
135+
: (v instanceof Number n) ? n.longValue()
136+
: (v instanceof CharSequence cs && cs.length() > 0) ? Numbers.toLong(cs.toString(), true)
137+
: null;
127138
long currentTime = Instant.now().toEpochMilli();
128139

129140
IndexInsightTaskStatus status = IndexInsightTaskStatus.fromString(currentStatus);
@@ -203,9 +214,11 @@ protected void beginGeneration(String tenantId, ActionListener<IndexInsight> lis
203214
.lastUpdatedTime(Instant.now())
204215
.build();
205216

206-
writeIndexInsight(indexInsight, tenantId, ActionListener.wrap(r -> { runWithPrerequisites(tenantId, listener); }, e -> {
207-
saveFailedStatus(tenantId, e, listener);
208-
}));
217+
writeIndexInsight(
218+
indexInsight,
219+
tenantId,
220+
ActionListener.wrap(r -> { runWithPrerequisites(tenantId, listener); }, listener::onFailure)
221+
);
209222
}
210223

211224
protected void runWithPrerequisites(String tenantId, ActionListener<IndexInsight> listener) {
@@ -249,6 +262,7 @@ protected void saveFailedStatus(String tenantId, Exception error, ActionListener
249262
.tenantId(tenantId)
250263
.index(sourceIndex)
251264
.taskType(taskType)
265+
.lastUpdatedTime(Instant.now())
252266
.status(IndexInsightTaskStatus.FAILED)
253267
.build();
254268
writeIndexInsight(
@@ -468,13 +482,15 @@ protected static void callLLMWithAgent(
468482
String agentId,
469483
String prompt,
470484
String sourceIndex,
485+
String tenantId,
471486
ActionListener<String> listener
472487
) {
473488
AgentMLInput agentInput = AgentMLInput
474489
.AgentMLInputBuilder()
475490
.agentId(agentId)
476491
.functionName(FunctionName.AGENT)
477492
.inputDataset(RemoteInferenceInputDataSet.builder().parameters(Collections.singletonMap("prompt", prompt)).build())
493+
.tenantId(tenantId)
478494
.build();
479495

480496
MLExecuteTaskRequest executeRequest = new MLExecuteTaskRequest(FunctionName.AGENT, agentInput);

common/src/main/java/org/opensearch/ml/common/indexInsight/FieldDescriptionTask.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ private void batchProcessFields(
187187
});
188188
LatchedActionListener<Map<String, Object>> latchedActionListener = new LatchedActionListener<>(resultListener, countDownLatch);
189189
for (List<String> batch : batches) {
190-
processBatch(batch, statisticalContentMap, agentId, latchedActionListener);
190+
processBatch(batch, statisticalContentMap, agentId, tenantId, latchedActionListener);
191191
}
192192
try {
193193
countDownLatch.await(60, SECONDS);
@@ -218,11 +218,12 @@ private void processBatch(
218218
List<String> batchFields,
219219
Map<String, Object> statisticalContentMap,
220220
String agentId,
221+
String tenantId,
221222
ActionListener<Map<String, Object>> listener
222223
) {
223224
String prompt = generateBatchPrompt(batchFields, statisticalContentMap);
224225

225-
callLLMWithAgent(client, agentId, prompt, sourceIndex, ActionListener.wrap(response -> {
226+
callLLMWithAgent(client, agentId, prompt, sourceIndex, tenantId, ActionListener.wrap(response -> {
226227
try {
227228
log.info("Batch LLM call successful for {} fields in index {}", batchFields.size(), sourceIndex);
228229
Map<String, Object> batchResult = parseFieldDescription(response);

common/src/main/java/org/opensearch/ml/common/indexInsight/LogRelatedIndexCheckTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ private void collectSampleDocString(ActionListener<String> listener) {
132132
private void performLogAnalysis(String agentId, String tenantId, ActionListener<IndexInsight> listener) {
133133
String prompt = RCA_TEMPLATE.replace("{indexName}", sourceIndex).replace("{samples}", sampleDocString);
134134

135-
callLLMWithAgent(client, agentId, prompt, sourceIndex, ActionListener.wrap(response -> {
135+
callLLMWithAgent(client, agentId, prompt, sourceIndex, tenantId, ActionListener.wrap(response -> {
136136
try {
137137
Map<String, Object> parsed = parseCheckResponse(response);
138138
saveResult(MAPPER.writeValueAsString(parsed), tenantId, ActionListener.wrap(insight -> {

common/src/main/java/org/opensearch/ml/common/indexInsight/StatisticalDataTask.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,16 @@
3232
import org.opensearch.search.aggregations.Aggregation;
3333
import org.opensearch.search.aggregations.AggregationBuilders;
3434
import org.opensearch.search.aggregations.AggregatorFactories;
35+
import org.opensearch.search.aggregations.bucket.filter.Filters;
3536
import org.opensearch.search.aggregations.bucket.filter.FiltersAggregationBuilder;
3637
import org.opensearch.search.aggregations.bucket.filter.FiltersAggregator.KeyedFilter;
37-
import org.opensearch.search.aggregations.bucket.filter.InternalFilters;
38-
import org.opensearch.search.aggregations.bucket.sampler.InternalSampler;
38+
import org.opensearch.search.aggregations.bucket.sampler.Sampler;
3939
import org.opensearch.search.aggregations.bucket.sampler.SamplerAggregationBuilder;
4040
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
4141
import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder;
42-
import org.opensearch.search.aggregations.metrics.InternalTopHits;
4342
import org.opensearch.search.aggregations.metrics.MaxAggregationBuilder;
4443
import org.opensearch.search.aggregations.metrics.MinAggregationBuilder;
44+
import org.opensearch.search.aggregations.metrics.TopHits;
4545
import org.opensearch.search.aggregations.metrics.TopHitsAggregationBuilder;
4646
import org.opensearch.search.builder.SearchSourceBuilder;
4747
import org.opensearch.search.sort.SortOrder;
@@ -241,7 +241,7 @@ private void filterImportantColumnByLLM(Map<String, Object> parsedResult, String
241241
}
242242
String prompt = generateFilterColumnPrompt(parsedResult);
243243
getAgentIdToRun(client, tenantId, ActionListener.wrap(agentId -> {
244-
callLLMWithAgent(client, agentId, prompt, tenantId, ActionListener.wrap(response -> {
244+
callLLMWithAgent(client, agentId, prompt, sourceIndex, tenantId, ActionListener.wrap(response -> {
245245
listener.onResponse(parseLLMFilteredResult(response));
246246
}, e -> { listener.onResponse(new ArrayList<>()); }));
247247
}, e -> { listener.onResponse(new ArrayList<>()); }));
@@ -313,7 +313,7 @@ private Map<String, Object> parseSearchResult(
313313
Set<String> filteredNames,
314314
SearchResponse searchResponse
315315
) {
316-
Map<String, Aggregation> aggregationMap = ((InternalSampler) searchResponse.getAggregations().getAsMap().get("sample"))
316+
Map<String, Aggregation> aggregationMap = ((Sampler) searchResponse.getAggregations().getAsMap().get("sample"))
317317
.getAggregations()
318318
.getAsMap();
319319
Map<String, Object> result = new LinkedHashMap<>();
@@ -323,7 +323,7 @@ private Map<String, Object> parseSearchResult(
323323
String key = entry.getKey();
324324
Aggregation aggregation = entry.getValue();
325325
if (key.equals(EXAMPLE_DOC_KEYWORD)) {
326-
SearchHit[] hits = ((InternalTopHits) aggregation).getHits().getHits();
326+
SearchHit[] hits = ((TopHits) aggregation).getHits().getHits();
327327
exampleDocs = new ArrayList<>(hits.length);
328328
for (SearchHit hit : hits) {
329329
exampleDocs.add(hit.getSourceAsMap());
@@ -335,7 +335,9 @@ private Map<String, Object> parseSearchResult(
335335
if (!filteredNames.contains(targetField)) {
336336
continue;
337337
}
338+
338339
String aggregationType = key.substring(0, prefix.length() - 1);
340+
339341
Map<String, Object> aggregationResult = gson.fromJson(aggregation.toString(), Map.class);
340342
Object targetValue;
341343
try {
@@ -381,13 +383,18 @@ private Map<String, Object> parseSearchResult(
381383
}
382384

383385
private Set<String> filterColumns(Map<String, String> allFieldsToType, SearchResponse searchResponse) {
384-
InternalSampler sampleAggregation = ((InternalSampler) searchResponse.getAggregations().getAsMap().get("sample"));
386+
Sampler sampleAggregation = (Sampler) searchResponse.getAggregations().getAsMap().get("sample");
385387
Map<String, Aggregation> aggregationMap = sampleAggregation.getAggregations().getAsMap();
386388
long totalDocCount = sampleAggregation.getDocCount();
387389
Set<String> filteredNames = new HashSet<>();
388-
InternalFilters aggregation = (InternalFilters) aggregationMap.get(NOT_NULL_KEYWORD);
389-
for (InternalFilters.InternalBucket bucket : aggregation.getBuckets()) {
390-
String targetField = bucket.getKey();
390+
Filters aggregation;
391+
try {
392+
aggregation = (Filters) aggregationMap.get(NOT_NULL_KEYWORD);
393+
} catch (Exception e) {
394+
return filteredNames;
395+
}
396+
for (Filters.Bucket bucket : aggregation.getBuckets()) {
397+
String targetField = bucket.getKey().toString();
391398
targetField = targetField.substring(0, targetField.length() - 1 - NOT_NULL_KEYWORD.length());
392399
long docCount = bucket.getDocCount();
393400
if (docCount > HIGH_PRIORITY_COLUMN_THRESHOLD * totalDocCount && allFieldsToType.containsKey(targetField)) {

common/src/main/java/org/opensearch/ml/common/transport/indexInsight/MLIndexInsightConfigGetAction.java

Lines changed: 0 additions & 17 deletions
This file was deleted.

common/src/main/java/org/opensearch/ml/common/transport/indexInsight/MLIndexInsightConfigGetRequest.java

Lines changed: 0 additions & 63 deletions
This file was deleted.

common/src/main/java/org/opensearch/ml/common/transport/indexInsight/MLIndexInsightConfigGetResponse.java

Lines changed: 0 additions & 63 deletions
This file was deleted.

common/src/main/java/org/opensearch/ml/common/transport/indexInsight/MLIndexInsightConfigPutAction.java

Lines changed: 0 additions & 18 deletions
This file was deleted.

0 commit comments

Comments
 (0)