signoz/pkg/querier/trace_operator_query.go

91 lines
2.1 KiB
Go
Raw Permalink Normal View History

feat: trace operators BE (#8293) * feat: [draft] added implementation of trace operators * feat: [draft] added implementation of trace operators * feat: [draft] added implementation of trace operators * feat: [draft] added implementation of trace operators * feat: added implementation of trace operators * feat: added implementation of trace operators * feat: added implementation of trace operators * feat: added implementation of trace operators * feat: added implementation of trace operators * feat: added implementation of trace operators * feat: added implementation of trace operators * feat: added implementation of trace operators * feat: added implementation of trace operators * feat: added implementation of trace operators * feat: refactor trace operator * feat: added postprocess * feat: added postprocess * feat: added postprocess * feat: refactored the consume function * feat: refactored the consume function * feat: refactored the consume function * feat: refactored the consume function * feat: refactored the consume function * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: 
resolved conflicts * feat: replaced info to debug logs * feat: replaced info to debug logs * feat: replaced info to debug logs * feat: updated time series query * feat: fixed merge conflicts * feat: fixed merge conflicts * feat: fixed merge conflicts * feat: fixed merge conflicts * feat: added deep copy in ranged queries * feat: refactored fingerprinting * feat: refactored fingerprinting * feat: refactored fingerprinting * feat: refactored fingerprinting * feat: refactored fingerprinting * feat: refactored fingerprinting * feat: refactored fingerprinting * feat: added comment for build all spans cte * feat: added postprocess for timeseries and added limits to memory * feat: fixed span count in trace view * feat: fixed span count in trace view * feat: fixed linting issues * feat: fixed linting issues * feat: fixed linting issues * feat: fixed linting issues --------- Co-authored-by: Nityananda Gohain <nityanandagohain@gmail.com>
2025-09-05 21:07:10 +05:30
package querier
import (
"context"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/SigNoz/signoz/pkg/telemetrystore"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
// traceOperatorQuery bundles everything needed to build and run a single
// trace operator query: the store to run against, the statement builder, the
// operator spec, and the window/request kind the query was created for.
type traceOperatorQuery struct {
// telemetryStore supplies the ClickHouse connection the statement is run on.
telemetryStore telemetrystore.TelemetryStore
// stmtBuilder turns the spec and composite query into an executable statement.
stmtBuilder qbtypes.TraceOperatorStatementBuilder
// spec is the trace operator definition; its StepInterval and Name are also
// forwarded when the result rows are consumed.
spec qbtypes.QueryBuilderTraceOperator
// compositeQuery is handed to the statement builder alongside spec.
compositeQuery *qbtypes.CompositeQuery
// fromMS and toMS bound the query window (returned by Window).
// NOTE(review): presumably epoch milliseconds, going by the names — confirm.
fromMS uint64
toMS uint64
// kind is the request type; it is passed to the builder and the row consumer
// and is echoed back as the Type of the result.
kind qbtypes.RequestType
}
// Compile-time check that *traceOperatorQuery satisfies qbtypes.Query.
var _ qbtypes.Query = (*traceOperatorQuery)(nil)
// Fingerprint always yields the empty string for trace operator queries.
// NOTE(review): presumably an empty fingerprint opts this query out of result
// caching — confirm against the code that consumes qbtypes.Query.
func (q *traceOperatorQuery) Fingerprint() string {
	const noFingerprint = ""
	return noFingerprint
}
// Window reports the bounds this query was constructed with, as (fromMS, toMS).
func (q *traceOperatorQuery) Window() (uint64, uint64) {
	start, end := q.fromMS, q.toMS
	return start, end
}
// Execute builds the statement for this trace operator and runs it.
//
// The statement is produced by the configured statement builder from the query
// window, request kind, operator spec and composite query. Any error from the
// builder or from running the statement is returned unchanged with a nil
// result. On success the warnings collected while building the statement are
// attached to the returned result.
func (q *traceOperatorQuery) Execute(ctx context.Context) (*qbtypes.Result, error) {
	built, buildErr := q.stmtBuilder.Build(ctx, q.fromMS, q.toMS, q.kind, q.spec, q.compositeQuery)
	if buildErr != nil {
		return nil, buildErr
	}

	res, runErr := q.executeWithContext(ctx, built.Query, built.Args)
	if runErr != nil {
		return nil, runErr
	}

	// Builder warnings are only known here, so they are copied onto the result
	// after execution.
	res.Warnings = built.Warnings
	return res, nil
}
// executeWithContext runs query with args on the ClickHouse connection of the
// telemetry store and converts the returned rows into a result payload.
//
// A progress callback is registered on the context so that the rows, bytes and
// elapsed time reported by the driver are summed and returned in the result's
// Stats. The rows are consumed with the same consume helper used elsewhere in
// this package, given the query window, the spec's step interval and the
// spec's name. Errors from issuing the query or from consuming the rows are
// returned unchanged with a nil result.
func (q *traceOperatorQuery) executeWithContext(ctx context.Context, query string, args []any) (*qbtypes.Result, error) {
	var (
		rowsSeen  uint64
		bytesSeen uint64
		spent     time.Duration
	)

	// Each progress report carries an increment; add it to the running totals.
	onProgress := func(p *clickhouse.Progress) {
		rowsSeen += p.Rows
		bytesSeen += p.Bytes
		spent += p.Elapsed
	}
	ctx = clickhouse.Context(ctx, clickhouse.WithProgress(onProgress))

	rows, err := q.telemetryStore.ClickhouseDB().Query(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	// The window and step interval are forwarded to consume together with the
	// request kind and the spec name.
	window := qbtypes.TimeRange{From: q.fromMS, To: q.toMS}
	payload, err := consume(rows, q.kind, &window, q.spec.StepInterval, q.spec.Name)
	if err != nil {
		return nil, err
	}

	// Totals are read only after consume has returned, i.e. after the rows
	// have been handed to the consumer.
	stats := qbtypes.ExecStats{
		RowsScanned:  rowsSeen,
		BytesScanned: bytesSeen,
		DurationMS:   uint64(spent.Milliseconds()),
	}
	res := &qbtypes.Result{
		Type:  q.kind,
		Value: payload,
		Stats: stats,
	}
	return res, nil
}