chore: disallow unknown fields and address gaps (#8237)

Srikanth Chekuri 2025-06-16 23:11:28 +05:30 committed by GitHub
parent 8455349459
commit 1542b9d6e9
39 changed files with 2953 additions and 228 deletions

View File

@ -33,6 +33,12 @@ func (a *API) QueryRange(rw http.ResponseWriter, req *http.Request) {
return
}
// Validate the query request
if err := queryRangeRequest.Validate(); err != nil {
render.Error(rw, err)
return
}
orgID, err := valuer.NewUUID(claims.OrgID)
if err != nil {
render.Error(rw, err)
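The new Validate call above, together with the commit title ("disallow unknown fields"), points at strict request validation. The Validate body itself is not part of this hunk; the standalone sketch below only shows the standard-library mechanism commonly used in Go to reject unknown JSON keys, with an illustrative payload type rather than the real QueryRangeRequest.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// queryRangePayload is a stand-in for the real request type; the fields are illustrative only.
type queryRangePayload struct {
	Start uint64 `json:"start"`
	End   uint64 `json:"end"`
}

// decodeStrict fails on any JSON key the payload type does not declare.
func decodeStrict(body []byte) (*queryRangePayload, error) {
	dec := json.NewDecoder(bytes.NewReader(body))
	dec.DisallowUnknownFields() // unknown keys become a decode error instead of being silently dropped
	var p queryRangePayload
	if err := dec.Decode(&p); err != nil {
		return nil, err
	}
	return &p, nil
}

func main() {
	_, err := decodeStrict([]byte(`{"start":1,"end":2,"stepp":60}`)) // "stepp" is misspelled
	fmt.Println(err) // json: unknown field "stepp"
}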

View File

@ -117,7 +117,7 @@ func (bc *bucketCache) GetMissRanges(
}
// Put stores fresh query results in the cache
func (bc *bucketCache) Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Query, fresh *qbtypes.Result) {
func (bc *bucketCache) Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Query, step qbtypes.Step, fresh *qbtypes.Result) {
// Get query window
startMs, endMs := q.Window()
@ -159,8 +159,36 @@ func (bc *bucketCache) Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Que
return
}
// Convert trimmed result to buckets
freshBuckets := bc.resultToBuckets(ctx, trimmedResult, startMs, cachableEndMs)
// Adjust start and end times to only cache complete intervals
cachableStartMs := startMs
stepMs := uint64(step.Duration.Milliseconds())
// If we have a step interval, adjust boundaries to only cache complete intervals
if stepMs > 0 {
// If start is not aligned, round up to next step boundary (first complete interval)
if startMs%stepMs != 0 {
cachableStartMs = ((startMs / stepMs) + 1) * stepMs
}
// If end is not aligned, round down to previous step boundary (last complete interval)
if cachableEndMs%stepMs != 0 {
cachableEndMs = (cachableEndMs / stepMs) * stepMs
}
// If after adjustment we have no complete intervals, don't cache
if cachableStartMs >= cachableEndMs {
bc.logger.DebugContext(ctx, "no complete intervals to cache",
"original_start", startMs,
"original_end", endMs,
"adjusted_start", cachableStartMs,
"adjusted_end", cachableEndMs,
"step", stepMs)
return
}
}
// Convert trimmed result to buckets with adjusted boundaries
freshBuckets := bc.resultToBuckets(ctx, trimmedResult, cachableStartMs, cachableEndMs)
// If no fresh buckets and no existing data, don't cache
if len(freshBuckets) == 0 && len(existingData.Buckets) == 0 {
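The boundary math above can be sanity-checked with a small standalone sketch; the numbers reuse the 12:02–12:58 window and 5-minute step from the step-alignment test added later in this commit.

package main

import "fmt"

// alignToCompleteIntervals mirrors the rounding in Put: round the start up and the end
// down to step boundaries so only complete intervals get cached. This is a standalone
// sketch, not the bucket cache itself.
func alignToCompleteIntervals(startMs, endMs, stepMs uint64) (uint64, uint64, bool) {
	if stepMs == 0 {
		return startMs, endMs, true // no step: cache the whole range
	}
	alignedStart := startMs
	if startMs%stepMs != 0 {
		alignedStart = ((startMs / stepMs) + 1) * stepMs
	}
	alignedEnd := endMs
	if endMs%stepMs != 0 {
		alignedEnd = (endMs / stepMs) * stepMs
	}
	if alignedStart >= alignedEnd {
		return 0, 0, false // no complete interval to cache
	}
	return alignedStart, alignedEnd, true
}

func main() {
	start, end, ok := alignToCompleteIntervals(1672563720000, 1672567080000, 300000)
	fmt.Println(start, end, ok) // 1672563900000 1672566900000 true (12:05 to 12:55)
}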
@ -485,6 +513,12 @@ func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*cac
}
if existingSeries, ok := seriesMap[key]; ok {
// Merge values, avoiding duplicate timestamps
timestampMap := make(map[int64]bool)
for _, v := range existingSeries.Values {
timestampMap[v.Timestamp] = true
}
// Pre-allocate capacity for merged values
newCap := len(existingSeries.Values) + len(series.Values)
if cap(existingSeries.Values) < newCap {
@ -492,7 +526,13 @@ func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*cac
copy(newValues, existingSeries.Values)
existingSeries.Values = newValues
}
existingSeries.Values = append(existingSeries.Values, series.Values...)
// Only add values with new timestamps
for _, v := range series.Values {
if !timestampMap[v.Timestamp] {
existingSeries.Values = append(existingSeries.Values, v)
}
}
} else {
// New series
seriesMap[key] = series
@ -697,7 +737,7 @@ func (bc *bucketCache) trimResultToFluxBoundary(result *qbtypes.Result, fluxBoun
switch result.Type {
case qbtypes.RequestTypeTimeSeries:
// Trim time series data
if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok {
if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok && tsData != nil {
trimmedData := &qbtypes.TimeSeriesData{
QueryName: tsData.QueryName,
}
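The timestamp de-duplication in mergeTimeSeriesValues reduces to a seen-set keyed by timestamp, with cached values winning over fresh ones. A minimal standalone sketch with a local point type (not the qbtypes series types):

package main

import "fmt"

type point struct {
	Timestamp int64
	Value     float64
}

// mergeByTimestamp keeps existing points and appends incoming points only when their
// timestamp has not been seen yet.
func mergeByTimestamp(existing, incoming []point) []point {
	seen := make(map[int64]bool, len(existing))
	for _, p := range existing {
		seen[p.Timestamp] = true
	}
	for _, p := range incoming {
		if !seen[p.Timestamp] {
			existing = append(existing, p)
			seen[p.Timestamp] = true
		}
	}
	return existing
}

func main() {
	cached := []point{{Timestamp: 1000, Value: 1}, {Timestamp: 2000, Value: 2}}
	fresh := []point{{Timestamp: 2000, Value: 2}, {Timestamp: 3000, Value: 3}} // 2000 overlaps
	fmt.Println(mergeByTimestamp(cached, fresh)) // [{1000 1} {2000 2} {3000 3}]
}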

View File

@ -30,7 +30,7 @@ func BenchmarkBucketCache_GetMissRanges(b *testing.B) {
endMs: uint64((i + 1) * 10000),
}
result := createBenchmarkResult(query.startMs, query.endMs, 1000)
bc.Put(ctx, orgID, query, result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
}
// Create test queries with varying cache hit patterns
@ -121,7 +121,7 @@ func BenchmarkBucketCache_Put(b *testing.B) {
for i := 0; i < b.N; i++ {
for j := 0; j < tc.numQueries; j++ {
bc.Put(ctx, orgID, queries[j], results[j])
bc.Put(ctx, orgID, queries[j], qbtypes.Step{Duration: 1000 * time.Millisecond}, results[j])
}
}
})
@ -259,7 +259,7 @@ func BenchmarkBucketCache_ConcurrentOperations(b *testing.B) {
endMs: uint64((i + 1) * 10000),
}
result := createBenchmarkResult(query.startMs, query.endMs, 1000)
bc.Put(ctx, orgID, query, result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
}
b.ResetTimer()
@ -284,7 +284,7 @@ func BenchmarkBucketCache_ConcurrentOperations(b *testing.B) {
endMs: uint64((i + 1) * 10000),
}
result := createBenchmarkResult(query.startMs, query.endMs, 1000)
bc.Put(ctx, orgID, query, result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
case 2: // Partial read
query := &mockQuery{
fingerprint: fmt.Sprintf("concurrent-query-%d", i%100),

View File

@ -0,0 +1,117 @@
package querier
import (
"context"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestBucketCacheStepAlignment(t *testing.T) {
ctx := context.Background()
orgID := valuer.UUID{}
cache := createTestCache(t)
bc := NewBucketCache(instrumentationtest.New().ToProviderSettings(), cache, time.Hour, 5*time.Minute)
// Test with 5-minute step
step := qbtypes.Step{Duration: 5 * time.Minute}
// Query from 12:02 to 12:58 (both unaligned)
// Complete intervals: 12:05 to 12:55
query := &mockQuery{
fingerprint: "test-step-alignment",
startMs: 1672563720000, // 12:02
endMs: 1672567080000, // 12:58
}
result := &qbtypes.Result{
Type: qbtypes.RequestTypeTimeSeries,
Value: &qbtypes.TimeSeriesData{
QueryName: "test",
Aggregations: []*qbtypes.AggregationBucket{
{
Index: 0,
Series: []*qbtypes.TimeSeries{
{
Labels: []*qbtypes.Label{
{Key: telemetrytypes.TelemetryFieldKey{Name: "service"}, Value: "test"},
},
Values: []*qbtypes.TimeSeriesValue{
{Timestamp: 1672563720000, Value: 1, Partial: true}, // 12:02
{Timestamp: 1672563900000, Value: 2}, // 12:05
{Timestamp: 1672564200000, Value: 2.5}, // 12:10
{Timestamp: 1672564500000, Value: 2.6}, // 12:15
{Timestamp: 1672566600000, Value: 2.9}, // 12:50
{Timestamp: 1672566900000, Value: 3}, // 12:55
{Timestamp: 1672567080000, Value: 4, Partial: true}, // 12:58
},
},
},
},
},
},
}
// Put result in cache
bc.Put(ctx, orgID, query, step, result)
// Get cached data
cached, missing := bc.GetMissRanges(ctx, orgID, query, step)
// Should have cached data
require.NotNil(t, cached)
// Log the missing ranges to debug
t.Logf("Missing ranges: %v", missing)
for i, r := range missing {
t.Logf("Missing range %d: From=%d, To=%d", i, r.From, r.To)
}
// Should have 2 missing ranges for partial intervals
require.Len(t, missing, 2)
// First partial: 12:02 to 12:05
assert.Equal(t, uint64(1672563720000), missing[0].From)
assert.Equal(t, uint64(1672563900000), missing[0].To)
// Second partial: 12:55 to 12:58
assert.Equal(t, uint64(1672566900000), missing[1].From, "Second missing range From")
assert.Equal(t, uint64(1672567080000), missing[1].To, "Second missing range To")
}
func TestBucketCacheNoStepInterval(t *testing.T) {
ctx := context.Background()
orgID := valuer.UUID{}
cache := createTestCache(t)
bc := NewBucketCache(instrumentationtest.New().ToProviderSettings(), cache, time.Hour, 5*time.Minute)
// Test with no step (stepMs = 0)
step := qbtypes.Step{Duration: 0}
query := &mockQuery{
fingerprint: "test-no-step",
startMs: 1672563720000,
endMs: 1672567080000,
}
result := &qbtypes.Result{
Type: qbtypes.RequestTypeTimeSeries,
Value: &qbtypes.TimeSeriesData{
QueryName: "test",
Aggregations: []*qbtypes.AggregationBucket{{Index: 0, Series: []*qbtypes.TimeSeries{}}},
},
}
// Should cache the entire range when step is 0
bc.Put(ctx, orgID, query, step, result)
cached, missing := bc.GetMissRanges(ctx, orgID, query, step)
assert.NotNil(t, cached)
assert.Len(t, missing, 0)
}

View File

@ -128,7 +128,7 @@ func TestBucketCache_GetMissRanges_EmptyCache(t *testing.T) {
endMs: 5000,
}
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
assert.Nil(t, cached)
assert.Len(t, missing, 1)
@ -159,13 +159,13 @@ func TestBucketCache_Put_And_Get(t *testing.T) {
}
// Store in cache
bc.Put(context.Background(), valuer.UUID{}, query, result)
bc.Put(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
// Wait a bit for cache to be written
time.Sleep(10 * time.Millisecond)
// Retrieve from cache
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
assert.NotNil(t, cached.Value)
assert.Len(t, missing, 0)
@ -193,7 +193,7 @@ func TestBucketCache_PartialHit(t *testing.T) {
Type: qbtypes.RequestTypeTimeSeries,
Value: createTestTimeSeries("A", 1000, 3000, 1000),
}
bc.Put(context.Background(), valuer.UUID{}, query1, result1)
bc.Put(context.Background(), valuer.UUID{}, query1, qbtypes.Step{Duration: 1000 * time.Millisecond}, result1)
// Wait for cache write
time.Sleep(10 * time.Millisecond)
@ -205,7 +205,7 @@ func TestBucketCache_PartialHit(t *testing.T) {
endMs: 5000,
}
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query2, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query2, qbtypes.Step{Duration: 1000 * time.Millisecond})
// Should have cached data
assert.NotNil(t, cached.Value)
@ -226,7 +226,7 @@ func TestBucketCache_MultipleBuckets(t *testing.T) {
startMs: 1000,
endMs: 2000,
}
bc.Put(context.Background(), valuer.UUID{}, query1, &qbtypes.Result{
bc.Put(context.Background(), valuer.UUID{}, query1, qbtypes.Step{Duration: 100 * time.Millisecond}, &qbtypes.Result{
Type: qbtypes.RequestTypeTimeSeries,
Value: createTestTimeSeries("A", 1000, 2000, 100),
})
@ -236,7 +236,7 @@ func TestBucketCache_MultipleBuckets(t *testing.T) {
startMs: 3000,
endMs: 4000,
}
bc.Put(context.Background(), valuer.UUID{}, query2, &qbtypes.Result{
bc.Put(context.Background(), valuer.UUID{}, query2, qbtypes.Step{Duration: 100 * time.Millisecond}, &qbtypes.Result{
Type: qbtypes.RequestTypeTimeSeries,
Value: createTestTimeSeries("A", 3000, 4000, 100),
})
@ -251,7 +251,7 @@ func TestBucketCache_MultipleBuckets(t *testing.T) {
endMs: 4500,
}
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query3, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query3, qbtypes.Step{Duration: 1000 * time.Millisecond})
// Should have cached data
assert.NotNil(t, cached.Value)
@ -284,13 +284,13 @@ func TestBucketCache_FluxInterval(t *testing.T) {
}
// This should not be cached due to flux interval
bc.Put(context.Background(), valuer.UUID{}, query, result)
bc.Put(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
// Wait a bit
time.Sleep(10 * time.Millisecond)
// Try to get the data
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
// Should have no cached data
assert.Nil(t, cached)
@ -354,7 +354,7 @@ func TestBucketCache_MergeTimeSeriesResults(t *testing.T) {
startMs: 1000,
endMs: 3000,
}
bc.Put(context.Background(), valuer.UUID{}, query1, &qbtypes.Result{
bc.Put(context.Background(), valuer.UUID{}, query1, qbtypes.Step{Duration: 1000 * time.Millisecond}, &qbtypes.Result{
Type: qbtypes.RequestTypeTimeSeries,
Value: &qbtypes.TimeSeriesData{
QueryName: "A",
@ -370,7 +370,7 @@ func TestBucketCache_MergeTimeSeriesResults(t *testing.T) {
startMs: 3000,
endMs: 5000,
}
bc.Put(context.Background(), valuer.UUID{}, query2, &qbtypes.Result{
bc.Put(context.Background(), valuer.UUID{}, query2, qbtypes.Step{Duration: 1000 * time.Millisecond}, &qbtypes.Result{
Type: qbtypes.RequestTypeTimeSeries,
Value: &qbtypes.TimeSeriesData{
QueryName: "A",
@ -390,7 +390,7 @@ func TestBucketCache_MergeTimeSeriesResults(t *testing.T) {
endMs: 5000,
}
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query3, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query3, qbtypes.Step{Duration: 1000 * time.Millisecond})
// Should have no missing ranges
assert.Len(t, missing, 0)
@ -445,10 +445,10 @@ func TestBucketCache_RawData(t *testing.T) {
Value: rawData,
}
bc.Put(context.Background(), valuer.UUID{}, query, result)
bc.Put(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
time.Sleep(10 * time.Millisecond)
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
// Raw data should not be cached
assert.Nil(t, cached)
@ -485,10 +485,10 @@ func TestBucketCache_ScalarData(t *testing.T) {
Value: scalarData,
}
bc.Put(context.Background(), valuer.UUID{}, query, result)
bc.Put(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
time.Sleep(10 * time.Millisecond)
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
// Scalar data should not be cached
assert.Nil(t, cached)
@ -513,11 +513,11 @@ func TestBucketCache_EmptyFingerprint(t *testing.T) {
Value: createTestTimeSeries("A", 1000, 5000, 1000),
}
bc.Put(context.Background(), valuer.UUID{}, query, result)
bc.Put(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
time.Sleep(10 * time.Millisecond)
// Should still be able to retrieve
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
assert.NotNil(t, cached.Value)
assert.Len(t, missing, 0)
}
@ -568,7 +568,7 @@ func TestBucketCache_ConcurrentAccess(t *testing.T) {
Type: qbtypes.RequestTypeTimeSeries,
Value: createTestTimeSeries(fmt.Sprintf("Q%d", id), query.startMs, query.endMs, 100),
}
bc.Put(context.Background(), valuer.UUID{}, query, result)
bc.Put(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 100 * time.Microsecond}, result)
done <- true
}(i)
}
@ -581,7 +581,7 @@ func TestBucketCache_ConcurrentAccess(t *testing.T) {
startMs: uint64(id * 1000),
endMs: uint64((id + 1) * 1000),
}
bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000})
bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
done <- true
}(i)
}
@ -628,10 +628,10 @@ func TestBucketCache_GetMissRanges_FluxInterval(t *testing.T) {
},
}
bc.Put(ctx, orgID, query, cachedResult)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, cachedResult)
// Get miss ranges
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
assert.NotNil(t, cached)
t.Logf("Missing ranges: %+v, query range: %d-%d", missing, query.startMs, query.endMs)
@ -690,10 +690,10 @@ func TestBucketCache_Put_FluxIntervalTrimming(t *testing.T) {
}
// Put the result
bc.Put(ctx, orgID, query, result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
// Retrieve cached data
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
// Should have cached data
assert.NotNil(t, cached)
@ -760,10 +760,10 @@ func TestBucketCache_Put_EntireRangeInFluxInterval(t *testing.T) {
}
// Put the result - should not cache anything
bc.Put(ctx, orgID, query, result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
// Try to get cached data - should have no cached data
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
// Should have no cached value
assert.Nil(t, cached)
@ -785,18 +785,6 @@ func TestBucketCache_EmptyDataHandling(t *testing.T) {
shouldCache bool
description string
}{
{
name: "truly_empty_time_series",
result: &qbtypes.Result{
Type: qbtypes.RequestTypeTimeSeries,
Value: &qbtypes.TimeSeriesData{
QueryName: "A",
Aggregations: []*qbtypes.AggregationBucket{},
},
},
shouldCache: false,
description: "No aggregations means truly empty - should not cache",
},
{
name: "filtered_empty_time_series",
result: &qbtypes.Result{
@ -878,17 +866,16 @@ func TestBucketCache_EmptyDataHandling(t *testing.T) {
}
// Put the result
bc.Put(ctx, orgID, query, tt.result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, tt.result)
// Wait a bit for cache to be written
time.Sleep(10 * time.Millisecond)
// Try to get cached data
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
if tt.shouldCache {
assert.NotNil(t, cached, tt.description)
assert.Len(t, missing, 0, "Should have no missing ranges when data is cached")
} else {
assert.Nil(t, cached, tt.description)
assert.Len(t, missing, 1, "Should have entire range as missing when data is not cached")
@ -944,13 +931,13 @@ func TestBucketCache_PartialValues(t *testing.T) {
}
// Put the result
bc.Put(ctx, orgID, query, result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
// Wait for cache to be written
time.Sleep(10 * time.Millisecond)
// Get cached data
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
// Should have cached data
assert.NotNil(t, cached)
@ -1014,13 +1001,13 @@ func TestBucketCache_AllPartialValues(t *testing.T) {
}
// Put the result
bc.Put(ctx, orgID, query, result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
// Wait for cache to be written
time.Sleep(10 * time.Millisecond)
// Get cached data
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
// When all values are partial and filtered out, the result is cached as empty
// This prevents re-querying for the same misaligned time range
@ -1075,7 +1062,7 @@ func TestBucketCache_FilteredCachedResults(t *testing.T) {
}
// Cache the wide range
bc.Put(ctx, orgID, query1, result1)
bc.Put(ctx, orgID, query1, qbtypes.Step{Duration: 1000 * time.Millisecond}, result1)
time.Sleep(10 * time.Millisecond)
// Now query for a smaller range (2000-3500ms)
@ -1086,7 +1073,7 @@ func TestBucketCache_FilteredCachedResults(t *testing.T) {
}
// Get cached data - should be filtered to requested range
cached, missing := bc.GetMissRanges(ctx, orgID, query2, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(ctx, orgID, query2, qbtypes.Step{Duration: 1000 * time.Millisecond})
// Should have no missing ranges
assert.Len(t, missing, 0)
@ -1246,7 +1233,7 @@ func TestBucketCache_PartialValueDetection(t *testing.T) {
}
// Put the result
bc.Put(ctx, orgID, query, result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
time.Sleep(10 * time.Millisecond)
// Get cached data
@ -1300,7 +1287,7 @@ func TestBucketCache_PartialValueDetection(t *testing.T) {
}
// Put the result
bc.Put(ctx, orgID, query, result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
time.Sleep(10 * time.Millisecond)
// Get cached data
@ -1352,7 +1339,7 @@ func TestBucketCache_PartialValueDetection(t *testing.T) {
}
// Put the result
bc.Put(ctx, orgID, query, result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
time.Sleep(10 * time.Millisecond)
// Get cached data
@ -1409,11 +1396,11 @@ func TestBucketCache_NoCache(t *testing.T) {
}
// Put the result in cache
bc.Put(ctx, orgID, query, result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
time.Sleep(10 * time.Millisecond)
// Verify data is cached
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
assert.NotNil(t, cached)
assert.Len(t, missing, 0)

View File

@ -118,6 +118,10 @@ func (q *builderQuery[T]) Fingerprint() string {
parts = append(parts, fmt.Sprintf("having=%s", q.spec.Having.Expression))
}
if q.spec.ShiftBy != 0 {
parts = append(parts, fmt.Sprintf("shiftby=%d", q.spec.ShiftBy))
}
return strings.Join(parts, "&")
}
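Including shiftby in the fingerprint keeps a time-shifted query from sharing cache buckets with its unshifted counterpart. A simplified stand-in for Fingerprint, with an illustrative part list rather than the real one:

package main

import (
	"fmt"
	"strings"
)

// buildFingerprint joins query properties into a cache key the way Fingerprint does;
// shiftby is appended only when non-zero.
func buildFingerprint(signal, metric string, shiftBySeconds int64) string {
	parts := []string{"signal=" + signal, "metric=" + metric}
	if shiftBySeconds != 0 {
		parts = append(parts, fmt.Sprintf("shiftby=%d", shiftBySeconds))
	}
	return strings.Join(parts, "&")
}

func main() {
	fmt.Println(buildFingerprint("metrics", "http_requests", 0))    // signal=metrics&metric=http_requests
	fmt.Println(buildFingerprint("metrics", "http_requests", 3600)) // signal=metrics&metric=http_requests&shiftby=3600
}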
@ -204,7 +208,14 @@ func (q *builderQuery[T]) executeWithContext(ctx context.Context, query string,
// Pass query window and step for partial value detection
queryWindow := &qbtypes.TimeRange{From: q.fromMS, To: q.toMS}
payload, err := consume(rows, q.kind, queryWindow, q.spec.StepInterval, q.spec.Name)
kind := q.kind
// all metric queries are time series then reduced if required
if q.spec.Signal == telemetrytypes.SignalMetrics {
kind = qbtypes.RequestTypeTimeSeries
}
payload, err := consume(rows, kind, queryWindow, q.spec.StepInterval, q.spec.Name)
if err != nil {
return nil, err
}
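Per the comment above, metric results are always read back as time series and only reduced afterwards when a scalar is requested. A minimal sketch of one such reduction (last value); the real path goes through qbtypes.FunctionReduceTo and supports several reduce operations not shown here:

package main

import "fmt"

// reduceToLast collapses a series to a single scalar by keeping the most recent sample;
// avg, sum, min, and max reductions follow the same shape.
func reduceToLast(values []float64) (float64, bool) {
	if len(values) == 0 {
		return 0, false
	}
	return values[len(values)-1], true
}

func main() {
	v, ok := reduceToLast([]float64{1.2, 1.4, 1.9})
	fmt.Println(v, ok) // 1.9 true
}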
@ -224,16 +235,18 @@ func (q *builderQuery[T]) executeWindowList(ctx context.Context) (*qbtypes.Resul
isAsc := len(q.spec.Order) > 0 &&
strings.ToLower(string(q.spec.Order[0].Direction.StringValue())) == "asc"
fromMS, toMS := q.fromMS, q.toMS
// Adjust [fromMS,toMS] window if a cursor was supplied
if cur := strings.TrimSpace(q.spec.Cursor); cur != "" {
if ts, err := decodeCursor(cur); err == nil {
if isAsc {
if uint64(ts) >= q.fromMS {
q.fromMS = uint64(ts + 1)
if uint64(ts) >= fromMS {
fromMS = uint64(ts + 1)
}
} else { // DESC
if uint64(ts) <= q.toMS {
q.toMS = uint64(ts - 1)
if uint64(ts) <= toMS {
toMS = uint64(ts - 1)
}
}
}
@ -252,7 +265,16 @@ func (q *builderQuery[T]) executeWindowList(ctx context.Context) (*qbtypes.Resul
totalBytes := uint64(0)
start := time.Now()
for _, r := range makeBuckets(q.fromMS, q.toMS) {
// Get buckets and reverse them for ascending order
buckets := makeBuckets(fromMS, toMS)
if isAsc {
// Reverse the buckets for ascending order
for i, j := 0, len(buckets)-1; i < j; i, j = i+1, j-1 {
buckets[i], buckets[j] = buckets[j], buckets[i]
}
}
for _, r := range buckets {
q.spec.Offset = 0
q.spec.Limit = need
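The cursor handling above now narrows a local copy of the window instead of mutating q.fromMS and q.toMS. A standalone sketch of the same adjustment with a local window type, assuming the cursor decodes to the timestamp of the last row already delivered:

package main

import "fmt"

type window struct{ fromMS, toMS uint64 }

// nextWindow resumes an ascending scan just after the cursor and a descending scan just
// before it, leaving the window untouched when the cursor falls outside it.
func nextWindow(w window, cursorTS uint64, asc bool) window {
	if asc {
		if cursorTS >= w.fromMS {
			w.fromMS = cursorTS + 1
		}
	} else if cursorTS <= w.toMS {
		w.toMS = cursorTS - 1
	}
	return w
}

func main() {
	w := window{fromMS: 1000, toMS: 9000}
	fmt.Println(nextWindow(w, 4000, true))  // {4001 9000}: ascending continues after the cursor
	fmt.Println(nextWindow(w, 4000, false)) // {1000 3999}: descending continues before the cursor
}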

View File

@ -0,0 +1,131 @@
package querier
import (
"strings"
"testing"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/stretchr/testify/assert"
)
func TestBuilderQueryFingerprint(t *testing.T) {
tests := []struct {
name string
query *builderQuery[qbtypes.MetricAggregation]
expectInKey []string
notExpectInKey []string
}{
{
name: "fingerprint includes shiftby when ShiftBy field is set",
query: &builderQuery[qbtypes.MetricAggregation]{
kind: qbtypes.RequestTypeTimeSeries,
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
ShiftBy: 3600,
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameTimeShift,
Args: []qbtypes.FunctionArg{
{Value: "3600"},
},
},
},
},
},
expectInKey: []string{"shiftby=3600"},
notExpectInKey: []string{"functions=", "timeshift", "absolute"},
},
{
name: "fingerprint includes shiftby but not other functions",
query: &builderQuery[qbtypes.MetricAggregation]{
kind: qbtypes.RequestTypeTimeSeries,
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
ShiftBy: 3600,
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameTimeShift,
Args: []qbtypes.FunctionArg{
{Value: "3600"},
},
},
{
Name: qbtypes.FunctionNameAbsolute,
},
},
},
},
expectInKey: []string{"shiftby=3600"},
notExpectInKey: []string{"functions=", "absolute"},
},
{
name: "no shiftby in fingerprint when ShiftBy is zero",
query: &builderQuery[qbtypes.MetricAggregation]{
kind: qbtypes.RequestTypeTimeSeries,
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
ShiftBy: 0,
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameAbsolute,
},
},
},
},
expectInKey: []string{},
notExpectInKey: []string{"shiftby=", "functions=", "absolute"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fingerprint := tt.query.Fingerprint()
for _, expected := range tt.expectInKey {
assert.True(t, strings.Contains(fingerprint, expected),
"Expected fingerprint to contain '%s', got: %s", expected, fingerprint)
}
for _, notExpected := range tt.notExpectInKey {
assert.False(t, strings.Contains(fingerprint, notExpected),
"Expected fingerprint NOT to contain '%s', got: %s", notExpected, fingerprint)
}
})
}
}
func TestMakeBucketsOrder(t *testing.T) {
// Test that makeBuckets returns buckets in reverse chronological order by default
// Using milliseconds as input - need > 1 hour range to get multiple buckets
now := uint64(1700000000000) // Some timestamp in ms
startMS := now
endMS := now + uint64(10*60*60*1000) // 10 hours later
buckets := makeBuckets(startMS, endMS)
// Should have multiple buckets for a 10 hour range
assert.True(t, len(buckets) > 1, "Should have multiple buckets for 10 hour range, got %d", len(buckets))
// Log buckets for debugging
t.Logf("Generated %d buckets:", len(buckets))
for i, b := range buckets {
durationMs := (b.toNS - b.fromNS) / 1e6
t.Logf("Bucket %d: duration=%dms", i, durationMs)
}
// Verify buckets are in reverse chronological order (newest to oldest)
for i := 0; i < len(buckets)-1; i++ {
assert.True(t, buckets[i].toNS > buckets[i+1].toNS,
"Bucket %d end should be after bucket %d end", i, i+1)
assert.Equal(t, buckets[i].fromNS, buckets[i+1].toNS,
"Bucket %d start should equal bucket %d end (continuous buckets)", i, i+1)
}
// First bucket should end at endNS (converted to nanoseconds)
expectedEndNS := querybuilder.ToNanoSecs(endMS)
assert.Equal(t, expectedEndNS, buckets[0].toNS)
// Last bucket should start at startNS (converted to nanoseconds)
expectedStartNS := querybuilder.ToNanoSecs(startMS)
assert.Equal(t, expectedStartNS, buckets[len(buckets)-1].fromNS)
}

View File

@ -176,7 +176,7 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
lblVals = append(lblVals, *val)
lblObjs = append(lblObjs, &qbtypes.Label{
Key: telemetrytypes.TelemetryFieldKey{Name: name},
Value: val,
Value: *val,
})
default:
@ -227,8 +227,9 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
}
}
if maxAgg < 0 {
//nolint:nilnil
return nil, nil // empty result-set
return &qbtypes.TimeSeriesData{
QueryName: queryName,
}, nil
}
buckets := make([]*qbtypes.AggregationBucket, maxAgg+1)
@ -319,8 +320,9 @@ func readAsScalar(rows driver.Rows, queryName string) (*qbtypes.ScalarData, erro
}
return &qbtypes.ScalarData{
Columns: cd,
Data: data,
QueryName: queryName,
Columns: cd,
Data: data,
}, nil
}

View File

@ -17,5 +17,5 @@ type BucketCache interface {
// cached portion + list of gaps to fetch
GetMissRanges(ctx context.Context, orgID valuer.UUID, q qbtypes.Query, step qbtypes.Step) (cached *qbtypes.Result, missing []*qbtypes.TimeRange)
// store fresh buckets for future hits
Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Query, fresh *qbtypes.Result)
Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Query, step qbtypes.Step, fresh *qbtypes.Result)
}

pkg/querier/postprocess.go (new file, 652 lines)
View File

@ -0,0 +1,652 @@
package querier
import (
"context"
"fmt"
"sort"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
// queryInfo holds common query properties
type queryInfo struct {
Name string
Disabled bool
Step qbtypes.Step
}
// getqueryInfo extracts common info from any query type
func getqueryInfo(spec any) queryInfo {
switch s := spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
return queryInfo{Name: s.Name, Disabled: s.Disabled, Step: s.StepInterval}
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
return queryInfo{Name: s.Name, Disabled: s.Disabled, Step: s.StepInterval}
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
return queryInfo{Name: s.Name, Disabled: s.Disabled, Step: s.StepInterval}
case qbtypes.QueryBuilderFormula:
return queryInfo{Name: s.Name, Disabled: false}
case qbtypes.PromQuery:
return queryInfo{Name: s.Name, Disabled: s.Disabled, Step: s.Step}
case qbtypes.ClickHouseQuery:
return queryInfo{Name: s.Name, Disabled: s.Disabled}
}
return queryInfo{}
}
// getQueryName is a convenience function when only name is needed
func getQueryName(spec any) string {
return getqueryInfo(spec).Name
}
func (q *querier) postProcessResults(ctx context.Context, results map[string]any, req *qbtypes.QueryRangeRequest) (map[string]any, error) {
// Convert results to typed format for processing
typedResults := make(map[string]*qbtypes.Result)
for name, result := range results {
typedResults[name] = &qbtypes.Result{
Value: result,
}
}
for _, query := range req.CompositeQuery.Queries {
switch spec := query.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
if result, ok := typedResults[spec.Name]; ok {
result = postProcessBuilderQuery(q, result, spec, req)
typedResults[spec.Name] = result
}
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
if result, ok := typedResults[spec.Name]; ok {
result = postProcessBuilderQuery(q, result, spec, req)
typedResults[spec.Name] = result
}
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
if result, ok := typedResults[spec.Name]; ok {
result = postProcessMetricQuery(q, result, spec, req)
typedResults[spec.Name] = result
}
}
}
// Apply formula calculations
typedResults = q.applyFormulas(ctx, typedResults, req)
// Filter out disabled queries
typedResults = q.filterDisabledQueries(typedResults, req)
// Apply table formatting for UI if requested
if req.FormatOptions != nil && req.FormatOptions.FormatTableResultForUI && req.RequestType == qbtypes.RequestTypeScalar {
// Format results as a table - this merges all queries into a single table
tableResult := q.formatScalarResultsAsTable(typedResults, req)
// Return the table under the first query's name so it gets included in results
if len(req.CompositeQuery.Queries) > 0 {
firstQueryName := getQueryName(req.CompositeQuery.Queries[0].Spec)
if firstQueryName != "" && tableResult["table"] != nil {
// Return table under first query name
return map[string]any{firstQueryName: tableResult["table"]}, nil
}
}
return tableResult, nil
}
// Convert back to map[string]any
finalResults := make(map[string]any)
for name, result := range typedResults {
finalResults[name] = result.Value
}
return finalResults, nil
}
// postProcessBuilderQuery applies postprocessing to a single builder query result
func postProcessBuilderQuery[T any](
q *querier,
result *qbtypes.Result,
query qbtypes.QueryBuilderQuery[T],
_ *qbtypes.QueryRangeRequest,
) *qbtypes.Result {
// Apply functions
if len(query.Functions) > 0 {
result = q.applyFunctions(result, query.Functions)
}
return result
}
// postProcessMetricQuery applies postprocessing to a metric query result
func postProcessMetricQuery(
q *querier,
result *qbtypes.Result,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
req *qbtypes.QueryRangeRequest,
) *qbtypes.Result {
if query.Limit > 0 {
result = q.applySeriesLimit(result, query.Limit, query.Order)
}
if len(query.Functions) > 0 {
result = q.applyFunctions(result, query.Functions)
}
// Apply reduce to for scalar request type
if req.RequestType == qbtypes.RequestTypeScalar {
if len(query.Aggregations) > 0 && query.Aggregations[0].ReduceTo != qbtypes.ReduceToUnknown {
result = q.applyMetricReduceTo(result, query.Aggregations[0].ReduceTo)
}
}
return result
}
// applyMetricReduceTo applies reduce to operation using the metric's ReduceTo field
func (q *querier) applyMetricReduceTo(result *qbtypes.Result, reduceOp qbtypes.ReduceTo) *qbtypes.Result {
tsData, ok := result.Value.(*qbtypes.TimeSeriesData)
if !ok {
return result
}
if tsData != nil {
for _, agg := range tsData.Aggregations {
for i, series := range agg.Series {
// Use the FunctionReduceTo helper
reducedSeries := qbtypes.FunctionReduceTo(series, reduceOp)
agg.Series[i] = reducedSeries
}
}
}
scalarData := convertTimeSeriesDataToScalar(tsData, tsData.QueryName)
result.Value = scalarData
return result
}
// applySeriesLimit limits the number of series in the result
func (q *querier) applySeriesLimit(result *qbtypes.Result, limit int, orderBy []qbtypes.OrderBy) *qbtypes.Result {
tsData, ok := result.Value.(*qbtypes.TimeSeriesData)
if !ok {
return result
}
if tsData != nil {
for _, agg := range tsData.Aggregations {
// Use the ApplySeriesLimit function from querybuildertypes
agg.Series = qbtypes.ApplySeriesLimit(agg.Series, orderBy, limit)
}
}
return result
}
// applyFunctions applies functions to time series data
func (q *querier) applyFunctions(result *qbtypes.Result, functions []qbtypes.Function) *qbtypes.Result {
tsData, ok := result.Value.(*qbtypes.TimeSeriesData)
if !ok {
return result
}
if tsData != nil {
for _, agg := range tsData.Aggregations {
for i, series := range agg.Series {
agg.Series[i] = qbtypes.ApplyFunctions(functions, series)
}
}
}
return result
}
// applyFormulas processes formula queries in the composite query
func (q *querier) applyFormulas(ctx context.Context, results map[string]*qbtypes.Result, req *qbtypes.QueryRangeRequest) map[string]*qbtypes.Result {
// Collect formula queries
formulaQueries := make(map[string]qbtypes.QueryBuilderFormula)
for _, query := range req.CompositeQuery.Queries {
if query.Type == qbtypes.QueryTypeFormula {
if formula, ok := query.Spec.(qbtypes.QueryBuilderFormula); ok {
formulaQueries[formula.Name] = formula
}
}
}
// Process each formula
for name, formula := range formulaQueries {
// Check if we're dealing with time series or scalar data
if req.RequestType == qbtypes.RequestTypeTimeSeries {
result := q.processTimeSeriesFormula(ctx, results, formula, req)
if result != nil {
results[name] = result
}
}
}
return results
}
// processTimeSeriesFormula handles formula evaluation for time series data
func (q *querier) processTimeSeriesFormula(
ctx context.Context,
results map[string]*qbtypes.Result,
formula qbtypes.QueryBuilderFormula,
_ *qbtypes.QueryRangeRequest,
) *qbtypes.Result {
// Prepare time series data for formula evaluation
timeSeriesData := make(map[string]*qbtypes.TimeSeriesData)
// Extract time series data from results
for queryName, result := range results {
if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok {
timeSeriesData[queryName] = tsData
}
}
// Create formula evaluator
// TODO(srikanthccv): add conditional default zero
canDefaultZero := make(map[string]bool)
evaluator, err := qbtypes.NewFormulaEvaluator(formula.Expression, canDefaultZero)
if err != nil {
q.logger.ErrorContext(ctx, "failed to create formula evaluator", "error", err, "formula", formula.Name)
return nil
}
// Evaluate the formula
formulaSeries, err := evaluator.EvaluateFormula(timeSeriesData)
if err != nil {
q.logger.ErrorContext(ctx, "failed to evaluate formula", "error", err, "formula", formula.Name)
return nil
}
// Create result for formula
formulaResult := &qbtypes.TimeSeriesData{
QueryName: formula.Name,
Aggregations: []*qbtypes.AggregationBucket{
{
Index: 0,
Series: formulaSeries,
},
},
}
// Apply functions if any
result := &qbtypes.Result{
Value: formulaResult,
}
if len(formula.Functions) > 0 {
result = q.applyFunctions(result, formula.Functions)
}
return result
}
// filterDisabledQueries removes results for disabled queries
func (q *querier) filterDisabledQueries(results map[string]*qbtypes.Result, req *qbtypes.QueryRangeRequest) map[string]*qbtypes.Result {
filtered := make(map[string]*qbtypes.Result)
for _, query := range req.CompositeQuery.Queries {
info := getqueryInfo(query.Spec)
if !info.Disabled {
if result, ok := results[info.Name]; ok {
filtered[info.Name] = result
}
}
}
return filtered
}
// formatScalarResultsAsTable formats scalar results as a unified table for UI display
func (q *querier) formatScalarResultsAsTable(results map[string]*qbtypes.Result, _ *qbtypes.QueryRangeRequest) map[string]any {
if len(results) == 0 {
return map[string]any{"table": &qbtypes.ScalarData{}}
}
// Convert all results to ScalarData first
scalarResults := make(map[string]*qbtypes.ScalarData)
for name, result := range results {
if sd, ok := result.Value.(*qbtypes.ScalarData); ok {
scalarResults[name] = sd
} else if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok {
scalarResults[name] = convertTimeSeriesDataToScalar(tsData, name)
}
}
// If single result already has multiple queries, just deduplicate
if len(scalarResults) == 1 {
for _, sd := range scalarResults {
if hasMultipleQueries(sd) {
return map[string]any{"table": deduplicateRows(sd)}
}
}
}
// Otherwise merge all results
merged := mergeScalarData(scalarResults)
return map[string]any{"table": merged}
}
// convertTimeSeriesDataToScalar converts time series to scalar format
func convertTimeSeriesDataToScalar(tsData *qbtypes.TimeSeriesData, queryName string) *qbtypes.ScalarData {
if tsData == nil || len(tsData.Aggregations) == 0 {
return &qbtypes.ScalarData{QueryName: queryName}
}
columns := []*qbtypes.ColumnDescriptor{}
// Add group columns from first series
if len(tsData.Aggregations[0].Series) > 0 {
for _, label := range tsData.Aggregations[0].Series[0].Labels {
columns = append(columns, &qbtypes.ColumnDescriptor{
TelemetryFieldKey: label.Key,
QueryName: queryName,
Type: qbtypes.ColumnTypeGroup,
})
}
}
// Add aggregation columns
for _, agg := range tsData.Aggregations {
name := agg.Alias
if name == "" {
name = fmt.Sprintf("__result_%d", agg.Index)
}
columns = append(columns, &qbtypes.ColumnDescriptor{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: name},
QueryName: queryName,
AggregationIndex: int64(agg.Index),
Meta: agg.Meta,
Type: qbtypes.ColumnTypeAggregation,
})
}
// Build rows
data := [][]any{}
for seriesIdx, series := range tsData.Aggregations[0].Series {
row := make([]any, len(columns))
// Add group values
for i, label := range series.Labels {
row[i] = label.Value
}
// Add aggregation values (last value)
groupColCount := len(series.Labels)
for aggIdx, agg := range tsData.Aggregations {
if seriesIdx < len(agg.Series) && len(agg.Series[seriesIdx].Values) > 0 {
lastValue := agg.Series[seriesIdx].Values[len(agg.Series[seriesIdx].Values)-1].Value
row[groupColCount+aggIdx] = lastValue
} else {
row[groupColCount+aggIdx] = "n/a"
}
}
data = append(data, row)
}
return &qbtypes.ScalarData{
QueryName: queryName,
Columns: columns,
Data: data,
}
}
// hasMultipleQueries checks if ScalarData contains columns from multiple queries
func hasMultipleQueries(sd *qbtypes.ScalarData) bool {
queries := make(map[string]bool)
for _, col := range sd.Columns {
if col.Type == qbtypes.ColumnTypeAggregation && col.QueryName != "" {
queries[col.QueryName] = true
}
}
return len(queries) > 1
}
// deduplicateRows removes duplicate rows based on group columns
func deduplicateRows(sd *qbtypes.ScalarData) *qbtypes.ScalarData {
// Find group column indices
groupIndices := []int{}
for i, col := range sd.Columns {
if col.Type == qbtypes.ColumnTypeGroup {
groupIndices = append(groupIndices, i)
}
}
// Build unique rows map
uniqueRows := make(map[string][]any)
for _, row := range sd.Data {
key := buildRowKey(row, groupIndices)
if existing, found := uniqueRows[key]; found {
// Merge non-n/a values
for i, val := range row {
if existing[i] == "n/a" && val != "n/a" {
existing[i] = val
}
}
} else {
rowCopy := make([]any, len(row))
copy(rowCopy, row)
uniqueRows[key] = rowCopy
}
}
// Convert back to slice
data := make([][]any, 0, len(uniqueRows))
for _, row := range uniqueRows {
data = append(data, row)
}
// Sort by first aggregation column
sortByFirstAggregation(data, sd.Columns)
return &qbtypes.ScalarData{
Columns: sd.Columns,
Data: data,
}
}
// mergeScalarData merges multiple scalar data results
func mergeScalarData(results map[string]*qbtypes.ScalarData) *qbtypes.ScalarData {
// Collect unique group columns
groupCols := []string{}
groupColMap := make(map[string]*qbtypes.ColumnDescriptor)
for _, sd := range results {
for _, col := range sd.Columns {
if col.Type == qbtypes.ColumnTypeGroup {
if _, exists := groupColMap[col.Name]; !exists {
groupColMap[col.Name] = col
groupCols = append(groupCols, col.Name)
}
}
}
}
// Build final columns
columns := []*qbtypes.ColumnDescriptor{}
// Add group columns
for _, name := range groupCols {
columns = append(columns, groupColMap[name])
}
// Add aggregation columns from each query (sorted by query name)
queryNames := make([]string, 0, len(results))
for name := range results {
queryNames = append(queryNames, name)
}
sort.Strings(queryNames)
for _, queryName := range queryNames {
sd := results[queryName]
for _, col := range sd.Columns {
if col.Type == qbtypes.ColumnTypeAggregation {
columns = append(columns, col)
}
}
}
// Merge rows
rowMap := make(map[string][]any)
for queryName, sd := range results {
// Create index mappings
groupMap := make(map[string]int)
for i, col := range sd.Columns {
if col.Type == qbtypes.ColumnTypeGroup {
groupMap[col.Name] = i
}
}
// Process each row
for _, row := range sd.Data {
key := buildKeyFromGroupCols(row, groupMap, groupCols)
if _, exists := rowMap[key]; !exists {
// Initialize new row
newRow := make([]any, len(columns))
// Set group values
for i, colName := range groupCols {
if idx, ok := groupMap[colName]; ok && idx < len(row) {
newRow[i] = row[idx]
} else {
newRow[i] = "n/a"
}
}
// Initialize all aggregations to n/a
for i := len(groupCols); i < len(columns); i++ {
newRow[i] = "n/a"
}
rowMap[key] = newRow
}
// Set aggregation values for this query
mergedRow := rowMap[key]
colIdx := len(groupCols)
for _, col := range columns[len(groupCols):] {
if col.QueryName == queryName {
// Find the value in the original row
for i, origCol := range sd.Columns {
if origCol.Type == qbtypes.ColumnTypeAggregation &&
origCol.AggregationIndex == col.AggregationIndex {
if i < len(row) {
mergedRow[colIdx] = row[i]
}
break
}
}
}
colIdx++
}
}
}
// Convert to slice
data := make([][]any, 0, len(rowMap))
for _, row := range rowMap {
data = append(data, row)
}
// Sort by first aggregation column
sortByFirstAggregation(data, columns)
return &qbtypes.ScalarData{
Columns: columns,
Data: data,
}
}
// buildRowKey builds a unique key from row values at specified indices
func buildRowKey(row []any, indices []int) string {
parts := make([]string, len(indices))
for i, idx := range indices {
if idx < len(row) {
parts[i] = fmt.Sprintf("%v", row[idx])
} else {
parts[i] = "n/a"
}
}
return fmt.Sprintf("%v", parts)
}
// buildKeyFromGroupCols builds a key from group column values
func buildKeyFromGroupCols(row []any, groupMap map[string]int, groupCols []string) string {
parts := make([]string, len(groupCols))
for i, colName := range groupCols {
if idx, ok := groupMap[colName]; ok && idx < len(row) {
parts[i] = fmt.Sprintf("%v", row[idx])
} else {
parts[i] = "n/a"
}
}
return fmt.Sprintf("%v", parts)
}
// sortByFirstAggregation sorts data by the first aggregation column (descending)
func sortByFirstAggregation(data [][]any, columns []*qbtypes.ColumnDescriptor) {
// Find first aggregation column
aggIdx := -1
for i, col := range columns {
if col.Type == qbtypes.ColumnTypeAggregation {
aggIdx = i
break
}
}
if aggIdx < 0 {
return
}
sort.SliceStable(data, func(i, j int) bool {
return compareValues(data[i][aggIdx], data[j][aggIdx]) > 0
})
}
// compareValues compares two values for sorting (handles n/a and numeric types)
func compareValues(a, b any) int {
// Handle n/a values
if a == "n/a" && b == "n/a" {
return 0
}
if a == "n/a" {
return -1
}
if b == "n/a" {
return 1
}
// Compare numeric values
aFloat, aOk := toFloat64(a)
bFloat, bOk := toFloat64(b)
if aOk && bOk {
if aFloat > bFloat {
return 1
} else if aFloat < bFloat {
return -1
}
return 0
}
// Fallback to string comparison
return 0
}
// toFloat64 attempts to convert a value to float64
func toFloat64(v any) (float64, bool) {
switch val := v.(type) {
case float64:
return val, true
case int64:
return float64(val), true
case int:
return float64(val), true
case int32:
return float64(val), true
}
return 0, false
}
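mergeScalarData joins rows from different queries on their group-by values and fills missing aggregation cells with "n/a". A reduced standalone sketch of that join using plain maps instead of ScalarData; the two input maps stand in for per-query scalar results keyed by a single group column:

package main

import "fmt"

// rowsByGroupKey merges two per-query result sets on the group value ("service" here),
// giving each query its own column and "n/a" where a query has no row for that group.
func rowsByGroupKey(queryA, queryB map[string]float64) [][]any {
	merged := map[string][]any{}
	for svc, v := range queryA {
		merged[svc] = []any{svc, v, "n/a"}
	}
	for svc, v := range queryB {
		row, ok := merged[svc]
		if !ok {
			row = []any{svc, "n/a", "n/a"}
			merged[svc] = row
		}
		row[2] = v
	}
	out := make([][]any, 0, len(merged))
	for _, row := range merged {
		out = append(out, row)
	}
	return out
}

func main() {
	latency := map[string]float64{"cart": 120, "auth": 80}
	errRate := map[string]float64{"cart": 0.4} // no error-rate row for auth
	for _, row := range rowsByGroupKey(latency, errRate) {
		fmt.Println(row) // e.g. [cart 120 0.4] and [auth 80 n/a], order not guaranteed
	}
}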

View File

@ -5,12 +5,14 @@ import (
"fmt"
"log/slog"
"slices"
"strconv"
"sync"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"golang.org/x/exp/maps"
@ -54,8 +56,82 @@ func New(
}
}
// extractShiftFromBuilderQuery extracts the shift value from timeShift function if present
func extractShiftFromBuilderQuery[T any](spec qbtypes.QueryBuilderQuery[T]) int64 {
for _, fn := range spec.Functions {
if fn.Name == qbtypes.FunctionNameTimeShift && len(fn.Args) > 0 {
switch v := fn.Args[0].Value.(type) {
case float64:
return int64(v)
case int64:
return v
case int:
return int64(v)
case string:
if shiftFloat, err := strconv.ParseFloat(v, 64); err == nil {
return int64(shiftFloat)
}
}
}
}
return 0
}
// adjustTimeRangeForShift adjusts the time range based on the shift value from timeShift function
func adjustTimeRangeForShift[T any](spec qbtypes.QueryBuilderQuery[T], tr qbtypes.TimeRange, kind qbtypes.RequestType) qbtypes.TimeRange {
// Only apply time shift for time series and scalar queries
// Raw/list queries don't support timeshift
if kind != qbtypes.RequestTypeTimeSeries && kind != qbtypes.RequestTypeScalar {
return tr
}
// Use the ShiftBy field if it's already populated, otherwise extract it
shiftBy := spec.ShiftBy
if shiftBy == 0 {
shiftBy = extractShiftFromBuilderQuery(spec)
}
if shiftBy == 0 {
return tr
}
// ShiftBy is in seconds, convert to milliseconds and shift backward in time
shiftMS := shiftBy * 1000
return qbtypes.TimeRange{
From: tr.From - uint64(shiftMS),
To: tr.To - uint64(shiftMS),
}
}
func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest) (*qbtypes.QueryRangeResponse, error) {
// First pass: collect all metric names that need temporality
metricNames := make([]string, 0)
for _, query := range req.CompositeQuery.Queries {
if query.Type == qbtypes.QueryTypeBuilder {
if spec, ok := query.Spec.(qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]); ok {
for _, agg := range spec.Aggregations {
if agg.MetricName != "" {
metricNames = append(metricNames, agg.MetricName)
}
}
}
}
}
// Fetch temporality for all metrics at once
var metricTemporality map[string]metrictypes.Temporality
if len(metricNames) > 0 {
var err error
metricTemporality, err = q.metadataStore.FetchTemporalityMulti(ctx, metricNames...)
if err != nil {
q.logger.WarnContext(ctx, "failed to fetch metric temporality", "error", err, "metrics", metricNames)
// Continue without temporality - statement builder will handle unspecified
metricTemporality = make(map[string]metrictypes.Temporality)
}
q.logger.DebugContext(ctx, "fetched metric temporalities", "metric_temporality", metricTemporality)
}
queries := make(map[string]qbtypes.Query)
steps := make(map[string]qbtypes.Step)
@ -79,15 +155,28 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
case qbtypes.QueryTypeBuilder:
switch spec := query.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
bq := newBuilderQuery(q.telemetryStore, q.traceStmtBuilder, spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
spec.ShiftBy = extractShiftFromBuilderQuery(spec)
timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
bq := newBuilderQuery(q.telemetryStore, q.traceStmtBuilder, spec, timeRange, req.RequestType)
queries[spec.Name] = bq
steps[spec.Name] = spec.StepInterval
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
bq := newBuilderQuery(q.telemetryStore, q.logStmtBuilder, spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
spec.ShiftBy = extractShiftFromBuilderQuery(spec)
timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
bq := newBuilderQuery(q.telemetryStore, q.logStmtBuilder, spec, timeRange, req.RequestType)
queries[spec.Name] = bq
steps[spec.Name] = spec.StepInterval
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
bq := newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
for i := range spec.Aggregations {
if spec.Aggregations[i].MetricName != "" && spec.Aggregations[i].Temporality == metrictypes.Unknown {
if temp, ok := metricTemporality[spec.Aggregations[i].MetricName]; ok && temp != metrictypes.Unknown {
spec.Aggregations[i].Temporality = temp
}
}
}
spec.ShiftBy = extractShiftFromBuilderQuery(spec)
timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
bq := newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, spec, timeRange, req.RequestType)
queries[spec.Name] = bq
steps[spec.Name] = spec.StepInterval
default:
@ -133,13 +222,18 @@ func (q *querier) run(ctx context.Context, orgID valuer.UUID, qs map[string]qbty
}
}
processedResults, err := q.postProcessResults(ctx, results, req)
if err != nil {
return nil, err
}
return &qbtypes.QueryRangeResponse{
Type: req.RequestType,
Data: struct {
Results []any `json:"results"`
Warnings []string `json:"warnings"`
}{
Results: maps.Values(results),
Results: maps.Values(processedResults),
Warnings: warnings,
},
Meta: struct {
@ -173,7 +267,7 @@ func (q *querier) executeWithCache(ctx context.Context, orgID valuer.UUID, query
return nil, err
}
// Store in cache for future use
q.bucketCache.Put(ctx, orgID, query, result)
q.bucketCache.Put(ctx, orgID, query, step, result)
return result, nil
}
}
@ -183,6 +277,10 @@ func (q *querier) executeWithCache(ctx context.Context, orgID valuer.UUID, query
errors := make([]error, len(missingRanges))
totalStats := qbtypes.ExecStats{}
q.logger.DebugContext(ctx, "executing queries for missing ranges",
"missing_ranges_count", len(missingRanges),
"ranges", missingRanges)
sem := make(chan struct{}, 4)
var wg sync.WaitGroup
@ -224,7 +322,7 @@ func (q *querier) executeWithCache(ctx context.Context, orgID valuer.UUID, query
if err != nil {
return nil, err
}
q.bucketCache.Put(ctx, orgID, query, result)
q.bucketCache.Put(ctx, orgID, query, step, result)
return result, nil
}
}
@ -248,7 +346,7 @@ func (q *querier) executeWithCache(ctx context.Context, orgID valuer.UUID, query
mergedResult.Stats.DurationMS += totalStats.DurationMS
// Store merged result in cache
q.bucketCache.Put(ctx, orgID, query, mergedResult)
q.bucketCache.Put(ctx, orgID, query, step, mergedResult)
return mergedResult, nil
}
@ -261,11 +359,17 @@ func (q *querier) createRangedQuery(originalQuery qbtypes.Query, timeRange qbtyp
case *chSQLQuery:
return newchSQLQuery(q.telemetryStore, qt.query, qt.args, timeRange, qt.kind)
case *builderQuery[qbtypes.TraceAggregation]:
return newBuilderQuery(q.telemetryStore, q.traceStmtBuilder, qt.spec, timeRange, qt.kind)
qt.spec.ShiftBy = extractShiftFromBuilderQuery(qt.spec)
adjustedTimeRange := adjustTimeRangeForShift(qt.spec, timeRange, qt.kind)
return newBuilderQuery(q.telemetryStore, q.traceStmtBuilder, qt.spec, adjustedTimeRange, qt.kind)
case *builderQuery[qbtypes.LogAggregation]:
return newBuilderQuery(q.telemetryStore, q.logStmtBuilder, qt.spec, timeRange, qt.kind)
qt.spec.ShiftBy = extractShiftFromBuilderQuery(qt.spec)
adjustedTimeRange := adjustTimeRangeForShift(qt.spec, timeRange, qt.kind)
return newBuilderQuery(q.telemetryStore, q.logStmtBuilder, qt.spec, adjustedTimeRange, qt.kind)
case *builderQuery[qbtypes.MetricAggregation]:
return newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, qt.spec, timeRange, qt.kind)
qt.spec.ShiftBy = extractShiftFromBuilderQuery(qt.spec)
adjustedTimeRange := adjustTimeRangeForShift(qt.spec, timeRange, qt.kind)
return newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, qt.spec, adjustedTimeRange, qt.kind)
default:
return nil
}
@ -273,8 +377,29 @@ func (q *querier) createRangedQuery(originalQuery qbtypes.Query, timeRange qbtyp
// mergeResults merges cached result with fresh results
func (q *querier) mergeResults(cached *qbtypes.Result, fresh []*qbtypes.Result) *qbtypes.Result {
if cached == nil && len(fresh) == 1 {
return fresh[0]
if cached == nil {
if len(fresh) == 1 {
return fresh[0]
}
if len(fresh) == 0 {
return nil
}
// If cached is nil but we have multiple fresh results, we need to merge them
// We need to merge all fresh results properly to avoid duplicates
merged := &qbtypes.Result{
Type: fresh[0].Type,
Stats: fresh[0].Stats,
Warnings: fresh[0].Warnings,
}
// Merge all fresh results including the first one
switch merged.Type {
case qbtypes.RequestTypeTimeSeries:
// Pass nil as cached value to ensure proper merging of all fresh results
merged.Value = q.mergeTimeSeriesResults(nil, fresh)
}
return merged
}
// Start with cached result
@ -315,23 +440,52 @@ func (q *querier) mergeResults(cached *qbtypes.Result, fresh []*qbtypes.Result)
// mergeTimeSeriesResults merges time series data
func (q *querier) mergeTimeSeriesResults(cachedValue *qbtypes.TimeSeriesData, freshResults []*qbtypes.Result) *qbtypes.TimeSeriesData {
// Map to store merged series by query name and series key
// Map to store merged series by aggregation index and series key
seriesMap := make(map[int]map[string]*qbtypes.TimeSeries)
// Map to store aggregation bucket metadata
bucketMetadata := make(map[int]*qbtypes.AggregationBucket)
for _, aggBucket := range cachedValue.Aggregations {
if seriesMap[aggBucket.Index] == nil {
seriesMap[aggBucket.Index] = make(map[string]*qbtypes.TimeSeries)
}
for _, series := range aggBucket.Series {
key := qbtypes.GetUniqueSeriesKey(series.Labels)
seriesMap[aggBucket.Index][key] = series
// Process cached data if available
if cachedValue != nil && cachedValue.Aggregations != nil {
for _, aggBucket := range cachedValue.Aggregations {
if seriesMap[aggBucket.Index] == nil {
seriesMap[aggBucket.Index] = make(map[string]*qbtypes.TimeSeries)
}
if bucketMetadata[aggBucket.Index] == nil {
bucketMetadata[aggBucket.Index] = aggBucket
}
for _, series := range aggBucket.Series {
key := qbtypes.GetUniqueSeriesKey(series.Labels)
if existingSeries, ok := seriesMap[aggBucket.Index][key]; ok {
// Merge values from duplicate series in cached data, avoiding duplicate timestamps
timestampMap := make(map[int64]bool)
for _, v := range existingSeries.Values {
timestampMap[v.Timestamp] = true
}
// Only add values with new timestamps
for _, v := range series.Values {
if !timestampMap[v.Timestamp] {
existingSeries.Values = append(existingSeries.Values, v)
}
}
} else {
// Create a copy to avoid modifying the cached data
seriesCopy := &qbtypes.TimeSeries{
Labels: series.Labels,
Values: make([]*qbtypes.TimeSeriesValue, len(series.Values)),
}
copy(seriesCopy.Values, series.Values)
seriesMap[aggBucket.Index][key] = seriesCopy
}
}
}
}
// Add fresh series
for _, result := range freshResults {
freshTS, ok := result.Value.(*qbtypes.TimeSeriesData)
if !ok {
if !ok || freshTS == nil || freshTS.Aggregations == nil {
continue
}
@ -339,6 +493,12 @@ func (q *querier) mergeTimeSeriesResults(cachedValue *qbtypes.TimeSeriesData, fr
if seriesMap[aggBucket.Index] == nil {
seriesMap[aggBucket.Index] = make(map[string]*qbtypes.TimeSeries)
}
// Prefer fresh metadata over cached metadata
if aggBucket.Alias != "" || aggBucket.Meta.Unit != "" {
bucketMetadata[aggBucket.Index] = aggBucket
} else if bucketMetadata[aggBucket.Index] == nil {
bucketMetadata[aggBucket.Index] = aggBucket
}
}
for _, aggBucket := range freshTS.Aggregations {
@ -346,8 +506,19 @@ func (q *querier) mergeTimeSeriesResults(cachedValue *qbtypes.TimeSeriesData, fr
key := qbtypes.GetUniqueSeriesKey(series.Labels)
if existingSeries, ok := seriesMap[aggBucket.Index][key]; ok {
// Merge values
existingSeries.Values = append(existingSeries.Values, series.Values...)
// Merge values, avoiding duplicate timestamps
// Create a map to track existing timestamps
timestampMap := make(map[int64]bool)
for _, v := range existingSeries.Values {
timestampMap[v.Timestamp] = true
}
// Only add values with new timestamps
for _, v := range series.Values {
if !timestampMap[v.Timestamp] {
existingSeries.Values = append(existingSeries.Values, v)
}
}
} else {
// New series
seriesMap[aggBucket.Index][key] = series
@ -357,10 +528,18 @@ func (q *querier) mergeTimeSeriesResults(cachedValue *qbtypes.TimeSeriesData, fr
}
result := &qbtypes.TimeSeriesData{
QueryName: cachedValue.QueryName,
Aggregations: []*qbtypes.AggregationBucket{},
}
// Set QueryName from cached or first fresh result
if cachedValue != nil {
result.QueryName = cachedValue.QueryName
} else if len(freshResults) > 0 {
if freshTS, ok := freshResults[0].Value.(*qbtypes.TimeSeriesData); ok && freshTS != nil {
result.QueryName = freshTS.QueryName
}
}
for index, series := range seriesMap {
var aggSeries []*qbtypes.TimeSeries
for _, s := range series {
@ -377,10 +556,17 @@ func (q *querier) mergeTimeSeriesResults(cachedValue *qbtypes.TimeSeriesData, fr
aggSeries = append(aggSeries, s)
}
result.Aggregations = append(result.Aggregations, &qbtypes.AggregationBucket{
// Preserve bucket metadata from either cached or fresh results
bucket := &qbtypes.AggregationBucket{
Index: index,
Series: aggSeries,
})
}
if metadata, ok := bucketMetadata[index]; ok {
bucket.Alias = metadata.Alias
bucket.Meta = metadata.Meta
}
result.Aggregations = append(result.Aggregations, bucket)
}
return result
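The timestamp-deduplicating merge used in both the cached and fresh paths above boils down to a small helper; the following is a minimal sketch of that logic, assuming the qbtypes import already present in this file (the committed code inlines this per call site rather than defining a helper):

func mergeDedup(dst, src []*qbtypes.TimeSeriesValue) []*qbtypes.TimeSeriesValue {
	// Record timestamps already present in the destination series.
	seen := make(map[int64]bool, len(dst))
	for _, v := range dst {
		seen[v.Timestamp] = true
	}
	// Append only values whose timestamps have not been seen yet.
	for _, v := range src {
		if !seen[v.Timestamp] {
			dst = append(dst, v)
			seen[v.Timestamp] = true
		}
	}
	return dst
}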

pkg/querier/shift_test.go (new file, 229 lines)
View File

@ -0,0 +1,229 @@
package querier
import (
"testing"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/stretchr/testify/assert"
)
// TestAdjustTimeRangeForShift tests the time range adjustment logic
func TestAdjustTimeRangeForShift(t *testing.T) {
tests := []struct {
name string
spec qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]
timeRange qbtypes.TimeRange
requestType qbtypes.RequestType
expectedFromMS uint64
expectedToMS uint64
}{
{
name: "no shift",
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Functions: []qbtypes.Function{},
},
timeRange: qbtypes.TimeRange{
From: 1000000,
To: 2000000,
},
requestType: qbtypes.RequestTypeTimeSeries,
expectedFromMS: 1000000,
expectedToMS: 2000000,
},
{
name: "shift by 60 seconds using timeShift function",
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameTimeShift,
Args: []qbtypes.FunctionArg{
{Value: "60"},
},
},
},
},
timeRange: qbtypes.TimeRange{
From: 1000000,
To: 2000000,
},
requestType: qbtypes.RequestTypeTimeSeries,
expectedFromMS: 940000, // 1000000 - 60000
expectedToMS: 1940000, // 2000000 - 60000
},
{
name: "shift by negative 30 seconds (future shift)",
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameTimeShift,
Args: []qbtypes.FunctionArg{
{Value: "-30"},
},
},
},
},
timeRange: qbtypes.TimeRange{
From: 1000000,
To: 2000000,
},
requestType: qbtypes.RequestTypeTimeSeries,
expectedFromMS: 1030000, // 1000000 - (-30000)
expectedToMS: 2030000, // 2000000 - (-30000)
},
{
name: "no shift for raw request type even with timeShift function",
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameTimeShift,
Args: []qbtypes.FunctionArg{
{Value: "3600"},
},
},
},
},
timeRange: qbtypes.TimeRange{
From: 1000000,
To: 2000000,
},
requestType: qbtypes.RequestTypeRaw,
expectedFromMS: 1000000, // No shift for raw queries
expectedToMS: 2000000,
},
{
name: "shift applied for scalar request type with timeShift function",
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameTimeShift,
Args: []qbtypes.FunctionArg{
{Value: "3600"},
},
},
},
},
timeRange: qbtypes.TimeRange{
From: 10000000,
To: 20000000,
},
requestType: qbtypes.RequestTypeScalar,
expectedFromMS: 6400000, // 10000000 - 3600000
expectedToMS: 16400000, // 20000000 - 3600000
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := adjustTimeRangeForShift(tt.spec, tt.timeRange, tt.requestType)
assert.Equal(t, tt.expectedFromMS, result.From, "fromMS mismatch")
assert.Equal(t, tt.expectedToMS, result.To, "toMS mismatch")
})
}
}
// TestExtractShiftFromBuilderQuery tests the shift extraction logic
func TestExtractShiftFromBuilderQuery(t *testing.T) {
tests := []struct {
name string
spec qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]
expectedShiftBy int64
}{
{
name: "extract from timeShift function with float64",
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameTimeShift,
Args: []qbtypes.FunctionArg{
{Value: float64(3600)},
},
},
},
},
expectedShiftBy: 3600,
},
{
name: "extract from timeShift function with int64",
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameTimeShift,
Args: []qbtypes.FunctionArg{
{Value: int64(3600)},
},
},
},
},
expectedShiftBy: 3600,
},
{
name: "extract from timeShift function with string",
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameTimeShift,
Args: []qbtypes.FunctionArg{
{Value: "3600"},
},
},
},
},
expectedShiftBy: 3600,
},
{
name: "no timeShift function",
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameAbsolute,
},
},
},
expectedShiftBy: 0,
},
{
name: "invalid timeShift value",
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameTimeShift,
Args: []qbtypes.FunctionArg{
{Value: "invalid"},
},
},
},
},
expectedShiftBy: 0,
},
{
name: "multiple functions with timeShift",
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameAbsolute,
},
{
Name: qbtypes.FunctionNameTimeShift,
Args: []qbtypes.FunctionArg{
{Value: "1800"},
},
},
{
Name: qbtypes.FunctionNameClampMax,
Args: []qbtypes.FunctionArg{
{Value: "100"},
},
},
},
},
expectedShiftBy: 1800,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
shiftBy := extractShiftFromBuilderQuery(tt.spec)
assert.Equal(t, tt.expectedShiftBy, shiftBy)
})
}
}
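The table above pins down the expected behaviour of extractShiftFromBuilderQuery; logic consistent with these cases looks roughly like the sketch below (illustrative only, not the committed implementation; it assumes the strconv and qbtypes imports used elsewhere in this package):

// extractShiftSketch returns the shift in seconds declared by the first
// timeShift function on the query, or 0 when absent or unparsable.
func extractShiftSketch(fns []qbtypes.Function) int64 {
	for _, fn := range fns {
		if fn.Name != qbtypes.FunctionNameTimeShift || len(fn.Args) == 0 {
			continue
		}
		switch v := fn.Args[0].Value.(type) {
		case float64:
			return int64(v)
		case int64:
			return v
		case string:
			if n, err := strconv.ParseFloat(v, 64); err == nil {
				return int64(n)
			}
		}
		return 0
	}
	return 0
}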

View File

@ -68,7 +68,7 @@ func CollisionHandledFinalExpr(
return "", nil, errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, correction)
} else {
// not even a close match, return an error
return "", nil, err
return "", nil, errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, "field %s not found", field.Name)
}
} else {
for _, key := range keysForField {

View File

@ -46,7 +46,7 @@ func (b *defaultConditionBuilder) ConditionFor(
) (string, error) {
if key.FieldContext != telemetrytypes.FieldContextResource {
return "", nil
return "true", nil
}
column, err := b.fm.ColumnFor(ctx, key)

View File

@ -22,7 +22,7 @@ type filterExpressionVisitor struct {
conditionBuilder qbtypes.ConditionBuilder
warnings []string
fieldKeys map[string][]*telemetrytypes.TelemetryFieldKey
errors []error
errors []string
builder *sqlbuilder.SelectBuilder
fullTextColumn *telemetrytypes.TelemetryFieldKey
jsonBodyPrefix string
@ -90,11 +90,14 @@ func PrepareWhereClause(query string, opts FilterExprVisitorOpts) (*sqlbuilder.W
combinedErrors := errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"found %d syntax errors while parsing the filter expression: %v",
"found %d syntax errors while parsing the filter expression",
len(parserErrorListener.SyntaxErrors),
parserErrorListener.SyntaxErrors,
)
return nil, nil, combinedErrors
additionals := make([]string, 0, len(parserErrorListener.SyntaxErrors))
for _, err := range parserErrorListener.SyntaxErrors {
additionals = append(additionals, err.Error())
}
return nil, nil, combinedErrors.WithAdditional(additionals...)
}
// Visit the parse tree with our ClickHouse visitor
@ -105,11 +108,10 @@ func PrepareWhereClause(query string, opts FilterExprVisitorOpts) (*sqlbuilder.W
combinedErrors := errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"found %d errors while parsing the search expression: %v",
"found %d errors while parsing the search expression",
len(visitor.errors),
visitor.errors,
)
return nil, nil, combinedErrors
return nil, nil, combinedErrors.WithAdditional(visitor.errors...)
}
whereClause := sqlbuilder.NewWhereClause().AddWhereExpr(visitor.builder.Args, cond)
@ -234,15 +236,11 @@ func (v *filterExpressionVisitor) VisitPrimary(ctx *grammar.PrimaryContext) any
// Handle standalone key/value as a full text search term
if ctx.GetChildCount() == 1 {
if v.skipFullTextFilter {
return ""
return "true"
}
if v.fullTextColumn == nil {
v.errors = append(v.errors, errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"full text search is not supported",
))
v.errors = append(v.errors, "full text search is not supported")
return ""
}
child := ctx.GetChild(0)
@ -251,7 +249,7 @@ func (v *filterExpressionVisitor) VisitPrimary(ctx *grammar.PrimaryContext) any
keyText := keyCtx.GetText()
cond, err := v.conditionBuilder.ConditionFor(context.Background(), v.fullTextColumn, qbtypes.FilterOperatorRegexp, keyText, v.builder)
if err != nil {
v.errors = append(v.errors, errors.WrapInternalf(err, errors.CodeInternal, "failed to build full text search condition"))
v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error()))
return ""
}
return cond
@ -266,12 +264,12 @@ func (v *filterExpressionVisitor) VisitPrimary(ctx *grammar.PrimaryContext) any
} else if valCtx.KEY() != nil {
text = valCtx.KEY().GetText()
} else {
v.errors = append(v.errors, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "unsupported value type: %s", valCtx.GetText()))
v.errors = append(v.errors, fmt.Sprintf("unsupported value type: %s", valCtx.GetText()))
return ""
}
cond, err := v.conditionBuilder.ConditionFor(context.Background(), v.fullTextColumn, qbtypes.FilterOperatorRegexp, text, v.builder)
if err != nil {
v.errors = append(v.errors, errors.WrapInternalf(err, errors.CodeInternal, "failed to build full text search condition"))
v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error()))
return ""
}
return cond
@ -419,7 +417,7 @@ func (v *filterExpressionVisitor) VisitComparison(ctx *grammar.ComparisonContext
for _, key := range keys {
condition, err := v.conditionBuilder.ConditionFor(context.Background(), key, op, value, v.builder)
if err != nil {
v.errors = append(v.errors, errors.WrapInternalf(err, errors.CodeInternal, "failed to build condition"))
v.errors = append(v.errors, fmt.Sprintf("failed to build condition: %s", err.Error()))
return ""
}
conds = append(conds, condition)
@ -459,7 +457,7 @@ func (v *filterExpressionVisitor) VisitValueList(ctx *grammar.ValueListContext)
func (v *filterExpressionVisitor) VisitFullText(ctx *grammar.FullTextContext) any {
if v.skipFullTextFilter {
return ""
return "true"
}
var text string
@ -471,16 +469,12 @@ func (v *filterExpressionVisitor) VisitFullText(ctx *grammar.FullTextContext) an
}
if v.fullTextColumn == nil {
v.errors = append(v.errors, errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"full text search is not supported",
))
v.errors = append(v.errors, "full text search is not supported")
return ""
}
cond, err := v.conditionBuilder.ConditionFor(context.Background(), v.fullTextColumn, qbtypes.FilterOperatorRegexp, text, v.builder)
if err != nil {
v.errors = append(v.errors, errors.WrapInternalf(err, errors.CodeInternal, "failed to build full text search condition"))
v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error()))
return ""
}
return cond
@ -498,34 +492,19 @@ func (v *filterExpressionVisitor) VisitFunctionCall(ctx *grammar.FunctionCallCon
functionName = "hasAll"
} else {
// Default fallback
v.errors = append(v.errors, errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"unknown function `%s`",
ctx.GetText(),
))
v.errors = append(v.errors, fmt.Sprintf("unknown function `%s`", ctx.GetText()))
return ""
}
params := v.Visit(ctx.FunctionParamList()).([]any)
if len(params) < 2 {
v.errors = append(v.errors, errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"function `%s` expects key and value parameters",
functionName,
))
v.errors = append(v.errors, fmt.Sprintf("function `%s` expects key and value parameters", functionName))
return ""
}
keys, ok := params[0].([]*telemetrytypes.TelemetryFieldKey)
if !ok {
v.errors = append(v.errors, errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"function `%s` expects key parameter to be a field key",
functionName,
))
v.errors = append(v.errors, fmt.Sprintf("function `%s` expects key parameter to be a field key", functionName))
return ""
}
value := params[1:]
@ -536,12 +515,7 @@ func (v *filterExpressionVisitor) VisitFunctionCall(ctx *grammar.FunctionCallCon
if strings.HasPrefix(key.Name, v.jsonBodyPrefix) {
fieldName, _ = v.jsonKeyToKey(context.Background(), key, qbtypes.FilterOperatorUnknown, value)
} else {
v.errors = append(v.errors, errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"function `%s` supports only body JSON search",
functionName,
))
v.errors = append(v.errors, fmt.Sprintf("function `%s` supports only body JSON search", functionName))
return ""
}
@ -603,12 +577,7 @@ func (v *filterExpressionVisitor) VisitValue(ctx *grammar.ValueContext) any {
} else if ctx.NUMBER() != nil {
number, err := strconv.ParseFloat(ctx.NUMBER().GetText(), 64)
if err != nil {
v.errors = append(v.errors, errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"failed to parse number %s",
ctx.NUMBER().GetText(),
))
v.errors = append(v.errors, fmt.Sprintf("failed to parse number %s", ctx.NUMBER().GetText()))
return ""
}
return number
@ -648,19 +617,11 @@ func (v *filterExpressionVisitor) VisitKey(ctx *grammar.KeyContext) any {
if len(fieldKeysForName) == 0 {
if strings.HasPrefix(fieldKey.Name, v.jsonBodyPrefix) && v.jsonBodyPrefix != "" && keyName == "" {
v.errors = append(v.errors, errors.NewInvalidInputf(
errors.CodeInvalidInput,
"missing key for body json search - expected key of the form `body.key` (ex: `body.status`)",
))
v.errors = append(v.errors, "missing key for body json search - expected key of the form `body.key` (ex: `body.status`)")
} else {
// TODO(srikanthccv): do we want to return an error here?
// should we infer the type and auto-magically build a key for expression?
v.errors = append(v.errors, errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"key `%s` not found",
fieldKey.Name,
))
v.errors = append(v.errors, fmt.Sprintf("key `%s` not found", fieldKey.Name))
}
}

View File

@ -173,7 +173,7 @@ func (m *fieldMapper) ColumnExpressionFor(
return "", errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, correction)
} else {
// not even a close match, return an error
return "", err
return "", errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, "field %s not found", field.Name)
}
}
} else if len(keysForField) == 1 {
@ -186,7 +186,7 @@ func (m *fieldMapper) ColumnExpressionFor(
colName, _ = m.FieldFor(ctx, key)
args = append(args, fmt.Sprintf("toString(%s) != '', toString(%s)", colName, colName))
}
colName = fmt.Sprintf("multiIf(%s)", strings.Join(args, ", "))
colName = fmt.Sprintf("multiIf(%s, NULL)", strings.Join(args, ", "))
}
}

View File

@ -2,8 +2,10 @@ package telemetrylogs
import (
"fmt"
"strings"
"testing"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
@ -2315,7 +2317,15 @@ func TestFilterExprLogs(t *testing.T) {
require.Equal(t, tc.expectedArgs, args)
} else {
require.Error(t, err, "Expected error for query: %s", tc.query)
require.Contains(t, err.Error(), tc.expectedErrorContains)
_, _, _, _, _, a := errors.Unwrapb(err)
contains := false
for _, warn := range a {
if strings.Contains(warn, tc.expectedErrorContains) {
contains = true
break
}
}
require.True(t, contains)
}
})
}

View File

@ -6,7 +6,6 @@ import (
"log/slog"
"strings"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
@ -14,10 +13,6 @@ import (
"github.com/huandu/go-sqlbuilder"
)
var (
ErrUnsupportedAggregation = errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported aggregation")
)
type logQueryStatementBuilder struct {
logger *slog.Logger
metadataStore telemetrytypes.MetadataStore
@ -165,12 +160,18 @@ func (b *logQueryStatementBuilder) buildListQuery(
// Add order by
for _, orderBy := range query.Order {
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction))
colExpr, err := b.fm.ColumnExpressionFor(ctx, &orderBy.Key.TelemetryFieldKey, keys)
if err != nil {
return nil, err
}
sb.OrderBy(fmt.Sprintf("%s %s", colExpr, orderBy.Direction.StringValue()))
}
// Add limit and offset
if query.Limit > 0 {
sb.Limit(query.Limit)
} else {
sb.Limit(100)
}
if query.Offset > 0 {
@ -381,9 +382,9 @@ func (b *logQueryStatementBuilder) buildScalarQuery(
for _, orderBy := range query.Order {
idx, ok := aggOrderBy(orderBy, query)
if ok {
sb.OrderBy(fmt.Sprintf("__result_%d %s", idx, orderBy.Direction))
sb.OrderBy(fmt.Sprintf("__result_%d %s", idx, orderBy.Direction.StringValue()))
} else {
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction))
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
}
}
@ -420,19 +421,25 @@ func (b *logQueryStatementBuilder) addFilterCondition(
keys map[string][]*telemetrytypes.TelemetryFieldKey,
) ([]string, error) {
// add filter expression
filterWhereClause, warnings, err := querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,
SkipResourceFilter: true,
FullTextColumn: b.fullTextColumn,
JsonBodyPrefix: b.jsonBodyPrefix,
JsonKeyToKey: b.jsonKeyToKey,
})
var filterWhereClause *sqlbuilder.WhereClause
var warnings []string
var err error
if err != nil {
return nil, err
if query.Filter != nil && query.Filter.Expression != "" {
// add filter expression
filterWhereClause, warnings, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,
SkipResourceFilter: true,
FullTextColumn: b.fullTextColumn,
JsonBodyPrefix: b.jsonBodyPrefix,
JsonKeyToKey: b.jsonKeyToKey,
})
if err != nil {
return nil, err
}
}
if filterWhereClause != nil {

View File

@ -70,6 +70,45 @@ func TestStatementBuilder(t *testing.T) {
},
expectedErr: nil,
},
{
name: "test",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
Aggregations: []qbtypes.LogAggregation{
{
Expression: "count()",
},
},
Filter: &qbtypes.Filter{
Expression: "service.name = 'cartservice'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
},
Order: []qbtypes.OrderBy{
{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
Direction: qbtypes.OrderDirectionDesc,
},
},
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY ALL ORDER BY `service.name` desc LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) IN (SELECT `service.name` FROM __limit_cte) GROUP BY ALL",
Args: []any{"cartservice", "%service.name%", "%service.name%cartservice%", uint64(1747945619), uint64(1747983448), true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
},
expectedErr: nil,
},
}
fm := NewFieldMapper()

View File

@ -95,7 +95,7 @@ func (m *fieldMapper) ColumnExpressionFor(
return "", errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, correction)
} else {
// not even a close match, return an error
return "", err
return "", errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, "field %s not found", field.Name)
}
}
} else if len(keysForField) == 1 {
@ -108,7 +108,7 @@ func (m *fieldMapper) ColumnExpressionFor(
colName, _ = m.FieldFor(ctx, key)
args = append(args, fmt.Sprintf("toString(%s) != '', toString(%s)", colName, colName))
}
colName = fmt.Sprintf("multiIf(%s)", strings.Join(args, ", "))
colName = fmt.Sprintf("multiIf(%s, NULL)", strings.Join(args, ", "))
}
}

View File

@ -9,6 +9,7 @@ import (
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
@ -54,8 +55,10 @@ func NewTelemetryMetaStore(
relatedMetadataDBName string,
relatedMetadataTblName string,
) telemetrytypes.MetadataStore {
metadataSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrymetadata")
t := &telemetryMetaStore{
logger: metadataSettings.Logger(),
telemetrystore: telemetrystore,
tracesDBName: tracesDBName,
tracesFieldsTblName: tracesFieldsTblName,
@ -879,3 +882,90 @@ func (t *telemetryMetaStore) GetAllValues(ctx context.Context, fieldValueSelecto
}
return values, nil
}
func (t *telemetryMetaStore) FetchTemporality(ctx context.Context, metricName string) (metrictypes.Temporality, error) {
if metricName == "" {
return metrictypes.Unknown, errors.Newf(errors.TypeInternal, errors.CodeInternal, "metric name cannot be empty")
}
temporalityMap, err := t.FetchTemporalityMulti(ctx, metricName)
if err != nil {
return metrictypes.Unknown, err
}
temporality, ok := temporalityMap[metricName]
if !ok {
return metrictypes.Unknown, nil
}
return temporality, nil
}
func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
if len(metricNames) == 0 {
return make(map[string]metrictypes.Temporality), nil
}
result := make(map[string]metrictypes.Temporality)
// Build query to fetch temporality for all metrics
// We use attr_string_value where attr_name = '__temporality__'
// Note: the columns are swapped in the current data: the temporality column holds the metric name
// and the metric_name column holds the temporality value, so the query maps them accordingly
sb := sqlbuilder.Select(
"temporality as metric_name",
"argMax(attr_string_value, last_reported_unix_milli) as temporality_value",
).From(t.metricsDBName + "." + t.metricsFieldsTblName)
// Filter by metric names (stored in the temporality column because of the swap noted above)
sb.Where(sb.In("temporality", metricNames))
// Only fetch temporality metadata rows (where attr_name = '__temporality__')
sb.Where(sb.E("attr_name", "__temporality__"))
// Group by metric name to get one temporality per metric
sb.GroupBy("temporality")
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
t.logger.DebugContext(ctx, "fetching metric temporality", "query", query, "args", args)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to fetch metric temporality")
}
defer rows.Close()
// Process results
for rows.Next() {
var metricName, temporalityStr string
if err := rows.Scan(&metricName, &temporalityStr); err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to scan temporality result")
}
// Convert string to Temporality type
var temporality metrictypes.Temporality
switch temporalityStr {
case "Delta":
temporality = metrictypes.Delta
case "Cumulative":
temporality = metrictypes.Cumulative
case "Unspecified":
temporality = metrictypes.Unspecified
default:
// Unknown or empty temporality
temporality = metrictypes.Unknown
}
result[metricName] = temporality
}
// For metrics not found in the database, set to Unknown
for _, metricName := range metricNames {
if _, exists := result[metricName]; !exists {
result[metricName] = metrictypes.Unknown
}
}
return result, nil
}
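For reference, a caller-side sketch of how a statement builder might resolve temporality before filtering on it; the helper name and the fallback to Unknown are illustrative assumptions, and it presumes FetchTemporality is exposed on the MetadataStore interface as the implementation above suggests:

// resolveTemporality returns the stored temporality for a metric, falling
// back to Unknown (which buildTimeSeriesCTE treats as "do not filter").
func resolveTemporality(ctx context.Context, store telemetrytypes.MetadataStore, metricName string) metrictypes.Temporality {
	t, err := store.FetchTemporality(ctx, metricName)
	if err != nil {
		return metrictypes.Unknown
	}
	return t
}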

View File

@ -305,7 +305,7 @@ func (b *metricQueryStatementBuilder) buildTimeSeriesCTE(
sb.LTE("unix_milli", end),
)
if query.Aggregations[0].Temporality != metrictypes.Unspecified {
if query.Aggregations[0].Temporality != metrictypes.Unknown {
sb.Where(sb.ILike("temporality", query.Aggregations[0].Temporality.StringValue()))
}

View File

@ -147,8 +147,8 @@ func TestStatementBuilder(t *testing.T) {
},
},
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY ALL) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ALL) SELECT * FROM __spatial_aggregation_cte",
Args: []any{"system.memory.usage", uint64(1747936800000), uint64(1747983448000), false, "big-data-node-1", "system.memory.usage", uint64(1747947419000), uint64(1747983448000), 0},
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY ALL) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ALL) SELECT * FROM __spatial_aggregation_cte",
Args: []any{"system.memory.usage", uint64(1747936800000), uint64(1747983448000), "unspecified", false, "big-data-node-1", "system.memory.usage", uint64(1747947419000), uint64(1747983448000), 0},
},
expectedErr: nil,
},

View File

@ -250,7 +250,7 @@ func (m *defaultFieldMapper) ColumnExpressionFor(
return "", errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, correction)
} else {
// not even a close match, return an error
return "", err
return "", errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, "field %s not found", field.Name)
}
}
} else if len(keysForField) == 1 {
@ -263,7 +263,7 @@ func (m *defaultFieldMapper) ColumnExpressionFor(
colName, _ = m.FieldFor(ctx, key)
args = append(args, fmt.Sprintf("toString(%s) != '', toString(%s)", colName, colName))
}
colName = fmt.Sprintf("multiIf(%s)", strings.Join(args, ", "))
colName = fmt.Sprintf("multiIf(%s, NULL)", strings.Join(args, ", "))
}
}

View File

@ -179,12 +179,18 @@ func (b *traceQueryStatementBuilder) buildListQuery(
// Add order by
for _, orderBy := range query.Order {
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
colExpr, err := b.fm.ColumnExpressionFor(ctx, &orderBy.Key.TelemetryFieldKey, keys)
if err != nil {
return nil, err
}
sb.OrderBy(fmt.Sprintf("%s %s", colExpr, orderBy.Direction.StringValue()))
}
// Add limit and offset
if query.Limit > 0 {
sb.Limit(query.Limit)
} else {
sb.Limit(100)
}
if query.Offset > 0 {

View File

@ -13,7 +13,8 @@ type Temporality struct {
var (
Delta = Temporality{valuer.NewString("delta")}
Cumulative = Temporality{valuer.NewString("cumulative")}
Unspecified = Temporality{valuer.NewString("")}
Unspecified = Temporality{valuer.NewString("unspecified")}
Unknown = Temporality{valuer.NewString("")}
)
// Type is the type of the metric in OTLP data model

View File

@ -328,6 +328,8 @@ type MetricAggregation struct {
TableHints *metrictypes.MetricTableHints `json:"-"`
// value filter to apply to the query
ValueFilter *metrictypes.MetricValueFilter `json:"-"`
// reduce to operator for metric scalar requests
ReduceTo ReduceTo `json:"reduceTo,omitempty"`
}
type Filter struct {
@ -379,7 +381,7 @@ type FunctionArg struct {
// name of the argument
Name string `json:"name,omitempty"`
// value of the argument
Value string `json:"value"`
Value any `json:"value"`
}
type Function struct {

View File

@ -55,4 +55,25 @@ type QueryBuilderQuery[T any] struct {
// functions to apply to the query
Functions []Function `json:"functions,omitempty"`
// ShiftBy is extracted from timeShift function for internal use
// This field is not serialized to JSON
ShiftBy int64 `json:"-"`
}
// UnmarshalJSON implements custom JSON unmarshaling to disallow unknown fields
func (q *QueryBuilderQuery[T]) UnmarshalJSON(data []byte) error {
// Define a type alias to avoid infinite recursion
type Alias QueryBuilderQuery[T]
var temp Alias
// Use UnmarshalJSONWithContext for better error messages
if err := UnmarshalJSONWithContext(data, &temp, "query spec"); err != nil {
return err
}
// Copy the decoded values back to the original struct
*q = QueryBuilderQuery[T](temp)
return nil
}

View File

@ -20,10 +20,30 @@ type QueryBuilderFormula struct {
// expression to apply to the query
Expression string `json:"expression"`
// order by keys and directions
Order []OrderBy `json:"order,omitempty"`
// limit the maximum number of rows to return
Limit int `json:"limit,omitempty"`
// having clause to apply to the formula result
Having *Having `json:"having,omitempty"`
// functions to apply to the formula result
Functions []Function `json:"functions,omitempty"`
}
// UnmarshalJSON implements custom JSON unmarshaling to disallow unknown fields
func (f *QueryBuilderFormula) UnmarshalJSON(data []byte) error {
type Alias QueryBuilderFormula
var temp Alias
if err := UnmarshalJSONWithContext(data, &temp, "formula spec"); err != nil {
return err
}
*f = QueryBuilderFormula(temp)
return nil
}
// small container to store the query name and index or alias reference
// for a variable in the formula expression
// read below for more details on aggregation references

View File

@ -93,9 +93,20 @@ func ApplyFunction(fn Function, result *TimeSeries) *TimeSeries {
return result
}
// parseFloat64Arg parses a string argument to float64
func parseFloat64Arg(value string) (float64, error) {
return strconv.ParseFloat(value, 64)
// parseFloat64Arg parses an argument to float64
func parseFloat64Arg(value any) (float64, error) {
switch v := value.(type) {
case float64:
return v, nil
case int64:
return float64(v), nil
case int:
return float64(v), nil
case string:
return strconv.ParseFloat(v, 64)
default:
return 0, strconv.ErrSyntax
}
}
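With FunctionArg.Value now typed as any, encoding/json decodes bare numeric literals to float64 and quoted values to string, which is why the type switch above handles both; a quick in-package illustration, assuming the encoding/json import (not test code from this commit):

func exampleFunctionArgValues() {
	var arg FunctionArg

	// A bare number decodes into the interface as float64.
	_ = json.Unmarshal([]byte(`{"value": 60}`), &arg)
	secs, _ := parseFloat64Arg(arg.Value) // secs == 60

	// A quoted number decodes as string and is parsed with strconv.
	_ = json.Unmarshal([]byte(`{"value": "60"}`), &arg)
	secs, _ = parseFloat64Arg(arg.Value) // secs == 60
	_ = secs
}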
// getEWMAAlpha calculates the alpha value for EWMA functions

View File

@ -0,0 +1,163 @@
package querybuildertypesv5
import (
"bytes"
"encoding/json"
"reflect"
"strings"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
// UnmarshalJSONWithSuggestions unmarshals JSON data into the target struct
// and provides field name suggestions for unknown fields
func UnmarshalJSONWithSuggestions(data []byte, target any) error {
return UnmarshalJSONWithContext(data, target, "")
}
// UnmarshalJSONWithContext unmarshals JSON with context information for better error messages
func UnmarshalJSONWithContext(data []byte, target any, context string) error {
// First, try to unmarshal with DisallowUnknownFields to catch unknown fields
dec := json.NewDecoder(bytes.NewReader(data))
dec.DisallowUnknownFields()
err := dec.Decode(target)
if err == nil {
// No error, successful unmarshal
return nil
}
// Check if it's an unknown field error
if strings.Contains(err.Error(), "unknown field") {
// Extract the unknown field name
unknownField := extractUnknownField(err.Error())
if unknownField != "" {
// Get valid field names from the target struct
validFields := getJSONFieldNames(target)
// Build error message with context
errorMsg := "unknown field %q"
if context != "" {
errorMsg = "unknown field %q in " + context
}
// Find closest match with max distance of 3 (reasonable for typos)
if suggestion, found := telemetrytypes.SuggestCorrection(unknownField, validFields); found {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
errorMsg,
unknownField,
).WithAdditional(
suggestion,
)
}
// No good suggestion found
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
errorMsg,
unknownField,
).WithAdditional(
"Valid fields are: " + strings.Join(validFields, ", "),
)
}
}
// Return the original error if it's not an unknown field error
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid JSON: %v", err)
}
// extractUnknownField extracts the field name from an unknown field error message
func extractUnknownField(errMsg string) string {
// The error message format is: json: unknown field "fieldname"
parts := strings.Split(errMsg, `"`)
if len(parts) >= 2 {
return parts[1]
}
return ""
}
// getJSONFieldNames extracts all JSON field names from a struct
func getJSONFieldNames(v any) []string {
var fields []string
t := reflect.TypeOf(v)
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
if t.Kind() != reflect.Struct {
return fields
}
for i := 0; i < t.NumField(); i++ {
field := t.Field(i)
jsonTag := field.Tag.Get("json")
if jsonTag == "" || jsonTag == "-" {
continue
}
// Extract the field name from the JSON tag
fieldName := strings.Split(jsonTag, ",")[0]
if fieldName != "" {
fields = append(fields, fieldName)
}
}
return fields
}
// wrapUnmarshalError wraps UnmarshalJSONWithContext errors with appropriate context
// It preserves errors that already have additional context or unknown field errors
func wrapUnmarshalError(err error, errorFormat string, args ...interface{}) error {
if err == nil {
return nil
}
// If it's already one of our wrapped errors with additional context, return as-is
_, _, _, _, _, additionals := errors.Unwrapb(err)
if len(additionals) > 0 {
return err
}
// Preserve helpful error messages about unknown fields
if strings.Contains(err.Error(), "unknown field") {
return err
}
// Wrap with the provided error format
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
errorFormat,
args...,
)
}
// wrapValidationError rewraps validation errors with context while preserving additional hints
// It extracts the inner message from the error and creates a new error with the provided format
// The innerMsg is automatically appended to the args for formatting
func wrapValidationError(err error, contextIdentifier string, errorFormat string) error {
if err == nil {
return nil
}
// Extract the underlying error details
_, _, innerMsg, _, _, additionals := errors.Unwrapb(err)
// Create a new error with the provided format
newErr := errors.NewInvalidInputf(
errors.CodeInvalidInput,
errorFormat,
contextIdentifier,
innerMsg,
)
// Add any additional context from the inner error
if len(additionals) > 0 {
newErr = newErr.WithAdditional(additionals...)
}
return newErr
}
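Taken together, these helpers are what produce the typo hints in the new error paths; a minimal usage sketch follows (the widget struct and its JSON payload are made up purely for illustration):

type widget struct {
	Limit  int       `json:"limit,omitempty"`
	Filter *Filter   `json:"filter,omitempty"`
	Order  []OrderBy `json:"order,omitempty"`
}

func exampleUnknownField() error {
	var w widget
	// "filters" is not a valid field of widget; the returned error carries a
	// hint such as "did you mean: 'filter'?" in its additional context.
	return UnmarshalJSONWithSuggestions([]byte(`{"filters": {"expression": "a = 1"}}`), &w)
}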

View File

@ -2,6 +2,7 @@ package querybuildertypesv5
import (
"encoding/json"
"strings"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@ -17,12 +18,11 @@ type QueryEnvelope struct {
// implement custom json unmarshaler for the QueryEnvelope
func (q *QueryEnvelope) UnmarshalJSON(data []byte) error {
var shadow struct {
Name string `json:"name"`
Type QueryType `json:"type"`
Spec json.RawMessage `json:"spec"`
}
if err := json.Unmarshal(data, &shadow); err != nil {
return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid query envelope")
if err := UnmarshalJSONWithSuggestions(data, &shadow); err != nil {
return err
}
q.Type = shadow.Type
@ -34,43 +34,53 @@ func (q *QueryEnvelope) UnmarshalJSON(data []byte) error {
Signal telemetrytypes.Signal `json:"signal"`
}
if err := json.Unmarshal(shadow.Spec, &header); err != nil {
return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "cannot detect builder signal")
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"cannot detect builder signal: %v",
err,
)
}
switch header.Signal {
case telemetrytypes.SignalTraces:
var spec QueryBuilderQuery[TraceAggregation]
if err := json.Unmarshal(shadow.Spec, &spec); err != nil {
return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid trace builder query spec")
if err := UnmarshalJSONWithContext(shadow.Spec, &spec, "query spec"); err != nil {
return wrapUnmarshalError(err, "invalid trace builder query spec: %v", err)
}
q.Spec = spec
case telemetrytypes.SignalLogs:
var spec QueryBuilderQuery[LogAggregation]
if err := json.Unmarshal(shadow.Spec, &spec); err != nil {
return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid log builder query spec")
if err := UnmarshalJSONWithContext(shadow.Spec, &spec, "query spec"); err != nil {
return wrapUnmarshalError(err, "invalid log builder query spec: %v", err)
}
q.Spec = spec
case telemetrytypes.SignalMetrics:
var spec QueryBuilderQuery[MetricAggregation]
if err := json.Unmarshal(shadow.Spec, &spec); err != nil {
return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid metric builder query spec")
if err := UnmarshalJSONWithContext(shadow.Spec, &spec, "query spec"); err != nil {
return wrapUnmarshalError(err, "invalid metric builder query spec: %v", err)
}
q.Spec = spec
default:
return errors.WrapInvalidInputf(nil, errors.CodeInvalidInput, "unknown builder signal %q", header.Signal)
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"unknown builder signal %q",
header.Signal,
).WithAdditional(
"Valid signals are: traces, logs, metrics",
)
}
case QueryTypeFormula:
var spec QueryBuilderFormula
if err := json.Unmarshal(shadow.Spec, &spec); err != nil {
return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid formula spec")
if err := UnmarshalJSONWithContext(shadow.Spec, &spec, "formula spec"); err != nil {
return wrapUnmarshalError(err, "invalid formula spec: %v", err)
}
q.Spec = spec
case QueryTypeJoin:
var spec QueryBuilderJoin
if err := json.Unmarshal(shadow.Spec, &spec); err != nil {
return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid join spec")
if err := UnmarshalJSONWithContext(shadow.Spec, &spec, "join spec"); err != nil {
return wrapUnmarshalError(err, "invalid join spec: %v", err)
}
q.Spec = spec
@ -83,20 +93,26 @@ func (q *QueryEnvelope) UnmarshalJSON(data []byte) error {
case QueryTypePromQL:
var spec PromQuery
if err := json.Unmarshal(shadow.Spec, &spec); err != nil {
return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid PromQL spec")
if err := UnmarshalJSONWithContext(shadow.Spec, &spec, "PromQL spec"); err != nil {
return wrapUnmarshalError(err, "invalid PromQL spec: %v", err)
}
q.Spec = spec
case QueryTypeClickHouseSQL:
var spec ClickHouseQuery
if err := json.Unmarshal(shadow.Spec, &spec); err != nil {
return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid ClickHouse SQL spec")
if err := UnmarshalJSONWithContext(shadow.Spec, &spec, "ClickHouse SQL spec"); err != nil {
return wrapUnmarshalError(err, "invalid ClickHouse SQL spec: %v", err)
}
q.Spec = spec
default:
return errors.WrapInvalidInputf(nil, errors.CodeInvalidInput, "unknown query type %q", shadow.Type)
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"unknown query type %q",
shadow.Type,
).WithAdditional(
"Valid query types are: builder_query, builder_sub_query, builder_formula, builder_join, promql, clickhouse_sql",
)
}
return nil
@ -107,6 +123,59 @@ type CompositeQuery struct {
Queries []QueryEnvelope `json:"queries"`
}
// UnmarshalJSON implements custom JSON unmarshaling to provide better error messages
func (c *CompositeQuery) UnmarshalJSON(data []byte) error {
type Alias CompositeQuery
// First do a normal unmarshal without DisallowUnknownFields
var temp Alias
if err := json.Unmarshal(data, &temp); err != nil {
return err
}
// Then check for unknown fields at this level only
var check map[string]json.RawMessage
if err := json.Unmarshal(data, &check); err != nil {
return err
}
// Check for unknown fields at this level
validFields := map[string]bool{
"queries": true,
}
for field := range check {
if !validFields[field] {
// Find closest match
var fieldNames []string
for f := range validFields {
fieldNames = append(fieldNames, f)
}
if suggestion, found := telemetrytypes.SuggestCorrection(field, fieldNames); found {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"unknown field %q in composite query",
field,
).WithAdditional(
suggestion,
)
}
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"unknown field %q in composite query",
field,
).WithAdditional(
"Valid fields are: " + strings.Join(fieldNames, ", "),
)
}
}
*c = CompositeQuery(temp)
return nil
}
type QueryRangeRequest struct {
// SchemaVersion is the version of the schema to use for the request payload.
SchemaVersion string `json:"schemaVersion"`
@ -127,6 +196,69 @@ type QueryRangeRequest struct {
FormatOptions *FormatOptions `json:"formatOptions,omitempty"`
}
// UnmarshalJSON implements custom JSON unmarshaling to disallow unknown fields
func (r *QueryRangeRequest) UnmarshalJSON(data []byte) error {
// Define a type alias to avoid infinite recursion
type Alias QueryRangeRequest
// First do a normal unmarshal without DisallowUnknownFields to let nested structures handle their own validation
var temp Alias
if err := json.Unmarshal(data, &temp); err != nil {
return err
}
// Then check for unknown fields at this level only
var check map[string]json.RawMessage
if err := json.Unmarshal(data, &check); err != nil {
return err
}
// Check for unknown fields at the top level
validFields := map[string]bool{
"schemaVersion": true,
"start": true,
"end": true,
"requestType": true,
"compositeQuery": true,
"variables": true,
"noCache": true,
"formatOptions": true,
}
for field := range check {
if !validFields[field] {
// Find closest match
var fieldNames []string
for f := range validFields {
fieldNames = append(fieldNames, f)
}
if suggestion, found := telemetrytypes.SuggestCorrection(field, fieldNames); found {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"unknown field %q",
field,
).WithAdditional(
suggestion,
)
}
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"unknown field %q",
field,
).WithAdditional(
"Valid fields are: " + strings.Join(fieldNames, ", "),
)
}
}
// Copy the decoded values back to the original struct
*r = QueryRangeRequest(temp)
return nil
}
type FormatOptions struct {
FillGaps bool `json:"fillGaps,omitempty"`
FormatTableResultForUI bool `json:"formatTableResultForUI,omitempty"`

View File

@ -0,0 +1,150 @@
package querybuildertypesv5
import (
"encoding/json"
"strings"
"testing"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestQueryRangeRequest_UnmarshalJSON_ErrorMessages(t *testing.T) {
tests := []struct {
name string
jsonData string
wantErrMsg string
wantAdditionalHints []string
}{
{
name: "unknown field 'function' in query spec",
jsonData: `{
"schemaVersion": "v1",
"start": 1749290340000,
"end": 1749293940000,
"requestType": "scalar",
"compositeQuery": {
"queries": [{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "logs",
"aggregations": [{
"expression": "count()",
"alias": "spans_count"
}],
"function": [{
"name": "absolute",
"args": []
}]
}
}]
}
}`,
wantErrMsg: `unknown field "function" in query spec`,
wantAdditionalHints: []string{
"did you mean: 'functions'?",
},
},
{
name: "unknown field 'filters' in query spec",
jsonData: `{
"schemaVersion": "v1",
"start": 1749290340000,
"end": 1749293940000,
"requestType": "scalar",
"compositeQuery": {
"queries": [{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{
"metricName": "test"
}],
"filters": {
"expression": "test = 1"
}
}
}]
}
}`,
wantErrMsg: `unknown field "filters" in query spec`,
wantAdditionalHints: []string{
"did you mean: 'filter'?",
},
},
{
name: "unknown field at top level",
jsonData: `{
"schemaVersion": "v1",
"start": 1749290340000,
"end": 1749293940000,
"requestType": "scalar",
"compositeQueries": {
"queries": []
}
}`,
wantErrMsg: `unknown field "compositeQueries"`,
wantAdditionalHints: []string{
"did you mean: 'compositeQuery'?",
},
},
{
name: "unknown field with no good suggestion",
jsonData: `{
"schemaVersion": "v1",
"start": 1749290340000,
"end": 1749293940000,
"requestType": "scalar",
"compositeQuery": {
"queries": [{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{
"metricName": "test"
}],
"randomField": "value"
}
}]
}
}`,
wantErrMsg: `unknown field "randomField" in query spec`,
wantAdditionalHints: []string{
"Valid fields are:",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var req QueryRangeRequest
err := json.Unmarshal([]byte(tt.jsonData), &req)
require.Error(t, err)
// Check main error message
assert.Contains(t, err.Error(), tt.wantErrMsg)
// Check if it's an error from our package using Unwrapb
_, _, _, _, _, additionals := errors.Unwrapb(err)
// Check additional hints if we have any
if len(additionals) > 0 {
for _, hint := range tt.wantAdditionalHints {
found := false
for _, additional := range additionals {
if strings.Contains(additional, hint) {
found = true
break
}
}
assert.True(t, found, "Expected to find hint '%s' in additionals: %v", hint, additionals)
}
}
})
}
}

View File

@ -773,7 +773,7 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
{
"type": "builder_formula",
"spec": {
"name": "rate",
"name": "B",
"expression": "A * 100"
}
}
@ -799,7 +799,7 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) {
{
Type: QueryTypeFormula,
Spec: QueryBuilderFormula{
Name: "rate",
Name: "B",
Expression: "A * 100",
},
},

View File

@ -129,8 +129,9 @@ type ColumnDescriptor struct {
}
type ScalarData struct {
Columns []*ColumnDescriptor `json:"columns"`
Data [][]any `json:"data"`
QueryName string `json:"queryName"`
Columns []*ColumnDescriptor `json:"columns"`
Data [][]any `json:"data"`
}
type RawData struct {

View File

@ -0,0 +1,700 @@
package querybuildertypesv5
import (
"fmt"
"slices"
"strings"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
// getQueryIdentifier returns a friendly identifier for a query based on its type and name/content
func getQueryIdentifier(envelope QueryEnvelope, index int) string {
switch envelope.Type {
case QueryTypeBuilder, QueryTypeSubQuery:
switch spec := envelope.Spec.(type) {
case QueryBuilderQuery[TraceAggregation]:
if spec.Name != "" {
return fmt.Sprintf("query '%s'", spec.Name)
}
return fmt.Sprintf("trace query at position %d", index+1)
case QueryBuilderQuery[LogAggregation]:
if spec.Name != "" {
return fmt.Sprintf("query '%s'", spec.Name)
}
return fmt.Sprintf("log query at position %d", index+1)
case QueryBuilderQuery[MetricAggregation]:
if spec.Name != "" {
return fmt.Sprintf("query '%s'", spec.Name)
}
return fmt.Sprintf("metric query at position %d", index+1)
}
case QueryTypeFormula:
if spec, ok := envelope.Spec.(QueryBuilderFormula); ok && spec.Name != "" {
return fmt.Sprintf("formula '%s'", spec.Name)
}
return fmt.Sprintf("formula at position %d", index+1)
case QueryTypeJoin:
if spec, ok := envelope.Spec.(QueryBuilderJoin); ok && spec.Name != "" {
return fmt.Sprintf("join '%s'", spec.Name)
}
return fmt.Sprintf("join at position %d", index+1)
case QueryTypePromQL:
if spec, ok := envelope.Spec.(PromQuery); ok && spec.Name != "" {
return fmt.Sprintf("PromQL query '%s'", spec.Name)
}
return fmt.Sprintf("PromQL query at position %d", index+1)
case QueryTypeClickHouseSQL:
if spec, ok := envelope.Spec.(ClickHouseQuery); ok && spec.Name != "" {
return fmt.Sprintf("ClickHouse query '%s'", spec.Name)
}
return fmt.Sprintf("ClickHouse query at position %d", index+1)
}
return fmt.Sprintf("query at position %d", index+1)
}
const (
// Maximum limit for query results
MaxQueryLimit = 10000
)
// ValidateFunctionName checks if the function name is valid
func ValidateFunctionName(name FunctionName) error {
validFunctions := []FunctionName{
FunctionNameCutOffMin,
FunctionNameCutOffMax,
FunctionNameClampMin,
FunctionNameClampMax,
FunctionNameAbsolute,
FunctionNameRunningDiff,
FunctionNameLog2,
FunctionNameLog10,
FunctionNameCumulativeSum,
FunctionNameEWMA3,
FunctionNameEWMA5,
FunctionNameEWMA7,
FunctionNameMedian3,
FunctionNameMedian5,
FunctionNameMedian7,
FunctionNameTimeShift,
FunctionNameAnomaly,
}
if slices.Contains(validFunctions, name) {
return nil
}
// Format valid functions as comma-separated string
var validFunctionNames []string
for _, fn := range validFunctions {
validFunctionNames = append(validFunctionNames, fn.StringValue())
}
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid function name: %s",
name.StringValue(),
).WithAdditional(fmt.Sprintf("valid functions are: %s", strings.Join(validFunctionNames, ", ")))
}
// Validate performs preliminary validation on QueryBuilderQuery
func (q *QueryBuilderQuery[T]) Validate(requestType RequestType) error {
// Validate signal
if err := q.validateSignal(); err != nil {
return err
}
// Validate aggregations only for non-raw request types
if requestType != RequestTypeRaw {
if err := q.validateAggregations(); err != nil {
return err
}
}
// Validate limit and pagination
if err := q.validateLimitAndPagination(); err != nil {
return err
}
// Validate functions
if err := q.validateFunctions(); err != nil {
return err
}
// Validate secondary aggregations
if err := q.validateSecondaryAggregations(); err != nil {
return err
}
// Validate order by
if err := q.validateOrderBy(); err != nil {
return err
}
return nil
}
func (q *QueryBuilderQuery[T]) validateSignal() error {
// Signal validation is handled during unmarshaling in req.go
// Valid signals are: metrics, traces, logs
switch q.Signal {
case telemetrytypes.SignalMetrics,
telemetrytypes.SignalTraces,
telemetrytypes.SignalLogs,
telemetrytypes.SignalUnspecified: // Empty is allowed for backward compatibility
return nil
default:
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid signal type: %s",
q.Signal,
).WithAdditional(
"Valid signals are: metrics, traces, logs",
)
}
}
func (q *QueryBuilderQuery[T]) validateAggregations() error {
// At least one aggregation required for non-disabled queries
if len(q.Aggregations) == 0 && !q.Disabled {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"at least one aggregation is required",
)
// TODO: add url with docs
}
// Check for duplicate aliases
aliases := make(map[string]bool)
for i, agg := range q.Aggregations {
// Type-specific validation based on T
switch v := any(agg).(type) {
case MetricAggregation:
if v.MetricName == "" {
aggId := fmt.Sprintf("aggregation #%d", i+1)
if q.Name != "" {
aggId = fmt.Sprintf("aggregation #%d in query '%s'", i+1, q.Name)
}
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"metric name is required for %s",
aggId,
)
}
// Validate metric-specific aggregations
if err := validateMetricAggregation(v); err != nil {
aggId := fmt.Sprintf("aggregation #%d", i+1)
if q.Name != "" {
aggId = fmt.Sprintf("aggregation #%d in query '%s'", i+1, q.Name)
}
return wrapValidationError(err, aggId, "invalid metric %s: %s")
}
case TraceAggregation:
if v.Expression == "" {
aggId := fmt.Sprintf("aggregation #%d", i+1)
if q.Name != "" {
aggId = fmt.Sprintf("aggregation #%d in query '%s'", i+1, q.Name)
}
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"expression is required for trace %s",
aggId,
)
}
if v.Alias != "" {
if aliases[v.Alias] {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"duplicate aggregation alias: %s",
v.Alias,
)
}
aliases[v.Alias] = true
}
case LogAggregation:
if v.Expression == "" {
aggId := fmt.Sprintf("aggregation #%d", i+1)
if q.Name != "" {
aggId = fmt.Sprintf("aggregation #%d in query '%s'", i+1, q.Name)
}
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"expression is required for log %s",
aggId,
)
}
if v.Alias != "" {
if aliases[v.Alias] {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"duplicate aggregation alias: %s",
v.Alias,
)
}
aliases[v.Alias] = true
}
}
}
return nil
}
func (q *QueryBuilderQuery[T]) validateLimitAndPagination() error {
// Validate limit
if q.Limit < 0 {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"limit must be non-negative, got %d",
q.Limit,
)
}
if q.Limit > MaxQueryLimit {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"limit exceeds maximum allowed value of %d",
MaxQueryLimit,
).WithAdditional(
fmt.Sprintf("Provided limit: %d", q.Limit),
)
}
// Validate offset
if q.Offset < 0 {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"offset must be non-negative, got %d",
q.Offset,
)
}
return nil
}
func (q *QueryBuilderQuery[T]) validateFunctions() error {
for i, fn := range q.Functions {
if err := ValidateFunctionName(fn.Name); err != nil {
fnId := fmt.Sprintf("function #%d", i+1)
if q.Name != "" {
fnId = fmt.Sprintf("function #%d in query '%s'", i+1, q.Name)
}
return wrapValidationError(err, fnId, "invalid %s: %s")
}
}
return nil
}
func (q *QueryBuilderQuery[T]) validateSecondaryAggregations() error {
for i, secAgg := range q.SecondaryAggregations {
// Secondary aggregation expression can be empty - we allow it per requirements
// Just validate structure
if secAgg.Limit < 0 {
secAggId := fmt.Sprintf("secondary aggregation #%d", i+1)
if q.Name != "" {
secAggId = fmt.Sprintf("secondary aggregation #%d in query '%s'", i+1, q.Name)
}
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"%s: limit must be non-negative",
secAggId,
)
}
}
return nil
}
func (q *QueryBuilderQuery[T]) validateOrderBy() error {
for i, order := range q.Order {
// Direction validation is handled by the OrderDirection type
if order.Direction != OrderDirectionAsc && order.Direction != OrderDirectionDesc {
orderId := fmt.Sprintf("order by clause #%d", i+1)
if q.Name != "" {
orderId = fmt.Sprintf("order by clause #%d in query '%s'", i+1, q.Name)
}
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid direction for %s: %s",
orderId,
order.Direction.StringValue(),
).WithAdditional(
"Valid directions are: asc, desc",
)
}
}
return nil
}
// ValidateQueryRangeRequest validates the entire query range request
func (r *QueryRangeRequest) Validate() error {
// Validate time range
if r.Start >= r.End {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"start time must be before end time",
)
}
// Validate request type
switch r.RequestType {
case RequestTypeRaw, RequestTypeTimeSeries, RequestTypeScalar:
// Valid request types
default:
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid request type: %s",
r.RequestType,
).WithAdditional(
"Valid request types are: raw, timeseries, scalar",
)
}
// Validate composite query
if err := r.validateCompositeQuery(); err != nil {
return err
}
return nil
}
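
For illustration, a hedged sketch (hypothetical in-package test, importing "testing") of the first check in Validate: a request whose start is not strictly before its end is rejected before anything else is examined. Field and constant names are taken from the code above; the millisecond values are invented.

func TestQueryRangeRequestValidate_TimeRange(t *testing.T) {
	req := QueryRangeRequest{
		Start:       1_750_000_000_000, // epoch ms
		End:         1_749_000_000_000, // epoch ms, deliberately before Start
		RequestType: RequestTypeTimeSeries,
	}
	if err := req.Validate(); err == nil {
		t.Fatal("expected an error when start >= end, got nil")
	}
}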
func (r *QueryRangeRequest) validateCompositeQuery() error {
// Validate queries in composite query
if len(r.CompositeQuery.Queries) == 0 {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"at least one query is required",
)
}
// Track query names for uniqueness (only for non-formula queries)
queryNames := make(map[string]bool)
// Validate each query based on its type
for i, envelope := range r.CompositeQuery.Queries {
switch envelope.Type {
case QueryTypeBuilder, QueryTypeSubQuery:
// Validate based on the concrete type
switch spec := envelope.Spec.(type) {
case QueryBuilderQuery[TraceAggregation]:
if err := spec.Validate(r.RequestType); err != nil {
queryId := getQueryIdentifier(envelope, i)
return wrapValidationError(err, queryId, "invalid %s: %s")
}
// Check name uniqueness for non-formula context
if spec.Name != "" {
if queryNames[spec.Name] {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"duplicate query name '%s'",
spec.Name,
)
}
queryNames[spec.Name] = true
}
case QueryBuilderQuery[LogAggregation]:
if err := spec.Validate(r.RequestType); err != nil {
queryId := getQueryIdentifier(envelope, i)
return wrapValidationError(err, queryId, "invalid %s: %s")
}
// Check name uniqueness for non-formula context
if spec.Name != "" {
if queryNames[spec.Name] {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"duplicate query name '%s'",
spec.Name,
)
}
queryNames[spec.Name] = true
}
case QueryBuilderQuery[MetricAggregation]:
if err := spec.Validate(r.RequestType); err != nil {
queryId := getQueryIdentifier(envelope, i)
return wrapValidationError(err, queryId, "invalid %s: %s")
}
// Check name uniqueness for non-formula context
if spec.Name != "" {
if queryNames[spec.Name] {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"duplicate query name '%s'",
spec.Name,
)
}
queryNames[spec.Name] = true
}
default:
queryId := getQueryIdentifier(envelope, i)
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"unknown spec type for %s",
queryId,
)
}
case QueryTypeFormula:
// Full formula validation is handled separately; here we only check the spec type and that an expression is present
spec, ok := envelope.Spec.(QueryBuilderFormula)
if !ok {
queryId := getQueryIdentifier(envelope, i)
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid spec for %s",
queryId,
)
}
if spec.Expression == "" {
queryId := getQueryIdentifier(envelope, i)
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"expression is required for %s",
queryId,
)
}
case QueryTypeJoin:
// Join validation is handled separately
_, ok := envelope.Spec.(QueryBuilderJoin)
if !ok {
queryId := getQueryIdentifier(envelope, i)
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid spec for %s",
queryId,
)
}
case QueryTypePromQL:
// PromQL syntax validation is handled separately; here we only check the spec type and that a query is present
spec, ok := envelope.Spec.(PromQuery)
if !ok {
queryId := getQueryIdentifier(envelope, i)
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid spec for %s",
queryId,
)
}
if spec.Query == "" {
queryId := getQueryIdentifier(envelope, i)
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"query expression is required for %s",
queryId,
)
}
case QueryTypeClickHouseSQL:
// ClickHouse SQL validation is handled separately; here we only check the spec type and that a query is present
spec, ok := envelope.Spec.(ClickHouseQuery)
if !ok {
queryId := getQueryIdentifier(envelope, i)
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid spec for %s",
queryId,
)
}
if spec.Query == "" {
queryId := getQueryIdentifier(envelope, i)
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"query expression is required for %s",
queryId,
)
}
default:
queryId := getQueryIdentifier(envelope, i)
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"unknown query type '%s' for %s",
envelope.Type,
queryId,
).WithAdditional(
"Valid query types are: builder_query, builder_formula, builder_join, promql, clickhouse_sql",
)
}
}
return nil
}
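
A hedged sketch (hypothetical in-package test) of the composite-query check: a request with a valid time range but no queries fails with "at least one query is required". It assumes CompositeQuery is a value field on QueryRangeRequest, which is what the r.CompositeQuery.Queries access above suggests.

func TestQueryRangeRequestValidate_EmptyCompositeQuery(t *testing.T) {
	req := QueryRangeRequest{
		Start:          1_749_000_000_000,
		End:            1_750_000_000_000,
		RequestType:    RequestTypeScalar,
		CompositeQuery: CompositeQuery{Queries: []QueryEnvelope{}},
	}
	if err := req.Validate(); err == nil {
		t.Fatal("expected an error for a composite query with no queries, got nil")
	}
}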
// Validate performs validation on CompositeQuery
func (c *CompositeQuery) Validate(requestType RequestType) error {
if len(c.Queries) == 0 {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"at least one query is required",
)
}
// Validate each query
for i, envelope := range c.Queries {
if err := validateQueryEnvelope(envelope, requestType); err != nil {
queryId := getQueryIdentifier(envelope, i)
return wrapValidationError(err, queryId, "invalid %s: %s")
}
}
return nil
}
func validateQueryEnvelope(envelope QueryEnvelope, requestType RequestType) error {
switch envelope.Type {
case QueryTypeBuilder, QueryTypeSubQuery:
switch spec := envelope.Spec.(type) {
case QueryBuilderQuery[TraceAggregation]:
return spec.Validate(requestType)
case QueryBuilderQuery[LogAggregation]:
return spec.Validate(requestType)
case QueryBuilderQuery[MetricAggregation]:
return spec.Validate(requestType)
default:
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"unknown query spec type",
)
}
case QueryTypeFormula:
spec, ok := envelope.Spec.(QueryBuilderFormula)
if !ok {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid formula spec",
)
}
if spec.Expression == "" {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"formula expression is required",
)
}
return nil
case QueryTypeJoin:
_, ok := envelope.Spec.(QueryBuilderJoin)
if !ok {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid join spec",
)
}
return nil
case QueryTypePromQL:
spec, ok := envelope.Spec.(PromQuery)
if !ok {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid PromQL spec",
)
}
if spec.Query == "" {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"PromQL query is required",
)
}
return nil
case QueryTypeClickHouseSQL:
spec, ok := envelope.Spec.(ClickHouseQuery)
if !ok {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid ClickHouse SQL spec",
)
}
if spec.Query == "" {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"ClickHouse SQL query is required",
)
}
return nil
default:
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"unknown query type: %s",
envelope.Type,
).WithAdditional(
"Valid query types are: builder_query, builder_sub_query, builder_formula, builder_join, promql, clickhouse_sql",
)
}
}
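
A hedged sketch (hypothetical in-package test) of validateQueryEnvelope rejecting a PromQL envelope whose query text is empty; every identifier used here appears in the code above.

func TestValidateQueryEnvelope_EmptyPromQL(t *testing.T) {
	envelope := QueryEnvelope{
		Type: QueryTypePromQL,
		Spec: PromQuery{}, // Query left empty on purpose
	}
	if err := validateQueryEnvelope(envelope, RequestTypeTimeSeries); err == nil {
		t.Fatal("expected an error for an empty PromQL query, got nil")
	}
}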
// validateMetricAggregation validates metric-specific aggregation parameters
func validateMetricAggregation(agg MetricAggregation) error {
// We can't validate anything further without a known temporality
if agg.Temporality == metrictypes.Unknown {
return nil
}
// Validate that rate/increase are only used with appropriate temporalities
if agg.TimeAggregation == metrictypes.TimeAggregationRate || agg.TimeAggregation == metrictypes.TimeAggregationIncrease {
// For gauge metrics (Unspecified temporality), rate/increase doesn't make sense
if agg.Temporality == metrictypes.Unspecified {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"rate/increase aggregation cannot be used with gauge metrics (unspecified temporality)",
)
}
}
// Validate percentile aggregations are only used with histogram or summary types
if agg.SpaceAggregation.IsPercentile() {
if agg.Type != metrictypes.HistogramType && agg.Type != metrictypes.ExpHistogramType && agg.Type != metrictypes.SummaryType {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"percentile aggregation can only be used with histogram or summary metric types",
)
}
}
// Validate time aggregation values
validTimeAggregations := []metrictypes.TimeAggregation{
metrictypes.TimeAggregationUnspecified,
metrictypes.TimeAggregationLatest,
metrictypes.TimeAggregationSum,
metrictypes.TimeAggregationAvg,
metrictypes.TimeAggregationMin,
metrictypes.TimeAggregationMax,
metrictypes.TimeAggregationCount,
metrictypes.TimeAggregationCountDistinct,
metrictypes.TimeAggregationRate,
metrictypes.TimeAggregationIncrease,
}
validTimeAgg := slices.Contains(validTimeAggregations, agg.TimeAggregation)
if !validTimeAgg {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid time aggregation: %s",
agg.TimeAggregation.StringValue(),
).WithAdditional(
"Valid time aggregations: latest, sum, avg, min, max, count, count_distinct, rate, increase",
)
}
// Validate space aggregation values
validSpaceAggregations := []metrictypes.SpaceAggregation{
metrictypes.SpaceAggregationUnspecified,
metrictypes.SpaceAggregationSum,
metrictypes.SpaceAggregationAvg,
metrictypes.SpaceAggregationMin,
metrictypes.SpaceAggregationMax,
metrictypes.SpaceAggregationCount,
metrictypes.SpaceAggregationPercentile50,
metrictypes.SpaceAggregationPercentile75,
metrictypes.SpaceAggregationPercentile90,
metrictypes.SpaceAggregationPercentile95,
metrictypes.SpaceAggregationPercentile99,
}
validSpaceAgg := slices.Contains(validSpaceAggregations, agg.SpaceAggregation)
if !validSpaceAgg {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"invalid space aggregation: %s",
agg.SpaceAggregation.StringValue(),
).WithAdditional(
"Valid space aggregations: sum, avg, min, max, count, p50, p75, p90, p95, p99",
)
}
return nil
}
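
A hedged sketch (hypothetical in-package test) of the temporality rules above: rate over a gauge-style metric (Unspecified temporality) is rejected, while an Unknown temporality short-circuits to nil so the decision can be made later. Field names are the ones referenced in validateMetricAggregation; the metric name is invented.

func TestValidateMetricAggregation_RateOnGauge(t *testing.T) {
	agg := MetricAggregation{
		MetricName:      "system.cpu.utilization", // illustrative name
		Temporality:     metrictypes.Unspecified,  // gauge-style metric
		TimeAggregation: metrictypes.TimeAggregationRate,
	}
	if err := validateMetricAggregation(agg); err == nil {
		t.Fatal("expected an error for rate over a gauge metric, got nil")
	}

	// With Unknown temporality the function defers the decision and returns nil.
	agg.Temporality = metrictypes.Unknown
	if err := validateMetricAggregation(agg); err != nil {
		t.Fatalf("expected no error for unknown temporality, got %v", err)
	}
}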

View File

@ -104,7 +104,7 @@ func SuggestCorrection(input string, knownFieldKeys []string) (string, bool) {
}
if bestSimilarity >= typoSuggestionThreshold {
return fmt.Sprintf("did you mean: %s?", bestMatch), true
return fmt.Sprintf("did you mean: '%s'?", bestMatch), true
}
return "", false

View File

@ -2,6 +2,8 @@ package telemetrytypes
import (
"context"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
)
// MetadataStore is the interface for the telemetry metadata store.
@ -22,4 +24,10 @@ type MetadataStore interface {
// GetAllValues returns a list of all values.
GetAllValues(ctx context.Context, fieldValueSelector *FieldValueSelector) (*TelemetryFieldValues, error)
// FetchTemporality fetches the temporality for a metric
FetchTemporality(ctx context.Context, metricName string) (metrictypes.Temporality, error)
// FetchTemporalityMulti fetches the temporality for multiple metrics
FetchTemporalityMulti(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error)
}

View File

@ -4,6 +4,7 @@ import (
"context"
"strings"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
@ -13,6 +14,7 @@ type MockMetadataStore struct {
KeysMap map[string][]*telemetrytypes.TelemetryFieldKey
RelatedValuesMap map[string][]string
AllValuesMap map[string]*telemetrytypes.TelemetryFieldValues
TemporalityMap map[string]metrictypes.Temporality
}
// NewMockMetadataStore creates a new instance of MockMetadataStore with initialized maps
@ -21,6 +23,7 @@ func NewMockMetadataStore() *MockMetadataStore {
KeysMap: make(map[string][]*telemetrytypes.TelemetryFieldKey),
RelatedValuesMap: make(map[string][]string),
AllValuesMap: make(map[string]*telemetrytypes.TelemetryFieldValues),
TemporalityMap: make(map[string]metrictypes.Temporality),
}
}
@ -249,3 +252,31 @@ func (m *MockMetadataStore) SetRelatedValues(lookupKey string, values []string)
func (m *MockMetadataStore) SetAllValues(lookupKey string, values *telemetrytypes.TelemetryFieldValues) {
m.AllValuesMap[lookupKey] = values
}
// FetchTemporality fetches the temporality for a metric
func (m *MockMetadataStore) FetchTemporality(ctx context.Context, metricName string) (metrictypes.Temporality, error) {
if temporality, exists := m.TemporalityMap[metricName]; exists {
return temporality, nil
}
return metrictypes.Unknown, nil
}
// FetchTemporalityMulti fetches the temporality for multiple metrics
func (m *MockMetadataStore) FetchTemporalityMulti(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
result := make(map[string]metrictypes.Temporality)
for _, metricName := range metricNames {
if temporality, exists := m.TemporalityMap[metricName]; exists {
result[metricName] = temporality
} else {
result[metricName] = metrictypes.Unknown
}
}
return result, nil
}
// SetTemporality sets the temporality for a metric in the mock store
func (m *MockMetadataStore) SetTemporality(metricName string, temporality metrictypes.Temporality) {
m.TemporalityMap[metricName] = temporality
}
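
A hedged sketch of how the new mock helpers might be exercised from a test in the same package; it assumes "testing" and "context" are imported in the test file and that metrictypes defines a Delta temporality, which does not appear in this diff.

func TestMockMetadataStore_Temporality(t *testing.T) {
	store := NewMockMetadataStore()
	store.SetTemporality("http.server.duration", metrictypes.Delta) // Delta is assumed to exist in metrictypes

	got, err := store.FetchTemporality(context.Background(), "http.server.duration")
	if err != nil || got != metrictypes.Delta {
		t.Fatalf("unexpected temporality: %v (err: %v)", got, err)
	}

	// Metrics that were never set fall back to Unknown.
	fallback, _ := store.FetchTemporality(context.Background(), "no.such.metric")
	if fallback != metrictypes.Unknown {
		t.Fatalf("expected Unknown, got %v", fallback)
	}
}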