From b6f5c053a0b12637e3460fa502f0bae3bf4b1efa Mon Sep 17 00:00:00 2001 From: Ekansh Gupta Date: Fri, 5 Sep 2025 21:07:10 +0530 Subject: [PATCH] feat: trace operators BE (#8293) * feat: [draft] added implementation of trace operators * feat: [draft] added implementation of trace operators * feat: [draft] added implementation of trace operators * feat: [draft] added implementation of trace operators * feat: added implementation of trace operators * feat: added implementation of trace operators * feat: added implementation of trace operators * feat: added implementation of trace operators * feat: added implementation of trace operators * feat: added implementation of trace operators * feat: added implementation of trace operators * feat: added implementation of trace operators * feat: added implementation of trace operators * feat: added implementation of trace operators * feat: refactor trace operator * feat: added postprocess * feat: added postprocess * feat: added postprocess * feat: refactored the consume function * feat: refactored the consume function * feat: refactored the consume function * feat: refactored the consume function * feat: refactored the consume function * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved 
conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: replaced info to debug logs * feat: replaced info to debug logs * feat: replaced info to debug logs * feat: updated time series query * feat: fixed merge conflicts * feat: fixed merge conflicts * feat: fixed merge conflicts * feat: fixed merge conflicts * feat: added deep copy in ranged queries * feat: refactored fingerprinting * feat: refactored fingerprinting * feat: refactored fingerprinting * feat: refactored fingerprinting * feat: refactored fingerprinting * feat: refactored fingerprinting * feat: refactored fingerprinting * feat: added comment for build all spans cte * feat: added postprocess for timeseries and added limits to memory * feat: fixed span count in trace view * feat: fixed span count in trace view * feat: fixed linting issues * feat: fixed linting issues * feat: fixed linting issues * feat: fixed linting issues --------- Co-authored-by: Nityananda Gohain --- pkg/querier/postprocess.go | 28 + pkg/querier/querier.go | 141 ++- pkg/querier/signozquerier/provider.go | 12 + pkg/querier/trace_operator_query.go | 90 ++ pkg/telemetrytraces/test_data.go | 21 + .../trace_operator_cte_builder.go | 907 ++++++++++++++++++ .../trace_operator_cte_builder_test.go | 551 +++++++++++ .../trace_operator_statement_builder.go | 95 ++ .../querybuildertypesv5/qb.go | 5 + .../querybuildertypesv5/req.go | 6 +- .../querybuildertypesv5/req_test.go | 159 ++- .../querybuildertypesv5/trace_operator.go | 134 ++- .../querybuildertypesv5/validation.go | 42 +- 13 files changed, 2102 insertions(+), 89 deletions(-) create mode 100644 pkg/querier/trace_operator_query.go create mode 100644 pkg/telemetrytraces/trace_operator_cte_builder.go create mode 100644 pkg/telemetrytraces/trace_operator_cte_builder_test.go create mode 100644 pkg/telemetrytraces/trace_operator_statement_builder.go diff --git 
a/pkg/querier/postprocess.go b/pkg/querier/postprocess.go index 46864d89e3d2..21763a242aaf 100644 --- a/pkg/querier/postprocess.go +++ b/pkg/querier/postprocess.go @@ -29,6 +29,8 @@ func getqueryInfo(spec any) queryInfo { return queryInfo{Name: s.Name, Disabled: s.Disabled, Step: s.StepInterval} case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]: return queryInfo{Name: s.Name, Disabled: s.Disabled, Step: s.StepInterval} + case qbtypes.QueryBuilderTraceOperator: + return queryInfo{Name: s.Name, Disabled: s.Disabled, Step: s.StepInterval} case qbtypes.QueryBuilderFormula: return queryInfo{Name: s.Name, Disabled: s.Disabled} case qbtypes.PromQuery: @@ -70,6 +72,11 @@ func (q *querier) postProcessResults(ctx context.Context, results map[string]any result = postProcessMetricQuery(q, result, spec, req) typedResults[spec.Name] = result } + case qbtypes.QueryBuilderTraceOperator: + if result, ok := typedResults[spec.Name]; ok { + result = postProcessTraceOperator(q, result, spec, req) + typedResults[spec.Name] = result + } } } @@ -210,6 +217,27 @@ func postProcessMetricQuery( return result } +// postProcessTraceOperator applies postprocessing to a trace operator query result +func postProcessTraceOperator( + q *querier, + result *qbtypes.Result, + query qbtypes.QueryBuilderTraceOperator, + req *qbtypes.QueryRangeRequest, +) *qbtypes.Result { + + result = q.applySeriesLimit(result, query.Limit, query.Order) + + // Apply functions if any + if len(query.Functions) > 0 { + step := query.StepInterval.Duration.Milliseconds() + functions := q.prepareFillZeroArgsWithStep(query.Functions, req, step) + result = q.applyFunctions(result, functions) + } + + return result +} + + // applyMetricReduceTo applies reduce to operation using the metric's ReduceTo field func (q *querier) applyMetricReduceTo(result *qbtypes.Result, reduceOp qbtypes.ReduceTo) *qbtypes.Result { tsData, ok := result.Value.(*qbtypes.TimeSeriesData) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go 
index c802ee8194ae..f0ae4abb086a 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -29,16 +29,17 @@ var ( ) type querier struct { - logger *slog.Logger - telemetryStore telemetrystore.TelemetryStore - metadataStore telemetrytypes.MetadataStore - promEngine prometheus.Prometheus - traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation] - logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation] - metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation] - meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation] - bucketCache BucketCache - liveDataRefreshSeconds time.Duration + logger *slog.Logger + telemetryStore telemetrystore.TelemetryStore + metadataStore telemetrytypes.MetadataStore + promEngine prometheus.Prometheus + traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation] + logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation] + metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation] + meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation] + traceOperatorStmtBuilder qbtypes.TraceOperatorStatementBuilder + bucketCache BucketCache + liveDataRefreshSeconds time.Duration } var _ Querier = (*querier)(nil) @@ -52,20 +53,22 @@ func New( logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation], metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation], meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation], + traceOperatorStmtBuilder qbtypes.TraceOperatorStatementBuilder, bucketCache BucketCache, ) *querier { querierSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querier") return &querier{ - logger: querierSettings.Logger(), - telemetryStore: telemetryStore, - metadataStore: metadataStore, - promEngine: promEngine, - traceStmtBuilder: traceStmtBuilder, - logStmtBuilder: logStmtBuilder, - metricStmtBuilder: metricStmtBuilder, - meterStmtBuilder: meterStmtBuilder, - bucketCache: bucketCache, 
- liveDataRefreshSeconds: 5, + logger: querierSettings.Logger(), + telemetryStore: telemetryStore, + metadataStore: metadataStore, + promEngine: promEngine, + traceStmtBuilder: traceStmtBuilder, + logStmtBuilder: logStmtBuilder, + metricStmtBuilder: metricStmtBuilder, + meterStmtBuilder: meterStmtBuilder, + traceOperatorStmtBuilder: traceOperatorStmtBuilder, + bucketCache: bucketCache, + liveDataRefreshSeconds: 5, } } @@ -127,9 +130,28 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype NumberOfQueries: len(req.CompositeQuery.Queries), PanelType: req.RequestType.StringValue(), } - intervalWarnings := []string{} + dependencyQueries := make(map[string]bool) + traceOperatorQueries := make(map[string]qbtypes.QueryBuilderTraceOperator) + + for _, query := range req.CompositeQuery.Queries { + if query.Type == qbtypes.QueryTypeTraceOperator { + if spec, ok := query.Spec.(qbtypes.QueryBuilderTraceOperator); ok { + // Parse expression to find dependencies + if err := spec.ParseExpression(); err != nil { + return nil, fmt.Errorf("failed to parse trace operator expression: %w", err) + } + + deps := spec.CollectReferencedQueries(spec.ParsedExpression) + for _, dep := range deps { + dependencyQueries[dep] = true + } + traceOperatorQueries[spec.Name] = spec + } + } + } + // First pass: collect all metric names that need temporality metricNames := make([]string, 0) for idx, query := range req.CompositeQuery.Queries { @@ -223,6 +245,23 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype event.TracesUsed = strings.Contains(spec.Query, "signoz_traces") } } + } else if query.Type == qbtypes.QueryTypeTraceOperator { + if spec, ok := query.Spec.(qbtypes.QueryBuilderTraceOperator); ok { + if spec.StepInterval.Seconds() == 0 { + spec.StepInterval = qbtypes.Step{ + Duration: time.Second * time.Duration(querybuilder.RecommendedStepInterval(req.Start, req.End)), + } + } + + if spec.StepInterval.Seconds() < 
float64(querybuilder.MinAllowedStepInterval(req.Start, req.End)) { + newStep := qbtypes.Step{ + Duration: time.Second * time.Duration(querybuilder.MinAllowedStepInterval(req.Start, req.End)), + } + intervalWarnings = append(intervalWarnings, fmt.Sprintf(intervalWarn, spec.Name, spec.StepInterval.Seconds(), newStep.Duration.Seconds())) + spec.StepInterval = newStep + } + req.CompositeQuery.Queries[idx].Spec = spec + } } } @@ -243,6 +282,38 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype steps := make(map[string]qbtypes.Step) for _, query := range req.CompositeQuery.Queries { + var queryName string + var isTraceOperator bool + + switch query.Type { + case qbtypes.QueryTypeTraceOperator: + if spec, ok := query.Spec.(qbtypes.QueryBuilderTraceOperator); ok { + queryName = spec.Name + isTraceOperator = true + } + case qbtypes.QueryTypePromQL: + if spec, ok := query.Spec.(qbtypes.PromQuery); ok { + queryName = spec.Name + } + case qbtypes.QueryTypeClickHouseSQL: + if spec, ok := query.Spec.(qbtypes.ClickHouseQuery); ok { + queryName = spec.Name + } + case qbtypes.QueryTypeBuilder: + switch spec := query.Spec.(type) { + case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]: + queryName = spec.Name + case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]: + queryName = spec.Name + case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]: + queryName = spec.Name + } + } + + if !isTraceOperator && dependencyQueries[queryName] { + continue + } + switch query.Type { case qbtypes.QueryTypePromQL: promQuery, ok := query.Spec.(qbtypes.PromQuery) @@ -259,6 +330,22 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype } chSQLQuery := newchSQLQuery(q.logger, q.telemetryStore, chQuery, nil, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType, tmplVars) queries[chQuery.Name] = chSQLQuery + case qbtypes.QueryTypeTraceOperator: + traceOpQuery, ok := query.Spec.(qbtypes.QueryBuilderTraceOperator) + if !ok { 
+ return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid trace operator query spec %T", query.Spec) + } + toq := &traceOperatorQuery{ + telemetryStore: q.telemetryStore, + stmtBuilder: q.traceOperatorStmtBuilder, + spec: traceOpQuery, + compositeQuery: &req.CompositeQuery, + fromMS: uint64(req.Start), + toMS: uint64(req.End), + kind: req.RequestType, + } + queries[traceOpQuery.Name] = toq + steps[traceOpQuery.Name] = traceOpQuery.StepInterval case qbtypes.QueryTypeBuilder: switch spec := query.Spec.(type) { case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]: @@ -676,7 +763,17 @@ func (q *querier) createRangedQuery(originalQuery qbtypes.Query, timeRange qbtyp return newBuilderQuery(q.telemetryStore, q.meterStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables) } return newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables) - + case *traceOperatorQuery: + specCopy := qt.spec.Copy() + return &traceOperatorQuery{ + telemetryStore: q.telemetryStore, + stmtBuilder: q.traceOperatorStmtBuilder, + spec: specCopy, + fromMS: uint64(timeRange.From), + toMS: uint64(timeRange.To), + compositeQuery: qt.compositeQuery, + kind: qt.kind, + } default: return nil } diff --git a/pkg/querier/signozquerier/provider.go b/pkg/querier/signozquerier/provider.go index 999de129555c..032b133df4b1 100644 --- a/pkg/querier/signozquerier/provider.go +++ b/pkg/querier/signozquerier/provider.go @@ -89,6 +89,17 @@ func newProvider( telemetryStore, ) + // ADD: Create trace operator statement builder + traceOperatorStmtBuilder := telemetrytraces.NewTraceOperatorStatementBuilder( + settings, + telemetryMetadataStore, + traceFieldMapper, + traceConditionBuilder, + traceStmtBuilder, // Pass the regular trace statement builder + resourceFilterStmtBuilder, // Pass the resource filter statement builder + traceAggExprRewriter, + ) + // Create log statement builder logFieldMapper := telemetrylogs.NewFieldMapper() 
logConditionBuilder := telemetrylogs.NewConditionBuilder(logFieldMapper) @@ -157,6 +168,7 @@ func newProvider( logStmtBuilder, metricStmtBuilder, meterStmtBuilder, + traceOperatorStmtBuilder, bucketCache, ), nil } diff --git a/pkg/querier/trace_operator_query.go b/pkg/querier/trace_operator_query.go new file mode 100644 index 000000000000..762b34691b98 --- /dev/null +++ b/pkg/querier/trace_operator_query.go @@ -0,0 +1,90 @@ +package querier + +import ( + "context" + "time" + + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/SigNoz/signoz/pkg/telemetrystore" + qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" +) + +type traceOperatorQuery struct { + telemetryStore telemetrystore.TelemetryStore + stmtBuilder qbtypes.TraceOperatorStatementBuilder + spec qbtypes.QueryBuilderTraceOperator + compositeQuery *qbtypes.CompositeQuery + fromMS uint64 + toMS uint64 + kind qbtypes.RequestType +} + +var _ qbtypes.Query = (*traceOperatorQuery)(nil) + +func (q *traceOperatorQuery) Fingerprint() string { + return "" +} + +func (q *traceOperatorQuery) Window() (uint64, uint64) { + return q.fromMS, q.toMS +} + +func (q *traceOperatorQuery) Execute(ctx context.Context) (*qbtypes.Result, error) { + stmt, err := q.stmtBuilder.Build( + ctx, + q.fromMS, + q.toMS, + q.kind, + q.spec, + q.compositeQuery, + ) + if err != nil { + return nil, err + } + + // Execute the query with proper context + result, err := q.executeWithContext(ctx, stmt.Query, stmt.Args) + if err != nil { + return nil, err + } + result.Warnings = stmt.Warnings + return result, nil +} + +func (q *traceOperatorQuery) executeWithContext(ctx context.Context, query string, args []any) (*qbtypes.Result, error) { + totalRows := uint64(0) + totalBytes := uint64(0) + elapsed := time.Duration(0) + + ctx = clickhouse.Context(ctx, clickhouse.WithProgress(func(p *clickhouse.Progress) { + totalRows += p.Rows + totalBytes += p.Bytes + elapsed += p.Elapsed + })) + + rows, err := 
q.telemetryStore.ClickhouseDB().Query(ctx, query, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + // Pass query window and step for partial value detection + queryWindow := &qbtypes.TimeRange{From: q.fromMS, To: q.toMS} + + // Use the consume function like builderQuery does + payload, err := consume(rows, q.kind, queryWindow, q.spec.StepInterval, q.spec.Name) + if err != nil { + return nil, err + } + + return &qbtypes.Result{ + Type: q.kind, + Value: payload, + Stats: qbtypes.ExecStats{ + RowsScanned: totalRows, + BytesScanned: totalBytes, + DurationMS: uint64(elapsed.Milliseconds()), + }, + }, nil +} + diff --git a/pkg/telemetrytraces/test_data.go b/pkg/telemetrytraces/test_data.go index 6c25ae8c83d6..2e01a29ea6ba 100644 --- a/pkg/telemetrytraces/test_data.go +++ b/pkg/telemetrytraces/test_data.go @@ -56,6 +56,27 @@ func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey { FieldDataType: telemetrytypes.FieldDataTypeString, }, }, + "duration_nano": { + { + Name: "duration_nano", + FieldContext: telemetrytypes.FieldContextSpan, + FieldDataType: telemetrytypes.FieldDataTypeInt64, + }, + }, + "http.method": { + { + Name: "http.method", + FieldContext: telemetrytypes.FieldContextAttribute, + FieldDataType: telemetrytypes.FieldDataTypeString, + }, + }, + "response_status_code": { + { + Name: "response_status_code", + FieldContext: telemetrytypes.FieldContextSpan, + FieldDataType: telemetrytypes.FieldDataTypeInt64, + }, + }, } for _, keys := range keysMap { for _, key := range keys { diff --git a/pkg/telemetrytraces/trace_operator_cte_builder.go b/pkg/telemetrytraces/trace_operator_cte_builder.go new file mode 100644 index 000000000000..a5cb9471d86e --- /dev/null +++ b/pkg/telemetrytraces/trace_operator_cte_builder.go @@ -0,0 +1,907 @@ +package telemetrytraces + +import ( + "context" + "fmt" + "github.com/SigNoz/signoz/pkg/errors" + "github.com/SigNoz/signoz/pkg/querybuilder" + qbtypes 
"github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" + "github.com/huandu/go-sqlbuilder" + "strings" +) + +type cteNode struct { + name string + sql string + args []any + dependsOn []string +} + +type traceOperatorCTEBuilder struct { + start uint64 + end uint64 + operator *qbtypes.QueryBuilderTraceOperator + stmtBuilder *traceOperatorStatementBuilder + queries map[string]*qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation] + ctes []cteNode + cteNameToIndex map[string]int + queryToCTEName map[string]string + compositeQuery *qbtypes.CompositeQuery +} + +func (b *traceOperatorCTEBuilder) collectQueries() error { + referencedQueries := b.operator.CollectReferencedQueries(b.operator.ParsedExpression) + + for _, queryEnv := range b.compositeQuery.Queries { + if queryEnv.Type == qbtypes.QueryTypeBuilder { + if traceQuery, ok := queryEnv.Spec.(qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]); ok { + for _, refName := range referencedQueries { + if traceQuery.Name == refName { + queryCopy := traceQuery + b.queries[refName] = &queryCopy + break + } + } + } + } + } + + for _, refName := range referencedQueries { + if _, found := b.queries[refName]; !found { + return errors.NewInvalidInputf(errors.CodeInvalidInput, "referenced query '%s' not found", refName) + } + } + + return nil +} + +func (b *traceOperatorCTEBuilder) build(ctx context.Context, requestType qbtypes.RequestType) (*qbtypes.Statement, error) { + + b.buildAllSpansCTE(ctx) + + rootCTEName, err := b.buildExpressionCTEs(ctx, b.operator.ParsedExpression) + if err != nil { + return nil, err + } + + selectFromCTE := rootCTEName + if b.operator.ReturnSpansFrom != "" { + selectFromCTE = b.queryToCTEName[b.operator.ReturnSpansFrom] + if selectFromCTE == "" { + return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, + "returnSpansFrom references query '%s' which has no corresponding CTE", + b.operator.ReturnSpansFrom) + } + } + + 
finalStmt, err := b.buildFinalQuery(ctx, selectFromCTE, requestType) + if err != nil { + return nil, err + } + + var cteFragments []string + var cteArgs [][]any + + timeConstantsCTE := b.buildTimeConstantsCTE() + cteFragments = append(cteFragments, timeConstantsCTE) + + for _, cte := range b.ctes { + cteFragments = append(cteFragments, fmt.Sprintf("%s AS (%s)", cte.name, cte.sql)) + cteArgs = append(cteArgs, cte.args) + } + + finalSQL := querybuilder.CombineCTEs(cteFragments) + finalStmt.Query + " SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000" + finalArgs := querybuilder.PrependArgs(cteArgs, finalStmt.Args) + + b.stmtBuilder.logger.DebugContext(ctx, "Final trace operator query built", + "operator_expression", b.operator.Expression, + "cte_count", len(cteFragments), + "args_count", len(finalArgs)) + + return &qbtypes.Statement{ + Query: finalSQL, + Args: finalArgs, + Warnings: finalStmt.Warnings, + }, nil +} + +// Will be used in Indirect descendant Query, will not be used in any other query +func (b *traceOperatorCTEBuilder) buildAllSpansCTE(ctx context.Context) { + sb := sqlbuilder.NewSelectBuilder() + sb.Select("*") + sb.SelectMore(sqlbuilder.Escape("resource_string_service$$name") + " AS `service.name`") + + sb.From(fmt.Sprintf("%s.%s", DBName, SpanIndexV3TableName)) + startBucket := b.start/querybuilder.NsToSeconds - querybuilder.BucketAdjustment + endBucket := b.end / querybuilder.NsToSeconds + sb.Where( + sb.GE("timestamp", fmt.Sprintf("%d", b.start)), + sb.L("timestamp", fmt.Sprintf("%d", b.end)), + sb.GE("ts_bucket_start", startBucket), + sb.LE("ts_bucket_start", endBucket), + ) + sql, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse) + b.stmtBuilder.logger.DebugContext(ctx, "Built all_spans CTE") + b.addCTE("all_spans", sql, args, nil) +} + +func (b *traceOperatorCTEBuilder) buildTimeConstantsCTE() string { + startBucket := b.start/querybuilder.NsToSeconds - querybuilder.BucketAdjustment + endBucket := b.end / 
querybuilder.NsToSeconds + + return fmt.Sprintf(`toDateTime64(%d, 9) AS t_from, toDateTime64(%d, 9) AS t_to, %d AS bucket_from, %d AS bucket_to`, b.start, b.end, startBucket, endBucket) +} + +func (b *traceOperatorCTEBuilder) buildResourceFilterCTE(ctx context.Context, query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]) (*qbtypes.Statement, error) { + return b.stmtBuilder.resourceFilterStmtBuilder.Build( + ctx, + b.start, + b.end, + qbtypes.RequestTypeRaw, + query, + nil, + ) +} + +func (b *traceOperatorCTEBuilder) buildExpressionCTEs(ctx context.Context, expr *qbtypes.TraceOperand) (string, error) { + if expr == nil { + return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "expression is nil") + } + + if expr.QueryRef != nil { + return b.buildQueryCTE(ctx, expr.QueryRef.Name) + } + + var leftCTE, rightCTE string + var err error + + if expr.Left != nil { + leftCTE, err = b.buildExpressionCTEs(ctx, expr.Left) + if err != nil { + return "", err + } + } + + if expr.Right != nil { + rightCTE, err = b.buildExpressionCTEs(ctx, expr.Right) + if err != nil { + return "", err + } + } + + return b.buildOperatorCTE(ctx, *expr.Operator, leftCTE, rightCTE) +} + +func (b *traceOperatorCTEBuilder) buildQueryCTE(ctx context.Context, queryName string) (string, error) { + query, exists := b.queries[queryName] + if !exists { + return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "query %s not found", queryName) + } + + cteName := queryName + b.queryToCTEName[queryName] = cteName + + if _, exists := b.cteNameToIndex[cteName]; exists { + return cteName, nil + } + + keySelectors := getKeySelectors(*query) + b.stmtBuilder.logger.DebugContext(ctx, "Key selectors for query", "query_name", queryName, "key_selectors", keySelectors) + keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors) + if err != nil { + return "", err + } + b.stmtBuilder.logger.DebugContext(ctx, "Retrieved keys for query", "query_name", queryName, "keys_count", len(keys)) + + // 
Build resource filter CTE for this specific query + resourceFilterCTEName := fmt.Sprintf("__resource_filter_%s", cteName) + resourceStmt, err := b.buildResourceFilterCTE(ctx, *query) + if err != nil { + return "", err + } + + if resourceStmt != nil && resourceStmt.Query != "" { + b.stmtBuilder.logger.DebugContext(ctx, "Built resource filter CTE for query", + "query_name", queryName, + "resource_filter_cte_name", resourceFilterCTEName) + b.addCTE(resourceFilterCTEName, resourceStmt.Query, resourceStmt.Args, nil) + } else { + b.stmtBuilder.logger.DebugContext(ctx, "No resource filter needed for query", "query_name", queryName) + resourceFilterCTEName = "" + } + + sb := sqlbuilder.NewSelectBuilder() + sb.Select("*") + sb.From(fmt.Sprintf("%s.%s", DBName, SpanIndexV3TableName)) + if resourceFilterCTEName != "" { + sb.Where(fmt.Sprintf("resource_fingerprint GLOBAL IN (SELECT fingerprint FROM %s)", resourceFilterCTEName)) + } + startBucket := b.start/querybuilder.NsToSeconds - querybuilder.BucketAdjustment + endBucket := b.end / querybuilder.NsToSeconds + sb.Where( + sb.GE("timestamp", fmt.Sprintf("%d", b.start)), + sb.L("timestamp", fmt.Sprintf("%d", b.end)), + sb.GE("ts_bucket_start", startBucket), + sb.LE("ts_bucket_start", endBucket), + ) + + if query.Filter != nil && query.Filter.Expression != "" { + b.stmtBuilder.logger.DebugContext(ctx, "Applying filter to query CTE", "query_name", queryName, "filter", query.Filter.Expression) + filterWhereClause, err := querybuilder.PrepareWhereClause( + query.Filter.Expression, + querybuilder.FilterExprVisitorOpts{ + Logger: b.stmtBuilder.logger, + FieldMapper: b.stmtBuilder.fm, + ConditionBuilder: b.stmtBuilder.cb, + FieldKeys: keys, + SkipResourceFilter: true, + }, + ) + if err != nil { + b.stmtBuilder.logger.ErrorContext(ctx, "Failed to prepare where clause", "error", err, "filter", query.Filter.Expression) + return "", err + } + if filterWhereClause != nil { + b.stmtBuilder.logger.DebugContext(ctx, "Adding where clause", 
"where_clause", filterWhereClause.WhereClause) + sb.AddWhereClause(filterWhereClause.WhereClause) + } else { + b.stmtBuilder.logger.WarnContext(ctx, "PrepareWhereClause returned nil", "filter", query.Filter.Expression) + } + } else { + if query.Filter == nil { + b.stmtBuilder.logger.DebugContext(ctx, "No filter for query CTE", "query_name", queryName, "reason", "filter is nil") + } else { + b.stmtBuilder.logger.DebugContext(ctx, "No filter for query CTE", "query_name", queryName, "reason", "filter expression is empty") + } + } + + sql, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse) + b.stmtBuilder.logger.DebugContext(ctx, "Built query CTE", + "query_name", queryName, + "cte_name", cteName) + dependencies := []string{} + if resourceFilterCTEName != "" { + dependencies = append(dependencies, resourceFilterCTEName) + } + b.addCTE(cteName, sql, args, dependencies) + + return cteName, nil +} + +func sanitizeForSQL(s string) string { + replacements := map[string]string{ + "=>": "DIR_DESC", + "->": "INDIR_DESC", + "&&": "AND", + "||": "OR", + "NOT": "NOT", + " ": "_", + } + + result := s + for old, new := range replacements { + result = strings.ReplaceAll(result, old, new) + } + return result +} + +func (b *traceOperatorCTEBuilder) buildOperatorCTE(ctx context.Context, op qbtypes.TraceOperatorType, leftCTE, rightCTE string) (string, error) { + sanitizedOp := sanitizeForSQL(op.StringValue()) + cteName := fmt.Sprintf("%s_%s_%s", leftCTE, sanitizedOp, rightCTE) + + if _, exists := b.cteNameToIndex[cteName]; exists { + return cteName, nil + } + + var sql string + var args []any + var dependsOn []string + + switch op { + case qbtypes.TraceOperatorDirectDescendant: + sql, args, dependsOn = b.buildDirectDescendantCTE(leftCTE, rightCTE) + case qbtypes.TraceOperatorIndirectDescendant: + sql, dependsOn = b.buildIndirectDescendantCTE(leftCTE, rightCTE) + args = nil + case qbtypes.TraceOperatorAnd: + sql, args, dependsOn = b.buildAndCTE(leftCTE, rightCTE) + case 
qbtypes.TraceOperatorOr: + sql, dependsOn = b.buildOrCTE(leftCTE, rightCTE) + args = nil + case qbtypes.TraceOperatorNot, qbtypes.TraceOperatorExclude: + sql, args, dependsOn = b.buildNotCTE(leftCTE, rightCTE) + default: + return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported operator: %s", op.StringValue()) + } + + b.stmtBuilder.logger.DebugContext(ctx, "Built operator CTE", + "operator", op.StringValue(), + "cte_name", cteName, + "left_cte", leftCTE, + "right_cte", rightCTE) + b.addCTE(cteName, sql, args, dependsOn) + return cteName, nil +} + +func (b *traceOperatorCTEBuilder) buildDirectDescendantCTE(parentCTE, childCTE string) (string, []any, []string) { + sb := sqlbuilder.NewSelectBuilder() + sb.Select("p.*") + + sb.From(fmt.Sprintf("%s AS p", parentCTE)) + sb.JoinWithOption( + sqlbuilder.InnerJoin, + fmt.Sprintf("%s AS c", childCTE), + "p.trace_id = c.trace_id AND p.span_id = c.parent_span_id", + ) + + sql, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse) + return sql, args, []string{parentCTE, childCTE} +} + +func (b *traceOperatorCTEBuilder) buildIndirectDescendantCTE(ancestorCTE, descendantCTE string) (string, []string) { + sql := fmt.Sprintf(`WITH RECURSIVE up AS (SELECT d.trace_id, d.span_id, d.parent_span_id, 0 AS depth FROM %s AS d UNION ALL SELECT p.trace_id, p.span_id, p.parent_span_id, up.depth + 1 FROM all_spans AS p JOIN up ON p.trace_id = up.trace_id AND p.span_id = up.parent_span_id WHERE up.depth < 100) SELECT DISTINCT a.* FROM %s AS a GLOBAL INNER JOIN (SELECT DISTINCT trace_id, span_id FROM up WHERE depth > 0 ) AS ancestors ON ancestors.trace_id = a.trace_id AND ancestors.span_id = a.span_id`, descendantCTE, ancestorCTE) + return sql, []string{ancestorCTE, descendantCTE, "all_spans"} +} + +func (b *traceOperatorCTEBuilder) buildAndCTE(leftCTE, rightCTE string) (string, []any, []string) { + sb := sqlbuilder.NewSelectBuilder() + // Select all columns from left CTE + sb.Select("l.*") + sb.From(fmt.Sprintf("%s AS l", leftCTE)) 
+ sb.JoinWithOption( + sqlbuilder.InnerJoin, + fmt.Sprintf("%s AS r", rightCTE), + "l.trace_id = r.trace_id", + ) + + sql, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse) + return sql, args, []string{leftCTE, rightCTE} +} + +func (b *traceOperatorCTEBuilder) buildOrCTE(leftCTE, rightCTE string) (string, []string) { + sql := fmt.Sprintf(`SELECT * FROM %s UNION DISTINCT SELECT * FROM %s`, leftCTE, rightCTE) + + return sql, []string{leftCTE, rightCTE} +} + +func (b *traceOperatorCTEBuilder) buildNotCTE(leftCTE, rightCTE string) (string, []any, []string) { + sb := sqlbuilder.NewSelectBuilder() + + // Handle unary NOT case (rightCTE is empty) + if rightCTE == "" { + sb.Select("b.*") + sb.From("all_spans AS b") + sb.Where(fmt.Sprintf( + "b.trace_id GLOBAL NOT IN (SELECT DISTINCT trace_id FROM %s)", + leftCTE, + )) + sql, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse) + return sql, args, []string{"all_spans", leftCTE} + } + + sb.Select("l.*") + sb.From(fmt.Sprintf("%s AS l", leftCTE)) + sb.Where(fmt.Sprintf( + "l.trace_id GLOBAL NOT IN (SELECT DISTINCT trace_id FROM %s)", + rightCTE, + )) + + sql, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse) + return sql, args, []string{leftCTE, rightCTE} +} + +func (b *traceOperatorCTEBuilder) buildFinalQuery(ctx context.Context, selectFromCTE string, requestType qbtypes.RequestType) (*qbtypes.Statement, error) { + switch requestType { + case qbtypes.RequestTypeRaw: + return b.buildListQuery(ctx, selectFromCTE) + case qbtypes.RequestTypeTimeSeries: + return b.buildTimeSeriesQuery(ctx, selectFromCTE) + case qbtypes.RequestTypeTrace: + return b.buildTraceQuery(ctx, selectFromCTE) + case qbtypes.RequestTypeScalar: + return b.buildScalarQuery(ctx, selectFromCTE) + default: + return nil, fmt.Errorf("unsupported request type: %s", requestType) + } +} + +func (b *traceOperatorCTEBuilder) buildListQuery(ctx context.Context, selectFromCTE string) (*qbtypes.Statement, error) { + sb := sqlbuilder.NewSelectBuilder() + + // Select core 
fields + sb.Select( + "timestamp", + "trace_id", + "span_id", + "name", + "duration_nano", + "parent_span_id", + ) + + selectedFields := map[string]bool{ + "timestamp": true, + "trace_id": true, + "span_id": true, + "name": true, + "duration_nano": true, + "parent_span_id": true, + } + + // Get keys for selectFields + keySelectors := b.getKeySelectors() + for _, field := range b.operator.SelectFields { + keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{ + Name: field.Name, + Signal: telemetrytypes.SignalTraces, + FieldContext: field.FieldContext, + FieldDataType: field.FieldDataType, + }) + } + + keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors) + if err != nil { + return nil, err + } + + // Add selectFields using ColumnExpressionFor since we now have all base table columns + for _, field := range b.operator.SelectFields { + if selectedFields[field.Name] { + continue + } + colExpr, err := b.stmtBuilder.fm.ColumnExpressionFor(ctx, &field, keys) + if err != nil { + b.stmtBuilder.logger.WarnContext(ctx, "failed to map select field", + "field", field.Name, "error", err) + continue + } + sb.SelectMore(colExpr) + selectedFields[field.Name] = true + } + + sb.From(selectFromCTE) + + // Add order by support using ColumnExpressionFor + orderApplied := false + for _, orderBy := range b.operator.Order { + colExpr, err := b.stmtBuilder.fm.ColumnExpressionFor(ctx, &orderBy.Key.TelemetryFieldKey, keys) + if err != nil { + return nil, err + } + sb.OrderBy(fmt.Sprintf("%s %s", colExpr, orderBy.Direction.StringValue())) + orderApplied = true + } + + if !orderApplied { + sb.OrderBy("timestamp DESC") + } + + if b.operator.Limit > 0 { + sb.Limit(b.operator.Limit) + } else { + sb.Limit(100) + } + + if b.operator.Offset > 0 { + sb.Offset(b.operator.Offset) + } + + sql, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse) + return &qbtypes.Statement{ + Query: sql, + Args: args, + }, nil +} + +func (b *traceOperatorCTEBuilder) getKeySelectors() 
[]*telemetrytypes.FieldKeySelector { + var keySelectors []*telemetrytypes.FieldKeySelector + + for _, agg := range b.operator.Aggregations { + selectors := querybuilder.QueryStringToKeysSelectors(agg.Expression) + keySelectors = append(keySelectors, selectors...) + } + + if b.operator.Filter != nil && b.operator.Filter.Expression != "" { + selectors := querybuilder.QueryStringToKeysSelectors(b.operator.Filter.Expression) + keySelectors = append(keySelectors, selectors...) + } + + for _, gb := range b.operator.GroupBy { + selectors := querybuilder.QueryStringToKeysSelectors(gb.TelemetryFieldKey.Name) + keySelectors = append(keySelectors, selectors...) + } + + for _, order := range b.operator.Order { + keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{ + Name: order.Key.Name, + Signal: telemetrytypes.SignalTraces, + FieldContext: order.Key.FieldContext, + FieldDataType: order.Key.FieldDataType, + }) + } + + for i := range keySelectors { + keySelectors[i].Signal = telemetrytypes.SignalTraces + } + + return keySelectors +} + +func (b *traceOperatorCTEBuilder) buildTimeSeriesQuery(ctx context.Context, selectFromCTE string) (*qbtypes.Statement, error) { + sb := sqlbuilder.NewSelectBuilder() + + sb.Select(fmt.Sprintf( + "toStartOfInterval(timestamp, INTERVAL %d SECOND) AS ts", + int64(b.operator.StepInterval.Seconds()), + )) + + keySelectors := b.getKeySelectors() + keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors) + if err != nil { + return nil, err + } + + var allGroupByArgs []any + + for _, gb := range b.operator.GroupBy { + expr, args, err := querybuilder.CollisionHandledFinalExpr( + ctx, + &gb.TelemetryFieldKey, + b.stmtBuilder.fm, + b.stmtBuilder.cb, + keys, + telemetrytypes.FieldDataTypeString, + "", + nil, + ) + if err != nil { + return nil, errors.NewInvalidInputf( + errors.CodeInvalidInput, + "failed to map group by field '%s': %v", + gb.TelemetryFieldKey.Name, + err, + ) + } + colExpr := fmt.Sprintf("toString(%s) AS 
`%s`", expr, gb.TelemetryFieldKey.Name) + allGroupByArgs = append(allGroupByArgs, args...) + sb.SelectMore(colExpr) + } + + var allAggChArgs []any + for i, agg := range b.operator.Aggregations { + rewritten, chArgs, err := b.stmtBuilder.aggExprRewriter.Rewrite( + ctx, + agg.Expression, + uint64(b.operator.StepInterval.Seconds()), + keys, + ) + if err != nil { + return nil, errors.NewInvalidInputf( + errors.CodeInvalidInput, + "failed to rewrite aggregation expression '%s': %v", + agg.Expression, + err, + ) + } + allAggChArgs = append(allAggChArgs, chArgs...) + + alias := fmt.Sprintf("__result_%d", i) + + sb.SelectMore(fmt.Sprintf("%s AS %s", rewritten, alias)) + } + + sb.From(selectFromCTE) + + sb.GroupBy("ts") + if len(b.operator.GroupBy) > 0 { + groupByKeys := make([]string, len(b.operator.GroupBy)) + for i, gb := range b.operator.GroupBy { + groupByKeys[i] = fmt.Sprintf("`%s`", gb.TelemetryFieldKey.Name) + } + sb.GroupBy(groupByKeys...) + } + + // Add order by support + for _, orderBy := range b.operator.Order { + idx, ok := b.aggOrderBy(orderBy) + if ok { + sb.OrderBy(fmt.Sprintf("__result_%d %s", idx, orderBy.Direction.StringValue())) + } else { + sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue())) + } + } + sb.OrderBy("ts desc") + + combinedArgs := append(allGroupByArgs, allAggChArgs...) + + // Add HAVING clause if specified + b.addHavingClause(sb) + + sql, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...) 
+ return &qbtypes.Statement{ + Query: sql, + Args: args, + }, nil +} + +func (b *traceOperatorCTEBuilder) buildTraceSummaryCTE(selectFromCTE string) { + sb := sqlbuilder.NewSelectBuilder() + + sb.Select( + "trace_id", + "count() AS total_span_count", + ) + + sb.From("all_spans") + sb.Where(fmt.Sprintf("trace_id GLOBAL IN (SELECT DISTINCT trace_id FROM %s)", selectFromCTE)) + sb.GroupBy("trace_id") + + sql, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse) + b.addCTE("trace_summary", sql, args, []string{"all_spans", selectFromCTE}) +} + +func (b *traceOperatorCTEBuilder) buildTraceQuery(ctx context.Context, selectFromCTE string) (*qbtypes.Statement, error) { + b.buildTraceSummaryCTE(selectFromCTE) + + sb := sqlbuilder.NewSelectBuilder() + + keySelectors := b.getKeySelectors() + keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors) + if err != nil { + return nil, err + } + + var allGroupByArgs []any + + for _, gb := range b.operator.GroupBy { + expr, args, err := querybuilder.CollisionHandledFinalExpr( + ctx, + &gb.TelemetryFieldKey, + b.stmtBuilder.fm, + b.stmtBuilder.cb, + keys, + telemetrytypes.FieldDataTypeString, + "", + nil, + ) + if err != nil { + return nil, errors.NewInvalidInputf( + errors.CodeInvalidInput, + "failed to map group by field '%s': %v", + gb.TelemetryFieldKey.Name, + err, + ) + } + colExpr := fmt.Sprintf("toString(%s) AS `%s`", expr, gb.TelemetryFieldKey.Name) + allGroupByArgs = append(allGroupByArgs, args...) + sb.SelectMore(colExpr) + } + + rateInterval := (b.end - b.start) / querybuilder.NsToSeconds + + var allAggChArgs []any + for i, agg := range b.operator.Aggregations { + rewritten, chArgs, err := b.stmtBuilder.aggExprRewriter.Rewrite( + ctx, + agg.Expression, + rateInterval, + keys, + ) + if err != nil { + return nil, errors.NewInvalidInputf( + errors.CodeInvalidInput, + "failed to rewrite aggregation expression '%s': %v", + agg.Expression, + err, + ) + } + allAggChArgs = append(allAggChArgs, chArgs...) 
+ + alias := fmt.Sprintf("__result_%d", i) + + sb.SelectMore(fmt.Sprintf("%s AS %s", rewritten, alias)) + } + + sb.Select( + "any(root.timestamp) as timestamp", + "any(root.`service.name`) as `service.name`", + "any(root.name) as `name`", + "summary.total_span_count as span_count", // Updated column name + "any(root.duration_nano) as `duration_nano`", + "root.trace_id as `trace_id`", + ) + + sb.From("all_spans as root") + sb.JoinWithOption( + sqlbuilder.InnerJoin, + "trace_summary as summary", + "root.trace_id = summary.trace_id", + ) + sb.Where("root.parent_span_id = ''") + + sb.GroupBy("root.trace_id", "summary.total_span_count") + if len(b.operator.GroupBy) > 0 { + groupByKeys := make([]string, len(b.operator.GroupBy)) + for i, gb := range b.operator.GroupBy { + groupByKeys[i] = fmt.Sprintf("`%s`", gb.TelemetryFieldKey.Name) + } + sb.GroupBy(groupByKeys...) + } + + b.addHavingClause(sb) + + orderApplied := false + for _, orderBy := range b.operator.Order { + switch orderBy.Key.Name { + case qbtypes.OrderByTraceDuration.StringValue(): + sb.OrderBy(fmt.Sprintf("`duration_nano` %s", orderBy.Direction.StringValue())) + orderApplied = true + case qbtypes.OrderBySpanCount.StringValue(): + sb.OrderBy(fmt.Sprintf("span_count %s", orderBy.Direction.StringValue())) + orderApplied = true + case "timestamp": + sb.OrderBy(fmt.Sprintf("timestamp %s", orderBy.Direction.StringValue())) + orderApplied = true + default: + aggIndex := -1 + for i, agg := range b.operator.Aggregations { + if orderBy.Key.Name == agg.Alias || orderBy.Key.Name == fmt.Sprintf("__result_%d", i) { + aggIndex = i + break + } + } + if aggIndex >= 0 { + alias := fmt.Sprintf("__result_%d", aggIndex) + if b.operator.Aggregations[aggIndex].Alias != "" { + alias = b.operator.Aggregations[aggIndex].Alias + } + sb.OrderBy(fmt.Sprintf("%s %s", alias, orderBy.Direction.StringValue())) + orderApplied = true + } else { + b.stmtBuilder.logger.WarnContext(ctx, + "ignoring order by field that's not available in trace 
context", + "field", orderBy.Key.Name) + } + } + } + + if !orderApplied { + sb.OrderBy("`duration_nano` DESC") + } + + if b.operator.Limit > 0 { + sb.Limit(b.operator.Limit) + } + + combinedArgs := append(allGroupByArgs, allAggChArgs...) + + sql, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...) + return &qbtypes.Statement{ + Query: sql, + Args: args, + }, nil +} + +func (b *traceOperatorCTEBuilder) buildScalarQuery(ctx context.Context, selectFromCTE string) (*qbtypes.Statement, error) { + sb := sqlbuilder.NewSelectBuilder() + + keySelectors := b.getKeySelectors() + keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors) + if err != nil { + return nil, err + } + + var allGroupByArgs []any + + for _, gb := range b.operator.GroupBy { + expr, args, err := querybuilder.CollisionHandledFinalExpr( + ctx, + &gb.TelemetryFieldKey, + b.stmtBuilder.fm, + b.stmtBuilder.cb, + keys, + telemetrytypes.FieldDataTypeString, + "", + nil, + ) + if err != nil { + return nil, errors.NewInvalidInputf( + errors.CodeInvalidInput, + "failed to map group by field '%s': %v", + gb.TelemetryFieldKey.Name, + err, + ) + } + colExpr := fmt.Sprintf("toString(%s) AS `%s`", expr, gb.TelemetryFieldKey.Name) + allGroupByArgs = append(allGroupByArgs, args...) + sb.SelectMore(colExpr) + } + + var allAggChArgs []any + for i, agg := range b.operator.Aggregations { + rewritten, chArgs, err := b.stmtBuilder.aggExprRewriter.Rewrite( + ctx, + agg.Expression, + uint64((b.end-b.start)/querybuilder.NsToSeconds), + keys, + ) + if err != nil { + return nil, errors.NewInvalidInputf( + errors.CodeInvalidInput, + "failed to rewrite aggregation expression '%s': %v", + agg.Expression, + err, + ) + } + allAggChArgs = append(allAggChArgs, chArgs...) 
+ + alias := fmt.Sprintf("__result_%d", i) + + sb.SelectMore(fmt.Sprintf("%s AS %s", rewritten, alias)) + } + + sb.From(selectFromCTE) + + if len(b.operator.GroupBy) > 0 { + groupByKeys := make([]string, len(b.operator.GroupBy)) + for i, gb := range b.operator.GroupBy { + groupByKeys[i] = fmt.Sprintf("`%s`", gb.TelemetryFieldKey.Name) + } + sb.GroupBy(groupByKeys...) + } + + // Add order by support + for _, orderBy := range b.operator.Order { + idx, ok := b.aggOrderBy(orderBy) + if ok { + sb.OrderBy(fmt.Sprintf("__result_%d %s", idx, orderBy.Direction.StringValue())) + } else { + sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue())) + } + } + + // Add default ordering if no orderBy specified + if len(b.operator.Order) == 0 { + sb.OrderBy("__result_0 DESC") + } + + combinedArgs := append(allGroupByArgs, allAggChArgs...) + + // Add HAVING clause if specified + b.addHavingClause(sb) + + sql, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...) + return &qbtypes.Statement{ + Query: sql, + Args: args, + }, nil +} + +func (b *traceOperatorCTEBuilder) addHavingClause(sb *sqlbuilder.SelectBuilder) { + if b.operator.Having != nil && b.operator.Having.Expression != "" { + rewriter := querybuilder.NewHavingExpressionRewriter() + rewrittenExpr := rewriter.RewriteForTraces(b.operator.Having.Expression, b.operator.Aggregations) + sb.Having(rewrittenExpr) + } +} + +func (b *traceOperatorCTEBuilder) addCTE(name, sql string, args []any, dependsOn []string) { + b.ctes = append(b.ctes, cteNode{ + name: name, + sql: sql, + args: args, + dependsOn: dependsOn, + }) + b.cteNameToIndex[name] = len(b.ctes) - 1 +} + +func (b *traceOperatorCTEBuilder) aggOrderBy(k qbtypes.OrderBy) (int, bool) { + for i, agg := range b.operator.Aggregations { + if k.Key.Name == agg.Alias || + k.Key.Name == agg.Expression || + k.Key.Name == fmt.Sprintf("__result_%d", i) { + return i, true + } + } + return 0, false +} diff --git 
a/pkg/telemetrytraces/trace_operator_cte_builder_test.go b/pkg/telemetrytraces/trace_operator_cte_builder_test.go new file mode 100644 index 000000000000..420f2c10fe2c --- /dev/null +++ b/pkg/telemetrytraces/trace_operator_cte_builder_test.go @@ -0,0 +1,551 @@ +package telemetrytraces + +import ( + "context" + "testing" + "time" + + "github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest" + "github.com/SigNoz/signoz/pkg/querybuilder" + qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest" + "github.com/stretchr/testify/require" +) + +func TestTraceOperatorStatementBuilder(t *testing.T) { + cases := []struct { + name string + requestType qbtypes.RequestType + operator qbtypes.QueryBuilderTraceOperator + compositeQuery *qbtypes.CompositeQuery + expected qbtypes.Statement + expectedErr error + }{ + { + name: "simple direct descendant operator", + requestType: qbtypes.RequestTypeRaw, + operator: qbtypes.QueryBuilderTraceOperator{ + Expression: "A => B", + SelectFields: []telemetrytypes.TelemetryFieldKey{ + { + Name: "service.name", + FieldContext: telemetrytypes.FieldContextResource, + FieldDataType: telemetrytypes.FieldDataTypeString, + }, + { + Name: "name", + FieldContext: telemetrytypes.FieldContextSpan, + FieldDataType: telemetrytypes.FieldDataTypeString, + }, + }, + Limit: 10, + }, + compositeQuery: &qbtypes.CompositeQuery{ + Queries: []qbtypes.QueryEnvelope{ + { + Type: qbtypes.QueryTypeBuilder, + Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{ + Name: "A", + Signal: telemetrytypes.SignalTraces, + Filter: &qbtypes.Filter{ + Expression: "service.name = 'frontend'", + }, + }, + }, + { + Type: qbtypes.QueryTypeBuilder, + Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{ + Name: "B", + Signal: telemetrytypes.SignalTraces, + Filter: &qbtypes.Filter{ + Expression: "service.name = 
'backend'", + }, + }, + }, + }, + }, + expected: qbtypes.Statement{ + Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND true), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND true), A_DIR_DESC_B AS (SELECT p.* FROM A AS p INNER JOIN B AS c ON p.trace_id = c.trace_id AND p.span_id = c.parent_span_id) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id, resources_string['service.name'] AS `service.name` FROM A_DIR_DESC_B ORDER BY timestamp DESC LIMIT ? 
SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000", + Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "frontend", "%service.name%", "%service.name\":\"frontend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "backend", "%service.name%", "%service.name\":\"backend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10}, + }, + expectedErr: nil, + }, + { + name: "indirect descendant operator", + requestType: qbtypes.RequestTypeRaw, + operator: qbtypes.QueryBuilderTraceOperator{ + Expression: "A -> B", + Limit: 5, + }, + compositeQuery: &qbtypes.CompositeQuery{ + Queries: []qbtypes.QueryEnvelope{ + { + Type: qbtypes.QueryTypeBuilder, + Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{ + Name: "A", + Signal: telemetrytypes.SignalTraces, + Filter: &qbtypes.Filter{ + Expression: "service.name = 'gateway'", + }, + }, + }, + { + Type: qbtypes.QueryTypeBuilder, + Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{ + Name: "B", + Signal: telemetrytypes.SignalTraces, + Filter: &qbtypes.Filter{ + Expression: "service.name = 'database'", + }, + }, + }, + }, + }, + expected: qbtypes.Statement{ + Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? 
AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND true), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND true), A_INDIR_DESC_B AS (WITH RECURSIVE up AS (SELECT d.trace_id, d.span_id, d.parent_span_id, 0 AS depth FROM B AS d UNION ALL SELECT p.trace_id, p.span_id, p.parent_span_id, up.depth + 1 FROM all_spans AS p JOIN up ON p.trace_id = up.trace_id AND p.span_id = up.parent_span_id WHERE up.depth < 100) SELECT DISTINCT a.* FROM A AS a GLOBAL INNER JOIN (SELECT DISTINCT trace_id, span_id FROM up WHERE depth > 0 ) AS ancestors ON ancestors.trace_id = a.trace_id AND ancestors.span_id = a.span_id) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM A_INDIR_DESC_B ORDER BY timestamp DESC LIMIT ? 
SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000", + Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "gateway", "%service.name%", "%service.name\":\"gateway%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "database", "%service.name%", "%service.name\":\"database%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 5}, + }, + expectedErr: nil, + }, + { + name: "AND operator", + requestType: qbtypes.RequestTypeRaw, + operator: qbtypes.QueryBuilderTraceOperator{ + Expression: "A && B", + Limit: 15, + }, + compositeQuery: &qbtypes.CompositeQuery{ + Queries: []qbtypes.QueryEnvelope{ + { + Type: qbtypes.QueryTypeBuilder, + Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{ + Name: "A", + Signal: telemetrytypes.SignalTraces, + Filter: &qbtypes.Filter{ + Expression: "service.name = 'frontend'", + }, + }, + }, + { + Type: qbtypes.QueryTypeBuilder, + Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{ + Name: "B", + Signal: telemetrytypes.SignalTraces, + Filter: &qbtypes.Filter{ + Expression: "service.name = 'backend'", + }, + }, + }, + }, + }, + expected: qbtypes.Statement{ + Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? 
AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND true), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND true), A_AND_B AS (SELECT l.* FROM A AS l INNER JOIN B AS r ON l.trace_id = r.trace_id) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM A_AND_B ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000", + Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "frontend", "%service.name%", "%service.name\":\"frontend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "backend", "%service.name%", "%service.name\":\"backend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 15}, + }, + expectedErr: nil, + }, + { + name: "OR operator", + requestType: qbtypes.RequestTypeRaw, + operator: qbtypes.QueryBuilderTraceOperator{ + Expression: "A || B", + Limit: 20, + }, + compositeQuery: &qbtypes.CompositeQuery{ + Queries: []qbtypes.QueryEnvelope{ + { + Type: qbtypes.QueryTypeBuilder, + Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{ + Name: "A", + Signal: telemetrytypes.SignalTraces, + Filter: &qbtypes.Filter{ + Expression: "service.name = 
'frontend'", + }, + }, + }, + { + Type: qbtypes.QueryTypeBuilder, + Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{ + Name: "B", + Signal: telemetrytypes.SignalTraces, + Filter: &qbtypes.Filter{ + Expression: "service.name = 'backend'", + }, + }, + }, + }, + }, + expected: qbtypes.Statement{ + Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND true), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND true), A_OR_B AS (SELECT * FROM A UNION DISTINCT SELECT * FROM B) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM A_OR_B ORDER BY timestamp DESC LIMIT ? 
SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000", + Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "frontend", "%service.name%", "%service.name\":\"frontend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "backend", "%service.name%", "%service.name\":\"backend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 20}, + }, + expectedErr: nil, + }, + { + name: "NOT operator", + requestType: qbtypes.RequestTypeRaw, + operator: qbtypes.QueryBuilderTraceOperator{ + Expression: "A NOT B", + Limit: 10, + }, + compositeQuery: &qbtypes.CompositeQuery{ + Queries: []qbtypes.QueryEnvelope{ + { + Type: qbtypes.QueryTypeBuilder, + Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{ + Name: "A", + Signal: telemetrytypes.SignalTraces, + Filter: &qbtypes.Filter{ + Expression: "service.name = 'frontend'", + }, + }, + }, + { + Type: qbtypes.QueryTypeBuilder, + Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{ + Name: "B", + Signal: telemetrytypes.SignalTraces, + Filter: &qbtypes.Filter{ + Expression: "service.name = 'backend'", + }, + }, + }, + }, + }, + expected: qbtypes.Statement{ + Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? 
AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND true), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND true), A_not_B AS (SELECT l.* FROM A AS l WHERE l.trace_id GLOBAL NOT IN (SELECT DISTINCT trace_id FROM B)) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM A_not_B ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000", + Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "frontend", "%service.name%", "%service.name\":\"frontend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "backend", "%service.name%", "%service.name\":\"backend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10}, + }, + expectedErr: nil, + }, + { + name: "time series query with aggregations", + requestType: qbtypes.RequestTypeTimeSeries, + operator: qbtypes.QueryBuilderTraceOperator{ + Expression: "A => B", + StepInterval: qbtypes.Step{Duration: 60 * time.Second}, + Aggregations: []qbtypes.TraceAggregation{ + { + Expression: "count()", + }, + }, + GroupBy: []qbtypes.GroupByKey{ + { + TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{ + Name: 
"service.name", + }, + }, + }, + }, + compositeQuery: &qbtypes.CompositeQuery{ + Queries: []qbtypes.QueryEnvelope{ + { + Type: qbtypes.QueryTypeBuilder, + Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{ + Name: "A", + Signal: telemetrytypes.SignalTraces, + Filter: &qbtypes.Filter{ + Expression: "service.name = 'frontend'", + }, + }, + }, + { + Type: qbtypes.QueryTypeBuilder, + Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{ + Name: "B", + Signal: telemetrytypes.SignalTraces, + Filter: &qbtypes.Filter{ + Expression: "service.name = 'backend'", + }, + }, + }, + }, + }, + expected: qbtypes.Statement{ + Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND true), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? 
AND true), A_DIR_DESC_B AS (SELECT p.* FROM A AS p INNER JOIN B AS c ON p.trace_id = c.trace_id AND p.span_id = c.parent_span_id) SELECT toStartOfInterval(timestamp, INTERVAL 60 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM A_DIR_DESC_B GROUP BY ts, `service.name` ORDER BY ts desc SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000", + Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "frontend", "%service.name%", "%service.name\":\"frontend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "backend", "%service.name%", "%service.name\":\"backend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), true}, + }, + expectedErr: nil, + }, + { + name: "scalar query with aggregation and group by", + requestType: qbtypes.RequestTypeScalar, + operator: qbtypes.QueryBuilderTraceOperator{ + Expression: "A && B", + Aggregations: []qbtypes.TraceAggregation{ + { + Expression: "avg(duration_nano)", + }, + }, + GroupBy: []qbtypes.GroupByKey{ + { + TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{ + Name: "service.name", + }, + }, + }, + Order: []qbtypes.OrderBy{ + { + Key: qbtypes.OrderByKey{ + TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{ + Name: "__result_0", + }, + }, + Direction: qbtypes.OrderDirectionDesc, + }, + }, + Limit: 10, + }, + compositeQuery: &qbtypes.CompositeQuery{ + Queries: []qbtypes.QueryEnvelope{ + { + Type: qbtypes.QueryTypeBuilder, + Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{ + Name: "A", + Signal: telemetrytypes.SignalTraces, + Filter: &qbtypes.Filter{ + Expression: "service.name = 'frontend'", + }, + }, + }, + { + Type: qbtypes.QueryTypeBuilder, + Spec: 
qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{ + Name: "B", + Signal: telemetrytypes.SignalTraces, + Filter: &qbtypes.Filter{ + Expression: "response_status_code < 400", + }, + }, + }, + }, + }, + expected: qbtypes.Statement{ + Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND true), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? 
AND toFloat64(response_status_code) < ?), A_AND_B AS (SELECT l.* FROM A AS l INNER JOIN B AS r ON l.trace_id = r.trace_id) SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, avg(multiIf(duration_nano <> ?, duration_nano, NULL)) AS __result_0 FROM A_AND_B GROUP BY `service.name` ORDER BY __result_0 desc SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000", + Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "frontend", "%service.name%", "%service.name\":\"frontend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), float64(400), true, 0}, + }, + expectedErr: nil, + }, + { + name: "complex nested expression", + requestType: qbtypes.RequestTypeRaw, + operator: qbtypes.QueryBuilderTraceOperator{ + Expression: "(A => B) && (C => D)", + Limit: 5, + }, + compositeQuery: &qbtypes.CompositeQuery{ + Queries: []qbtypes.QueryEnvelope{ + { + Type: qbtypes.QueryTypeBuilder, + Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{ + Name: "A", + Signal: telemetrytypes.SignalTraces, + Filter: &qbtypes.Filter{ + Expression: "service.name = 'frontend'", + }, + }, + }, + { + Type: qbtypes.QueryTypeBuilder, + Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{ + Name: "B", + Signal: telemetrytypes.SignalTraces, + Filter: &qbtypes.Filter{ + Expression: "service.name = 'backend'", + }, + }, + }, + { + Type: qbtypes.QueryTypeBuilder, + Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{ + Name: "C", + Signal: telemetrytypes.SignalTraces, + Filter: &qbtypes.Filter{ + Expression: "service.name = 'auth'", + }, + }, + }, + { + Type: qbtypes.QueryTypeBuilder, + Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{ + Name: 
"D", + Signal: telemetrytypes.SignalTraces, + Filter: &qbtypes.Filter{ + Expression: "service.name = 'database'", + }, + }, + }, + }, + }, + expected: qbtypes.Statement{ + Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND true), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND true), A_DIR_DESC_B AS (SELECT p.* FROM A AS p INNER JOIN B AS c ON p.trace_id = c.trace_id AND p.span_id = c.parent_span_id), __resource_filter_C AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? 
AND seen_at_ts_bucket_start <= ?), C AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_C) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND true), __resource_filter_D AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), D AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_D) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND true), C_DIR_DESC_D AS (SELECT p.* FROM C AS p INNER JOIN D AS c ON p.trace_id = c.trace_id AND p.span_id = c.parent_span_id), A_DIR_DESC_B_AND_C_DIR_DESC_D AS (SELECT l.* FROM A_DIR_DESC_B AS l INNER JOIN C_DIR_DESC_D AS r ON l.trace_id = r.trace_id) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM A_DIR_DESC_B_AND_C_DIR_DESC_D ORDER BY timestamp DESC LIMIT ? 
SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000", + Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "frontend", "%service.name%", "%service.name\":\"frontend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "backend", "%service.name%", "%service.name\":\"backend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "auth", "%service.name%", "%service.name\":\"auth%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "database", "%service.name%", "%service.name\":\"database%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 5}, + }, + expectedErr: nil, + }, + } + + fm := NewFieldMapper() + cb := NewConditionBuilder(fm) + mockMetadataStore := telemetrytypestest.NewMockMetadataStore() + mockMetadataStore.KeysMap = buildCompleteFieldKeyMap() + aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil) + + resourceFilterStmtBuilder := resourceFilterStmtBuilder() + traceStmtBuilder := NewTraceQueryStatementBuilder( + instrumentationtest.New().ToProviderSettings(), + mockMetadataStore, + fm, + cb, + resourceFilterStmtBuilder, + aggExprRewriter, + nil, + ) + + statementBuilder := NewTraceOperatorStatementBuilder( + instrumentationtest.New().ToProviderSettings(), + mockMetadataStore, + fm, + cb, + traceStmtBuilder, + resourceFilterStmtBuilder, + aggExprRewriter, + ) + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + // Parse the operator expression + err := c.operator.ParseExpression() + require.NoError(t, err) + + q, err := statementBuilder.Build( + context.Background(), + 1747947419000, + 1747983448000, + 
c.requestType, + c.operator, + c.compositeQuery, + ) + + if c.expectedErr != nil { + require.Error(t, err) + require.Contains(t, err.Error(), c.expectedErr.Error()) + } else { + require.NoError(t, err) + require.Equal(t, c.expected.Query, q.Query) + require.Equal(t, c.expected.Args, q.Args) + require.Equal(t, c.expected.Warnings, q.Warnings) + } + }) + } +} + +func TestTraceOperatorStatementBuilderErrors(t *testing.T) { + cases := []struct { + name string + operator qbtypes.QueryBuilderTraceOperator + compositeQuery *qbtypes.CompositeQuery + expectedErr string + }{ + { + name: "missing referenced query", + operator: qbtypes.QueryBuilderTraceOperator{ + Expression: "A => B", + }, + compositeQuery: &qbtypes.CompositeQuery{ + Queries: []qbtypes.QueryEnvelope{ + { + Type: qbtypes.QueryTypeBuilder, + Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{ + Name: "A", + Signal: telemetrytypes.SignalTraces, + }, + }, + // Missing query B + }, + }, + expectedErr: "referenced query 'B' not found", + }, + { + name: "nil composite query", + operator: qbtypes.QueryBuilderTraceOperator{ + Expression: "A => B", + }, + compositeQuery: nil, + expectedErr: "compositeQuery cannot be nil", + }, + { + name: "unsupported operator", + operator: qbtypes.QueryBuilderTraceOperator{ + Expression: "A XOR B", // Assuming XOR is not supported + }, + compositeQuery: &qbtypes.CompositeQuery{ + Queries: []qbtypes.QueryEnvelope{ + { + Type: qbtypes.QueryTypeBuilder, + Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{ + Name: "A", + Signal: telemetrytypes.SignalTraces, + }, + }, + { + Type: qbtypes.QueryTypeBuilder, + Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{ + Name: "B", + Signal: telemetrytypes.SignalTraces, + }, + }, + }, + }, + expectedErr: "invalid query reference 'A XOR B'", + }, + } + + fm := NewFieldMapper() + cb := NewConditionBuilder(fm) + mockMetadataStore := telemetrytypestest.NewMockMetadataStore() + mockMetadataStore.KeysMap = buildCompleteFieldKeyMap() + 
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil) + + resourceFilterStmtBuilder := resourceFilterStmtBuilder() + traceStmtBuilder := NewTraceQueryStatementBuilder( + instrumentationtest.New().ToProviderSettings(), + mockMetadataStore, + fm, + cb, + resourceFilterStmtBuilder, + aggExprRewriter, + nil, + ) + + statementBuilder := NewTraceOperatorStatementBuilder( + instrumentationtest.New().ToProviderSettings(), + mockMetadataStore, + fm, + cb, + traceStmtBuilder, + resourceFilterStmtBuilder, + aggExprRewriter, + ) + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + // Parse the operator expression + err := c.operator.ParseExpression() + if err == nil { // Only proceed if parsing succeeded + _, err = statementBuilder.Build( + context.Background(), + 1747947419000, + 1747983448000, + qbtypes.RequestTypeRaw, + c.operator, + c.compositeQuery, + ) + } + + require.Error(t, err) + require.Contains(t, err.Error(), c.expectedErr) + }) + } +} diff --git a/pkg/telemetrytraces/trace_operator_statement_builder.go b/pkg/telemetrytraces/trace_operator_statement_builder.go new file mode 100644 index 000000000000..6ee389d995da --- /dev/null +++ b/pkg/telemetrytraces/trace_operator_statement_builder.go @@ -0,0 +1,95 @@ +package telemetrytraces + +import ( + "context" + "github.com/SigNoz/signoz/pkg/errors" + "github.com/SigNoz/signoz/pkg/factory" + "github.com/SigNoz/signoz/pkg/querybuilder" + qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" + "log/slog" +) + +type traceOperatorStatementBuilder struct { + logger *slog.Logger + metadataStore telemetrytypes.MetadataStore + fm qbtypes.FieldMapper + cb qbtypes.ConditionBuilder + traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation] + resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation] + aggExprRewriter qbtypes.AggExprRewriter 
+} + +var _ qbtypes.TraceOperatorStatementBuilder = (*traceOperatorStatementBuilder)(nil) + +func NewTraceOperatorStatementBuilder( + settings factory.ProviderSettings, + metadataStore telemetrytypes.MetadataStore, + fieldMapper qbtypes.FieldMapper, + conditionBuilder qbtypes.ConditionBuilder, + traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation], + resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation], + aggExprRewriter qbtypes.AggExprRewriter, +) *traceOperatorStatementBuilder { + tracesSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrytraces") + return &traceOperatorStatementBuilder{ + logger: tracesSettings.Logger(), + metadataStore: metadataStore, + fm: fieldMapper, + cb: conditionBuilder, + traceStmtBuilder: traceStmtBuilder, + resourceFilterStmtBuilder: resourceFilterStmtBuilder, + aggExprRewriter: aggExprRewriter, + } +} + +// Build builds a SQL query based on the given parameters. +func (b *traceOperatorStatementBuilder) Build( + ctx context.Context, + start uint64, + end uint64, + requestType qbtypes.RequestType, + query qbtypes.QueryBuilderTraceOperator, + compositeQuery *qbtypes.CompositeQuery, +) (*qbtypes.Statement, error) { + + start = querybuilder.ToNanoSecs(start) + end = querybuilder.ToNanoSecs(end) + + // Parse the expression if not already parsed + if query.ParsedExpression == nil { + if err := query.ParseExpression(); err != nil { + return nil, err + } + } + + // Validate compositeQuery parameter + if compositeQuery == nil { + return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "compositeQuery cannot be nil") + } + + b.logger.DebugContext(ctx, "Building trace operator query", + "expression", query.Expression, + "request_type", requestType) + + // Build the CTE-based query + builder := &traceOperatorCTEBuilder{ + start: start, + end: end, + operator: &query, + stmtBuilder: b, + queries: 
make(map[string]*qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]), + ctes: []cteNode{}, // Use slice to maintain order + cteNameToIndex: make(map[string]int), + queryToCTEName: make(map[string]string), + compositeQuery: compositeQuery, // Now passed as explicit parameter + } + + // Collect all referenced queries + if err := builder.collectQueries(); err != nil { + return nil, err + } + + // Build the query + return builder.build(ctx, requestType) +} diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/qb.go b/pkg/types/querybuildertypes/querybuildertypesv5/qb.go index de709787e59a..786f9980689c 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/qb.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/qb.go @@ -52,3 +52,8 @@ type StatementBuilder[T any] interface { // Build builds the query. Build(ctx context.Context, start, end uint64, requestType RequestType, query QueryBuilderQuery[T], variables map[string]VariableItem) (*Statement, error) } + +type TraceOperatorStatementBuilder interface { + // Build builds the trace operator query. 
+ Build(ctx context.Context, start, end uint64, requestType RequestType, query QueryBuilderTraceOperator, compositeQuery *CompositeQuery) (*Statement, error) +} diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/req.go b/pkg/types/querybuildertypes/querybuildertypesv5/req.go index e5fb1986610f..a7a8c60c6f31 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/req.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/req.go @@ -88,8 +88,8 @@ func (q *QueryEnvelope) UnmarshalJSON(data []byte) error { case QueryTypeTraceOperator: var spec QueryBuilderTraceOperator - if err := json.Unmarshal(shadow.Spec, &spec); err != nil { - return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid trace operator spec") + if err := UnmarshalJSONWithContext(shadow.Spec, &spec, "trace operator spec"); err != nil { + return wrapUnmarshalError(err, "invalid trace operator spec: %v", err) } q.Spec = spec @@ -113,7 +113,7 @@ func (q *QueryEnvelope) UnmarshalJSON(data []byte) error { "unknown query type %q", shadow.Type, ).WithAdditional( - "Valid query types are: builder_query, builder_sub_query, builder_formula, builder_join, promql, clickhouse_sql", + "Valid query types are: builder_query, builder_sub_query, builder_formula, builder_join, builder_trace_operator, promql, clickhouse_sql", ) } diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/req_test.go b/pkg/types/querybuildertypes/querybuildertypesv5/req_test.go index a70262881170..f3f65c14d82c 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/req_test.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/req_test.go @@ -132,7 +132,7 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) { "filter": { "expression": "trace_duration > 200ms AND span_count >= 5" }, - "orderBy": [{ + "order": [{ "key": { "name": "trace_duration" }, @@ -231,7 +231,7 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) { "name": "complex_trace_analysis", "expression": "A => (B && NOT C)", 
"filter": { "expression": "trace_duration BETWEEN 100ms AND 5s AND span_count IN (5, 10, 15)" }, - "orderBy": [{ + "order": [{ "key": { "name": "span_count" }, "direction": "asc" }], @@ -1029,15 +1029,17 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) { func TestParseTraceExpression(t *testing.T) { tests := []struct { - name string - expression string - expectError bool - checkResult func(t *testing.T, result *TraceOperand) + name string + expression string + expectError bool + expectedOpCount int + checkResult func(t *testing.T, result *TraceOperand) }{ { - name: "simple query reference", - expression: "A", - expectError: false, + name: "simple query reference", + expression: "A", + expectError: false, + expectedOpCount: 0, checkResult: func(t *testing.T, result *TraceOperand) { assert.NotNil(t, result.QueryRef) assert.Equal(t, "A", result.QueryRef.Name) @@ -1045,9 +1047,10 @@ func TestParseTraceExpression(t *testing.T) { }, }, { - name: "simple implication", - expression: "A => B", - expectError: false, + name: "simple implication", + expression: "A => B", + expectError: false, + expectedOpCount: 1, checkResult: func(t *testing.T, result *TraceOperand) { assert.NotNil(t, result.Operator) assert.Equal(t, TraceOperatorDirectDescendant, *result.Operator) @@ -1058,9 +1061,10 @@ func TestParseTraceExpression(t *testing.T) { }, }, { - name: "and operation", - expression: "A && B", - expectError: false, + name: "and operation", + expression: "A && B", + expectError: false, + expectedOpCount: 1, checkResult: func(t *testing.T, result *TraceOperand) { assert.NotNil(t, result.Operator) assert.Equal(t, TraceOperatorAnd, *result.Operator) @@ -1069,9 +1073,10 @@ func TestParseTraceExpression(t *testing.T) { }, }, { - name: "or operation", - expression: "A || B", - expectError: false, + name: "or operation", + expression: "A || B", + expectError: false, + expectedOpCount: 1, checkResult: func(t *testing.T, result *TraceOperand) { assert.NotNil(t, result.Operator) 
assert.Equal(t, TraceOperatorOr, *result.Operator) @@ -1080,9 +1085,10 @@ func TestParseTraceExpression(t *testing.T) { }, }, { - name: "unary NOT operation", - expression: "NOT A", - expectError: false, + name: "unary NOT operation", + expression: "NOT A", + expectError: false, + expectedOpCount: 1, checkResult: func(t *testing.T, result *TraceOperand) { assert.NotNil(t, result.Operator) assert.Equal(t, TraceOperatorNot, *result.Operator) @@ -1092,9 +1098,10 @@ func TestParseTraceExpression(t *testing.T) { }, }, { - name: "binary NOT operation", - expression: "A NOT B", - expectError: false, + name: "binary NOT operation", + expression: "A NOT B", + expectError: false, + expectedOpCount: 1, checkResult: func(t *testing.T, result *TraceOperand) { assert.NotNil(t, result.Operator) assert.Equal(t, TraceOperatorExclude, *result.Operator) @@ -1105,9 +1112,10 @@ func TestParseTraceExpression(t *testing.T) { }, }, { - name: "complex expression with precedence", - expression: "A => B && C || D", - expectError: false, + name: "complex expression with precedence", + expression: "A => B && C || D", + expectError: false, + expectedOpCount: 3, // Three operators: =>, &&, || checkResult: func(t *testing.T, result *TraceOperand) { // Should parse as: A => (B && (C || D)) due to precedence: NOT > || > && > => // The parsing finds operators from lowest precedence first @@ -1121,9 +1129,10 @@ func TestParseTraceExpression(t *testing.T) { }, }, { - name: "simple parentheses", - expression: "(A)", - expectError: false, + name: "simple parentheses", + expression: "(A)", + expectError: false, + expectedOpCount: 0, checkResult: func(t *testing.T, result *TraceOperand) { assert.NotNil(t, result.QueryRef) assert.Equal(t, "A", result.QueryRef.Name) @@ -1131,9 +1140,10 @@ func TestParseTraceExpression(t *testing.T) { }, }, { - name: "parentheses expression", - expression: "A => (B || C)", - expectError: false, + name: "parentheses expression", + expression: "A => (B || C)", + expectError: 
false, + expectedOpCount: 2, // Two operators: =>, || checkResult: func(t *testing.T, result *TraceOperand) { assert.NotNil(t, result.Operator) assert.Equal(t, TraceOperatorDirectDescendant, *result.Operator) @@ -1147,9 +1157,10 @@ }, }, { - name: "nested NOT with parentheses", - expression: "NOT (A && B)", - expectError: false, + name: "nested NOT with parentheses", + expression: "NOT (A && B)", + expectError: false, + expectedOpCount: 2, // Two operators: NOT, && checkResult: func(t *testing.T, result *TraceOperand) { assert.NotNil(t, result.Operator) assert.Equal(t, TraceOperatorNot, *result.Operator) @@ -1160,6 +1171,13 @@ assert.Equal(t, TraceOperatorAnd, *result.Left.Operator) }, }, + { + name: "complex expression exceeding operator limit", + expression: "A => B => C => D => E => F => G => H => I => J => K => L", + expectError: false, // parseTraceExpression doesn't validate count, ParseExpression does + expectedOpCount: 11, // 11 => operators + checkResult: nil, + }, { name: "invalid query reference with numbers", expression: "123", expectError: true, @@ -1175,11 +1193,12 @@ expression: "", expectError: true, }, - { - name: "expression with extra whitespace", - expression: " A => B ", - expectError: false, + { + name: "expression with extra whitespace", + expression: " A => B ", + expectError: false, + expectedOpCount: 1, checkResult: func(t *testing.T, result *TraceOperand) { assert.NotNil(t, result.Operator) assert.Equal(t, TraceOperatorDirectDescendant, *result.Operator) @@ -1191,7 +1209,7 @@ for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - result, err := parseTraceExpression(tt.expression) + result, opCount, err := parseTraceExpression(tt.expression) if tt.expectError { assert.Error(t, err) @@ -1201,6 +1219,8 @@ require.NoError(t, err)
require.NotNil(t, result) + assert.Equal(t, tt.expectedOpCount, opCount, "operator count mismatch") + if tt.checkResult != nil { tt.checkResult(t, result) } @@ -1208,6 +1228,63 @@ func TestParseTraceExpression(t *testing.T) { } } +func TestQueryBuilderTraceOperator_ParseExpression_OperatorLimit(t *testing.T) { + tests := []struct { + name string + expression string + expectError bool + errorContains string + }{ + { + name: "within operator limit", + expression: "A => B => C", + expectError: false, + }, + { + name: "exceeding operator limit", + expression: "A => B => C => D => E => F => G => H => I => J => K => L", + expectError: true, + errorContains: "expression contains 11 operators, which exceeds the maximum allowed 10 operators", + }, + { + name: "exactly at limit", + expression: "A => B => C => D => E => F => G => H => I => J => K", + expectError: false, // 10 operators, exactly at limit + }, + { + name: "complex expression at limit", + expression: "(A && B) => (C || D) => (E && F) => (G || H) => (I && J) => K", + expectError: false, // 10 operators: 3 &&, 2 ||, 5 => = 10 total + }, + { + name: "complex expression exceeding limit", + expression: "(A && B) => (C || D) => (E && F) => (G || H) => (I && J) => (K || L)", + expectError: true, + errorContains: "expression contains 11 operators, which exceeds the maximum allowed 10 operators", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + op := &QueryBuilderTraceOperator{ + Expression: tt.expression, + } + + err := op.ParseExpression() + + if tt.expectError { + assert.Error(t, err) + if tt.errorContains != "" { + assert.Contains(t, err.Error(), tt.errorContains) + } + } else { + assert.NoError(t, err) + assert.NotNil(t, op.ParsedExpression) + } + }) + } +} + func TestQueryBuilderTraceOperator_ValidateTraceOperator(t *testing.T) { tests := []struct { name string diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/trace_operator.go 
b/pkg/types/querybuildertypes/querybuildertypesv5/trace_operator.go index ada8b6ecee27..32b164bd66d8 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/trace_operator.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/trace_operator.go @@ -29,6 +29,11 @@ var ( OrderByTraceDuration = TraceOrderBy{valuer.NewString("trace_duration")} ) +const ( + // MaxTraceOperators defines the maximum number of operators allowed in a trace expression + MaxTraceOperators = 10 +) + type QueryBuilderTraceOperator struct { Name string `json:"name"` Disabled bool `json:"disabled,omitempty"` @@ -41,15 +46,21 @@ type QueryBuilderTraceOperator struct { ReturnSpansFrom string `json:"returnSpansFrom,omitempty"` // Trace-specific ordering (only span_count and trace_duration allowed) - Order []OrderBy `json:"orderBy,omitempty"` + Order []OrderBy `json:"order,omitempty"` Aggregations []TraceAggregation `json:"aggregations,omitempty"` StepInterval Step `json:"stepInterval,omitempty"` GroupBy []GroupByKey `json:"groupBy,omitempty"` + // having clause to apply to the aggregated query results + Having *Having `json:"having,omitempty"` + Limit int `json:"limit,omitempty"` + Offset int `json:"offset,omitempty"` Cursor string `json:"cursor,omitempty"` + Legend string `json:"legend,omitempty"` + // Other post-processing options SelectFields []telemetrytypes.TelemetryFieldKey `json:"selectFields,omitempty"` Functions []Function `json:"functions,omitempty"` @@ -84,7 +95,7 @@ func (q *QueryBuilderTraceOperator) ParseExpression() error { ) } - parsed, err := parseTraceExpression(q.Expression) + parsed, operatorCount, err := parseTraceExpression(q.Expression) if err != nil { return errors.WrapInvalidInputf( err, @@ -94,13 +105,24 @@ func (q *QueryBuilderTraceOperator) ParseExpression() error { ) } + // Validate operator count immediately during parsing + if operatorCount > MaxTraceOperators { + return errors.WrapInvalidInputf( + nil, + errors.CodeInvalidInput, + "expression contains %d 
operators, which exceeds the maximum allowed %d operators", + operatorCount, + MaxTraceOperators, + ) + } + q.ParsedExpression = parsed return nil } // ValidateTraceOperator validates that all referenced queries exist and are trace queries func (q *QueryBuilderTraceOperator) ValidateTraceOperator(queries []QueryEnvelope) error { - // Parse the expression + // Parse the expression - this now includes operator count validation if err := q.ParseExpression(); err != nil { return err } @@ -131,7 +153,7 @@ func (q *QueryBuilderTraceOperator) ValidateTraceOperator(queries []QueryEnvelop } // Get all query names referenced in the expression - referencedQueries := q.collectReferencedQueries(q.ParsedExpression) + referencedQueries := q.CollectReferencedQueries(q.ParsedExpression) // Validate that all referenced queries exist and are trace queries for _, queryName := range referencedQueries { @@ -157,6 +179,15 @@ func (q *QueryBuilderTraceOperator) ValidateTraceOperator(queries []QueryEnvelop } } + if q.StepInterval.Seconds() < 0 { + return errors.WrapInvalidInputf( + nil, + errors.CodeInvalidInput, + "stepInterval cannot be negative, got %f seconds", + q.StepInterval.Seconds(), + ) + } + // Validate ReturnSpansFrom if specified if q.ReturnSpansFrom != "" { if _, exists := availableQueries[q.ReturnSpansFrom]; !exists { @@ -234,6 +265,15 @@ func (q *QueryBuilderTraceOperator) ValidatePagination() error { ) } + if q.Offset < 0 { + return errors.WrapInvalidInputf( + nil, + errors.CodeInvalidInput, + "offset must be non-negative, got %d", + q.Offset, + ) + } + // For production use, you might want to enforce maximum limits if q.Limit > 10000 { return errors.WrapInvalidInputf( @@ -247,8 +287,8 @@ func (q *QueryBuilderTraceOperator) ValidatePagination() error { return nil } -// collectReferencedQueries collects all query names referenced in the expression tree -func (q *QueryBuilderTraceOperator) collectReferencedQueries(operand *TraceOperand) []string { +// 
CollectReferencedQueries collects all query names referenced in the expression tree +func (q *QueryBuilderTraceOperator) CollectReferencedQueries(operand *TraceOperand) []string { if operand == nil { return nil } @@ -260,8 +300,8 @@ func (q *QueryBuilderTraceOperator) collectReferencedQueries(operand *TraceOpera } // Recursively collect from children - queries = append(queries, q.collectReferencedQueries(operand.Left)...) - queries = append(queries, q.collectReferencedQueries(operand.Right)...) + queries = append(queries, q.CollectReferencedQueries(operand.Left)...) + queries = append(queries, q.CollectReferencedQueries(operand.Right)...) // Remove duplicates seen := make(map[string]bool) @@ -276,6 +316,56 @@ func (q *QueryBuilderTraceOperator) collectReferencedQueries(operand *TraceOpera return unique } +// Copy creates a deep copy of QueryBuilderTraceOperator +func (q QueryBuilderTraceOperator) Copy() QueryBuilderTraceOperator { + // Start with a shallow copy + c := q + + if q.Filter != nil { + c.Filter = q.Filter.Copy() + } + + if q.Order != nil { + c.Order = make([]OrderBy, len(q.Order)) + for i, o := range q.Order { + c.Order[i] = o.Copy() + } + } + + if q.Aggregations != nil { + c.Aggregations = make([]TraceAggregation, len(q.Aggregations)) + copy(c.Aggregations, q.Aggregations) + } + + if q.GroupBy != nil { + c.GroupBy = make([]GroupByKey, len(q.GroupBy)) + for i, gb := range q.GroupBy { + c.GroupBy[i] = gb.Copy() + } + } + + if q.Having != nil { + c.Having = q.Having.Copy() + } + + if q.SelectFields != nil { + c.SelectFields = make([]telemetrytypes.TelemetryFieldKey, len(q.SelectFields)) + copy(c.SelectFields, q.SelectFields) + } + + if q.Functions != nil { + c.Functions = make([]Function, len(q.Functions)) + for i, f := range q.Functions { + c.Functions[i] = f.Copy() + } + } + + // Note: ParsedExpression is not copied as it's internal and will be re-parsed when needed + c.ParsedExpression = nil + + return c +} + // ValidateUniqueTraceOperator ensures only 
one trace operator exists in queries func ValidateUniqueTraceOperator(queries []QueryEnvelope) error { traceOperatorCount := 0 @@ -304,9 +394,8 @@ return nil } -// parseTraceExpression parses an expression string into a tree structure // Handles precedence: NOT (highest) > || > && > => (lowest) -func parseTraceExpression(expr string) (*TraceOperand, error) { +func parseTraceExpression(expr string) (*TraceOperand, int, error) { expr = strings.TrimSpace(expr) // Handle parentheses @@ -319,40 +408,42 @@ func parseTraceExpression(expr string) (*TraceOperand, error) { // Handle unary NOT operator (prefix) if strings.HasPrefix(expr, "NOT ") { - operand, err := parseTraceExpression(expr[4:]) + operand, count, err := parseTraceExpression(expr[4:]) if err != nil { - return nil, err + return nil, 0, err } notOp := TraceOperatorNot return &TraceOperand{ Operator: &notOp, Left: operand, - }, nil + }, count + 1, nil // Add 1 for this NOT operator } // Find binary operators with lowest precedence first (=> has lowest precedence) // Order: => (lowest) < && < || < NOT (highest) - operators := []string{"=>", "&&", "||", " NOT "} + operators := []string{"->", "=>", "&&", "||", " NOT "} for _, op := range operators { if pos := findOperatorPosition(expr, op); pos != -1 { leftExpr := strings.TrimSpace(expr[:pos]) rightExpr := strings.TrimSpace(expr[pos+len(op):]) - left, err := parseTraceExpression(leftExpr) + left, leftCount, err := parseTraceExpression(leftExpr) if err != nil { - return nil, err + return nil, 0, err } - right, err := parseTraceExpression(rightExpr) + right, rightCount, err := parseTraceExpression(rightExpr) if err != nil { - return nil, err + return nil, 0, err } var opType TraceOperatorType switch strings.TrimSpace(op) { case "=>": opType = TraceOperatorDirectDescendant + case "->": + opType = TraceOperatorIndirectDescendant case "&&": opType = TraceOperatorAnd case "||": @@ -365,13 +456,13 @@ func
parseTraceExpression(expr string) (*TraceOperand, error) { Operator: &opType, Left: left, Right: right, - }, nil + }, leftCount + rightCount + 1, nil // Add counts from both sides + 1 for this operator } } // If no operators found, this should be a query reference if matched, _ := regexp.MatchString(`^[A-Za-z][A-Za-z0-9_]*$`, expr); !matched { - return nil, errors.WrapInvalidInputf( + return nil, 0, errors.WrapInvalidInputf( nil, errors.CodeInvalidInput, "invalid query reference '%s'", @@ -379,9 +470,10 @@ func parseTraceExpression(expr string) (*TraceOperand, error) { ) } + // Leaf node - no operators return &TraceOperand{ QueryRef: &TraceOperatorQueryRef{Name: expr}, - }, nil + }, 0, nil } // isBalancedParentheses checks if parentheses are balanced in the expression diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/validation.go b/pkg/types/querybuildertypes/querybuildertypesv5/validation.go index 5e5b3eea2add..3d1a84de61c9 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/validation.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/validation.go @@ -36,6 +36,11 @@ func getQueryIdentifier(envelope QueryEnvelope, index int) string { return fmt.Sprintf("formula '%s'", spec.Name) } return fmt.Sprintf("formula at position %d", index+1) + case QueryTypeTraceOperator: + if spec, ok := envelope.Spec.(QueryBuilderTraceOperator); ok && spec.Name != "" { + return fmt.Sprintf("trace operator '%s'", spec.Name) + } + return fmt.Sprintf("trace operator at position %d", index+1) case QueryTypeJoin: if spec, ok := envelope.Spec.(QueryBuilderJoin); ok && spec.Name != "" { return fmt.Sprintf("join '%s'", spec.Name) @@ -583,6 +588,24 @@ func (r *QueryRangeRequest) validateCompositeQuery() error { queryId, ) } + case QueryTypeTraceOperator: + spec, ok := envelope.Spec.(QueryBuilderTraceOperator) + if !ok { + queryId := getQueryIdentifier(envelope, i) + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "invalid spec for %s", + queryId, + ) + } 
+ if spec.Expression == "" { + queryId := getQueryIdentifier(envelope, i) + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "expression is required for %s", + queryId, + ) + } case QueryTypePromQL: // PromQL validation is handled separately spec, ok := envelope.Spec.(PromQuery) @@ -629,7 +652,7 @@ func (r *QueryRangeRequest) validateCompositeQuery() error { envelope.Type, queryId, ).WithAdditional( - "Valid query types are: builder_query, builder_formula, builder_join, promql, clickhouse_sql", + "Valid query types are: builder_query, builder_formula, builder_join, promql, clickhouse_sql, builder_trace_operator", ) } } @@ -697,6 +720,21 @@ func validateQueryEnvelope(envelope QueryEnvelope, requestType RequestType) erro ) } return nil + case QueryTypeTraceOperator: + spec, ok := envelope.Spec.(QueryBuilderTraceOperator) + if !ok { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "invalid trace operator spec", + ) + } + if spec.Expression == "" { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "trace operator expression is required", + ) + } + return nil case QueryTypePromQL: spec, ok := envelope.Spec.(PromQuery) if !ok { @@ -733,7 +771,7 @@ func validateQueryEnvelope(envelope QueryEnvelope, requestType RequestType) erro "unknown query type: %s", envelope.Type, ).WithAdditional( - "Valid query types are: builder_query, builder_sub_query, builder_formula, builder_join, promql, clickhouse_sql", + "Valid query types are: builder_query, builder_sub_query, builder_formula, builder_join, promql, clickhouse_sql, builder_trace_operator", ) } }