2025-07-31 00:10:10 +05:30
package telemetrymeter
import (
"context"
"fmt"
"log/slog"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
)
type meterQueryStatementBuilder struct {
logger * slog . Logger
metadataStore telemetrytypes . MetadataStore
fm qbtypes . FieldMapper
cb qbtypes . ConditionBuilder
metricsStatementBuilder * telemetrymetrics . MetricQueryStatementBuilder
}
var _ qbtypes . StatementBuilder [ qbtypes . MetricAggregation ] = ( * meterQueryStatementBuilder ) ( nil )
func NewMeterQueryStatementBuilder (
settings factory . ProviderSettings ,
metadataStore telemetrytypes . MetadataStore ,
fieldMapper qbtypes . FieldMapper ,
conditionBuilder qbtypes . ConditionBuilder ,
) * meterQueryStatementBuilder {
metricsSettings := factory . NewScopedProviderSettings ( settings , "github.com/SigNoz/signoz/pkg/telemetrymeter" )
metricsStatementBuilder := telemetrymetrics . NewMetricQueryStatementBuilder ( settings , metadataStore , fieldMapper , conditionBuilder )
return & meterQueryStatementBuilder {
logger : metricsSettings . Logger ( ) ,
metadataStore : metadataStore ,
fm : fieldMapper ,
cb : conditionBuilder ,
metricsStatementBuilder : metricsStatementBuilder ,
}
}
func ( b * meterQueryStatementBuilder ) Build (
ctx context . Context ,
start uint64 ,
end uint64 ,
_ qbtypes . RequestType ,
query qbtypes . QueryBuilderQuery [ qbtypes . MetricAggregation ] ,
variables map [ string ] qbtypes . VariableItem ,
) ( * qbtypes . Statement , error ) {
keySelectors := telemetrymetrics . GetKeySelectors ( query )
keys , err := b . metadataStore . GetKeysMulti ( ctx , keySelectors )
if err != nil {
return nil , err
}
// TODO[vikrantgupta25]: need to adjust this properly for meter metrics (scrape interval for 1D default so step interval should never be less than that!)
start , end = querybuilder . AdjustedMetricTimeRange ( start , end , uint64 ( query . StepInterval . Seconds ( ) ) , query )
return b . buildPipelineStatement ( ctx , start , end , query , keys , variables )
}
func ( b * meterQueryStatementBuilder ) buildPipelineStatement (
ctx context . Context ,
start , end uint64 ,
query qbtypes . QueryBuilderQuery [ qbtypes . MetricAggregation ] ,
keys map [ string ] [ ] * telemetrytypes . TelemetryFieldKey ,
variables map [ string ] qbtypes . VariableItem ,
) ( * qbtypes . Statement , error ) {
var (
cteFragments [ ] string
cteArgs [ ] [ ] any
)
if b . metricsStatementBuilder . CanShortCircuitDelta ( query ) {
// spatial_aggregation_cte directly for certain delta queries
frag , args := b . buildTemporalAggDeltaFastPath ( ctx , start , end , query , keys , variables )
if frag != "" {
cteFragments = append ( cteFragments , frag )
cteArgs = append ( cteArgs , args )
}
} else {
// temporal_aggregation_cte
if frag , args , err := b . buildTemporalAggregationCTE ( ctx , start , end , query , keys , variables ) ; err != nil {
return nil , err
} else if frag != "" {
cteFragments = append ( cteFragments , frag )
cteArgs = append ( cteArgs , args )
}
// spatial_aggregation_cte
frag , args := b . buildSpatialAggregationCTE ( ctx , start , end , query , keys )
if frag != "" {
cteFragments = append ( cteFragments , frag )
cteArgs = append ( cteArgs , args )
}
}
// final SELECT
return b . metricsStatementBuilder . BuildFinalSelect ( cteFragments , cteArgs , query )
}
func ( b * meterQueryStatementBuilder ) buildTemporalAggDeltaFastPath (
ctx context . Context ,
start , end uint64 ,
query qbtypes . QueryBuilderQuery [ qbtypes . MetricAggregation ] ,
keys map [ string ] [ ] * telemetrytypes . TelemetryFieldKey ,
variables map [ string ] qbtypes . VariableItem ,
) ( string , [ ] any ) {
var filterWhere * sqlbuilder . WhereClause
var err error
stepSec := int64 ( query . StepInterval . Seconds ( ) )
sb := sqlbuilder . NewSelectBuilder ( )
sb . SelectMore ( fmt . Sprintf (
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts" ,
stepSec ,
) )
for _ , g := range query . GroupBy {
col , err := b . fm . ColumnExpressionFor ( ctx , & g . TelemetryFieldKey , keys )
if err != nil {
return "" , [ ] any { }
}
sb . SelectMore ( col )
}
aggCol := AggregationColumnForSamplesTable ( query . Aggregations [ 0 ] . Temporality , query . Aggregations [ 0 ] . TimeAggregation , query . Aggregations [ 0 ] . TableHints )
if query . Aggregations [ 0 ] . TimeAggregation == metrictypes . TimeAggregationRate {
aggCol = fmt . Sprintf ( "%s/%d" , aggCol , stepSec )
}
sb . SelectMore ( fmt . Sprintf ( "%s AS value" , aggCol ) )
2025-07-31 16:45:48 +05:30
sb . From ( fmt . Sprintf ( "%s.%s AS points" , DBName , SamplesV4Agg1dTableName ) )
2025-07-31 00:10:10 +05:30
sb . Where (
sb . In ( "metric_name" , query . Aggregations [ 0 ] . MetricName ) ,
sb . GTE ( "unix_milli" , start ) ,
sb . LT ( "unix_milli" , end ) ,
)
if query . Filter != nil && query . Filter . Expression != "" {
filterWhere , _ , err = querybuilder . PrepareWhereClause ( query . Filter . Expression , querybuilder . FilterExprVisitorOpts {
FieldMapper : b . fm ,
ConditionBuilder : b . cb ,
FieldKeys : keys ,
FullTextColumn : & telemetrytypes . TelemetryFieldKey { Name : "labels" } ,
Variables : variables ,
} )
if err != nil {
return "" , [ ] any { }
}
}
if filterWhere != nil {
sb . AddWhereClause ( filterWhere )
}
if query . Aggregations [ 0 ] . Temporality != metrictypes . Unknown {
sb . Where ( sb . ILike ( "temporality" , query . Aggregations [ 0 ] . Temporality . StringValue ( ) ) )
}
sb . GroupBy ( "ts" )
sb . GroupBy ( querybuilder . GroupByKeys ( query . GroupBy ) ... )
q , args := sb . BuildWithFlavor ( sqlbuilder . ClickHouse )
return fmt . Sprintf ( "__spatial_aggregation_cte AS (%s)" , q ) , args
}
func ( b * meterQueryStatementBuilder ) buildTemporalAggregationCTE (
ctx context . Context ,
start , end uint64 ,
query qbtypes . QueryBuilderQuery [ qbtypes . MetricAggregation ] ,
keys map [ string ] [ ] * telemetrytypes . TelemetryFieldKey ,
variables map [ string ] qbtypes . VariableItem ,
) ( string , [ ] any , error ) {
if query . Aggregations [ 0 ] . Temporality == metrictypes . Delta {
return b . buildTemporalAggDelta ( ctx , start , end , query , keys , variables )
}
return b . buildTemporalAggCumulativeOrUnspecified ( ctx , start , end , query , keys , variables )
}
func ( b * meterQueryStatementBuilder ) buildTemporalAggDelta (
ctx context . Context ,
start , end uint64 ,
query qbtypes . QueryBuilderQuery [ qbtypes . MetricAggregation ] ,
keys map [ string ] [ ] * telemetrytypes . TelemetryFieldKey ,
variables map [ string ] qbtypes . VariableItem ,
) ( string , [ ] any , error ) {
var filterWhere * sqlbuilder . WhereClause
var err error
stepSec := int64 ( query . StepInterval . Seconds ( ) )
sb := sqlbuilder . NewSelectBuilder ( )
sb . Select ( "fingerprint" )
sb . SelectMore ( fmt . Sprintf (
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts" ,
stepSec ,
) )
for _ , g := range query . GroupBy {
col , err := b . fm . ColumnExpressionFor ( ctx , & g . TelemetryFieldKey , keys )
if err != nil {
return "" , nil , err
}
sb . SelectMore ( col )
}
aggCol := AggregationColumnForSamplesTable ( query . Aggregations [ 0 ] . Temporality ,
query . Aggregations [ 0 ] . TimeAggregation , query . Aggregations [ 0 ] . TableHints )
if query . Aggregations [ 0 ] . TimeAggregation == metrictypes . TimeAggregationRate {
aggCol = fmt . Sprintf ( "%s/%d" , aggCol , stepSec )
}
sb . SelectMore ( fmt . Sprintf ( "%s AS per_series_value" , aggCol ) )
2025-07-31 16:45:48 +05:30
sb . From ( fmt . Sprintf ( "%s.%s AS points" , DBName , SamplesV4Agg1dTableName ) )
2025-07-31 00:10:10 +05:30
sb . Where (
sb . In ( "metric_name" , query . Aggregations [ 0 ] . MetricName ) ,
sb . GTE ( "unix_milli" , start ) ,
sb . LT ( "unix_milli" , end ) ,
)
if query . Filter != nil && query . Filter . Expression != "" {
filterWhere , _ , err = querybuilder . PrepareWhereClause ( query . Filter . Expression , querybuilder . FilterExprVisitorOpts {
FieldMapper : b . fm ,
ConditionBuilder : b . cb ,
FieldKeys : keys ,
FullTextColumn : & telemetrytypes . TelemetryFieldKey { Name : "labels" } ,
Variables : variables ,
} )
if err != nil {
return "" , nil , err
}
}
if filterWhere != nil {
sb . AddWhereClause ( filterWhere )
}
if query . Aggregations [ 0 ] . Temporality != metrictypes . Unknown {
sb . Where ( sb . ILike ( "temporality" , query . Aggregations [ 0 ] . Temporality . StringValue ( ) ) )
}
sb . GroupBy ( "fingerprint" , "ts" )
sb . GroupBy ( querybuilder . GroupByKeys ( query . GroupBy ) ... )
sb . OrderBy ( "fingerprint" , "ts" )
q , args := sb . BuildWithFlavor ( sqlbuilder . ClickHouse )
return fmt . Sprintf ( "__temporal_aggregation_cte AS (%s)" , q ) , args , nil
}
func ( b * meterQueryStatementBuilder ) buildTemporalAggCumulativeOrUnspecified (
ctx context . Context ,
start , end uint64 ,
query qbtypes . QueryBuilderQuery [ qbtypes . MetricAggregation ] ,
keys map [ string ] [ ] * telemetrytypes . TelemetryFieldKey ,
variables map [ string ] qbtypes . VariableItem ,
) ( string , [ ] any , error ) {
var filterWhere * sqlbuilder . WhereClause
var err error
stepSec := int64 ( query . StepInterval . Seconds ( ) )
baseSb := sqlbuilder . NewSelectBuilder ( )
baseSb . Select ( "fingerprint" )
baseSb . SelectMore ( fmt . Sprintf (
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts" ,
stepSec ,
) )
for _ , g := range query . GroupBy {
col , err := b . fm . ColumnExpressionFor ( ctx , & g . TelemetryFieldKey , keys )
if err != nil {
return "" , nil , err
}
baseSb . SelectMore ( col )
}
aggCol := AggregationColumnForSamplesTable ( query . Aggregations [ 0 ] . Temporality , query . Aggregations [ 0 ] . TimeAggregation , query . Aggregations [ 0 ] . TableHints )
baseSb . SelectMore ( fmt . Sprintf ( "%s AS per_series_value" , aggCol ) )
2025-07-31 16:45:48 +05:30
baseSb . From ( fmt . Sprintf ( "%s.%s AS points" , DBName , SamplesV4Agg1dTableName ) )
2025-07-31 00:10:10 +05:30
baseSb . Where (
baseSb . In ( "metric_name" , query . Aggregations [ 0 ] . MetricName ) ,
baseSb . GTE ( "unix_milli" , start ) ,
baseSb . LT ( "unix_milli" , end ) ,
)
if query . Filter != nil && query . Filter . Expression != "" {
filterWhere , _ , err = querybuilder . PrepareWhereClause ( query . Filter . Expression , querybuilder . FilterExprVisitorOpts {
FieldMapper : b . fm ,
ConditionBuilder : b . cb ,
FieldKeys : keys ,
FullTextColumn : & telemetrytypes . TelemetryFieldKey { Name : "labels" } ,
Variables : variables ,
} )
if err != nil {
return "" , nil , err
}
}
if filterWhere != nil {
baseSb . AddWhereClause ( filterWhere )
}
if query . Aggregations [ 0 ] . Temporality != metrictypes . Unknown {
baseSb . Where ( baseSb . ILike ( "temporality" , query . Aggregations [ 0 ] . Temporality . StringValue ( ) ) )
}
baseSb . GroupBy ( "fingerprint" , "ts" )
baseSb . GroupBy ( querybuilder . GroupByKeys ( query . GroupBy ) ... )
baseSb . OrderBy ( "fingerprint" , "ts" )
innerQuery , innerArgs := baseSb . BuildWithFlavor ( sqlbuilder . ClickHouse )
switch query . Aggregations [ 0 ] . TimeAggregation {
case metrictypes . TimeAggregationRate :
rateExpr := fmt . Sprintf ( telemetrymetrics . RateWithoutNegative , start , start )
wrapped := sqlbuilder . NewSelectBuilder ( )
wrapped . Select ( "ts" )
for _ , g := range query . GroupBy {
wrapped . SelectMore ( fmt . Sprintf ( "`%s`" , g . TelemetryFieldKey . Name ) )
}
wrapped . SelectMore ( fmt . Sprintf ( "%s AS per_series_value" , rateExpr ) )
wrapped . From ( fmt . Sprintf ( "(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)" , innerQuery ) )
q , args := wrapped . BuildWithFlavor ( sqlbuilder . ClickHouse , innerArgs ... )
return fmt . Sprintf ( "__temporal_aggregation_cte AS (%s)" , q ) , args , nil
case metrictypes . TimeAggregationIncrease :
incExpr := fmt . Sprintf ( telemetrymetrics . IncreaseWithoutNegative , start , start )
wrapped := sqlbuilder . NewSelectBuilder ( )
wrapped . Select ( "ts" )
for _ , g := range query . GroupBy {
wrapped . SelectMore ( fmt . Sprintf ( "`%s`" , g . TelemetryFieldKey . Name ) )
}
wrapped . SelectMore ( fmt . Sprintf ( "%s AS per_series_value" , incExpr ) )
wrapped . From ( fmt . Sprintf ( "(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)" , innerQuery ) )
q , args := wrapped . BuildWithFlavor ( sqlbuilder . ClickHouse , innerArgs ... )
return fmt . Sprintf ( "__temporal_aggregation_cte AS (%s)" , q ) , args , nil
default :
return fmt . Sprintf ( "__temporal_aggregation_cte AS (%s)" , innerQuery ) , innerArgs , nil
}
}
func ( b * meterQueryStatementBuilder ) buildSpatialAggregationCTE (
_ context . Context ,
_ uint64 ,
_ uint64 ,
query qbtypes . QueryBuilderQuery [ qbtypes . MetricAggregation ] ,
_ map [ string ] [ ] * telemetrytypes . TelemetryFieldKey ,
) ( string , [ ] any ) {
sb := sqlbuilder . NewSelectBuilder ( )
sb . Select ( "ts" )
for _ , g := range query . GroupBy {
sb . SelectMore ( fmt . Sprintf ( "`%s`" , g . TelemetryFieldKey . Name ) )
}
sb . SelectMore ( fmt . Sprintf ( "%s(per_series_value) AS value" , query . Aggregations [ 0 ] . SpaceAggregation . StringValue ( ) ) )
sb . From ( "__temporal_aggregation_cte" )
sb . Where ( sb . EQ ( "isNaN(per_series_value)" , 0 ) )
if query . Aggregations [ 0 ] . ValueFilter != nil {
sb . Where ( sb . EQ ( "per_series_value" , query . Aggregations [ 0 ] . ValueFilter . Value ) )
}
sb . GroupBy ( "ts" )
sb . GroupBy ( querybuilder . GroupByKeys ( query . GroupBy ) ... )
q , args := sb . BuildWithFlavor ( sqlbuilder . ClickHouse )
return fmt . Sprintf ( "__spatial_aggregation_cte AS (%s)" , q ) , args
}