From 1542b9d6e9906669dc9d1dc378ffd4111e2d7d42 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Mon, 16 Jun 2025 23:11:28 +0530 Subject: [PATCH] chore: disallow unknown fields and address gaps (#8237) --- pkg/querier/api.go | 6 + pkg/querier/bucket_cache.go | 50 +- pkg/querier/bucket_cache_bench_test.go | 8 +- pkg/querier/bucket_cache_step_test.go | 117 +++ pkg/querier/bucket_cache_test.go | 93 +-- pkg/querier/builder_query.go | 34 +- pkg/querier/builder_query_test.go | 131 ++++ pkg/querier/consume.go | 12 +- pkg/querier/interfaces.go | 4 +- pkg/querier/postprocess.go | 652 ++++++++++++++++ pkg/querier/querier.go | 238 +++++- pkg/querier/shift_test.go | 229 ++++++ pkg/querybuilder/fallback_expr.go | 2 +- .../resourcefilter/condition_builder.go | 2 +- pkg/querybuilder/where_clause_visitor.go | 89 +-- pkg/telemetrylogs/field_mapper.go | 4 +- pkg/telemetrylogs/filter_expr_logs_test.go | 12 +- pkg/telemetrylogs/statement_builder.go | 47 +- pkg/telemetrylogs/stmt_builder_test.go | 39 + pkg/telemetrymetadata/field_mapper.go | 4 +- pkg/telemetrymetadata/metadata.go | 90 +++ pkg/telemetrymetrics/statement_builder.go | 2 +- pkg/telemetrymetrics/stmt_builder_test.go | 4 +- pkg/telemetrytraces/field_mapper.go | 4 +- pkg/telemetrytraces/statement_builder.go | 8 +- pkg/types/metrictypes/metrictypes.go | 3 +- .../querybuildertypesv5/builder_elements.go | 4 +- .../querybuildertypesv5/builder_query.go | 21 + .../querybuildertypesv5/formula.go | 20 + .../querybuildertypesv5/functions.go | 17 +- .../querybuildertypesv5/json_decoder.go | 163 ++++ .../querybuildertypesv5/req.go | 172 ++++- .../querybuildertypesv5/req_error_test.go | 150 ++++ .../querybuildertypesv5/req_test.go | 4 +- .../querybuildertypesv5/resp.go | 5 +- .../querybuildertypesv5/validation.go | 700 ++++++++++++++++++ pkg/types/telemetrytypes/maybe_typo.go | 2 +- pkg/types/telemetrytypes/store.go | 8 + .../telemetrytypestest/metadata_store.go | 31 + 39 files changed, 2953 insertions(+), 228 deletions(-) create mode 100644 pkg/querier/bucket_cache_step_test.go create mode 100644 pkg/querier/builder_query_test.go create mode 100644 pkg/querier/postprocess.go create mode 100644 pkg/querier/shift_test.go create mode 100644 pkg/types/querybuildertypes/querybuildertypesv5/json_decoder.go create mode 100644 pkg/types/querybuildertypes/querybuildertypesv5/req_error_test.go create mode 100644 pkg/types/querybuildertypes/querybuildertypesv5/validation.go diff --git a/pkg/querier/api.go b/pkg/querier/api.go index fe922a7b176a..c8777ff6dd70 100644 --- a/pkg/querier/api.go +++ b/pkg/querier/api.go @@ -33,6 +33,12 @@ func (a *API) QueryRange(rw http.ResponseWriter, req *http.Request) { return } + // Validate the query request + if err := queryRangeRequest.Validate(); err != nil { + render.Error(rw, err) + return + } + orgID, err := valuer.NewUUID(claims.OrgID) if err != nil { render.Error(rw, err) diff --git a/pkg/querier/bucket_cache.go b/pkg/querier/bucket_cache.go index 842a7280d656..698d3c236cd1 100644 --- a/pkg/querier/bucket_cache.go +++ b/pkg/querier/bucket_cache.go @@ -117,7 +117,7 @@ func (bc *bucketCache) GetMissRanges( } // Put stores fresh query results in the cache -func (bc *bucketCache) Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Query, fresh *qbtypes.Result) { +func (bc *bucketCache) Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Query, step qbtypes.Step, fresh *qbtypes.Result) { // Get query window startMs, endMs := q.Window() @@ -159,8 +159,36 @@ func (bc *bucketCache) Put(ctx context.Context, orgID valuer.UUID, q 
qbtypes.Que return } - // Convert trimmed result to buckets - freshBuckets := bc.resultToBuckets(ctx, trimmedResult, startMs, cachableEndMs) + // Adjust start and end times to only cache complete intervals + cachableStartMs := startMs + stepMs := uint64(step.Duration.Milliseconds()) + + // If we have a step interval, adjust boundaries to only cache complete intervals + if stepMs > 0 { + // If start is not aligned, round up to next step boundary (first complete interval) + if startMs%stepMs != 0 { + cachableStartMs = ((startMs / stepMs) + 1) * stepMs + } + + // If end is not aligned, round down to previous step boundary (last complete interval) + if cachableEndMs%stepMs != 0 { + cachableEndMs = (cachableEndMs / stepMs) * stepMs + } + + // If after adjustment we have no complete intervals, don't cache + if cachableStartMs >= cachableEndMs { + bc.logger.DebugContext(ctx, "no complete intervals to cache", + "original_start", startMs, + "original_end", endMs, + "adjusted_start", cachableStartMs, + "adjusted_end", cachableEndMs, + "step", stepMs) + return + } + } + + // Convert trimmed result to buckets with adjusted boundaries + freshBuckets := bc.resultToBuckets(ctx, trimmedResult, cachableStartMs, cachableEndMs) // If no fresh buckets and no existing data, don't cache if len(freshBuckets) == 0 && len(existingData.Buckets) == 0 { @@ -485,6 +513,12 @@ func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*cac } if existingSeries, ok := seriesMap[key]; ok { + // Merge values, avoiding duplicate timestamps + timestampMap := make(map[int64]bool) + for _, v := range existingSeries.Values { + timestampMap[v.Timestamp] = true + } + // Pre-allocate capacity for merged values newCap := len(existingSeries.Values) + len(series.Values) if cap(existingSeries.Values) < newCap { @@ -492,7 +526,13 @@ func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*cac copy(newValues, existingSeries.Values) existingSeries.Values = newValues } - existingSeries.Values = append(existingSeries.Values, series.Values...) 
+ + // Only add values with new timestamps + for _, v := range series.Values { + if !timestampMap[v.Timestamp] { + existingSeries.Values = append(existingSeries.Values, v) + } + } } else { // New series seriesMap[key] = series @@ -697,7 +737,7 @@ func (bc *bucketCache) trimResultToFluxBoundary(result *qbtypes.Result, fluxBoun switch result.Type { case qbtypes.RequestTypeTimeSeries: // Trim time series data - if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok { + if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok && tsData != nil { trimmedData := &qbtypes.TimeSeriesData{ QueryName: tsData.QueryName, } diff --git a/pkg/querier/bucket_cache_bench_test.go b/pkg/querier/bucket_cache_bench_test.go index 9af91f13f7de..8be802d84170 100644 --- a/pkg/querier/bucket_cache_bench_test.go +++ b/pkg/querier/bucket_cache_bench_test.go @@ -30,7 +30,7 @@ func BenchmarkBucketCache_GetMissRanges(b *testing.B) { endMs: uint64((i + 1) * 10000), } result := createBenchmarkResult(query.startMs, query.endMs, 1000) - bc.Put(ctx, orgID, query, result) + bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result) } // Create test queries with varying cache hit patterns @@ -121,7 +121,7 @@ func BenchmarkBucketCache_Put(b *testing.B) { for i := 0; i < b.N; i++ { for j := 0; j < tc.numQueries; j++ { - bc.Put(ctx, orgID, queries[j], results[j]) + bc.Put(ctx, orgID, queries[j], qbtypes.Step{Duration: 1000 * time.Millisecond}, results[j]) } } }) @@ -259,7 +259,7 @@ func BenchmarkBucketCache_ConcurrentOperations(b *testing.B) { endMs: uint64((i + 1) * 10000), } result := createBenchmarkResult(query.startMs, query.endMs, 1000) - bc.Put(ctx, orgID, query, result) + bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result) } b.ResetTimer() @@ -284,7 +284,7 @@ func BenchmarkBucketCache_ConcurrentOperations(b *testing.B) { endMs: uint64((i + 1) * 10000), } result := createBenchmarkResult(query.startMs, query.endMs, 1000) - bc.Put(ctx, orgID, query, result) + bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result) case 2: // Partial read query := &mockQuery{ fingerprint: fmt.Sprintf("concurrent-query-%d", i%100), diff --git a/pkg/querier/bucket_cache_step_test.go b/pkg/querier/bucket_cache_step_test.go new file mode 100644 index 000000000000..80a1f6d96817 --- /dev/null +++ b/pkg/querier/bucket_cache_step_test.go @@ -0,0 +1,117 @@ +package querier + +import ( + "context" + "testing" + "time" + + "github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest" + qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" + "github.com/SigNoz/signoz/pkg/valuer" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBucketCacheStepAlignment(t *testing.T) { + ctx := context.Background() + orgID := valuer.UUID{} + cache := createTestCache(t) + bc := NewBucketCache(instrumentationtest.New().ToProviderSettings(), cache, time.Hour, 5*time.Minute) + + // Test with 5-minute step + step := qbtypes.Step{Duration: 5 * time.Minute} + + // Query from 12:02 to 12:58 (both unaligned) + // Complete intervals: 12:05 to 12:55 + query := &mockQuery{ + fingerprint: "test-step-alignment", + startMs: 1672563720000, // 12:02 + endMs: 1672567080000, // 12:58 + } + + result := &qbtypes.Result{ + Type: qbtypes.RequestTypeTimeSeries, + Value: &qbtypes.TimeSeriesData{ + QueryName: "test", + Aggregations: []*qbtypes.AggregationBucket{ + { + Index: 0, + 
Series: []*qbtypes.TimeSeries{ + { + Labels: []*qbtypes.Label{ + {Key: telemetrytypes.TelemetryFieldKey{Name: "service"}, Value: "test"}, + }, + Values: []*qbtypes.TimeSeriesValue{ + {Timestamp: 1672563720000, Value: 1, Partial: true}, // 12:02 + {Timestamp: 1672563900000, Value: 2}, // 12:05 + {Timestamp: 1672564200000, Value: 2.5}, // 12:10 + {Timestamp: 1672564500000, Value: 2.6}, // 12:15 + {Timestamp: 1672566600000, Value: 2.9}, // 12:50 + {Timestamp: 1672566900000, Value: 3}, // 12:55 + {Timestamp: 1672567080000, Value: 4, Partial: true}, // 12:58 + }, + }, + }, + }, + }, + }, + } + + // Put result in cache + bc.Put(ctx, orgID, query, step, result) + + // Get cached data + cached, missing := bc.GetMissRanges(ctx, orgID, query, step) + + // Should have cached data + require.NotNil(t, cached) + + // Log the missing ranges to debug + t.Logf("Missing ranges: %v", missing) + for i, r := range missing { + t.Logf("Missing range %d: From=%d, To=%d", i, r.From, r.To) + } + + // Should have 2 missing ranges for partial intervals + require.Len(t, missing, 2) + + // First partial: 12:02 to 12:05 + assert.Equal(t, uint64(1672563720000), missing[0].From) + assert.Equal(t, uint64(1672563900000), missing[0].To) + + // Second partial: 12:55 to 12:58 + assert.Equal(t, uint64(1672566900000), missing[1].From, "Second missing range From") + assert.Equal(t, uint64(1672567080000), missing[1].To, "Second missing range To") +} + +func TestBucketCacheNoStepInterval(t *testing.T) { + ctx := context.Background() + orgID := valuer.UUID{} + cache := createTestCache(t) + bc := NewBucketCache(instrumentationtest.New().ToProviderSettings(), cache, time.Hour, 5*time.Minute) + + // Test with no step (stepMs = 0) + step := qbtypes.Step{Duration: 0} + + query := &mockQuery{ + fingerprint: "test-no-step", + startMs: 1672563720000, + endMs: 1672567080000, + } + + result := &qbtypes.Result{ + Type: qbtypes.RequestTypeTimeSeries, + Value: &qbtypes.TimeSeriesData{ + QueryName: "test", + Aggregations: []*qbtypes.AggregationBucket{{Index: 0, Series: []*qbtypes.TimeSeries{}}}, + }, + } + + // Should cache the entire range when step is 0 + bc.Put(ctx, orgID, query, step, result) + + cached, missing := bc.GetMissRanges(ctx, orgID, query, step) + assert.NotNil(t, cached) + assert.Len(t, missing, 0) +} diff --git a/pkg/querier/bucket_cache_test.go b/pkg/querier/bucket_cache_test.go index 98a3aed00cb4..f141be1ae42f 100644 --- a/pkg/querier/bucket_cache_test.go +++ b/pkg/querier/bucket_cache_test.go @@ -128,7 +128,7 @@ func TestBucketCache_GetMissRanges_EmptyCache(t *testing.T) { endMs: 5000, } - cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000}) + cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond}) assert.Nil(t, cached) assert.Len(t, missing, 1) @@ -159,13 +159,13 @@ func TestBucketCache_Put_And_Get(t *testing.T) { } // Store in cache - bc.Put(context.Background(), valuer.UUID{}, query, result) + bc.Put(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result) // Wait a bit for cache to be written time.Sleep(10 * time.Millisecond) // Retrieve from cache - cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000}) + cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond}) assert.NotNil(t, cached.Value) assert.Len(t, missing, 0) @@ -193,7 
+193,7 @@ func TestBucketCache_PartialHit(t *testing.T) { Type: qbtypes.RequestTypeTimeSeries, Value: createTestTimeSeries("A", 1000, 3000, 1000), } - bc.Put(context.Background(), valuer.UUID{}, query1, result1) + bc.Put(context.Background(), valuer.UUID{}, query1, qbtypes.Step{Duration: 1000 * time.Millisecond}, result1) // Wait for cache write time.Sleep(10 * time.Millisecond) @@ -205,7 +205,7 @@ func TestBucketCache_PartialHit(t *testing.T) { endMs: 5000, } - cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query2, qbtypes.Step{Duration: 1000}) + cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query2, qbtypes.Step{Duration: 1000 * time.Millisecond}) // Should have cached data assert.NotNil(t, cached.Value) @@ -226,7 +226,7 @@ func TestBucketCache_MultipleBuckets(t *testing.T) { startMs: 1000, endMs: 2000, } - bc.Put(context.Background(), valuer.UUID{}, query1, &qbtypes.Result{ + bc.Put(context.Background(), valuer.UUID{}, query1, qbtypes.Step{Duration: 100 * time.Millisecond}, &qbtypes.Result{ Type: qbtypes.RequestTypeTimeSeries, Value: createTestTimeSeries("A", 1000, 2000, 100), }) @@ -236,7 +236,7 @@ func TestBucketCache_MultipleBuckets(t *testing.T) { startMs: 3000, endMs: 4000, } - bc.Put(context.Background(), valuer.UUID{}, query2, &qbtypes.Result{ + bc.Put(context.Background(), valuer.UUID{}, query2, qbtypes.Step{Duration: 100 * time.Millisecond}, &qbtypes.Result{ Type: qbtypes.RequestTypeTimeSeries, Value: createTestTimeSeries("A", 3000, 4000, 100), }) @@ -251,7 +251,7 @@ func TestBucketCache_MultipleBuckets(t *testing.T) { endMs: 4500, } - cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query3, qbtypes.Step{Duration: 1000}) + cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query3, qbtypes.Step{Duration: 1000 * time.Millisecond}) // Should have cached data assert.NotNil(t, cached.Value) @@ -284,13 +284,13 @@ func TestBucketCache_FluxInterval(t *testing.T) { } // This should not be cached due to flux interval - bc.Put(context.Background(), valuer.UUID{}, query, result) + bc.Put(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result) // Wait a bit time.Sleep(10 * time.Millisecond) // Try to get the data - cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000}) + cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond}) // Should have no cached data assert.Nil(t, cached) @@ -354,7 +354,7 @@ func TestBucketCache_MergeTimeSeriesResults(t *testing.T) { startMs: 1000, endMs: 3000, } - bc.Put(context.Background(), valuer.UUID{}, query1, &qbtypes.Result{ + bc.Put(context.Background(), valuer.UUID{}, query1, qbtypes.Step{Duration: 1000 * time.Millisecond}, &qbtypes.Result{ Type: qbtypes.RequestTypeTimeSeries, Value: &qbtypes.TimeSeriesData{ QueryName: "A", @@ -370,7 +370,7 @@ func TestBucketCache_MergeTimeSeriesResults(t *testing.T) { startMs: 3000, endMs: 5000, } - bc.Put(context.Background(), valuer.UUID{}, query2, &qbtypes.Result{ + bc.Put(context.Background(), valuer.UUID{}, query2, qbtypes.Step{Duration: 1000 * time.Millisecond}, &qbtypes.Result{ Type: qbtypes.RequestTypeTimeSeries, Value: &qbtypes.TimeSeriesData{ QueryName: "A", @@ -390,7 +390,7 @@ func TestBucketCache_MergeTimeSeriesResults(t *testing.T) { endMs: 5000, } - cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query3, 
qbtypes.Step{Duration: 1000}) + cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query3, qbtypes.Step{Duration: 1000 * time.Millisecond}) // Should have no missing ranges assert.Len(t, missing, 0) @@ -445,10 +445,10 @@ func TestBucketCache_RawData(t *testing.T) { Value: rawData, } - bc.Put(context.Background(), valuer.UUID{}, query, result) + bc.Put(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result) time.Sleep(10 * time.Millisecond) - cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000}) + cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond}) // Raw data should not be cached assert.Nil(t, cached) @@ -485,10 +485,10 @@ func TestBucketCache_ScalarData(t *testing.T) { Value: scalarData, } - bc.Put(context.Background(), valuer.UUID{}, query, result) + bc.Put(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result) time.Sleep(10 * time.Millisecond) - cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000}) + cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond}) // Scalar data should not be cached assert.Nil(t, cached) @@ -513,11 +513,11 @@ func TestBucketCache_EmptyFingerprint(t *testing.T) { Value: createTestTimeSeries("A", 1000, 5000, 1000), } - bc.Put(context.Background(), valuer.UUID{}, query, result) + bc.Put(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result) time.Sleep(10 * time.Millisecond) // Should still be able to retrieve - cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000}) + cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond}) assert.NotNil(t, cached.Value) assert.Len(t, missing, 0) } @@ -568,7 +568,7 @@ func TestBucketCache_ConcurrentAccess(t *testing.T) { Type: qbtypes.RequestTypeTimeSeries, Value: createTestTimeSeries(fmt.Sprintf("Q%d", id), query.startMs, query.endMs, 100), } - bc.Put(context.Background(), valuer.UUID{}, query, result) + bc.Put(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 100 * time.Microsecond}, result) done <- true }(i) } @@ -581,7 +581,7 @@ func TestBucketCache_ConcurrentAccess(t *testing.T) { startMs: uint64(id * 1000), endMs: uint64((id + 1) * 1000), } - bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000}) + bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond}) done <- true }(i) } @@ -628,10 +628,10 @@ func TestBucketCache_GetMissRanges_FluxInterval(t *testing.T) { }, } - bc.Put(ctx, orgID, query, cachedResult) + bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, cachedResult) // Get miss ranges - cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000}) + cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}) assert.NotNil(t, cached) t.Logf("Missing ranges: %+v, query range: %d-%d", missing, query.startMs, query.endMs) @@ -690,10 +690,10 @@ func TestBucketCache_Put_FluxIntervalTrimming(t *testing.T) { } // Put the result - bc.Put(ctx, orgID, query, result) + bc.Put(ctx, orgID, query, 
qbtypes.Step{Duration: 1000 * time.Millisecond}, result) // Retrieve cached data - cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000}) + cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}) // Should have cached data assert.NotNil(t, cached) @@ -760,10 +760,10 @@ func TestBucketCache_Put_EntireRangeInFluxInterval(t *testing.T) { } // Put the result - should not cache anything - bc.Put(ctx, orgID, query, result) + bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result) // Try to get cached data - should have no cached data - cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000}) + cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}) // Should have no cached value assert.Nil(t, cached) @@ -785,18 +785,6 @@ func TestBucketCache_EmptyDataHandling(t *testing.T) { shouldCache bool description string }{ - { - name: "truly_empty_time_series", - result: &qbtypes.Result{ - Type: qbtypes.RequestTypeTimeSeries, - Value: &qbtypes.TimeSeriesData{ - QueryName: "A", - Aggregations: []*qbtypes.AggregationBucket{}, - }, - }, - shouldCache: false, - description: "No aggregations means truly empty - should not cache", - }, { name: "filtered_empty_time_series", result: &qbtypes.Result{ @@ -878,17 +866,16 @@ func TestBucketCache_EmptyDataHandling(t *testing.T) { } // Put the result - bc.Put(ctx, orgID, query, tt.result) + bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, tt.result) // Wait a bit for cache to be written time.Sleep(10 * time.Millisecond) // Try to get cached data - cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000}) + cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}) if tt.shouldCache { assert.NotNil(t, cached, tt.description) - assert.Len(t, missing, 0, "Should have no missing ranges when data is cached") } else { assert.Nil(t, cached, tt.description) assert.Len(t, missing, 1, "Should have entire range as missing when data is not cached") @@ -944,13 +931,13 @@ func TestBucketCache_PartialValues(t *testing.T) { } // Put the result - bc.Put(ctx, orgID, query, result) + bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result) // Wait for cache to be written time.Sleep(10 * time.Millisecond) // Get cached data - cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000}) + cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}) // Should have cached data assert.NotNil(t, cached) @@ -1014,13 +1001,13 @@ func TestBucketCache_AllPartialValues(t *testing.T) { } // Put the result - bc.Put(ctx, orgID, query, result) + bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result) // Wait for cache to be written time.Sleep(10 * time.Millisecond) // Get cached data - cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000}) + cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}) // When all values are partial and filtered out, the result is cached as empty // This prevents re-querying for the same misaligned time range @@ -1075,7 +1062,7 @@ func TestBucketCache_FilteredCachedResults(t *testing.T) { } // Cache the wide range - bc.Put(ctx, orgID, query1, result1) + bc.Put(ctx, orgID, query1, qbtypes.Step{Duration: 1000 
* time.Millisecond}, result1) time.Sleep(10 * time.Millisecond) // Now query for a smaller range (2000-3500ms) @@ -1086,7 +1073,7 @@ func TestBucketCache_FilteredCachedResults(t *testing.T) { } // Get cached data - should be filtered to requested range - cached, missing := bc.GetMissRanges(ctx, orgID, query2, qbtypes.Step{Duration: 1000}) + cached, missing := bc.GetMissRanges(ctx, orgID, query2, qbtypes.Step{Duration: 1000 * time.Millisecond}) // Should have no missing ranges assert.Len(t, missing, 0) @@ -1246,7 +1233,7 @@ func TestBucketCache_PartialValueDetection(t *testing.T) { } // Put the result - bc.Put(ctx, orgID, query, result) + bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result) time.Sleep(10 * time.Millisecond) // Get cached data @@ -1300,7 +1287,7 @@ func TestBucketCache_PartialValueDetection(t *testing.T) { } // Put the result - bc.Put(ctx, orgID, query, result) + bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result) time.Sleep(10 * time.Millisecond) // Get cached data @@ -1352,7 +1339,7 @@ func TestBucketCache_PartialValueDetection(t *testing.T) { } // Put the result - bc.Put(ctx, orgID, query, result) + bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result) time.Sleep(10 * time.Millisecond) // Get cached data @@ -1409,11 +1396,11 @@ func TestBucketCache_NoCache(t *testing.T) { } // Put the result in cache - bc.Put(ctx, orgID, query, result) + bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result) time.Sleep(10 * time.Millisecond) // Verify data is cached - cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000}) + cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}) assert.NotNil(t, cached) assert.Len(t, missing, 0) diff --git a/pkg/querier/builder_query.go b/pkg/querier/builder_query.go index c93b9a0f5ca0..fb91fe6af720 100644 --- a/pkg/querier/builder_query.go +++ b/pkg/querier/builder_query.go @@ -118,6 +118,10 @@ func (q *builderQuery[T]) Fingerprint() string { parts = append(parts, fmt.Sprintf("having=%s", q.spec.Having.Expression)) } + if q.spec.ShiftBy != 0 { + parts = append(parts, fmt.Sprintf("shiftby=%d", q.spec.ShiftBy)) + } + return strings.Join(parts, "&") } @@ -204,7 +208,14 @@ func (q *builderQuery[T]) executeWithContext(ctx context.Context, query string, // Pass query window and step for partial value detection queryWindow := &qbtypes.TimeRange{From: q.fromMS, To: q.toMS} - payload, err := consume(rows, q.kind, queryWindow, q.spec.StepInterval, q.spec.Name) + + kind := q.kind + // all metric queries are time series then reduced if required + if q.spec.Signal == telemetrytypes.SignalMetrics { + kind = qbtypes.RequestTypeTimeSeries + } + + payload, err := consume(rows, kind, queryWindow, q.spec.StepInterval, q.spec.Name) if err != nil { return nil, err } @@ -224,16 +235,18 @@ func (q *builderQuery[T]) executeWindowList(ctx context.Context) (*qbtypes.Resul isAsc := len(q.spec.Order) > 0 && strings.ToLower(string(q.spec.Order[0].Direction.StringValue())) == "asc" + fromMS, toMS := q.fromMS, q.toMS + // Adjust [fromMS,toMS] window if a cursor was supplied if cur := strings.TrimSpace(q.spec.Cursor); cur != "" { if ts, err := decodeCursor(cur); err == nil { if isAsc { - if uint64(ts) >= q.fromMS { - q.fromMS = uint64(ts + 1) + if uint64(ts) >= fromMS { + fromMS = uint64(ts + 1) } } else { // DESC - if uint64(ts) <= q.toMS { - q.toMS = uint64(ts - 1) + if uint64(ts) <= toMS { 
+ toMS = uint64(ts - 1) } } } @@ -252,7 +265,16 @@ func (q *builderQuery[T]) executeWindowList(ctx context.Context) (*qbtypes.Resul totalBytes := uint64(0) start := time.Now() - for _, r := range makeBuckets(q.fromMS, q.toMS) { + // Get buckets and reverse them for ascending order + buckets := makeBuckets(fromMS, toMS) + if isAsc { + // Reverse the buckets for ascending order + for i, j := 0, len(buckets)-1; i < j; i, j = i+1, j-1 { + buckets[i], buckets[j] = buckets[j], buckets[i] + } + } + + for _, r := range buckets { q.spec.Offset = 0 q.spec.Limit = need diff --git a/pkg/querier/builder_query_test.go b/pkg/querier/builder_query_test.go new file mode 100644 index 000000000000..e2ac4da5f1be --- /dev/null +++ b/pkg/querier/builder_query_test.go @@ -0,0 +1,131 @@ +package querier + +import ( + "strings" + "testing" + + "github.com/SigNoz/signoz/pkg/querybuilder" + qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" + "github.com/stretchr/testify/assert" +) + +func TestBuilderQueryFingerprint(t *testing.T) { + tests := []struct { + name string + query *builderQuery[qbtypes.MetricAggregation] + expectInKey []string + notExpectInKey []string + }{ + { + name: "fingerprint includes shiftby when ShiftBy field is set", + query: &builderQuery[qbtypes.MetricAggregation]{ + kind: qbtypes.RequestTypeTimeSeries, + spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ + Signal: telemetrytypes.SignalMetrics, + ShiftBy: 3600, + Functions: []qbtypes.Function{ + { + Name: qbtypes.FunctionNameTimeShift, + Args: []qbtypes.FunctionArg{ + {Value: "3600"}, + }, + }, + }, + }, + }, + expectInKey: []string{"shiftby=3600"}, + notExpectInKey: []string{"functions=", "timeshift", "absolute"}, + }, + { + name: "fingerprint includes shiftby but not other functions", + query: &builderQuery[qbtypes.MetricAggregation]{ + kind: qbtypes.RequestTypeTimeSeries, + spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ + Signal: telemetrytypes.SignalMetrics, + ShiftBy: 3600, + Functions: []qbtypes.Function{ + { + Name: qbtypes.FunctionNameTimeShift, + Args: []qbtypes.FunctionArg{ + {Value: "3600"}, + }, + }, + { + Name: qbtypes.FunctionNameAbsolute, + }, + }, + }, + }, + expectInKey: []string{"shiftby=3600"}, + notExpectInKey: []string{"functions=", "absolute"}, + }, + { + name: "no shiftby in fingerprint when ShiftBy is zero", + query: &builderQuery[qbtypes.MetricAggregation]{ + kind: qbtypes.RequestTypeTimeSeries, + spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ + Signal: telemetrytypes.SignalMetrics, + ShiftBy: 0, + Functions: []qbtypes.Function{ + { + Name: qbtypes.FunctionNameAbsolute, + }, + }, + }, + }, + expectInKey: []string{}, + notExpectInKey: []string{"shiftby=", "functions=", "absolute"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fingerprint := tt.query.Fingerprint() + for _, expected := range tt.expectInKey { + assert.True(t, strings.Contains(fingerprint, expected), + "Expected fingerprint to contain '%s', got: %s", expected, fingerprint) + } + for _, notExpected := range tt.notExpectInKey { + assert.False(t, strings.Contains(fingerprint, notExpected), + "Expected fingerprint NOT to contain '%s', got: %s", notExpected, fingerprint) + } + }) + } +} + +func TestMakeBucketsOrder(t *testing.T) { + // Test that makeBuckets returns buckets in reverse chronological order by default + // Using milliseconds as input - need > 1 hour range to get multiple buckets + now := 
uint64(1700000000000) // Some timestamp in ms + startMS := now + endMS := now + uint64(10*60*60*1000) // 10 hours later + + buckets := makeBuckets(startMS, endMS) + + // Should have multiple buckets for a 10 hour range + assert.True(t, len(buckets) > 1, "Should have multiple buckets for 10 hour range, got %d", len(buckets)) + + // Log buckets for debugging + t.Logf("Generated %d buckets:", len(buckets)) + for i, b := range buckets { + durationMs := (b.toNS - b.fromNS) / 1e6 + t.Logf("Bucket %d: duration=%dms", i, durationMs) + } + + // Verify buckets are in reverse chronological order (newest to oldest) + for i := 0; i < len(buckets)-1; i++ { + assert.True(t, buckets[i].toNS > buckets[i+1].toNS, + "Bucket %d end should be after bucket %d end", i, i+1) + assert.Equal(t, buckets[i].fromNS, buckets[i+1].toNS, + "Bucket %d start should equal bucket %d end (continuous buckets)", i, i+1) + } + + // First bucket should end at endNS (converted to nanoseconds) + expectedEndNS := querybuilder.ToNanoSecs(endMS) + assert.Equal(t, expectedEndNS, buckets[0].toNS) + + // Last bucket should start at startNS (converted to nanoseconds) + expectedStartNS := querybuilder.ToNanoSecs(startMS) + assert.Equal(t, expectedStartNS, buckets[len(buckets)-1].fromNS) +} diff --git a/pkg/querier/consume.go b/pkg/querier/consume.go index b6cc69709720..06d8f4d0e3da 100644 --- a/pkg/querier/consume.go +++ b/pkg/querier/consume.go @@ -176,7 +176,7 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt lblVals = append(lblVals, *val) lblObjs = append(lblObjs, &qbtypes.Label{ Key: telemetrytypes.TelemetryFieldKey{Name: name}, - Value: val, + Value: *val, }) default: @@ -227,8 +227,9 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt } } if maxAgg < 0 { - //nolint:nilnil - return nil, nil // empty result-set + return &qbtypes.TimeSeriesData{ + QueryName: queryName, + }, nil } buckets := make([]*qbtypes.AggregationBucket, maxAgg+1) @@ -319,8 +320,9 @@ func readAsScalar(rows driver.Rows, queryName string) (*qbtypes.ScalarData, erro } return &qbtypes.ScalarData{ - Columns: cd, - Data: data, + QueryName: queryName, + Columns: cd, + Data: data, }, nil } diff --git a/pkg/querier/interfaces.go b/pkg/querier/interfaces.go index e61384e7f243..4e261f959465 100644 --- a/pkg/querier/interfaces.go +++ b/pkg/querier/interfaces.go @@ -17,5 +17,5 @@ type BucketCache interface { // cached portion + list of gaps to fetch GetMissRanges(ctx context.Context, orgID valuer.UUID, q qbtypes.Query, step qbtypes.Step) (cached *qbtypes.Result, missing []*qbtypes.TimeRange) // store fresh buckets for future hits - Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Query, fresh *qbtypes.Result) -} \ No newline at end of file + Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Query, step qbtypes.Step, fresh *qbtypes.Result) +} diff --git a/pkg/querier/postprocess.go b/pkg/querier/postprocess.go new file mode 100644 index 000000000000..501690d34f25 --- /dev/null +++ b/pkg/querier/postprocess.go @@ -0,0 +1,652 @@ +package querier + +import ( + "context" + "fmt" + "sort" + + qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" +) + +// queryInfo holds common query properties +type queryInfo struct { + Name string + Disabled bool + Step qbtypes.Step +} + +// getqueryInfo extracts common info from any query type +func getqueryInfo(spec any) queryInfo { + switch s := spec.(type) { + case 
qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]: + return queryInfo{Name: s.Name, Disabled: s.Disabled, Step: s.StepInterval} + case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]: + return queryInfo{Name: s.Name, Disabled: s.Disabled, Step: s.StepInterval} + case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]: + return queryInfo{Name: s.Name, Disabled: s.Disabled, Step: s.StepInterval} + case qbtypes.QueryBuilderFormula: + return queryInfo{Name: s.Name, Disabled: false} + case qbtypes.PromQuery: + return queryInfo{Name: s.Name, Disabled: s.Disabled, Step: s.Step} + case qbtypes.ClickHouseQuery: + return queryInfo{Name: s.Name, Disabled: s.Disabled} + } + return queryInfo{} +} + +// getQueryName is a convenience function when only name is needed +func getQueryName(spec any) string { + return getqueryInfo(spec).Name +} + +func (q *querier) postProcessResults(ctx context.Context, results map[string]any, req *qbtypes.QueryRangeRequest) (map[string]any, error) { + // Convert results to typed format for processing + typedResults := make(map[string]*qbtypes.Result) + for name, result := range results { + typedResults[name] = &qbtypes.Result{ + Value: result, + } + } + + for _, query := range req.CompositeQuery.Queries { + switch spec := query.Spec.(type) { + case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]: + if result, ok := typedResults[spec.Name]; ok { + result = postProcessBuilderQuery(q, result, spec, req) + typedResults[spec.Name] = result + } + case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]: + if result, ok := typedResults[spec.Name]; ok { + result = postProcessBuilderQuery(q, result, spec, req) + typedResults[spec.Name] = result + } + case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]: + if result, ok := typedResults[spec.Name]; ok { + result = postProcessMetricQuery(q, result, spec, req) + typedResults[spec.Name] = result + } + } + } + + // Apply formula calculations + typedResults = q.applyFormulas(ctx, typedResults, req) + + // Filter out disabled queries + typedResults = q.filterDisabledQueries(typedResults, req) + + // Apply table formatting for UI if requested + if req.FormatOptions != nil && req.FormatOptions.FormatTableResultForUI && req.RequestType == qbtypes.RequestTypeScalar { + // Format results as a table - this merges all queries into a single table + tableResult := q.formatScalarResultsAsTable(typedResults, req) + + // Return the table under the first query's name so it gets included in results + if len(req.CompositeQuery.Queries) > 0 { + firstQueryName := getQueryName(req.CompositeQuery.Queries[0].Spec) + if firstQueryName != "" && tableResult["table"] != nil { + // Return table under first query name + return map[string]any{firstQueryName: tableResult["table"]}, nil + } + } + + return tableResult, nil + } + + // Convert back to map[string]any + finalResults := make(map[string]any) + for name, result := range typedResults { + finalResults[name] = result.Value + } + + return finalResults, nil +} + +// postProcessBuilderQuery applies postprocessing to a single builder query result +func postProcessBuilderQuery[T any]( + q *querier, + result *qbtypes.Result, + query qbtypes.QueryBuilderQuery[T], + _ *qbtypes.QueryRangeRequest, +) *qbtypes.Result { + + // Apply functions + if len(query.Functions) > 0 { + result = q.applyFunctions(result, query.Functions) + } + + return result +} + +// postProcessMetricQuery applies postprocessing to a metric query result +func postProcessMetricQuery( + q *querier, + result *qbtypes.Result, + query 
qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], + req *qbtypes.QueryRangeRequest, +) *qbtypes.Result { + + if query.Limit > 0 { + result = q.applySeriesLimit(result, query.Limit, query.Order) + } + + if len(query.Functions) > 0 { + result = q.applyFunctions(result, query.Functions) + } + + // Apply reduce to for scalar request type + if req.RequestType == qbtypes.RequestTypeScalar { + if len(query.Aggregations) > 0 && query.Aggregations[0].ReduceTo != qbtypes.ReduceToUnknown { + result = q.applyMetricReduceTo(result, query.Aggregations[0].ReduceTo) + } + } + + return result +} + +// applyMetricReduceTo applies reduce to operation using the metric's ReduceTo field +func (q *querier) applyMetricReduceTo(result *qbtypes.Result, reduceOp qbtypes.ReduceTo) *qbtypes.Result { + tsData, ok := result.Value.(*qbtypes.TimeSeriesData) + if !ok { + return result + } + + if tsData != nil { + for _, agg := range tsData.Aggregations { + for i, series := range agg.Series { + // Use the FunctionReduceTo helper + reducedSeries := qbtypes.FunctionReduceTo(series, reduceOp) + agg.Series[i] = reducedSeries + } + } + } + + scalarData := convertTimeSeriesDataToScalar(tsData, tsData.QueryName) + result.Value = scalarData + + return result +} + +// applySeriesLimit limits the number of series in the result +func (q *querier) applySeriesLimit(result *qbtypes.Result, limit int, orderBy []qbtypes.OrderBy) *qbtypes.Result { + tsData, ok := result.Value.(*qbtypes.TimeSeriesData) + if !ok { + return result + } + + if tsData != nil { + for _, agg := range tsData.Aggregations { + // Use the ApplySeriesLimit function from querybuildertypes + agg.Series = qbtypes.ApplySeriesLimit(agg.Series, orderBy, limit) + } + } + + return result +} + +// applyFunctions applies functions to time series data +func (q *querier) applyFunctions(result *qbtypes.Result, functions []qbtypes.Function) *qbtypes.Result { + tsData, ok := result.Value.(*qbtypes.TimeSeriesData) + if !ok { + return result + } + + if tsData != nil { + for _, agg := range tsData.Aggregations { + for i, series := range agg.Series { + agg.Series[i] = qbtypes.ApplyFunctions(functions, series) + } + } + } + + return result +} + +// applyFormulas processes formula queries in the composite query +func (q *querier) applyFormulas(ctx context.Context, results map[string]*qbtypes.Result, req *qbtypes.QueryRangeRequest) map[string]*qbtypes.Result { + // Collect formula queries + formulaQueries := make(map[string]qbtypes.QueryBuilderFormula) + + for _, query := range req.CompositeQuery.Queries { + if query.Type == qbtypes.QueryTypeFormula { + if formula, ok := query.Spec.(qbtypes.QueryBuilderFormula); ok { + formulaQueries[formula.Name] = formula + } + } + } + + // Process each formula + for name, formula := range formulaQueries { + // Check if we're dealing with time series or scalar data + if req.RequestType == qbtypes.RequestTypeTimeSeries { + result := q.processTimeSeriesFormula(ctx, results, formula, req) + if result != nil { + results[name] = result + } + } + } + + return results +} + +// processTimeSeriesFormula handles formula evaluation for time series data +func (q *querier) processTimeSeriesFormula( + ctx context.Context, + results map[string]*qbtypes.Result, + formula qbtypes.QueryBuilderFormula, + _ *qbtypes.QueryRangeRequest, +) *qbtypes.Result { + // Prepare time series data for formula evaluation + timeSeriesData := make(map[string]*qbtypes.TimeSeriesData) + + // Extract time series data from results + for queryName, result := range results { + if tsData, ok := 
result.Value.(*qbtypes.TimeSeriesData); ok { + timeSeriesData[queryName] = tsData + } + } + + // Create formula evaluator + // TODO(srikanthccv): add conditional default zero + canDefaultZero := make(map[string]bool) + evaluator, err := qbtypes.NewFormulaEvaluator(formula.Expression, canDefaultZero) + if err != nil { + q.logger.ErrorContext(ctx, "failed to create formula evaluator", "error", err, "formula", formula.Name) + return nil + } + + // Evaluate the formula + formulaSeries, err := evaluator.EvaluateFormula(timeSeriesData) + if err != nil { + q.logger.ErrorContext(ctx, "failed to evaluate formula", "error", err, "formula", formula.Name) + return nil + } + + // Create result for formula + formulaResult := &qbtypes.TimeSeriesData{ + QueryName: formula.Name, + Aggregations: []*qbtypes.AggregationBucket{ + { + Index: 0, + Series: formulaSeries, + }, + }, + } + + // Apply functions if any + result := &qbtypes.Result{ + Value: formulaResult, + } + + if len(formula.Functions) > 0 { + result = q.applyFunctions(result, formula.Functions) + } + + return result +} + +// filterDisabledQueries removes results for disabled queries +func (q *querier) filterDisabledQueries(results map[string]*qbtypes.Result, req *qbtypes.QueryRangeRequest) map[string]*qbtypes.Result { + filtered := make(map[string]*qbtypes.Result) + + for _, query := range req.CompositeQuery.Queries { + info := getqueryInfo(query.Spec) + if !info.Disabled { + if result, ok := results[info.Name]; ok { + filtered[info.Name] = result + } + } + } + + return filtered +} + +// formatScalarResultsAsTable formats scalar results as a unified table for UI display +func (q *querier) formatScalarResultsAsTable(results map[string]*qbtypes.Result, _ *qbtypes.QueryRangeRequest) map[string]any { + if len(results) == 0 { + return map[string]any{"table": &qbtypes.ScalarData{}} + } + + // Convert all results to ScalarData first + scalarResults := make(map[string]*qbtypes.ScalarData) + for name, result := range results { + if sd, ok := result.Value.(*qbtypes.ScalarData); ok { + scalarResults[name] = sd + } else if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok { + scalarResults[name] = convertTimeSeriesDataToScalar(tsData, name) + } + } + + // If single result already has multiple queries, just deduplicate + if len(scalarResults) == 1 { + for _, sd := range scalarResults { + if hasMultipleQueries(sd) { + return map[string]any{"table": deduplicateRows(sd)} + } + } + } + + // Otherwise merge all results + merged := mergeScalarData(scalarResults) + return map[string]any{"table": merged} +} + +// convertTimeSeriesDataToScalar converts time series to scalar format +func convertTimeSeriesDataToScalar(tsData *qbtypes.TimeSeriesData, queryName string) *qbtypes.ScalarData { + if tsData == nil || len(tsData.Aggregations) == 0 { + return &qbtypes.ScalarData{QueryName: queryName} + } + + columns := []*qbtypes.ColumnDescriptor{} + + // Add group columns from first series + if len(tsData.Aggregations[0].Series) > 0 { + for _, label := range tsData.Aggregations[0].Series[0].Labels { + columns = append(columns, &qbtypes.ColumnDescriptor{ + TelemetryFieldKey: label.Key, + QueryName: queryName, + Type: qbtypes.ColumnTypeGroup, + }) + } + } + + // Add aggregation columns + for _, agg := range tsData.Aggregations { + name := agg.Alias + if name == "" { + name = fmt.Sprintf("__result_%d", agg.Index) + } + columns = append(columns, &qbtypes.ColumnDescriptor{ + TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: name}, + QueryName: queryName, + 
AggregationIndex: int64(agg.Index), + Meta: agg.Meta, + Type: qbtypes.ColumnTypeAggregation, + }) + } + + // Build rows + data := [][]any{} + for seriesIdx, series := range tsData.Aggregations[0].Series { + row := make([]any, len(columns)) + + // Add group values + for i, label := range series.Labels { + row[i] = label.Value + } + + // Add aggregation values (last value) + groupColCount := len(series.Labels) + for aggIdx, agg := range tsData.Aggregations { + if seriesIdx < len(agg.Series) && len(agg.Series[seriesIdx].Values) > 0 { + lastValue := agg.Series[seriesIdx].Values[len(agg.Series[seriesIdx].Values)-1].Value + row[groupColCount+aggIdx] = lastValue + } else { + row[groupColCount+aggIdx] = "n/a" + } + } + + data = append(data, row) + } + + return &qbtypes.ScalarData{ + QueryName: queryName, + Columns: columns, + Data: data, + } +} + +// hasMultipleQueries checks if ScalarData contains columns from multiple queries +func hasMultipleQueries(sd *qbtypes.ScalarData) bool { + queries := make(map[string]bool) + for _, col := range sd.Columns { + if col.Type == qbtypes.ColumnTypeAggregation && col.QueryName != "" { + queries[col.QueryName] = true + } + } + return len(queries) > 1 +} + +// deduplicateRows removes duplicate rows based on group columns +func deduplicateRows(sd *qbtypes.ScalarData) *qbtypes.ScalarData { + // Find group column indices + groupIndices := []int{} + for i, col := range sd.Columns { + if col.Type == qbtypes.ColumnTypeGroup { + groupIndices = append(groupIndices, i) + } + } + + // Build unique rows map + uniqueRows := make(map[string][]any) + for _, row := range sd.Data { + key := buildRowKey(row, groupIndices) + if existing, found := uniqueRows[key]; found { + // Merge non-n/a values + for i, val := range row { + if existing[i] == "n/a" && val != "n/a" { + existing[i] = val + } + } + } else { + rowCopy := make([]any, len(row)) + copy(rowCopy, row) + uniqueRows[key] = rowCopy + } + } + + // Convert back to slice + data := make([][]any, 0, len(uniqueRows)) + for _, row := range uniqueRows { + data = append(data, row) + } + + // Sort by first aggregation column + sortByFirstAggregation(data, sd.Columns) + + return &qbtypes.ScalarData{ + Columns: sd.Columns, + Data: data, + } +} + +// mergeScalarData merges multiple scalar data results +func mergeScalarData(results map[string]*qbtypes.ScalarData) *qbtypes.ScalarData { + // Collect unique group columns + groupCols := []string{} + groupColMap := make(map[string]*qbtypes.ColumnDescriptor) + + for _, sd := range results { + for _, col := range sd.Columns { + if col.Type == qbtypes.ColumnTypeGroup { + if _, exists := groupColMap[col.Name]; !exists { + groupColMap[col.Name] = col + groupCols = append(groupCols, col.Name) + } + } + } + } + + // Build final columns + columns := []*qbtypes.ColumnDescriptor{} + + // Add group columns + for _, name := range groupCols { + columns = append(columns, groupColMap[name]) + } + + // Add aggregation columns from each query (sorted by query name) + queryNames := make([]string, 0, len(results)) + for name := range results { + queryNames = append(queryNames, name) + } + sort.Strings(queryNames) + + for _, queryName := range queryNames { + sd := results[queryName] + for _, col := range sd.Columns { + if col.Type == qbtypes.ColumnTypeAggregation { + columns = append(columns, col) + } + } + } + + // Merge rows + rowMap := make(map[string][]any) + + for queryName, sd := range results { + // Create index mappings + groupMap := make(map[string]int) + for i, col := range sd.Columns { + if col.Type 
== qbtypes.ColumnTypeGroup { + groupMap[col.Name] = i + } + } + + // Process each row + for _, row := range sd.Data { + key := buildKeyFromGroupCols(row, groupMap, groupCols) + + if _, exists := rowMap[key]; !exists { + // Initialize new row + newRow := make([]any, len(columns)) + // Set group values + for i, colName := range groupCols { + if idx, ok := groupMap[colName]; ok && idx < len(row) { + newRow[i] = row[idx] + } else { + newRow[i] = "n/a" + } + } + // Initialize all aggregations to n/a + for i := len(groupCols); i < len(columns); i++ { + newRow[i] = "n/a" + } + rowMap[key] = newRow + } + + // Set aggregation values for this query + mergedRow := rowMap[key] + colIdx := len(groupCols) + for _, col := range columns[len(groupCols):] { + if col.QueryName == queryName { + // Find the value in the original row + for i, origCol := range sd.Columns { + if origCol.Type == qbtypes.ColumnTypeAggregation && + origCol.AggregationIndex == col.AggregationIndex { + if i < len(row) { + mergedRow[colIdx] = row[i] + } + break + } + } + } + colIdx++ + } + } + } + + // Convert to slice + data := make([][]any, 0, len(rowMap)) + for _, row := range rowMap { + data = append(data, row) + } + + // Sort by first aggregation column + sortByFirstAggregation(data, columns) + + return &qbtypes.ScalarData{ + Columns: columns, + Data: data, + } +} + +// buildRowKey builds a unique key from row values at specified indices +func buildRowKey(row []any, indices []int) string { + parts := make([]string, len(indices)) + for i, idx := range indices { + if idx < len(row) { + parts[i] = fmt.Sprintf("%v", row[idx]) + } else { + parts[i] = "n/a" + } + } + return fmt.Sprintf("%v", parts) +} + +// buildKeyFromGroupCols builds a key from group column values +func buildKeyFromGroupCols(row []any, groupMap map[string]int, groupCols []string) string { + parts := make([]string, len(groupCols)) + for i, colName := range groupCols { + if idx, ok := groupMap[colName]; ok && idx < len(row) { + parts[i] = fmt.Sprintf("%v", row[idx]) + } else { + parts[i] = "n/a" + } + } + return fmt.Sprintf("%v", parts) +} + +// sortByFirstAggregation sorts data by the first aggregation column (descending) +func sortByFirstAggregation(data [][]any, columns []*qbtypes.ColumnDescriptor) { + // Find first aggregation column + aggIdx := -1 + for i, col := range columns { + if col.Type == qbtypes.ColumnTypeAggregation { + aggIdx = i + break + } + } + + if aggIdx < 0 { + return + } + + sort.SliceStable(data, func(i, j int) bool { + return compareValues(data[i][aggIdx], data[j][aggIdx]) > 0 + }) +} + +// compareValues compares two values for sorting (handles n/a and numeric types) +func compareValues(a, b any) int { + // Handle n/a values + if a == "n/a" && b == "n/a" { + return 0 + } + if a == "n/a" { + return -1 + } + if b == "n/a" { + return 1 + } + + // Compare numeric values + aFloat, aOk := toFloat64(a) + bFloat, bOk := toFloat64(b) + + if aOk && bOk { + if aFloat > bFloat { + return 1 + } else if aFloat < bFloat { + return -1 + } + return 0 + } + + // Fallback to string comparison + return 0 +} + +// toFloat64 attempts to convert a value to float64 +func toFloat64(v any) (float64, bool) { + switch val := v.(type) { + case float64: + return val, true + case int64: + return float64(val), true + case int: + return float64(val), true + case int32: + return float64(val), true + } + return 0, false +} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 9526bb7fad00..ecd44212beb6 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go 
@@ -5,12 +5,14 @@ import ( "fmt" "log/slog" "slices" + "strconv" "sync" "github.com/SigNoz/signoz/pkg/errors" "github.com/SigNoz/signoz/pkg/factory" "github.com/SigNoz/signoz/pkg/prometheus" "github.com/SigNoz/signoz/pkg/telemetrystore" + "github.com/SigNoz/signoz/pkg/types/metrictypes" "github.com/SigNoz/signoz/pkg/types/telemetrytypes" "golang.org/x/exp/maps" @@ -54,8 +56,82 @@ func New( } } +// extractShiftFromBuilderQuery extracts the shift value from timeShift function if present +func extractShiftFromBuilderQuery[T any](spec qbtypes.QueryBuilderQuery[T]) int64 { + for _, fn := range spec.Functions { + if fn.Name == qbtypes.FunctionNameTimeShift && len(fn.Args) > 0 { + switch v := fn.Args[0].Value.(type) { + case float64: + return int64(v) + case int64: + return v + case int: + return int64(v) + case string: + if shiftFloat, err := strconv.ParseFloat(v, 64); err == nil { + return int64(shiftFloat) + } + } + } + } + return 0 +} + +// adjustTimeRangeForShift adjusts the time range based on the shift value from timeShift function +func adjustTimeRangeForShift[T any](spec qbtypes.QueryBuilderQuery[T], tr qbtypes.TimeRange, kind qbtypes.RequestType) qbtypes.TimeRange { + // Only apply time shift for time series and scalar queries + // Raw/list queries don't support timeshift + if kind != qbtypes.RequestTypeTimeSeries && kind != qbtypes.RequestTypeScalar { + return tr + } + + // Use the ShiftBy field if it's already populated, otherwise extract it + shiftBy := spec.ShiftBy + if shiftBy == 0 { + shiftBy = extractShiftFromBuilderQuery(spec) + } + + if shiftBy == 0 { + return tr + } + + // ShiftBy is in seconds, convert to milliseconds and shift backward in time + shiftMS := shiftBy * 1000 + return qbtypes.TimeRange{ + From: tr.From - uint64(shiftMS), + To: tr.To - uint64(shiftMS), + } +} + func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest) (*qbtypes.QueryRangeResponse, error) { + // First pass: collect all metric names that need temporality + metricNames := make([]string, 0) + for _, query := range req.CompositeQuery.Queries { + if query.Type == qbtypes.QueryTypeBuilder { + if spec, ok := query.Spec.(qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]); ok { + for _, agg := range spec.Aggregations { + if agg.MetricName != "" { + metricNames = append(metricNames, agg.MetricName) + } + } + } + } + } + + // Fetch temporality for all metrics at once + var metricTemporality map[string]metrictypes.Temporality + if len(metricNames) > 0 { + var err error + metricTemporality, err = q.metadataStore.FetchTemporalityMulti(ctx, metricNames...) 
+ if err != nil { + q.logger.WarnContext(ctx, "failed to fetch metric temporality", "error", err, "metrics", metricNames) + // Continue without temporality - statement builder will handle unspecified + metricTemporality = make(map[string]metrictypes.Temporality) + } + q.logger.DebugContext(ctx, "fetched metric temporalities", "metric_temporality", metricTemporality) + } + queries := make(map[string]qbtypes.Query) steps := make(map[string]qbtypes.Step) @@ -79,15 +155,28 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype case qbtypes.QueryTypeBuilder: switch spec := query.Spec.(type) { case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]: - bq := newBuilderQuery(q.telemetryStore, q.traceStmtBuilder, spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType) + spec.ShiftBy = extractShiftFromBuilderQuery(spec) + timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType) + bq := newBuilderQuery(q.telemetryStore, q.traceStmtBuilder, spec, timeRange, req.RequestType) queries[spec.Name] = bq steps[spec.Name] = spec.StepInterval case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]: - bq := newBuilderQuery(q.telemetryStore, q.logStmtBuilder, spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType) + spec.ShiftBy = extractShiftFromBuilderQuery(spec) + timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType) + bq := newBuilderQuery(q.telemetryStore, q.logStmtBuilder, spec, timeRange, req.RequestType) queries[spec.Name] = bq steps[spec.Name] = spec.StepInterval case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]: - bq := newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType) + for i := range spec.Aggregations { + if spec.Aggregations[i].MetricName != "" && spec.Aggregations[i].Temporality == metrictypes.Unknown { + if temp, ok := metricTemporality[spec.Aggregations[i].MetricName]; ok && temp != metrictypes.Unknown { + spec.Aggregations[i].Temporality = temp + } + } + } + spec.ShiftBy = extractShiftFromBuilderQuery(spec) + timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType) + bq := newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, spec, timeRange, req.RequestType) queries[spec.Name] = bq steps[spec.Name] = spec.StepInterval default: @@ -133,13 +222,18 @@ func (q *querier) run(ctx context.Context, orgID valuer.UUID, qs map[string]qbty } } + processedResults, err := q.postProcessResults(ctx, results, req) + if err != nil { + return nil, err + } + return &qbtypes.QueryRangeResponse{ Type: req.RequestType, Data: struct { Results []any `json:"results"` Warnings []string `json:"warnings"` }{ - Results: maps.Values(results), + Results: maps.Values(processedResults), Warnings: warnings, }, Meta: struct { @@ -173,7 +267,7 @@ func (q *querier) executeWithCache(ctx context.Context, orgID valuer.UUID, query return nil, err } // Store in cache for future use - q.bucketCache.Put(ctx, orgID, query, result) + q.bucketCache.Put(ctx, orgID, query, step, result) return result, nil } } @@ -183,6 +277,10 @@ func (q *querier) executeWithCache(ctx context.Context, orgID valuer.UUID, query errors := make([]error, len(missingRanges)) totalStats := qbtypes.ExecStats{} + q.logger.DebugContext(ctx, "executing queries for missing ranges", + "missing_ranges_count", len(missingRanges), + "ranges", missingRanges) + sem := make(chan 
struct{}, 4) var wg sync.WaitGroup @@ -224,7 +322,7 @@ func (q *querier) executeWithCache(ctx context.Context, orgID valuer.UUID, query if err != nil { return nil, err } - q.bucketCache.Put(ctx, orgID, query, result) + q.bucketCache.Put(ctx, orgID, query, step, result) return result, nil } } @@ -248,7 +346,7 @@ func (q *querier) executeWithCache(ctx context.Context, orgID valuer.UUID, query mergedResult.Stats.DurationMS += totalStats.DurationMS // Store merged result in cache - q.bucketCache.Put(ctx, orgID, query, mergedResult) + q.bucketCache.Put(ctx, orgID, query, step, mergedResult) return mergedResult, nil } @@ -261,11 +359,17 @@ func (q *querier) createRangedQuery(originalQuery qbtypes.Query, timeRange qbtyp case *chSQLQuery: return newchSQLQuery(q.telemetryStore, qt.query, qt.args, timeRange, qt.kind) case *builderQuery[qbtypes.TraceAggregation]: - return newBuilderQuery(q.telemetryStore, q.traceStmtBuilder, qt.spec, timeRange, qt.kind) + qt.spec.ShiftBy = extractShiftFromBuilderQuery(qt.spec) + adjustedTimeRange := adjustTimeRangeForShift(qt.spec, timeRange, qt.kind) + return newBuilderQuery(q.telemetryStore, q.traceStmtBuilder, qt.spec, adjustedTimeRange, qt.kind) case *builderQuery[qbtypes.LogAggregation]: - return newBuilderQuery(q.telemetryStore, q.logStmtBuilder, qt.spec, timeRange, qt.kind) + qt.spec.ShiftBy = extractShiftFromBuilderQuery(qt.spec) + adjustedTimeRange := adjustTimeRangeForShift(qt.spec, timeRange, qt.kind) + return newBuilderQuery(q.telemetryStore, q.logStmtBuilder, qt.spec, adjustedTimeRange, qt.kind) case *builderQuery[qbtypes.MetricAggregation]: - return newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, qt.spec, timeRange, qt.kind) + qt.spec.ShiftBy = extractShiftFromBuilderQuery(qt.spec) + adjustedTimeRange := adjustTimeRangeForShift(qt.spec, timeRange, qt.kind) + return newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, qt.spec, adjustedTimeRange, qt.kind) default: return nil } @@ -273,8 +377,29 @@ func (q *querier) createRangedQuery(originalQuery qbtypes.Query, timeRange qbtyp // mergeResults merges cached result with fresh results func (q *querier) mergeResults(cached *qbtypes.Result, fresh []*qbtypes.Result) *qbtypes.Result { - if cached == nil && len(fresh) == 1 { - return fresh[0] + if cached == nil { + if len(fresh) == 1 { + return fresh[0] + } + if len(fresh) == 0 { + return nil + } + // If cached is nil but we have multiple fresh results, we need to merge them + // We need to merge all fresh results properly to avoid duplicates + merged := &qbtypes.Result{ + Type: fresh[0].Type, + Stats: fresh[0].Stats, + Warnings: fresh[0].Warnings, + } + + // Merge all fresh results including the first one + switch merged.Type { + case qbtypes.RequestTypeTimeSeries: + // Pass nil as cached value to ensure proper merging of all fresh results + merged.Value = q.mergeTimeSeriesResults(nil, fresh) + } + + return merged } // Start with cached result @@ -315,23 +440,52 @@ func (q *querier) mergeResults(cached *qbtypes.Result, fresh []*qbtypes.Result) // mergeTimeSeriesResults merges time series data func (q *querier) mergeTimeSeriesResults(cachedValue *qbtypes.TimeSeriesData, freshResults []*qbtypes.Result) *qbtypes.TimeSeriesData { - // Map to store merged series by query name and series key + // Map to store merged series by aggregation index and series key seriesMap := make(map[int]map[string]*qbtypes.TimeSeries) + // Map to store aggregation bucket metadata + bucketMetadata := make(map[int]*qbtypes.AggregationBucket) - for _, aggBucket := range 
cachedValue.Aggregations { - if seriesMap[aggBucket.Index] == nil { - seriesMap[aggBucket.Index] = make(map[string]*qbtypes.TimeSeries) - } - for _, series := range aggBucket.Series { - key := qbtypes.GetUniqueSeriesKey(series.Labels) - seriesMap[aggBucket.Index][key] = series + // Process cached data if available + if cachedValue != nil && cachedValue.Aggregations != nil { + for _, aggBucket := range cachedValue.Aggregations { + if seriesMap[aggBucket.Index] == nil { + seriesMap[aggBucket.Index] = make(map[string]*qbtypes.TimeSeries) + } + if bucketMetadata[aggBucket.Index] == nil { + bucketMetadata[aggBucket.Index] = aggBucket + } + for _, series := range aggBucket.Series { + key := qbtypes.GetUniqueSeriesKey(series.Labels) + if existingSeries, ok := seriesMap[aggBucket.Index][key]; ok { + // Merge values from duplicate series in cached data, avoiding duplicate timestamps + timestampMap := make(map[int64]bool) + for _, v := range existingSeries.Values { + timestampMap[v.Timestamp] = true + } + + // Only add values with new timestamps + for _, v := range series.Values { + if !timestampMap[v.Timestamp] { + existingSeries.Values = append(existingSeries.Values, v) + } + } + } else { + // Create a copy to avoid modifying the cached data + seriesCopy := &qbtypes.TimeSeries{ + Labels: series.Labels, + Values: make([]*qbtypes.TimeSeriesValue, len(series.Values)), + } + copy(seriesCopy.Values, series.Values) + seriesMap[aggBucket.Index][key] = seriesCopy + } + } } } // Add fresh series for _, result := range freshResults { freshTS, ok := result.Value.(*qbtypes.TimeSeriesData) - if !ok { + if !ok || freshTS == nil || freshTS.Aggregations == nil { continue } @@ -339,6 +493,12 @@ func (q *querier) mergeTimeSeriesResults(cachedValue *qbtypes.TimeSeriesData, fr if seriesMap[aggBucket.Index] == nil { seriesMap[aggBucket.Index] = make(map[string]*qbtypes.TimeSeries) } + // Prefer fresh metadata over cached metadata + if aggBucket.Alias != "" || aggBucket.Meta.Unit != "" { + bucketMetadata[aggBucket.Index] = aggBucket + } else if bucketMetadata[aggBucket.Index] == nil { + bucketMetadata[aggBucket.Index] = aggBucket + } } for _, aggBucket := range freshTS.Aggregations { @@ -346,8 +506,19 @@ func (q *querier) mergeTimeSeriesResults(cachedValue *qbtypes.TimeSeriesData, fr key := qbtypes.GetUniqueSeriesKey(series.Labels) if existingSeries, ok := seriesMap[aggBucket.Index][key]; ok { - // Merge values - existingSeries.Values = append(existingSeries.Values, series.Values...) 
+ // Merge values, avoiding duplicate timestamps + // Create a map to track existing timestamps + timestampMap := make(map[int64]bool) + for _, v := range existingSeries.Values { + timestampMap[v.Timestamp] = true + } + + // Only add values with new timestamps + for _, v := range series.Values { + if !timestampMap[v.Timestamp] { + existingSeries.Values = append(existingSeries.Values, v) + } + } } else { // New series seriesMap[aggBucket.Index][key] = series @@ -357,10 +528,18 @@ func (q *querier) mergeTimeSeriesResults(cachedValue *qbtypes.TimeSeriesData, fr } result := &qbtypes.TimeSeriesData{ - QueryName: cachedValue.QueryName, Aggregations: []*qbtypes.AggregationBucket{}, } + // Set QueryName from cached or first fresh result + if cachedValue != nil { + result.QueryName = cachedValue.QueryName + } else if len(freshResults) > 0 { + if freshTS, ok := freshResults[0].Value.(*qbtypes.TimeSeriesData); ok && freshTS != nil { + result.QueryName = freshTS.QueryName + } + } + for index, series := range seriesMap { var aggSeries []*qbtypes.TimeSeries for _, s := range series { @@ -377,10 +556,17 @@ func (q *querier) mergeTimeSeriesResults(cachedValue *qbtypes.TimeSeriesData, fr aggSeries = append(aggSeries, s) } - result.Aggregations = append(result.Aggregations, &qbtypes.AggregationBucket{ + // Preserve bucket metadata from either cached or fresh results + bucket := &qbtypes.AggregationBucket{ Index: index, Series: aggSeries, - }) + } + if metadata, ok := bucketMetadata[index]; ok { + bucket.Alias = metadata.Alias + bucket.Meta = metadata.Meta + } + + result.Aggregations = append(result.Aggregations, bucket) } return result diff --git a/pkg/querier/shift_test.go b/pkg/querier/shift_test.go new file mode 100644 index 000000000000..24d200da9be1 --- /dev/null +++ b/pkg/querier/shift_test.go @@ -0,0 +1,229 @@ +package querier + +import ( + "testing" + + qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" + "github.com/stretchr/testify/assert" +) + +// TestAdjustTimeRangeForShift tests the time range adjustment logic +func TestAdjustTimeRangeForShift(t *testing.T) { + tests := []struct { + name string + spec qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation] + timeRange qbtypes.TimeRange + requestType qbtypes.RequestType + expectedFromMS uint64 + expectedToMS uint64 + }{ + { + name: "no shift", + spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ + Functions: []qbtypes.Function{}, + }, + timeRange: qbtypes.TimeRange{ + From: 1000000, + To: 2000000, + }, + requestType: qbtypes.RequestTypeTimeSeries, + expectedFromMS: 1000000, + expectedToMS: 2000000, + }, + { + name: "shift by 60 seconds using timeShift function", + spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ + Functions: []qbtypes.Function{ + { + Name: qbtypes.FunctionNameTimeShift, + Args: []qbtypes.FunctionArg{ + {Value: "60"}, + }, + }, + }, + }, + timeRange: qbtypes.TimeRange{ + From: 1000000, + To: 2000000, + }, + requestType: qbtypes.RequestTypeTimeSeries, + expectedFromMS: 940000, // 1000000 - 60000 + expectedToMS: 1940000, // 2000000 - 60000 + }, + { + name: "shift by negative 30 seconds (future shift)", + spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ + Functions: []qbtypes.Function{ + { + Name: qbtypes.FunctionNameTimeShift, + Args: []qbtypes.FunctionArg{ + {Value: "-30"}, + }, + }, + }, + }, + timeRange: qbtypes.TimeRange{ + From: 1000000, + To: 2000000, + }, + requestType: qbtypes.RequestTypeTimeSeries, + expectedFromMS: 1030000, // 1000000 - (-30000) + expectedToMS: 
2030000, // 2000000 - (-30000) + }, + { + name: "no shift for raw request type even with timeShift function", + spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ + Functions: []qbtypes.Function{ + { + Name: qbtypes.FunctionNameTimeShift, + Args: []qbtypes.FunctionArg{ + {Value: "3600"}, + }, + }, + }, + }, + timeRange: qbtypes.TimeRange{ + From: 1000000, + To: 2000000, + }, + requestType: qbtypes.RequestTypeRaw, + expectedFromMS: 1000000, // No shift for raw queries + expectedToMS: 2000000, + }, + { + name: "shift applied for scalar request type with timeShift function", + spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ + Functions: []qbtypes.Function{ + { + Name: qbtypes.FunctionNameTimeShift, + Args: []qbtypes.FunctionArg{ + {Value: "3600"}, + }, + }, + }, + }, + timeRange: qbtypes.TimeRange{ + From: 10000000, + To: 20000000, + }, + requestType: qbtypes.RequestTypeScalar, + expectedFromMS: 6400000, // 10000000 - 3600000 + expectedToMS: 16400000, // 20000000 - 3600000 + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := adjustTimeRangeForShift(tt.spec, tt.timeRange, tt.requestType) + assert.Equal(t, tt.expectedFromMS, result.From, "fromMS mismatch") + assert.Equal(t, tt.expectedToMS, result.To, "toMS mismatch") + }) + } +} + +// TestExtractShiftFromBuilderQuery tests the shift extraction logic +func TestExtractShiftFromBuilderQuery(t *testing.T) { + tests := []struct { + name string + spec qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation] + expectedShiftBy int64 + }{ + { + name: "extract from timeShift function with float64", + spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ + Functions: []qbtypes.Function{ + { + Name: qbtypes.FunctionNameTimeShift, + Args: []qbtypes.FunctionArg{ + {Value: float64(3600)}, + }, + }, + }, + }, + expectedShiftBy: 3600, + }, + { + name: "extract from timeShift function with int64", + spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ + Functions: []qbtypes.Function{ + { + Name: qbtypes.FunctionNameTimeShift, + Args: []qbtypes.FunctionArg{ + {Value: int64(3600)}, + }, + }, + }, + }, + expectedShiftBy: 3600, + }, + { + name: "extract from timeShift function with string", + spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ + Functions: []qbtypes.Function{ + { + Name: qbtypes.FunctionNameTimeShift, + Args: []qbtypes.FunctionArg{ + {Value: "3600"}, + }, + }, + }, + }, + expectedShiftBy: 3600, + }, + { + name: "no timeShift function", + spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ + Functions: []qbtypes.Function{ + { + Name: qbtypes.FunctionNameAbsolute, + }, + }, + }, + expectedShiftBy: 0, + }, + { + name: "invalid timeShift value", + spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ + Functions: []qbtypes.Function{ + { + Name: qbtypes.FunctionNameTimeShift, + Args: []qbtypes.FunctionArg{ + {Value: "invalid"}, + }, + }, + }, + }, + expectedShiftBy: 0, + }, + { + name: "multiple functions with timeShift", + spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ + Functions: []qbtypes.Function{ + { + Name: qbtypes.FunctionNameAbsolute, + }, + { + Name: qbtypes.FunctionNameTimeShift, + Args: []qbtypes.FunctionArg{ + {Value: "1800"}, + }, + }, + { + Name: qbtypes.FunctionNameClampMax, + Args: []qbtypes.FunctionArg{ + {Value: "100"}, + }, + }, + }, + }, + expectedShiftBy: 1800, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + shiftBy := extractShiftFromBuilderQuery(tt.spec) + assert.Equal(t, tt.expectedShiftBy, 
shiftBy) + }) + } +} diff --git a/pkg/querybuilder/fallback_expr.go b/pkg/querybuilder/fallback_expr.go index 3001cc3fcde9..3a7367156f8b 100644 --- a/pkg/querybuilder/fallback_expr.go +++ b/pkg/querybuilder/fallback_expr.go @@ -68,7 +68,7 @@ func CollisionHandledFinalExpr( return "", nil, errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, correction) } else { // not even a close match, return an error - return "", nil, err + return "", nil, errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, "field %s not found", field.Name) } } else { for _, key := range keysForField { diff --git a/pkg/querybuilder/resourcefilter/condition_builder.go b/pkg/querybuilder/resourcefilter/condition_builder.go index 779748e08fc8..b5ebf1759b32 100644 --- a/pkg/querybuilder/resourcefilter/condition_builder.go +++ b/pkg/querybuilder/resourcefilter/condition_builder.go @@ -46,7 +46,7 @@ func (b *defaultConditionBuilder) ConditionFor( ) (string, error) { if key.FieldContext != telemetrytypes.FieldContextResource { - return "", nil + return "true", nil } column, err := b.fm.ColumnFor(ctx, key) diff --git a/pkg/querybuilder/where_clause_visitor.go b/pkg/querybuilder/where_clause_visitor.go index 3d1d22267225..83ad92e19d26 100644 --- a/pkg/querybuilder/where_clause_visitor.go +++ b/pkg/querybuilder/where_clause_visitor.go @@ -22,7 +22,7 @@ type filterExpressionVisitor struct { conditionBuilder qbtypes.ConditionBuilder warnings []string fieldKeys map[string][]*telemetrytypes.TelemetryFieldKey - errors []error + errors []string builder *sqlbuilder.SelectBuilder fullTextColumn *telemetrytypes.TelemetryFieldKey jsonBodyPrefix string @@ -90,11 +90,14 @@ func PrepareWhereClause(query string, opts FilterExprVisitorOpts) (*sqlbuilder.W combinedErrors := errors.Newf( errors.TypeInvalidInput, errors.CodeInvalidInput, - "found %d syntax errors while parsing the filter expression: %v", + "found %d syntax errors while parsing the filter expression", len(parserErrorListener.SyntaxErrors), - parserErrorListener.SyntaxErrors, ) - return nil, nil, combinedErrors + additionals := make([]string, len(parserErrorListener.SyntaxErrors)) + for _, err := range parserErrorListener.SyntaxErrors { + additionals = append(additionals, err.Error()) + } + return nil, nil, combinedErrors.WithAdditional(additionals...) } // Visit the parse tree with our ClickHouse visitor @@ -105,11 +108,10 @@ func PrepareWhereClause(query string, opts FilterExprVisitorOpts) (*sqlbuilder.W combinedErrors := errors.Newf( errors.TypeInvalidInput, errors.CodeInvalidInput, - "found %d errors while parsing the search expression: %v", + "found %d errors while parsing the search expression", len(visitor.errors), - visitor.errors, ) - return nil, nil, combinedErrors + return nil, nil, combinedErrors.WithAdditional(visitor.errors...) 
} whereClause := sqlbuilder.NewWhereClause().AddWhereExpr(visitor.builder.Args, cond) @@ -234,15 +236,11 @@ func (v *filterExpressionVisitor) VisitPrimary(ctx *grammar.PrimaryContext) any // Handle standalone key/value as a full text search term if ctx.GetChildCount() == 1 { if v.skipFullTextFilter { - return "" + return "true" } if v.fullTextColumn == nil { - v.errors = append(v.errors, errors.Newf( - errors.TypeInvalidInput, - errors.CodeInvalidInput, - "full text search is not supported", - )) + v.errors = append(v.errors, "full text search is not supported") return "" } child := ctx.GetChild(0) @@ -251,7 +249,7 @@ func (v *filterExpressionVisitor) VisitPrimary(ctx *grammar.PrimaryContext) any keyText := keyCtx.GetText() cond, err := v.conditionBuilder.ConditionFor(context.Background(), v.fullTextColumn, qbtypes.FilterOperatorRegexp, keyText, v.builder) if err != nil { - v.errors = append(v.errors, errors.WrapInternalf(err, errors.CodeInternal, "failed to build full text search condition")) + v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error())) return "" } return cond @@ -266,12 +264,12 @@ func (v *filterExpressionVisitor) VisitPrimary(ctx *grammar.PrimaryContext) any } else if valCtx.KEY() != nil { text = valCtx.KEY().GetText() } else { - v.errors = append(v.errors, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "unsupported value type: %s", valCtx.GetText())) + v.errors = append(v.errors, fmt.Sprintf("unsupported value type: %s", valCtx.GetText())) return "" } cond, err := v.conditionBuilder.ConditionFor(context.Background(), v.fullTextColumn, qbtypes.FilterOperatorRegexp, text, v.builder) if err != nil { - v.errors = append(v.errors, errors.WrapInternalf(err, errors.CodeInternal, "failed to build full text search condition")) + v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error())) return "" } return cond @@ -419,7 +417,7 @@ func (v *filterExpressionVisitor) VisitComparison(ctx *grammar.ComparisonContext for _, key := range keys { condition, err := v.conditionBuilder.ConditionFor(context.Background(), key, op, value, v.builder) if err != nil { - v.errors = append(v.errors, errors.WrapInternalf(err, errors.CodeInternal, "failed to build condition")) + v.errors = append(v.errors, fmt.Sprintf("failed to build condition: %s", err.Error())) return "" } conds = append(conds, condition) @@ -459,7 +457,7 @@ func (v *filterExpressionVisitor) VisitValueList(ctx *grammar.ValueListContext) func (v *filterExpressionVisitor) VisitFullText(ctx *grammar.FullTextContext) any { if v.skipFullTextFilter { - return "" + return "true" } var text string @@ -471,16 +469,12 @@ func (v *filterExpressionVisitor) VisitFullText(ctx *grammar.FullTextContext) an } if v.fullTextColumn == nil { - v.errors = append(v.errors, errors.Newf( - errors.TypeInvalidInput, - errors.CodeInvalidInput, - "full text search is not supported", - )) + v.errors = append(v.errors, "full text search is not supported") return "" } cond, err := v.conditionBuilder.ConditionFor(context.Background(), v.fullTextColumn, qbtypes.FilterOperatorRegexp, text, v.builder) if err != nil { - v.errors = append(v.errors, errors.WrapInternalf(err, errors.CodeInternal, "failed to build full text search condition")) + v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error())) return "" } return cond @@ -498,34 +492,19 @@ func (v *filterExpressionVisitor) VisitFunctionCall(ctx 
*grammar.FunctionCallCon functionName = "hasAll" } else { // Default fallback - v.errors = append(v.errors, errors.Newf( - errors.TypeInvalidInput, - errors.CodeInvalidInput, - "unknown function `%s`", - ctx.GetText(), - )) + v.errors = append(v.errors, fmt.Sprintf("unknown function `%s`", ctx.GetText())) return "" } params := v.Visit(ctx.FunctionParamList()).([]any) if len(params) < 2 { - v.errors = append(v.errors, errors.Newf( - errors.TypeInvalidInput, - errors.CodeInvalidInput, - "function `%s` expects key and value parameters", - functionName, - )) + v.errors = append(v.errors, fmt.Sprintf("function `%s` expects key and value parameters", functionName)) return "" } keys, ok := params[0].([]*telemetrytypes.TelemetryFieldKey) if !ok { - v.errors = append(v.errors, errors.Newf( - errors.TypeInvalidInput, - errors.CodeInvalidInput, - "function `%s` expects key parameter to be a field key", - functionName, - )) + v.errors = append(v.errors, fmt.Sprintf("function `%s` expects key parameter to be a field key", functionName)) return "" } value := params[1:] @@ -536,12 +515,7 @@ func (v *filterExpressionVisitor) VisitFunctionCall(ctx *grammar.FunctionCallCon if strings.HasPrefix(key.Name, v.jsonBodyPrefix) { fieldName, _ = v.jsonKeyToKey(context.Background(), key, qbtypes.FilterOperatorUnknown, value) } else { - v.errors = append(v.errors, errors.Newf( - errors.TypeInvalidInput, - errors.CodeInvalidInput, - "function `%s` supports only body JSON search", - functionName, - )) + v.errors = append(v.errors, fmt.Sprintf("function `%s` supports only body JSON search", functionName)) return "" } @@ -603,12 +577,7 @@ func (v *filterExpressionVisitor) VisitValue(ctx *grammar.ValueContext) any { } else if ctx.NUMBER() != nil { number, err := strconv.ParseFloat(ctx.NUMBER().GetText(), 64) if err != nil { - v.errors = append(v.errors, errors.Newf( - errors.TypeInvalidInput, - errors.CodeInvalidInput, - "failed to parse number %s", - ctx.NUMBER().GetText(), - )) + v.errors = append(v.errors, fmt.Sprintf("failed to parse number %s", ctx.NUMBER().GetText())) return "" } return number @@ -648,19 +617,11 @@ func (v *filterExpressionVisitor) VisitKey(ctx *grammar.KeyContext) any { if len(fieldKeysForName) == 0 { if strings.HasPrefix(fieldKey.Name, v.jsonBodyPrefix) && v.jsonBodyPrefix != "" && keyName == "" { - v.errors = append(v.errors, errors.NewInvalidInputf( - errors.CodeInvalidInput, - "missing key for body json search - expected key of the form `body.key` (ex: `body.status`)", - )) + v.errors = append(v.errors, "missing key for body json search - expected key of the form `body.key` (ex: `body.status`)") } else { // TODO(srikanthccv): do we want to return an error here? // should we infer the type and auto-magically build a key for expression? 
- v.errors = append(v.errors, errors.Newf( - errors.TypeInvalidInput, - errors.CodeInvalidInput, - "key `%s` not found", - fieldKey.Name, - )) + v.errors = append(v.errors, fmt.Sprintf("key `%s` not found", fieldKey.Name)) } } diff --git a/pkg/telemetrylogs/field_mapper.go b/pkg/telemetrylogs/field_mapper.go index a52e68a45df0..78ff7d4cc36c 100644 --- a/pkg/telemetrylogs/field_mapper.go +++ b/pkg/telemetrylogs/field_mapper.go @@ -173,7 +173,7 @@ func (m *fieldMapper) ColumnExpressionFor( return "", errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, correction) } else { // not even a close match, return an error - return "", err + return "", errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, "field %s not found", field.Name) } } } else if len(keysForField) == 1 { @@ -186,7 +186,7 @@ func (m *fieldMapper) ColumnExpressionFor( colName, _ = m.FieldFor(ctx, key) args = append(args, fmt.Sprintf("toString(%s) != '', toString(%s)", colName, colName)) } - colName = fmt.Sprintf("multiIf(%s)", strings.Join(args, ", ")) + colName = fmt.Sprintf("multiIf(%s, NULL)", strings.Join(args, ", ")) } } diff --git a/pkg/telemetrylogs/filter_expr_logs_test.go b/pkg/telemetrylogs/filter_expr_logs_test.go index c667988234d8..2490ec09d7d9 100644 --- a/pkg/telemetrylogs/filter_expr_logs_test.go +++ b/pkg/telemetrylogs/filter_expr_logs_test.go @@ -2,8 +2,10 @@ package telemetrylogs import ( "fmt" + "strings" "testing" + "github.com/SigNoz/signoz/pkg/errors" "github.com/SigNoz/signoz/pkg/querybuilder" "github.com/SigNoz/signoz/pkg/types/telemetrytypes" "github.com/huandu/go-sqlbuilder" @@ -2315,7 +2317,15 @@ func TestFilterExprLogs(t *testing.T) { require.Equal(t, tc.expectedArgs, args) } else { require.Error(t, err, "Expected error for query: %s", tc.query) - require.Contains(t, err.Error(), tc.expectedErrorContains) + _, _, _, _, _, a := errors.Unwrapb(err) + contains := false + for _, warn := range a { + if strings.Contains(warn, tc.expectedErrorContains) { + contains = true + break + } + } + require.True(t, contains) } }) } diff --git a/pkg/telemetrylogs/statement_builder.go b/pkg/telemetrylogs/statement_builder.go index dd85a829d184..2ff1ec8f6640 100644 --- a/pkg/telemetrylogs/statement_builder.go +++ b/pkg/telemetrylogs/statement_builder.go @@ -6,7 +6,6 @@ import ( "log/slog" "strings" - "github.com/SigNoz/signoz/pkg/errors" "github.com/SigNoz/signoz/pkg/factory" "github.com/SigNoz/signoz/pkg/querybuilder" qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" @@ -14,10 +13,6 @@ import ( "github.com/huandu/go-sqlbuilder" ) -var ( - ErrUnsupportedAggregation = errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported aggregation") -) - type logQueryStatementBuilder struct { logger *slog.Logger metadataStore telemetrytypes.MetadataStore @@ -165,12 +160,18 @@ func (b *logQueryStatementBuilder) buildListQuery( // Add order by for _, orderBy := range query.Order { - sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction)) + colExpr, err := b.fm.ColumnExpressionFor(ctx, &orderBy.Key.TelemetryFieldKey, keys) + if err != nil { + return nil, err + } + sb.OrderBy(fmt.Sprintf("%s %s", colExpr, orderBy.Direction.StringValue())) } // Add limit and offset if query.Limit > 0 { sb.Limit(query.Limit) + } else { + sb.Limit(100) } if query.Offset > 0 { @@ -381,9 +382,9 @@ func (b *logQueryStatementBuilder) buildScalarQuery( for _, orderBy := range query.Order { idx, ok := aggOrderBy(orderBy, query) if ok { - sb.OrderBy(fmt.Sprintf("__result_%d %s", 
idx, orderBy.Direction)) + sb.OrderBy(fmt.Sprintf("__result_%d %s", idx, orderBy.Direction.StringValue())) } else { - sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction)) + sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue())) } } @@ -420,19 +421,25 @@ func (b *logQueryStatementBuilder) addFilterCondition( keys map[string][]*telemetrytypes.TelemetryFieldKey, ) ([]string, error) { - // add filter expression - filterWhereClause, warnings, err := querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{ - FieldMapper: b.fm, - ConditionBuilder: b.cb, - FieldKeys: keys, - SkipResourceFilter: true, - FullTextColumn: b.fullTextColumn, - JsonBodyPrefix: b.jsonBodyPrefix, - JsonKeyToKey: b.jsonKeyToKey, - }) + var filterWhereClause *sqlbuilder.WhereClause + var warnings []string + var err error - if err != nil { - return nil, err + if query.Filter != nil && query.Filter.Expression != "" { + // add filter expression + filterWhereClause, warnings, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{ + FieldMapper: b.fm, + ConditionBuilder: b.cb, + FieldKeys: keys, + SkipResourceFilter: true, + FullTextColumn: b.fullTextColumn, + JsonBodyPrefix: b.jsonBodyPrefix, + JsonKeyToKey: b.jsonKeyToKey, + }) + + if err != nil { + return nil, err + } } if filterWhereClause != nil { diff --git a/pkg/telemetrylogs/stmt_builder_test.go b/pkg/telemetrylogs/stmt_builder_test.go index 5ea0ad26abd6..c570479318e6 100644 --- a/pkg/telemetrylogs/stmt_builder_test.go +++ b/pkg/telemetrylogs/stmt_builder_test.go @@ -70,6 +70,45 @@ func TestStatementBuilder(t *testing.T) { }, expectedErr: nil, }, + { + name: "test", + requestType: qbtypes.RequestTypeTimeSeries, + query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{ + Signal: telemetrytypes.SignalLogs, + StepInterval: qbtypes.Step{Duration: 30 * time.Second}, + Aggregations: []qbtypes.LogAggregation{ + { + Expression: "count()", + }, + }, + Filter: &qbtypes.Filter{ + Expression: "service.name = 'cartservice'", + }, + Limit: 10, + GroupBy: []qbtypes.GroupByKey{ + { + TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{ + Name: "service.name", + }, + }, + }, + Order: []qbtypes.OrderBy{ + { + Key: qbtypes.OrderByKey{ + TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{ + Name: "service.name", + }, + }, + Direction: qbtypes.OrderDirectionDesc, + }, + }, + }, + expected: qbtypes.Statement{ + Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY ALL ORDER BY `service.name` desc LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? 
AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) IN (SELECT `service.name` FROM __limit_cte) GROUP BY ALL", + Args: []any{"cartservice", "%service.name%", "%service.name%cartservice%", uint64(1747945619), uint64(1747983448), true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)}, + }, + expectedErr: nil, + }, } fm := NewFieldMapper() diff --git a/pkg/telemetrymetadata/field_mapper.go b/pkg/telemetrymetadata/field_mapper.go index 5374c9d16c11..6c151acebdc9 100644 --- a/pkg/telemetrymetadata/field_mapper.go +++ b/pkg/telemetrymetadata/field_mapper.go @@ -95,7 +95,7 @@ func (m *fieldMapper) ColumnExpressionFor( return "", errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, correction) } else { // not even a close match, return an error - return "", err + return "", errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, "field %s not found", field.Name) } } } else if len(keysForField) == 1 { @@ -108,7 +108,7 @@ func (m *fieldMapper) ColumnExpressionFor( colName, _ = m.FieldFor(ctx, key) args = append(args, fmt.Sprintf("toString(%s) != '', toString(%s)", colName, colName)) } - colName = fmt.Sprintf("multiIf(%s)", strings.Join(args, ", ")) + colName = fmt.Sprintf("multiIf(%s, NULL)", strings.Join(args, ", ")) } } diff --git a/pkg/telemetrymetadata/metadata.go b/pkg/telemetrymetadata/metadata.go index 4d8c9b024a16..d97291a22450 100644 --- a/pkg/telemetrymetadata/metadata.go +++ b/pkg/telemetrymetadata/metadata.go @@ -9,6 +9,7 @@ import ( "github.com/SigNoz/signoz/pkg/factory" "github.com/SigNoz/signoz/pkg/querybuilder" "github.com/SigNoz/signoz/pkg/telemetrystore" + "github.com/SigNoz/signoz/pkg/types/metrictypes" qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" "github.com/SigNoz/signoz/pkg/types/telemetrytypes" "github.com/huandu/go-sqlbuilder" @@ -54,8 +55,10 @@ func NewTelemetryMetaStore( relatedMetadataDBName string, relatedMetadataTblName string, ) telemetrytypes.MetadataStore { + metadataSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrymetadata") t := &telemetryMetaStore{ + logger: metadataSettings.Logger(), telemetrystore: telemetrystore, tracesDBName: tracesDBName, tracesFieldsTblName: tracesFieldsTblName, @@ -879,3 +882,90 @@ func (t *telemetryMetaStore) GetAllValues(ctx context.Context, fieldValueSelecto } return values, nil } + +func (t *telemetryMetaStore) FetchTemporality(ctx context.Context, metricName string) (metrictypes.Temporality, error) { + if metricName == "" { + return metrictypes.Unknown, errors.Newf(errors.TypeInternal, errors.CodeInternal, "metric name cannot be empty") + } + + temporalityMap, err := t.FetchTemporalityMulti(ctx, metricName) + if err != nil { + return metrictypes.Unknown, err + } + + temporality, ok := temporalityMap[metricName] + if !ok { + return metrictypes.Unknown, nil + } + + return temporality, nil +} + +func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) { + if len(metricNames) == 0 { + return make(map[string]metrictypes.Temporality), nil + } + + result := make(map[string]metrictypes.Temporality) + + // Build query to fetch temporality for all metrics + // We use attr_string_value where attr_name = '__temporality__' + // Note: The columns are mixed in the current data - temporality column contains 
metric_name + // and metric_name column contains temporality value, so we use the correct mapping + sb := sqlbuilder.Select( + "temporality as metric_name", + "argMax(attr_string_value, last_reported_unix_milli) as temporality_value", + ).From(t.metricsDBName + "." + t.metricsFieldsTblName) + + // Filter by metric names (in the temporality column due to data mix-up) + sb.Where(sb.In("temporality", metricNames)) + + // Only fetch temporality metadata rows (where attr_name = '__temporality__') + sb.Where(sb.E("attr_name", "__temporality__")) + + // Group by metric name to get one temporality per metric + sb.GroupBy("temporality") + + query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse) + + t.logger.DebugContext(ctx, "fetching metric temporality", "query", query, "args", args) + + rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...) + if err != nil { + return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to fetch metric temporality") + } + defer rows.Close() + + // Process results + for rows.Next() { + var metricName, temporalityStr string + if err := rows.Scan(&metricName, &temporalityStr); err != nil { + return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to scan temporality result") + } + + // Convert string to Temporality type + var temporality metrictypes.Temporality + switch temporalityStr { + case "Delta": + temporality = metrictypes.Delta + case "Cumulative": + temporality = metrictypes.Cumulative + case "Unspecified": + temporality = metrictypes.Unspecified + default: + // Unknown or empty temporality + temporality = metrictypes.Unknown + } + + result[metricName] = temporality + } + + // For metrics not found in the database, set to Unknown + for _, metricName := range metricNames { + if _, exists := result[metricName]; !exists { + result[metricName] = metrictypes.Unknown + } + } + + return result, nil +} diff --git a/pkg/telemetrymetrics/statement_builder.go b/pkg/telemetrymetrics/statement_builder.go index ed366d975872..d895a0af6fc9 100644 --- a/pkg/telemetrymetrics/statement_builder.go +++ b/pkg/telemetrymetrics/statement_builder.go @@ -305,7 +305,7 @@ func (b *metricQueryStatementBuilder) buildTimeSeriesCTE( sb.LTE("unix_milli", end), ) - if query.Aggregations[0].Temporality != metrictypes.Unspecified { + if query.Aggregations[0].Temporality != metrictypes.Unknown { sb.Where(sb.ILike("temporality", query.Aggregations[0].Temporality.StringValue())) } diff --git a/pkg/telemetrymetrics/stmt_builder_test.go b/pkg/telemetrymetrics/stmt_builder_test.go index fd451fa5373b..d4b731ebe8c2 100644 --- a/pkg/telemetrymetrics/stmt_builder_test.go +++ b/pkg/telemetrymetrics/stmt_builder_test.go @@ -147,8 +147,8 @@ func TestStatementBuilder(t *testing.T) { }, }, expected: qbtypes.Statement{ - Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY ALL) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? 
GROUP BY ALL ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ALL) SELECT * FROM __spatial_aggregation_cte", - Args: []any{"system.memory.usage", uint64(1747936800000), uint64(1747983448000), false, "big-data-node-1", "system.memory.usage", uint64(1747947419000), uint64(1747983448000), 0}, + Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY ALL) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ALL) SELECT * FROM __spatial_aggregation_cte", + Args: []any{"system.memory.usage", uint64(1747936800000), uint64(1747983448000), "unspecified", false, "big-data-node-1", "system.memory.usage", uint64(1747947419000), uint64(1747983448000), 0}, }, expectedErr: nil, }, diff --git a/pkg/telemetrytraces/field_mapper.go b/pkg/telemetrytraces/field_mapper.go index 7229f9b02ac1..7fb9ac5159d3 100644 --- a/pkg/telemetrytraces/field_mapper.go +++ b/pkg/telemetrytraces/field_mapper.go @@ -250,7 +250,7 @@ func (m *defaultFieldMapper) ColumnExpressionFor( return "", errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, correction) } else { // not even a close match, return an error - return "", err + return "", errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, "field %s not found", field.Name) } } } else if len(keysForField) == 1 { @@ -263,7 +263,7 @@ func (m *defaultFieldMapper) ColumnExpressionFor( colName, _ = m.FieldFor(ctx, key) args = append(args, fmt.Sprintf("toString(%s) != '', toString(%s)", colName, colName)) } - colName = fmt.Sprintf("multiIf(%s)", strings.Join(args, ", ")) + colName = fmt.Sprintf("multiIf(%s, NULL)", strings.Join(args, ", ")) } } diff --git a/pkg/telemetrytraces/statement_builder.go b/pkg/telemetrytraces/statement_builder.go index 8f5559c26c6d..fa7131eb1cfa 100644 --- a/pkg/telemetrytraces/statement_builder.go +++ b/pkg/telemetrytraces/statement_builder.go @@ -179,12 +179,18 @@ func (b *traceQueryStatementBuilder) buildListQuery( // Add order by for _, orderBy := range query.Order { - sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue())) + colExpr, err := b.fm.ColumnExpressionFor(ctx, &orderBy.Key.TelemetryFieldKey, keys) + if err != nil { + return nil, err + } + sb.OrderBy(fmt.Sprintf("%s %s", colExpr, orderBy.Direction.StringValue())) } // Add limit and offset if query.Limit > 0 { sb.Limit(query.Limit) + } else { + sb.Limit(100) } if query.Offset > 0 { diff --git a/pkg/types/metrictypes/metrictypes.go b/pkg/types/metrictypes/metrictypes.go index 61a8e9284a01..4e02e6641acf 100644 --- a/pkg/types/metrictypes/metrictypes.go +++ b/pkg/types/metrictypes/metrictypes.go @@ -13,7 +13,8 @@ type Temporality 
struct { var ( Delta = Temporality{valuer.NewString("delta")} Cumulative = Temporality{valuer.NewString("cumulative")} - Unspecified = Temporality{valuer.NewString("")} + Unspecified = Temporality{valuer.NewString("unspecified")} + Unknown = Temporality{valuer.NewString("")} ) // Type is the type of the metric in OTLP data model diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go b/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go index 5987f6942d63..c303c203647e 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go @@ -328,6 +328,8 @@ type MetricAggregation struct { TableHints *metrictypes.MetricTableHints `json:"-"` // value filter to apply to the query ValueFilter *metrictypes.MetricValueFilter `json:"-"` + // reduce to operator for metric scalar requests + ReduceTo ReduceTo `json:"reduceTo,omitempty"` } type Filter struct { @@ -379,7 +381,7 @@ type FunctionArg struct { // name of the argument Name string `json:"name,omitempty"` // value of the argument - Value string `json:"value"` + Value any `json:"value"` } type Function struct { diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/builder_query.go b/pkg/types/querybuildertypes/querybuildertypesv5/builder_query.go index 4d30b9d7cbd8..4089c13db956 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/builder_query.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/builder_query.go @@ -55,4 +55,25 @@ type QueryBuilderQuery[T any] struct { // functions to apply to the query Functions []Function `json:"functions,omitempty"` + + // ShiftBy is extracted from timeShift function for internal use + // This field is not serialized to JSON + ShiftBy int64 `json:"-"` +} + +// UnmarshalJSON implements custom JSON unmarshaling to disallow unknown fields +func (q *QueryBuilderQuery[T]) UnmarshalJSON(data []byte) error { + // Define a type alias to avoid infinite recursion + type Alias QueryBuilderQuery[T] + + var temp Alias + // Use UnmarshalJSONWithContext for better error messages + if err := UnmarshalJSONWithContext(data, &temp, "query spec"); err != nil { + return err + } + + // Copy the decoded values back to the original struct + *q = QueryBuilderQuery[T](temp) + + return nil } diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/formula.go b/pkg/types/querybuildertypes/querybuildertypesv5/formula.go index 643015e0b868..e31c987e3f57 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/formula.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/formula.go @@ -20,10 +20,30 @@ type QueryBuilderFormula struct { // expression to apply to the query Expression string `json:"expression"` + // order by keys and directions + Order []OrderBy `json:"order,omitempty"` + + // limit the maximum number of rows to return + Limit int `json:"limit,omitempty"` + + // having clause to apply to the formula result + Having *Having `json:"having,omitempty"` + // functions to apply to the formula result Functions []Function `json:"functions,omitempty"` } +// UnmarshalJSON implements custom JSON unmarshaling to disallow unknown fields +func (f *QueryBuilderFormula) UnmarshalJSON(data []byte) error { + type Alias QueryBuilderFormula + var temp Alias + if err := UnmarshalJSONWithContext(data, &temp, "formula spec"); err != nil { + return err + } + *f = QueryBuilderFormula(temp) + return nil +} + // small container to store the query name and index or alias reference // for a 
variable in the formula expression // read below for more details on aggregation references diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/functions.go b/pkg/types/querybuildertypes/querybuildertypesv5/functions.go index f0f70e64efce..0894e3be56d6 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/functions.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/functions.go @@ -93,9 +93,20 @@ func ApplyFunction(fn Function, result *TimeSeries) *TimeSeries { return result } -// parseFloat64Arg parses a string argument to float64 -func parseFloat64Arg(value string) (float64, error) { - return strconv.ParseFloat(value, 64) +// parseFloat64Arg parses an argument to float64 +func parseFloat64Arg(value any) (float64, error) { + switch v := value.(type) { + case float64: + return v, nil + case int64: + return float64(v), nil + case int: + return float64(v), nil + case string: + return strconv.ParseFloat(v, 64) + default: + return 0, strconv.ErrSyntax + } } // getEWMAAlpha calculates the alpha value for EWMA functions diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/json_decoder.go b/pkg/types/querybuildertypes/querybuildertypesv5/json_decoder.go new file mode 100644 index 000000000000..665a0d365f54 --- /dev/null +++ b/pkg/types/querybuildertypes/querybuildertypesv5/json_decoder.go @@ -0,0 +1,163 @@ +package querybuildertypesv5 + +import ( + "bytes" + "encoding/json" + "reflect" + "strings" + + "github.com/SigNoz/signoz/pkg/errors" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" +) + +// UnmarshalJSONWithSuggestions unmarshals JSON data into the target struct +// and provides field name suggestions for unknown fields +func UnmarshalJSONWithSuggestions(data []byte, target any) error { + return UnmarshalJSONWithContext(data, target, "") +} + +// UnmarshalJSONWithContext unmarshals JSON with context information for better error messages +func UnmarshalJSONWithContext(data []byte, target any, context string) error { + // First, try to unmarshal with DisallowUnknownFields to catch unknown fields + dec := json.NewDecoder(bytes.NewReader(data)) + dec.DisallowUnknownFields() + + err := dec.Decode(target) + if err == nil { + // No error, successful unmarshal + return nil + } + + // Check if it's an unknown field error + if strings.Contains(err.Error(), "unknown field") { + // Extract the unknown field name + unknownField := extractUnknownField(err.Error()) + if unknownField != "" { + // Get valid field names from the target struct + validFields := getJSONFieldNames(target) + + // Build error message with context + errorMsg := "unknown field %q" + if context != "" { + errorMsg = "unknown field %q in " + context + } + + // Find closest match with max distance of 3 (reasonable for typos) + if suggestion, found := telemetrytypes.SuggestCorrection(unknownField, validFields); found { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + errorMsg, + unknownField, + ).WithAdditional( + suggestion, + ) + } + + // No good suggestion found + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + errorMsg, + unknownField, + ).WithAdditional( + "Valid fields are: " + strings.Join(validFields, ", "), + ) + } + } + + // Return the original error if it's not an unknown field error + return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid JSON: %v", err) +} + +// extractUnknownField extracts the field name from an unknown field error message +func extractUnknownField(errMsg string) string { + // The error message format is: json: unknown field "fieldname" 
+ parts := strings.Split(errMsg, `"`) + if len(parts) >= 2 { + return parts[1] + } + return "" +} + +// getJSONFieldNames extracts all JSON field names from a struct +func getJSONFieldNames(v any) []string { + var fields []string + + t := reflect.TypeOf(v) + if t.Kind() == reflect.Ptr { + t = t.Elem() + } + + if t.Kind() != reflect.Struct { + return fields + } + + for i := 0; i < t.NumField(); i++ { + field := t.Field(i) + jsonTag := field.Tag.Get("json") + + if jsonTag == "" || jsonTag == "-" { + continue + } + + // Extract the field name from the JSON tag + fieldName := strings.Split(jsonTag, ",")[0] + if fieldName != "" { + fields = append(fields, fieldName) + } + } + + return fields +} + +// wrapUnmarshalError wraps UnmarshalJSONWithContext errors with appropriate context +// It preserves errors that already have additional context or unknown field errors +func wrapUnmarshalError(err error, errorFormat string, args ...interface{}) error { + if err == nil { + return nil + } + + // If it's already one of our wrapped errors with additional context, return as-is + _, _, _, _, _, additionals := errors.Unwrapb(err) + if len(additionals) > 0 { + return err + } + + // Preserve helpful error messages about unknown fields + if strings.Contains(err.Error(), "unknown field") { + return err + } + + // Wrap with the provided error format + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + errorFormat, + args..., + ) +} + +// wrapValidationError rewraps validation errors with context while preserving additional hints +// It extracts the inner message from the error and creates a new error with the provided format +// The innerMsg is automatically appended to the args for formatting +func wrapValidationError(err error, contextIdentifier string, errorFormat string) error { + if err == nil { + return nil + } + + // Extract the underlying error details + _, _, innerMsg, _, _, additionals := errors.Unwrapb(err) + + // Create a new error with the provided format + newErr := errors.NewInvalidInputf( + errors.CodeInvalidInput, + errorFormat, + contextIdentifier, + innerMsg, + ) + + // Add any additional context from the inner error + if len(additionals) > 0 { + newErr = newErr.WithAdditional(additionals...) 
+ } + + return newErr +} diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/req.go b/pkg/types/querybuildertypes/querybuildertypesv5/req.go index 5951ad96b5d3..4f86ce426410 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/req.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/req.go @@ -2,6 +2,7 @@ package querybuildertypesv5 import ( "encoding/json" + "strings" "github.com/SigNoz/signoz/pkg/errors" "github.com/SigNoz/signoz/pkg/types/telemetrytypes" @@ -17,12 +18,11 @@ type QueryEnvelope struct { // implement custom json unmarshaler for the QueryEnvelope func (q *QueryEnvelope) UnmarshalJSON(data []byte) error { var shadow struct { - Name string `json:"name"` Type QueryType `json:"type"` Spec json.RawMessage `json:"spec"` } - if err := json.Unmarshal(data, &shadow); err != nil { - return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid query envelope") + if err := UnmarshalJSONWithSuggestions(data, &shadow); err != nil { + return err } q.Type = shadow.Type @@ -34,43 +34,53 @@ func (q *QueryEnvelope) UnmarshalJSON(data []byte) error { Signal telemetrytypes.Signal `json:"signal"` } if err := json.Unmarshal(shadow.Spec, &header); err != nil { - return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "cannot detect builder signal") + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "cannot detect builder signal: %v", + err, + ) } switch header.Signal { case telemetrytypes.SignalTraces: var spec QueryBuilderQuery[TraceAggregation] - if err := json.Unmarshal(shadow.Spec, &spec); err != nil { - return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid trace builder query spec") + if err := UnmarshalJSONWithContext(shadow.Spec, &spec, "query spec"); err != nil { + return wrapUnmarshalError(err, "invalid trace builder query spec: %v", err) } q.Spec = spec case telemetrytypes.SignalLogs: var spec QueryBuilderQuery[LogAggregation] - if err := json.Unmarshal(shadow.Spec, &spec); err != nil { - return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid log builder query spec") + if err := UnmarshalJSONWithContext(shadow.Spec, &spec, "query spec"); err != nil { + return wrapUnmarshalError(err, "invalid log builder query spec: %v", err) } q.Spec = spec case telemetrytypes.SignalMetrics: var spec QueryBuilderQuery[MetricAggregation] - if err := json.Unmarshal(shadow.Spec, &spec); err != nil { - return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid metric builder query spec") + if err := UnmarshalJSONWithContext(shadow.Spec, &spec, "query spec"); err != nil { + return wrapUnmarshalError(err, "invalid metric builder query spec: %v", err) } q.Spec = spec default: - return errors.WrapInvalidInputf(nil, errors.CodeInvalidInput, "unknown builder signal %q", header.Signal) + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "unknown builder signal %q", + header.Signal, + ).WithAdditional( + "Valid signals are: traces, logs, metrics", + ) } case QueryTypeFormula: var spec QueryBuilderFormula - if err := json.Unmarshal(shadow.Spec, &spec); err != nil { - return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid formula spec") + if err := UnmarshalJSONWithContext(shadow.Spec, &spec, "formula spec"); err != nil { + return wrapUnmarshalError(err, "invalid formula spec: %v", err) } q.Spec = spec case QueryTypeJoin: var spec QueryBuilderJoin - if err := json.Unmarshal(shadow.Spec, &spec); err != nil { - return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid join spec") + if err 
:= UnmarshalJSONWithContext(shadow.Spec, &spec, "join spec"); err != nil { + return wrapUnmarshalError(err, "invalid join spec: %v", err) } q.Spec = spec @@ -83,20 +93,26 @@ func (q *QueryEnvelope) UnmarshalJSON(data []byte) error { case QueryTypePromQL: var spec PromQuery - if err := json.Unmarshal(shadow.Spec, &spec); err != nil { - return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid PromQL spec") + if err := UnmarshalJSONWithContext(shadow.Spec, &spec, "PromQL spec"); err != nil { + return wrapUnmarshalError(err, "invalid PromQL spec: %v", err) } q.Spec = spec case QueryTypeClickHouseSQL: var spec ClickHouseQuery - if err := json.Unmarshal(shadow.Spec, &spec); err != nil { - return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid ClickHouse SQL spec") + if err := UnmarshalJSONWithContext(shadow.Spec, &spec, "ClickHouse SQL spec"); err != nil { + return wrapUnmarshalError(err, "invalid ClickHouse SQL spec: %v", err) } q.Spec = spec default: - return errors.WrapInvalidInputf(nil, errors.CodeInvalidInput, "unknown query type %q", shadow.Type) + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "unknown query type %q", + shadow.Type, + ).WithAdditional( + "Valid query types are: builder_query, builder_sub_query, builder_formula, builder_join, promql, clickhouse_sql", + ) } return nil @@ -107,6 +123,59 @@ type CompositeQuery struct { Queries []QueryEnvelope `json:"queries"` } +// UnmarshalJSON implements custom JSON unmarshaling to provide better error messages +func (c *CompositeQuery) UnmarshalJSON(data []byte) error { + type Alias CompositeQuery + + // First do a normal unmarshal without DisallowUnknownFields + var temp Alias + if err := json.Unmarshal(data, &temp); err != nil { + return err + } + + // Then check for unknown fields at this level only + var check map[string]json.RawMessage + if err := json.Unmarshal(data, &check); err != nil { + return err + } + + // Check for unknown fields at this level + validFields := map[string]bool{ + "queries": true, + } + + for field := range check { + if !validFields[field] { + // Find closest match + var fieldNames []string + for f := range validFields { + fieldNames = append(fieldNames, f) + } + + if suggestion, found := telemetrytypes.SuggestCorrection(field, fieldNames); found { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "unknown field %q in composite query", + field, + ).WithAdditional( + suggestion, + ) + } + + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "unknown field %q in composite query", + field, + ).WithAdditional( + "Valid fields are: " + strings.Join(fieldNames, ", "), + ) + } + } + + *c = CompositeQuery(temp) + return nil +} + type QueryRangeRequest struct { // SchemaVersion is the version of the schema to use for the request payload. 
SchemaVersion string `json:"schemaVersion"` @@ -127,6 +196,69 @@ type QueryRangeRequest struct { FormatOptions *FormatOptions `json:"formatOptions,omitempty"` } +// UnmarshalJSON implements custom JSON unmarshaling to disallow unknown fields +func (r *QueryRangeRequest) UnmarshalJSON(data []byte) error { + // Define a type alias to avoid infinite recursion + type Alias QueryRangeRequest + + // First do a normal unmarshal without DisallowUnknownFields to let nested structures handle their own validation + var temp Alias + if err := json.Unmarshal(data, &temp); err != nil { + return err + } + + // Then check for unknown fields at this level only + var check map[string]json.RawMessage + if err := json.Unmarshal(data, &check); err != nil { + return err + } + + // Check for unknown fields at the top level + validFields := map[string]bool{ + "schemaVersion": true, + "start": true, + "end": true, + "requestType": true, + "compositeQuery": true, + "variables": true, + "noCache": true, + "formatOptions": true, + } + + for field := range check { + if !validFields[field] { + // Find closest match + var fieldNames []string + for f := range validFields { + fieldNames = append(fieldNames, f) + } + + if suggestion, found := telemetrytypes.SuggestCorrection(field, fieldNames); found { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "unknown field %q", + field, + ).WithAdditional( + suggestion, + ) + } + + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "unknown field %q", + field, + ).WithAdditional( + "Valid fields are: " + strings.Join(fieldNames, ", "), + ) + } + } + + // Copy the decoded values back to the original struct + *r = QueryRangeRequest(temp) + + return nil +} + type FormatOptions struct { FillGaps bool `json:"fillGaps,omitempty"` FormatTableResultForUI bool `json:"formatTableResultForUI,omitempty"` diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/req_error_test.go b/pkg/types/querybuildertypes/querybuildertypesv5/req_error_test.go new file mode 100644 index 000000000000..d11d6fa7d5e7 --- /dev/null +++ b/pkg/types/querybuildertypes/querybuildertypesv5/req_error_test.go @@ -0,0 +1,150 @@ +package querybuildertypesv5 + +import ( + "encoding/json" + "strings" + "testing" + + "github.com/SigNoz/signoz/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestQueryRangeRequest_UnmarshalJSON_ErrorMessages(t *testing.T) { + tests := []struct { + name string + jsonData string + wantErrMsg string + wantAdditionalHints []string + }{ + { + name: "unknown field 'function' in query spec", + jsonData: `{ + "schemaVersion": "v1", + "start": 1749290340000, + "end": 1749293940000, + "requestType": "scalar", + "compositeQuery": { + "queries": [{ + "type": "builder_query", + "spec": { + "name": "A", + "signal": "logs", + "aggregations": [{ + "expression": "count()", + "alias": "spans_count" + }], + "function": [{ + "name": "absolute", + "args": [] + }] + } + }] + } + }`, + wantErrMsg: `unknown field "function" in query spec`, + wantAdditionalHints: []string{ + "did you mean: 'functions'?", + }, + }, + { + name: "unknown field 'filters' in query spec", + jsonData: `{ + "schemaVersion": "v1", + "start": 1749290340000, + "end": 1749293940000, + "requestType": "scalar", + "compositeQuery": { + "queries": [{ + "type": "builder_query", + "spec": { + "name": "A", + "signal": "metrics", + "aggregations": [{ + "metricName": "test" + }], + "filters": { + "expression": "test = 1" + } + } + }] + } + }`, + wantErrMsg: `unknown field 
"filters" in query spec`, + wantAdditionalHints: []string{ + "did you mean: 'filter'?", + }, + }, + { + name: "unknown field at top level", + jsonData: `{ + "schemaVersion": "v1", + "start": 1749290340000, + "end": 1749293940000, + "requestType": "scalar", + "compositeQueries": { + "queries": [] + } + }`, + wantErrMsg: `unknown field "compositeQueries"`, + wantAdditionalHints: []string{ + "did you mean: 'compositeQuery'?", + }, + }, + { + name: "unknown field with no good suggestion", + jsonData: `{ + "schemaVersion": "v1", + "start": 1749290340000, + "end": 1749293940000, + "requestType": "scalar", + "compositeQuery": { + "queries": [{ + "type": "builder_query", + "spec": { + "name": "A", + "signal": "metrics", + "aggregations": [{ + "metricName": "test" + }], + "randomField": "value" + } + }] + } + }`, + wantErrMsg: `unknown field "randomField" in query spec`, + wantAdditionalHints: []string{ + "Valid fields are:", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var req QueryRangeRequest + err := json.Unmarshal([]byte(tt.jsonData), &req) + + require.Error(t, err) + + // Check main error message + assert.Contains(t, err.Error(), tt.wantErrMsg) + + // Check if it's an error from our package using Unwrapb + _, _, _, _, _, additionals := errors.Unwrapb(err) + + // Check additional hints if we have any + if len(additionals) > 0 { + for _, hint := range tt.wantAdditionalHints { + found := false + for _, additional := range additionals { + if strings.Contains(additional, hint) { + found = true + break + } + } + assert.True(t, found, "Expected to find hint '%s' in additionals: %v", hint, additionals) + } + } + }) + } +} diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/req_test.go b/pkg/types/querybuildertypes/querybuildertypesv5/req_test.go index d73b86b495d5..6b44cbed0ac6 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/req_test.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/req_test.go @@ -773,7 +773,7 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) { { "type": "builder_formula", "spec": { - "name": "rate", + "name": "B", "expression": "A * 100" } } @@ -799,7 +799,7 @@ func TestQueryRangeRequest_UnmarshalJSON(t *testing.T) { { Type: QueryTypeFormula, Spec: QueryBuilderFormula{ - Name: "rate", + Name: "B", Expression: "A * 100", }, }, diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/resp.go b/pkg/types/querybuildertypes/querybuildertypesv5/resp.go index 0ed0a0109169..544ccebddb3e 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/resp.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/resp.go @@ -129,8 +129,9 @@ type ColumnDescriptor struct { } type ScalarData struct { - Columns []*ColumnDescriptor `json:"columns"` - Data [][]any `json:"data"` + QueryName string `json:"queryName"` + Columns []*ColumnDescriptor `json:"columns"` + Data [][]any `json:"data"` } type RawData struct { diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/validation.go b/pkg/types/querybuildertypes/querybuildertypesv5/validation.go new file mode 100644 index 000000000000..c45498d5955d --- /dev/null +++ b/pkg/types/querybuildertypes/querybuildertypesv5/validation.go @@ -0,0 +1,700 @@ +package querybuildertypesv5 + +import ( + "fmt" + "slices" + "strings" + + "github.com/SigNoz/signoz/pkg/errors" + "github.com/SigNoz/signoz/pkg/types/metrictypes" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" +) + +// getQueryIdentifier returns a friendly identifier for a query based on its type and name/content 
+func getQueryIdentifier(envelope QueryEnvelope, index int) string { + switch envelope.Type { + case QueryTypeBuilder, QueryTypeSubQuery: + switch spec := envelope.Spec.(type) { + case QueryBuilderQuery[TraceAggregation]: + if spec.Name != "" { + return fmt.Sprintf("query '%s'", spec.Name) + } + return fmt.Sprintf("trace query at position %d", index+1) + case QueryBuilderQuery[LogAggregation]: + if spec.Name != "" { + return fmt.Sprintf("query '%s'", spec.Name) + } + return fmt.Sprintf("log query at position %d", index+1) + case QueryBuilderQuery[MetricAggregation]: + if spec.Name != "" { + return fmt.Sprintf("query '%s'", spec.Name) + } + return fmt.Sprintf("metric query at position %d", index+1) + } + case QueryTypeFormula: + if spec, ok := envelope.Spec.(QueryBuilderFormula); ok && spec.Name != "" { + return fmt.Sprintf("formula '%s'", spec.Name) + } + return fmt.Sprintf("formula at position %d", index+1) + case QueryTypeJoin: + if spec, ok := envelope.Spec.(QueryBuilderJoin); ok && spec.Name != "" { + return fmt.Sprintf("join '%s'", spec.Name) + } + return fmt.Sprintf("join at position %d", index+1) + case QueryTypePromQL: + if spec, ok := envelope.Spec.(PromQuery); ok && spec.Name != "" { + return fmt.Sprintf("PromQL query '%s'", spec.Name) + } + return fmt.Sprintf("PromQL query at position %d", index+1) + case QueryTypeClickHouseSQL: + if spec, ok := envelope.Spec.(ClickHouseQuery); ok && spec.Name != "" { + return fmt.Sprintf("ClickHouse query '%s'", spec.Name) + } + return fmt.Sprintf("ClickHouse query at position %d", index+1) + } + return fmt.Sprintf("query at position %d", index+1) +} + +const ( + // Maximum limit for query results + MaxQueryLimit = 10000 +) + +// ValidateFunctionName checks if the function name is valid +func ValidateFunctionName(name FunctionName) error { + validFunctions := []FunctionName{ + FunctionNameCutOffMin, + FunctionNameCutOffMax, + FunctionNameClampMin, + FunctionNameClampMax, + FunctionNameAbsolute, + FunctionNameRunningDiff, + FunctionNameLog2, + FunctionNameLog10, + FunctionNameCumulativeSum, + FunctionNameEWMA3, + FunctionNameEWMA5, + FunctionNameEWMA7, + FunctionNameMedian3, + FunctionNameMedian5, + FunctionNameMedian7, + FunctionNameTimeShift, + FunctionNameAnomaly, + } + + if slices.Contains(validFunctions, name) { + return nil + } + + // Format valid functions as comma-separated string + var validFunctionNames []string + for _, fn := range validFunctions { + validFunctionNames = append(validFunctionNames, fn.StringValue()) + } + + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "invalid function name: %s", + name.StringValue(), + ).WithAdditional(fmt.Sprintf("valid functions are: %s", strings.Join(validFunctionNames, ", "))) +} + +// Validate performs preliminary validation on QueryBuilderQuery +func (q *QueryBuilderQuery[T]) Validate(requestType RequestType) error { + // Validate signal + if err := q.validateSignal(); err != nil { + return err + } + + // Validate aggregations only for non-raw request types + if requestType != RequestTypeRaw { + if err := q.validateAggregations(); err != nil { + return err + } + } + + // Validate limit and pagination + if err := q.validateLimitAndPagination(); err != nil { + return err + } + + // Validate functions + if err := q.validateFunctions(); err != nil { + return err + } + + // Validate secondary aggregations + if err := q.validateSecondaryAggregations(); err != nil { + return err + } + + // Validate order by + if err := q.validateOrderBy(); err != nil { + return err + } + + return nil +} 
+ +func (q *QueryBuilderQuery[T]) validateSignal() error { + // Signal validation is handled during unmarshaling in req.go + // Valid signals are: metrics, traces, logs + switch q.Signal { + case telemetrytypes.SignalMetrics, + telemetrytypes.SignalTraces, + telemetrytypes.SignalLogs, + telemetrytypes.SignalUnspecified: // Empty is allowed for backward compatibility + return nil + default: + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "invalid signal type: %s", + q.Signal, + ).WithAdditional( + "Valid signals are: metrics, traces, logs", + ) + } +} + +func (q *QueryBuilderQuery[T]) validateAggregations() error { + // At least one aggregation required for non-disabled queries + if len(q.Aggregations) == 0 && !q.Disabled { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "at least one aggregation is required", + ) + // TODO: add url with docs + } + + // Check for duplicate aliases + aliases := make(map[string]bool) + for i, agg := range q.Aggregations { + // Type-specific validation based on T + switch v := any(agg).(type) { + case MetricAggregation: + if v.MetricName == "" { + aggId := fmt.Sprintf("aggregation #%d", i+1) + if q.Name != "" { + aggId = fmt.Sprintf("aggregation #%d in query '%s'", i+1, q.Name) + } + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "metric name is required for %s", + aggId, + ) + } + // Validate metric-specific aggregations + if err := validateMetricAggregation(v); err != nil { + aggId := fmt.Sprintf("aggregation #%d", i+1) + if q.Name != "" { + aggId = fmt.Sprintf("aggregation #%d in query '%s'", i+1, q.Name) + } + return wrapValidationError(err, aggId, "invalid metric %s: %s") + } + case TraceAggregation: + if v.Expression == "" { + aggId := fmt.Sprintf("aggregation #%d", i+1) + if q.Name != "" { + aggId = fmt.Sprintf("aggregation #%d in query '%s'", i+1, q.Name) + } + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "expression is required for trace %s", + aggId, + ) + } + if v.Alias != "" { + if aliases[v.Alias] { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "duplicate aggregation alias: %s", + v.Alias, + ) + } + aliases[v.Alias] = true + } + case LogAggregation: + if v.Expression == "" { + aggId := fmt.Sprintf("aggregation #%d", i+1) + if q.Name != "" { + aggId = fmt.Sprintf("aggregation #%d in query '%s'", i+1, q.Name) + } + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "expression is required for log %s", + aggId, + ) + } + if v.Alias != "" { + if aliases[v.Alias] { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "duplicate aggregation alias: %s", + v.Alias, + ) + } + aliases[v.Alias] = true + } + } + } + + return nil +} + +func (q *QueryBuilderQuery[T]) validateLimitAndPagination() error { + // Validate limit + if q.Limit < 0 { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "limit must be non-negative, got %d", + q.Limit, + ) + } + + if q.Limit > MaxQueryLimit { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "limit exceeds maximum allowed value of %d", + MaxQueryLimit, + ).WithAdditional( + fmt.Sprintf("Provided limit: %d", q.Limit), + ) + } + + // Validate offset + if q.Offset < 0 { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "offset must be non-negative, got %d", + q.Offset, + ) + } + + return nil +} + +func (q *QueryBuilderQuery[T]) validateFunctions() error { + for i, fn := range q.Functions { + if err := ValidateFunctionName(fn.Name); err != nil { + fnId := fmt.Sprintf("function #%d", i+1) + if 
q.Name != "" { + fnId = fmt.Sprintf("function #%d in query '%s'", i+1, q.Name) + } + return wrapValidationError(err, fnId, "invalid %s: %s") + } + } + return nil +} + +func (q *QueryBuilderQuery[T]) validateSecondaryAggregations() error { + for i, secAgg := range q.SecondaryAggregations { + // Secondary aggregation expression can be empty - we allow it per requirements + // Just validate structure + if secAgg.Limit < 0 { + secAggId := fmt.Sprintf("secondary aggregation #%d", i+1) + if q.Name != "" { + secAggId = fmt.Sprintf("secondary aggregation #%d in query '%s'", i+1, q.Name) + } + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "%s: limit must be non-negative", + secAggId, + ) + } + } + return nil +} + +func (q *QueryBuilderQuery[T]) validateOrderBy() error { + for i, order := range q.Order { + // Direction validation is handled by the OrderDirection type + if order.Direction != OrderDirectionAsc && order.Direction != OrderDirectionDesc { + orderId := fmt.Sprintf("order by clause #%d", i+1) + if q.Name != "" { + orderId = fmt.Sprintf("order by clause #%d in query '%s'", i+1, q.Name) + } + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "invalid direction for %s: %s", + orderId, + order.Direction.StringValue(), + ).WithAdditional( + "Valid directions are: asc, desc", + ) + } + } + return nil +} + +// ValidateQueryRangeRequest validates the entire query range request +func (r *QueryRangeRequest) Validate() error { + // Validate time range + if r.Start >= r.End { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "start time must be before end time", + ) + } + + // Validate request type + switch r.RequestType { + case RequestTypeRaw, RequestTypeTimeSeries, RequestTypeScalar: + // Valid request types + default: + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "invalid request type: %s", + r.RequestType, + ).WithAdditional( + "Valid request types are: raw, timeseries, scalar", + ) + } + + // Validate composite query + if err := r.validateCompositeQuery(); err != nil { + return err + } + + return nil +} + +func (r *QueryRangeRequest) validateCompositeQuery() error { + // Validate queries in composite query + if len(r.CompositeQuery.Queries) == 0 { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "at least one query is required", + ) + } + + // Track query names for uniqueness (only for non-formula queries) + queryNames := make(map[string]bool) + + // Validate each query based on its type + for i, envelope := range r.CompositeQuery.Queries { + switch envelope.Type { + case QueryTypeBuilder, QueryTypeSubQuery: + // Validate based on the concrete type + switch spec := envelope.Spec.(type) { + case QueryBuilderQuery[TraceAggregation]: + if err := spec.Validate(r.RequestType); err != nil { + queryId := getQueryIdentifier(envelope, i) + return wrapValidationError(err, queryId, "invalid %s: %s") + } + // Check name uniqueness for non-formula context + if spec.Name != "" { + if queryNames[spec.Name] { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "duplicate query name '%s'", + spec.Name, + ) + } + queryNames[spec.Name] = true + } + case QueryBuilderQuery[LogAggregation]: + if err := spec.Validate(r.RequestType); err != nil { + queryId := getQueryIdentifier(envelope, i) + return wrapValidationError(err, queryId, "invalid %s: %s") + } + // Check name uniqueness for non-formula context + if spec.Name != "" { + if queryNames[spec.Name] { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "duplicate query name 
'%s'", + spec.Name, + ) + } + queryNames[spec.Name] = true + } + case QueryBuilderQuery[MetricAggregation]: + if err := spec.Validate(r.RequestType); err != nil { + queryId := getQueryIdentifier(envelope, i) + return wrapValidationError(err, queryId, "invalid %s: %s") + } + // Check name uniqueness for non-formula context + if spec.Name != "" { + if queryNames[spec.Name] { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "duplicate query name '%s'", + spec.Name, + ) + } + queryNames[spec.Name] = true + } + default: + queryId := getQueryIdentifier(envelope, i) + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "unknown spec type for %s", + queryId, + ) + } + case QueryTypeFormula: + // Formula validation is handled separately + spec, ok := envelope.Spec.(QueryBuilderFormula) + if !ok { + queryId := getQueryIdentifier(envelope, i) + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "invalid spec for %s", + queryId, + ) + } + if spec.Expression == "" { + queryId := getQueryIdentifier(envelope, i) + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "expression is required for %s", + queryId, + ) + } + case QueryTypeJoin: + // Join validation is handled separately + _, ok := envelope.Spec.(QueryBuilderJoin) + if !ok { + queryId := getQueryIdentifier(envelope, i) + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "invalid spec for %s", + queryId, + ) + } + case QueryTypePromQL: + // PromQL validation is handled separately + spec, ok := envelope.Spec.(PromQuery) + if !ok { + queryId := getQueryIdentifier(envelope, i) + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "invalid spec for %s", + queryId, + ) + } + if spec.Query == "" { + queryId := getQueryIdentifier(envelope, i) + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "query expression is required for %s", + queryId, + ) + } + case QueryTypeClickHouseSQL: + // ClickHouse SQL validation is handled separately + spec, ok := envelope.Spec.(ClickHouseQuery) + if !ok { + queryId := getQueryIdentifier(envelope, i) + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "invalid spec for %s", + queryId, + ) + } + if spec.Query == "" { + queryId := getQueryIdentifier(envelope, i) + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "query expression is required for %s", + queryId, + ) + } + default: + queryId := getQueryIdentifier(envelope, i) + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "unknown query type '%s' for %s", + envelope.Type, + queryId, + ).WithAdditional( + "Valid query types are: builder_query, builder_formula, builder_join, promql, clickhouse_sql", + ) + } + } + + return nil +} + +// Validate performs validation on CompositeQuery +func (c *CompositeQuery) Validate(requestType RequestType) error { + if len(c.Queries) == 0 { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "at least one query is required", + ) + } + + // Validate each query + for i, envelope := range c.Queries { + if err := validateQueryEnvelope(envelope, requestType); err != nil { + queryId := getQueryIdentifier(envelope, i) + return wrapValidationError(err, queryId, "invalid %s: %s") + } + } + + return nil +} + +func validateQueryEnvelope(envelope QueryEnvelope, requestType RequestType) error { + switch envelope.Type { + case QueryTypeBuilder, QueryTypeSubQuery: + switch spec := envelope.Spec.(type) { + case QueryBuilderQuery[TraceAggregation]: + return spec.Validate(requestType) + case QueryBuilderQuery[LogAggregation]: + return 
spec.Validate(requestType) + case QueryBuilderQuery[MetricAggregation]: + return spec.Validate(requestType) + default: + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "unknown query spec type", + ) + } + case QueryTypeFormula: + spec, ok := envelope.Spec.(QueryBuilderFormula) + if !ok { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "invalid formula spec", + ) + } + if spec.Expression == "" { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "formula expression is required", + ) + } + return nil + case QueryTypeJoin: + _, ok := envelope.Spec.(QueryBuilderJoin) + if !ok { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "invalid join spec", + ) + } + return nil + case QueryTypePromQL: + spec, ok := envelope.Spec.(PromQuery) + if !ok { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "invalid PromQL spec", + ) + } + if spec.Query == "" { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "PromQL query is required", + ) + } + return nil + case QueryTypeClickHouseSQL: + spec, ok := envelope.Spec.(ClickHouseQuery) + if !ok { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "invalid ClickHouse SQL spec", + ) + } + if spec.Query == "" { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "ClickHouse SQL query is required", + ) + } + return nil + default: + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "unknown query type: %s", + envelope.Type, + ).WithAdditional( + "Valid query types are: builder_query, builder_sub_query, builder_formula, builder_join, promql, clickhouse_sql", + ) + } +} + +// validateMetricAggregation validates metric-specific aggregation parameters +func validateMetricAggregation(agg MetricAggregation) error { + // we can't decide anything here without known temporality + if agg.Temporality == metrictypes.Unknown { + return nil + } + + // Validate that rate/increase are only used with appropriate temporalities + if agg.TimeAggregation == metrictypes.TimeAggregationRate || agg.TimeAggregation == metrictypes.TimeAggregationIncrease { + // For gauge metrics (Unspecified temporality), rate/increase doesn't make sense + if agg.Temporality == metrictypes.Unspecified { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "rate/increase aggregation cannot be used with gauge metrics (unspecified temporality)", + ) + } + } + + // Validate percentile aggregations are only used with histogram types + if agg.SpaceAggregation.IsPercentile() { + if agg.Type != metrictypes.HistogramType && agg.Type != metrictypes.ExpHistogramType && agg.Type != metrictypes.SummaryType { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "percentile aggregation can only be used with histogram or summary metric types", + ) + } + } + + // Validate time aggregation values + validTimeAggregations := []metrictypes.TimeAggregation{ + metrictypes.TimeAggregationUnspecified, + metrictypes.TimeAggregationLatest, + metrictypes.TimeAggregationSum, + metrictypes.TimeAggregationAvg, + metrictypes.TimeAggregationMin, + metrictypes.TimeAggregationMax, + metrictypes.TimeAggregationCount, + metrictypes.TimeAggregationCountDistinct, + metrictypes.TimeAggregationRate, + metrictypes.TimeAggregationIncrease, + } + + validTimeAgg := slices.Contains(validTimeAggregations, agg.TimeAggregation) + if !validTimeAgg { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "invalid time aggregation: %s", + agg.TimeAggregation.StringValue(), + ).WithAdditional( + "Valid time aggregations: 
latest, sum, avg, min, max, count, count_distinct, rate, increase", + ) + } + + // Validate space aggregation values + validSpaceAggregations := []metrictypes.SpaceAggregation{ + metrictypes.SpaceAggregationUnspecified, + metrictypes.SpaceAggregationSum, + metrictypes.SpaceAggregationAvg, + metrictypes.SpaceAggregationMin, + metrictypes.SpaceAggregationMax, + metrictypes.SpaceAggregationCount, + metrictypes.SpaceAggregationPercentile50, + metrictypes.SpaceAggregationPercentile75, + metrictypes.SpaceAggregationPercentile90, + metrictypes.SpaceAggregationPercentile95, + metrictypes.SpaceAggregationPercentile99, + } + + validSpaceAgg := slices.Contains(validSpaceAggregations, agg.SpaceAggregation) + if !validSpaceAgg { + return errors.NewInvalidInputf( + errors.CodeInvalidInput, + "invalid space aggregation: %s", + agg.SpaceAggregation.StringValue(), + ).WithAdditional( + "Valid space aggregations: sum, avg, min, max, count, p50, p75, p90, p95, p99", + ) + } + + return nil +} diff --git a/pkg/types/telemetrytypes/maybe_typo.go b/pkg/types/telemetrytypes/maybe_typo.go index f884d0e8a9de..cf46889ad5af 100644 --- a/pkg/types/telemetrytypes/maybe_typo.go +++ b/pkg/types/telemetrytypes/maybe_typo.go @@ -104,7 +104,7 @@ func SuggestCorrection(input string, knownFieldKeys []string) (string, bool) { } if bestSimilarity >= typoSuggestionThreshold { - return fmt.Sprintf("did you mean: %s?", bestMatch), true + return fmt.Sprintf("did you mean: '%s'?", bestMatch), true } return "", false diff --git a/pkg/types/telemetrytypes/store.go b/pkg/types/telemetrytypes/store.go index 6aeed6b811e1..4a13c14f90e7 100644 --- a/pkg/types/telemetrytypes/store.go +++ b/pkg/types/telemetrytypes/store.go @@ -2,6 +2,8 @@ package telemetrytypes import ( "context" + + "github.com/SigNoz/signoz/pkg/types/metrictypes" ) // MetadataStore is the interface for the telemetry metadata store. @@ -22,4 +24,10 @@ type MetadataStore interface { // GetAllValues returns a list of all values. 
GetAllValues(ctx context.Context, fieldValueSelector *FieldValueSelector) (*TelemetryFieldValues, error) + + // FetchTemporality fetches the temporality for metric + FetchTemporality(ctx context.Context, metricName string) (metrictypes.Temporality, error) + + // FetchTemporalityMulti fetches the temporality for multiple metrics + FetchTemporalityMulti(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) } diff --git a/pkg/types/telemetrytypes/telemetrytypestest/metadata_store.go b/pkg/types/telemetrytypes/telemetrytypestest/metadata_store.go index 3be85eeba1a6..41e7065689c8 100644 --- a/pkg/types/telemetrytypes/telemetrytypestest/metadata_store.go +++ b/pkg/types/telemetrytypes/telemetrytypestest/metadata_store.go @@ -4,6 +4,7 @@ import ( "context" "strings" + "github.com/SigNoz/signoz/pkg/types/metrictypes" "github.com/SigNoz/signoz/pkg/types/telemetrytypes" ) @@ -13,6 +14,7 @@ type MockMetadataStore struct { KeysMap map[string][]*telemetrytypes.TelemetryFieldKey RelatedValuesMap map[string][]string AllValuesMap map[string]*telemetrytypes.TelemetryFieldValues + TemporalityMap map[string]metrictypes.Temporality } // NewMockMetadataStore creates a new instance of MockMetadataStore with initialized maps @@ -21,6 +23,7 @@ func NewMockMetadataStore() *MockMetadataStore { KeysMap: make(map[string][]*telemetrytypes.TelemetryFieldKey), RelatedValuesMap: make(map[string][]string), AllValuesMap: make(map[string]*telemetrytypes.TelemetryFieldValues), + TemporalityMap: make(map[string]metrictypes.Temporality), } } @@ -249,3 +252,31 @@ func (m *MockMetadataStore) SetRelatedValues(lookupKey string, values []string) func (m *MockMetadataStore) SetAllValues(lookupKey string, values *telemetrytypes.TelemetryFieldValues) { m.AllValuesMap[lookupKey] = values } + +// FetchTemporality fetches the temporality for a metric +func (m *MockMetadataStore) FetchTemporality(ctx context.Context, metricName string) (metrictypes.Temporality, error) { + if temporality, exists := m.TemporalityMap[metricName]; exists { + return temporality, nil + } + return metrictypes.Unknown, nil +} + +// FetchTemporalityMulti fetches the temporality for multiple metrics +func (m *MockMetadataStore) FetchTemporalityMulti(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) { + result := make(map[string]metrictypes.Temporality) + + for _, metricName := range metricNames { + if temporality, exists := m.TemporalityMap[metricName]; exists { + result[metricName] = temporality + } else { + result[metricName] = metrictypes.Unknown + } + } + + return result, nil +} + +// SetTemporality sets the temporality for a metric in the mock store +func (m *MockMetadataStore) SetTemporality(metricName string, temporality metrictypes.Temporality) { + m.TemporalityMap[metricName] = temporality +}
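
Illustrative note (not part of the patch): the sketch below shows how the new unknown-field handling and the added QueryRangeRequest.Validate() are expected to surface to a caller, mirroring the "compositeQueries" typo case from req_error_test.go above. The main/fmt wrapper and the printed output shown in comments are assumptions for demonstration only; error text and hint delivery follow the patch's errors package (hints are carried as additional messages, retrievable via errors.Unwrapb).

package main

import (
	"encoding/json"
	"fmt"

	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)

func main() {
	// A request with a misspelled top-level field ("compositeQueries" instead of
	// "compositeQuery"). The custom UnmarshalJSON added in req.go rejects unknown
	// fields at this level and attaches a typo suggestion as an additional hint.
	payload := []byte(`{
		"schemaVersion": "v1",
		"start": 1749290340000,
		"end": 1749293940000,
		"requestType": "scalar",
		"compositeQueries": {"queries": []}
	}`)

	var req qbtypes.QueryRangeRequest
	if err := json.Unmarshal(payload, &req); err != nil {
		// Expected (per req_error_test.go): unknown field "compositeQueries",
		// with the additional hint: did you mean: 'compositeQuery'?
		fmt.Println(err)
		return
	}

	// For a payload that decodes cleanly, Validate() runs the structural checks
	// added in validation.go (time range ordering, request type, at least one
	// query, limits, function names, metric aggregation sanity, ...). The API
	// handler calls this before executing the query range request.
	if err := req.Validate(); err != nil {
		fmt.Println(err)
	}
}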