From 51c2bbcd4b54b6858c84f8ff92afde8ff24a6883 Mon Sep 17 00:00:00 2001 From: aniketio-ctrl Date: Fri, 18 Jul 2025 14:27:00 +0530 Subject: [PATCH] fix(dedup-prom): added check for duplicated samples data (#8502) * fix(dedup-prom): added check for duplicated samples data * fix(dedup-prom): added test cases for duplicated samples data * fix(dedup-prom): added test cases for duplicated samples data --- pkg/prometheus/clickhouseprometheus/client.go | 10 +- .../clickhouseprometheus/client_query_test.go | 113 ++++++++++++++++++ 2 files changed, 122 insertions(+), 1 deletion(-) create mode 100644 pkg/prometheus/clickhouseprometheus/client_query_test.go diff --git a/pkg/prometheus/clickhouseprometheus/client.go b/pkg/prometheus/clickhouseprometheus/client.go index d033286b4cf9..092f304319b0 100644 --- a/pkg/prometheus/clickhouseprometheus/client.go +++ b/pkg/prometheus/clickhouseprometheus/client.go @@ -188,10 +188,12 @@ func (client *client) querySamples(ctx context.Context, start int64, end int64, var res []*prompb.TimeSeries var ts *prompb.TimeSeries var fingerprint, prevFingerprint uint64 - var timestampMs int64 + var timestampMs, prevTimestamp int64 var value float64 var flags uint32 + prevTimestamp = math.MinInt64 + for rows.Next() { if err := rows.Scan(&metricName, &fingerprint, &timestampMs, &value, &flags); err != nil { return nil, err @@ -209,12 +211,18 @@ func (client *client) querySamples(ctx context.Context, start int64, end int64, ts = &prompb.TimeSeries{ Labels: labels, } + prevTimestamp = math.MinInt64 } if flags&1 == 1 { value = math.Float64frombits(promValue.StaleNaN) } + if timestampMs == prevTimestamp { + continue + } + prevTimestamp = timestampMs + // add samples to current time series ts.Samples = append(ts.Samples, prompb.Sample{ Timestamp: timestampMs, diff --git a/pkg/prometheus/clickhouseprometheus/client_query_test.go b/pkg/prometheus/clickhouseprometheus/client_query_test.go new file mode 100644 index 000000000000..18f42cfe8aa4 --- /dev/null +++ 
b/pkg/prometheus/clickhouseprometheus/client_query_test.go @@ -0,0 +1,113 @@ +package clickhouseprometheus + +import ( + "context" + "github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest" + cmock "github.com/srikanthccv/ClickHouse-go-mock" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/SigNoz/signoz/pkg/telemetrystore" + "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/assert" +) + +// TestClient_QuerySamples feeds querySamples mocked rows that repeat the same fingerprint and timestamp and expects exactly one sample per time series. +func TestClient_QuerySamples(t *testing.T) { + ctx := context.Background() + cols := make([]cmock.ColumnType, 0) + cols = append(cols, cmock.ColumnType{Name: "metric_name", Type: "String"}) + cols = append(cols, cmock.ColumnType{Name: "fingerprint", Type: "UInt64"}) + cols = append(cols, cmock.ColumnType{Name: "unix_milli", Type: "Int64"}) + cols = append(cols, cmock.ColumnType{Name: "value", Type: "Float64"}) + cols = append(cols, cmock.ColumnType{Name: "flags", Type: "UInt32"}) + tests := []struct { + name string + start int64 + end int64 + fingerprints map[uint64][]prompb.Label + metricName string + subQuery string + args []any + setupMock func(mock cmock.ClickConnMockCommon, args ...any) + expectedTimeSeries int + expectError bool + description string + result []*prompb.TimeSeries + }{ + { + name: "successful samples retrieval", + start: int64(1000), + end: int64(2000), + fingerprints: map[uint64][]prompb.Label{ + 123: { + {Name: "__name__", Value: "cpu_usage"}, + {Name: "instance", Value: "localhost:9090"}, + }, + 456: { + {Name: "__name__", Value: "cpu_usage"}, + {Name: "instance", Value: "localhost:9091"}, + }, + }, + metricName: "cpu_usage", + subQuery: "SELECT metric_name, fingerprint, unix_milli, value, flags", + expectedTimeSeries: 2, + expectError: false, + description: "Should successfully retrieve samples for multiple time series", + setupMock: func(mock cmock.ClickConnMockCommon, args ...any) { + values := [][]interface{}{ + {"cpu_usage", uint64(123), int64(1001), float64(1.1), uint32(0)}, 
+ {"cpu_usage", uint64(123), int64(1001), float64(1.1), uint32(0)}, + {"cpu_usage", uint64(456), int64(1001), float64(1.2), uint32(0)}, + {"cpu_usage", uint64(456), int64(1001), float64(1.2), uint32(0)}, + {"cpu_usage", uint64(456), int64(1001), float64(1.2), uint32(0)}, + } + mock.ExpectQuery("SELECT metric_name, fingerprint, unix_milli, value, flags").WithArgs(args...).WillReturnRows( + cmock.NewRows(cols, values), + ) + }, + result: []*prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "cpu_usage"}, + {Name: "instance", Value: "localhost:9090"}, + }, + Samples: []prompb.Sample{ + {Timestamp: 1001, Value: 1.1}, + }, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "cpu_usage"}, + {Name: "instance", Value: "localhost:9091"}, + }, + Samples: []prompb.Sample{ + {Timestamp: 1001, Value: 1.2}, + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + telemetryStore := telemetrystoretest.New(telemetrystore.Config{Provider: "clickhouse"}, sqlmock.QueryMatcherRegexp) + readClient := client{telemetryStore: telemetryStore} + if tt.setupMock != nil { + tt.setupMock(telemetryStore.Mock(), tt.metricName, tt.start, tt.end) + + } + result, err := readClient.querySamples(ctx, tt.start, tt.end, tt.fingerprints, tt.metricName, tt.subQuery, tt.args) + + if tt.expectError { + assert.Error(t, err) + assert.Nil(t, result) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.expectedTimeSeries, len(result)) + assert.Equal(t, result, tt.result) + } + + }) + } +}