From 4daec45d987ab07a095f1c225db63193fef93f65 Mon Sep 17 00:00:00 2001 From: Ekansh Gupta Date: Mon, 1 Sep 2025 13:00:57 +0530 Subject: [PATCH] feat: added custom retention for logs api (#8513) * feat: added custom retention for logs api * feat: added custom retention for logs api * feat: added implementation of trace operators * feat: added validation checks for resource keys * feat: added validation checks for resource keys * feat: added validation checks for resource keys * feat: added validation checks for resource keys * feat: added validation checks for resource keys * feat: added validation checks for resource keys * feat: added validation checks for resource keys * feat: added validation checks for resource keys * feat: added default checks for custom retention * feat: added default checks for custom retention * feat: added default checks for custom retention * feat: added change for ttl * feat: v2 api supports both v1 and v2 * feat: v2 api supports both v1 and v2 * feat: v2 api supports both v1 and v2 * feat: v2 api supports both v1 and v2 * feat: added default_ttl in v1 * feat: added set logs ttl v1 from v2 * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts --- .../app/clickhouseReader/reader.go | 441 ++++++++++++++++++ pkg/query-service/app/http_handler.go | 43 ++ pkg/query-service/interfaces/interface.go | 2 + pkg/query-service/model/queryParams.go | 37 ++ pkg/signoz/provider.go | 1 + pkg/sqlmigration/048_update_ttl_setting.go | 77 +++ pkg/types/organization.go | 1 + 7 files changed, 602 insertions(+) create mode 100644 pkg/sqlmigration/048_update_ttl_setting.go diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 7ab7b02ba8e8..f46ad2572b49 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -1277,6 +1277,13 @@ func getLocalTableName(tableName string) string { } func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) { + hasCustomRetention, err := r.hasCustomRetentionColumn(ctx) + if hasCustomRetention { + return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("SetTTLV2 only supported")} + } + if err != nil { + return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing TTL")} + } // uuid is used as transaction id uuidWithHyphen := uuid.New() uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1) @@ -1561,6 +1568,440 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil } +func (r *ClickHouseReader) hasCustomRetentionColumn(ctx context.Context) (bool, error) { + // Directly query for the _retention_days column existence + query := fmt.Sprintf("SELECT 1 FROM system.columns WHERE database = '%s' AND table = '%s' AND name = '_retention_days' LIMIT 1", r.logsDB, r.logsLocalTableV2) + + var exists uint8 // Changed from int to uint8 to match ClickHouse's UInt8 type + err := r.db.QueryRow(ctx, query).Scan(&exists) + if err != nil { + if err == sql.ErrNoRows { + // Column doesn't exist + zap.L().Debug("_retention_days column not found in logs table", zap.String("table", r.logsLocalTableV2)) + return false, nil + } + zap.L().Error("Error checking for _retention_days column", zap.Error(err)) + return false, errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "error checking columns") + } + + zap.L().Debug("Found _retention_days column in logs table", zap.String("table", r.logsLocalTableV2)) + return true, nil +} + +func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *model.CustomRetentionTTLParams) (*model.CustomRetentionTTLResponse, error) { + + hasCustomRetention, err := r.hasCustomRetentionColumn(ctx) + if err != nil { + return nil, errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "custom retention not supported") + } + + if !hasCustomRetention { + zap.L().Info("Custom retention not supported, falling back to standard TTL method", + zap.String("orgID", orgID)) + + ttlParams := &model.TTLParams{ + Type: params.Type, + DelDuration: int64(params.DefaultTTLDays * 24 * 3600), + } + if params.ColdStorageVolume != "" { + ttlParams.ColdStorageVolume = params.ColdStorageVolume + } else { + ttlParams.ColdStorageVolume = "" + } + + if params.ToColdStorageDuration > 0 { + ttlParams.ToColdStorageDuration = params.ToColdStorageDuration + } else { + ttlParams.ToColdStorageDuration = 0 + } + + ttlResult, apiErr := r.SetTTL(ctx, orgID, ttlParams) + if apiErr != nil { + return nil, errorsV2.Wrapf(apiErr.Err, errorsV2.TypeInternal, errorsV2.CodeInternal, "failed to set standard TTL") + } + + return &model.CustomRetentionTTLResponse{ + Message: fmt.Sprintf("Custom retention not supported, applied standard TTL of %d days. %s", params.DefaultTTLDays, ttlResult.Message), + }, nil + } + + // Keep only latest 100 transactions/requests + r.deleteTtlTransactions(ctx, orgID, 100) + + uuidWithHyphen := valuer.GenerateUUID() + uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1) + + if params.Type != constants.LogsTTL { + return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "custom retention TTL only supported for logs") + } + + // Validate TTL conditions + if err := r.validateTTLConditions(ctx, params.TTLConditions); err != nil { + return nil, err + } + + tableNames := []string{ + r.logsDB + "." + r.logsLocalTableV2, + r.logsDB + "." + r.logsResourceLocalTableV2, + } + + for _, tableName := range tableNames { + statusItem, err := r.checkCustomRetentionTTLStatusItem(ctx, orgID, tableName) + if err != nil { + return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "error in processing custom_retention_ttl_status check sql query") + } + if statusItem.Status == constants.StatusPending { + return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "custom retention TTL is already running") + } + } + + // Build multiIf expressions for both tables + multiIfExpr := r.buildMultiIfExpression(params.TTLConditions, params.DefaultTTLDays, false) + resourceMultiIfExpr := r.buildMultiIfExpression(params.TTLConditions, params.DefaultTTLDays, true) + + ttlPayload := map[string]string{ + tableNames[0]: fmt.Sprintf(`ALTER TABLE %s ON CLUSTER %s MODIFY COLUMN _retention_days UInt16 DEFAULT %s`, + tableNames[0], r.cluster, multiIfExpr), + tableNames[1]: fmt.Sprintf(`ALTER TABLE %s ON CLUSTER %s MODIFY COLUMN _retention_days UInt16 DEFAULT %s`, + tableNames[1], r.cluster, resourceMultiIfExpr), + } + + ttlConditionsJSON, err := json.Marshal(params.TTLConditions) + if err != nil { + return nil, errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "error marshalling TTL condition") + } + + // Execute the TTL modifications synchronously + for tableName, query := range ttlPayload { + // Store the operation in the database + customTTL := types.TTLSetting{ + Identifiable: types.Identifiable{ + ID: valuer.GenerateUUID(), + }, + TimeAuditable: types.TimeAuditable{ + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + }, + TransactionID: uuid, + TableName: tableName, + TTL: params.DefaultTTLDays, + Condition: string(ttlConditionsJSON), + Status: constants.StatusPending, + OrgID: orgID, + } + + // Insert TTL setting record + _, dbErr := r.sqlDB.BunDB().NewInsert().Model(&customTTL).Exec(ctx) + if dbErr != nil { + zap.L().Error("error in inserting to custom_retention_ttl_settings table", zap.Error(dbErr)) + return nil, errorsV2.Wrapf(dbErr, errorsV2.TypeInternal, errorsV2.CodeInternal, "error inserting TTL settings") + } + + zap.L().Debug("Executing custom retention TTL request: ", zap.String("request", query)) + + // Execute the ALTER TABLE query + if err := r.db.Exec(ctx, query); err != nil { + zap.L().Error("error while setting custom retention ttl", zap.Error(err)) + + // Update status to failed + r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusFailed) + + return nil, errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "error setting custom retention TTL for table %s", tableName) + } + + // Update status to success + r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusSuccess) + } + + return &model.CustomRetentionTTLResponse{ + Message: "custom retention TTL has been successfully set up", + }, nil +} + +// New method to build multiIf expressions with support for multiple AND conditions +func (r *ClickHouseReader) buildMultiIfExpression(ttlConditions []model.CustomRetentionRule, defaultTTLDays int, isResourceTable bool) string { + var conditions []string + + for i, rule := range ttlConditions { + zap.L().Debug("Processing rule", zap.Int("ruleIndex", i), zap.Int("ttlDays", rule.TTLDays), zap.Int("conditionsCount", len(rule.Filters))) + + if len(rule.Filters) == 0 { + zap.L().Warn("Rule has no filters, skipping", zap.Int("ruleIndex", i)) + continue + } + + // Build AND conditions for this rule + var andConditions []string + for j, condition := range rule.Filters { + zap.L().Debug("Processing condition", zap.Int("ruleIndex", i), zap.Int("conditionIndex", j), zap.String("key", condition.Key), zap.Strings("values", condition.Values)) + + // This should not happen as validation should catch it + if len(condition.Values) == 0 { + zap.L().Error("Condition has no values - this should have been caught in validation", zap.Int("ruleIndex", i), zap.Int("conditionIndex", j)) + continue + } + + // Properly quote values for IN clause + quotedValues := make([]string, len(condition.Values)) + for k, v := range condition.Values { + quotedValues[k] = fmt.Sprintf("'%s'", v) + } + + var conditionExpr string + if isResourceTable { + // For resource table, use JSONExtractString + conditionExpr = fmt.Sprintf( + "JSONExtractString(labels, '%s') IN (%s)", + condition.Key, + strings.Join(quotedValues, ", "), + ) + } else { + // For main logs table, use resources_string + conditionExpr = fmt.Sprintf( + "resources_string['%s'] IN (%s)", + condition.Key, + strings.Join(quotedValues, ", "), + ) + } + andConditions = append(andConditions, conditionExpr) + } + + if len(andConditions) > 0 { + // Join all conditions with AND + fullCondition := strings.Join(andConditions, " AND ") + conditionWithTTL := fmt.Sprintf("%s, %d", fullCondition, rule.TTLDays) + zap.L().Debug("Adding condition to multiIf", zap.String("condition", conditionWithTTL)) + conditions = append(conditions, conditionWithTTL) + } + } + + // Handle case where no valid conditions were found + if len(conditions) == 0 { + zap.L().Info("No valid conditions found, returning default TTL", zap.Int("defaultTTLDays", defaultTTLDays)) + return fmt.Sprintf("%d", defaultTTLDays) + } + + result := fmt.Sprintf( + "multiIf(%s, %d)", + strings.Join(conditions, ", "), + defaultTTLDays, + ) + + zap.L().Debug("Final multiIf expression", zap.String("expression", result)) + return result +} + +func (r *ClickHouseReader) GetCustomRetentionTTL(ctx context.Context, orgID string) (*model.GetCustomRetentionTTLResponse, error) { + // Check if V2 (custom retention) is supported + hasCustomRetention, err := r.hasCustomRetentionColumn(ctx) + if err != nil { + // If there's an error checking, assume V1 and proceed + zap.L().Warn("Error checking for custom retention column, assuming V1", zap.Error(err)) + hasCustomRetention = false + } + + response := &model.GetCustomRetentionTTLResponse{} + + if hasCustomRetention { + // V2 - Custom retention is supported + response.Version = "v2" + + // Get the latest custom retention TTL setting + customTTL := new(types.TTLSetting) + err := r.sqlDB.BunDB().NewSelect(). + Model(customTTL). + Where("org_id = ?", orgID). + Where("table_name = ?", r.logsDB+"."+r.logsLocalTableV2). + OrderExpr("created_at DESC"). + Limit(1). + Scan(ctx) + + if err != nil && err != sql.ErrNoRows { + zap.L().Error("Error in processing sql query", zap.Error(err)) + return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "error in processing get custom ttl query") + } + + if err == sql.ErrNoRows { + // No V2 configuration found, return defaults + response.DefaultTTLDays = 15 + response.TTLConditions = []model.CustomRetentionRule{} + response.Status = constants.StatusFailed + return response, nil + } + + // Parse TTL conditions from Condition + var ttlConditions []model.CustomRetentionRule + if customTTL.Condition != "" { + if err := json.Unmarshal([]byte(customTTL.Condition), &ttlConditions); err != nil { + zap.L().Error("Error parsing TTL conditions", zap.Error(err)) + ttlConditions = []model.CustomRetentionRule{} + } + } + + response.DefaultTTLDays = customTTL.TTL + response.TTLConditions = ttlConditions + response.Status = customTTL.Status + + } else { + // V1 - Traditional TTL + response.Version = "v1" + + // Get V1 TTL configuration + ttlParams := &model.GetTTLParams{ + Type: constants.LogsTTL, + } + + ttlResult, apiErr := r.GetTTL(ctx, orgID, ttlParams) + if apiErr != nil { + return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "error getting V1 TTL: %s", apiErr.Error()) + } + + // Map V1 fields to response + response.LogsTime = ttlResult.LogsTime + response.LogsMoveTime = ttlResult.LogsMoveTime + response.ExpectedLogsTime = ttlResult.ExpectedLogsTime + response.ExpectedLogsMoveTime = ttlResult.ExpectedLogsMoveTime + response.Status = ttlResult.Status + response.DefaultTTLDays = ttlResult.LogsTime / 24 + + // For V1, we don't have TTL conditions + response.TTLConditions = []model.CustomRetentionRule{} + } + + return response, nil +} + +func (r *ClickHouseReader) checkCustomRetentionTTLStatusItem(ctx context.Context, orgID string, tableName string) (*types.TTLSetting, error) { + ttl := new(types.TTLSetting) + err := r.sqlDB.BunDB().NewSelect(). + Model(ttl). + Where("table_name = ?", tableName). + Where("org_id = ?", orgID). + OrderExpr("created_at DESC"). + Limit(1). + Scan(ctx) + + if err != nil && err != sql.ErrNoRows { + zap.L().Error("Error in processing sql query", zap.Error(err)) + return ttl, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "error in processing custom_retention_ttl_status check sql query") + } + + return ttl, nil +} + +func (r *ClickHouseReader) updateCustomRetentionTTLStatus(ctx context.Context, orgID, tableName, status string) { + statusItem, err := r.checkCustomRetentionTTLStatusItem(ctx, orgID, tableName) + if err == nil && statusItem != nil { + _, dbErr := r.sqlDB.BunDB().NewUpdate(). + Model(new(types.TTLSetting)). + Set("updated_at = ?", time.Now()). + Set("status = ?", status). + Where("id = ?", statusItem.ID.StringValue()). + Exec(ctx) + if dbErr != nil { + zap.L().Error("Error in processing custom_retention_ttl_status update sql query", zap.Error(dbErr)) + } + } +} + +// Enhanced validation function with duplicate detection and efficient key validation +func (r *ClickHouseReader) validateTTLConditions(ctx context.Context, ttlConditions []model.CustomRetentionRule) error { + if len(ttlConditions) == 0 { + return nil + } + + // Collect all unique keys and detect duplicates + var allKeys []string + keySet := make(map[string]struct{}) + conditionSignatures := make(map[string]bool) + + for i, rule := range ttlConditions { + if len(rule.Filters) == 0 { + return errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "rule at index %d has no filters", i) + } + + // Create a signature for this rule's conditions to detect duplicates + var conditionKeys []string + var conditionValues []string + + for j, condition := range rule.Filters { + if len(condition.Values) == 0 { + return errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "condition at rule %d, condition %d has no values", i, j) + } + + // Collect unique keys + if _, exists := keySet[condition.Key]; !exists { + allKeys = append(allKeys, condition.Key) + keySet[condition.Key] = struct{}{} + } + + // Build signature for duplicate detection + conditionKeys = append(conditionKeys, condition.Key) + conditionValues = append(conditionValues, strings.Join(condition.Values, ",")) + } + + // Create signature by sorting keys and values to handle order-independent comparison + sort.Strings(conditionKeys) + sort.Strings(conditionValues) + signature := strings.Join(conditionKeys, "|") + ":" + strings.Join(conditionValues, "|") + + if conditionSignatures[signature] { + return errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "duplicate rule detected at index %d: rules with identical conditions are not allowed", i) + } + conditionSignatures[signature] = true + } + + if len(allKeys) == 0 { + return nil + } + + // Create placeholders for IN query + placeholders := make([]string, len(allKeys)) + for i := range allKeys { + placeholders[i] = "?" + } + + // Efficient validation using IN query + query := fmt.Sprintf("SELECT name FROM %s.%s WHERE name IN (%s)", + r.logsDB, r.logsResourceKeys, strings.Join(placeholders, ", ")) + + // Convert keys to interface{} for query parameters + params := make([]interface{}, len(allKeys)) + for i, key := range allKeys { + params[i] = key + } + + rows, err := r.db.Query(ctx, query, params...) + if err != nil { + return errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "failed to validate resource keys") + } + defer rows.Close() + + // Collect valid keys + validKeys := make(map[string]struct{}) + for rows.Next() { + var name string + if err := rows.Scan(&name); err != nil { + return errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "failed to scan resource keys") + } + validKeys[name] = struct{}{} + } + + // Find invalid keys + var invalidKeys []string + for _, key := range allKeys { + if _, exists := validKeys[key]; !exists { + invalidKeys = append(invalidKeys, key) + } + } + + if len(invalidKeys) > 0 { + return errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "invalid resource keys found: %v. Please check logs_resource_keys table for valid keys", invalidKeys) + } + + return nil +} + // SetTTL sets the TTL for traces or metrics or logs tables. // This is an async API which creates goroutines to set TTL. // Status of TTL update is tracked with ttl_status table in sqlite db. diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index fa96fe4dd6e2..0b4a5c09d49f 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -541,6 +541,8 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *middleware.AuthZ) { router.HandleFunc("/api/v1/dependency_graph", am.ViewAccess(aH.dependencyGraph)).Methods(http.MethodPost) router.HandleFunc("/api/v1/settings/ttl", am.AdminAccess(aH.setTTL)).Methods(http.MethodPost) router.HandleFunc("/api/v1/settings/ttl", am.ViewAccess(aH.getTTL)).Methods(http.MethodGet) + router.HandleFunc("/api/v2/settings/ttl", am.AdminAccess(aH.setCustomRetentionTTL)).Methods(http.MethodPost) + router.HandleFunc("/api/v2/settings/ttl", am.ViewAccess(aH.getCustomRetentionTTL)).Methods(http.MethodGet) router.HandleFunc("/api/v1/settings/apdex", am.AdminAccess(aH.Signoz.Handlers.Apdex.Set)).Methods(http.MethodPost) router.HandleFunc("/api/v1/settings/apdex", am.ViewAccess(aH.Signoz.Handlers.Apdex.Get)).Methods(http.MethodGet) @@ -1930,6 +1932,47 @@ func (aH *APIHandler) setTTL(w http.ResponseWriter, r *http.Request) { } +func (aH *APIHandler) setCustomRetentionTTL(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + claims, errv2 := authtypes.ClaimsFromContext(ctx) + if errv2 != nil { + render.Error(w, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "failed to get org id from context")) + return + } + + var params model.CustomRetentionTTLParams + if err := json.NewDecoder(r.Body).Decode(¶ms); err != nil { + render.Error(w, errorsV2.Newf(errorsV2.TypeInvalidInput, errorsV2.CodeInvalidInput, "Invalid data")) + return + } + + // Context is not used here as TTL is long duration DB operation + result, apiErr := aH.reader.SetTTLV2(context.Background(), claims.OrgID, ¶ms) + if apiErr != nil { + render.Error(w, errorsV2.Newf(errorsV2.TypeInvalidInput, errorsV2.CodeInternal, apiErr.Error())) + return + } + + aH.WriteJSON(w, r, result) +} + +func (aH *APIHandler) getCustomRetentionTTL(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + claims, errv2 := authtypes.ClaimsFromContext(ctx) + if errv2 != nil { + render.Error(w, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "failed to get org id from context")) + return + } + + result, apiErr := aH.reader.GetCustomRetentionTTL(r.Context(), claims.OrgID) + if apiErr != nil { + render.Error(w, errorsV2.Newf(errorsV2.TypeInvalidInput, errorsV2.CodeInternal, apiErr.Error())) + return + } + + aH.WriteJSON(w, r, result) +} + func (aH *APIHandler) getTTL(w http.ResponseWriter, r *http.Request) { ttlParams, err := parseGetTTL(r) if aH.HandleError(w, err, http.StatusBadRequest) { diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 16742d8814ed..c06d57d0c159 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -25,6 +25,7 @@ type Reader interface { GetDependencyGraph(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) GetTTL(ctx context.Context, orgID string, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) + GetCustomRetentionTTL(ctx context.Context, orgID string) (*model.GetCustomRetentionTTLResponse, error) // GetDisks returns a list of disks configured in the underlying DB. It is supported by // clickhouse only. @@ -47,6 +48,7 @@ type Reader interface { // Setter Interfaces SetTTL(ctx context.Context, orgID string, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) + SetTTLV2(ctx context.Context, orgID string, params *model.CustomRetentionTTLParams) (*model.CustomRetentionTTLResponse, error) FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error) GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error) diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go index b62492b0f839..fb63c2056b9e 100644 --- a/pkg/query-service/model/queryParams.go +++ b/pkg/query-service/model/queryParams.go @@ -404,6 +404,43 @@ type TTLParams struct { DelDuration int64 // Seconds after which data will be deleted. } +type CustomRetentionTTLParams struct { + Type string `json:"type"` + DefaultTTLDays int `json:"defaultTTLDays"` + TTLConditions []CustomRetentionRule `json:"ttlConditions"` + ColdStorageVolume string `json:"coldStorageVolume,omitempty"` + ToColdStorageDuration int64 `json:"coldStorageDuration,omitempty"` +} + +type CustomRetentionRule struct { + Filters []FilterCondition `json:"conditions"` + TTLDays int `json:"ttlDays"` +} + +type FilterCondition struct { + Key string `json:"key"` + Values []string `json:"values"` +} + +type GetCustomRetentionTTLResponse struct { + Version string `json:"version"` + Status string `json:"status"` + + // V1 fields + LogsTime int `json:"logs_ttl_duration_hrs,omitempty"` + LogsMoveTime int `json:"logs_move_ttl_duration_hrs,omitempty"` + ExpectedLogsTime int `json:"expected_logs_ttl_duration_hrs,omitempty"` + ExpectedLogsMoveTime int `json:"expected_logs_move_ttl_duration_hrs,omitempty"` + + // V2 fields + DefaultTTLDays int `json:"default_ttl_days,omitempty"` + TTLConditions []CustomRetentionRule `json:"ttl_conditions,omitempty"` +} + +type CustomRetentionTTLResponse struct { + Message string `json:"message"` +} + type GetTTLParams struct { Type string } diff --git a/pkg/signoz/provider.go b/pkg/signoz/provider.go index c737bface022..c307d8c050ab 100644 --- a/pkg/signoz/provider.go +++ b/pkg/signoz/provider.go @@ -131,6 +131,7 @@ func NewSQLMigrationProviderFactories( sqlmigration.NewAddFactorIndexesFactory(sqlstore, sqlschema), sqlmigration.NewQueryBuilderV5MigrationFactory(sqlstore, telemetryStore), sqlmigration.NewAddMeterQuickFiltersFactory(sqlstore, sqlschema), + sqlmigration.NewUpdateTTLSettingForCustomRetentionFactory(sqlstore, sqlschema), ) } diff --git a/pkg/sqlmigration/048_update_ttl_setting.go b/pkg/sqlmigration/048_update_ttl_setting.go new file mode 100644 index 000000000000..290f279dfbde --- /dev/null +++ b/pkg/sqlmigration/048_update_ttl_setting.go @@ -0,0 +1,77 @@ +package sqlmigration + +import ( + "context" + + "github.com/SigNoz/signoz/pkg/factory" + "github.com/SigNoz/signoz/pkg/sqlschema" + "github.com/SigNoz/signoz/pkg/sqlstore" + "github.com/uptrace/bun" + "github.com/uptrace/bun/migrate" +) + +type updateTTLSettingForCustomRetention struct { + sqlstore sqlstore.SQLStore + sqlschema sqlschema.SQLSchema +} + +func NewUpdateTTLSettingForCustomRetentionFactory(sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) factory.ProviderFactory[SQLMigration, Config] { + return factory.NewProviderFactory(factory.MustNewName("update_ttl_setting"), func(ctx context.Context, providerSettings factory.ProviderSettings, config Config) (SQLMigration, error) { + return newUpdateTTLSettingForCustomRetention(ctx, providerSettings, config, sqlstore, sqlschema) + }) +} + +func newUpdateTTLSettingForCustomRetention(_ context.Context, _ factory.ProviderSettings, _ Config, sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) (SQLMigration, error) { + return &updateTTLSettingForCustomRetention{ + sqlstore: sqlstore, + sqlschema: sqlschema, + }, nil +} + +func (migration *updateTTLSettingForCustomRetention) Register(migrations *migrate.Migrations) error { + if err := migrations.Register(migration.Up, migration.Down); err != nil { + return err + } + return nil +} + +func (migration *updateTTLSettingForCustomRetention) Up(ctx context.Context, db *bun.DB) error { + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + + defer func() { + _ = tx.Rollback() + }() + + // Get the table and its constraints + table, _, err := migration.sqlschema.GetTable(ctx, sqlschema.TableName("ttl_setting")) + if err != nil { + return err + } + + // Define the new column + column := &sqlschema.Column{ + Name: sqlschema.ColumnName("condition"), + DataType: sqlschema.DataTypeText, + Nullable: true, + } + + sqls := migration.sqlschema.Operator().AddColumn(table, nil, column, nil) + for _, sql := range sqls { + if _, err := tx.ExecContext(ctx, string(sql)); err != nil { + return err + } + } + + if err := tx.Commit(); err != nil { + return err + } + + return nil +} + +func (migration *updateTTLSettingForCustomRetention) Down(ctx context.Context, db *bun.DB) error { + return nil +} diff --git a/pkg/types/organization.go b/pkg/types/organization.go index dae4048598cd..ac5d03023dc0 100644 --- a/pkg/types/organization.go +++ b/pkg/types/organization.go @@ -68,6 +68,7 @@ type TTLSetting struct { ColdStorageTTL int `bun:"cold_storage_ttl,notnull,default:0"` Status string `bun:"status,type:text,notnull"` OrgID string `json:"-" bun:"org_id,notnull"` + Condition string `bun:"condition,type:text"` } type OrganizationStore interface {