@@ -96,6 +96,14 @@ func (o Opts) getLogicalOptimizers() []logicalplan.Optimizer {
9696 return optimizers
9797}
9898
99+ // QueryOpts implements promql.QueryOpts but allows to override more engine default options
100+ type QueryOpts struct {
101+ promql.QueryOpts
102+
103+ // DecodingConcurrency can be used to override the DecodingConcurrency engine setting.
104+ DecodingConcurrency int
105+ }
106+
99107// New creates a new query engine with the given options. The query engine will
100108// use the storage passed in NewInstantQuery and NewRangeQuery for retrieving
101109// data when executing queries.
@@ -239,31 +247,12 @@ func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts
239247 if err != nil {
240248 return nil , err
241249 }
242-
243- if opts == nil {
244- opts = promql .NewPrometheusQueryOpts (false , e .lookbackDelta )
245- }
246- if opts .LookbackDelta () <= 0 {
247- opts = promql .NewPrometheusQueryOpts (opts .EnablePerStepStats (), e .lookbackDelta )
248- }
249-
250250 // determine sorting order before optimizers run, we do this by looking for "sort"
251251 // and "sort_desc" and optimize them away afterwards since they are only needed at
252252 // the presentation layer and not when computing the results.
253253 resultSort := newResultSort (expr )
254254
255- qOpts := & query.Options {
256- Start : ts ,
257- End : ts ,
258- Step : 0 ,
259- StepsBatch : stepsBatch ,
260- LookbackDelta : opts .LookbackDelta (),
261- EnablePerStepStats : e .enablePerStepStats && opts .EnablePerStepStats (),
262- ExtLookbackDelta : e .extLookbackDelta ,
263- EnableAnalysis : e .enableAnalysis ,
264- NoStepSubqueryIntervalFn : e .noStepSubqueryIntervalFn ,
265- DecodingConcurrency : e .decodingConcurrency ,
266- }
255+ qOpts := e .makeQueryOpts (ts , ts , 0 , opts )
267256 if qOpts .StepsBatch > 64 {
268257 return nil , ErrStepsBatchTooLarge
269258 }
@@ -308,13 +297,6 @@ func (e *Engine) NewInstantQueryFromPlan(ctx context.Context, q storage.Queryabl
308297 }
309298 defer e .activeQueryTracker .Delete (idx )
310299
311- if opts == nil {
312- opts = promql .NewPrometheusQueryOpts (false , e .lookbackDelta )
313- }
314- if opts .LookbackDelta () <= 0 {
315- opts = promql .NewPrometheusQueryOpts (opts .EnablePerStepStats (), e .lookbackDelta )
316- }
317-
318300 qOpts := e .makeQueryOpts (ts , ts , 0 , opts )
319301 if qOpts .StepsBatch > 64 {
320302 return nil , ErrStepsBatchTooLarge
@@ -371,12 +353,6 @@ func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts pr
371353 if expr .Type () != parser .ValueTypeVector && expr .Type () != parser .ValueTypeScalar {
372354 return nil , errors .Newf ("invalid expression type %q for range query, must be Scalar or instant Vector" , parser .DocumentedType (expr .Type ()))
373355 }
374- if opts == nil {
375- opts = promql .NewPrometheusQueryOpts (false , e .lookbackDelta )
376- }
377- if opts .LookbackDelta () <= 0 {
378- opts = promql .NewPrometheusQueryOpts (opts .EnablePerStepStats (), e .lookbackDelta )
379- }
380356 qOpts := e .makeQueryOpts (start , end , step , opts )
381357 if qOpts .StepsBatch > 64 {
382358 return nil , ErrStepsBatchTooLarge
@@ -420,12 +396,6 @@ func (e *Engine) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable,
420396 }
421397 defer e .activeQueryTracker .Delete (idx )
422398
423- if opts == nil {
424- opts = promql .NewPrometheusQueryOpts (false , e .lookbackDelta )
425- }
426- if opts .LookbackDelta () <= 0 {
427- opts = promql .NewPrometheusQueryOpts (opts .EnablePerStepStats (), e .lookbackDelta )
428- }
429399 qOpts := e .makeQueryOpts (start , end , step , opts )
430400 if qOpts .StepsBatch > 64 {
431401 return nil , ErrStepsBatchTooLarge
@@ -462,19 +432,38 @@ func (e *Engine) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable,
462432}
463433
464434func (e * Engine ) makeQueryOpts (start time.Time , end time.Time , step time.Duration , opts promql.QueryOpts ) * query.Options {
465- qOpts := & query.Options {
435+ res := & query.Options {
466436 Start : start ,
467437 End : end ,
468438 Step : step ,
469439 StepsBatch : stepsBatch ,
470- LookbackDelta : opts . LookbackDelta () ,
471- EnablePerStepStats : e .enablePerStepStats && opts . EnablePerStepStats () ,
440+ LookbackDelta : e . lookbackDelta ,
441+ EnablePerStepStats : e .enablePerStepStats ,
472442 ExtLookbackDelta : e .extLookbackDelta ,
473443 EnableAnalysis : e .enableAnalysis ,
474444 NoStepSubqueryIntervalFn : e .noStepSubqueryIntervalFn ,
475445 DecodingConcurrency : e .decodingConcurrency ,
476446 }
477- return qOpts
447+ if opts == nil {
448+ return res
449+ }
450+
451+ if opts .LookbackDelta () > 0 {
452+ res .LookbackDelta = opts .LookbackDelta ()
453+ }
454+ if opts .EnablePerStepStats () {
455+ res .EnablePerStepStats = opts .EnablePerStepStats ()
456+ }
457+
458+ extOpts , ok := opts .(* QueryOpts )
459+ if ! ok {
460+ return res
461+ }
462+
463+ if extOpts .DecodingConcurrency != 0 {
464+ res .DecodingConcurrency = extOpts .DecodingConcurrency
465+ }
466+ return res
478467}
479468
480469func (e * Engine ) storageScanners (queryable storage.Queryable , qOpts * query.Options , lplan logicalplan.Plan ) (engstorage.Scanners , error ) {
0 commit comments