fix: incorrect query prepared for group by body.{key} (#8823)

Srikanth Chekuri 2025-08-18 15:11:53 +05:30 committed by GitHub
parent 7029233596
commit 8f833fa62c
15 changed files with 221 additions and 22 deletions

@@ -110,6 +110,10 @@ func (q *querier) postProcessResults(ctx context.Context, results map[string]any
if req.RequestType == qbtypes.RequestTypeTimeSeries && req.FormatOptions != nil && req.FormatOptions.FillGaps {
for name := range typedResults {
if req.SkipFillGaps(name) {
continue
}
funcs := []qbtypes.Function{{Name: qbtypes.FunctionNameFillZero}}
funcs = q.prepareFillZeroArgsWithStep(funcs, req, req.StepIntervalForQuery(name))
// empty time series if it doesn't exist

@@ -23,6 +23,10 @@ import (
"github.com/SigNoz/signoz/pkg/valuer"
)
var (
intervalWarn = "Query %s is requesting aggregation interval %v seconds, which is smaller than the minimum allowed interval of %v seconds for the selected time range. Using the minimum instead"
)
type querier struct {
logger *slog.Logger
telemetryStore telemetrystore.TelemetryStore
@@ -121,6 +125,8 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
PanelType: req.RequestType.StringValue(),
}
intervalWarnings := []string{}
// First pass: collect all metric names that need temporality
metricNames := make([]string, 0)
for idx, query := range req.CompositeQuery.Queries {
@@ -147,9 +153,11 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
}
}
if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepInterval(req.Start, req.End)) {
spec.StepInterval = qbtypes.Step{
newStep := qbtypes.Step{
Duration: time.Second * time.Duration(querybuilder.MinAllowedStepInterval(req.Start, req.End)),
}
intervalWarnings = append(intervalWarnings, fmt.Sprintf(intervalWarn, spec.Name, spec.StepInterval.Seconds(), newStep.Duration.Seconds()))
spec.StepInterval = newStep
}
req.CompositeQuery.Queries[idx].Spec = spec
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
@@ -162,9 +170,11 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
}
}
if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepInterval(req.Start, req.End)) {
spec.StepInterval = qbtypes.Step{
newStep := qbtypes.Step{
Duration: time.Second * time.Duration(querybuilder.MinAllowedStepInterval(req.Start, req.End)),
}
intervalWarnings = append(intervalWarnings, fmt.Sprintf(intervalWarn, spec.Name, spec.StepInterval.Seconds(), newStep.Duration.Seconds()))
spec.StepInterval = newStep
}
req.CompositeQuery.Queries[idx].Spec = spec
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
@@ -181,9 +191,11 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
}
}
if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)) {
spec.StepInterval = qbtypes.Step{
newStep := qbtypes.Step{
Duration: time.Second * time.Duration(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)),
}
intervalWarnings = append(intervalWarnings, fmt.Sprintf(intervalWarn, spec.Name, spec.StepInterval.Seconds(), newStep.Duration.Seconds()))
spec.StepInterval = newStep
}
}
req.CompositeQuery.Queries[idx].Spec = spec
@@ -290,6 +302,16 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
qbResp, qbErr := q.run(ctx, orgID, queries, req, steps, event)
if qbResp != nil {
qbResp.QBEvent = event
if len(intervalWarnings) != 0 && req.RequestType == qbtypes.RequestTypeTimeSeries {
if qbResp.Warning == nil {
qbResp.Warning = &qbtypes.QueryWarnData{
Warnings: make([]qbtypes.QueryWarnDataAdditional, len(intervalWarnings)),
}
for idx := range intervalWarnings {
qbResp.Warning.Warnings[idx] = qbtypes.QueryWarnDataAdditional{Message: intervalWarnings[idx]}
}
}
}
}
return qbResp, qbErr
}
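
Aside: the clamp-and-warn pattern above is repeated per signal. A minimal standalone sketch of it, with minStep as a hypothetical stand-in for querybuilder.MinAllowedStepInterval:

package main

import (
	"fmt"
	"time"
)

const intervalWarn = "Query %s is requesting aggregation interval %v seconds, which is smaller than the minimum allowed interval of %v seconds for the selected time range. Using the minimum instead"

type step struct{ Duration time.Duration }

// minStep is a hypothetical stand-in for querybuilder.MinAllowedStepInterval:
// roughly one point per chart pixel, never below 60s (assumed heuristic).
func minStep(startMs, endMs int64) int64 {
	s := (endMs - startMs) / 1000 / 1500
	if s < 60 {
		return 60
	}
	return s
}

func main() {
	start, end := int64(1747947419000), int64(1747983448000)
	spec := step{Duration: 5 * time.Second}
	var warnings []string
	if spec.Duration.Seconds() < float64(minStep(start, end)) {
		newStep := step{Duration: time.Second * time.Duration(minStep(start, end))}
		// warn with the requested value before clamping, as in the diff
		warnings = append(warnings, fmt.Sprintf(intervalWarn, "A", spec.Duration.Seconds(), newStep.Duration.Seconds()))
		spec = newStep
	}
	fmt.Println(spec.Duration, warnings)
}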

@@ -220,7 +220,7 @@ func (v *exprVisitor) VisitFunctionExpr(fn *chparser.FunctionExpr) error {
for i := 0; i < len(args)-1; i++ {
origVal := args[i].String()
fieldKey := telemetrytypes.GetFieldKeyFromKeyText(origVal)
expr, exprArgs, err := CollisionHandledFinalExpr(context.Background(), &fieldKey, v.fieldMapper, v.conditionBuilder, v.fieldKeys, dataType)
expr, exprArgs, err := CollisionHandledFinalExpr(context.Background(), &fieldKey, v.fieldMapper, v.conditionBuilder, v.fieldKeys, dataType, v.jsonBodyPrefix, v.jsonKeyToKey)
if err != nil {
return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "failed to get table field name for %q", origVal)
}
@@ -238,9 +238,9 @@ func (v *exprVisitor) VisitFunctionExpr(fn *chparser.FunctionExpr) error {
for i, arg := range args {
orig := arg.String()
fieldKey := telemetrytypes.GetFieldKeyFromKeyText(orig)
expr, exprArgs, err := CollisionHandledFinalExpr(context.Background(), &fieldKey, v.fieldMapper, v.conditionBuilder, v.fieldKeys, dataType)
expr, exprArgs, err := CollisionHandledFinalExpr(context.Background(), &fieldKey, v.fieldMapper, v.conditionBuilder, v.fieldKeys, dataType, v.jsonBodyPrefix, v.jsonKeyToKey)
if err != nil {
return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "failed to get table field name for %q", orig)
return err
}
v.chArgs = append(v.chArgs, exprArgs...)
newCol := expr

@@ -23,6 +23,8 @@ func CollisionHandledFinalExpr(
cb qbtypes.ConditionBuilder,
keys map[string][]*telemetrytypes.TelemetryFieldKey,
requiredDataType telemetrytypes.FieldDataType,
jsonBodyPrefix string,
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
) (string, []any, error) {
if requiredDataType != telemetrytypes.FieldDataTypeString &&
@@ -100,7 +102,15 @@ func CollisionHandledFinalExpr(
if err != nil {
return "", nil, err
}
if strings.HasPrefix(field.Name, jsonBodyPrefix) && jsonBodyPrefix != "" && jsonKeyToKey != nil {
// TODO(nitya): enable group by on body column?
return "", nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "Group by/Aggregation isn't available for the body column")
// colName, _ = jsonKeyToKey(context.Background(), field, qbtypes.FilterOperatorUnknown, dummyValue)
} else {
colName, _ = telemetrytypes.DataTypeCollisionHandledFieldName(field, dummyValue, colName)
}
stmts = append(stmts, colName)
}
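
The net effect: a group by on a body.{key} field now fails fast instead of preparing an incorrect query. A reduced sketch of the guard, assuming "body." as the JSON body prefix:

package main

import (
	"errors"
	"fmt"
	"strings"
)

// groupByExpr sketches the guard above: fields under the JSON body
// prefix are rejected for group by/aggregation; other fields proceed
// to normal collision-handled column resolution (stubbed here).
func groupByExpr(fieldName, jsonBodyPrefix string) (string, error) {
	if jsonBodyPrefix != "" && strings.HasPrefix(fieldName, jsonBodyPrefix) {
		return "", errors.New("Group by/Aggregation isn't available for the body column")
	}
	return fieldName, nil // stand-in for DataTypeCollisionHandledFieldName
}

func main() {
	if _, err := groupByExpr("body.status", "body."); err != nil {
		fmt.Println(err) // Group by/Aggregation isn't available for the body column
	}
	col, _ := groupByExpr("service.name", "body.")
	fmt.Println(col) // service.name
}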

@@ -43,13 +43,16 @@ func QueryStringToKeysSelectors(query string) []*telemetrytypes.FieldKeySelector
FieldDataType: key.FieldDataType,
})
if key.FieldContext != telemetrytypes.FieldContextUnspecified {
if key.FieldContext == telemetrytypes.FieldContextLog ||
key.FieldContext == telemetrytypes.FieldContextSpan ||
key.FieldContext == telemetrytypes.FieldContextMetric ||
key.FieldContext == telemetrytypes.FieldContextTrace {
// keys like span.kind in a metrics query, or metric.max_count in a span query, should still get the search on span.kind
// see note in where_clause_visitor.go in VisitKey(...)
keys = append(keys, &telemetrytypes.FieldKeySelector{
Name: key.FieldContext.StringValue() + "." + key.Name,
Signal: key.Signal,
FieldContext: key.FieldContext,
FieldContext: telemetrytypes.FieldContextAttribute, // do not keep the original context because this key is an attribute
FieldDataType: key.FieldDataType,
})
}
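
A simplified sketch of the selector expansion above, with reduced stand-in types for telemetrytypes: a context-prefixed key such as span.kind yields an extra selector under the full name, re-tagged as an attribute:

package main

import "fmt"

type fieldKeySelector struct {
	Name         string
	FieldContext string
}

// expandSelectors mirrors the branch above for the log/span/metric/trace
// contexts: keep the plain selector and add a prefixed one whose context
// is forced to "attribute".
func expandSelectors(name, context string) []fieldKeySelector {
	selectors := []fieldKeySelector{{Name: name, FieldContext: context}}
	switch context {
	case "log", "span", "metric", "trace":
		selectors = append(selectors, fieldKeySelector{
			Name:         context + "." + name,
			FieldContext: "attribute", // do not keep the original context
		})
	}
	return selectors
}

func main() {
	fmt.Println(expandSelectors("kind", "span"))
	// [{kind span} {span.kind attribute}]
}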

@@ -32,12 +32,6 @@ func TestQueryToKeys(t *testing.T) {
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
},
{
Name: "resource.service.name",
Signal: telemetrytypes.SignalUnspecified,
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
},
},
},
{

@@ -303,7 +303,7 @@ func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
// Keep original column expressions so we can build the tuple
fieldNames := make([]string, 0, len(query.GroupBy))
for _, gb := range query.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString)
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonBodyPrefix, b.jsonKeyToKey)
if err != nil {
return nil, err
}
@@ -449,7 +449,7 @@ func (b *logQueryStatementBuilder) buildScalarQuery(
var allGroupByArgs []any
for _, gb := range query.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString)
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonBodyPrefix, b.jsonKeyToKey)
if err != nil {
return nil, err
}

@@ -355,3 +355,77 @@ func TestStatementBuilderListQueryResourceTests(t *testing.T) {
})
}
}
func TestStatementBuilderTimeSeriesBodyGroupBy(t *testing.T) {
cases := []struct {
name string
requestType qbtypes.RequestType
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]
expected qbtypes.Statement
expectedErrContains string
}{
{
name: "Time series with limit and body group by",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
Aggregations: []qbtypes.LogAggregation{
{
Expression: "count()",
},
},
Filter: &qbtypes.Filter{
Expression: "service.name = 'cartservice'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "body.status",
},
},
},
},
expectedErrContains: "Group by/Aggregation isn't available for the body column",
},
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
statementBuilder := NewLogQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
DefaultFullTextColumn,
BodyJSONStringSearchPrefix,
GetBodyJSONKey,
)
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
if c.expectedErrContains != "" {
require.Error(t, err)
require.Contains(t, err.Error(), c.expectedErrContains)
} else {
require.NoError(t, err)
require.Equal(t, c.expected.Query, q.Query)
require.Equal(t, c.expected.Args, q.Args)
require.Equal(t, c.expected.Warnings, q.Warnings)
}
})
}
}

@@ -202,7 +202,8 @@ func (t *telemetryMetaStore) getTracesKeys(ctx context.Context, fieldKeySelector
conds = append(conds, sb.And(fieldKeyConds...))
limit += fieldKeySelector.Limit
}
sb.Where(sb.Or(conds...))
// the span_attribute_keys table has historically pushed top-level columns as attributes
sb.Where(sb.Or(conds...)).Where("isColumn = false")
sb.GroupBy("tagKey", "tagType", "dataType")
if limit == 0 {
limit = 1000
@@ -403,7 +404,7 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
sb := sqlbuilder.Select(
"name AS tag_key",
fmt.Sprintf("'%s' AS tag_type", fieldContext.TagType()),
"datatype AS tag_data_type",
"lower(datatype) AS tag_data_type", // in logs, we had some historical data with capital and small case
fmt.Sprintf(`%d AS priority`, getPriorityForContext(fieldContext)),
).From(tblName)
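
Both metadata changes are plain filter/projection tweaks. A hedged sketch of the traces side with github.com/huandu/go-sqlbuilder (the key condition here is a dummy; the real code ORs one condition per selector):

package main

import (
	"fmt"

	"github.com/huandu/go-sqlbuilder"
)

func main() {
	sb := sqlbuilder.NewSelectBuilder()
	sb.Select("tagKey", "tagType", "dataType").From("span_attribute_keys")
	conds := []string{sb.Like("tagKey", "http%")} // dummy selector condition
	// exclude top-level columns that were historically pushed as attributes
	sb.Where(sb.Or(conds...)).Where("isColumn = false")
	sb.GroupBy("tagKey", "tagType", "dataType")
	sql, args := sb.Build()
	fmt.Println(sql, args)
}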

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log/slog"
"os"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/querybuilder"
@@ -17,6 +18,48 @@ import (
const (
RateWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window))`
IncreaseWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, ((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window)) * (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window))`
RateWithInterpolation = `
CASE
WHEN row_number() OVER rate_window = 1 THEN
-- First row: try to interpolate using next value
CASE
WHEN leadInFrame(per_series_value, 1) OVER rate_window IS NOT NULL THEN
-- Assume linear growth to next point
(leadInFrame(per_series_value, 1) OVER rate_window - per_series_value) /
(leadInFrame(ts, 1) OVER rate_window - ts)
ELSE
0 -- No next value either, can't interpolate
END
WHEN (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0 THEN
-- Counter reset detected
per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window)
ELSE
-- Normal case: calculate rate
(per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) /
(ts - lagInFrame(ts, 1) OVER rate_window)
END`
IncreaseWithInterpolation = `
CASE
WHEN row_number() OVER rate_window = 1 THEN
-- First row: try to interpolate using next value
CASE
WHEN leadInFrame(per_series_value, 1) OVER rate_window IS NOT NULL THEN
-- Calculate the interpolated increase for this interval
((leadInFrame(per_series_value, 1) OVER rate_window - per_series_value) /
(leadInFrame(ts, 1) OVER rate_window - ts)) *
(leadInFrame(ts, 1) OVER rate_window - ts)
ELSE
0 -- No next value either, can't interpolate
END
WHEN (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0 THEN
-- Counter reset detected: the increase is the current value
per_series_value
ELSE
-- Normal case: calculate increase
(per_series_value - lagInFrame(per_series_value, 1) OVER rate_window)
END`
)
type MetricQueryStatementBuilder struct {
@@ -444,6 +487,9 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
switch query.Aggregations[0].TimeAggregation {
case metrictypes.TimeAggregationRate:
rateExpr := fmt.Sprintf(RateWithoutNegative, start, start)
if os.Getenv("INTERPOLATION_ENABLED") == "true" {
rateExpr = RateWithInterpolation
}
wrapped := sqlbuilder.NewSelectBuilder()
wrapped.Select("ts")
for _, g := range query.GroupBy {
@@ -456,6 +502,9 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
case metrictypes.TimeAggregationIncrease:
incExpr := fmt.Sprintf(IncreaseWithoutNegative, start, start)
if os.Getenv("INTERPOLATION_ENABLED") == "true" {
incExpr = IncreaseWithInterpolation
}
wrapped := sqlbuilder.NewSelectBuilder()
wrapped.Select("ts")
for _, g := range query.GroupBy {

@@ -490,7 +490,7 @@ func (b *traceQueryStatementBuilder) buildTimeSeriesQuery(
// Keep original column expressions so we can build the tuple
fieldNames := make([]string, 0, len(query.GroupBy))
for _, gb := range query.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString)
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, "", nil)
if err != nil {
return nil, err
}
@@ -632,7 +632,7 @@ func (b *traceQueryStatementBuilder) buildScalarQuery(
var allGroupByArgs []any
for _, gb := range query.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString)
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, "", nil)
if err != nil {
return nil, err
}

@@ -39,6 +39,8 @@ func (m *alertMigrateV5) Migrate(ctx context.Context, ruleData map[string]any) b
return false
}
m.logger.InfoContext(ctx, "migrating alert", "alert_name", ruleData["alert"])
ruleCondition, ok := ruleData["condition"].(map[string]any)
if !ok {
m.logger.WarnContext(ctx, "didn't find condition")

@@ -347,7 +347,7 @@ func (mc *migrateCommon) createAggregations(ctx context.Context, queryData map[s
aggregateAttr, hasAttr := queryData["aggregateAttribute"].(map[string]any)
dataSource, _ := queryData["dataSource"].(string)
if aggregateOp == "noop" {
if aggregateOp == "noop" && dataSource != "metrics" {
return false
}
@@ -696,8 +696,16 @@ func (mc *migrateCommon) buildCondition(ctx context.Context, key, operator strin
case "<=":
return fmt.Sprintf("%s <= %s", key, formattedValue)
case "in", "IN":
if !strings.HasPrefix(formattedValue, "[") && !mc.isVariable(formattedValue) {
mc.logger.WarnContext(ctx, "multi-value operator in found with single value", "key", key, "formatted_value", formattedValue)
return fmt.Sprintf("%s = %s", key, formattedValue)
}
return fmt.Sprintf("%s IN %s", key, formattedValue)
case "nin", "NOT IN":
if !strings.HasPrefix(formattedValue, "[") && !mc.isVariable(formattedValue) {
mc.logger.WarnContext(ctx, "multi-value operator not in found with single value", "key", key, "formatted_value", formattedValue)
return fmt.Sprintf("%s != %s", key, formattedValue)
}
return fmt.Sprintf("%s NOT IN %s", key, formattedValue)
case "like", "LIKE":
return fmt.Sprintf("%s LIKE %s", key, formattedValue)
@@ -892,6 +900,7 @@ func (mc *migrateCommon) isVariable(s string) bool {
s = strings.TrimSpace(s)
patterns := []string{
`^\{.*\}$`, // {var} or {.var}
`^\{\{.*\}\}$`, // {{var}} or {{.var}}
`^\$.*$`, // $var or $service.name
`^\[\[.*\]\]$`, // [[var]] or [[.var]]
@@ -919,6 +928,11 @@ func (mc *migrateCommon) normalizeVariable(ctx context.Context, s string) string
varName = strings.TrimPrefix(varName, ".")
// this is probably going to be a problem if the user's key starts with $
varName = strings.TrimPrefix(varName, "$")
} else if strings.HasPrefix(s, "{") && strings.HasSuffix(s, "}") { // {var} or {.var}
varName = strings.TrimPrefix(strings.TrimSuffix(s, "}"), "{")
varName = strings.TrimPrefix(varName, ".")
// this is probably going to be a problem if the user's key starts with $
varName = strings.TrimPrefix(varName, "$")
} else if strings.HasPrefix(s, "[[") && strings.HasSuffix(s, "]]") {
// [[var]] or [[.var]]
varName = strings.TrimPrefix(strings.TrimSuffix(s, "]]"), "[[")

@@ -37,6 +37,8 @@ func (m *dashboardMigrateV5) Migrate(ctx context.Context, dashboardData map[stri
return false
}
m.logger.InfoContext(ctx, "migrating dashboard", "dashboard_name", dashboardData["title"])
// if there is whitespace in a variable name, replace it
if variables, ok := dashboardData["variables"].(map[string]any); ok {
for _, variable := range variables {
@@ -74,6 +76,13 @@ func (migration *dashboardMigrateV5) updateWidget(ctx context.Context, widget ma
return false
}
if qType, ok := query["queryType"]; ok {
if qType == "promql" || qType == "clickhouse_sql" {
migration.logger.InfoContext(ctx, "nothing to migrate for query type", "query_type", qType)
return false
}
}
builder, ok := query["builder"].(map[string]any)
if !ok {
return false

@@ -319,6 +319,23 @@ func (r *QueryRangeRequest) IsAnomalyRequest() (*QueryBuilderQuery[MetricAggrega
return &q, hasAnomaly
}
// We do not support fill gaps for these query types. Maybe support in the future?
func (r *QueryRangeRequest) SkipFillGaps(name string) bool {
for _, query := range r.CompositeQuery.Queries {
switch spec := query.Spec.(type) {
case PromQuery:
if spec.Name == name {
return true
}
case ClickHouseQuery:
if spec.Name == name {
return true
}
}
}
return false
}
// UnmarshalJSON implements custom JSON unmarshaling to disallow unknown fields
func (r *QueryRangeRequest) UnmarshalJSON(data []byte) error {
// Define a type alias to avoid infinite recursion
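
A reduced usage sketch of SkipFillGaps over stand-in types (not the actual qbtypes definitions): gap filling applies only to builder queries, so PromQL and ClickHouse query names short-circuit to true:

package main

import "fmt"

type promQuery struct{ Name string }
type clickHouseQuery struct{ Name string }

type queryRangeRequest struct{ Queries []any }

// skipFillGaps mirrors the method above: any query whose spec is a
// PromQL or ClickHouse query is excluded from gap filling.
func (r queryRangeRequest) skipFillGaps(name string) bool {
	for _, q := range r.Queries {
		switch spec := q.(type) {
		case promQuery:
			if spec.Name == name {
				return true
			}
		case clickHouseQuery:
			if spec.Name == name {
				return true
			}
		}
	}
	return false
}

func main() {
	r := queryRangeRequest{Queries: []any{promQuery{Name: "A"}, clickHouseQuery{Name: "B"}}}
	fmt.Println(r.skipFillGaps("A"), r.skipFillGaps("C")) // true false
}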