signoz/pkg/querier/builder_query.go

package querier

import (
	"context"
	"encoding/base64"
	"fmt"
	"strconv"
	"strings"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/SigNoz/signoz/pkg/errors"
	"github.com/SigNoz/signoz/pkg/telemetrystore"
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
type builderQuery[T any] struct {
	telemetryStore telemetrystore.TelemetryStore
	stmtBuilder    qbtypes.StatementBuilder[T]
	spec           qbtypes.QueryBuilderQuery[T]
	variables      map[string]qbtypes.VariableItem
	fromMS         uint64
	toMS           uint64
	kind           qbtypes.RequestType
}

var _ qbtypes.Query = (*builderQuery[any])(nil)
func newBuilderQuery[T any](
	telemetryStore telemetrystore.TelemetryStore,
	stmtBuilder qbtypes.StatementBuilder[T],
	spec qbtypes.QueryBuilderQuery[T],
	tr qbtypes.TimeRange,
	kind qbtypes.RequestType,
	variables map[string]qbtypes.VariableItem,
) *builderQuery[T] {
	return &builderQuery[T]{
		telemetryStore: telemetryStore,
		stmtBuilder:    stmtBuilder,
		spec:           spec,
		variables:      variables,
		fromMS:         tr.From,
		toMS:           tr.To,
		kind:           kind,
	}
}
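
// Usage sketch (illustrative only; `store`, `logStmtBuilder`, and `spec` are
// assumed to come from the caller's wiring and are not defined in this file):
//
//	q := newBuilderQuery(store, logStmtBuilder, spec,
//		qbtypes.TimeRange{From: fromMS, To: toMS},
//		qbtypes.RequestTypeRaw, nil)
//	result, err := q.Execute(ctx)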
// Fingerprint returns a deterministic cache key for the query, or an empty
// string when the result must not be cached.
func (q *builderQuery[T]) Fingerprint() string {
	if (q.spec.Signal == telemetrytypes.SignalTraces ||
		q.spec.Signal == telemetrytypes.SignalLogs) && q.kind != qbtypes.RequestTypeTimeSeries {
		// no caching for non-timeseries trace/log queries
		return ""
	}

	// Build a deterministic fingerprint that includes every field which can
	// affect the query result.
	parts := []string{"builder"}

	// signal type
	parts = append(parts, fmt.Sprintf("signal=%s", q.spec.Signal.StringValue()))

	// step interval
	parts = append(parts, fmt.Sprintf("step=%s", q.spec.StepInterval.String()))

	// aggregations, rendered as strings
	if len(q.spec.Aggregations) > 0 {
		aggParts := []string{}
		for _, agg := range q.spec.Aggregations {
			switch a := any(agg).(type) {
			case qbtypes.TraceAggregation:
				aggParts = append(aggParts, a.Expression)
			case qbtypes.LogAggregation:
				aggParts = append(aggParts, a.Expression)
			case qbtypes.MetricAggregation:
				aggParts = append(aggParts, fmt.Sprintf("%s:%s:%s:%s",
					a.MetricName,
					a.Temporality.StringValue(),
					a.TimeAggregation.StringValue(),
					a.SpaceAggregation.StringValue(),
				))
			}
		}
		parts = append(parts, fmt.Sprintf("aggs=[%s]", strings.Join(aggParts, ",")))
	}

	// filter expression, if present
	if q.spec.Filter != nil && q.spec.Filter.Expression != "" {
		parts = append(parts, fmt.Sprintf("filter=%s", q.spec.Filter.Expression))
	}

	// group-by keys
	if len(q.spec.GroupBy) > 0 {
		groupByParts := []string{}
		for _, gb := range q.spec.GroupBy {
			groupByParts = append(groupByParts, fingerprintGroupByKey(gb))
		}
		parts = append(parts, fmt.Sprintf("groupby=[%s]", strings.Join(groupByParts, ",")))
	}

	// order-by clauses
	if len(q.spec.Order) > 0 {
		orderParts := []string{}
		for _, o := range q.spec.Order {
			orderParts = append(orderParts, fingerprintOrderBy(o))
		}
		parts = append(parts, fmt.Sprintf("order=[%s]", strings.Join(orderParts, ",")))
	}

	// limit and offset
	if q.spec.Limit > 0 {
		parts = append(parts, fmt.Sprintf("limit=%d", q.spec.Limit))
	}
	if q.spec.Offset > 0 {
		parts = append(parts, fmt.Sprintf("offset=%d", q.spec.Offset))
	}

	// having clause
	if q.spec.Having != nil && q.spec.Having.Expression != "" {
		parts = append(parts, fmt.Sprintf("having=%s", q.spec.Having.Expression))
	}

	// shift-by offset
	if q.spec.ShiftBy != 0 {
		parts = append(parts, fmt.Sprintf("shiftby=%d", q.spec.ShiftBy))
	}

	return strings.Join(parts, "&")
}
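
// For illustration, a hypothetical logs time-series query counting rows per
// service might fingerprint as (attribute names and enum spellings invented
// for the example):
//
//	builder&signal=logs&step=1m0s&aggs=[count()]&filter=severity_text = 'ERROR'&groupby=[service.name-string-resource-logs]
//
// Equal fingerprints imply cache-equivalent queries; note the time range is
// not part of the key, presumably so cached results can be reused across
// overlapping windows.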
func fingerprintGroupByKey(gb qbtypes.GroupByKey) string {
	return fingerprintFieldKey(gb.TelemetryFieldKey)
}

func fingerprintOrderBy(o qbtypes.OrderBy) string {
	return fmt.Sprintf("%s:%s", fingerprintFieldKey(o.Key.TelemetryFieldKey), o.Direction.StringValue())
}

// fingerprintFieldKey includes the essential fields that identify a field key.
func fingerprintFieldKey(key telemetrytypes.TelemetryFieldKey) string {
	return fmt.Sprintf("%s-%s-%s-%s",
		key.Name,
		key.FieldDataType.StringValue(),
		key.FieldContext.StringValue(),
		key.Signal.StringValue())
}
// Window returns the query's time range in milliseconds.
func (q *builderQuery[T]) Window() (uint64, uint64) {
	return q.fromMS, q.toMS
}
// isWindowList reports whether the query qualifies for window-based
// pagination: it must be a single query ordered by timestamp, and logs
// additionally need an `id` tie-breaker in the same direction.
func (q *builderQuery[T]) isWindowList() bool {
	if len(q.spec.Order) == 0 {
		return false
	}

	// the first ORDER BY key must be `timestamp`
	if q.spec.Order[0].Key.Name != "timestamp" {
		return false
	}

	if q.spec.Signal == telemetrytypes.SignalLogs {
		// logs require ORDER BY timestamp, id with identical directions
		if len(q.spec.Order) != 2 || q.spec.Order[1].Key.Name != "id" ||
			q.spec.Order[1].Direction != q.spec.Order[0].Direction {
			return false
		}
	}
	return true
}
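
// Example (hypothetical order specs): for logs, ORDER BY (timestamp DESC,
// id DESC) qualifies, while ORDER BY (timestamp DESC, id ASC) or ORDER BY
// (timestamp) alone does not; for traces, ORDER BY (timestamp DESC) alone
// is enough.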
func (q *builderQuery[T]) Execute(ctx context.Context) (*qbtypes.Result, error) {
	// use window-based pagination when the raw list query allows it
	if q.kind == qbtypes.RequestTypeRaw && q.isWindowList() {
		return q.executeWindowList(ctx)
	}

	stmt, err := q.stmtBuilder.Build(ctx, q.fromMS, q.toMS, q.kind, q.spec, q.variables)
	if err != nil {
		return nil, err
	}

	// execute with the query window and step attached for partial-value detection
	result, err := q.executeWithContext(ctx, stmt.Query, stmt.Args)
	if err != nil {
		return nil, err
	}
	result.Warnings = stmt.Warnings
	return result, nil
}
// executeWithContext runs the query and attaches the query window and step so
// that partial values at the window edges can be detected downstream.
func (q *builderQuery[T]) executeWithContext(ctx context.Context, query string, args []any) (*qbtypes.Result, error) {
	totalRows := uint64(0)
	totalBytes := uint64(0)
	elapsed := time.Duration(0)

	// accumulate server-side progress (rows/bytes read, elapsed time) as
	// ClickHouse reports it
	ctx = clickhouse.Context(ctx, clickhouse.WithProgress(func(p *clickhouse.Progress) {
		totalRows += p.Rows
		totalBytes += p.Bytes
		elapsed += p.Elapsed
	}))

	rows, err := q.telemetryStore.ClickhouseDB().Query(ctx, query, args...)
	if err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			return nil, errors.Newf(errors.TypeTimeout, errors.CodeTimeout, "Query timed out").
				WithAdditional("Try refining your search by adding relevant resource attribute filters")
		}
		return nil, err
	}
	defer rows.Close()

	// pass the query window and step for partial value detection
	queryWindow := &qbtypes.TimeRange{From: q.fromMS, To: q.toMS}

	kind := q.kind
	// metric queries always produce a time series, which is reduced later if required
	if q.spec.Signal == telemetrytypes.SignalMetrics {
		kind = qbtypes.RequestTypeTimeSeries
	}

	payload, err := consume(rows, kind, queryWindow, q.spec.StepInterval, q.spec.Name)
	if err != nil {
		return nil, err
	}

	return &qbtypes.Result{
		Type:  q.kind,
		Value: payload,
		Stats: qbtypes.ExecStats{
			RowsScanned:  totalRows,
			BytesScanned: totalBytes,
			DurationMS:   uint64(elapsed.Milliseconds()),
		},
	}, nil
}
func (q *builderQuery[T]) executeWindowList(ctx context.Context) (*qbtypes.Result, error) {
	isAsc := len(q.spec.Order) > 0 &&
		strings.ToLower(string(q.spec.Order[0].Direction.StringValue())) == "asc"

	fromMS, toMS := q.fromMS, q.toMS

	// narrow the [fromMS, toMS] window if a cursor was supplied
	if cur := strings.TrimSpace(q.spec.Cursor); cur != "" {
		if ts, err := decodeCursor(cur); err == nil {
			if isAsc {
				if uint64(ts) >= fromMS {
					fromMS = uint64(ts + 1)
				}
			} else { // DESC
				if uint64(ts) <= toMS {
					toMS = uint64(ts - 1)
				}
			}
		}
	}

	reqLimit := q.spec.Limit
	if reqLimit == 0 {
		reqLimit = 10_000 // sane upper-bound default
	}
	offsetLeft := q.spec.Offset
	need := reqLimit + offsetLeft // rows to fetch from ClickHouse

	var rows []*qbtypes.RawRow
	totalRows := uint64(0)
	totalBytes := uint64(0)
	start := time.Now()

	// get the time buckets, reversing them when scanning in ascending order
	buckets := makeBuckets(fromMS, toMS)
	if isAsc {
		for i, j := 0, len(buckets)-1; i < j; i, j = i+1, j-1 {
			buckets[i], buckets[j] = buckets[j], buckets[i]
		}
	}

	for _, r := range buckets {
		q.spec.Offset = 0
		q.spec.Limit = need

		stmt, err := q.stmtBuilder.Build(ctx, r.fromNS/1e6, r.toNS/1e6, q.kind, q.spec, q.variables)
		if err != nil {
			return nil, err
		}

		// execute with the query window attached for partial-value detection
		res, err := q.executeWithContext(ctx, stmt.Query, stmt.Args)
		if err != nil {
			return nil, err
		}

		totalRows += res.Stats.RowsScanned
		totalBytes += res.Stats.BytesScanned

		rawRows := res.Value.(*qbtypes.RawData).Rows
		need -= len(rawRows)

		for _, rr := range rawRows {
			if offsetLeft > 0 { // consume the client-requested initial offset
				offsetLeft--
				continue
			}
			rows = append(rows, rr)
			if len(rows) >= reqLimit { // page filled
				break
			}
		}
		if len(rows) >= reqLimit {
			break
		}
	}

	nextCursor := ""
	if len(rows) == reqLimit {
		lastTS := rows[len(rows)-1].Timestamp.UnixMilli()
		nextCursor = encodeCursor(lastTS)
	}

	return &qbtypes.Result{
		Type: qbtypes.RequestTypeRaw,
		Value: &qbtypes.RawData{
			QueryName:  q.spec.Name,
			Rows:       rows,
			NextCursor: nextCursor,
		},
		Stats: qbtypes.ExecStats{
			RowsScanned:  totalRows,
			BytesScanned: totalBytes,
			DurationMS:   uint64(time.Since(start).Milliseconds()),
		},
	}, nil
}
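
// Pagination sketch (illustrative values): with Limit=100 and DESC order, the
// scan walks time buckets from newest to oldest until 100 rows accumulate,
// then returns NextCursor = base64(timestamp of the last row, in ms). A
// client passing that cursor back shrinks toMS to lastTS-1, so the next page
// resumes just before the rows already delivered. Rows sharing the boundary
// millisecond are skipped by this scheme, the usual trade-off of
// timestamp-only cursors.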
// encodeCursor encodes a millisecond timestamp as an opaque base64 cursor.
func encodeCursor(tsMilli int64) string {
	return base64.StdEncoding.EncodeToString([]byte(strconv.FormatInt(tsMilli, 10)))
}

// decodeCursor parses a cursor produced by encodeCursor back into a
// millisecond timestamp.
func decodeCursor(cur string) (int64, error) {
	b, err := base64.StdEncoding.DecodeString(cur)
	if err != nil {
		return 0, err
	}
	return strconv.ParseInt(string(b), 10, 64)
}