package querier

import (
	"cmp"
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"slices"
	"time"

	"github.com/SigNoz/signoz/pkg/cache"
	"github.com/SigNoz/signoz/pkg/errors"
	"github.com/SigNoz/signoz/pkg/factory"
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/valuer"
)

// bucketCache implements the BucketCache interface on top of a cache.Cache.
// Query results are stored per query fingerprint as a list of time buckets.
type bucketCache struct {
	cache    cache.Cache
	logger   *slog.Logger
	cacheTTL time.Duration
	// fluxInterval is the trailing window (relative to "now") that Put
	// refuses to cache.
	fluxInterval time.Duration
}

var _ BucketCache = (*bucketCache)(nil)

// NewBucketCache creates a new BucketCache implementation backed by cache.
// cacheTTL is passed to every cache write; fluxInterval is the trailing window
// that is never cached.
func NewBucketCache(settings factory.ProviderSettings, cache cache.Cache, cacheTTL time.Duration, fluxInterval time.Duration) BucketCache {
	cacheSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querier/bucket_cache")
	return &bucketCache{
		cache:        cache,
		logger:       cacheSettings.Logger(),
		cacheTTL:     cacheTTL,
		fluxInterval: fluxInterval,
	}
}

// cachedBucket represents a cached time bucket. Value holds the JSON-encoded
// result value for the [StartMs, EndMs) window.
type cachedBucket struct {
	StartMs uint64              `json:"startMs"`
	EndMs   uint64              `json:"endMs"`
	Type    qbtypes.RequestType `json:"type"`
	Value   json.RawMessage     `json:"value"`
	Stats   qbtypes.ExecStats   `json:"stats"`
}

// cachedData represents the full cached data for a query: every bucket stored
// under one cache key plus the warnings collected while producing them.
type cachedData struct {
	Buckets  []*cachedBucket `json:"buckets"`
	Warnings []string        `json:"warnings"`
}

// UnmarshalBinary decodes JSON-encoded data into c.
func (c *cachedData) UnmarshalBinary(data []byte) error {
	return json.Unmarshal(data, c)
}

// MarshalBinary encodes c as JSON.
func (c *cachedData) MarshalBinary() ([]byte, error) {
	return json.Marshal(c)
}

// compareBucketStart orders cached buckets by ascending StartMs.
func compareBucketStart(a, b *cachedBucket) int {
	return cmp.Compare(a.StartMs, b.StartMs)
}

// sortBucketsByStart sorts buckets in place by StartMs, skipping the sort when
// the slice is already ordered.
func sortBucketsByStart(buckets []*cachedBucket) {
	if !slices.IsSortedFunc(buckets, compareBucketStart) {
		slices.SortFunc(buckets, compareBucketStart)
	}
}

// GetMissRanges returns cached data and missing time ranges for the window of q.
//
// cached is nil when nothing usable is stored; in that case missing is either
// the whole window (cache lookup failed) or whatever the range computation
// produced. Otherwise cached holds the merged buckets restricted to
// [start, end) of the query window.
func (bc *bucketCache) GetMissRanges(
	ctx context.Context,
	orgID valuer.UUID,
	q qbtypes.Query,
	step qbtypes.Step,
) (cached *qbtypes.Result, missing []*qbtypes.TimeRange) {
	// Get query window
	startMs, endMs := q.Window()

	bc.logger.DebugContext(ctx, "getting miss ranges", "fingerprint", q.Fingerprint(), "start", startMs, "end", endMs)

	// Generate cache key
	cacheKey := bc.generateCacheKey(q)

	bc.logger.DebugContext(ctx, "cache key", "cache_key", cacheKey)

	// Try to get cached data
	var data cachedData
	err := bc.cache.Get(ctx, orgID, cacheKey, &data, false)
	if err != nil {
		// A plain miss is expected; anything else is worth logging.
		if !errors.Ast(err, errors.TypeNotFound) {
			bc.logger.ErrorContext(ctx, "error getting cached data", "error", err)
		}
		// No cached data, need to fetch entire range
		missing = []*qbtypes.TimeRange{{From: startMs, To: endMs}}
		return nil, missing
	}

	// Step interval in milliseconds; 0 selects the non-aligned algorithm.
	stepMs := uint64(step.Duration.Milliseconds())

	// Find missing ranges with step alignment
	missing = bc.findMissingRangesWithStep(data.Buckets, startMs, endMs, stepMs)
	bc.logger.DebugContext(ctx, "missing ranges", "missing", missing, "step", stepMs)

	// Nothing cached at all: only the missing ranges are useful.
	if len(data.Buckets) == 0 {
		return nil, missing
	}

	// Extract relevant buckets and merge them
	relevantBuckets := bc.filterRelevantBuckets(data.Buckets, startMs, endMs)
	if len(relevantBuckets) == 0 {
		return nil, missing
	}

	// Merge buckets into a single result
	mergedResult := bc.mergeBuckets(ctx, relevantBuckets, data.Warnings)

	// Filter the merged result to only include values within the requested time range
	mergedResult = bc.filterResultToTimeRange(mergedResult, startMs, endMs)

	return mergedResult, missing
}

// Put stores fresh query results in the cache.
//
// Data newer than now-fluxInterval is never stored. When step is non-zero the
// cached window is shrunk to whole step intervals. A nil fresh result is
// ignored. Cache write failures are logged, not returned.
func (bc *bucketCache) Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Query, step qbtypes.Step, fresh *qbtypes.Result) {
	// Nothing to cache; also prevents a nil dereference further down.
	if fresh == nil {
		return
	}

	// Get query window
	startMs, endMs := q.Window()

	// Calculate the flux boundary - data after this point should not be cached
	currentMs := uint64(time.Now().UnixMilli())
	fluxBoundary := currentMs - uint64(bc.fluxInterval.Milliseconds())

	// If the entire range is within flux interval, skip caching
	if startMs >= fluxBoundary {
		bc.logger.DebugContext(ctx, "entire range within flux interval, skipping cache",
			"start", startMs,
			"end", endMs,
			"flux_boundary", fluxBoundary)
		return
	}

	// Adjust endMs to not include data within flux interval
	cachableEndMs := endMs
	if endMs > fluxBoundary {
		cachableEndMs = fluxBoundary
		bc.logger.DebugContext(ctx, "adjusting end time to exclude flux interval",
			"original_end", endMs,
			"cachable_end", cachableEndMs)
	}

	// Generate cache key
	cacheKey := bc.generateCacheKey(q)

	// Get existing cached data; any read error is treated as "nothing cached".
	var existingData cachedData
	if err := bc.cache.Get(ctx, orgID, cacheKey, &existingData, true); err != nil {
		existingData = cachedData{}
	}

	// Trim the result to exclude data within flux interval
	trimmedResult := bc.trimResultToFluxBoundary(fresh, cachableEndMs)
	if trimmedResult == nil {
		// Result type is not cacheable (raw or scalar)
		return
	}

	// Adjust start and end times to only cache complete intervals
	cachableStartMs := startMs
	stepMs := uint64(step.Duration.Milliseconds())

	// If we have a step interval, adjust boundaries to only cache complete intervals
	if stepMs > 0 {
		// If start is not aligned, round up to next step boundary (first complete interval)
		if startMs%stepMs != 0 {
			cachableStartMs = ((startMs / stepMs) + 1) * stepMs
		}

		// If end is not aligned, round down to previous step boundary (last complete interval)
		if cachableEndMs%stepMs != 0 {
			cachableEndMs = (cachableEndMs / stepMs) * stepMs
		}

		// If after adjustment we have no complete intervals, don't cache
		if cachableStartMs >= cachableEndMs {
			bc.logger.DebugContext(ctx, "no complete intervals to cache",
				"original_start", startMs,
				"original_end", endMs,
				"adjusted_start", cachableStartMs,
				"adjusted_end", cachableEndMs,
				"step", stepMs)
			return
		}
	}

	// Convert trimmed result to buckets with adjusted boundaries
	freshBuckets := bc.resultToBuckets(ctx, trimmedResult, cachableStartMs, cachableEndMs)

	// If no fresh buckets and no existing data, don't cache
	if len(freshBuckets) == 0 && len(existingData.Buckets) == 0 {
		return
	}

	// Merge with existing buckets
	mergedBuckets := bc.mergeAndDeduplicateBuckets(existingData.Buckets, freshBuckets)

	// Update warnings
	allWarnings := append(existingData.Warnings, trimmedResult.Warnings...)
	uniqueWarnings := bc.deduplicateWarnings(allWarnings)

	// Create updated cached data
	updatedData := cachedData{
		Buckets:  mergedBuckets,
		Warnings: uniqueWarnings,
	}

	// Marshal and store in cache
	if err := bc.cache.Set(ctx, orgID, cacheKey, &updatedData, bc.cacheTTL); err != nil {
		bc.logger.ErrorContext(ctx, "error setting cached data", "error", err)
	}
}

// generateCacheKey creates a unique cache key based on query fingerprint.
func (bc *bucketCache) generateCacheKey(q qbtypes.Query) string {
	fingerprint := q.Fingerprint()
	return fmt.Sprintf("v5:query:%s", fingerprint)
}

// findMissingRangesWithStep identifies time ranges not covered by cached
// buckets with step alignment.
//
// When stepMs is 0, or the window is shorter than one step, it defers to
// findMissingRangesBasic. Otherwise an unaligned leading partial step is
// reported as its own range and bucket edges are snapped inward to step
// boundaries. buckets may be reordered in place (sorted by StartMs).
func (bc *bucketCache) findMissingRangesWithStep(buckets []*cachedBucket, startMs, endMs uint64, stepMs uint64) []*qbtypes.TimeRange {
	// When step is 0 or window is too small to be cached, use simple algorithm
	if stepMs == 0 || (startMs+stepMs) > endMs {
		return bc.findMissingRangesBasic(buckets, startMs, endMs)
	}

	sortBucketsByStart(buckets)

	// Pre-allocate with reasonable capacity
	missing := make([]*qbtypes.TimeRange, 0, len(buckets)+2)

	currentMs := startMs

	// Check if start is not aligned - add partial window
	if startMs%stepMs != 0 {
		nextAggStart := startMs - (startMs % stepMs) + stepMs
		missing = append(missing, &qbtypes.TimeRange{
			From: startMs,
			To:   min(nextAggStart, endMs),
		})
		currentMs = nextAggStart
	}

	// With no buckets this loop is a no-op and the final gap below covers the
	// remainder of the window.
	for _, bucket := range buckets {
		// Skip buckets that end before current position
		if bucket.EndMs <= currentMs {
			continue
		}
		// Stop processing if we've reached the end time
		if bucket.StartMs >= endMs {
			break
		}

		// Align bucket boundaries to step intervals
		alignedBucketStart := bucket.StartMs
		if bucket.StartMs%stepMs != 0 {
			// Round up to next step boundary
			alignedBucketStart = bucket.StartMs - (bucket.StartMs % stepMs) + stepMs
		}

		// Add gap before this bucket if needed
		if currentMs < alignedBucketStart && currentMs < endMs {
			missing = append(missing, &qbtypes.TimeRange{
				From: currentMs,
				To:   min(alignedBucketStart, endMs),
			})
		}

		// Update current position to the end of this bucket
		// But ensure it's aligned to step boundary
		bucketEnd := min(bucket.EndMs, endMs)
		if bucketEnd%stepMs != 0 && bucketEnd < endMs {
			// Round down to step boundary
			bucketEnd = bucketEnd - (bucketEnd % stepMs)
		}
		currentMs = max(currentMs, bucketEnd)
	}

	// Add final gap if needed
	if currentMs < endMs {
		missing = append(missing, &qbtypes.TimeRange{
			From: currentMs,
			To:   endMs,
		})
	}

	// Don't merge ranges - keep partial windows separate for proper handling
	return missing
}

// findMissingRangesBasic is the simple algorithm without step alignment.
//
// It reports every gap between cached buckets inside [startMs, endMs). A
// trailing gap that straddles now-fluxInterval is split at that boundary into
// two ranges. buckets may be reordered in place (sorted by StartMs).
func (bc *bucketCache) findMissingRangesBasic(buckets []*cachedBucket, startMs, endMs uint64) []*qbtypes.TimeRange {
	sortBucketsByStart(buckets)

	// Pre-allocate with reasonable capacity
	missing := make([]*qbtypes.TimeRange, 0, len(buckets)+1)
	currentMs := startMs

	for _, bucket := range buckets {
		// Skip buckets that end before start time
		if bucket.EndMs <= startMs {
			continue
		}
		// Stop processing if we've reached the end time
		if bucket.StartMs >= endMs {
			break
		}

		// Add gap before this bucket if needed
		if currentMs < bucket.StartMs {
			missing = append(missing, &qbtypes.TimeRange{
				From: currentMs,
				To:   min(bucket.StartMs, endMs),
			})
		}

		// Update current position, but don't go past the end time
		currentMs = max(currentMs, min(bucket.EndMs, endMs))
	}

	// No trailing gap left.
	if currentMs >= endMs {
		return missing
	}

	// Check if we need to limit due to flux interval
	currentTime := uint64(time.Now().UnixMilli())
	fluxBoundary := currentTime - uint64(bc.fluxInterval.Milliseconds())

	if currentMs >= fluxBoundary {
		// Entire missing range is within flux interval
		return append(missing, &qbtypes.TimeRange{
			From: currentMs,
			To:   endMs,
		})
	}

	// Add range up to flux boundary
	missing = append(missing, &qbtypes.TimeRange{
		From: currentMs,
		To:   min(endMs, fluxBoundary),
	})

	// If endMs is beyond flux boundary, add that as another missing range
	if endMs > fluxBoundary {
		missing = append(missing, &qbtypes.TimeRange{
			From: fluxBoundary,
			To:   endMs,
		})
	}

	// Don't merge ranges - keep partial windows separate for proper handling
	return missing
}

// filterRelevantBuckets returns buckets that overlap with the requested time
// range, sorted by StartMs. The input slice is not modified.
func (bc *bucketCache) filterRelevantBuckets(buckets []*cachedBucket, startMs, endMs uint64) []*cachedBucket {
	// Pre-allocate with estimated capacity
	relevant := make([]*cachedBucket, 0, len(buckets))

	for _, bucket := range buckets {
		// Check if bucket overlaps with requested range
		if bucket.EndMs > startMs && bucket.StartMs < endMs {
			relevant = append(relevant, bucket)
		}
	}

	// Sort by start time
	slices.SortFunc(relevant, compareBucketStart)

	return relevant
}

// mergeBuckets combines multiple cached buckets into a single result.
//
// The result type is taken from the first bucket, stats are summed across all
// buckets, and warnings is attached unchanged. Only time series values are
// merged; for any other type Value is left nil.
func (bc *bucketCache) mergeBuckets(ctx context.Context, buckets []*cachedBucket, warnings []string) *qbtypes.Result {
	if len(buckets) == 0 {
		return &qbtypes.Result{}
	}

	// All buckets should have the same type
	resultType := buckets[0].Type

	// Aggregate stats
	var totalStats qbtypes.ExecStats
	for _, bucket := range buckets {
		totalStats.RowsScanned += bucket.Stats.RowsScanned
		totalStats.BytesScanned += bucket.Stats.BytesScanned
		totalStats.DurationMS += bucket.Stats.DurationMS
	}

	// Merge values based on type
	var mergedValue any
	switch resultType {
	case qbtypes.RequestTypeTimeSeries:
		mergedValue = bc.mergeTimeSeriesValues(ctx, buckets)
		// Raw and Scalar types are not cached, so no merge needed
	}

	return &qbtypes.Result{
		Type:     resultType,
		Value:    mergedValue,
		Stats:    totalStats,
		Warnings: warnings,
	}
}

// mergeTimeSeriesValues merges time series data from multiple buckets.
//
// Series are matched by aggregation index plus their unique label key. When a
// series appears in several buckets, values whose timestamp is already present
// are dropped and the remaining values are sorted by timestamp. Buckets whose
// Value cannot be decoded are logged and skipped.
//
// The output is deterministic: aggregations are ordered by Index and series
// keep the order in which they were first seen. Alias and Meta of each
// aggregation are taken from the first bucket that contributed a series to it.
// Aggregations that never contribute a series are omitted.
func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*cachedBucket) *qbtypes.TimeSeriesData {
	// Flat map with composite key for better performance
	type seriesKey struct {
		aggIndex int
		key      string
	}

	// Estimate capacity based on bucket count
	estimatedSeries := len(buckets) * 10
	seriesMap := make(map[seriesKey]*qbtypes.TimeSeries, estimatedSeries)

	// Per-aggregation state, kept outside seriesMap so that output order does
	// not depend on map iteration order.
	seriesByAgg := make(map[int][]*qbtypes.TimeSeries)
	aggTemplates := make(map[int]*qbtypes.AggregationBucket)
	var aggIndexes []int

	for _, bucket := range buckets {
		var tsData *qbtypes.TimeSeriesData
		if err := json.Unmarshal(bucket.Value, &tsData); err != nil {
			bc.logger.ErrorContext(ctx, "failed to unmarshal time series data", "error", err)
			continue
		}
		// A JSON null decodes without error but leaves tsData nil.
		if tsData == nil {
			continue
		}

		for _, aggBucket := range tsData.Aggregations {
			if aggBucket == nil {
				continue
			}
			for _, series := range aggBucket.Series {
				if series == nil {
					continue
				}

				// Create series key from labels
				key := seriesKey{
					aggIndex: aggBucket.Index,
					key:      qbtypes.GetUniqueSeriesKey(series.Labels),
				}

				existingSeries, ok := seriesMap[key]
				if !ok {
					// New series
					seriesMap[key] = series
					if _, known := aggTemplates[key.aggIndex]; !known {
						aggTemplates[key.aggIndex] = aggBucket
						aggIndexes = append(aggIndexes, key.aggIndex)
					}
					seriesByAgg[key.aggIndex] = append(seriesByAgg[key.aggIndex], series)
					continue
				}

				// Merge values, avoiding duplicate timestamps
				seen := make(map[int64]struct{}, len(existingSeries.Values))
				for _, v := range existingSeries.Values {
					seen[v.Timestamp] = struct{}{}
				}

				// Pre-allocate capacity for merged values
				existingSeries.Values = slices.Grow(existingSeries.Values, len(series.Values))

				// Only add values with new timestamps
				for _, v := range series.Values {
					if _, dup := seen[v.Timestamp]; !dup {
						existingSeries.Values = append(existingSeries.Values, v)
					}
				}
			}
		}
	}

	slices.Sort(aggIndexes)

	compareValues := func(a, b *qbtypes.TimeSeriesValue) int {
		return cmp.Compare(a.Timestamp, b.Timestamp)
	}

	result := &qbtypes.TimeSeriesData{
		Aggregations: make([]*qbtypes.AggregationBucket, 0, len(aggIndexes)),
	}

	for _, index := range aggIndexes {
		seriesList := seriesByAgg[index]

		// Sort values by timestamp for each series, skipping already-sorted ones
		for _, s := range seriesList {
			if !slices.IsSortedFunc(s.Values, compareValues) {
				slices.SortFunc(s.Values, compareValues)
			}
		}

		template := aggTemplates[index]
		result.Aggregations = append(result.Aggregations, &qbtypes.AggregationBucket{
			Index:  index,
			Alias:  template.Alias,
			Meta:   template.Meta,
			Series: seriesList,
		})
	}

	return result
}

// isEmptyResult checks if a result is truly empty (no data exists) vs filtered
// empty (data was filtered out).
//
// For time series: no aggregations means truly empty; aggregations without any
// series value means filtered empty; any series with a value means not empty.
// Every other type, and any value that is not *qbtypes.TimeSeriesData, is
// reported as truly empty.
func (bc *bucketCache) isEmptyResult(result *qbtypes.Result) (isEmpty bool, isFiltered bool) {
	if result.Value == nil {
		return true, false
	}

	switch result.Type {
	case qbtypes.RequestTypeTimeSeries:
		tsData, ok := result.Value.(*qbtypes.TimeSeriesData)
		if !ok || tsData == nil {
			break
		}

		// No aggregations at all means truly empty
		if len(tsData.Aggregations) == 0 {
			return true, false
		}

		// Any series carrying a value makes the result non-empty.
		for _, agg := range tsData.Aggregations {
			for _, series := range agg.Series {
				if len(series.Values) > 0 {
					return false, false
				}
			}
		}

		// Aggregations exist but hold no values - data was filtered out
		return true, true

	case qbtypes.RequestTypeRaw, qbtypes.RequestTypeScalar, qbtypes.RequestTypeTrace:
		// Raw and scalar data are not cached
		return true, false
	}

	return true, false
}

// resultToBuckets converts a query result into time-based buckets.
//
// It returns nil for truly empty results and when the value cannot be
// marshalled; otherwise a single bucket spanning [startMs, endMs).
func (bc *bucketCache) resultToBuckets(ctx context.Context, result *qbtypes.Result, startMs, endMs uint64) []*cachedBucket {
	// Check if result is empty
	isEmpty, isFiltered := bc.isEmptyResult(result)

	// Don't cache if result is empty but not filtered
	// Empty filtered results should be cached to avoid re-querying
	if isEmpty && !isFiltered {
		bc.logger.DebugContext(ctx, "skipping cache for empty non-filtered result")
		return nil
	}

	// For now, create a single bucket for the entire range
	// In the future, we could split large ranges into smaller buckets
	valueBytes, err := json.Marshal(result.Value)
	if err != nil {
		bc.logger.ErrorContext(ctx, "failed to marshal result value", "error", err)
		return nil
	}

	// Always create a bucket, even for empty filtered results
	// This ensures we don't re-query for data that doesn't exist
	return []*cachedBucket{
		{
			StartMs: startMs,
			EndMs:   endMs,
			Type:    result.Type,
			Value:   valueBytes,
			Stats:   result.Stats,
		},
	}
}

// mergeAndDeduplicateBuckets combines and deduplicates bucket lists.
//
// Buckets are identified by their exact (StartMs, EndMs) pair; a fresh bucket
// replaces an existing one with the same pair. The result is sorted by StartMs.
func (bc *bucketCache) mergeAndDeduplicateBuckets(existing, fresh []*cachedBucket) []*cachedBucket {
	type window struct {
		startMs uint64
		endMs   uint64
	}

	// Create a map to deduplicate by time range
	bucketMap := make(map[window]*cachedBucket, len(existing)+len(fresh))

	// Add existing buckets
	for _, bucket := range existing {
		bucketMap[window{bucket.StartMs, bucket.EndMs}] = bucket
	}

	// Add/update with fresh buckets
	for _, bucket := range fresh {
		bucketMap[window{bucket.StartMs, bucket.EndMs}] = bucket
	}

	// Convert back to slice with pre-allocated capacity
	result := make([]*cachedBucket, 0, len(bucketMap))
	for _, bucket := range bucketMap {
		result = append(result, bucket)
	}

	// Sort by start time
	slices.SortFunc(result, compareBucketStart)

	return result
}

// deduplicateWarnings removes duplicate warnings, keeping the first occurrence
// of each. It returns nil for empty input.
func (bc *bucketCache) deduplicateWarnings(warnings []string) []string {
	if len(warnings) == 0 {
		return nil
	}

	seen := make(map[string]struct{}, len(warnings))
	unique := make([]string, 0, len(warnings))

	for _, warning := range warnings {
		if _, dup := seen[warning]; dup {
			continue
		}
		seen[warning] = struct{}{}
		unique = append(unique, warning)
	}

	return unique
}

// trimResultToFluxBoundary trims the result to exclude data points beyond the
// flux boundary.
//
// For time series it returns a copy in which every series keeps only values
// that are not marked Partial and whose timestamp is <= fluxBoundary. Series
// and aggregations are kept even when they end up empty, and each
// aggregation's Index, Alias and Meta are carried over. Raw, scalar and trace
// results are not cacheable and yield nil.
func (bc *bucketCache) trimResultToFluxBoundary(result *qbtypes.Result, fluxBoundary uint64) *qbtypes.Result {
	trimmedResult := &qbtypes.Result{
		Type:     result.Type,
		Stats:    result.Stats,
		Warnings: result.Warnings,
	}

	switch result.Type {
	case qbtypes.RequestTypeTimeSeries:
		// Trim time series data
		tsData, ok := result.Value.(*qbtypes.TimeSeriesData)
		if !ok || tsData == nil {
			// Value stays nil; the result is then treated as truly empty.
			break
		}

		trimmedData := &qbtypes.TimeSeriesData{}

		for _, aggBucket := range tsData.Aggregations {
			trimmedBucket := &qbtypes.AggregationBucket{
				Index: aggBucket.Index,
				Alias: aggBucket.Alias,
				Meta:  aggBucket.Meta,
			}

			for _, series := range aggBucket.Series {
				trimmedSeries := &qbtypes.TimeSeries{
					Labels: series.Labels,
				}

				// Filter values to exclude those beyond flux boundary and partial values
				for _, value := range series.Values {
					// Skip partial values - they cannot be cached
					if value.Partial {
						continue
					}
					if uint64(value.Timestamp) <= fluxBoundary {
						trimmedSeries.Values = append(trimmedSeries.Values, value)
					}
				}

				// Always add the series to preserve filtered empty results
				trimmedBucket.Series = append(trimmedBucket.Series, trimmedSeries)
			}

			// Always add the bucket to preserve aggregation structure
			trimmedData.Aggregations = append(trimmedData.Aggregations, trimmedBucket)
		}

		// Always set the value to preserve empty filtered results
		trimmedResult.Value = trimmedData

	case qbtypes.RequestTypeRaw, qbtypes.RequestTypeScalar, qbtypes.RequestTypeTrace:
		// Don't cache raw or scalar data
		return nil
	}

	return trimmedResult
}

// min returns the smaller of a and b.
// NOTE(review): on Go 1.21+ this shadows the predeclared min within the
// package; kept because other files in the package may reference it.
func min(a, b uint64) uint64 {
	if a < b {
		return a
	}
	return b
}

// max returns the larger of a and b.
// NOTE(review): on Go 1.21+ this shadows the predeclared max within the
// package; kept because other files in the package may reference it.
func max(a, b uint64) uint64 {
	if a > b {
		return a
	}
	return b
}

// filterResultToTimeRange filters the result to only include values within the
// requested time range.
//
// For time series it returns a new result whose series keep only values with
// startMs <= timestamp < endMs. Series are kept even when empty; aggregations
// without any series are dropped. Nil results, nil values and non-time-series
// results are returned unchanged.
func (bc *bucketCache) filterResultToTimeRange(result *qbtypes.Result, startMs, endMs uint64) *qbtypes.Result {
	if result == nil || result.Value == nil {
		return result
	}

	if result.Type != qbtypes.RequestTypeTimeSeries {
		// For non-time series data, return as is
		return result
	}

	tsData, ok := result.Value.(*qbtypes.TimeSeriesData)
	if !ok || tsData == nil {
		return result
	}

	filteredData := &qbtypes.TimeSeriesData{
		Aggregations: make([]*qbtypes.AggregationBucket, 0, len(tsData.Aggregations)),
	}

	for _, aggBucket := range tsData.Aggregations {
		filteredBucket := &qbtypes.AggregationBucket{
			Index:  aggBucket.Index,
			Alias:  aggBucket.Alias,
			Meta:   aggBucket.Meta,
			Series: make([]*qbtypes.TimeSeries, 0, len(aggBucket.Series)),
		}

		for _, series := range aggBucket.Series {
			filteredSeries := &qbtypes.TimeSeries{
				Labels: series.Labels,
				Values: make([]*qbtypes.TimeSeriesValue, 0, len(series.Values)),
			}

			// Filter values to only include those within the requested time range
			for _, value := range series.Values {
				timestampMs := uint64(value.Timestamp)
				if timestampMs >= startMs && timestampMs < endMs {
					filteredSeries.Values = append(filteredSeries.Values, value)
				}
			}

			// Always add series to preserve structure (even if empty)
			filteredBucket.Series = append(filteredBucket.Series, filteredSeries)
		}

		// Only add bucket if it has series
		if len(filteredBucket.Series) > 0 {
			filteredData.Aggregations = append(filteredData.Aggregations, filteredBucket)
		}
	}

	// Create a new result with the filtered data
	return &qbtypes.Result{
		Type:     result.Type,
		Value:    filteredData,
		Stats:    result.Stats,
		Warnings: result.Warnings,
	}
}