feat(telemetry/meter): added base setup for telemetry meter signal

This commit is contained in:
vikrantgupta25 2025-07-31 00:10:10 +05:30
parent ff3235bd02
commit 71e17a760c
No known key found for this signature in database
GPG Key ID: F8440BDE36411E79
11 changed files with 718 additions and 20 deletions

View File

@ -31,6 +31,7 @@ type querier struct {
traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation]
logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation]
meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation]
bucketCache BucketCache
}
@ -44,6 +45,7 @@ func New(
traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation],
logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation],
metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation],
meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation],
bucketCache BucketCache,
) *querier {
querierSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querier")
@ -55,6 +57,7 @@ func New(
traceStmtBuilder: traceStmtBuilder,
logStmtBuilder: logStmtBuilder,
metricStmtBuilder: metricStmtBuilder,
meterStmtBuilder: meterStmtBuilder,
bucketCache: bucketCache,
}
}

View File

@ -11,6 +11,7 @@ import (
"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymetadata"
"github.com/SigNoz/signoz/pkg/telemetrymeter"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrytraces"
@ -122,6 +123,16 @@ func newProvider(
metricConditionBuilder,
)
// Create meter statement builder
meterFieldMapper := telemetrymetrics.NewFieldMapper()
meterConditionBuilder := telemetrymetrics.NewConditionBuilder(metricFieldMapper)
meterStmtBuilder := telemetrymeter.NewMeterQueryStatementBuilder(
settings,
telemetryMetadataStore,
meterFieldMapper,
meterConditionBuilder,
)
// Create bucket cache
bucketCache := querier.NewBucketCache(
settings,
@ -139,6 +150,7 @@ func newProvider(
traceStmtBuilder,
logStmtBuilder,
metricStmtBuilder,
meterStmtBuilder,
bucketCache,
), nil
}

View File

@ -0,0 +1,363 @@
package telemetrymeter
import (
"context"
"fmt"
"log/slog"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
)
type meterQueryStatementBuilder struct {
logger *slog.Logger
metadataStore telemetrytypes.MetadataStore
fm qbtypes.FieldMapper
cb qbtypes.ConditionBuilder
metricsStatementBuilder *telemetrymetrics.MetricQueryStatementBuilder
}
var _ qbtypes.StatementBuilder[qbtypes.MetricAggregation] = (*meterQueryStatementBuilder)(nil)
func NewMeterQueryStatementBuilder(
settings factory.ProviderSettings,
metadataStore telemetrytypes.MetadataStore,
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
) *meterQueryStatementBuilder {
metricsSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrymeter")
metricsStatementBuilder := telemetrymetrics.NewMetricQueryStatementBuilder(settings, metadataStore, fieldMapper, conditionBuilder)
return &meterQueryStatementBuilder{
logger: metricsSettings.Logger(),
metadataStore: metadataStore,
fm: fieldMapper,
cb: conditionBuilder,
metricsStatementBuilder: metricsStatementBuilder,
}
}
func (b *meterQueryStatementBuilder) Build(
ctx context.Context,
start uint64,
end uint64,
_ qbtypes.RequestType,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {
keySelectors := telemetrymetrics.GetKeySelectors(query)
keys, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)
if err != nil {
return nil, err
}
// TODO[vikrantgupta25]: need to adjust this properly for meter metrics (scrape interval for 1D default so step interval should never be less than that!)
start, end = querybuilder.AdjustedMetricTimeRange(start, end, uint64(query.StepInterval.Seconds()), query)
return b.buildPipelineStatement(ctx, start, end, query, keys, variables)
}
func (b *meterQueryStatementBuilder) buildPipelineStatement(
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
keys map[string][]*telemetrytypes.TelemetryFieldKey,
variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {
var (
cteFragments []string
cteArgs [][]any
)
if b.metricsStatementBuilder.CanShortCircuitDelta(query) {
// spatial_aggregation_cte directly for certain delta queries
frag, args := b.buildTemporalAggDeltaFastPath(ctx, start, end, query, keys, variables)
if frag != "" {
cteFragments = append(cteFragments, frag)
cteArgs = append(cteArgs, args)
}
} else {
// temporal_aggregation_cte
if frag, args, err := b.buildTemporalAggregationCTE(ctx, start, end, query, keys, variables); err != nil {
return nil, err
} else if frag != "" {
cteFragments = append(cteFragments, frag)
cteArgs = append(cteArgs, args)
}
// spatial_aggregation_cte
frag, args := b.buildSpatialAggregationCTE(ctx, start, end, query, keys)
if frag != "" {
cteFragments = append(cteFragments, frag)
cteArgs = append(cteArgs, args)
}
}
// final SELECT
return b.metricsStatementBuilder.BuildFinalSelect(cteFragments, cteArgs, query)
}
func (b *meterQueryStatementBuilder) buildTemporalAggDeltaFastPath(
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
keys map[string][]*telemetrytypes.TelemetryFieldKey,
variables map[string]qbtypes.VariableItem,
) (string, []any) {
var filterWhere *sqlbuilder.WhereClause
var err error
stepSec := int64(query.StepInterval.Seconds())
sb := sqlbuilder.NewSelectBuilder()
sb.SelectMore(fmt.Sprintf(
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
stepSec,
))
for _, g := range query.GroupBy {
col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys)
if err != nil {
return "", []any{}
}
sb.SelectMore(col)
}
aggCol := AggregationColumnForSamplesTable(query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
aggCol = fmt.Sprintf("%s/%d", aggCol, stepSec)
}
sb.SelectMore(fmt.Sprintf("%s AS value", aggCol))
sb.From(fmt.Sprintf("%s.%s AS points", DBName, SamplesTableName))
sb.Where(
sb.In("metric_name", query.Aggregations[0].MetricName),
sb.GTE("unix_milli", start),
sb.LT("unix_milli", end),
)
if query.Filter != nil && query.Filter.Expression != "" {
filterWhere, _, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
Variables: variables,
})
if err != nil {
return "", []any{}
}
}
if filterWhere != nil {
sb.AddWhereClause(filterWhere)
}
if query.Aggregations[0].Temporality != metrictypes.Unknown {
sb.Where(sb.ILike("temporality", query.Aggregations[0].Temporality.StringValue()))
}
sb.GroupBy("ts")
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args
}
func (b *meterQueryStatementBuilder) buildTemporalAggregationCTE(
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
keys map[string][]*telemetrytypes.TelemetryFieldKey,
variables map[string]qbtypes.VariableItem,
) (string, []any, error) {
if query.Aggregations[0].Temporality == metrictypes.Delta {
return b.buildTemporalAggDelta(ctx, start, end, query, keys, variables)
}
return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, keys, variables)
}
func (b *meterQueryStatementBuilder) buildTemporalAggDelta(
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
keys map[string][]*telemetrytypes.TelemetryFieldKey,
variables map[string]qbtypes.VariableItem,
) (string, []any, error) {
var filterWhere *sqlbuilder.WhereClause
var err error
stepSec := int64(query.StepInterval.Seconds())
sb := sqlbuilder.NewSelectBuilder()
sb.Select("fingerprint")
sb.SelectMore(fmt.Sprintf(
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
stepSec,
))
for _, g := range query.GroupBy {
col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys)
if err != nil {
return "", nil, err
}
sb.SelectMore(col)
}
aggCol := AggregationColumnForSamplesTable(query.Aggregations[0].Temporality,
query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
aggCol = fmt.Sprintf("%s/%d", aggCol, stepSec)
}
sb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
sb.From(fmt.Sprintf("%s.%s AS points", DBName, SamplesTableName))
sb.Where(
sb.In("metric_name", query.Aggregations[0].MetricName),
sb.GTE("unix_milli", start),
sb.LT("unix_milli", end),
)
if query.Filter != nil && query.Filter.Expression != "" {
filterWhere, _, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
Variables: variables,
})
if err != nil {
return "", nil, err
}
}
if filterWhere != nil {
sb.AddWhereClause(filterWhere)
}
if query.Aggregations[0].Temporality != metrictypes.Unknown {
sb.Where(sb.ILike("temporality", query.Aggregations[0].Temporality.StringValue()))
}
sb.GroupBy("fingerprint", "ts")
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
sb.OrderBy("fingerprint", "ts")
q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
}
func (b *meterQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
keys map[string][]*telemetrytypes.TelemetryFieldKey,
variables map[string]qbtypes.VariableItem,
) (string, []any, error) {
var filterWhere *sqlbuilder.WhereClause
var err error
stepSec := int64(query.StepInterval.Seconds())
baseSb := sqlbuilder.NewSelectBuilder()
baseSb.Select("fingerprint")
baseSb.SelectMore(fmt.Sprintf(
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
stepSec,
))
for _, g := range query.GroupBy {
col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys)
if err != nil {
return "", nil, err
}
baseSb.SelectMore(col)
}
aggCol := AggregationColumnForSamplesTable(query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
baseSb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, SamplesTableName))
baseSb.Where(
baseSb.In("metric_name", query.Aggregations[0].MetricName),
baseSb.GTE("unix_milli", start),
baseSb.LT("unix_milli", end),
)
if query.Filter != nil && query.Filter.Expression != "" {
filterWhere, _, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
Variables: variables,
})
if err != nil {
return "", nil, err
}
}
if filterWhere != nil {
baseSb.AddWhereClause(filterWhere)
}
if query.Aggregations[0].Temporality != metrictypes.Unknown {
baseSb.Where(baseSb.ILike("temporality", query.Aggregations[0].Temporality.StringValue()))
}
baseSb.GroupBy("fingerprint", "ts")
baseSb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
baseSb.OrderBy("fingerprint", "ts")
innerQuery, innerArgs := baseSb.BuildWithFlavor(sqlbuilder.ClickHouse)
switch query.Aggregations[0].TimeAggregation {
case metrictypes.TimeAggregationRate:
rateExpr := fmt.Sprintf(telemetrymetrics.RateWithoutNegative, start, start)
wrapped := sqlbuilder.NewSelectBuilder()
wrapped.Select("ts")
for _, g := range query.GroupBy {
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", rateExpr))
wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...)
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
case metrictypes.TimeAggregationIncrease:
incExpr := fmt.Sprintf(telemetrymetrics.IncreaseWithoutNegative, start, start)
wrapped := sqlbuilder.NewSelectBuilder()
wrapped.Select("ts")
for _, g := range query.GroupBy {
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", incExpr))
wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...)
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
default:
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", innerQuery), innerArgs, nil
}
}
func (b *meterQueryStatementBuilder) buildSpatialAggregationCTE(
_ context.Context,
_ uint64,
_ uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
_ map[string][]*telemetrytypes.TelemetryFieldKey,
) (string, []any) {
sb := sqlbuilder.NewSelectBuilder()
sb.Select("ts")
for _, g := range query.GroupBy {
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
sb.SelectMore(fmt.Sprintf("%s(per_series_value) AS value", query.Aggregations[0].SpaceAggregation.StringValue()))
sb.From("__temporal_aggregation_cte")
sb.Where(sb.EQ("isNaN(per_series_value)", 0))
if query.Aggregations[0].ValueFilter != nil {
sb.Where(sb.EQ("per_series_value", query.Aggregations[0].ValueFilter.Value))
}
sb.GroupBy("ts")
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args
}

View File

@ -0,0 +1,191 @@
package telemetrymeter
import (
"context"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
"github.com/stretchr/testify/require"
)
func TestStatementBuilder(t *testing.T) {
cases := []struct {
name string
requestType qbtypes.RequestType
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]
expected qbtypes.Statement
expectedErr error
}{
// {
// name: "test_cumulative_rate_sum",
// requestType: qbtypes.RequestTypeTimeSeries,
// query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
// Signal: telemetrytypes.SignalMetrics,
// StepInterval: qbtypes.Step{Duration: 30 * time.Second},
// Aggregations: []qbtypes.MetricAggregation{
// {
// MetricName: "signoz_calls_total",
// Type: metrictypes.SumType,
// Temporality: metrictypes.Cumulative,
// TimeAggregation: metrictypes.TimeAggregationRate,
// SpaceAggregation: metrictypes.SpaceAggregationSum,
// },
// },
// Filter: &qbtypes.Filter{
// Expression: "service.name = 'cartservice'",
// },
// Limit: 10,
// GroupBy: []qbtypes.GroupByKey{
// {
// TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
// Name: "service.name",
// },
// },
// },
// },
// expected: qbtypes.Statement{
// Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747947360000))) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747947360000))) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte",
// Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
// },
// expectedErr: nil,
// },
// {
// name: "test_delta_rate_sum",
// requestType: qbtypes.RequestTypeTimeSeries,
// query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
// Signal: telemetrytypes.SignalMetrics,
// StepInterval: qbtypes.Step{Duration: 30 * time.Second},
// Aggregations: []qbtypes.MetricAggregation{
// {
// MetricName: "signoz_calls_total",
// Type: metrictypes.SumType,
// Temporality: metrictypes.Delta,
// TimeAggregation: metrictypes.TimeAggregationRate,
// SpaceAggregation: metrictypes.SpaceAggregationSum,
// },
// },
// Filter: &qbtypes.Filter{
// Expression: "service.name = 'cartservice'",
// },
// Limit: 10,
// GroupBy: []qbtypes.GroupByKey{
// {
// TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
// Name: "service.name",
// },
// },
// },
// },
// expected: qbtypes.Statement{
// Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, sum(value)/30 AS value FROM signoz_meter.distributed_samples AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte",
// Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_calls_total", uint64(1747947390000), uint64(1747983420000)},
// },
// expectedErr: nil,
// },
{
name: "test_delta_rate_avg",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "signoz_calls_total",
Type: metrictypes.SumType,
Temporality: metrictypes.Delta,
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationAvg,
},
},
Filter: &qbtypes.Filter{
Expression: "service.name = 'cartservice'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
},
},
expected: qbtypes.Statement{
Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, sum(value)/30 AS value FROM signoz_meter.distributed_samples AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte",
Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_calls_total", uint64(1747947390000), uint64(1747983420000)},
},
expectedErr: nil,
},
// {
// name: "test_gauge_avg_sum",
// requestType: qbtypes.RequestTypeTimeSeries,
// query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
// Signal: telemetrytypes.SignalMetrics,
// StepInterval: qbtypes.Step{Duration: 30 * time.Second},
// Aggregations: []qbtypes.MetricAggregation{
// {
// MetricName: "system.memory.usage",
// Type: metrictypes.GaugeType,
// Temporality: metrictypes.Unspecified,
// TimeAggregation: metrictypes.TimeAggregationAvg,
// SpaceAggregation: metrictypes.SpaceAggregationSum,
// },
// },
// Filter: &qbtypes.Filter{
// Expression: "host.name = 'big-data-node-1'",
// },
// Limit: 10,
// GroupBy: []qbtypes.GroupByKey{
// {
// TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
// Name: "host.name",
// },
// },
// },
// },
// expected: qbtypes.Statement{
// Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY fingerprint, `host.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`) SELECT * FROM __spatial_aggregation_cte",
// Args: []any{"system.memory.usage", uint64(1747936800000), uint64(1747983420000), "unspecified", false, "big-data-node-1", "system.memory.usage", uint64(1747947390000), uint64(1747983420000), 0},
// },
// expectedErr: nil,
// },
}
fm := telemetrymetrics.NewFieldMapper()
cb := telemetrymetrics.NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
keys, err := telemetrytypestest.LoadFieldKeysFromJSON("testdata/keys_map.json")
if err != nil {
t.Fatalf("failed to load field keys: %v", err)
}
mockMetadataStore.KeysMap = keys
statementBuilder := NewMeterQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
)
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
if c.expectedErr != nil {
require.Error(t, err)
require.Contains(t, err.Error(), c.expectedErr.Error())
} else {
require.NoError(t, err)
require.Equal(t, c.expected.Query, q.Query)
require.Equal(t, c.expected.Args, q.Args)
require.Equal(t, c.expected.Warnings, q.Warnings)
}
})
}
}

View File

@ -0,0 +1,88 @@
package telemetrymeter
import (
"github.com/SigNoz/signoz/pkg/types/metrictypes"
)
const (
DBName = "signoz_meter"
SamplesTableName = "distributed_samples"
SamplesLocalTableName = "samples"
SamplesV4Agg1dTableName = "distributed_samples_agg_1d"
SamplesV4Agg1dLocalTableName = "samples_agg_1d"
AttributesMetadataTableName = "distributed_metadata"
AttributesMetadataLocalTableName = "metadata"
)
func AggregationColumnForSamplesTable(
temporality metrictypes.Temporality,
timeAggregation metrictypes.TimeAggregation,
tableHints *metrictypes.MetricTableHints,
) string {
var aggregationColumn string
switch temporality {
case metrictypes.Delta:
// for delta metrics, we only support `RATE`/`INCREASE` both of which are sum
// although it doesn't make sense to use anyLast, avg, min, max, count on delta metrics,
// we are keeping it here to make sure that query will not be invalid
switch timeAggregation {
case metrictypes.TimeAggregationLatest:
aggregationColumn = "anyLast(last)"
case metrictypes.TimeAggregationSum:
aggregationColumn = "sum(sum)"
case metrictypes.TimeAggregationAvg:
aggregationColumn = "sum(sum) / sum(count)"
case metrictypes.TimeAggregationMin:
aggregationColumn = "min(min)"
case metrictypes.TimeAggregationMax:
aggregationColumn = "max(max)"
case metrictypes.TimeAggregationCount:
aggregationColumn = "sum(count)"
// count_distinct is not supported in aggregated tables
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
aggregationColumn = "sum(sum)"
}
case metrictypes.Cumulative:
// for cumulative metrics, we only support `RATE`/`INCREASE`. The max value in window is
// used to calculate the sum which is then divided by the window size to get the rate
switch timeAggregation {
case metrictypes.TimeAggregationLatest:
aggregationColumn = "anyLast(last)"
case metrictypes.TimeAggregationSum:
aggregationColumn = "sum(sum)"
case metrictypes.TimeAggregationAvg:
aggregationColumn = "sum(sum) / sum(count)"
case metrictypes.TimeAggregationMin:
aggregationColumn = "min(min)"
case metrictypes.TimeAggregationMax:
aggregationColumn = "max(max)"
case metrictypes.TimeAggregationCount:
aggregationColumn = "sum(count)"
// count_distinct is not supported in aggregated tables
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
aggregationColumn = "max(max)"
}
case metrictypes.Unspecified:
switch timeAggregation {
case metrictypes.TimeAggregationLatest:
aggregationColumn = "anyLast(last)"
case metrictypes.TimeAggregationSum:
aggregationColumn = "sum(sum)"
case metrictypes.TimeAggregationAvg:
aggregationColumn = "sum(sum) / sum(count)"
case metrictypes.TimeAggregationMin:
aggregationColumn = "min(min)"
case metrictypes.TimeAggregationMax:
aggregationColumn = "max(max)"
case metrictypes.TimeAggregationCount:
aggregationColumn = "sum(count)"
// count_distinct is not supported in aggregated tables
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // ideally, this should never happen
aggregationColumn = "sum(sum)"
}
}
return aggregationColumn
}

View File

@ -0,0 +1,34 @@
{
"service.name": [
{
"name": "service.name",
"fieldContext": "resource",
"fieldDataType": "string",
"signal": "metrics"
}
],
"http.request.method": [
{
"name": "http.request.method",
"fieldContext": "attribute",
"fieldDataType": "string",
"signal": "metrics"
}
],
"http.response.status_code": [
{
"name": "http.response.status_code",
"fieldContext": "attribute",
"fieldDataType": "int",
"signal": "metrics"
}
],
"host.name": [
{
"name": "host.name",
"fieldContext": "resource",
"fieldDataType": "string",
"signal": "metrics"
}
]
}

View File

@ -19,23 +19,23 @@ const (
IncreaseWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, ((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window)) * (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window))`
)
type metricQueryStatementBuilder struct {
type MetricQueryStatementBuilder struct {
logger *slog.Logger
metadataStore telemetrytypes.MetadataStore
fm qbtypes.FieldMapper
cb qbtypes.ConditionBuilder
}
var _ qbtypes.StatementBuilder[qbtypes.MetricAggregation] = (*metricQueryStatementBuilder)(nil)
var _ qbtypes.StatementBuilder[qbtypes.MetricAggregation] = (*MetricQueryStatementBuilder)(nil)
func NewMetricQueryStatementBuilder(
settings factory.ProviderSettings,
metadataStore telemetrytypes.MetadataStore,
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
) *metricQueryStatementBuilder {
) *MetricQueryStatementBuilder {
metricsSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrymetrics")
return &metricQueryStatementBuilder{
return &MetricQueryStatementBuilder{
logger: metricsSettings.Logger(),
metadataStore: metadataStore,
fm: fieldMapper,
@ -43,7 +43,7 @@ func NewMetricQueryStatementBuilder(
}
}
func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) []*telemetrytypes.FieldKeySelector {
func GetKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) []*telemetrytypes.FieldKeySelector {
var keySelectors []*telemetrytypes.FieldKeySelector
if query.Filter != nil && query.Filter.Expression != "" {
whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(query.Filter.Expression)
@ -71,7 +71,7 @@ func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation])
return keySelectors
}
func (b *metricQueryStatementBuilder) Build(
func (b *MetricQueryStatementBuilder) Build(
ctx context.Context,
start uint64,
end uint64,
@ -79,7 +79,7 @@ func (b *metricQueryStatementBuilder) Build(
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {
keySelectors := getKeySelectors(query)
keySelectors := GetKeySelectors(query)
keys, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)
if err != nil {
return nil, err
@ -112,7 +112,7 @@ func (b *metricQueryStatementBuilder) Build(
// we can directly use the quantilesDDMerge function
//
// all of this is true only for delta metrics
func (b *metricQueryStatementBuilder) canShortCircuitDelta(q qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) bool {
func (b *MetricQueryStatementBuilder) CanShortCircuitDelta(q qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) bool {
if q.Aggregations[0].Temporality != metrictypes.Delta {
return false
}
@ -138,7 +138,7 @@ func (b *metricQueryStatementBuilder) canShortCircuitDelta(q qbtypes.QueryBuilde
return false
}
func (b *metricQueryStatementBuilder) buildPipelineStatement(
func (b *MetricQueryStatementBuilder) buildPipelineStatement(
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
@ -199,7 +199,7 @@ func (b *metricQueryStatementBuilder) buildPipelineStatement(
return nil, err
}
if b.canShortCircuitDelta(query) {
if b.CanShortCircuitDelta(query) {
// spatial_aggregation_cte directly for certain delta queries
frag, args := b.buildTemporalAggDeltaFastPath(start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
if frag != "" {
@ -229,10 +229,10 @@ func (b *metricQueryStatementBuilder) buildPipelineStatement(
query.GroupBy = origGroupBy
// final SELECT
return b.buildFinalSelect(cteFragments, cteArgs, query)
return b.BuildFinalSelect(cteFragments, cteArgs, query)
}
func (b *metricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
func (b *MetricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
timeSeriesCTE string,
@ -280,7 +280,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args
}
func (b *metricQueryStatementBuilder) buildTimeSeriesCTE(
func (b *MetricQueryStatementBuilder) buildTimeSeriesCTE(
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
@ -343,7 +343,7 @@ func (b *metricQueryStatementBuilder) buildTimeSeriesCTE(
return fmt.Sprintf("(%s) AS filtered_time_series", q), args, nil
}
func (b *metricQueryStatementBuilder) buildTemporalAggregationCTE(
func (b *MetricQueryStatementBuilder) buildTemporalAggregationCTE(
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
@ -357,7 +357,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggregationCTE(
return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
}
func (b *metricQueryStatementBuilder) buildTemporalAggDelta(
func (b *MetricQueryStatementBuilder) buildTemporalAggDelta(
_ context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
@ -400,7 +400,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggDelta(
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
}
func (b *metricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
_ context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
@ -465,7 +465,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
}
}
func (b *metricQueryStatementBuilder) buildSpatialAggregationCTE(
func (b *MetricQueryStatementBuilder) buildSpatialAggregationCTE(
_ context.Context,
_ uint64,
_ uint64,
@ -491,7 +491,7 @@ func (b *metricQueryStatementBuilder) buildSpatialAggregationCTE(
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args
}
func (b *metricQueryStatementBuilder) buildFinalSelect(
func (b *MetricQueryStatementBuilder) BuildFinalSelect(
cteFragments []string,
cteArgs [][]any,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],

View File

@ -331,7 +331,6 @@ type MetricAggregation struct {
// reduce to operator for metric scalar requests
ReduceTo ReduceTo `json:"reduceTo,omitempty"`
}
type Filter struct {
// expression to filter by following the filter syntax
Expression string `json:"expression"`

View File

@ -62,6 +62,12 @@ func (q *QueryEnvelope) UnmarshalJSON(data []byte) error {
return wrapUnmarshalError(err, "invalid metric builder query spec: %v", err)
}
q.Spec = spec
case telemetrytypes.SignalMeter:
var spec QueryBuilderQuery[MetricAggregation]
if err := UnmarshalJSONWithContext(shadow.Spec, &spec, "query spec"); err != nil {
return wrapUnmarshalError(err, "invalid meter builder query spec: %v", err)
}
q.Spec = spec
default:
return errors.NewInvalidInputf(
errors.CodeInvalidInput,

View File

@ -150,11 +150,12 @@ func (q *QueryBuilderQuery[T]) Validate(requestType RequestType) error {
func (q *QueryBuilderQuery[T]) validateSignal() error {
// Signal validation is handled during unmarshaling in req.go
// Valid signals are: metrics, traces, logs
// Valid signals are: metrics, traces, logs,meter
switch q.Signal {
case telemetrytypes.SignalMetrics,
telemetrytypes.SignalTraces,
telemetrytypes.SignalLogs,
telemetrytypes.SignalMeter,
telemetrytypes.SignalUnspecified: // Empty is allowed for backward compatibility
return nil
default:

View File

@ -10,5 +10,6 @@ var (
SignalTraces = Signal{valuer.NewString("traces")}
SignalLogs = Signal{valuer.NewString("logs")}
SignalMetrics = Signal{valuer.NewString("metrics")}
SignalMeter = Signal{valuer.NewString("meter")}
SignalUnspecified = Signal{valuer.NewString("")}
)