2023-09-17 10:40:45 +05:30
package querier
import (
"context"
"fmt"
2023-09-29 18:20:40 +05:30
"strings"
2023-09-17 10:40:45 +05:30
"sync"
2025-03-20 21:01:41 +05:30
logsV4 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v4"
metricsV3 "github.com/SigNoz/signoz/pkg/query-service/app/metrics/v3"
tracesV4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4"
"github.com/SigNoz/signoz/pkg/query-service/common"
"github.com/SigNoz/signoz/pkg/query-service/constants"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/postprocess"
"github.com/SigNoz/signoz/pkg/query-service/querycache"
2025-05-03 18:30:07 +05:30
"github.com/SigNoz/signoz/pkg/valuer"
2023-09-17 10:40:45 +05:30
"go.uber.org/zap"
)
2025-04-28 21:01:35 +05:30
func prepareLogsQuery (
_ context . Context ,
2024-02-12 18:45:21 +05:30
start ,
end int64 ,
builderQuery * v3 . BuilderQuery ,
params * v3 . QueryRangeParamsV3 ,
) ( string , error ) {
query := ""
2025-04-28 21:01:35 +05:30
logsQueryBuilder := logsV4 . PrepareLogsQuery
2024-09-13 17:04:22 +05:30
2024-02-12 18:45:21 +05:30
if params == nil || builderQuery == nil {
return query , fmt . Errorf ( "params and builderQuery cannot be nil" )
}
// for ts query with limit replace it as it is already formed
if params . CompositeQuery . PanelType == v3 . PanelTypeGraph && builderQuery . Limit > 0 && len ( builderQuery . GroupBy ) > 0 {
2024-09-13 17:04:22 +05:30
limitQuery , err := logsQueryBuilder (
2024-02-12 18:45:21 +05:30
start ,
end ,
params . CompositeQuery . QueryType ,
params . CompositeQuery . PanelType ,
builderQuery ,
2025-04-04 23:38:16 +05:30
v3 . QBOptions { GraphLimitQtype : constants . FirstQueryGraphLimit } ,
2024-02-12 18:45:21 +05:30
)
if err != nil {
return query , err
}
2024-09-13 17:04:22 +05:30
placeholderQuery , err := logsQueryBuilder (
2024-02-12 18:45:21 +05:30
start ,
end ,
params . CompositeQuery . QueryType ,
params . CompositeQuery . PanelType ,
builderQuery ,
2025-04-04 23:38:16 +05:30
v3 . QBOptions { GraphLimitQtype : constants . SecondQueryGraphLimit } ,
2024-02-12 18:45:21 +05:30
)
if err != nil {
return query , err
}
query = strings . Replace ( placeholderQuery , "#LIMIT_PLACEHOLDER" , limitQuery , 1 )
return query , err
}
2024-09-13 17:04:22 +05:30
query , err := logsQueryBuilder (
2024-02-12 18:45:21 +05:30
start ,
end ,
params . CompositeQuery . QueryType ,
params . CompositeQuery . PanelType ,
builderQuery ,
2025-04-04 23:38:16 +05:30
v3 . QBOptions { } ,
2024-02-12 18:45:21 +05:30
)
if err != nil {
return query , err
}
return query , err
}
2023-09-17 10:40:45 +05:30
func ( q * querier ) runBuilderQuery (
ctx context . Context ,
2025-05-03 18:30:07 +05:30
orgID valuer . UUID ,
2023-09-17 10:40:45 +05:30
builderQuery * v3 . BuilderQuery ,
params * v3 . QueryRangeParamsV3 ,
cacheKeys map [ string ] string ,
ch chan channelResult ,
wg * sync . WaitGroup ,
) {
defer wg . Done ( )
queryName := builderQuery . QueryName
2024-03-08 21:12:53 +05:30
start := params . Start
end := params . End
if builderQuery . ShiftBy != 0 {
start = start - builderQuery . ShiftBy * 1000
end = end - builderQuery . ShiftBy * 1000
}
2023-09-17 10:40:45 +05:30
if builderQuery . DataSource == v3 . DataSourceLogs {
var query string
var err error
2024-09-20 19:23:01 +05:30
if _ , ok := cacheKeys [ queryName ] ; ! ok || params . NoCache {
zap . L ( ) . Info ( "skipping cache for logs query" , zap . String ( "queryName" , queryName ) , zap . Int64 ( "start" , start ) , zap . Int64 ( "end" , end ) , zap . Int64 ( "step" , builderQuery . StepInterval ) , zap . Bool ( "noCache" , params . NoCache ) , zap . String ( "cacheKey" , cacheKeys [ queryName ] ) )
2025-04-28 21:01:35 +05:30
query , err = prepareLogsQuery ( ctx , start , end , builderQuery , params )
2023-09-17 10:40:45 +05:30
if err != nil {
2024-02-12 18:45:21 +05:30
ch <- channelResult { Err : err , Name : queryName , Query : query , Series : nil }
2023-09-17 10:40:45 +05:30
return
}
2024-02-12 18:45:21 +05:30
series , err := q . execClickHouseQuery ( ctx , query )
ch <- channelResult { Err : err , Name : queryName , Query : query , Series : series }
return
}
2025-05-03 18:30:07 +05:30
misses := q . queryCache . FindMissingTimeRanges ( orgID , start , end , builderQuery . StepInterval , cacheKeys [ queryName ] )
2024-09-20 19:23:01 +05:30
zap . L ( ) . Info ( "cache misses for logs query" , zap . Any ( "misses" , misses ) )
missedSeries := make ( [ ] querycache . CachedSeriesData , 0 )
2025-03-13 10:04:06 +05:30
filteredMissedSeries := make ( [ ] querycache . CachedSeriesData , 0 )
2024-02-12 18:45:21 +05:30
for _ , miss := range misses {
2025-04-28 21:01:35 +05:30
query , err = prepareLogsQuery ( ctx , miss . Start , miss . End , builderQuery , params )
2023-09-17 10:40:45 +05:30
if err != nil {
2024-02-12 18:45:21 +05:30
ch <- channelResult { Err : err , Name : queryName , Query : query , Series : nil }
2023-09-17 10:40:45 +05:30
return
}
2024-02-12 18:45:21 +05:30
series , err := q . execClickHouseQuery ( ctx , query )
2023-09-17 10:40:45 +05:30
if err != nil {
2024-02-12 18:45:21 +05:30
ch <- channelResult {
Err : err ,
Name : queryName ,
Query : query ,
Series : nil ,
}
2023-09-17 10:40:45 +05:30
return
}
2025-03-13 10:04:06 +05:30
filteredSeries , startTime , endTime := common . FilterSeriesPoints ( series , miss . Start , miss . End , builderQuery . StepInterval )
// making sure that empty range doesn't doesn't enter the cache
// empty results from filteredSeries means data was filtered out, but empty series means actual empty data
if len ( filteredSeries ) > 0 || len ( series ) == 0 {
filteredMissedSeries = append ( filteredMissedSeries , querycache . CachedSeriesData {
Data : filteredSeries ,
Start : startTime ,
End : endTime ,
} )
}
// for the actual response
2024-09-20 19:23:01 +05:30
missedSeries = append ( missedSeries , querycache . CachedSeriesData {
2025-03-13 10:04:06 +05:30
Data : series ,
2024-09-20 19:23:01 +05:30
Start : miss . Start ,
End : miss . End ,
} )
2023-09-17 10:40:45 +05:30
}
2025-05-03 18:30:07 +05:30
filteredMergedSeries := q . queryCache . MergeWithCachedSeriesDataV2 ( orgID , cacheKeys [ queryName ] , filteredMissedSeries )
q . queryCache . StoreSeriesInCache ( orgID , cacheKeys [ queryName ] , filteredMergedSeries )
2025-03-13 10:04:06 +05:30
2025-05-03 18:30:07 +05:30
mergedSeries := q . queryCache . MergeWithCachedSeriesDataV2 ( orgID , cacheKeys [ queryName ] , missedSeries )
2025-03-13 10:04:06 +05:30
resultSeries := common . GetSeriesFromCachedDataV2 ( mergedSeries , start , end , builderQuery . StepInterval )
2024-02-12 18:45:21 +05:30
ch <- channelResult {
Err : nil ,
Name : queryName ,
2024-09-20 19:23:01 +05:30
Series : resultSeries ,
2024-02-12 18:45:21 +05:30
}
2023-09-17 10:40:45 +05:30
return
}
if builderQuery . DataSource == v3 . DataSourceTraces {
2025-04-28 21:01:35 +05:30
tracesQueryBuilder := tracesV4 . PrepareTracesQuery
2024-11-22 12:00:29 +05:30
2023-09-17 10:40:45 +05:30
var query string
var err error
// for ts query with group by and limit form two queries
if params . CompositeQuery . PanelType == v3 . PanelTypeGraph && builderQuery . Limit > 0 && len ( builderQuery . GroupBy ) > 0 {
2024-11-22 12:00:29 +05:30
limitQuery , err := tracesQueryBuilder (
2024-03-08 21:12:53 +05:30
start ,
end ,
2023-09-17 10:40:45 +05:30
params . CompositeQuery . PanelType ,
builderQuery ,
2025-04-04 23:38:16 +05:30
v3 . QBOptions { GraphLimitQtype : constants . FirstQueryGraphLimit } ,
2023-09-17 10:40:45 +05:30
)
if err != nil {
ch <- channelResult { Err : err , Name : queryName , Query : limitQuery , Series : nil }
return
}
2024-11-22 12:00:29 +05:30
placeholderQuery , err := tracesQueryBuilder (
2024-03-08 21:12:53 +05:30
start ,
end ,
2023-09-17 10:40:45 +05:30
params . CompositeQuery . PanelType ,
builderQuery ,
2025-04-04 23:38:16 +05:30
v3 . QBOptions { GraphLimitQtype : constants . SecondQueryGraphLimit } ,
2023-09-17 10:40:45 +05:30
)
if err != nil {
ch <- channelResult { Err : err , Name : queryName , Query : limitQuery , Series : nil }
return
}
2024-12-18 21:07:31 +07:00
query = strings . Replace ( placeholderQuery , "#LIMIT_PLACEHOLDER" , limitQuery , 1 )
2023-09-17 10:40:45 +05:30
} else {
2024-11-22 12:00:29 +05:30
query , err = tracesQueryBuilder (
2024-03-08 21:12:53 +05:30
start ,
end ,
2023-09-17 10:40:45 +05:30
params . CompositeQuery . PanelType ,
builderQuery ,
2025-04-04 23:38:16 +05:30
v3 . QBOptions { } ,
2023-09-17 10:40:45 +05:30
)
if err != nil {
ch <- channelResult { Err : err , Name : queryName , Query : query , Series : nil }
return
}
}
series , err := q . execClickHouseQuery ( ctx , query )
ch <- channelResult { Err : err , Name : queryName , Query : query , Series : series }
return
}
// What is happening here?
// We are only caching the graph panel queries. A non-existant cache key means that the query is not cached.
// If the query is not cached, we execute the query and return the result without caching it.
2024-09-20 19:23:01 +05:30
if _ , ok := cacheKeys [ queryName ] ; ! ok || params . NoCache {
zap . L ( ) . Info ( "skipping cache for metrics query" , zap . String ( "queryName" , queryName ) , zap . Int64 ( "start" , start ) , zap . Int64 ( "end" , end ) , zap . Int64 ( "step" , builderQuery . StepInterval ) , zap . Bool ( "noCache" , params . NoCache ) , zap . String ( "cacheKey" , cacheKeys [ queryName ] ) )
2025-04-04 23:38:16 +05:30
query , err := metricsV3 . PrepareMetricQuery ( start , end , params . CompositeQuery . QueryType , params . CompositeQuery . PanelType , builderQuery , metricsV3 . Options { } )
2023-09-17 10:40:45 +05:30
if err != nil {
ch <- channelResult { Err : err , Name : queryName , Query : query , Series : nil }
return
}
series , err := q . execClickHouseQuery ( ctx , query )
ch <- channelResult { Err : err , Name : queryName , Query : query , Series : series }
return
}
cacheKey := cacheKeys [ queryName ]
2025-05-03 18:30:07 +05:30
misses := q . queryCache . FindMissingTimeRanges ( orgID , start , end , builderQuery . StepInterval , cacheKey )
2024-09-20 19:23:01 +05:30
zap . L ( ) . Info ( "cache misses for metrics query" , zap . Any ( "misses" , misses ) )
missedSeries := make ( [ ] querycache . CachedSeriesData , 0 )
2023-09-17 10:40:45 +05:30
for _ , miss := range misses {
query , err := metricsV3 . PrepareMetricQuery (
2024-09-20 19:23:01 +05:30
miss . Start ,
miss . End ,
2023-09-17 10:40:45 +05:30
params . CompositeQuery . QueryType ,
params . CompositeQuery . PanelType ,
builderQuery ,
metricsV3 . Options { } ,
)
if err != nil {
ch <- channelResult {
Err : err ,
Name : queryName ,
Query : query ,
Series : nil ,
}
return
}
series , err := q . execClickHouseQuery ( ctx , query )
if err != nil {
ch <- channelResult {
Err : err ,
Name : queryName ,
Query : query ,
Series : nil ,
}
return
}
2024-09-20 19:23:01 +05:30
missedSeries = append ( missedSeries , querycache . CachedSeriesData {
Start : miss . Start ,
End : miss . End ,
Data : series ,
} )
2024-02-12 18:45:21 +05:30
}
2025-05-03 18:30:07 +05:30
mergedSeries := q . queryCache . MergeWithCachedSeriesData ( orgID , cacheKey , missedSeries )
2024-09-20 19:23:01 +05:30
resultSeries := common . GetSeriesFromCachedData ( mergedSeries , start , end )
2023-09-17 10:40:45 +05:30
ch <- channelResult {
Err : nil ,
Name : queryName ,
2024-09-20 19:23:01 +05:30
Series : resultSeries ,
2023-09-17 10:40:45 +05:30
}
}
func ( q * querier ) runBuilderExpression (
ctx context . Context ,
2025-05-03 18:30:07 +05:30
orgID valuer . UUID ,
2023-09-17 10:40:45 +05:30
builderQuery * v3 . BuilderQuery ,
params * v3 . QueryRangeParamsV3 ,
cacheKeys map [ string ] string ,
ch chan channelResult ,
wg * sync . WaitGroup ,
) {
defer wg . Done ( )
queryName := builderQuery . QueryName
2024-09-13 16:43:56 +05:30
queries , err := q . builder . PrepareQueries ( params )
2023-09-17 10:40:45 +05:30
if err != nil {
ch <- channelResult { Err : err , Name : queryName , Query : "" , Series : nil }
return
}
2024-09-20 19:23:01 +05:30
if _ , ok := cacheKeys [ queryName ] ; ! ok || params . NoCache {
zap . L ( ) . Info ( "skipping cache for expression query" , zap . String ( "queryName" , queryName ) , zap . Int64 ( "start" , params . Start ) , zap . Int64 ( "end" , params . End ) , zap . Int64 ( "step" , params . Step ) , zap . Bool ( "noCache" , params . NoCache ) , zap . String ( "cacheKey" , cacheKeys [ queryName ] ) )
2023-09-17 10:40:45 +05:30
query := queries [ queryName ]
series , err := q . execClickHouseQuery ( ctx , query )
ch <- channelResult { Err : err , Name : queryName , Query : query , Series : series }
return
}
cacheKey := cacheKeys [ queryName ]
2024-06-12 12:21:27 +05:30
step := postprocess . StepIntervalForFunction ( params , queryName )
2025-05-03 18:30:07 +05:30
misses := q . queryCache . FindMissingTimeRanges ( orgID , params . Start , params . End , step , cacheKey )
2024-09-20 19:23:01 +05:30
zap . L ( ) . Info ( "cache misses for expression query" , zap . Any ( "misses" , misses ) )
missedSeries := make ( [ ] querycache . CachedSeriesData , 0 )
2023-09-17 10:40:45 +05:30
for _ , miss := range misses {
missQueries , _ := q . builder . PrepareQueries ( & v3 . QueryRangeParamsV3 {
2024-09-20 19:23:01 +05:30
Start : miss . Start ,
End : miss . End ,
2023-09-17 10:40:45 +05:30
Step : params . Step ,
NoCache : params . NoCache ,
CompositeQuery : params . CompositeQuery ,
Variables : params . Variables ,
2024-09-13 16:43:56 +05:30
} )
2023-09-17 10:40:45 +05:30
query := missQueries [ queryName ]
series , err := q . execClickHouseQuery ( ctx , query )
if err != nil {
ch <- channelResult { Err : err , Name : queryName , Query : query , Series : nil }
return
}
2024-09-20 19:23:01 +05:30
missedSeries = append ( missedSeries , querycache . CachedSeriesData {
Start : miss . Start ,
End : miss . End ,
Data : series ,
} )
2024-09-04 10:35:13 +05:30
}
2025-05-03 18:30:07 +05:30
mergedSeries := q . queryCache . MergeWithCachedSeriesData ( orgID , cacheKey , missedSeries )
2023-09-17 10:40:45 +05:30
2024-09-20 19:23:01 +05:30
resultSeries := common . GetSeriesFromCachedData ( mergedSeries , params . Start , params . End )
2024-02-12 18:45:21 +05:30
2023-09-17 10:40:45 +05:30
ch <- channelResult {
Err : nil ,
Name : queryName ,
2024-09-20 19:23:01 +05:30
Series : resultSeries ,
2023-09-17 10:40:45 +05:30
}
}