package querier

import (
	"context"
	"fmt"
	"log/slog"
	"slices"
	"strconv"
	"sync"
	"time"

	"github.com/SigNoz/signoz/pkg/errors"
	"github.com/SigNoz/signoz/pkg/factory"
	"github.com/SigNoz/signoz/pkg/prometheus"
	"github.com/SigNoz/signoz/pkg/querybuilder"
	"github.com/SigNoz/signoz/pkg/telemetrystore"
	"github.com/SigNoz/signoz/pkg/types/metrictypes"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
	"golang.org/x/exp/maps"

	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/valuer"
)

type querier struct {
	logger            *slog.Logger
	telemetryStore    telemetrystore.TelemetryStore
	metadataStore     telemetrytypes.MetadataStore
	promEngine        prometheus.Prometheus
	traceStmtBuilder  qbtypes.StatementBuilder[qbtypes.TraceAggregation]
	logStmtBuilder    qbtypes.StatementBuilder[qbtypes.LogAggregation]
	metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation]
	bucketCache       BucketCache
}

var _ Querier = (*querier)(nil)

func New(
	settings factory.ProviderSettings,
	telemetryStore telemetrystore.TelemetryStore,
	metadataStore telemetrytypes.MetadataStore,
	promEngine prometheus.Prometheus,
	traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation],
	logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation],
	metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation],
	bucketCache BucketCache,
) *querier {
	querierSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querier")
	return &querier{
		logger:            querierSettings.Logger(),
		telemetryStore:    telemetryStore,
		metadataStore:     metadataStore,
		promEngine:        promEngine,
		traceStmtBuilder:  traceStmtBuilder,
		logStmtBuilder:    logStmtBuilder,
		metricStmtBuilder: metricStmtBuilder,
		bucketCache:       bucketCache,
	}
}
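
// Construction note: New receives every dependency explicitly and scopes the logger to
// this package via factory.NewScopedProviderSettings. An illustrative wiring (argument
// names here are placeholders, not taken from this file) might look like:
//
//	q := New(providerSettings, telemetryStore, metadataStore, promEngine,
//		traceStmtBuilder, logStmtBuilder, metricStmtBuilder, bucketCache)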

// extractShiftFromBuilderQuery extracts the shift value from the timeShift function, if present.
func extractShiftFromBuilderQuery[T any](spec qbtypes.QueryBuilderQuery[T]) int64 {
	for _, fn := range spec.Functions {
		if fn.Name == qbtypes.FunctionNameTimeShift && len(fn.Args) > 0 {
			switch v := fn.Args[0].Value.(type) {
			case float64:
				return int64(v)
			case int64:
				return v
			case int:
				return int64(v)
			case string:
				if shiftFloat, err := strconv.ParseFloat(v, 64); err == nil {
					return int64(shiftFloat)
				}
			}
		}
	}
	return 0
}
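
// For example, a builder query whose Functions list contains a timeShift entry with a
// first argument of 3600 (as a float64, int, int64, or the string "3600") yields a
// shift of 3600 seconds; a query without timeShift, or with an unparseable string
// argument, yields 0.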

// adjustTimeRangeForShift adjusts the time range based on the shift value from the timeShift function.
func adjustTimeRangeForShift[T any](spec qbtypes.QueryBuilderQuery[T], tr qbtypes.TimeRange, kind qbtypes.RequestType) qbtypes.TimeRange {
	// Only apply time shift for time series and scalar queries;
	// raw/list queries don't support timeShift.
	if kind != qbtypes.RequestTypeTimeSeries && kind != qbtypes.RequestTypeScalar {
		return tr
	}

	// Use the ShiftBy field if it's already populated, otherwise extract it.
	shiftBy := spec.ShiftBy
	if shiftBy == 0 {
		shiftBy = extractShiftFromBuilderQuery(spec)
	}

	if shiftBy == 0 {
		return tr
	}

	// ShiftBy is in seconds; convert to milliseconds and shift backward in time.
	shiftMS := shiftBy * 1000
	return qbtypes.TimeRange{
		From: tr.From - uint64(shiftMS),
		To:   tr.To - uint64(shiftMS),
	}
}
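
// For example, with a shift of 3600 seconds on a time series request, a range of
// From=1700000000000, To=1700003600000 (epoch milliseconds) becomes
// From=1699996400000, To=1700000000000: both bounds move back by 3,600,000 ms.

// QueryRange resolves every query in the composite request: it collects metric names and
// fetches their temporality, normalizes step intervals for builder queries, wraps each
// PromQL, ClickHouse SQL, and builder query in an executable form, and then runs them.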
func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest) (*qbtypes.QueryRangeResponse, error) {
	// First pass: collect all metric names that need temporality
	metricNames := make([]string, 0)
	for idx, query := range req.CompositeQuery.Queries {
		if query.Type == qbtypes.QueryTypeBuilder {
			if spec, ok := query.Spec.(qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]); ok {
				for _, agg := range spec.Aggregations {
					if agg.MetricName != "" {
						metricNames = append(metricNames, agg.MetricName)
					}
				}
			}
			// If the step interval is not set, set it to the recommended value; if it is
			// set to a value that could produce more points than allowed, override it.
			switch spec := query.Spec.(type) {
			case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
				if spec.StepInterval.Seconds() == 0 {
					spec.StepInterval = qbtypes.Step{
						Duration: time.Second * time.Duration(querybuilder.RecommendedStepInterval(req.Start, req.End)),
					}
				}
				if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepInterval(req.Start, req.End)) {
					spec.StepInterval = qbtypes.Step{
						Duration: time.Second * time.Duration(querybuilder.MinAllowedStepInterval(req.Start, req.End)),
					}
				}
				req.CompositeQuery.Queries[idx].Spec = spec
			case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
				if spec.StepInterval.Seconds() == 0 {
					spec.StepInterval = qbtypes.Step{
						Duration: time.Second * time.Duration(querybuilder.RecommendedStepInterval(req.Start, req.End)),
					}
				}
				if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepInterval(req.Start, req.End)) {
					spec.StepInterval = qbtypes.Step{
						Duration: time.Second * time.Duration(querybuilder.MinAllowedStepInterval(req.Start, req.End)),
					}
				}
				req.CompositeQuery.Queries[idx].Spec = spec
			case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
				if spec.StepInterval.Seconds() == 0 {
					spec.StepInterval = qbtypes.Step{
						Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMetric(req.Start, req.End)),
					}
				}
				if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)) {
					spec.StepInterval = qbtypes.Step{
						Duration: time.Second * time.Duration(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)),
					}
				}
				req.CompositeQuery.Queries[idx].Spec = spec
			}
		}
	}

	// Fetch temporality for all metrics at once
	var metricTemporality map[string]metrictypes.Temporality
	if len(metricNames) > 0 {
		var err error
		metricTemporality, err = q.metadataStore.FetchTemporalityMulti(ctx, metricNames...)
		if err != nil {
			q.logger.WarnContext(ctx, "failed to fetch metric temporality", "error", err, "metrics", metricNames)
			// Continue without temporality; the statement builder will handle the unspecified case.
			metricTemporality = make(map[string]metrictypes.Temporality)
		}
		q.logger.DebugContext(ctx, "fetched metric temporalities", "metric_temporality", metricTemporality)
	}

	queries := make(map[string]qbtypes.Query)
	steps := make(map[string]qbtypes.Step)

	for _, query := range req.CompositeQuery.Queries {
		switch query.Type {
		case qbtypes.QueryTypePromQL:
			promQuery, ok := query.Spec.(qbtypes.PromQuery)
			if !ok {
				return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid promql query spec %T", query.Spec)
			}
			promqlQuery := newPromqlQuery(q.promEngine, promQuery, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
			queries[promQuery.Name] = promqlQuery
			steps[promQuery.Name] = promQuery.Step
		case qbtypes.QueryTypeClickHouseSQL:
			chQuery, ok := query.Spec.(qbtypes.ClickHouseQuery)
			if !ok {
				return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid clickhouse query spec %T", query.Spec)
			}
			chSQLQuery := newchSQLQuery(q.telemetryStore, chQuery, nil, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
			queries[chQuery.Name] = chSQLQuery
		case qbtypes.QueryTypeBuilder:
			switch spec := query.Spec.(type) {
			case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
				spec.ShiftBy = extractShiftFromBuilderQuery(spec)
				timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
				bq := newBuilderQuery(q.telemetryStore, q.traceStmtBuilder, spec, timeRange, req.RequestType, req.Variables)
				queries[spec.Name] = bq
				steps[spec.Name] = spec.StepInterval
			case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
				spec.ShiftBy = extractShiftFromBuilderQuery(spec)
				timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
				bq := newBuilderQuery(q.telemetryStore, q.logStmtBuilder, spec, timeRange, req.RequestType, req.Variables)
				queries[spec.Name] = bq
				steps[spec.Name] = spec.StepInterval
			case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
				for i := range spec.Aggregations {
					if spec.Aggregations[i].MetricName != "" && spec.Aggregations[i].Temporality == metrictypes.Unknown {
						if temp, ok := metricTemporality[spec.Aggregations[i].MetricName]; ok && temp != metrictypes.Unknown {
							spec.Aggregations[i].Temporality = temp
						}
					}
				}
				spec.ShiftBy = extractShiftFromBuilderQuery(spec)
				timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
				bq := newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, spec, timeRange, req.RequestType, req.Variables)
				queries[spec.Name] = bq
				steps[spec.Name] = spec.StepInterval
			default:
				return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported builder spec type %T", query.Spec)
			}
		}
	}

	return q.run(ctx, orgID, queries, req, steps)
}
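
// run executes the prepared queries, going through the bucket cache when it can be used
// for a query, accumulates warnings and scan statistics, applies post-processing, and
// assembles the final QueryRangeResponse.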
func (q *querier) run(ctx context.Context, orgID valuer.UUID, qs map[string]qbtypes.Query, req *qbtypes.QueryRangeRequest, steps map[string]qbtypes.Step) (*qbtypes.QueryRangeResponse, error) {
	results := make(map[string]any)
	warnings := make([]string, 0)
	stats := qbtypes.ExecStats{}

	for name, query := range qs {
		// Skip the cache if NoCache is set, the cache is unavailable, or the query has no fingerprint.
		if req.NoCache || q.bucketCache == nil || query.Fingerprint() == "" {
			if req.NoCache {
				q.logger.DebugContext(ctx, "NoCache flag set, bypassing cache", "query", name)
			} else {
				q.logger.InfoContext(ctx, "no bucket cache or fingerprint, executing query", "fingerprint", query.Fingerprint())
			}
			result, err := query.Execute(ctx)
			if err != nil {
				return nil, err
			}
			results[name] = result.Value
			warnings = append(warnings, result.Warnings...)
			stats.RowsScanned += result.Stats.RowsScanned
			stats.BytesScanned += result.Stats.BytesScanned
			stats.DurationMS += result.Stats.DurationMS
		} else {
			result, err := q.executeWithCache(ctx, orgID, query, steps[name], req.NoCache)
			if err != nil {
				return nil, err
			}
			results[name] = result.Value
			warnings = append(warnings, result.Warnings...)
			stats.RowsScanned += result.Stats.RowsScanned
			stats.BytesScanned += result.Stats.BytesScanned
			stats.DurationMS += result.Stats.DurationMS
		}
	}

	processedResults, err := q.postProcessResults(ctx, results, req)
	if err != nil {
		return nil, err
	}

	return &qbtypes.QueryRangeResponse{
		Type: req.RequestType,
		Data: struct {
			Results  []any    `json:"results"`
			Warnings []string `json:"warnings"`
		}{
			Results:  maps.Values(processedResults),
			Warnings: warnings,
		},
		Meta: struct {
			RowsScanned  uint64 `json:"rowsScanned"`
			BytesScanned uint64 `json:"bytesScanned"`
			DurationMS   uint64 `json:"durationMs"`
		}{
			RowsScanned:  stats.RowsScanned,
			BytesScanned: stats.BytesScanned,
			DurationMS:   stats.DurationMS,
		},
	}, nil
}

// executeWithCache executes a query using the bucket cache
func (q *querier) executeWithCache(ctx context.Context, orgID valuer.UUID, query qbtypes.Query, step qbtypes.Step, noCache bool) (*qbtypes.Result, error) {
	// Get cached data and missing ranges
	cachedResult, missingRanges := q.bucketCache.GetMissRanges(ctx, orgID, query, step)

	// If no missing ranges, return cached result
	if len(missingRanges) == 0 && cachedResult != nil {
		return cachedResult, nil
	}

	// If entire range is missing, execute normally
	if cachedResult == nil && len(missingRanges) == 1 {
		startMs, endMs := query.Window()
		if missingRanges[0].From == startMs && missingRanges[0].To == endMs {
			result, err := query.Execute(ctx)
			if err != nil {
				return nil, err
			}
			// Store in cache for future use
			q.bucketCache.Put(ctx, orgID, query, step, result)
			return result, nil
		}
	}

	// Execute queries for missing ranges with bounded parallelism
	freshResults := make([]*qbtypes.Result, len(missingRanges))
	errors := make([]error, len(missingRanges))
	totalStats := qbtypes.ExecStats{}

	q.logger.DebugContext(ctx, "executing queries for missing ranges",
		"missing_ranges_count", len(missingRanges),
		"ranges", missingRanges)

	sem := make(chan struct{}, 4)
	var wg sync.WaitGroup

	for i, timeRange := range missingRanges {
		wg.Add(1)
		go func(idx int, tr *qbtypes.TimeRange) {
			defer wg.Done()

			sem <- struct{}{}
			defer func() { <-sem }()

			// Create a new query with the missing time range
			rangedQuery := q.createRangedQuery(query, *tr)
			if rangedQuery == nil {
				errors[idx] = fmt.Errorf("failed to create ranged query for range %d-%d", tr.From, tr.To)
				return
			}

			// Execute the ranged query
			result, err := rangedQuery.Execute(ctx)
			if err != nil {
				errors[idx] = err
				return
			}

			freshResults[idx] = result
		}(i, timeRange)
	}

	// Wait for all queries to complete
	wg.Wait()

	// Check for errors
	for _, err := range errors {
		if err != nil {
			// If any query failed, fall back to full execution
			q.logger.ErrorContext(ctx, "parallel query execution failed", "error", err)
			result, err := query.Execute(ctx)
			if err != nil {
				return nil, err
			}
			q.bucketCache.Put(ctx, orgID, query, step, result)
			return result, nil
		}
	}

	// Calculate total stats and filter out nil results
	validResults := make([]*qbtypes.Result, 0, len(freshResults))
	for _, result := range freshResults {
		if result != nil {
			validResults = append(validResults, result)
			totalStats.RowsScanned += result.Stats.RowsScanned
			totalStats.BytesScanned += result.Stats.BytesScanned
			totalStats.DurationMS += result.Stats.DurationMS
		}
	}
	freshResults = validResults

	// Merge cached and fresh results
	mergedResult := q.mergeResults(cachedResult, freshResults)
	mergedResult.Stats.RowsScanned += totalStats.RowsScanned
	mergedResult.Stats.BytesScanned += totalStats.BytesScanned
	mergedResult.Stats.DurationMS += totalStats.DurationMS

	// Store merged result in cache
	q.bucketCache.Put(ctx, orgID, query, step, mergedResult)

	return mergedResult, nil
}
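
// Cache flow, by way of example: for a 60-minute window where the bucket cache already
// holds the first 50 minutes, GetMissRanges returns the cached result plus a single
// missing 10-minute range; only that range is re-queried (at most 4 ranges execute
// concurrently), the fresh data is merged with the cached data, and the merged result
// is written back via Put.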

// createRangedQuery creates a copy of the query with a different time range
func (q *querier) createRangedQuery(originalQuery qbtypes.Query, timeRange qbtypes.TimeRange) qbtypes.Query {
	switch qt := originalQuery.(type) {
	case *promqlQuery:
		return newPromqlQuery(q.promEngine, qt.query, timeRange, qt.requestType)
	case *chSQLQuery:
		return newchSQLQuery(q.telemetryStore, qt.query, qt.args, timeRange, qt.kind)
	case *builderQuery[qbtypes.TraceAggregation]:
		qt.spec.ShiftBy = extractShiftFromBuilderQuery(qt.spec)
		adjustedTimeRange := adjustTimeRangeForShift(qt.spec, timeRange, qt.kind)
		return newBuilderQuery(q.telemetryStore, q.traceStmtBuilder, qt.spec, adjustedTimeRange, qt.kind, qt.variables)
	case *builderQuery[qbtypes.LogAggregation]:
		qt.spec.ShiftBy = extractShiftFromBuilderQuery(qt.spec)
		adjustedTimeRange := adjustTimeRangeForShift(qt.spec, timeRange, qt.kind)
		return newBuilderQuery(q.telemetryStore, q.logStmtBuilder, qt.spec, adjustedTimeRange, qt.kind, qt.variables)
	case *builderQuery[qbtypes.MetricAggregation]:
		qt.spec.ShiftBy = extractShiftFromBuilderQuery(qt.spec)
		adjustedTimeRange := adjustTimeRangeForShift(qt.spec, timeRange, qt.kind)
		return newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, qt.spec, adjustedTimeRange, qt.kind, qt.variables)
	default:
		return nil
	}
}

// mergeResults merges the cached result with fresh results
func (q *querier) mergeResults(cached *qbtypes.Result, fresh []*qbtypes.Result) *qbtypes.Result {
	if cached == nil {
		if len(fresh) == 1 {
			return fresh[0]
		}
		if len(fresh) == 0 {
			return nil
		}
		// Cached is nil but there are multiple fresh results;
		// merge all of them properly to avoid duplicates.
		merged := &qbtypes.Result{
			Type:     fresh[0].Type,
			Stats:    fresh[0].Stats,
			Warnings: fresh[0].Warnings,
		}

		// Merge all fresh results, including the first one.
		switch merged.Type {
		case qbtypes.RequestTypeTimeSeries:
			// Pass nil as the cached value to ensure proper merging of all fresh results.
			merged.Value = q.mergeTimeSeriesResults(nil, fresh)
		}

		return merged
	}

	// Start with the cached result.
	merged := &qbtypes.Result{
		Type:     cached.Type,
		Value:    cached.Value,
		Stats:    cached.Stats,
		Warnings: cached.Warnings,
	}

	// If there are no fresh results, return the cached one.
	if len(fresh) == 0 {
		return merged
	}

	switch merged.Type {
	case qbtypes.RequestTypeTimeSeries:
		merged.Value = q.mergeTimeSeriesResults(cached.Value.(*qbtypes.TimeSeriesData), fresh)
	}

	if len(fresh) > 0 {
		totalWarnings := len(merged.Warnings)
		for _, result := range fresh {
			totalWarnings += len(result.Warnings)
		}

		allWarnings := make([]string, 0, totalWarnings)
		allWarnings = append(allWarnings, merged.Warnings...)
		for _, result := range fresh {
			allWarnings = append(allWarnings, result.Warnings...)
		}
		merged.Warnings = allWarnings
	}

	return merged
}
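
// Merge semantics, illustrated: if the cached data and a fresh bucket both contain a
// series with the same label set (say service="api", a hypothetical label) and both
// carry a point at the same timestamp, the duplicate point is dropped; points at new
// timestamps are appended, every series ends up sorted by timestamp, and bucket
// Alias/Meta are taken from fresh results when they are set.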

// mergeTimeSeriesResults merges time series data
func (q *querier) mergeTimeSeriesResults(cachedValue *qbtypes.TimeSeriesData, freshResults []*qbtypes.Result) *qbtypes.TimeSeriesData {
	// Map to store merged series by aggregation index and series key
	seriesMap := make(map[int]map[string]*qbtypes.TimeSeries)
	// Map to store aggregation bucket metadata
	bucketMetadata := make(map[int]*qbtypes.AggregationBucket)

	// Process cached data if available
	if cachedValue != nil && cachedValue.Aggregations != nil {
		for _, aggBucket := range cachedValue.Aggregations {
			if seriesMap[aggBucket.Index] == nil {
				seriesMap[aggBucket.Index] = make(map[string]*qbtypes.TimeSeries)
			}
			if bucketMetadata[aggBucket.Index] == nil {
				bucketMetadata[aggBucket.Index] = aggBucket
			}
			for _, series := range aggBucket.Series {
				key := qbtypes.GetUniqueSeriesKey(series.Labels)
				if existingSeries, ok := seriesMap[aggBucket.Index][key]; ok {
					// Merge values from duplicate series in cached data, avoiding duplicate timestamps
					timestampMap := make(map[int64]bool)
					for _, v := range existingSeries.Values {
						timestampMap[v.Timestamp] = true
					}

					// Only add values with new timestamps
					for _, v := range series.Values {
						if !timestampMap[v.Timestamp] {
							existingSeries.Values = append(existingSeries.Values, v)
						}
					}
				} else {
					// Create a copy to avoid modifying the cached data
					seriesCopy := &qbtypes.TimeSeries{
						Labels: series.Labels,
						Values: make([]*qbtypes.TimeSeriesValue, len(series.Values)),
					}
					copy(seriesCopy.Values, series.Values)
					seriesMap[aggBucket.Index][key] = seriesCopy
				}
			}
		}
	}

	// Add fresh series
	for _, result := range freshResults {
		freshTS, ok := result.Value.(*qbtypes.TimeSeriesData)
		if !ok || freshTS == nil || freshTS.Aggregations == nil {
			continue
		}

		for _, aggBucket := range freshTS.Aggregations {
			if seriesMap[aggBucket.Index] == nil {
				seriesMap[aggBucket.Index] = make(map[string]*qbtypes.TimeSeries)
			}
			// Prefer fresh metadata over cached metadata
			if aggBucket.Alias != "" || aggBucket.Meta.Unit != "" {
				bucketMetadata[aggBucket.Index] = aggBucket
			} else if bucketMetadata[aggBucket.Index] == nil {
				bucketMetadata[aggBucket.Index] = aggBucket
			}
		}

		for _, aggBucket := range freshTS.Aggregations {
			for _, series := range aggBucket.Series {
				key := qbtypes.GetUniqueSeriesKey(series.Labels)

				if existingSeries, ok := seriesMap[aggBucket.Index][key]; ok {
					// Merge values, avoiding duplicate timestamps:
					// track the timestamps already present in the existing series
					timestampMap := make(map[int64]bool)
					for _, v := range existingSeries.Values {
						timestampMap[v.Timestamp] = true
					}

					// Only add values with new timestamps
					for _, v := range series.Values {
						if !timestampMap[v.Timestamp] {
							existingSeries.Values = append(existingSeries.Values, v)
						}
					}
				} else {
					// New series
					seriesMap[aggBucket.Index][key] = series
				}
			}
		}
	}

	result := &qbtypes.TimeSeriesData{
		Aggregations: []*qbtypes.AggregationBucket{},
	}

	// Set QueryName from cached or first fresh result
	if cachedValue != nil {
		result.QueryName = cachedValue.QueryName
	} else if len(freshResults) > 0 {
		if freshTS, ok := freshResults[0].Value.(*qbtypes.TimeSeriesData); ok && freshTS != nil {
			result.QueryName = freshTS.QueryName
		}
	}

	for index, series := range seriesMap {
		var aggSeries []*qbtypes.TimeSeries
		for _, s := range series {
			// Sort values by timestamp
			slices.SortFunc(s.Values, func(a, b *qbtypes.TimeSeriesValue) int {
				if a.Timestamp < b.Timestamp {
					return -1
				}
				if a.Timestamp > b.Timestamp {
					return 1
				}
				return 0
			})
			aggSeries = append(aggSeries, s)
		}

		// Preserve bucket metadata from either cached or fresh results
		bucket := &qbtypes.AggregationBucket{
			Index:  index,
			Series: aggSeries,
		}
		if metadata, ok := bucketMetadata[index]; ok {
			bucket.Alias = metadata.Alias
			bucket.Meta = metadata.Meta
		}

		result.Aggregations = append(result.Aggregations, bucket)
	}

	return result
}