Mirror of https://github.com/SigNoz/signoz.git
* chore: added custom distpatcher
* feat(notification-grouping): added notification grouping
* feat(notification-grouping): addded integration test dependency
* feat(notification-grouping): linting and test cases
* feat(notification-grouping): linting and test cases
* feat(notification-grouping): linting and test cases
* feat(notification-grouping): addded integration test dependency
* feat(notification-grouping): debug log lines
* feat(notification-grouping): debug log lines
* feat(notification-grouping): debug log lines
* feat(notification-grouping): addded integration test dependency
* feat(notification-grouping): addded integration test dependency
* feat(notification-grouping): addded integration test dependency
* feat(notification-grouping): added structure changes
* feat(notification-grouping): added structure changes
* feat(notification-routing): added notification routing
* chore(notification-grouping): added notificaiton grouping
* Update pkg/alertmanager/nfmanager/rulebasednotification/provider.go
  Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
* chore(notification-grouping): added renotification interval
* fix(notification-grouping): added fix for renotification
* chore(notificaiton-grouping): added no data renotify
* chore(notificaiton-grouping): added no data renotify
* chore(notificaiton-grouping): added no data renotify
* chore(notification-grouping): added no data renotify interval
* chore(notification-grouping): removed errors package from dispatcher
* chore(notification-grouping): removed errors package from dispatcher
* chore(notification-grouping): removed unwanted tests
* chore(notification-grouping): removed unwanted pkg name
* chore(notification-grouping): added delete notification setting
* chore(notification-grouping): added delete notification setting
* Update pkg/alertmanager/nfmanager/nfmanagertest/provider.go
  Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
* chore(notification-grouping): removed nfmanager config| notification settings in postable rule
* chore(notification-grouping): removed nfmanager config| notification settings in postable rule
* chore(notification-grouping): added test for dispatcher
* chore(notification-grouping): added test for dispatcher
* chore(notification-grouping): go linting errors
* chore(notification-grouping): added test cases for aggGroupPerRoute
* chore(notification-grouping): added test cases for aggGroupPerRoute
* chore(notification-grouping): corrected get notification config logic
* Update pkg/alertmanager/nfmanager/rulebasednotification/provider_test.go
  Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
* chore(notification-routing): added notification routing policies
* feat(notification-routing): added test cases for dispatcher
* chore(notification-routing): added notification routing policies
* chore(notification-routing): added notification routing policies
* Apply suggestions from code review
  Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
* chore(notification-routing): added notification routing policies
* chore(notification-routing): added notification routing policies
* Update pkg/alertmanager/alertmanagerserver/distpatcher_test.go
  Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
* chore(notification-routing): sorted imports
* chore(notification-routing): minor edit |pr resolve comments
* chore(notification-grouping): corrected dispatcher test cases
* chore(notification-routing): added notification routing policies
* chore(notification-routing): corrected race condition in test
* chore: resolved pr comments
* chore: passing threshold value to tempalte
* chore: completed delete rule functionality
* chore: added grouping disabled functionality
* chore: added grouping disabled functionality
* chore(notification-routing): resolved pr comments
* chore(notification-routing): resolved pr comments
* chore(notification-routing): resolved pr comments
* chore(notification-routing): sorted imports
* chore(notification-routing): fix linting errors
* chore(notification-routing): removed enabled flags
* fix: test rule multiple threhsold (#9224)
* chore: corrected linting errors
* chore: corrected linting errors
* chore: corrected linting errors
* chore: corrected linting errors
* chore: corrected migration errors
* chore: corrected migration errors
* chore: corrected migration errors
* chore: corrected migration errors
* Update pkg/sqlmigration/049_add_route_policy.go
  Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
* chore: added org_is as foreign key
* chore: resolved pr comments
* chore: removed route store unused

---------

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
459 lines · 14 KiB · Go
package alertmanagerserver

import (
	"context"
	"fmt"
	"log/slog"
	"strings"
	"sync"
	"time"

	"github.com/SigNoz/signoz/pkg/alertmanager/alertmanagernotify"
	"github.com/SigNoz/signoz/pkg/alertmanager/nfmanager"
	"github.com/SigNoz/signoz/pkg/errors"
	"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
	"github.com/prometheus/alertmanager/dispatch"
	"github.com/prometheus/alertmanager/featurecontrol"
	"github.com/prometheus/alertmanager/inhibit"
	"github.com/prometheus/alertmanager/nflog"
	"github.com/prometheus/alertmanager/notify"
	"github.com/prometheus/alertmanager/provider/mem"
	"github.com/prometheus/alertmanager/silence"
	"github.com/prometheus/alertmanager/template"
	"github.com/prometheus/alertmanager/timeinterval"
	"github.com/prometheus/alertmanager/types"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"golang.org/x/sync/errgroup"
)

var (
	// This is not a real file and will never be used. We need this placeholder to ensure maintenance runs on shutdown. See
	// https://github.com/prometheus/alertmanager/blob/3ee2cd0f1271e277295c02b6160507b4d193dde2/silence/silence.go#L435-L438
	// and https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/nflog/nflog.go#L362.
	snapfnoop string = "snapfnoop"
)

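// Server is a per-org embedding of the upstream Alertmanager primitives: it
// owns the alert provider, silences, notification log, inhibitor, and
// dispatcher for a single organization.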
type Server struct {
	// logger is the logger for the alertmanager
	logger *slog.Logger

	// registry is the prometheus registry for the alertmanager
	registry prometheus.Registerer

	// srvConfig is the server config for the alertmanager
	srvConfig Config

	// alertmanagerConfig is the config of the alertmanager
	alertmanagerConfig *alertmanagertypes.Config

	// orgID is the orgID for the alertmanager
	orgID string

	// stateStore is the backing store for the alertmanager state (silences and notification log)
	stateStore alertmanagertypes.StateStore

	// alertmanager primitives from upstream alertmanager
	alerts              *mem.Alerts
	nflog               *nflog.Log
	dispatcher          *Dispatcher
	dispatcherMetrics   *DispatcherMetrics
	inhibitor           *inhibit.Inhibitor
	silencer            *silence.Silencer
	silences            *silence.Silences
	timeIntervals       map[string][]timeinterval.TimeInterval
	pipelineBuilder     *notify.PipelineBuilder
	marker              *alertmanagertypes.MemMarker
	tmpl                *template.Template
	wg                  sync.WaitGroup
	stopc               chan struct{}
	notificationManager nfmanager.NotificationManager
}

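// New builds a Server for the given org: it restores silences and the
// notification log from the org's snapshot in the state store, starts their
// maintenance goroutines, and prepares the in-memory alert provider and
// notification pipeline builder. The dispatcher itself is only created once
// SetConfig is called.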
func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registerer, srvConfig Config, orgID string, stateStore alertmanagertypes.StateStore, nfManager nfmanager.NotificationManager) (*Server, error) {
	server := &Server{
		logger:              logger.With("pkg", "go.signoz.io/pkg/alertmanager/alertmanagerserver"),
		registry:            registry,
		srvConfig:           srvConfig,
		orgID:               orgID,
		stateStore:          stateStore,
		stopc:               make(chan struct{}),
		notificationManager: nfManager,
	}
	signozRegisterer := prometheus.WrapRegistererWithPrefix("signoz_", registry)
	signozRegisterer = prometheus.WrapRegistererWith(prometheus.Labels{"org_id": server.orgID}, signozRegisterer)

	// initialize marker
	server.marker = alertmanagertypes.NewMarker(signozRegisterer)

	// get silences for initial state
	state, err := server.stateStore.Get(ctx, server.orgID)
	if err != nil && !errors.Ast(err, errors.TypeNotFound) {
		return nil, err
	}

	silencesSnapshot := ""
	if state != nil {
		silencesSnapshot, err = state.Get(alertmanagertypes.SilenceStateName)
		if err != nil && !errors.Ast(err, errors.TypeNotFound) {
			return nil, err
		}
	}

	// Initialize silences
	server.silences, err = silence.New(silence.Options{
		SnapshotReader: strings.NewReader(silencesSnapshot),
		Retention:      srvConfig.Silences.Retention,
		Limits: silence.Limits{
			MaxSilences:         func() int { return srvConfig.Silences.Max },
			MaxSilenceSizeBytes: func() int { return srvConfig.Silences.MaxSizeBytes },
		},
		Metrics: signozRegisterer,
		Logger:  server.logger,
	})
	if err != nil {
		return nil, err
	}

	nflogSnapshot := ""
	if state != nil {
		nflogSnapshot, err = state.Get(alertmanagertypes.NFLogStateName)
		if err != nil && !errors.Ast(err, errors.TypeNotFound) {
			return nil, err
		}
	}

	// Initialize notification log
	server.nflog, err = nflog.New(nflog.Options{
		SnapshotReader: strings.NewReader(nflogSnapshot),
		Retention:      server.srvConfig.NFLog.Retention,
		Metrics:        signozRegisterer,
		Logger:         server.logger,
	})
	if err != nil {
		return nil, err
	}

	// Start maintenance for silences
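	// Maintenance periodically garbage-collects expired silences and, through
	// the callback below, persists a snapshot into the org's state store rather
	// than a local snapshot file (snapfnoop exists only so the final
	// maintenance run also fires on shutdown).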
	server.wg.Add(1)
	go func() {
		defer server.wg.Done()
		server.silences.Maintenance(server.srvConfig.Silences.MaintenanceInterval, snapfnoop, server.stopc, func() (int64, error) {
			// Delete silences older than the retention period.
			if _, err := server.silences.GC(); err != nil {
				server.logger.ErrorContext(ctx, "silence garbage collection", "error", err)
				// Don't return here - we need to snapshot our state first.
			}

			storableSilences, err := server.stateStore.Get(ctx, server.orgID)
			if err != nil && !errors.Ast(err, errors.TypeNotFound) {
				return 0, err
			}

			if storableSilences == nil {
				storableSilences = alertmanagertypes.NewStoreableState(server.orgID)
			}

			c, err := storableSilences.Set(alertmanagertypes.SilenceStateName, server.silences)
			if err != nil {
				return 0, err
			}

			return c, server.stateStore.Set(ctx, server.orgID, storableSilences)
		})
	}()

	// Start maintenance for notification logs
	server.wg.Add(1)
	go func() {
		defer server.wg.Done()
		server.nflog.Maintenance(server.srvConfig.NFLog.MaintenanceInterval, snapfnoop, server.stopc, func() (int64, error) {
			if _, err := server.nflog.GC(); err != nil {
				server.logger.ErrorContext(ctx, "notification log garbage collection", "error", err)
				// Don't return without saving the current state.
			}

			storableNFLog, err := server.stateStore.Get(ctx, server.orgID)
			if err != nil && !errors.Ast(err, errors.TypeNotFound) {
				return 0, err
			}

			if storableNFLog == nil {
				storableNFLog = alertmanagertypes.NewStoreableState(server.orgID)
			}

			c, err := storableNFLog.Set(alertmanagertypes.NFLogStateName, server.nflog)
			if err != nil {
				return 0, err
			}

			return c, server.stateStore.Set(ctx, server.orgID, storableNFLog)
		})
	}()

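	// In-memory alert provider; resolved alerts are garbage-collected every
	// Alerts.GCInterval.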
	server.alerts, err = mem.NewAlerts(ctx, server.marker, server.srvConfig.Alerts.GCInterval, nil, server.logger, signozRegisterer)
	if err != nil {
		return nil, err
	}

	server.pipelineBuilder = notify.NewPipelineBuilder(signozRegisterer, featurecontrol.NoopFlags{})
	server.dispatcherMetrics = NewDispatcherMetrics(false, signozRegisterer)

	return server, nil
}

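// GetAlerts returns the currently stored alerts, refreshing each alert's
// silenced and inhibited status via the marker before conversion to the
// gettable representation.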
func (server *Server) GetAlerts(ctx context.Context, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) {
	return alertmanagertypes.NewGettableAlertsFromAlertProvider(server.alerts, server.alertmanagerConfig, server.marker.Status, func(labels model.LabelSet) {
		server.inhibitor.Mutes(labels)
		server.silencer.Mutes(labels)
	}, params)
}

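// PutAlerts converts the postable alerts and stores every alert that passed
// validation; storing (and therefore notifying) takes precedence over
// reporting validation errors.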
func (server *Server) PutAlerts(ctx context.Context, postableAlerts alertmanagertypes.PostableAlerts) error {
	alerts, err := alertmanagertypes.NewAlertsFromPostableAlerts(postableAlerts, time.Duration(server.srvConfig.Global.ResolveTimeout), time.Now())

	// Storing the alerts (so notifications can be sent) takes precedence over validation errors.
	if err := server.alerts.Put(alerts...); err != nil {
		return err
	}

	if err != nil {
		return errors.Join(err...)
	}

	return nil
}

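// SetConfig applies a new alertmanager configuration: it reloads templates,
// rebuilds the routing tree, receiver integrations, and time intervals, and
// replaces the inhibitor and dispatcher on top of a freshly built
// notification pipeline. It is safe to call repeatedly; previous dispatcher
// and inhibitor instances are stopped first.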
func (server *Server) SetConfig(ctx context.Context, alertmanagerConfig *alertmanagertypes.Config) error {
	config := alertmanagerConfig.AlertmanagerConfig()

	var err error
	server.tmpl, err = alertmanagertypes.FromGlobs(config.Templates)
	if err != nil {
		return err
	}

	server.tmpl.ExternalURL = server.srvConfig.ExternalURL

	// Build the routing tree and record which receivers are used.
	routes := dispatch.NewRoute(config.Route, nil)
	activeReceivers := make(map[string]struct{})
	routes.Walk(func(r *dispatch.Route) {
		activeReceivers[r.RouteOpts.Receiver] = struct{}{}
	})

	// Build the map of receiver to integrations.
	receivers := make(map[string][]notify.Integration, len(activeReceivers))
	var integrationsNum int
	for _, rcv := range config.Receivers {
		if _, found := activeReceivers[rcv.Name]; !found {
			// No need to build a receiver if no route is using it.
			server.logger.InfoContext(ctx, "skipping creation of receiver not referenced by any route", "receiver", rcv.Name)
			continue
		}
		integrations, err := alertmanagernotify.NewReceiverIntegrations(rcv, server.tmpl, server.logger)
		if err != nil {
			return err
		}
		// rcv.Name is guaranteed to be unique across all receivers.
		receivers[rcv.Name] = integrations
		integrationsNum += len(integrations)
	}

	// Build the map of time interval names to time interval definitions.
	timeIntervals := make(map[string][]timeinterval.TimeInterval, len(config.MuteTimeIntervals)+len(config.TimeIntervals))
	for _, ti := range config.MuteTimeIntervals {
		timeIntervals[ti.Name] = ti.TimeIntervals
	}

	for _, ti := range config.TimeIntervals {
		timeIntervals[ti.Name] = ti.TimeIntervals
	}

	intervener := timeinterval.NewIntervener(timeIntervals)

	if server.inhibitor != nil {
		server.inhibitor.Stop()
	}
	if server.dispatcher != nil {
		server.dispatcher.Stop()
	}

	server.inhibitor = inhibit.NewInhibitor(server.alerts, config.InhibitRules, server.marker, server.logger)
	server.timeIntervals = timeIntervals
	server.silencer = silence.NewSilencer(server.silences, server.marker, server.logger)

	var pipelinePeer notify.Peer
	pipeline := server.pipelineBuilder.New(
		receivers,
		func() time.Duration { return 0 },
		server.inhibitor,
		server.silencer,
		intervener,
		server.marker,
		server.nflog,
		pipelinePeer,
	)

	timeoutFunc := func(d time.Duration) time.Duration {
		if d < notify.MinTimeout {
			d = notify.MinTimeout
		}
		return d
	}

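	// Recreate the dispatcher on top of the new routing tree and pipeline. The
	// notification manager is passed through so the custom dispatcher can apply
	// per-rule grouping and routing decisions.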
	server.dispatcher = NewDispatcher(
		server.alerts,
		routes,
		pipeline,
		server.marker,
		timeoutFunc,
		nil,
		server.logger,
		server.dispatcherMetrics,
		server.notificationManager,
		server.orgID,
	)

	// Do not try to add these to server.wg as there seems to be a race condition if
	// we call Start() and Stop() in quick succession.
	// Both these goroutines will run indefinitely.
	go server.dispatcher.Run()
	go server.inhibitor.Run()

	server.alertmanagerConfig = alertmanagerConfig
	return nil
}

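// TestReceiver pushes a synthetic test alert through the given receiver's
// integrations.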
func (server *Server) TestReceiver(ctx context.Context, receiver alertmanagertypes.Receiver) error {
	testAlert := alertmanagertypes.NewTestAlert(receiver, time.Now(), time.Now())
	return alertmanagertypes.TestReceiver(ctx, receiver, alertmanagernotify.NewReceiverIntegrations, server.alertmanagerConfig, server.tmpl, server.logger, testAlert.Labels, testAlert)
}

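// TestAlert groups the posted alerts by the configured group labels, mirroring
// what the dispatcher would do, and then tests every receiver attached to each
// group concurrently, collecting all failures instead of stopping at the first
// one.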
func (server *Server) TestAlert(ctx context.Context, receiversMap map[*alertmanagertypes.PostableAlert][]string, config *alertmanagertypes.NotificationConfig) error {
	if len(receiversMap) == 0 {
		return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput,
			"expected at least 1 alert, got 0")
	}

	postableAlerts := make(alertmanagertypes.PostableAlerts, 0, len(receiversMap))
	for alert := range receiversMap {
		postableAlerts = append(postableAlerts, alert)
	}

	alerts, err := alertmanagertypes.NewAlertsFromPostableAlerts(
		postableAlerts,
		time.Duration(server.srvConfig.Global.ResolveTimeout),
		time.Now(),
	)
	if err != nil {
		return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput,
			"failed to construct alerts from postable alerts: %v", err)
	}

	type alertGroup struct {
		groupLabels model.LabelSet
		alerts      []*types.Alert
		receivers   map[string]struct{}
	}

	groupMap := make(map[model.Fingerprint]*alertGroup)

	for i, alert := range alerts {
		labels := getGroupLabels(alert, config.NotificationGroup, config.GroupByAll)
		fp := labels.Fingerprint()

		postableAlert := postableAlerts[i]
		alertReceivers := receiversMap[postableAlert]

		if group, exists := groupMap[fp]; exists {
			group.alerts = append(group.alerts, alert)
			for _, r := range alertReceivers {
				group.receivers[r] = struct{}{}
			}
		} else {
			receiverSet := make(map[string]struct{})
			for _, r := range alertReceivers {
				receiverSet[r] = struct{}{}
			}
			groupMap[fp] = &alertGroup{
				groupLabels: labels,
				alerts:      []*types.Alert{alert},
				receivers:   receiverSet,
			}
		}
	}

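	// Fan out one test notification per (group, receiver) pair. Failures are
	// accumulated under a mutex so that one failing receiver does not cancel
	// the remaining tests.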
	var mu sync.Mutex
	var errs []error

	g, gCtx := errgroup.WithContext(ctx)
	for _, group := range groupMap {
		for receiverName := range group.receivers {
			group := group
			receiverName := receiverName

			g.Go(func() error {
				receiver, err := server.alertmanagerConfig.GetReceiver(receiverName)
				if err != nil {
					mu.Lock()
					errs = append(errs, fmt.Errorf("failed to get receiver %q: %w", receiverName, err))
					mu.Unlock()
					return nil // Return nil to continue processing other goroutines
				}

				err = alertmanagertypes.TestReceiver(
					gCtx,
					receiver,
					alertmanagernotify.NewReceiverIntegrations,
					server.alertmanagerConfig,
					server.tmpl,
					server.logger,
					group.groupLabels,
					group.alerts...,
				)
				if err != nil {
					mu.Lock()
					errs = append(errs, fmt.Errorf("receiver %q test failed: %w", receiverName, err))
					mu.Unlock()
				}
				return nil // Return nil to continue processing other goroutines
			})
		}
	}
	_ = g.Wait()

	if len(errs) > 0 {
		return errors.Join(errs...)
	}

	return nil
}

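// Hash returns the hash of the currently applied configuration, or an empty
// string if no configuration has been applied yet.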
func (server *Server) Hash() string {
	if server.alertmanagerConfig == nil {
		return ""
	}

	return server.alertmanagerConfig.StoreableConfig().Hash
}

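// Stop shuts the server down in order: the dispatcher and inhibitor first,
// then the alert provider, and finally the maintenance goroutines, which
// persist one last snapshot to the state store on the way out.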
func (server *Server) Stop(ctx context.Context) error {
	if server.dispatcher != nil {
		server.dispatcher.Stop()
	}

	if server.inhibitor != nil {
		server.inhibitor.Stop()
	}

	// Close the alert provider.
	server.alerts.Close()

	// Signals maintenance goroutines of server states to stop.
	close(server.stopc)

	// Wait for all goroutines to finish.
	server.wg.Wait()

	return nil
}