mirror of
https://github.com/SigNoz/signoz.git
synced 2025-12-19 00:17:00 +00:00
* chore: update types 1. add partial bool to indicate if the value covers the partial interval 2. add optional unit if present (ex: duration_nano, metrics with units) 3. use pointers wherever necessary 4. add format options for request and remove redundant name in query envelope * chore: fix some gaps 1. make the range as [start, end) 2. provide the logs statement builder with the body column 3. skip the body filter on resource filter statement builder 4. remove unnecessary agg expr rewriter in metrics 5. add ability to skip full text in where clause visitor * chore: add API endpoint for new query range * chore: add bucket cache implementation * chore: add fingerprinting impl and add bucket cache to querier * chore: add provider factory
423 lines
10 KiB
Go
423 lines
10 KiB
Go
package querier
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
"reflect"
|
|
"regexp"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
|
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
|
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
|
)
|
|
|
|
// aggRe matches the internal aggregation column aliases of the form
// "__result_<index>" and captures the numeric index.
var aggRe = regexp.MustCompile(`^__result_(\d+)$`)
|
|
|
|
// consume reads every row and shapes it into the payload expected for the
|
|
// given request type.
|
|
//
|
|
// * Time-series - *qbtypes.TimeSeriesData
|
|
// * Scalar - *qbtypes.ScalarData
|
|
// * Raw - *qbtypes.RawData
|
|
// * Distribution- *qbtypes.DistributionData
|
|
func consume(rows driver.Rows, kind qbtypes.RequestType, queryWindow *qbtypes.TimeRange, step qbtypes.Step, queryName string) (any, error) {
|
|
var (
|
|
payload any
|
|
err error
|
|
)
|
|
|
|
switch kind {
|
|
case qbtypes.RequestTypeTimeSeries:
|
|
payload, err = readAsTimeSeries(rows, queryWindow, step, queryName)
|
|
case qbtypes.RequestTypeScalar:
|
|
payload, err = readAsScalar(rows, queryName)
|
|
case qbtypes.RequestTypeRaw:
|
|
payload, err = readAsRaw(rows, queryName)
|
|
// TODO: add support for other request types
|
|
}
|
|
|
|
return payload, err
|
|
}
|
|
|
|
func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbtypes.Step, queryName string) (*qbtypes.TimeSeriesData, error) {
|
|
|
|
colTypes := rows.ColumnTypes()
|
|
colNames := rows.Columns()
|
|
|
|
slots := make([]any, len(colTypes))
|
|
numericColsCount := 0
|
|
for i, ct := range colTypes {
|
|
slots[i] = reflect.New(ct.ScanType()).Interface()
|
|
if numericKind(ct.ScanType().Kind()) {
|
|
numericColsCount++
|
|
}
|
|
}
|
|
|
|
type sKey struct {
|
|
agg int
|
|
key string // deterministic join of label values
|
|
}
|
|
seriesMap := map[sKey]*qbtypes.TimeSeries{}
|
|
|
|
stepMs := uint64(step.Duration.Milliseconds())
|
|
|
|
// Helper function to check if a timestamp represents a partial value
|
|
isPartialValue := func(timestamp int64) bool {
|
|
if stepMs == 0 || queryWindow == nil {
|
|
return false
|
|
}
|
|
|
|
timestampMs := uint64(timestamp)
|
|
|
|
// For the first interval, check if query start is misaligned
|
|
// The first complete interval starts at the first timestamp >= queryWindow.From that is aligned to step
|
|
firstCompleteInterval := queryWindow.From
|
|
if queryWindow.From%stepMs != 0 {
|
|
// Round up to next step boundary
|
|
firstCompleteInterval = ((queryWindow.From / stepMs) + 1) * stepMs
|
|
}
|
|
|
|
// If timestamp is before the first complete interval, it's partial
|
|
if timestampMs < firstCompleteInterval {
|
|
return true
|
|
}
|
|
|
|
// For the last interval, check if it would extend beyond query end
|
|
if timestampMs+stepMs > queryWindow.To {
|
|
return queryWindow.To%stepMs != 0
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// Pre-allocate for labels based on column count
|
|
lblValsCapacity := len(colNames) - 1 // -1 for timestamp
|
|
if lblValsCapacity < 0 {
|
|
lblValsCapacity = 0
|
|
}
|
|
|
|
for rows.Next() {
|
|
if err := rows.Scan(slots...); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var (
|
|
ts int64
|
|
lblVals = make([]string, 0, lblValsCapacity)
|
|
lblObjs = make([]*qbtypes.Label, 0, lblValsCapacity)
|
|
aggValues = map[int]float64{} // all __result_N in this row
|
|
fallbackValue float64 // value when NO __result_N columns exist
|
|
fallbackSeen bool
|
|
)
|
|
|
|
for idx, ptr := range slots {
|
|
name := colNames[idx]
|
|
|
|
switch v := ptr.(type) {
|
|
case *time.Time:
|
|
ts = v.UnixMilli()
|
|
|
|
case *float64, *float32, *int64, *int32, *uint64, *uint32:
|
|
val := numericAsFloat(reflect.ValueOf(ptr).Elem().Interface())
|
|
if m := aggRe.FindStringSubmatch(name); m != nil {
|
|
id, _ := strconv.Atoi(m[1])
|
|
aggValues[id] = val
|
|
} else if numericColsCount == 1 { // classic single-value query
|
|
fallbackValue = val
|
|
fallbackSeen = true
|
|
} else {
|
|
// numeric label
|
|
lblVals = append(lblVals, fmt.Sprint(val))
|
|
lblObjs = append(lblObjs, &qbtypes.Label{
|
|
Key: telemetrytypes.TelemetryFieldKey{Name: name},
|
|
Value: val,
|
|
})
|
|
}
|
|
|
|
case **float64, **float32, **int64, **int32, **uint64, **uint32:
|
|
tempVal := reflect.ValueOf(ptr)
|
|
if tempVal.IsValid() && !tempVal.IsNil() && !tempVal.Elem().IsNil() {
|
|
val := numericAsFloat(tempVal.Elem().Elem().Interface())
|
|
if m := aggRe.FindStringSubmatch(name); m != nil {
|
|
id, _ := strconv.Atoi(m[1])
|
|
aggValues[id] = val
|
|
} else if numericColsCount == 1 { // classic single-value query
|
|
fallbackValue = val
|
|
fallbackSeen = true
|
|
} else {
|
|
// numeric label
|
|
lblVals = append(lblVals, fmt.Sprint(val))
|
|
lblObjs = append(lblObjs, &qbtypes.Label{
|
|
Key: telemetrytypes.TelemetryFieldKey{Name: name},
|
|
Value: val,
|
|
})
|
|
}
|
|
}
|
|
|
|
case *string:
|
|
lblVals = append(lblVals, *v)
|
|
lblObjs = append(lblObjs, &qbtypes.Label{
|
|
Key: telemetrytypes.TelemetryFieldKey{Name: name},
|
|
Value: *v,
|
|
})
|
|
|
|
case **string:
|
|
val := *v
|
|
if val == nil {
|
|
var empty string
|
|
val = &empty
|
|
}
|
|
lblVals = append(lblVals, *val)
|
|
lblObjs = append(lblObjs, &qbtypes.Label{
|
|
Key: telemetrytypes.TelemetryFieldKey{Name: name},
|
|
Value: val,
|
|
})
|
|
|
|
default:
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Edge-case: no __result_N columns, but a single numeric column present
|
|
if len(aggValues) == 0 && fallbackSeen {
|
|
aggValues[0] = fallbackValue
|
|
}
|
|
|
|
if ts == 0 || len(aggValues) == 0 {
|
|
continue // nothing useful
|
|
}
|
|
|
|
sort.Strings(lblVals)
|
|
labelsKey := strings.Join(lblVals, ",")
|
|
|
|
// one point per aggregation in this row
|
|
for aggIdx, val := range aggValues {
|
|
if math.IsNaN(val) || math.IsInf(val, 0) {
|
|
continue
|
|
}
|
|
|
|
key := sKey{agg: aggIdx, key: labelsKey}
|
|
|
|
series, ok := seriesMap[key]
|
|
if !ok {
|
|
series = &qbtypes.TimeSeries{Labels: lblObjs}
|
|
seriesMap[key] = series
|
|
}
|
|
series.Values = append(series.Values, &qbtypes.TimeSeriesValue{
|
|
Timestamp: ts,
|
|
Value: val,
|
|
Partial: isPartialValue(ts),
|
|
})
|
|
}
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
maxAgg := -1
|
|
for k := range seriesMap {
|
|
if k.agg > maxAgg {
|
|
maxAgg = k.agg
|
|
}
|
|
}
|
|
if maxAgg < 0 {
|
|
//nolint:nilnil
|
|
return nil, nil // empty result-set
|
|
}
|
|
|
|
buckets := make([]*qbtypes.AggregationBucket, maxAgg+1)
|
|
for i := range buckets {
|
|
buckets[i] = &qbtypes.AggregationBucket{
|
|
Index: i,
|
|
Alias: "__result_" + strconv.Itoa(i),
|
|
}
|
|
}
|
|
for k, s := range seriesMap {
|
|
buckets[k.agg].Series = append(buckets[k.agg].Series, s)
|
|
}
|
|
|
|
var nonEmpty []*qbtypes.AggregationBucket
|
|
for _, b := range buckets {
|
|
if len(b.Series) > 0 {
|
|
nonEmpty = append(nonEmpty, b)
|
|
}
|
|
}
|
|
|
|
return &qbtypes.TimeSeriesData{
|
|
QueryName: queryName,
|
|
Aggregations: nonEmpty,
|
|
}, nil
|
|
}
|
|
|
|
func numericKind(k reflect.Kind) bool {
|
|
switch k {
|
|
case reflect.Float32, reflect.Float64,
|
|
reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
|
|
reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func readAsScalar(rows driver.Rows, queryName string) (*qbtypes.ScalarData, error) {
|
|
colNames := rows.Columns()
|
|
colTypes := rows.ColumnTypes()
|
|
|
|
cd := make([]*qbtypes.ColumnDescriptor, len(colNames))
|
|
|
|
var aggIndex int64
|
|
for i, name := range colNames {
|
|
colType := qbtypes.ColumnTypeGroup
|
|
if aggRe.MatchString(name) {
|
|
colType = qbtypes.ColumnTypeAggregation
|
|
}
|
|
cd[i] = &qbtypes.ColumnDescriptor{
|
|
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: name},
|
|
QueryName: queryName,
|
|
AggregationIndex: aggIndex,
|
|
Type: colType,
|
|
}
|
|
if colType == qbtypes.ColumnTypeAggregation {
|
|
aggIndex++
|
|
}
|
|
}
|
|
|
|
// Pre-allocate scan slots once
|
|
scan := make([]any, len(colTypes))
|
|
for i := range scan {
|
|
scan[i] = reflect.New(colTypes[i].ScanType()).Interface()
|
|
}
|
|
|
|
var data [][]any
|
|
|
|
for rows.Next() {
|
|
if err := rows.Scan(scan...); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// 2. deref each slot into the output row
|
|
row := make([]any, len(scan))
|
|
for i, cell := range scan {
|
|
valPtr := reflect.ValueOf(cell)
|
|
if valPtr.Kind() == reflect.Pointer && !valPtr.IsNil() {
|
|
row[i] = valPtr.Elem().Interface()
|
|
} else {
|
|
row[i] = nil // Nullable columns come back as nil pointers
|
|
}
|
|
}
|
|
data = append(data, row)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &qbtypes.ScalarData{
|
|
Columns: cd,
|
|
Data: data,
|
|
}, nil
|
|
}
|
|
|
|
func readAsRaw(rows driver.Rows, queryName string) (*qbtypes.RawData, error) {
|
|
|
|
colNames := rows.Columns()
|
|
colTypes := rows.ColumnTypes()
|
|
colCnt := len(colNames)
|
|
|
|
// Build a template slice of correctly-typed pointers once
|
|
scanTpl := make([]any, colCnt)
|
|
for i, ct := range colTypes {
|
|
scanTpl[i] = reflect.New(ct.ScanType()).Interface()
|
|
}
|
|
|
|
var outRows []*qbtypes.RawRow
|
|
|
|
for rows.Next() {
|
|
// fresh copy of the scan slice (otherwise the driver reuses pointers)
|
|
scan := make([]any, colCnt)
|
|
for i := range scanTpl {
|
|
scan[i] = reflect.New(colTypes[i].ScanType()).Interface()
|
|
}
|
|
|
|
if err := rows.Scan(scan...); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rr := qbtypes.RawRow{
|
|
Data: make(map[string]*any, colCnt),
|
|
}
|
|
|
|
for i, cellPtr := range scan {
|
|
name := colNames[i]
|
|
|
|
// de-reference the typed pointer to any
|
|
val := reflect.ValueOf(cellPtr).Elem().Interface()
|
|
|
|
// special-case: timestamp column
|
|
if name == "timestamp" || name == "timestamp_datetime" {
|
|
switch t := val.(type) {
|
|
case time.Time:
|
|
rr.Timestamp = t
|
|
case uint64: // epoch-ns stored as integer
|
|
rr.Timestamp = time.Unix(0, int64(t))
|
|
case int64:
|
|
rr.Timestamp = time.Unix(0, t)
|
|
default:
|
|
// leave zero time if unrecognised
|
|
}
|
|
}
|
|
|
|
// store value in map as *any, to match the schema
|
|
v := any(val)
|
|
rr.Data[name] = &v
|
|
}
|
|
outRows = append(outRows, &rr)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &qbtypes.RawData{
|
|
QueryName: queryName,
|
|
Rows: outRows,
|
|
}, nil
|
|
}
|
|
|
|
// numericAsFloat converts numeric types to float64 efficiently
|
|
func numericAsFloat(v any) float64 {
|
|
switch x := v.(type) {
|
|
case float64:
|
|
return x
|
|
case int64:
|
|
return float64(x)
|
|
case float32:
|
|
return float64(x)
|
|
case int32:
|
|
return float64(x)
|
|
case uint64:
|
|
return float64(x)
|
|
case uint32:
|
|
return float64(x)
|
|
case int:
|
|
return float64(x)
|
|
case uint:
|
|
return float64(x)
|
|
case int16:
|
|
return float64(x)
|
|
case int8:
|
|
return float64(x)
|
|
case uint16:
|
|
return float64(x)
|
|
case uint8:
|
|
return float64(x)
|
|
default:
|
|
return math.NaN()
|
|
}
|
|
}
|