feat(telemetry/meter): test query range API fixes

This commit is contained in:
vikrantgupta25 2025-07-31 16:45:48 +05:30
parent 73bc95a56a
commit bd762d02e3
No known key found for this signature in database
GPG Key ID: F8440BDE36411E79
4 changed files with 43 additions and 23 deletions

View File

@ -272,7 +272,15 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
}
spec.ShiftBy = extractShiftFromBuilderQuery(spec)
timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
bq := newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
var bq *builderQuery[qbtypes.MetricAggregation]
switch spec.Signal {
case telemetrytypes.SignalMeter:
bq = newBuilderQuery(q.telemetryStore, q.meterStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
default:
bq = newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
}
queries[spec.Name] = bq
steps[spec.Name] = spec.StepInterval
default:

View File

@ -1072,14 +1072,6 @@ func (t *telemetryMetaStore) getMeterFieldValues(ctx context.Context, fieldValue
sb.Where(sb.E("metric_name", fieldValueSelector.MetricContext.MetricName))
}
if fieldValueSelector.StartUnixMilli > 0 {
sb.Where(sb.GE("last_reported_unix_milli", fieldValueSelector.StartUnixMilli))
}
if fieldValueSelector.EndUnixMilli > 0 {
sb.Where(sb.LE("first_reported_unix_milli", fieldValueSelector.EndUnixMilli))
}
if fieldValueSelector.Value != "" {
if fieldValueSelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
sb.Where(sb.E("attr_string_value", fieldValueSelector.Value))
@ -1194,6 +1186,33 @@ func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, metricNa
return make(map[string]metrictypes.Temporality), nil
}
result := make(map[string]metrictypes.Temporality)
metricsTemporality, err := t.fetchMetricsTemporality(ctx, t.metricsDBName, t.metricsFieldsTblName, metricNames...)
if err != nil {
return nil, err
}
meterMetricsTemporality, err := t.fetchMetricsTemporality(ctx, t.meterDBName, t.meterFieldsTblName, metricNames...)
if err != nil {
return nil, err
}
// For metrics not found in the database, set to Unknown
for _, metricName := range metricNames {
if temporality, exists := metricsTemporality[metricName]; exists {
result[metricName] = temporality
continue
}
if temporality, exists := meterMetricsTemporality[metricName]; exists {
result[metricName] = temporality
continue
}
result[metricName] = metrictypes.Unknown
}
return result, nil
}
func (t *telemetryMetaStore) fetchMetricsTemporality(ctx context.Context, database string, table string, metricNames ...string) (map[string]metrictypes.Temporality, error) {
result := make(map[string]metrictypes.Temporality)
// Build query to fetch temporality for all metrics
@ -1245,12 +1264,5 @@ func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, metricNa
result[metricName] = temporality
}
// For metrics not found in the database, set to Unknown
for _, metricName := range metricNames {
if _, exists := result[metricName]; !exists {
result[metricName] = metrictypes.Unknown
}
}
return result, nil
}

View File

@ -133,7 +133,7 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDeltaFastPath(
}
sb.SelectMore(fmt.Sprintf("%s AS value", aggCol))
sb.From(fmt.Sprintf("%s.%s AS points", DBName, SamplesTableName))
sb.From(fmt.Sprintf("%s.%s AS points", DBName, SamplesV4Agg1dTableName))
sb.Where(
sb.In("metric_name", query.Aggregations[0].MetricName),
sb.GTE("unix_milli", start),
@ -213,7 +213,7 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDelta(
sb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
sb.From(fmt.Sprintf("%s.%s AS points", DBName, SamplesTableName))
sb.From(fmt.Sprintf("%s.%s AS points", DBName, SamplesV4Agg1dTableName))
sb.Where(
sb.In("metric_name", query.Aggregations[0].MetricName),
sb.GTE("unix_milli", start),
@ -276,7 +276,7 @@ func (b *meterQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
aggCol := AggregationColumnForSamplesTable(query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
baseSb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, SamplesTableName))
baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, SamplesV4Agg1dTableName))
baseSb.Where(
baseSb.In("metric_name", query.Aggregations[0].MetricName),
baseSb.GTE("unix_milli", start),

View File

@ -50,7 +50,7 @@ func TestStatementBuilder(t *testing.T) {
},
},
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747785600000))) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747785600000))) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, max(max) AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte",
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747785600000))) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747785600000))) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, max(max) AS per_series_value FROM signoz_meter.distributed_samples_agg_1d AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte",
Args: []any{"signoz_calls_total", uint64(1747785600000), uint64(1747983420000), "cartservice", "cumulative", 0},
},
expectedErr: nil,
@ -83,7 +83,7 @@ func TestStatementBuilder(t *testing.T) {
},
},
expected: qbtypes.Statement{
Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, sum(sum)/86400 AS value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte",
Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, sum(sum)/86400 AS value FROM signoz_meter.distributed_samples_agg_1d AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte",
Args: []any{"signoz_calls_total", uint64(1747872000000), uint64(1747983420000), "cartservice", "delta"},
},
expectedErr: nil,
@ -116,7 +116,7 @@ func TestStatementBuilder(t *testing.T) {
},
},
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, sum(sum)/86400 AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `service.name`, avg(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte",
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, sum(sum)/86400 AS per_series_value FROM signoz_meter.distributed_samples_agg_1d AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `service.name`, avg(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte",
Args: []any{"signoz_calls_total", uint64(1747872000000), uint64(1747983420000), "cartservice", "delta", 0},
},
expectedErr: nil,
@ -149,7 +149,7 @@ func TestStatementBuilder(t *testing.T) {
},
},
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'host.name') AS `host.name`, sum(sum) / sum(count) AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'host.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`) SELECT * FROM __spatial_aggregation_cte",
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'host.name') AS `host.name`, sum(sum) / sum(count) AS per_series_value FROM signoz_meter.distributed_samples_agg_1d AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'host.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`) SELECT * FROM __spatial_aggregation_cte",
Args: []any{"system.memory.usage", uint64(1747872000000), uint64(1747983420000), "big-data-node-1", "unspecified", 0},
},
expectedErr: nil,