From 71e17a760cfc00eec0b3d7ce1d6173efe9c5fef3 Mon Sep 17 00:00:00 2001 From: vikrantgupta25 Date: Thu, 31 Jul 2025 00:10:10 +0530 Subject: [PATCH] feat(telemetry/meter): added base setup for telemetry meter signal --- pkg/querier/querier.go | 3 + pkg/querier/signozquerier/provider.go | 12 + pkg/telemetrymeter/statement_builder.go | 363 ++++++++++++++++++ pkg/telemetrymeter/stmt_builder_test.go | 191 +++++++++ pkg/telemetrymeter/tables.go | 88 +++++ pkg/telemetrymeter/testdata/keys_map.json | 34 ++ pkg/telemetrymetrics/statement_builder.go | 36 +- .../querybuildertypesv5/builder_elements.go | 1 - .../querybuildertypesv5/req.go | 6 + .../querybuildertypesv5/validation.go | 3 +- pkg/types/telemetrytypes/signal.go | 1 + 11 files changed, 718 insertions(+), 20 deletions(-) create mode 100644 pkg/telemetrymeter/statement_builder.go create mode 100644 pkg/telemetrymeter/stmt_builder_test.go create mode 100644 pkg/telemetrymeter/tables.go create mode 100644 pkg/telemetrymeter/testdata/keys_map.json diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index db8820f089eb..2c404230de94 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -31,6 +31,7 @@ type querier struct { traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation] logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation] metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation] + meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation] bucketCache BucketCache } @@ -44,6 +45,7 @@ func New( traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation], logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation], metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation], + meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation], bucketCache BucketCache, ) *querier { querierSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querier") @@ -55,6 +57,7 @@ func New( traceStmtBuilder: 
traceStmtBuilder, logStmtBuilder: logStmtBuilder, metricStmtBuilder: metricStmtBuilder, + meterStmtBuilder: meterStmtBuilder, bucketCache: bucketCache, } } diff --git a/pkg/querier/signozquerier/provider.go b/pkg/querier/signozquerier/provider.go index c9801880c40d..59c2b80fb1c0 100644 --- a/pkg/querier/signozquerier/provider.go +++ b/pkg/querier/signozquerier/provider.go @@ -11,6 +11,7 @@ import ( "github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter" "github.com/SigNoz/signoz/pkg/telemetrylogs" "github.com/SigNoz/signoz/pkg/telemetrymetadata" + "github.com/SigNoz/signoz/pkg/telemetrymeter" "github.com/SigNoz/signoz/pkg/telemetrymetrics" "github.com/SigNoz/signoz/pkg/telemetrystore" "github.com/SigNoz/signoz/pkg/telemetrytraces" @@ -122,6 +123,16 @@ func newProvider( metricConditionBuilder, ) + // Create meter statement builder + meterFieldMapper := telemetrymetrics.NewFieldMapper() + meterConditionBuilder := telemetrymetrics.NewConditionBuilder(meterFieldMapper) + meterStmtBuilder := telemetrymeter.NewMeterQueryStatementBuilder( + settings, + telemetryMetadataStore, + meterFieldMapper, + meterConditionBuilder, + ) + // Create bucket cache bucketCache := querier.NewBucketCache( settings, @@ -139,6 +150,7 @@ func newProvider( traceStmtBuilder, logStmtBuilder, metricStmtBuilder, + meterStmtBuilder, bucketCache, ), nil } diff --git a/pkg/telemetrymeter/statement_builder.go b/pkg/telemetrymeter/statement_builder.go new file mode 100644 index 000000000000..3c66045070cf --- /dev/null +++ b/pkg/telemetrymeter/statement_builder.go @@ -0,0 +1,363 @@ +package telemetrymeter + +import ( + "context" + "fmt" + "log/slog" + + "github.com/SigNoz/signoz/pkg/factory" + "github.com/SigNoz/signoz/pkg/querybuilder" + "github.com/SigNoz/signoz/pkg/telemetrymetrics" + "github.com/SigNoz/signoz/pkg/types/metrictypes" + qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" + 
"github.com/huandu/go-sqlbuilder" +) + +type meterQueryStatementBuilder struct { + logger *slog.Logger + metadataStore telemetrytypes.MetadataStore + fm qbtypes.FieldMapper + cb qbtypes.ConditionBuilder + metricsStatementBuilder *telemetrymetrics.MetricQueryStatementBuilder +} + +var _ qbtypes.StatementBuilder[qbtypes.MetricAggregation] = (*meterQueryStatementBuilder)(nil) + +func NewMeterQueryStatementBuilder( + settings factory.ProviderSettings, + metadataStore telemetrytypes.MetadataStore, + fieldMapper qbtypes.FieldMapper, + conditionBuilder qbtypes.ConditionBuilder, +) *meterQueryStatementBuilder { + metricsSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrymeter") + metricsStatementBuilder := telemetrymetrics.NewMetricQueryStatementBuilder(settings, metadataStore, fieldMapper, conditionBuilder) + + return &meterQueryStatementBuilder{ + logger: metricsSettings.Logger(), + metadataStore: metadataStore, + fm: fieldMapper, + cb: conditionBuilder, + metricsStatementBuilder: metricsStatementBuilder, + } +} + +func (b *meterQueryStatementBuilder) Build( + ctx context.Context, + start uint64, + end uint64, + _ qbtypes.RequestType, + query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], + variables map[string]qbtypes.VariableItem, +) (*qbtypes.Statement, error) { + keySelectors := telemetrymetrics.GetKeySelectors(query) + keys, err := b.metadataStore.GetKeysMulti(ctx, keySelectors) + if err != nil { + return nil, err + } + + // TODO[vikrantgupta25]: need to adjust this properly for meter metrics (scrape interval for 1D default so step interval should never be less than that!) 
+ start, end = querybuilder.AdjustedMetricTimeRange(start, end, uint64(query.StepInterval.Seconds()), query) + + return b.buildPipelineStatement(ctx, start, end, query, keys, variables) +} + +func (b *meterQueryStatementBuilder) buildPipelineStatement( + ctx context.Context, + start, end uint64, + query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], + keys map[string][]*telemetrytypes.TelemetryFieldKey, + variables map[string]qbtypes.VariableItem, +) (*qbtypes.Statement, error) { + var ( + cteFragments []string + cteArgs [][]any + ) + + if b.metricsStatementBuilder.CanShortCircuitDelta(query) { + // spatial_aggregation_cte directly for certain delta queries + frag, args := b.buildTemporalAggDeltaFastPath(ctx, start, end, query, keys, variables) + if frag != "" { + cteFragments = append(cteFragments, frag) + cteArgs = append(cteArgs, args) + } + } else { + // temporal_aggregation_cte + if frag, args, err := b.buildTemporalAggregationCTE(ctx, start, end, query, keys, variables); err != nil { + return nil, err + } else if frag != "" { + cteFragments = append(cteFragments, frag) + cteArgs = append(cteArgs, args) + } + + // spatial_aggregation_cte + frag, args := b.buildSpatialAggregationCTE(ctx, start, end, query, keys) + if frag != "" { + cteFragments = append(cteFragments, frag) + cteArgs = append(cteArgs, args) + } + } + + // final SELECT + return b.metricsStatementBuilder.BuildFinalSelect(cteFragments, cteArgs, query) +} + +func (b *meterQueryStatementBuilder) buildTemporalAggDeltaFastPath( + ctx context.Context, + start, end uint64, + query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], + keys map[string][]*telemetrytypes.TelemetryFieldKey, + variables map[string]qbtypes.VariableItem, +) (string, []any) { + var filterWhere *sqlbuilder.WhereClause + var err error + stepSec := int64(query.StepInterval.Seconds()) + + sb := sqlbuilder.NewSelectBuilder() + + sb.SelectMore(fmt.Sprintf( + "toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), 
toIntervalSecond(%d)) AS ts", + stepSec, + )) + for _, g := range query.GroupBy { + col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys) + if err != nil { + return "", []any{} + } + sb.SelectMore(col) + } + + aggCol := AggregationColumnForSamplesTable(query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints) + if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate { + aggCol = fmt.Sprintf("%s/%d", aggCol, stepSec) + } + + sb.SelectMore(fmt.Sprintf("%s AS value", aggCol)) + sb.From(fmt.Sprintf("%s.%s AS points", DBName, SamplesTableName)) + sb.Where( + sb.In("metric_name", query.Aggregations[0].MetricName), + sb.GTE("unix_milli", start), + sb.LT("unix_milli", end), + ) + if query.Filter != nil && query.Filter.Expression != "" { + filterWhere, _, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{ + FieldMapper: b.fm, + ConditionBuilder: b.cb, + FieldKeys: keys, + FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"}, + Variables: variables, + }) + if err != nil { + return "", []any{} + } + } + if filterWhere != nil { + sb.AddWhereClause(filterWhere) + } + + if query.Aggregations[0].Temporality != metrictypes.Unknown { + sb.Where(sb.ILike("temporality", query.Aggregations[0].Temporality.StringValue())) + } + sb.GroupBy("ts") + sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...) 
+ + q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse) + return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args +} + +func (b *meterQueryStatementBuilder) buildTemporalAggregationCTE( + ctx context.Context, + start, end uint64, + query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], + keys map[string][]*telemetrytypes.TelemetryFieldKey, + variables map[string]qbtypes.VariableItem, +) (string, []any, error) { + if query.Aggregations[0].Temporality == metrictypes.Delta { + return b.buildTemporalAggDelta(ctx, start, end, query, keys, variables) + } + return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, keys, variables) +} + +func (b *meterQueryStatementBuilder) buildTemporalAggDelta( + ctx context.Context, + start, end uint64, + query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], + keys map[string][]*telemetrytypes.TelemetryFieldKey, + variables map[string]qbtypes.VariableItem, +) (string, []any, error) { + var filterWhere *sqlbuilder.WhereClause + var err error + + stepSec := int64(query.StepInterval.Seconds()) + sb := sqlbuilder.NewSelectBuilder() + + sb.Select("fingerprint") + sb.SelectMore(fmt.Sprintf( + "toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts", + stepSec, + )) + + for _, g := range query.GroupBy { + col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys) + if err != nil { + return "", nil, err + } + sb.SelectMore(col) + } + + aggCol := AggregationColumnForSamplesTable(query.Aggregations[0].Temporality, + query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints) + if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate { + aggCol = fmt.Sprintf("%s/%d", aggCol, stepSec) + } + + sb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol)) + + sb.From(fmt.Sprintf("%s.%s AS points", DBName, SamplesTableName)) + sb.Where( + sb.In("metric_name", query.Aggregations[0].MetricName), + sb.GTE("unix_milli", start), + sb.LT("unix_milli", 
end), + ) + + if query.Filter != nil && query.Filter.Expression != "" { + filterWhere, _, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{ + FieldMapper: b.fm, + ConditionBuilder: b.cb, + FieldKeys: keys, + FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"}, + Variables: variables, + }) + if err != nil { + return "", nil, err + } + } + if filterWhere != nil { + sb.AddWhereClause(filterWhere) + } + + if query.Aggregations[0].Temporality != metrictypes.Unknown { + sb.Where(sb.ILike("temporality", query.Aggregations[0].Temporality.StringValue())) + } + + sb.GroupBy("fingerprint", "ts") + sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...) + sb.OrderBy("fingerprint", "ts") + + q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse) + return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil +} + +func (b *meterQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified( + ctx context.Context, + start, end uint64, + query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], + keys map[string][]*telemetrytypes.TelemetryFieldKey, + variables map[string]qbtypes.VariableItem, +) (string, []any, error) { + var filterWhere *sqlbuilder.WhereClause + var err error + stepSec := int64(query.StepInterval.Seconds()) + + baseSb := sqlbuilder.NewSelectBuilder() + baseSb.Select("fingerprint") + baseSb.SelectMore(fmt.Sprintf( + "toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts", + stepSec, + )) + for _, g := range query.GroupBy { + col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys) + if err != nil { + return "", nil, err + } + baseSb.SelectMore(col) + } + + aggCol := AggregationColumnForSamplesTable(query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints) + baseSb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol)) + + baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, SamplesTableName)) + 
baseSb.Where( + baseSb.In("metric_name", query.Aggregations[0].MetricName), + baseSb.GTE("unix_milli", start), + baseSb.LT("unix_milli", end), + ) + if query.Filter != nil && query.Filter.Expression != "" { + filterWhere, _, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{ + FieldMapper: b.fm, + ConditionBuilder: b.cb, + FieldKeys: keys, + FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"}, + Variables: variables, + }) + if err != nil { + return "", nil, err + } + } + if filterWhere != nil { + baseSb.AddWhereClause(filterWhere) + } + + if query.Aggregations[0].Temporality != metrictypes.Unknown { + baseSb.Where(baseSb.ILike("temporality", query.Aggregations[0].Temporality.StringValue())) + } + baseSb.GroupBy("fingerprint", "ts") + baseSb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...) + baseSb.OrderBy("fingerprint", "ts") + + innerQuery, innerArgs := baseSb.BuildWithFlavor(sqlbuilder.ClickHouse) + + switch query.Aggregations[0].TimeAggregation { + case metrictypes.TimeAggregationRate: + rateExpr := fmt.Sprintf(telemetrymetrics.RateWithoutNegative, start, start) + wrapped := sqlbuilder.NewSelectBuilder() + wrapped.Select("ts") + for _, g := range query.GroupBy { + wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name)) + } + wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", rateExpr)) + wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery)) + q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...) 
+ return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil + + case metrictypes.TimeAggregationIncrease: + incExpr := fmt.Sprintf(telemetrymetrics.IncreaseWithoutNegative, start, start) + wrapped := sqlbuilder.NewSelectBuilder() + wrapped.Select("ts") + for _, g := range query.GroupBy { + wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name)) + } + wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", incExpr)) + wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery)) + q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...) + return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil + default: + return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", innerQuery), innerArgs, nil + } +} + +func (b *meterQueryStatementBuilder) buildSpatialAggregationCTE( + _ context.Context, + _ uint64, + _ uint64, + query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], + _ map[string][]*telemetrytypes.TelemetryFieldKey, +) (string, []any) { + sb := sqlbuilder.NewSelectBuilder() + + sb.Select("ts") + for _, g := range query.GroupBy { + sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name)) + } + sb.SelectMore(fmt.Sprintf("%s(per_series_value) AS value", query.Aggregations[0].SpaceAggregation.StringValue())) + sb.From("__temporal_aggregation_cte") + sb.Where(sb.EQ("isNaN(per_series_value)", 0)) + if query.Aggregations[0].ValueFilter != nil { + sb.Where(sb.EQ("per_series_value", query.Aggregations[0].ValueFilter.Value)) + } + sb.GroupBy("ts") + sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...) 
+ + q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse) + return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args +} diff --git a/pkg/telemetrymeter/stmt_builder_test.go b/pkg/telemetrymeter/stmt_builder_test.go new file mode 100644 index 000000000000..02ec8e743997 --- /dev/null +++ b/pkg/telemetrymeter/stmt_builder_test.go @@ -0,0 +1,191 @@ +package telemetrymeter + +import ( + "context" + "testing" + "time" + + "github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest" + "github.com/SigNoz/signoz/pkg/telemetrymetrics" + "github.com/SigNoz/signoz/pkg/types/metrictypes" + qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest" + "github.com/stretchr/testify/require" +) + +func TestStatementBuilder(t *testing.T) { + cases := []struct { + name string + requestType qbtypes.RequestType + query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation] + expected qbtypes.Statement + expectedErr error + }{ + // { + // name: "test_cumulative_rate_sum", + // requestType: qbtypes.RequestTypeTimeSeries, + // query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ + // Signal: telemetrytypes.SignalMetrics, + // StepInterval: qbtypes.Step{Duration: 30 * time.Second}, + // Aggregations: []qbtypes.MetricAggregation{ + // { + // MetricName: "signoz_calls_total", + // Type: metrictypes.SumType, + // Temporality: metrictypes.Cumulative, + // TimeAggregation: metrictypes.TimeAggregationRate, + // SpaceAggregation: metrictypes.SpaceAggregationSum, + // }, + // }, + // Filter: &qbtypes.Filter{ + // Expression: "service.name = 'cartservice'", + // }, + // Limit: 10, + // GroupBy: []qbtypes.GroupByKey{ + // { + // TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{ + // Name: "service.name", + // }, + // }, + // }, + // }, + // expected: qbtypes.Statement{ + // Query: "WITH __temporal_aggregation_cte AS (SELECT ts, 
`service.name`, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747947360000))) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747947360000))) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? 
GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte", + // Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0}, + // }, + // expectedErr: nil, + // }, + // { + // name: "test_delta_rate_sum", + // requestType: qbtypes.RequestTypeTimeSeries, + // query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ + // Signal: telemetrytypes.SignalMetrics, + // StepInterval: qbtypes.Step{Duration: 30 * time.Second}, + // Aggregations: []qbtypes.MetricAggregation{ + // { + // MetricName: "signoz_calls_total", + // Type: metrictypes.SumType, + // Temporality: metrictypes.Delta, + // TimeAggregation: metrictypes.TimeAggregationRate, + // SpaceAggregation: metrictypes.SpaceAggregationSum, + // }, + // }, + // Filter: &qbtypes.Filter{ + // Expression: "service.name = 'cartservice'", + // }, + // Limit: 10, + // GroupBy: []qbtypes.GroupByKey{ + // { + // TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{ + // Name: "service.name", + // }, + // }, + // }, + // }, + // expected: qbtypes.Statement{ + // Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, sum(value)/30 AS value FROM signoz_meter.distributed_samples AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? 
GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte", + // Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_calls_total", uint64(1747947390000), uint64(1747983420000)}, + // }, + // expectedErr: nil, + // }, + { + name: "test_delta_rate_avg", + requestType: qbtypes.RequestTypeTimeSeries, + query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ + Signal: telemetrytypes.SignalMetrics, + StepInterval: qbtypes.Step{Duration: 30 * time.Second}, + Aggregations: []qbtypes.MetricAggregation{ + { + MetricName: "signoz_calls_total", + Type: metrictypes.SumType, + Temporality: metrictypes.Delta, + TimeAggregation: metrictypes.TimeAggregationRate, + SpaceAggregation: metrictypes.SpaceAggregationAvg, + }, + }, + Filter: &qbtypes.Filter{ + Expression: "service.name = 'cartservice'", + }, + Limit: 10, + GroupBy: []qbtypes.GroupByKey{ + { + TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{ + Name: "service.name", + }, + }, + }, + }, + expected: qbtypes.Statement{ + Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, sum(value)/30 AS value FROM signoz_meter.distributed_samples AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? 
GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte", + Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_calls_total", uint64(1747947390000), uint64(1747983420000)}, + }, + expectedErr: nil, + }, + // { + // name: "test_gauge_avg_sum", + // requestType: qbtypes.RequestTypeTimeSeries, + // query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ + // Signal: telemetrytypes.SignalMetrics, + // StepInterval: qbtypes.Step{Duration: 30 * time.Second}, + // Aggregations: []qbtypes.MetricAggregation{ + // { + // MetricName: "system.memory.usage", + // Type: metrictypes.GaugeType, + // Temporality: metrictypes.Unspecified, + // TimeAggregation: metrictypes.TimeAggregationAvg, + // SpaceAggregation: metrictypes.SpaceAggregationSum, + // }, + // }, + // Filter: &qbtypes.Filter{ + // Expression: "host.name = 'big-data-node-1'", + // }, + // Limit: 10, + // GroupBy: []qbtypes.GroupByKey{ + // { + // TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{ + // Name: "host.name", + // }, + // }, + // }, + // }, + // expected: qbtypes.Statement{ + // Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY fingerprint, `host.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? 
GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`) SELECT * FROM __spatial_aggregation_cte", + // Args: []any{"system.memory.usage", uint64(1747936800000), uint64(1747983420000), "unspecified", false, "big-data-node-1", "system.memory.usage", uint64(1747947390000), uint64(1747983420000), 0}, + // }, + // expectedErr: nil, + // }, + } + + fm := telemetrymetrics.NewFieldMapper() + cb := telemetrymetrics.NewConditionBuilder(fm) + mockMetadataStore := telemetrytypestest.NewMockMetadataStore() + keys, err := telemetrytypestest.LoadFieldKeysFromJSON("testdata/keys_map.json") + if err != nil { + t.Fatalf("failed to load field keys: %v", err) + } + mockMetadataStore.KeysMap = keys + + statementBuilder := NewMeterQueryStatementBuilder( + instrumentationtest.New().ToProviderSettings(), + mockMetadataStore, + fm, + cb, + ) + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + + q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil) + + if c.expectedErr != nil { + require.Error(t, err) + require.Contains(t, err.Error(), c.expectedErr.Error()) + } else { + require.NoError(t, err) + require.Equal(t, c.expected.Query, q.Query) + require.Equal(t, c.expected.Args, q.Args) + require.Equal(t, c.expected.Warnings, q.Warnings) + } + }) + } +} diff --git a/pkg/telemetrymeter/tables.go b/pkg/telemetrymeter/tables.go new file mode 100644 index 000000000000..ee58a5077da2 --- /dev/null +++ b/pkg/telemetrymeter/tables.go @@ -0,0 +1,88 @@ +package telemetrymeter + +import ( + "github.com/SigNoz/signoz/pkg/types/metrictypes" +) + +const ( + DBName = "signoz_meter" + SamplesTableName = "distributed_samples" + SamplesLocalTableName = "samples" + SamplesV4Agg1dTableName = "distributed_samples_agg_1d" + SamplesV4Agg1dLocalTableName = 
"samples_agg_1d" + AttributesMetadataTableName = "distributed_metadata" + AttributesMetadataLocalTableName = "metadata" +) + +func AggregationColumnForSamplesTable( + temporality metrictypes.Temporality, + timeAggregation metrictypes.TimeAggregation, + tableHints *metrictypes.MetricTableHints, +) string { + var aggregationColumn string + switch temporality { + case metrictypes.Delta: + // for delta metrics, we only support `RATE`/`INCREASE` both of which are sum + // although it doesn't make sense to use anyLast, avg, min, max, count on delta metrics, + // we are keeping it here to make sure that query will not be invalid + switch timeAggregation { + case metrictypes.TimeAggregationLatest: + aggregationColumn = "anyLast(last)" + case metrictypes.TimeAggregationSum: + aggregationColumn = "sum(sum)" + case metrictypes.TimeAggregationAvg: + aggregationColumn = "sum(sum) / sum(count)" + case metrictypes.TimeAggregationMin: + aggregationColumn = "min(min)" + case metrictypes.TimeAggregationMax: + aggregationColumn = "max(max)" + case metrictypes.TimeAggregationCount: + aggregationColumn = "sum(count)" + // count_distinct is not supported in aggregated tables + case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results + aggregationColumn = "sum(sum)" + } + + case metrictypes.Cumulative: + // for cumulative metrics, we only support `RATE`/`INCREASE`. 
The max value in window is + // used to calculate the sum which is then divided by the window size to get the rate + switch timeAggregation { + case metrictypes.TimeAggregationLatest: + aggregationColumn = "anyLast(last)" + case metrictypes.TimeAggregationSum: + aggregationColumn = "sum(sum)" + case metrictypes.TimeAggregationAvg: + aggregationColumn = "sum(sum) / sum(count)" + case metrictypes.TimeAggregationMin: + aggregationColumn = "min(min)" + case metrictypes.TimeAggregationMax: + aggregationColumn = "max(max)" + case metrictypes.TimeAggregationCount: + aggregationColumn = "sum(count)" + // count_distinct is not supported in aggregated tables + case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results + aggregationColumn = "max(max)" + } + + case metrictypes.Unspecified: + switch timeAggregation { + case metrictypes.TimeAggregationLatest: + aggregationColumn = "anyLast(last)" + case metrictypes.TimeAggregationSum: + aggregationColumn = "sum(sum)" + case metrictypes.TimeAggregationAvg: + aggregationColumn = "sum(sum) / sum(count)" + case metrictypes.TimeAggregationMin: + aggregationColumn = "min(min)" + case metrictypes.TimeAggregationMax: + aggregationColumn = "max(max)" + case metrictypes.TimeAggregationCount: + aggregationColumn = "sum(count)" + // count_distinct is not supported in aggregated tables + case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // ideally, this should never happen + aggregationColumn = "sum(sum)" + } + + } + return aggregationColumn +} diff --git a/pkg/telemetrymeter/testdata/keys_map.json b/pkg/telemetrymeter/testdata/keys_map.json new file mode 100644 index 000000000000..d1c27a3bf460 --- /dev/null +++ b/pkg/telemetrymeter/testdata/keys_map.json @@ -0,0 +1,34 @@ +{ + "service.name": [ + { + "name": "service.name", + "fieldContext": "resource", + "fieldDataType": "string", + "signal": "metrics" + } + ], + "http.request.method": [ + { + "name": 
"http.request.method", + "fieldContext": "attribute", + "fieldDataType": "string", + "signal": "metrics" + } + ], + "http.response.status_code": [ + { + "name": "http.response.status_code", + "fieldContext": "attribute", + "fieldDataType": "int", + "signal": "metrics" + } + ], + "host.name": [ + { + "name": "host.name", + "fieldContext": "resource", + "fieldDataType": "string", + "signal": "metrics" + } + ] +} \ No newline at end of file diff --git a/pkg/telemetrymetrics/statement_builder.go b/pkg/telemetrymetrics/statement_builder.go index e8ffeabb3a5c..c95cbdd1a520 100644 --- a/pkg/telemetrymetrics/statement_builder.go +++ b/pkg/telemetrymetrics/statement_builder.go @@ -19,23 +19,23 @@ const ( IncreaseWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, ((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window)) * (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window))` ) -type metricQueryStatementBuilder struct { +type MetricQueryStatementBuilder struct { logger *slog.Logger metadataStore telemetrytypes.MetadataStore fm qbtypes.FieldMapper cb qbtypes.ConditionBuilder } -var _ qbtypes.StatementBuilder[qbtypes.MetricAggregation] = (*metricQueryStatementBuilder)(nil) +var _ qbtypes.StatementBuilder[qbtypes.MetricAggregation] = (*MetricQueryStatementBuilder)(nil) func NewMetricQueryStatementBuilder( settings factory.ProviderSettings, metadataStore telemetrytypes.MetadataStore, fieldMapper qbtypes.FieldMapper, conditionBuilder qbtypes.ConditionBuilder, -) *metricQueryStatementBuilder { +) *MetricQueryStatementBuilder { metricsSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrymetrics") - return &metricQueryStatementBuilder{ + return &MetricQueryStatementBuilder{ logger: metricsSettings.Logger(), metadataStore: metadataStore, fm: 
fieldMapper, @@ -43,7 +43,7 @@ func NewMetricQueryStatementBuilder( } } -func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) []*telemetrytypes.FieldKeySelector { +func GetKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) []*telemetrytypes.FieldKeySelector { var keySelectors []*telemetrytypes.FieldKeySelector if query.Filter != nil && query.Filter.Expression != "" { whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(query.Filter.Expression) @@ -71,7 +71,7 @@ func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) return keySelectors } -func (b *metricQueryStatementBuilder) Build( +func (b *MetricQueryStatementBuilder) Build( ctx context.Context, start uint64, end uint64, @@ -79,7 +79,7 @@ func (b *metricQueryStatementBuilder) Build( query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], variables map[string]qbtypes.VariableItem, ) (*qbtypes.Statement, error) { - keySelectors := getKeySelectors(query) + keySelectors := GetKeySelectors(query) keys, err := b.metadataStore.GetKeysMulti(ctx, keySelectors) if err != nil { return nil, err @@ -112,7 +112,7 @@ func (b *metricQueryStatementBuilder) Build( // we can directly use the quantilesDDMerge function // // all of this is true only for delta metrics -func (b *metricQueryStatementBuilder) canShortCircuitDelta(q qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) bool { +func (b *MetricQueryStatementBuilder) CanShortCircuitDelta(q qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) bool { if q.Aggregations[0].Temporality != metrictypes.Delta { return false } @@ -138,7 +138,7 @@ func (b *metricQueryStatementBuilder) canShortCircuitDelta(q qbtypes.QueryBuilde return false } -func (b *metricQueryStatementBuilder) buildPipelineStatement( +func (b *MetricQueryStatementBuilder) buildPipelineStatement( ctx context.Context, start, end uint64, query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], @@ -199,7 +199,7 @@ func (b 
*metricQueryStatementBuilder) buildPipelineStatement( return nil, err } - if b.canShortCircuitDelta(query) { + if b.CanShortCircuitDelta(query) { // spatial_aggregation_cte directly for certain delta queries frag, args := b.buildTemporalAggDeltaFastPath(start, end, query, timeSeriesCTE, timeSeriesCTEArgs) if frag != "" { @@ -229,10 +229,10 @@ func (b *metricQueryStatementBuilder) buildPipelineStatement( query.GroupBy = origGroupBy // final SELECT - return b.buildFinalSelect(cteFragments, cteArgs, query) + return b.BuildFinalSelect(cteFragments, cteArgs, query) } -func (b *metricQueryStatementBuilder) buildTemporalAggDeltaFastPath( +func (b *MetricQueryStatementBuilder) buildTemporalAggDeltaFastPath( start, end uint64, query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], timeSeriesCTE string, @@ -280,7 +280,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggDeltaFastPath( return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args } -func (b *metricQueryStatementBuilder) buildTimeSeriesCTE( +func (b *MetricQueryStatementBuilder) buildTimeSeriesCTE( ctx context.Context, start, end uint64, query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], @@ -343,7 +343,7 @@ func (b *metricQueryStatementBuilder) buildTimeSeriesCTE( return fmt.Sprintf("(%s) AS filtered_time_series", q), args, nil } -func (b *metricQueryStatementBuilder) buildTemporalAggregationCTE( +func (b *MetricQueryStatementBuilder) buildTemporalAggregationCTE( ctx context.Context, start, end uint64, query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], @@ -357,7 +357,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggregationCTE( return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs) } -func (b *metricQueryStatementBuilder) buildTemporalAggDelta( +func (b *MetricQueryStatementBuilder) buildTemporalAggDelta( _ context.Context, start, end uint64, query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], @@ -400,7 +400,7 
@@ func (b *metricQueryStatementBuilder) buildTemporalAggDelta( return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil } -func (b *metricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified( +func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified( _ context.Context, start, end uint64, query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], @@ -465,7 +465,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified( } } -func (b *metricQueryStatementBuilder) buildSpatialAggregationCTE( +func (b *MetricQueryStatementBuilder) buildSpatialAggregationCTE( _ context.Context, _ uint64, _ uint64, @@ -491,7 +491,7 @@ func (b *metricQueryStatementBuilder) buildSpatialAggregationCTE( return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args } -func (b *metricQueryStatementBuilder) buildFinalSelect( +func (b *MetricQueryStatementBuilder) BuildFinalSelect( cteFragments []string, cteArgs [][]any, query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go b/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go index c303c203647e..bc607fc7a765 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go @@ -331,7 +331,6 @@ type MetricAggregation struct { // reduce to operator for metric scalar requests ReduceTo ReduceTo `json:"reduceTo,omitempty"` } - type Filter struct { // expression to filter by following the filter syntax Expression string `json:"expression"` diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/req.go b/pkg/types/querybuildertypes/querybuildertypesv5/req.go index 50659b53010a..d21a7a6c2f17 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/req.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/req.go @@ -62,6 +62,12 @@ func (q *QueryEnvelope) UnmarshalJSON(data []byte) 
error { return wrapUnmarshalError(err, "invalid metric builder query spec: %v", err) } q.Spec = spec + case telemetrytypes.SignalMeter: + var spec QueryBuilderQuery[MetricAggregation] + if err := UnmarshalJSONWithContext(shadow.Spec, &spec, "query spec"); err != nil { + return wrapUnmarshalError(err, "invalid meter builder query spec: %v", err) + } + q.Spec = spec default: return errors.NewInvalidInputf( errors.CodeInvalidInput, diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/validation.go b/pkg/types/querybuildertypes/querybuildertypesv5/validation.go index d8679a49c121..d2958bd8fd77 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/validation.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/validation.go @@ -150,11 +150,12 @@ func (q *QueryBuilderQuery[T]) Validate(requestType RequestType) error { func (q *QueryBuilderQuery[T]) validateSignal() error { // Signal validation is handled during unmarshaling in req.go - // Valid signals are: metrics, traces, logs + // Valid signals are: metrics, traces, logs, meter switch q.Signal { case telemetrytypes.SignalMetrics, telemetrytypes.SignalTraces, telemetrytypes.SignalLogs, + telemetrytypes.SignalMeter, telemetrytypes.SignalUnspecified: // Empty is allowed for backward compatibility return nil default: diff --git a/pkg/types/telemetrytypes/signal.go b/pkg/types/telemetrytypes/signal.go index 28c90fa6fd98..20d3ee3f00a5 100644 --- a/pkg/types/telemetrytypes/signal.go +++ b/pkg/types/telemetrytypes/signal.go @@ -10,5 +10,6 @@ var ( SignalTraces = Signal{valuer.NewString("traces")} SignalLogs = Signal{valuer.NewString("logs")} SignalMetrics = Signal{valuer.NewString("metrics")} + SignalMeter = Signal{valuer.NewString("meter")} SignalUnspecified = Signal{valuer.NewString("")} )