mirror of
https://github.com/SigNoz/signoz.git
synced 2025-12-22 18:06:35 +00:00
chore: some updates
This commit is contained in:
parent
6ca6d8633d
commit
890e811a07
@ -2,7 +2,6 @@ package anomaly
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"math"
|
"math"
|
||||||
|
|
||||||
@ -49,29 +48,31 @@ func (p *BaseSeasonalProvider) getQueryParams(req *AnomaliesRequest) *anomalyQue
|
|||||||
return prepareAnomalyQueryParams(req.Params, req.Seasonality)
|
return prepareAnomalyQueryParams(req.Params, req.Seasonality)
|
||||||
}
|
}
|
||||||
|
|
||||||
func toTSResults(resp *qbtypes.QueryRangeResponse) []*qbtypes.TimeSeriesData {
|
func (p *BaseSeasonalProvider) toTSResults(ctx context.Context, resp *qbtypes.QueryRangeResponse) []*qbtypes.TimeSeriesData {
|
||||||
|
|
||||||
|
if resp == nil || resp.Data == nil {
|
||||||
|
p.logger.InfoContext(ctx, "nil response from query range")
|
||||||
|
}
|
||||||
|
|
||||||
data, ok := resp.Data.(struct {
|
data, ok := resp.Data.(struct {
|
||||||
Results []any `json:"results"`
|
Results []any `json:"results"`
|
||||||
Warnings []string `json:"warnings"`
|
Warnings []string `json:"warnings"`
|
||||||
})
|
})
|
||||||
if !ok {
|
if !ok {
|
||||||
fmt.Println("something wrong with data", data, resp.Data)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
tsDatum := []*qbtypes.TimeSeriesData{}
|
tsData := []*qbtypes.TimeSeriesData{}
|
||||||
for _, item := range data.Results {
|
for _, item := range data.Results {
|
||||||
if tsData, ok := item.(*qbtypes.TimeSeriesData); ok {
|
if resultData, ok := item.(*qbtypes.TimeSeriesData); ok {
|
||||||
tsDatum = append(tsDatum, tsData)
|
tsData = append(tsData, resultData)
|
||||||
} else {
|
|
||||||
fmt.Println("something wrong", item)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return tsDatum
|
return tsData
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *BaseSeasonalProvider) getResults(ctx context.Context, orgID valuer.UUID, params *anomalyQueryParams) (*anomalyQueryResults, error) {
|
func (p *BaseSeasonalProvider) getResults(ctx context.Context, orgID valuer.UUID, params *anomalyQueryParams) (*anomalyQueryResults, error) {
|
||||||
|
// TODO(srikanthccv): parallelize this?
|
||||||
p.logger.InfoContext(ctx, "fetching results for current period", "anomaly.current_period_query", params.CurrentPeriodQuery)
|
p.logger.InfoContext(ctx, "fetching results for current period", "anomaly.current_period_query", params.CurrentPeriodQuery)
|
||||||
currentPeriodResults, err := p.querier.QueryRange(ctx, orgID, ¶ms.CurrentPeriodQuery)
|
currentPeriodResults, err := p.querier.QueryRange(ctx, orgID, ¶ms.CurrentPeriodQuery)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -109,18 +110,18 @@ func (p *BaseSeasonalProvider) getResults(ctx context.Context, orgID valuer.UUID
|
|||||||
}
|
}
|
||||||
|
|
||||||
return &anomalyQueryResults{
|
return &anomalyQueryResults{
|
||||||
CurrentPeriodResults: toTSResults(currentPeriodResults),
|
CurrentPeriodResults: p.toTSResults(ctx, currentPeriodResults),
|
||||||
PastPeriodResults: toTSResults(pastPeriodResults),
|
PastPeriodResults: p.toTSResults(ctx, pastPeriodResults),
|
||||||
CurrentSeasonResults: toTSResults(currentSeasonResults),
|
CurrentSeasonResults: p.toTSResults(ctx, currentSeasonResults),
|
||||||
PastSeasonResults: toTSResults(pastSeasonResults),
|
PastSeasonResults: p.toTSResults(ctx, pastSeasonResults),
|
||||||
Past2SeasonResults: toTSResults(past2SeasonResults),
|
Past2SeasonResults: p.toTSResults(ctx, past2SeasonResults),
|
||||||
Past3SeasonResults: toTSResults(past3SeasonResults),
|
Past3SeasonResults: p.toTSResults(ctx, past3SeasonResults),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getMatchingSeries gets the matching series from the query result
|
// getMatchingSeries gets the matching series from the query result
|
||||||
// for the given series
|
// for the given series
|
||||||
func (p *BaseSeasonalProvider) getMatchingSeries(queryResult *qbtypes.TimeSeriesData, series *qbtypes.TimeSeries) *qbtypes.TimeSeries {
|
func (p *BaseSeasonalProvider) getMatchingSeries(_ context.Context, queryResult *qbtypes.TimeSeriesData, series *qbtypes.TimeSeries) *qbtypes.TimeSeries {
|
||||||
if queryResult == nil || len(queryResult.Aggregations) == 0 || len(queryResult.Aggregations[0].Series) == 0 {
|
if queryResult == nil || len(queryResult.Aggregations) == 0 || len(queryResult.Aggregations[0].Series) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -321,37 +322,37 @@ func (p *BaseSeasonalProvider) getAnomalies(ctx context.Context, orgID valuer.UU
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
currentPeriodResultsMap := make(map[string]*qbtypes.TimeSeriesData)
|
currentPeriodResults := make(map[string]*qbtypes.TimeSeriesData)
|
||||||
for _, result := range anomalyQueryResults.CurrentPeriodResults {
|
for _, result := range anomalyQueryResults.CurrentPeriodResults {
|
||||||
currentPeriodResultsMap[result.QueryName] = result
|
currentPeriodResults[result.QueryName] = result
|
||||||
}
|
}
|
||||||
|
|
||||||
pastPeriodResultsMap := make(map[string]*qbtypes.TimeSeriesData)
|
pastPeriodResults := make(map[string]*qbtypes.TimeSeriesData)
|
||||||
for _, result := range anomalyQueryResults.PastPeriodResults {
|
for _, result := range anomalyQueryResults.PastPeriodResults {
|
||||||
pastPeriodResultsMap[result.QueryName] = result
|
pastPeriodResults[result.QueryName] = result
|
||||||
}
|
}
|
||||||
|
|
||||||
currentSeasonResultsMap := make(map[string]*qbtypes.TimeSeriesData)
|
currentSeasonResults := make(map[string]*qbtypes.TimeSeriesData)
|
||||||
for _, result := range anomalyQueryResults.CurrentSeasonResults {
|
for _, result := range anomalyQueryResults.CurrentSeasonResults {
|
||||||
currentSeasonResultsMap[result.QueryName] = result
|
currentSeasonResults[result.QueryName] = result
|
||||||
}
|
}
|
||||||
|
|
||||||
pastSeasonResultsMap := make(map[string]*qbtypes.TimeSeriesData)
|
pastSeasonResults := make(map[string]*qbtypes.TimeSeriesData)
|
||||||
for _, result := range anomalyQueryResults.PastSeasonResults {
|
for _, result := range anomalyQueryResults.PastSeasonResults {
|
||||||
pastSeasonResultsMap[result.QueryName] = result
|
pastSeasonResults[result.QueryName] = result
|
||||||
}
|
}
|
||||||
|
|
||||||
past2SeasonResultsMap := make(map[string]*qbtypes.TimeSeriesData)
|
past2SeasonResults := make(map[string]*qbtypes.TimeSeriesData)
|
||||||
for _, result := range anomalyQueryResults.Past2SeasonResults {
|
for _, result := range anomalyQueryResults.Past2SeasonResults {
|
||||||
past2SeasonResultsMap[result.QueryName] = result
|
past2SeasonResults[result.QueryName] = result
|
||||||
}
|
}
|
||||||
|
|
||||||
past3SeasonResultsMap := make(map[string]*qbtypes.TimeSeriesData)
|
past3SeasonResults := make(map[string]*qbtypes.TimeSeriesData)
|
||||||
for _, result := range anomalyQueryResults.Past3SeasonResults {
|
for _, result := range anomalyQueryResults.Past3SeasonResults {
|
||||||
past3SeasonResultsMap[result.QueryName] = result
|
past3SeasonResults[result.QueryName] = result
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, result := range currentPeriodResultsMap {
|
for _, result := range currentPeriodResults {
|
||||||
funcs := req.Params.FuncsForQuery(result.QueryName)
|
funcs := req.Params.FuncsForQuery(result.QueryName)
|
||||||
|
|
||||||
var zScoreThreshold float64
|
var zScoreThreshold float64
|
||||||
@ -373,23 +374,23 @@ func (p *BaseSeasonalProvider) getAnomalies(ctx context.Context, orgID valuer.UU
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pastPeriodResult, ok := pastPeriodResultsMap[result.QueryName]
|
pastPeriodResult, ok := pastPeriodResults[result.QueryName]
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
currentSeasonResult, ok := currentSeasonResultsMap[result.QueryName]
|
currentSeasonResult, ok := currentSeasonResults[result.QueryName]
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
pastSeasonResult, ok := pastSeasonResultsMap[result.QueryName]
|
pastSeasonResult, ok := pastSeasonResults[result.QueryName]
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
past2SeasonResult, ok := past2SeasonResultsMap[result.QueryName]
|
past2SeasonResult, ok := past2SeasonResults[result.QueryName]
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
past3SeasonResult, ok := past3SeasonResultsMap[result.QueryName]
|
past3SeasonResult, ok := past3SeasonResults[result.QueryName]
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -400,11 +401,11 @@ func (p *BaseSeasonalProvider) getAnomalies(ctx context.Context, orgID valuer.UU
|
|||||||
stdDev := p.getStdDev(series)
|
stdDev := p.getStdDev(series)
|
||||||
p.logger.InfoContext(ctx, "calculated standard deviation for series", "anomaly.std_dev", stdDev, "anomaly.labels", series.Labels)
|
p.logger.InfoContext(ctx, "calculated standard deviation for series", "anomaly.std_dev", stdDev, "anomaly.labels", series.Labels)
|
||||||
|
|
||||||
pastPeriodSeries := p.getMatchingSeries(pastPeriodResult, series)
|
pastPeriodSeries := p.getMatchingSeries(ctx, pastPeriodResult, series)
|
||||||
currentSeasonSeries := p.getMatchingSeries(currentSeasonResult, series)
|
currentSeasonSeries := p.getMatchingSeries(ctx, currentSeasonResult, series)
|
||||||
pastSeasonSeries := p.getMatchingSeries(pastSeasonResult, series)
|
pastSeasonSeries := p.getMatchingSeries(ctx, pastSeasonResult, series)
|
||||||
past2SeasonSeries := p.getMatchingSeries(past2SeasonResult, series)
|
past2SeasonSeries := p.getMatchingSeries(ctx, past2SeasonResult, series)
|
||||||
past3SeasonSeries := p.getMatchingSeries(past3SeasonResult, series)
|
past3SeasonSeries := p.getMatchingSeries(ctx, past3SeasonResult, series)
|
||||||
|
|
||||||
prevSeriesAvg := p.getAvg(pastPeriodSeries)
|
prevSeriesAvg := p.getAvg(pastPeriodSeries)
|
||||||
currentSeasonSeriesAvg := p.getAvg(currentSeasonSeries)
|
currentSeasonSeriesAvg := p.getAvg(currentSeasonSeries)
|
||||||
@ -451,8 +452,8 @@ func (p *BaseSeasonalProvider) getAnomalies(ctx context.Context, orgID valuer.UU
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
results := make([]*qbtypes.TimeSeriesData, 0, len(currentPeriodResultsMap))
|
results := make([]*qbtypes.TimeSeriesData, 0, len(currentPeriodResults))
|
||||||
for _, result := range currentPeriodResultsMap {
|
for _, result := range currentPeriodResults {
|
||||||
results = append(results, result)
|
results = append(results, result)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -263,7 +263,8 @@ func (aH *APIHandler) queryRangeV5(rw http.ResponseWriter, req *http.Request) {
|
|||||||
Results []any `json:"results"`
|
Results []any `json:"results"`
|
||||||
Warnings []string `json:"warnings"`
|
Warnings []string `json:"warnings"`
|
||||||
}{
|
}{
|
||||||
Results: results,
|
Results: results,
|
||||||
|
Warnings: make([]string, 0), // TODO(srikanthccv): will there be any warnings here?
|
||||||
},
|
},
|
||||||
Meta: struct {
|
Meta: struct {
|
||||||
RowsScanned uint64 `json:"rowsScanned"`
|
RowsScanned uint64 `json:"rowsScanned"`
|
||||||
|
|||||||
2
go.mod
2
go.mod
@ -70,6 +70,7 @@ require (
|
|||||||
go.uber.org/zap v1.27.0
|
go.uber.org/zap v1.27.0
|
||||||
golang.org/x/crypto v0.39.0
|
golang.org/x/crypto v0.39.0
|
||||||
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac
|
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac
|
||||||
|
golang.org/x/net v0.41.0
|
||||||
golang.org/x/oauth2 v0.30.0
|
golang.org/x/oauth2 v0.30.0
|
||||||
golang.org/x/sync v0.15.0
|
golang.org/x/sync v0.15.0
|
||||||
golang.org/x/text v0.26.0
|
golang.org/x/text v0.26.0
|
||||||
@ -283,7 +284,6 @@ require (
|
|||||||
go.uber.org/atomic v1.11.0 // indirect
|
go.uber.org/atomic v1.11.0 // indirect
|
||||||
go.uber.org/goleak v1.3.0 // indirect
|
go.uber.org/goleak v1.3.0 // indirect
|
||||||
golang.org/x/mod v0.25.0 // indirect
|
golang.org/x/mod v0.25.0 // indirect
|
||||||
golang.org/x/net v0.41.0 // indirect
|
|
||||||
golang.org/x/sys v0.33.0 // indirect
|
golang.org/x/sys v0.33.0 // indirect
|
||||||
golang.org/x/time v0.11.0 // indirect
|
golang.org/x/time v0.11.0 // indirect
|
||||||
golang.org/x/tools v0.33.0 // indirect
|
golang.org/x/tools v0.33.0 // indirect
|
||||||
|
|||||||
@ -234,11 +234,7 @@ func PrepareLinksToTracesV5(start, end time.Time, whereClause string) string {
|
|||||||
PageSize: 100,
|
PageSize: 100,
|
||||||
}
|
}
|
||||||
|
|
||||||
options := URLShareableOptions{
|
options := URLShareableOptions{}
|
||||||
MaxLines: 2,
|
|
||||||
Format: "list",
|
|
||||||
SelectColumns: tracesV3.TracesListViewDefaultSelectedColumns,
|
|
||||||
}
|
|
||||||
|
|
||||||
period, _ := json.Marshal(tr)
|
period, _ := json.Marshal(tr)
|
||||||
urlEncodedTimeRange := url.QueryEscape(string(period))
|
urlEncodedTimeRange := url.QueryEscape(string(period))
|
||||||
@ -253,12 +249,6 @@ func PrepareLinksToTracesV5(start, end time.Time, whereClause string) string {
|
|||||||
Disabled: false,
|
Disabled: false,
|
||||||
Having: []v3.Having{},
|
Having: []v3.Having{},
|
||||||
StepInterval: 60,
|
StepInterval: 60,
|
||||||
OrderBy: []v3.OrderBy{
|
|
||||||
{
|
|
||||||
ColumnName: "timestamp",
|
|
||||||
Order: "desc",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
Filter: &FilterExpression{Expression: whereClause},
|
Filter: &FilterExpression{Expression: whereClause},
|
||||||
}
|
}
|
||||||
@ -291,11 +281,7 @@ func PrepareLinksToLogsV5(start, end time.Time, whereClause string) string {
|
|||||||
PageSize: 100,
|
PageSize: 100,
|
||||||
}
|
}
|
||||||
|
|
||||||
options := URLShareableOptions{
|
options := URLShareableOptions{}
|
||||||
MaxLines: 2,
|
|
||||||
Format: "list",
|
|
||||||
SelectColumns: []v3.AttributeKey{},
|
|
||||||
}
|
|
||||||
|
|
||||||
period, _ := json.Marshal(tr)
|
period, _ := json.Marshal(tr)
|
||||||
urlEncodedTimeRange := url.QueryEscape(string(period))
|
urlEncodedTimeRange := url.QueryEscape(string(period))
|
||||||
|
|||||||
@ -629,7 +629,7 @@ func (bc *bucketCache) isEmptyResult(result *qbtypes.Result) (isEmpty bool, isFi
|
|||||||
return !hasValues, !hasValues && totalSeries > 0
|
return !hasValues, !hasValues && totalSeries > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeScalar:
|
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeScalar, qbtypes.RequestTypeTrace:
|
||||||
// Raw and scalar data are not cached
|
// Raw and scalar data are not cached
|
||||||
return true, false
|
return true, false
|
||||||
}
|
}
|
||||||
@ -775,7 +775,7 @@ func (bc *bucketCache) trimResultToFluxBoundary(result *qbtypes.Result, fluxBoun
|
|||||||
trimmedResult.Value = trimmedData
|
trimmedResult.Value = trimmedData
|
||||||
}
|
}
|
||||||
|
|
||||||
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeScalar:
|
case qbtypes.RequestTypeRaw, qbtypes.RequestTypeScalar, qbtypes.RequestTypeTrace:
|
||||||
// Don't cache raw or scalar data
|
// Don't cache raw or scalar data
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -298,15 +298,15 @@ func (q *querier) run(
|
|||||||
}
|
}
|
||||||
switch result.Type {
|
switch result.Type {
|
||||||
case qbtypes.RequestTypeScalar:
|
case qbtypes.RequestTypeScalar:
|
||||||
if val, ok := result.Value.(*qbtypes.ScalarData); ok {
|
if val, ok := result.Value.(*qbtypes.ScalarData); ok && val != nil {
|
||||||
return len(val.Data) != 0
|
return len(val.Data) != 0
|
||||||
}
|
}
|
||||||
case qbtypes.RequestTypeRaw:
|
case qbtypes.RequestTypeRaw:
|
||||||
if val, ok := result.Value.(*qbtypes.RawData); ok {
|
if val, ok := result.Value.(*qbtypes.RawData); ok && val != nil {
|
||||||
return len(val.Rows) != 0
|
return len(val.Rows) != 0
|
||||||
}
|
}
|
||||||
case qbtypes.RequestTypeTimeSeries:
|
case qbtypes.RequestTypeTimeSeries:
|
||||||
if val, ok := result.Value.(*qbtypes.TimeSeriesData); ok {
|
if val, ok := result.Value.(*qbtypes.TimeSeriesData); ok && val != nil {
|
||||||
if len(val.Aggregations) != 0 {
|
if len(val.Aggregations) != 0 {
|
||||||
anyNonEmpty := false
|
anyNonEmpty := false
|
||||||
for _, aggBucket := range val.Aggregations {
|
for _, aggBucket := range val.Aggregations {
|
||||||
|
|||||||
@ -513,6 +513,7 @@ func (r *ThresholdRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUI
|
|||||||
if tsData, ok := item.(*qbtypes.TimeSeriesData); ok {
|
if tsData, ok := item.(*qbtypes.TimeSeriesData); ok {
|
||||||
results = append(results, transition.ConvertV5TimeSeriesDataToV4Result(tsData))
|
results = append(results, transition.ConvertV5TimeSeriesDataToV4Result(tsData))
|
||||||
} else {
|
} else {
|
||||||
|
// NOTE: should not happen but just to ensure we don't miss it if it happens for some reason
|
||||||
zap.L().Warn("expected qbtypes.TimeSeriesData but got", zap.Any("item_type", reflect.TypeOf(item)))
|
zap.L().Warn("expected qbtypes.TimeSeriesData but got", zap.Any("item_type", reflect.TypeOf(item)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -14,6 +14,7 @@ import (
|
|||||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||||
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
|
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
|
||||||
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
|
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
|
||||||
|
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||||
"github.com/SigNoz/signoz/pkg/valuer"
|
"github.com/SigNoz/signoz/pkg/valuer"
|
||||||
|
|
||||||
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
|
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
|
||||||
@ -24,6 +25,8 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
||||||
|
|
||||||
|
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestThresholdRuleShouldAlert(t *testing.T) {
|
func TestThresholdRuleShouldAlert(t *testing.T) {
|
||||||
@ -899,6 +902,102 @@ func TestPrepareLinksToLogs(t *testing.T) {
|
|||||||
assert.Contains(t, link, "&timeRange=%7B%22start%22%3A1705468620000%2C%22end%22%3A1705468920000%2C%22pageSize%22%3A100%7D&startTime=1705468620000&endTime=1705468920000")
|
assert.Contains(t, link, "&timeRange=%7B%22start%22%3A1705468620000%2C%22end%22%3A1705468920000%2C%22pageSize%22%3A100%7D&startTime=1705468620000&endTime=1705468920000")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPrepareLinksToLogsV5(t *testing.T) {
|
||||||
|
postableRule := ruletypes.PostableRule{
|
||||||
|
AlertName: "Tricky Condition Tests",
|
||||||
|
AlertType: ruletypes.AlertTypeLogs,
|
||||||
|
RuleType: ruletypes.RuleTypeThreshold,
|
||||||
|
EvalWindow: ruletypes.Duration(5 * time.Minute),
|
||||||
|
Frequency: ruletypes.Duration(1 * time.Minute),
|
||||||
|
RuleCondition: &ruletypes.RuleCondition{
|
||||||
|
CompositeQuery: &v3.CompositeQuery{
|
||||||
|
QueryType: v3.QueryTypeBuilder,
|
||||||
|
Queries: []qbtypes.QueryEnvelope{
|
||||||
|
{
|
||||||
|
Type: qbtypes.QueryTypeBuilder,
|
||||||
|
Spec: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||||
|
Name: "A",
|
||||||
|
StepInterval: qbtypes.Step{Duration: 1 * time.Minute},
|
||||||
|
Aggregations: []qbtypes.LogAggregation{
|
||||||
|
{
|
||||||
|
Expression: "count()",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Filter: &qbtypes.Filter{
|
||||||
|
Expression: "service.name EXISTS",
|
||||||
|
},
|
||||||
|
Signal: telemetrytypes.SignalLogs,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
CompareOp: "4", // Not Equals
|
||||||
|
MatchType: "1", // Once
|
||||||
|
Target: &[]float64{0.0}[0],
|
||||||
|
SelectedQuery: "A",
|
||||||
|
},
|
||||||
|
Version: "v5",
|
||||||
|
}
|
||||||
|
|
||||||
|
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, nil, nil, WithEvalDelay(2*time.Minute))
|
||||||
|
if err != nil {
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ts := time.UnixMilli(1753527163000)
|
||||||
|
|
||||||
|
link := rule.prepareLinksToLogs(ts, labels.Labels{})
|
||||||
|
assert.Contains(t, link, "compositeQuery=%257B%2522queryType%2522%253A%2522builder%2522%252C%2522builder%2522%253A%257B%2522queryData%2522%253A%255B%257B%2522queryName%2522%253A%2522A%2522%252C%2522stepInterval%2522%253A60%252C%2522dataSource%2522%253A%2522logs%2522%252C%2522aggregateOperator%2522%253A%2522noop%2522%252C%2522aggregateAttribute%2522%253A%257B%2522key%2522%253A%2522%2522%252C%2522dataType%2522%253A%2522%2522%252C%2522type%2522%253A%2522%2522%252C%2522isColumn%2522%253Afalse%252C%2522isJSON%2522%253Afalse%257D%252C%2522expression%2522%253A%2522A%2522%252C%2522disabled%2522%253Afalse%252C%2522limit%2522%253A0%252C%2522offset%2522%253A0%252C%2522pageSize%2522%253A0%252C%2522ShiftBy%2522%253A0%252C%2522IsAnomaly%2522%253Afalse%252C%2522QueriesUsedInFormula%2522%253Anull%252C%2522filter%2522%253A%257B%2522expression%2522%253A%2522service.name%2BEXISTS%2522%257D%257D%255D%252C%2522queryFormulas%2522%253A%255B%255D%257D%257D&timeRange=%7B%22start%22%3A1753526700000%2C%22end%22%3A1753527000000%2C%22pageSize%22%3A100%7D&startTime=1753526700000&endTime=1753527000000&options=%7B%22maxLines%22%3A0%2C%22format%22%3A%22%22%2C%22selectColumns%22%3Anull%7D")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPrepareLinksToTracesV5(t *testing.T) {
|
||||||
|
postableRule := ruletypes.PostableRule{
|
||||||
|
AlertName: "Tricky Condition Tests",
|
||||||
|
AlertType: ruletypes.AlertTypeTraces,
|
||||||
|
RuleType: ruletypes.RuleTypeThreshold,
|
||||||
|
EvalWindow: ruletypes.Duration(5 * time.Minute),
|
||||||
|
Frequency: ruletypes.Duration(1 * time.Minute),
|
||||||
|
RuleCondition: &ruletypes.RuleCondition{
|
||||||
|
CompositeQuery: &v3.CompositeQuery{
|
||||||
|
QueryType: v3.QueryTypeBuilder,
|
||||||
|
Queries: []qbtypes.QueryEnvelope{
|
||||||
|
{
|
||||||
|
Type: qbtypes.QueryTypeBuilder,
|
||||||
|
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||||
|
Name: "A",
|
||||||
|
StepInterval: qbtypes.Step{Duration: 1 * time.Minute},
|
||||||
|
Aggregations: []qbtypes.TraceAggregation{
|
||||||
|
{
|
||||||
|
Expression: "count()",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Filter: &qbtypes.Filter{
|
||||||
|
Expression: "service.name EXISTS",
|
||||||
|
},
|
||||||
|
Signal: telemetrytypes.SignalTraces,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
CompareOp: "4", // Not Equals
|
||||||
|
MatchType: "1", // Once
|
||||||
|
Target: &[]float64{0.0}[0],
|
||||||
|
SelectedQuery: "A",
|
||||||
|
},
|
||||||
|
Version: "v5",
|
||||||
|
}
|
||||||
|
|
||||||
|
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, nil, nil, WithEvalDelay(2*time.Minute))
|
||||||
|
if err != nil {
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ts := time.UnixMilli(1753527163000)
|
||||||
|
|
||||||
|
link := rule.prepareLinksToTraces(ts, labels.Labels{})
|
||||||
|
assert.Contains(t, link, "compositeQuery=%257B%2522queryType%2522%253A%2522builder%2522%252C%2522builder%2522%253A%257B%2522queryData%2522%253A%255B%257B%2522queryName%2522%253A%2522A%2522%252C%2522stepInterval%2522%253A60%252C%2522dataSource%2522%253A%2522traces%2522%252C%2522aggregateOperator%2522%253A%2522noop%2522%252C%2522aggregateAttribute%2522%253A%257B%2522key%2522%253A%2522%2522%252C%2522dataType%2522%253A%2522%2522%252C%2522type%2522%253A%2522%2522%252C%2522isColumn%2522%253Afalse%252C%2522isJSON%2522%253Afalse%257D%252C%2522expression%2522%253A%2522A%2522%252C%2522disabled%2522%253Afalse%252C%2522limit%2522%253A0%252C%2522offset%2522%253A0%252C%2522pageSize%2522%253A0%252C%2522ShiftBy%2522%253A0%252C%2522IsAnomaly%2522%253Afalse%252C%2522QueriesUsedInFormula%2522%253Anull%252C%2522filter%2522%253A%257B%2522expression%2522%253A%2522service.name%2BEXISTS%2522%257D%257D%255D%252C%2522queryFormulas%2522%253A%255B%255D%257D%257D&timeRange=%7B%22start%22%3A1753526700000000000%2C%22end%22%3A1753527000000000000%2C%22pageSize%22%3A100%7D&startTime=1753526700000000000&endTime=1753527000000000000&options=%7B%22maxLines%22%3A0%2C%22format%22%3A%22%22%2C%22selectColumns%22%3Anull%7D")
|
||||||
|
}
|
||||||
|
|
||||||
func TestPrepareLinksToTraces(t *testing.T) {
|
func TestPrepareLinksToTraces(t *testing.T) {
|
||||||
postableRule := ruletypes.PostableRule{
|
postableRule := ruletypes.PostableRule{
|
||||||
AlertName: "Links to traces test",
|
AlertName: "Links to traces test",
|
||||||
|
|||||||
@ -177,6 +177,7 @@ func (migration *queryBuilderV5Migration) migrateDashboards(
|
|||||||
updated := dashboardMigrator.Migrate(ctx, dashboard.Data)
|
updated := dashboardMigrator.Migrate(ctx, dashboard.Data)
|
||||||
|
|
||||||
if updated {
|
if updated {
|
||||||
|
dashboard.Data["version"] = "v5"
|
||||||
dataJSON, err := json.Marshal(dashboard.Data)
|
dataJSON, err := json.Marshal(dashboard.Data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|||||||
@ -355,6 +355,17 @@ func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
|
|||||||
sb.Having(rewrittenExpr)
|
sb.Having(rewrittenExpr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(query.Order) != 0 {
|
||||||
|
// Add order by
|
||||||
|
for _, orderBy := range query.Order {
|
||||||
|
_, ok := aggOrderBy(orderBy, query)
|
||||||
|
if !ok {
|
||||||
|
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sb.OrderBy("ts desc")
|
||||||
|
}
|
||||||
|
|
||||||
combinedArgs := append(allGroupByArgs, allAggChArgs...)
|
combinedArgs := append(allGroupByArgs, allAggChArgs...)
|
||||||
|
|
||||||
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
|
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
|
||||||
@ -372,6 +383,16 @@ func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
|
|||||||
sb.Having(rewrittenExpr)
|
sb.Having(rewrittenExpr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(query.Order) != 0 {
|
||||||
|
// Add order by
|
||||||
|
for _, orderBy := range query.Order {
|
||||||
|
_, ok := aggOrderBy(orderBy, query)
|
||||||
|
if !ok {
|
||||||
|
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sb.OrderBy("ts desc")
|
||||||
|
}
|
||||||
combinedArgs := append(allGroupByArgs, allAggChArgs...)
|
combinedArgs := append(allGroupByArgs, allAggChArgs...)
|
||||||
|
|
||||||
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
|
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
|
||||||
|
|||||||
@ -107,7 +107,7 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
expected: qbtypes.Statement{
|
expected: qbtypes.Statement{
|
||||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY `service.name` desc LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name`",
|
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY `service.name` desc LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name` ORDER BY `service.name` desc, ts desc",
|
||||||
Args: []any{"cartservice", "%service.name%", "%service.name\":\"cartservice%", uint64(1747945619), uint64(1747983448), true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
|
Args: []any{"cartservice", "%service.name%", "%service.name\":\"cartservice%", uint64(1747945619), uint64(1747983448), true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
|
||||||
},
|
},
|
||||||
expectedErr: nil,
|
expectedErr: nil,
|
||||||
|
|||||||
@ -537,6 +537,17 @@ func (b *traceQueryStatementBuilder) buildTimeSeriesQuery(
|
|||||||
sb.Having(rewrittenExpr)
|
sb.Having(rewrittenExpr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(query.Order) != 0 {
|
||||||
|
// Add order by
|
||||||
|
for _, orderBy := range query.Order {
|
||||||
|
_, ok := aggOrderBy(orderBy, query)
|
||||||
|
if !ok {
|
||||||
|
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sb.OrderBy("ts desc")
|
||||||
|
}
|
||||||
|
|
||||||
combinedArgs := append(allGroupByArgs, allAggChArgs...)
|
combinedArgs := append(allGroupByArgs, allAggChArgs...)
|
||||||
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
|
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
|
||||||
|
|
||||||
@ -553,6 +564,17 @@ func (b *traceQueryStatementBuilder) buildTimeSeriesQuery(
|
|||||||
sb.Having(rewrittenExpr)
|
sb.Having(rewrittenExpr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(query.Order) != 0 {
|
||||||
|
// Add order by
|
||||||
|
for _, orderBy := range query.Order {
|
||||||
|
_, ok := aggOrderBy(orderBy, query)
|
||||||
|
if !ok {
|
||||||
|
sb.OrderBy(fmt.Sprintf("`%s` %s", orderBy.Key.Name, orderBy.Direction.StringValue()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sb.OrderBy("ts desc")
|
||||||
|
}
|
||||||
|
|
||||||
combinedArgs := append(allGroupByArgs, allAggChArgs...)
|
combinedArgs := append(allGroupByArgs, allAggChArgs...)
|
||||||
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
|
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
|
||||||
|
|
||||||
|
|||||||
@ -498,6 +498,11 @@ func (mc *migrateCommon) createFilterExpression(ctx context.Context, queryData m
|
|||||||
|
|
||||||
expression := mc.buildExpression(ctx, items, op, dataSource)
|
expression := mc.buildExpression(ctx, items, op, dataSource)
|
||||||
if expression != "" {
|
if expression != "" {
|
||||||
|
if groupByExists := mc.groupByExistsExpr(queryData); groupByExists != "" {
|
||||||
|
mc.logger.InfoContext(ctx, "adding default exists for old qb", "group_by_exists", groupByExists)
|
||||||
|
expression += groupByExists
|
||||||
|
}
|
||||||
|
|
||||||
queryData["filter"] = map[string]any{
|
queryData["filter"] = map[string]any{
|
||||||
"expression": expression,
|
"expression": expression,
|
||||||
}
|
}
|
||||||
@ -508,6 +513,41 @@ func (mc *migrateCommon) createFilterExpression(ctx context.Context, queryData m
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mc *migrateCommon) groupByExistsExpr(queryData map[string]any) string {
|
||||||
|
expr := []string{}
|
||||||
|
groupBy, ok := queryData["groupBy"].([]any)
|
||||||
|
if !ok {
|
||||||
|
return strings.Join(expr, " AND ")
|
||||||
|
}
|
||||||
|
|
||||||
|
for idx := range groupBy {
|
||||||
|
item, ok := groupBy[idx].(map[string]any)
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
key, ok := item["key"].(string)
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
expr = append(expr, fmt.Sprintf("%s EXISTS", key))
|
||||||
|
|
||||||
|
if _, ok := telemetrytraces.IntrinsicFields[key]; ok {
|
||||||
|
delete(item, "type")
|
||||||
|
}
|
||||||
|
if _, ok := telemetrytraces.CalculatedFields[key]; ok {
|
||||||
|
delete(item, "type")
|
||||||
|
}
|
||||||
|
if _, ok := telemetrytraces.IntrinsicFieldsDeprecated[key]; ok {
|
||||||
|
delete(item, "type")
|
||||||
|
}
|
||||||
|
if _, ok := telemetrytraces.CalculatedFieldsDeprecated[key]; ok {
|
||||||
|
delete(item, "type")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return strings.Join(expr, " AND ")
|
||||||
|
}
|
||||||
|
|
||||||
func (mc *migrateCommon) fixGroupBy(queryData map[string]any) bool {
|
func (mc *migrateCommon) fixGroupBy(queryData map[string]any) bool {
|
||||||
groupBy, ok := queryData["groupBy"].([]any)
|
groupBy, ok := queryData["groupBy"].([]any)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|||||||
@ -21,7 +21,7 @@ func (m *savedViewMigrateV5) Migrate(ctx context.Context, data map[string]any) b
|
|||||||
updated := false
|
updated := false
|
||||||
|
|
||||||
if builderQueries, ok := data["builderQueries"].(map[string]any); ok {
|
if builderQueries, ok := data["builderQueries"].(map[string]any); ok {
|
||||||
for _, query := range builderQueries {
|
for name, query := range builderQueries {
|
||||||
if queryMap, ok := query.(map[string]any); ok {
|
if queryMap, ok := query.(map[string]any); ok {
|
||||||
var panelType string
|
var panelType string
|
||||||
if _, ok := data["panelType"].(string); ok {
|
if _, ok := data["panelType"].(string); ok {
|
||||||
@ -30,6 +30,13 @@ func (m *savedViewMigrateV5) Migrate(ctx context.Context, data map[string]any) b
|
|||||||
if m.updateQueryData(ctx, queryMap, "v4", panelType) {
|
if m.updateQueryData(ctx, queryMap, "v4", panelType) {
|
||||||
updated = true
|
updated = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m.logger.InfoContext(ctx, "migrated querymap")
|
||||||
|
|
||||||
|
// wrap it in the v5 envelope
|
||||||
|
envelope := m.wrapInV5Envelope(name, queryMap, "builder_query")
|
||||||
|
m.logger.InfoContext(ctx, "envelope after wrap", "envelope", envelope)
|
||||||
|
data["queries"] = append(data["queries"].([]any), envelope)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user