mirror of
https://github.com/SigNoz/signoz.git
synced 2025-12-19 00:17:00 +00:00
* chore: update types 1. add partial bool to indicate if the value covers the partial interval 2. add optional unit if present (ex: duration_nano, metrics with units) 3. use pointers wherever necessary 4. add format options for request and remove redundant name in query envelope * chore: fix some gaps 1. make the range as [start, end) 2. provide the logs statement builder with the body column 3. skip the body filter on resource filter statement builder 4. remove unnecessary agg expr rewriter in metrics 5. add ability to skip full text in where clause visitor * chore: add API endpoint for new query range * chore: add bucket cache implementation * chore: add fingerprinting impl and add bucket cache to querier * chore: add provider factory
819 lines
22 KiB
Go
819 lines
22 KiB
Go
package querier
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"slices"
|
|
"time"
|
|
|
|
"github.com/SigNoz/signoz/pkg/cache"
|
|
"github.com/SigNoz/signoz/pkg/errors"
|
|
"github.com/SigNoz/signoz/pkg/factory"
|
|
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
|
"github.com/SigNoz/signoz/pkg/valuer"
|
|
)
|
|
|
|
// bucketCache implements the BucketCache interface
|
|
type bucketCache struct {
|
|
cache cache.Cache
|
|
logger *slog.Logger
|
|
cacheTTL time.Duration
|
|
fluxInterval time.Duration
|
|
}
|
|
|
|
var _ BucketCache = (*bucketCache)(nil)
|
|
|
|
// NewBucketCache creates a new BucketCache implementation
|
|
func NewBucketCache(settings factory.ProviderSettings, cache cache.Cache, cacheTTL time.Duration, fluxInterval time.Duration) BucketCache {
|
|
cacheSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querier/bucket_cache")
|
|
return &bucketCache{
|
|
cache: cache,
|
|
logger: cacheSettings.Logger(),
|
|
cacheTTL: cacheTTL,
|
|
fluxInterval: fluxInterval,
|
|
}
|
|
}
|
|
|
|
// cachedBucket represents a cached time bucket
|
|
type cachedBucket struct {
|
|
StartMs uint64 `json:"startMs"`
|
|
EndMs uint64 `json:"endMs"`
|
|
Type qbtypes.RequestType `json:"type"`
|
|
Value json.RawMessage `json:"value"`
|
|
Stats qbtypes.ExecStats `json:"stats"`
|
|
}
|
|
|
|
// cachedData represents the full cached data for a query
|
|
type cachedData struct {
|
|
Buckets []*cachedBucket `json:"buckets"`
|
|
Warnings []string `json:"warnings"`
|
|
}
|
|
|
|
func (c *cachedData) UnmarshalBinary(data []byte) error {
|
|
return json.Unmarshal(data, c)
|
|
}
|
|
|
|
func (c *cachedData) MarshalBinary() ([]byte, error) {
|
|
return json.Marshal(c)
|
|
}
|
|
|
|
// GetMissRanges returns cached data and missing time ranges
|
|
func (bc *bucketCache) GetMissRanges(
|
|
ctx context.Context,
|
|
orgID valuer.UUID,
|
|
q qbtypes.Query,
|
|
step qbtypes.Step,
|
|
) (cached *qbtypes.Result, missing []*qbtypes.TimeRange) {
|
|
|
|
// Get query window
|
|
startMs, endMs := q.Window()
|
|
|
|
bc.logger.DebugContext(ctx, "getting miss ranges", "fingerprint", q.Fingerprint(), "start", startMs, "end", endMs)
|
|
|
|
// Generate cache key
|
|
cacheKey := bc.generateCacheKey(q)
|
|
|
|
bc.logger.DebugContext(ctx, "cache key", "cache_key", cacheKey)
|
|
|
|
// Try to get cached data
|
|
var data cachedData
|
|
err := bc.cache.Get(ctx, orgID, cacheKey, &data, false)
|
|
if err != nil {
|
|
if !errors.Ast(err, errors.TypeNotFound) {
|
|
bc.logger.ErrorContext(ctx, "error getting cached data", "error", err)
|
|
}
|
|
// No cached data, need to fetch entire range
|
|
missing = []*qbtypes.TimeRange{{From: startMs, To: endMs}}
|
|
return nil, missing
|
|
}
|
|
|
|
// Extract step interval if this is a builder query
|
|
stepMs := uint64(step.Duration.Milliseconds())
|
|
|
|
// Find missing ranges with step alignment
|
|
missing = bc.findMissingRangesWithStep(data.Buckets, startMs, endMs, stepMs)
|
|
bc.logger.DebugContext(ctx, "missing ranges", "missing", missing, "step", stepMs)
|
|
|
|
// If no cached data overlaps with requested range, return empty result
|
|
if len(data.Buckets) == 0 {
|
|
return nil, missing
|
|
}
|
|
|
|
// Extract relevant buckets and merge them
|
|
relevantBuckets := bc.filterRelevantBuckets(data.Buckets, startMs, endMs)
|
|
if len(relevantBuckets) == 0 {
|
|
return nil, missing
|
|
}
|
|
|
|
// Merge buckets into a single result
|
|
mergedResult := bc.mergeBuckets(ctx, relevantBuckets, data.Warnings)
|
|
|
|
// Filter the merged result to only include values within the requested time range
|
|
mergedResult = bc.filterResultToTimeRange(mergedResult, startMs, endMs)
|
|
|
|
return mergedResult, missing
|
|
}
|
|
|
|
// Put stores fresh query results in the cache
|
|
func (bc *bucketCache) Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Query, fresh *qbtypes.Result) {
|
|
// Get query window
|
|
startMs, endMs := q.Window()
|
|
|
|
// Calculate the flux boundary - data after this point should not be cached
|
|
currentMs := uint64(time.Now().UnixMilli())
|
|
fluxBoundary := currentMs - uint64(bc.fluxInterval.Milliseconds())
|
|
|
|
// If the entire range is within flux interval, skip caching
|
|
if startMs >= fluxBoundary {
|
|
bc.logger.DebugContext(ctx, "entire range within flux interval, skipping cache",
|
|
"start", startMs,
|
|
"end", endMs,
|
|
"flux_boundary", fluxBoundary)
|
|
return
|
|
}
|
|
|
|
// Adjust endMs to not include data within flux interval
|
|
cachableEndMs := endMs
|
|
if endMs > fluxBoundary {
|
|
cachableEndMs = fluxBoundary
|
|
bc.logger.DebugContext(ctx, "adjusting end time to exclude flux interval",
|
|
"original_end", endMs,
|
|
"cachable_end", cachableEndMs)
|
|
}
|
|
|
|
// Generate cache key
|
|
cacheKey := bc.generateCacheKey(q)
|
|
|
|
// Get existing cached data
|
|
var existingData cachedData
|
|
if err := bc.cache.Get(ctx, orgID, cacheKey, &existingData, true); err != nil {
|
|
existingData = cachedData{}
|
|
}
|
|
|
|
// Trim the result to exclude data within flux interval
|
|
trimmedResult := bc.trimResultToFluxBoundary(fresh, cachableEndMs)
|
|
if trimmedResult == nil {
|
|
// Result type is not cacheable (raw or scalar)
|
|
return
|
|
}
|
|
|
|
// Convert trimmed result to buckets
|
|
freshBuckets := bc.resultToBuckets(ctx, trimmedResult, startMs, cachableEndMs)
|
|
|
|
// If no fresh buckets and no existing data, don't cache
|
|
if len(freshBuckets) == 0 && len(existingData.Buckets) == 0 {
|
|
return
|
|
}
|
|
|
|
// Merge with existing buckets
|
|
mergedBuckets := bc.mergeAndDeduplicateBuckets(existingData.Buckets, freshBuckets)
|
|
|
|
// Update warnings
|
|
allWarnings := append(existingData.Warnings, trimmedResult.Warnings...)
|
|
uniqueWarnings := bc.deduplicateWarnings(allWarnings)
|
|
|
|
// Create updated cached data
|
|
updatedData := cachedData{
|
|
Buckets: mergedBuckets,
|
|
Warnings: uniqueWarnings,
|
|
}
|
|
|
|
// Marshal and store in cache
|
|
if err := bc.cache.Set(ctx, orgID, cacheKey, &updatedData, bc.cacheTTL); err != nil {
|
|
bc.logger.ErrorContext(ctx, "error setting cached data", "error", err)
|
|
}
|
|
}
|
|
|
|
// generateCacheKey creates a unique cache key based on query fingerprint
|
|
func (bc *bucketCache) generateCacheKey(q qbtypes.Query) string {
|
|
fingerprint := q.Fingerprint()
|
|
|
|
return fmt.Sprintf("v5:query:%s", fingerprint)
|
|
}
|
|
|
|
// findMissingRangesWithStep identifies time ranges not covered by cached buckets with step alignment
|
|
func (bc *bucketCache) findMissingRangesWithStep(buckets []*cachedBucket, startMs, endMs uint64, stepMs uint64) []*qbtypes.TimeRange {
|
|
// When step is 0 or window is too small to be cached, use simple algorithm
|
|
if stepMs == 0 || (startMs+stepMs) > endMs {
|
|
return bc.findMissingRangesBasic(buckets, startMs, endMs)
|
|
}
|
|
|
|
// When no buckets exist, handle partial windows specially
|
|
if len(buckets) == 0 {
|
|
missing := make([]*qbtypes.TimeRange, 0, 3)
|
|
|
|
currentMs := startMs
|
|
|
|
// Check if start is not aligned - add partial window
|
|
if startMs%stepMs != 0 {
|
|
nextAggStart := startMs - (startMs % stepMs) + stepMs
|
|
missing = append(missing, &qbtypes.TimeRange{
|
|
From: startMs,
|
|
To: min(nextAggStart, endMs),
|
|
})
|
|
currentMs = nextAggStart
|
|
}
|
|
|
|
// Add the main range if needed
|
|
if currentMs < endMs {
|
|
missing = append(missing, &qbtypes.TimeRange{
|
|
From: currentMs,
|
|
To: endMs,
|
|
})
|
|
}
|
|
|
|
return missing
|
|
}
|
|
|
|
// Check if already sorted before sorting
|
|
needsSort := false
|
|
for i := 1; i < len(buckets); i++ {
|
|
if buckets[i].StartMs < buckets[i-1].StartMs {
|
|
needsSort = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if needsSort {
|
|
slices.SortFunc(buckets, func(a, b *cachedBucket) int {
|
|
if a.StartMs < b.StartMs {
|
|
return -1
|
|
}
|
|
if a.StartMs > b.StartMs {
|
|
return 1
|
|
}
|
|
return 0
|
|
})
|
|
}
|
|
|
|
// Pre-allocate with reasonable capacity
|
|
missing := make([]*qbtypes.TimeRange, 0, len(buckets)+2)
|
|
|
|
currentMs := startMs
|
|
|
|
// Check if start is not aligned - add partial window
|
|
if startMs%stepMs != 0 {
|
|
nextAggStart := startMs - (startMs % stepMs) + stepMs
|
|
missing = append(missing, &qbtypes.TimeRange{
|
|
From: startMs,
|
|
To: min(nextAggStart, endMs),
|
|
})
|
|
currentMs = nextAggStart
|
|
}
|
|
|
|
for _, bucket := range buckets {
|
|
// Skip buckets that end before current position
|
|
if bucket.EndMs <= currentMs {
|
|
continue
|
|
}
|
|
// Stop processing if we've reached the end time
|
|
if bucket.StartMs >= endMs {
|
|
break
|
|
}
|
|
|
|
// Align bucket boundaries to step intervals
|
|
alignedBucketStart := bucket.StartMs
|
|
if bucket.StartMs%stepMs != 0 {
|
|
// Round up to next step boundary
|
|
alignedBucketStart = bucket.StartMs - (bucket.StartMs % stepMs) + stepMs
|
|
}
|
|
|
|
// Add gap before this bucket if needed
|
|
if currentMs < alignedBucketStart && currentMs < endMs {
|
|
missing = append(missing, &qbtypes.TimeRange{
|
|
From: currentMs,
|
|
To: min(alignedBucketStart, endMs),
|
|
})
|
|
}
|
|
|
|
// Update current position to the end of this bucket
|
|
// But ensure it's aligned to step boundary
|
|
bucketEnd := min(bucket.EndMs, endMs)
|
|
if bucketEnd%stepMs != 0 && bucketEnd < endMs {
|
|
// Round down to step boundary
|
|
bucketEnd = bucketEnd - (bucketEnd % stepMs)
|
|
}
|
|
currentMs = max(currentMs, bucketEnd)
|
|
}
|
|
|
|
// Add final gap if needed
|
|
if currentMs < endMs {
|
|
missing = append(missing, &qbtypes.TimeRange{
|
|
From: currentMs,
|
|
To: endMs,
|
|
})
|
|
}
|
|
|
|
// Don't merge ranges - keep partial windows separate for proper handling
|
|
return missing
|
|
}
|
|
|
|
// findMissingRangesBasic is the simple algorithm without step alignment
|
|
func (bc *bucketCache) findMissingRangesBasic(buckets []*cachedBucket, startMs, endMs uint64) []*qbtypes.TimeRange {
|
|
// Check if already sorted before sorting
|
|
needsSort := false
|
|
for i := 1; i < len(buckets); i++ {
|
|
if buckets[i].StartMs < buckets[i-1].StartMs {
|
|
needsSort = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if needsSort {
|
|
slices.SortFunc(buckets, func(a, b *cachedBucket) int {
|
|
if a.StartMs < b.StartMs {
|
|
return -1
|
|
}
|
|
if a.StartMs > b.StartMs {
|
|
return 1
|
|
}
|
|
return 0
|
|
})
|
|
}
|
|
|
|
// Pre-allocate with reasonable capacity
|
|
missing := make([]*qbtypes.TimeRange, 0, len(buckets)+1)
|
|
currentMs := startMs
|
|
|
|
for _, bucket := range buckets {
|
|
// Skip buckets that end before start time
|
|
if bucket.EndMs <= startMs {
|
|
continue
|
|
}
|
|
// Stop processing if we've reached the end time
|
|
if bucket.StartMs >= endMs {
|
|
break
|
|
}
|
|
|
|
// Add gap before this bucket if needed
|
|
if currentMs < bucket.StartMs {
|
|
missing = append(missing, &qbtypes.TimeRange{
|
|
From: currentMs,
|
|
To: min(bucket.StartMs, endMs),
|
|
})
|
|
}
|
|
|
|
// Update current position, but don't go past the end time
|
|
currentMs = max(currentMs, min(bucket.EndMs, endMs))
|
|
}
|
|
|
|
// Add final gap if needed
|
|
if currentMs < endMs {
|
|
// Check if we need to limit due to flux interval
|
|
currentTime := uint64(time.Now().UnixMilli())
|
|
fluxBoundary := currentTime - uint64(bc.fluxInterval.Milliseconds())
|
|
|
|
// If the missing range extends beyond flux boundary, limit it
|
|
if currentMs < fluxBoundary {
|
|
// Add range up to flux boundary
|
|
missing = append(missing, &qbtypes.TimeRange{
|
|
From: currentMs,
|
|
To: min(endMs, fluxBoundary),
|
|
})
|
|
// If endMs is beyond flux boundary, add that as another missing range
|
|
if endMs > fluxBoundary {
|
|
missing = append(missing, &qbtypes.TimeRange{
|
|
From: fluxBoundary,
|
|
To: endMs,
|
|
})
|
|
}
|
|
} else {
|
|
// Entire missing range is within flux interval
|
|
missing = append(missing, &qbtypes.TimeRange{
|
|
From: currentMs,
|
|
To: endMs,
|
|
})
|
|
}
|
|
}
|
|
|
|
// Don't merge ranges - keep partial windows separate for proper handling
|
|
return missing
|
|
}
|
|
|
|
// filterRelevantBuckets returns buckets that overlap with the requested time range
|
|
func (bc *bucketCache) filterRelevantBuckets(buckets []*cachedBucket, startMs, endMs uint64) []*cachedBucket {
|
|
// Pre-allocate with estimated capacity
|
|
relevant := make([]*cachedBucket, 0, len(buckets))
|
|
|
|
for _, bucket := range buckets {
|
|
// Check if bucket overlaps with requested range
|
|
if bucket.EndMs > startMs && bucket.StartMs < endMs {
|
|
relevant = append(relevant, bucket)
|
|
}
|
|
}
|
|
|
|
// Sort by start time
|
|
slices.SortFunc(relevant, func(a, b *cachedBucket) int {
|
|
if a.StartMs < b.StartMs {
|
|
return -1
|
|
}
|
|
if a.StartMs > b.StartMs {
|
|
return 1
|
|
}
|
|
return 0
|
|
})
|
|
|
|
return relevant
|
|
}
|
|
|
|
// mergeBuckets combines multiple cached buckets into a single result
|
|
func (bc *bucketCache) mergeBuckets(ctx context.Context, buckets []*cachedBucket, warnings []string) *qbtypes.Result {
|
|
if len(buckets) == 0 {
|
|
return &qbtypes.Result{}
|
|
}
|
|
|
|
// All buckets should have the same type
|
|
resultType := buckets[0].Type
|
|
|
|
// Aggregate stats
|
|
var totalStats qbtypes.ExecStats
|
|
for _, bucket := range buckets {
|
|
totalStats.RowsScanned += bucket.Stats.RowsScanned
|
|
totalStats.BytesScanned += bucket.Stats.BytesScanned
|
|
totalStats.DurationMS += bucket.Stats.DurationMS
|
|
}
|
|
|
|
// Merge values based on type
|
|
var mergedValue any
|
|
switch resultType {
|
|
case qbtypes.RequestTypeTimeSeries:
|
|
mergedValue = bc.mergeTimeSeriesValues(ctx, buckets)
|
|
// Raw and Scalar types are not cached, so no merge needed
|
|
}
|
|
|
|
return &qbtypes.Result{
|
|
Type: resultType,
|
|
Value: mergedValue,
|
|
Stats: totalStats,
|
|
Warnings: warnings,
|
|
}
|
|
}
|
|
|
|
// mergeTimeSeriesValues merges time series data from multiple buckets
|
|
func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*cachedBucket) *qbtypes.TimeSeriesData {
|
|
// Estimate capacity based on bucket count
|
|
estimatedSeries := len(buckets) * 10
|
|
|
|
// Flat map with composite key for better performance
|
|
type seriesKey struct {
|
|
aggIndex int
|
|
key string
|
|
}
|
|
seriesMap := make(map[seriesKey]*qbtypes.TimeSeries, estimatedSeries)
|
|
var queryName string
|
|
|
|
for _, bucket := range buckets {
|
|
var tsData *qbtypes.TimeSeriesData
|
|
if err := json.Unmarshal(bucket.Value, &tsData); err != nil {
|
|
bc.logger.ErrorContext(ctx, "failed to unmarshal time series data", "error", err)
|
|
continue
|
|
}
|
|
|
|
// Preserve the query name from the first bucket
|
|
if queryName == "" && tsData.QueryName != "" {
|
|
queryName = tsData.QueryName
|
|
}
|
|
|
|
for _, aggBucket := range tsData.Aggregations {
|
|
for _, series := range aggBucket.Series {
|
|
// Create series key from labels
|
|
key := seriesKey{
|
|
aggIndex: aggBucket.Index,
|
|
key: qbtypes.GetUniqueSeriesKey(series.Labels),
|
|
}
|
|
|
|
if existingSeries, ok := seriesMap[key]; ok {
|
|
// Pre-allocate capacity for merged values
|
|
newCap := len(existingSeries.Values) + len(series.Values)
|
|
if cap(existingSeries.Values) < newCap {
|
|
newValues := make([]*qbtypes.TimeSeriesValue, len(existingSeries.Values), newCap)
|
|
copy(newValues, existingSeries.Values)
|
|
existingSeries.Values = newValues
|
|
}
|
|
existingSeries.Values = append(existingSeries.Values, series.Values...)
|
|
} else {
|
|
// New series
|
|
seriesMap[key] = series
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Group series by aggregation index
|
|
aggMap := make(map[int][]*qbtypes.TimeSeries)
|
|
for key, series := range seriesMap {
|
|
aggMap[key.aggIndex] = append(aggMap[key.aggIndex], series)
|
|
}
|
|
|
|
// Convert map back to slice
|
|
result := &qbtypes.TimeSeriesData{
|
|
QueryName: queryName,
|
|
Aggregations: make([]*qbtypes.AggregationBucket, 0, len(aggMap)),
|
|
}
|
|
|
|
for index, seriesList := range aggMap {
|
|
// Sort values by timestamp for each series
|
|
for _, s := range seriesList {
|
|
// Check if already sorted before sorting
|
|
needsSort := false
|
|
for i := 1; i < len(s.Values); i++ {
|
|
if s.Values[i].Timestamp < s.Values[i-1].Timestamp {
|
|
needsSort = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if needsSort {
|
|
slices.SortFunc(s.Values, func(a, b *qbtypes.TimeSeriesValue) int {
|
|
if a.Timestamp < b.Timestamp {
|
|
return -1
|
|
}
|
|
if a.Timestamp > b.Timestamp {
|
|
return 1
|
|
}
|
|
return 0
|
|
})
|
|
}
|
|
}
|
|
|
|
result.Aggregations = append(result.Aggregations, &qbtypes.AggregationBucket{
|
|
Index: index,
|
|
Series: seriesList,
|
|
})
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// isEmptyResult checks if a result is truly empty (no data exists) vs filtered empty (data was filtered out)
|
|
func (bc *bucketCache) isEmptyResult(result *qbtypes.Result) (isEmpty bool, isFiltered bool) {
|
|
if result.Value == nil {
|
|
return true, false
|
|
}
|
|
|
|
switch result.Type {
|
|
case qbtypes.RequestTypeTimeSeries:
|
|
if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok {
|
|
// No aggregations at all means truly empty
|
|
if len(tsData.Aggregations) == 0 {
|
|
return true, false
|
|
}
|
|
|
|
// Check if we have aggregations but no series (filtered out)
|
|
totalSeries := 0
|
|
for _, agg := range tsData.Aggregations {
|
|
totalSeries += len(agg.Series)
|
|
}
|
|
|
|
if totalSeries == 0 {
|
|
// We have aggregations but no series - data was filtered out
|
|
return true, true
|
|
}
|
|
|
|
// Check if all series have no values
|
|
hasValues := false
|
|
for _, agg := range tsData.Aggregations {
|
|
for _, series := range agg.Series {
|
|
if len(series.Values) > 0 {
|
|
hasValues = true
|
|
break
|
|
}
|
|
}
|
|
if hasValues {
|
|
break
|
|
}
|
|
}
|
|
|
|
return !hasValues, !hasValues && totalSeries > 0
|
|
}
|
|
|
|
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeScalar:
|
|
// Raw and scalar data are not cached
|
|
return true, false
|
|
}
|
|
|
|
return true, false
|
|
}
|
|
|
|
// resultToBuckets converts a query result into time-based buckets
|
|
func (bc *bucketCache) resultToBuckets(ctx context.Context, result *qbtypes.Result, startMs, endMs uint64) []*cachedBucket {
|
|
// Check if result is empty
|
|
isEmpty, isFiltered := bc.isEmptyResult(result)
|
|
|
|
// Don't cache if result is empty but not filtered
|
|
// Empty filtered results should be cached to avoid re-querying
|
|
if isEmpty && !isFiltered {
|
|
bc.logger.DebugContext(ctx, "skipping cache for empty non-filtered result")
|
|
return nil
|
|
}
|
|
|
|
// For now, create a single bucket for the entire range
|
|
// In the future, we could split large ranges into smaller buckets
|
|
valueBytes, err := json.Marshal(result.Value)
|
|
if err != nil {
|
|
bc.logger.ErrorContext(ctx, "failed to marshal result value", "error", err)
|
|
return nil
|
|
}
|
|
|
|
// Always create a bucket, even for empty filtered results
|
|
// This ensures we don't re-query for data that doesn't exist
|
|
return []*cachedBucket{
|
|
{
|
|
StartMs: startMs,
|
|
EndMs: endMs,
|
|
Type: result.Type,
|
|
Value: valueBytes,
|
|
Stats: result.Stats,
|
|
},
|
|
}
|
|
}
|
|
|
|
// mergeAndDeduplicateBuckets combines and deduplicates bucket lists
|
|
func (bc *bucketCache) mergeAndDeduplicateBuckets(existing, fresh []*cachedBucket) []*cachedBucket {
|
|
// Create a map to deduplicate by time range
|
|
bucketMap := make(map[string]*cachedBucket)
|
|
|
|
// Add existing buckets
|
|
for _, bucket := range existing {
|
|
key := fmt.Sprintf("%d-%d", bucket.StartMs, bucket.EndMs)
|
|
bucketMap[key] = bucket
|
|
}
|
|
|
|
// Add/update with fresh buckets
|
|
for _, bucket := range fresh {
|
|
key := fmt.Sprintf("%d-%d", bucket.StartMs, bucket.EndMs)
|
|
bucketMap[key] = bucket
|
|
}
|
|
|
|
// Convert back to slice with pre-allocated capacity
|
|
result := make([]*cachedBucket, 0, len(bucketMap))
|
|
for _, bucket := range bucketMap {
|
|
result = append(result, bucket)
|
|
}
|
|
|
|
// Sort by start time
|
|
slices.SortFunc(result, func(a, b *cachedBucket) int {
|
|
if a.StartMs < b.StartMs {
|
|
return -1
|
|
}
|
|
if a.StartMs > b.StartMs {
|
|
return 1
|
|
}
|
|
return 0
|
|
})
|
|
|
|
return result
|
|
}
|
|
|
|
// deduplicateWarnings removes duplicate warnings
|
|
func (bc *bucketCache) deduplicateWarnings(warnings []string) []string {
|
|
if len(warnings) == 0 {
|
|
return nil
|
|
}
|
|
|
|
seen := make(map[string]bool, len(warnings))
|
|
unique := make([]string, 0, len(warnings)) // Pre-allocate capacity
|
|
|
|
for _, warning := range warnings {
|
|
if !seen[warning] {
|
|
seen[warning] = true
|
|
unique = append(unique, warning)
|
|
}
|
|
}
|
|
|
|
return unique
|
|
}
|
|
|
|
// trimResultToFluxBoundary trims the result to exclude data points beyond the flux boundary
|
|
func (bc *bucketCache) trimResultToFluxBoundary(result *qbtypes.Result, fluxBoundary uint64) *qbtypes.Result {
|
|
trimmedResult := &qbtypes.Result{
|
|
Type: result.Type,
|
|
Stats: result.Stats,
|
|
Warnings: result.Warnings,
|
|
}
|
|
|
|
switch result.Type {
|
|
case qbtypes.RequestTypeTimeSeries:
|
|
// Trim time series data
|
|
if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok {
|
|
trimmedData := &qbtypes.TimeSeriesData{
|
|
QueryName: tsData.QueryName,
|
|
}
|
|
|
|
for _, aggBucket := range tsData.Aggregations {
|
|
trimmedBucket := &qbtypes.AggregationBucket{
|
|
Index: aggBucket.Index,
|
|
}
|
|
|
|
for _, series := range aggBucket.Series {
|
|
trimmedSeries := &qbtypes.TimeSeries{
|
|
Labels: series.Labels,
|
|
}
|
|
|
|
// Filter values to exclude those beyond flux boundary and partial values
|
|
for _, value := range series.Values {
|
|
// Skip partial values - they cannot be cached
|
|
if value.Partial {
|
|
continue
|
|
}
|
|
if uint64(value.Timestamp) <= fluxBoundary {
|
|
trimmedSeries.Values = append(trimmedSeries.Values, value)
|
|
}
|
|
}
|
|
|
|
// Always add the series to preserve filtered empty results
|
|
trimmedBucket.Series = append(trimmedBucket.Series, trimmedSeries)
|
|
}
|
|
|
|
// Always add the bucket to preserve aggregation structure
|
|
trimmedData.Aggregations = append(trimmedData.Aggregations, trimmedBucket)
|
|
}
|
|
|
|
// Always set the value to preserve empty filtered results
|
|
trimmedResult.Value = trimmedData
|
|
}
|
|
|
|
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeScalar:
|
|
// Don't cache raw or scalar data
|
|
return nil
|
|
}
|
|
|
|
return trimmedResult
|
|
}
|
|
|
|
// min returns the smaller of two uint64 values.
func min(a, b uint64) uint64 {
	if a > b {
		return b
	}
	return a
}
|
|
|
|
// max returns the larger of two uint64 values.
func max(a, b uint64) uint64 {
	if a < b {
		return b
	}
	return a
}
|
|
|
|
// filterResultToTimeRange filters the result to only include values within the requested time range
|
|
func (bc *bucketCache) filterResultToTimeRange(result *qbtypes.Result, startMs, endMs uint64) *qbtypes.Result {
|
|
if result == nil || result.Value == nil {
|
|
return result
|
|
}
|
|
|
|
switch result.Type {
|
|
case qbtypes.RequestTypeTimeSeries:
|
|
if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok {
|
|
filteredData := &qbtypes.TimeSeriesData{
|
|
QueryName: tsData.QueryName,
|
|
Aggregations: make([]*qbtypes.AggregationBucket, 0, len(tsData.Aggregations)),
|
|
}
|
|
|
|
for _, aggBucket := range tsData.Aggregations {
|
|
filteredBucket := &qbtypes.AggregationBucket{
|
|
Index: aggBucket.Index,
|
|
Alias: aggBucket.Alias,
|
|
Meta: aggBucket.Meta,
|
|
Series: make([]*qbtypes.TimeSeries, 0, len(aggBucket.Series)),
|
|
}
|
|
|
|
for _, series := range aggBucket.Series {
|
|
filteredSeries := &qbtypes.TimeSeries{
|
|
Labels: series.Labels,
|
|
Values: make([]*qbtypes.TimeSeriesValue, 0, len(series.Values)),
|
|
}
|
|
|
|
// Filter values to only include those within the requested time range
|
|
for _, value := range series.Values {
|
|
timestampMs := uint64(value.Timestamp)
|
|
if timestampMs >= startMs && timestampMs < endMs {
|
|
filteredSeries.Values = append(filteredSeries.Values, value)
|
|
}
|
|
}
|
|
|
|
// Always add series to preserve structure (even if empty)
|
|
filteredBucket.Series = append(filteredBucket.Series, filteredSeries)
|
|
}
|
|
|
|
// Only add bucket if it has series
|
|
if len(filteredBucket.Series) > 0 {
|
|
filteredData.Aggregations = append(filteredData.Aggregations, filteredBucket)
|
|
}
|
|
}
|
|
|
|
// Create a new result with the filtered data
|
|
return &qbtypes.Result{
|
|
Type: result.Type,
|
|
Value: filteredData,
|
|
Stats: result.Stats,
|
|
Warnings: result.Warnings,
|
|
}
|
|
}
|
|
}
|
|
|
|
// For non-time series data, return as is
|
|
return result
|
|
}
|