docs/configuration/index.md (12 changes: 11 additions & 1 deletion)
@@ -1585,7 +1585,9 @@ These Historical configurations can be defined in the `historical/runtime.properties`
|`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while segments are loading from cache. Set this value to zero to wait for all segments to be loaded before announcing.|5000 (5 seconds)|
|`druid.segmentCache.numLoadingThreads`|How many segments to drop or load concurrently from deep storage. Note that the work of loading segments involves downloading segments from deep storage, decompressing them, and loading them into a memory-mapped location, so the work is not all I/O bound. Depending on CPU and network load, one could possibly increase this config to a higher value.|max(1,Number of cores / 6)|
|`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently during historical startup.|`druid.segmentCache.numLoadingThreads`|
|`druid.segmentCache.lazyLoadOnStart`|Whether or not to load segment columns metadata lazily during historical startup. When set to true, Historical startup time will be dramatically improved by deferring segment loading until the first time that segment takes part in a query, which will incur this cost instead.|false|
|`druid.segmentCache.lazyLoadOnStart`|_DEPRECATED_ Use `druid.segmentCache.startupLoadStrategy` instead. Whether or not to load segment column metadata lazily during Historical startup. When set to true, Historical startup time improves dramatically because segment loading is deferred until the first time a segment takes part in a query, which then incurs the cost.|false|
|`druid.segmentCache.startupLoadStrategy.type`|Selects the segment column metadata loading strategy used during Historical startup. Possible values are `loadAllEagerly`, `loadAllLazily`, and `loadEagerlyBeforePeriod`. See the strategy table below for details.|`loadAllEagerly`|
|`druid.segmentCache.startupLoadStrategy.period`|Used only when the `loadEagerlyBeforePeriod` startup load strategy is configured. If `t` is the timestamp at which the Historical starts up, column metadata is eagerly loaded for any segment whose interval overlaps `[t - period, t]`; metadata for all other segments is loaded lazily on first query.|`P7D`|
|`druid.segmentCache.numThreadsToLoadSegmentsIntoPageCacheOnDownload`|Number of threads to asynchronously read segment index files into null output stream on each new segment download after the Historical service finishes bootstrapping. Recommended to set to 1 or 2 or leave unspecified to disable. See also `druid.segmentCache.numThreadsToLoadSegmentsIntoPageCacheOnBootstrap`|0|
|`druid.segmentCache.numThreadsToLoadSegmentsIntoPageCacheOnBootstrap`|Number of threads to asynchronously read segment index files into a null output stream during Historical service bootstrap. This thread pool is terminated after the Historical service finishes bootstrapping. Recommended to set to half of the available cores. If left unspecified, `druid.segmentCache.numThreadsToLoadSegmentsIntoPageCacheOnDownload` is used. If both configs are unspecified, this feature is disabled. Preemptively loading segments into the page cache makes query latency more consistent: when a segment is later queried, it is already in the page cache and only a minor page fault is triggered instead of a more costly major page fault. Note that this is a blind read of segment index files; if the total segment size on local disk exceeds the page cache available in RAM (roughly the total RAM on the host minus the Druid process memory, both heap and direct, minus the memory used by other non-Druid processes), the operating system may evict existing segments from the page cache at its discretion. It is the user's responsibility to ensure the host has enough RAM to hold all segments so that random evictions do not undermine this feature.|`druid.segmentCache.numThreadsToLoadSegmentsIntoPageCacheOnDownload`|

@@ -1602,6 +1604,14 @@ In `druid.segmentCache.locationSelector.strategy`, one of `leastBytesUsed`, `rou

Note that if `druid.segmentCache.numLoadingThreads` > 1, multiple threads can download different segments at the same time. In this case, with the `leastBytesUsed` strategy or `mostAvailableSize` strategy, Historicals may select a sub-optimal storage location because each decision is based on a snapshot of the storage location status taken when the segment download is requested.

In `druid.segmentCache.startupLoadStrategy.type`, one of `loadAllEagerly`, `loadAllLazily`, or `loadEagerlyBeforePeriod` can be specified to select how segment column metadata is loaded when the Historical service starts. An example configuration follows the table below.

|Strategy|Description|
|--------|-----------|
|`loadAllEagerly`|The default startup strategy. The Historical service will load all segment column metadata immediately during the initial startup process.|
|`loadAllLazily`|Significantly improves Historical startup time by not loading segment column metadata during the initial startup sequence. Instead, the loading cost is deferred and incurred the first time a segment is referenced by a query.|
|`loadEagerlyBeforePeriod`|Provides a balance between fast startup and query performance. The Historical service eagerly loads column metadata only for segments whose intervals overlap the recent window defined by `druid.segmentCache.startupLoadStrategy.period`. Segments outside this window are loaded on demand when first queried.|
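
For example, to eagerly load column metadata only for the most recent three days of data and defer everything older, a Historical's `historical/runtime.properties` might contain something like the following (the `P3D` period is illustrative):

```properties
# Eagerly load column metadata for segments overlapping [startup - 3 days, startup];
# older segments are loaded lazily on their first query.
druid.segmentCache.startupLoadStrategy.type=loadEagerlyBeforePeriod
druid.segmentCache.startupLoadStrategy.period=P3D
```
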
Contributor:

How feasible/extensible is it to accept a map of datasource to load period, to allow configurable periods per datasource? (similar to the loadByPeriod - load rules config where each datasource can have different load retention rules)

I think having that option would allow a lot more flexibility to operators as the query workloads can be vastly different.

Contributor Author (@GWphua, Oct 27, 2025):

This is workable --

I can change `startupLoadStrategy.period` to `startupLoadStrategy.datasourceToPeriodMapping`, which would accept a JSON object, e.g.

`{"DS1": "P7D", "DS2": "P2D", ".": "P7D"}`

where `.` refers to the default configuration (since datasource names cannot start with `.`).

Member:

My opinion is that we should keep the change in this PR small. If there is a real need for datasource-level configuration, we can implement it later by defining a datasource-level configuration.

Contributor Author:

> How feasible/extensible is it to accept a map of datasource to load period, to allow configurable periods per datasource? (similar to the loadByPeriod - load rules config where each datasource can have different load retention rules)
>
> I think having that option would allow a lot more flexibility to operators as the query workloads can be vastly different.

I feel we can leave this for another PR, since it is outside the intended scope of this PR. WDYT? @abhishekrb19
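
For illustration only, a per-datasource variant along the lines floated in this thread might look roughly like the following. This is not part of this PR; the class name, the constructor signature, the `.` default key, and the omission of the Jackson wiring are all assumptions taken from the discussion above.

```java
package org.apache.druid.server.coordination.startup;

import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;

import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of a per-datasource eager-load window, as discussed in the review thread.
 */
public class LoadEagerlyByDatasourcePeriodSketch implements HistoricalStartupCacheLoadStrategy
{
  private final Map<String, Interval> eagerWindows = new HashMap<>();
  private final Interval defaultWindow;

  public LoadEagerlyByDatasourcePeriodSketch(Map<String, Period> datasourceToPeriodMapping)
  {
    final DateTime now = DateTimes.nowUtc();
    // "." carries the default period, since datasource names cannot start with ".".
    final Period defaultPeriod = datasourceToPeriodMapping.getOrDefault(".", Period.days(7));
    this.defaultWindow = new Interval(now.minus(defaultPeriod), now);
    for (Map.Entry<String, Period> entry : datasourceToPeriodMapping.entrySet()) {
      if (!".".equals(entry.getKey())) {
        eagerWindows.put(entry.getKey(), new Interval(now.minus(entry.getValue()), now));
      }
    }
  }

  @Override
  public boolean shouldLoadLazily(DataSegment segment)
  {
    // Fall back to the default window for datasources without an explicit period.
    final Interval window = eagerWindows.getOrDefault(segment.getDataSource(), defaultWindow);
    return !segment.getInterval().overlaps(window);
  }
}
```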


#### Historical query configs

##### Concurrent requests
SegmentLoaderConfig.java
@@ -22,6 +22,9 @@
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import org.apache.druid.server.coordination.startup.HistoricalStartupCacheLoadStrategy;
import org.apache.druid.server.coordination.startup.LoadAllEagerlyStrategy;
import org.apache.druid.server.coordination.startup.LoadAllLazilyStrategy;
import org.apache.druid.utils.RuntimeInfo;

import java.io.File;
@@ -41,9 +44,16 @@
@JsonProperty
private List<StorageLocationConfig> locations = Collections.emptyList();

/**
* @deprecated Use {@link #startupLoadStrategy} instead.
*/
@Deprecated
@JsonProperty("lazyLoadOnStart")
private boolean lazyLoadOnStart = false;

@JsonProperty("startupLoadStrategy")
private HistoricalStartupCacheLoadStrategy startupLoadStrategy = null;

@JsonProperty("deleteOnRemove")
private boolean deleteOnRemove = true;

@@ -84,11 +94,24 @@
return locations;
}

/**
* @deprecated Use {@link #getStartupCacheLoadStrategy()} instead.
* Removal of this method in the future will require a change in {@link #getStartupCacheLoadStrategy()}
* to default to {@link LoadAllEagerlyStrategy#STRATEGY_NAME} when {@link #startupLoadStrategy} is null.
*/
@Deprecated
public boolean isLazyLoadOnStart()
{
return lazyLoadOnStart;
}

public HistoricalStartupCacheLoadStrategy getStartupCacheLoadStrategy()
{
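// Back-compat: when no explicit startupLoadStrategy is configured, fall back to the
// deprecated lazyLoadOnStart flag to choose between the lazy and eager strategies.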
return startupLoadStrategy == null
? isLazyLoadOnStart() ? new LoadAllLazilyStrategy() : new LoadAllEagerlyStrategy()
: startupLoadStrategy;
}

public boolean isDeleteOnRemove()
{
return deleteOnRemove;
@@ -184,6 +207,7 @@
return "SegmentLoaderConfig{" +
"locations=" + locations +
", lazyLoadOnStart=" + lazyLoadOnStart +
", startupLoadStrategy=" + startupLoadStrategy +
", deleteOnRemove=" + deleteOnRemove +
", dropSegmentDelayMillis=" + dropSegmentDelayMillis +
", announceIntervalMillis=" + announceIntervalMillis +
SegmentLocalCacheManager.java
@@ -42,6 +42,7 @@
import org.apache.druid.segment.ReferenceCountedSegmentProvider;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.server.coordination.startup.HistoricalStartupCacheLoadStrategy;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CloseableUtils;
@@ -77,6 +78,7 @@ public class SegmentLocalCacheManager implements SegmentCacheManager

private final SegmentLoaderConfig config;
private final ObjectMapper jsonMapper;
private final HistoricalStartupCacheLoadStrategy loadStrategy;

private final List<StorageLocation> locations;

@@ -127,6 +129,7 @@ public SegmentLocalCacheManager(
this.locations = locations;
this.strategy = strategy;
this.indexIO = indexIO;
this.loadStrategy = config.getStartupCacheLoadStrategy();

log.info("Using storage location strategy[%s].", this.strategy.getClass().getSimpleName());

@@ -853,7 +856,8 @@ public void mount(StorageLocation mountLocation) throws SegmentLoadingException
final SegmentizerFactory factory = getSegmentFactory(storageDir);

@SuppressWarnings("ObjectEquality")
final boolean lazy = config.isLazyLoadOnStart() && lazyLoadCallback != SegmentLazyLoadFailCallback.NOOP;
final boolean lazy = lazyLoadCallback != SegmentLazyLoadFailCallback.NOOP
&& loadStrategy.shouldLoadLazily(dataSegment);
final Segment segment = factory.factorize(dataSegment, storageDir, lazy, lazyLoadCallback);
// wipe load callback after calling
lazyLoadCallback = SegmentLazyLoadFailCallback.NOOP;
HistoricalStartupCacheLoadStrategy.java (new file)
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordination.startup;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.timeline.DataSegment;

/**
* Strategy for determining whether segments should be loaded lazily or eagerly during
* Historical process startup. Lazy loading can help to lower Historical startup time at the
* expense of query latency, by deferring the loading process to the first access of that segment.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = LoadAllEagerlyStrategy.STRATEGY_NAME, value = LoadAllEagerlyStrategy.class),
@JsonSubTypes.Type(name = LoadAllLazilyStrategy.STRATEGY_NAME, value = LoadAllLazilyStrategy.class),
@JsonSubTypes.Type(name = LoadEagerlyBeforePeriod.STRATEGY_NAME, value = LoadEagerlyBeforePeriod.class)
})
public interface HistoricalStartupCacheLoadStrategy
{
/**
* Indicates whether the provided segment should be loaded lazily during Historical startup.
*
* @param segment the segment being evaluated
* @return {@code true} if the segment should be loaded lazily, {@code false} if it should be loaded eagerly.
*/
boolean shouldLoadLazily(DataSegment segment);
}
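
As a rough illustration of how the `type` discriminator above selects an implementation, the following standalone snippet deserializes a strategy with a plain Jackson `ObjectMapper`. The mapper setup, including the Joda module needed for the `period` field, is an assumption of this sketch; inside Druid the strategy is bound through the injected, preconfigured mapper.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.joda.JodaModule;

import org.apache.druid.server.coordination.startup.HistoricalStartupCacheLoadStrategy;

public class StartupLoadStrategyJsonSketch
{
  public static void main(String[] args) throws Exception
  {
    // A bare mapper plus Joda support; Druid's injected ObjectMapper already has this configured.
    final ObjectMapper mapper = new ObjectMapper().registerModule(new JodaModule());

    // "type" selects the @JsonSubTypes implementation; "period" binds to LoadEagerlyBeforePeriod.
    final String json = "{\"type\": \"loadEagerlyBeforePeriod\", \"period\": \"P3D\"}";

    final HistoricalStartupCacheLoadStrategy strategy =
        mapper.readValue(json, HistoricalStartupCacheLoadStrategy.class);

    // Prints something like {type=loadEagerlyBeforePeriod,interval=...}
    System.out.println(strategy);
  }
}
```
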
LoadAllEagerlyStrategy.java (new file)
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordination.startup;

import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.DataSegment;

import java.util.Locale;

/**
* Eagerly loads column metadata for all segments at Historical startup.
* <p>
* Optimizes for predictable first-query latency at the cost of longer startup time and higher I/O during bootstrap.
* {@link #shouldLoadLazily(DataSegment)} always returns {@code false}.
*/
public class LoadAllEagerlyStrategy implements HistoricalStartupCacheLoadStrategy
{
private static final Logger log = new Logger(LoadAllEagerlyStrategy.class);

public static final String STRATEGY_NAME = "loadAllEagerly";

public LoadAllEagerlyStrategy()
{
log.info("Using [%s] strategy", STRATEGY_NAME);
}

@Override
public boolean shouldLoadLazily(DataSegment segment)
{
return false;
}

@Override
public String toString()
{
return String.format(Locale.ROOT, "{type=%s}", STRATEGY_NAME);
}
}
LoadAllLazilyStrategy.java (new file)
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordination.startup;

import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.DataSegment;

import java.util.Locale;

/**
* Defers column metadata loading for all segments until first access.
* <p>
* Minimizes Historical startup time and I/O during bootstrap at the expense of increased latency on the first
* query that touches a segment. {@link #shouldLoadLazily(DataSegment)} always returns {@code true}.
*/
public class LoadAllLazilyStrategy implements HistoricalStartupCacheLoadStrategy
{
private static final Logger log = new Logger(LoadAllLazilyStrategy.class);

public static final String STRATEGY_NAME = "loadAllLazily";

public LoadAllLazilyStrategy()
{
log.info("Using [%s] strategy", STRATEGY_NAME);
}

@Override
public boolean shouldLoadLazily(DataSegment segment)
{
return true;
}

@Override
public String toString()
{
return String.format(Locale.ROOT, "{type=%s}", STRATEGY_NAME);
}
}
LoadEagerlyBeforePeriod.java (new file)
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordination.startup;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;

import java.util.Locale;

/**
* Eagerly loads column metadata for segments whose intervals overlap a recent sliding window; all others load lazily.
* <p>
* Balances bootstrap time and first-query performance by eagerly loading only "hot" segments. The window is
* computed as {@code [now - period, now]} at Historical startup.
*/
public class LoadEagerlyBeforePeriod implements HistoricalStartupCacheLoadStrategy
{
private static final Logger log = new Logger(LoadEagerlyBeforePeriod.class);
public static final String STRATEGY_NAME = "loadEagerlyBeforePeriod";

private final Interval eagerLoadingInterval;

@VisibleForTesting
@JsonCreator
public LoadEagerlyBeforePeriod(
@JsonProperty("period") Period eagerLoadingPeriod
)
{
DateTime now = DateTimes.nowUtc();
this.eagerLoadingInterval = new Interval(now.minus(eagerLoadingPeriod), now);

log.info("Using [%s] strategy with Interval[%s]", STRATEGY_NAME, eagerLoadingInterval);
}

@VisibleForTesting
public Interval getEagerLoadingInterval()
{
return this.eagerLoadingInterval;
}

@Override
public boolean shouldLoadLazily(DataSegment segment)
{
return !segment.getInterval().overlaps(eagerLoadingInterval);
}

@Override
public String toString()
{
return String.format(Locale.ROOT, "{type=%s,interval=%s}", STRATEGY_NAME, getEagerLoadingInterval());
}
}
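
To make the window semantics concrete, here is a small standalone sketch showing which intervals this strategy would treat as eager versus lazy. The 7-day period and the segment intervals are illustrative assumptions; only plain Joda-Time is used.

```java
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.joda.time.Period;

public class EagerWindowSketch
{
  public static void main(String[] args)
  {
    final DateTime now = DateTime.now(DateTimeZone.UTC);
    // The eager-load window computed at startup: [now - P7D, now].
    final Interval eagerWindow = new Interval(now.minus(Period.parse("P7D")), now);

    // A segment covering yesterday overlaps the window, so shouldLoadLazily would be false.
    final Interval recentSegment = new Interval(now.minusDays(1), now);
    // A day-long segment from 40 days ago does not overlap, so it would load lazily on first query.
    final Interval oldSegment = new Interval(now.minusDays(40), now.minusDays(39));

    System.out.println("recent segment lazy? " + !recentSegment.overlaps(eagerWindow)); // false
    System.out.println("old segment lazy?    " + !oldSegment.overlaps(eagerWindow));    // true
  }
}
```
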
SegmentStatsMonitor.java
@@ -33,6 +33,7 @@
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.server.coordination.SegmentLoadDropHandler;
import org.apache.druid.server.coordination.startup.LoadAllLazilyStrategy;

import java.util.Map;

@@ -63,15 +64,14 @@ public SegmentStatsMonitor(
SegmentLoaderConfig segmentLoaderConfig
)
{
if (segmentLoaderConfig.isLazyLoadOnStart()) {
if (segmentLoaderConfig.getStartupCacheLoadStrategy() instanceof LoadAllLazilyStrategy) {
// Log a message so an error is visible at startup if this fails, since the exception itself isn't logged.
log.error("Monitor doesn't support working with lazy loading on start");
// throw this exception to kill the process at startup
throw new IllegalStateException("Monitor doesn't support working with lazy loading on start");
}
this.serverConfig = serverConfig;
this.segmentLoadDropHandler = segmentLoadDropHandler;

}

@Override