mirror of
https://github.com/SigNoz/signoz.git
synced 2025-12-17 15:36:48 +00:00
chore: add anomaly to v5 response (#8643)
This commit is contained in:
parent
360285ef33
commit
7d9e0523c9
34
ee/anomaly/daily.go
Normal file
34
ee/anomaly/daily.go
Normal file
@ -0,0 +1,34 @@
|
||||
package anomaly
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type DailyProvider struct {
|
||||
BaseSeasonalProvider
|
||||
}
|
||||
|
||||
var _ BaseProvider = (*DailyProvider)(nil)
|
||||
|
||||
func (dp *DailyProvider) GetBaseSeasonalProvider() *BaseSeasonalProvider {
|
||||
return &dp.BaseSeasonalProvider
|
||||
}
|
||||
|
||||
func NewDailyProvider(opts ...GenericProviderOption[*DailyProvider]) *DailyProvider {
|
||||
dp := &DailyProvider{
|
||||
BaseSeasonalProvider: BaseSeasonalProvider{},
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(dp)
|
||||
}
|
||||
|
||||
return dp
|
||||
}
|
||||
|
||||
func (p *DailyProvider) GetAnomalies(ctx context.Context, orgID valuer.UUID, req *AnomaliesRequest) (*AnomaliesResponse, error) {
|
||||
req.Seasonality = SeasonalityDaily
|
||||
return p.getAnomalies(ctx, orgID, req)
|
||||
}
|
||||
35
ee/anomaly/hourly.go
Normal file
35
ee/anomaly/hourly.go
Normal file
@ -0,0 +1,35 @@
|
||||
package anomaly
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type HourlyProvider struct {
|
||||
BaseSeasonalProvider
|
||||
}
|
||||
|
||||
var _ BaseProvider = (*HourlyProvider)(nil)
|
||||
|
||||
func (hp *HourlyProvider) GetBaseSeasonalProvider() *BaseSeasonalProvider {
|
||||
return &hp.BaseSeasonalProvider
|
||||
}
|
||||
|
||||
// NewHourlyProvider now uses the generic option type
|
||||
func NewHourlyProvider(opts ...GenericProviderOption[*HourlyProvider]) *HourlyProvider {
|
||||
hp := &HourlyProvider{
|
||||
BaseSeasonalProvider: BaseSeasonalProvider{},
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(hp)
|
||||
}
|
||||
|
||||
return hp
|
||||
}
|
||||
|
||||
func (p *HourlyProvider) GetAnomalies(ctx context.Context, orgID valuer.UUID, req *AnomaliesRequest) (*AnomaliesResponse, error) {
|
||||
req.Seasonality = SeasonalityHourly
|
||||
return p.getAnomalies(ctx, orgID, req)
|
||||
}
|
||||
223
ee/anomaly/params.go
Normal file
223
ee/anomaly/params.go
Normal file
@ -0,0 +1,223 @@
|
||||
package anomaly
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type Seasonality struct{ valuer.String }
|
||||
|
||||
var (
|
||||
SeasonalityHourly = Seasonality{valuer.NewString("hourly")}
|
||||
SeasonalityDaily = Seasonality{valuer.NewString("daily")}
|
||||
SeasonalityWeekly = Seasonality{valuer.NewString("weekly")}
|
||||
)
|
||||
|
||||
var (
|
||||
oneWeekOffset = uint64(24 * 7 * time.Hour.Milliseconds())
|
||||
oneDayOffset = uint64(24 * time.Hour.Milliseconds())
|
||||
oneHourOffset = uint64(time.Hour.Milliseconds())
|
||||
fiveMinOffset = uint64(5 * time.Minute.Milliseconds())
|
||||
)
|
||||
|
||||
func (s Seasonality) IsValid() bool {
|
||||
switch s {
|
||||
case SeasonalityHourly, SeasonalityDaily, SeasonalityWeekly:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
type AnomaliesRequest struct {
|
||||
Params qbtypes.QueryRangeRequest
|
||||
Seasonality Seasonality
|
||||
}
|
||||
|
||||
type AnomaliesResponse struct {
|
||||
Results []*qbtypes.TimeSeriesData
|
||||
}
|
||||
|
||||
// anomalyParams is the params for anomaly detection
|
||||
// prediction = avg(past_period_query) + avg(current_season_query) - mean(past_season_query, past2_season_query, past3_season_query)
|
||||
//
|
||||
// ^ ^
|
||||
// | |
|
||||
// (rounded value for past peiod) + (seasonal growth)
|
||||
//
|
||||
// score = abs(value - prediction) / stddev (current_season_query)
|
||||
type anomalyQueryParams struct {
|
||||
// CurrentPeriodQuery is the query range params for period user is looking at or eval window
|
||||
// Example: (now-5m, now), (now-30m, now), (now-1h, now)
|
||||
// The results obtained from this query are used to compare with predicted values
|
||||
// and to detect anomalies
|
||||
CurrentPeriodQuery qbtypes.QueryRangeRequest
|
||||
// PastPeriodQuery is the query range params for past period of seasonality
|
||||
// Example: For weekly seasonality, (now-1w-5m, now-1w)
|
||||
// : For daily seasonality, (now-1d-5m, now-1d)
|
||||
// : For hourly seasonality, (now-1h-5m, now-1h)
|
||||
PastPeriodQuery qbtypes.QueryRangeRequest
|
||||
// CurrentSeasonQuery is the query range params for current period (seasonal)
|
||||
// Example: For weekly seasonality, this is the query range params for the (now-1w-5m, now)
|
||||
// : For daily seasonality, this is the query range params for the (now-1d-5m, now)
|
||||
// : For hourly seasonality, this is the query range params for the (now-1h-5m, now)
|
||||
CurrentSeasonQuery qbtypes.QueryRangeRequest
|
||||
// PastSeasonQuery is the query range params for past seasonal period to the current season
|
||||
// Example: For weekly seasonality, this is the query range params for the (now-2w-5m, now-1w)
|
||||
// : For daily seasonality, this is the query range params for the (now-2d-5m, now-1d)
|
||||
// : For hourly seasonality, this is the query range params for the (now-2h-5m, now-1h)
|
||||
PastSeasonQuery qbtypes.QueryRangeRequest
|
||||
// Past2SeasonQuery is the query range params for past 2 seasonal period to the current season
|
||||
// Example: For weekly seasonality, this is the query range params for the (now-3w-5m, now-2w)
|
||||
// : For daily seasonality, this is the query range params for the (now-3d-5m, now-2d)
|
||||
// : For hourly seasonality, this is the query range params for the (now-3h-5m, now-2h)
|
||||
Past2SeasonQuery qbtypes.QueryRangeRequest
|
||||
// Past3SeasonQuery is the query range params for past 3 seasonal period to the current season
|
||||
// Example: For weekly seasonality, this is the query range params for the (now-4w-5m, now-3w)
|
||||
// : For daily seasonality, this is the query range params for the (now-4d-5m, now-3d)
|
||||
// : For hourly seasonality, this is the query range params for the (now-4h-5m, now-3h)
|
||||
Past3SeasonQuery qbtypes.QueryRangeRequest
|
||||
}
|
||||
|
||||
func prepareAnomalyQueryParams(req qbtypes.QueryRangeRequest, seasonality Seasonality) *anomalyQueryParams {
|
||||
start := req.Start
|
||||
end := req.End
|
||||
|
||||
currentPeriodQuery := qbtypes.QueryRangeRequest{
|
||||
Start: start,
|
||||
End: end,
|
||||
RequestType: qbtypes.RequestTypeTimeSeries,
|
||||
CompositeQuery: req.CompositeQuery,
|
||||
NoCache: false,
|
||||
}
|
||||
|
||||
var pastPeriodStart, pastPeriodEnd uint64
|
||||
|
||||
switch seasonality {
|
||||
// for one week period, we fetch the data from the past week with 5 min offset
|
||||
case SeasonalityWeekly:
|
||||
pastPeriodStart = start - oneWeekOffset - fiveMinOffset
|
||||
pastPeriodEnd = end - oneWeekOffset
|
||||
// for one day period, we fetch the data from the past day with 5 min offset
|
||||
case SeasonalityDaily:
|
||||
pastPeriodStart = start - oneDayOffset - fiveMinOffset
|
||||
pastPeriodEnd = end - oneDayOffset
|
||||
// for one hour period, we fetch the data from the past hour with 5 min offset
|
||||
case SeasonalityHourly:
|
||||
pastPeriodStart = start - oneHourOffset - fiveMinOffset
|
||||
pastPeriodEnd = end - oneHourOffset
|
||||
}
|
||||
|
||||
pastPeriodQuery := qbtypes.QueryRangeRequest{
|
||||
Start: pastPeriodStart,
|
||||
End: pastPeriodEnd,
|
||||
RequestType: qbtypes.RequestTypeTimeSeries,
|
||||
CompositeQuery: req.CompositeQuery,
|
||||
NoCache: false,
|
||||
}
|
||||
|
||||
// seasonality growth trend
|
||||
var currentGrowthPeriodStart, currentGrowthPeriodEnd uint64
|
||||
switch seasonality {
|
||||
case SeasonalityWeekly:
|
||||
currentGrowthPeriodStart = start - oneWeekOffset
|
||||
currentGrowthPeriodEnd = start
|
||||
case SeasonalityDaily:
|
||||
currentGrowthPeriodStart = start - oneDayOffset
|
||||
currentGrowthPeriodEnd = start
|
||||
case SeasonalityHourly:
|
||||
currentGrowthPeriodStart = start - oneHourOffset
|
||||
currentGrowthPeriodEnd = start
|
||||
}
|
||||
|
||||
currentGrowthQuery := qbtypes.QueryRangeRequest{
|
||||
Start: currentGrowthPeriodStart,
|
||||
End: currentGrowthPeriodEnd,
|
||||
RequestType: qbtypes.RequestTypeTimeSeries,
|
||||
CompositeQuery: req.CompositeQuery,
|
||||
NoCache: false,
|
||||
}
|
||||
|
||||
var pastGrowthPeriodStart, pastGrowthPeriodEnd uint64
|
||||
switch seasonality {
|
||||
case SeasonalityWeekly:
|
||||
pastGrowthPeriodStart = start - 2*oneWeekOffset
|
||||
pastGrowthPeriodEnd = start - 1*oneWeekOffset
|
||||
case SeasonalityDaily:
|
||||
pastGrowthPeriodStart = start - 2*oneDayOffset
|
||||
pastGrowthPeriodEnd = start - 1*oneDayOffset
|
||||
case SeasonalityHourly:
|
||||
pastGrowthPeriodStart = start - 2*oneHourOffset
|
||||
pastGrowthPeriodEnd = start - 1*oneHourOffset
|
||||
}
|
||||
|
||||
pastGrowthQuery := qbtypes.QueryRangeRequest{
|
||||
Start: pastGrowthPeriodStart,
|
||||
End: pastGrowthPeriodEnd,
|
||||
RequestType: qbtypes.RequestTypeTimeSeries,
|
||||
CompositeQuery: req.CompositeQuery,
|
||||
NoCache: false,
|
||||
}
|
||||
|
||||
var past2GrowthPeriodStart, past2GrowthPeriodEnd uint64
|
||||
switch seasonality {
|
||||
case SeasonalityWeekly:
|
||||
past2GrowthPeriodStart = start - 3*oneWeekOffset
|
||||
past2GrowthPeriodEnd = start - 2*oneWeekOffset
|
||||
case SeasonalityDaily:
|
||||
past2GrowthPeriodStart = start - 3*oneDayOffset
|
||||
past2GrowthPeriodEnd = start - 2*oneDayOffset
|
||||
case SeasonalityHourly:
|
||||
past2GrowthPeriodStart = start - 3*oneHourOffset
|
||||
past2GrowthPeriodEnd = start - 2*oneHourOffset
|
||||
}
|
||||
|
||||
past2GrowthQuery := qbtypes.QueryRangeRequest{
|
||||
Start: past2GrowthPeriodStart,
|
||||
End: past2GrowthPeriodEnd,
|
||||
RequestType: qbtypes.RequestTypeTimeSeries,
|
||||
CompositeQuery: req.CompositeQuery,
|
||||
NoCache: false,
|
||||
}
|
||||
|
||||
var past3GrowthPeriodStart, past3GrowthPeriodEnd uint64
|
||||
switch seasonality {
|
||||
case SeasonalityWeekly:
|
||||
past3GrowthPeriodStart = start - 4*oneWeekOffset
|
||||
past3GrowthPeriodEnd = start - 3*oneWeekOffset
|
||||
case SeasonalityDaily:
|
||||
past3GrowthPeriodStart = start - 4*oneDayOffset
|
||||
past3GrowthPeriodEnd = start - 3*oneDayOffset
|
||||
case SeasonalityHourly:
|
||||
past3GrowthPeriodStart = start - 4*oneHourOffset
|
||||
past3GrowthPeriodEnd = start - 3*oneHourOffset
|
||||
}
|
||||
|
||||
past3GrowthQuery := qbtypes.QueryRangeRequest{
|
||||
Start: past3GrowthPeriodStart,
|
||||
End: past3GrowthPeriodEnd,
|
||||
RequestType: qbtypes.RequestTypeTimeSeries,
|
||||
CompositeQuery: req.CompositeQuery,
|
||||
NoCache: false,
|
||||
}
|
||||
|
||||
return &anomalyQueryParams{
|
||||
CurrentPeriodQuery: currentPeriodQuery,
|
||||
PastPeriodQuery: pastPeriodQuery,
|
||||
CurrentSeasonQuery: currentGrowthQuery,
|
||||
PastSeasonQuery: pastGrowthQuery,
|
||||
Past2SeasonQuery: past2GrowthQuery,
|
||||
Past3SeasonQuery: past3GrowthQuery,
|
||||
}
|
||||
}
|
||||
|
||||
type anomalyQueryResults struct {
|
||||
CurrentPeriodResults []*qbtypes.TimeSeriesData
|
||||
PastPeriodResults []*qbtypes.TimeSeriesData
|
||||
CurrentSeasonResults []*qbtypes.TimeSeriesData
|
||||
PastSeasonResults []*qbtypes.TimeSeriesData
|
||||
Past2SeasonResults []*qbtypes.TimeSeriesData
|
||||
Past3SeasonResults []*qbtypes.TimeSeriesData
|
||||
}
|
||||
11
ee/anomaly/provider.go
Normal file
11
ee/anomaly/provider.go
Normal file
@ -0,0 +1,11 @@
|
||||
package anomaly
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type Provider interface {
|
||||
GetAnomalies(ctx context.Context, orgID valuer.UUID, req *AnomaliesRequest) (*AnomaliesResponse, error)
|
||||
}
|
||||
463
ee/anomaly/seasonal.go
Normal file
463
ee/anomaly/seasonal.go
Normal file
@ -0,0 +1,463 @@
|
||||
package anomaly
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"math"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/querier"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
)
|
||||
|
||||
var (
|
||||
// TODO(srikanthccv): make this configurable?
|
||||
movingAvgWindowSize = 7
|
||||
)
|
||||
|
||||
// BaseProvider is an interface that includes common methods for all provider types
|
||||
type BaseProvider interface {
|
||||
GetBaseSeasonalProvider() *BaseSeasonalProvider
|
||||
}
|
||||
|
||||
// GenericProviderOption is a generic type for provider options
|
||||
type GenericProviderOption[T BaseProvider] func(T)
|
||||
|
||||
func WithQuerier[T BaseProvider](querier querier.Querier) GenericProviderOption[T] {
|
||||
return func(p T) {
|
||||
p.GetBaseSeasonalProvider().querier = querier
|
||||
}
|
||||
}
|
||||
|
||||
func WithLogger[T BaseProvider](logger *slog.Logger) GenericProviderOption[T] {
|
||||
return func(p T) {
|
||||
p.GetBaseSeasonalProvider().logger = logger
|
||||
}
|
||||
}
|
||||
|
||||
type BaseSeasonalProvider struct {
|
||||
querier querier.Querier
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
func (p *BaseSeasonalProvider) getQueryParams(req *AnomaliesRequest) *anomalyQueryParams {
|
||||
if !req.Seasonality.IsValid() {
|
||||
req.Seasonality = SeasonalityDaily
|
||||
}
|
||||
return prepareAnomalyQueryParams(req.Params, req.Seasonality)
|
||||
}
|
||||
|
||||
func (p *BaseSeasonalProvider) toTSResults(ctx context.Context, resp *qbtypes.QueryRangeResponse) []*qbtypes.TimeSeriesData {
|
||||
|
||||
if resp == nil || resp.Data == nil {
|
||||
p.logger.InfoContext(ctx, "nil response from query range")
|
||||
}
|
||||
|
||||
data, ok := resp.Data.(struct {
|
||||
Results []any `json:"results"`
|
||||
Warnings []string `json:"warnings"`
|
||||
})
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
tsData := []*qbtypes.TimeSeriesData{}
|
||||
for _, item := range data.Results {
|
||||
if resultData, ok := item.(*qbtypes.TimeSeriesData); ok {
|
||||
tsData = append(tsData, resultData)
|
||||
}
|
||||
}
|
||||
|
||||
return tsData
|
||||
}
|
||||
|
||||
func (p *BaseSeasonalProvider) getResults(ctx context.Context, orgID valuer.UUID, params *anomalyQueryParams) (*anomalyQueryResults, error) {
|
||||
// TODO(srikanthccv): parallelize this?
|
||||
p.logger.InfoContext(ctx, "fetching results for current period", "anomaly_current_period_query", params.CurrentPeriodQuery)
|
||||
currentPeriodResults, err := p.querier.QueryRange(ctx, orgID, ¶ms.CurrentPeriodQuery)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p.logger.InfoContext(ctx, "fetching results for past period", "anomaly_past_period_query", params.PastPeriodQuery)
|
||||
pastPeriodResults, err := p.querier.QueryRange(ctx, orgID, ¶ms.PastPeriodQuery)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p.logger.InfoContext(ctx, "fetching results for current season", "anomaly_current_season_query", params.CurrentSeasonQuery)
|
||||
currentSeasonResults, err := p.querier.QueryRange(ctx, orgID, ¶ms.CurrentSeasonQuery)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p.logger.InfoContext(ctx, "fetching results for past season", "anomaly_past_season_query", params.PastSeasonQuery)
|
||||
pastSeasonResults, err := p.querier.QueryRange(ctx, orgID, ¶ms.PastSeasonQuery)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p.logger.InfoContext(ctx, "fetching results for past 2 season", "anomaly_past_2season_query", params.Past2SeasonQuery)
|
||||
past2SeasonResults, err := p.querier.QueryRange(ctx, orgID, ¶ms.Past2SeasonQuery)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p.logger.InfoContext(ctx, "fetching results for past 3 season", "anomaly_past_3season_query", params.Past3SeasonQuery)
|
||||
past3SeasonResults, err := p.querier.QueryRange(ctx, orgID, ¶ms.Past3SeasonQuery)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &anomalyQueryResults{
|
||||
CurrentPeriodResults: p.toTSResults(ctx, currentPeriodResults),
|
||||
PastPeriodResults: p.toTSResults(ctx, pastPeriodResults),
|
||||
CurrentSeasonResults: p.toTSResults(ctx, currentSeasonResults),
|
||||
PastSeasonResults: p.toTSResults(ctx, pastSeasonResults),
|
||||
Past2SeasonResults: p.toTSResults(ctx, past2SeasonResults),
|
||||
Past3SeasonResults: p.toTSResults(ctx, past3SeasonResults),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// getMatchingSeries gets the matching series from the query result
|
||||
// for the given series
|
||||
func (p *BaseSeasonalProvider) getMatchingSeries(_ context.Context, queryResult *qbtypes.TimeSeriesData, series *qbtypes.TimeSeries) *qbtypes.TimeSeries {
|
||||
if queryResult == nil || len(queryResult.Aggregations) == 0 || len(queryResult.Aggregations[0].Series) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, curr := range queryResult.Aggregations[0].Series {
|
||||
currLabelsKey := qbtypes.GetUniqueSeriesKey(curr.Labels)
|
||||
seriesLabelsKey := qbtypes.GetUniqueSeriesKey(series.Labels)
|
||||
if currLabelsKey == seriesLabelsKey {
|
||||
return curr
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *BaseSeasonalProvider) getAvg(series *qbtypes.TimeSeries) float64 {
|
||||
if series == nil || len(series.Values) == 0 {
|
||||
return 0
|
||||
}
|
||||
var sum float64
|
||||
for _, smpl := range series.Values {
|
||||
sum += smpl.Value
|
||||
}
|
||||
return sum / float64(len(series.Values))
|
||||
}
|
||||
|
||||
func (p *BaseSeasonalProvider) getStdDev(series *qbtypes.TimeSeries) float64 {
|
||||
if series == nil || len(series.Values) == 0 {
|
||||
return 0
|
||||
}
|
||||
avg := p.getAvg(series)
|
||||
var sum float64
|
||||
for _, smpl := range series.Values {
|
||||
sum += math.Pow(smpl.Value-avg, 2)
|
||||
}
|
||||
return math.Sqrt(sum / float64(len(series.Values)))
|
||||
}
|
||||
|
||||
// getMovingAvg gets the moving average for the given series
|
||||
// for the given window size and start index
|
||||
func (p *BaseSeasonalProvider) getMovingAvg(series *qbtypes.TimeSeries, movingAvgWindowSize, startIdx int) float64 {
|
||||
if series == nil || len(series.Values) == 0 {
|
||||
return 0
|
||||
}
|
||||
if startIdx >= len(series.Values)-movingAvgWindowSize {
|
||||
startIdx = int(math.Max(0, float64(len(series.Values)-movingAvgWindowSize)))
|
||||
}
|
||||
var sum float64
|
||||
points := series.Values[startIdx:]
|
||||
windowSize := int(math.Min(float64(movingAvgWindowSize), float64(len(points))))
|
||||
for i := 0; i < windowSize; i++ {
|
||||
sum += points[i].Value
|
||||
}
|
||||
avg := sum / float64(windowSize)
|
||||
return avg
|
||||
}
|
||||
|
||||
func (p *BaseSeasonalProvider) getMean(floats ...float64) float64 {
|
||||
if len(floats) == 0 {
|
||||
return 0
|
||||
}
|
||||
var sum float64
|
||||
for _, f := range floats {
|
||||
sum += f
|
||||
}
|
||||
return sum / float64(len(floats))
|
||||
}
|
||||
|
||||
func (p *BaseSeasonalProvider) getPredictedSeries(
|
||||
ctx context.Context,
|
||||
series, prevSeries, currentSeasonSeries, pastSeasonSeries, past2SeasonSeries, past3SeasonSeries *qbtypes.TimeSeries,
|
||||
) *qbtypes.TimeSeries {
|
||||
predictedSeries := &qbtypes.TimeSeries{
|
||||
Labels: series.Labels,
|
||||
Values: make([]*qbtypes.TimeSeriesValue, 0),
|
||||
}
|
||||
|
||||
// for each point in the series, get the predicted value
|
||||
// the predicted value is the moving average (with window size = 7) of the previous period series
|
||||
// plus the average of the current season series
|
||||
// minus the mean of the past season series, past2 season series and past3 season series
|
||||
for idx, curr := range series.Values {
|
||||
movingAvg := p.getMovingAvg(prevSeries, movingAvgWindowSize, idx)
|
||||
avg := p.getAvg(currentSeasonSeries)
|
||||
mean := p.getMean(p.getAvg(pastSeasonSeries), p.getAvg(past2SeasonSeries), p.getAvg(past3SeasonSeries))
|
||||
predictedValue := movingAvg + avg - mean
|
||||
|
||||
if predictedValue < 0 {
|
||||
// this should not happen (except when the data has extreme outliers)
|
||||
// we will use the moving avg of the previous period series in this case
|
||||
p.logger.WarnContext(ctx, "predicted value is less than 0 for series", "anomaly_predicted_value", predictedValue, "anomaly_labels", series.Labels)
|
||||
predictedValue = p.getMovingAvg(prevSeries, movingAvgWindowSize, idx)
|
||||
}
|
||||
|
||||
p.logger.DebugContext(ctx, "predicted value for series",
|
||||
"anomaly_moving_avg", movingAvg,
|
||||
"anomaly_avg", avg,
|
||||
"anomaly_mean", mean,
|
||||
"anomaly_labels", series.Labels,
|
||||
"anomaly_predicted_value", predictedValue,
|
||||
"anomaly_curr", curr.Value,
|
||||
)
|
||||
predictedSeries.Values = append(predictedSeries.Values, &qbtypes.TimeSeriesValue{
|
||||
Timestamp: curr.Timestamp,
|
||||
Value: predictedValue,
|
||||
})
|
||||
}
|
||||
|
||||
return predictedSeries
|
||||
}
|
||||
|
||||
// getBounds gets the upper and lower bounds for the given series
|
||||
// for the given z score threshold
|
||||
// moving avg of the previous period series + z score threshold * std dev of the series
|
||||
// moving avg of the previous period series - z score threshold * std dev of the series
|
||||
func (p *BaseSeasonalProvider) getBounds(
|
||||
series, predictedSeries *qbtypes.TimeSeries,
|
||||
zScoreThreshold float64,
|
||||
) (*qbtypes.TimeSeries, *qbtypes.TimeSeries) {
|
||||
upperBoundSeries := &qbtypes.TimeSeries{
|
||||
Labels: series.Labels,
|
||||
Values: make([]*qbtypes.TimeSeriesValue, 0),
|
||||
}
|
||||
|
||||
lowerBoundSeries := &qbtypes.TimeSeries{
|
||||
Labels: series.Labels,
|
||||
Values: make([]*qbtypes.TimeSeriesValue, 0),
|
||||
}
|
||||
|
||||
for idx, curr := range series.Values {
|
||||
upperBound := p.getMovingAvg(predictedSeries, movingAvgWindowSize, idx) + zScoreThreshold*p.getStdDev(series)
|
||||
lowerBound := p.getMovingAvg(predictedSeries, movingAvgWindowSize, idx) - zScoreThreshold*p.getStdDev(series)
|
||||
upperBoundSeries.Values = append(upperBoundSeries.Values, &qbtypes.TimeSeriesValue{
|
||||
Timestamp: curr.Timestamp,
|
||||
Value: upperBound,
|
||||
})
|
||||
lowerBoundSeries.Values = append(lowerBoundSeries.Values, &qbtypes.TimeSeriesValue{
|
||||
Timestamp: curr.Timestamp,
|
||||
Value: math.Max(lowerBound, 0),
|
||||
})
|
||||
}
|
||||
|
||||
return upperBoundSeries, lowerBoundSeries
|
||||
}
|
||||
|
||||
// getExpectedValue gets the expected value for the given series
|
||||
// for the given index
|
||||
// prevSeriesAvg + currentSeasonSeriesAvg - mean of past season series, past2 season series and past3 season series
|
||||
func (p *BaseSeasonalProvider) getExpectedValue(
|
||||
_, prevSeries, currentSeasonSeries, pastSeasonSeries, past2SeasonSeries, past3SeasonSeries *qbtypes.TimeSeries, idx int,
|
||||
) float64 {
|
||||
prevSeriesAvg := p.getMovingAvg(prevSeries, movingAvgWindowSize, idx)
|
||||
currentSeasonSeriesAvg := p.getAvg(currentSeasonSeries)
|
||||
pastSeasonSeriesAvg := p.getAvg(pastSeasonSeries)
|
||||
past2SeasonSeriesAvg := p.getAvg(past2SeasonSeries)
|
||||
past3SeasonSeriesAvg := p.getAvg(past3SeasonSeries)
|
||||
return prevSeriesAvg + currentSeasonSeriesAvg - p.getMean(pastSeasonSeriesAvg, past2SeasonSeriesAvg, past3SeasonSeriesAvg)
|
||||
}
|
||||
|
||||
// getScore gets the anomaly score for the given series
|
||||
// for the given index
|
||||
// (value - expectedValue) / std dev of the series
|
||||
func (p *BaseSeasonalProvider) getScore(
|
||||
series, prevSeries, weekSeries, weekPrevSeries, past2SeasonSeries, past3SeasonSeries *qbtypes.TimeSeries, value float64, idx int,
|
||||
) float64 {
|
||||
expectedValue := p.getExpectedValue(series, prevSeries, weekSeries, weekPrevSeries, past2SeasonSeries, past3SeasonSeries, idx)
|
||||
if expectedValue < 0 {
|
||||
expectedValue = p.getMovingAvg(prevSeries, movingAvgWindowSize, idx)
|
||||
}
|
||||
return (value - expectedValue) / p.getStdDev(weekSeries)
|
||||
}
|
||||
|
||||
// getAnomalyScores gets the anomaly scores for the given series
|
||||
// for the given index
|
||||
// (value - expectedValue) / std dev of the series
|
||||
func (p *BaseSeasonalProvider) getAnomalyScores(
|
||||
series, prevSeries, currentSeasonSeries, pastSeasonSeries, past2SeasonSeries, past3SeasonSeries *qbtypes.TimeSeries,
|
||||
) *qbtypes.TimeSeries {
|
||||
anomalyScoreSeries := &qbtypes.TimeSeries{
|
||||
Labels: series.Labels,
|
||||
Values: make([]*qbtypes.TimeSeriesValue, 0),
|
||||
}
|
||||
|
||||
for idx, curr := range series.Values {
|
||||
anomalyScore := p.getScore(series, prevSeries, currentSeasonSeries, pastSeasonSeries, past2SeasonSeries, past3SeasonSeries, curr.Value, idx)
|
||||
anomalyScoreSeries.Values = append(anomalyScoreSeries.Values, &qbtypes.TimeSeriesValue{
|
||||
Timestamp: curr.Timestamp,
|
||||
Value: anomalyScore,
|
||||
})
|
||||
}
|
||||
|
||||
return anomalyScoreSeries
|
||||
}
|
||||
|
||||
func (p *BaseSeasonalProvider) getAnomalies(ctx context.Context, orgID valuer.UUID, req *AnomaliesRequest) (*AnomaliesResponse, error) {
|
||||
anomalyParams := p.getQueryParams(req)
|
||||
anomalyQueryResults, err := p.getResults(ctx, orgID, anomalyParams)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
currentPeriodResults := make(map[string]*qbtypes.TimeSeriesData)
|
||||
for _, result := range anomalyQueryResults.CurrentPeriodResults {
|
||||
currentPeriodResults[result.QueryName] = result
|
||||
}
|
||||
|
||||
pastPeriodResults := make(map[string]*qbtypes.TimeSeriesData)
|
||||
for _, result := range anomalyQueryResults.PastPeriodResults {
|
||||
pastPeriodResults[result.QueryName] = result
|
||||
}
|
||||
|
||||
currentSeasonResults := make(map[string]*qbtypes.TimeSeriesData)
|
||||
for _, result := range anomalyQueryResults.CurrentSeasonResults {
|
||||
currentSeasonResults[result.QueryName] = result
|
||||
}
|
||||
|
||||
pastSeasonResults := make(map[string]*qbtypes.TimeSeriesData)
|
||||
for _, result := range anomalyQueryResults.PastSeasonResults {
|
||||
pastSeasonResults[result.QueryName] = result
|
||||
}
|
||||
|
||||
past2SeasonResults := make(map[string]*qbtypes.TimeSeriesData)
|
||||
for _, result := range anomalyQueryResults.Past2SeasonResults {
|
||||
past2SeasonResults[result.QueryName] = result
|
||||
}
|
||||
|
||||
past3SeasonResults := make(map[string]*qbtypes.TimeSeriesData)
|
||||
for _, result := range anomalyQueryResults.Past3SeasonResults {
|
||||
past3SeasonResults[result.QueryName] = result
|
||||
}
|
||||
|
||||
for _, result := range currentPeriodResults {
|
||||
funcs := req.Params.FuncsForQuery(result.QueryName)
|
||||
|
||||
var zScoreThreshold float64
|
||||
for _, f := range funcs {
|
||||
if f.Name == qbtypes.FunctionNameAnomaly {
|
||||
for _, arg := range f.Args {
|
||||
if arg.Name != "z_score_threshold" {
|
||||
continue
|
||||
}
|
||||
value, ok := arg.Value.(float64)
|
||||
if ok {
|
||||
zScoreThreshold = value
|
||||
} else {
|
||||
p.logger.InfoContext(ctx, "z_score_threshold not provided, defaulting")
|
||||
zScoreThreshold = 3
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pastPeriodResult, ok := pastPeriodResults[result.QueryName]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
currentSeasonResult, ok := currentSeasonResults[result.QueryName]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
pastSeasonResult, ok := pastSeasonResults[result.QueryName]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
past2SeasonResult, ok := past2SeasonResults[result.QueryName]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
past3SeasonResult, ok := past3SeasonResults[result.QueryName]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
aggOfInterest := result.Aggregations[0]
|
||||
|
||||
for _, series := range aggOfInterest.Series {
|
||||
stdDev := p.getStdDev(series)
|
||||
p.logger.InfoContext(ctx, "calculated standard deviation for series", "anomaly_std_dev", stdDev, "anomaly_labels", series.Labels)
|
||||
|
||||
pastPeriodSeries := p.getMatchingSeries(ctx, pastPeriodResult, series)
|
||||
currentSeasonSeries := p.getMatchingSeries(ctx, currentSeasonResult, series)
|
||||
pastSeasonSeries := p.getMatchingSeries(ctx, pastSeasonResult, series)
|
||||
past2SeasonSeries := p.getMatchingSeries(ctx, past2SeasonResult, series)
|
||||
past3SeasonSeries := p.getMatchingSeries(ctx, past3SeasonResult, series)
|
||||
|
||||
prevSeriesAvg := p.getAvg(pastPeriodSeries)
|
||||
currentSeasonSeriesAvg := p.getAvg(currentSeasonSeries)
|
||||
pastSeasonSeriesAvg := p.getAvg(pastSeasonSeries)
|
||||
past2SeasonSeriesAvg := p.getAvg(past2SeasonSeries)
|
||||
past3SeasonSeriesAvg := p.getAvg(past3SeasonSeries)
|
||||
p.logger.InfoContext(ctx, "calculated mean for series",
|
||||
"anomaly_prev_series_avg", prevSeriesAvg,
|
||||
"anomaly_current_season_series_avg", currentSeasonSeriesAvg,
|
||||
"anomaly_past_season_series_avg", pastSeasonSeriesAvg,
|
||||
"anomaly_past_2season_series_avg", past2SeasonSeriesAvg,
|
||||
"anomaly_past_3season_series_avg", past3SeasonSeriesAvg,
|
||||
"anomaly_labels", series.Labels,
|
||||
)
|
||||
|
||||
predictedSeries := p.getPredictedSeries(
|
||||
ctx,
|
||||
series,
|
||||
pastPeriodSeries,
|
||||
currentSeasonSeries,
|
||||
pastSeasonSeries,
|
||||
past2SeasonSeries,
|
||||
past3SeasonSeries,
|
||||
)
|
||||
aggOfInterest.PredictedSeries = append(aggOfInterest.PredictedSeries, predictedSeries)
|
||||
|
||||
upperBoundSeries, lowerBoundSeries := p.getBounds(
|
||||
series,
|
||||
predictedSeries,
|
||||
zScoreThreshold,
|
||||
)
|
||||
aggOfInterest.UpperBoundSeries = append(aggOfInterest.UpperBoundSeries, upperBoundSeries)
|
||||
aggOfInterest.LowerBoundSeries = append(aggOfInterest.LowerBoundSeries, lowerBoundSeries)
|
||||
|
||||
anomalyScoreSeries := p.getAnomalyScores(
|
||||
series,
|
||||
pastPeriodSeries,
|
||||
currentSeasonSeries,
|
||||
pastSeasonSeries,
|
||||
past2SeasonSeries,
|
||||
past3SeasonSeries,
|
||||
)
|
||||
aggOfInterest.AnomalyScores = append(aggOfInterest.AnomalyScores, anomalyScoreSeries)
|
||||
}
|
||||
}
|
||||
|
||||
results := make([]*qbtypes.TimeSeriesData, 0, len(currentPeriodResults))
|
||||
for _, result := range currentPeriodResults {
|
||||
results = append(results, result)
|
||||
}
|
||||
|
||||
return &AnomaliesResponse{
|
||||
Results: results,
|
||||
}, nil
|
||||
}
|
||||
34
ee/anomaly/weekly.go
Normal file
34
ee/anomaly/weekly.go
Normal file
@ -0,0 +1,34 @@
|
||||
package anomaly
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type WeeklyProvider struct {
|
||||
BaseSeasonalProvider
|
||||
}
|
||||
|
||||
var _ BaseProvider = (*WeeklyProvider)(nil)
|
||||
|
||||
func (wp *WeeklyProvider) GetBaseSeasonalProvider() *BaseSeasonalProvider {
|
||||
return &wp.BaseSeasonalProvider
|
||||
}
|
||||
|
||||
func NewWeeklyProvider(opts ...GenericProviderOption[*WeeklyProvider]) *WeeklyProvider {
|
||||
wp := &WeeklyProvider{
|
||||
BaseSeasonalProvider: BaseSeasonalProvider{},
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(wp)
|
||||
}
|
||||
|
||||
return wp
|
||||
}
|
||||
|
||||
func (p *WeeklyProvider) GetAnomalies(ctx context.Context, orgID valuer.UUID, req *AnomaliesRequest) (*AnomaliesResponse, error) {
|
||||
req.Seasonality = SeasonalityWeekly
|
||||
return p.getAnomalies(ctx, orgID, req)
|
||||
}
|
||||
@ -110,6 +110,9 @@ func (ah *APIHandler) RegisterRoutes(router *mux.Router, am *middleware.AuthZ) {
|
||||
// v4
|
||||
router.HandleFunc("/api/v4/query_range", am.ViewAccess(ah.queryRangeV4)).Methods(http.MethodPost)
|
||||
|
||||
// v5
|
||||
router.HandleFunc("/api/v5/query_range", am.ViewAccess(ah.queryRangeV5)).Methods(http.MethodPost)
|
||||
|
||||
// Gateway
|
||||
router.PathPrefix(gateway.RoutePrefix).HandlerFunc(am.EditAccess(ah.ServeGatewayHTTP))
|
||||
|
||||
|
||||
@ -2,11 +2,16 @@ package api
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"runtime/debug"
|
||||
|
||||
anomalyV2 "github.com/SigNoz/signoz/ee/anomaly"
|
||||
"github.com/SigNoz/signoz/ee/query-service/anomaly"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/http/render"
|
||||
baseapp "github.com/SigNoz/signoz/pkg/query-service/app"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
|
||||
@ -15,6 +20,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"go.uber.org/zap"
|
||||
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
)
|
||||
|
||||
func (aH *APIHandler) queryRangeV4(w http.ResponseWriter, r *http.Request) {
|
||||
@ -136,3 +143,141 @@ func (aH *APIHandler) queryRangeV4(w http.ResponseWriter, r *http.Request) {
|
||||
aH.QueryRangeV4(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
func extractSeasonality(anomalyQuery *qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) anomalyV2.Seasonality {
|
||||
for _, fn := range anomalyQuery.Functions {
|
||||
if fn.Name == qbtypes.FunctionNameAnomaly {
|
||||
for _, arg := range fn.Args {
|
||||
if arg.Name == "seasonality" {
|
||||
if seasonalityStr, ok := arg.Value.(string); ok {
|
||||
switch seasonalityStr {
|
||||
case "weekly":
|
||||
return anomalyV2.SeasonalityWeekly
|
||||
case "hourly":
|
||||
return anomalyV2.SeasonalityHourly
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return anomalyV2.SeasonalityDaily // default
|
||||
}
|
||||
|
||||
func createAnomalyProvider(aH *APIHandler, seasonality anomalyV2.Seasonality) anomalyV2.Provider {
|
||||
switch seasonality {
|
||||
case anomalyV2.SeasonalityWeekly:
|
||||
return anomalyV2.NewWeeklyProvider(
|
||||
anomalyV2.WithQuerier[*anomalyV2.WeeklyProvider](aH.Signoz.Querier),
|
||||
anomalyV2.WithLogger[*anomalyV2.WeeklyProvider](aH.Signoz.Instrumentation.Logger()),
|
||||
)
|
||||
case anomalyV2.SeasonalityHourly:
|
||||
return anomalyV2.NewHourlyProvider(
|
||||
anomalyV2.WithQuerier[*anomalyV2.HourlyProvider](aH.Signoz.Querier),
|
||||
anomalyV2.WithLogger[*anomalyV2.HourlyProvider](aH.Signoz.Instrumentation.Logger()),
|
||||
)
|
||||
default:
|
||||
return anomalyV2.NewDailyProvider(
|
||||
anomalyV2.WithQuerier[*anomalyV2.DailyProvider](aH.Signoz.Querier),
|
||||
anomalyV2.WithLogger[*anomalyV2.DailyProvider](aH.Signoz.Instrumentation.Logger()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func (aH *APIHandler) handleAnomalyQuery(ctx context.Context, orgID valuer.UUID, anomalyQuery *qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], queryRangeRequest qbtypes.QueryRangeRequest) (*anomalyV2.AnomaliesResponse, error) {
|
||||
seasonality := extractSeasonality(anomalyQuery)
|
||||
provider := createAnomalyProvider(aH, seasonality)
|
||||
|
||||
return provider.GetAnomalies(ctx, orgID, &anomalyV2.AnomaliesRequest{Params: queryRangeRequest})
|
||||
}
|
||||
|
||||
func (aH *APIHandler) queryRangeV5(rw http.ResponseWriter, req *http.Request) {
|
||||
|
||||
bodyBytes, err := io.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "failed to read request body: %v", err))
|
||||
return
|
||||
}
|
||||
req.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
|
||||
|
||||
ctx := req.Context()
|
||||
|
||||
claims, err := authtypes.ClaimsFromContext(ctx)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
var queryRangeRequest qbtypes.QueryRangeRequest
|
||||
if err := json.NewDecoder(req.Body).Decode(&queryRangeRequest); err != nil {
|
||||
render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "failed to decode request body: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
stackTrace := string(debug.Stack())
|
||||
|
||||
queryJSON, _ := json.Marshal(queryRangeRequest)
|
||||
|
||||
aH.Signoz.Instrumentation.Logger().ErrorContext(ctx, "panic in QueryRange",
|
||||
"error", r,
|
||||
"user", claims.UserID,
|
||||
"payload", string(queryJSON),
|
||||
"stacktrace", stackTrace,
|
||||
)
|
||||
|
||||
render.Error(rw, errors.NewInternalf(
|
||||
errors.CodeInternal,
|
||||
"Something went wrong on our end. It's not you, it's us. Our team is notified about it. Reach out to support if issue persists.",
|
||||
))
|
||||
}
|
||||
}()
|
||||
|
||||
if err := queryRangeRequest.Validate(); err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
orgID, err := valuer.NewUUID(claims.OrgID)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
if anomalyQuery, ok := queryRangeRequest.IsAnomalyRequest(); ok {
|
||||
anomalies, err := aH.handleAnomalyQuery(ctx, orgID, anomalyQuery, queryRangeRequest)
|
||||
if err != nil {
|
||||
render.Error(rw, errors.NewInternalf(errors.CodeInternal, "failed to get anomalies: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
results := []any{}
|
||||
for _, item := range anomalies.Results {
|
||||
results = append(results, item)
|
||||
}
|
||||
|
||||
finalResp := &qbtypes.QueryRangeResponse{
|
||||
Type: queryRangeRequest.RequestType,
|
||||
Data: struct {
|
||||
Results []any `json:"results"`
|
||||
Warnings []string `json:"warnings"`
|
||||
}{
|
||||
Results: results,
|
||||
Warnings: make([]string, 0), // TODO(srikanthccv): will there be any warnings here?
|
||||
},
|
||||
Meta: struct {
|
||||
RowsScanned uint64 `json:"rowsScanned"`
|
||||
BytesScanned uint64 `json:"bytesScanned"`
|
||||
DurationMS uint64 `json:"durationMs"`
|
||||
}{},
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusOK, finalResp)
|
||||
return
|
||||
} else {
|
||||
// regular query range request, let the querier handle it
|
||||
req.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
|
||||
aH.QuerierAPI.QueryRange(rw, req)
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user