mirror of
https://github.com/SigNoz/signoz.git
synced 2025-12-17 15:36:48 +00:00
fix(prom-dup-labels): added fingerprint in prom labels (#8563)
* fix(prom-dup-labels): added fingerprint in prom labels * fix(prom-dup-labels): removed fingerprint labels from result series * fix(prom-dup-labels): removed fingerprint labels from result series * fix(prom-dup-labels): removed fingerprint labels from result series * fix(prom-dup-labels): removed fingerprint labels from result series * fix(prom-dup-labels): added test cases * Update pkg/prometheus/clickhouseprometheus/client_query_test.go Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com> * fix(prom-dup-labels): added test cases --------- Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
This commit is contained in:
parent
fe95ee716a
commit
24d6d83575
@ -151,7 +151,7 @@ func (client *client) getFingerprintsFromClickhouseQuery(ctx context.Context, qu
|
||||
return nil, err
|
||||
}
|
||||
|
||||
labels, _, err := unmarshalLabels(labelString)
|
||||
labels, _, err := unmarshalLabels(labelString, fingerprint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -4,6 +4,8 @@ import (
|
||||
"context"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
|
||||
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
@ -111,3 +113,97 @@ func TestClient_QuerySamples(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestClient_getFingerprintsFromClickhouseQuery(t *testing.T) {
|
||||
cols := []cmock.ColumnType{
|
||||
{Name: "fingerprint", Type: "UInt64"},
|
||||
{Name: "labels", Type: "String"},
|
||||
}
|
||||
|
||||
sortLabels := func(ls []prompb.Label) {
|
||||
sort.Slice(ls, func(i, j int) bool {
|
||||
if ls[i].Name == ls[j].Name {
|
||||
return ls[i].Value < ls[j].Value
|
||||
}
|
||||
return ls[i].Name < ls[j].Name
|
||||
})
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
start, end int64
|
||||
metricName string
|
||||
subQuery string
|
||||
args []any
|
||||
setupMock func(m cmock.ClickConnMockCommon, args ...any)
|
||||
want map[uint64][]prompb.Label
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "happy-path - two fingerprints",
|
||||
start: 1000,
|
||||
end: 2000,
|
||||
metricName: "cpu_usage",
|
||||
subQuery: `SELECT fingerprint,labels`,
|
||||
// args slice is empty here, but test‑case still owns it
|
||||
args: []any{},
|
||||
|
||||
setupMock: func(m cmock.ClickConnMockCommon, args ...any) {
|
||||
rows := [][]any{
|
||||
{uint64(123), `{"t1":"s1","t2":"s2"}`},
|
||||
{uint64(234), `{"t1":"s1","t2":"s2"}`},
|
||||
}
|
||||
m.ExpectQuery(`SELECT fingerprint,labels`).WithArgs(args...).WillReturnRows(
|
||||
cmock.NewRows(cols, rows),
|
||||
)
|
||||
},
|
||||
|
||||
want: map[uint64][]prompb.Label{
|
||||
123: {
|
||||
{Name: "fingerprint", Value: "123"},
|
||||
{Name: "t1", Value: "s1"},
|
||||
{Name: "t2", Value: "s2"},
|
||||
},
|
||||
234: {
|
||||
{Name: "fingerprint", Value: "234"},
|
||||
{Name: "t1", Value: "s1"},
|
||||
{Name: "t2", Value: "s2"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store := telemetrystoretest.New(
|
||||
telemetrystore.Config{Provider: "clickhouse"},
|
||||
sqlmock.QueryMatcherRegexp,
|
||||
)
|
||||
|
||||
if tc.setupMock != nil {
|
||||
tc.setupMock(store.Mock(), tc.args...)
|
||||
}
|
||||
|
||||
c := client{telemetryStore: store}
|
||||
|
||||
got, err := c.getFingerprintsFromClickhouseQuery(ctx, tc.subQuery, tc.args)
|
||||
if tc.wantErr {
|
||||
require.Error(t, err)
|
||||
require.Nil(t, got)
|
||||
return
|
||||
}
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(tc.want), len(got), "fingerprint map length mismatch")
|
||||
for fp, expLabels := range tc.want {
|
||||
gotLabels, ok := got[fp]
|
||||
require.Truef(t, ok, "missing fingerprint %d", fp)
|
||||
|
||||
sortLabels(expLabels)
|
||||
sortLabels(gotLabels)
|
||||
|
||||
assert.Equalf(t, expLabels, gotLabels, "labels mismatch for fingerprint %d", fp)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,12 +2,14 @@ package clickhouseprometheus
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strconv"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/prometheus"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
)
|
||||
|
||||
// Unmarshals JSON into Prometheus labels. It does not preserve order.
|
||||
func unmarshalLabels(s string) ([]prompb.Label, string, error) {
|
||||
func unmarshalLabels(s string, fingerprint uint64) ([]prompb.Label, string, error) {
|
||||
var metricName string
|
||||
m := make(map[string]string)
|
||||
if err := json.Unmarshal([]byte(s), &m); err != nil {
|
||||
@ -24,5 +26,9 @@ func unmarshalLabels(s string) ([]prompb.Label, string, error) {
|
||||
Value: v,
|
||||
})
|
||||
}
|
||||
res = append(res, prompb.Label{
|
||||
Name: prometheus.FingerprintAsPromLabelName,
|
||||
Value: strconv.FormatUint(fingerprint, 10),
|
||||
})
|
||||
return res, metricName, nil
|
||||
}
|
||||
|
||||
3
pkg/prometheus/constants.go
Normal file
3
pkg/prometheus/constants.go
Normal file
@ -0,0 +1,3 @@
|
||||
package prometheus
|
||||
|
||||
const FingerprintAsPromLabelName = "fingerprint"
|
||||
106
pkg/prometheus/prometheustest/utils_test.go
Normal file
106
pkg/prometheus/prometheustest/utils_test.go
Normal file
@ -0,0 +1,106 @@
|
||||
package prometheustest
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/prometheus"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestRemoveExtraLabels(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
res *promql.Result
|
||||
remove []string
|
||||
wantErr bool
|
||||
verify func(t *testing.T, result *promql.Result, removed []string)
|
||||
}{
|
||||
{
|
||||
name: "vector – label removed",
|
||||
res: &promql.Result{
|
||||
Value: promql.Vector{
|
||||
promql.Sample{
|
||||
Metric: labels.FromStrings(
|
||||
"__name__", "http_requests_total",
|
||||
"job", "demo",
|
||||
"drop_me", "dropped",
|
||||
),
|
||||
},
|
||||
},
|
||||
},
|
||||
remove: []string{"drop_me"},
|
||||
verify: func(t *testing.T, result *promql.Result, removed []string) {
|
||||
k := result.Value.(promql.Vector)
|
||||
for _, str := range removed {
|
||||
get := k[0].Metric.Get(str)
|
||||
if get != "" {
|
||||
t.Fatalf("label not removed")
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "scalar – nothing to strip",
|
||||
res: &promql.Result{
|
||||
Value: promql.Scalar{V: 99, T: 1},
|
||||
},
|
||||
remove: []string{"irrelevant"},
|
||||
verify: func(t *testing.T, result *promql.Result, removed []string) {
|
||||
sc := result.Value.(promql.Scalar)
|
||||
if sc.V != 99 || sc.T != 1 {
|
||||
t.Fatalf("scalar unexpectedly modified: got %+v", sc)
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "matrix – label removed",
|
||||
res: &promql.Result{
|
||||
Value: promql.Matrix{
|
||||
promql.Series{
|
||||
Metric: labels.FromStrings(
|
||||
"__name__", "http_requests_total",
|
||||
"pod", "p0",
|
||||
"drop_me", "dropped",
|
||||
),
|
||||
Floats: []promql.FPoint{{T: 0, F: 1}, {T: 1, F: 2}},
|
||||
},
|
||||
promql.Series{
|
||||
Metric: labels.FromStrings(
|
||||
"__name__", "http_requests_total",
|
||||
"pod", "p0",
|
||||
"drop_me", "dropped",
|
||||
),
|
||||
Floats: []promql.FPoint{{T: 0, F: 1}, {T: 1, F: 2}},
|
||||
},
|
||||
},
|
||||
},
|
||||
remove: []string{"drop_me"},
|
||||
verify: func(t *testing.T, result *promql.Result, removed []string) {
|
||||
mat := result.Value.(promql.Matrix)
|
||||
for _, str := range removed {
|
||||
for _, k := range mat {
|
||||
if k.Metric.Get(str) != "" {
|
||||
t.Fatalf("label not removed")
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
err := prometheus.RemoveExtraLabels(tc.res, tc.remove...)
|
||||
if tc.wantErr && err == nil {
|
||||
t.Fatalf("expected error, got nil")
|
||||
}
|
||||
if !tc.wantErr && err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if tc.verify != nil {
|
||||
tc.verify(t, tc.res, tc.remove)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
49
pkg/prometheus/utils.go
Normal file
49
pkg/prometheus/utils.go
Normal file
@ -0,0 +1,49 @@
|
||||
package prometheus
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
)
|
||||
|
||||
func RemoveExtraLabels(res *promql.Result, labelsToRemove ...string) error {
|
||||
if len(labelsToRemove) == 0 || res == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
toRemove := make(map[string]struct{}, len(labelsToRemove))
|
||||
for _, l := range labelsToRemove {
|
||||
toRemove[l] = struct{}{}
|
||||
}
|
||||
|
||||
switch res.Value.(type) {
|
||||
case promql.Vector:
|
||||
value := res.Value.(promql.Vector)
|
||||
for i := range value {
|
||||
series := &(value)[i]
|
||||
dst := series.Metric[:0]
|
||||
for _, lbl := range series.Metric {
|
||||
if _, drop := toRemove[lbl.Name]; !drop {
|
||||
dst = append(dst, lbl)
|
||||
}
|
||||
}
|
||||
series.Metric = dst
|
||||
}
|
||||
case promql.Matrix:
|
||||
value := res.Value.(promql.Matrix)
|
||||
for i := range value {
|
||||
series := &(value)[i]
|
||||
dst := series.Metric[:0]
|
||||
for _, lbl := range series.Metric {
|
||||
if _, drop := toRemove[lbl.Name]; !drop {
|
||||
dst = append(dst, lbl)
|
||||
}
|
||||
}
|
||||
series.Metric = dst
|
||||
}
|
||||
case promql.Scalar:
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("rule result is not a vector or scalar or matrix")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -239,6 +239,10 @@ func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, que
|
||||
}
|
||||
|
||||
qry.Close()
|
||||
err = prometheus.RemoveExtraLabels(res, prometheus.FingerprintAsPromLabelName)
|
||||
if err != nil {
|
||||
return nil, nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
return res, &qs, nil
|
||||
|
||||
}
|
||||
@ -259,6 +263,10 @@ func (r *ClickHouseReader) GetQueryRangeResult(ctx context.Context, query *model
|
||||
}
|
||||
|
||||
qry.Close()
|
||||
err = prometheus.RemoveExtraLabels(res, prometheus.FingerprintAsPromLabelName)
|
||||
if err != nil {
|
||||
return nil, nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
}
|
||||
return res, &qs, nil
|
||||
}
|
||||
|
||||
|
||||
@ -321,6 +321,11 @@ func (r *PromRule) RunAlertQuery(ctx context.Context, qs string, start, end time
|
||||
return nil, res.Err
|
||||
}
|
||||
|
||||
err = prometheus.RemoveExtraLabels(res, prometheus.FingerprintAsPromLabelName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch typ := res.Value.(type) {
|
||||
case promql.Vector:
|
||||
series := make([]promql.Series, 0, len(typ))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user