// signoz/pkg/querier/bucket_cache.go

package querier

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"slices"
	"time"

	"github.com/SigNoz/signoz/pkg/cache"
	"github.com/SigNoz/signoz/pkg/errors"
	"github.com/SigNoz/signoz/pkg/factory"
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/valuer"
)
// bucketCache implements the BucketCache interface
type bucketCache struct {
cache cache.Cache
logger *slog.Logger
cacheTTL time.Duration
fluxInterval time.Duration
}
var _ BucketCache = (*bucketCache)(nil)
// NewBucketCache creates a new BucketCache implementation
func NewBucketCache(settings factory.ProviderSettings, cache cache.Cache, cacheTTL time.Duration, fluxInterval time.Duration) BucketCache {
cacheSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querier/bucket_cache")
return &bucketCache{
cache: cache,
logger: cacheSettings.Logger(),
cacheTTL: cacheTTL,
fluxInterval: fluxInterval,
}
}
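
// A minimal wiring sketch (illustrative only; the cache provider and the
// concrete durations here are assumptions, not values defined in this file):
//
//	bc := NewBucketCache(settings, memCache, 30*time.Minute, 5*time.Minute)
//	cached, missing := bc.GetMissRanges(ctx, orgID, query, step)
//	// execute the query only for the missing ranges, then persist:
//	bc.Put(ctx, orgID, query, fresh)
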
// cachedBucket represents a cached time bucket
type cachedBucket struct {
StartMs uint64 `json:"startMs"`
EndMs uint64 `json:"endMs"`
Type qbtypes.RequestType `json:"type"`
Value json.RawMessage `json:"value"`
Stats qbtypes.ExecStats `json:"stats"`
}
// cachedData represents the full cached data for a query
type cachedData struct {
Buckets []*cachedBucket `json:"buckets"`
Warnings []string `json:"warnings"`
}
func (c *cachedData) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, c)
}
func (c *cachedData) MarshalBinary() ([]byte, error) {
return json.Marshal(c)
}
// GetMissRanges returns cached data and missing time ranges
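//
// For example, a request for [1000, 5000) that finds a single cached bucket
// covering [1000, 3000) returns the merged cached series for that span plus
// one missing range, {From: 3000, To: 5000}.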
func (bc *bucketCache) GetMissRanges(
ctx context.Context,
orgID valuer.UUID,
q qbtypes.Query,
step qbtypes.Step,
) (cached *qbtypes.Result, missing []*qbtypes.TimeRange) {
// Get query window
startMs, endMs := q.Window()
bc.logger.DebugContext(ctx, "getting miss ranges", "fingerprint", q.Fingerprint(), "start", startMs, "end", endMs)
// Generate cache key
cacheKey := bc.generateCacheKey(q)
bc.logger.DebugContext(ctx, "cache key", "cache_key", cacheKey)
// Try to get cached data
var data cachedData
err := bc.cache.Get(ctx, orgID, cacheKey, &data, false)
if err != nil {
if !errors.Ast(err, errors.TypeNotFound) {
bc.logger.ErrorContext(ctx, "error getting cached data", "error", err)
}
		// Cache miss (or unreadable entry): fetch the entire range
missing = []*qbtypes.TimeRange{{From: startMs, To: endMs}}
return nil, missing
}
	// Convert the step to milliseconds for range alignment
stepMs := uint64(step.Duration.Milliseconds())
// Find missing ranges with step alignment
missing = bc.findMissingRangesWithStep(data.Buckets, startMs, endMs, stepMs)
bc.logger.DebugContext(ctx, "missing ranges", "missing", missing, "step", stepMs)
	// If the cache entry holds no buckets, everything is missing
if len(data.Buckets) == 0 {
return nil, missing
}
// Extract relevant buckets and merge them
relevantBuckets := bc.filterRelevantBuckets(data.Buckets, startMs, endMs)
if len(relevantBuckets) == 0 {
return nil, missing
}
// Merge buckets into a single result
mergedResult := bc.mergeBuckets(ctx, relevantBuckets, data.Warnings)
// Filter the merged result to only include values within the requested time range
mergedResult = bc.filterResultToTimeRange(mergedResult, startMs, endMs)
return mergedResult, missing
}
// Put stores fresh query results in the cache
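//
// Data newer than now-fluxInterval is considered still in flux and is never
// written to the cache: with a 5m flux interval, for a query ending at "now"
// only the portion of the result older than five minutes is persisted.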
func (bc *bucketCache) Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Query, fresh *qbtypes.Result) {
// Get query window
startMs, endMs := q.Window()
// Calculate the flux boundary - data after this point should not be cached
currentMs := uint64(time.Now().UnixMilli())
fluxBoundary := currentMs - uint64(bc.fluxInterval.Milliseconds())
// If the entire range is within flux interval, skip caching
if startMs >= fluxBoundary {
bc.logger.DebugContext(ctx, "entire range within flux interval, skipping cache",
"start", startMs,
"end", endMs,
"flux_boundary", fluxBoundary)
return
}
// Adjust endMs to not include data within flux interval
cachableEndMs := endMs
if endMs > fluxBoundary {
cachableEndMs = fluxBoundary
bc.logger.DebugContext(ctx, "adjusting end time to exclude flux interval",
"original_end", endMs,
"cachable_end", cachableEndMs)
}
// Generate cache key
cacheKey := bc.generateCacheKey(q)
// Get existing cached data
var existingData cachedData
if err := bc.cache.Get(ctx, orgID, cacheKey, &existingData, true); err != nil {
existingData = cachedData{}
}
// Trim the result to exclude data within flux interval
trimmedResult := bc.trimResultToFluxBoundary(fresh, cachableEndMs)
if trimmedResult == nil {
// Result type is not cacheable (raw or scalar)
return
}
// Convert trimmed result to buckets
freshBuckets := bc.resultToBuckets(ctx, trimmedResult, startMs, cachableEndMs)
// If no fresh buckets and no existing data, don't cache
if len(freshBuckets) == 0 && len(existingData.Buckets) == 0 {
return
}
// Merge with existing buckets
mergedBuckets := bc.mergeAndDeduplicateBuckets(existingData.Buckets, freshBuckets)
// Update warnings
allWarnings := append(existingData.Warnings, trimmedResult.Warnings...)
uniqueWarnings := bc.deduplicateWarnings(allWarnings)
// Create updated cached data
updatedData := cachedData{
Buckets: mergedBuckets,
Warnings: uniqueWarnings,
}
// Marshal and store in cache
if err := bc.cache.Set(ctx, orgID, cacheKey, &updatedData, bc.cacheTTL); err != nil {
bc.logger.ErrorContext(ctx, "error setting cached data", "error", err)
}
}
// generateCacheKey creates a unique cache key based on query fingerprint
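//
// For example, a query whose Fingerprint() is "abc123" (hypothetical value)
// is stored under "v5:query:abc123"; per-org isolation is handled by the
// cache layer itself, which takes orgID on Get/Set.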
func (bc *bucketCache) generateCacheKey(q qbtypes.Query) string {
fingerprint := q.Fingerprint()
return fmt.Sprintf("v5:query:%s", fingerprint)
}
// findMissingRangesWithStep identifies time ranges not covered by cached buckets with step alignment
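//
// Worked example (illustrative numbers): with startMs=130, endMs=500,
// stepMs=100 and one cached bucket covering [200, 400), the returned gaps are
// {130, 200} (the unaligned partial window) and {400, 500} (the uncovered
// tail).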
func (bc *bucketCache) findMissingRangesWithStep(buckets []*cachedBucket, startMs, endMs uint64, stepMs uint64) []*qbtypes.TimeRange {
	// When step is 0 or the window is smaller than one full step, fall back to the simple algorithm
if stepMs == 0 || (startMs+stepMs) > endMs {
return bc.findMissingRangesBasic(buckets, startMs, endMs)
}
// When no buckets exist, handle partial windows specially
if len(buckets) == 0 {
missing := make([]*qbtypes.TimeRange, 0, 3)
currentMs := startMs
// Check if start is not aligned - add partial window
if startMs%stepMs != 0 {
nextAggStart := startMs - (startMs % stepMs) + stepMs
missing = append(missing, &qbtypes.TimeRange{
From: startMs,
To: min(nextAggStart, endMs),
})
currentMs = nextAggStart
}
// Add the main range if needed
if currentMs < endMs {
missing = append(missing, &qbtypes.TimeRange{
From: currentMs,
To: endMs,
})
}
return missing
}
// Check if already sorted before sorting
needsSort := false
for i := 1; i < len(buckets); i++ {
if buckets[i].StartMs < buckets[i-1].StartMs {
needsSort = true
break
}
}
if needsSort {
slices.SortFunc(buckets, func(a, b *cachedBucket) int {
if a.StartMs < b.StartMs {
return -1
}
if a.StartMs > b.StartMs {
return 1
}
return 0
})
}
// Pre-allocate with reasonable capacity
missing := make([]*qbtypes.TimeRange, 0, len(buckets)+2)
currentMs := startMs
// Check if start is not aligned - add partial window
if startMs%stepMs != 0 {
nextAggStart := startMs - (startMs % stepMs) + stepMs
missing = append(missing, &qbtypes.TimeRange{
From: startMs,
To: min(nextAggStart, endMs),
})
currentMs = nextAggStart
}
for _, bucket := range buckets {
// Skip buckets that end before current position
if bucket.EndMs <= currentMs {
continue
}
// Stop processing if we've reached the end time
if bucket.StartMs >= endMs {
break
}
// Align bucket boundaries to step intervals
alignedBucketStart := bucket.StartMs
if bucket.StartMs%stepMs != 0 {
// Round up to next step boundary
alignedBucketStart = bucket.StartMs - (bucket.StartMs % stepMs) + stepMs
}
// Add gap before this bucket if needed
if currentMs < alignedBucketStart && currentMs < endMs {
missing = append(missing, &qbtypes.TimeRange{
From: currentMs,
To: min(alignedBucketStart, endMs),
})
}
// Update current position to the end of this bucket
// But ensure it's aligned to step boundary
bucketEnd := min(bucket.EndMs, endMs)
if bucketEnd%stepMs != 0 && bucketEnd < endMs {
// Round down to step boundary
bucketEnd = bucketEnd - (bucketEnd % stepMs)
}
currentMs = max(currentMs, bucketEnd)
}
// Add final gap if needed
if currentMs < endMs {
missing = append(missing, &qbtypes.TimeRange{
From: currentMs,
To: endMs,
})
}
// Don't merge ranges - keep partial windows separate for proper handling
return missing
}
// findMissingRangesBasic is the simple algorithm without step alignment
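//
// Unlike the step-aligned variant, the trailing gap here is split at the flux
// boundary: a missing tail that crosses now-fluxInterval is emitted as two
// ranges, [t, boundary) and [boundary, endMs), keeping the always-fresh
// portion separate from the cacheable one.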
func (bc *bucketCache) findMissingRangesBasic(buckets []*cachedBucket, startMs, endMs uint64) []*qbtypes.TimeRange {
// Check if already sorted before sorting
needsSort := false
for i := 1; i < len(buckets); i++ {
if buckets[i].StartMs < buckets[i-1].StartMs {
needsSort = true
break
}
}
if needsSort {
slices.SortFunc(buckets, func(a, b *cachedBucket) int {
if a.StartMs < b.StartMs {
return -1
}
if a.StartMs > b.StartMs {
return 1
}
return 0
})
}
// Pre-allocate with reasonable capacity
missing := make([]*qbtypes.TimeRange, 0, len(buckets)+1)
currentMs := startMs
for _, bucket := range buckets {
// Skip buckets that end before start time
if bucket.EndMs <= startMs {
continue
}
// Stop processing if we've reached the end time
if bucket.StartMs >= endMs {
break
}
// Add gap before this bucket if needed
if currentMs < bucket.StartMs {
missing = append(missing, &qbtypes.TimeRange{
From: currentMs,
To: min(bucket.StartMs, endMs),
})
}
// Update current position, but don't go past the end time
currentMs = max(currentMs, min(bucket.EndMs, endMs))
}
// Add final gap if needed
if currentMs < endMs {
// Check if we need to limit due to flux interval
currentTime := uint64(time.Now().UnixMilli())
fluxBoundary := currentTime - uint64(bc.fluxInterval.Milliseconds())
// If the missing range extends beyond flux boundary, limit it
if currentMs < fluxBoundary {
// Add range up to flux boundary
missing = append(missing, &qbtypes.TimeRange{
From: currentMs,
To: min(endMs, fluxBoundary),
})
// If endMs is beyond flux boundary, add that as another missing range
if endMs > fluxBoundary {
missing = append(missing, &qbtypes.TimeRange{
From: fluxBoundary,
To: endMs,
})
}
} else {
// Entire missing range is within flux interval
missing = append(missing, &qbtypes.TimeRange{
From: currentMs,
To: endMs,
})
}
}
// Don't merge ranges - keep partial windows separate for proper handling
return missing
}
// filterRelevantBuckets returns buckets that overlap with the requested time range
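//
// Overlap follows the half-open [start, end) convention: a bucket is relevant
// when bucket.EndMs > startMs && bucket.StartMs < endMs, so a bucket ending
// exactly at startMs is excluded.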
func (bc *bucketCache) filterRelevantBuckets(buckets []*cachedBucket, startMs, endMs uint64) []*cachedBucket {
// Pre-allocate with estimated capacity
relevant := make([]*cachedBucket, 0, len(buckets))
for _, bucket := range buckets {
// Check if bucket overlaps with requested range
if bucket.EndMs > startMs && bucket.StartMs < endMs {
relevant = append(relevant, bucket)
}
}
// Sort by start time
slices.SortFunc(relevant, func(a, b *cachedBucket) int {
if a.StartMs < b.StartMs {
return -1
}
if a.StartMs > b.StartMs {
return 1
}
return 0
})
return relevant
}
// mergeBuckets combines multiple cached buckets into a single result
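//
// Stats are summed across buckets, so the merged result reports the
// cumulative rows/bytes/duration of the queries that originally populated the
// cache, not the cost of serving this cache hit.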
func (bc *bucketCache) mergeBuckets(ctx context.Context, buckets []*cachedBucket, warnings []string) *qbtypes.Result {
if len(buckets) == 0 {
return &qbtypes.Result{}
}
// All buckets should have the same type
resultType := buckets[0].Type
// Aggregate stats
var totalStats qbtypes.ExecStats
for _, bucket := range buckets {
totalStats.RowsScanned += bucket.Stats.RowsScanned
totalStats.BytesScanned += bucket.Stats.BytesScanned
totalStats.DurationMS += bucket.Stats.DurationMS
}
// Merge values based on type
var mergedValue any
switch resultType {
case qbtypes.RequestTypeTimeSeries:
mergedValue = bc.mergeTimeSeriesValues(ctx, buckets)
// Raw and Scalar types are not cached, so no merge needed
}
return &qbtypes.Result{
Type: resultType,
Value: mergedValue,
Stats: totalStats,
Warnings: warnings,
}
}
// mergeTimeSeriesValues merges time series data from multiple buckets
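//
// Fragments of the same series are matched on (aggregation index, label set)
// via qbtypes.GetUniqueSeriesKey: e.g. a series labeled {service="api"}
// (hypothetical labels) split across buckets [0, 1000) and [1000, 2000) is
// appended into a single series and later sorted by timestamp.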
func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*cachedBucket) *qbtypes.TimeSeriesData {
	// Pre-size the map assuming ~10 series per bucket (heuristic)
	estimatedSeries := len(buckets) * 10
// Flat map with composite key for better performance
type seriesKey struct {
aggIndex int
key string
}
seriesMap := make(map[seriesKey]*qbtypes.TimeSeries, estimatedSeries)
var queryName string
for _, bucket := range buckets {
		var tsData *qbtypes.TimeSeriesData
		if err := json.Unmarshal(bucket.Value, &tsData); err != nil {
			bc.logger.ErrorContext(ctx, "failed to unmarshal time series data", "error", err)
			continue
		}
		if tsData == nil {
			// A cached "null" payload would otherwise panic below
			continue
		}
		// Preserve the query name from the first bucket that carries one
if queryName == "" && tsData.QueryName != "" {
queryName = tsData.QueryName
}
for _, aggBucket := range tsData.Aggregations {
for _, series := range aggBucket.Series {
// Create series key from labels
key := seriesKey{
aggIndex: aggBucket.Index,
key: qbtypes.GetUniqueSeriesKey(series.Labels),
}
if existingSeries, ok := seriesMap[key]; ok {
// Pre-allocate capacity for merged values
newCap := len(existingSeries.Values) + len(series.Values)
if cap(existingSeries.Values) < newCap {
newValues := make([]*qbtypes.TimeSeriesValue, len(existingSeries.Values), newCap)
copy(newValues, existingSeries.Values)
existingSeries.Values = newValues
}
existingSeries.Values = append(existingSeries.Values, series.Values...)
} else {
// New series
seriesMap[key] = series
}
}
}
}
// Group series by aggregation index
aggMap := make(map[int][]*qbtypes.TimeSeries)
for key, series := range seriesMap {
aggMap[key.aggIndex] = append(aggMap[key.aggIndex], series)
}
// Convert map back to slice
result := &qbtypes.TimeSeriesData{
QueryName: queryName,
Aggregations: make([]*qbtypes.AggregationBucket, 0, len(aggMap)),
}
for index, seriesList := range aggMap {
// Sort values by timestamp for each series
for _, s := range seriesList {
// Check if already sorted before sorting
needsSort := false
for i := 1; i < len(s.Values); i++ {
if s.Values[i].Timestamp < s.Values[i-1].Timestamp {
needsSort = true
break
}
}
if needsSort {
slices.SortFunc(s.Values, func(a, b *qbtypes.TimeSeriesValue) int {
if a.Timestamp < b.Timestamp {
return -1
}
if a.Timestamp > b.Timestamp {
return 1
}
return 0
})
}
}
result.Aggregations = append(result.Aggregations, &qbtypes.AggregationBucket{
Index: index,
Series: seriesList,
})
}
return result
}
// isEmptyResult checks if a result is truly empty (no data exists) vs filtered empty (data was filtered out)
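//
// For example, a time-series result that has aggregation buckets but zero
// series reports (true, true): the query ran and a filter removed everything,
// which is worth caching to avoid re-querying. A nil value reports
// (true, false) and is not cached at all.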
func (bc *bucketCache) isEmptyResult(result *qbtypes.Result) (isEmpty bool, isFiltered bool) {
if result.Value == nil {
return true, false
}
switch result.Type {
case qbtypes.RequestTypeTimeSeries:
if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok {
// No aggregations at all means truly empty
if len(tsData.Aggregations) == 0 {
return true, false
}
// Check if we have aggregations but no series (filtered out)
totalSeries := 0
for _, agg := range tsData.Aggregations {
totalSeries += len(agg.Series)
}
if totalSeries == 0 {
// We have aggregations but no series - data was filtered out
return true, true
}
// Check if all series have no values
hasValues := false
for _, agg := range tsData.Aggregations {
for _, series := range agg.Series {
if len(series.Values) > 0 {
hasValues = true
break
}
}
if hasValues {
break
}
}
return !hasValues, !hasValues && totalSeries > 0
}
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeScalar:
// Raw and scalar data are not cached
return true, false
}
return true, false
}
// resultToBuckets converts a query result into time-based buckets
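//
// The single bucket spans the whole cachable window, so gaps inside the
// window are represented by the absence of points rather than by bucket
// boundaries.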
func (bc *bucketCache) resultToBuckets(ctx context.Context, result *qbtypes.Result, startMs, endMs uint64) []*cachedBucket {
// Check if result is empty
isEmpty, isFiltered := bc.isEmptyResult(result)
// Don't cache if result is empty but not filtered
// Empty filtered results should be cached to avoid re-querying
if isEmpty && !isFiltered {
bc.logger.DebugContext(ctx, "skipping cache for empty non-filtered result")
return nil
}
// For now, create a single bucket for the entire range
// In the future, we could split large ranges into smaller buckets
valueBytes, err := json.Marshal(result.Value)
if err != nil {
bc.logger.ErrorContext(ctx, "failed to marshal result value", "error", err)
return nil
}
// Always create a bucket, even for empty filtered results
// This ensures we don't re-query for data that doesn't exist
return []*cachedBucket{
{
StartMs: startMs,
EndMs: endMs,
Type: result.Type,
Value: valueBytes,
Stats: result.Stats,
},
}
}
// mergeAndDeduplicateBuckets combines and deduplicates bucket lists
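//
// Buckets are keyed by their exact "startMs-endMs" range: a fresh bucket
// replaces an existing one only when both boundaries match, while partially
// overlapping buckets are all retained and simply sorted by start time.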
func (bc *bucketCache) mergeAndDeduplicateBuckets(existing, fresh []*cachedBucket) []*cachedBucket {
// Create a map to deduplicate by time range
bucketMap := make(map[string]*cachedBucket)
// Add existing buckets
for _, bucket := range existing {
key := fmt.Sprintf("%d-%d", bucket.StartMs, bucket.EndMs)
bucketMap[key] = bucket
}
// Add/update with fresh buckets
for _, bucket := range fresh {
key := fmt.Sprintf("%d-%d", bucket.StartMs, bucket.EndMs)
bucketMap[key] = bucket
}
// Convert back to slice with pre-allocated capacity
result := make([]*cachedBucket, 0, len(bucketMap))
for _, bucket := range bucketMap {
result = append(result, bucket)
}
// Sort by start time
slices.SortFunc(result, func(a, b *cachedBucket) int {
if a.StartMs < b.StartMs {
return -1
}
if a.StartMs > b.StartMs {
return 1
}
return 0
})
return result
}
// deduplicateWarnings removes duplicate warnings
func (bc *bucketCache) deduplicateWarnings(warnings []string) []string {
if len(warnings) == 0 {
return nil
}
seen := make(map[string]bool, len(warnings))
unique := make([]string, 0, len(warnings)) // Pre-allocate capacity
for _, warning := range warnings {
if !seen[warning] {
seen[warning] = true
unique = append(unique, warning)
}
}
return unique
}
// trimResultToFluxBoundary trims the result to exclude data points beyond the flux boundary
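//
// Values flagged Partial (covering an incomplete step window) are dropped
// even when they fall before the boundary, because re-running the query over
// the completed window would yield a different value.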
func (bc *bucketCache) trimResultToFluxBoundary(result *qbtypes.Result, fluxBoundary uint64) *qbtypes.Result {
trimmedResult := &qbtypes.Result{
Type: result.Type,
Stats: result.Stats,
Warnings: result.Warnings,
}
switch result.Type {
case qbtypes.RequestTypeTimeSeries:
// Trim time series data
if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok {
trimmedData := &qbtypes.TimeSeriesData{
QueryName: tsData.QueryName,
}
for _, aggBucket := range tsData.Aggregations {
trimmedBucket := &qbtypes.AggregationBucket{
Index: aggBucket.Index,
}
for _, series := range aggBucket.Series {
trimmedSeries := &qbtypes.TimeSeries{
Labels: series.Labels,
}
// Filter values to exclude those beyond flux boundary and partial values
for _, value := range series.Values {
// Skip partial values - they cannot be cached
if value.Partial {
continue
}
if uint64(value.Timestamp) <= fluxBoundary {
trimmedSeries.Values = append(trimmedSeries.Values, value)
}
}
// Always add the series to preserve filtered empty results
trimmedBucket.Series = append(trimmedBucket.Series, trimmedSeries)
}
// Always add the bucket to preserve aggregation structure
trimmedData.Aggregations = append(trimmedData.Aggregations, trimmedBucket)
}
// Always set the value to preserve empty filtered results
trimmedResult.Value = trimmedData
}
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeScalar:
// Don't cache raw or scalar data
return nil
}
return trimmedResult
}
func min(a, b uint64) uint64 {
if a < b {
return a
}
return b
}
func max(a, b uint64) uint64 {
if a > b {
return a
}
return b
}
// filterResultToTimeRange filters the result to only include values within the requested time range
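//
// The check is inclusive of startMs and exclusive of endMs, matching the
// [start, end) convention used throughout this file.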
func (bc *bucketCache) filterResultToTimeRange(result *qbtypes.Result, startMs, endMs uint64) *qbtypes.Result {
if result == nil || result.Value == nil {
return result
}
switch result.Type {
case qbtypes.RequestTypeTimeSeries:
if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok {
filteredData := &qbtypes.TimeSeriesData{
QueryName: tsData.QueryName,
Aggregations: make([]*qbtypes.AggregationBucket, 0, len(tsData.Aggregations)),
}
for _, aggBucket := range tsData.Aggregations {
filteredBucket := &qbtypes.AggregationBucket{
Index: aggBucket.Index,
Alias: aggBucket.Alias,
Meta: aggBucket.Meta,
Series: make([]*qbtypes.TimeSeries, 0, len(aggBucket.Series)),
}
for _, series := range aggBucket.Series {
filteredSeries := &qbtypes.TimeSeries{
Labels: series.Labels,
Values: make([]*qbtypes.TimeSeriesValue, 0, len(series.Values)),
}
// Filter values to only include those within the requested time range
for _, value := range series.Values {
timestampMs := uint64(value.Timestamp)
if timestampMs >= startMs && timestampMs < endMs {
filteredSeries.Values = append(filteredSeries.Values, value)
}
}
// Always add series to preserve structure (even if empty)
filteredBucket.Series = append(filteredBucket.Series, filteredSeries)
}
// Only add bucket if it has series
if len(filteredBucket.Series) > 0 {
filteredData.Aggregations = append(filteredData.Aggregations, filteredBucket)
}
}
// Create a new result with the filtered data
return &qbtypes.Result{
Type: result.Type,
Value: filteredData,
Stats: result.Stats,
Warnings: result.Warnings,
}
}
}
// For non-time series data, return as is
return result
}