2022-07-14 11:59:06 +05:30
package rules
import (
"context"
"encoding/json"
"fmt"
"sort"
"strconv"
"strings"
"sync"
"time"
2022-10-06 20:13:30 +05:30
"github.com/google/uuid"
2022-07-14 11:59:06 +05:30
"go.uber.org/zap"
2023-10-17 17:50:54 +00:00
"errors"
2022-07-14 11:59:06 +05:30
"github.com/jmoiron/sqlx"
2024-09-24 10:22:52 +05:30
"go.signoz.io/signoz/pkg/query-service/cache"
2022-10-06 20:13:30 +05:30
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
2023-05-17 16:10:43 +05:30
"go.signoz.io/signoz/pkg/query-service/interfaces"
2022-10-06 20:13:30 +05:30
"go.signoz.io/signoz/pkg/query-service/model"
2024-09-11 09:56:59 +05:30
pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine"
2024-07-02 12:03:01 +05:30
"go.signoz.io/signoz/pkg/query-service/telemetry"
2022-10-06 20:13:30 +05:30
"go.signoz.io/signoz/pkg/query-service/utils/labels"
2022-07-14 11:59:06 +05:30
)
2024-09-04 18:09:40 +05:30
type PrepareTaskOptions struct {
Rule * PostableRule
TaskName string
RuleDB RuleDB
Logger * zap . Logger
Reader interfaces . Reader
2024-09-24 10:22:52 +05:30
Cache cache . Cache
2024-09-04 18:09:40 +05:30
FF interfaces . FeatureLookup
ManagerOpts * ManagerOptions
NotifyFunc NotifyFunc
2024-09-12 10:58:07 +05:30
UseLogsNewSchema bool
2024-09-04 18:09:40 +05:30
}
2022-07-14 11:59:06 +05:30
const (
	// taskNamesuffix is the fixed suffix passed to newTask for every task
	// built by defaultPrepareTaskFunc.
	taskNamesuffix = "webAppEditor"
)
2024-09-09 10:28:54 +05:30
// RuleIdFromTaskName recovers the rule ID from a task name built by
// prepareTaskName: everything before the first "-groupname". A name that
// does not contain the marker is returned unchanged.
func RuleIdFromTaskName(n string) string {
	ruleID, _, _ := strings.Cut(n, "-groupname")
	return ruleID
}
2022-08-04 11:55:54 +05:30
// prepareTaskName builds the task name for a rule ID: the ID's default
// textual form followed by "-groupname". Integers render in decimal and
// strings verbatim, so one %v verb covers every accepted ID type.
func prepareTaskName(ruleId interface{}) string {
	return fmt.Sprintf("%v-groupname", ruleId)
}
// ManagerOptions bundles options for the Manager.
type ManagerOptions struct {
NotifierOpts am . NotifierOptions
2024-09-11 09:56:59 +05:30
PqlEngine * pqle . PqlEngine
2022-07-14 11:59:06 +05:30
// RepoURL is used to generate a backlink in sent alert messages
RepoURL string
// rule db conn
DBConn * sqlx . DB
Context context . Context
2024-09-04 18:09:40 +05:30
Logger * zap . Logger
2022-07-14 11:59:06 +05:30
ResendDelay time . Duration
DisableRules bool
2023-05-17 16:10:43 +05:30
FeatureFlags interfaces . FeatureLookup
2024-05-24 12:11:34 +05:30
Reader interfaces . Reader
2024-09-24 10:22:52 +05:30
Cache cache . Cache
2024-08-08 17:34:25 +05:30
EvalDelay time . Duration
2024-09-04 18:09:40 +05:30
PrepareTaskFunc func ( opts PrepareTaskOptions ) ( Task , error )
2024-09-12 10:58:07 +05:30
UseLogsNewSchema bool
2022-07-14 11:59:06 +05:30
}
// The Manager manages recording and alerting rules.
type Manager struct {
opts * ManagerOptions
tasks map [ string ] Task
rules map [ string ] Rule
mtx sync . RWMutex
block chan struct { }
// Notifier sends messages through alert manager
notifier * am . Notifier
// datastore to store alert definitions
ruleDB RuleDB
2024-09-04 18:09:40 +05:30
logger * zap . Logger
2023-05-17 16:10:43 +05:30
2024-09-24 10:22:52 +05:30
featureFlags interfaces . FeatureLookup
reader interfaces . Reader
cache cache . Cache
2024-09-04 18:09:40 +05:30
prepareTaskFunc func ( opts PrepareTaskOptions ) ( Task , error )
2024-09-12 10:58:07 +05:30
UseLogsNewSchema bool
2022-07-14 11:59:06 +05:30
}
func defaultOptions ( o * ManagerOptions ) * ManagerOptions {
if o . NotifierOpts . QueueCapacity == 0 {
o . NotifierOpts . QueueCapacity = 10000
}
if o . NotifierOpts . Timeout == 0 {
o . NotifierOpts . Timeout = 10 * time . Second
}
if o . ResendDelay == time . Duration ( 0 ) {
o . ResendDelay = 1 * time . Minute
}
2024-09-04 18:09:40 +05:30
if o . Logger == nil {
o . Logger = zap . L ( )
}
if o . PrepareTaskFunc == nil {
o . PrepareTaskFunc = defaultPrepareTaskFunc
}
2022-07-14 11:59:06 +05:30
return o
}
2024-09-04 18:09:40 +05:30
func defaultPrepareTaskFunc ( opts PrepareTaskOptions ) ( Task , error ) {
rules := make ( [ ] Rule , 0 )
var task Task
2024-09-09 10:28:54 +05:30
ruleId := RuleIdFromTaskName ( opts . TaskName )
2024-09-04 18:09:40 +05:30
if opts . Rule . RuleType == RuleTypeThreshold {
// create a threshold rule
tr , err := NewThresholdRule (
ruleId ,
opts . Rule ,
opts . FF ,
opts . Reader ,
2024-09-12 10:58:07 +05:30
opts . UseLogsNewSchema ,
2024-09-11 09:56:59 +05:30
WithEvalDelay ( opts . ManagerOpts . EvalDelay ) ,
2024-09-04 18:09:40 +05:30
)
if err != nil {
return task , err
}
rules = append ( rules , tr )
// create ch rule task for evalution
task = newTask ( TaskTypeCh , opts . TaskName , taskNamesuffix , time . Duration ( opts . Rule . Frequency ) , rules , opts . ManagerOpts , opts . NotifyFunc , opts . RuleDB )
} else if opts . Rule . RuleType == RuleTypeProm {
// create promql rule
pr , err := NewPromRule (
ruleId ,
opts . Rule ,
opts . Logger ,
opts . Reader ,
2024-09-11 09:56:59 +05:30
opts . ManagerOpts . PqlEngine ,
2024-09-04 18:09:40 +05:30
)
if err != nil {
return task , err
}
rules = append ( rules , pr )
// create promql rule task for evalution
task = newTask ( TaskTypeProm , opts . TaskName , taskNamesuffix , time . Duration ( opts . Rule . Frequency ) , rules , opts . ManagerOpts , opts . NotifyFunc , opts . RuleDB )
} else {
return nil , fmt . Errorf ( "unsupported rule type. Supported types: %s, %s" , RuleTypeProm , RuleTypeThreshold )
}
return task , nil
}
2022-07-14 11:59:06 +05:30
// NewManager returns an implementation of Manager, ready to be started
// by calling the Run method.
func NewManager ( o * ManagerOptions ) ( * Manager , error ) {
o = defaultOptions ( o )
// here we just initiate notifier, it will be started
// in run()
notifier , err := am . NewNotifier ( & o . NotifierOpts , nil )
if err != nil {
// todo(amol): rethink on this, the query service
// should not be down because alert manager is not available
return nil , err
}
2024-09-17 11:41:46 +05:30
amManager , err := am . New ( )
if err != nil {
return nil , err
}
db := NewRuleDB ( o . DBConn , amManager )
2022-07-14 11:59:06 +05:30
2024-07-02 12:03:01 +05:30
telemetry . GetInstance ( ) . SetAlertsInfoCallback ( db . GetAlertsInfo )
2022-07-14 11:59:06 +05:30
m := & Manager {
2024-09-04 18:09:40 +05:30
tasks : map [ string ] Task { } ,
rules : map [ string ] Rule { } ,
notifier : notifier ,
ruleDB : db ,
opts : o ,
block : make ( chan struct { } ) ,
logger : o . Logger ,
featureFlags : o . FeatureFlags ,
reader : o . Reader ,
2024-09-24 10:22:52 +05:30
cache : o . Cache ,
2024-09-04 18:09:40 +05:30
prepareTaskFunc : o . PrepareTaskFunc ,
2022-07-14 11:59:06 +05:30
}
return m , nil
}
func ( m * Manager ) Start ( ) {
if err := m . initiate ( ) ; err != nil {
2024-03-27 00:07:29 +05:30
zap . L ( ) . Error ( "failed to initialize alerting rules manager" , zap . Error ( err ) )
2022-07-14 11:59:06 +05:30
}
m . run ( )
}
2024-05-31 17:43:13 +05:30
// RuleDB exposes the datastore the manager uses for rule definitions.
func (m *Manager) RuleDB() RuleDB { return m.ruleDB }
2022-07-14 11:59:06 +05:30
// Pause forwards b to the Pause method of every registered task while
// holding the manager's write lock.
func (m *Manager) Pause(b bool) {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	for name := range m.tasks {
		m.tasks[name].Pause(b)
	}
}
func ( m * Manager ) initiate ( ) error {
2023-10-17 17:50:54 +00:00
storedRules , err := m . ruleDB . GetStoredRules ( context . Background ( ) )
2022-07-14 11:59:06 +05:30
if err != nil {
return err
}
if len ( storedRules ) == 0 {
return nil
}
var loadErrors [ ] error
for _ , rec := range storedRules {
taskName := fmt . Sprintf ( "%d-groupname" , rec . Id )
2024-08-30 18:51:55 +05:30
parsedRule , err := ParsePostableRule ( [ ] byte ( rec . Data ) )
2022-07-14 11:59:06 +05:30
2024-08-30 18:51:55 +05:30
if err != nil {
if errors . Is ( err , ErrFailedToParseJSON ) {
2024-03-27 00:07:29 +05:30
zap . L ( ) . Info ( "failed to load rule in json format, trying yaml now:" , zap . String ( "name" , taskName ) )
2022-07-14 11:59:06 +05:30
// see if rule is stored in yaml format
2024-08-30 18:51:55 +05:30
parsedRule , err = parsePostableRule ( [ ] byte ( rec . Data ) , RuleDataKindYaml )
2022-07-14 11:59:06 +05:30
2024-08-30 18:51:55 +05:30
if err != nil {
2024-03-27 00:07:29 +05:30
zap . L ( ) . Error ( "failed to parse and initialize yaml rule" , zap . String ( "name" , taskName ) , zap . Error ( err ) )
2022-07-14 11:59:06 +05:30
// just one rule is being parsed so expect just one error
2024-08-30 18:51:55 +05:30
loadErrors = append ( loadErrors , err )
2022-07-14 11:59:06 +05:30
continue
}
} else {
2024-03-27 00:07:29 +05:30
zap . L ( ) . Error ( "failed to parse and initialize rule" , zap . String ( "name" , taskName ) , zap . Error ( err ) )
2022-07-14 11:59:06 +05:30
// just one rule is being parsed so expect just one error
2024-03-27 00:07:29 +05:30
loadErrors = append ( loadErrors , err )
2022-07-14 11:59:06 +05:30
continue
}
}
2022-08-04 11:55:54 +05:30
if ! parsedRule . Disabled {
err := m . addTask ( parsedRule , taskName )
if err != nil {
2024-03-27 00:07:29 +05:30
zap . L ( ) . Error ( "failed to load the rule definition" , zap . String ( "name" , taskName ) , zap . Error ( err ) )
2022-08-04 11:55:54 +05:30
}
2022-07-14 11:59:06 +05:30
}
}
2023-10-17 17:50:54 +00:00
if len ( loadErrors ) > 0 {
return errors . Join ( loadErrors ... )
}
2022-07-14 11:59:06 +05:30
return nil
}
// run starts processing of the rule manager. It launches the notifier in
// the background and closes m.block, which releases every task goroutine
// waiting to begin evaluation. It must run at most once, because closing
// an already-closed channel panics.
func (m *Manager) run() {
	go m.notifier.Run()
	close(m.block)
}
// Stop the rule manager's rule evaluation cycles.
func ( m * Manager ) Stop ( ) {
m . mtx . Lock ( )
defer m . mtx . Unlock ( )
2024-03-27 00:07:29 +05:30
zap . L ( ) . Info ( "Stopping rule manager..." )
2022-07-14 11:59:06 +05:30
for _ , t := range m . tasks {
t . Stop ( )
}
2024-03-27 00:07:29 +05:30
zap . L ( ) . Info ( "Rule manager stopped" )
2022-07-14 11:59:06 +05:30
}
// EditRuleDefinition writes the rule definition to the
// datastore and also updates the rule executor
2023-10-17 17:50:54 +00:00
func ( m * Manager ) EditRule ( ctx context . Context , ruleStr string , id string ) error {
2022-08-04 11:55:54 +05:30
2024-08-30 18:51:55 +05:30
parsedRule , err := ParsePostableRule ( [ ] byte ( ruleStr ) )
2022-07-14 11:59:06 +05:30
2024-08-30 18:51:55 +05:30
if err != nil {
return err
2022-07-14 11:59:06 +05:30
}
2023-10-17 17:50:54 +00:00
taskName , _ , err := m . ruleDB . EditRuleTx ( ctx , ruleStr , id )
2022-07-14 11:59:06 +05:30
if err != nil {
return err
}
if ! m . opts . DisableRules {
2023-05-17 16:10:43 +05:30
err = m . syncRuleStateWithTask ( taskName , parsedRule )
if err != nil {
return err
}
}
2022-07-14 11:59:06 +05:30
return nil
}
func ( m * Manager ) editTask ( rule * PostableRule , taskName string ) error {
m . mtx . Lock ( )
defer m . mtx . Unlock ( )
2022-10-06 20:13:30 +05:30
2024-03-27 00:07:29 +05:30
zap . L ( ) . Debug ( "editing a rule task" , zap . String ( "name" , taskName ) )
2022-07-14 11:59:06 +05:30
2024-09-04 18:09:40 +05:30
newTask , err := m . prepareTaskFunc ( PrepareTaskOptions {
Rule : rule ,
TaskName : taskName ,
RuleDB : m . ruleDB ,
Logger : m . logger ,
Reader : m . reader ,
2024-09-24 10:22:52 +05:30
Cache : m . cache ,
2024-09-04 18:09:40 +05:30
FF : m . featureFlags ,
ManagerOpts : m . opts ,
NotifyFunc : m . prepareNotifyFunc ( ) ,
2024-09-12 10:58:07 +05:30
UseLogsNewSchema : m . opts . UseLogsNewSchema ,
2024-09-04 18:09:40 +05:30
} )
2022-07-14 11:59:06 +05:30
if err != nil {
2024-03-27 00:07:29 +05:30
zap . L ( ) . Error ( "loading tasks failed" , zap . Error ( err ) )
2022-07-14 11:59:06 +05:30
return errors . New ( "error preparing rule with given parameters, previous rule set restored" )
}
2024-09-04 18:09:40 +05:30
for _ , r := range newTask . Rules ( ) {
m . rules [ r . ID ( ) ] = r
}
2022-07-14 11:59:06 +05:30
// If there is an old task with the same identifier, stop it and wait for
// it to finish the current iteration. Then copy it into the new group.
oldTask , ok := m . tasks [ taskName ]
if ! ok {
2024-03-27 00:07:29 +05:30
zap . L ( ) . Warn ( "rule task not found, a new task will be created" , zap . String ( "name" , taskName ) )
2022-07-14 11:59:06 +05:30
}
delete ( m . tasks , taskName )
if ok {
oldTask . Stop ( )
newTask . CopyState ( oldTask )
}
go func ( ) {
// Wait with starting evaluation until the rule manager
// is told to run. This is necessary to avoid running
// queries against a bootstrapping storage.
<- m . block
newTask . Run ( m . opts . Context )
} ( )
m . tasks [ taskName ] = newTask
return nil
}
2023-10-17 17:50:54 +00:00
func ( m * Manager ) DeleteRule ( ctx context . Context , id string ) error {
2022-07-14 11:59:06 +05:30
idInt , err := strconv . Atoi ( id )
if err != nil {
2024-03-27 00:07:29 +05:30
zap . L ( ) . Error ( "delete rule received an rule id in invalid format, must be a number" , zap . String ( "id" , id ) , zap . Error ( err ) )
2022-07-14 11:59:06 +05:30
return fmt . Errorf ( "delete rule received an rule id in invalid format, must be a number" )
}
taskName := prepareTaskName ( int64 ( idInt ) )
if ! m . opts . DisableRules {
2022-08-04 11:55:54 +05:30
m . deleteTask ( taskName )
2022-07-14 11:59:06 +05:30
}
2023-10-17 17:50:54 +00:00
if _ , _ , err := m . ruleDB . DeleteRuleTx ( ctx , id ) ; err != nil {
2024-03-27 00:07:29 +05:30
zap . L ( ) . Error ( "failed to delete the rule from rule db" , zap . String ( "id" , id ) , zap . Error ( err ) )
2022-07-14 11:59:06 +05:30
return err
}
return nil
}
2022-08-04 11:55:54 +05:30
func ( m * Manager ) deleteTask ( taskName string ) {
2022-07-14 11:59:06 +05:30
m . mtx . Lock ( )
defer m . mtx . Unlock ( )
2024-03-27 00:07:29 +05:30
zap . L ( ) . Debug ( "deleting a rule task" , zap . String ( "name" , taskName ) )
2022-07-14 11:59:06 +05:30
oldg , ok := m . tasks [ taskName ]
if ok {
oldg . Stop ( )
delete ( m . tasks , taskName )
2024-09-09 10:28:54 +05:30
delete ( m . rules , RuleIdFromTaskName ( taskName ) )
2024-03-27 00:07:29 +05:30
zap . L ( ) . Debug ( "rule task deleted" , zap . String ( "name" , taskName ) )
2022-07-14 11:59:06 +05:30
} else {
2024-03-27 00:07:29 +05:30
zap . L ( ) . Info ( "rule not found for deletion" , zap . String ( "name" , taskName ) )
2022-07-14 11:59:06 +05:30
}
}
// CreateRule stores rule def into db and also
// starts an executor for the rule
2023-11-28 10:44:11 +05:30
func ( m * Manager ) CreateRule ( ctx context . Context , ruleStr string ) ( * GettableRule , error ) {
2024-08-30 18:51:55 +05:30
parsedRule , err := ParsePostableRule ( [ ] byte ( ruleStr ) )
2022-07-14 11:59:06 +05:30
2024-08-30 18:51:55 +05:30
if err != nil {
return nil , err
2022-07-14 11:59:06 +05:30
}
2023-11-28 10:44:11 +05:30
lastInsertId , tx , err := m . ruleDB . CreateRuleTx ( ctx , ruleStr )
taskName := prepareTaskName ( lastInsertId )
2022-07-14 11:59:06 +05:30
if err != nil {
2023-11-28 10:44:11 +05:30
return nil , err
2022-07-14 11:59:06 +05:30
}
if ! m . opts . DisableRules {
if err := m . addTask ( parsedRule , taskName ) ; err != nil {
tx . Rollback ( )
2023-11-28 10:44:11 +05:30
return nil , err
2022-07-14 11:59:06 +05:30
}
}
2023-05-17 16:10:43 +05:30
err = tx . Commit ( )
if err != nil {
2023-11-28 10:44:11 +05:30
return nil , err
2023-05-17 16:10:43 +05:30
}
2023-11-28 10:44:11 +05:30
gettableRule := & GettableRule {
Id : fmt . Sprintf ( "%d" , lastInsertId ) ,
PostableRule : * parsedRule ,
}
return gettableRule , nil
2023-05-17 16:10:43 +05:30
}
2022-07-14 11:59:06 +05:30
func ( m * Manager ) addTask ( rule * PostableRule , taskName string ) error {
m . mtx . Lock ( )
defer m . mtx . Unlock ( )
2024-03-27 00:07:29 +05:30
zap . L ( ) . Debug ( "adding a new rule task" , zap . String ( "name" , taskName ) )
2024-09-04 18:09:40 +05:30
newTask , err := m . prepareTaskFunc ( PrepareTaskOptions {
Rule : rule ,
TaskName : taskName ,
RuleDB : m . ruleDB ,
Logger : m . logger ,
Reader : m . reader ,
2024-09-24 10:22:52 +05:30
Cache : m . cache ,
2024-09-04 18:09:40 +05:30
FF : m . featureFlags ,
ManagerOpts : m . opts ,
NotifyFunc : m . prepareNotifyFunc ( ) ,
2024-09-12 10:58:07 +05:30
UseLogsNewSchema : m . opts . UseLogsNewSchema ,
2024-09-04 18:09:40 +05:30
} )
2022-07-14 11:59:06 +05:30
if err != nil {
2024-03-27 00:07:29 +05:30
zap . L ( ) . Error ( "creating rule task failed" , zap . String ( "name" , taskName ) , zap . Error ( err ) )
2022-07-14 11:59:06 +05:30
return errors . New ( "error loading rules, previous rule set restored" )
}
2024-09-24 10:56:49 +05:30
for _ , r := range newTask . Rules ( ) {
m . rules [ r . ID ( ) ] = r
}
2022-07-14 11:59:06 +05:30
// If there is an another task with the same identifier, raise an error
_ , ok := m . tasks [ taskName ]
if ok {
return fmt . Errorf ( "a rule with the same name already exists" )
}
go func ( ) {
// Wait with starting evaluation until the rule manager
// is told to run. This is necessary to avoid running
// queries against a bootstrapping storage.
<- m . block
newTask . Run ( m . opts . Context )
} ( )
m . tasks [ taskName ] = newTask
return nil
}
// RuleTasks returns the manager's rule tasks sorted by name. It holds the
// read lock while the list is built.
func (m *Manager) RuleTasks() []Task {
	m.mtx.RLock()
	defer m.mtx.RUnlock()

	return m.RuleTasksWithoutLock()
}
// RuleTasksWithoutLock returns the manager's rule tasks sorted by name. It
// reads m.tasks without acquiring m.mtx, so the caller is expected to hold
// the lock (or otherwise rule out concurrent task changes).
func (m *Manager) RuleTasksWithoutLock() []Task {
	tasks := make([]Task, len(m.tasks))
	i := 0
	for _, task := range m.tasks {
		tasks[i] = task
		i++
	}

	sort.Slice(tasks, func(a, b int) bool {
		return tasks[a].Name() < tasks[b].Name()
	})
	return tasks
}
// Rules returns a snapshot of the rules currently registered with the
// manager, in no particular order. It holds the read lock while copying.
func (m *Manager) Rules() []Rule {
	m.mtx.RLock()
	defer m.mtx.RUnlock()

	snapshot := make([]Rule, 0, len(m.rules))
	for _, rule := range m.rules {
		snapshot = append(snapshot, rule)
	}
	return snapshot
}
// TriggeredAlerts collects the currently active alerts of every registered
// rule, each paired with the name of the rule that produced it.
//
// NOTE(review): m.mtx is not held while m.rules is iterated; confirm callers
// cannot run concurrently with addTask/editTask/deleteTask.
func (m *Manager) TriggeredAlerts() []*NamedAlert {
	// m.mtx.RLock()
	// defer m.mtx.RUnlock()

	triggered := []*NamedAlert{}
	for _, rule := range m.rules {
		for _, alert := range rule.ActiveAlerts() {
			triggered = append(triggered, &NamedAlert{
				Alert: alert,
				Name:  rule.Name(),
			})
		}
	}
	return triggered
}
// NotifyFunc delivers notifications for a batch of alerts generated by the
// given expression. The Manager's implementation is built by
// prepareNotifyFunc.
type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert)
// prepareNotifyFunc implements the NotifyFunc for a Notifier.
func ( m * Manager ) prepareNotifyFunc ( ) NotifyFunc {
return func ( ctx context . Context , expr string , alerts ... * Alert ) {
var res [ ] * am . Alert
for _ , alert := range alerts {
generatorURL := alert . GeneratorURL
if generatorURL == "" {
generatorURL = m . opts . RepoURL
}
a := & am . Alert {
StartsAt : alert . FiredAt ,
Labels : alert . Labels ,
Annotations : alert . Annotations ,
GeneratorURL : generatorURL ,
2022-08-04 15:31:21 +05:30
Receivers : alert . Receivers ,
2022-07-14 11:59:06 +05:30
}
if ! alert . ResolvedAt . IsZero ( ) {
a . EndsAt = alert . ResolvedAt
} else {
a . EndsAt = alert . ValidUntil
}
res = append ( res , a )
}
if len ( alerts ) > 0 {
m . notifier . Send ( res ... )
}
}
}
// ListActiveRules returns the rules currently registered with the manager,
// in no particular order. The error is always nil. Unlike Rules, it does not
// take m.mtx.
func (m *Manager) ListActiveRules() ([]Rule, error) {
	active := make([]Rule, 0, len(m.rules))
	for _, rule := range m.rules {
		active = append(active, rule)
	}
	return active, nil
}
2023-10-17 17:50:54 +00:00
func ( m * Manager ) ListRuleStates ( ctx context . Context ) ( * GettableRules , error ) {
2022-07-14 11:59:06 +05:30
// fetch rules from DB
2023-10-17 17:50:54 +00:00
storedRules , err := m . ruleDB . GetStoredRules ( ctx )
2022-08-04 11:55:54 +05:30
if err != nil {
return nil , err
}
2022-07-14 11:59:06 +05:30
// initiate response object
resp := make ( [ ] * GettableRule , 0 )
for _ , s := range storedRules {
ruleResponse := & GettableRule { }
if err := json . Unmarshal ( [ ] byte ( s . Data ) , ruleResponse ) ; err != nil { // Parse []byte to go struct pointer
2024-03-27 00:07:29 +05:30
zap . L ( ) . Error ( "failed to unmarshal rule from db" , zap . Int ( "id" , s . Id ) , zap . Error ( err ) )
2022-07-14 11:59:06 +05:30
continue
}
ruleResponse . Id = fmt . Sprintf ( "%d" , s . Id )
// fetch state of rule from memory
if rm , ok := m . rules [ ruleResponse . Id ] ; ! ok {
2024-09-09 13:06:09 +05:30
ruleResponse . State = model . StateDisabled
2022-08-04 11:55:54 +05:30
ruleResponse . Disabled = true
2022-07-14 11:59:06 +05:30
} else {
2024-08-30 10:34:11 +05:30
ruleResponse . State = rm . State ( )
2022-07-14 11:59:06 +05:30
}
2023-10-17 17:50:54 +00:00
ruleResponse . CreatedAt = s . CreatedAt
ruleResponse . CreatedBy = s . CreatedBy
ruleResponse . UpdatedAt = s . UpdatedAt
ruleResponse . UpdatedBy = s . UpdatedBy
2022-07-14 11:59:06 +05:30
resp = append ( resp , ruleResponse )
}
return & GettableRules { Rules : resp } , nil
}
2023-10-17 17:50:54 +00:00
func ( m * Manager ) GetRule ( ctx context . Context , id string ) ( * GettableRule , error ) {
s , err := m . ruleDB . GetStoredRule ( ctx , id )
2022-07-14 11:59:06 +05:30
if err != nil {
return nil , err
}
r := & GettableRule { }
if err := json . Unmarshal ( [ ] byte ( s . Data ) , r ) ; err != nil {
return nil , err
}
r . Id = fmt . Sprintf ( "%d" , s . Id )
2023-11-10 17:43:19 +05:30
// fetch state of rule from memory
if rm , ok := m . rules [ r . Id ] ; ! ok {
2024-09-09 13:06:09 +05:30
r . State = model . StateDisabled
2023-11-10 17:43:19 +05:30
r . Disabled = true
} else {
2024-08-30 10:34:11 +05:30
r . State = rm . State ( )
2023-11-10 17:43:19 +05:30
}
r . CreatedAt = s . CreatedAt
r . CreatedBy = s . CreatedBy
r . UpdatedAt = s . UpdatedAt
r . UpdatedBy = s . UpdatedBy
2022-07-14 11:59:06 +05:30
return r , nil
}
2022-08-04 11:55:54 +05:30
// syncRuleStateWithTask ensures that the state of a stored rule matches
// the task state. For example - if a stored rule is disabled, then
// there is no task running against it.
func ( m * Manager ) syncRuleStateWithTask ( taskName string , rule * PostableRule ) error {
if rule . Disabled {
// check if rule has any task running
if _ , ok := m . tasks [ taskName ] ; ok {
// delete task from memory
m . deleteTask ( taskName )
}
} else {
// check if rule has a task running
if _ , ok := m . tasks [ taskName ] ; ! ok {
// rule has not task, start one
if err := m . addTask ( rule , taskName ) ; err != nil {
return err
}
2022-08-11 13:54:17 +05:30
} else {
if err := m . editTask ( rule , taskName ) ; err != nil {
return err
}
2022-08-04 11:55:54 +05:30
}
}
return nil
}
// PatchRule supports attribute level changes to the rule definition unlike
// EditRule, which updates entire rule definition in the DB.
// the process:
2023-05-17 16:10:43 +05:30
// - get the latest rule from db
// - over write the patch attributes received in input (ruleStr)
// - re-deploy or undeploy task as necessary
// - update the patched rule in the DB
2023-10-17 17:50:54 +00:00
func ( m * Manager ) PatchRule ( ctx context . Context , ruleStr string , ruleId string ) ( * GettableRule , error ) {
2022-08-04 11:55:54 +05:30
if ruleId == "" {
return nil , fmt . Errorf ( "id is mandatory for patching rule" )
}
taskName := prepareTaskName ( ruleId )
// retrieve rule from DB
2023-10-17 17:50:54 +00:00
storedJSON , err := m . ruleDB . GetStoredRule ( ctx , ruleId )
2022-08-04 11:55:54 +05:30
if err != nil {
2024-03-27 00:07:29 +05:30
zap . L ( ) . Error ( "failed to get stored rule with given id" , zap . String ( "id" , ruleId ) , zap . Error ( err ) )
2022-08-04 11:55:54 +05:30
return nil , err
}
// storedRule holds the current stored rule from DB
storedRule := PostableRule { }
if err := json . Unmarshal ( [ ] byte ( storedJSON . Data ) , & storedRule ) ; err != nil {
2024-03-27 00:07:29 +05:30
zap . L ( ) . Error ( "failed to unmarshal stored rule with given id" , zap . String ( "id" , ruleId ) , zap . Error ( err ) )
2022-08-04 11:55:54 +05:30
return nil , err
}
// patchedRule is combo of stored rule and patch received in the request
2024-08-30 18:51:55 +05:30
patchedRule , err := parseIntoRule ( storedRule , [ ] byte ( ruleStr ) , "json" )
if err != nil {
return nil , err
2022-08-04 11:55:54 +05:30
}
// deploy or un-deploy task according to patched (new) rule state
if err := m . syncRuleStateWithTask ( taskName , patchedRule ) ; err != nil {
2024-03-27 00:07:29 +05:30
zap . L ( ) . Error ( "failed to sync stored rule state with the task" , zap . String ( "taskName" , taskName ) , zap . Error ( err ) )
2022-08-04 11:55:54 +05:30
return nil , err
}
// prepare rule json to write to update db
patchedRuleBytes , err := json . Marshal ( patchedRule )
if err != nil {
return nil , err
}
// write updated rule to db
2023-10-17 17:50:54 +00:00
if _ , _ , err = m . ruleDB . EditRuleTx ( ctx , string ( patchedRuleBytes ) , ruleId ) ; err != nil {
2022-08-04 11:55:54 +05:30
// write failed, rollback task state
// restore task state from the stored rule
if err := m . syncRuleStateWithTask ( taskName , & storedRule ) ; err != nil {
2024-03-27 00:07:29 +05:30
zap . L ( ) . Error ( "failed to restore rule after patch failure" , zap . String ( "taskName" , taskName ) , zap . Error ( err ) )
2022-08-04 11:55:54 +05:30
}
return nil , err
}
// prepare http response
response := GettableRule {
Id : ruleId ,
PostableRule : * patchedRule ,
}
// fetch state of rule from memory
if rm , ok := m . rules [ ruleId ] ; ! ok {
2024-09-09 13:06:09 +05:30
response . State = model . StateDisabled
2022-08-04 11:55:54 +05:30
response . Disabled = true
} else {
2024-08-30 10:34:11 +05:30
response . State = rm . State ( )
2022-08-04 11:55:54 +05:30
}
return & response , nil
}
2022-08-04 17:24:15 +05:30
// TestNotification prepares a dummy rule for given rule parameters and
// sends a test notification. returns alert count and error (if any)
func ( m * Manager ) TestNotification ( ctx context . Context , ruleStr string ) ( int , * model . ApiError ) {
2024-08-30 18:51:55 +05:30
parsedRule , err := ParsePostableRule ( [ ] byte ( ruleStr ) )
2022-08-04 17:24:15 +05:30
2024-08-30 18:51:55 +05:30
if err != nil {
return 0 , newApiErrorBadData ( err )
2022-08-04 17:24:15 +05:30
}
2024-03-27 20:25:18 +05:30
var alertname = parsedRule . AlertName
2022-08-04 17:24:15 +05:30
if alertname == "" {
// alertname is not mandatory for testing, so picking
// a random string here
alertname = uuid . New ( ) . String ( )
}
// append name to indicate this is test alert
2024-03-27 20:25:18 +05:30
parsedRule . AlertName = fmt . Sprintf ( "%s%s" , alertname , TestAlertPostFix )
2022-08-04 17:24:15 +05:30
var rule Rule
if parsedRule . RuleType == RuleTypeThreshold {
// add special labels for test alerts
parsedRule . Annotations [ labels . AlertSummaryLabel ] = fmt . Sprintf ( "The rule threshold is set to %.4f, and the observed metric value is {{$value}}." , * parsedRule . RuleCondition . Target )
parsedRule . Labels [ labels . RuleSourceLabel ] = ""
parsedRule . Labels [ labels . AlertRuleIdLabel ] = ""
// create a threshold rule
rule , err = NewThresholdRule (
alertname ,
parsedRule ,
2023-08-18 07:32:05 +05:30
m . featureFlags ,
2024-05-24 12:11:34 +05:30
m . reader ,
2024-09-12 10:58:07 +05:30
m . opts . UseLogsNewSchema ,
2024-09-11 09:56:59 +05:30
WithSendAlways ( ) ,
WithSendUnmatched ( ) ,
2022-08-04 17:24:15 +05:30
)
if err != nil {
2024-03-27 00:07:29 +05:30
zap . L ( ) . Error ( "failed to prepare a new threshold rule for test" , zap . String ( "name" , rule . Name ( ) ) , zap . Error ( err ) )
2022-08-04 17:24:15 +05:30
return 0 , newApiErrorBadData ( err )
}
} else if parsedRule . RuleType == RuleTypeProm {
// create promql rule
rule , err = NewPromRule (
alertname ,
parsedRule ,
2024-09-04 18:09:40 +05:30
m . logger ,
2024-08-09 12:11:05 +05:30
m . reader ,
2024-09-11 09:56:59 +05:30
m . opts . PqlEngine ,
WithSendAlways ( ) ,
WithSendUnmatched ( ) ,
2022-08-04 17:24:15 +05:30
)
if err != nil {
2024-03-27 00:07:29 +05:30
zap . L ( ) . Error ( "failed to prepare a new promql rule for test" , zap . String ( "name" , rule . Name ( ) ) , zap . Error ( err ) )
2022-08-04 17:24:15 +05:30
return 0 , newApiErrorBadData ( err )
}
} else {
return 0 , newApiErrorBadData ( fmt . Errorf ( "failed to derive ruletype with given information" ) )
}
// set timestamp to current utc time
ts := time . Now ( ) . UTC ( )
2024-09-11 09:56:59 +05:30
count , err := rule . Eval ( ctx , ts )
2022-08-04 17:24:15 +05:30
if err != nil {
2024-03-27 00:07:29 +05:30
zap . L ( ) . Error ( "evaluating rule failed" , zap . String ( "rule" , rule . Name ( ) ) , zap . Error ( err ) )
2022-08-04 17:24:15 +05:30
return 0 , newApiErrorInternal ( fmt . Errorf ( "rule evaluation failed" ) )
}
2024-03-27 00:07:29 +05:30
alertsFound , ok := count . ( int )
if ! ok {
return 0 , newApiErrorInternal ( fmt . Errorf ( "something went wrong" ) )
}
2022-08-04 17:24:15 +05:30
rule . SendAlerts ( ctx , ts , 0 , time . Duration ( 1 * time . Minute ) , m . prepareNotifyFunc ( ) )
return alertsFound , nil
}