fix: panic after connecting to collector (#8344)

This commit is contained in:
Piyush Singariya 2025-06-26 16:34:49 +05:30 committed by GitHub
parent 78d09e2940
commit d174038dce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 43 additions and 14 deletions

View File

@ -47,7 +47,8 @@ func TestOpAMPServerToAgentCommunicationWithConfigProvider(t *testing.T) {
// Even if there are no recommended changes to the agent's initial config // Even if there are no recommended changes to the agent's initial config
require.False(tb.testConfigProvider.HasRecommendations()) require.False(tb.testConfigProvider.HasRecommendations())
agent1Conn := &MockOpAmpConnection{} agent1Conn := &MockOpAmpConnection{}
agent1Id := []byte(valuer.GenerateUUID().String()) agent1Id, err := valuer.GenerateUUID().MarshalBinary()
require.Nil(err)
// get orgId from the db // get orgId from the db
tb.opampServer.OnMessage( tb.opampServer.OnMessage(
context.Background(), context.Background(),
@ -71,7 +72,9 @@ func TestOpAMPServerToAgentCommunicationWithConfigProvider(t *testing.T) {
tb.testConfigProvider.ZPagesEndpoint = "localhost:55555" tb.testConfigProvider.ZPagesEndpoint = "localhost:55555"
require.True(tb.testConfigProvider.HasRecommendations()) require.True(tb.testConfigProvider.HasRecommendations())
agent2Id := []byte((valuer.GenerateUUID().String())) agent2IdUUID := valuer.GenerateUUID()
agent2Id, err := agent2IdUUID.MarshalBinary()
require.Nil(err)
agent2Conn := &MockOpAmpConnection{} agent2Conn := &MockOpAmpConnection{}
tb.opampServer.OnMessage( tb.opampServer.OnMessage(
context.Background(), context.Background(),
@ -112,10 +115,10 @@ func TestOpAMPServerToAgentCommunicationWithConfigProvider(t *testing.T) {
}, },
}) })
expectedConfId := tb.testConfigProvider.ZPagesEndpoint expectedConfId := tb.testConfigProvider.ZPagesEndpoint
require.True(tb.testConfigProvider.HasReportedDeploymentStatus(orgID, expectedConfId, string(agent2Id)), require.True(tb.testConfigProvider.HasReportedDeploymentStatus(orgID, expectedConfId, agent2IdUUID.String()),
"Server should report deployment success to config provider on receiving update from agent.", "Server should report deployment success to config provider on receiving update from agent.",
) )
require.True(tb.testConfigProvider.ReportedDeploymentStatuses[orgID.String()+expectedConfId][string(agent2Id)]) require.True(tb.testConfigProvider.ReportedDeploymentStatuses[orgID.String()+expectedConfId][agent2IdUUID.String()])
require.Nil( require.Nil(
agent2Conn.LatestMsgFromServer(), agent2Conn.LatestMsgFromServer(),
"Server should not recommend a RemoteConfig if agent is already running it.", "Server should not recommend a RemoteConfig if agent is already running it.",
@ -145,10 +148,10 @@ func TestOpAMPServerToAgentCommunicationWithConfigProvider(t *testing.T) {
}, },
}) })
expectedConfId = tb.testConfigProvider.ZPagesEndpoint expectedConfId = tb.testConfigProvider.ZPagesEndpoint
require.True(tb.testConfigProvider.HasReportedDeploymentStatus(orgID, expectedConfId, string(agent2Id)), require.True(tb.testConfigProvider.HasReportedDeploymentStatus(orgID, expectedConfId, agent2IdUUID.String()),
"Server should report deployment failure to config provider on receiving update from agent.", "Server should report deployment failure to config provider on receiving update from agent.",
) )
require.False(tb.testConfigProvider.ReportedDeploymentStatuses[orgID.String()+expectedConfId][string(agent2Id)]) require.False(tb.testConfigProvider.ReportedDeploymentStatuses[orgID.String()+expectedConfId][agent2IdUUID.String()])
lastAgent1Msg = agent1Conn.LatestMsgFromServer() lastAgent1Msg = agent1Conn.LatestMsgFromServer()
agent1Conn.ClearMsgsFromServer() agent1Conn.ClearMsgsFromServer()
@ -182,7 +185,9 @@ func TestOpAMPServerAgentLimit(t *testing.T) {
var agentIds [][]byte var agentIds [][]byte
for i := 0; i < 51; i++ { for i := 0; i < 51; i++ {
agentConn := &MockOpAmpConnection{} agentConn := &MockOpAmpConnection{}
agentId := []byte(valuer.GenerateUUID().String()) agentIdUUID := valuer.GenerateUUID()
agentId, err := agentIdUUID.MarshalBinary()
require.Nil(err)
agentIds = append(agentIds, agentId) agentIds = append(agentIds, agentId)
tb.opampServer.OnMessage( tb.opampServer.OnMessage(
context.Background(), context.Background(),

View File

@ -265,7 +265,16 @@ func (agent *Agent) processStatusUpdate(
configChanged := false configChanged := false
if agentDescrChanged { if agentDescrChanged {
// Agent description is changed. // Agent description is changed, but effective config is missing, force request agent to send Config
//
// Note: ideally this flag should be sent along side ErrorResponse;
// but OpAMP agent prioritizes Flags before ErrorResponse hence sending
// requests consequently without respecting the retry cooldown, if in future that changes,
// it should be shifted there; To test uncomment Flags added in opamp_server.go
if newStatus.EffectiveConfig == nil || newStatus.EffectiveConfig.ConfigMap == nil {
response.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportFullState)
return
}
//Get the default org ID //Get the default org ID
// agent. // agent.

View File

@ -75,6 +75,10 @@ func (agents *Agents) FindAgent(agentID string) *Agent {
// If the Agent instance does not exist, it is created and added to the list of // If the Agent instance does not exist, it is created and added to the list of
// Agent instances. // Agent instances.
func (agents *Agents) FindOrCreateAgent(agentID string, conn types.Connection, orgID valuer.UUID) (*Agent, bool, error) { func (agents *Agents) FindOrCreateAgent(agentID string, conn types.Connection, orgID valuer.UUID) (*Agent, bool, error) {
if agentID == "" {
return nil, false, errors.New("cannot create agent without agentID")
}
agents.mux.Lock() agents.mux.Lock()
defer agents.mux.Unlock() defer agents.mux.Unlock()
agent, ok := agents.agentsById[agentID] agent, ok := agents.agentsById[agentID]

View File

@ -98,7 +98,7 @@ func (srv *Server) onDisconnect(conn types.Connection) {
// orgID from the context // orgID from the context
// note :- there can only be 50 agents in the db for a given orgID, we don't have a check in-memory but we delete from the db after insert. // note :- there can only be 50 agents in the db for a given orgID, we don't have a check in-memory but we delete from the db after insert.
func (srv *Server) OnMessage(ctx context.Context, conn types.Connection, msg *protobufs.AgentToServer) *protobufs.ServerToAgent { func (srv *Server) OnMessage(ctx context.Context, conn types.Connection, msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
agentID := string(msg.GetInstanceUid()) agentID, _ := valuer.NewUUIDFromBytes(msg.GetInstanceUid())
// find the orgID, if nothing is found keep it empty. // find the orgID, if nothing is found keep it empty.
// the find or create agent will return an error if orgID is empty // the find or create agent will return an error if orgID is empty
@ -109,9 +109,12 @@ func (srv *Server) OnMessage(ctx context.Context, conn types.Connection, msg *pr
orgID = orgIDs[0].ID orgID = orgIDs[0].ID
} }
agent, created, err := srv.agents.FindOrCreateAgent(agentID, conn, orgID) // when a new org is created and the agent is not able to register
// the changes in pkg/query-service/app/opamp/model/agent.go 270 - 277 takes care that
// agents sends the effective config when we processStatusUpdate.
agent, created, err := srv.agents.FindOrCreateAgent(agentID.String(), conn, orgID)
if err != nil { if err != nil {
zap.L().Error("Failed to find or create agent", zap.String("agentID", agentID), zap.Error(err)) zap.L().Error("Failed to find or create agent", zap.String("agentID", agentID.String()), zap.Error(err))
// Return error response according to OpAMP protocol // Return error response according to OpAMP protocol
return &protobufs.ServerToAgent{ return &protobufs.ServerToAgent{
@ -124,6 +127,8 @@ func (srv *Server) OnMessage(ctx context.Context, conn types.Connection, msg *pr
}, },
}, },
}, },
// Note: refer to opamp/model/agent.go; look for `Flags` keyword
// Flags: uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportFullState),
} }
} }

View File

@ -774,12 +774,14 @@ func (tb *LogPipelinesTestBed) assertNewAgentGetsPipelinesOnConnection(
pipelines []pipelinetypes.GettablePipeline, pipelines []pipelinetypes.GettablePipeline,
) { ) {
newAgentConn := &opamp.MockOpAmpConnection{} newAgentConn := &opamp.MockOpAmpConnection{}
agentID := valuer.GenerateUUID().String() agentIDUUID := valuer.GenerateUUID()
agentID, err := agentIDUUID.MarshalBinary()
require.Nil(tb.t, err)
tb.opampServer.OnMessage( tb.opampServer.OnMessage(
context.Background(), context.Background(),
newAgentConn, newAgentConn,
&protobufs.AgentToServer{ &protobufs.AgentToServer{
InstanceUid: []byte(agentID), InstanceUid: agentID,
EffectiveConfig: &protobufs.EffectiveConfig{ EffectiveConfig: &protobufs.EffectiveConfig{
ConfigMap: newInitialAgentConfigMap(), ConfigMap: newInitialAgentConfigMap(),
}, },

View File

@ -27,7 +27,7 @@ func NewUUID(value string) (UUID, error) {
} }
func NewUUIDFromBytes(value []byte) (UUID, error) { func NewUUIDFromBytes(value []byte) (UUID, error) {
val, err := uuid.ParseBytes(value) val, err := uuid.FromBytes(value)
if err != nil { if err != nil {
return UUID{}, err return UUID{}, err
} }
@ -69,6 +69,10 @@ func (enum UUID) String() string {
return enum.val.String() return enum.val.String()
} }
func (enum UUID) MarshalBinary() ([]byte, error) {
return enum.val.MarshalBinary()
}
func (enum UUID) MarshalJSON() ([]byte, error) { func (enum UUID) MarshalJSON() ([]byte, error) {
return json.Marshal(enum.StringValue()) return json.Marshal(enum.StringValue())
} }