mirror of
https://github.com/SigNoz/signoz.git
synced 2025-12-24 10:56:53 +00:00
feat(cron-alerts): added cron alerts
This commit is contained in:
parent
a1bace9b14
commit
43eea4d2a0
@ -41,7 +41,9 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
|
||||
|
||||
// create ch rule task for evaluation
|
||||
task = newTask(baserules.TaskTypeCh, opts.TaskName, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
|
||||
|
||||
if tr.IsScheduled() {
|
||||
task.SetSchedule(tr.GetSchedule())
|
||||
}
|
||||
} else if opts.Rule.RuleType == ruletypes.RuleTypeProm {
|
||||
|
||||
// create promql rule
|
||||
@ -63,7 +65,9 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
|
||||
|
||||
// create promql rule task for evaluation
|
||||
task = newTask(baserules.TaskTypeProm, opts.TaskName, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
|
||||
|
||||
if pr.IsScheduled() {
|
||||
task.SetSchedule(pr.GetSchedule())
|
||||
}
|
||||
} else if opts.Rule.RuleType == ruletypes.RuleTypeAnomaly {
|
||||
// create anomaly rule
|
||||
ar, err := NewAnomalyRule(
|
||||
@ -85,7 +89,9 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
|
||||
|
||||
// create anomaly rule task for evaluation
|
||||
task = newTask(baserules.TaskTypeCh, opts.TaskName, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
|
||||
|
||||
if ar.IsScheduled() {
|
||||
task.SetSchedule(ar.GetSchedule())
|
||||
}
|
||||
} else {
|
||||
return nil, fmt.Errorf("unsupported rule type %s. Supported types: %s, %s", opts.Rule.RuleType, ruletypes.RuleTypeProm, ruletypes.RuleTypeThreshold)
|
||||
}
|
||||
|
||||
1
go.mod
1
go.mod
@ -210,6 +210,7 @@ require (
|
||||
github.com/smarty/assertions v1.15.0 // indirect
|
||||
github.com/spf13/pflag v1.0.6 // indirect
|
||||
github.com/stretchr/objx v0.5.2 // indirect
|
||||
github.com/teambition/rrule-go v1.8.2 // indirect
|
||||
github.com/tidwall/match v1.1.1 // indirect
|
||||
github.com/tidwall/pretty v1.2.0 // indirect
|
||||
github.com/tklauser/go-sysconf v0.3.15 // indirect
|
||||
|
||||
2
go.sum
2
go.sum
@ -918,6 +918,8 @@ github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8
|
||||
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
|
||||
github.com/teambition/rrule-go v1.8.2 h1:lIjpjvWTj9fFUZCmuoVDrKVOtdiyzbzc93qTmRVe/J8=
|
||||
github.com/teambition/rrule-go v1.8.2/go.mod h1:Ieq5AbrKGciP1V//Wq8ktsTXwSwJHDD5mD/wLBGl3p4=
|
||||
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
|
||||
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
|
||||
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
|
||||
|
||||
@ -87,6 +87,9 @@ type BaseRule struct {
|
||||
sqlstore sqlstore.SQLStore
|
||||
|
||||
evaluation ruletypes.Evaluation
|
||||
|
||||
schedule string
|
||||
scheduleStartsAt time.Time
|
||||
}
|
||||
|
||||
type RuleOption func(*BaseRule)
|
||||
@ -142,6 +145,7 @@ func NewBaseRule(id string, orgID valuer.UUID, p *ruletypes.PostableRule, reader
|
||||
reader: reader,
|
||||
TemporalityMap: make(map[string]map[v3.Temporality]bool),
|
||||
evaluation: p.Evaluation,
|
||||
schedule: p.Schedule,
|
||||
}
|
||||
|
||||
if baseRule.evalWindow == 0 {
|
||||
@ -217,6 +221,14 @@ func (r *BaseRule) Thresholds() []ruletypes.RuleThreshold {
|
||||
return r.ruleCondition.Thresholds
|
||||
}
|
||||
|
||||
func (r *BaseRule) IsScheduled() bool {
|
||||
return r.schedule != ""
|
||||
}
|
||||
|
||||
func (r *BaseRule) GetSchedule() (string, time.Time) {
|
||||
return r.schedule, r.scheduleStartsAt
|
||||
}
|
||||
|
||||
func (r *ThresholdRule) hostFromSource() string {
|
||||
parsedUrl, err := url.Parse(r.source)
|
||||
if err != nil {
|
||||
|
||||
@ -168,7 +168,9 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
|
||||
|
||||
// create ch rule task for evaluation
|
||||
task = newTask(TaskTypeCh, opts.TaskName, taskNamesuffix, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
|
||||
|
||||
if tr.IsScheduled() {
|
||||
task.SetSchedule(tr.GetSchedule())
|
||||
}
|
||||
} else if opts.Rule.RuleType == ruletypes.RuleTypeProm {
|
||||
|
||||
// create promql rule
|
||||
@ -190,7 +192,9 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
|
||||
|
||||
// create promql rule task for evaluation
|
||||
task = newTask(TaskTypeProm, opts.TaskName, taskNamesuffix, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
|
||||
|
||||
if pr.IsScheduled() {
|
||||
task.SetSchedule(pr.GetSchedule())
|
||||
}
|
||||
} else {
|
||||
return nil, fmt.Errorf("unsupported rule type %s. Supported types: %s, %s", opts.Rule.RuleType, ruletypes.RuleTypeProm, ruletypes.RuleTypeThreshold)
|
||||
}
|
||||
|
||||
@ -39,6 +39,8 @@ type PromRuleTask struct {
|
||||
|
||||
maintenanceStore ruletypes.MaintenanceStore
|
||||
orgID valuer.UUID
|
||||
schedule string
|
||||
scheduleStartsAt time.Time
|
||||
}
|
||||
|
||||
// newPromRuleTask holds rules that have promql condition
|
||||
@ -75,6 +77,10 @@ func (g *PromRuleTask) Key() string {
|
||||
return g.name + ";" + g.file
|
||||
}
|
||||
|
||||
func (g *PromRuleTask) IsCronSchedule() bool {
|
||||
return g.schedule != ""
|
||||
}
|
||||
|
||||
// Type returns the task type; for a PromRuleTask this is always TaskTypeProm.
func (g *PromRuleTask) Type() TaskType {
	return TaskTypeProm
}
|
||||
|
||||
// Rules returns the group's rules.
|
||||
@ -245,6 +251,12 @@ func (g *PromRuleTask) setLastEvaluation(ts time.Time) {
|
||||
g.lastEvaluation = ts
|
||||
}
|
||||
|
||||
func (g *PromRuleTask) SetSchedule(schedule string, t time.Time) {
|
||||
g.mtx.Lock()
|
||||
defer g.mtx.Unlock()
|
||||
g.schedule = schedule
|
||||
}
|
||||
|
||||
// EvalTimestamp returns the immediately preceding consistently slotted evaluation time.
|
||||
func (g *PromRuleTask) EvalTimestamp(startTime int64) time.Time {
|
||||
var (
|
||||
|
||||
@ -12,6 +12,7 @@ import (
|
||||
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
opentracing "github.com/opentracing/opentracing-go"
|
||||
"github.com/teambition/rrule-go"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
@ -36,6 +37,10 @@ type RuleTask struct {
|
||||
|
||||
maintenanceStore ruletypes.MaintenanceStore
|
||||
orgID valuer.UUID
|
||||
|
||||
// New field for rrule-based scheduling
|
||||
schedule string
|
||||
scheduleStartsAt time.Time
|
||||
}
|
||||
|
||||
const DefaultFrequency = 1 * time.Minute
|
||||
@ -71,6 +76,10 @@ func (g *RuleTask) Key() string {
|
||||
return g.name + ";" + g.file
|
||||
}
|
||||
|
||||
func (g *RuleTask) IsCronSchedule() bool {
|
||||
return g.schedule != ""
|
||||
}
|
||||
|
||||
// Name returns the group name.
|
||||
func (g *RuleTask) Type() TaskType { return TaskTypeCh }
|
||||
|
||||
@ -95,56 +104,119 @@ func NewQueryOriginContext(ctx context.Context, data map[string]interface{}) con
|
||||
func (g *RuleTask) Run(ctx context.Context) {
|
||||
defer close(g.terminated)
|
||||
|
||||
// Wait an initial amount to have consistently slotted intervals.
|
||||
evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
|
||||
zap.L().Debug("group run to begin at", zap.Time("evalTimestamp", evalTimestamp))
|
||||
select {
|
||||
case <-time.After(time.Until(evalTimestamp)):
|
||||
case <-g.done:
|
||||
return
|
||||
}
|
||||
|
||||
ctx = NewQueryOriginContext(ctx, map[string]interface{}{
|
||||
"ruleRuleTask": map[string]string{
|
||||
"name": g.Name(),
|
||||
},
|
||||
})
|
||||
|
||||
iter := func() {
|
||||
if g.pause {
|
||||
// todo(amol): remove in memory active alerts
|
||||
// and last series state
|
||||
if g.IsCronSchedule() {
|
||||
schedule, err := rrule.StrToRRule("DTSTART=" + g.scheduleStartsAt.UTC().Format("20060102T150405Z") + "\nRRULE:" + g.schedule) // assuming g.cronExpr contains the cron expression
|
||||
if err != nil {
|
||||
zap.L().Error("failed to parse rrule expression", zap.String("rrule", g.schedule), zap.Error(err))
|
||||
return
|
||||
}
|
||||
start := time.Now()
|
||||
g.Eval(ctx, evalTimestamp)
|
||||
timeSinceStart := time.Since(start)
|
||||
now := time.Now()
|
||||
nextRun := schedule.After(now, false)
|
||||
|
||||
g.setEvaluationTime(timeSinceStart)
|
||||
g.setLastEvaluation(start)
|
||||
}
|
||||
|
||||
// The assumption here is that since the ticker was started after having
|
||||
// waited for `evalTimestamp` to pass, the ticks will trigger soon
|
||||
// after each `evalTimestamp + N * g.frequency` occurrence.
|
||||
tick := time.NewTicker(g.frequency)
|
||||
defer tick.Stop()
|
||||
|
||||
iter()
|
||||
|
||||
// let the group iterate and run
|
||||
for {
|
||||
select {
|
||||
case <-time.After(time.Until(nextRun)):
|
||||
case <-g.done:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
ctx = NewQueryOriginContext(ctx, map[string]interface{}{
|
||||
"ruleRuleTask": map[string]string{
|
||||
"name": g.Name(),
|
||||
},
|
||||
})
|
||||
|
||||
iter := func() {
|
||||
if g.pause {
|
||||
return
|
||||
}
|
||||
start := time.Now()
|
||||
g.Eval(ctx, start) // using current time instead of evalTimestamp
|
||||
timeSinceStart := time.Since(start)
|
||||
|
||||
g.setEvaluationTime(timeSinceStart)
|
||||
g.setLastEvaluation(start)
|
||||
}
|
||||
|
||||
iter()
|
||||
currentRun := nextRun
|
||||
|
||||
for {
|
||||
// Calculate the next run time
|
||||
nextRun = schedule.After(currentRun, false)
|
||||
|
||||
select {
|
||||
case <-g.done:
|
||||
return
|
||||
case <-tick.C:
|
||||
missed := (time.Since(evalTimestamp) / g.frequency) - 1
|
||||
evalTimestamp = evalTimestamp.Add((missed + 1) * g.frequency)
|
||||
iter()
|
||||
default:
|
||||
select {
|
||||
case <-g.done:
|
||||
return
|
||||
case <-time.After(time.Until(nextRun)):
|
||||
// Check if we missed any scheduled runs
|
||||
now := time.Now()
|
||||
if now.After(nextRun.Add(time.Minute)) { // Allow 1 minute tolerance
|
||||
zap.L().Warn("missed scheduled run",
|
||||
zap.Time("scheduled", nextRun),
|
||||
zap.Time("actual", now))
|
||||
}
|
||||
|
||||
currentRun = nextRun
|
||||
iter()
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Wait an initial amount to have consistently slotted intervals.
|
||||
evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
|
||||
zap.L().Debug("group run to begin at", zap.Time("evalTimestamp", evalTimestamp))
|
||||
select {
|
||||
case <-time.After(time.Until(evalTimestamp)):
|
||||
case <-g.done:
|
||||
return
|
||||
}
|
||||
|
||||
ctx = NewQueryOriginContext(ctx, map[string]interface{}{
|
||||
"ruleRuleTask": map[string]string{
|
||||
"name": g.Name(),
|
||||
},
|
||||
})
|
||||
|
||||
iter := func() {
|
||||
if g.pause {
|
||||
// todo(amol): remove in memory active alerts
|
||||
// and last series state
|
||||
return
|
||||
}
|
||||
start := time.Now()
|
||||
g.Eval(ctx, evalTimestamp)
|
||||
timeSinceStart := time.Since(start)
|
||||
|
||||
g.setEvaluationTime(timeSinceStart)
|
||||
g.setLastEvaluation(start)
|
||||
}
|
||||
|
||||
// The assumption here is that since the ticker was started after having
|
||||
// waited for `evalTimestamp` to pass, the ticks will trigger soon
|
||||
// after each `evalTimestamp + N * g.frequency` occurrence.
|
||||
tick := time.NewTicker(g.frequency)
|
||||
defer tick.Stop()
|
||||
|
||||
iter()
|
||||
|
||||
// let the group iterate and run
|
||||
for {
|
||||
select {
|
||||
case <-g.done:
|
||||
return
|
||||
default:
|
||||
select {
|
||||
case <-g.done:
|
||||
return
|
||||
case <-tick.C:
|
||||
missed := (time.Since(evalTimestamp) / g.frequency) - 1
|
||||
evalTimestamp = evalTimestamp.Add((missed + 1) * g.frequency)
|
||||
iter()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -298,6 +370,13 @@ func (g *RuleTask) CopyState(fromTask Task) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *RuleTask) SetSchedule(schedule string, t time.Time) {
|
||||
g.mtx.Lock()
|
||||
defer g.mtx.Unlock()
|
||||
g.schedule = schedule
|
||||
g.scheduleStartsAt = t
|
||||
}
|
||||
|
||||
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
|
||||
func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
|
||||
|
||||
@ -379,3 +458,41 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
|
||||
}(i, rule)
|
||||
}
|
||||
}
|
||||
|
||||
// Helper to convert ruletypes.Schedule/Recurrence to rrule.ROption
|
||||
func recurrenceToROption(s *ruletypes.Schedule) rrule.ROption {
|
||||
// Only basic mapping for daily/weekly/monthly, can be extended
|
||||
opt := rrule.ROption{
|
||||
Dtstart: s.Recurrence.StartTime,
|
||||
}
|
||||
switch s.Recurrence.RepeatType {
|
||||
case ruletypes.RepeatTypeDaily:
|
||||
opt.Freq = rrule.DAILY
|
||||
case ruletypes.RepeatTypeWeekly:
|
||||
opt.Freq = rrule.WEEKLY
|
||||
for _, day := range s.Recurrence.RepeatOn {
|
||||
switch day {
|
||||
case ruletypes.RepeatOnSunday:
|
||||
opt.Byweekday = append(opt.Byweekday, rrule.SU)
|
||||
case ruletypes.RepeatOnMonday:
|
||||
opt.Byweekday = append(opt.Byweekday, rrule.MO)
|
||||
case ruletypes.RepeatOnTuesday:
|
||||
opt.Byweekday = append(opt.Byweekday, rrule.TU)
|
||||
case ruletypes.RepeatOnWednesday:
|
||||
opt.Byweekday = append(opt.Byweekday, rrule.WE)
|
||||
case ruletypes.RepeatOnThursday:
|
||||
opt.Byweekday = append(opt.Byweekday, rrule.TH)
|
||||
case ruletypes.RepeatOnFriday:
|
||||
opt.Byweekday = append(opt.Byweekday, rrule.FR)
|
||||
case ruletypes.RepeatOnSaturday:
|
||||
opt.Byweekday = append(opt.Byweekday, rrule.SA)
|
||||
}
|
||||
}
|
||||
case ruletypes.RepeatTypeMonthly:
|
||||
opt.Freq = rrule.MONTHLY
|
||||
}
|
||||
if s.Recurrence.EndTime != nil {
|
||||
opt.Until = *s.Recurrence.EndTime
|
||||
}
|
||||
return opt
|
||||
}
|
||||
|
||||
@ -28,6 +28,8 @@ type Task interface {
|
||||
Rules() []Rule
|
||||
Stop()
|
||||
Pause(b bool)
|
||||
IsCronSchedule() bool
|
||||
SetSchedule(string, time.Time)
|
||||
}
|
||||
|
||||
// newTask returns an appropriate group for
|
||||
|
||||
@ -65,10 +65,12 @@ type PostableRule struct {
|
||||
Version string `json:"version,omitempty"`
|
||||
|
||||
// legacy
|
||||
Expr string `yaml:"expr,omitempty" json:"expr,omitempty"`
|
||||
OldYaml string `json:"yaml,omitempty"`
|
||||
|
||||
Expr string `yaml:"expr,omitempty" json:"expr,omitempty"`
|
||||
OldYaml string `json:"yaml,omitempty"`
|
||||
EvalType string `yaml:"evalType,omitempty" json:"evalType,omitempty"`
|
||||
Evaluation Evaluation `yaml:"evaluation,omitempty" json:"evaluation,omitempty"`
|
||||
StartsAt int64 `yaml:"startsAt,omitempty" json:"startsAt,omitempty"`
|
||||
Schedule string `json:"schedule,omitempty"`
|
||||
}
|
||||
|
||||
func ParsePostableRule(content []byte) (*PostableRule, error) {
|
||||
@ -145,7 +147,12 @@ func ParseIntoRule(initRule PostableRule, content []byte, kind RuleDataKind) (*P
|
||||
//added alerts v2 fields
|
||||
rule.RuleCondition.Thresholds = append(rule.RuleCondition.Thresholds,
|
||||
NewBasicRuleThreshold(rule.AlertName, rule.RuleCondition.Target, nil, rule.RuleCondition.MatchType, rule.RuleCondition.CompareOp, rule.RuleCondition.SelectedQuery, rule.RuleCondition.TargetUnit, rule.RuleCondition.CompositeQuery.Unit))
|
||||
rule.Evaluation = NewEvaluation("rolling", RollingWindow{EvalWindow: rule.EvalWindow, Frequency: rule.Frequency, RequiredNumPoints: rule.RuleCondition.RequiredNumPoints, RequireMinPoints: rule.RuleCondition.RequireMinPoints})
|
||||
if rule.EvalType == "" || rule.EvalType == "rolling" {
|
||||
rule.EvalType = "rolling"
|
||||
rule.Evaluation = NewEvaluation(rule.EvalType, RollingWindow{EvalWindow: rule.EvalWindow, Frequency: rule.Frequency, RequiredNumPoints: rule.RuleCondition.RequiredNumPoints, RequireMinPoints: rule.RuleCondition.RequireMinPoints})
|
||||
} else if rule.EvalType == "cumulative" {
|
||||
rule.Evaluation = NewEvaluation(rule.EvalType, CumulativeWindow{EvalWindow: rule.EvalWindow, StartsAt: time.UnixMilli(rule.StartsAt), RequiredNumPoints: rule.RuleCondition.RequiredNumPoints, RequireMinPoints: rule.RuleCondition.RequireMinPoints})
|
||||
}
|
||||
return rule, nil
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user