Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions plugins/engine-datafusion/jni/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_executeQu
shard_view_ptr: jlong,
table_name: JString,
substrait_bytes: jbyteArray,
is_query_plan_explain_enabled: jboolean,
runtime_ptr: jlong,
listener: JObject,
) {
Expand All @@ -513,6 +514,8 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_executeQu
}
};

let is_query_plan_explain_enabled: bool = is_query_plan_explain_enabled !=0;

let plan_bytes_obj = unsafe { JByteArray::from_raw(substrait_bytes) };
let plan_bytes_vec = match env.convert_byte_array(plan_bytes_obj) {
Ok(bytes) => bytes,
Expand Down Expand Up @@ -550,6 +553,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_executeQu
files_meta,
table_name,
plan_bytes_vec,
is_query_plan_explain_enabled,
runtime,
cpu_executor,
).await;
Expand Down
9 changes: 6 additions & 3 deletions plugins/engine-datafusion/jni/src/query_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ pub async fn execute_query_with_cross_rt_stream(
files_meta: Arc<Vec<CustomFileMeta>>,
table_name: String,
plan_bytes_vec: Vec<u8>,
is_query_plan_explain_enabled: bool,
runtime: &DataFusionRuntime,
cpu_executor: DedicatedExecutor,
) -> Result<jlong, DataFusionError> {
Expand Down Expand Up @@ -271,9 +272,11 @@ pub async fn execute_query_with_cross_rt_stream(
}


// println!("Explain show");
// let clone_df = dataframe.clone().explain(false, true);
// clone_df?.show().await?;
if is_query_plan_explain_enabled {
println!("---- Explain plan ----");
let clone_df = dataframe.clone().explain(false, true).expect("Failed to explain plan");
clone_df.show().await?;
}

let df_stream = match execute_stream(physical_plan, ctx.task_ctx()) {
Ok(stream) => stream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.lucene.util.BytesRef;
import org.opensearch.OpenSearchException;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.common.util.BigArrays;
Expand Down Expand Up @@ -92,8 +93,8 @@ public DatafusionEngine(DataFormat dataFormat, Collection<FileMetadata> formatCa
}

@Override
public DatafusionContext createContext(ReaderContext readerContext, ShardSearchRequest request, SearchShardTarget searchShardTarget, SearchShardTask task, BigArrays bigArrays, SearchContext originalContext) throws IOException {
DatafusionContext datafusionContext = new DatafusionContext(readerContext, request, searchShardTarget, task, this, bigArrays, originalContext);
public DatafusionContext createContext(ReaderContext readerContext, ShardSearchRequest request, SearchShardTarget searchShardTarget, SearchShardTask task, BigArrays bigArrays, SearchContext originalContext, ClusterService clusterService) throws IOException {
DatafusionContext datafusionContext = new DatafusionContext(readerContext, request, searchShardTarget, task, this, bigArrays, originalContext, clusterService);
// Parse source
datafusionContext.datafusionQuery(new DatafusionQuery(request.shardId().getIndexName(), request.source().queryPlanIR(), new ArrayList<>()));
return datafusionContext;
Expand Down Expand Up @@ -264,6 +265,8 @@ public void executeQueryPhase(DatafusionContext context) {
public void executeQueryPhaseAsync(DatafusionContext context, Executor executor, ActionListener<Map<String, Object[]>> listener) {
try {
DatafusionSearcher datafusionSearcher = context.getEngineSearcher();
context.getDatafusionQuery().setQueryPlanExplainEnabled(context.evaluateSearchQueryExplainMode());

datafusionSearcher.searchAsync(context.getDatafusionQuery(), datafusionService.getRuntimePointer()).whenCompleteAsync((streamPointer, error)-> {
Map<String, Object[]> finalRes = new HashMap<>();
List<Long> rowIdResult = new ArrayList<>();
Expand Down Expand Up @@ -395,7 +398,7 @@ public void executeFetchPhase(DatafusionContext context) throws IOException {
includeFields.add(CompositeDataFormatWriter.ROW_ID);
}
excludeFields.addAll(context.mapperService().documentMapper().mapping().getMetadataStringNames());
excludeFields.add(SeqNoFieldMapper.PRIMARY_TERM_NAME); // TODO: check why _primary_term is not part of metadata mapper fields
excludeFields.add(SeqNoFieldMapper.PRIMARY_TERM_NAME);

context.getDatafusionQuery().setSource(includeFields, excludeFields);
DatafusionSearcher datafusionSearcher = context.getEngineSearcher();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private NativeBridge() {}
public static native void shutdownTokioRuntimeManager();

// Query execution
public static native void executeQueryPhaseAsync(long readerPtr, String tableName, byte[] plan, long runtimePtr, ActionListener<Long> listener);
public static native void executeQueryPhaseAsync(long readerPtr, String tableName, byte[] plan, boolean isQueryPlanExplainEnabled, long runtimePtr, ActionListener<Long> listener);
public static native long executeFetchPhase(long readerPtr, long[] rowIds, String[] includeFields, String[] excludeFields, long runtimePtr);

// Stream operations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
import org.apache.lucene.search.Query;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.action.search.SearchType;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.cache.bitset.BitsetFilterCache;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.index.mapper.MapperService;
Expand Down Expand Up @@ -64,6 +68,8 @@
import java.util.Map;
import java.util.Optional;

import static org.opensearch.search.SearchService.CLUSTER_SEARCH_QUERY_PLAN_EXPLAIN_SETTING;

/**
* Search context for Datafusion engine
*/
Expand All @@ -89,6 +95,7 @@ public class DatafusionContext extends SearchContext {
private int from;
private int size;
private final SearchContext originalContext;
private final ClusterService clusterService;

/**
* Constructor
Expand All @@ -104,7 +111,8 @@ public DatafusionContext(
SearchShardTask task,
DatafusionEngine engine,
BigArrays bigArrays,
SearchContext originalContext) {
SearchContext originalContext,
ClusterService clusterService) {
this.readerContext = readerContext;
this.indexShard = readerContext.indexShard();
this.request = request;
Expand All @@ -126,6 +134,7 @@ public DatafusionContext(
this.originalContext = originalContext;
this.size(Optional.ofNullable(request.source()).isPresent() ? request.source().size() : 0);
this.from(Optional.ofNullable(request.source()).isPresent() ? request.source().from() : 0);
this.clusterService = clusterService;
}

/**
Expand Down Expand Up @@ -832,4 +841,18 @@ public Comparable<?> convertToComparable(Object rawValue) {
throw new IllegalArgumentException("Conversion to Comparable not supported for type " + rawValue.getClass());
};
}

public boolean evaluateSearchQueryExplainMode() {
Settings indexSettings = this.indexService.getIndexSettings().getSettings();
if (clusterService == null) {
return this.indexService.getIndexSettings().isSearchQueryPlaneExplainEnabled();
}

ClusterSettings clusterSettings = clusterService.getClusterSettings();

return indexSettings.getAsBoolean(
IndexSettings.INDEX_SEARCH_QUERY_PLAN_EXPLAIN_SETTING.getKey(),
clusterSettings.getOrNull(CLUSTER_SEARCH_QUERY_PLAN_EXPLAIN_SETTING)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ public class DatafusionQuery {
private List<Long> queryPhaseRowIds;
private List<String> includeFields;
private List<String> excludeFields;
private Boolean isQueryPlanExplainEnabled;

public DatafusionQuery(String indexName, byte[] substraitBytes, List<SearchExecutor> searchExecutors) {
this.indexName = indexName;
this.substraitBytes = substraitBytes;
this.searchExecutors = searchExecutors;
this.isFetchPhase = false;
this.isQueryPlanExplainEnabled = false;
}

public void setSource(List<String> includeFields, List<String> excludeFields) {
Expand All @@ -38,6 +40,14 @@ public void setFetchPhaseContext(List<Long> queryPhaseRowIds) {
this.isFetchPhase = true;
}

public void setQueryPlanExplainEnabled(Boolean queryPlanExplainEnabled) {
isQueryPlanExplainEnabled = queryPlanExplainEnabled;
}

public boolean getQueryPlanExplainEnabled() {
return isQueryPlanExplainEnabled;
}

public boolean isFetchPhase() {
return this.isFetchPhase;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public long search(DatafusionQuery datafusionQuery, Long runtimePtr) {
@Override
public CompletableFuture<Long> searchAsync(DatafusionQuery datafusionQuery, Long runtimePtr) {
CompletableFuture<Long> result = new CompletableFuture<>();
NativeBridge.executeQueryPhaseAsync(reader.getReaderPtr(), datafusionQuery.getIndexName(), datafusionQuery.getSubstraitBytes(), runtimePtr, new ActionListener<Long>() {
NativeBridge.executeQueryPhaseAsync(reader.getReaderPtr(), datafusionQuery.getIndexName(), datafusionQuery.getSubstraitBytes(), datafusionQuery.getQueryPlanExplainEnabled(), runtimePtr, new ActionListener<Long>() {
@Override
public void onResponse(Long streamPointer) {
if (streamPointer == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ public void testQueryThenFetchE2ETest() throws IOException, URISyntaxException,
ReaderContext readerContext = createAndPutReaderContext(shardSearchRequest, indexService, indexShard, reader);
SearchShardTarget searchShardTarget = new SearchShardTarget("node_1", new ShardId("index-7", "index-7", 0), null, OriginalIndices.NONE);
SearchShardTask searchShardTask = new SearchShardTask(0, "n/a", "n/a", "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "my_id"));
DatafusionContext datafusionContext = new DatafusionContext(readerContext, shardSearchRequest, searchShardTarget, searchShardTask, engine, null, null);
DatafusionContext datafusionContext = new DatafusionContext(readerContext, shardSearchRequest, searchShardTarget, searchShardTask, engine, null, null, null);

byte[] protoContent;
try (InputStream is = getClass().getResourceAsStream("/substrait_plan_test.pb")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,9 @@ public void apply(Settings value, Settings current, Settings previous) {
ForceMergeManagerSettings.JVM_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE,
ForceMergeManagerSettings.CONCURRENCY_MULTIPLIER,
StreamTransportService.STREAM_TRANSPORT_REQ_TIMEOUT_SETTING,
StreamSearchTransportService.STREAM_SEARCH_ENABLED
StreamSearchTransportService.STREAM_SEARCH_ENABLED,

SearchService.CLUSTER_SEARCH_QUERY_PLAN_EXPLAIN_SETTING
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.INDEX_DERIVED_SOURCE_SETTING,
IndexSettings.INDEX_DERIVED_SOURCE_TRANSLOG_ENABLED_SETTING,

IndexSettings.INDEX_SEARCH_QUERY_PLAN_EXPLAIN_SETTING,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Map<String, Settings> groups = s.getAsGroups();
Expand Down
19 changes: 19 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,18 @@ public static IndexMergePolicy fromString(String text) {
Property.Final
);

public static final Setting<Boolean> INDEX_SEARCH_QUERY_PLAN_EXPLAIN_SETTING = Setting.boolSetting(
"index.search.query_plan.explain",
false,
Property.IndexScope,
Property.Dynamic
);

private void setSearchQueryPlanExplainEnabled(Boolean searchQueryPlaneExplainEnabled) {
this.searchQueryPlaneExplainEnabled = searchQueryPlaneExplainEnabled;
}


public static final Setting<Boolean> INDEX_DERIVED_SOURCE_TRANSLOG_ENABLED_SETTING = Setting.boolSetting(
"index.derived_source.translog.enabled",
INDEX_DERIVED_SOURCE_SETTING,
Expand Down Expand Up @@ -863,6 +875,7 @@ public static IndexMergePolicy fromString(String text) {
private volatile boolean allowDerivedField;
private final boolean derivedSourceEnabled;
private volatile boolean derivedSourceEnabledForTranslog;
private boolean searchQueryPlaneExplainEnabled;

/**
* The maximum age of a retention lease before it is considered expired.
Expand Down Expand Up @@ -1102,6 +1115,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
defaultSearchPipeline = scopedSettings.get(DEFAULT_SEARCH_PIPELINE);
derivedSourceEnabled = scopedSettings.get(INDEX_DERIVED_SOURCE_SETTING);
derivedSourceEnabledForTranslog = scopedSettings.get(INDEX_DERIVED_SOURCE_TRANSLOG_ENABLED_SETTING);
searchQueryPlaneExplainEnabled = scopedSettings.get(INDEX_SEARCH_QUERY_PLAN_EXPLAIN_SETTING);
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_QUERY_PLAN_EXPLAIN_SETTING, this::setSearchQueryPlanExplainEnabled);
scopedSettings.addSettingsUpdateConsumer(INDEX_DERIVED_SOURCE_TRANSLOG_ENABLED_SETTING, this::setDerivedSourceEnabledForTranslog);
/* There was unintentional breaking change got introduced with [OpenSearch-6424](https://github.com/opensearch-project/OpenSearch/pull/6424) (version 2.7).
* For indices created prior version (prior to 2.7) which has IndexSort type, they used to type cast the SortField.Type
Expand Down Expand Up @@ -2166,4 +2181,8 @@ public boolean isDerivedSourceEnabledForTranslog() {
public boolean isDerivedSourceEnabled() {
return derivedSourceEnabled;
}

public boolean isSearchQueryPlaneExplainEnabled() {
return searchQueryPlaneExplainEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.index.engine;

import org.opensearch.action.search.SearchShardTask;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.util.BigArrays;
import org.opensearch.core.action.ActionListener;
Expand All @@ -35,7 +36,7 @@ public abstract class SearchExecEngine<C extends SearchContext, S extends Engine
/**
* Create a search context for this engine
*/
public abstract C createContext(ReaderContext readerContext, ShardSearchRequest request, SearchShardTarget searchShardTarget, SearchShardTask task, BigArrays bigArrays, SearchContext originalContext) throws IOException;
public abstract C createContext(ReaderContext readerContext, ShardSearchRequest request, SearchShardTarget searchShardTarget, SearchShardTask task, BigArrays bigArrays, SearchContext originalContext, ClusterService clusterService) throws IOException;

/**
* execute Query Phase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.Deprecated
);

public static final Setting<Boolean> CLUSTER_SEARCH_QUERY_PLAN_EXPLAIN_SETTING = Setting.boolSetting(
"search.query_plan.explain",
false,
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Boolean> QUERY_REWRITING_ENABLED_SETTING = Setting.boolSetting(
"search.query_rewriting.enabled",
false,
Expand Down Expand Up @@ -1460,7 +1467,7 @@ private SearchContext createContext(
request.getClusterAlias(),
OriginalIndices.NONE
);
SearchContext context = searchExecEngine.createContext(readerContext, request, shardTarget, task, bigArrays, originalContext);
SearchContext context = searchExecEngine.createContext(readerContext, request, shardTarget, task, bigArrays, originalContext, clusterService);
try {
if (request.scroll() != null) {
context.scrollContext().scroll = request.scroll();
Expand Down
Loading