diff --git a/pkg/query-service/agentConf/db.go b/pkg/query-service/agentConf/db.go index 3deb5927781d..afb59123f4d1 100644 --- a/pkg/query-service/agentConf/db.go +++ b/pkg/query-service/agentConf/db.go @@ -8,6 +8,7 @@ import ( "github.com/google/uuid" "github.com/jmoiron/sqlx" + "github.com/pkg/errors" "go.signoz.io/signoz/pkg/query-service/agentConf/sqlite" "go.signoz.io/signoz/pkg/query-service/model" "go.uber.org/zap" @@ -31,7 +32,9 @@ func (r *Repo) initDB(engine string) error { } } -func (r *Repo) GetConfigHistory(ctx context.Context, typ ElementTypeDef, limit int) ([]ConfigVersion, error) { +func (r *Repo) GetConfigHistory( + ctx context.Context, typ ElementTypeDef, limit int, +) ([]ConfigVersion, *model.ApiError) { var c []ConfigVersion err := r.db.SelectContext(ctx, &c, fmt.Sprintf(`SELECT version, @@ -54,10 +57,16 @@ func (r *Repo) GetConfigHistory(ctx context.Context, typ ElementTypeDef, limit i limit %v`, limit), typ) - return c, err + if err != nil { + return nil, model.InternalError(err) + } + + return c, nil } -func (r *Repo) GetConfigVersion(ctx context.Context, typ ElementTypeDef, v int) (*ConfigVersion, error) { +func (r *Repo) GetConfigVersion( + ctx context.Context, typ ElementTypeDef, v int, +) (*ConfigVersion, *model.ApiError) { var c ConfigVersion err := r.db.GetContext(ctx, &c, `SELECT id, @@ -78,11 +87,19 @@ func (r *Repo) GetConfigVersion(ctx context.Context, typ ElementTypeDef, v int) WHERE element_type = $1 AND version = $2`, typ, v) - return &c, err + if err == sql.ErrNoRows { + return nil, model.NotFoundError(err) + } + if err != nil { + return nil, model.InternalError(err) + } + return &c, nil } -func (r *Repo) GetLatestVersion(ctx context.Context, typ ElementTypeDef) (*ConfigVersion, error) { +func (r *Repo) GetLatestVersion( + ctx context.Context, typ ElementTypeDef, +) (*ConfigVersion, *model.ApiError) { var c ConfigVersion err := r.db.GetContext(ctx, &c, `SELECT id, @@ -103,23 +120,31 @@ func (r *Repo) GetLatestVersion(ctx context.Context, typ ElementTypeDef) (*Confi SELECT MAX(version) FROM agent_config_versions WHERE element_type=$2)`, typ, typ) - if err != nil { - // intially the table will be empty - return nil, err + + if err == sql.ErrNoRows { + return nil, model.NotFoundError(err) } - return &c, err + if err != nil { + return nil, model.InternalError(err) + } + + return &c, nil } -func (r *Repo) insertConfig(ctx context.Context, userId string, c *ConfigVersion, elements []string) (fnerr error) { +func (r *Repo) insertConfig( + ctx context.Context, userId string, c *ConfigVersion, elements []string, +) (fnerr *model.ApiError) { if string(c.ElementType) == "" { - return fmt.Errorf("element type is required for creating agent config version") + return model.BadRequest(fmt.Errorf( + "element type is required for creating agent config version", + )) } // allowing empty elements for logs - use case is deleting all pipelines if len(elements) == 0 && c.ElementType != ElementTypeLogPipelines { zap.S().Error("insert config called with no elements ", c.ElementType) - return fmt.Errorf("config must have atleast one element") + return model.BadRequest(fmt.Errorf("config must have atleast one element")) } if c.Version != 0 { @@ -127,15 +152,15 @@ func (r *Repo) insertConfig(ctx context.Context, userId string, c *ConfigVersion // in a monotonically increasing order starting with 1. hence, we reject insert // requests with version anything other than 0. here, 0 indicates un-assigned zap.S().Error("invalid version assignment while inserting agent config", c.Version, c.ElementType) - return fmt.Errorf("user defined versions are not supported in the agent config") + return model.BadRequest(fmt.Errorf( + "user defined versions are not supported in the agent config", + )) } configVersion, err := r.GetLatestVersion(ctx, c.ElementType) - if err != nil { - if err != sql.ErrNoRows { - zap.S().Error("failed to fetch latest config version", err) - return fmt.Errorf("failed to fetch latest config version") - } + if err != nil && err.Type() != model.ErrorNotFound { + zap.S().Error("failed to fetch latest config version", err) + return model.InternalError(fmt.Errorf("failed to fetch latest config version")) } if configVersion != nil { @@ -166,7 +191,7 @@ func (r *Repo) insertConfig(ctx context.Context, userId string, c *ConfigVersion deploy_result) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)` - _, err = r.db.ExecContext(ctx, + _, dbErr := r.db.ExecContext(ctx, configQuery, c.ID, c.Version, @@ -178,9 +203,9 @@ func (r *Repo) insertConfig(ctx context.Context, userId string, c *ConfigVersion c.DeployStatus, c.DeployResult) - if err != nil { - zap.S().Error("error in inserting config version: ", zap.Error(err)) - return fmt.Errorf("failed to insert ingestion rule") + if dbErr != nil { + zap.S().Error("error in inserting config version: ", zap.Error(dbErr)) + return model.InternalError(errors.Wrap(dbErr, "failed to insert ingestion rule")) } elementsQuery := `INSERT INTO agent_config_elements( @@ -191,15 +216,16 @@ func (r *Repo) insertConfig(ctx context.Context, userId string, c *ConfigVersion VALUES ($1, $2, $3, $4)` for _, e := range elements { - - _, err = r.db.ExecContext(ctx, + _, dbErr = r.db.ExecContext( + ctx, elementsQuery, uuid.NewString(), c.ID, c.ElementType, - e) - if err != nil { - return err + e, + ) + if dbErr != nil { + return model.InternalError(dbErr) } } @@ -212,7 +238,7 @@ func (r *Repo) updateDeployStatus(ctx context.Context, status string, result string, lastHash string, - lastconf string) error { + lastconf string) *model.ApiError { updateQuery := `UPDATE agent_config_versions set deploy_status = $1, @@ -225,13 +251,15 @@ func (r *Repo) updateDeployStatus(ctx context.Context, _, err := r.db.ExecContext(ctx, updateQuery, status, result, lastHash, lastconf, version, string(elementType)) if err != nil { zap.S().Error("failed to update deploy status", err) - return model.BadRequestStr("failed to update deploy status") + return model.BadRequest(fmt.Errorf("failed to update deploy status")) } return nil } -func (r *Repo) updateDeployStatusByHash(ctx context.Context, confighash string, status string, result string) error { +func (r *Repo) updateDeployStatusByHash( + ctx context.Context, confighash string, status string, result string, +) *model.ApiError { updateQuery := `UPDATE agent_config_versions set deploy_status = $1, @@ -241,7 +269,7 @@ func (r *Repo) updateDeployStatusByHash(ctx context.Context, confighash string, _, err := r.db.ExecContext(ctx, updateQuery, status, result, confighash) if err != nil { zap.S().Error("failed to update deploy status", err) - return model.BadRequestStr("failed to update deploy status") + return model.InternalError(errors.Wrap(err, "failed to update deploy status")) } return nil diff --git a/pkg/query-service/agentConf/manager.go b/pkg/query-service/agentConf/manager.go index e0b32ffc0c0d..e2a5c2239caf 100644 --- a/pkg/query-service/agentConf/manager.go +++ b/pkg/query-service/agentConf/manager.go @@ -9,6 +9,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/app/opamp" filterprocessor "go.signoz.io/signoz/pkg/query-service/app/opamp/otelconfig/filterprocessor" tsp "go.signoz.io/signoz/pkg/query-service/app/opamp/otelconfig/tailsampler" + "go.signoz.io/signoz/pkg/query-service/model" "go.uber.org/zap" yaml "gopkg.in/yaml.v3" ) @@ -43,24 +44,32 @@ func Ready() bool { return m.Ready() } -func GetLatestVersion(ctx context.Context, elementType ElementTypeDef) (*ConfigVersion, error) { +func GetLatestVersion( + ctx context.Context, elementType ElementTypeDef, +) (*ConfigVersion, *model.ApiError) { return m.GetLatestVersion(ctx, elementType) } -func GetConfigVersion(ctx context.Context, elementType ElementTypeDef, version int) (*ConfigVersion, error) { +func GetConfigVersion( + ctx context.Context, elementType ElementTypeDef, version int, +) (*ConfigVersion, *model.ApiError) { return m.GetConfigVersion(ctx, elementType, version) } -func GetConfigHistory(ctx context.Context, typ ElementTypeDef, limit int) ([]ConfigVersion, error) { +func GetConfigHistory( + ctx context.Context, typ ElementTypeDef, limit int, +) ([]ConfigVersion, *model.ApiError) { return m.GetConfigHistory(ctx, typ, limit) } // StartNewVersion launches a new config version for given set of elements -func StartNewVersion(ctx context.Context, userId string, eleType ElementTypeDef, elementIds []string) (*ConfigVersion, error) { +func StartNewVersion( + ctx context.Context, userId string, eleType ElementTypeDef, elementIds []string, +) (*ConfigVersion, *model.ApiError) { if !m.Ready() { // agent is already being updated, ask caller to wait and re-try after sometime - return nil, fmt.Errorf("agent updater is busy") + return nil, model.UnavailableError(fmt.Errorf("agent updater is busy")) } // create a new version @@ -75,24 +84,24 @@ func StartNewVersion(ctx context.Context, userId string, eleType ElementTypeDef, return cfg, nil } -func Redeploy(ctx context.Context, typ ElementTypeDef, version int) error { +func Redeploy(ctx context.Context, typ ElementTypeDef, version int) *model.ApiError { configVersion, err := GetConfigVersion(ctx, typ, version) if err != nil { zap.S().Debug("failed to fetch config version during redeploy", err) - return fmt.Errorf("failed to fetch details of the config version") + return model.WrapApiError(err, "failed to fetch details of the config version") } if configVersion == nil || (configVersion != nil && configVersion.LastConf == "") { zap.S().Debug("config version has no conf yaml", configVersion) - return fmt.Errorf("the config version can not be redeployed") + return model.BadRequest(fmt.Errorf("the config version can not be redeployed")) } switch typ { case ElementTypeSamplingRules: var config *tsp.Config if err := yaml.Unmarshal([]byte(configVersion.LastConf), &config); err != nil { zap.S().Error("failed to read last conf correctly", err) - return fmt.Errorf("failed to read the stored config correctly") + return model.BadRequest(fmt.Errorf("failed to read the stored config correctly")) } // merge current config with new filter params @@ -104,7 +113,7 @@ func Redeploy(ctx context.Context, typ ElementTypeDef, version int) error { configHash, err := opamp.UpsertControlProcessors(ctx, "traces", processorConf, m.OnConfigUpdate) if err != nil { zap.S().Error("failed to call agent config update for trace processor:", err) - return fmt.Errorf("failed to deploy the config") + return model.InternalError(fmt.Errorf("failed to deploy the config")) } m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, configVersion.LastConf) @@ -112,7 +121,7 @@ func Redeploy(ctx context.Context, typ ElementTypeDef, version int) error { var filterConfig *filterprocessor.Config if err := yaml.Unmarshal([]byte(configVersion.LastConf), &filterConfig); err != nil { zap.S().Error("failed to read last conf correctly", err) - return fmt.Errorf("failed to read the stored config correctly") + return model.InternalError(fmt.Errorf("failed to read the stored config correctly")) } processorConf := map[string]interface{}{ "filter": filterConfig, @@ -151,9 +160,9 @@ func UpsertFilterProcessor(ctx context.Context, version int, config *filterproce return err } - processorConfYaml, err := yaml.Marshal(config) - if err != nil { - zap.S().Warnf("unexpected error while transforming processor config to yaml", err) + processorConfYaml, yamlErr := yaml.Marshal(config) + if yamlErr != nil { + zap.S().Warnf("unexpected error while transforming processor config to yaml", yamlErr) } m.updateDeployStatus(ctx, ElementTypeDropRules, version, string(DeployInitiated), "Deployment started", configHash, string(processorConfYaml)) @@ -202,9 +211,9 @@ func UpsertSamplingProcessor(ctx context.Context, version int, config *tsp.Confi return err } - processorConfYaml, err := yaml.Marshal(config) - if err != nil { - zap.S().Warnf("unexpected error while transforming processor config to yaml", err) + processorConfYaml, yamlErr := yaml.Marshal(config) + if yamlErr != nil { + zap.S().Warnf("unexpected error while transforming processor config to yaml", yamlErr) } m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, string(processorConfYaml)) @@ -212,9 +221,15 @@ func UpsertSamplingProcessor(ctx context.Context, version int, config *tsp.Confi } // UpsertLogParsingProcessors updates the agent with log parsing processors -func UpsertLogParsingProcessor(ctx context.Context, version int, rawPipelineData []byte, config map[string]interface{}, names []string) error { +func UpsertLogParsingProcessor( + ctx context.Context, + version int, + rawPipelineData []byte, + config map[string]interface{}, + names []string, +) *model.ApiError { if !atomic.CompareAndSwapUint32(&m.lock, 0, 1) { - return fmt.Errorf("agent updater is busy") + return model.UnavailableError(fmt.Errorf("agent updater is busy")) } defer atomic.StoreUint32(&m.lock, 0) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 33371987e484..9da0ffbcba5a 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -3,7 +3,6 @@ package app import ( "bytes" "context" - "database/sql" "encoding/json" "errors" "fmt" @@ -2423,7 +2422,7 @@ func (ah *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re version, err := parseAgentConfigVersion(r) if err != nil { - RespondError(w, err, nil) + RespondError(w, model.WrapApiError(err, "Failed to parse agent config version"), nil) return } @@ -2444,12 +2443,14 @@ func (ah *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re } // listLogsPipelines lists logs piplines for latest version -func (ah *APIHandler) listLogsPipelines(ctx context.Context) (*logparsingpipeline.PipelinesResponse, *model.ApiError) { +func (ah *APIHandler) listLogsPipelines(ctx context.Context) ( + *logparsingpipeline.PipelinesResponse, *model.ApiError, +) { // get lateset agent config lastestConfig, err := agentConf.GetLatestVersion(ctx, logPipelines) if err != nil { - if err != sql.ErrNoRows { - return nil, model.InternalError(fmt.Errorf("failed to get latest agent config version with error %w", err)) + if err.Type() != model.ErrorNotFound { + return nil, model.WrapApiError(err, "failed to get latest agent config version") } else { return nil, nil } @@ -2457,31 +2458,33 @@ func (ah *APIHandler) listLogsPipelines(ctx context.Context) (*logparsingpipelin payload, err := ah.LogsParsingPipelineController.GetPipelinesByVersion(ctx, lastestConfig.Version) if err != nil { - return nil, model.InternalError(fmt.Errorf("failed to get pipelines with error %w", err)) + return nil, model.WrapApiError(err, "failed to get pipelines") } // todo(Nitya): make a new API for history pagination limit := 10 history, err := agentConf.GetConfigHistory(ctx, logPipelines, limit) if err != nil { - return nil, model.InternalError(fmt.Errorf("failed to get config history with error %w", err)) + return nil, model.WrapApiError(err, "failed to get config history") } payload.History = history return payload, nil } // listLogsPipelinesByVersion lists pipelines along with config version history -func (ah *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version int) (*logparsingpipeline.PipelinesResponse, *model.ApiError) { +func (ah *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version int) ( + *logparsingpipeline.PipelinesResponse, *model.ApiError, +) { payload, err := ah.LogsParsingPipelineController.GetPipelinesByVersion(ctx, version) if err != nil { - return nil, model.InternalError(err) + return nil, model.WrapApiError(err, "failed to get pipelines by version") } // todo(Nitya): make a new API for history pagination limit := 10 history, err := agentConf.GetConfigHistory(ctx, logPipelines, limit) if err != nil { - return nil, model.InternalError(fmt.Errorf("failed to retrieve agent config history with error %w", err)) + return nil, model.WrapApiError(err, "failed to retrieve agent config history") } payload.History = history @@ -2499,7 +2502,10 @@ func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) ctx := auth.AttachJwtToContext(context.Background(), r) - createPipeline := func(ctx context.Context, postable []logparsingpipeline.PostablePipeline) (*logparsingpipeline.PipelinesResponse, error) { + createPipeline := func( + ctx context.Context, + postable []logparsingpipeline.PostablePipeline, + ) (*logparsingpipeline.PipelinesResponse, *model.ApiError) { if len(postable) == 0 { zap.S().Warnf("found no pipelines in the http request, this will delete all the pipelines") } @@ -2515,7 +2521,7 @@ func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) res, err := createPipeline(ctx, req.Pipelines) if err != nil { - RespondError(w, model.InternalError(err), nil) + RespondError(w, err, nil) return } diff --git a/pkg/query-service/app/logparsingpipeline/controller.go b/pkg/query-service/app/logparsingpipeline/controller.go index 3a1fb9e16022..2f681051291e 100644 --- a/pkg/query-service/app/logparsingpipeline/controller.go +++ b/pkg/query-service/app/logparsingpipeline/controller.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/jmoiron/sqlx" + "github.com/pkg/errors" "go.signoz.io/signoz/pkg/query-service/agentConf" "go.signoz.io/signoz/pkg/query-service/auth" "go.signoz.io/signoz/pkg/query-service/model" @@ -32,11 +33,14 @@ type PipelinesResponse struct { } // ApplyPipelines stores new or changed pipelines and initiates a new config update -func (ic *LogParsingPipelineController) ApplyPipelines(ctx context.Context, postable []PostablePipeline) (*PipelinesResponse, error) { +func (ic *LogParsingPipelineController) ApplyPipelines( + ctx context.Context, + postable []PostablePipeline, +) (*PipelinesResponse, *model.ApiError) { // get user id from context - userId, err := auth.ExtractUserIdFromContext(ctx) - if err != nil { - return nil, model.InternalError(fmt.Errorf("failed to get userId from context %v", err)) + userId, authErr := auth.ExtractUserIdFromContext(ctx) + if authErr != nil { + return nil, model.UnauthorizedError(errors.Wrap(authErr, "failed to get userId from context")) } var pipelines []model.Pipeline @@ -51,17 +55,17 @@ func (ic *LogParsingPipelineController) ApplyPipelines(ctx context.Context, post if r.Id == "" { // looks like a new or changed pipeline, store it first inserted, err := ic.insertPipeline(ctx, &r) - if err != nil || inserted == nil { + if err != nil { zap.S().Errorf("failed to insert edited pipeline %s", err.Error()) - return nil, fmt.Errorf("failed to insert edited pipeline") + return nil, model.WrapApiError(err, "failed to insert edited pipeline") } else { pipelines = append(pipelines, *inserted) } } else { selected, err := ic.GetPipeline(ctx, r.Id) - if err != nil || selected == nil { + if err != nil { zap.S().Errorf("failed to find edited pipeline %s", err.Error()) - return nil, fmt.Errorf("failed to find pipeline, invalid request") + return nil, model.WrapApiError(err, "failed to find edited pipeline") } pipelines = append(pipelines, *selected) } @@ -69,14 +73,18 @@ func (ic *LogParsingPipelineController) ApplyPipelines(ctx context.Context, post } // prepare filter config (processor) from the pipelines - filterConfig, names, err := PreparePipelineProcessor(pipelines) - if err != nil { - zap.S().Errorf("failed to generate processor config from pipelines for deployment %s", err.Error()) - return nil, err + filterConfig, names, translationErr := PreparePipelineProcessor(pipelines) + if translationErr != nil { + zap.S().Errorf("failed to generate processor config from pipelines for deployment %w", translationErr) + return nil, model.BadRequest(errors.Wrap( + translationErr, "failed to generate processor config from pipelines for deployment", + )) } if !agentConf.Ready() { - return nil, fmt.Errorf("agent updater unavailable at the moment. Please try in sometime") + return nil, model.UnavailableError(fmt.Errorf( + "agent updater unavailable at the moment. Please try in sometime", + )) } // prepare config elements @@ -107,22 +115,24 @@ func (ic *LogParsingPipelineController) ApplyPipelines(ctx context.Context, post } if err != nil { - return response, fmt.Errorf("failed to apply pipelines") + return response, model.WrapApiError(err, "failed to apply pipelines") } return response, nil } // GetPipelinesByVersion responds with version info and associated pipelines -func (ic *LogParsingPipelineController) GetPipelinesByVersion(ctx context.Context, version int) (*PipelinesResponse, error) { +func (ic *LogParsingPipelineController) GetPipelinesByVersion( + ctx context.Context, version int, +) (*PipelinesResponse, *model.ApiError) { pipelines, errors := ic.getPipelinesByVersion(ctx, version) if errors != nil { zap.S().Errorf("failed to get pipelines for version %d, %w", version, errors) - return nil, fmt.Errorf("failed to get pipelines for given version") + return nil, model.InternalError(fmt.Errorf("failed to get pipelines for given version")) } configVersion, err := agentConf.GetConfigVersion(ctx, agentConf.ElementTypeLogPipelines, version) - if err != nil || configVersion == nil { + if err != nil { zap.S().Errorf("failed to get config for version %d, %s", version, err.Error()) - return nil, fmt.Errorf("failed to get config for given version") + return nil, model.WrapApiError(err, "failed to get config for given version") } return &PipelinesResponse{ diff --git a/pkg/query-service/app/logparsingpipeline/db.go b/pkg/query-service/app/logparsingpipeline/db.go index ac6bc5ba3d01..0d897c272c5a 100644 --- a/pkg/query-service/app/logparsingpipeline/db.go +++ b/pkg/query-service/app/logparsingpipeline/db.go @@ -39,24 +39,30 @@ func (r *Repo) InitDB(engine string) error { } // insertPipeline stores a given postable pipeline to database -func (r *Repo) insertPipeline(ctx context.Context, postable *PostablePipeline) (*model.Pipeline, error) { +func (r *Repo) insertPipeline( + ctx context.Context, postable *PostablePipeline, +) (*model.Pipeline, *model.ApiError) { if err := postable.IsValid(); err != nil { - return nil, errors.Wrap(err, "failed to validate postable pipeline") + return nil, model.BadRequest(errors.Wrap(err, + "pipeline is not valid", + )) } rawConfig, err := json.Marshal(postable.Config) if err != nil { - return nil, errors.Wrap(err, "failed to unmarshal postable pipeline config") + return nil, model.BadRequest(errors.Wrap(err, + "failed to unmarshal postable pipeline config", + )) } jwt, err := auth.ExtractJwtFromContext(ctx) if err != nil { - return nil, err + return nil, model.UnauthorizedError(err) } claims, err := auth.ParseJWT(jwt) if err != nil { - return nil, err + return nil, model.UnauthorizedError(err) } insertRow := &model.Pipeline{ @@ -94,7 +100,7 @@ func (r *Repo) insertPipeline(ctx context.Context, postable *PostablePipeline) ( if err != nil { zap.S().Errorf("error in inserting pipeline data: ", zap.Error(err)) - return insertRow, errors.Wrap(err, "failed to insert pipeline") + return nil, model.InternalError(errors.Wrap(err, "failed to insert pipeline")) } return insertRow, nil @@ -143,7 +149,9 @@ func (r *Repo) getPipelinesByVersion(ctx context.Context, version int) ([]model. } // GetPipelines returns pipeline and errors (if any) -func (r *Repo) GetPipeline(ctx context.Context, id string) (*model.Pipeline, error) { +func (r *Repo) GetPipeline( + ctx context.Context, id string, +) (*model.Pipeline, *model.ApiError) { pipelines := []model.Pipeline{} pipelineQuery := `SELECT id, @@ -162,25 +170,26 @@ func (r *Repo) GetPipeline(ctx context.Context, id string) (*model.Pipeline, err err := r.db.SelectContext(ctx, &pipelines, pipelineQuery, id) if err != nil { zap.S().Errorf("failed to get ingestion pipeline from db", err) - return nil, model.BadRequestStr("failed to get ingestion pipeline from db") + return nil, model.InternalError(errors.Wrap(err, "failed to get ingestion pipeline from db")) } if len(pipelines) == 0 { zap.S().Warnf("No row found for ingestion pipeline id", id) - return nil, nil + return nil, model.NotFoundError(fmt.Errorf("No row found for ingestion pipeline id %v", id)) } if len(pipelines) == 1 { err := pipelines[0].ParseRawConfig() if err != nil { zap.S().Errorf("invalid pipeline config found", id, err) - return &pipelines[0], model.InternalError(fmt.Errorf("found an invalid pipeline config ")) + return nil, model.InternalError( + errors.Wrap(err, "found an invalid pipeline config"), + ) } return &pipelines[0], nil } return nil, model.InternalError(fmt.Errorf("multiple pipelines with same id")) - } func (r *Repo) DeletePipeline(ctx context.Context, id string) error { diff --git a/pkg/query-service/app/opamp/configure_ingestionRules.go b/pkg/query-service/app/opamp/configure_ingestionRules.go index af04abb72352..bd71aa38b0bd 100644 --- a/pkg/query-service/app/opamp/configure_ingestionRules.go +++ b/pkg/query-service/app/opamp/configure_ingestionRules.go @@ -10,12 +10,18 @@ import ( "go.opentelemetry.io/collector/confmap" model "go.signoz.io/signoz/pkg/query-service/app/opamp/model" "go.signoz.io/signoz/pkg/query-service/app/opamp/otelconfig" + coreModel "go.signoz.io/signoz/pkg/query-service/model" "go.uber.org/zap" ) // inserts or updates ingestion controller processors depending // on the signal (metrics or traces) -func UpsertControlProcessors(ctx context.Context, signal string, processors map[string]interface{}, callback model.OnChangeCallback) (hash string, fnerr error) { +func UpsertControlProcessors( + ctx context.Context, + signal string, + processors map[string]interface{}, + callback model.OnChangeCallback, +) (hash string, fnerr *coreModel.ApiError) { // note: only processors enabled through tracesPipelinePlan will be added // to pipeline. To enable or disable processors from pipeline, call // AddToTracePipeline() or RemoveFromTracesPipeline() prior to calling @@ -25,24 +31,28 @@ func UpsertControlProcessors(ctx context.Context, signal string, processors map[ if signal != string(Metrics) && signal != string(Traces) { zap.S().Error("received invalid signal int UpsertControlProcessors", signal) - fnerr = fmt.Errorf("signal not supported in ingestion rules: %s", signal) + fnerr = coreModel.BadRequest(fmt.Errorf( + "signal not supported in ingestion rules: %s", signal, + )) return } if opAmpServer == nil { - fnerr = fmt.Errorf("opamp server is down, unable to push config to agent at this moment") + fnerr = coreModel.UnavailableError(fmt.Errorf( + "opamp server is down, unable to push config to agent at this moment", + )) return } agents := opAmpServer.agents.GetAllAgents() if len(agents) == 0 { - fnerr = fmt.Errorf("no agents available at the moment") + fnerr = coreModel.UnavailableError(fmt.Errorf("no agents available at the moment")) return } if len(agents) > 1 && signal == string(Traces) { zap.S().Debug("found multiple agents. this feature is not supported for traces pipeline (sampling rules)") - fnerr = fmt.Errorf("multiple agents not supported in sampling rules") + fnerr = coreModel.BadRequest(fmt.Errorf("multiple agents not supported in sampling rules")) return } diff --git a/pkg/query-service/app/opamp/logspipeline.go b/pkg/query-service/app/opamp/logspipeline.go index 36f4a1473bbc..9ad81fe77ca1 100644 --- a/pkg/query-service/app/opamp/logspipeline.go +++ b/pkg/query-service/app/opamp/logspipeline.go @@ -12,37 +12,49 @@ import ( "github.com/open-telemetry/opamp-go/protobufs" model "go.signoz.io/signoz/pkg/query-service/app/opamp/model" "go.signoz.io/signoz/pkg/query-service/constants" + coreModel "go.signoz.io/signoz/pkg/query-service/model" "go.uber.org/zap" ) var lockLogsPipelineSpec sync.RWMutex -func UpsertLogsParsingProcessor(ctx context.Context, parsingProcessors map[string]interface{}, parsingProcessorsNames []string, callback func(string, string, error)) (string, error) { +func UpsertLogsParsingProcessor( + ctx context.Context, + parsingProcessors map[string]interface{}, + parsingProcessorsNames []string, + callback func(string, string, error), +) (string, *coreModel.ApiError) { confHash := "" if opAmpServer == nil { - return confHash, fmt.Errorf("opamp server is down, unable to push config to agent at this moment") + return confHash, coreModel.UnavailableError(fmt.Errorf( + "opamp server is down, unable to push config to agent at this moment", + )) } agents := opAmpServer.agents.GetAllAgents() if len(agents) == 0 { - return confHash, fmt.Errorf("no agents available at the moment") + return confHash, coreModel.UnavailableError(fmt.Errorf( + "no agents available at the moment", + )) } for _, agent := range agents { config := agent.EffectiveConfig c, err := yaml.Parser().Unmarshal([]byte(config)) if err != nil { - return confHash, err + return confHash, coreModel.BadRequest(err) } buildLogParsingProcessors(c, parsingProcessors) p, err := getOtelPipelinFromConfig(c) if err != nil { - return confHash, err + return confHash, coreModel.BadRequest(err) } if p.Pipelines.Logs == nil { - return confHash, fmt.Errorf("logs pipeline doesn't exist") + return confHash, coreModel.InternalError(fmt.Errorf( + "logs pipeline doesn't exist", + )) } // build the new processor list @@ -54,19 +66,19 @@ func UpsertLogsParsingProcessor(ctx context.Context, parsingProcessors map[strin updatedConf, err := yaml.Parser().Marshal(c) if err != nil { - return confHash, err + return confHash, coreModel.BadRequest(err) } // zap.S().Infof("sending new config", string(updatedConf)) hash := sha256.New() _, err = hash.Write(updatedConf) if err != nil { - return confHash, err + return confHash, coreModel.InternalError(err) } agent.EffectiveConfig = string(updatedConf) err = agent.Upsert() if err != nil { - return confHash, err + return confHash, coreModel.InternalError(err) } agent.SendToAgent(&protobufs.ServerToAgent{ diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index 31d5a05ae656..46828b66c192 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -7,6 +7,7 @@ import ( "strconv" "time" + "github.com/pkg/errors" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/util/stats" "k8s.io/apimachinery/pkg/labels" @@ -89,6 +90,34 @@ func InternalError(err error) *ApiError { } } +func NotFoundError(err error) *ApiError { + return &ApiError{ + Typ: ErrorNotFound, + Err: err, + } +} + +func UnauthorizedError(err error) *ApiError { + return &ApiError{ + Typ: ErrorUnauthorized, + Err: err, + } +} + +func UnavailableError(err error) *ApiError { + return &ApiError{ + Typ: ErrorUnavailable, + Err: err, + } +} + +func WrapApiError(err *ApiError, msg string) *ApiError { + return &ApiError{ + Typ: err.Type(), + Err: errors.Wrap(err.ToError(), msg), + } +} + type QueryDataV2 struct { ResultType parser.ValueType `json:"resultType"` Result parser.Value `json:"result"` diff --git a/pkg/query-service/tests/integration/logparsingpipeline_test.go b/pkg/query-service/tests/integration/logparsingpipeline_test.go index a26871429403..c68ae2cbfcda 100644 --- a/pkg/query-service/tests/integration/logparsingpipeline_test.go +++ b/pkg/query-service/tests/integration/logparsingpipeline_test.go @@ -149,6 +149,115 @@ func TestLogPipelinesLifecycle(t *testing.T) { ) } +func TestLogPipelinesValidation(t *testing.T) { + testCases := []struct { + Name string + Pipeline logparsingpipeline.PostablePipeline + ExpectedResponseStatusCode int + }{ + { + Name: "Valid Pipeline", + Pipeline: logparsingpipeline.PostablePipeline{ + OrderId: 1, + Name: "pipeline 1", + Alias: "pipeline1", + Enabled: true, + Filter: "attributes.method == \"GET\"", + Config: []model.PipelineOperator{ + { + OrderId: 1, + ID: "add", + Type: "add", + Field: "attributes.test", + Value: "val", + Enabled: true, + Name: "test add", + }, + }, + }, + ExpectedResponseStatusCode: 200, + }, + { + Name: "Invalid orderId", + Pipeline: logparsingpipeline.PostablePipeline{ + OrderId: 0, + Name: "pipeline 1", + Alias: "pipeline1", + Enabled: true, + Filter: "attributes.method == \"GET\"", + Config: []model.PipelineOperator{ + { + OrderId: 1, + ID: "add", + Type: "add", + Field: "attributes.test", + Value: "val", + Enabled: true, + Name: "test add", + }, + }, + }, + ExpectedResponseStatusCode: 400, + }, + { + Name: "Invalid filter", + Pipeline: logparsingpipeline.PostablePipeline{ + OrderId: 1, + Name: "pipeline 1", + Alias: "pipeline1", + Enabled: true, + Filter: "bad filter", + Config: []model.PipelineOperator{ + { + OrderId: 1, + ID: "add", + Type: "add", + Field: "attributes.test", + Value: "val", + Enabled: true, + Name: "test add", + }, + }, + }, + ExpectedResponseStatusCode: 400, + }, + { + Name: "Invalid operator field", + Pipeline: logparsingpipeline.PostablePipeline{ + OrderId: 1, + Name: "pipeline 1", + Alias: "pipeline1", + Enabled: true, + Filter: "attributes.method == \"GET\"", + Config: []model.PipelineOperator{ + { + OrderId: 1, + ID: "add", + Type: "add", + Field: "bad.field", + Value: "val", + Enabled: true, + Name: "test add", + }, + }, + }, + ExpectedResponseStatusCode: 400, + }, + } + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + testbed := NewLogPipelinesTestBed(t) + testbed.PostPipelinesToQSExpectingStatusCode( + logparsingpipeline.PostablePipelines{ + Pipelines: []logparsingpipeline.PostablePipeline{tc.Pipeline}, + }, + tc.ExpectedResponseStatusCode, + ) + }) + } +} + // LogPipelinesTestBed coordinates and mocks components involved in // configuring log pipelines and provides test helpers. type LogPipelinesTestBed struct { @@ -282,7 +391,7 @@ func (tb *LogPipelinesTestBed) GetPipelinesFromQS() *logparsingpipeline.Pipeline if response.StatusCode != 200 { tb.t.Fatalf( "could not list log parsing pipelines. status: %d, body: %v", - response.StatusCode, responseBody, + response.StatusCode, string(responseBody), ) } @@ -291,7 +400,7 @@ func (tb *LogPipelinesTestBed) GetPipelinesFromQS() *logparsingpipeline.Pipeline if err != nil { tb.t.Fatalf( "Could not unmarshal QS response into an ApiResponse.\nResponse body: %s", - responseBody, + string(responseBody), ) } pipelinesResp, err := unmarshalPipelinesResponse(&result)