## 📄 Summary
- Fix the order by clause for the time series result
- Add the statement builder for the trace query (it was supposed to be replaced by new development that never happened, so we continue using the old table)
- Remove `pkg/types/telemetrytypes/virtualfield.go`; it is not currently used anywhere but was causing a circular import. It will be re-introduced later.
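The bucket cache below (`pkg/querier/bucket_cache.go`) only stores complete step intervals: before writing to the cache it rounds the window start up and the window end down to the step boundary, and skips caching when no whole interval remains. The following self-contained sketch illustrates just that rounding arithmetic; `alignToCompleteIntervals` is an illustrative name and not part of the file.

```go
package main

import "fmt"

// alignToCompleteIntervals mirrors the boundary rounding done in bucketCache.Put:
// round the start up and the end down to the step, and report whether any
// complete interval is left to cache. Illustrative sketch only.
func alignToCompleteIntervals(startMs, endMs, stepMs uint64) (uint64, uint64, bool) {
	if stepMs == 0 {
		return startMs, endMs, startMs < endMs
	}
	if startMs%stepMs != 0 {
		startMs = ((startMs / stepMs) + 1) * stepMs // first complete interval
	}
	if endMs%stepMs != 0 {
		endMs = (endMs / stepMs) * stepMs // last complete interval
	}
	return startMs, endMs, startMs < endMs
}

func main() {
	// With a 60s step, a window that starts and ends mid-interval shrinks
	// to the whole intervals it encloses.
	s, e, ok := alignToCompleteIntervals(1_700_000_430_000, 1_700_000_730_000, 60_000)
	fmt.Println(s, e, ok)
}
```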
package querier

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"slices"
	"time"

	"github.com/SigNoz/signoz/pkg/cache"
	"github.com/SigNoz/signoz/pkg/errors"
	"github.com/SigNoz/signoz/pkg/factory"
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/valuer"
)

// bucketCache implements the BucketCache interface
type bucketCache struct {
	cache        cache.Cache
	logger       *slog.Logger
	cacheTTL     time.Duration
	fluxInterval time.Duration
}

var _ BucketCache = (*bucketCache)(nil)

// NewBucketCache creates a new BucketCache implementation
func NewBucketCache(settings factory.ProviderSettings, cache cache.Cache, cacheTTL time.Duration, fluxInterval time.Duration) BucketCache {
	cacheSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querier/bucket_cache")
	return &bucketCache{
		cache:        cache,
		logger:       cacheSettings.Logger(),
		cacheTTL:     cacheTTL,
		fluxInterval: fluxInterval,
	}
}

// cachedBucket represents a cached time bucket
type cachedBucket struct {
	StartMs uint64              `json:"startMs"`
	EndMs   uint64              `json:"endMs"`
	Type    qbtypes.RequestType `json:"type"`
	Value   json.RawMessage     `json:"value"`
	Stats   qbtypes.ExecStats   `json:"stats"`
}

// cachedData represents the full cached data for a query
type cachedData struct {
	Buckets  []*cachedBucket `json:"buckets"`
	Warnings []string        `json:"warnings"`
}

func (c *cachedData) UnmarshalBinary(data []byte) error {
	return json.Unmarshal(data, c)
}

func (c *cachedData) MarshalBinary() ([]byte, error) {
	return json.Marshal(c)
}

// GetMissRanges returns cached data and missing time ranges
func (bc *bucketCache) GetMissRanges(
	ctx context.Context,
	orgID valuer.UUID,
	q qbtypes.Query,
	step qbtypes.Step,
) (cached *qbtypes.Result, missing []*qbtypes.TimeRange) {

	// Get query window
	startMs, endMs := q.Window()

	bc.logger.DebugContext(ctx, "getting miss ranges", "fingerprint", q.Fingerprint(), "start", startMs, "end", endMs)

	// Generate cache key
	cacheKey := bc.generateCacheKey(q)

	bc.logger.DebugContext(ctx, "cache key", "cache_key", cacheKey)

	// Try to get cached data
	var data cachedData
	err := bc.cache.Get(ctx, orgID, cacheKey, &data, false)
	if err != nil {
		if !errors.Ast(err, errors.TypeNotFound) {
			bc.logger.ErrorContext(ctx, "error getting cached data", "error", err)
		}
		// No cached data, need to fetch entire range
		missing = []*qbtypes.TimeRange{{From: startMs, To: endMs}}
		return nil, missing
	}

	// Extract step interval if this is a builder query
	stepMs := uint64(step.Duration.Milliseconds())

	// Find missing ranges with step alignment
	missing = bc.findMissingRangesWithStep(data.Buckets, startMs, endMs, stepMs)
	bc.logger.DebugContext(ctx, "missing ranges", "missing", missing, "step", stepMs)

	// If no cached data overlaps with requested range, return empty result
	if len(data.Buckets) == 0 {
		return nil, missing
	}

	// Extract relevant buckets and merge them
	relevantBuckets := bc.filterRelevantBuckets(data.Buckets, startMs, endMs)
	if len(relevantBuckets) == 0 {
		return nil, missing
	}

	// Merge buckets into a single result
	mergedResult := bc.mergeBuckets(ctx, relevantBuckets, data.Warnings)

	// Filter the merged result to only include values within the requested time range
	mergedResult = bc.filterResultToTimeRange(mergedResult, startMs, endMs)

	return mergedResult, missing
}

// Put stores fresh query results in the cache
func (bc *bucketCache) Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Query, step qbtypes.Step, fresh *qbtypes.Result) {
	// Get query window
	startMs, endMs := q.Window()

	// Calculate the flux boundary - data after this point should not be cached
	currentMs := uint64(time.Now().UnixMilli())
	fluxBoundary := currentMs - uint64(bc.fluxInterval.Milliseconds())

	// If the entire range is within flux interval, skip caching
	if startMs >= fluxBoundary {
		bc.logger.DebugContext(ctx, "entire range within flux interval, skipping cache",
			"start", startMs,
			"end", endMs,
			"flux_boundary", fluxBoundary)
		return
	}

	// Adjust endMs to not include data within flux interval
	cachableEndMs := endMs
	if endMs > fluxBoundary {
		cachableEndMs = fluxBoundary
		bc.logger.DebugContext(ctx, "adjusting end time to exclude flux interval",
			"original_end", endMs,
			"cachable_end", cachableEndMs)
	}

	// Generate cache key
	cacheKey := bc.generateCacheKey(q)

	// Get existing cached data
	var existingData cachedData
	if err := bc.cache.Get(ctx, orgID, cacheKey, &existingData, true); err != nil {
		existingData = cachedData{}
	}

	// Trim the result to exclude data within flux interval
	trimmedResult := bc.trimResultToFluxBoundary(fresh, cachableEndMs)
	if trimmedResult == nil {
		// Result type is not cacheable (raw or scalar)
		return
	}

	// Adjust start and end times to only cache complete intervals
	cachableStartMs := startMs
	stepMs := uint64(step.Duration.Milliseconds())

	// If we have a step interval, adjust boundaries to only cache complete intervals
	if stepMs > 0 {
		// If start is not aligned, round up to next step boundary (first complete interval)
		if startMs%stepMs != 0 {
			cachableStartMs = ((startMs / stepMs) + 1) * stepMs
		}

		// If end is not aligned, round down to previous step boundary (last complete interval)
		if cachableEndMs%stepMs != 0 {
			cachableEndMs = (cachableEndMs / stepMs) * stepMs
		}

		// If after adjustment we have no complete intervals, don't cache
		if cachableStartMs >= cachableEndMs {
			bc.logger.DebugContext(ctx, "no complete intervals to cache",
				"original_start", startMs,
				"original_end", endMs,
				"adjusted_start", cachableStartMs,
				"adjusted_end", cachableEndMs,
				"step", stepMs)
			return
		}
	}

	// Convert trimmed result to buckets with adjusted boundaries
	freshBuckets := bc.resultToBuckets(ctx, trimmedResult, cachableStartMs, cachableEndMs)

	// If no fresh buckets and no existing data, don't cache
	if len(freshBuckets) == 0 && len(existingData.Buckets) == 0 {
		return
	}

	// Merge with existing buckets
	mergedBuckets := bc.mergeAndDeduplicateBuckets(existingData.Buckets, freshBuckets)

	// Update warnings
	allWarnings := append(existingData.Warnings, trimmedResult.Warnings...)
	uniqueWarnings := bc.deduplicateWarnings(allWarnings)

	// Create updated cached data
	updatedData := cachedData{
		Buckets:  mergedBuckets,
		Warnings: uniqueWarnings,
	}

	// Marshal and store in cache
	if err := bc.cache.Set(ctx, orgID, cacheKey, &updatedData, bc.cacheTTL); err != nil {
		bc.logger.ErrorContext(ctx, "error setting cached data", "error", err)
	}
}

// generateCacheKey creates a unique cache key based on query fingerprint
func (bc *bucketCache) generateCacheKey(q qbtypes.Query) string {
	fingerprint := q.Fingerprint()

	return fmt.Sprintf("v5:query:%s", fingerprint)
}

// findMissingRangesWithStep identifies time ranges not covered by cached buckets with step alignment
func (bc *bucketCache) findMissingRangesWithStep(buckets []*cachedBucket, startMs, endMs uint64, stepMs uint64) []*qbtypes.TimeRange {
	// When step is 0 or window is too small to be cached, use simple algorithm
	if stepMs == 0 || (startMs+stepMs) > endMs {
		return bc.findMissingRangesBasic(buckets, startMs, endMs)
	}

	// When no buckets exist, handle partial windows specially
	if len(buckets) == 0 {
		missing := make([]*qbtypes.TimeRange, 0, 3)

		currentMs := startMs

		// Check if start is not aligned - add partial window
		if startMs%stepMs != 0 {
			nextAggStart := startMs - (startMs % stepMs) + stepMs
			missing = append(missing, &qbtypes.TimeRange{
				From: startMs,
				To:   min(nextAggStart, endMs),
			})
			currentMs = nextAggStart
		}

		// Add the main range if needed
		if currentMs < endMs {
			missing = append(missing, &qbtypes.TimeRange{
				From: currentMs,
				To:   endMs,
			})
		}

		return missing
	}

	// Check if already sorted before sorting
	needsSort := false
	for i := 1; i < len(buckets); i++ {
		if buckets[i].StartMs < buckets[i-1].StartMs {
			needsSort = true
			break
		}
	}

	if needsSort {
		slices.SortFunc(buckets, func(a, b *cachedBucket) int {
			if a.StartMs < b.StartMs {
				return -1
			}
			if a.StartMs > b.StartMs {
				return 1
			}
			return 0
		})
	}

	// Pre-allocate with reasonable capacity
	missing := make([]*qbtypes.TimeRange, 0, len(buckets)+2)

	currentMs := startMs

	// Check if start is not aligned - add partial window
	if startMs%stepMs != 0 {
		nextAggStart := startMs - (startMs % stepMs) + stepMs
		missing = append(missing, &qbtypes.TimeRange{
			From: startMs,
			To:   min(nextAggStart, endMs),
		})
		currentMs = nextAggStart
	}

	for _, bucket := range buckets {
		// Skip buckets that end before current position
		if bucket.EndMs <= currentMs {
			continue
		}
		// Stop processing if we've reached the end time
		if bucket.StartMs >= endMs {
			break
		}

		// Align bucket boundaries to step intervals
		alignedBucketStart := bucket.StartMs
		if bucket.StartMs%stepMs != 0 {
			// Round up to next step boundary
			alignedBucketStart = bucket.StartMs - (bucket.StartMs % stepMs) + stepMs
		}

		// Add gap before this bucket if needed
		if currentMs < alignedBucketStart && currentMs < endMs {
			missing = append(missing, &qbtypes.TimeRange{
				From: currentMs,
				To:   min(alignedBucketStart, endMs),
			})
		}

		// Update current position to the end of this bucket
		// But ensure it's aligned to step boundary
		bucketEnd := min(bucket.EndMs, endMs)
		if bucketEnd%stepMs != 0 && bucketEnd < endMs {
			// Round down to step boundary
			bucketEnd = bucketEnd - (bucketEnd % stepMs)
		}
		currentMs = max(currentMs, bucketEnd)
	}

	// Add final gap if needed
	if currentMs < endMs {
		missing = append(missing, &qbtypes.TimeRange{
			From: currentMs,
			To:   endMs,
		})
	}

	// Don't merge ranges - keep partial windows separate for proper handling
	return missing
}

// findMissingRangesBasic is the simple algorithm without step alignment
func (bc *bucketCache) findMissingRangesBasic(buckets []*cachedBucket, startMs, endMs uint64) []*qbtypes.TimeRange {
	// Check if already sorted before sorting
	needsSort := false
	for i := 1; i < len(buckets); i++ {
		if buckets[i].StartMs < buckets[i-1].StartMs {
			needsSort = true
			break
		}
	}

	if needsSort {
		slices.SortFunc(buckets, func(a, b *cachedBucket) int {
			if a.StartMs < b.StartMs {
				return -1
			}
			if a.StartMs > b.StartMs {
				return 1
			}
			return 0
		})
	}

	// Pre-allocate with reasonable capacity
	missing := make([]*qbtypes.TimeRange, 0, len(buckets)+1)
	currentMs := startMs

	for _, bucket := range buckets {
		// Skip buckets that end before start time
		if bucket.EndMs <= startMs {
			continue
		}
		// Stop processing if we've reached the end time
		if bucket.StartMs >= endMs {
			break
		}

		// Add gap before this bucket if needed
		if currentMs < bucket.StartMs {
			missing = append(missing, &qbtypes.TimeRange{
				From: currentMs,
				To:   min(bucket.StartMs, endMs),
			})
		}

		// Update current position, but don't go past the end time
		currentMs = max(currentMs, min(bucket.EndMs, endMs))
	}

	// Add final gap if needed
	if currentMs < endMs {
		// Check if we need to limit due to flux interval
		currentTime := uint64(time.Now().UnixMilli())
		fluxBoundary := currentTime - uint64(bc.fluxInterval.Milliseconds())

		// If the missing range extends beyond flux boundary, limit it
		if currentMs < fluxBoundary {
			// Add range up to flux boundary
			missing = append(missing, &qbtypes.TimeRange{
				From: currentMs,
				To:   min(endMs, fluxBoundary),
			})
			// If endMs is beyond flux boundary, add that as another missing range
			if endMs > fluxBoundary {
				missing = append(missing, &qbtypes.TimeRange{
					From: fluxBoundary,
					To:   endMs,
				})
			}
		} else {
			// Entire missing range is within flux interval
			missing = append(missing, &qbtypes.TimeRange{
				From: currentMs,
				To:   endMs,
			})
		}
	}

	// Don't merge ranges - keep partial windows separate for proper handling
	return missing
}

// filterRelevantBuckets returns buckets that overlap with the requested time range
func (bc *bucketCache) filterRelevantBuckets(buckets []*cachedBucket, startMs, endMs uint64) []*cachedBucket {
	// Pre-allocate with estimated capacity
	relevant := make([]*cachedBucket, 0, len(buckets))

	for _, bucket := range buckets {
		// Check if bucket overlaps with requested range
		if bucket.EndMs > startMs && bucket.StartMs < endMs {
			relevant = append(relevant, bucket)
		}
	}

	// Sort by start time
	slices.SortFunc(relevant, func(a, b *cachedBucket) int {
		if a.StartMs < b.StartMs {
			return -1
		}
		if a.StartMs > b.StartMs {
			return 1
		}
		return 0
	})

	return relevant
}

// mergeBuckets combines multiple cached buckets into a single result
func (bc *bucketCache) mergeBuckets(ctx context.Context, buckets []*cachedBucket, warnings []string) *qbtypes.Result {
	if len(buckets) == 0 {
		return &qbtypes.Result{}
	}

	// All buckets should have the same type
	resultType := buckets[0].Type

	// Aggregate stats
	var totalStats qbtypes.ExecStats
	for _, bucket := range buckets {
		totalStats.RowsScanned += bucket.Stats.RowsScanned
		totalStats.BytesScanned += bucket.Stats.BytesScanned
		totalStats.DurationMS += bucket.Stats.DurationMS
	}

	// Merge values based on type
	var mergedValue any
	switch resultType {
	case qbtypes.RequestTypeTimeSeries:
		mergedValue = bc.mergeTimeSeriesValues(ctx, buckets)
		// Raw and Scalar types are not cached, so no merge needed
	}

	return &qbtypes.Result{
		Type:     resultType,
		Value:    mergedValue,
		Stats:    totalStats,
		Warnings: warnings,
	}
}

// mergeTimeSeriesValues merges time series data from multiple buckets
func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*cachedBucket) *qbtypes.TimeSeriesData {
	// Estimate capacity based on bucket count
	estimatedSeries := len(buckets) * 10

	// Flat map with composite key for better performance
	type seriesKey struct {
		aggIndex int
		key      string
	}
	seriesMap := make(map[seriesKey]*qbtypes.TimeSeries, estimatedSeries)
	var queryName string

	for _, bucket := range buckets {
		var tsData *qbtypes.TimeSeriesData
		if err := json.Unmarshal(bucket.Value, &tsData); err != nil {
			bc.logger.ErrorContext(ctx, "failed to unmarshal time series data", "error", err)
			continue
		}

		// Preserve the query name from the first bucket
		if queryName == "" && tsData.QueryName != "" {
			queryName = tsData.QueryName
		}

		for _, aggBucket := range tsData.Aggregations {
			for _, series := range aggBucket.Series {
				// Create series key from labels
				key := seriesKey{
					aggIndex: aggBucket.Index,
					key:      qbtypes.GetUniqueSeriesKey(series.Labels),
				}

				if existingSeries, ok := seriesMap[key]; ok {
					// Merge values, avoiding duplicate timestamps
					timestampMap := make(map[int64]bool)
					for _, v := range existingSeries.Values {
						timestampMap[v.Timestamp] = true
					}

					// Pre-allocate capacity for merged values
					newCap := len(existingSeries.Values) + len(series.Values)
					if cap(existingSeries.Values) < newCap {
						newValues := make([]*qbtypes.TimeSeriesValue, len(existingSeries.Values), newCap)
						copy(newValues, existingSeries.Values)
						existingSeries.Values = newValues
					}

					// Only add values with new timestamps
					for _, v := range series.Values {
						if !timestampMap[v.Timestamp] {
							existingSeries.Values = append(existingSeries.Values, v)
						}
					}
				} else {
					// New series
					seriesMap[key] = series
				}
			}
		}
	}

	// Group series by aggregation index
	aggMap := make(map[int][]*qbtypes.TimeSeries)
	for key, series := range seriesMap {
		aggMap[key.aggIndex] = append(aggMap[key.aggIndex], series)
	}

	// Convert map back to slice
	result := &qbtypes.TimeSeriesData{
		QueryName:    queryName,
		Aggregations: make([]*qbtypes.AggregationBucket, 0, len(aggMap)),
	}

	for index, seriesList := range aggMap {
		// Sort values by timestamp for each series
		for _, s := range seriesList {
			// Check if already sorted before sorting
			needsSort := false
			for i := 1; i < len(s.Values); i++ {
				if s.Values[i].Timestamp < s.Values[i-1].Timestamp {
					needsSort = true
					break
				}
			}

			if needsSort {
				slices.SortFunc(s.Values, func(a, b *qbtypes.TimeSeriesValue) int {
					if a.Timestamp < b.Timestamp {
						return -1
					}
					if a.Timestamp > b.Timestamp {
						return 1
					}
					return 0
				})
			}
		}

		result.Aggregations = append(result.Aggregations, &qbtypes.AggregationBucket{
			Index:  index,
			Series: seriesList,
		})
	}

	return result
}

// isEmptyResult checks if a result is truly empty (no data exists) vs filtered empty (data was filtered out)
func (bc *bucketCache) isEmptyResult(result *qbtypes.Result) (isEmpty bool, isFiltered bool) {
	if result.Value == nil {
		return true, false
	}

	switch result.Type {
	case qbtypes.RequestTypeTimeSeries:
		if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok {
			// No aggregations at all means truly empty
			if len(tsData.Aggregations) == 0 {
				return true, false
			}

			// Check if we have aggregations but no series (filtered out)
			totalSeries := 0
			for _, agg := range tsData.Aggregations {
				totalSeries += len(agg.Series)
			}

			if totalSeries == 0 {
				// We have aggregations but no series - data was filtered out
				return true, true
			}

			// Check if all series have no values
			hasValues := false
			for _, agg := range tsData.Aggregations {
				for _, series := range agg.Series {
					if len(series.Values) > 0 {
						hasValues = true
						break
					}
				}
				if hasValues {
					break
				}
			}

			return !hasValues, !hasValues && totalSeries > 0
		}

	case qbtypes.RequestTypeRaw, qbtypes.RequestTypeScalar, qbtypes.RequestTypeTrace:
		// Raw and scalar data are not cached
		return true, false
	}

	return true, false
}

// resultToBuckets converts a query result into time-based buckets
func (bc *bucketCache) resultToBuckets(ctx context.Context, result *qbtypes.Result, startMs, endMs uint64) []*cachedBucket {
	// Check if result is empty
	isEmpty, isFiltered := bc.isEmptyResult(result)

	// Don't cache if result is empty but not filtered
	// Empty filtered results should be cached to avoid re-querying
	if isEmpty && !isFiltered {
		bc.logger.DebugContext(ctx, "skipping cache for empty non-filtered result")
		return nil
	}

	// For now, create a single bucket for the entire range
	// In the future, we could split large ranges into smaller buckets
	valueBytes, err := json.Marshal(result.Value)
	if err != nil {
		bc.logger.ErrorContext(ctx, "failed to marshal result value", "error", err)
		return nil
	}

	// Always create a bucket, even for empty filtered results
	// This ensures we don't re-query for data that doesn't exist
	return []*cachedBucket{
		{
			StartMs: startMs,
			EndMs:   endMs,
			Type:    result.Type,
			Value:   valueBytes,
			Stats:   result.Stats,
		},
	}
}

// mergeAndDeduplicateBuckets combines and deduplicates bucket lists
func (bc *bucketCache) mergeAndDeduplicateBuckets(existing, fresh []*cachedBucket) []*cachedBucket {
	// Create a map to deduplicate by time range
	bucketMap := make(map[string]*cachedBucket)

	// Add existing buckets
	for _, bucket := range existing {
		key := fmt.Sprintf("%d-%d", bucket.StartMs, bucket.EndMs)
		bucketMap[key] = bucket
	}

	// Add/update with fresh buckets
	for _, bucket := range fresh {
		key := fmt.Sprintf("%d-%d", bucket.StartMs, bucket.EndMs)
		bucketMap[key] = bucket
	}

	// Convert back to slice with pre-allocated capacity
	result := make([]*cachedBucket, 0, len(bucketMap))
	for _, bucket := range bucketMap {
		result = append(result, bucket)
	}

	// Sort by start time
	slices.SortFunc(result, func(a, b *cachedBucket) int {
		if a.StartMs < b.StartMs {
			return -1
		}
		if a.StartMs > b.StartMs {
			return 1
		}
		return 0
	})

	return result
}

// deduplicateWarnings removes duplicate warnings
func (bc *bucketCache) deduplicateWarnings(warnings []string) []string {
	if len(warnings) == 0 {
		return nil
	}

	seen := make(map[string]bool, len(warnings))
	unique := make([]string, 0, len(warnings)) // Pre-allocate capacity

	for _, warning := range warnings {
		if !seen[warning] {
			seen[warning] = true
			unique = append(unique, warning)
		}
	}

	return unique
}

// trimResultToFluxBoundary trims the result to exclude data points beyond the flux boundary
func (bc *bucketCache) trimResultToFluxBoundary(result *qbtypes.Result, fluxBoundary uint64) *qbtypes.Result {
	trimmedResult := &qbtypes.Result{
		Type:     result.Type,
		Stats:    result.Stats,
		Warnings: result.Warnings,
	}

	switch result.Type {
	case qbtypes.RequestTypeTimeSeries:
		// Trim time series data
		if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok && tsData != nil {
			trimmedData := &qbtypes.TimeSeriesData{
				QueryName: tsData.QueryName,
			}

			for _, aggBucket := range tsData.Aggregations {
				trimmedBucket := &qbtypes.AggregationBucket{
					Index: aggBucket.Index,
				}

				for _, series := range aggBucket.Series {
					trimmedSeries := &qbtypes.TimeSeries{
						Labels: series.Labels,
					}

					// Filter values to exclude those beyond flux boundary and partial values
					for _, value := range series.Values {
						// Skip partial values - they cannot be cached
						if value.Partial {
							continue
						}
						if uint64(value.Timestamp) <= fluxBoundary {
							trimmedSeries.Values = append(trimmedSeries.Values, value)
						}
					}

					// Always add the series to preserve filtered empty results
					trimmedBucket.Series = append(trimmedBucket.Series, trimmedSeries)
				}

				// Always add the bucket to preserve aggregation structure
				trimmedData.Aggregations = append(trimmedData.Aggregations, trimmedBucket)
			}

			// Always set the value to preserve empty filtered results
			trimmedResult.Value = trimmedData
		}

	case qbtypes.RequestTypeRaw, qbtypes.RequestTypeScalar, qbtypes.RequestTypeTrace:
		// Don't cache raw or scalar data
		return nil
	}

	return trimmedResult
}

func min(a, b uint64) uint64 {
	if a < b {
		return a
	}
	return b
}

func max(a, b uint64) uint64 {
	if a > b {
		return a
	}
	return b
}

// filterResultToTimeRange filters the result to only include values within the requested time range
func (bc *bucketCache) filterResultToTimeRange(result *qbtypes.Result, startMs, endMs uint64) *qbtypes.Result {
	if result == nil || result.Value == nil {
		return result
	}

	switch result.Type {
	case qbtypes.RequestTypeTimeSeries:
		if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok {
			filteredData := &qbtypes.TimeSeriesData{
				QueryName:    tsData.QueryName,
				Aggregations: make([]*qbtypes.AggregationBucket, 0, len(tsData.Aggregations)),
			}

			for _, aggBucket := range tsData.Aggregations {
				filteredBucket := &qbtypes.AggregationBucket{
					Index:  aggBucket.Index,
					Alias:  aggBucket.Alias,
					Meta:   aggBucket.Meta,
					Series: make([]*qbtypes.TimeSeries, 0, len(aggBucket.Series)),
				}

				for _, series := range aggBucket.Series {
					filteredSeries := &qbtypes.TimeSeries{
						Labels: series.Labels,
						Values: make([]*qbtypes.TimeSeriesValue, 0, len(series.Values)),
					}

					// Filter values to only include those within the requested time range
					for _, value := range series.Values {
						timestampMs := uint64(value.Timestamp)
						if timestampMs >= startMs && timestampMs < endMs {
							filteredSeries.Values = append(filteredSeries.Values, value)
						}
					}

					// Always add series to preserve structure (even if empty)
					filteredBucket.Series = append(filteredBucket.Series, filteredSeries)
				}

				// Only add bucket if it has series
				if len(filteredBucket.Series) > 0 {
					filteredData.Aggregations = append(filteredData.Aggregations, filteredBucket)
				}
			}

			// Create a new result with the filtered data
			return &qbtypes.Result{
				Type:     result.Type,
				Value:    filteredData,
				Stats:    result.Stats,
				Warnings: result.Warnings,
			}
		}
	}

	// For non-time series data, return as is
	return result
}