package querier

import (
	"context"
	"encoding/base64"
	"fmt"
	"strconv"
	"strings"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"

	"github.com/SigNoz/signoz/pkg/errors"
	"github.com/SigNoz/signoz/pkg/telemetrystore"
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)

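// builderQuery binds a single query-builder spec to the statement builder
// that renders it into SQL and the telemetry store that executes it, along
// with the resolved time range and request type.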
type builderQuery[T any] struct {
	telemetryStore telemetrystore.TelemetryStore
	stmtBuilder    qbtypes.StatementBuilder[T]
	spec           qbtypes.QueryBuilderQuery[T]
	variables      map[string]qbtypes.VariableItem

	fromMS uint64
	toMS   uint64
	kind   qbtypes.RequestType
}

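// Compile-time assertion that builderQuery satisfies qbtypes.Query.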
var _ qbtypes.Query = (*builderQuery[any])(nil)

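// newBuilderQuery constructs a builderQuery for the given spec, time range,
// and request type.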
func newBuilderQuery[T any](
	telemetryStore telemetrystore.TelemetryStore,
	stmtBuilder qbtypes.StatementBuilder[T],
	spec qbtypes.QueryBuilderQuery[T],
	tr qbtypes.TimeRange,
	kind qbtypes.RequestType,
	variables map[string]qbtypes.VariableItem,
) *builderQuery[T] {
	return &builderQuery[T]{
		telemetryStore: telemetryStore,
		stmtBuilder:    stmtBuilder,
		spec:           spec,
		variables:      variables,
		fromMS:         tr.From,
		toMS:           tr.To,
		kind:           kind,
	}
}

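// Fingerprint returns a deterministic cache key for the query, built from
// every spec field that affects the result. Parts are joined as key=value
// pairs with "&", so a traces count query might produce something roughly
// like:
//
//	builder&signal=traces&step=1m0s&aggs=[count()]&limit=10
//
// (illustrative only; the exact shape depends on the spec).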
func (q *builderQuery[T]) Fingerprint() string {
	if (q.spec.Signal == telemetrytypes.SignalTraces ||
		q.spec.Signal == telemetrytypes.SignalLogs) && q.kind != qbtypes.RequestTypeTimeSeries {
		// No caching for non-time-series trace/log queries
		return ""
	}

	// Create a deterministic fingerprint for builder queries. This needs to
	// include all fields that affect the query results.
	parts := []string{"builder"}

	// Add signal type
	parts = append(parts, fmt.Sprintf("signal=%s", q.spec.Signal.StringValue()))

	// Add step interval
	parts = append(parts, fmt.Sprintf("step=%s", q.spec.StepInterval.String()))

	// Add aggregations (convert to string representation)
	if len(q.spec.Aggregations) > 0 {
		aggParts := []string{}
		for _, agg := range q.spec.Aggregations {
			switch a := any(agg).(type) {
			case qbtypes.TraceAggregation:
				aggParts = append(aggParts, a.Expression)
			case qbtypes.LogAggregation:
				aggParts = append(aggParts, a.Expression)
			case qbtypes.MetricAggregation:
				aggParts = append(aggParts, fmt.Sprintf("%s:%s:%s:%s",
					a.MetricName,
					a.Temporality.StringValue(),
					a.TimeAggregation.StringValue(),
					a.SpaceAggregation.StringValue(),
				))
			}
		}
		parts = append(parts, fmt.Sprintf("aggs=[%s]", strings.Join(aggParts, ",")))
	}

	// Add filter if present
	if q.spec.Filter != nil && q.spec.Filter.Expression != "" {
		parts = append(parts, fmt.Sprintf("filter=%s", q.spec.Filter.Expression))

		// Include the values of any variables referenced by the filter, so
		// that a variable change invalidates the cache entry.
		for name, item := range q.variables {
			if strings.Contains(q.spec.Filter.Expression, "$"+name) {
				parts = append(parts, fmt.Sprintf("%s=%v", name, item.Value))
			}
		}
	}

	// Add group by keys
	if len(q.spec.GroupBy) > 0 {
		groupByParts := []string{}
		for _, gb := range q.spec.GroupBy {
			groupByParts = append(groupByParts, fingerprintGroupByKey(gb))
		}
		parts = append(parts, fmt.Sprintf("groupby=[%s]", strings.Join(groupByParts, ",")))
	}

	// Add order by
	if len(q.spec.Order) > 0 {
		orderParts := []string{}
		for _, o := range q.spec.Order {
			orderParts = append(orderParts, fingerprintOrderBy(o))
		}
		parts = append(parts, fmt.Sprintf("order=[%s]", strings.Join(orderParts, ",")))
	}

	// Add limit and offset
	if q.spec.Limit > 0 {
		parts = append(parts, fmt.Sprintf("limit=%d", q.spec.Limit))
	}
	if q.spec.Offset > 0 {
		parts = append(parts, fmt.Sprintf("offset=%d", q.spec.Offset))
	}

	// Add having clause
	if q.spec.Having != nil && q.spec.Having.Expression != "" {
		parts = append(parts, fmt.Sprintf("having=%s", q.spec.Having.Expression))
	}

	// Add shift-by offset
	if q.spec.ShiftBy != 0 {
		parts = append(parts, fmt.Sprintf("shiftby=%d", q.spec.ShiftBy))
	}

	return strings.Join(parts, "&")
}

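// The helpers below render group-by, order-by, and field keys in a stable
// textual form for use in Fingerprint.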
func fingerprintGroupByKey(gb qbtypes.GroupByKey) string {
	return fingerprintFieldKey(gb.TelemetryFieldKey)
}

func fingerprintOrderBy(o qbtypes.OrderBy) string {
	return fmt.Sprintf("%s:%s", fingerprintFieldKey(o.Key.TelemetryFieldKey), o.Direction.StringValue())
}

func fingerprintFieldKey(key telemetrytypes.TelemetryFieldKey) string {
	// Include the essential fields that identify a field key
	return fmt.Sprintf("%s-%s-%s-%s",
		key.Name,
		key.FieldDataType.StringValue(),
		key.FieldContext.StringValue(),
		key.Signal.StringValue())
}

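// Window returns the query's time range as epoch-millisecond bounds.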
func (q *builderQuery[T]) Window() (uint64, uint64) {
	return q.fromMS, q.toMS
}

// isWindowList reports whether the query can use window-based pagination:
// it must be a single query ordered by timestamp first (logs additionally
// need an `id` tie-break in the same direction).
func (q *builderQuery[T]) isWindowList() bool {
	if len(q.spec.Order) == 0 {
		return false
	}

	// first ORDER BY must be `timestamp`
	if q.spec.Order[0].Key.Name != "timestamp" {
		return false
	}

	if q.spec.Signal == telemetrytypes.SignalLogs {
		// logs require timestamp,id with identical direction
		if len(q.spec.Order) != 2 || q.spec.Order[1].Key.Name != "id" ||
			q.spec.Order[1].Direction != q.spec.Order[0].Direction {
			return false
		}
	}
	return true
}

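// Execute builds the SQL statement for the query and runs it, using
// window-based pagination for raw list queries when possible.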
func (q *builderQuery[T]) Execute(ctx context.Context) (*qbtypes.Result, error) {
	// can we do window-based pagination?
	if q.kind == qbtypes.RequestTypeRaw && q.isWindowList() {
		return q.executeWindowList(ctx)
	}

	stmt, err := q.stmtBuilder.Build(ctx, q.fromMS, q.toMS, q.kind, q.spec, q.variables)
	if err != nil {
		return nil, err
	}

	// Execute the query with proper context for partial value detection
	result, err := q.executeWithContext(ctx, stmt.Query, stmt.Args)
	if err != nil {
		return nil, err
	}
	result.Warnings = stmt.Warnings
	return result, nil
}

// executeWithContext runs the query with progress tracking and with the
// query window and step attached for partial value detection.
func (q *builderQuery[T]) executeWithContext(ctx context.Context, query string, args []any) (*qbtypes.Result, error) {
	totalRows := uint64(0)
	totalBytes := uint64(0)
	elapsed := time.Duration(0)

	// Accumulate scan statistics reported by ClickHouse as the query runs.
	ctx = clickhouse.Context(ctx, clickhouse.WithProgress(func(p *clickhouse.Progress) {
		totalRows += p.Rows
		totalBytes += p.Bytes
		elapsed += p.Elapsed
	}))

	rows, err := q.telemetryStore.ClickhouseDB().Query(ctx, query, args...)
	if err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			return nil, errors.Newf(errors.TypeTimeout, errors.CodeTimeout, "Query timed out").
				WithAdditional("Try refining your search by filtering on relevant resource attributes")
		}

		if !errors.Is(err, context.Canceled) {
			return nil, errors.Newf(
				errors.TypeInternal,
				errors.CodeInternal,
				"Something went wrong on our end. It's not you, it's us. Our team has been notified. Reach out to support if the issue persists.",
			)
		}

		return nil, err
	}
	defer rows.Close()

	// Pass query window and step for partial value detection
	queryWindow := &qbtypes.TimeRange{From: q.fromMS, To: q.toMS}

	kind := q.kind
	// all metric queries are time series, then reduced if required
	if q.spec.Signal == telemetrytypes.SignalMetrics {
		kind = qbtypes.RequestTypeTimeSeries
	}

	payload, err := consume(rows, kind, queryWindow, q.spec.StepInterval, q.spec.Name)
	if err != nil {
		return nil, err
	}

	return &qbtypes.Result{
		Type:  q.kind,
		Value: payload,
		Stats: qbtypes.ExecStats{
			RowsScanned:  totalRows,
			BytesScanned: totalBytes,
			DurationMS:   uint64(elapsed.Milliseconds()),
		},
	}, nil
}

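// executeWindowList serves raw list queries with window-based pagination:
// the overall time range is split into buckets (see makeBuckets) that are
// queried one at a time until the requested page is filled, and an opaque
// cursor carrying the last row's timestamp lets the next request shrink
// its window.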
func (q *builderQuery[T]) executeWindowList(ctx context.Context) (*qbtypes.Result, error) {
	isAsc := len(q.spec.Order) > 0 &&
		strings.ToLower(string(q.spec.Order[0].Direction.StringValue())) == "asc"

	fromMS, toMS := q.fromMS, q.toMS

	// Adjust the [fromMS,toMS] window if a cursor was supplied
	if cur := strings.TrimSpace(q.spec.Cursor); cur != "" {
		if ts, err := decodeCursor(cur); err == nil {
			if isAsc {
				if uint64(ts) >= fromMS {
					fromMS = uint64(ts + 1)
				}
			} else { // DESC
				if uint64(ts) <= toMS {
					toMS = uint64(ts - 1)
				}
			}
		}
	}

	reqLimit := q.spec.Limit
	if reqLimit == 0 {
		reqLimit = 10_000 // sane upper-bound default
	}
	offsetLeft := q.spec.Offset
	need := reqLimit + offsetLeft // rows to fetch from ClickHouse

	var rows []*qbtypes.RawRow

	totalRows := uint64(0)
	totalBytes := uint64(0)
	start := time.Now()

	// Get the time buckets; reverse them when ascending order is requested.
	buckets := makeBuckets(fromMS, toMS)
	if isAsc {
		for i, j := 0, len(buckets)-1; i < j; i, j = i+1, j-1 {
			buckets[i], buckets[j] = buckets[j], buckets[i]
		}
	}

	for _, r := range buckets {
		q.spec.Offset = 0
		q.spec.Limit = need

		stmt, err := q.stmtBuilder.Build(ctx, r.fromNS/1e6, r.toNS/1e6, q.kind, q.spec, q.variables)
		if err != nil {
			return nil, err
		}

		// Execute with proper context for partial value detection
		res, err := q.executeWithContext(ctx, stmt.Query, stmt.Args)
		if err != nil {
			return nil, err
		}
		totalRows += res.Stats.RowsScanned
		totalBytes += res.Stats.BytesScanned

		rawRows := res.Value.(*qbtypes.RawData).Rows
		need -= len(rawRows)

		for _, rr := range rawRows {
			if offsetLeft > 0 { // client-requested initial offset
				offsetLeft--
				continue
			}
			rows = append(rows, rr)
			if len(rows) >= reqLimit { // page filled
				break
			}
		}
		if len(rows) >= reqLimit {
			break
		}
	}

	// Emit a cursor only when the page is full; a short page means the
	// window is exhausted.
	nextCursor := ""
	if len(rows) == reqLimit {
		lastTS := rows[len(rows)-1].Timestamp.UnixMilli()
		nextCursor = encodeCursor(lastTS)
	}

	return &qbtypes.Result{
		Type: qbtypes.RequestTypeRaw,
		Value: &qbtypes.RawData{
			QueryName:  q.spec.Name,
			Rows:       rows,
			NextCursor: nextCursor,
		},
		Stats: qbtypes.ExecStats{
			RowsScanned:  totalRows,
			BytesScanned: totalBytes,
			DurationMS:   uint64(time.Since(start).Milliseconds()),
		},
	}, nil
}

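// encodeCursor and decodeCursor round-trip a millisecond timestamp through
// base64 so it can travel as an opaque pagination cursor.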
func encodeCursor(tsMilli int64) string {
	return base64.StdEncoding.EncodeToString([]byte(strconv.FormatInt(tsMilli, 10)))
}

func decodeCursor(cur string) (int64, error) {
	b, err := base64.StdEncoding.DecodeString(cur)
	if err != nil {
		return 0, err
	}
	return strconv.ParseInt(string(b), 10, 64)
}