diff --git a/plugins/engine-datafusion/jni/src/lib.rs b/plugins/engine-datafusion/jni/src/lib.rs index 8c948e486792a..7a2c20749ce05 100644 --- a/plugins/engine-datafusion/jni/src/lib.rs +++ b/plugins/engine-datafusion/jni/src/lib.rs @@ -489,6 +489,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_executeQu shard_view_ptr: jlong, table_name: JString, substrait_bytes: jbyteArray, + is_query_plan_explain_enabled: jboolean, runtime_ptr: jlong, listener: JObject, ) { @@ -513,6 +514,8 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_executeQu } }; + let is_query_plan_explain_enabled: bool = is_query_plan_explain_enabled != 0; + let plan_bytes_obj = unsafe { JByteArray::from_raw(substrait_bytes) }; let plan_bytes_vec = match env.convert_byte_array(plan_bytes_obj) { Ok(bytes) => bytes, @@ -550,6 +553,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_executeQu files_meta, table_name, plan_bytes_vec, + is_query_plan_explain_enabled, runtime, cpu_executor, ).await; diff --git a/plugins/engine-datafusion/jni/src/query_executor.rs b/plugins/engine-datafusion/jni/src/query_executor.rs index 4a78a9445f6b3..29e2184af4503 100644 --- a/plugins/engine-datafusion/jni/src/query_executor.rs +++ b/plugins/engine-datafusion/jni/src/query_executor.rs @@ -107,6 +107,7 @@ pub async fn execute_query_with_cross_rt_stream( files_meta: Arc>, table_name: String, plan_bytes_vec: Vec, + is_query_plan_explain_enabled: bool, runtime: &DataFusionRuntime, cpu_executor: DedicatedExecutor, ) -> Result { @@ -271,9 +272,11 @@ pub async fn execute_query_with_cross_rt_stream( } - // println!("Explain show"); - // let clone_df = dataframe.clone().explain(false, true); - // clone_df?.show().await?; + if is_query_plan_explain_enabled { + println!("---- Explain plan ----"); + let clone_df = dataframe.clone().explain(false, true)?; + clone_df.show().await?; + } let df_stream = match 
execute_stream(physical_plan, ctx.task_ctx()) { Ok(stream) => stream, diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java index 80ae9ced3d610..a346d121b882a 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java @@ -22,6 +22,7 @@ import org.apache.lucene.util.BytesRef; import org.opensearch.OpenSearchException; import org.opensearch.action.search.SearchShardTask; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.lease.Releasables; import org.opensearch.common.lucene.search.TopDocsAndMaxScore; import org.opensearch.common.util.BigArrays; @@ -92,8 +93,8 @@ public DatafusionEngine(DataFormat dataFormat, Collection formatCa } @Override - public DatafusionContext createContext(ReaderContext readerContext, ShardSearchRequest request, SearchShardTarget searchShardTarget, SearchShardTask task, BigArrays bigArrays, SearchContext originalContext) throws IOException { - DatafusionContext datafusionContext = new DatafusionContext(readerContext, request, searchShardTarget, task, this, bigArrays, originalContext); + public DatafusionContext createContext(ReaderContext readerContext, ShardSearchRequest request, SearchShardTarget searchShardTarget, SearchShardTask task, BigArrays bigArrays, SearchContext originalContext, ClusterService clusterService) throws IOException { + DatafusionContext datafusionContext = new DatafusionContext(readerContext, request, searchShardTarget, task, this, bigArrays, originalContext, clusterService); // Parse source datafusionContext.datafusionQuery(new DatafusionQuery(request.shardId().getIndexName(), request.source().queryPlanIR(), new ArrayList<>())); return datafusionContext; @@ -264,6 +265,8 @@ public void 
executeQueryPhase(DatafusionContext context) { public void executeQueryPhaseAsync(DatafusionContext context, Executor executor, ActionListener> listener) { try { DatafusionSearcher datafusionSearcher = context.getEngineSearcher(); + context.getDatafusionQuery().setQueryPlanExplainEnabled(context.evaluateSearchQueryExplainMode()); + datafusionSearcher.searchAsync(context.getDatafusionQuery(), datafusionService.getRuntimePointer()).whenCompleteAsync((streamPointer, error)-> { Map finalRes = new HashMap<>(); List rowIdResult = new ArrayList<>(); @@ -395,7 +398,7 @@ public void executeFetchPhase(DatafusionContext context) throws IOException { includeFields.add(CompositeDataFormatWriter.ROW_ID); } excludeFields.addAll(context.mapperService().documentMapper().mapping().getMetadataStringNames()); - excludeFields.add(SeqNoFieldMapper.PRIMARY_TERM_NAME); // TODO: check why _primary_term is not part of metadata mapper fields + excludeFields.add(SeqNoFieldMapper.PRIMARY_TERM_NAME); context.getDatafusionQuery().setSource(includeFields, excludeFields); DatafusionSearcher datafusionSearcher = context.getEngineSearcher(); diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/jni/NativeBridge.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/jni/NativeBridge.java index 573b7c09969fd..0c03356bab328 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/jni/NativeBridge.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/jni/NativeBridge.java @@ -36,7 +36,7 @@ private NativeBridge() {} public static native void shutdownTokioRuntimeManager(); // Query execution - public static native void executeQueryPhaseAsync(long readerPtr, String tableName, byte[] plan, long runtimePtr, ActionListener listener); + public static native void executeQueryPhaseAsync(long readerPtr, String tableName, byte[] plan, boolean isQueryPlanExplainEnabled, long runtimePtr, ActionListener listener); public static 
native long executeFetchPhase(long readerPtr, long[] rowIds, String[] includeFields, String[] excludeFields, long runtimePtr); // Stream operations diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionContext.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionContext.java index 61b929c529eec..0c5f0f47b9231 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionContext.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionContext.java @@ -15,11 +15,15 @@ import org.apache.lucene.search.Query; import org.opensearch.action.search.SearchShardTask; import org.opensearch.action.search.SearchType; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.lease.Releasables; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.index.IndexService; +import org.opensearch.index.IndexSettings; import org.opensearch.index.cache.bitset.BitsetFilterCache; import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.index.mapper.MapperService; @@ -65,6 +69,8 @@ import java.util.Map; import java.util.Optional; +import static org.opensearch.search.SearchService.CLUSTER_SEARCH_QUERY_PLAN_EXPLAIN_SETTING; + /** * Search context for Datafusion engine */ @@ -90,6 +96,7 @@ public class DatafusionContext extends SearchContext { private int from; private int size; private final SearchContext originalContext; + private final ClusterService clusterService; /** * Constructor @@ -105,7 +112,8 @@ public DatafusionContext( SearchShardTask task, DatafusionEngine engine, BigArrays bigArrays, - SearchContext originalContext) { + SearchContext originalContext, + ClusterService 
clusterService) { this.readerContext = readerContext; this.indexShard = readerContext.indexShard(); this.request = request; @@ -127,6 +135,7 @@ public DatafusionContext( this.originalContext = originalContext; this.size(Optional.ofNullable(request.source()).isPresent() ? request.source().size() : 0); this.from(Optional.ofNullable(request.source()).isPresent() ? request.source().from() : 0); + this.clusterService = clusterService; } /** @@ -834,4 +843,18 @@ public Comparable convertToComparable(Object rawValue) { throw new IllegalArgumentException("Conversion to Comparable not supported for type " + rawValue.getClass()); }; } + + public boolean evaluateSearchQueryExplainMode() { + Settings indexSettings = this.indexService.getIndexSettings().getSettings(); + if (clusterService == null) { + return this.indexService.getIndexSettings().isSearchQueryPlaneExplainEnabled(); + } + + ClusterSettings clusterSettings = clusterService.getClusterSettings(); + + return indexSettings.getAsBoolean( + IndexSettings.INDEX_SEARCH_QUERY_PLAN_EXPLAIN_SETTING.getKey(), + clusterSettings.get(CLUSTER_SEARCH_QUERY_PLAN_EXPLAIN_SETTING) + ); + } } diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionQuery.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionQuery.java index 1b61efb8da5ba..a14951ed8596e 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionQuery.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionQuery.java @@ -20,12 +20,14 @@ public class DatafusionQuery { private List queryPhaseRowIds; private List includeFields; private List excludeFields; + private Boolean isQueryPlanExplainEnabled; public DatafusionQuery(String indexName, byte[] substraitBytes, List searchExecutors) { this.indexName = indexName; this.substraitBytes = substraitBytes; this.searchExecutors = searchExecutors; this.isFetchPhase = false; + 
this.isQueryPlanExplainEnabled = false; } public void setSource(List includeFields, List excludeFields) { @@ -38,6 +40,14 @@ public void setFetchPhaseContext(List queryPhaseRowIds) { this.isFetchPhase = true; } + public void setQueryPlanExplainEnabled(Boolean queryPlanExplainEnabled) { + isQueryPlanExplainEnabled = queryPlanExplainEnabled; + } + + public boolean getQueryPlanExplainEnabled() { + return Boolean.TRUE.equals(isQueryPlanExplainEnabled); + } + public boolean isFetchPhase() { return this.isFetchPhase; } diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionSearcher.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionSearcher.java index e5aa28930bd87..744467b2cff66 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionSearcher.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionSearcher.java @@ -55,7 +55,7 @@ public long search(DatafusionQuery datafusionQuery, Long runtimePtr) { @Override public CompletableFuture searchAsync(DatafusionQuery datafusionQuery, Long runtimePtr) { CompletableFuture result = new CompletableFuture<>(); - NativeBridge.executeQueryPhaseAsync(reader.getReaderPtr(), datafusionQuery.getIndexName(), datafusionQuery.getSubstraitBytes(), runtimePtr, new ActionListener() { + NativeBridge.executeQueryPhaseAsync(reader.getReaderPtr(), datafusionQuery.getIndexName(), datafusionQuery.getSubstraitBytes(), datafusionQuery.getQueryPlanExplainEnabled(), runtimePtr, new ActionListener() { @Override public void onResponse(Long streamPointer) { if (streamPointer == 0) { diff --git a/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionServiceTests.java b/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionServiceTests.java index 8bcbb468f0a55..f6b5c176e41bb 100644 --- a/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionServiceTests.java 
+++ b/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionServiceTests.java @@ -325,7 +325,7 @@ public void testQueryThenFetchE2ETest() throws IOException, URISyntaxException, ReaderContext readerContext = createAndPutReaderContext(shardSearchRequest, indexService, indexShard, reader); SearchShardTarget searchShardTarget = new SearchShardTarget("node_1", new ShardId("index-7", "index-7", 0), null, OriginalIndices.NONE); SearchShardTask searchShardTask = new SearchShardTask(0, "n/a", "n/a", "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "my_id")); - DatafusionContext datafusionContext = new DatafusionContext(readerContext, shardSearchRequest, searchShardTarget, searchShardTask, engine, null, null); + DatafusionContext datafusionContext = new DatafusionContext(readerContext, shardSearchRequest, searchShardTarget, searchShardTask, engine, null, null, null); byte[] protoContent; try (InputStream is = getClass().getResourceAsStream("/substrait_plan_test.pb")) { diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index c9f4d43d5a323..9b33bde32cb54 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -866,7 +866,9 @@ public void apply(Settings value, Settings current, Settings previous) { ForceMergeManagerSettings.JVM_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE, ForceMergeManagerSettings.CONCURRENCY_MULTIPLIER, StreamTransportService.STREAM_TRANSPORT_REQ_TIMEOUT_SETTING, - StreamSearchTransportService.STREAM_SEARCH_ENABLED + StreamSearchTransportService.STREAM_SEARCH_ENABLED, + + SearchService.CLUSTER_SEARCH_QUERY_PLAN_EXPLAIN_SETTING ) ) ); diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 
8a5eafef4a10a..883c453a89e8b 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -289,6 +289,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.INDEX_DERIVED_SOURCE_SETTING, IndexSettings.INDEX_DERIVED_SOURCE_TRANSLOG_ENABLED_SETTING, + IndexSettings.INDEX_SEARCH_QUERY_PLAN_EXPLAIN_SETTING, + // validate that built-in similarities don't get redefined Setting.groupSetting("index.similarity.", (s) -> { Map groups = s.getAsGroups(); diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 2abdf79584d82..43be50147aac4 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -803,6 +803,18 @@ public static IndexMergePolicy fromString(String text) { Property.Final ); + public static final Setting INDEX_SEARCH_QUERY_PLAN_EXPLAIN_SETTING = Setting.boolSetting( + "index.search.query_plan.explain", + false, + Property.IndexScope, + Property.Dynamic + ); + + private void setSearchQueryPlanExplainEnabled(Boolean searchQueryPlaneExplainEnabled) { + this.searchQueryPlaneExplainEnabled = searchQueryPlaneExplainEnabled; + } + + public static final Setting INDEX_DERIVED_SOURCE_TRANSLOG_ENABLED_SETTING = Setting.boolSetting( "index.derived_source.translog.enabled", INDEX_DERIVED_SOURCE_SETTING, @@ -863,6 +875,7 @@ public static IndexMergePolicy fromString(String text) { private volatile boolean allowDerivedField; private final boolean derivedSourceEnabled; private volatile boolean derivedSourceEnabledForTranslog; + private volatile boolean searchQueryPlaneExplainEnabled; /** * The maximum age of a retention lease before it is considered expired. 
@@ -1102,6 +1115,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti defaultSearchPipeline = scopedSettings.get(DEFAULT_SEARCH_PIPELINE); derivedSourceEnabled = scopedSettings.get(INDEX_DERIVED_SOURCE_SETTING); derivedSourceEnabledForTranslog = scopedSettings.get(INDEX_DERIVED_SOURCE_TRANSLOG_ENABLED_SETTING); + searchQueryPlaneExplainEnabled = scopedSettings.get(INDEX_SEARCH_QUERY_PLAN_EXPLAIN_SETTING); + scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_QUERY_PLAN_EXPLAIN_SETTING, this::setSearchQueryPlanExplainEnabled); scopedSettings.addSettingsUpdateConsumer(INDEX_DERIVED_SOURCE_TRANSLOG_ENABLED_SETTING, this::setDerivedSourceEnabledForTranslog); /* There was unintentional breaking change got introduced with [OpenSearch-6424](https://github.com/opensearch-project/OpenSearch/pull/6424) (version 2.7). * For indices created prior version (prior to 2.7) which has IndexSort type, they used to type cast the SortField.Type @@ -2166,4 +2181,8 @@ public boolean isDerivedSourceEnabledForTranslog() { public boolean isDerivedSourceEnabled() { return derivedSourceEnabled; } + + public boolean isSearchQueryPlaneExplainEnabled() { + return searchQueryPlaneExplainEnabled; + } } diff --git a/server/src/main/java/org/opensearch/index/engine/SearchExecEngine.java b/server/src/main/java/org/opensearch/index/engine/SearchExecEngine.java index 531e5ffa61b5d..cce57ed6eaeeb 100644 --- a/server/src/main/java/org/opensearch/index/engine/SearchExecEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/SearchExecEngine.java @@ -9,6 +9,7 @@ package org.opensearch.index.engine; import org.opensearch.action.search.SearchShardTask; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.util.BigArrays; import org.opensearch.core.action.ActionListener; @@ -35,7 +36,7 @@ public abstract class SearchExecEngine CLUSTER_SEARCH_QUERY_PLAN_EXPLAIN_SETTING = 
Setting.boolSetting( + "search.query_plan.explain", + false, + Property.Dynamic, + Property.NodeScope + ); + public static final Setting QUERY_REWRITING_ENABLED_SETTING = Setting.boolSetting( "search.query_rewriting.enabled", false, @@ -1460,7 +1467,7 @@ private SearchContext createContext( request.getClusterAlias(), OriginalIndices.NONE ); - SearchContext context = searchExecEngine.createContext(readerContext, request, shardTarget, task, bigArrays, originalContext); + SearchContext context = searchExecEngine.createContext(readerContext, request, shardTarget, task, bigArrays, originalContext, clusterService); try { if (request.scroll() != null) { context.scrollContext().scroll = request.scroll();