mirror of
https://github.com/SigNoz/signoz.git
synced 2025-12-17 15:36:48 +00:00
This introduces a new Raw Data Export module to the codebase, enabling users to export raw log data via a dedicated API endpoint. The changes include the implementation of the module and handler, integration with existing infrastructure, configuration updates, and adjustments to tests and module wiring.
667 lines
20 KiB
Go
667 lines
20 KiB
Go
package telemetrylogs
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"strings"
|
|
|
|
"github.com/SigNoz/signoz/pkg/factory"
|
|
"github.com/SigNoz/signoz/pkg/querybuilder"
|
|
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
|
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
|
"github.com/huandu/go-sqlbuilder"
|
|
)
|
|
|
|
// logQueryStatementBuilder builds ClickHouse SQL statements for log
// queries. It supports raw/list, time-series, and scalar request types.
type logQueryStatementBuilder struct {
	logger *slog.Logger
	// metadataStore resolves telemetry field keys to their metadata
	// (context, data type, materialization).
	metadataStore telemetrytypes.MetadataStore
	// fm maps telemetry field keys to ClickHouse column expressions.
	fm qbtypes.FieldMapper
	// cb builds SQL conditions for individual filter predicates.
	cb qbtypes.ConditionBuilder
	// resourceFilterStmtBuilder builds the resource-fingerprint
	// sub-query attached as the __resource_filter CTE.
	resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
	// aggExprRewriter rewrites aggregation expressions into ClickHouse SQL.
	aggExprRewriter qbtypes.AggExprRewriter

	// fullTextColumn is the column targeted by full-text search terms.
	fullTextColumn *telemetrytypes.TelemetryFieldKey
	// jsonBodyPrefix marks field names that address the JSON log body.
	jsonBodyPrefix string
	// jsonKeyToKey converts a JSON body key into a field access expression.
	jsonKeyToKey qbtypes.JsonKeyToFieldFunc
}

// Compile-time assertion that logQueryStatementBuilder implements
// StatementBuilder for log aggregations.
var _ qbtypes.StatementBuilder[qbtypes.LogAggregation] = (*logQueryStatementBuilder)(nil)
|
|
|
|
func NewLogQueryStatementBuilder(
|
|
settings factory.ProviderSettings,
|
|
metadataStore telemetrytypes.MetadataStore,
|
|
fieldMapper qbtypes.FieldMapper,
|
|
conditionBuilder qbtypes.ConditionBuilder,
|
|
resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation],
|
|
aggExprRewriter qbtypes.AggExprRewriter,
|
|
fullTextColumn *telemetrytypes.TelemetryFieldKey,
|
|
jsonBodyPrefix string,
|
|
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
|
|
) *logQueryStatementBuilder {
|
|
logsSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrylogs")
|
|
|
|
return &logQueryStatementBuilder{
|
|
logger: logsSettings.Logger(),
|
|
metadataStore: metadataStore,
|
|
fm: fieldMapper,
|
|
cb: conditionBuilder,
|
|
resourceFilterStmtBuilder: resourceFilterStmtBuilder,
|
|
aggExprRewriter: aggExprRewriter,
|
|
fullTextColumn: fullTextColumn,
|
|
jsonBodyPrefix: jsonBodyPrefix,
|
|
jsonKeyToKey: jsonKeyToKey,
|
|
}
|
|
}
|
|
|
|
// Build builds a SQL query for logs based on the given parameters
|
|
func (b *logQueryStatementBuilder) Build(
|
|
ctx context.Context,
|
|
start uint64,
|
|
end uint64,
|
|
requestType qbtypes.RequestType,
|
|
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
|
variables map[string]qbtypes.VariableItem,
|
|
) (*qbtypes.Statement, error) {
|
|
|
|
start = querybuilder.ToNanoSecs(start)
|
|
end = querybuilder.ToNanoSecs(end)
|
|
|
|
keySelectors := getKeySelectors(query)
|
|
keys, _, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
b.adjustKeys(ctx, keys, query)
|
|
|
|
// Create SQL builder
|
|
q := sqlbuilder.NewSelectBuilder()
|
|
|
|
switch requestType {
|
|
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeRawStream:
|
|
return b.buildListQuery(ctx, q, query, start, end, keys, variables)
|
|
case qbtypes.RequestTypeTimeSeries:
|
|
return b.buildTimeSeriesQuery(ctx, q, query, start, end, keys, variables)
|
|
case qbtypes.RequestTypeScalar:
|
|
return b.buildScalarQuery(ctx, q, query, start, end, keys, false, variables)
|
|
}
|
|
|
|
return nil, fmt.Errorf("unsupported request type: %s", requestType)
|
|
}
|
|
|
|
func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) []*telemetrytypes.FieldKeySelector {
|
|
var keySelectors []*telemetrytypes.FieldKeySelector
|
|
|
|
for idx := range query.Aggregations {
|
|
aggExpr := query.Aggregations[idx]
|
|
selectors := querybuilder.QueryStringToKeysSelectors(aggExpr.Expression)
|
|
keySelectors = append(keySelectors, selectors...)
|
|
}
|
|
|
|
if query.Filter != nil && query.Filter.Expression != "" {
|
|
whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(query.Filter.Expression)
|
|
keySelectors = append(keySelectors, whereClauseSelectors...)
|
|
}
|
|
|
|
for idx := range query.GroupBy {
|
|
groupBy := query.GroupBy[idx]
|
|
selectors := querybuilder.QueryStringToKeysSelectors(groupBy.TelemetryFieldKey.Name)
|
|
keySelectors = append(keySelectors, selectors...)
|
|
}
|
|
|
|
for idx := range query.Order {
|
|
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
|
|
Name: query.Order[idx].Key.Name,
|
|
Signal: telemetrytypes.SignalLogs,
|
|
FieldContext: query.Order[idx].Key.FieldContext,
|
|
FieldDataType: query.Order[idx].Key.FieldDataType,
|
|
})
|
|
}
|
|
|
|
for idx := range keySelectors {
|
|
keySelectors[idx].Signal = telemetrytypes.SignalLogs
|
|
keySelectors[idx].SelectorMatchType = telemetrytypes.FieldSelectorMatchTypeExact
|
|
}
|
|
|
|
return keySelectors
|
|
}
|
|
|
|
func (b *logQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[string][]*telemetrytypes.TelemetryFieldKey, query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) {
|
|
// for group by / order by, if there is a key
|
|
// that exactly matches the name of intrinsic field but has
|
|
// a field context or data type that doesn't match the field context or data type of the
|
|
// intrinsic field,
|
|
// and there is no additional key present in the data with the incoming key match,
|
|
// then override the given context with
|
|
// intrinsic field context and data type
|
|
// Why does that happen? Because we have a lot of dashboards created by users and shared over web
|
|
// that has incorrect context or data type populated so we fix it
|
|
// note: this override happens only when there is no match; if there is a match,
|
|
// we can't make decision on behalf of users so we let it use unmodified
|
|
|
|
// example: {"key": "severity_text","type": "tag","dataType": "string"}
|
|
// This is sent as "tag", when it's not, this was earlier managed with
|
|
// `isColumn`, which we don't have in v5 (because it's not a user concern whether it's mat col or not)
|
|
// Such requests as-is look for attributes, the following code exists to handle them
|
|
checkMatch := func(k *telemetrytypes.TelemetryFieldKey) {
|
|
var overallMatch bool
|
|
|
|
findMatch := func(staticKeys map[string]telemetrytypes.TelemetryFieldKey) bool {
|
|
// for a given key `k`, iterate over the metadata keys `keys`
|
|
// and see if there is any exact match
|
|
match := false
|
|
for _, mapKey := range keys[k.Name] {
|
|
if mapKey.FieldContext == k.FieldContext && mapKey.FieldDataType == k.FieldDataType {
|
|
match = true
|
|
}
|
|
}
|
|
// we don't have exact match, then it's doesn't exist in attribute or resource attribute
|
|
// use the intrinsic/calculated field
|
|
if !match {
|
|
b.logger.InfoContext(ctx, "overriding the field context and data type", "key", k.Name)
|
|
k.FieldContext = staticKeys[k.Name].FieldContext
|
|
k.FieldDataType = staticKeys[k.Name].FieldDataType
|
|
}
|
|
return match
|
|
}
|
|
|
|
if _, ok := IntrinsicFields[k.Name]; ok {
|
|
overallMatch = overallMatch || findMatch(IntrinsicFields)
|
|
}
|
|
|
|
if !overallMatch {
|
|
// check if all the key for the given field have been materialized, if so
|
|
// set the key to materialized
|
|
materilized := true
|
|
for _, key := range keys[k.Name] {
|
|
materilized = materilized && key.Materialized
|
|
}
|
|
k.Materialized = materilized
|
|
}
|
|
}
|
|
|
|
for idx := range query.GroupBy {
|
|
checkMatch(&query.GroupBy[idx].TelemetryFieldKey)
|
|
}
|
|
for idx := range query.Order {
|
|
checkMatch(&query.Order[idx].Key.TelemetryFieldKey)
|
|
}
|
|
|
|
keys["id"] = []*telemetrytypes.TelemetryFieldKey{
|
|
{
|
|
Name: "id",
|
|
Signal: telemetrytypes.SignalLogs,
|
|
FieldContext: telemetrytypes.FieldContextLog,
|
|
FieldDataType: telemetrytypes.FieldDataTypeString,
|
|
},
|
|
}
|
|
}
|
|
|
|
// buildListQuery builds a query for list panel type
|
|
func (b *logQueryStatementBuilder) buildListQuery(
|
|
ctx context.Context,
|
|
sb *sqlbuilder.SelectBuilder,
|
|
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
|
start, end uint64,
|
|
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
|
variables map[string]qbtypes.VariableItem,
|
|
) (*qbtypes.Statement, error) {
|
|
|
|
var (
|
|
cteFragments []string
|
|
cteArgs [][]any
|
|
)
|
|
|
|
if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end, variables); err != nil {
|
|
return nil, err
|
|
} else if frag != "" {
|
|
cteFragments = append(cteFragments, frag)
|
|
cteArgs = append(cteArgs, args)
|
|
}
|
|
|
|
// Select timestamp and id by default
|
|
sb.Select(LogsV2TimestampColumn)
|
|
sb.SelectMore(LogsV2IDColumn)
|
|
if len(query.SelectFields) == 0 {
|
|
// Select all default columns
|
|
sb.SelectMore(LogsV2TraceIDColumn)
|
|
sb.SelectMore(LogsV2SpanIDColumn)
|
|
sb.SelectMore(LogsV2TraceFlagsColumn)
|
|
sb.SelectMore(LogsV2SeverityTextColumn)
|
|
sb.SelectMore(LogsV2SeverityNumberColumn)
|
|
sb.SelectMore(LogsV2ScopeNameColumn)
|
|
sb.SelectMore(LogsV2ScopeVersionColumn)
|
|
sb.SelectMore(LogsV2BodyColumn)
|
|
sb.SelectMore(LogsV2AttributesStringColumn)
|
|
sb.SelectMore(LogsV2AttributesNumberColumn)
|
|
sb.SelectMore(LogsV2AttributesBoolColumn)
|
|
sb.SelectMore(LogsV2ResourcesStringColumn)
|
|
sb.SelectMore(LogsV2ScopeStringColumn)
|
|
|
|
} else {
|
|
// Select specified columns
|
|
for index := range query.SelectFields {
|
|
if query.SelectFields[index].Name == LogsV2TimestampColumn || query.SelectFields[index].Name == LogsV2IDColumn {
|
|
continue
|
|
}
|
|
// get column expression for the field - use array index directly to avoid pointer to loop variable
|
|
colExpr, err := b.fm.ColumnExpressionFor(ctx, &query.SelectFields[index], keys)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
sb.SelectMore(colExpr)
|
|
}
|
|
}
|
|
|
|
// From table
|
|
sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName))
|
|
|
|
// Add filter conditions
|
|
preparedWhereClause, err := b.addFilterCondition(ctx, sb, start, end, query, keys, variables)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Add order by
|
|
for _, orderBy := range query.Order {
|
|
colExpr, err := b.fm.ColumnExpressionFor(ctx, &orderBy.Key.TelemetryFieldKey, keys)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
sb.OrderBy(fmt.Sprintf("%s %s", colExpr, orderBy.Direction.StringValue()))
|
|
}
|
|
|
|
// Add limit and offset
|
|
if query.Limit > 0 {
|
|
sb.Limit(query.Limit)
|
|
} else {
|
|
sb.Limit(100)
|
|
}
|
|
|
|
if query.Offset > 0 {
|
|
sb.Offset(query.Offset)
|
|
}
|
|
|
|
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
|
|
|
finalSQL := querybuilder.CombineCTEs(cteFragments) + mainSQL
|
|
finalArgs := querybuilder.PrependArgs(cteArgs, mainArgs)
|
|
|
|
stmt := &qbtypes.Statement{
|
|
Query: finalSQL,
|
|
Args: finalArgs,
|
|
}
|
|
if preparedWhereClause != nil {
|
|
stmt.Warnings = preparedWhereClause.Warnings
|
|
stmt.WarningsDocURL = preparedWhereClause.WarningsDocURL
|
|
}
|
|
|
|
return stmt, nil
|
|
}
|
|
|
|
func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
|
|
ctx context.Context,
|
|
sb *sqlbuilder.SelectBuilder,
|
|
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
|
start, end uint64,
|
|
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
|
variables map[string]qbtypes.VariableItem,
|
|
) (*qbtypes.Statement, error) {
|
|
|
|
var (
|
|
cteFragments []string
|
|
cteArgs [][]any
|
|
)
|
|
|
|
if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end, variables); err != nil {
|
|
return nil, err
|
|
} else if frag != "" {
|
|
cteFragments = append(cteFragments, frag)
|
|
cteArgs = append(cteArgs, args)
|
|
}
|
|
|
|
sb.SelectMore(fmt.Sprintf(
|
|
"toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL %d SECOND) AS ts",
|
|
int64(query.StepInterval.Seconds()),
|
|
))
|
|
|
|
var allGroupByArgs []any
|
|
|
|
// Keep original column expressions so we can build the tuple
|
|
fieldNames := make([]string, 0, len(query.GroupBy))
|
|
for _, gb := range query.GroupBy {
|
|
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonBodyPrefix, b.jsonKeyToKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
colExpr := fmt.Sprintf("toString(%s) AS `%s`", expr, gb.TelemetryFieldKey.Name)
|
|
allGroupByArgs = append(allGroupByArgs, args...)
|
|
sb.SelectMore(colExpr)
|
|
fieldNames = append(fieldNames, fmt.Sprintf("`%s`", gb.TelemetryFieldKey.Name))
|
|
}
|
|
|
|
// Aggregations
|
|
allAggChArgs := make([]any, 0)
|
|
for i, agg := range query.Aggregations {
|
|
rewritten, chArgs, err := b.aggExprRewriter.Rewrite(
|
|
ctx, agg.Expression,
|
|
uint64(query.StepInterval.Seconds()),
|
|
keys,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
allAggChArgs = append(allAggChArgs, chArgs...)
|
|
sb.SelectMore(fmt.Sprintf("%s AS __result_%d", rewritten, i))
|
|
}
|
|
|
|
sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName))
|
|
preparedWhereClause, err := b.addFilterCondition(ctx, sb, start, end, query, keys, variables)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var finalSQL string
|
|
var finalArgs []any
|
|
|
|
if query.Limit > 0 && len(query.GroupBy) > 0 {
|
|
// build the scalar “top/bottom-N” query in its own builder.
|
|
cteSB := sqlbuilder.NewSelectBuilder()
|
|
cteStmt, err := b.buildScalarQuery(ctx, cteSB, query, start, end, keys, true, variables)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cteFragments = append(cteFragments, fmt.Sprintf("__limit_cte AS (%s)", cteStmt.Query))
|
|
cteArgs = append(cteArgs, cteStmt.Args)
|
|
|
|
// Constrain the main query to the rows that appear in the CTE.
|
|
tuple := fmt.Sprintf("(%s)", strings.Join(fieldNames, ", "))
|
|
sb.Where(fmt.Sprintf("%s GLOBAL IN (SELECT %s FROM __limit_cte)", tuple, strings.Join(fieldNames, ", ")))
|
|
|
|
// Group by all dimensions
|
|
sb.GroupBy("ts")
|
|
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
|
if query.Having != nil && query.Having.Expression != "" {
|
|
// Rewrite having expression to use SQL column names
|
|
rewriter := querybuilder.NewHavingExpressionRewriter()
|
|
rewrittenExpr := rewriter.RewriteForLogs(query.Having.Expression, query.Aggregations)
|
|
sb.Having(rewrittenExpr)
|
|
}
|
|
|
|
if len(query.Order) != 0 {
|
|
for _, orderBy := range query.Order {
|
|
_, ok := aggOrderBy(orderBy, query)
|
|
if !ok {
|
|
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
|
|
}
|
|
}
|
|
sb.OrderBy("ts desc")
|
|
}
|
|
|
|
combinedArgs := append(allGroupByArgs, allAggChArgs...)
|
|
|
|
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
|
|
|
|
// Stitch it all together: WITH … SELECT …
|
|
finalSQL = querybuilder.CombineCTEs(cteFragments) + mainSQL
|
|
finalArgs = querybuilder.PrependArgs(cteArgs, mainArgs)
|
|
|
|
} else {
|
|
sb.GroupBy("ts")
|
|
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
|
if query.Having != nil && query.Having.Expression != "" {
|
|
rewriter := querybuilder.NewHavingExpressionRewriter()
|
|
rewrittenExpr := rewriter.RewriteForLogs(query.Having.Expression, query.Aggregations)
|
|
sb.Having(rewrittenExpr)
|
|
}
|
|
|
|
if len(query.Order) != 0 {
|
|
for _, orderBy := range query.Order {
|
|
_, ok := aggOrderBy(orderBy, query)
|
|
if !ok {
|
|
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
|
|
}
|
|
}
|
|
sb.OrderBy("ts desc")
|
|
}
|
|
|
|
combinedArgs := append(allGroupByArgs, allAggChArgs...)
|
|
|
|
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
|
|
|
|
// Stitch it all together: WITH … SELECT …
|
|
finalSQL = querybuilder.CombineCTEs(cteFragments) + mainSQL
|
|
finalArgs = querybuilder.PrependArgs(cteArgs, mainArgs)
|
|
}
|
|
|
|
stmt := &qbtypes.Statement{
|
|
Query: finalSQL,
|
|
Args: finalArgs,
|
|
}
|
|
if preparedWhereClause != nil {
|
|
stmt.Warnings = preparedWhereClause.Warnings
|
|
stmt.WarningsDocURL = preparedWhereClause.WarningsDocURL
|
|
}
|
|
|
|
return stmt, nil
|
|
}
|
|
|
|
// buildScalarQuery builds a query for scalar panel type: one row (or one
// row per group-by tuple) of aggregated values over the whole [start, end)
// range, with aggregations exposed as __result_<idx> columns.
//
// skipResourceCTE is set when this query is embedded as the __limit_cte of
// a time-series query: the enclosing query already declares the
// __resource_filter CTE, so the fragment must not be re-declared here even
// though maybeAttachResourceFilter still adds the WHERE clause that
// references it.
func (b *logQueryStatementBuilder) buildScalarQuery(
	ctx context.Context,
	sb *sqlbuilder.SelectBuilder,
	query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
	start, end uint64,
	keys map[string][]*telemetrytypes.TelemetryFieldKey,
	skipResourceCTE bool,
	variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {

	var (
		cteFragments []string
		cteArgs      [][]any
	)

	// Attach the resource-filter WHERE clause; only emit the CTE fragment
	// when the caller is not providing it from an outer query.
	if frag, args, err := b.maybeAttachResourceFilter(ctx, sb, query, start, end, variables); err != nil {
		return nil, err
	} else if frag != "" && !skipResourceCTE {
		cteFragments = append(cteFragments, frag)
		cteArgs = append(cteArgs, args)
	}

	allAggChArgs := []any{}

	var allGroupByArgs []any

	// One toString(...) AS `name` column per group-by key.
	for _, gb := range query.GroupBy {
		expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonBodyPrefix, b.jsonKeyToKey)
		if err != nil {
			return nil, err
		}
		colExpr := fmt.Sprintf("toString(%s) AS `%s`", expr, gb.TelemetryFieldKey.Name)
		allGroupByArgs = append(allGroupByArgs, args...)
		sb.SelectMore(colExpr)
	}

	// for scalar queries, the rate would be end-start (the whole range,
	// in seconds, acts as the rate interval)
	rateInterval := (end - start) / querybuilder.NsToSeconds

	// Add aggregation columns (__result_<idx>).
	if len(query.Aggregations) > 0 {
		for idx := range query.Aggregations {
			aggExpr := query.Aggregations[idx]
			rewritten, chArgs, err := b.aggExprRewriter.Rewrite(
				ctx, aggExpr.Expression,
				rateInterval,
				keys,
			)
			if err != nil {
				return nil, err
			}
			allAggChArgs = append(allAggChArgs, chArgs...)
			sb.SelectMore(fmt.Sprintf("%s AS __result_%d", rewritten, idx))
		}
	}

	// From table
	sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName))

	// Add filter conditions (filter expression + time/bucket range).
	preparedWhereClause, err := b.addFilterCondition(ctx, sb, start, end, query, keys, variables)
	if err != nil {
		return nil, err
	}

	// Group by dimensions
	sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)

	// Add having clause if needed; the rewriter maps aggregation
	// references onto the __result_<idx> column names.
	if query.Having != nil && query.Having.Expression != "" {
		rewriter := querybuilder.NewHavingExpressionRewriter()
		rewrittenExpr := rewriter.RewriteForLogs(query.Having.Expression, query.Aggregations)
		sb.Having(rewrittenExpr)
	}

	// Add order by: aggregation references sort by __result_<idx>,
	// everything else by its group-by column name.
	for _, orderBy := range query.Order {
		idx, ok := aggOrderBy(orderBy, query)
		if ok {
			sb.OrderBy(fmt.Sprintf("__result_%d %s", idx, orderBy.Direction.StringValue()))
		} else {
			sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
		}
	}

	// if there is no order by, then use the __result_0 as the order by
	if len(query.Order) == 0 {
		sb.OrderBy("__result_0 DESC")
	}

	// Add limit and offset
	if query.Limit > 0 {
		sb.Limit(query.Limit)
	}

	combinedArgs := append(allGroupByArgs, allAggChArgs...)

	mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)

	finalSQL := querybuilder.CombineCTEs(cteFragments) + mainSQL
	finalArgs := querybuilder.PrependArgs(cteArgs, mainArgs)

	stmt := &qbtypes.Statement{
		Query: finalSQL,
		Args:  finalArgs,
	}
	if preparedWhereClause != nil {
		stmt.Warnings = preparedWhereClause.Warnings
		stmt.WarningsDocURL = preparedWhereClause.WarningsDocURL
	}

	return stmt, nil
}
|
|
|
|
// buildFilterCondition builds SQL condition from filter expression
|
|
func (b *logQueryStatementBuilder) addFilterCondition(
|
|
_ context.Context,
|
|
sb *sqlbuilder.SelectBuilder,
|
|
start, end uint64,
|
|
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
|
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
|
variables map[string]qbtypes.VariableItem,
|
|
) (*querybuilder.PreparedWhereClause, error) {
|
|
|
|
var preparedWhereClause *querybuilder.PreparedWhereClause
|
|
var err error
|
|
|
|
if query.Filter != nil && query.Filter.Expression != "" {
|
|
// add filter expression
|
|
preparedWhereClause, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
|
|
Logger: b.logger,
|
|
FieldMapper: b.fm,
|
|
ConditionBuilder: b.cb,
|
|
FieldKeys: keys,
|
|
SkipResourceFilter: true,
|
|
FullTextColumn: b.fullTextColumn,
|
|
JsonBodyPrefix: b.jsonBodyPrefix,
|
|
JsonKeyToKey: b.jsonKeyToKey,
|
|
Variables: variables,
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if preparedWhereClause != nil {
|
|
sb.AddWhereClause(preparedWhereClause.WhereClause)
|
|
}
|
|
|
|
// add time filter
|
|
startBucket := start/querybuilder.NsToSeconds - querybuilder.BucketAdjustment
|
|
var endBucket uint64
|
|
if end != 0 {
|
|
endBucket = end / querybuilder.NsToSeconds
|
|
}
|
|
|
|
if start != 0 {
|
|
sb.Where(sb.GE("timestamp", fmt.Sprintf("%d", start)), sb.GE("ts_bucket_start", startBucket))
|
|
}
|
|
if end != 0 {
|
|
sb.Where(sb.L("timestamp", fmt.Sprintf("%d", end)), sb.LE("ts_bucket_start", endBucket))
|
|
}
|
|
|
|
return preparedWhereClause, nil
|
|
}
|
|
|
|
func aggOrderBy(k qbtypes.OrderBy, q qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) (int, bool) {
|
|
for i, agg := range q.Aggregations {
|
|
if k.Key.Name == agg.Alias ||
|
|
k.Key.Name == agg.Expression ||
|
|
k.Key.Name == fmt.Sprintf("%d", i) {
|
|
return i, true
|
|
}
|
|
}
|
|
return 0, false
|
|
}
|
|
|
|
func (b *logQueryStatementBuilder) maybeAttachResourceFilter(
|
|
ctx context.Context,
|
|
sb *sqlbuilder.SelectBuilder,
|
|
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
|
start, end uint64,
|
|
variables map[string]qbtypes.VariableItem,
|
|
) (cteSQL string, cteArgs []any, err error) {
|
|
|
|
stmt, err := b.buildResourceFilterCTE(ctx, query, start, end, variables)
|
|
if err != nil {
|
|
return "", nil, err
|
|
}
|
|
|
|
sb.Where("resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter)")
|
|
|
|
return fmt.Sprintf("__resource_filter AS (%s)", stmt.Query), stmt.Args, nil
|
|
}
|
|
|
|
func (b *logQueryStatementBuilder) buildResourceFilterCTE(
|
|
ctx context.Context,
|
|
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation],
|
|
start, end uint64,
|
|
variables map[string]qbtypes.VariableItem,
|
|
) (*qbtypes.Statement, error) {
|
|
|
|
return b.resourceFilterStmtBuilder.Build(
|
|
ctx,
|
|
start,
|
|
end,
|
|
qbtypes.RequestTypeRaw,
|
|
query,
|
|
variables,
|
|
)
|
|
}
|