From 05a0ce64d3e5d0df242e077aed55cc851ae80cab Mon Sep 17 00:00:00 2001 From: aniket Date: Sun, 10 Aug 2025 15:11:46 +0530 Subject: [PATCH] feat(cron-alerts): added prom cron alerts --- pkg/query-service/rules/prom_rule_task.go | 35 +++++++++++------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/pkg/query-service/rules/prom_rule_task.go b/pkg/query-service/rules/prom_rule_task.go index 3af623c82499..27426b3d63cf 100644 --- a/pkg/query-service/rules/prom_rule_task.go +++ b/pkg/query-service/rules/prom_rule_task.go @@ -98,6 +98,22 @@ func (g *PromRuleTask) Pause(b bool) { func (g *PromRuleTask) Run(ctx context.Context) { defer close(g.terminated) + defer func() { + if !g.markStale { + return + } + go func(now time.Time) { + for _, rule := range g.seriesInPreviousEval { + for _, r := range rule { + g.staleSeries = append(g.staleSeries, r) + } + } + // That can be garbage collected at this point. + g.seriesInPreviousEval = nil + + }(time.Now()) + + }() if g.IsCronSchedule() { schedule, err := rrule.StrToRRule("DTSTART=" + g.scheduleStartsAt.UTC().Format("20060102T150405Z") + "\nRRULE:" + g.schedule) // assuming g.cronExpr contains the cron expression if err != nil { @@ -190,24 +206,6 @@ func (g *PromRuleTask) Run(ctx context.Context) { tick := time.NewTicker(g.frequency) defer tick.Stop() - // defer cleanup - defer func() { - if !g.markStale { - return - } - go func(now time.Time) { - for _, rule := range g.seriesInPreviousEval { - for _, r := range rule { - g.staleSeries = append(g.staleSeries, r) - } - } - // That can be garbage collected at this point. - g.seriesInPreviousEval = nil - - }(time.Now()) - - }() - iter() // let the group iterate and run @@ -318,6 +316,7 @@ func (g *PromRuleTask) SetSchedule(schedule string, t time.Time) { g.mtx.Lock() defer g.mtx.Unlock() g.schedule = schedule + g.scheduleStartsAt = t } // EvalTimestamp returns the immediately preceding consistently slotted evaluation time.