signoz/pkg/querier/consume.go

package querier

import (
"fmt"
"math"
"reflect"
"regexp"
"sort"
"strconv"
"strings"
"time"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
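// aggRe matches the aggregation result columns ("__result_0", "__result_1",
// ...); the captured index selects the aggregation bucket a value belongs to.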
var (
aggRe = regexp.MustCompile(`^__result_(\d+)$`)
)

// consume reads every row and shapes it into the payload expected for the
// given request type.
//
// * Time-series  - *qbtypes.TimeSeriesData
// * Scalar       - *qbtypes.ScalarData
// * Raw          - *qbtypes.RawData
// * Distribution - *qbtypes.DistributionData
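//
// A rough usage sketch (illustrative only: conn, stmt, ctx, startMs, endMs,
// the one-minute step and the query name "A" are placeholders, not values
// defined in this package):
//
//	rows, err := conn.Query(ctx, stmt.Query, stmt.Args...)
//	if err != nil {
//		return nil, err
//	}
//	defer rows.Close()
//
//	payload, err := consume(rows, qbtypes.RequestTypeTimeSeries,
//		&qbtypes.TimeRange{From: startMs, To: endMs},
//		qbtypes.Step{Duration: time.Minute}, "A")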
func consume(rows driver.Rows, kind qbtypes.RequestType, queryWindow *qbtypes.TimeRange, step qbtypes.Step, queryName string) (any, error) {
var (
payload any
err error
)
switch kind {
case qbtypes.RequestTypeTimeSeries:
payload, err = readAsTimeSeries(rows, queryWindow, step, queryName)
case qbtypes.RequestTypeScalar:
payload, err = readAsScalar(rows, queryName)
case qbtypes.RequestTypeRaw:
payload, err = readAsRaw(rows, queryName)
// TODO: add support for other request types
}
return payload, err
}
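// readAsTimeSeries folds rows into one TimeSeries per (aggregation index,
// label set) pair and groups the series into per-aggregation buckets. As an
// illustrative sketch (not a literal wire format), a result set with the
// columns (ts, service.name, __result_0) comes back roughly as:
//
//	TimeSeriesData{
//		QueryName: "A",
//		Aggregations: []*AggregationBucket{{
//			Index: 0,
//			Alias: "__result_0",
//			Series: []*TimeSeries{
//				{Labels: [service.name=frontend], Values: [...]},
//				{Labels: [service.name=cartservice], Values: [...]},
//			},
//		}},
//	}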
func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbtypes.Step, queryName string) (*qbtypes.TimeSeriesData, error) {
colTypes := rows.ColumnTypes()
colNames := rows.Columns()
slots := make([]any, len(colTypes))
numericColsCount := 0
for i, ct := range colTypes {
slots[i] = reflect.New(ct.ScanType()).Interface()
if numericKind(ct.ScanType().Kind()) {
numericColsCount++
}
}
type sKey struct {
agg int
key string // deterministic join of label values
}
seriesMap := map[sKey]*qbtypes.TimeSeries{}
stepMs := uint64(step.Duration.Milliseconds())
// Helper function to check if a timestamp represents a partial value
isPartialValue := func(timestamp int64) bool {
if stepMs == 0 || queryWindow == nil {
return false
}
timestampMs := uint64(timestamp)
// For the first interval, check if query start is misaligned
// The first complete interval starts at the first timestamp >= queryWindow.From that is aligned to step
firstCompleteInterval := queryWindow.From
if queryWindow.From%stepMs != 0 {
// Round up to next step boundary
firstCompleteInterval = ((queryWindow.From / stepMs) + 1) * stepMs
}
// If timestamp is before the first complete interval, it's partial
if timestampMs < firstCompleteInterval {
return true
}
// For the last interval, check if it would extend beyond query end
if timestampMs+stepMs > queryWindow.To {
return queryWindow.To%stepMs != 0
}
return false
}
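// Worked example (illustrative numbers): with a 60s step (stepMs = 60_000)
// and a window of From = 10:00:30, To = 11:00:30 (both in ms), the first
// aligned bucket starts at 10:01:00, so the 10:00:00 bucket only covers
// [10:00:30, 10:01:00) and is marked partial; likewise the 11:00:00 bucket
// is cut off at 11:00:30 and is partial as well.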
// Pre-allocate for labels based on column count
lblValsCapacity := len(colNames) - 1 // -1 for timestamp
if lblValsCapacity < 0 {
lblValsCapacity = 0
}
for rows.Next() {
if err := rows.Scan(slots...); err != nil {
return nil, err
}
var (
ts int64
lblVals = make([]string, 0, lblValsCapacity)
lblObjs = make([]*qbtypes.Label, 0, lblValsCapacity)
aggValues = map[int]float64{} // all __result_N in this row
fallbackValue float64 // value when NO __result_N columns exist
fallbackSeen bool
)
for idx, ptr := range slots {
name := colNames[idx]
switch v := ptr.(type) {
case *time.Time:
ts = v.UnixMilli()
case *float64, *float32, *int64, *int32, *uint64, *uint32:
val := numericAsFloat(reflect.ValueOf(ptr).Elem().Interface())
if m := aggRe.FindStringSubmatch(name); m != nil {
id, _ := strconv.Atoi(m[1])
aggValues[id] = val
} else if numericColsCount == 1 { // classic single-value query
fallbackValue = val
fallbackSeen = true
} else {
// numeric label
lblVals = append(lblVals, fmt.Sprint(val))
lblObjs = append(lblObjs, &qbtypes.Label{
Key: telemetrytypes.TelemetryFieldKey{Name: name},
Value: val,
})
}
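// Nullable numeric columns are scanned into double pointers; a nil inner
// pointer means the value was NULL and the column is skipped for this row.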
case **float64, **float32, **int64, **int32, **uint64, **uint32:
tempVal := reflect.ValueOf(ptr)
if tempVal.IsValid() && !tempVal.IsNil() && !tempVal.Elem().IsNil() {
val := numericAsFloat(tempVal.Elem().Elem().Interface())
if m := aggRe.FindStringSubmatch(name); m != nil {
id, _ := strconv.Atoi(m[1])
aggValues[id] = val
} else if numericColsCount == 1 { // classic single-value query
fallbackValue = val
fallbackSeen = true
} else {
// numeric label
lblVals = append(lblVals, fmt.Sprint(val))
lblObjs = append(lblObjs, &qbtypes.Label{
Key: telemetrytypes.TelemetryFieldKey{Name: name},
Value: val,
})
}
}
case *string:
lblVals = append(lblVals, *v)
lblObjs = append(lblObjs, &qbtypes.Label{
Key: telemetrytypes.TelemetryFieldKey{Name: name},
Value: *v,
})
case **string:
val := *v
if val == nil {
var empty string
val = &empty
}
lblVals = append(lblVals, *val)
lblObjs = append(lblObjs, &qbtypes.Label{
Key: telemetrytypes.TelemetryFieldKey{Name: name},
Value: *val,
})
default:
continue
}
}
// Edge-case: no __result_N columns, but a single numeric column present
if len(aggValues) == 0 && fallbackSeen {
aggValues[0] = fallbackValue
}
if ts == 0 || len(aggValues) == 0 {
continue // nothing useful
}
sort.Strings(lblVals)
labelsKey := strings.Join(lblVals, ",")
// one point per aggregation in this row
for aggIdx, val := range aggValues {
if math.IsNaN(val) || math.IsInf(val, 0) {
continue
}
key := sKey{agg: aggIdx, key: labelsKey}
series, ok := seriesMap[key]
if !ok {
series = &qbtypes.TimeSeries{Labels: lblObjs}
seriesMap[key] = series
}
series.Values = append(series.Values, &qbtypes.TimeSeriesValue{
Timestamp: ts,
Value: val,
Partial: isPartialValue(ts),
})
}
}
if err := rows.Err(); err != nil {
return nil, err
}
maxAgg := -1
for k := range seriesMap {
if k.agg > maxAgg {
maxAgg = k.agg
}
}
if maxAgg < 0 {
//nolint:nilnil
return nil, nil // empty result-set
}
buckets := make([]*qbtypes.AggregationBucket, maxAgg+1)
for i := range buckets {
buckets[i] = &qbtypes.AggregationBucket{
Index: i,
Alias: "__result_" + strconv.Itoa(i),
}
}
for k, s := range seriesMap {
buckets[k.agg].Series = append(buckets[k.agg].Series, s)
}
var nonEmpty []*qbtypes.AggregationBucket
for _, b := range buckets {
if len(b.Series) > 0 {
nonEmpty = append(nonEmpty, b)
}
}
return &qbtypes.TimeSeriesData{
QueryName: queryName,
Aggregations: nonEmpty,
}, nil
}

func numericKind(k reflect.Kind) bool {
switch k {
case reflect.Float32, reflect.Float64,
reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
return true
default:
return false
}
}
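// readAsScalar returns each row as-is, tagging every column as either a
// group-by column or an aggregation column (matched via aggRe). For example,
// a query grouped by service.name with a single aggregation might produce
// (illustrative values only):
//
//	Columns: [service.name (group), __result_0 (aggregation)]
//	Data:    [["frontend", 1042], ["cartservice", 87]]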
func readAsScalar(rows driver.Rows, queryName string) (*qbtypes.ScalarData, error) {
colNames := rows.Columns()
colTypes := rows.ColumnTypes()
cd := make([]*qbtypes.ColumnDescriptor, len(colNames))
var aggIndex int64
for i, name := range colNames {
colType := qbtypes.ColumnTypeGroup
if aggRe.MatchString(name) {
colType = qbtypes.ColumnTypeAggregation
}
cd[i] = &qbtypes.ColumnDescriptor{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: name},
QueryName: queryName,
AggregationIndex: aggIndex,
Type: colType,
}
if colType == qbtypes.ColumnTypeAggregation {
aggIndex++
}
}
// Pre-allocate scan slots once
scan := make([]any, len(colTypes))
for i := range scan {
scan[i] = reflect.New(colTypes[i].ScanType()).Interface()
}
var data [][]any
for rows.Next() {
if err := rows.Scan(scan...); err != nil {
return nil, err
}
// dereference each typed slot into the output row
row := make([]any, len(scan))
for i, cell := range scan {
valPtr := reflect.ValueOf(cell)
if valPtr.Kind() == reflect.Pointer && !valPtr.IsNil() {
row[i] = valPtr.Elem().Interface()
} else {
row[i] = nil // Nullable columns come back as nil pointers
}
}
data = append(data, row)
}
if err := rows.Err(); err != nil {
return nil, err
}
return &qbtypes.ScalarData{
Columns: cd,
Data: data,
}, nil
}
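// readAsRaw copies every row into a RawRow keyed by column name, promoting a
// "timestamp" (or "timestamp_datetime") column into RawRow.Timestamp whether
// it arrives as a time.Time or as an epoch-nanosecond integer.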
func readAsRaw(rows driver.Rows, queryName string) (*qbtypes.RawData, error) {
colNames := rows.Columns()
colTypes := rows.ColumnTypes()
colCnt := len(colNames)
var outRows []*qbtypes.RawRow
for rows.Next() {
// allocate a fresh set of typed pointers for every row; reusing the same
// pointers would let the driver overwrite previously scanned values
scan := make([]any, colCnt)
for i := range scan {
scan[i] = reflect.New(colTypes[i].ScanType()).Interface()
}
if err := rows.Scan(scan...); err != nil {
return nil, err
}
rr := qbtypes.RawRow{
Data: make(map[string]*any, colCnt),
}
for i, cellPtr := range scan {
name := colNames[i]
// de-reference the typed pointer to any
val := reflect.ValueOf(cellPtr).Elem().Interface()
// special-case: timestamp column
if name == "timestamp" || name == "timestamp_datetime" {
switch t := val.(type) {
case time.Time:
rr.Timestamp = t
case uint64: // epoch-ns stored as integer
rr.Timestamp = time.Unix(0, int64(t))
case int64:
rr.Timestamp = time.Unix(0, t)
default:
// leave zero time if unrecognised
}
}
// store value in map as *any, to match the schema
v := any(val)
rr.Data[name] = &v
}
outRows = append(outRows, &rr)
}
if err := rows.Err(); err != nil {
return nil, err
}
return &qbtypes.RawData{
QueryName: queryName,
Rows: outRows,
}, nil
}

// numericAsFloat converts supported numeric types to float64, returning
// math.NaN() for anything else.
func numericAsFloat(v any) float64 {
switch x := v.(type) {
case float64:
return x
case int64:
return float64(x)
case float32:
return float64(x)
case int32:
return float64(x)
case uint64:
return float64(x)
case uint32:
return float64(x)
case int:
return float64(x)
case uint:
return float64(x)
case int16:
return float64(x)
case int8:
return float64(x)
case uint16:
return float64(x)
case uint8:
return float64(x)
default:
return math.NaN()
}
}