signoz/pkg/querier/clickhouse_query.go
Srikanth Chekuri 85f04e4bae
chore: add querier HTTP API endpoint and bucket cache implementation (#8178)
* chore: update types
1. add partial bool to indicate if the value covers the partial interval
2. add optional unit if present (ex: duration_nano, metrics with units)
3. use pointers wherever necessary
4. add format options for request and remove redundant name in query envelope

* chore: fix some gaps
1. make the range as [start, end)
2. provide the logs statement builder with the body column
3. skip the body filter on resource filter statement builder
4. remove unnecessary agg expr rewriter in metrics
5. add ability to skip full text in where clause visitor

* chore: add API endpoint for new query range

* chore: add bucket cache implementation

* chore: add fingerprinting impl and add bucket cache to querier

* chore: add provider factory
2025-06-10 12:56:28 +00:00

81 lines
1.8 KiB
Go

package querier
import (
"context"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/SigNoz/signoz/pkg/telemetrystore"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
// chSQLQuery is a qbtypes.Query backed by a raw ClickHouse SQL statement.
type chSQLQuery struct {
	// telemetryStore supplies the ClickHouse connection used by Execute.
	telemetryStore telemetrystore.TelemetryStore

	// query carries the SQL text (query.Query) sent to ClickHouse and the
	// name (query.Name) handed to consume when reading the rows.
	query qbtypes.ClickHouseQuery
	// args are passed through as the bind arguments of the SQL statement.
	args []any

	// fromMS and toMS are the bounds reported by Window.
	// NOTE(review): presumably epoch milliseconds, going by the names — confirm.
	fromMS, toMS uint64

	// kind is passed to consume and echoed back as the Result type.
	kind qbtypes.RequestType
}

// Compile-time check that *chSQLQuery satisfies qbtypes.Query.
var _ qbtypes.Query = (*chSQLQuery)(nil)
// newchSQLQuery builds a chSQLQuery from its parts. The window bounds are
// taken from tr.From and tr.To; args is stored as given, without copying.
func newchSQLQuery(
	telemetryStore telemetrystore.TelemetryStore,
	query qbtypes.ClickHouseQuery,
	args []any,
	tr qbtypes.TimeRange,
	kind qbtypes.RequestType,
) *chSQLQuery {
	q := new(chSQLQuery)

	q.telemetryStore = telemetryStore
	q.query, q.args = query, args
	q.fromMS, q.toMS = tr.From, tr.To
	q.kind = kind

	return q
}
// Fingerprint always returns the empty string: ClickHouse queries are not
// cached for now.
func (q *chSQLQuery) Fingerprint() string {
	const notCached = ""
	return notCached
}
// Window returns the stored bounds of the query as (fromMS, toMS).
func (q *chSQLQuery) Window() (uint64, uint64) {
	from, to := q.fromMS, q.toMS
	return from, to
}
// Execute runs the SQL statement against ClickHouse and converts the returned
// rows into a result of the query's request type.
//
// Progress updates reported by the driver while the query runs are summed
// into the rows-scanned, bytes-scanned and duration figures of the returned
// ExecStats. Errors from issuing the query or from consuming the rows are
// returned unchanged, with a nil result.
func (q *chSQLQuery) Execute(ctx context.Context) (*qbtypes.Result, error) {
	var (
		rowsRead  uint64
		bytesRead uint64
		spent     time.Duration
	)

	// Every progress update adds to the running totals.
	onProgress := func(p *clickhouse.Progress) {
		rowsRead += p.Rows
		bytesRead += p.Bytes
		spent += p.Elapsed
	}
	ctx = clickhouse.Context(ctx, clickhouse.WithProgress(onProgress))

	db := q.telemetryStore.ClickhouseDB()
	rows, err := db.Query(ctx, q.query.Query, q.args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	// TODO: map the errors from ClickHouse to our error types
	payload, err := consume(rows, q.kind, nil, qbtypes.Step{}, q.query.Name)
	if err != nil {
		return nil, err
	}

	// The totals are read only after the rows have been consumed.
	stats := qbtypes.ExecStats{
		RowsScanned:  rowsRead,
		BytesScanned: bytesRead,
		DurationMS:   uint64(spent.Milliseconds()),
	}
	result := &qbtypes.Result{
		Type:  q.kind,
		Value: payload,
		Stats: stats,
	}
	return result, nil
}