diff --git a/pkg/telemetrytraces/trace_operator_cte_builder.go b/pkg/telemetrytraces/trace_operator_cte_builder.go index 581d136970a2..c0784d2680c6 100644 --- a/pkg/telemetrytraces/trace_operator_cte_builder.go +++ b/pkg/telemetrytraces/trace_operator_cte_builder.go @@ -410,7 +410,7 @@ func (b *traceOperatorCTEBuilder) buildFinalQuery(selectFromCTE string, requestT case qbtypes.RequestTypeTrace: return b.buildTraceQuery(selectFromCTE) case qbtypes.RequestTypeScalar: - return b.buildTraceQuery(selectFromCTE) + return b.buildScalarQuery(selectFromCTE) default: return nil, fmt.Errorf("unsupported request type: %s", requestType) } @@ -760,6 +760,92 @@ func (b *traceOperatorCTEBuilder) buildTraceQuery(selectFromCTE string) (*qbtype }, nil } +func (b *traceOperatorCTEBuilder) buildScalarQuery(selectFromCTE string) (*qbtypes.Statement, error) { + sb := sqlbuilder.NewSelectBuilder() + + // Get keys for field mapping + keySelectors := b.getKeySelectors() + keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(b.ctx, keySelectors) + if err != nil { + return nil, err + } + + var allGroupByArgs []any + + // Add group by fields using proper field mapper + for _, gb := range b.operator.GroupBy { + expr, args, err := querybuilder.CollisionHandledFinalExpr( + b.ctx, + &gb.TelemetryFieldKey, + b.stmtBuilder.fm, + b.stmtBuilder.cb, + keys, + telemetrytypes.FieldDataTypeString, + "", + nil, + ) + if err != nil { + return nil, errors.NewInvalidInputf( + errors.CodeInvalidInput, + "failed to map group by field '%s': %v", + gb.TelemetryFieldKey.Name, + err, + ) + } + colExpr := fmt.Sprintf("toString(%s) AS `%s`", expr, gb.TelemetryFieldKey.Name) + allGroupByArgs = append(allGroupByArgs, args...) + sb.SelectMore(colExpr) + } + + // Add aggregations using proper aggregation expression rewriter + var allAggChArgs []any + for i, agg := range b.operator.Aggregations { + rewritten, chArgs, err := b.stmtBuilder.aggExprRewriter.Rewrite( + b.ctx, + agg.Expression, + uint64((b.end-b.start)/querybuilder.NsToSeconds), // Use full time range for scalar + keys, + ) + if err != nil { + return nil, errors.NewInvalidInputf( + errors.CodeInvalidInput, + "failed to rewrite aggregation expression '%s': %v", + agg.Expression, + err, + ) + } + allAggChArgs = append(allAggChArgs, chArgs...) + + // Use alias if provided, otherwise use default naming + alias := fmt.Sprintf("__result_%d", i) + if agg.Alias != "" { + alias = agg.Alias + } + + sb.SelectMore(fmt.Sprintf("%s AS %s", rewritten, alias)) + } + + sb.From(selectFromCTE) + + // Group by all group by fields (no time grouping for scalar) + if len(b.operator.GroupBy) > 0 { + groupByKeys := make([]string, len(b.operator.GroupBy)) + for i, gb := range b.operator.GroupBy { + groupByKeys[i] = fmt.Sprintf("`%s`", gb.TelemetryFieldKey.Name) + } + sb.GroupBy(groupByKeys...) + } + + // Combine all arguments + combinedArgs := append(allGroupByArgs, allAggChArgs...) + + sql, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...) + return &qbtypes.Statement{ + Query: sql, + Args: args, + }, nil +} + func (b *traceOperatorCTEBuilder) addCTE(name, sql string, args []any, dependsOn []string) { b.ctes = append(b.ctes, cteNode{ name: name,