From 7d9e0523c9967aa7404d9254c1a78bce4559e663 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Tue, 29 Jul 2025 15:30:28 +0530 Subject: [PATCH] chore: add anomaly to v5 response (#8643) --- ee/anomaly/daily.go | 34 ++ ee/anomaly/hourly.go | 35 ++ ee/anomaly/params.go | 223 ++++++++++++ ee/anomaly/provider.go | 11 + ee/anomaly/seasonal.go | 463 +++++++++++++++++++++++++ ee/anomaly/weekly.go | 34 ++ ee/query-service/app/api/api.go | 3 + ee/query-service/app/api/queryrange.go | 145 ++++++++ 8 files changed, 948 insertions(+) create mode 100644 ee/anomaly/daily.go create mode 100644 ee/anomaly/hourly.go create mode 100644 ee/anomaly/params.go create mode 100644 ee/anomaly/provider.go create mode 100644 ee/anomaly/seasonal.go create mode 100644 ee/anomaly/weekly.go diff --git a/ee/anomaly/daily.go b/ee/anomaly/daily.go new file mode 100644 index 000000000000..7dd93952c337 --- /dev/null +++ b/ee/anomaly/daily.go @@ -0,0 +1,34 @@ +package anomaly + +import ( + "context" + + "github.com/SigNoz/signoz/pkg/valuer" +) + +type DailyProvider struct { + BaseSeasonalProvider +} + +var _ BaseProvider = (*DailyProvider)(nil) + +func (dp *DailyProvider) GetBaseSeasonalProvider() *BaseSeasonalProvider { + return &dp.BaseSeasonalProvider +} + +func NewDailyProvider(opts ...GenericProviderOption[*DailyProvider]) *DailyProvider { + dp := &DailyProvider{ + BaseSeasonalProvider: BaseSeasonalProvider{}, + } + + for _, opt := range opts { + opt(dp) + } + + return dp +} + +func (p *DailyProvider) GetAnomalies(ctx context.Context, orgID valuer.UUID, req *AnomaliesRequest) (*AnomaliesResponse, error) { + req.Seasonality = SeasonalityDaily + return p.getAnomalies(ctx, orgID, req) +} diff --git a/ee/anomaly/hourly.go b/ee/anomaly/hourly.go new file mode 100644 index 000000000000..95a21bdaa59f --- /dev/null +++ b/ee/anomaly/hourly.go @@ -0,0 +1,35 @@ +package anomaly + +import ( + "context" + + "github.com/SigNoz/signoz/pkg/valuer" +) + +type HourlyProvider struct { + BaseSeasonalProvider +} + +var _ BaseProvider = (*HourlyProvider)(nil) + +func (hp *HourlyProvider) GetBaseSeasonalProvider() *BaseSeasonalProvider { + return &hp.BaseSeasonalProvider +} + +// NewHourlyProvider now uses the generic option type +func NewHourlyProvider(opts ...GenericProviderOption[*HourlyProvider]) *HourlyProvider { + hp := &HourlyProvider{ + BaseSeasonalProvider: BaseSeasonalProvider{}, + } + + for _, opt := range opts { + opt(hp) + } + + return hp +} + +func (p *HourlyProvider) GetAnomalies(ctx context.Context, orgID valuer.UUID, req *AnomaliesRequest) (*AnomaliesResponse, error) { + req.Seasonality = SeasonalityHourly + return p.getAnomalies(ctx, orgID, req) +} diff --git a/ee/anomaly/params.go b/ee/anomaly/params.go new file mode 100644 index 000000000000..9a1aa8a71cef --- /dev/null +++ b/ee/anomaly/params.go @@ -0,0 +1,223 @@ +package anomaly + +import ( + "time" + + qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" + "github.com/SigNoz/signoz/pkg/valuer" +) + +type Seasonality struct{ valuer.String } + +var ( + SeasonalityHourly = Seasonality{valuer.NewString("hourly")} + SeasonalityDaily = Seasonality{valuer.NewString("daily")} + SeasonalityWeekly = Seasonality{valuer.NewString("weekly")} +) + +var ( + oneWeekOffset = uint64(24 * 7 * time.Hour.Milliseconds()) + oneDayOffset = uint64(24 * time.Hour.Milliseconds()) + oneHourOffset = uint64(time.Hour.Milliseconds()) + fiveMinOffset = uint64(5 * time.Minute.Milliseconds()) +) + +func (s Seasonality) IsValid() bool { + switch s { + case SeasonalityHourly, SeasonalityDaily, SeasonalityWeekly: + return true + default: + return false + } +} + +type AnomaliesRequest struct { + Params qbtypes.QueryRangeRequest + Seasonality Seasonality +} + +type AnomaliesResponse struct { + Results []*qbtypes.TimeSeriesData +} + +// anomalyParams is the params for anomaly detection +// prediction = avg(past_period_query) + avg(current_season_query) - mean(past_season_query, past2_season_query, past3_season_query) +// +// ^ ^ +// | | +// (rounded value for past peiod) + (seasonal growth) +// +// score = abs(value - prediction) / stddev (current_season_query) +type anomalyQueryParams struct { + // CurrentPeriodQuery is the query range params for period user is looking at or eval window + // Example: (now-5m, now), (now-30m, now), (now-1h, now) + // The results obtained from this query are used to compare with predicted values + // and to detect anomalies + CurrentPeriodQuery qbtypes.QueryRangeRequest + // PastPeriodQuery is the query range params for past period of seasonality + // Example: For weekly seasonality, (now-1w-5m, now-1w) + // : For daily seasonality, (now-1d-5m, now-1d) + // : For hourly seasonality, (now-1h-5m, now-1h) + PastPeriodQuery qbtypes.QueryRangeRequest + // CurrentSeasonQuery is the query range params for current period (seasonal) + // Example: For weekly seasonality, this is the query range params for the (now-1w-5m, now) + // : For daily seasonality, this is the query range params for the (now-1d-5m, now) + // : For hourly seasonality, this is the query range params for the (now-1h-5m, now) + CurrentSeasonQuery qbtypes.QueryRangeRequest + // PastSeasonQuery is the query range params for past seasonal period to the current season + // Example: For weekly seasonality, this is the query range params for the (now-2w-5m, now-1w) + // : For daily seasonality, this is the query range params for the (now-2d-5m, now-1d) + // : For hourly seasonality, this is the query range params for the (now-2h-5m, now-1h) + PastSeasonQuery qbtypes.QueryRangeRequest + // Past2SeasonQuery is the query range params for past 2 seasonal period to the current season + // Example: For weekly seasonality, this is the query range params for the (now-3w-5m, now-2w) + // : For daily seasonality, this is the query range params for the (now-3d-5m, now-2d) + // : For hourly seasonality, this is the query range params for the (now-3h-5m, now-2h) + Past2SeasonQuery qbtypes.QueryRangeRequest + // Past3SeasonQuery is the query range params for past 3 seasonal period to the current season + // Example: For weekly seasonality, this is the query range params for the (now-4w-5m, now-3w) + // : For daily seasonality, this is the query range params for the (now-4d-5m, now-3d) + // : For hourly seasonality, this is the query range params for the (now-4h-5m, now-3h) + Past3SeasonQuery qbtypes.QueryRangeRequest +} + +func prepareAnomalyQueryParams(req qbtypes.QueryRangeRequest, seasonality Seasonality) *anomalyQueryParams { + start := req.Start + end := req.End + + currentPeriodQuery := qbtypes.QueryRangeRequest{ + Start: start, + End: end, + RequestType: qbtypes.RequestTypeTimeSeries, + CompositeQuery: req.CompositeQuery, + NoCache: false, + } + + var pastPeriodStart, pastPeriodEnd uint64 + + switch seasonality { + // for one week period, we fetch the data from the past week with 5 min offset + case SeasonalityWeekly: + pastPeriodStart = start - oneWeekOffset - fiveMinOffset + pastPeriodEnd = end - oneWeekOffset + // for one day period, we fetch the data from the past day with 5 min offset + case SeasonalityDaily: + pastPeriodStart = start - oneDayOffset - fiveMinOffset + pastPeriodEnd = end - oneDayOffset + // for one hour period, we fetch the data from the past hour with 5 min offset + case SeasonalityHourly: + pastPeriodStart = start - oneHourOffset - fiveMinOffset + pastPeriodEnd = end - oneHourOffset + } + + pastPeriodQuery := qbtypes.QueryRangeRequest{ + Start: pastPeriodStart, + End: pastPeriodEnd, + RequestType: qbtypes.RequestTypeTimeSeries, + CompositeQuery: req.CompositeQuery, + NoCache: false, + } + + // seasonality growth trend + var currentGrowthPeriodStart, currentGrowthPeriodEnd uint64 + switch seasonality { + case SeasonalityWeekly: + currentGrowthPeriodStart = start - oneWeekOffset + currentGrowthPeriodEnd = start + case SeasonalityDaily: + currentGrowthPeriodStart = start - oneDayOffset + currentGrowthPeriodEnd = start + case SeasonalityHourly: + currentGrowthPeriodStart = start - oneHourOffset + currentGrowthPeriodEnd = start + } + + currentGrowthQuery := qbtypes.QueryRangeRequest{ + Start: currentGrowthPeriodStart, + End: currentGrowthPeriodEnd, + RequestType: qbtypes.RequestTypeTimeSeries, + CompositeQuery: req.CompositeQuery, + NoCache: false, + } + + var pastGrowthPeriodStart, pastGrowthPeriodEnd uint64 + switch seasonality { + case SeasonalityWeekly: + pastGrowthPeriodStart = start - 2*oneWeekOffset + pastGrowthPeriodEnd = start - 1*oneWeekOffset + case SeasonalityDaily: + pastGrowthPeriodStart = start - 2*oneDayOffset + pastGrowthPeriodEnd = start - 1*oneDayOffset + case SeasonalityHourly: + pastGrowthPeriodStart = start - 2*oneHourOffset + pastGrowthPeriodEnd = start - 1*oneHourOffset + } + + pastGrowthQuery := qbtypes.QueryRangeRequest{ + Start: pastGrowthPeriodStart, + End: pastGrowthPeriodEnd, + RequestType: qbtypes.RequestTypeTimeSeries, + CompositeQuery: req.CompositeQuery, + NoCache: false, + } + + var past2GrowthPeriodStart, past2GrowthPeriodEnd uint64 + switch seasonality { + case SeasonalityWeekly: + past2GrowthPeriodStart = start - 3*oneWeekOffset + past2GrowthPeriodEnd = start - 2*oneWeekOffset + case SeasonalityDaily: + past2GrowthPeriodStart = start - 3*oneDayOffset + past2GrowthPeriodEnd = start - 2*oneDayOffset + case SeasonalityHourly: + past2GrowthPeriodStart = start - 3*oneHourOffset + past2GrowthPeriodEnd = start - 2*oneHourOffset + } + + past2GrowthQuery := qbtypes.QueryRangeRequest{ + Start: past2GrowthPeriodStart, + End: past2GrowthPeriodEnd, + RequestType: qbtypes.RequestTypeTimeSeries, + CompositeQuery: req.CompositeQuery, + NoCache: false, + } + + var past3GrowthPeriodStart, past3GrowthPeriodEnd uint64 + switch seasonality { + case SeasonalityWeekly: + past3GrowthPeriodStart = start - 4*oneWeekOffset + past3GrowthPeriodEnd = start - 3*oneWeekOffset + case SeasonalityDaily: + past3GrowthPeriodStart = start - 4*oneDayOffset + past3GrowthPeriodEnd = start - 3*oneDayOffset + case SeasonalityHourly: + past3GrowthPeriodStart = start - 4*oneHourOffset + past3GrowthPeriodEnd = start - 3*oneHourOffset + } + + past3GrowthQuery := qbtypes.QueryRangeRequest{ + Start: past3GrowthPeriodStart, + End: past3GrowthPeriodEnd, + RequestType: qbtypes.RequestTypeTimeSeries, + CompositeQuery: req.CompositeQuery, + NoCache: false, + } + + return &anomalyQueryParams{ + CurrentPeriodQuery: currentPeriodQuery, + PastPeriodQuery: pastPeriodQuery, + CurrentSeasonQuery: currentGrowthQuery, + PastSeasonQuery: pastGrowthQuery, + Past2SeasonQuery: past2GrowthQuery, + Past3SeasonQuery: past3GrowthQuery, + } +} + +type anomalyQueryResults struct { + CurrentPeriodResults []*qbtypes.TimeSeriesData + PastPeriodResults []*qbtypes.TimeSeriesData + CurrentSeasonResults []*qbtypes.TimeSeriesData + PastSeasonResults []*qbtypes.TimeSeriesData + Past2SeasonResults []*qbtypes.TimeSeriesData + Past3SeasonResults []*qbtypes.TimeSeriesData +} diff --git a/ee/anomaly/provider.go b/ee/anomaly/provider.go new file mode 100644 index 000000000000..7d0686006f5f --- /dev/null +++ b/ee/anomaly/provider.go @@ -0,0 +1,11 @@ +package anomaly + +import ( + "context" + + "github.com/SigNoz/signoz/pkg/valuer" +) + +type Provider interface { + GetAnomalies(ctx context.Context, orgID valuer.UUID, req *AnomaliesRequest) (*AnomaliesResponse, error) +} diff --git a/ee/anomaly/seasonal.go b/ee/anomaly/seasonal.go new file mode 100644 index 000000000000..abd455e0e005 --- /dev/null +++ b/ee/anomaly/seasonal.go @@ -0,0 +1,463 @@ +package anomaly + +import ( + "context" + "log/slog" + "math" + + "github.com/SigNoz/signoz/pkg/querier" + "github.com/SigNoz/signoz/pkg/valuer" + + qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" +) + +var ( + // TODO(srikanthccv): make this configurable? + movingAvgWindowSize = 7 +) + +// BaseProvider is an interface that includes common methods for all provider types +type BaseProvider interface { + GetBaseSeasonalProvider() *BaseSeasonalProvider +} + +// GenericProviderOption is a generic type for provider options +type GenericProviderOption[T BaseProvider] func(T) + +func WithQuerier[T BaseProvider](querier querier.Querier) GenericProviderOption[T] { + return func(p T) { + p.GetBaseSeasonalProvider().querier = querier + } +} + +func WithLogger[T BaseProvider](logger *slog.Logger) GenericProviderOption[T] { + return func(p T) { + p.GetBaseSeasonalProvider().logger = logger + } +} + +type BaseSeasonalProvider struct { + querier querier.Querier + logger *slog.Logger +} + +func (p *BaseSeasonalProvider) getQueryParams(req *AnomaliesRequest) *anomalyQueryParams { + if !req.Seasonality.IsValid() { + req.Seasonality = SeasonalityDaily + } + return prepareAnomalyQueryParams(req.Params, req.Seasonality) +} + +func (p *BaseSeasonalProvider) toTSResults(ctx context.Context, resp *qbtypes.QueryRangeResponse) []*qbtypes.TimeSeriesData { + + if resp == nil || resp.Data == nil { + p.logger.InfoContext(ctx, "nil response from query range") + } + + data, ok := resp.Data.(struct { + Results []any `json:"results"` + Warnings []string `json:"warnings"` + }) + if !ok { + return nil + } + tsData := []*qbtypes.TimeSeriesData{} + for _, item := range data.Results { + if resultData, ok := item.(*qbtypes.TimeSeriesData); ok { + tsData = append(tsData, resultData) + } + } + + return tsData +} + +func (p *BaseSeasonalProvider) getResults(ctx context.Context, orgID valuer.UUID, params *anomalyQueryParams) (*anomalyQueryResults, error) { + // TODO(srikanthccv): parallelize this? + p.logger.InfoContext(ctx, "fetching results for current period", "anomaly_current_period_query", params.CurrentPeriodQuery) + currentPeriodResults, err := p.querier.QueryRange(ctx, orgID, ¶ms.CurrentPeriodQuery) + if err != nil { + return nil, err + } + + p.logger.InfoContext(ctx, "fetching results for past period", "anomaly_past_period_query", params.PastPeriodQuery) + pastPeriodResults, err := p.querier.QueryRange(ctx, orgID, ¶ms.PastPeriodQuery) + if err != nil { + return nil, err + } + + p.logger.InfoContext(ctx, "fetching results for current season", "anomaly_current_season_query", params.CurrentSeasonQuery) + currentSeasonResults, err := p.querier.QueryRange(ctx, orgID, ¶ms.CurrentSeasonQuery) + if err != nil { + return nil, err + } + + p.logger.InfoContext(ctx, "fetching results for past season", "anomaly_past_season_query", params.PastSeasonQuery) + pastSeasonResults, err := p.querier.QueryRange(ctx, orgID, ¶ms.PastSeasonQuery) + if err != nil { + return nil, err + } + + p.logger.InfoContext(ctx, "fetching results for past 2 season", "anomaly_past_2season_query", params.Past2SeasonQuery) + past2SeasonResults, err := p.querier.QueryRange(ctx, orgID, ¶ms.Past2SeasonQuery) + if err != nil { + return nil, err + } + + p.logger.InfoContext(ctx, "fetching results for past 3 season", "anomaly_past_3season_query", params.Past3SeasonQuery) + past3SeasonResults, err := p.querier.QueryRange(ctx, orgID, ¶ms.Past3SeasonQuery) + if err != nil { + return nil, err + } + + return &anomalyQueryResults{ + CurrentPeriodResults: p.toTSResults(ctx, currentPeriodResults), + PastPeriodResults: p.toTSResults(ctx, pastPeriodResults), + CurrentSeasonResults: p.toTSResults(ctx, currentSeasonResults), + PastSeasonResults: p.toTSResults(ctx, pastSeasonResults), + Past2SeasonResults: p.toTSResults(ctx, past2SeasonResults), + Past3SeasonResults: p.toTSResults(ctx, past3SeasonResults), + }, nil +} + +// getMatchingSeries gets the matching series from the query result +// for the given series +func (p *BaseSeasonalProvider) getMatchingSeries(_ context.Context, queryResult *qbtypes.TimeSeriesData, series *qbtypes.TimeSeries) *qbtypes.TimeSeries { + if queryResult == nil || len(queryResult.Aggregations) == 0 || len(queryResult.Aggregations[0].Series) == 0 { + return nil + } + + for _, curr := range queryResult.Aggregations[0].Series { + currLabelsKey := qbtypes.GetUniqueSeriesKey(curr.Labels) + seriesLabelsKey := qbtypes.GetUniqueSeriesKey(series.Labels) + if currLabelsKey == seriesLabelsKey { + return curr + } + } + return nil +} + +func (p *BaseSeasonalProvider) getAvg(series *qbtypes.TimeSeries) float64 { + if series == nil || len(series.Values) == 0 { + return 0 + } + var sum float64 + for _, smpl := range series.Values { + sum += smpl.Value + } + return sum / float64(len(series.Values)) +} + +func (p *BaseSeasonalProvider) getStdDev(series *qbtypes.TimeSeries) float64 { + if series == nil || len(series.Values) == 0 { + return 0 + } + avg := p.getAvg(series) + var sum float64 + for _, smpl := range series.Values { + sum += math.Pow(smpl.Value-avg, 2) + } + return math.Sqrt(sum / float64(len(series.Values))) +} + +// getMovingAvg gets the moving average for the given series +// for the given window size and start index +func (p *BaseSeasonalProvider) getMovingAvg(series *qbtypes.TimeSeries, movingAvgWindowSize, startIdx int) float64 { + if series == nil || len(series.Values) == 0 { + return 0 + } + if startIdx >= len(series.Values)-movingAvgWindowSize { + startIdx = int(math.Max(0, float64(len(series.Values)-movingAvgWindowSize))) + } + var sum float64 + points := series.Values[startIdx:] + windowSize := int(math.Min(float64(movingAvgWindowSize), float64(len(points)))) + for i := 0; i < windowSize; i++ { + sum += points[i].Value + } + avg := sum / float64(windowSize) + return avg +} + +func (p *BaseSeasonalProvider) getMean(floats ...float64) float64 { + if len(floats) == 0 { + return 0 + } + var sum float64 + for _, f := range floats { + sum += f + } + return sum / float64(len(floats)) +} + +func (p *BaseSeasonalProvider) getPredictedSeries( + ctx context.Context, + series, prevSeries, currentSeasonSeries, pastSeasonSeries, past2SeasonSeries, past3SeasonSeries *qbtypes.TimeSeries, +) *qbtypes.TimeSeries { + predictedSeries := &qbtypes.TimeSeries{ + Labels: series.Labels, + Values: make([]*qbtypes.TimeSeriesValue, 0), + } + + // for each point in the series, get the predicted value + // the predicted value is the moving average (with window size = 7) of the previous period series + // plus the average of the current season series + // minus the mean of the past season series, past2 season series and past3 season series + for idx, curr := range series.Values { + movingAvg := p.getMovingAvg(prevSeries, movingAvgWindowSize, idx) + avg := p.getAvg(currentSeasonSeries) + mean := p.getMean(p.getAvg(pastSeasonSeries), p.getAvg(past2SeasonSeries), p.getAvg(past3SeasonSeries)) + predictedValue := movingAvg + avg - mean + + if predictedValue < 0 { + // this should not happen (except when the data has extreme outliers) + // we will use the moving avg of the previous period series in this case + p.logger.WarnContext(ctx, "predicted value is less than 0 for series", "anomaly_predicted_value", predictedValue, "anomaly_labels", series.Labels) + predictedValue = p.getMovingAvg(prevSeries, movingAvgWindowSize, idx) + } + + p.logger.DebugContext(ctx, "predicted value for series", + "anomaly_moving_avg", movingAvg, + "anomaly_avg", avg, + "anomaly_mean", mean, + "anomaly_labels", series.Labels, + "anomaly_predicted_value", predictedValue, + "anomaly_curr", curr.Value, + ) + predictedSeries.Values = append(predictedSeries.Values, &qbtypes.TimeSeriesValue{ + Timestamp: curr.Timestamp, + Value: predictedValue, + }) + } + + return predictedSeries +} + +// getBounds gets the upper and lower bounds for the given series +// for the given z score threshold +// moving avg of the previous period series + z score threshold * std dev of the series +// moving avg of the previous period series - z score threshold * std dev of the series +func (p *BaseSeasonalProvider) getBounds( + series, predictedSeries *qbtypes.TimeSeries, + zScoreThreshold float64, +) (*qbtypes.TimeSeries, *qbtypes.TimeSeries) { + upperBoundSeries := &qbtypes.TimeSeries{ + Labels: series.Labels, + Values: make([]*qbtypes.TimeSeriesValue, 0), + } + + lowerBoundSeries := &qbtypes.TimeSeries{ + Labels: series.Labels, + Values: make([]*qbtypes.TimeSeriesValue, 0), + } + + for idx, curr := range series.Values { + upperBound := p.getMovingAvg(predictedSeries, movingAvgWindowSize, idx) + zScoreThreshold*p.getStdDev(series) + lowerBound := p.getMovingAvg(predictedSeries, movingAvgWindowSize, idx) - zScoreThreshold*p.getStdDev(series) + upperBoundSeries.Values = append(upperBoundSeries.Values, &qbtypes.TimeSeriesValue{ + Timestamp: curr.Timestamp, + Value: upperBound, + }) + lowerBoundSeries.Values = append(lowerBoundSeries.Values, &qbtypes.TimeSeriesValue{ + Timestamp: curr.Timestamp, + Value: math.Max(lowerBound, 0), + }) + } + + return upperBoundSeries, lowerBoundSeries +} + +// getExpectedValue gets the expected value for the given series +// for the given index +// prevSeriesAvg + currentSeasonSeriesAvg - mean of past season series, past2 season series and past3 season series +func (p *BaseSeasonalProvider) getExpectedValue( + _, prevSeries, currentSeasonSeries, pastSeasonSeries, past2SeasonSeries, past3SeasonSeries *qbtypes.TimeSeries, idx int, +) float64 { + prevSeriesAvg := p.getMovingAvg(prevSeries, movingAvgWindowSize, idx) + currentSeasonSeriesAvg := p.getAvg(currentSeasonSeries) + pastSeasonSeriesAvg := p.getAvg(pastSeasonSeries) + past2SeasonSeriesAvg := p.getAvg(past2SeasonSeries) + past3SeasonSeriesAvg := p.getAvg(past3SeasonSeries) + return prevSeriesAvg + currentSeasonSeriesAvg - p.getMean(pastSeasonSeriesAvg, past2SeasonSeriesAvg, past3SeasonSeriesAvg) +} + +// getScore gets the anomaly score for the given series +// for the given index +// (value - expectedValue) / std dev of the series +func (p *BaseSeasonalProvider) getScore( + series, prevSeries, weekSeries, weekPrevSeries, past2SeasonSeries, past3SeasonSeries *qbtypes.TimeSeries, value float64, idx int, +) float64 { + expectedValue := p.getExpectedValue(series, prevSeries, weekSeries, weekPrevSeries, past2SeasonSeries, past3SeasonSeries, idx) + if expectedValue < 0 { + expectedValue = p.getMovingAvg(prevSeries, movingAvgWindowSize, idx) + } + return (value - expectedValue) / p.getStdDev(weekSeries) +} + +// getAnomalyScores gets the anomaly scores for the given series +// for the given index +// (value - expectedValue) / std dev of the series +func (p *BaseSeasonalProvider) getAnomalyScores( + series, prevSeries, currentSeasonSeries, pastSeasonSeries, past2SeasonSeries, past3SeasonSeries *qbtypes.TimeSeries, +) *qbtypes.TimeSeries { + anomalyScoreSeries := &qbtypes.TimeSeries{ + Labels: series.Labels, + Values: make([]*qbtypes.TimeSeriesValue, 0), + } + + for idx, curr := range series.Values { + anomalyScore := p.getScore(series, prevSeries, currentSeasonSeries, pastSeasonSeries, past2SeasonSeries, past3SeasonSeries, curr.Value, idx) + anomalyScoreSeries.Values = append(anomalyScoreSeries.Values, &qbtypes.TimeSeriesValue{ + Timestamp: curr.Timestamp, + Value: anomalyScore, + }) + } + + return anomalyScoreSeries +} + +func (p *BaseSeasonalProvider) getAnomalies(ctx context.Context, orgID valuer.UUID, req *AnomaliesRequest) (*AnomaliesResponse, error) { + anomalyParams := p.getQueryParams(req) + anomalyQueryResults, err := p.getResults(ctx, orgID, anomalyParams) + if err != nil { + return nil, err + } + + currentPeriodResults := make(map[string]*qbtypes.TimeSeriesData) + for _, result := range anomalyQueryResults.CurrentPeriodResults { + currentPeriodResults[result.QueryName] = result + } + + pastPeriodResults := make(map[string]*qbtypes.TimeSeriesData) + for _, result := range anomalyQueryResults.PastPeriodResults { + pastPeriodResults[result.QueryName] = result + } + + currentSeasonResults := make(map[string]*qbtypes.TimeSeriesData) + for _, result := range anomalyQueryResults.CurrentSeasonResults { + currentSeasonResults[result.QueryName] = result + } + + pastSeasonResults := make(map[string]*qbtypes.TimeSeriesData) + for _, result := range anomalyQueryResults.PastSeasonResults { + pastSeasonResults[result.QueryName] = result + } + + past2SeasonResults := make(map[string]*qbtypes.TimeSeriesData) + for _, result := range anomalyQueryResults.Past2SeasonResults { + past2SeasonResults[result.QueryName] = result + } + + past3SeasonResults := make(map[string]*qbtypes.TimeSeriesData) + for _, result := range anomalyQueryResults.Past3SeasonResults { + past3SeasonResults[result.QueryName] = result + } + + for _, result := range currentPeriodResults { + funcs := req.Params.FuncsForQuery(result.QueryName) + + var zScoreThreshold float64 + for _, f := range funcs { + if f.Name == qbtypes.FunctionNameAnomaly { + for _, arg := range f.Args { + if arg.Name != "z_score_threshold" { + continue + } + value, ok := arg.Value.(float64) + if ok { + zScoreThreshold = value + } else { + p.logger.InfoContext(ctx, "z_score_threshold not provided, defaulting") + zScoreThreshold = 3 + } + break + } + } + } + + pastPeriodResult, ok := pastPeriodResults[result.QueryName] + if !ok { + continue + } + currentSeasonResult, ok := currentSeasonResults[result.QueryName] + if !ok { + continue + } + pastSeasonResult, ok := pastSeasonResults[result.QueryName] + if !ok { + continue + } + past2SeasonResult, ok := past2SeasonResults[result.QueryName] + if !ok { + continue + } + past3SeasonResult, ok := past3SeasonResults[result.QueryName] + if !ok { + continue + } + + aggOfInterest := result.Aggregations[0] + + for _, series := range aggOfInterest.Series { + stdDev := p.getStdDev(series) + p.logger.InfoContext(ctx, "calculated standard deviation for series", "anomaly_std_dev", stdDev, "anomaly_labels", series.Labels) + + pastPeriodSeries := p.getMatchingSeries(ctx, pastPeriodResult, series) + currentSeasonSeries := p.getMatchingSeries(ctx, currentSeasonResult, series) + pastSeasonSeries := p.getMatchingSeries(ctx, pastSeasonResult, series) + past2SeasonSeries := p.getMatchingSeries(ctx, past2SeasonResult, series) + past3SeasonSeries := p.getMatchingSeries(ctx, past3SeasonResult, series) + + prevSeriesAvg := p.getAvg(pastPeriodSeries) + currentSeasonSeriesAvg := p.getAvg(currentSeasonSeries) + pastSeasonSeriesAvg := p.getAvg(pastSeasonSeries) + past2SeasonSeriesAvg := p.getAvg(past2SeasonSeries) + past3SeasonSeriesAvg := p.getAvg(past3SeasonSeries) + p.logger.InfoContext(ctx, "calculated mean for series", + "anomaly_prev_series_avg", prevSeriesAvg, + "anomaly_current_season_series_avg", currentSeasonSeriesAvg, + "anomaly_past_season_series_avg", pastSeasonSeriesAvg, + "anomaly_past_2season_series_avg", past2SeasonSeriesAvg, + "anomaly_past_3season_series_avg", past3SeasonSeriesAvg, + "anomaly_labels", series.Labels, + ) + + predictedSeries := p.getPredictedSeries( + ctx, + series, + pastPeriodSeries, + currentSeasonSeries, + pastSeasonSeries, + past2SeasonSeries, + past3SeasonSeries, + ) + aggOfInterest.PredictedSeries = append(aggOfInterest.PredictedSeries, predictedSeries) + + upperBoundSeries, lowerBoundSeries := p.getBounds( + series, + predictedSeries, + zScoreThreshold, + ) + aggOfInterest.UpperBoundSeries = append(aggOfInterest.UpperBoundSeries, upperBoundSeries) + aggOfInterest.LowerBoundSeries = append(aggOfInterest.LowerBoundSeries, lowerBoundSeries) + + anomalyScoreSeries := p.getAnomalyScores( + series, + pastPeriodSeries, + currentSeasonSeries, + pastSeasonSeries, + past2SeasonSeries, + past3SeasonSeries, + ) + aggOfInterest.AnomalyScores = append(aggOfInterest.AnomalyScores, anomalyScoreSeries) + } + } + + results := make([]*qbtypes.TimeSeriesData, 0, len(currentPeriodResults)) + for _, result := range currentPeriodResults { + results = append(results, result) + } + + return &AnomaliesResponse{ + Results: results, + }, nil +} diff --git a/ee/anomaly/weekly.go b/ee/anomaly/weekly.go new file mode 100644 index 000000000000..c3e3f1fa914a --- /dev/null +++ b/ee/anomaly/weekly.go @@ -0,0 +1,34 @@ +package anomaly + +import ( + "context" + + "github.com/SigNoz/signoz/pkg/valuer" +) + +type WeeklyProvider struct { + BaseSeasonalProvider +} + +var _ BaseProvider = (*WeeklyProvider)(nil) + +func (wp *WeeklyProvider) GetBaseSeasonalProvider() *BaseSeasonalProvider { + return &wp.BaseSeasonalProvider +} + +func NewWeeklyProvider(opts ...GenericProviderOption[*WeeklyProvider]) *WeeklyProvider { + wp := &WeeklyProvider{ + BaseSeasonalProvider: BaseSeasonalProvider{}, + } + + for _, opt := range opts { + opt(wp) + } + + return wp +} + +func (p *WeeklyProvider) GetAnomalies(ctx context.Context, orgID valuer.UUID, req *AnomaliesRequest) (*AnomaliesResponse, error) { + req.Seasonality = SeasonalityWeekly + return p.getAnomalies(ctx, orgID, req) +} diff --git a/ee/query-service/app/api/api.go b/ee/query-service/app/api/api.go index 5b69254ccdd6..26f69087b164 100644 --- a/ee/query-service/app/api/api.go +++ b/ee/query-service/app/api/api.go @@ -110,6 +110,9 @@ func (ah *APIHandler) RegisterRoutes(router *mux.Router, am *middleware.AuthZ) { // v4 router.HandleFunc("/api/v4/query_range", am.ViewAccess(ah.queryRangeV4)).Methods(http.MethodPost) + // v5 + router.HandleFunc("/api/v5/query_range", am.ViewAccess(ah.queryRangeV5)).Methods(http.MethodPost) + // Gateway router.PathPrefix(gateway.RoutePrefix).HandlerFunc(am.EditAccess(ah.ServeGatewayHTTP)) diff --git a/ee/query-service/app/api/queryrange.go b/ee/query-service/app/api/queryrange.go index e6801198e436..85d60d4b6b43 100644 --- a/ee/query-service/app/api/queryrange.go +++ b/ee/query-service/app/api/queryrange.go @@ -2,11 +2,16 @@ package api import ( "bytes" + "context" + "encoding/json" "fmt" "io" "net/http" + "runtime/debug" + anomalyV2 "github.com/SigNoz/signoz/ee/anomaly" "github.com/SigNoz/signoz/ee/query-service/anomaly" + "github.com/SigNoz/signoz/pkg/errors" "github.com/SigNoz/signoz/pkg/http/render" baseapp "github.com/SigNoz/signoz/pkg/query-service/app" "github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder" @@ -15,6 +20,8 @@ import ( "github.com/SigNoz/signoz/pkg/types/authtypes" "github.com/SigNoz/signoz/pkg/valuer" "go.uber.org/zap" + + qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" ) func (aH *APIHandler) queryRangeV4(w http.ResponseWriter, r *http.Request) { @@ -136,3 +143,141 @@ func (aH *APIHandler) queryRangeV4(w http.ResponseWriter, r *http.Request) { aH.QueryRangeV4(w, r) } } + +func extractSeasonality(anomalyQuery *qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) anomalyV2.Seasonality { + for _, fn := range anomalyQuery.Functions { + if fn.Name == qbtypes.FunctionNameAnomaly { + for _, arg := range fn.Args { + if arg.Name == "seasonality" { + if seasonalityStr, ok := arg.Value.(string); ok { + switch seasonalityStr { + case "weekly": + return anomalyV2.SeasonalityWeekly + case "hourly": + return anomalyV2.SeasonalityHourly + } + } + } + } + } + } + return anomalyV2.SeasonalityDaily // default +} + +func createAnomalyProvider(aH *APIHandler, seasonality anomalyV2.Seasonality) anomalyV2.Provider { + switch seasonality { + case anomalyV2.SeasonalityWeekly: + return anomalyV2.NewWeeklyProvider( + anomalyV2.WithQuerier[*anomalyV2.WeeklyProvider](aH.Signoz.Querier), + anomalyV2.WithLogger[*anomalyV2.WeeklyProvider](aH.Signoz.Instrumentation.Logger()), + ) + case anomalyV2.SeasonalityHourly: + return anomalyV2.NewHourlyProvider( + anomalyV2.WithQuerier[*anomalyV2.HourlyProvider](aH.Signoz.Querier), + anomalyV2.WithLogger[*anomalyV2.HourlyProvider](aH.Signoz.Instrumentation.Logger()), + ) + default: + return anomalyV2.NewDailyProvider( + anomalyV2.WithQuerier[*anomalyV2.DailyProvider](aH.Signoz.Querier), + anomalyV2.WithLogger[*anomalyV2.DailyProvider](aH.Signoz.Instrumentation.Logger()), + ) + } +} + +func (aH *APIHandler) handleAnomalyQuery(ctx context.Context, orgID valuer.UUID, anomalyQuery *qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], queryRangeRequest qbtypes.QueryRangeRequest) (*anomalyV2.AnomaliesResponse, error) { + seasonality := extractSeasonality(anomalyQuery) + provider := createAnomalyProvider(aH, seasonality) + + return provider.GetAnomalies(ctx, orgID, &anomalyV2.AnomaliesRequest{Params: queryRangeRequest}) +} + +func (aH *APIHandler) queryRangeV5(rw http.ResponseWriter, req *http.Request) { + + bodyBytes, err := io.ReadAll(req.Body) + if err != nil { + render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "failed to read request body: %v", err)) + return + } + req.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) + + ctx := req.Context() + + claims, err := authtypes.ClaimsFromContext(ctx) + if err != nil { + render.Error(rw, err) + return + } + + var queryRangeRequest qbtypes.QueryRangeRequest + if err := json.NewDecoder(req.Body).Decode(&queryRangeRequest); err != nil { + render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "failed to decode request body: %v", err)) + return + } + + defer func() { + if r := recover(); r != nil { + stackTrace := string(debug.Stack()) + + queryJSON, _ := json.Marshal(queryRangeRequest) + + aH.Signoz.Instrumentation.Logger().ErrorContext(ctx, "panic in QueryRange", + "error", r, + "user", claims.UserID, + "payload", string(queryJSON), + "stacktrace", stackTrace, + ) + + render.Error(rw, errors.NewInternalf( + errors.CodeInternal, + "Something went wrong on our end. It's not you, it's us. Our team is notified about it. Reach out to support if issue persists.", + )) + } + }() + + if err := queryRangeRequest.Validate(); err != nil { + render.Error(rw, err) + return + } + + orgID, err := valuer.NewUUID(claims.OrgID) + if err != nil { + render.Error(rw, err) + return + } + + if anomalyQuery, ok := queryRangeRequest.IsAnomalyRequest(); ok { + anomalies, err := aH.handleAnomalyQuery(ctx, orgID, anomalyQuery, queryRangeRequest) + if err != nil { + render.Error(rw, errors.NewInternalf(errors.CodeInternal, "failed to get anomalies: %v", err)) + return + } + + results := []any{} + for _, item := range anomalies.Results { + results = append(results, item) + } + + finalResp := &qbtypes.QueryRangeResponse{ + Type: queryRangeRequest.RequestType, + Data: struct { + Results []any `json:"results"` + Warnings []string `json:"warnings"` + }{ + Results: results, + Warnings: make([]string, 0), // TODO(srikanthccv): will there be any warnings here? + }, + Meta: struct { + RowsScanned uint64 `json:"rowsScanned"` + BytesScanned uint64 `json:"bytesScanned"` + DurationMS uint64 `json:"durationMs"` + }{}, + } + + render.Success(rw, http.StatusOK, finalResp) + return + } else { + // regular query range request, let the querier handle it + req.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) + aH.QuerierAPI.QueryRange(rw, req) + } +}