package agentConf

import (
	"context"
	"crypto/sha256"
	"fmt"
	"strings"
	"sync"
	"sync/atomic"

	"github.com/SigNoz/signoz/pkg/query-service/app/opamp"
	filterprocessor "github.com/SigNoz/signoz/pkg/query-service/app/opamp/otelconfig/filterprocessor"
	tsp "github.com/SigNoz/signoz/pkg/query-service/app/opamp/otelconfig/tailsampler"
	"github.com/SigNoz/signoz/pkg/query-service/model"
	"github.com/SigNoz/signoz/pkg/sqlstore"
	"github.com/SigNoz/signoz/pkg/types/opamptypes"
	"github.com/SigNoz/signoz/pkg/valuer"
	"github.com/google/uuid"
	"github.com/pkg/errors"
	"go.uber.org/zap"
	yaml "gopkg.in/yaml.v3"
)

var m *Manager

func init() {
	m = &Manager{}
}

type AgentFeatureType string

type Manager struct {
	Repo

	// lock to make sure only one update is sent to remote agents at a time
	lock uint32

	// For the AgentConfigProvider implementation
	agentFeatures         []AgentFeature
	configSubscribers     map[string]func()
	configSubscribersLock sync.Mutex
}

type ManagerOptions struct {
	Store sqlstore.SQLStore

	// When acting as opamp.AgentConfigProvider, agent conf recommendations are
	// applied to the base conf in the order the features have been specified here.
	AgentFeatures []AgentFeature
}

func Initiate(options *ManagerOptions) (*Manager, error) {
	// featureType must be unique across registered AgentFeatures.
	agentFeatureByType := map[AgentFeatureType]AgentFeature{}
	for _, feature := range options.AgentFeatures {
		featureType := feature.AgentFeatureType()
		if agentFeatureByType[featureType] != nil {
			panic(fmt.Sprintf(
				"found multiple agent features with type: %s", featureType,
			))
		}
		agentFeatureByType[featureType] = feature
	}

	m = &Manager{
		Repo:              Repo{options.Store},
		agentFeatures:     options.AgentFeatures,
		configSubscribers: map[string]func(){},
	}

	return m, nil
}
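
// Example of wiring up the package-level Manager at application startup.
// This is an illustrative sketch: sqlStore and logPipelinesFeature are
// assumed to come from the caller's setup and are not defined here.
//
//	mgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
//		Store:         sqlStore,
//		AgentFeatures: []agentConf.AgentFeature{logPipelinesFeature},
//	})
//	if err != nil {
//		return err
//	}
//	_ = mgr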

// Implements opamp.AgentConfigProvider
func (m *Manager) SubscribeToConfigUpdates(callback func()) (unsubscribe func()) {
	m.configSubscribersLock.Lock()
	defer m.configSubscribersLock.Unlock()

	subscriberId := uuid.NewString()
	m.configSubscribers[subscriberId] = callback

	return func() {
		// take the lock here too: unsubscribe can race with other
		// subscriptions and with notifyConfigUpdateSubscribers
		m.configSubscribersLock.Lock()
		defer m.configSubscribersLock.Unlock()
		delete(m.configSubscribers, subscriberId)
	}
}
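
// A minimal usage sketch for a consumer of opamp.AgentConfigProvider
// (the callback body below is hypothetical):
//
//	unsubscribe := mgr.SubscribeToConfigUpdates(func() {
//		// recompute and push the recommended config to connected agents
//	})
//	defer unsubscribe()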

func (m *Manager) notifyConfigUpdateSubscribers() {
	m.configSubscribersLock.Lock()
	defer m.configSubscribersLock.Unlock()

	for _, handler := range m.configSubscribers {
		handler()
	}
}

// Implements opamp.AgentConfigProvider
func (m *Manager) RecommendAgentConfig(orgId valuer.UUID, currentConfYaml []byte) (
	recommendedConfYaml []byte,
	// Opaque id of the recommended config, used for reporting deployment status updates
	configId string,
	err error,
) {
	recommendation := currentConfYaml
	settingVersionsUsed := []string{}

	for _, feature := range m.agentFeatures {
		featureType := opamptypes.NewElementType(string(feature.AgentFeatureType()))
		latestConfig, apiErr := GetLatestVersion(context.Background(), orgId, featureType)
		if apiErr != nil && apiErr.Type() != model.ErrorNotFound {
			return nil, "", errors.Wrap(apiErr.ToError(), "failed to get latest agent config version")
		}

		updatedConf, serializedSettingsUsed, apiErr := feature.RecommendAgentConfig(orgId, recommendation, latestConfig)
		if apiErr != nil {
			return nil, "", errors.Wrap(apiErr.ToError(), fmt.Sprintf(
				"failed to generate agent config recommendation for %s", featureType,
			))
		}
		recommendation = updatedConf

		// It is possible for a feature to recommend collector config
		// before any user-created config versions exist.
		//
		// For example, log pipeline config for installed integrations will
		// have to be recommended even if the user hasn't created any pipelines yet.
		configVersion := -1
		if latestConfig != nil {
			configVersion = latestConfig.Version
		}
		featureConfId := fmt.Sprintf("%s:%d", featureType, configVersion)

		settingVersionsUsed = append(settingVersionsUsed, featureConfId)

		_ = m.updateDeployStatus(
			context.Background(),
			orgId,
			featureType,
			configVersion,
			opamptypes.DeployInitiated.StringValue(),
			"Deployment has started",
			featureConfId,
			serializedSettingsUsed,
		)
	}

	if len(settingVersionsUsed) > 0 {
		configId = strings.Join(settingVersionsUsed, ",")
	} else {
		// Do not return an empty configId even if no recommendations were made;
		// hex-encode the digest so the id stays printable.
		hash := sha256.New()
		hash.Write(recommendation)
		configId = fmt.Sprintf("%x", hash.Sum(nil))
	}

	return recommendation, configId, nil
}
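
// For illustration, with two registered features the configId returned above
// might look like "log_pipelines:4,drop_rules:-1": one "<featureType>:<version>"
// entry per feature (version -1 when no user-created version exists yet),
// joined by commas. The exact feature type strings depend on the registered
// AgentFeatures; the ones shown here are assumed.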

// Implements opamp.AgentConfigProvider
func (m *Manager) ReportConfigDeploymentStatus(
	orgId valuer.UUID,
	agentId string,
	configId string,
	err error,
) {
	featureConfigIds := strings.Split(configId, ",")
	for _, featureConfId := range featureConfigIds {
		newStatus := opamptypes.Deployed.StringValue()
		message := "Deployment was successful"
		if err != nil {
			newStatus = opamptypes.DeployFailed.StringValue()
			message = fmt.Sprintf("%s: %s", agentId, err.Error())
		}
		_ = m.updateDeployStatusByHash(
			context.Background(), orgId, featureConfId, newStatus, message,
		)
	}
}

func GetLatestVersion(
	ctx context.Context, orgId valuer.UUID, elementType opamptypes.ElementType,
) (*opamptypes.AgentConfigVersion, *model.ApiError) {
	return m.GetLatestVersion(ctx, orgId, elementType)
}

func GetConfigVersion(
	ctx context.Context, orgId valuer.UUID, elementType opamptypes.ElementType, version int,
) (*opamptypes.AgentConfigVersion, *model.ApiError) {
	return m.GetConfigVersion(ctx, orgId, elementType, version)
}

func GetConfigHistory(
	ctx context.Context, orgId valuer.UUID, typ opamptypes.ElementType, limit int,
) ([]opamptypes.AgentConfigVersion, *model.ApiError) {
	return m.GetConfigHistory(ctx, orgId, typ, limit)
}
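
// For example, fetching up to 10 drop-rule config versions for an org
// (a sketch; callers outside this package would import it as agentConf):
//
//	history, apiErr := agentConf.GetConfigHistory(ctx, orgId, opamptypes.ElementTypeDropRules, 10)
//	if apiErr != nil {
//		return apiErr
//	}
//	zap.L().Info("fetched config history", zap.Int("count", len(history)))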

// StartNewVersion launches a new config version for the given set of elements
func StartNewVersion(
	ctx context.Context, orgId valuer.UUID, userId valuer.UUID, eleType opamptypes.ElementType, elementIds []string,
) (*opamptypes.AgentConfigVersion, *model.ApiError) {
	// create a new version
	cfg := opamptypes.NewAgentConfigVersion(orgId, userId, eleType)

	// insert the new config and its elements into the database
	err := m.insertConfig(ctx, orgId, userId, cfg, elementIds)
	if err != nil {
		return nil, err
	}

	m.notifyConfigUpdateSubscribers()

	return cfg, nil
}
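
// A hypothetical call site creating a new drop-rules version from a set of
// element ids (a sketch; ruleIds would normally come from an API handler):
//
//	version, apiErr := agentConf.StartNewVersion(ctx, orgId, userId, opamptypes.ElementTypeDropRules, ruleIds)
//	if apiErr != nil {
//		return apiErr
//	}
//	zap.L().Info("created agent config version", zap.Int("version", version.Version))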

func NotifyConfigUpdate(ctx context.Context) {
	m.notifyConfigUpdateSubscribers()
}

func Redeploy(ctx context.Context, orgId valuer.UUID, typ opamptypes.ElementType, version int) *model.ApiError {
	configVersion, err := GetConfigVersion(ctx, orgId, typ, version)
	if err != nil {
		zap.L().Error("failed to fetch config version during redeploy", zap.Error(err))
		return model.WrapApiError(err, "failed to fetch details of the config version")
	}

	if configVersion == nil || configVersion.Config == "" {
		zap.L().Debug("config version has no conf yaml", zap.Any("configVersion", configVersion))
		return model.BadRequest(fmt.Errorf("the config version can not be redeployed"))
	}

	switch typ {
	case opamptypes.ElementTypeSamplingRules:
		var config *tsp.Config
		if err := yaml.Unmarshal([]byte(configVersion.Config), &config); err != nil {
			zap.L().Debug("failed to read last conf correctly", zap.Error(err))
			return model.BadRequest(fmt.Errorf("failed to read the stored config correctly"))
		}

		// merge the current config with the new tail sampling params
		processorConf := map[string]interface{}{
			"signoz_tail_sampling": config,
		}

		opamp.AddToTracePipelineSpec("signoz_tail_sampling")
		configHash, err := opamp.UpsertControlProcessors(ctx, "traces", processorConf, m.OnConfigUpdate)
		if err != nil {
			zap.L().Error("failed to call agent config update for trace processor", zap.Error(err))
			return model.InternalError(fmt.Errorf("failed to deploy the config"))
		}

		m.updateDeployStatus(ctx, orgId, opamptypes.ElementTypeSamplingRules, version, opamptypes.DeployInitiated.StringValue(), "Deployment started", configHash, configVersion.Config)
	case opamptypes.ElementTypeDropRules:
		var filterConfig *filterprocessor.Config
		if err := yaml.Unmarshal([]byte(configVersion.Config), &filterConfig); err != nil {
			zap.L().Error("failed to read last conf correctly", zap.Error(err))
			return model.InternalError(fmt.Errorf("failed to read the stored config correctly"))
		}

		// merge the current config with the new filter params
		processorConf := map[string]interface{}{
			"filter": filterConfig,
		}

		opamp.AddToMetricsPipelineSpec("filter")
		configHash, err := opamp.UpsertControlProcessors(ctx, "metrics", processorConf, m.OnConfigUpdate)
		if err != nil {
			zap.L().Error("failed to call agent config update for filter processor", zap.Error(err))
			return err
		}

		m.updateDeployStatus(ctx, orgId, opamptypes.ElementTypeDropRules, version, opamptypes.DeployInitiated.StringValue(), "Deployment started", configHash, configVersion.Config)
	}

	return nil
}

// UpsertFilterProcessor updates the agent config with new filter processor params
func UpsertFilterProcessor(ctx context.Context, orgId valuer.UUID, version int, config *filterprocessor.Config) error {
	if !atomic.CompareAndSwapUint32(&m.lock, 0, 1) {
		return fmt.Errorf("agent updater is busy")
	}
	defer atomic.StoreUint32(&m.lock, 0)

	// merge the current config with the new filter params
	processorConf := map[string]interface{}{
		"filter": config,
	}

	opamp.AddToMetricsPipelineSpec("filter")
	configHash, err := opamp.UpsertControlProcessors(ctx, "metrics", processorConf, m.OnConfigUpdate)
	if err != nil {
		zap.L().Error("failed to call agent config update for filter processor", zap.Error(err))
		return err
	}

	processorConfYaml, yamlErr := yaml.Marshal(config)
	if yamlErr != nil {
		zap.L().Warn("unexpected error while transforming processor config to yaml", zap.Error(yamlErr))
	}

	m.updateDeployStatus(ctx, orgId, opamptypes.ElementTypeDropRules, version, opamptypes.DeployInitiated.StringValue(), "Deployment started", configHash, string(processorConfYaml))
	return nil
}
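
// A hypothetical call site (a sketch; the filterprocessor.Config value would
// normally be assembled from the org's user-defined drop rules):
//
//	fpConf := &filterprocessor.Config{} // populate from the org's drop rules
//	if err := agentConf.UpsertFilterProcessor(ctx, orgId, version, fpConf); err != nil {
//		return err
//	}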

// OnConfigUpdate is a callback function passed to the opamp server.
// It receives a config hash along with an error status. We assume the
// deployment was successful if no error is received.
//
// This method is currently expected to be called only once in the lifecycle,
// but could be improved in the future to accept continuous status updates from opamp.
func (m *Manager) OnConfigUpdate(orgId valuer.UUID, agentId string, hash string, err error) {
	status := opamptypes.Deployed.StringValue()
	message := "Deployment was successful"

	defer func() {
		zap.L().Info(status, zap.String("agentId", agentId), zap.String("agentResponse", message))
	}()

	if err != nil {
		status = opamptypes.DeployFailed.StringValue()
		message = fmt.Sprintf("%s: %s", agentId, err.Error())
	}

	_ = m.updateDeployStatusByHash(context.Background(), orgId, hash, status, message)
}

// UpsertSamplingProcessor updates the agent config with new sampling processor params
func UpsertSamplingProcessor(ctx context.Context, orgId valuer.UUID, version int, config *tsp.Config) error {
	if !atomic.CompareAndSwapUint32(&m.lock, 0, 1) {
		return fmt.Errorf("agent updater is busy")
	}
	defer atomic.StoreUint32(&m.lock, 0)

	// merge the current config with the new tail sampling params
	processorConf := map[string]interface{}{
		"signoz_tail_sampling": config,
	}

	opamp.AddToTracePipelineSpec("signoz_tail_sampling")
	configHash, err := opamp.UpsertControlProcessors(ctx, "traces", processorConf, m.OnConfigUpdate)
	if err != nil {
		zap.L().Error("failed to call agent config update for trace processor", zap.Error(err))
		return err
	}

	processorConfYaml, yamlErr := yaml.Marshal(config)
	if yamlErr != nil {
		zap.L().Warn("unexpected error while transforming processor config to yaml", zap.Error(yamlErr))
	}

	m.updateDeployStatus(ctx, orgId, opamptypes.ElementTypeSamplingRules, version, opamptypes.DeployInitiated.StringValue(), "Deployment started", configHash, string(processorConfYaml))
	return nil
}