2023-03-15 15:09:15 +05:30
package agentConf
import (
"context"
"database/sql"
"fmt"
2025-06-16 20:07:16 +05:30
"strings"
"time"
2023-03-15 15:09:15 +05:30
2025-03-20 21:01:41 +05:30
"github.com/SigNoz/signoz/pkg/query-service/model"
2025-06-16 20:07:16 +05:30
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
2023-09-10 16:48:29 +05:30
"github.com/pkg/errors"
2023-03-15 15:09:15 +05:30
"go.uber.org/zap"
2023-10-31 10:36:25 +05:30
"golang.org/x/exp/slices"
2023-03-15 15:09:15 +05:30
)
// Repo handles DDL and DML ops on ingestion rules
type Repo struct {
2025-06-16 20:07:16 +05:30
store sqlstore . SQLStore
2023-03-15 15:09:15 +05:30
}
2023-09-10 16:48:29 +05:30
func ( r * Repo ) GetConfigHistory (
2025-06-16 20:07:16 +05:30
ctx context . Context , orgId valuer . UUID , typ opamptypes . ElementType , limit int ,
) ( [ ] opamptypes . AgentConfigVersion , * model . ApiError ) {
var c [ ] opamptypes . AgentConfigVersion
err := r . store . BunDB ( ) . NewSelect ( ) .
Model ( & c ) .
ColumnExpr ( "id, version, element_type, deploy_status, deploy_result, created_at" ) .
ColumnExpr ( "COALESCE(created_by, '') as created_by" ) .
ColumnExpr ( ` COALESCE((SELECT display_name FROM users WHERE users.id = acv.created_by), 'unknown') as created_by_name ` ) .
ColumnExpr ( "COALESCE(hash, '') as hash, COALESCE(config, '{}') as config" ) .
Where ( "acv.element_type = ?" , typ ) .
Where ( "acv.org_id = ?" , orgId ) .
OrderExpr ( "acv.created_at DESC, acv.version DESC" ) .
Limit ( limit ) .
Scan ( ctx )
2023-03-15 15:09:15 +05:30
2023-09-10 16:48:29 +05:30
if err != nil {
return nil , model . InternalError ( err )
}
2025-06-16 20:07:16 +05:30
incompleteStatuses := [ ] opamptypes . DeployStatus { opamptypes . DeployInitiated , opamptypes . Deploying }
2023-10-31 10:36:25 +05:30
for idx := 1 ; idx < len ( c ) ; idx ++ {
if slices . Contains ( incompleteStatuses , c [ idx ] . DeployStatus ) {
2025-06-16 20:07:16 +05:30
c [ idx ] . DeployStatus = opamptypes . DeployStatusUnknown
2023-10-31 10:36:25 +05:30
}
}
2023-09-10 16:48:29 +05:30
return c , nil
2023-03-15 15:09:15 +05:30
}
2023-09-10 16:48:29 +05:30
func ( r * Repo ) GetConfigVersion (
2025-06-16 20:07:16 +05:30
ctx context . Context , orgId valuer . UUID , typ opamptypes . ElementType , v int ,
) ( * opamptypes . AgentConfigVersion , * model . ApiError ) {
var c opamptypes . AgentConfigVersion
err := r . store . BunDB ( ) . NewSelect ( ) .
Model ( & c ) .
ColumnExpr ( "id, version, element_type, deploy_status, deploy_result, created_at" ) .
ColumnExpr ( "COALESCE(created_by, '') as created_by" ) .
ColumnExpr ( ` COALESCE((SELECT display_name FROM users WHERE users.id = acv.created_by), 'unknown') as created_by_name ` ) .
ColumnExpr ( "COALESCE(hash, '') as hash, COALESCE(config, '{}') as config" ) .
Where ( "acv.element_type = ?" , typ ) .
Where ( "acv.version = ?" , v ) .
Where ( "acv.org_id = ?" , orgId ) .
Scan ( ctx )
2023-03-15 15:09:15 +05:30
2023-09-10 16:48:29 +05:30
if err != nil {
2025-06-16 20:07:16 +05:30
if errors . Is ( err , sql . ErrNoRows ) {
return nil , model . NotFoundError ( err )
}
2023-09-10 16:48:29 +05:30
return nil , model . InternalError ( err )
}
2023-03-15 15:09:15 +05:30
2023-09-10 16:48:29 +05:30
return & c , nil
2023-03-15 15:09:15 +05:30
}
2023-09-10 16:48:29 +05:30
func ( r * Repo ) GetLatestVersion (
2025-06-16 20:07:16 +05:30
ctx context . Context , orgId valuer . UUID , typ opamptypes . ElementType ,
) ( * opamptypes . AgentConfigVersion , * model . ApiError ) {
var c opamptypes . AgentConfigVersion
err := r . store . BunDB ( ) . NewSelect ( ) .
Model ( & c ) .
ColumnExpr ( "id, version, element_type, deploy_status, deploy_result, created_at" ) .
ColumnExpr ( "COALESCE(created_by, '') as created_by" ) .
ColumnExpr ( ` COALESCE((SELECT display_name FROM users WHERE users.id = acv.created_by), 'unknown') as created_by_name ` ) .
Where ( "acv.element_type = ?" , typ ) .
Where ( "acv.org_id = ?" , orgId ) .
Where ( "version = (SELECT MAX(version) FROM agent_config_version WHERE acv.element_type = ?)" , typ ) .
Scan ( ctx )
2023-09-10 16:48:29 +05:30
2023-03-15 15:09:15 +05:30
if err != nil {
2025-06-16 20:07:16 +05:30
if errors . Is ( err , sql . ErrNoRows ) {
return nil , model . NotFoundError ( err )
}
2023-09-10 16:48:29 +05:30
return nil , model . InternalError ( err )
2023-03-15 15:09:15 +05:30
}
2023-09-10 16:48:29 +05:30
return & c , nil
2023-03-15 15:09:15 +05:30
}
2023-09-10 16:48:29 +05:30
func ( r * Repo ) insertConfig (
2025-06-16 20:07:16 +05:30
ctx context . Context , orgId valuer . UUID , userId valuer . UUID , c * opamptypes . AgentConfigVersion , elements [ ] string ,
2023-09-10 16:48:29 +05:30
) ( fnerr * model . ApiError ) {
2023-03-15 15:09:15 +05:30
2025-06-16 20:07:16 +05:30
if c . ElementType . StringValue ( ) == "" {
2023-09-10 16:48:29 +05:30
return model . BadRequest ( fmt . Errorf (
"element type is required for creating agent config version" ,
) )
2023-03-15 15:09:15 +05:30
}
2023-07-31 21:34:42 +05:30
// allowing empty elements for logs - use case is deleting all pipelines
2025-06-16 20:07:16 +05:30
if len ( elements ) == 0 && c . ElementType != opamptypes . ElementTypeLogPipelines {
zap . L ( ) . Error ( "insert config called with no elements " , zap . String ( "ElementType" , c . ElementType . StringValue ( ) ) )
2023-09-10 16:48:29 +05:30
return model . BadRequest ( fmt . Errorf ( "config must have atleast one element" ) )
2023-03-15 15:09:15 +05:30
}
if c . Version != 0 {
// the version can not be set by the user, we want to auto-assign the versions
// in a monotonically increasing order starting with 1. hence, we reject insert
// requests with version anything other than 0. here, 0 indicates un-assigned
2025-06-16 20:07:16 +05:30
zap . L ( ) . Error ( "invalid version assignment while inserting agent config" , zap . Int ( "version" , c . Version ) , zap . String ( "ElementType" , c . ElementType . StringValue ( ) ) )
2023-09-10 16:48:29 +05:30
return model . BadRequest ( fmt . Errorf (
"user defined versions are not supported in the agent config" ,
) )
2023-03-15 15:09:15 +05:30
}
2025-06-16 20:07:16 +05:30
configVersion , err := r . GetLatestVersion ( ctx , orgId , c . ElementType )
2023-09-10 16:48:29 +05:30
if err != nil && err . Type ( ) != model . ErrorNotFound {
2024-03-27 00:07:29 +05:30
zap . L ( ) . Error ( "failed to fetch latest config version" , zap . Error ( err ) )
2023-09-10 16:48:29 +05:30
return model . InternalError ( fmt . Errorf ( "failed to fetch latest config version" ) )
2023-03-15 15:09:15 +05:30
}
2023-07-31 21:34:42 +05:30
if configVersion != nil {
2025-06-16 20:07:16 +05:30
c . IncrementVersion ( configVersion . Version )
2023-07-31 21:34:42 +05:30
} else {
// first version
c . Version = 1
}
2023-03-15 15:09:15 +05:30
defer func ( ) {
if fnerr != nil {
// remove all the damage (invalid rows from db)
2025-06-16 20:07:16 +05:30
r . store . BunDB ( ) . NewDelete ( ) . Model ( new ( opamptypes . AgentConfigVersion ) ) . Where ( "id = ?" , c . ID ) . Where ( "org_id = ?" , orgId ) . Exec ( ctx )
r . store . BunDB ( ) . NewDelete ( ) . Model ( new ( opamptypes . AgentConfigElement ) ) . Where ( "version_id = ?" , c . ID ) . Exec ( ctx )
2023-03-15 15:09:15 +05:30
}
} ( )
2025-06-16 20:07:16 +05:30
_ , dbErr := r . store .
BunDB ( ) .
NewInsert ( ) .
Model ( c ) .
Exec ( ctx )
2023-03-15 15:09:15 +05:30
2023-09-10 16:48:29 +05:30
if dbErr != nil {
2024-03-27 00:07:29 +05:30
zap . L ( ) . Error ( "error in inserting config version: " , zap . Error ( dbErr ) )
2023-09-10 16:48:29 +05:30
return model . InternalError ( errors . Wrap ( dbErr , "failed to insert ingestion rule" ) )
2023-03-15 15:09:15 +05:30
}
for _ , e := range elements {
2025-06-16 20:07:16 +05:30
agentConfigElement := & opamptypes . AgentConfigElement {
Identifiable : types . Identifiable { ID : valuer . GenerateUUID ( ) } ,
TimeAuditable : types . TimeAuditable {
CreatedAt : time . Now ( ) ,
UpdatedAt : time . Now ( ) ,
} ,
VersionID : c . ID ,
ElementType : c . ElementType . StringValue ( ) ,
ElementID : e ,
}
_ , dbErr = r . store . BunDB ( ) . NewInsert ( ) . Model ( agentConfigElement ) . Exec ( ctx )
2023-09-10 16:48:29 +05:30
if dbErr != nil {
return model . InternalError ( dbErr )
2023-03-15 15:09:15 +05:30
}
}
return nil
}
func ( r * Repo ) updateDeployStatus ( ctx context . Context ,
2025-06-16 20:07:16 +05:30
orgId valuer . UUID ,
elementType opamptypes . ElementType ,
2023-03-15 15:09:15 +05:30
version int ,
status string ,
result string ,
lastHash string ,
2023-09-10 16:48:29 +05:30
lastconf string ) * model . ApiError {
2023-03-15 15:09:15 +05:30
2025-06-16 20:07:16 +05:30
// check if it has org orgID prefix
// ensuring it here and also ensuring in coordinator.go
if ! strings . HasPrefix ( lastHash , orgId . String ( ) ) {
lastHash = orgId . String ( ) + lastHash
}
2023-03-15 15:09:15 +05:30
2025-06-16 20:07:16 +05:30
_ , err := r . store . BunDB ( ) . NewUpdate ( ) .
Model ( new ( opamptypes . AgentConfigVersion ) ) .
Set ( "deploy_status = ?" , status ) .
Set ( "deploy_result = ?" , result ) .
Set ( "hash = COALESCE(?, hash)" , lastHash ) .
Set ( "config = ?" , lastconf ) .
Where ( "version = ?" , version ) .
Where ( "element_type = ?" , elementType ) .
Where ( "org_id = ?" , orgId ) .
Exec ( ctx )
2023-03-15 15:09:15 +05:30
if err != nil {
2024-03-27 00:07:29 +05:30
zap . L ( ) . Error ( "failed to update deploy status" , zap . Error ( err ) )
2025-06-16 20:07:16 +05:30
return model . BadRequest ( fmt . Errorf ( "failed to update deploy status" ) )
2023-03-15 15:09:15 +05:30
}
return nil
}
2023-09-10 16:48:29 +05:30
func ( r * Repo ) updateDeployStatusByHash (
2025-06-16 20:07:16 +05:30
ctx context . Context , orgId valuer . UUID , confighash string , status string , result string ,
2023-09-10 16:48:29 +05:30
) * model . ApiError {
2023-03-15 15:09:15 +05:30
2025-06-16 20:07:16 +05:30
_ , err := r . store . BunDB ( ) . NewUpdate ( ) .
Model ( new ( opamptypes . AgentConfigVersion ) ) .
Set ( "deploy_status = ?" , status ) .
Set ( "deploy_result = ?" , result ) .
Where ( "hash = ?" , confighash ) .
Where ( "org_id = ?" , orgId ) .
Exec ( ctx )
2023-03-15 15:09:15 +05:30
if err != nil {
2024-03-27 00:07:29 +05:30
zap . L ( ) . Error ( "failed to update deploy status" , zap . Error ( err ) )
2023-09-10 16:48:29 +05:30
return model . InternalError ( errors . Wrap ( err , "failed to update deploy status" ) )
2023-03-15 15:09:15 +05:30
}
return nil
}