signoz/pkg/query-service/app/opamp/configure_ingestionRules.go
Nityananda Gohain 8455349459
fix: support orgId and postgres in agents (#7327)
* fix: initial commit for agents

* fix: remove frontend package manger commit

* fix: use sqlstore

* fix: opamp server changes

* fix: tests

* fix: tests

* fix: minor changes

* fix: migrations

* fix: use uuid7

* fix: use default orgID for single tenant

* fix: pipelines tests fixed

* fix: use correct agentId

* fix: use orgID in coordinator

* fix: fix tests

* fix: remove redundant hash check

* fix: migration

* fix: migration

* fix: address comments

* fix: rename migration file

* fix: remove unwanted orgid code

* fix: use orggetter

* fix: comment

* fix: schema cleanup

* fix: minor changes

* chore: addresses changes

* fix: add back agentID as it used ulid

* fix: keep only 50 agents for an orgId

* chore: explicitly specify text type

* chore: use valuer.uuid for orgid

* fix: linting complain

* chore: final fixes

* chore: minor changes

* fix: add not null

* fix: fe tests

---------

Co-authored-by: Vikrant Gupta <vikrant@signoz.io>
2025-06-16 20:07:16 +05:30

152 lines
4.5 KiB
Go

package opamp
import (
"context"
"crypto/sha256"
"fmt"
model "github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
"github.com/SigNoz/signoz/pkg/query-service/app/opamp/otelconfig"
coreModel "github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/knadh/koanf/parsers/yaml"
"github.com/open-telemetry/opamp-go/protobufs"
"go.opentelemetry.io/collector/confmap"
"go.uber.org/zap"
)
// UpsertControlProcessors inserts or updates the ingestion controller
// processors for the given signal (metrics or traces) in the config of every
// agent returned by the opamp server, and pushes the updated config to each.
//
// Note: only processors enabled through tracesPipelinePlan will be added
// to the pipeline. To enable or disable processors from the pipeline, call
// AddToTracePipeline() or RemoveFromTracesPipeline() prior to calling
// this function.
//
// For every agent that was updated successfully, callback is subscribed via
// model.ListenToConfigUpdate against that agent's new config hash. A failure
// on an individual agent is logged and the remaining agents are still tried.
//
// Returns the config hash of the last agent that was updated successfully.
// Errors:
//   - bad request: signal is neither metrics nor traces, or more than one
//     agent is present for the traces signal
//   - unavailable: the opamp server is not initialised, no agents are
//     present, or the config could not be pushed to any agent
func UpsertControlProcessors(
	ctx context.Context,
	signal string,
	processors map[string]interface{},
	callback model.OnChangeCallback,
) (hash string, fnerr *coreModel.ApiError) {
	zap.L().Debug("initiating ingestion rules deployment config", zap.String("signal", signal), zap.Any("processors", processors))

	if signal != string(Metrics) && signal != string(Traces) {
		zap.L().Error("received invalid signal in UpsertControlProcessors", zap.String("signal", signal))
		return "", coreModel.BadRequest(fmt.Errorf(
			"signal not supported in ingestion rules: %s", signal,
		))
	}

	if opAmpServer == nil {
		return "", coreModel.UnavailableError(fmt.Errorf(
			"opamp server is down, unable to push config to agent at this moment",
		))
	}

	agents := opAmpServer.agents.GetAllAgents()
	if len(agents) == 0 {
		return "", coreModel.UnavailableError(fmt.Errorf("no agents available at the moment"))
	}

	if len(agents) > 1 && signal == string(Traces) {
		zap.L().Debug("found multiple agents. this feature is not supported for traces pipeline (sampling rules)")
		return "", coreModel.BadRequest(fmt.Errorf("multiple agents not supported in sampling rules"))
	}

	// Track failures so that a run in which no agent accepted the config is
	// reported to the caller instead of looking like a successful deployment.
	var lastErr error
	pushed := 0

	for _, agent := range agents {
		agentHash, err := addIngestionControlToAgent(agent, signal, processors, false)
		if err != nil {
			zap.L().Error("failed to push ingestion rules config to agent", zap.String("agentID", agent.AgentID), zap.Error(err))
			lastErr = err
			continue
		}
		pushed++

		if agentHash != "" {
			// subscribe callback
			model.ListenToConfigUpdate(agent.OrgID, agent.AgentID, agentHash, callback)
		}

		hash = agentHash
	}

	// len(agents) > 0 is guaranteed above, so pushed == 0 implies lastErr != nil.
	if pushed == 0 {
		return "", coreModel.UnavailableError(fmt.Errorf(
			"failed to push ingestion rules config to any agent: %w", lastErr,
		))
	}

	return hash, nil
}
// addIngestionControlToAgent adds ingestion control rules to the agent config.
//
// It parses the agent's current YAML config, applies the ingestion control
// spec for the given signal and processors, re-serialises the result, stores
// it in agent.Config, calls agent.Upsert() and then sends the new config to
// the agent as "collector.yaml" remote config.
//
// The returned string is the raw SHA-256 digest of the serialised config
// (digest bytes converted to a string, not hex encoded). It is empty when
// the config could not be built; if agent.Upsert() fails the digest is
// returned together with the error. withLB is not used by this function.
func addIngestionControlToAgent(agent *model.Agent, signal string, processors map[string]interface{}, withLB bool) (string, error) {
	parsed, err := yaml.Parser().Unmarshal([]byte(agent.Config))
	if err != nil {
		return "", err
	}

	// add ingestion control spec
	conf := confmap.NewFromStringMap(parsed)
	if err := makeIngestionControlSpec(conf, Signal(signal), processors); err != nil {
		zap.L().Error("failed to prepare ingestion control processors for agent", zap.String("agentID", agent.AgentID), zap.Error(err))
		return "", err
	}

	updated, err := yaml.Parser().Marshal(conf.ToStringMap())
	if err != nil {
		return "", err
	}
	zap.L().Debug("sending new config", zap.String("config", string(updated)))

	// The hash identifies this exact serialised config.
	digest := sha256.Sum256(updated)
	confHash := string(digest[:])

	agent.Config = string(updated)
	if err := agent.Upsert(); err != nil {
		return confHash, err
	}

	remoteConfig := &protobufs.AgentRemoteConfig{
		Config: &protobufs.AgentConfigMap{
			ConfigMap: map[string]*protobufs.AgentConfigFile{
				"collector.yaml": {
					Body:        updated,
					ContentType: "application/x-yaml",
				},
			},
		},
		ConfigHash: []byte(confHash),
	}
	agent.SendToAgent(&protobufs.ServerToAgent{RemoteConfig: remoteConfig})

	return confHash, nil
}
// makeIngestionControlSpec prepares the spec that introduces ingestion
// control in the agent conf. It updates the processors in agentConf with the
// supplied ones, merges the signal's current pipeline processors through
// buildPipeline, and writes the merged list back to that pipeline.
//
// agentConf is modified in place. An error is returned only when
// buildPipeline fails; in that case the pipeline itself is left as it was.
func makeIngestionControlSpec(agentConf *confmap.Conf, signal Signal, processors map[string]interface{}) error {
	pipelineName := string(signal)

	parser := otelconfig.NewConfigParser(agentConf)
	parser.UpdateProcessors(processors)

	// merge tracesPipelinePlan with the processors currently in the pipeline
	merged, err := buildPipeline(signal, parser.PipelineProcessors(pipelineName))
	if err != nil {
		zap.L().Error("failed to build pipeline", zap.String("signal", pipelineName), zap.Error(err))
		return err
	}

	// add merged pipeline to the service
	parser.UpdateProcsInPipeline(pipelineName, merged)
	return nil
}