mirror of
https://github.com/SigNoz/signoz.git
synced 2025-12-17 15:36:48 +00:00
chore: enable alertmanager metrics collection with instrumentation::metrics (#9020)
* feat(alerts-v2): exposed alertmanager metrics for signozalertmanager * feat(alerts-v2): exposed alertmanager metrics for signozalertmanager * feat(alerts-v2): exposed alertmanager metrics for signozalertmanager * feat(notification-routing): added custom meter provider * feat(notification-routing): added custom meter provider * feat(notification-routing): added custom meter provider * feat(notification-routing): added custom meter provider * feat(notification-routing): added org id label
This commit is contained in:
parent
74be8f5611
commit
c737a7e070
4
go.mod
4
go.mod
@ -300,13 +300,13 @@ require (
|
|||||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.36.0 // indirect
|
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.36.0 // indirect
|
||||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.36.0 // indirect
|
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.36.0 // indirect
|
||||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.36.0 // indirect
|
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.36.0 // indirect
|
||||||
go.opentelemetry.io/otel/exporters/prometheus v0.58.0 // indirect
|
go.opentelemetry.io/otel/exporters/prometheus v0.58.0
|
||||||
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.12.2 // indirect
|
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.12.2 // indirect
|
||||||
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.36.0 // indirect
|
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.36.0 // indirect
|
||||||
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.36.0 // indirect
|
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.36.0 // indirect
|
||||||
go.opentelemetry.io/otel/log v0.12.2 // indirect
|
go.opentelemetry.io/otel/log v0.12.2 // indirect
|
||||||
go.opentelemetry.io/otel/sdk/log v0.12.2 // indirect
|
go.opentelemetry.io/otel/sdk/log v0.12.2 // indirect
|
||||||
go.opentelemetry.io/otel/sdk/metric v1.36.0 // indirect
|
go.opentelemetry.io/otel/sdk/metric v1.36.0
|
||||||
go.opentelemetry.io/proto/otlp v1.6.0 // indirect
|
go.opentelemetry.io/proto/otlp v1.6.0 // indirect
|
||||||
go.uber.org/atomic v1.11.0 // indirect
|
go.uber.org/atomic v1.11.0 // indirect
|
||||||
go.uber.org/goleak v1.3.0 // indirect
|
go.uber.org/goleak v1.3.0 // indirect
|
||||||
|
|||||||
@ -73,8 +73,10 @@ func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registere
|
|||||||
stateStore: stateStore,
|
stateStore: stateStore,
|
||||||
stopc: make(chan struct{}),
|
stopc: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
signozRegisterer := prometheus.WrapRegistererWithPrefix("signoz_", registry)
|
||||||
|
signozRegisterer = prometheus.WrapRegistererWith(prometheus.Labels{"org_id": server.orgID}, signozRegisterer)
|
||||||
// initialize marker
|
// initialize marker
|
||||||
server.marker = alertmanagertypes.NewMarker(server.registry)
|
server.marker = alertmanagertypes.NewMarker(signozRegisterer)
|
||||||
|
|
||||||
// get silences for initial state
|
// get silences for initial state
|
||||||
state, err := server.stateStore.Get(ctx, server.orgID)
|
state, err := server.stateStore.Get(ctx, server.orgID)
|
||||||
@ -97,7 +99,7 @@ func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registere
|
|||||||
MaxSilences: func() int { return srvConfig.Silences.Max },
|
MaxSilences: func() int { return srvConfig.Silences.Max },
|
||||||
MaxSilenceSizeBytes: func() int { return srvConfig.Silences.MaxSizeBytes },
|
MaxSilenceSizeBytes: func() int { return srvConfig.Silences.MaxSizeBytes },
|
||||||
},
|
},
|
||||||
Metrics: server.registry,
|
Metrics: signozRegisterer,
|
||||||
Logger: server.logger,
|
Logger: server.logger,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -116,7 +118,7 @@ func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registere
|
|||||||
server.nflog, err = nflog.New(nflog.Options{
|
server.nflog, err = nflog.New(nflog.Options{
|
||||||
SnapshotReader: strings.NewReader(nflogSnapshot),
|
SnapshotReader: strings.NewReader(nflogSnapshot),
|
||||||
Retention: server.srvConfig.NFLog.Retention,
|
Retention: server.srvConfig.NFLog.Retention,
|
||||||
Metrics: server.registry,
|
Metrics: signozRegisterer,
|
||||||
Logger: server.logger,
|
Logger: server.logger,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -181,13 +183,13 @@ func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registere
|
|||||||
})
|
})
|
||||||
}()
|
}()
|
||||||
|
|
||||||
server.alerts, err = mem.NewAlerts(ctx, server.marker, server.srvConfig.Alerts.GCInterval, nil, server.logger, server.registry)
|
server.alerts, err = mem.NewAlerts(ctx, server.marker, server.srvConfig.Alerts.GCInterval, nil, server.logger, signozRegisterer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
server.pipelineBuilder = notify.NewPipelineBuilder(server.registry, featurecontrol.NoopFlags{})
|
server.pipelineBuilder = notify.NewPipelineBuilder(signozRegisterer, featurecontrol.NoopFlags{})
|
||||||
server.dispatcherMetrics = dispatch.NewDispatcherMetrics(false, server.registry)
|
server.dispatcherMetrics = dispatch.NewDispatcherMetrics(false, signozRegisterer)
|
||||||
|
|
||||||
return server, nil
|
return server, nil
|
||||||
}
|
}
|
||||||
|
|||||||
160
pkg/instrumentation/metric.go
Normal file
160
pkg/instrumentation/metric.go
Normal file
@ -0,0 +1,160 @@
|
|||||||
|
package instrumentation
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
|
contribsdkconfig "go.opentelemetry.io/contrib/config"
|
||||||
|
"go.opentelemetry.io/otel"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
|
||||||
|
"go.opentelemetry.io/otel/metric"
|
||||||
|
"go.opentelemetry.io/otel/metric/noop"
|
||||||
|
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
|
||||||
|
"go.opentelemetry.io/otel/sdk/resource"
|
||||||
|
)
|
||||||
|
|
||||||
|
// readerWithServer wraps a metric reader with an HTTP server for proper shutdown
|
||||||
|
// This mirrors the upstream contrib/config implementation
|
||||||
|
type readerWithServer struct {
|
||||||
|
sdkmetric.Reader
|
||||||
|
server *http.Server
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rws readerWithServer) Shutdown(ctx context.Context) error {
|
||||||
|
return errors.Join(
|
||||||
|
rws.Reader.Shutdown(ctx),
|
||||||
|
rws.server.Shutdown(ctx),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// prometheusReaderWithCustomRegistry creates a Prometheus metric reader using a custom registry
|
||||||
|
// This is based on the upstream contrib/config implementation but allows passing a custom registry
|
||||||
|
func prometheusReaderWithCustomRegistry(ctx context.Context, prometheusConfig *contribsdkconfig.Prometheus, customRegistry *prometheus.Registry) (sdkmetric.Reader, error) {
|
||||||
|
var opts []otelprom.Option
|
||||||
|
if prometheusConfig.Host == nil {
|
||||||
|
return nil, fmt.Errorf("host must be specified")
|
||||||
|
}
|
||||||
|
if prometheusConfig.Port == nil {
|
||||||
|
return nil, fmt.Errorf("port must be specified")
|
||||||
|
}
|
||||||
|
if prometheusConfig.WithoutScopeInfo != nil && *prometheusConfig.WithoutScopeInfo {
|
||||||
|
opts = append(opts, otelprom.WithoutScopeInfo())
|
||||||
|
}
|
||||||
|
if prometheusConfig.WithoutTypeSuffix != nil && *prometheusConfig.WithoutTypeSuffix {
|
||||||
|
opts = append(opts, otelprom.WithoutCounterSuffixes())
|
||||||
|
}
|
||||||
|
if prometheusConfig.WithoutUnits != nil && *prometheusConfig.WithoutUnits {
|
||||||
|
opts = append(opts, otelprom.WithoutUnits())
|
||||||
|
}
|
||||||
|
if prometheusConfig.WithResourceConstantLabels != nil {
|
||||||
|
if prometheusConfig.WithResourceConstantLabels.Included != nil {
|
||||||
|
var keys []attribute.Key
|
||||||
|
for _, val := range prometheusConfig.WithResourceConstantLabels.Included {
|
||||||
|
keys = append(keys, attribute.Key(val))
|
||||||
|
}
|
||||||
|
opts = append(opts, otelprom.WithResourceAsConstantLabels(attribute.NewAllowKeysFilter(keys...)))
|
||||||
|
}
|
||||||
|
if prometheusConfig.WithResourceConstantLabels.Excluded != nil {
|
||||||
|
var keys []attribute.Key
|
||||||
|
for _, val := range prometheusConfig.WithResourceConstantLabels.Excluded {
|
||||||
|
keys = append(keys, attribute.Key(val))
|
||||||
|
}
|
||||||
|
opts = append(opts, otelprom.WithResourceAsConstantLabels(attribute.NewDenyKeysFilter(keys...)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use custom registry instead of creating a new one
|
||||||
|
opts = append(opts, otelprom.WithRegisterer(customRegistry))
|
||||||
|
|
||||||
|
mux := http.NewServeMux()
|
||||||
|
mux.Handle("/metrics", promhttp.HandlerFor(customRegistry, promhttp.HandlerOpts{Registry: customRegistry}))
|
||||||
|
server := http.Server{
|
||||||
|
// Timeouts are necessary to make a server resilient to attacks, but ListenAndServe doesn't set any.
|
||||||
|
// We use values from this example: https://blog.cloudflare.com/exposing-go-on-the-internet/#:~:text=There%20are%20three%20main%20timeouts
|
||||||
|
ReadTimeout: 5 * time.Second,
|
||||||
|
WriteTimeout: 10 * time.Second,
|
||||||
|
IdleTimeout: 120 * time.Second,
|
||||||
|
Handler: mux,
|
||||||
|
}
|
||||||
|
addr := fmt.Sprintf("%s:%d", *prometheusConfig.Host, *prometheusConfig.Port)
|
||||||
|
|
||||||
|
reader, err := otelprom.New(opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error creating otel prometheus exporter: %w", err)
|
||||||
|
}
|
||||||
|
lis, err := net.Listen("tcp", addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Join(
|
||||||
|
fmt.Errorf("binding address %s for Prometheus exporter: %w", addr, err),
|
||||||
|
reader.Shutdown(ctx),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
if err := server.Serve(lis); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||||
|
otel.Handle(fmt.Errorf("the Prometheus HTTP server exited unexpectedly: %w", err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return readerWithServer{reader, &server}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type shutdownFunc func(context.Context) error
|
||||||
|
|
||||||
|
// noopShutdown is a no-op shutdown function
|
||||||
|
func noopShutdown(context.Context) error { return nil }
|
||||||
|
|
||||||
|
// meterProviderWithCustomRegistry creates a meter provider using contrib config approach
|
||||||
|
// but with custom Prometheus registry injection
|
||||||
|
func meterProviderWithCustomRegistry(ctx context.Context, meterProviderConfig *contribsdkconfig.MeterProvider, res *resource.Resource, customRegistry *prometheus.Registry) (metric.MeterProvider, shutdownFunc, error) {
|
||||||
|
if meterProviderConfig == nil {
|
||||||
|
return noop.NewMeterProvider(), noopShutdown, nil
|
||||||
|
}
|
||||||
|
opts := []sdkmetric.Option{
|
||||||
|
sdkmetric.WithResource(res),
|
||||||
|
}
|
||||||
|
|
||||||
|
var errs []error
|
||||||
|
for _, reader := range meterProviderConfig.Readers {
|
||||||
|
r, err := metricReaderWithCustomRegistry(ctx, reader, customRegistry)
|
||||||
|
if err == nil {
|
||||||
|
opts = append(opts, sdkmetric.WithReader(r))
|
||||||
|
} else {
|
||||||
|
errs = append(errs, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(errs) > 0 {
|
||||||
|
return noop.NewMeterProvider(), noopShutdown, errors.Join(errs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
mp := sdkmetric.NewMeterProvider(opts...)
|
||||||
|
return mp, mp.Shutdown, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// metricReaderWithCustomRegistry creates metric readers with custom Prometheus registry support
|
||||||
|
func metricReaderWithCustomRegistry(ctx context.Context, r contribsdkconfig.MetricReader, customRegistry *prometheus.Registry) (sdkmetric.Reader, error) {
|
||||||
|
if r.Periodic != nil && r.Pull != nil {
|
||||||
|
return nil, errors.New("must not specify multiple metric reader type")
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.Pull != nil {
|
||||||
|
return pullReaderWithCustomRegistry(ctx, r.Pull.Exporter, customRegistry)
|
||||||
|
}
|
||||||
|
return nil, errors.New("no valid metric reader")
|
||||||
|
}
|
||||||
|
|
||||||
|
// pullReaderWithCustomRegistry creates pull readers with custom Prometheus registry support
|
||||||
|
func pullReaderWithCustomRegistry(ctx context.Context, exporter contribsdkconfig.MetricExporter, customRegistry *prometheus.Registry) (sdkmetric.Reader, error) {
|
||||||
|
if exporter.Prometheus != nil {
|
||||||
|
return prometheusReaderWithCustomRegistry(ctx, exporter.Prometheus, customRegistry)
|
||||||
|
}
|
||||||
|
return nil, errors.New("no valid metric exporter")
|
||||||
|
}
|
||||||
@ -22,6 +22,7 @@ var _ Instrumentation = (*SDK)(nil)
|
|||||||
type SDK struct {
|
type SDK struct {
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
sdk contribsdkconfig.SDK
|
sdk contribsdkconfig.SDK
|
||||||
|
meterProvider sdkmetric.MeterProvider
|
||||||
prometheusRegistry *prometheus.Registry
|
prometheusRegistry *prometheus.Registry
|
||||||
startCh chan struct{}
|
startCh chan struct{}
|
||||||
}
|
}
|
||||||
@ -59,6 +60,9 @@ func New(ctx context.Context, cfg Config, build version.Build, serviceName strin
|
|||||||
SchemaUrl: &sch,
|
SchemaUrl: &sch,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
prometheusRegistry := prometheus.NewRegistry()
|
||||||
|
prometheusRegistry.MustRegister(collectors.NewBuildInfoCollector())
|
||||||
|
|
||||||
var tracerProvider *contribsdkconfig.TracerProvider
|
var tracerProvider *contribsdkconfig.TracerProvider
|
||||||
if cfg.Traces.Enabled {
|
if cfg.Traces.Enabled {
|
||||||
tracerProvider = &contribsdkconfig.TracerProvider{
|
tracerProvider = &contribsdkconfig.TracerProvider{
|
||||||
@ -69,20 +73,26 @@ func New(ctx context.Context, cfg Config, build version.Build, serviceName strin
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var meterProvider *contribsdkconfig.MeterProvider
|
// Use contrib config approach but with custom Prometheus registry
|
||||||
|
var meterProvider sdkmetric.MeterProvider
|
||||||
if cfg.Metrics.Enabled {
|
if cfg.Metrics.Enabled {
|
||||||
meterProvider = &contribsdkconfig.MeterProvider{
|
meterProviderConfig := &contribsdkconfig.MeterProvider{
|
||||||
Readers: []contribsdkconfig.MetricReader{
|
Readers: []contribsdkconfig.MetricReader{
|
||||||
{Pull: &cfg.Metrics.Readers.Pull},
|
{Pull: &cfg.Metrics.Readers.Pull},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mp, _, err := meterProviderWithCustomRegistry(ctx, meterProviderConfig, resource, prometheusRegistry)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
meterProvider = mp
|
||||||
}
|
}
|
||||||
|
|
||||||
sdk, err := contribsdkconfig.NewSDK(
|
sdk, err := contribsdkconfig.NewSDK(
|
||||||
contribsdkconfig.WithContext(ctx),
|
contribsdkconfig.WithContext(ctx),
|
||||||
contribsdkconfig.WithOpenTelemetryConfiguration(contribsdkconfig.OpenTelemetryConfiguration{
|
contribsdkconfig.WithOpenTelemetryConfiguration(contribsdkconfig.OpenTelemetryConfiguration{
|
||||||
TracerProvider: tracerProvider,
|
TracerProvider: tracerProvider,
|
||||||
MeterProvider: meterProvider,
|
|
||||||
Resource: &configResource,
|
Resource: &configResource,
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
@ -90,11 +100,9 @@ func New(ctx context.Context, cfg Config, build version.Build, serviceName strin
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
prometheusRegistry := prometheus.NewRegistry()
|
|
||||||
prometheusRegistry.MustRegister(collectors.NewBuildInfoCollector())
|
|
||||||
|
|
||||||
return &SDK{
|
return &SDK{
|
||||||
sdk: sdk,
|
sdk: sdk,
|
||||||
|
meterProvider: meterProvider,
|
||||||
prometheusRegistry: prometheusRegistry,
|
prometheusRegistry: prometheusRegistry,
|
||||||
logger: NewLogger(cfg),
|
logger: NewLogger(cfg),
|
||||||
startCh: make(chan struct{}),
|
startCh: make(chan struct{}),
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user