2024-01-09 22:19:03 +05:30
package v2
import (
"context"
"fmt"
"sync"
"time"
logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
2024-09-13 17:04:22 +05:30
logsV4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4"
2024-01-09 22:19:03 +05:30
metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
2024-09-20 19:23:01 +05:30
"go.signoz.io/signoz/pkg/query-service/common"
2024-05-15 18:52:01 +05:30
chErrors "go.signoz.io/signoz/pkg/query-service/errors"
2024-09-20 19:23:01 +05:30
"go.signoz.io/signoz/pkg/query-service/querycache"
2024-09-12 21:34:27 +05:30
"go.signoz.io/signoz/pkg/query-service/utils"
2024-01-09 22:19:03 +05:30
"go.signoz.io/signoz/pkg/query-service/cache"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.uber.org/multierr"
"go.uber.org/zap"
)
// channelResult is the value a query goroutine sends back to the collecting
// loop: either the query's output (Series or List) or the error it hit.
type channelResult struct {
	Series []*v3.Series // time-series output of the query
	List   []*v3.Row    // row output, used by list queries
	Err    error        // non-nil when the query failed
	Name   string       // name of the query this result belongs to
	Query  string       // query string associated with this result
}
type querier struct {
cache cache . Cache
reader interfaces . Reader
keyGenerator cache . KeyGenerator
2024-09-20 19:23:01 +05:30
queryCache interfaces . QueryCache
2024-01-09 22:19:03 +05:30
fluxInterval time . Duration
builder * queryBuilder . QueryBuilder
featureLookUp interfaces . FeatureLookup
// used for testing
// TODO(srikanthccv): remove this once we have a proper mock
testingMode bool
queriesExecuted [ ] string
2024-07-15 18:06:39 +05:30
// tuple of start and end time in milliseconds
2024-09-13 17:04:22 +05:30
timeRanges [ ] [ ] int
returnedSeries [ ] * v3 . Series
returnedErr error
2024-09-12 10:58:07 +05:30
UseLogsNewSchema bool
2024-01-09 22:19:03 +05:30
}
type QuerierOptions struct {
Reader interfaces . Reader
Cache cache . Cache
KeyGenerator cache . KeyGenerator
FluxInterval time . Duration
FeatureLookup interfaces . FeatureLookup
// used for testing
2024-09-12 10:58:07 +05:30
TestingMode bool
ReturnedSeries [ ] * v3 . Series
ReturnedErr error
UseLogsNewSchema bool
2024-01-09 22:19:03 +05:30
}
func NewQuerier ( opts QuerierOptions ) interfaces . Querier {
2024-09-13 17:04:22 +05:30
logsQueryBuilder := logsV3 . PrepareLogsQuery
if opts . UseLogsNewSchema {
logsQueryBuilder = logsV4 . PrepareLogsQuery
}
2024-09-20 19:23:01 +05:30
qc := querycache . NewQueryCache ( querycache . WithCache ( opts . Cache ) , querycache . WithFluxInterval ( opts . FluxInterval ) )
2024-01-09 22:19:03 +05:30
return & querier {
cache : opts . Cache ,
2024-09-20 19:23:01 +05:30
queryCache : qc ,
2024-01-09 22:19:03 +05:30
reader : opts . Reader ,
keyGenerator : opts . KeyGenerator ,
fluxInterval : opts . FluxInterval ,
builder : queryBuilder . NewQueryBuilder ( queryBuilder . QueryBuilderOptions {
BuildTraceQuery : tracesV3 . PrepareTracesQuery ,
2024-09-13 17:04:22 +05:30
BuildLogQuery : logsQueryBuilder ,
2024-01-09 22:19:03 +05:30
BuildMetricQuery : metricsV4 . PrepareMetricQuery ,
} , opts . FeatureLookup ) ,
featureLookUp : opts . FeatureLookup ,
2024-09-13 17:04:22 +05:30
testingMode : opts . TestingMode ,
returnedSeries : opts . ReturnedSeries ,
returnedErr : opts . ReturnedErr ,
UseLogsNewSchema : opts . UseLogsNewSchema ,
2024-01-09 22:19:03 +05:30
}
}
2024-08-09 12:32:11 +05:30
// execClickHouseQuery executes the clickhouse query and returns the series list
// if testing mode is enabled, it returns the mocked series list
2024-01-09 22:19:03 +05:30
func ( q * querier ) execClickHouseQuery ( ctx context . Context , query string ) ( [ ] * v3 . Series , error ) {
if q . testingMode && q . reader == nil {
2024-08-09 12:32:11 +05:30
q . queriesExecuted = append ( q . queriesExecuted , query )
2024-01-09 22:19:03 +05:30
return q . returnedSeries , q . returnedErr
}
result , err := q . reader . GetTimeSeriesResultV3 ( ctx , query )
var pointsWithNegativeTimestamps int
// Filter out the points with negative or zero timestamps
for idx := range result {
series := result [ idx ]
points := make ( [ ] v3 . Point , 0 )
for pointIdx := range series . Points {
point := series . Points [ pointIdx ]
2024-06-05 19:33:45 +05:30
if point . Timestamp >= 0 {
2024-01-09 22:19:03 +05:30
points = append ( points , point )
} else {
pointsWithNegativeTimestamps ++
}
}
series . Points = points
}
if pointsWithNegativeTimestamps > 0 {
2024-03-27 00:07:29 +05:30
zap . L ( ) . Error ( "found points with negative timestamps for query" , zap . String ( "query" , query ) )
2024-01-09 22:19:03 +05:30
}
return result , err
}
2024-08-09 12:32:11 +05:30
// execPromQuery executes the prom query and returns the series list
// if testing mode is enabled, it returns the mocked series list
2024-01-09 22:19:03 +05:30
func ( q * querier ) execPromQuery ( ctx context . Context , params * model . QueryRangeParams ) ( [ ] * v3 . Series , error ) {
if q . testingMode && q . reader == nil {
2024-08-09 12:32:11 +05:30
q . queriesExecuted = append ( q . queriesExecuted , params . Query )
2024-07-15 18:06:39 +05:30
q . timeRanges = append ( q . timeRanges , [ ] int { int ( params . Start . UnixMilli ( ) ) , int ( params . End . UnixMilli ( ) ) } )
2024-01-09 22:19:03 +05:30
return q . returnedSeries , q . returnedErr
}
promResult , _ , err := q . reader . GetQueryRangeResult ( ctx , params )
if err != nil {
return nil , err
}
matrix , promErr := promResult . Matrix ( )
if promErr != nil {
return nil , promErr
}
var seriesList [ ] * v3 . Series
for _ , v := range matrix {
var s v3 . Series
s . Labels = v . Metric . Copy ( ) . Map ( )
for idx := range v . Floats {
p := v . Floats [ idx ]
s . Points = append ( s . Points , v3 . Point { Timestamp : p . T , Value : p . F } )
}
seriesList = append ( seriesList , & s )
}
return seriesList , nil
}
2024-09-13 16:43:56 +05:30
func ( q * querier ) runBuilderQueries ( ctx context . Context , params * v3 . QueryRangeParamsV3 ) ( [ ] * v3 . Result , map [ string ] error , error ) {
2024-01-09 22:19:03 +05:30
cacheKeys := q . keyGenerator . GenerateKeys ( params )
2024-10-10 17:02:46 +05:30
now := time . Now ( )
2024-01-09 22:19:03 +05:30
ch := make ( chan channelResult , len ( params . CompositeQuery . BuilderQueries ) )
var wg sync . WaitGroup
for queryName , builderQuery := range params . CompositeQuery . BuilderQueries {
if queryName == builderQuery . Expression {
2024-02-11 00:31:47 +05:30
wg . Add ( 1 )
2024-09-13 16:43:56 +05:30
go q . runBuilderQuery ( ctx , builderQuery , params , cacheKeys , ch , & wg )
2024-01-09 22:19:03 +05:30
}
}
wg . Wait ( )
close ( ch )
2024-10-10 17:02:46 +05:30
zap . L ( ) . Info ( "time taken to run builder queries" , zap . Duration ( "multiQueryDuration" , time . Since ( now ) ) , zap . Int ( "num_queries" , len ( params . CompositeQuery . BuilderQueries ) ) )
2024-01-09 22:19:03 +05:30
results := make ( [ ] * v3 . Result , 0 )
2024-05-15 18:52:01 +05:30
errQueriesByName := make ( map [ string ] error )
2024-01-09 22:19:03 +05:30
var errs [ ] error
for result := range ch {
if result . Err != nil {
errs = append ( errs , result . Err )
2024-05-15 18:52:01 +05:30
errQueriesByName [ result . Name ] = result . Err
2024-01-09 22:19:03 +05:30
continue
}
results = append ( results , & v3 . Result {
QueryName : result . Name ,
Series : result . Series ,
} )
}
var err error
if len ( errs ) > 0 {
err = fmt . Errorf ( "error in builder queries" )
}
2024-06-11 20:10:38 +05:30
return results , errQueriesByName , err
2024-01-09 22:19:03 +05:30
}
2024-06-11 20:10:38 +05:30
func ( q * querier ) runPromQueries ( ctx context . Context , params * v3 . QueryRangeParamsV3 ) ( [ ] * v3 . Result , map [ string ] error , error ) {
2024-01-09 22:19:03 +05:30
channelResults := make ( chan channelResult , len ( params . CompositeQuery . PromQueries ) )
var wg sync . WaitGroup
cacheKeys := q . keyGenerator . GenerateKeys ( params )
for queryName , promQuery := range params . CompositeQuery . PromQueries {
if promQuery . Disabled {
continue
}
wg . Add ( 1 )
go func ( queryName string , promQuery * v3 . PromQuery ) {
defer wg . Done ( )
2024-07-15 18:06:39 +05:30
cacheKey , ok := cacheKeys [ queryName ]
2024-09-20 19:23:01 +05:30
if ! ok || params . NoCache {
zap . L ( ) . Info ( "skipping cache for metrics prom query" , zap . String ( "queryName" , queryName ) , zap . Int64 ( "start" , params . Start ) , zap . Int64 ( "end" , params . End ) , zap . Int64 ( "step" , params . Step ) , zap . Bool ( "noCache" , params . NoCache ) , zap . String ( "cacheKey" , cacheKeys [ queryName ] ) )
query := metricsV4 . BuildPromQuery ( promQuery , params . Step , params . Start , params . End )
series , err := q . execPromQuery ( ctx , query )
channelResults <- channelResult { Err : err , Name : queryName , Query : query . Query , Series : series }
return
2024-01-09 22:19:03 +05:30
}
2024-09-20 19:23:01 +05:30
misses := q . queryCache . FindMissingTimeRanges ( params . Start , params . End , params . Step , cacheKey )
zap . L ( ) . Info ( "cache misses for metrics prom query" , zap . Any ( "misses" , misses ) )
missedSeries := make ( [ ] querycache . CachedSeriesData , 0 )
2024-01-09 22:19:03 +05:30
for _ , miss := range misses {
2024-09-20 19:23:01 +05:30
query := metricsV4 . BuildPromQuery ( promQuery , params . Step , miss . Start , miss . End )
2024-01-09 22:19:03 +05:30
series , err := q . execPromQuery ( ctx , query )
if err != nil {
channelResults <- channelResult { Err : err , Name : queryName , Query : query . Query , Series : nil }
return
}
2024-09-20 19:23:01 +05:30
missedSeries = append ( missedSeries , querycache . CachedSeriesData {
Data : series ,
Start : miss . Start ,
End : miss . End ,
} )
2024-01-09 22:19:03 +05:30
}
2024-09-20 19:23:01 +05:30
mergedSeries := q . queryCache . MergeWithCachedSeriesData ( cacheKey , missedSeries )
resultSeries := common . GetSeriesFromCachedData ( mergedSeries , params . Start , params . End )
channelResults <- channelResult { Err : nil , Name : queryName , Query : promQuery . Query , Series : resultSeries }
2024-01-09 22:19:03 +05:30
} ( queryName , promQuery )
}
wg . Wait ( )
close ( channelResults )
results := make ( [ ] * v3 . Result , 0 )
2024-05-15 18:52:01 +05:30
errQueriesByName := make ( map [ string ] error )
2024-01-09 22:19:03 +05:30
var errs [ ] error
for result := range channelResults {
if result . Err != nil {
errs = append ( errs , result . Err )
2024-05-15 18:52:01 +05:30
errQueriesByName [ result . Name ] = result . Err
2024-01-09 22:19:03 +05:30
continue
}
results = append ( results , & v3 . Result {
QueryName : result . Name ,
Series : result . Series ,
} )
}
var err error
if len ( errs ) > 0 {
err = fmt . Errorf ( "error in prom queries" )
}
2024-06-11 20:10:38 +05:30
return results , errQueriesByName , err
2024-01-09 22:19:03 +05:30
}
2024-06-11 20:10:38 +05:30
func ( q * querier ) runClickHouseQueries ( ctx context . Context , params * v3 . QueryRangeParamsV3 ) ( [ ] * v3 . Result , map [ string ] error , error ) {
2024-01-09 22:19:03 +05:30
channelResults := make ( chan channelResult , len ( params . CompositeQuery . ClickHouseQueries ) )
var wg sync . WaitGroup
for queryName , clickHouseQuery := range params . CompositeQuery . ClickHouseQueries {
if clickHouseQuery . Disabled {
continue
}
wg . Add ( 1 )
go func ( queryName string , clickHouseQuery * v3 . ClickHouseQuery ) {
defer wg . Done ( )
series , err := q . execClickHouseQuery ( ctx , clickHouseQuery . Query )
channelResults <- channelResult { Err : err , Name : queryName , Query : clickHouseQuery . Query , Series : series }
} ( queryName , clickHouseQuery )
}
wg . Wait ( )
close ( channelResults )
results := make ( [ ] * v3 . Result , 0 )
2024-05-15 18:52:01 +05:30
errQueriesByName := make ( map [ string ] error )
2024-01-09 22:19:03 +05:30
var errs [ ] error
for result := range channelResults {
if result . Err != nil {
errs = append ( errs , result . Err )
2024-05-15 18:52:01 +05:30
errQueriesByName [ result . Name ] = result . Err
2024-01-09 22:19:03 +05:30
continue
}
results = append ( results , & v3 . Result {
QueryName : result . Name ,
Series : result . Series ,
} )
}
var err error
if len ( errs ) > 0 {
err = fmt . Errorf ( "error in clickhouse queries" )
}
2024-06-11 20:10:38 +05:30
return results , errQueriesByName , err
2024-01-09 22:19:03 +05:30
}
2024-09-13 16:43:56 +05:30
func ( q * querier ) runLogsListQuery ( ctx context . Context , params * v3 . QueryRangeParamsV3 , tsRanges [ ] utils . LogsListTsRange ) ( [ ] * v3 . Result , map [ string ] error , error ) {
2024-09-12 21:34:27 +05:30
res := make ( [ ] * v3 . Result , 0 )
qName := ""
pageSize := uint64 ( 0 )
// se we are considering only one query
for name , v := range params . CompositeQuery . BuilderQueries {
qName = name
pageSize = v . PageSize
}
data := [ ] * v3 . Row { }
for _ , v := range tsRanges {
params . Start = v . Start
params . End = v . End
params . CompositeQuery . BuilderQueries [ qName ] . PageSize = pageSize - uint64 ( len ( data ) )
2024-09-13 16:43:56 +05:30
queries , err := q . builder . PrepareQueries ( params )
2024-09-12 21:34:27 +05:30
if err != nil {
return nil , nil , err
}
// this will to run only once
for name , query := range queries {
rowList , err := q . reader . GetListResultV3 ( ctx , query )
if err != nil {
errs := [ ] error { err }
errQuriesByName := map [ string ] error {
name : err ,
}
return nil , errQuriesByName , fmt . Errorf ( "encountered multiple errors: %s" , multierr . Combine ( errs ... ) )
}
data = append ( data , rowList ... )
}
// append a filter to the params
if len ( data ) > 0 {
params . CompositeQuery . BuilderQueries [ qName ] . Filters . Items = append ( params . CompositeQuery . BuilderQueries [ qName ] . Filters . Items , v3 . FilterItem {
Key : v3 . AttributeKey {
Key : "id" ,
IsColumn : true ,
DataType : "string" ,
} ,
Operator : v3 . FilterOperatorLessThan ,
Value : data [ len ( data ) - 1 ] . Data [ "id" ] ,
} )
}
if uint64 ( len ( data ) ) >= pageSize {
break
}
}
res = append ( res , & v3 . Result {
QueryName : qName ,
List : data ,
} )
return res , nil , nil
}
2024-09-13 16:43:56 +05:30
func ( q * querier ) runBuilderListQueries ( ctx context . Context , params * v3 . QueryRangeParamsV3 ) ( [ ] * v3 . Result , map [ string ] error , error ) {
2024-09-12 21:34:27 +05:30
// List query has support for only one query.
if q . UseLogsNewSchema && params . CompositeQuery != nil && len ( params . CompositeQuery . BuilderQueries ) == 1 {
for _ , v := range params . CompositeQuery . BuilderQueries {
// only allow of logs queries with timestamp ordering desc
if v . DataSource == v3 . DataSourceLogs && len ( v . OrderBy ) == 1 && v . OrderBy [ 0 ] . ColumnName == "timestamp" && v . OrderBy [ 0 ] . Order == "desc" {
startEndArr := utils . GetLogsListTsRanges ( params . Start , params . End )
if len ( startEndArr ) > 0 {
2024-09-13 16:43:56 +05:30
return q . runLogsListQuery ( ctx , params , startEndArr )
2024-09-12 21:34:27 +05:30
}
}
}
}
2024-01-09 22:19:03 +05:30
2024-09-13 16:43:56 +05:30
queries , err := q . builder . PrepareQueries ( params )
2024-01-09 22:19:03 +05:30
if err != nil {
2024-06-11 20:10:38 +05:30
return nil , nil , err
2024-01-09 22:19:03 +05:30
}
ch := make ( chan channelResult , len ( queries ) )
var wg sync . WaitGroup
for name , query := range queries {
wg . Add ( 1 )
go func ( name , query string ) {
defer wg . Done ( )
rowList , err := q . reader . GetListResultV3 ( ctx , query )
if err != nil {
2024-09-13 18:01:37 +05:30
ch <- channelResult { Err : err , Name : name , Query : query }
2024-01-09 22:19:03 +05:30
return
}
ch <- channelResult { List : rowList , Name : name , Query : query }
} ( name , query )
}
wg . Wait ( )
close ( ch )
var errs [ ] error
2024-05-15 18:52:01 +05:30
errQuriesByName := make ( map [ string ] error )
2024-01-09 22:19:03 +05:30
res := make ( [ ] * v3 . Result , 0 )
// read values from the channel
for r := range ch {
if r . Err != nil {
errs = append ( errs , r . Err )
2024-05-15 18:52:01 +05:30
errQuriesByName [ r . Name ] = r . Err
2024-01-09 22:19:03 +05:30
continue
}
res = append ( res , & v3 . Result {
QueryName : r . Name ,
List : r . List ,
} )
}
if len ( errs ) != 0 {
2024-06-11 20:10:38 +05:30
return nil , errQuriesByName , fmt . Errorf ( "encountered multiple errors: %s" , multierr . Combine ( errs ... ) )
2024-01-09 22:19:03 +05:30
}
return res , nil , nil
}
2024-08-09 12:32:11 +05:30
// QueryRange is the main function that runs the queries
// and returns the results
2024-09-13 16:43:56 +05:30
func ( q * querier ) QueryRange ( ctx context . Context , params * v3 . QueryRangeParamsV3 ) ( [ ] * v3 . Result , map [ string ] error , error ) {
2024-01-09 22:19:03 +05:30
var results [ ] * v3 . Result
var err error
2024-05-15 18:52:01 +05:30
var errQueriesByName map [ string ] error
2024-01-09 22:19:03 +05:30
if params . CompositeQuery != nil {
switch params . CompositeQuery . QueryType {
case v3 . QueryTypeBuilder :
if params . CompositeQuery . PanelType == v3 . PanelTypeList || params . CompositeQuery . PanelType == v3 . PanelTypeTrace {
2024-09-13 16:43:56 +05:30
results , errQueriesByName , err = q . runBuilderListQueries ( ctx , params )
2024-01-09 22:19:03 +05:30
} else {
2024-09-13 16:43:56 +05:30
results , errQueriesByName , err = q . runBuilderQueries ( ctx , params )
2024-01-09 22:19:03 +05:30
}
2024-05-15 18:52:01 +05:30
// in builder query, the only errors we expose are the ones that exceed the resource limits
// everything else is internal error as they are not actionable by the user
for name , err := range errQueriesByName {
if ! chErrors . IsResourceLimitError ( err ) {
delete ( errQueriesByName , name )
}
}
2024-01-09 22:19:03 +05:30
case v3 . QueryTypePromQL :
2024-06-11 20:10:38 +05:30
results , errQueriesByName , err = q . runPromQueries ( ctx , params )
2024-01-09 22:19:03 +05:30
case v3 . QueryTypeClickHouseSQL :
2024-06-11 20:10:38 +05:30
results , errQueriesByName , err = q . runClickHouseQueries ( ctx , params )
2024-01-09 22:19:03 +05:30
default :
err = fmt . Errorf ( "invalid query type" )
}
}
// return error if the number of series is more than one for value type panel
if params . CompositeQuery . PanelType == v3 . PanelTypeValue {
2024-04-30 09:53:03 +05:30
if len ( results ) > 1 && params . CompositeQuery . EnabledQueries ( ) > 1 {
2024-01-09 22:19:03 +05:30
err = fmt . Errorf ( "there can be only one active query for value type panel" )
} else if len ( results ) == 1 && len ( results [ 0 ] . Series ) > 1 {
err = fmt . Errorf ( "there can be only one result series for value type panel but got %d" , len ( results [ 0 ] . Series ) )
}
}
2024-06-11 20:10:38 +05:30
return results , errQueriesByName , err
2024-01-09 22:19:03 +05:30
}
2024-08-09 12:32:11 +05:30
// QueriesExecuted returns the list of queries executed
// in the last query range call
// used for testing
2024-01-09 22:19:03 +05:30
func ( q * querier ) QueriesExecuted ( ) [ ] string {
return q . queriesExecuted
}
2024-07-15 18:06:39 +05:30
2024-08-09 12:32:11 +05:30
// TimeRanges returns the list of time ranges
// that were used to fetch the data
// used for testing
2024-07-15 18:06:39 +05:30
func ( q * querier ) TimeRanges ( ) [ ] [ ] int {
return q . timeRanges
}