// signoz/pkg/querier/bucket_cache_bench_test.go

package querier

import (
"context"
"encoding/json"
"fmt"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/cache/cachetest"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/require"
)

// BenchmarkBucketCache_GetMissRanges benchmarks the GetMissRanges operation
func BenchmarkBucketCache_GetMissRanges(b *testing.B) {
bc := createBenchmarkBucketCache(b)
ctx := context.Background()
orgID := valuer.UUID{}
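	// The zero-value org UUID is used consistently for Put and
	// GetMissRanges, so lookups address the entries created below.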
// Pre-populate cache with some data
for i := 0; i < 10; i++ {
query := &mockQuery{
fingerprint: fmt.Sprintf("bench-query-%d", i),
startMs: uint64(i * 10000),
endMs: uint64((i + 1) * 10000),
}
result := createBenchmarkResult(query.startMs, query.endMs, 1000)
bc.Put(ctx, orgID, query, result)
}
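	// The cache now holds ten adjacent buckets covering [0, 100000),
	// one fingerprint per 10-second window.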
// Create test queries with varying cache hit patterns
queries := []struct {
name string
query *mockQuery
}{
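		// Exactly matches bench-query-5's cached range [50000, 60000).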
{
name: "full_cache_hit",
query: &mockQuery{
fingerprint: "bench-query-5",
startMs: 50000,
endMs: 60000,
},
},
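		// Nothing was cached for this fingerprint, so the whole range misses.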
{
name: "full_cache_miss",
query: &mockQuery{
fingerprint: "bench-query-new",
startMs: 100000,
endMs: 110000,
},
},
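		// Extends 5 s beyond the cached [50000, 60000) bucket on both sides,
		// yielding a hit in the middle with misses at the edges.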
{
name: "partial_cache_hit",
query: &mockQuery{
fingerprint: "bench-query-5",
startMs: 45000,
endMs: 65000,
},
},
}
for _, tc := range queries {
b.Run(tc.name, func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
cached, missing := bc.GetMissRanges(ctx, orgID, tc.query, qbtypes.Step{Duration: 1000 * time.Millisecond})
_ = cached
_ = missing
}
})
}
}

// BenchmarkBucketCache_Put benchmarks the Put operation
func BenchmarkBucketCache_Put(b *testing.B) {
bc := createBenchmarkBucketCache(b)
ctx := context.Background()
orgID := valuer.UUID{}
testCases := []struct {
name string
numSeries int
numValues int
numQueries int
}{
{"small_result_1_series_100_values", 1, 100, 1},
{"medium_result_10_series_100_values", 10, 100, 1},
{"large_result_100_series_100_values", 100, 100, 1},
{"huge_result_1000_series_100_values", 1000, 100, 1},
{"many_values_10_series_1000_values", 10, 1000, 1},
}
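	// The cases scale series count and values-per-series independently, so
	// the results show how Put cost grows with payload shape.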
for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) {
// Create test data
queries := make([]*mockQuery, tc.numQueries)
results := make([]*qbtypes.Result, tc.numQueries)
for i := 0; i < tc.numQueries; i++ {
queries[i] = &mockQuery{
fingerprint: fmt.Sprintf("bench-put-query-%d", i),
startMs: uint64(i * 100000),
endMs: uint64((i + 1) * 100000),
}
results[i] = createBenchmarkResultWithSeries(
queries[i].startMs,
queries[i].endMs,
1000,
tc.numSeries,
tc.numValues,
)
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for j := 0; j < tc.numQueries; j++ {
bc.Put(ctx, orgID, queries[j], results[j])
}
}
})
}
}

// BenchmarkBucketCache_MergeTimeSeriesValues benchmarks merging of time series data
func BenchmarkBucketCache_MergeTimeSeriesValues(b *testing.B) {
bc := createBenchmarkBucketCache(b).(*bucketCache)
testCases := []struct {
name string
numBuckets int
numSeries int
numValues int
}{
{"small_2_buckets_10_series", 2, 10, 100},
{"medium_5_buckets_50_series", 5, 50, 100},
{"large_10_buckets_100_series", 10, 100, 100},
{"many_buckets_20_buckets_50_series", 20, 50, 100},
}
for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) {
// Create test buckets
buckets := make([]*cachedBucket, tc.numBuckets)
for i := 0; i < tc.numBuckets; i++ {
startMs := uint64(i * 10000)
endMs := uint64((i + 1) * 10000)
result := createBenchmarkResultWithSeries(startMs, endMs, 1000, tc.numSeries, tc.numValues)
				valueBytes, err := json.Marshal(result.Value)
				require.NoError(b, err)
buckets[i] = &cachedBucket{
StartMs: startMs,
EndMs: endMs,
Type: qbtypes.RequestTypeTimeSeries,
Value: valueBytes,
Stats: result.Stats,
}
}
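			// Buckets are adjacent and pre-marshaled, so the timed loop
			// measures only decode-and-merge work.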
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
result := bc.mergeTimeSeriesValues(context.Background(), buckets)
_ = result
}
})
}
}

// BenchmarkBucketCache_FindMissingRangesWithStep benchmarks finding missing ranges
func BenchmarkBucketCache_FindMissingRangesWithStep(b *testing.B) {
bc := createBenchmarkBucketCache(b).(*bucketCache)
testCases := []struct {
name string
numBuckets int
gapPattern string // "none", "uniform", "random"
}{
{"no_gaps_10_buckets", 10, "none"},
{"uniform_gaps_10_buckets", 10, "uniform"},
{"random_gaps_20_buckets", 20, "random"},
{"many_buckets_100", 100, "uniform"},
}
for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) {
// Create test buckets based on pattern
buckets := createBucketsWithPattern(tc.numBuckets, tc.gapPattern)
startMs := uint64(0)
endMs := uint64(tc.numBuckets * 20000)
stepMs := uint64(1000)
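			// endMs runs well past the last bucket (buckets stop at
			// numBuckets*10000), so every case includes a trailing miss.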
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
missing := bc.findMissingRangesWithStep(buckets, startMs, endMs, stepMs)
_ = missing
}
})
}
}

// BenchmarkGetUniqueSeriesKey benchmarks the series key generation
func BenchmarkGetUniqueSeriesKey(b *testing.B) {
testCases := []struct {
name string
numLabels int
}{
{"1_label", 1},
{"5_labels", 5},
{"10_labels", 10},
{"20_labels", 20},
{"50_labels", 50},
}
for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) {
labels := make([]*qbtypes.Label, tc.numLabels)
for i := 0; i < tc.numLabels; i++ {
labels[i] = &qbtypes.Label{
Key: telemetrytypes.TelemetryFieldKey{
Name: fmt.Sprintf("label_%d", i),
FieldDataType: telemetrytypes.FieldDataTypeString,
},
Value: fmt.Sprintf("value_%d", i),
}
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
key := qbtypes.GetUniqueSeriesKey(labels)
_ = key
}
})
}
}

// BenchmarkBucketCache_ConcurrentOperations benchmarks concurrent cache operations
func BenchmarkBucketCache_ConcurrentOperations(b *testing.B) {
bc := createBenchmarkBucketCache(b)
ctx := context.Background()
orgID := valuer.UUID{}
// Pre-populate cache
for i := 0; i < 100; i++ {
query := &mockQuery{
fingerprint: fmt.Sprintf("concurrent-query-%d", i),
startMs: uint64(i * 10000),
endMs: uint64((i + 1) * 10000),
}
result := createBenchmarkResult(query.startMs, query.endMs, 1000)
bc.Put(ctx, orgID, query, result)
}
b.ResetTimer()
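	// Each parallel goroutine keeps its own counter and cycles through
	// read, write, and partial read, so roughly a third of operations
	// mutate the cache.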
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
// Mix of operations
switch i % 3 {
case 0: // Read
query := &mockQuery{
fingerprint: fmt.Sprintf("concurrent-query-%d", i%100),
startMs: uint64((i % 100) * 10000),
endMs: uint64(((i % 100) + 1) * 10000),
}
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
_ = cached
_ = missing
case 1: // Write
query := &mockQuery{
fingerprint: fmt.Sprintf("concurrent-query-new-%d", i),
startMs: uint64(i * 10000),
endMs: uint64((i + 1) * 10000),
}
result := createBenchmarkResult(query.startMs, query.endMs, 1000)
bc.Put(ctx, orgID, query, result)
			case 2: // Partial read
				// Guard against uint64 underflow when i%100 == 0: widen the
				// window 5 s on each side of the cached bucket, clamping at 0.
				base := uint64((i % 100) * 10000)
				startMs := uint64(0)
				if base >= 5000 {
					startMs = base - 5000
				}
				query := &mockQuery{
					fingerprint: fmt.Sprintf("concurrent-query-%d", i%100),
					startMs:     startMs,
					endMs:       base + 10000 + 5000, // bucket end + 5 s
				}
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
_ = cached
_ = missing
}
i++
}
})
}

// BenchmarkBucketCache_FilterResultToTimeRange benchmarks filtering results to time range
func BenchmarkBucketCache_FilterResultToTimeRange(b *testing.B) {
bc := createBenchmarkBucketCache(b).(*bucketCache)
testCases := []struct {
name string
numSeries int
numValues int
}{
{"small_10_series_100_values", 10, 100},
{"medium_50_series_500_values", 50, 500},
{"large_100_series_1000_values", 100, 1000},
}
for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) {
// Create a large result
result := createBenchmarkResultWithSeries(0, 100000, 1000, tc.numSeries, tc.numValues)
// Filter to middle 50%
startMs := uint64(25000)
endMs := uint64(75000)
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
filtered := bc.filterResultToTimeRange(result, startMs, endMs)
_ = filtered
}
})
}
}

// createBenchmarkBucketCache builds an in-memory bucket cache for benchmarks.
func createBenchmarkBucketCache(tb testing.TB) BucketCache {
config := cache.Config{
Provider: "memory",
Memory: cache.Memory{
			TTL:             time.Hour * 168, // one week
CleanupInterval: 10 * time.Minute,
},
}
memCache, err := cachetest.New(config)
require.NoError(tb, err)
return NewBucketCache(instrumentationtest.New().ToProviderSettings(), memCache, time.Hour, 5*time.Minute)
}

// createBenchmarkResult builds a result with the default shape
// (10 series, 100 values per series).
func createBenchmarkResult(startMs, endMs uint64, step uint64) *qbtypes.Result {
return createBenchmarkResultWithSeries(startMs, endMs, step, 10, 100)
}

// createBenchmarkResultWithSeries builds a time-series result spanning
// [startMs, endMs). The third (step) argument is unused: value spacing is
// derived from the range and numValuesPerSeries.
func createBenchmarkResultWithSeries(startMs, endMs uint64, _ uint64, numSeries, numValuesPerSeries int) *qbtypes.Result {
series := make([]*qbtypes.TimeSeries, numSeries)
for i := 0; i < numSeries; i++ {
ts := &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "host",
FieldDataType: telemetrytypes.FieldDataTypeString,
},
Value: fmt.Sprintf("server-%d", i),
},
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "service",
FieldDataType: telemetrytypes.FieldDataTypeString,
},
Value: fmt.Sprintf("service-%d", i%5),
},
},
Values: make([]*qbtypes.TimeSeriesValue, 0, numValuesPerSeries),
}
		// Generate evenly spaced values; the guard below keeps every
		// timestamp inside [startMs, endMs).
valueStep := (endMs - startMs) / uint64(numValuesPerSeries)
if valueStep == 0 {
valueStep = 1
}
for j := 0; j < numValuesPerSeries; j++ {
timestamp := int64(startMs + uint64(j)*valueStep)
if timestamp < int64(endMs) {
ts.Values = append(ts.Values, &qbtypes.TimeSeriesValue{
Timestamp: timestamp,
Value: float64(i*100 + j),
})
}
}
series[i] = ts
}
return &qbtypes.Result{
Type: qbtypes.RequestTypeTimeSeries,
Value: &qbtypes.TimeSeriesData{
QueryName: "benchmark_query",
Aggregations: []*qbtypes.AggregationBucket{
{
Index: 0,
Series: series,
},
},
},
Stats: qbtypes.ExecStats{
RowsScanned: uint64(numSeries * numValuesPerSeries),
BytesScanned: uint64(numSeries * numValuesPerSeries * 100),
DurationMS: 10,
},
}
}

// createBucketsWithPattern builds adjacent 10-second buckets, skipping some
// according to the gap pattern ("none", "uniform", or "random").
func createBucketsWithPattern(numBuckets int, pattern string) []*cachedBucket {
buckets := make([]*cachedBucket, 0, numBuckets)
for i := 0; i < numBuckets; i++ {
// Skip some buckets based on pattern
if pattern == "uniform" && i%3 == 0 {
continue // Create gaps every 3rd bucket
}
if pattern == "random" && i%7 < 2 {
continue // Create random gaps
}
startMs := uint64(i * 10000)
endMs := uint64((i + 1) * 10000)
buckets = append(buckets, &cachedBucket{
StartMs: startMs,
EndMs: endMs,
Type: qbtypes.RequestTypeTimeSeries,
Value: json.RawMessage(`{}`),
Stats: qbtypes.ExecStats{},
})
}
return buckets
}