signoz/pkg/query-service/app/opamp/config_provider_test.go

325 lines
10 KiB
Go
Raw Normal View History

package opamp
import (
"context"
"fmt"
"log/slog"
"testing"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
"github.com/SigNoz/signoz/pkg/query-service/utils"
"github.com/SigNoz/signoz/pkg/sharder"
"github.com/SigNoz/signoz/pkg/sharder/noopsharder"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/knadh/koanf"
"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/rawbytes"
_ "github.com/mattn/go-sqlite3"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"golang.org/x/exp/maps"
)
func TestOpAMPServerToAgentCommunicationWithConfigProvider(t *testing.T) {
require := require.New(t)
tb := newTestbed(t)
orgID, err := utils.GetTestOrgId(tb.sqlStore)
require.Nil(err)
require.Equal(
0, len(tb.testConfigProvider.ConfigUpdateSubscribers),
"there should be no agent config subscribers at the start",
)
tb.StartServer()
require.Equal(
1, len(tb.testConfigProvider.ConfigUpdateSubscribers),
"Opamp server should have subscribed to updates from config provider after being started",
)
// Server should always respond with a RemoteConfig when an agent connects.
// Even if there are no recommended changes to the agent's initial config
require.False(tb.testConfigProvider.HasRecommendations())
agent1Conn := &MockOpAmpConnection{}
agent1Id := valuer.GenerateUUID().String()
// get orgId from the db
tb.opampServer.OnMessage(
agent1Conn,
&protobufs.AgentToServer{
InstanceUid: agent1Id,
EffectiveConfig: &protobufs.EffectiveConfig{
ConfigMap: initialAgentConf(),
},
},
)
lastAgent1Msg := agent1Conn.LatestMsgFromServer()
require.NotNil(
lastAgent1Msg,
"Server should always send a remote config to the agent when it connects",
)
require.Equal(
RemoteConfigBody(lastAgent1Msg),
string(initialAgentConf().ConfigMap[model.CollectorConfigFilename].Body),
)
tb.testConfigProvider.ZPagesEndpoint = "localhost:55555"
require.True(tb.testConfigProvider.HasRecommendations())
agent2Id := valuer.GenerateUUID().String()
agent2Conn := &MockOpAmpConnection{}
tb.opampServer.OnMessage(
agent2Conn,
&protobufs.AgentToServer{
InstanceUid: agent2Id,
EffectiveConfig: &protobufs.EffectiveConfig{
ConfigMap: initialAgentConf(),
},
},
)
lastAgent2Msg := agent2Conn.LatestMsgFromServer()
require.NotNil(
lastAgent2Msg,
"server should recommend a config to agent when it connects",
)
recommendedEndpoint, err := GetStringValueFromYaml(
[]byte(RemoteConfigBody(lastAgent2Msg)), "extensions.zpages.endpoint",
)
require.Nil(err)
require.Equal(
tb.testConfigProvider.ZPagesEndpoint, recommendedEndpoint,
"server should send recommended config to agent when it connects",
)
agent2Conn.ClearMsgsFromServer()
tb.opampServer.OnMessage(agent2Conn, &protobufs.AgentToServer{
InstanceUid: agent2Id,
EffectiveConfig: &protobufs.EffectiveConfig{
ConfigMap: NewAgentConfigMap(
[]byte(RemoteConfigBody(lastAgent2Msg)),
),
},
RemoteConfigStatus: &protobufs.RemoteConfigStatus{
Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED,
LastRemoteConfigHash: lastAgent2Msg.RemoteConfig.ConfigHash,
},
})
expectedConfId := tb.testConfigProvider.ZPagesEndpoint
require.True(tb.testConfigProvider.HasReportedDeploymentStatus(orgID, expectedConfId, agent2Id),
"Server should report deployment success to config provider on receiving update from agent.",
)
require.True(tb.testConfigProvider.ReportedDeploymentStatuses[orgID.String()+expectedConfId][agent2Id])
require.Nil(
agent2Conn.LatestMsgFromServer(),
"Server should not recommend a RemoteConfig if agent is already running it.",
)
// Server should rollout latest config to all agents when notified of a change by config provider
agent1Conn.ClearMsgsFromServer()
agent2Conn.ClearMsgsFromServer()
tb.testConfigProvider.ZPagesEndpoint = "localhost:66666"
tb.testConfigProvider.NotifySubscribersOfChange()
for _, agentConn := range []*MockOpAmpConnection{agent1Conn, agent2Conn} {
lastMsg := agentConn.LatestMsgFromServer()
recommendedEndpoint, err := GetStringValueFromYaml(
[]byte(RemoteConfigBody(lastMsg)), "extensions.zpages.endpoint",
)
require.Nil(err)
require.Equal(tb.testConfigProvider.ZPagesEndpoint, recommendedEndpoint)
}
lastAgent2Msg = agent2Conn.LatestMsgFromServer()
tb.opampServer.OnMessage(agent2Conn, &protobufs.AgentToServer{
InstanceUid: agent2Id,
RemoteConfigStatus: &protobufs.RemoteConfigStatus{
Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED,
LastRemoteConfigHash: lastAgent2Msg.RemoteConfig.ConfigHash,
},
})
expectedConfId = tb.testConfigProvider.ZPagesEndpoint
require.True(tb.testConfigProvider.HasReportedDeploymentStatus(orgID, expectedConfId, agent2Id),
"Server should report deployment failure to config provider on receiving update from agent.",
)
require.False(tb.testConfigProvider.ReportedDeploymentStatuses[orgID.String()+expectedConfId][agent2Id])
lastAgent1Msg = agent1Conn.LatestMsgFromServer()
agent1Conn.ClearMsgsFromServer()
response := tb.opampServer.OnMessage(agent1Conn, &protobufs.AgentToServer{
InstanceUid: agent1Id,
RemoteConfigStatus: &protobufs.RemoteConfigStatus{
Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED,
LastRemoteConfigHash: lastAgent1Msg.RemoteConfig.ConfigHash,
},
})
require.Nil(response.RemoteConfig)
require.Nil(
agent1Conn.LatestMsgFromServer(),
"server should not recommend a config if agent is reporting back with status on a broadcasted config",
)
require.Equal(1, len(tb.testConfigProvider.ConfigUpdateSubscribers))
tb.opampServer.Stop()
require.Equal(
0, len(tb.testConfigProvider.ConfigUpdateSubscribers),
"Opamp server should have unsubscribed to config provider updates after shutdown",
)
}
func TestOpAMPServerAgentLimit(t *testing.T) {
require := require.New(t)
tb := newTestbed(t)
// Create 51 agents and check if the first one gets deleted
var agentConnections []*MockOpAmpConnection
var agentIds []string
for i := 0; i < 51; i++ {
agentConn := &MockOpAmpConnection{}
agentId := valuer.GenerateUUID().String()
agentIds = append(agentIds, agentId)
tb.opampServer.OnMessage(
agentConn,
&protobufs.AgentToServer{
InstanceUid: agentId,
EffectiveConfig: &protobufs.EffectiveConfig{
ConfigMap: initialAgentConf(),
},
},
)
agentConnections = append(agentConnections, agentConn)
}
// Perform a DB level check to ensure the first agent is removed
count, err := tb.sqlStore.BunDB().NewSelect().
Model(new(opamptypes.StorableAgent)).
Where("agent_id = ?", agentIds[0]).
Count(context.Background())
require.Nil(err, "Error querying the database for agent count")
require.Equal(0, count, "First agent should be removed from the database after exceeding the limit of 50 agents")
// verify there are 50 agents in the db
count, err = tb.sqlStore.BunDB().NewSelect().
Model(new(opamptypes.StorableAgent)).
Count(context.Background())
require.Nil(err, "Error querying the database for agent count")
require.Equal(50, count, "There should be 50 agents in the database")
// Check if the 51st agent received a config
lastAgentConn := agentConnections[50]
lastAgentMsg := lastAgentConn.LatestMsgFromServer()
require.NotNil(
lastAgentMsg,
"51st agent should receive a remote config from the server",
)
tb.opampServer.Stop()
require.Equal(
0, len(tb.testConfigProvider.ConfigUpdateSubscribers),
"Opamp server should have unsubscribed to config provider updates after shutdown",
)
}
type testbed struct {
testConfigProvider *MockAgentConfigProvider
opampServer *Server
t *testing.T
sqlStore sqlstore.SQLStore
}
func newTestbed(t *testing.T) *testbed {
testDB := utils.NewQueryServiceDBForTests(t)
providerSettings := instrumentationtest.New().ToProviderSettings()
sharder, err := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
require.Nil(t, err)
orgGetter := implorganization.NewGetter(implorganization.NewStore(testDB), sharder)
model.Init(testDB, slog.Default(), orgGetter)
testConfigProvider := NewMockAgentConfigProvider()
opampServer := InitializeServer(nil, testConfigProvider)
// create a test org
err = utils.CreateTestOrg(t, testDB)
if err != nil {
t.Fatalf("could not create test org: %v", err)
}
return &testbed{
testConfigProvider: testConfigProvider,
opampServer: opampServer,
t: t,
sqlStore: testDB,
}
}
func (tb *testbed) StartServer() {
testListenPath := GetAvailableLocalAddress()
err := tb.opampServer.Start(testListenPath)
require.Nil(tb.t, err, "should be able to start opamp server")
}
// Test helper
func GetStringValueFromYaml(
serializedYaml []byte, path string,
) (string, error) {
if len(serializedYaml) < 1 {
return "", fmt.Errorf("yaml data is empty")
}
k := koanf.New(".")
err := k.Load(rawbytes.Provider(serializedYaml), yaml.Parser())
if err != nil {
return "", errors.Wrap(err, "could not unmarshal collector config")
}
return k.String("extensions.zpages.endpoint"), nil
}
// Returns body of a ServerToAgent.RemoteConfig or ""
func RemoteConfigBody(msg *protobufs.ServerToAgent) string {
if msg == nil {
return ""
}
collectorConfFiles := msg.RemoteConfig.Config.ConfigMap
if len(collectorConfFiles) < 1 {
return ""
}
return string(maps.Values(collectorConfFiles)[0].Body)
}
func NewAgentConfigMap(body []byte) *protobufs.AgentConfigMap {
return &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
model.CollectorConfigFilename: {
Body: body,
ContentType: "text/yaml",
},
},
}
}
func initialAgentConf() *protobufs.AgentConfigMap {
return NewAgentConfigMap(
[]byte(`
receivers:
otlp:
processors:
batch:
exporters:
otlp:
service:
pipelines:
logs:
receivers: [otlp]
processors: [batch]
exporters: [otlp]
`),
)
}