From 9158b25d4d7bbd3f061fcc109b59c918ef4d1238 Mon Sep 17 00:00:00 2001 From: vikrantgupta25 Date: Sun, 3 Aug 2025 01:32:23 +0530 Subject: [PATCH] feat(telemetrymeter): deprecate the signal and use aggregation instead --- pkg/apis/fields/api.go | 2 +- pkg/querier/querier.go | 7 +- pkg/querier/signozquerier/provider.go | 2 +- .../app/clickhouseReader/reader.go | 50 +++- pkg/query-service/app/http_handler.go | 2 - pkg/query-service/interfaces/interface.go | 1 - pkg/query-service/model/v3/v3.go | 3 +- pkg/querybuilder/time.go | 17 +- pkg/telemetrymetadata/metadata.go | 19 +- pkg/telemetrymetadata/metadata_test.go | 2 +- pkg/telemetrymeter/statement_builder.go | 16 +- pkg/telemetrymeter/stmt_builder_test.go | 8 +- pkg/telemetrymeter/tables.go | 226 +++++++++++++----- .../querybuildertypesv5/req.go | 6 - .../querybuildertypesv5/validation.go | 1 - pkg/types/telemetrytypes/signal.go | 1 - 16 files changed, 262 insertions(+), 101 deletions(-) diff --git a/pkg/apis/fields/api.go b/pkg/apis/fields/api.go index 25a0362b3232..38c9778bde5c 100644 --- a/pkg/apis/fields/api.go +++ b/pkg/apis/fields/api.go @@ -35,7 +35,7 @@ func NewAPI( telemetrymetrics.DBName, telemetrymetrics.AttributesMetadataTableName, telemetrymeter.DBName, - telemetrymeter.SamplesV4Agg1dTableName, + telemetrymeter.SamplesAgg1dTableName, telemetrylogs.DBName, telemetrylogs.LogsV2TableName, telemetrylogs.TagAttributesV2TableName, diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index ead1ac37653d..95fe304f9256 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -172,7 +172,7 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype event.FilterApplied = spec.Filter != nil && spec.Filter.Expression != "" event.GroupByApplied = len(spec.GroupBy) > 0 - if spec.Signal == telemetrytypes.SignalMeter { + if strings.HasPrefix(spec.Aggregations[0].MetricName, "signoz.meter") { spec.StepInterval = qbtypes.Step{Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMeter(req.Start, req.End))} } else { if spec.StepInterval.Seconds() == 0 { @@ -274,10 +274,9 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType) var bq *builderQuery[qbtypes.MetricAggregation] - switch spec.Signal { - case telemetrytypes.SignalMeter: + if strings.HasPrefix(spec.Aggregations[0].MetricName, "signoz.meter") { bq = newBuilderQuery(q.telemetryStore, q.meterStmtBuilder, spec, timeRange, req.RequestType, tmplVars) - default: + } else { bq = newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, spec, timeRange, req.RequestType, tmplVars) } diff --git a/pkg/querier/signozquerier/provider.go b/pkg/querier/signozquerier/provider.go index 755547119d7d..9d4cb1dbdae4 100644 --- a/pkg/querier/signozquerier/provider.go +++ b/pkg/querier/signozquerier/provider.go @@ -54,7 +54,7 @@ func newProvider( telemetrymetrics.DBName, telemetrymetrics.AttributesMetadataTableName, telemetrymeter.DBName, - telemetrymeter.SamplesV4Agg1dTableName, + telemetrymeter.SamplesAgg1dTableName, telemetrylogs.DBName, telemetrylogs.LogsV2TableName, telemetrylogs.TagAttributesV2TableName, diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 5ed350562c50..1476ba96e514 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -2740,10 +2740,16 @@ func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, org response.AttributeKeys = append(response.AttributeKeys, key) } + meterAggregateAttributes, err := r.getMeterAggregateAttributes(ctx, orgID, req) + if err != nil { + return nil, err + } + + response.AttributeKeys = append(response.AttributeKeys, meterAggregateAttributes.AttributeKeys...) return &response, nil } -func (r *ClickHouseReader) GetMeterAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) { +func (r *ClickHouseReader) getMeterAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) { var response v3.AggregateAttributeResponse // Query all relevant metric names from time_series_v4, but leave metadata retrieval to cache/db query := fmt.Sprintf( @@ -2792,7 +2798,6 @@ func (r *ClickHouseReader) GetMeterAggregateAttributes(ctx context.Context, orgI } func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) { - var query string var err error var rows driver.Rows @@ -2829,6 +2834,47 @@ func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.F response.AttributeKeys = append(response.AttributeKeys, key) } + meterKeys, err := r.getMeterAttributeKeys(ctx, req) + if err != nil { + return nil, err + } + + response.AttributeKeys = append(response.AttributeKeys, meterKeys.AttributeKeys...) + return &response, nil +} + +func (r *ClickHouseReader) getMeterAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) { + var query string + var err error + var rows driver.Rows + var response v3.FilterAttributeKeyResponse + + // skips the internal attributes i.e attributes starting with __ + query = fmt.Sprintf("SELECT DISTINCT arrayJoin(JSONExtractKeys(labels)) as attr_name FROM %s.%s WHERE metric_name=$1 AND attr_name ILIKE $2 AND attr_name NOT LIKE '\\_\\_%%'", signozMeterDBName, signozMeterSamplesName) + if req.Limit != 0 { + query = query + fmt.Sprintf(" LIMIT %d;", req.Limit) + } + rows, err = r.db.Query(ctx, query, req.AggregateAttribute, fmt.Sprintf("%%%s%%", req.SearchText)) + if err != nil { + zap.L().Error("Error while executing query", zap.Error(err)) + return nil, fmt.Errorf("error while executing query: %s", err.Error()) + } + defer rows.Close() + + var attributeKey string + for rows.Next() { + if err := rows.Scan(&attributeKey); err != nil { + return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + } + key := v3.AttributeKey{ + Key: attributeKey, + DataType: v3.AttributeKeyDataTypeString, // https://github.com/OpenObservability/OpenMetrics/blob/main/proto/openmetrics_data_model.proto#L64-L72. + Type: v3.AttributeKeyTypeTag, + IsColumn: false, + } + response.AttributeKeys = append(response.AttributeKeys, key) + } + return &response, nil } diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 690db39d9483..69be0b7f18fb 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -4213,8 +4213,6 @@ func (aH *APIHandler) autocompleteAggregateAttributes(w http.ResponseWriter, r * switch req.DataSource { case v3.DataSourceMetrics: response, err = aH.reader.GetMetricAggregateAttributes(r.Context(), orgID, req, false) - case v3.DataSourceMeter: - response, err = aH.reader.GetMeterAggregateAttributes(r.Context(), orgID, req) case v3.DataSourceLogs: response, err = aH.reader.GetLogAggregateAttributes(r.Context(), req) case v3.DataSourceTraces: diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 037fb769309e..b378c94a0283 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -50,7 +50,6 @@ type Reader interface { FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error) GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error) - GetMeterAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index cb1abd38818b..67bbf65af586 100644 --- a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -22,12 +22,11 @@ const ( DataSourceTraces DataSource = "traces" DataSourceLogs DataSource = "logs" DataSourceMetrics DataSource = "metrics" - DataSourceMeter DataSource = "meter" ) func (d DataSource) Validate() error { switch d { - case DataSourceTraces, DataSourceLogs, DataSourceMetrics, DataSourceMeter: + case DataSourceTraces, DataSourceLogs, DataSourceMetrics: return nil default: return fmt.Errorf("invalid data source: %s", d) diff --git a/pkg/querybuilder/time.go b/pkg/querybuilder/time.go index 5fceeb2c8893..cb6c18a0b778 100644 --- a/pkg/querybuilder/time.go +++ b/pkg/querybuilder/time.go @@ -68,12 +68,21 @@ func RecommendedStepIntervalForMeter(start, end uint64) uint64 { step := (end - start) / RecommendedNumberOfPoints / 1e9 // for meter queries the minimum step interval allowed is 1 day as this is our granularity - if step < 86400 { - return 86400 + if step < 3600 { + return 3600 } - // return the nearest lower multiple of 86400 ( 1 day ) - recommended := step - step%86400 + // return the nearest lower multiple of 3600 ( 1 hour ) + recommended := step - step%3600 + + // if the time range is greater than 1 month set the step interval to be multiple of 1 day + if end-start >= uint64(30*24*time.Hour.Nanoseconds()) { + if recommended < 86400 { + recommended = 86400 + } else { + recommended = uint64(math.Round(float64(recommended)/86400)) * 86400 + } + } return recommended } diff --git a/pkg/telemetrymetadata/metadata.go b/pkg/telemetrymetadata/metadata.go index abc43509f9ba..8a04b333422d 100644 --- a/pkg/telemetrymetadata/metadata.go +++ b/pkg/telemetrymetadata/metadata.go @@ -555,6 +555,12 @@ func (t *telemetryMetaStore) getMetricsKeys(ctx context.Context, fieldKeySelecto return nil, errors.Wrapf(rows.Err(), errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error()) } + meterKeys, err := t.getMeterKeys(ctx, fieldKeySelectors) + if err != nil { + return nil, err + } + + keys = append(keys, meterKeys...) return keys, nil } @@ -606,7 +612,7 @@ func (t *telemetryMetaStore) getMeterKeys(ctx context.Context, fieldKeySelectors } keys = append(keys, &telemetrytypes.TelemetryFieldKey{ Name: name, - Signal: telemetrytypes.SignalMeter, + Signal: telemetrytypes.SignalMetrics, }) } @@ -634,8 +640,6 @@ func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *tele keys, err = t.getLogsKeys(ctx, selectors) case telemetrytypes.SignalMetrics: keys, err = t.getMetricsKeys(ctx, selectors) - case telemetrytypes.SignalMeter: - keys, err = t.getMeterKeys(ctx, selectors) case telemetrytypes.SignalUnspecified: // get traces keys tracesKeys, err := t.getTracesKeys(ctx, selectors) @@ -1019,6 +1023,13 @@ func (t *telemetryMetaStore) getMetricFieldValues(ctx context.Context, fieldValu } values.StringValues = append(values.StringValues, stringValue) } + + meterFieldValues, err := t.getMeterFieldValues(ctx, fieldValueSelector) + if err != nil { + return nil, err + } + + values.StringValues = append(values.StringValues, meterFieldValues.StringValues...) return values, nil } @@ -1098,8 +1109,6 @@ func (t *telemetryMetaStore) GetAllValues(ctx context.Context, fieldValueSelecto values, err = t.getLogFieldValues(ctx, fieldValueSelector) case telemetrytypes.SignalMetrics: values, err = t.getMetricFieldValues(ctx, fieldValueSelector) - case telemetrytypes.SignalMeter: - values, err = t.getMeterFieldValues(ctx, fieldValueSelector) case telemetrytypes.SignalUnspecified: mapOfValues := make(map[any]bool) mapOfRelatedValues := make(map[any]bool) diff --git a/pkg/telemetrymetadata/metadata_test.go b/pkg/telemetrymetadata/metadata_test.go index 19b791db0bb6..b42925cb4dc6 100644 --- a/pkg/telemetrymetadata/metadata_test.go +++ b/pkg/telemetrymetadata/metadata_test.go @@ -44,7 +44,7 @@ func TestGetKeys(t *testing.T) { telemetrymetrics.DBName, telemetrymetrics.AttributesMetadataTableName, telemetrymeter.DBName, - telemetrymeter.SamplesV4Agg1dTableName, + telemetrymeter.SamplesAgg1dTableName, telemetrylogs.DBName, telemetrylogs.LogsV2TableName, telemetrylogs.TagAttributesV2TableName, diff --git a/pkg/telemetrymeter/statement_builder.go b/pkg/telemetrymeter/statement_builder.go index 057d934b66da..6c67c41fb6f4 100644 --- a/pkg/telemetrymeter/statement_builder.go +++ b/pkg/telemetrymeter/statement_builder.go @@ -56,7 +56,6 @@ func (b *meterQueryStatementBuilder) Build( return nil, err } - // TODO[vikrantgupta25]: need to adjust this properly for meter metrics (scrape interval for 1D default so step interval should never be less than that!) start, end = querybuilder.AdjustedMetricTimeRange(start, end, uint64(query.StepInterval.Seconds()), query) return b.buildPipelineStatement(ctx, start, end, query, keys, variables) @@ -127,13 +126,14 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDeltaFastPath( sb.SelectMore(col) } - aggCol := AggregationColumnForSamplesTable(query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints) + tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints) + aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints) if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate { aggCol = fmt.Sprintf("%s/%d", aggCol, stepSec) } sb.SelectMore(fmt.Sprintf("%s AS value", aggCol)) - sb.From(fmt.Sprintf("%s.%s AS points", DBName, SamplesV4Agg1dTableName)) + sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl)) sb.Where( sb.In("metric_name", query.Aggregations[0].MetricName), sb.GTE("unix_milli", start), @@ -205,7 +205,8 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDelta( sb.SelectMore(col) } - aggCol := AggregationColumnForSamplesTable(query.Aggregations[0].Temporality, + tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints) + aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints) if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate { aggCol = fmt.Sprintf("%s/%d", aggCol, stepSec) @@ -213,7 +214,7 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDelta( sb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol)) - sb.From(fmt.Sprintf("%s.%s AS points", DBName, SamplesV4Agg1dTableName)) + sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl)) sb.Where( sb.In("metric_name", query.Aggregations[0].MetricName), sb.GTE("unix_milli", start), @@ -273,10 +274,11 @@ func (b *meterQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified( baseSb.SelectMore(col) } - aggCol := AggregationColumnForSamplesTable(query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints) + tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints) + aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints) baseSb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol)) - baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, SamplesV4Agg1dTableName)) + baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl)) baseSb.Where( baseSb.In("metric_name", query.Aggregations[0].MetricName), baseSb.GTE("unix_milli", start), diff --git a/pkg/telemetrymeter/stmt_builder_test.go b/pkg/telemetrymeter/stmt_builder_test.go index dd879a6f29f7..446e9c693de7 100644 --- a/pkg/telemetrymeter/stmt_builder_test.go +++ b/pkg/telemetrymeter/stmt_builder_test.go @@ -26,7 +26,7 @@ func TestStatementBuilder(t *testing.T) { name: "test_cumulative_rate_sum", requestType: qbtypes.RequestTypeTimeSeries, query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ - Signal: telemetrytypes.SignalMeter, + Signal: telemetrytypes.SignalMetrics, StepInterval: qbtypes.Step{Duration: 24 * time.Hour}, Aggregations: []qbtypes.MetricAggregation{ { @@ -59,7 +59,7 @@ func TestStatementBuilder(t *testing.T) { name: "test_delta_rate_sum", requestType: qbtypes.RequestTypeTimeSeries, query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ - Signal: telemetrytypes.SignalMeter, + Signal: telemetrytypes.SignalMetrics, StepInterval: qbtypes.Step{Duration: 24 * time.Hour}, Aggregations: []qbtypes.MetricAggregation{ { @@ -92,7 +92,7 @@ func TestStatementBuilder(t *testing.T) { name: "test_delta_rate_avg", requestType: qbtypes.RequestTypeTimeSeries, query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ - Signal: telemetrytypes.SignalMeter, + Signal: telemetrytypes.SignalMetrics, StepInterval: qbtypes.Step{Duration: 24 * time.Hour}, Aggregations: []qbtypes.MetricAggregation{ { @@ -125,7 +125,7 @@ func TestStatementBuilder(t *testing.T) { name: "test_gauge_avg_sum", requestType: qbtypes.RequestTypeTimeSeries, query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ - Signal: telemetrytypes.SignalMeter, + Signal: telemetrytypes.SignalMetrics, StepInterval: qbtypes.Step{Duration: 24 * time.Hour}, Aggregations: []qbtypes.MetricAggregation{ { diff --git a/pkg/telemetrymeter/tables.go b/pkg/telemetrymeter/tables.go index 7f040f64f1d4..e2180cbcf8ac 100644 --- a/pkg/telemetrymeter/tables.go +++ b/pkg/telemetrymeter/tables.go @@ -1,84 +1,192 @@ package telemetrymeter import ( + "time" + "github.com/SigNoz/signoz/pkg/types/metrictypes" ) const ( - DBName = "signoz_meter" - SamplesTableName = "distributed_samples" - SamplesLocalTableName = "samples" - SamplesV4Agg1dTableName = "distributed_samples_agg_1d" - SamplesV4Agg1dLocalTableName = "samples_agg_1d" + DBName = "signoz_meter" + SamplesTableName = "distributed_samples" + SamplesLocalTableName = "samples" + SamplesAgg1dTableName = "distributed_samples_agg_1d" + SamplesAgg1dLocalTableName = "samples_agg_1d" ) +var ( + oneMonthInMilliseconds = uint64(time.Hour * 24 * 30) + + // when the query requests for almost 1 day, but not exactly 1 day, we need to add an offset to the end time + // to make sure that we are using the correct table + // this is because the start gets adjusted to the nearest step interval and uses the 5m table for 4m step interval + // leading to time series that doesn't best represent the rate of change + offsetBucket = uint64(1 * time.Hour.Milliseconds()) +) + +// start and end are in milliseconds +// we have two tables for samples +// 1. distributed_samples +// 2. distributed_samples_v4_agg_1d - for queries with time range above or equal to 30 days +// if the `timeAggregation` is `count_distinct` we can't use the aggregated tables because they don't support it +func WhichSamplesTableToUse( + start, end uint64, + metricType metrictypes.Type, + timeAggregation metrictypes.TimeAggregation, + tableHints *metrictypes.MetricTableHints, +) string { + + // if we have a hint for the table, we need to use it + // the hint will be used to override the default table selection logic + if tableHints != nil { + if tableHints.SamplesTableName != "" { + return tableHints.SamplesTableName + } + } + + // if the time aggregation is count_distinct, we need to use the distributed_samples table + // because the aggregated tables don't support count_distinct + if timeAggregation == metrictypes.TimeAggregationCountDistinct { + return SamplesTableName + } + + if end-start < oneMonthInMilliseconds+offsetBucket { + return SamplesTableName + } + return SamplesAgg1dTableName + +} + func AggregationColumnForSamplesTable( + start, end uint64, + metricType metrictypes.Type, temporality metrictypes.Temporality, timeAggregation metrictypes.TimeAggregation, tableHints *metrictypes.MetricTableHints, ) string { + tableName := WhichSamplesTableToUse(start, end, metricType, timeAggregation, tableHints) var aggregationColumn string switch temporality { case metrictypes.Delta: - // for delta metrics, we only support `RATE`/`INCREASE` both of which are sum - // although it doesn't make sense to use anyLast, avg, min, max, count on delta metrics, - // we are keeping it here to make sure that query will not be invalid - switch timeAggregation { - case metrictypes.TimeAggregationLatest: - aggregationColumn = "anyLast(last)" - case metrictypes.TimeAggregationSum: - aggregationColumn = "sum(sum)" - case metrictypes.TimeAggregationAvg: - aggregationColumn = "sum(sum) / sum(count)" - case metrictypes.TimeAggregationMin: - aggregationColumn = "min(min)" - case metrictypes.TimeAggregationMax: - aggregationColumn = "max(max)" - case metrictypes.TimeAggregationCount: - aggregationColumn = "sum(count)" - // count_distinct is not supported in aggregated tables - case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results - aggregationColumn = "sum(sum)" + switch tableName { + case SamplesTableName: + switch timeAggregation { + case metrictypes.TimeAggregationLatest: + aggregationColumn = "anyLast(value)" + case metrictypes.TimeAggregationSum: + aggregationColumn = "sum(value)" + case metrictypes.TimeAggregationAvg: + aggregationColumn = "avg(value)" + case metrictypes.TimeAggregationMin: + aggregationColumn = "min(value)" + case metrictypes.TimeAggregationMax: + aggregationColumn = "max(value)" + case metrictypes.TimeAggregationCount: + aggregationColumn = "count(value)" + case metrictypes.TimeAggregationCountDistinct: + aggregationColumn = "countDistinct(value)" + case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results + aggregationColumn = "sum(value)" + } + case SamplesAgg1dTableName: + switch timeAggregation { + case metrictypes.TimeAggregationLatest: + aggregationColumn = "anyLast(last)" + case metrictypes.TimeAggregationSum: + aggregationColumn = "sum(sum)" + case metrictypes.TimeAggregationAvg: + aggregationColumn = "sum(sum) / sum(count)" + case metrictypes.TimeAggregationMin: + aggregationColumn = "min(min)" + case metrictypes.TimeAggregationMax: + aggregationColumn = "max(max)" + case metrictypes.TimeAggregationCount: + aggregationColumn = "sum(count)" + // count_distinct is not supported in aggregated tables + case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results + aggregationColumn = "sum(sum)" + } } - case metrictypes.Cumulative: - // for cumulative metrics, we only support `RATE`/`INCREASE`. The max value in window is - // used to calculate the sum which is then divided by the window size to get the rate - switch timeAggregation { - case metrictypes.TimeAggregationLatest: - aggregationColumn = "anyLast(last)" - case metrictypes.TimeAggregationSum: - aggregationColumn = "sum(sum)" - case metrictypes.TimeAggregationAvg: - aggregationColumn = "sum(sum) / sum(count)" - case metrictypes.TimeAggregationMin: - aggregationColumn = "min(min)" - case metrictypes.TimeAggregationMax: - aggregationColumn = "max(max)" - case metrictypes.TimeAggregationCount: - aggregationColumn = "sum(count)" - // count_distinct is not supported in aggregated tables - case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results - aggregationColumn = "max(max)" + switch tableName { + case SamplesTableName: + switch timeAggregation { + case metrictypes.TimeAggregationLatest: + aggregationColumn = "anyLast(value)" + case metrictypes.TimeAggregationSum: + aggregationColumn = "sum(value)" + case metrictypes.TimeAggregationAvg: + aggregationColumn = "avg(value)" + case metrictypes.TimeAggregationMin: + aggregationColumn = "min(value)" + case metrictypes.TimeAggregationMax: + aggregationColumn = "max(value)" + case metrictypes.TimeAggregationCount: + aggregationColumn = "count(value)" + case metrictypes.TimeAggregationCountDistinct: + aggregationColumn = "countDistinct(value)" + case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results + aggregationColumn = "max(value)" + } + case SamplesAgg1dTableName: + switch timeAggregation { + case metrictypes.TimeAggregationLatest: + aggregationColumn = "anyLast(last)" + case metrictypes.TimeAggregationSum: + aggregationColumn = "sum(sum)" + case metrictypes.TimeAggregationAvg: + aggregationColumn = "sum(sum) / sum(count)" + case metrictypes.TimeAggregationMin: + aggregationColumn = "min(min)" + case metrictypes.TimeAggregationMax: + aggregationColumn = "max(max)" + case metrictypes.TimeAggregationCount: + aggregationColumn = "sum(count)" + // count_distinct is not supported in aggregated tables + case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results + aggregationColumn = "max(max)" + } } case metrictypes.Unspecified: - switch timeAggregation { - case metrictypes.TimeAggregationLatest: - aggregationColumn = "anyLast(last)" - case metrictypes.TimeAggregationSum: - aggregationColumn = "sum(sum)" - case metrictypes.TimeAggregationAvg: - aggregationColumn = "sum(sum) / sum(count)" - case metrictypes.TimeAggregationMin: - aggregationColumn = "min(min)" - case metrictypes.TimeAggregationMax: - aggregationColumn = "max(max)" - case metrictypes.TimeAggregationCount: - aggregationColumn = "sum(count)" - // count_distinct is not supported in aggregated tables - case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // ideally, this should never happen - aggregationColumn = "sum(sum)" + switch tableName { + case SamplesTableName: + switch timeAggregation { + case metrictypes.TimeAggregationLatest: + aggregationColumn = "anyLast(value)" + case metrictypes.TimeAggregationSum: + aggregationColumn = "sum(value)" + case metrictypes.TimeAggregationAvg: + aggregationColumn = "avg(value)" + case metrictypes.TimeAggregationMin: + aggregationColumn = "min(value)" + case metrictypes.TimeAggregationMax: + aggregationColumn = "max(value)" + case metrictypes.TimeAggregationCount: + aggregationColumn = "count(value)" + case metrictypes.TimeAggregationCountDistinct: + aggregationColumn = "countDistinct(value)" + case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // ideally, this should never happen + aggregationColumn = "sum(value)" + } + case SamplesAgg1dTableName: + switch timeAggregation { + case metrictypes.TimeAggregationLatest: + aggregationColumn = "anyLast(last)" + case metrictypes.TimeAggregationSum: + aggregationColumn = "sum(sum)" + case metrictypes.TimeAggregationAvg: + aggregationColumn = "sum(sum) / sum(count)" + case metrictypes.TimeAggregationMin: + aggregationColumn = "min(min)" + case metrictypes.TimeAggregationMax: + aggregationColumn = "max(max)" + case metrictypes.TimeAggregationCount: + aggregationColumn = "sum(count)" + // count_distinct is not supported in aggregated tables + case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // ideally, this should never happen + aggregationColumn = "sum(sum)" + } } } diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/req.go b/pkg/types/querybuildertypes/querybuildertypesv5/req.go index d21a7a6c2f17..50659b53010a 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/req.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/req.go @@ -62,12 +62,6 @@ func (q *QueryEnvelope) UnmarshalJSON(data []byte) error { return wrapUnmarshalError(err, "invalid metric builder query spec: %v", err) } q.Spec = spec - case telemetrytypes.SignalMeter: - var spec QueryBuilderQuery[MetricAggregation] - if err := UnmarshalJSONWithContext(shadow.Spec, &spec, "query spec"); err != nil { - return wrapUnmarshalError(err, "invalid meter builder query spec: %v", err) - } - q.Spec = spec default: return errors.NewInvalidInputf( errors.CodeInvalidInput, diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/validation.go b/pkg/types/querybuildertypes/querybuildertypesv5/validation.go index d2958bd8fd77..598edb4969c3 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/validation.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/validation.go @@ -155,7 +155,6 @@ func (q *QueryBuilderQuery[T]) validateSignal() error { case telemetrytypes.SignalMetrics, telemetrytypes.SignalTraces, telemetrytypes.SignalLogs, - telemetrytypes.SignalMeter, telemetrytypes.SignalUnspecified: // Empty is allowed for backward compatibility return nil default: diff --git a/pkg/types/telemetrytypes/signal.go b/pkg/types/telemetrytypes/signal.go index 20d3ee3f00a5..28c90fa6fd98 100644 --- a/pkg/types/telemetrytypes/signal.go +++ b/pkg/types/telemetrytypes/signal.go @@ -10,6 +10,5 @@ var ( SignalTraces = Signal{valuer.NewString("traces")} SignalLogs = Signal{valuer.NewString("logs")} SignalMetrics = Signal{valuer.NewString("metrics")} - SignalMeter = Signal{valuer.NewString("meter")} SignalUnspecified = Signal{valuer.NewString("")} )