From d174038dce867a653ecf3459ac45ab7d1af67452 Mon Sep 17 00:00:00 2001 From: Piyush Singariya Date: Thu, 26 Jun 2025 16:34:49 +0530 Subject: [PATCH] fix: panic after connecting to collector (#8344) --- .../app/opamp/config_provider_test.go | 19 ++++++++++++------- pkg/query-service/app/opamp/model/agent.go | 11 ++++++++++- pkg/query-service/app/opamp/model/agents.go | 4 ++++ pkg/query-service/app/opamp/opamp_server.go | 11 ++++++++--- .../integration/logparsingpipeline_test.go | 6 ++++-- pkg/valuer/uuid.go | 6 +++++- 6 files changed, 43 insertions(+), 14 deletions(-) diff --git a/pkg/query-service/app/opamp/config_provider_test.go b/pkg/query-service/app/opamp/config_provider_test.go index 8556de69b416..72a40ce84afa 100644 --- a/pkg/query-service/app/opamp/config_provider_test.go +++ b/pkg/query-service/app/opamp/config_provider_test.go @@ -47,7 +47,8 @@ func TestOpAMPServerToAgentCommunicationWithConfigProvider(t *testing.T) { // Even if there are no recommended changes to the agent's initial config require.False(tb.testConfigProvider.HasRecommendations()) agent1Conn := &MockOpAmpConnection{} - agent1Id := []byte(valuer.GenerateUUID().String()) + agent1Id, err := valuer.GenerateUUID().MarshalBinary() + require.Nil(err) // get orgId from the db tb.opampServer.OnMessage( context.Background(), @@ -71,7 +72,9 @@ func TestOpAMPServerToAgentCommunicationWithConfigProvider(t *testing.T) { tb.testConfigProvider.ZPagesEndpoint = "localhost:55555" require.True(tb.testConfigProvider.HasRecommendations()) - agent2Id := []byte((valuer.GenerateUUID().String())) + agent2IdUUID := valuer.GenerateUUID() + agent2Id, err := agent2IdUUID.MarshalBinary() + require.Nil(err) agent2Conn := &MockOpAmpConnection{} tb.opampServer.OnMessage( context.Background(), @@ -112,10 +115,10 @@ func TestOpAMPServerToAgentCommunicationWithConfigProvider(t *testing.T) { }, }) expectedConfId := tb.testConfigProvider.ZPagesEndpoint - require.True(tb.testConfigProvider.HasReportedDeploymentStatus(orgID, expectedConfId, string(agent2Id)), + require.True(tb.testConfigProvider.HasReportedDeploymentStatus(orgID, expectedConfId, agent2IdUUID.String()), "Server should report deployment success to config provider on receiving update from agent.", ) - require.True(tb.testConfigProvider.ReportedDeploymentStatuses[orgID.String()+expectedConfId][string(agent2Id)]) + require.True(tb.testConfigProvider.ReportedDeploymentStatuses[orgID.String()+expectedConfId][agent2IdUUID.String()]) require.Nil( agent2Conn.LatestMsgFromServer(), "Server should not recommend a RemoteConfig if agent is already running it.", @@ -145,10 +148,10 @@ func TestOpAMPServerToAgentCommunicationWithConfigProvider(t *testing.T) { }, }) expectedConfId = tb.testConfigProvider.ZPagesEndpoint - require.True(tb.testConfigProvider.HasReportedDeploymentStatus(orgID, expectedConfId, string(agent2Id)), + require.True(tb.testConfigProvider.HasReportedDeploymentStatus(orgID, expectedConfId, agent2IdUUID.String()), "Server should report deployment failure to config provider on receiving update from agent.", ) - require.False(tb.testConfigProvider.ReportedDeploymentStatuses[orgID.String()+expectedConfId][string(agent2Id)]) + require.False(tb.testConfigProvider.ReportedDeploymentStatuses[orgID.String()+expectedConfId][agent2IdUUID.String()]) lastAgent1Msg = agent1Conn.LatestMsgFromServer() agent1Conn.ClearMsgsFromServer() @@ -182,7 +185,9 @@ func TestOpAMPServerAgentLimit(t *testing.T) { var agentIds [][]byte for i := 0; i < 51; i++ { agentConn := &MockOpAmpConnection{} - agentId := []byte(valuer.GenerateUUID().String()) + agentIdUUID := valuer.GenerateUUID() + agentId, err := agentIdUUID.MarshalBinary() + require.Nil(err) agentIds = append(agentIds, agentId) tb.opampServer.OnMessage( context.Background(), diff --git a/pkg/query-service/app/opamp/model/agent.go b/pkg/query-service/app/opamp/model/agent.go index 02bc00a8d084..5f95ed33c668 100644 --- a/pkg/query-service/app/opamp/model/agent.go +++ b/pkg/query-service/app/opamp/model/agent.go @@ -265,7 +265,16 @@ func (agent *Agent) processStatusUpdate( configChanged := false if agentDescrChanged { - // Agent description is changed. + // Agent description is changed, but effective config is missing, force request agent to send Config + // + // Note: ideally this flag should be sent along side ErrorResponse; + // but OpAMP agent prioritizes Flags before ErrorResponse hence sending + // requests consequently without respecting the retry cooldown, if in future that changes, + // it should be shifted there; To test uncomment Flags added in opamp_server.go + if newStatus.EffectiveConfig == nil || newStatus.EffectiveConfig.ConfigMap == nil { + response.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportFullState) + return + } //Get the default org ID // agent. diff --git a/pkg/query-service/app/opamp/model/agents.go b/pkg/query-service/app/opamp/model/agents.go index 19894b862b46..695068917066 100644 --- a/pkg/query-service/app/opamp/model/agents.go +++ b/pkg/query-service/app/opamp/model/agents.go @@ -75,6 +75,10 @@ func (agents *Agents) FindAgent(agentID string) *Agent { // If the Agent instance does not exist, it is created and added to the list of // Agent instances. func (agents *Agents) FindOrCreateAgent(agentID string, conn types.Connection, orgID valuer.UUID) (*Agent, bool, error) { + if agentID == "" { + return nil, false, errors.New("cannot create agent without agentID") + } + agents.mux.Lock() defer agents.mux.Unlock() agent, ok := agents.agentsById[agentID] diff --git a/pkg/query-service/app/opamp/opamp_server.go b/pkg/query-service/app/opamp/opamp_server.go index 6738d7394ad8..ceb3b4c4f2e2 100644 --- a/pkg/query-service/app/opamp/opamp_server.go +++ b/pkg/query-service/app/opamp/opamp_server.go @@ -98,7 +98,7 @@ func (srv *Server) onDisconnect(conn types.Connection) { // orgID from the context // note :- there can only be 50 agents in the db for a given orgID, we don't have a check in-memory but we delete from the db after insert. func (srv *Server) OnMessage(ctx context.Context, conn types.Connection, msg *protobufs.AgentToServer) *protobufs.ServerToAgent { - agentID := string(msg.GetInstanceUid()) + agentID, _ := valuer.NewUUIDFromBytes(msg.GetInstanceUid()) // find the orgID, if nothing is found keep it empty. // the find or create agent will return an error if orgID is empty @@ -109,9 +109,12 @@ func (srv *Server) OnMessage(ctx context.Context, conn types.Connection, msg *pr orgID = orgIDs[0].ID } - agent, created, err := srv.agents.FindOrCreateAgent(agentID, conn, orgID) + // when a new org is created and the agent is not able to register + // the changes in pkg/query-service/app/opamp/model/agent.go 270 - 277 takes care that + // agents sends the effective config when we processStatusUpdate. + agent, created, err := srv.agents.FindOrCreateAgent(agentID.String(), conn, orgID) if err != nil { - zap.L().Error("Failed to find or create agent", zap.String("agentID", agentID), zap.Error(err)) + zap.L().Error("Failed to find or create agent", zap.String("agentID", agentID.String()), zap.Error(err)) // Return error response according to OpAMP protocol return &protobufs.ServerToAgent{ @@ -124,6 +127,8 @@ func (srv *Server) OnMessage(ctx context.Context, conn types.Connection, msg *pr }, }, }, + // Note: refer to opamp/model/agent.go; look for `Flags` keyword + // Flags: uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportFullState), } } diff --git a/pkg/query-service/tests/integration/logparsingpipeline_test.go b/pkg/query-service/tests/integration/logparsingpipeline_test.go index 84816f0269ea..49284aff4758 100644 --- a/pkg/query-service/tests/integration/logparsingpipeline_test.go +++ b/pkg/query-service/tests/integration/logparsingpipeline_test.go @@ -774,12 +774,14 @@ func (tb *LogPipelinesTestBed) assertNewAgentGetsPipelinesOnConnection( pipelines []pipelinetypes.GettablePipeline, ) { newAgentConn := &opamp.MockOpAmpConnection{} - agentID := valuer.GenerateUUID().String() + agentIDUUID := valuer.GenerateUUID() + agentID, err := agentIDUUID.MarshalBinary() + require.Nil(tb.t, err) tb.opampServer.OnMessage( context.Background(), newAgentConn, &protobufs.AgentToServer{ - InstanceUid: []byte(agentID), + InstanceUid: agentID, EffectiveConfig: &protobufs.EffectiveConfig{ ConfigMap: newInitialAgentConfigMap(), }, diff --git a/pkg/valuer/uuid.go b/pkg/valuer/uuid.go index a12979e0ce20..04e9ec603759 100644 --- a/pkg/valuer/uuid.go +++ b/pkg/valuer/uuid.go @@ -27,7 +27,7 @@ func NewUUID(value string) (UUID, error) { } func NewUUIDFromBytes(value []byte) (UUID, error) { - val, err := uuid.ParseBytes(value) + val, err := uuid.FromBytes(value) if err != nil { return UUID{}, err } @@ -69,6 +69,10 @@ func (enum UUID) String() string { return enum.val.String() } +func (enum UUID) MarshalBinary() ([]byte, error) { + return enum.val.MarshalBinary() +} + func (enum UUID) MarshalJSON() ([]byte, error) { return json.Marshal(enum.StringValue()) }