package querier

import (
	"context"
	"fmt"
	"log/slog"
	"slices"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/SigNoz/signoz/pkg/errors"
	"github.com/SigNoz/signoz/pkg/factory"
	"github.com/SigNoz/signoz/pkg/prometheus"
	"github.com/SigNoz/signoz/pkg/query-service/utils"
	"github.com/SigNoz/signoz/pkg/querybuilder"
	"github.com/SigNoz/signoz/pkg/telemetrystore"
	"github.com/SigNoz/signoz/pkg/types/metrictypes"
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
	"github.com/SigNoz/signoz/pkg/valuer"
	"golang.org/x/exp/maps"
)

var (
	intervalWarn = "Query %s is requesting an aggregation interval of %v seconds, which is smaller than the minimum allowed interval of %v seconds for the selected time range. Using the minimum instead."
)

type querier struct {
	logger                 *slog.Logger
	telemetryStore         telemetrystore.TelemetryStore
	metadataStore          telemetrytypes.MetadataStore
	promEngine             prometheus.Prometheus
	traceStmtBuilder       qbtypes.StatementBuilder[qbtypes.TraceAggregation]
	logStmtBuilder         qbtypes.StatementBuilder[qbtypes.LogAggregation]
	metricStmtBuilder      qbtypes.StatementBuilder[qbtypes.MetricAggregation]
	meterStmtBuilder       qbtypes.StatementBuilder[qbtypes.MetricAggregation]
	bucketCache            BucketCache
	liveDataRefreshSeconds time.Duration
}

var _ Querier = (*querier)(nil)

// New constructs a querier wired with the given stores, statement builders,
// and bucket cache.
func New(
	settings factory.ProviderSettings,
	telemetryStore telemetrystore.TelemetryStore,
	metadataStore telemetrytypes.MetadataStore,
	promEngine prometheus.Prometheus,
	traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation],
	logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation],
	metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation],
	meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation],
	bucketCache BucketCache,
) *querier {
	querierSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querier")
	return &querier{
		logger:                 querierSettings.Logger(),
		telemetryStore:         telemetryStore,
		metadataStore:          metadataStore,
		promEngine:             promEngine,
		traceStmtBuilder:       traceStmtBuilder,
		logStmtBuilder:         logStmtBuilder,
		metricStmtBuilder:      metricStmtBuilder,
		meterStmtBuilder:       meterStmtBuilder,
		bucketCache:            bucketCache,
		liveDataRefreshSeconds: 5,
	}
}
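
// A hedged usage sketch (variable names are illustrative; the concrete
// stores and statement builders come from the application's dependency
// wiring, not from this package):
//
//	q := New(settings, telemetryStore, metadataStore, promEngine,
//		traceBuilder, logBuilder, metricBuilder, meterBuilder, cache)
//	resp, err := q.QueryRange(ctx, orgID, req)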

// extractShiftFromBuilderQuery extracts the shift value from the timeShift
// function, if present. It returns 0 when no shift is configured.
func extractShiftFromBuilderQuery[T any](spec qbtypes.QueryBuilderQuery[T]) int64 {
	for _, fn := range spec.Functions {
		if fn.Name == qbtypes.FunctionNameTimeShift && len(fn.Args) > 0 {
			switch v := fn.Args[0].Value.(type) {
			case float64:
				return int64(v)
			case int64:
				return v
			case int:
				return int64(v)
			case string:
				if shiftFloat, err := strconv.ParseFloat(v, 64); err == nil {
					return int64(shiftFloat)
				}
			}
		}
	}
	return 0
}
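
// For example, a query whose Functions slice carries
// {Name: FunctionNameTimeShift, Args: [{Value: "3600"}]} yields 3600: the
// string argument is parsed as a float and truncated to int64.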

// adjustTimeRangeForShift adjusts the time range based on the shift value
// from the timeShift function.
func adjustTimeRangeForShift[T any](spec qbtypes.QueryBuilderQuery[T], tr qbtypes.TimeRange, kind qbtypes.RequestType) qbtypes.TimeRange {
	// Only apply time shift for time series and scalar queries;
	// raw/list queries don't support timeshift.
	if kind != qbtypes.RequestTypeTimeSeries && kind != qbtypes.RequestTypeScalar {
		return tr
	}

	// Use the ShiftBy field if it's already populated, otherwise extract it.
	shiftBy := spec.ShiftBy
	if shiftBy == 0 {
		shiftBy = extractShiftFromBuilderQuery(spec)
	}
	if shiftBy == 0 {
		return tr
	}

	// ShiftBy is in seconds; convert to milliseconds and shift backward in time.
	shiftMS := shiftBy * 1000
	return qbtypes.TimeRange{
		From: tr.From - uint64(shiftMS),
		To:   tr.To - uint64(shiftMS),
	}
}
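
// For example, with tr = {From: 1700000000000, To: 1700003600000} (epoch
// milliseconds) and a shift of 3600 seconds, the returned range is
// {From: 1699996400000, To: 1700000000000}, i.e. the same window one hour
// earlier.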

// QueryRange resolves, executes, and post-processes all queries in the
// request for the given time range.
func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest) (*qbtypes.QueryRangeResponse, error) {
	tmplVars := req.Variables
	if tmplVars == nil {
		tmplVars = make(map[string]qbtypes.VariableItem)
	}

	event := &qbtypes.QBEvent{
		Version:         "v5",
		NumberOfQueries: len(req.CompositeQuery.Queries),
		PanelType:       req.RequestType.StringValue(),
	}

	intervalWarnings := []string{}

	// First pass: collect all metric names that need temporality.
	metricNames := make([]string, 0)

	for idx, query := range req.CompositeQuery.Queries {
		event.QueryType = query.Type.StringValue()
		if query.Type == qbtypes.QueryTypeBuilder {
			if spec, ok := query.Spec.(qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]); ok {
				for _, agg := range spec.Aggregations {
					if agg.MetricName != "" {
						metricNames = append(metricNames, agg.MetricName)
					}
				}
			}

			// If the step interval is not set, we set it ourselves with the
			// recommended value. If it is set to a value that could produce
			// more points than allowed, we override it with the minimum.
			switch spec := query.Spec.(type) {
			case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
				event.TracesUsed = true
				event.FilterApplied = spec.Filter != nil && spec.Filter.Expression != ""
				event.GroupByApplied = len(spec.GroupBy) > 0
				if spec.StepInterval.Seconds() == 0 {
					spec.StepInterval = qbtypes.Step{
						Duration: time.Second * time.Duration(querybuilder.RecommendedStepInterval(req.Start, req.End)),
					}
				}
				if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepInterval(req.Start, req.End)) {
					newStep := qbtypes.Step{
						Duration: time.Second * time.Duration(querybuilder.MinAllowedStepInterval(req.Start, req.End)),
					}
					intervalWarnings = append(intervalWarnings, fmt.Sprintf(intervalWarn, spec.Name, spec.StepInterval.Seconds(), newStep.Duration.Seconds()))
					spec.StepInterval = newStep
				}
				req.CompositeQuery.Queries[idx].Spec = spec
			case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
				event.LogsUsed = true
				event.FilterApplied = spec.Filter != nil && spec.Filter.Expression != ""
				event.GroupByApplied = len(spec.GroupBy) > 0
				if spec.StepInterval.Seconds() == 0 {
					spec.StepInterval = qbtypes.Step{
						Duration: time.Second * time.Duration(querybuilder.RecommendedStepInterval(req.Start, req.End)),
					}
				}
				if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepInterval(req.Start, req.End)) {
					newStep := qbtypes.Step{
						Duration: time.Second * time.Duration(querybuilder.MinAllowedStepInterval(req.Start, req.End)),
					}
					intervalWarnings = append(intervalWarnings, fmt.Sprintf(intervalWarn, spec.Name, spec.StepInterval.Seconds(), newStep.Duration.Seconds()))
					spec.StepInterval = newStep
				}
				req.CompositeQuery.Queries[idx].Spec = spec
			case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
				event.MetricsUsed = true
				event.FilterApplied = spec.Filter != nil && spec.Filter.Expression != ""
				event.GroupByApplied = len(spec.GroupBy) > 0
				if spec.Source == telemetrytypes.SourceMeter {
					spec.StepInterval = qbtypes.Step{Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMeter(req.Start, req.End))}
				} else {
					if spec.StepInterval.Seconds() == 0 {
						spec.StepInterval = qbtypes.Step{
							Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMetric(req.Start, req.End)),
						}
					}
					if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)) {
						newStep := qbtypes.Step{
							Duration: time.Second * time.Duration(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)),
						}
						intervalWarnings = append(intervalWarnings, fmt.Sprintf(intervalWarn, spec.Name, spec.StepInterval.Seconds(), newStep.Duration.Seconds()))
						spec.StepInterval = newStep
					}
				}
				req.CompositeQuery.Queries[idx].Spec = spec
			}
		} else if query.Type == qbtypes.QueryTypePromQL {
			event.MetricsUsed = true
			switch spec := query.Spec.(type) {
			case qbtypes.PromQuery:
				if spec.Step.Seconds() == 0 {
					spec.Step = qbtypes.Step{
						Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMetric(req.Start, req.End)),
					}
				}
				req.CompositeQuery.Queries[idx].Spec = spec
			}
		} else if query.Type == qbtypes.QueryTypeClickHouseSQL {
			switch spec := query.Spec.(type) {
			case qbtypes.ClickHouseQuery:
				if strings.TrimSpace(spec.Query) != "" {
					event.MetricsUsed = strings.Contains(spec.Query, "signoz_metrics")
					event.LogsUsed = strings.Contains(spec.Query, "signoz_logs")
					event.TracesUsed = strings.Contains(spec.Query, "signoz_traces")
				}
			}
		}
	}
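
	// For example, assuming querybuilder.MinAllowedStepInterval returns 60
	// (seconds) for the selected window (the exact value depends on the
	// range), a builder query that asked for a 10s step is bumped to 60s
	// and an intervalWarn message is recorded for the response.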

	// Fetch temporality for all metrics at once.
	var metricTemporality map[string]metrictypes.Temporality
	if len(metricNames) > 0 {
		var err error
		metricTemporality, err = q.metadataStore.FetchTemporalityMulti(ctx, metricNames...)
		if err != nil {
			q.logger.WarnContext(ctx, "failed to fetch metric temporality", "error", err, "metrics", metricNames)
			// Continue without temporality; the statement builder will handle Unspecified.
			metricTemporality = make(map[string]metrictypes.Temporality)
		}
		q.logger.DebugContext(ctx, "fetched metric temporalities", "metric_temporality", metricTemporality)
	}

	queries := make(map[string]qbtypes.Query)
	steps := make(map[string]qbtypes.Step)
	for _, query := range req.CompositeQuery.Queries {
		switch query.Type {
		case qbtypes.QueryTypePromQL:
			promQuery, ok := query.Spec.(qbtypes.PromQuery)
			if !ok {
				return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid promql query spec %T", query.Spec)
			}
			promqlQuery := newPromqlQuery(q.logger, q.promEngine, promQuery, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType, tmplVars)
			queries[promQuery.Name] = promqlQuery
			steps[promQuery.Name] = promQuery.Step
		case qbtypes.QueryTypeClickHouseSQL:
			chQuery, ok := query.Spec.(qbtypes.ClickHouseQuery)
			if !ok {
				return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid clickhouse query spec %T", query.Spec)
			}
			chSQLQuery := newchSQLQuery(q.logger, q.telemetryStore, chQuery, nil, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType, tmplVars)
			queries[chQuery.Name] = chSQLQuery
		case qbtypes.QueryTypeBuilder:
			switch spec := query.Spec.(type) {
			case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
				spec.ShiftBy = extractShiftFromBuilderQuery(spec)
				timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
				bq := newBuilderQuery(q.telemetryStore, q.traceStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
				queries[spec.Name] = bq
				steps[spec.Name] = spec.StepInterval
			case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
				spec.ShiftBy = extractShiftFromBuilderQuery(spec)
				timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
				bq := newBuilderQuery(q.telemetryStore, q.logStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
				queries[spec.Name] = bq
				steps[spec.Name] = spec.StepInterval
			case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
				for i := range spec.Aggregations {
					if spec.Aggregations[i].MetricName != "" && spec.Aggregations[i].Temporality == metrictypes.Unknown {
						if temp, ok := metricTemporality[spec.Aggregations[i].MetricName]; ok && temp != metrictypes.Unknown {
							spec.Aggregations[i].Temporality = temp
						}
					}
					// TODO(srikanthccv): warn when the metric is missing
					if spec.Aggregations[i].Temporality == metrictypes.Unknown {
						spec.Aggregations[i].Temporality = metrictypes.Unspecified
					}
				}
				spec.ShiftBy = extractShiftFromBuilderQuery(spec)
				timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
				var bq *builderQuery[qbtypes.MetricAggregation]
				if spec.Source == telemetrytypes.SourceMeter {
					event.Source = telemetrytypes.SourceMeter.StringValue()
					bq = newBuilderQuery(q.telemetryStore, q.meterStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
				} else {
					bq = newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
				}
				queries[spec.Name] = bq
				steps[spec.Name] = spec.StepInterval
			default:
				return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported builder spec type %T", query.Spec)
			}
		}
	}

	qbResp, qbErr := q.run(ctx, orgID, queries, req, steps, event)
	if qbResp != nil {
		qbResp.QBEvent = event
		if len(intervalWarnings) != 0 && req.RequestType == qbtypes.RequestTypeTimeSeries {
			if qbResp.Warning == nil {
				qbResp.Warning = &qbtypes.QueryWarnData{
					Warnings: make([]qbtypes.QueryWarnDataAdditional, len(intervalWarnings)),
				}
				for idx := range intervalWarnings {
					qbResp.Warning.Warnings[idx] = qbtypes.QueryWarnDataAdditional{Message: intervalWarnings[idx]}
				}
			}
		}
	}
	return qbResp, qbErr
}

// QueryRawStream tails raw log rows matching the request and pushes them to
// the client until the context is cancelled.
func (q *querier) QueryRawStream(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest, client *qbtypes.RawStream) {
	event := &qbtypes.QBEvent{
		Version:         "v5",
		NumberOfQueries: len(req.CompositeQuery.Queries),
		PanelType:       req.RequestType.StringValue(),
	}
	for _, query := range req.CompositeQuery.Queries {
		event.QueryType = query.Type.StringValue()
		if query.Type == qbtypes.QueryTypeBuilder {
			switch spec := query.Spec.(type) {
			case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
				event.FilterApplied = spec.Filter != nil && spec.Filter.Expression != ""
			default:
				// Return if it's not a log aggregation.
				client.Error <- errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported builder spec type %T", query.Spec)
				return
			}
		} else {
			// Return if it's not a builder query.
			client.Error <- errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported query type %s", query.Type)
			return
		}
	}

	queries := make(map[string]qbtypes.Query)
	query := req.CompositeQuery.Queries[0]
	spec := query.Spec.(qbtypes.QueryBuilderQuery[qbtypes.LogAggregation])

	// Add the id cursor to the filter so each poll only fetches new rows.
	if spec.Filter == nil || spec.Filter.Expression == "" {
		spec.Filter = &qbtypes.Filter{Expression: "id > $id"}
	} else {
		spec.Filter.Expression = fmt.Sprintf("%s and id > $id", spec.Filter.Expression)
	}

	tsStart := req.Start
	if tsStart == 0 {
		tsStart = uint64(time.Now().UnixNano())
	} else {
		tsStart = uint64(utils.GetEpochNanoSecs(int64(tsStart)))
	}

	updatedLogID := ""
	ticker := time.NewTicker(time.Duration(q.liveDataRefreshSeconds) * time.Second)
	defer ticker.Stop()

	// Wrap the ticker in a custom channel so the first poll fires immediately.
	tick := make(chan time.Time, 1)
	tick <- time.Now() // initial tick
	go func() {
		for t := range ticker.C {
			tick <- t
		}
	}()

	for {
		select {
		case <-ctx.Done():
			done := true
			client.Done <- &done
			return
		case <-tick:
			// The end timestamp is deliberately left unset here.
			timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: tsStart}, req.RequestType)
			bq := newBuilderQuery(q.telemetryStore, q.logStmtBuilder, spec, timeRange, req.RequestType, map[string]qbtypes.VariableItem{
				"id": {
					Value: updatedLogID,
				},
			})
			queries[spec.Name] = bq
			qbResp, qbErr := q.run(ctx, orgID, queries, req, nil, event)
			if qbErr != nil {
				client.Error <- qbErr
				return
			}
			if qbResp == nil || len(qbResp.Data.Results) == 0 || qbResp.Data.Results[0] == nil {
				continue
			}
			data := qbResp.Data.Results[0].(*qbtypes.RawData)
			// Emit rows in reverse order; the row at index 0 becomes the new cursor.
			for i := len(data.Rows) - 1; i >= 0; i-- {
				client.Logs <- data.Rows[i]
				if i == 0 {
					tsStart = uint64(data.Rows[i].Timestamp.UnixNano())
					updatedLogID = data.Rows[i].Data["id"].(string)
				}
			}
		}
	}
}
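
// A minimal consumer sketch for QueryRawStream (hedged; channel names follow
// the usage above, and the RawStream construction is assumed to happen in
// the caller, e.g. a websocket handler):
//
//	go q.QueryRawStream(ctx, orgID, req, client)
//	for {
//		select {
//		case row := <-client.Logs:
//			fmt.Println(row.Timestamp, row.Data)
//		case err := <-client.Error:
//			log.Println(err)
//			return
//		case <-client.Done:
//			return
//		}
//	}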

// run executes the prepared queries, consulting the bucket cache where
// possible, and assembles the combined response.
func (q *querier) run(
	ctx context.Context,
	orgID valuer.UUID,
	qs map[string]qbtypes.Query,
	req *qbtypes.QueryRangeRequest,
	steps map[string]qbtypes.Step,
	qbEvent *qbtypes.QBEvent,
) (*qbtypes.QueryRangeResponse, error) {
	results := make(map[string]any)
	warnings := make([]string, 0)
	warningsDocURL := ""
	stats := qbtypes.ExecStats{}

	hasData := func(result *qbtypes.Result) bool {
		if result == nil || result.Value == nil {
			return false
		}
		switch result.Type {
		case qbtypes.RequestTypeScalar:
			if val, ok := result.Value.(*qbtypes.ScalarData); ok && val != nil {
				return len(val.Data) != 0
			}
		case qbtypes.RequestTypeRaw:
			if val, ok := result.Value.(*qbtypes.RawData); ok && val != nil {
				return len(val.Rows) != 0
			}
		case qbtypes.RequestTypeTimeSeries:
			if val, ok := result.Value.(*qbtypes.TimeSeriesData); ok && val != nil {
				for _, aggBucket := range val.Aggregations {
					if len(aggBucket.Series) != 0 {
						return true
					}
				}
				return false
			}
		}
		return false
	}

	for name, query := range qs {
		// Skip cache if NoCache is set, or if cache is not available.
		if req.NoCache || q.bucketCache == nil || query.Fingerprint() == "" {
			if req.NoCache {
				q.logger.DebugContext(ctx, "NoCache flag set, bypassing cache", "query", name)
			} else {
				q.logger.InfoContext(ctx, "no bucket cache or fingerprint, executing query", "fingerprint", query.Fingerprint())
			}
			result, err := query.Execute(ctx)
			qbEvent.HasData = qbEvent.HasData || hasData(result)
			if err != nil {
				return nil, err
			}
			results[name] = result.Value
			warnings = append(warnings, result.Warnings...)
			warningsDocURL = result.WarningsDocURL
			stats.RowsScanned += result.Stats.RowsScanned
			stats.BytesScanned += result.Stats.BytesScanned
			stats.DurationMS += result.Stats.DurationMS
		} else {
			result, err := q.executeWithCache(ctx, orgID, query, steps[name], req.NoCache)
			qbEvent.HasData = qbEvent.HasData || hasData(result)
			if err != nil {
				return nil, err
			}
			switch v := result.Value.(type) {
			case *qbtypes.TimeSeriesData:
				v.QueryName = name
			case *qbtypes.ScalarData:
				v.QueryName = name
			case *qbtypes.RawData:
				v.QueryName = name
			}
			results[name] = result.Value
			warnings = append(warnings, result.Warnings...)
			warningsDocURL = result.WarningsDocURL
			stats.RowsScanned += result.Stats.RowsScanned
			stats.BytesScanned += result.Stats.BytesScanned
			stats.DurationMS += result.Stats.DurationMS
		}
	}

	processedResults, err := q.postProcessResults(ctx, results, req)
	if err != nil {
		return nil, err
	}

	resp := &qbtypes.QueryRangeResponse{
		Type: req.RequestType,
		Data: qbtypes.QueryData{
			Results: maps.Values(processedResults),
		},
		Meta: struct {
			RowsScanned  uint64 `json:"rowsScanned"`
			BytesScanned uint64 `json:"bytesScanned"`
			DurationMS   uint64 `json:"durationMs"`
		}{
			RowsScanned:  stats.RowsScanned,
			BytesScanned: stats.BytesScanned,
			DurationMS:   stats.DurationMS,
		},
	}
	if len(warnings) != 0 {
		warns := make([]qbtypes.QueryWarnDataAdditional, len(warnings))
		for i, warning := range warnings {
			warns[i] = qbtypes.QueryWarnDataAdditional{
				Message: warning,
			}
		}
		resp.Warning = &qbtypes.QueryWarnData{
			Message:  "Encountered warnings",
			Url:      warningsDocURL,
			Warnings: warns,
		}
	}
	return resp, nil
}

// executeWithCache executes a query using the bucket cache.
func (q *querier) executeWithCache(ctx context.Context, orgID valuer.UUID, query qbtypes.Query, step qbtypes.Step, noCache bool) (*qbtypes.Result, error) {
	// Get cached data and missing ranges.
	cachedResult, missingRanges := q.bucketCache.GetMissRanges(ctx, orgID, query, step)

	// If no missing ranges, return cached result.
	if len(missingRanges) == 0 && cachedResult != nil {
		return cachedResult, nil
	}

	// If the entire range is missing, execute normally.
	if cachedResult == nil && len(missingRanges) == 1 {
		startMs, endMs := query.Window()
		if missingRanges[0].From == startMs && missingRanges[0].To == endMs {
			result, err := query.Execute(ctx)
			if err != nil {
				return nil, err
			}
			// Store in cache for future use.
			q.bucketCache.Put(ctx, orgID, query, step, result)
			return result, nil
		}
	}

	// Execute queries for missing ranges with bounded parallelism.
	freshResults := make([]*qbtypes.Result, len(missingRanges))
	errors := make([]error, len(missingRanges))
	totalStats := qbtypes.ExecStats{}

	q.logger.DebugContext(ctx, "executing queries for missing ranges",
		"missing_ranges_count", len(missingRanges),
		"ranges", missingRanges)

	sem := make(chan struct{}, 4)
	var wg sync.WaitGroup
	for i, timeRange := range missingRanges {
		wg.Add(1)
		go func(idx int, tr *qbtypes.TimeRange) {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()

			// Create a new query with the missing time range.
			rangedQuery := q.createRangedQuery(query, *tr)
			if rangedQuery == nil {
				errors[idx] = fmt.Errorf("failed to create ranged query for range %d-%d", tr.From, tr.To)
				return
			}

			// Execute the ranged query.
			result, err := rangedQuery.Execute(ctx)
			if err != nil {
				errors[idx] = err
				return
			}
			freshResults[idx] = result
		}(i, timeRange)
	}

	// Wait for all queries to complete.
	wg.Wait()

	// Check for errors; if any query failed, fall back to full execution.
	for _, err := range errors {
		if err != nil {
			q.logger.ErrorContext(ctx, "parallel query execution failed", "error", err)
			result, err := query.Execute(ctx)
			if err != nil {
				return nil, err
			}
			q.bucketCache.Put(ctx, orgID, query, step, result)
			return result, nil
		}
	}

	// Calculate total stats and filter out nil results.
	validResults := make([]*qbtypes.Result, 0, len(freshResults))
	for _, result := range freshResults {
		if result != nil {
			validResults = append(validResults, result)
			totalStats.RowsScanned += result.Stats.RowsScanned
			totalStats.BytesScanned += result.Stats.BytesScanned
			totalStats.DurationMS += result.Stats.DurationMS
		}
	}
	freshResults = validResults

	// Merge cached and fresh results.
	mergedResult := q.mergeResults(cachedResult, freshResults)
	mergedResult.Stats.RowsScanned += totalStats.RowsScanned
	mergedResult.Stats.BytesScanned += totalStats.BytesScanned
	mergedResult.Stats.DurationMS += totalStats.DurationMS

	// Store the merged result in cache.
	q.bucketCache.Put(ctx, orgID, query, step, mergedResult)
	return mergedResult, nil
}
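
// Cache flow, in short: GetMissRanges splits the requested window into
// cached data plus the missing time ranges; each missing range is executed
// concurrently (at most four in flight, bounded by the sem channel above),
// the pieces are merged with mergeResults, and the merged result is written
// back with Put.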

// createRangedQuery creates a copy of the query with a different time range.
// It is called from goroutines, so each branch copies the underlying query
// to avoid race conditions.
func (q *querier) createRangedQuery(originalQuery qbtypes.Query, timeRange qbtypes.TimeRange) qbtypes.Query {
	switch qt := originalQuery.(type) {
	case *promqlQuery:
		queryCopy := qt.query.Copy()
		return newPromqlQuery(q.logger, q.promEngine, queryCopy, timeRange, qt.requestType, qt.vars)
	case *chSQLQuery:
		queryCopy := qt.query.Copy()
		argsCopy := make([]any, len(qt.args))
		copy(argsCopy, qt.args)
		return newchSQLQuery(q.logger, q.telemetryStore, queryCopy, argsCopy, timeRange, qt.kind, qt.vars)
	case *builderQuery[qbtypes.TraceAggregation]:
		specCopy := qt.spec.Copy()
		specCopy.ShiftBy = extractShiftFromBuilderQuery(specCopy)
		adjustedTimeRange := adjustTimeRangeForShift(specCopy, timeRange, qt.kind)
		return newBuilderQuery(q.telemetryStore, q.traceStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
	case *builderQuery[qbtypes.LogAggregation]:
		specCopy := qt.spec.Copy()
		specCopy.ShiftBy = extractShiftFromBuilderQuery(specCopy)
		adjustedTimeRange := adjustTimeRangeForShift(specCopy, timeRange, qt.kind)
		return newBuilderQuery(q.telemetryStore, q.logStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
	case *builderQuery[qbtypes.MetricAggregation]:
		specCopy := qt.spec.Copy()
		specCopy.ShiftBy = extractShiftFromBuilderQuery(specCopy)
		adjustedTimeRange := adjustTimeRangeForShift(specCopy, timeRange, qt.kind)
		if qt.spec.Source == telemetrytypes.SourceMeter {
			return newBuilderQuery(q.telemetryStore, q.meterStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
		}
		return newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
	default:
		return nil
	}
}

// mergeResults merges the cached result with fresh results.
func (q *querier) mergeResults(cached *qbtypes.Result, fresh []*qbtypes.Result) *qbtypes.Result {
	if cached == nil {
		if len(fresh) == 1 {
			return fresh[0]
		}
		if len(fresh) == 0 {
			return nil
		}
		// Cached is nil but there are multiple fresh results; merge them all
		// properly to avoid duplicates.
		merged := &qbtypes.Result{
			Type:           fresh[0].Type,
			Stats:          fresh[0].Stats,
			Warnings:       fresh[0].Warnings,
			WarningsDocURL: fresh[0].WarningsDocURL,
		}
		// Merge all fresh results, including the first one.
		switch merged.Type {
		case qbtypes.RequestTypeTimeSeries:
			// Pass nil as the cached value to ensure proper merging of all fresh results.
			merged.Value = q.mergeTimeSeriesResults(nil, fresh)
		}
		return merged
	}

	// Start with the cached result.
	merged := &qbtypes.Result{
		Type:           cached.Type,
		Value:          cached.Value,
		Stats:          cached.Stats,
		Warnings:       cached.Warnings,
		WarningsDocURL: cached.WarningsDocURL,
	}

	// If there are no fresh results, return the cached one.
	if len(fresh) == 0 {
		return merged
	}

	switch merged.Type {
	case qbtypes.RequestTypeTimeSeries:
		merged.Value = q.mergeTimeSeriesResults(cached.Value.(*qbtypes.TimeSeriesData), fresh)
	}

	// Collect warnings from the cached result and all fresh results.
	totalWarnings := len(merged.Warnings)
	for _, result := range fresh {
		totalWarnings += len(result.Warnings)
	}
	allWarnings := make([]string, 0, totalWarnings)
	allWarnings = append(allWarnings, merged.Warnings...)
	for _, result := range fresh {
		allWarnings = append(allWarnings, result.Warnings...)
	}
	merged.Warnings = allWarnings

	return merged
}
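
// As an illustration of the merge semantics below: a cached series with
// points at timestamps {60000, 120000} and a fresh series carrying the same
// labels with points at {120000, 180000} merge into a single series with
// points at {60000, 120000, 180000}; the duplicate 120000 point is dropped
// and values are sorted by timestamp.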

// mergeTimeSeriesResults merges time series data from cached and fresh results.
func (q *querier) mergeTimeSeriesResults(cachedValue *qbtypes.TimeSeriesData, freshResults []*qbtypes.Result) *qbtypes.TimeSeriesData {
	// Merged series keyed by aggregation index and unique series key.
	seriesMap := make(map[int]map[string]*qbtypes.TimeSeries)

	// Aggregation bucket metadata keyed by aggregation index.
	bucketMetadata := make(map[int]*qbtypes.AggregationBucket)

	// Process cached data if available.
	if cachedValue != nil && cachedValue.Aggregations != nil {
		for _, aggBucket := range cachedValue.Aggregations {
			if seriesMap[aggBucket.Index] == nil {
				seriesMap[aggBucket.Index] = make(map[string]*qbtypes.TimeSeries)
			}
			if bucketMetadata[aggBucket.Index] == nil {
				bucketMetadata[aggBucket.Index] = aggBucket
			}
			for _, series := range aggBucket.Series {
				key := qbtypes.GetUniqueSeriesKey(series.Labels)
				if existingSeries, ok := seriesMap[aggBucket.Index][key]; ok {
					// Merge values from duplicate series in the cached data,
					// avoiding duplicate timestamps.
					timestampMap := make(map[int64]bool)
					for _, v := range existingSeries.Values {
						timestampMap[v.Timestamp] = true
					}
					// Only add values with new timestamps.
					for _, v := range series.Values {
						if !timestampMap[v.Timestamp] {
							existingSeries.Values = append(existingSeries.Values, v)
						}
					}
				} else {
					// Create a copy to avoid modifying the cached data.
					seriesCopy := &qbtypes.TimeSeries{
						Labels: series.Labels,
						Values: make([]*qbtypes.TimeSeriesValue, len(series.Values)),
					}
					copy(seriesCopy.Values, series.Values)
					seriesMap[aggBucket.Index][key] = seriesCopy
				}
			}
		}
	}

	// Add fresh series.
	for _, result := range freshResults {
		freshTS, ok := result.Value.(*qbtypes.TimeSeriesData)
		if !ok || freshTS == nil || freshTS.Aggregations == nil {
			continue
		}
		for _, aggBucket := range freshTS.Aggregations {
			if seriesMap[aggBucket.Index] == nil {
				seriesMap[aggBucket.Index] = make(map[string]*qbtypes.TimeSeries)
			}
			// Prefer fresh metadata over cached metadata.
			if aggBucket.Alias != "" || aggBucket.Meta.Unit != "" {
				bucketMetadata[aggBucket.Index] = aggBucket
			} else if bucketMetadata[aggBucket.Index] == nil {
				bucketMetadata[aggBucket.Index] = aggBucket
			}
		}
		for _, aggBucket := range freshTS.Aggregations {
			for _, series := range aggBucket.Series {
				key := qbtypes.GetUniqueSeriesKey(series.Labels)
				if existingSeries, ok := seriesMap[aggBucket.Index][key]; ok {
					// Merge values, avoiding duplicate timestamps.
					timestampMap := make(map[int64]bool)
					for _, v := range existingSeries.Values {
						timestampMap[v.Timestamp] = true
					}
					// Only add values with new timestamps.
					for _, v := range series.Values {
						if !timestampMap[v.Timestamp] {
							existingSeries.Values = append(existingSeries.Values, v)
						}
					}
				} else {
					// New series.
					seriesMap[aggBucket.Index][key] = series
				}
			}
		}
	}

	result := &qbtypes.TimeSeriesData{
		Aggregations: []*qbtypes.AggregationBucket{},
	}

	// Set QueryName from the cached or first fresh result.
	if cachedValue != nil {
		result.QueryName = cachedValue.QueryName
	} else if len(freshResults) > 0 {
		if freshTS, ok := freshResults[0].Value.(*qbtypes.TimeSeriesData); ok && freshTS != nil {
			result.QueryName = freshTS.QueryName
		}
	}

	for index, series := range seriesMap {
		var aggSeries []*qbtypes.TimeSeries
		for _, s := range series {
			// Sort values by timestamp.
			slices.SortFunc(s.Values, func(a, b *qbtypes.TimeSeriesValue) int {
				if a.Timestamp < b.Timestamp {
					return -1
				}
				if a.Timestamp > b.Timestamp {
					return 1
				}
				return 0
			})
			aggSeries = append(aggSeries, s)
		}
		// Preserve bucket metadata from either cached or fresh results.
		bucket := &qbtypes.AggregationBucket{
			Index:  index,
			Series: aggSeries,
		}
		if metadata, ok := bucketMetadata[index]; ok {
			bucket.Alias = metadata.Alias
			bucket.Meta = metadata.Meta
		}
		result.Aggregations = append(result.Aggregations, bucket)
	}

	return result
}