2025-05-25 22:14:47 +05:30
|
|
|
package telemetrytraces
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"fmt"
|
2025-05-27 20:54:48 +05:30
|
|
|
"log/slog"
|
2025-07-12 16:47:59 +05:30
|
|
|
"slices"
|
2025-05-25 22:14:47 +05:30
|
|
|
"strings"
|
|
|
|
|
|
|
|
|
|
"github.com/SigNoz/signoz/pkg/errors"
|
2025-06-10 18:26:28 +05:30
|
|
|
"github.com/SigNoz/signoz/pkg/factory"
|
2025-05-25 22:14:47 +05:30
|
|
|
"github.com/SigNoz/signoz/pkg/querybuilder"
|
2025-06-23 09:39:19 +05:30
|
|
|
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
2025-05-25 22:14:47 +05:30
|
|
|
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
|
|
|
|
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
|
|
|
|
"github.com/huandu/go-sqlbuilder"
|
2025-07-12 16:47:59 +05:30
|
|
|
"golang.org/x/exp/maps"
|
2025-05-25 22:14:47 +05:30
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Sentinel errors returned by the traces statement builder.
var (
	// ErrUnsupportedAggregation is returned when an aggregation expression
	// cannot be handled by this builder.
	ErrUnsupportedAggregation = errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported aggregation")
)
|
// traceQueryStatementBuilder builds ClickHouse SQL statements for trace
// queries across the supported request types (raw list, time series,
// scalar, and trace-level).
type traceQueryStatementBuilder struct {
	// logger is scoped to this package (see NewTraceQueryStatementBuilder).
	logger *slog.Logger
	// metadataStore resolves telemetry field keys referenced by a query.
	metadataStore telemetrytypes.MetadataStore
	// fm maps telemetry field keys to ClickHouse column expressions.
	fm qbtypes.FieldMapper
	// cb builds SQL conditions for filter expressions.
	cb qbtypes.ConditionBuilder
	// resourceFilterStmtBuilder builds the resource-filter CTE attached to queries.
	resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation]
	// aggExprRewriter rewrites aggregation expressions into ClickHouse SQL.
	aggExprRewriter qbtypes.AggExprRewriter
	// telemetryStore is used for trace-ID time-range lookups; may be nil.
	telemetryStore telemetrystore.TelemetryStore
}
|
|
|
|
|
|
|
var _ qbtypes.StatementBuilder[qbtypes.TraceAggregation] = (*traceQueryStatementBuilder)(nil)
|
|
|
|
|
|
2025-05-27 20:54:48 +05:30
|
|
|
func NewTraceQueryStatementBuilder(
|
2025-06-10 18:26:28 +05:30
|
|
|
settings factory.ProviderSettings,
|
2025-05-27 20:54:48 +05:30
|
|
|
metadataStore telemetrytypes.MetadataStore,
|
|
|
|
|
fieldMapper qbtypes.FieldMapper,
|
|
|
|
|
conditionBuilder qbtypes.ConditionBuilder,
|
|
|
|
|
resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation],
|
|
|
|
|
aggExprRewriter qbtypes.AggExprRewriter,
|
2025-06-23 09:39:19 +05:30
|
|
|
telemetryStore telemetrystore.TelemetryStore,
|
2025-05-27 20:54:48 +05:30
|
|
|
) *traceQueryStatementBuilder {
|
2025-06-10 18:26:28 +05:30
|
|
|
tracesSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrytraces")
|
2025-05-25 22:14:47 +05:30
|
|
|
return &traceQueryStatementBuilder{
|
2025-06-10 18:26:28 +05:30
|
|
|
logger: tracesSettings.Logger(),
|
2025-05-27 20:54:48 +05:30
|
|
|
metadataStore: metadataStore,
|
|
|
|
|
fm: fieldMapper,
|
|
|
|
|
cb: conditionBuilder,
|
|
|
|
|
resourceFilterStmtBuilder: resourceFilterStmtBuilder,
|
|
|
|
|
aggExprRewriter: aggExprRewriter,
|
2025-06-23 09:39:19 +05:30
|
|
|
telemetryStore: telemetryStore,
|
2025-05-25 22:14:47 +05:30
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
// Build builds a SQL query for traces based on the given parameters.
// It normalizes the time range to nanoseconds, resolves field-key metadata,
// optionally narrows the time range when the filter pins specific trace IDs,
// and then dispatches to the builder matching requestType.
func (b *traceQueryStatementBuilder) Build(
	ctx context.Context,
	start uint64,
	end uint64,
	requestType qbtypes.RequestType,
	query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation],
	variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {

	// Normalize incoming epoch values to nanoseconds.
	start = querybuilder.ToNanoSecs(start)
	end = querybuilder.ToNanoSecs(end)

	// Resolve metadata for every field key the query references.
	keySelectors := getKeySelectors(query)

	keys, _, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)
	if err != nil {
		return nil, err
	}

	// Fix up field context/data type for keys that collide with
	// intrinsic/calculated fields (see adjustKeys for the rationale).
	b.adjustKeys(ctx, keys, query)

	// Check if filter contains trace_id(s) and optimize time range if needed
	if query.Filter != nil && query.Filter.Expression != "" && b.telemetryStore != nil {
		traceIDs, found := ExtractTraceIDsFromFilter(query.Filter.Expression)
		if found && len(traceIDs) > 0 {
			finder := NewTraceTimeRangeFinder(b.telemetryStore)

			traceStart, traceEnd, err := finder.GetTraceTimeRangeMulti(ctx, traceIDs)
			if err != nil {
				// Lookup failure is non-fatal: fall back to the caller's range.
				b.logger.DebugContext(ctx, "failed to get trace time range", "trace_ids", traceIDs, "error", err)
			} else if traceStart > 0 && traceEnd > 0 {
				start = uint64(traceStart)
				end = uint64(traceEnd)
				b.logger.DebugContext(ctx, "optimized time range for traces", "trace_ids", traceIDs, "start", start, "end", end)
			}
		}
	}

	// Create SQL builder
	q := sqlbuilder.NewSelectBuilder()

	switch requestType {
	case qbtypes.RequestTypeRaw:
		return b.buildListQuery(ctx, q, query, start, end, keys, variables)
	case qbtypes.RequestTypeTimeSeries:
		return b.buildTimeSeriesQuery(ctx, q, query, start, end, keys, variables)
	case qbtypes.RequestTypeScalar:
		return b.buildScalarQuery(ctx, q, query, start, end, keys, variables, false, false)
	case qbtypes.RequestTypeTrace:
		return b.buildTraceQuery(ctx, q, query, start, end, keys, variables)
	}

	return nil, fmt.Errorf("unsupported request type: %s", requestType)
}
|
|
|
|
func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]) []*telemetrytypes.FieldKeySelector {
|
|
|
|
|
var keySelectors []*telemetrytypes.FieldKeySelector
|
|
|
|
|
|
|
|
|
|
for idx := range query.Aggregations {
|
|
|
|
|
aggExpr := query.Aggregations[idx]
|
|
|
|
|
selectors := querybuilder.QueryStringToKeysSelectors(aggExpr.Expression)
|
|
|
|
|
keySelectors = append(keySelectors, selectors...)
|
|
|
|
|
}
|
|
|
|
|
|
2025-05-27 20:54:48 +05:30
|
|
|
if query.Filter != nil && query.Filter.Expression != "" {
|
|
|
|
|
whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(query.Filter.Expression)
|
|
|
|
|
keySelectors = append(keySelectors, whereClauseSelectors...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for idx := range query.GroupBy {
|
|
|
|
|
groupBy := query.GroupBy[idx]
|
|
|
|
|
selectors := querybuilder.QueryStringToKeysSelectors(groupBy.TelemetryFieldKey.Name)
|
|
|
|
|
keySelectors = append(keySelectors, selectors...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for idx := range query.SelectFields {
|
|
|
|
|
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
|
2025-07-18 18:37:57 +05:30
|
|
|
Name: query.SelectFields[idx].Name,
|
|
|
|
|
Signal: telemetrytypes.SignalTraces,
|
|
|
|
|
FieldContext: query.SelectFields[idx].FieldContext,
|
2025-05-27 20:54:48 +05:30
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for idx := range query.Order {
|
|
|
|
|
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
|
2025-07-18 18:37:57 +05:30
|
|
|
Name: query.Order[idx].Key.Name,
|
|
|
|
|
Signal: telemetrytypes.SignalTraces,
|
|
|
|
|
FieldContext: query.Order[idx].Key.FieldContext,
|
2025-05-27 20:54:48 +05:30
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for idx := range keySelectors {
|
|
|
|
|
keySelectors[idx].Signal = telemetrytypes.SignalTraces
|
2025-08-06 23:05:39 +05:30
|
|
|
keySelectors[idx].SelectorMatchType = telemetrytypes.FieldSelectorMatchTypeExact
|
2025-05-27 20:54:48 +05:30
|
|
|
}
|
2025-05-25 22:14:47 +05:30
|
|
|
|
|
|
|
|
return keySelectors
|
|
|
|
|
}
|
|
|
|
|
|
2025-07-18 18:37:57 +05:30
|
|
|
func (b *traceQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[string][]*telemetrytypes.TelemetryFieldKey, query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]) {
|
|
|
|
|
// for group by / order by / selected fields, if there is a key
|
|
|
|
|
// that exactly matches the name of intrinsic / calculated field but has
|
|
|
|
|
// a field context or data type that doesn't match the field context or data type of the
|
|
|
|
|
// intrinsic field,
|
|
|
|
|
// and there is no additional key present in the data with the incoming key match,
|
|
|
|
|
// then override the given context with
|
|
|
|
|
// intrinsic / calculated field context and data type
|
|
|
|
|
// Why does that happen? Because we have a lot of assets created by users and shared over web
|
|
|
|
|
// that has incorrect context or data type populated so we fix it
|
|
|
|
|
// note: this override happens only when there is no match; if there is a match,
|
|
|
|
|
// we can't make decision on behalf of users so we let it use unmodified
|
|
|
|
|
|
|
|
|
|
// example: {"key": "httpRoute","type": "tag","dataType": "string"}
|
|
|
|
|
// This is sent as "tag", when it's not, this was earlier managed with
|
|
|
|
|
// `isColumn`, which we don't have in v5 (because it's not a user concern whether it's mat col or not)
|
|
|
|
|
// Such requests as-is look for attributes, the following code exists to handle them
|
|
|
|
|
checkMatch := func(k *telemetrytypes.TelemetryFieldKey) {
|
|
|
|
|
var overallMatch bool
|
|
|
|
|
|
|
|
|
|
findMatch := func(staticKeys map[string]telemetrytypes.TelemetryFieldKey) bool {
|
|
|
|
|
// for a given key `k`, iterate over the metadata keys `keys`
|
|
|
|
|
// and see if there is any exact match
|
|
|
|
|
match := false
|
|
|
|
|
for _, mapKey := range keys[k.Name] {
|
|
|
|
|
if mapKey.FieldContext == k.FieldContext && mapKey.FieldDataType == k.FieldDataType {
|
|
|
|
|
match = true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// we don't have exact match, then it's doesn't exist in attribute or resource attribute
|
|
|
|
|
// use the intrinsic/calculated field
|
|
|
|
|
if !match {
|
|
|
|
|
b.logger.InfoContext(ctx, "overriding the field context and data type", "key", k.Name)
|
|
|
|
|
k.FieldContext = staticKeys[k.Name].FieldContext
|
|
|
|
|
k.FieldDataType = staticKeys[k.Name].FieldDataType
|
|
|
|
|
}
|
|
|
|
|
return match
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if _, ok := IntrinsicFields[k.Name]; ok {
|
|
|
|
|
overallMatch = overallMatch || findMatch(IntrinsicFields)
|
|
|
|
|
}
|
|
|
|
|
if _, ok := CalculatedFields[k.Name]; ok {
|
|
|
|
|
overallMatch = overallMatch || findMatch(CalculatedFields)
|
|
|
|
|
}
|
|
|
|
|
if _, ok := IntrinsicFieldsDeprecated[k.Name]; ok {
|
|
|
|
|
overallMatch = overallMatch || findMatch(IntrinsicFieldsDeprecated)
|
|
|
|
|
}
|
|
|
|
|
if _, ok := CalculatedFieldsDeprecated[k.Name]; ok {
|
|
|
|
|
overallMatch = overallMatch || findMatch(CalculatedFieldsDeprecated)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !overallMatch {
|
|
|
|
|
// check if all the key for the given field have been materialized, if so
|
|
|
|
|
// set the key to materialized
|
|
|
|
|
materilized := true
|
|
|
|
|
for _, key := range keys[k.Name] {
|
|
|
|
|
materilized = materilized && key.Materialized
|
|
|
|
|
}
|
|
|
|
|
k.Materialized = materilized
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for idx := range query.GroupBy {
|
|
|
|
|
checkMatch(&query.GroupBy[idx].TelemetryFieldKey)
|
|
|
|
|
}
|
|
|
|
|
for idx := range query.Order {
|
|
|
|
|
checkMatch(&query.Order[idx].Key.TelemetryFieldKey)
|
|
|
|
|
}
|
|
|
|
|
for idx := range query.SelectFields {
|
|
|
|
|
checkMatch(&query.SelectFields[idx])
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// add deprecated fields only during statement building
|
|
|
|
|
// why?
|
|
|
|
|
// 1. to not fail filter expression that use deprecated cols
|
|
|
|
|
// 2. this could have been moved to metadata fetching itself, however, that
|
|
|
|
|
// would mean, they also show up in suggestions we we don't want to do
|
|
|
|
|
for fieldKeyName, fieldKey := range IntrinsicFieldsDeprecated {
|
|
|
|
|
if _, ok := keys[fieldKeyName]; !ok {
|
|
|
|
|
keys[fieldKeyName] = []*telemetrytypes.TelemetryFieldKey{&fieldKey}
|
|
|
|
|
} else {
|
|
|
|
|
keys[fieldKeyName] = append(keys[fieldKeyName], &fieldKey)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
for fieldKeyName, fieldKey := range CalculatedFieldsDeprecated {
|
|
|
|
|
if _, ok := keys[fieldKeyName]; !ok {
|
|
|
|
|
keys[fieldKeyName] = []*telemetrytypes.TelemetryFieldKey{&fieldKey}
|
|
|
|
|
} else {
|
|
|
|
|
keys[fieldKeyName] = append(keys[fieldKeyName], &fieldKey)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
// buildListQuery builds a query for list panel type.
// It selects the requested (or default) span fields from the span index
// table, applies the resource-filter CTE, filter conditions, ordering,
// and pagination, then combines CTEs and args into a final statement.
func (b *traceQueryStatementBuilder) buildListQuery(
	ctx context.Context,
	sb *sqlbuilder.SelectBuilder,
	query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation],
	start, end uint64,
	keys map[string][]*telemetrytypes.TelemetryFieldKey,
	variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {

	var (
		cteFragments []string
		cteArgs      [][]any
	)

	// Attach the resource-filter CTE when one applies.
	if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end, variables); err != nil {
		return nil, err
	} else if frag != "" {
		cteFragments = append(cteFragments, frag)
		cteArgs = append(cteArgs, args)
	}

	selectedFields := query.SelectFields

	// No explicit selection: fall back to the default fields in a
	// deterministic (sorted) order.
	if len(selectedFields) == 0 {
		sortedKeys := maps.Keys(DefaultFields)
		slices.Sort(sortedKeys)
		for _, key := range sortedKeys {
			selectedFields = append(selectedFields, DefaultFields[key])
		}
	}

	selectFieldKeys := []string{}
	for _, field := range selectedFields {
		selectFieldKeys = append(selectFieldKeys, field.Name)
	}

	// These identifying columns are always returned, even if not requested.
	for _, x := range []string{"timestamp", "span_id", "trace_id"} {
		if !slices.Contains(selectFieldKeys, x) {
			selectedFields = append(selectedFields, DefaultFields[x])
		}
	}

	// TODO: should we deprecate `SelectFields` and return everything from a span like we do for logs?
	for _, field := range selectedFields {
		// NOTE: &field refers to the per-iteration variable (Go 1.22+ loop semantics).
		colExpr, err := b.fm.ColumnExpressionFor(ctx, &field, keys)
		if err != nil {
			return nil, err
		}
		sb.SelectMore(colExpr)
	}

	// From table
	sb.From(fmt.Sprintf("%s.%s", DBName, SpanIndexV3TableName))

	// Add filter conditions
	preparedWhereClause, err := b.addFilterCondition(ctx, sb, start, end, query, keys, variables)
	if err != nil {
		return nil, err
	}

	// Add order by
	for _, orderBy := range query.Order {
		colExpr, err := b.fm.ColumnExpressionFor(ctx, &orderBy.Key.TelemetryFieldKey, keys)
		if err != nil {
			return nil, err
		}
		sb.OrderBy(fmt.Sprintf("%s %s", colExpr, orderBy.Direction.StringValue()))
	}

	// Add limit and offset (default page size of 100 rows).
	if query.Limit > 0 {
		sb.Limit(query.Limit)
	} else {
		sb.Limit(100)
	}

	if query.Offset > 0 {
		sb.Offset(query.Offset)
	}

	mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse)

	// Prepend the CTEs (WITH ...) and their args to the main query.
	finalSQL := querybuilder.CombineCTEs(cteFragments) + mainSQL
	finalArgs := querybuilder.PrependArgs(cteArgs, mainArgs)

	stmt := &qbtypes.Statement{
		Query: finalSQL,
		Args:  finalArgs,
	}
	// Propagate any warnings produced while preparing the where clause.
	if preparedWhereClause != nil {
		stmt.Warnings = preparedWhereClause.Warnings
		stmt.WarningsDocURL = preparedWhereClause.WarningsDocURL
	}

	return stmt, nil
}
// buildTraceQuery builds the trace-level query: one row per trace with its
// root span's service, name, and duration plus the trace's span count.
// It assembles two CTEs — __toe (trace IDs matching the filter) and
// __toe_duration_sorted (root spans in range) — and joins them, grouping
// to a single row per trace.
func (b *traceQueryStatementBuilder) buildTraceQuery(
	ctx context.Context,
	_ *sqlbuilder.SelectBuilder,
	query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation],
	start, end uint64,
	keys map[string][]*telemetrytypes.TelemetryFieldKey,
	variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {

	// Bucket bounds for ts_bucket_start pruning; start is widened by the
	// standard adjustment so spans near the boundary are not missed.
	startBucket := start/querybuilder.NsToSeconds - querybuilder.BucketAdjustment
	endBucket := end / querybuilder.NsToSeconds

	// CTE 1: trace IDs of spans matching the user's filter.
	distSB := sqlbuilder.NewSelectBuilder()
	distSB.Select("trace_id")
	distSB.From(fmt.Sprintf("%s.%s", DBName, SpanIndexV3TableName))

	var (
		cteFragments []string
		cteArgs      [][]any
	)

	if frag, args, err := b.maybeAttachResourceFilter(ctx, distSB, query, start, end, variables); err != nil {
		return nil, err
	} else if frag != "" {
		cteFragments = append(cteFragments, frag)
		cteArgs = append(cteArgs, args)
	}

	// Add filter conditions
	preparedWhereClause, err := b.addFilterCondition(ctx, distSB, start, end, query, keys, variables)
	if err != nil {
		return nil, err
	}

	distSQL, distArgs := distSB.BuildWithFlavor(sqlbuilder.ClickHouse)

	cteFragments = append(cteFragments, fmt.Sprintf("__toe AS (%s)", distSQL))
	cteArgs = append(cteArgs, distArgs)

	// CTE 2: the inner subquery for root spans (empty parent_span_id).
	innerSB := sqlbuilder.NewSelectBuilder()
	innerSB.Select("trace_id", "duration_nano", sqlbuilder.Escape("resource_string_service$$name as `service.name`"), "name")
	innerSB.From(fmt.Sprintf("%s.%s", DBName, SpanIndexV3TableName))
	innerSB.Where("parent_span_id = ''")

	// Add time filter to inner query
	// (GE/L bound timestamp as [start, end); ts_bucket_start prunes parts)
	innerSB.Where(
		innerSB.GE("timestamp", fmt.Sprintf("%d", start)),
		innerSB.L("timestamp", fmt.Sprintf("%d", end)),
		innerSB.GE("ts_bucket_start", startBucket),
		innerSB.LE("ts_bucket_start", endBucket))

	// order by duration and limit 1 per trace
	innerSB.OrderBy("duration_nano DESC")
	innerSB.SQL("LIMIT 1 BY trace_id")

	innerSQL, innerArgs := innerSB.BuildWithFlavor(sqlbuilder.ClickHouse)

	cteFragments = append(cteFragments, fmt.Sprintf("__toe_duration_sorted AS (%s)", innerSQL))
	cteArgs = append(cteArgs, innerArgs)

	// main query that joins everything
	mainSB := sqlbuilder.NewSelectBuilder()
	mainSB.Select(
		"__toe_duration_sorted.`service.name` AS `service.name`",
		"__toe_duration_sorted.name AS `name`",
		"count() AS span_count",
		"__toe_duration_sorted.duration_nano AS `duration_nano`",
		"__toe_duration_sorted.trace_id AS `trace_id`",
	)

	// Join the distributed table with the inner subquery
	mainSB.SQL("FROM __toe")
	mainSB.SQL("INNER JOIN __toe_duration_sorted")
	mainSB.SQL("ON __toe.trace_id = __toe_duration_sorted.trace_id")

	// Group by trace-level fields
	mainSB.GroupBy("trace_id", "duration_nano", "name", "`service.name`")

	// order by duration only supported for now
	mainSB.OrderBy("duration_nano DESC")

	// Limit by trace_id to ensure one row per trace
	mainSB.SQL("LIMIT 1 BY trace_id")

	// Pagination: default to 100 traces.
	if query.Limit > 0 {
		mainSB.Limit(query.Limit)
	} else {
		mainSB.Limit(100)
	}

	if query.Offset > 0 {
		mainSB.Offset(query.Offset)
	}

	mainSQL, mainArgs := mainSB.BuildWithFlavor(sqlbuilder.ClickHouse)

	// combine it all together: WITH … SELECT …
	finalSQL := querybuilder.CombineCTEs(cteFragments) + mainSQL + " SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000"
	finalArgs := querybuilder.PrependArgs(cteArgs, mainArgs)

	stmt := &qbtypes.Statement{
		Query: finalSQL,
		Args:  finalArgs,
	}
	// Propagate any warnings produced while preparing the where clause.
	if preparedWhereClause != nil {
		stmt.Warnings = preparedWhereClause.Warnings
		stmt.WarningsDocURL = preparedWhereClause.WarningsDocURL
	}

	return stmt, nil
}
// buildTimeSeriesQuery builds the time-series query: one bucketed `ts`
// column per step interval, one toString'd column per group-by key, and one
// __result_N column per aggregation. When both a limit and group-by keys
// are present, it first builds a scalar top/bottom-N CTE (__limit_cte) and
// constrains the main query to the series that appear in it.
func (b *traceQueryStatementBuilder) buildTimeSeriesQuery(
	ctx context.Context,
	sb *sqlbuilder.SelectBuilder,
	query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation],
	start, end uint64,
	keys map[string][]*telemetrytypes.TelemetryFieldKey,
	variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {

	var (
		cteFragments []string
		cteArgs      [][]any
	)

	if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end, variables); err != nil {
		return nil, err
	} else if frag != "" {
		cteFragments = append(cteFragments, frag)
		cteArgs = append(cteArgs, args)
	}

	// Time bucket column aligned to the query's step interval.
	sb.SelectMore(fmt.Sprintf(
		"toStartOfInterval(timestamp, INTERVAL %d SECOND) AS ts",
		int64(query.StepInterval.Seconds()),
	))

	var allGroupByArgs []any

	// Keep original column expressions so we can build the tuple
	fieldNames := make([]string, 0, len(query.GroupBy))
	for _, gb := range query.GroupBy {
		expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString)
		if err != nil {
			return nil, err
		}
		colExpr := fmt.Sprintf("toString(%s) AS `%s`", expr, gb.TelemetryFieldKey.Name)
		allGroupByArgs = append(allGroupByArgs, args...)
		sb.SelectMore(colExpr)
		fieldNames = append(fieldNames, fmt.Sprintf("`%s`", gb.TelemetryFieldKey.Name))
	}

	// Aggregations: each expression is rewritten for ClickHouse and aliased
	// positionally as __result_N.
	allAggChArgs := make([]any, 0)
	for i, agg := range query.Aggregations {
		rewritten, chArgs, err := b.aggExprRewriter.Rewrite(
			ctx, agg.Expression,
			uint64(query.StepInterval.Seconds()),
			keys,
		)
		if err != nil {
			return nil, err
		}
		allAggChArgs = append(allAggChArgs, chArgs...)
		sb.SelectMore(fmt.Sprintf("%s AS __result_%d", rewritten, i))
	}

	sb.From(fmt.Sprintf("%s.%s", DBName, SpanIndexV3TableName))
	preparedWhereClause, err := b.addFilterCondition(ctx, sb, start, end, query, keys, variables)
	if err != nil {
		return nil, err
	}

	var finalSQL string
	var finalArgs []any

	if query.Limit > 0 && len(query.GroupBy) > 0 {
		// build the scalar “top/bottom-N” query in its own builder.
		cteSB := sqlbuilder.NewSelectBuilder()
		// skipResourceCTE and skipHaving are both true: the resource filter is
		// already attached above and having applies to the outer query.
		cteStmt, err := b.buildScalarQuery(ctx, cteSB, query, start, end, keys, variables, true, true)
		if err != nil {
			return nil, err
		}

		cteFragments = append(cteFragments, fmt.Sprintf("__limit_cte AS (%s)", cteStmt.Query))
		cteArgs = append(cteArgs, cteStmt.Args)

		// Constrain the main query to the rows that appear in the CTE.
		tuple := fmt.Sprintf("(%s)", strings.Join(fieldNames, ", "))
		sb.Where(fmt.Sprintf("%s GLOBAL IN (SELECT %s FROM __limit_cte)", tuple, strings.Join(fieldNames, ", ")))

		// Group by all dimensions
		sb.GroupBy("ts")
		sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
		if query.Having != nil && query.Having.Expression != "" {
			// Rewrite having to reference the __result_N aliases.
			rewriter := querybuilder.NewHavingExpressionRewriter()
			rewrittenExpr := rewriter.RewriteForTraces(query.Having.Expression, query.Aggregations)
			sb.Having(rewrittenExpr)
		}

		// Non-aggregation order-by keys first, then time descending.
		if len(query.Order) != 0 {
			for _, orderBy := range query.Order {
				_, ok := aggOrderBy(orderBy, query)
				if !ok {
					sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
				}
			}
			sb.OrderBy("ts desc")
		}

		// Group-by args precede aggregation args — must mirror select order.
		combinedArgs := append(allGroupByArgs, allAggChArgs...)
		mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)

		// Stitch it all together: WITH … SELECT …
		finalSQL = querybuilder.CombineCTEs(cteFragments) + mainSQL
		finalArgs = querybuilder.PrependArgs(cteArgs, mainArgs)

	} else {
		// No top-N constraint: plain grouped time series.
		sb.GroupBy("ts")
		sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
		if query.Having != nil && query.Having.Expression != "" {
			rewriter := querybuilder.NewHavingExpressionRewriter()
			rewrittenExpr := rewriter.RewriteForTraces(query.Having.Expression, query.Aggregations)
			sb.Having(rewrittenExpr)
		}

		if len(query.Order) != 0 {
			for _, orderBy := range query.Order {
				_, ok := aggOrderBy(orderBy, query)
				if !ok {
					sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
				}
			}
			sb.OrderBy("ts desc")
		}

		combinedArgs := append(allGroupByArgs, allAggChArgs...)
		mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)

		// Stitch it all together: WITH … SELECT …
		finalSQL = querybuilder.CombineCTEs(cteFragments) + mainSQL
		finalArgs = querybuilder.PrependArgs(cteArgs, mainArgs)
	}

	stmt := &qbtypes.Statement{
		Query: finalSQL,
		Args:  finalArgs,
	}
	// Propagate any warnings produced while preparing the where clause.
	if preparedWhereClause != nil {
		stmt.Warnings = preparedWhereClause.Warnings
		stmt.WarningsDocURL = preparedWhereClause.WarningsDocURL
	}

	return stmt, nil
}
// buildScalarQuery builds a query for scalar panel type.
// One toString'd column per group-by key and one __result_N column per
// aggregation (rewritten over the full [start, end) window). Also used by
// buildTimeSeriesQuery as the top/bottom-N CTE, in which case
// skipResourceCTE avoids double-attaching the resource filter and
// skipHaving defers having to the outer query.
func (b *traceQueryStatementBuilder) buildScalarQuery(
	ctx context.Context,
	sb *sqlbuilder.SelectBuilder,
	query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation],
	start, end uint64,
	keys map[string][]*telemetrytypes.TelemetryFieldKey,
	variables map[string]qbtypes.VariableItem,
	skipResourceCTE bool,
	skipHaving bool,
) (*qbtypes.Statement, error) {

	var (
		cteFragments []string
		cteArgs      [][]any
	)

	// Note: the resource filter is still attached to sb even when
	// skipResourceCTE is set; only the CTE fragment itself is omitted.
	if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end, variables); err != nil {
		return nil, err
	} else if frag != "" && !skipResourceCTE {
		cteFragments = append(cteFragments, frag)
		cteArgs = append(cteArgs, args)
	}

	allAggChArgs := []any{}

	var allGroupByArgs []any
	for _, gb := range query.GroupBy {
		expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString)
		if err != nil {
			return nil, err
		}
		colExpr := fmt.Sprintf("toString(%s) AS `%s`", expr, gb.TelemetryFieldKey.Name)
		allGroupByArgs = append(allGroupByArgs, args...)
		sb.SelectMore(colExpr)
	}

	// for scalar queries, the rate would be end-start
	rateInterval := (end - start) / querybuilder.NsToSeconds

	// Add aggregation
	if len(query.Aggregations) > 0 {
		for idx := range query.Aggregations {
			aggExpr := query.Aggregations[idx]
			rewritten, chArgs, err := b.aggExprRewriter.Rewrite(
				ctx, aggExpr.Expression,
				rateInterval,
				keys,
			)
			if err != nil {
				return nil, err
			}
			allAggChArgs = append(allAggChArgs, chArgs...)
			sb.SelectMore(fmt.Sprintf("%s AS __result_%d", rewritten, idx))
		}
	}

	// From table
	sb.From(fmt.Sprintf("%s.%s", DBName, SpanIndexV3TableName))

	// Add filter conditions
	preparedWhereClause, err := b.addFilterCondition(ctx, sb, start, end, query, keys, variables)
	if err != nil {
		return nil, err
	}

	// Group by dimensions
	sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)

	// Add having clause if needed
	if query.Having != nil && query.Having.Expression != "" && !skipHaving {
		rewriter := querybuilder.NewHavingExpressionRewriter()
		rewrittenExpr := rewriter.RewriteForTraces(query.Having.Expression, query.Aggregations)
		sb.Having(rewrittenExpr)
	}

	// Add order by: aggregation results by positional alias, other keys by name.
	for _, orderBy := range query.Order {
		idx, ok := aggOrderBy(orderBy, query)
		if ok {
			sb.OrderBy(fmt.Sprintf("__result_%d %s", idx, orderBy.Direction.StringValue()))
		} else {
			sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
		}
	}

	// if there is no order by, then use the __result_0 as the order by
	if len(query.Order) == 0 {
		sb.OrderBy("__result_0 DESC")
	}

	// Add limit and offset
	if query.Limit > 0 {
		sb.Limit(query.Limit)
	}

	// Group-by args precede aggregation args — must mirror select order.
	combinedArgs := append(allGroupByArgs, allAggChArgs...)

	mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)

	finalSQL := querybuilder.CombineCTEs(cteFragments) + mainSQL
	finalArgs := querybuilder.PrependArgs(cteArgs, mainArgs)

	stmt := &qbtypes.Statement{
		Query: finalSQL,
		Args:  finalArgs,
	}
	// Propagate any warnings produced while preparing the where clause.
	if preparedWhereClause != nil {
		stmt.Warnings = preparedWhereClause.Warnings
		stmt.WarningsDocURL = preparedWhereClause.WarningsDocURL
	}

	return stmt, nil
}
|
|
|
|
|
|
// buildFilterCondition builds SQL condition from filter expression
|
2025-05-27 20:54:48 +05:30
|
|
|
func (b *traceQueryStatementBuilder) addFilterCondition(
|
|
|
|
|
_ context.Context,
|
|
|
|
|
sb *sqlbuilder.SelectBuilder,
|
|
|
|
|
start, end uint64,
|
|
|
|
|
query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation],
|
|
|
|
|
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
2025-06-23 14:00:50 +05:30
|
|
|
variables map[string]qbtypes.VariableItem,
|
2025-08-04 21:02:54 +05:30
|
|
|
) (*querybuilder.PreparedWhereClause, error) {
|
2025-05-25 22:14:47 +05:30
|
|
|
|
2025-08-04 21:02:54 +05:30
|
|
|
var preparedWhereClause *querybuilder.PreparedWhereClause
|
2025-05-27 20:54:48 +05:30
|
|
|
var err error
|
2025-05-25 22:14:47 +05:30
|
|
|
|
2025-05-27 20:54:48 +05:30
|
|
|
if query.Filter != nil && query.Filter.Expression != "" {
|
|
|
|
|
// add filter expression
|
2025-08-04 21:02:54 +05:30
|
|
|
preparedWhereClause, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
|
2025-08-12 18:10:35 +05:30
|
|
|
Logger: b.logger,
|
2025-05-27 20:54:48 +05:30
|
|
|
FieldMapper: b.fm,
|
|
|
|
|
ConditionBuilder: b.cb,
|
|
|
|
|
FieldKeys: keys,
|
|
|
|
|
SkipResourceFilter: true,
|
2025-06-23 14:00:50 +05:30
|
|
|
Variables: variables,
|
2025-05-27 20:54:48 +05:30
|
|
|
})
|
2025-05-25 22:14:47 +05:30
|
|
|
|
2025-05-27 20:54:48 +05:30
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
2025-05-25 22:14:47 +05:30
|
|
|
}
|
|
|
|
|
|
2025-08-04 21:02:54 +05:30
|
|
|
if preparedWhereClause != nil {
|
|
|
|
|
sb.AddWhereClause(preparedWhereClause.WhereClause)
|
2025-05-25 22:14:47 +05:30
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// add time filter
|
2025-05-27 20:54:48 +05:30
|
|
|
startBucket := start/querybuilder.NsToSeconds - querybuilder.BucketAdjustment
|
|
|
|
|
endBucket := end / querybuilder.NsToSeconds
|
2025-05-25 22:14:47 +05:30
|
|
|
|
2025-06-10 18:26:28 +05:30
|
|
|
sb.Where(sb.GE("timestamp", fmt.Sprintf("%d", start)), sb.L("timestamp", fmt.Sprintf("%d", end)), sb.GE("ts_bucket_start", startBucket), sb.LE("ts_bucket_start", endBucket))
|
2025-05-25 22:14:47 +05:30
|
|
|
|
2025-08-04 21:02:54 +05:30
|
|
|
return preparedWhereClause, nil
|
2025-05-25 22:14:47 +05:30
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func aggOrderBy(k qbtypes.OrderBy, q qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]) (int, bool) {
|
|
|
|
|
for i, agg := range q.Aggregations {
|
|
|
|
|
if k.Key.Name == agg.Alias ||
|
|
|
|
|
k.Key.Name == agg.Expression ||
|
|
|
|
|
k.Key.Name == fmt.Sprintf("%d", i) {
|
|
|
|
|
return i, true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return 0, false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (b *traceQueryStatementBuilder) maybeAttachResourceFilter(
|
|
|
|
|
ctx context.Context,
|
|
|
|
|
sb *sqlbuilder.SelectBuilder,
|
|
|
|
|
query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation],
|
|
|
|
|
start, end uint64,
|
2025-06-21 17:49:33 +05:30
|
|
|
variables map[string]qbtypes.VariableItem,
|
2025-05-25 22:14:47 +05:30
|
|
|
) (cteSQL string, cteArgs []any, err error) {
|
|
|
|
|
|
2025-06-21 17:49:33 +05:30
|
|
|
stmt, err := b.buildResourceFilterCTE(ctx, query, start, end, variables)
|
2025-05-25 22:14:47 +05:30
|
|
|
if err != nil {
|
|
|
|
|
return "", nil, err
|
|
|
|
|
}
|
|
|
|
|
|
2025-07-02 10:39:16 +05:30
|
|
|
sb.Where("resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter)")
|
2025-05-25 22:14:47 +05:30
|
|
|
|
|
|
|
|
return fmt.Sprintf("__resource_filter AS (%s)", stmt.Query), stmt.Args, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (b *traceQueryStatementBuilder) buildResourceFilterCTE(
|
|
|
|
|
ctx context.Context,
|
|
|
|
|
query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation],
|
|
|
|
|
start, end uint64,
|
2025-06-21 17:49:33 +05:30
|
|
|
variables map[string]qbtypes.VariableItem,
|
2025-05-25 22:14:47 +05:30
|
|
|
) (*qbtypes.Statement, error) {
|
|
|
|
|
|
2025-05-27 20:54:48 +05:30
|
|
|
return b.resourceFilterStmtBuilder.Build(
|
2025-05-25 22:14:47 +05:30
|
|
|
ctx,
|
|
|
|
|
start,
|
|
|
|
|
end,
|
|
|
|
|
qbtypes.RequestTypeRaw,
|
|
|
|
|
query,
|
2025-06-21 17:49:33 +05:30
|
|
|
variables,
|
2025-05-25 22:14:47 +05:30
|
|
|
)
|
|
|
|
|
}
|