chore: make queries compatible with 24.1 and fix string json query (#8391)

Srikanth Chekuri 2025-07-02 10:39:16 +05:30 committed by GitHub
parent 9daefeb881
commit 2a53918ebd
19 changed files with 471 additions and 106 deletions

View File

@@ -9,6 +9,7 @@ import (
     "time"
     "github.com/ClickHouse/clickhouse-go/v2"
+    "github.com/SigNoz/signoz/pkg/errors"
     "github.com/SigNoz/signoz/pkg/telemetrystore"
     qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
     "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -205,6 +206,10 @@ func (q *builderQuery[T]) executeWithContext(ctx context.Context, query string,
     rows, err := q.telemetryStore.ClickhouseDB().Query(ctx, query, args...)
     if err != nil {
+        if errors.Is(err, context.DeadlineExceeded) {
+            return nil, errors.Newf(errors.TypeTimeout, errors.CodeTimeout, "Query timed out").
+                WithAdditional("Try refining your search by adding relevant resource attributes filtering")
+        }
         return nil, err
     }
     defer rows.Close()
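
Note: the new branch translates a context deadline into a typed, user-facing timeout error instead of surfacing the raw driver error. A minimal, self-contained sketch of the same pattern; `classify` and `queryFn` are illustrative stand-ins for the builder method and the ClickHouse call, not names from this commit:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// queryFn stands in for telemetryStore.ClickhouseDB().Query (hypothetical).
type queryFn func(ctx context.Context) error

// classify mirrors the hunk above: a context deadline becomes a timeout
// error with a remediation hint; every other error passes through as-is.
func classify(ctx context.Context, q queryFn) error {
	if err := q(ctx); err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			return fmt.Errorf("query timed out: try refining your search by filtering on resource attributes")
		}
		return err
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	slow := func(ctx context.Context) error {
		select {
		case <-time.After(time.Second):
			return nil
		case <-ctx.Done():
			return ctx.Err() // context.DeadlineExceeded after 10ms
		}
	}
	fmt.Println(classify(ctx, slow))
}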

View File

@@ -89,6 +89,9 @@ func newProvider(
         resourceFilterFieldMapper,
         resourceFilterConditionBuilder,
         telemetryMetadataStore,
+        telemetrylogs.DefaultFullTextColumn,
+        telemetrylogs.BodyJSONStringSearchPrefix,
+        telemetrylogs.GetBodyJSONKey,
     )
     logAggExprRewriter := querybuilder.NewAggExprRewriter(
         telemetrylogs.DefaultFullTextColumn,

View File

@@ -94,3 +94,11 @@ func CollisionHandledFinalExpr(
     return multiIfStmt, allArgs, nil
 }
+
+func GroupByKeys(keys []qbtypes.GroupByKey) []string {
+    k := []string{}
+    for _, key := range keys {
+        k = append(k, "`"+key.Name+"`")
+    }
+    return k
+}
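
Note: GroupByKeys is the helper the statement builders below use in place of `GROUP BY ALL`: it backtick-quotes each group-by name so dotted keys like `service.name` survive as single identifiers. A self-contained sketch with a simplified `GroupByKey` (the real type lives in querybuildertypesv5):

package main

import (
	"fmt"
	"strings"
)

// GroupByKey is reduced to its Name field for illustration.
type GroupByKey struct{ Name string }

// GroupByKeys mirrors the helper added above.
func GroupByKeys(keys []GroupByKey) []string {
	k := []string{}
	for _, key := range keys {
		k = append(k, "`"+key.Name+"`")
	}
	return k
}

func main() {
	cols := GroupByKeys([]GroupByKey{{Name: "service.name"}, {Name: "le"}})
	// In the builders this becomes sb.GroupBy("ts"); sb.GroupBy(cols...)
	fmt.Println("GROUP BY ts, " + strings.Join(cols, ", "))
	// Output: GROUP BY ts, `service.name`, `le`
}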

View File

@@ -38,6 +38,10 @@ type resourceFilterStatementBuilder[T any] struct {
     conditionBuilder qbtypes.ConditionBuilder
     metadataStore    telemetrytypes.MetadataStore
     signal           telemetrytypes.Signal
+    fullTextColumn   *telemetrytypes.TelemetryFieldKey
+    jsonBodyPrefix   string
+    jsonKeyToKey     qbtypes.JsonKeyToFieldFunc
 }

 // Ensure interface compliance at compile time
@@ -64,12 +68,18 @@ func NewLogResourceFilterStatementBuilder(
     fieldMapper qbtypes.FieldMapper,
     conditionBuilder qbtypes.ConditionBuilder,
     metadataStore telemetrytypes.MetadataStore,
+    fullTextColumn *telemetrytypes.TelemetryFieldKey,
+    jsonBodyPrefix string,
+    jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
 ) *resourceFilterStatementBuilder[qbtypes.LogAggregation] {
     return &resourceFilterStatementBuilder[qbtypes.LogAggregation]{
         fieldMapper:      fieldMapper,
         conditionBuilder: conditionBuilder,
         metadataStore:    metadataStore,
         signal:           telemetrytypes.SignalLogs,
+        fullTextColumn:   fullTextColumn,
+        jsonBodyPrefix:   jsonBodyPrefix,
+        jsonKeyToKey:     jsonKeyToKey,
     }
 }

@@ -140,7 +150,11 @@ func (b *resourceFilterStatementBuilder[T]) addConditions(
     FieldMapper:        b.fieldMapper,
     ConditionBuilder:   b.conditionBuilder,
     FieldKeys:          keys,
+    FullTextColumn:     b.fullTextColumn,
+    JsonBodyPrefix:     b.jsonBodyPrefix,
+    JsonKeyToKey:       b.jsonKeyToKey,
     SkipFullTextFilter: true,
+    SkipFunctionCalls:  true,
     Variables:          variables,
 })
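
Note: the resource-filter pass now also neutralizes function calls (`has()`, `hasAny()`, ...) the way it already neutralized full-text terms: the visitor emits a tautology so the fingerprint pre-filter stays permissive rather than referencing columns the resource table does not carry. A compressed sketch of that short-circuit (the argument string is illustrative):

package main

import "fmt"

type visitor struct {
	skipFunctionCalls bool
}

// visitFunctionCall mirrors the VisitFunctionCall short-circuit added in
// this commit: when skipping, return "true" so the WHERE clause stays
// valid but does not constrain the subquery.
func (v *visitor) visitFunctionCall(expr string) string {
	if v.skipFunctionCalls {
		return "true"
	}
	return expr // real condition building elided
}

func main() {
	v := &visitor{skipFunctionCalls: true}
	fmt.Println(v.visitFunctionCall("has(attributes_string, 'foo')")) // true
}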

View File

@@ -29,7 +29,10 @@ type filterExpressionVisitor struct {
     jsonKeyToKey       qbtypes.JsonKeyToFieldFunc
     skipResourceFilter bool
     skipFullTextFilter bool
+    skipFunctionCalls  bool
     variables          map[string]qbtypes.VariableItem
+    keysWithWarnings   map[string]bool
 }

 type FilterExprVisitorOpts struct {
@@ -42,6 +45,7 @@ type FilterExprVisitorOpts struct {
     JsonKeyToKey       qbtypes.JsonKeyToFieldFunc
     SkipResourceFilter bool
     SkipFullTextFilter bool
+    SkipFunctionCalls  bool
     Variables          map[string]qbtypes.VariableItem
 }

@@ -57,7 +61,9 @@ func newFilterExpressionVisitor(opts FilterExprVisitorOpts) *filterExpressionVis
         jsonKeyToKey:       opts.JsonKeyToKey,
         skipResourceFilter: opts.SkipResourceFilter,
         skipFullTextFilter: opts.SkipFullTextFilter,
+        skipFunctionCalls:  opts.SkipFunctionCalls,
         variables:          opts.Variables,
+        keysWithWarnings:   make(map[string]bool),
     }
 }

@@ -547,6 +553,10 @@ func (v *filterExpressionVisitor) VisitFullText(ctx *grammar.FullTextContext) an

 // VisitFunctionCall handles function calls like has(), hasAny(), etc.
 func (v *filterExpressionVisitor) VisitFunctionCall(ctx *grammar.FunctionCallContext) any {
+    if v.skipFunctionCalls {
+        return "true"
+    }
     // Get function name based on which token is present
     var functionName string
     if ctx.HAS() != nil {
@@ -690,7 +700,7 @@ func (v *filterExpressionVisitor) VisitKey(ctx *grammar.KeyContext) any {
         }
     }
-    if len(fieldKeysForName) > 1 {
+    if len(fieldKeysForName) > 1 && !v.keysWithWarnings[keyName] {
         // this is warning state, we must have a unambiguous key
         v.warnings = append(v.warnings, fmt.Sprintf(
             "key `%s` is ambiguous, found %d different combinations of field context and data type: %v",
@@ -698,6 +708,7 @@ func (v *filterExpressionVisitor) VisitKey(ctx *grammar.KeyContext) any {
             len(fieldKeysForName),
             fieldKeysForName,
         ))
+        v.keysWithWarnings[keyName] = true
     }

     return fieldKeysForName
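
Note: `keysWithWarnings` makes the ambiguity warning fire once per key rather than once per occurrence of the key in the expression. The warn-once pattern in isolation:

package main

import "fmt"

type visitor struct {
	warnings         []string
	keysWithWarnings map[string]bool
}

// warnAmbiguous records at most one warning per key name, mirroring the
// deduplication added to VisitKey above.
func (v *visitor) warnAmbiguous(key string, variants int) {
	if v.keysWithWarnings[key] {
		return // already warned for this key
	}
	v.warnings = append(v.warnings, fmt.Sprintf("key `%s` is ambiguous, found %d variants", key, variants))
	v.keysWithWarnings[key] = true
}

func main() {
	v := &visitor{keysWithWarnings: map[string]bool{}}
	v.warnAmbiguous("duration", 2)
	v.warnAmbiguous("duration", 2) // deduplicated
	fmt.Println(len(v.warnings))   // 1
}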

View File

@@ -9,6 +9,7 @@ import (
     schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
     qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
     "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
+    "golang.org/x/exp/maps"

     "github.com/huandu/go-sqlbuilder"
 )

@@ -148,7 +149,7 @@ func (c *conditionBuilder) conditionFor(
     }

     // if the field is intrinsic, it always exists
-    if slices.Contains(IntrinsicFields, key.Name) {
+    if slices.Contains(maps.Keys(IntrinsicFields), key.Name) {
         return "true", nil
     }

@@ -210,7 +211,7 @@ func (c *conditionBuilder) ConditionFor(
     // skip adding exists filter for intrinsic fields
     // with an exception for body json search
     field, _ := c.fm.FieldFor(ctx, key)
-    if slices.Contains(IntrinsicFields, field) && !strings.HasPrefix(key.Name, BodyJSONStringSearchPrefix) {
+    if slices.Contains(maps.Keys(IntrinsicFields), field) && !strings.HasPrefix(key.Name, BodyJSONStringSearchPrefix) {
         return condition, nil
     }
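
Note: `IntrinsicFields` is now a map, so the existing `slices.Contains` call sites are kept by going through `maps.Keys`. A sketch of the two equivalent forms; the direct lookup avoids materializing the key slice on every call (a possible follow-up, not part of this commit):

package main

import (
	"fmt"
	"slices"

	"golang.org/x/exp/maps"
)

func main() {
	intrinsic := map[string]struct{}{"body": {}, "trace_id": {}}

	// Form used in the diff: materialize the keys, then scan them.
	fmt.Println(slices.Contains(maps.Keys(intrinsic), "body")) // true

	// Equivalent direct lookup: O(1), no per-call allocation.
	_, ok := intrinsic["body"]
	fmt.Println(ok) // true
}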

View File

@@ -337,7 +337,7 @@ func TestConditionForJSONBodySearch(t *testing.T) {
     },
     operator:      qbtypes.FilterOperatorEqual,
     value:         "GET",
-    expectedSQL:   `JSONExtract(JSON_VALUE(body, '$."http"."method"'), 'String') = ?`,
+    expectedSQL:   `JSON_VALUE(body, '$."http"."method"') = ?`,
     expectedError: nil,
 },
 {
@@ -417,7 +417,7 @@ func TestConditionForJSONBodySearch(t *testing.T) {
     },
     operator:      qbtypes.FilterOperatorContains,
     value:         "200",
-    expectedSQL:   `LOWER(JSONExtract(JSON_VALUE(body, '$."http"."status_code"'), 'String')) LIKE LOWER(?)`,
+    expectedSQL:   `LOWER(JSON_VALUE(body, '$."http"."status_code"')) LIKE LOWER(?)`,
     expectedError: nil,
 },
 {
@@ -427,7 +427,7 @@ func TestConditionForJSONBodySearch(t *testing.T) {
     },
     operator:      qbtypes.FilterOperatorNotContains,
     value:         "200",
-    expectedSQL:   `LOWER(JSONExtract(JSON_VALUE(body, '$."http"."status_code"'), 'String')) NOT LIKE LOWER(?)`,
+    expectedSQL:   `LOWER(JSON_VALUE(body, '$."http"."status_code"')) NOT LIKE LOWER(?)`,
     expectedError: nil,
 },
 {
{ {

View File

@@ -10,7 +10,57 @@ var (
         FieldDataType: telemetrytypes.FieldDataTypeString,
     }
     BodyJSONStringSearchPrefix = `body.`
-    IntrinsicFields            = []string{
-        "body", "trace_id", "span_id", "trace_flags", "severity_text", "severity_number", "scope_name", "scope_version",
+    IntrinsicFields            = map[string]telemetrytypes.TelemetryFieldKey{
+        "body": {
+            Name:          "body",
+            Signal:        telemetrytypes.SignalLogs,
+            FieldContext:  telemetrytypes.FieldContextLog,
+            FieldDataType: telemetrytypes.FieldDataTypeString,
+        },
+        "trace_id": {
+            Name:          "trace_id",
+            Signal:        telemetrytypes.SignalLogs,
+            FieldContext:  telemetrytypes.FieldContextLog,
+            FieldDataType: telemetrytypes.FieldDataTypeString,
+        },
+        "span_id": {
+            Name:          "span_id",
+            Signal:        telemetrytypes.SignalLogs,
+            FieldContext:  telemetrytypes.FieldContextLog,
+            FieldDataType: telemetrytypes.FieldDataTypeString,
+        },
+        "trace_flags": {
+            Name:          "trace_flags",
+            Signal:        telemetrytypes.SignalLogs,
+            FieldContext:  telemetrytypes.FieldContextLog,
+            FieldDataType: telemetrytypes.FieldDataTypeNumber,
+        },
+        "severity_text": {
+            Name:          "severity_text",
+            Description:   "Log level. Learn more [here](https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitytext)",
+            Signal:        telemetrytypes.SignalLogs,
+            FieldContext:  telemetrytypes.FieldContextLog,
+            FieldDataType: telemetrytypes.FieldDataTypeString,
+        },
+        "severity_number": {
+            Name:          "severity_number",
+            Description:   "Numerical value of the severity. Learn more [here](https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber)",
+            Signal:        telemetrytypes.SignalLogs,
+            FieldContext:  telemetrytypes.FieldContextLog,
+            FieldDataType: telemetrytypes.FieldDataTypeNumber,
+        },
+        "scope_name": {
+            Name:          "scope_name",
+            Description:   "Logger name. Learn more about instrumentation scope [here](https://opentelemetry.io/docs/concepts/instrumentation-scope/)",
+            Signal:        telemetrytypes.SignalLogs,
+            FieldContext:  telemetrytypes.FieldContextScope,
+            FieldDataType: telemetrytypes.FieldDataTypeString,
+        },
+        "scope_version": {
+            Name:          "scope_version",
+            Signal:        telemetrytypes.SignalLogs,
+            FieldContext:  telemetrytypes.FieldContextScope,
+            FieldDataType: telemetrytypes.FieldDataTypeString,
+        },
     }
 )
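
Note: moving `IntrinsicFields` from `[]string` to a map keyed by field name means the same variable now answers both membership checks and metadata lookups (context, data type, description). A reduced sketch with a stand-in struct:

package main

import "fmt"

// fieldKey is a stand-in for telemetrytypes.TelemetryFieldKey.
type fieldKey struct {
	Name, Description, DataType string
}

var intrinsicFields = map[string]fieldKey{
	"severity_text": {
		Name:        "severity_text",
		Description: "Log level.",
		DataType:    "string",
	},
}

func main() {
	// Membership and metadata in one lookup, where the old slice
	// representation could only answer membership.
	if f, ok := intrinsicFields["severity_text"]; ok {
		fmt.Printf("%s (%s): %s\n", f.Name, f.DataType, f.Description)
	}
}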

View File

@@ -73,7 +73,7 @@ func TestFilterExprLogsBodyJSON(t *testing.T) {
     category:   "json",
     query:      "body.message = hello",
     shouldPass: true,
-    expectedQuery:         `WHERE (JSONExtract(JSON_VALUE(body, '$."message"'), 'String') = ? AND JSON_EXISTS(body, '$."message"'))`,
+    expectedQuery:         `WHERE (JSON_VALUE(body, '$."message"') = ? AND JSON_EXISTS(body, '$."message"'))`,
     expectedArgs:          []any{"hello"},
     expectedErrorContains: "",
 },
@@ -113,7 +113,7 @@ func TestFilterExprLogsBodyJSON(t *testing.T) {
     category:   "json",
     query:      "body.message REGEXP 'a*'",
     shouldPass: true,
-    expectedQuery:         `WHERE (match(JSONExtract(JSON_VALUE(body, '$."message"'), 'String'), ?) AND JSON_EXISTS(body, '$."message"'))`,
+    expectedQuery:         `WHERE (match(JSON_VALUE(body, '$."message"'), ?) AND JSON_EXISTS(body, '$."message"'))`,
     expectedArgs:          []any{"a*"},
     expectedErrorContains: "",
 },
@@ -121,7 +121,7 @@ func TestFilterExprLogsBodyJSON(t *testing.T) {
     category:   "json",
     query:      `body.message CONTAINS "hello 'world'"`,
     shouldPass: true,
-    expectedQuery:         `WHERE (LOWER(JSONExtract(JSON_VALUE(body, '$."message"'), 'String')) LIKE LOWER(?) AND JSON_EXISTS(body, '$."message"'))`,
+    expectedQuery:         `WHERE (LOWER(JSON_VALUE(body, '$."message"')) LIKE LOWER(?) AND JSON_EXISTS(body, '$."message"'))`,
    expectedArgs:          []any{"%hello 'world'%"},
     expectedErrorContains: "",
 },
@@ -136,7 +136,7 @@ func TestFilterExprLogsBodyJSON(t *testing.T) {
     category:   "json",
     query:      `body.name IN ('hello', 'world')`,
     shouldPass: true,
-    expectedQuery:         `WHERE ((JSONExtract(JSON_VALUE(body, '$."name"'), 'String') = ? OR JSONExtract(JSON_VALUE(body, '$."name"'), 'String') = ?) AND JSON_EXISTS(body, '$."name"'))`,
+    expectedQuery:         `WHERE ((JSON_VALUE(body, '$."name"') = ? OR JSON_VALUE(body, '$."name"') = ?) AND JSON_EXISTS(body, '$."name"'))`,
     expectedArgs:          []any{"hello", "world"},
     expectedErrorContains: "",
 },

View File

@@ -61,7 +61,7 @@ func inferDataType(value any, operator qbtypes.FilterOperator, key *telemetrytyp
     }

     // check if it is array
-    if strings.HasSuffix(key.Name, "[*]") {
+    if strings.HasSuffix(key.Name, "[*]") || strings.HasSuffix(key.Name, "[]") {
         valueType = telemetrytypes.FieldDataType{String: valuer.NewString(fmt.Sprintf("[]%s", valueType.StringValue()))}
     }

@@ -74,6 +74,8 @@ func getBodyJSONPath(key *telemetrytypes.TelemetryFieldKey) string {
     for _, part := range parts {
         if strings.HasSuffix(part, "[*]") {
             newParts = append(newParts, fmt.Sprintf(`"%s"[*]`, strings.TrimSuffix(part, "[*]")))
+        } else if strings.HasSuffix(part, "[]") {
+            newParts = append(newParts, fmt.Sprintf(`"%s"[*]`, strings.TrimSuffix(part, "[]")))
         } else {
             newParts = append(newParts, fmt.Sprintf(`"%s"`, part))
         }

@@ -94,9 +96,13 @@ func GetBodyJSONKey(_ context.Context, key *telemetrytypes.TelemetryFieldKey, op
         return fmt.Sprintf("JSONExtract(JSON_QUERY(body, '$.%s'), '%s')", getBodyJSONPath(key), dataType.CHDataType()), value
     }

-    // for all other types, we need to extract the value from the JSON_VALUE
-    return fmt.Sprintf("JSONExtract(JSON_VALUE(body, '$.%s'), '%s')", getBodyJSONPath(key), dataType.CHDataType()), value
+    if dataType != telemetrytypes.FieldDataTypeString {
+        // for all types except strings, we need to extract the value from the JSON_VALUE
+        return fmt.Sprintf("JSONExtract(JSON_VALUE(body, '$.%s'), '%s')", getBodyJSONPath(key), dataType.CHDataType()), value
+    }
+
+    // for string types, we should compare with the JSON_VALUE
+    return fmt.Sprintf("JSON_VALUE(body, '$.%s')", getBodyJSONPath(key)), value
 }

 func GetBodyJSONKeyForExists(_ context.Context, key *telemetrytypes.TelemetryFieldKey, _ qbtypes.FilterOperator, _ any) string {
     return fmt.Sprintf("JSON_EXISTS(body, '$.%s')", getBodyJSONPath(key))
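
Note: this is the "fix string json query" half of the commit title. ClickHouse's `JSON_VALUE` already returns a String, so wrapping it in `JSONExtract(..., 'String')` was redundant; strings now compare directly, and only non-string scalars keep the typed extract. The branching in isolation (path and type are pre-rendered here for illustration):

package main

import "fmt"

// bodyKeyExpr condenses the GetBodyJSONKey branching above.
func bodyKeyExpr(path, chType string) string {
	if chType == "String" {
		// JSON_VALUE already yields a string; compare directly.
		return fmt.Sprintf("JSON_VALUE(body, '$.%s')", path)
	}
	// Non-string scalars still need a typed extract.
	return fmt.Sprintf("JSONExtract(JSON_VALUE(body, '$.%s'), '%s')", path, chType)
}

func main() {
	fmt.Println(bodyKeyExpr(`"http"."method"`, "String"))
	// JSON_VALUE(body, '$."http"."method"')
	fmt.Println(bodyKeyExpr(`"http"."status_code"`, "Float64"))
	// JSONExtract(JSON_VALUE(body, '$."http"."status_code"'), 'Float64')
}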

View File

@@ -270,10 +270,11 @@ func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
     // Constrain the main query to the rows that appear in the CTE.
     tuple := fmt.Sprintf("(%s)", strings.Join(fieldNames, ", "))
-    sb.Where(fmt.Sprintf("%s IN (SELECT %s FROM __limit_cte)", tuple, strings.Join(fieldNames, ", ")))
+    sb.Where(fmt.Sprintf("%s GLOBAL IN (SELECT %s FROM __limit_cte)", tuple, strings.Join(fieldNames, ", ")))

     // Group by all dimensions
-    sb.GroupBy("ALL")
+    sb.GroupBy("ts")
+    sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)

     if query.Having != nil && query.Having.Expression != "" {
         // Rewrite having expression to use SQL column names
         rewriter := querybuilder.NewHavingExpressionRewriter()
@@ -290,7 +291,8 @@ func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
     finalArgs = querybuilder.PrependArgs(cteArgs, mainArgs)
 } else {
-    sb.GroupBy("ALL")
+    sb.GroupBy("ts")
+    sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
     if query.Having != nil && query.Having.Expression != "" {
         rewriter := querybuilder.NewHavingExpressionRewriter()
         rewrittenExpr := rewriter.RewriteForLogs(query.Having.Expression, query.Aggregations)
@@ -380,7 +382,7 @@ func (b *logQueryStatementBuilder) buildScalarQuery(
     }

     // Group by dimensions
-    sb.GroupBy("ALL")
+    sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)

     // Add having clause if needed
     if query.Having != nil && query.Having.Expression != "" {
@@ -492,7 +494,7 @@ func (b *logQueryStatementBuilder) maybeAttachResourceFilter(
         return "", nil, err
     }

-    sb.Where("resource_fingerprint IN (SELECT fingerprint FROM __resource_filter)")
+    sb.Where("resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter)")

     return fmt.Sprintf("__resource_filter AS (%s)", stmt.Query), stmt.Args, nil
 }
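
Note: on a distributed ClickHouse setup, plain `IN (subquery)` is evaluated independently on each shard against that shard's local data, so fingerprints discovered on one shard cannot match rows stored on another; `GLOBAL IN` runs the subquery once on the initiator and broadcasts the result set to all shards. A sketch of the clause shape, echoing maybeAttachResourceFilter (the CTE body is elided):

package main

import "fmt"

// attachResourceFilter renders the CTE separately; the main query then
// references it with GLOBAL IN so every shard sees the full fingerprint set.
func attachResourceFilter(cteBody string) (cte, where string) {
	cte = fmt.Sprintf("__resource_filter AS (%s)", cteBody)
	where = "resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter)"
	return cte, where
}

func main() {
	cte, where := attachResourceFilter("SELECT fingerprint FROM ... WHERE ...")
	fmt.Printf("WITH %s SELECT ... WHERE %s\n", cte, where)
}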

View File

@@ -30,6 +30,9 @@ func resourceFilterStmtBuilder() qbtypes.StatementBuilder[qbtypes.LogAggregation
     fm,
     cb,
     mockMetadataStore,
+    DefaultFullTextColumn,
+    BodyJSONStringSearchPrefix,
+    GetBodyJSONKey,
 )
 }

@@ -65,7 +68,7 @@ func TestStatementBuilder(t *testing.T) {
     },
 },
 expected: qbtypes.Statement{
-    Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY ALL ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) IN (SELECT `service.name` FROM __limit_cte) GROUP BY ALL",
+    Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name`",
     Args: []any{"cartservice", "%service.name%", "%service.name%cartservice%", uint64(1747945619), uint64(1747983448), true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
 },
 expectedErr: nil,
@@ -104,7 +107,7 @@ func TestStatementBuilder(t *testing.T) {
     },
 },
 expected: qbtypes.Statement{
-    Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY ALL ORDER BY `service.name` desc LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) IN (SELECT `service.name` FROM __limit_cte) GROUP BY ALL",
+    Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY `service.name` desc LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name`",
     Args: []any{"cartservice", "%service.name%", "%service.name%cartservice%", uint64(1747945619), uint64(1747983448), true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
 },
 expectedErr: nil,

View File

@@ -16,6 +16,7 @@ import (
     qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
     "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
     "github.com/huandu/go-sqlbuilder"
+    "golang.org/x/exp/maps"
 )

 var (
@@ -208,6 +209,7 @@ func (t *telemetryMetaStore) getTracesKeys(ctx context.Context, fieldKeySelector
         }
         keys = append(keys, key)
+        mapOfKeys[name+";"+fieldContext.StringValue()+";"+fieldDataType.StringValue()] = key
     }

     if rows.Err() != nil {
@@ -215,8 +217,8 @@ func (t *telemetryMetaStore) getTracesKeys(ctx context.Context, fieldKeySelector
     }

     staticKeys := []string{"isRoot", "isEntrypoint"}
-    staticKeys = append(staticKeys, telemetrytraces.IntrinsicFields...)
-    staticKeys = append(staticKeys, telemetrytraces.CalculatedFields...)
+    staticKeys = append(staticKeys, maps.Keys(telemetrytraces.IntrinsicFields)...)
+    staticKeys = append(staticKeys, maps.Keys(telemetrytraces.CalculatedFields)...)

     // add matching intrinsic and matching calculated fields
     for _, key := range staticKeys {
@@ -228,6 +230,19 @@ func (t *telemetryMetaStore) getTracesKeys(ctx context.Context, fieldKeySelector
             }
         }
         if found {
+            if field, exists := telemetrytraces.IntrinsicFields[key]; exists {
+                if _, added := mapOfKeys[field.Name+";"+field.FieldContext.StringValue()+";"+field.FieldDataType.StringValue()]; !added {
+                    keys = append(keys, &field)
+                }
+                continue
+            }
+            if field, exists := telemetrytraces.CalculatedFields[key]; exists {
+                if _, added := mapOfKeys[field.Name+";"+field.FieldContext.StringValue()+";"+field.FieldDataType.StringValue()]; !added {
+                    keys = append(keys, &field)
+                }
+                continue
+            }
             keys = append(keys, &telemetrytypes.TelemetryFieldKey{
                 Name:         key,
                 FieldContext: telemetrytypes.FieldContextSpan,
@@ -361,6 +376,7 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
         }
         keys = append(keys, key)
+        mapOfKeys[name+";"+fieldContext.StringValue()+";"+fieldDataType.StringValue()] = key
     }

     if rows.Err() != nil {
@@ -368,7 +384,7 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
     }

     staticKeys := []string{}
-    staticKeys = append(staticKeys, telemetrylogs.IntrinsicFields...)
+    staticKeys = append(staticKeys, maps.Keys(telemetrylogs.IntrinsicFields)...)

     // add matching intrinsic and matching calculated fields
     for _, key := range staticKeys {
@@ -380,6 +396,13 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
             }
         }
         if found {
+            if field, exists := telemetrylogs.IntrinsicFields[key]; exists {
+                if _, added := mapOfKeys[field.Name+";"+field.FieldContext.StringValue()+";"+field.FieldDataType.StringValue()]; !added {
+                    keys = append(keys, &field)
+                }
+                continue
+            }
             keys = append(keys, &telemetrytypes.TelemetryFieldKey{
                 Name:         key,
                 FieldContext: telemetrytypes.FieldContextLog,
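
Note: both key loaders now record each discovered key under a composite `name;context;datatype` identity, so a static intrinsic field is appended only when the metadata query did not already return the same triple. The dedupe in isolation, with a stand-in struct:

package main

import "fmt"

type fieldKey struct{ Name, Context, DataType string }

// identity builds the composite key used by mapOfKeys above.
func identity(k fieldKey) string { return k.Name + ";" + k.Context + ";" + k.DataType }

func main() {
	seen := map[string]fieldKey{}
	for _, k := range []fieldKey{{"body", "log", "string"}} { // rows from the metadata query
		seen[identity(k)] = k
	}

	static := fieldKey{"body", "log", "string"} // intrinsic candidate
	if _, added := seen[identity(static)]; !added {
		fmt.Println("append", static.Name)
	} else {
		fmt.Println("skip duplicate", static.Name) // printed here
	}
}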

View File

@@ -258,7 +258,8 @@ func (b *metricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
         sb.GTE("unix_milli", start),
         sb.LT("unix_milli", end),
     )
-    sb.GroupBy("ALL")
+    sb.GroupBy("ts")
+    sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)

     q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse, timeSeriesCTEArgs...)
     return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args
@@ -320,7 +321,8 @@ func (b *metricQueryStatementBuilder) buildTimeSeriesCTE(
         sb.AddWhereClause(filterWhere)
     }

-    sb.GroupBy("ALL")
+    sb.GroupBy("fingerprint")
+    sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)

     q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
     return fmt.Sprintf("(%s) AS filtered_time_series", q), args, nil
@@ -375,7 +377,8 @@ func (b *metricQueryStatementBuilder) buildTemporalAggDelta(
         sb.GTE("unix_milli", start),
         sb.LT("unix_milli", end),
     )
-    sb.GroupBy("ALL")
+    sb.GroupBy("fingerprint", "ts")
+    sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
     sb.OrderBy("fingerprint", "ts")

     q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse, timeSeriesCTEArgs...)
@@ -412,7 +415,8 @@ func (b *metricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
         baseSb.GTE("unix_milli", start),
         baseSb.LT("unix_milli", end),
     )
-    baseSb.GroupBy("ALL")
+    baseSb.GroupBy("fingerprint", "ts")
+    baseSb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
     baseSb.OrderBy("fingerprint", "ts")

     innerQuery, innerArgs := baseSb.BuildWithFlavor(sqlbuilder.ClickHouse, timeSeriesCTEArgs...)
@@ -438,7 +442,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
         wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
     }
     wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", incExpr))
-    wrapped.From(fmt.Sprintf("(%s) WINDOW increase_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
+    wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
     q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...)
     return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
 default:
@@ -465,7 +469,8 @@ func (b *metricQueryStatementBuilder) buildSpatialAggregationCTE(
     if query.Aggregations[0].ValueFilter != nil {
         sb.Where(sb.EQ("per_series_value", query.Aggregations[0].ValueFilter.Value))
     }
-    sb.GroupBy("ALL")
+    sb.GroupBy("ts")
+    sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)

     q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
     return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args
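
Note: the rename from `increase_window` to `rate_window` matters because the per-series rate expression (visible in the test expectations below) references `OVER rate_window`; defining the window under a different name than the one referenced would fail to resolve. The shape of the generated pairing:

package main

import "fmt"

func main() {
	expr := "(per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window)"
	from := "(SELECT ...) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)"
	// The OVER reference and the WINDOW definition must agree on the name.
	fmt.Printf("SELECT %s AS per_series_value FROM %s\n", expr, from)
}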

View File

@@ -49,7 +49,7 @@ func TestStatementBuilder(t *testing.T) {
     },
 },
 expected: qbtypes.Statement{
-    Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747947419000))) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747947419000))) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY ALL) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ALL) SELECT * FROM __spatial_aggregation_cte",
+    Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747947419000))) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747947419000))) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte",
     Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983448000), "cumulative", false, "cartservice", "signoz_calls_total", uint64(1747947419000), uint64(1747983448000), 0},
 },
 expectedErr: nil,
@@ -82,7 +82,7 @@ func TestStatementBuilder(t *testing.T) {
     },
 },
 expected: qbtypes.Statement{
-    Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY ALL) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL) SELECT * FROM __spatial_aggregation_cte",
+    Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte",
     Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983448000), "delta", false, "cartservice", "signoz_calls_total", uint64(1747947419000), uint64(1747983448000)},
 },
 expectedErr: nil,
@@ -114,7 +114,7 @@ func TestStatementBuilder(t *testing.T) {
     },
 },
 expected: qbtypes.Statement{
-    Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY ALL) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL) SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts",
+    Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`, `le`) SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts",
     Args: []any{"signoz_latency", uint64(1747936800000), uint64(1747983448000), "delta", false, "cartservice", "signoz_latency", uint64(1747947419000), uint64(1747983448000)},
 },
 expectedErr: nil,
@@ -147,7 +147,7 @@ func TestStatementBuilder(t *testing.T) {
     },
 },
 expected: qbtypes.Statement{
-    Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY ALL) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ALL) SELECT * FROM __spatial_aggregation_cte",
+    Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY fingerprint, `host.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`) SELECT * FROM __spatial_aggregation_cte",
     Args: []any{"system.memory.usage", uint64(1747936800000), uint64(1747983448000), "unspecified", false, "big-data-node-1", "system.memory.usage", uint64(1747947419000), uint64(1747983448000), 0},
 },
 expectedErr: nil,
@@ -176,7 +176,7 @@ func TestStatementBuilder(t *testing.T) {
     },
 },
 expected: qbtypes.Statement{
-    Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, `le`, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747947419000))) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747947419000))) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY ALL) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, `le`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ALL) SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts",
+    Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, `le`, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747947419000))) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747947419000))) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name`, `le` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, `le`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`, `le`) SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts",
     Args: []any{"http_server_duration_bucket", uint64(1747936800000), uint64(1747983448000), "cumulative", false, "http_server_duration_bucket", uint64(1747947419000), uint64(1747983448000), 0},
 },
 expectedErr: nil,

View File

@@ -11,6 +11,7 @@ import (
     qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
     "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
     "github.com/huandu/go-sqlbuilder"
+    "golang.org/x/exp/maps"
 )

 type conditionBuilder struct {
@@ -129,10 +130,10 @@ func (c *conditionBuilder) conditionFor(
     // key membership checks, so depending on the column type, the condition changes
     case qbtypes.FilterOperatorExists, qbtypes.FilterOperatorNotExists:
         // if the field is intrinsic, it always exists
-        if slices.Contains(IntrinsicFields, tblFieldName) ||
-            slices.Contains(CalculatedFields, tblFieldName) ||
-            slices.Contains(IntrinsicFieldsDeprecated, tblFieldName) ||
-            slices.Contains(CalculatedFieldsDeprecated, tblFieldName) {
+        if slices.Contains(maps.Keys(IntrinsicFields), tblFieldName) ||
+            slices.Contains(maps.Keys(CalculatedFields), tblFieldName) ||
+            slices.Contains(maps.Keys(IntrinsicFieldsDeprecated), tblFieldName) ||
+            slices.Contains(maps.Keys(CalculatedFieldsDeprecated), tblFieldName) {
             return "true", nil
         }

@@ -205,10 +206,10 @@ func (c *conditionBuilder) ConditionFor(
     if operator.AddDefaultExistsFilter() {
         // skip adding exists filter for intrinsic fields
         field, _ := c.fm.FieldFor(ctx, key)
-        if slices.Contains(IntrinsicFields, field) ||
-            slices.Contains(IntrinsicFieldsDeprecated, field) ||
-            slices.Contains(CalculatedFields, field) ||
-            slices.Contains(CalculatedFieldsDeprecated, field) {
+        if slices.Contains(maps.Keys(IntrinsicFields), field) ||
+            slices.Contains(maps.Keys(IntrinsicFieldsDeprecated), field) ||
+            slices.Contains(maps.Keys(CalculatedFields), field) ||
+            slices.Contains(maps.Keys(CalculatedFieldsDeprecated), field) {
             return condition, nil
         }

View File

@ -3,55 +3,273 @@ package telemetrytraces
import "github.com/SigNoz/signoz/pkg/types/telemetrytypes" import "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
var ( var (
IntrinsicFields = []string{ IntrinsicFields = map[string]telemetrytypes.TelemetryFieldKey{
"trace_id", "trace_id": {
"span_id", Name: "trace_id",
"trace_state", Signal: telemetrytypes.SignalTraces,
"parent_span_id", FieldContext: telemetrytypes.FieldContextSpan,
"flags", FieldDataType: telemetrytypes.FieldDataTypeString,
"name", },
"kind", "span_id": {
"kind_string", Name: "span_id",
"duration_nano", Signal: telemetrytypes.SignalTraces,
"status_code", FieldContext: telemetrytypes.FieldContextSpan,
"status_message", FieldDataType: telemetrytypes.FieldDataTypeString,
"status_code_string", },
"trace_state": {
Name: "trace_state",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
"parent_span_id": {
Name: "parent_span_id",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
"flags": {
Name: "flags",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
},
"name": {
Name: "name",
Description: "Name of the span",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
"kind": {
Name: "kind",
Description: "Span kind enum (number). Use `kind_string` instead. Learn more [here](https://opentelemetry.io/docs/concepts/signals/traces/#span-kind)",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
},
"kind_string": {
Name: "kind_string",
Description: "Span kind enum (string). Known values are ['Client', 'Server', 'Internal', 'Producer', 'Consumer']. Learn more [here](https://opentelemetry.io/docs/concepts/signals/traces/#span-kind)",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
"duration_nano": {
Name: "duration_nano",
Description: "Span duration",
Unit: "ns",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
},
"status_code": {
Name: "status_code",
Description: "Span status code enum (number). Use `status_code_string` instead. Learn more [here](https://opentelemetry.io/docs/concepts/signals/traces/#span-status)",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
},
"status_message": {
Name: "status_message",
Description: "Span status message. Learn more [here](https://opentelemetry.io/docs/concepts/signals/traces/#span-status)",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
"status_code_string": {
Name: "status_code_string",
Description: "Span status code enum (string). Learn more [here](https://opentelemetry.io/docs/concepts/signals/traces/#span-status)",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
} }
IntrinsicFieldsDeprecated = []string{ IntrinsicFieldsDeprecated = map[string]telemetrytypes.TelemetryFieldKey{
"traceID", "traceID": {
"spanID", Name: "traceID",
"parentSpanID", Signal: telemetrytypes.SignalTraces,
"spanKind", FieldContext: telemetrytypes.FieldContextSpan,
"durationNano", FieldDataType: telemetrytypes.FieldDataTypeString,
"statusCode", },
"statusMessage", "spanID": {
"statusCodeString", Name: "spanID",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
"parentSpanID": {
Name: "parentSpanID",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
"spanKind": {
Name: "spanKind",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
},
"durationNano": {
Name: "durationNano",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
},
"statusCode": {
Name: "statusCode",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
},
"statusMessage": {
Name: "statusMessage",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
"statusCodeString": {
Name: "statusCodeString",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
} }
-	CalculatedFields = []string{
-		"response_status_code",
-		"external_http_url",
-		"http_url",
-		"external_http_method",
-		"http_method",
-		"http_host",
-		"db_name",
-		"db_operation",
-		"has_error",
-		"is_remote",
-	}
+	CalculatedFields = map[string]telemetrytypes.TelemetryFieldKey{
+		"response_status_code": {
+			Name:          "response_status_code",
+			Description:   "Derived response status code from the HTTP/RPC status code attributes. Learn more [here](https://signoz.io/docs/traces-management/guides/derived-fields-spans/#response_status_code)",
+			Signal:        telemetrytypes.SignalTraces,
+			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeNumber,
+		},
+		"external_http_url": {
+			Name:          "external_http_url",
+			Description:   "The hostname of the external HTTP URL. Learn more [here](https://signoz.io/docs/traces-management/guides/derived-fields-spans/#external_http_url)",
+			Signal:        telemetrytypes.SignalTraces,
+			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+		},
+		"http_url": {
+			Name:          "http_url",
+			Description:   "HTTP URL of the request. Learn more [here](https://signoz.io/docs/traces-management/guides/derived-fields-spans/#http_url)",
+			Signal:        telemetrytypes.SignalTraces,
+			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+		},
+		"external_http_method": {
+			Name:          "external_http_method",
+			Description:   "HTTP request method of client spans. Learn more [here](https://signoz.io/docs/traces-management/guides/derived-fields-spans/#external_http_method)",
+			Signal:        telemetrytypes.SignalTraces,
+			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+		},
+		"http_method": {
+			Name:          "http_method",
+			Description:   "The HTTP request method. Learn more [here](https://signoz.io/docs/traces-management/guides/derived-fields-spans/#http_method)",
+			Signal:        telemetrytypes.SignalTraces,
+			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+		},
+		"http_host": {
+			Name:          "http_host",
+			Description:   "The HTTP host or server address. Learn more [here](https://signoz.io/docs/traces-management/guides/derived-fields-spans/#http_host)",
+			Signal:        telemetrytypes.SignalTraces,
+			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+		},
+		"db_name": {
+			Name:          "db_name",
+			Description:   "The database name or namespace. Learn more [here](https://signoz.io/docs/traces-management/guides/derived-fields-spans/#db_name)",
+			Signal:        telemetrytypes.SignalTraces,
+			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+		},
+		"db_operation": {
+			Name:          "db_operation",
+			Description:   "The database operation being performed. Learn more [here](https://signoz.io/docs/traces-management/guides/derived-fields-spans/#db_operation)",
+			Signal:        telemetrytypes.SignalTraces,
+			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+		},
+		"has_error": {
+			Name:          "has_error",
+			Description:   "Whether the span has an error. Learn more [here](https://signoz.io/docs/traces-management/guides/derived-fields-spans/#has_error)",
+			Signal:        telemetrytypes.SignalTraces,
+			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeBool,
+		},
+		"is_remote": {
+			Name:          "is_remote",
+			Description:   "Whether the span is remote. Learn more [here](https://signoz.io/docs/traces-management/guides/derived-fields-spans/#is_remote)",
+			Signal:        telemetrytypes.SignalTraces,
+			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeBool,
+		},
+	}
-	CalculatedFieldsDeprecated = []string{
-		"responseStatusCode",
-		"externalHttpUrl",
-		"httpUrl",
-		"externalHttpMethod",
-		"httpMethod",
-		"httpHost",
-		"dbName",
-		"dbOperation",
-		"hasError",
-		"isRemote",
-	}
+	CalculatedFieldsDeprecated = map[string]telemetrytypes.TelemetryFieldKey{
+		"responseStatusCode": {
+			Name:          "responseStatusCode",
+			Signal:        telemetrytypes.SignalTraces,
+			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeNumber,
+		},
+		"externalHttpUrl": {
+			Name:          "externalHttpUrl",
+			Signal:        telemetrytypes.SignalTraces,
+			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+		},
+		"httpUrl": {
+			Name:          "httpUrl",
+			Signal:        telemetrytypes.SignalTraces,
+			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+		},
+		"externalHttpMethod": {
+			Name:          "externalHttpMethod",
+			Signal:        telemetrytypes.SignalTraces,
+			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+		},
+		"httpMethod": {
+			Name:          "httpMethod",
+			Signal:        telemetrytypes.SignalTraces,
+			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+		},
+		"httpHost": {
+			Name:          "httpHost",
+			Signal:        telemetrytypes.SignalTraces,
+			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+		},
+		"dbName": {
+			Name:          "dbName",
+			Signal:        telemetrytypes.SignalTraces,
+			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+		},
+		"dbOperation": {
+			Name:          "dbOperation",
+			Signal:        telemetrytypes.SignalTraces,
+			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+		},
+		"hasError": {
+			Name:          "hasError",
+			Signal:        telemetrytypes.SignalTraces,
+			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeBool,
+		},
+		"isRemote": {
+			Name:          "isRemote",
+			Signal:        telemetrytypes.SignalTraces,
+			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeBool,
+		},
+	}
 	SpanSearchScopeRoot       = "isroot"
 	SpanSearchScopeEntryPoint = "isentrypoint"
@@ -59,33 +277,46 @@ var (
 	DefaultFields = []telemetrytypes.TelemetryFieldKey{
 		{
 			Name:          "timestamp",
+			Signal:        telemetrytypes.SignalTraces,
 			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeNumber,
 		},
 		{
 			Name:          "span_id",
+			Signal:        telemetrytypes.SignalTraces,
 			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
 		},
 		{
 			Name:          "trace_id",
+			Signal:        telemetrytypes.SignalTraces,
 			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
 		},
 		{
 			Name:          "name",
+			Signal:        telemetrytypes.SignalTraces,
 			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
 		},
 		{
 			Name:          "service.name",
+			Signal:        telemetrytypes.SignalTraces,
 			FieldContext:  telemetrytypes.FieldContextResource,
 			FieldDataType: telemetrytypes.FieldDataTypeString,
 			Materialized:  true,
 		},
 		{
 			Name:          "duration_nano",
+			Signal:        telemetrytypes.SignalTraces,
 			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeNumber,
 		},
 		{
 			Name:          "response_status_code",
+			Signal:        telemetrytypes.SignalTraces,
 			FieldContext:  telemetrytypes.FieldContextSpan,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
 		},
 	}
 )
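Note: the slice-to-map conversions above turn these vars from plain name lists into lookup tables keyed by field name, so a single lookup answers both "is this field known?" and "what are its signal, context, and data type?" where the old []string form only supported a membership scan. A minimal sketch of the access pattern this enables; the import path is an assumption inferred from the pkg layout elsewhere in this commit, not something this diff establishes:

package main

import (
	"fmt"

	// Assumed location of the vars above, mirroring the pkg/telemetrylogs layout.
	"github.com/SigNoz/signoz/pkg/telemetrytraces"
)

func main() {
	// One map lookup replaces the old linear scan and yields the full
	// TelemetryFieldKey metadata at the same time.
	if key, ok := telemetrytraces.CalculatedFields["http_method"]; ok {
		fmt.Println(key.Name, key.FieldContext, key.FieldDataType)
	}
}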

View File

@@ -305,10 +305,11 @@ func (b *traceQueryStatementBuilder) buildTimeSeriesQuery(
 		// Constrain the main query to the rows that appear in the CTE.
 		tuple := fmt.Sprintf("(%s)", strings.Join(fieldNames, ", "))
-		sb.Where(fmt.Sprintf("%s IN (SELECT %s FROM __limit_cte)", tuple, strings.Join(fieldNames, ", ")))
+		sb.Where(fmt.Sprintf("%s GLOBAL IN (SELECT %s FROM __limit_cte)", tuple, strings.Join(fieldNames, ", ")))
 
 		// Group by all dimensions
-		sb.GroupBy("ALL")
+		sb.GroupBy("ts")
+		sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
 
 		if query.Having != nil && query.Having.Expression != "" {
 			rewriter := querybuilder.NewHavingExpressionRewriter()
 			rewrittenExpr := rewriter.RewriteForTraces(query.Having.Expression, query.Aggregations)
@@ -323,7 +324,8 @@ func (b *traceQueryStatementBuilder) buildTimeSeriesQuery(
 		finalArgs = querybuilder.PrependArgs(cteArgs, mainArgs)
 	} else {
-		sb.GroupBy("ALL")
+		sb.GroupBy("ts")
+		sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
 		if query.Having != nil && query.Having.Expression != "" {
 			rewriter := querybuilder.NewHavingExpressionRewriter()
 			rewrittenExpr := rewriter.RewriteForTraces(query.Having.Expression, query.Aggregations)
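In both branches above, GROUP BY ALL (which asks ClickHouse to infer the grouping keys from the non-aggregated SELECT expressions) is replaced by an explicit key list: the ts bucket column plus each group-by column quoted by querybuilder.GroupByKeys; the scalar builder in the next hunk makes the same swap. Spelling the keys out avoids relying on server-side inference, whose handling of aliased expressions such as the multiIf extraction appears to vary across ClickHouse releases. A self-contained sketch of the quoting (hypothetical helper name; the real helper takes []qbtypes.GroupByKey):

package main

import (
	"fmt"
	"strings"
)

// quoteGroupByKeys mirrors the backtick-quoting the builder relies on, so an
// explicit GROUP BY clause can be spliced into the statement.
func quoteGroupByKeys(names []string) []string {
	quoted := make([]string, 0, len(names))
	for _, name := range names {
		quoted = append(quoted, "`"+name+"`")
	}
	return quoted
}

func main() {
	keys := quoteGroupByKeys([]string{"service.name", "http_method"})
	fmt.Println("GROUP BY " + strings.Join(append([]string{"ts"}, keys...), ", "))
	// Output: GROUP BY ts, `service.name`, `http_method`
}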
@@ -412,7 +414,7 @@ func (b *traceQueryStatementBuilder) buildScalarQuery(
 	}
 
 	// Group by dimensions
-	sb.GroupBy("ALL")
+	sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
 
 	// Add having clause if needed
 	if query.Having != nil && query.Having.Expression != "" && !skipHaving {
@@ -521,7 +523,7 @@ func (b *traceQueryStatementBuilder) maybeAttachResourceFilter(
 		return "", nil, err
 	}
 
-	sb.Where("resource_fingerprint IN (SELECT fingerprint FROM __resource_filter)")
+	sb.Where("resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter)")
 
 	return fmt.Sprintf("__resource_filter AS (%s)", stmt.Query), stmt.Args, nil
 }
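The IN → GLOBAL IN changes in this file matter only on clustered deployments. With a plain IN, a query against a distributed table re-runs the subquery on every shard against that shard's local tables, so fingerprints whose resource rows live on a different shard are silently missed; GLOBAL IN evaluates the subquery once on the initiating server and broadcasts the result to all shards as a temporary table. On a single node the two behave identically. A minimal sketch of the predicate construction (helper name is illustrative, not from this commit):

package main

import "fmt"

// globalIn renders the shard-safe membership predicate: the subquery result
// is computed once on the initiator and shipped to every shard, rather than
// being re-evaluated against each shard's local tables.
func globalIn(column, subquery string) string {
	return fmt.Sprintf("%s GLOBAL IN (%s)", column, subquery)
}

func main() {
	fmt.Println(globalIn("resource_fingerprint", "SELECT fingerprint FROM __resource_filter"))
	// Output: resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter)
}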

View File

@@ -59,7 +59,7 @@ func TestStatementBuilder(t *testing.T) {
 				},
 			},
 			expected: qbtypes.Statement{
-				Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY ALL ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(timestamp, INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) IN (SELECT `service.name` FROM __limit_cte) GROUP BY ALL",
+				Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(timestamp, INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name`",
 				Args:  []any{"redis-manual", "%service.name%", "%service.name%redis-manual%", uint64(1747945619), uint64(1747983448), true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
 			},
 			expectedErr: nil,