From 43eea4d2a0eb904f0bf6757e511ebfa18f73c348 Mon Sep 17 00:00:00 2001 From: aniket Date: Fri, 8 Aug 2025 16:34:13 +0530 Subject: [PATCH] feat(cron-alerts): added cron alerts --- ee/query-service/rules/manager.go | 12 +- go.mod | 1 + go.sum | 2 + pkg/query-service/rules/base_rule.go | 12 ++ pkg/query-service/rules/manager.go | 8 +- pkg/query-service/rules/prom_rule_task.go | 12 ++ pkg/query-service/rules/rule_task.go | 199 +++++++++++++++++----- pkg/query-service/rules/task.go | 2 + pkg/types/ruletypes/api_params.go | 15 +- 9 files changed, 213 insertions(+), 50 deletions(-) diff --git a/ee/query-service/rules/manager.go b/ee/query-service/rules/manager.go index bf5cbbbec117..f0c48277e6a3 100644 --- a/ee/query-service/rules/manager.go +++ b/ee/query-service/rules/manager.go @@ -41,7 +41,9 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error) // create ch rule task for evalution task = newTask(baserules.TaskTypeCh, opts.TaskName, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID) - + if tr.IsScheduled() { + task.SetSchedule(tr.GetSchedule()) + } } else if opts.Rule.RuleType == ruletypes.RuleTypeProm { // create promql rule @@ -63,7 +65,9 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error) // create promql rule task for evalution task = newTask(baserules.TaskTypeProm, opts.TaskName, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID) - + if pr.IsScheduled() { + task.SetSchedule(pr.GetSchedule()) + } } else if opts.Rule.RuleType == ruletypes.RuleTypeAnomaly { // create anomaly rule ar, err := NewAnomalyRule( @@ -85,7 +89,9 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error) // create anomaly rule task for evalution task = newTask(baserules.TaskTypeCh, opts.TaskName, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID) - + if ar.IsScheduled() { + task.SetSchedule(ar.GetSchedule()) + } } else { return nil, fmt.Errorf("unsupported rule type %s. Supported types: %s, %s", opts.Rule.RuleType, ruletypes.RuleTypeProm, ruletypes.RuleTypeThreshold) } diff --git a/go.mod b/go.mod index 5b3df5a97991..4d0304d5fbea 100644 --- a/go.mod +++ b/go.mod @@ -210,6 +210,7 @@ require ( github.com/smarty/assertions v1.15.0 // indirect github.com/spf13/pflag v1.0.6 // indirect github.com/stretchr/objx v0.5.2 // indirect + github.com/teambition/rrule-go v1.8.2 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect github.com/tklauser/go-sysconf v0.3.15 // indirect diff --git a/go.sum b/go.sum index e48341f9919e..2ffb33773a86 100644 --- a/go.sum +++ b/go.sum @@ -918,6 +918,8 @@ github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8 github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0= +github.com/teambition/rrule-go v1.8.2 h1:lIjpjvWTj9fFUZCmuoVDrKVOtdiyzbzc93qTmRVe/J8= +github.com/teambition/rrule-go v1.8.2/go.mod h1:Ieq5AbrKGciP1V//Wq8ktsTXwSwJHDD5mD/wLBGl3p4= github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= diff --git a/pkg/query-service/rules/base_rule.go b/pkg/query-service/rules/base_rule.go index 5375c1ad6c7b..6ccc4db922d8 100644 --- a/pkg/query-service/rules/base_rule.go +++ b/pkg/query-service/rules/base_rule.go @@ -87,6 +87,9 @@ type BaseRule struct { sqlstore sqlstore.SQLStore evaluation ruletypes.Evaluation + + schedule string + scheduleStartsAt time.Time } type RuleOption func(*BaseRule) @@ -142,6 +145,7 @@ func NewBaseRule(id string, orgID valuer.UUID, p *ruletypes.PostableRule, reader reader: reader, TemporalityMap: make(map[string]map[v3.Temporality]bool), evaluation: p.Evaluation, + schedule: p.Schedule, } if baseRule.evalWindow == 0 { @@ -217,6 +221,14 @@ func (r *BaseRule) Thresholds() []ruletypes.RuleThreshold { return r.ruleCondition.Thresholds } +func (r *BaseRule) IsScheduled() bool { + return r.schedule != "" +} + +func (r *BaseRule) GetSchedule() (string, time.Time) { + return r.schedule, r.scheduleStartsAt +} + func (r *ThresholdRule) hostFromSource() string { parsedUrl, err := url.Parse(r.source) if err != nil { diff --git a/pkg/query-service/rules/manager.go b/pkg/query-service/rules/manager.go index c9ff9923b295..b034aadbf256 100644 --- a/pkg/query-service/rules/manager.go +++ b/pkg/query-service/rules/manager.go @@ -168,7 +168,9 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) { // create ch rule task for evalution task = newTask(TaskTypeCh, opts.TaskName, taskNamesuffix, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID) - + if tr.IsScheduled() { + task.SetSchedule(tr.GetSchedule()) + } } else if opts.Rule.RuleType == ruletypes.RuleTypeProm { // create promql rule @@ -190,7 +192,9 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) { // create promql rule task for evalution task = newTask(TaskTypeProm, opts.TaskName, taskNamesuffix, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID) - + if pr.IsScheduled() { + task.SetSchedule(pr.GetSchedule()) + } } else { return nil, fmt.Errorf("unsupported rule type %s. Supported types: %s, %s", opts.Rule.RuleType, ruletypes.RuleTypeProm, ruletypes.RuleTypeThreshold) } diff --git a/pkg/query-service/rules/prom_rule_task.go b/pkg/query-service/rules/prom_rule_task.go index 94fda380faeb..95f727cecb6e 100644 --- a/pkg/query-service/rules/prom_rule_task.go +++ b/pkg/query-service/rules/prom_rule_task.go @@ -39,6 +39,8 @@ type PromRuleTask struct { maintenanceStore ruletypes.MaintenanceStore orgID valuer.UUID + schedule string + scheduleStartsAt time.Time } // newPromRuleTask holds rules that have promql condition @@ -75,6 +77,10 @@ func (g *PromRuleTask) Key() string { return g.name + ";" + g.file } +func (g *PromRuleTask) IsCronSchedule() bool { + return g.schedule != "" +} + func (g *PromRuleTask) Type() TaskType { return TaskTypeProm } // Rules returns the group's rules. @@ -245,6 +251,12 @@ func (g *PromRuleTask) setLastEvaluation(ts time.Time) { g.lastEvaluation = ts } +func (g *PromRuleTask) SetSchedule(schedule string, t time.Time) { + g.mtx.Lock() + defer g.mtx.Unlock() + g.schedule = schedule +} + // EvalTimestamp returns the immediately preceding consistently slotted evaluation time. func (g *PromRuleTask) EvalTimestamp(startTime int64) time.Time { var ( diff --git a/pkg/query-service/rules/rule_task.go b/pkg/query-service/rules/rule_task.go index ff0f50d3afc7..04a2409030d5 100644 --- a/pkg/query-service/rules/rule_task.go +++ b/pkg/query-service/rules/rule_task.go @@ -12,6 +12,7 @@ import ( ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes" "github.com/SigNoz/signoz/pkg/valuer" opentracing "github.com/opentracing/opentracing-go" + "github.com/teambition/rrule-go" "go.uber.org/zap" ) @@ -36,6 +37,10 @@ type RuleTask struct { maintenanceStore ruletypes.MaintenanceStore orgID valuer.UUID + + // New field for rrule-based scheduling + schedule string + scheduleStartsAt time.Time } const DefaultFrequency = 1 * time.Minute @@ -71,6 +76,10 @@ func (g *RuleTask) Key() string { return g.name + ";" + g.file } +func (g *RuleTask) IsCronSchedule() bool { + return g.schedule != "" +} + // Name returns the group name. func (g *RuleTask) Type() TaskType { return TaskTypeCh } @@ -95,56 +104,119 @@ func NewQueryOriginContext(ctx context.Context, data map[string]interface{}) con func (g *RuleTask) Run(ctx context.Context) { defer close(g.terminated) - // Wait an initial amount to have consistently slotted intervals. - evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency) - zap.L().Debug("group run to begin at", zap.Time("evalTimestamp", evalTimestamp)) - select { - case <-time.After(time.Until(evalTimestamp)): - case <-g.done: - return - } - - ctx = NewQueryOriginContext(ctx, map[string]interface{}{ - "ruleRuleTask": map[string]string{ - "name": g.Name(), - }, - }) - - iter := func() { - if g.pause { - // todo(amol): remove in memory active alerts - // and last series state + if g.IsCronSchedule() { + schedule, err := rrule.StrToRRule("DTSTART=" + g.scheduleStartsAt.UTC().Format("20060102T150405Z") + "\nRRULE:" + g.schedule) // assuming g.cronExpr contains the cron expression + if err != nil { + zap.L().Error("failed to parse rrule expression", zap.String("rrule", g.schedule), zap.Error(err)) return } - start := time.Now() - g.Eval(ctx, evalTimestamp) - timeSinceStart := time.Since(start) + now := time.Now() + nextRun := schedule.After(now, false) - g.setEvaluationTime(timeSinceStart) - g.setLastEvaluation(start) - } - - // The assumption here is that since the ticker was started after having - // waited for `evalTimestamp` to pass, the ticks will trigger soon - // after each `evalTimestamp + N * g.frequency` occurrence. - tick := time.NewTicker(g.frequency) - defer tick.Stop() - - iter() - - // let the group iterate and run - for { select { + case <-time.After(time.Until(nextRun)): case <-g.done: return - default: + } + + ctx = NewQueryOriginContext(ctx, map[string]interface{}{ + "ruleRuleTask": map[string]string{ + "name": g.Name(), + }, + }) + + iter := func() { + if g.pause { + return + } + start := time.Now() + g.Eval(ctx, start) // using current time instead of evalTimestamp + timeSinceStart := time.Since(start) + + g.setEvaluationTime(timeSinceStart) + g.setLastEvaluation(start) + } + + iter() + currentRun := nextRun + + for { + // Calculate the next run time + nextRun = schedule.After(currentRun, false) + select { case <-g.done: return - case <-tick.C: - missed := (time.Since(evalTimestamp) / g.frequency) - 1 - evalTimestamp = evalTimestamp.Add((missed + 1) * g.frequency) - iter() + default: + select { + case <-g.done: + return + case <-time.After(time.Until(nextRun)): + // Check if we missed any scheduled runs + now := time.Now() + if now.After(nextRun.Add(time.Minute)) { // Allow 1 minute tolerance + zap.L().Warn("missed scheduled run", + zap.Time("scheduled", nextRun), + zap.Time("actual", now)) + } + + currentRun = nextRun + iter() + } + } + } + } else { + // Wait an initial amount to have consistently slotted intervals. + evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency) + zap.L().Debug("group run to begin at", zap.Time("evalTimestamp", evalTimestamp)) + select { + case <-time.After(time.Until(evalTimestamp)): + case <-g.done: + return + } + + ctx = NewQueryOriginContext(ctx, map[string]interface{}{ + "ruleRuleTask": map[string]string{ + "name": g.Name(), + }, + }) + + iter := func() { + if g.pause { + // todo(amol): remove in memory active alerts + // and last series state + return + } + start := time.Now() + g.Eval(ctx, evalTimestamp) + timeSinceStart := time.Since(start) + + g.setEvaluationTime(timeSinceStart) + g.setLastEvaluation(start) + } + + // The assumption here is that since the ticker was started after having + // waited for `evalTimestamp` to pass, the ticks will trigger soon + // after each `evalTimestamp + N * g.frequency` occurrence. + tick := time.NewTicker(g.frequency) + defer tick.Stop() + + iter() + + // let the group iterate and run + for { + select { + case <-g.done: + return + default: + select { + case <-g.done: + return + case <-tick.C: + missed := (time.Since(evalTimestamp) / g.frequency) - 1 + evalTimestamp = evalTimestamp.Add((missed + 1) * g.frequency) + iter() + } } } } @@ -298,6 +370,13 @@ func (g *RuleTask) CopyState(fromTask Task) error { return nil } +func (g *RuleTask) SetSchedule(schedule string, t time.Time) { + g.mtx.Lock() + defer g.mtx.Unlock() + g.schedule = schedule + g.scheduleStartsAt = t +} + // Eval runs a single evaluation cycle in which all rules are evaluated sequentially. func (g *RuleTask) Eval(ctx context.Context, ts time.Time) { @@ -379,3 +458,41 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) { }(i, rule) } } + +// Helper to convert ruletypes.Schedule/Recurrence to rrule.ROption +func recurrenceToROption(s *ruletypes.Schedule) rrule.ROption { + // Only basic mapping for daily/weekly/monthly, can be extended + opt := rrule.ROption{ + Dtstart: s.Recurrence.StartTime, + } + switch s.Recurrence.RepeatType { + case ruletypes.RepeatTypeDaily: + opt.Freq = rrule.DAILY + case ruletypes.RepeatTypeWeekly: + opt.Freq = rrule.WEEKLY + for _, day := range s.Recurrence.RepeatOn { + switch day { + case ruletypes.RepeatOnSunday: + opt.Byweekday = append(opt.Byweekday, rrule.SU) + case ruletypes.RepeatOnMonday: + opt.Byweekday = append(opt.Byweekday, rrule.MO) + case ruletypes.RepeatOnTuesday: + opt.Byweekday = append(opt.Byweekday, rrule.TU) + case ruletypes.RepeatOnWednesday: + opt.Byweekday = append(opt.Byweekday, rrule.WE) + case ruletypes.RepeatOnThursday: + opt.Byweekday = append(opt.Byweekday, rrule.TH) + case ruletypes.RepeatOnFriday: + opt.Byweekday = append(opt.Byweekday, rrule.FR) + case ruletypes.RepeatOnSaturday: + opt.Byweekday = append(opt.Byweekday, rrule.SA) + } + } + case ruletypes.RepeatTypeMonthly: + opt.Freq = rrule.MONTHLY + } + if s.Recurrence.EndTime != nil { + opt.Until = *s.Recurrence.EndTime + } + return opt +} diff --git a/pkg/query-service/rules/task.go b/pkg/query-service/rules/task.go index d06194b24c8f..2b14c01927c8 100644 --- a/pkg/query-service/rules/task.go +++ b/pkg/query-service/rules/task.go @@ -28,6 +28,8 @@ type Task interface { Rules() []Rule Stop() Pause(b bool) + IsCronSchedule() bool + SetSchedule(string, time.Time) } // newTask returns an appropriate group for diff --git a/pkg/types/ruletypes/api_params.go b/pkg/types/ruletypes/api_params.go index cf51225c2122..4de28961e8f8 100644 --- a/pkg/types/ruletypes/api_params.go +++ b/pkg/types/ruletypes/api_params.go @@ -65,10 +65,12 @@ type PostableRule struct { Version string `json:"version,omitempty"` // legacy - Expr string `yaml:"expr,omitempty" json:"expr,omitempty"` - OldYaml string `json:"yaml,omitempty"` - + Expr string `yaml:"expr,omitempty" json:"expr,omitempty"` + OldYaml string `json:"yaml,omitempty"` + EvalType string `yaml:"evalType,omitempty" json:"evalType,omitempty"` Evaluation Evaluation `yaml:"evaluation,omitempty" json:"evaluation,omitempty"` + StartsAt int64 `yaml:"startsAt,omitempty" json:"startsAt,omitempty"` + Schedule string `json:"schedule,omitempty"` } func ParsePostableRule(content []byte) (*PostableRule, error) { @@ -145,7 +147,12 @@ func ParseIntoRule(initRule PostableRule, content []byte, kind RuleDataKind) (*P //added alerts v2 fields rule.RuleCondition.Thresholds = append(rule.RuleCondition.Thresholds, NewBasicRuleThreshold(rule.AlertName, rule.RuleCondition.Target, nil, rule.RuleCondition.MatchType, rule.RuleCondition.CompareOp, rule.RuleCondition.SelectedQuery, rule.RuleCondition.TargetUnit, rule.RuleCondition.CompositeQuery.Unit)) - rule.Evaluation = NewEvaluation("rolling", RollingWindow{EvalWindow: rule.EvalWindow, Frequency: rule.Frequency, RequiredNumPoints: rule.RuleCondition.RequiredNumPoints, RequireMinPoints: rule.RuleCondition.RequireMinPoints}) + if rule.EvalType == "" || rule.EvalType == "rolling" { + rule.EvalType = "rolling" + rule.Evaluation = NewEvaluation(rule.EvalType, RollingWindow{EvalWindow: rule.EvalWindow, Frequency: rule.Frequency, RequiredNumPoints: rule.RuleCondition.RequiredNumPoints, RequireMinPoints: rule.RuleCondition.RequireMinPoints}) + } else if rule.EvalType == "cumulative" { + rule.Evaluation = NewEvaluation(rule.EvalType, CumulativeWindow{EvalWindow: rule.EvalWindow, StartsAt: time.UnixMilli(rule.StartsAt), RequiredNumPoints: rule.RuleCondition.RequiredNumPoints, RequireMinPoints: rule.RuleCondition.RequireMinPoints}) + } return rule, nil }