mirror of
https://github.com/SigNoz/signoz.git
synced 2025-12-17 15:36:48 +00:00
feat: added custom retention for logs api (#8513)
* feat: added custom retention for logs api * feat: added custom retention for logs api * feat: added implementation of trace operators * feat: added validation checks for resource keys * feat: added validation checks for resource keys * feat: added validation checks for resource keys * feat: added validation checks for resource keys * feat: added validation checks for resource keys * feat: added validation checks for resource keys * feat: added validation checks for resource keys * feat: added validation checks for resource keys * feat: added default checks for custom retention * feat: added default checks for custom retention * feat: added default checks for custom retention * feat: added change for ttl * feat: v2 api supports both v1 and v2 * feat: v2 api supports both v1 and v2 * feat: v2 api supports both v1 and v2 * feat: v2 api supports both v1 and v2 * feat: added default_ttl in v1 * feat: added set logs ttl v1 from v2 * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts * feat: resolved conflicts
This commit is contained in:
parent
382d9d4a87
commit
4daec45d98
@ -1277,6 +1277,13 @@ func getLocalTableName(tableName string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
func (r *ClickHouseReader) setTTLLogs(ctx context.Context, orgID string, params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
|
||||||
|
hasCustomRetention, err := r.hasCustomRetentionColumn(ctx)
|
||||||
|
if hasCustomRetention {
|
||||||
|
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("SetTTLV2 only supported")}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing TTL")}
|
||||||
|
}
|
||||||
// uuid is used as transaction id
|
// uuid is used as transaction id
|
||||||
uuidWithHyphen := uuid.New()
|
uuidWithHyphen := uuid.New()
|
||||||
uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1)
|
uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1)
|
||||||
@ -1561,6 +1568,440 @@ func (r *ClickHouseReader) setTTLTraces(ctx context.Context, orgID string, param
|
|||||||
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
|
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *ClickHouseReader) hasCustomRetentionColumn(ctx context.Context) (bool, error) {
|
||||||
|
// Directly query for the _retention_days column existence
|
||||||
|
query := fmt.Sprintf("SELECT 1 FROM system.columns WHERE database = '%s' AND table = '%s' AND name = '_retention_days' LIMIT 1", r.logsDB, r.logsLocalTableV2)
|
||||||
|
|
||||||
|
var exists uint8 // Changed from int to uint8 to match ClickHouse's UInt8 type
|
||||||
|
err := r.db.QueryRow(ctx, query).Scan(&exists)
|
||||||
|
if err != nil {
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
// Column doesn't exist
|
||||||
|
zap.L().Debug("_retention_days column not found in logs table", zap.String("table", r.logsLocalTableV2))
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
zap.L().Error("Error checking for _retention_days column", zap.Error(err))
|
||||||
|
return false, errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "error checking columns")
|
||||||
|
}
|
||||||
|
|
||||||
|
zap.L().Debug("Found _retention_days column in logs table", zap.String("table", r.logsLocalTableV2))
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ClickHouseReader) SetTTLV2(ctx context.Context, orgID string, params *model.CustomRetentionTTLParams) (*model.CustomRetentionTTLResponse, error) {
|
||||||
|
|
||||||
|
hasCustomRetention, err := r.hasCustomRetentionColumn(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "custom retention not supported")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !hasCustomRetention {
|
||||||
|
zap.L().Info("Custom retention not supported, falling back to standard TTL method",
|
||||||
|
zap.String("orgID", orgID))
|
||||||
|
|
||||||
|
ttlParams := &model.TTLParams{
|
||||||
|
Type: params.Type,
|
||||||
|
DelDuration: int64(params.DefaultTTLDays * 24 * 3600),
|
||||||
|
}
|
||||||
|
if params.ColdStorageVolume != "" {
|
||||||
|
ttlParams.ColdStorageVolume = params.ColdStorageVolume
|
||||||
|
} else {
|
||||||
|
ttlParams.ColdStorageVolume = ""
|
||||||
|
}
|
||||||
|
|
||||||
|
if params.ToColdStorageDuration > 0 {
|
||||||
|
ttlParams.ToColdStorageDuration = params.ToColdStorageDuration
|
||||||
|
} else {
|
||||||
|
ttlParams.ToColdStorageDuration = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
ttlResult, apiErr := r.SetTTL(ctx, orgID, ttlParams)
|
||||||
|
if apiErr != nil {
|
||||||
|
return nil, errorsV2.Wrapf(apiErr.Err, errorsV2.TypeInternal, errorsV2.CodeInternal, "failed to set standard TTL")
|
||||||
|
}
|
||||||
|
|
||||||
|
return &model.CustomRetentionTTLResponse{
|
||||||
|
Message: fmt.Sprintf("Custom retention not supported, applied standard TTL of %d days. %s", params.DefaultTTLDays, ttlResult.Message),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Keep only latest 100 transactions/requests
|
||||||
|
r.deleteTtlTransactions(ctx, orgID, 100)
|
||||||
|
|
||||||
|
uuidWithHyphen := valuer.GenerateUUID()
|
||||||
|
uuid := strings.Replace(uuidWithHyphen.String(), "-", "", -1)
|
||||||
|
|
||||||
|
if params.Type != constants.LogsTTL {
|
||||||
|
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "custom retention TTL only supported for logs")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate TTL conditions
|
||||||
|
if err := r.validateTTLConditions(ctx, params.TTLConditions); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
tableNames := []string{
|
||||||
|
r.logsDB + "." + r.logsLocalTableV2,
|
||||||
|
r.logsDB + "." + r.logsResourceLocalTableV2,
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tableName := range tableNames {
|
||||||
|
statusItem, err := r.checkCustomRetentionTTLStatusItem(ctx, orgID, tableName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "error in processing custom_retention_ttl_status check sql query")
|
||||||
|
}
|
||||||
|
if statusItem.Status == constants.StatusPending {
|
||||||
|
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "custom retention TTL is already running")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build multiIf expressions for both tables
|
||||||
|
multiIfExpr := r.buildMultiIfExpression(params.TTLConditions, params.DefaultTTLDays, false)
|
||||||
|
resourceMultiIfExpr := r.buildMultiIfExpression(params.TTLConditions, params.DefaultTTLDays, true)
|
||||||
|
|
||||||
|
ttlPayload := map[string]string{
|
||||||
|
tableNames[0]: fmt.Sprintf(`ALTER TABLE %s ON CLUSTER %s MODIFY COLUMN _retention_days UInt16 DEFAULT %s`,
|
||||||
|
tableNames[0], r.cluster, multiIfExpr),
|
||||||
|
tableNames[1]: fmt.Sprintf(`ALTER TABLE %s ON CLUSTER %s MODIFY COLUMN _retention_days UInt16 DEFAULT %s`,
|
||||||
|
tableNames[1], r.cluster, resourceMultiIfExpr),
|
||||||
|
}
|
||||||
|
|
||||||
|
ttlConditionsJSON, err := json.Marshal(params.TTLConditions)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "error marshalling TTL condition")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute the TTL modifications synchronously
|
||||||
|
for tableName, query := range ttlPayload {
|
||||||
|
// Store the operation in the database
|
||||||
|
customTTL := types.TTLSetting{
|
||||||
|
Identifiable: types.Identifiable{
|
||||||
|
ID: valuer.GenerateUUID(),
|
||||||
|
},
|
||||||
|
TimeAuditable: types.TimeAuditable{
|
||||||
|
CreatedAt: time.Now(),
|
||||||
|
UpdatedAt: time.Now(),
|
||||||
|
},
|
||||||
|
TransactionID: uuid,
|
||||||
|
TableName: tableName,
|
||||||
|
TTL: params.DefaultTTLDays,
|
||||||
|
Condition: string(ttlConditionsJSON),
|
||||||
|
Status: constants.StatusPending,
|
||||||
|
OrgID: orgID,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert TTL setting record
|
||||||
|
_, dbErr := r.sqlDB.BunDB().NewInsert().Model(&customTTL).Exec(ctx)
|
||||||
|
if dbErr != nil {
|
||||||
|
zap.L().Error("error in inserting to custom_retention_ttl_settings table", zap.Error(dbErr))
|
||||||
|
return nil, errorsV2.Wrapf(dbErr, errorsV2.TypeInternal, errorsV2.CodeInternal, "error inserting TTL settings")
|
||||||
|
}
|
||||||
|
|
||||||
|
zap.L().Debug("Executing custom retention TTL request: ", zap.String("request", query))
|
||||||
|
|
||||||
|
// Execute the ALTER TABLE query
|
||||||
|
if err := r.db.Exec(ctx, query); err != nil {
|
||||||
|
zap.L().Error("error while setting custom retention ttl", zap.Error(err))
|
||||||
|
|
||||||
|
// Update status to failed
|
||||||
|
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusFailed)
|
||||||
|
|
||||||
|
return nil, errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "error setting custom retention TTL for table %s", tableName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update status to success
|
||||||
|
r.updateCustomRetentionTTLStatus(ctx, orgID, tableName, constants.StatusSuccess)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &model.CustomRetentionTTLResponse{
|
||||||
|
Message: "custom retention TTL has been successfully set up",
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// New method to build multiIf expressions with support for multiple AND conditions
|
||||||
|
func (r *ClickHouseReader) buildMultiIfExpression(ttlConditions []model.CustomRetentionRule, defaultTTLDays int, isResourceTable bool) string {
|
||||||
|
var conditions []string
|
||||||
|
|
||||||
|
for i, rule := range ttlConditions {
|
||||||
|
zap.L().Debug("Processing rule", zap.Int("ruleIndex", i), zap.Int("ttlDays", rule.TTLDays), zap.Int("conditionsCount", len(rule.Filters)))
|
||||||
|
|
||||||
|
if len(rule.Filters) == 0 {
|
||||||
|
zap.L().Warn("Rule has no filters, skipping", zap.Int("ruleIndex", i))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build AND conditions for this rule
|
||||||
|
var andConditions []string
|
||||||
|
for j, condition := range rule.Filters {
|
||||||
|
zap.L().Debug("Processing condition", zap.Int("ruleIndex", i), zap.Int("conditionIndex", j), zap.String("key", condition.Key), zap.Strings("values", condition.Values))
|
||||||
|
|
||||||
|
// This should not happen as validation should catch it
|
||||||
|
if len(condition.Values) == 0 {
|
||||||
|
zap.L().Error("Condition has no values - this should have been caught in validation", zap.Int("ruleIndex", i), zap.Int("conditionIndex", j))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Properly quote values for IN clause
|
||||||
|
quotedValues := make([]string, len(condition.Values))
|
||||||
|
for k, v := range condition.Values {
|
||||||
|
quotedValues[k] = fmt.Sprintf("'%s'", v)
|
||||||
|
}
|
||||||
|
|
||||||
|
var conditionExpr string
|
||||||
|
if isResourceTable {
|
||||||
|
// For resource table, use JSONExtractString
|
||||||
|
conditionExpr = fmt.Sprintf(
|
||||||
|
"JSONExtractString(labels, '%s') IN (%s)",
|
||||||
|
condition.Key,
|
||||||
|
strings.Join(quotedValues, ", "),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
// For main logs table, use resources_string
|
||||||
|
conditionExpr = fmt.Sprintf(
|
||||||
|
"resources_string['%s'] IN (%s)",
|
||||||
|
condition.Key,
|
||||||
|
strings.Join(quotedValues, ", "),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
andConditions = append(andConditions, conditionExpr)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(andConditions) > 0 {
|
||||||
|
// Join all conditions with AND
|
||||||
|
fullCondition := strings.Join(andConditions, " AND ")
|
||||||
|
conditionWithTTL := fmt.Sprintf("%s, %d", fullCondition, rule.TTLDays)
|
||||||
|
zap.L().Debug("Adding condition to multiIf", zap.String("condition", conditionWithTTL))
|
||||||
|
conditions = append(conditions, conditionWithTTL)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle case where no valid conditions were found
|
||||||
|
if len(conditions) == 0 {
|
||||||
|
zap.L().Info("No valid conditions found, returning default TTL", zap.Int("defaultTTLDays", defaultTTLDays))
|
||||||
|
return fmt.Sprintf("%d", defaultTTLDays)
|
||||||
|
}
|
||||||
|
|
||||||
|
result := fmt.Sprintf(
|
||||||
|
"multiIf(%s, %d)",
|
||||||
|
strings.Join(conditions, ", "),
|
||||||
|
defaultTTLDays,
|
||||||
|
)
|
||||||
|
|
||||||
|
zap.L().Debug("Final multiIf expression", zap.String("expression", result))
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ClickHouseReader) GetCustomRetentionTTL(ctx context.Context, orgID string) (*model.GetCustomRetentionTTLResponse, error) {
|
||||||
|
// Check if V2 (custom retention) is supported
|
||||||
|
hasCustomRetention, err := r.hasCustomRetentionColumn(ctx)
|
||||||
|
if err != nil {
|
||||||
|
// If there's an error checking, assume V1 and proceed
|
||||||
|
zap.L().Warn("Error checking for custom retention column, assuming V1", zap.Error(err))
|
||||||
|
hasCustomRetention = false
|
||||||
|
}
|
||||||
|
|
||||||
|
response := &model.GetCustomRetentionTTLResponse{}
|
||||||
|
|
||||||
|
if hasCustomRetention {
|
||||||
|
// V2 - Custom retention is supported
|
||||||
|
response.Version = "v2"
|
||||||
|
|
||||||
|
// Get the latest custom retention TTL setting
|
||||||
|
customTTL := new(types.TTLSetting)
|
||||||
|
err := r.sqlDB.BunDB().NewSelect().
|
||||||
|
Model(customTTL).
|
||||||
|
Where("org_id = ?", orgID).
|
||||||
|
Where("table_name = ?", r.logsDB+"."+r.logsLocalTableV2).
|
||||||
|
OrderExpr("created_at DESC").
|
||||||
|
Limit(1).
|
||||||
|
Scan(ctx)
|
||||||
|
|
||||||
|
if err != nil && err != sql.ErrNoRows {
|
||||||
|
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||||
|
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "error in processing get custom ttl query")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
// No V2 configuration found, return defaults
|
||||||
|
response.DefaultTTLDays = 15
|
||||||
|
response.TTLConditions = []model.CustomRetentionRule{}
|
||||||
|
response.Status = constants.StatusFailed
|
||||||
|
return response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse TTL conditions from Condition
|
||||||
|
var ttlConditions []model.CustomRetentionRule
|
||||||
|
if customTTL.Condition != "" {
|
||||||
|
if err := json.Unmarshal([]byte(customTTL.Condition), &ttlConditions); err != nil {
|
||||||
|
zap.L().Error("Error parsing TTL conditions", zap.Error(err))
|
||||||
|
ttlConditions = []model.CustomRetentionRule{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
response.DefaultTTLDays = customTTL.TTL
|
||||||
|
response.TTLConditions = ttlConditions
|
||||||
|
response.Status = customTTL.Status
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// V1 - Traditional TTL
|
||||||
|
response.Version = "v1"
|
||||||
|
|
||||||
|
// Get V1 TTL configuration
|
||||||
|
ttlParams := &model.GetTTLParams{
|
||||||
|
Type: constants.LogsTTL,
|
||||||
|
}
|
||||||
|
|
||||||
|
ttlResult, apiErr := r.GetTTL(ctx, orgID, ttlParams)
|
||||||
|
if apiErr != nil {
|
||||||
|
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "error getting V1 TTL: %s", apiErr.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Map V1 fields to response
|
||||||
|
response.LogsTime = ttlResult.LogsTime
|
||||||
|
response.LogsMoveTime = ttlResult.LogsMoveTime
|
||||||
|
response.ExpectedLogsTime = ttlResult.ExpectedLogsTime
|
||||||
|
response.ExpectedLogsMoveTime = ttlResult.ExpectedLogsMoveTime
|
||||||
|
response.Status = ttlResult.Status
|
||||||
|
response.DefaultTTLDays = ttlResult.LogsTime / 24
|
||||||
|
|
||||||
|
// For V1, we don't have TTL conditions
|
||||||
|
response.TTLConditions = []model.CustomRetentionRule{}
|
||||||
|
}
|
||||||
|
|
||||||
|
return response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ClickHouseReader) checkCustomRetentionTTLStatusItem(ctx context.Context, orgID string, tableName string) (*types.TTLSetting, error) {
|
||||||
|
ttl := new(types.TTLSetting)
|
||||||
|
err := r.sqlDB.BunDB().NewSelect().
|
||||||
|
Model(ttl).
|
||||||
|
Where("table_name = ?", tableName).
|
||||||
|
Where("org_id = ?", orgID).
|
||||||
|
OrderExpr("created_at DESC").
|
||||||
|
Limit(1).
|
||||||
|
Scan(ctx)
|
||||||
|
|
||||||
|
if err != nil && err != sql.ErrNoRows {
|
||||||
|
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||||
|
return ttl, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "error in processing custom_retention_ttl_status check sql query")
|
||||||
|
}
|
||||||
|
|
||||||
|
return ttl, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ClickHouseReader) updateCustomRetentionTTLStatus(ctx context.Context, orgID, tableName, status string) {
|
||||||
|
statusItem, err := r.checkCustomRetentionTTLStatusItem(ctx, orgID, tableName)
|
||||||
|
if err == nil && statusItem != nil {
|
||||||
|
_, dbErr := r.sqlDB.BunDB().NewUpdate().
|
||||||
|
Model(new(types.TTLSetting)).
|
||||||
|
Set("updated_at = ?", time.Now()).
|
||||||
|
Set("status = ?", status).
|
||||||
|
Where("id = ?", statusItem.ID.StringValue()).
|
||||||
|
Exec(ctx)
|
||||||
|
if dbErr != nil {
|
||||||
|
zap.L().Error("Error in processing custom_retention_ttl_status update sql query", zap.Error(dbErr))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enhanced validation function with duplicate detection and efficient key validation
|
||||||
|
func (r *ClickHouseReader) validateTTLConditions(ctx context.Context, ttlConditions []model.CustomRetentionRule) error {
|
||||||
|
if len(ttlConditions) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect all unique keys and detect duplicates
|
||||||
|
var allKeys []string
|
||||||
|
keySet := make(map[string]struct{})
|
||||||
|
conditionSignatures := make(map[string]bool)
|
||||||
|
|
||||||
|
for i, rule := range ttlConditions {
|
||||||
|
if len(rule.Filters) == 0 {
|
||||||
|
return errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "rule at index %d has no filters", i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a signature for this rule's conditions to detect duplicates
|
||||||
|
var conditionKeys []string
|
||||||
|
var conditionValues []string
|
||||||
|
|
||||||
|
for j, condition := range rule.Filters {
|
||||||
|
if len(condition.Values) == 0 {
|
||||||
|
return errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "condition at rule %d, condition %d has no values", i, j)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect unique keys
|
||||||
|
if _, exists := keySet[condition.Key]; !exists {
|
||||||
|
allKeys = append(allKeys, condition.Key)
|
||||||
|
keySet[condition.Key] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build signature for duplicate detection
|
||||||
|
conditionKeys = append(conditionKeys, condition.Key)
|
||||||
|
conditionValues = append(conditionValues, strings.Join(condition.Values, ","))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create signature by sorting keys and values to handle order-independent comparison
|
||||||
|
sort.Strings(conditionKeys)
|
||||||
|
sort.Strings(conditionValues)
|
||||||
|
signature := strings.Join(conditionKeys, "|") + ":" + strings.Join(conditionValues, "|")
|
||||||
|
|
||||||
|
if conditionSignatures[signature] {
|
||||||
|
return errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "duplicate rule detected at index %d: rules with identical conditions are not allowed", i)
|
||||||
|
}
|
||||||
|
conditionSignatures[signature] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(allKeys) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create placeholders for IN query
|
||||||
|
placeholders := make([]string, len(allKeys))
|
||||||
|
for i := range allKeys {
|
||||||
|
placeholders[i] = "?"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Efficient validation using IN query
|
||||||
|
query := fmt.Sprintf("SELECT name FROM %s.%s WHERE name IN (%s)",
|
||||||
|
r.logsDB, r.logsResourceKeys, strings.Join(placeholders, ", "))
|
||||||
|
|
||||||
|
// Convert keys to interface{} for query parameters
|
||||||
|
params := make([]interface{}, len(allKeys))
|
||||||
|
for i, key := range allKeys {
|
||||||
|
params[i] = key
|
||||||
|
}
|
||||||
|
|
||||||
|
rows, err := r.db.Query(ctx, query, params...)
|
||||||
|
if err != nil {
|
||||||
|
return errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "failed to validate resource keys")
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
// Collect valid keys
|
||||||
|
validKeys := make(map[string]struct{})
|
||||||
|
for rows.Next() {
|
||||||
|
var name string
|
||||||
|
if err := rows.Scan(&name); err != nil {
|
||||||
|
return errorsV2.Wrapf(err, errorsV2.TypeInternal, errorsV2.CodeInternal, "failed to scan resource keys")
|
||||||
|
}
|
||||||
|
validKeys[name] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find invalid keys
|
||||||
|
var invalidKeys []string
|
||||||
|
for _, key := range allKeys {
|
||||||
|
if _, exists := validKeys[key]; !exists {
|
||||||
|
invalidKeys = append(invalidKeys, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(invalidKeys) > 0 {
|
||||||
|
return errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "invalid resource keys found: %v. Please check logs_resource_keys table for valid keys", invalidKeys)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// SetTTL sets the TTL for traces or metrics or logs tables.
|
// SetTTL sets the TTL for traces or metrics or logs tables.
|
||||||
// This is an async API which creates goroutines to set TTL.
|
// This is an async API which creates goroutines to set TTL.
|
||||||
// Status of TTL update is tracked with ttl_status table in sqlite db.
|
// Status of TTL update is tracked with ttl_status table in sqlite db.
|
||||||
|
|||||||
@ -541,6 +541,8 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *middleware.AuthZ) {
|
|||||||
router.HandleFunc("/api/v1/dependency_graph", am.ViewAccess(aH.dependencyGraph)).Methods(http.MethodPost)
|
router.HandleFunc("/api/v1/dependency_graph", am.ViewAccess(aH.dependencyGraph)).Methods(http.MethodPost)
|
||||||
router.HandleFunc("/api/v1/settings/ttl", am.AdminAccess(aH.setTTL)).Methods(http.MethodPost)
|
router.HandleFunc("/api/v1/settings/ttl", am.AdminAccess(aH.setTTL)).Methods(http.MethodPost)
|
||||||
router.HandleFunc("/api/v1/settings/ttl", am.ViewAccess(aH.getTTL)).Methods(http.MethodGet)
|
router.HandleFunc("/api/v1/settings/ttl", am.ViewAccess(aH.getTTL)).Methods(http.MethodGet)
|
||||||
|
router.HandleFunc("/api/v2/settings/ttl", am.AdminAccess(aH.setCustomRetentionTTL)).Methods(http.MethodPost)
|
||||||
|
router.HandleFunc("/api/v2/settings/ttl", am.ViewAccess(aH.getCustomRetentionTTL)).Methods(http.MethodGet)
|
||||||
router.HandleFunc("/api/v1/settings/apdex", am.AdminAccess(aH.Signoz.Handlers.Apdex.Set)).Methods(http.MethodPost)
|
router.HandleFunc("/api/v1/settings/apdex", am.AdminAccess(aH.Signoz.Handlers.Apdex.Set)).Methods(http.MethodPost)
|
||||||
router.HandleFunc("/api/v1/settings/apdex", am.ViewAccess(aH.Signoz.Handlers.Apdex.Get)).Methods(http.MethodGet)
|
router.HandleFunc("/api/v1/settings/apdex", am.ViewAccess(aH.Signoz.Handlers.Apdex.Get)).Methods(http.MethodGet)
|
||||||
|
|
||||||
@ -1930,6 +1932,47 @@ func (aH *APIHandler) setTTL(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (aH *APIHandler) setCustomRetentionTTL(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx := r.Context()
|
||||||
|
claims, errv2 := authtypes.ClaimsFromContext(ctx)
|
||||||
|
if errv2 != nil {
|
||||||
|
render.Error(w, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "failed to get org id from context"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var params model.CustomRetentionTTLParams
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(¶ms); err != nil {
|
||||||
|
render.Error(w, errorsV2.Newf(errorsV2.TypeInvalidInput, errorsV2.CodeInvalidInput, "Invalid data"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Context is not used here as TTL is long duration DB operation
|
||||||
|
result, apiErr := aH.reader.SetTTLV2(context.Background(), claims.OrgID, ¶ms)
|
||||||
|
if apiErr != nil {
|
||||||
|
render.Error(w, errorsV2.Newf(errorsV2.TypeInvalidInput, errorsV2.CodeInternal, apiErr.Error()))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
aH.WriteJSON(w, r, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (aH *APIHandler) getCustomRetentionTTL(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx := r.Context()
|
||||||
|
claims, errv2 := authtypes.ClaimsFromContext(ctx)
|
||||||
|
if errv2 != nil {
|
||||||
|
render.Error(w, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "failed to get org id from context"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
result, apiErr := aH.reader.GetCustomRetentionTTL(r.Context(), claims.OrgID)
|
||||||
|
if apiErr != nil {
|
||||||
|
render.Error(w, errorsV2.Newf(errorsV2.TypeInvalidInput, errorsV2.CodeInternal, apiErr.Error()))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
aH.WriteJSON(w, r, result)
|
||||||
|
}
|
||||||
|
|
||||||
func (aH *APIHandler) getTTL(w http.ResponseWriter, r *http.Request) {
|
func (aH *APIHandler) getTTL(w http.ResponseWriter, r *http.Request) {
|
||||||
ttlParams, err := parseGetTTL(r)
|
ttlParams, err := parseGetTTL(r)
|
||||||
if aH.HandleError(w, err, http.StatusBadRequest) {
|
if aH.HandleError(w, err, http.StatusBadRequest) {
|
||||||
|
|||||||
@ -25,6 +25,7 @@ type Reader interface {
|
|||||||
GetDependencyGraph(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error)
|
GetDependencyGraph(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error)
|
||||||
|
|
||||||
GetTTL(ctx context.Context, orgID string, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError)
|
GetTTL(ctx context.Context, orgID string, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError)
|
||||||
|
GetCustomRetentionTTL(ctx context.Context, orgID string) (*model.GetCustomRetentionTTLResponse, error)
|
||||||
|
|
||||||
// GetDisks returns a list of disks configured in the underlying DB. It is supported by
|
// GetDisks returns a list of disks configured in the underlying DB. It is supported by
|
||||||
// clickhouse only.
|
// clickhouse only.
|
||||||
@ -47,6 +48,7 @@ type Reader interface {
|
|||||||
|
|
||||||
// Setter Interfaces
|
// Setter Interfaces
|
||||||
SetTTL(ctx context.Context, orgID string, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError)
|
SetTTL(ctx context.Context, orgID string, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError)
|
||||||
|
SetTTLV2(ctx context.Context, orgID string, params *model.CustomRetentionTTLParams) (*model.CustomRetentionTTLResponse, error)
|
||||||
|
|
||||||
FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error)
|
FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error)
|
||||||
GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error)
|
GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error)
|
||||||
|
|||||||
@ -404,6 +404,43 @@ type TTLParams struct {
|
|||||||
DelDuration int64 // Seconds after which data will be deleted.
|
DelDuration int64 // Seconds after which data will be deleted.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type CustomRetentionTTLParams struct {
|
||||||
|
Type string `json:"type"`
|
||||||
|
DefaultTTLDays int `json:"defaultTTLDays"`
|
||||||
|
TTLConditions []CustomRetentionRule `json:"ttlConditions"`
|
||||||
|
ColdStorageVolume string `json:"coldStorageVolume,omitempty"`
|
||||||
|
ToColdStorageDuration int64 `json:"coldStorageDuration,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CustomRetentionRule struct {
|
||||||
|
Filters []FilterCondition `json:"conditions"`
|
||||||
|
TTLDays int `json:"ttlDays"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type FilterCondition struct {
|
||||||
|
Key string `json:"key"`
|
||||||
|
Values []string `json:"values"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetCustomRetentionTTLResponse struct {
|
||||||
|
Version string `json:"version"`
|
||||||
|
Status string `json:"status"`
|
||||||
|
|
||||||
|
// V1 fields
|
||||||
|
LogsTime int `json:"logs_ttl_duration_hrs,omitempty"`
|
||||||
|
LogsMoveTime int `json:"logs_move_ttl_duration_hrs,omitempty"`
|
||||||
|
ExpectedLogsTime int `json:"expected_logs_ttl_duration_hrs,omitempty"`
|
||||||
|
ExpectedLogsMoveTime int `json:"expected_logs_move_ttl_duration_hrs,omitempty"`
|
||||||
|
|
||||||
|
// V2 fields
|
||||||
|
DefaultTTLDays int `json:"default_ttl_days,omitempty"`
|
||||||
|
TTLConditions []CustomRetentionRule `json:"ttl_conditions,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CustomRetentionTTLResponse struct {
|
||||||
|
Message string `json:"message"`
|
||||||
|
}
|
||||||
|
|
||||||
type GetTTLParams struct {
|
type GetTTLParams struct {
|
||||||
Type string
|
Type string
|
||||||
}
|
}
|
||||||
|
|||||||
@ -131,6 +131,7 @@ func NewSQLMigrationProviderFactories(
|
|||||||
sqlmigration.NewAddFactorIndexesFactory(sqlstore, sqlschema),
|
sqlmigration.NewAddFactorIndexesFactory(sqlstore, sqlschema),
|
||||||
sqlmigration.NewQueryBuilderV5MigrationFactory(sqlstore, telemetryStore),
|
sqlmigration.NewQueryBuilderV5MigrationFactory(sqlstore, telemetryStore),
|
||||||
sqlmigration.NewAddMeterQuickFiltersFactory(sqlstore, sqlschema),
|
sqlmigration.NewAddMeterQuickFiltersFactory(sqlstore, sqlschema),
|
||||||
|
sqlmigration.NewUpdateTTLSettingForCustomRetentionFactory(sqlstore, sqlschema),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
77
pkg/sqlmigration/048_update_ttl_setting.go
Normal file
77
pkg/sqlmigration/048_update_ttl_setting.go
Normal file
@ -0,0 +1,77 @@
|
|||||||
|
package sqlmigration
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/SigNoz/signoz/pkg/factory"
|
||||||
|
"github.com/SigNoz/signoz/pkg/sqlschema"
|
||||||
|
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||||
|
"github.com/uptrace/bun"
|
||||||
|
"github.com/uptrace/bun/migrate"
|
||||||
|
)
|
||||||
|
|
||||||
|
type updateTTLSettingForCustomRetention struct {
|
||||||
|
sqlstore sqlstore.SQLStore
|
||||||
|
sqlschema sqlschema.SQLSchema
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewUpdateTTLSettingForCustomRetentionFactory(sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) factory.ProviderFactory[SQLMigration, Config] {
|
||||||
|
return factory.NewProviderFactory(factory.MustNewName("update_ttl_setting"), func(ctx context.Context, providerSettings factory.ProviderSettings, config Config) (SQLMigration, error) {
|
||||||
|
return newUpdateTTLSettingForCustomRetention(ctx, providerSettings, config, sqlstore, sqlschema)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func newUpdateTTLSettingForCustomRetention(_ context.Context, _ factory.ProviderSettings, _ Config, sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) (SQLMigration, error) {
|
||||||
|
return &updateTTLSettingForCustomRetention{
|
||||||
|
sqlstore: sqlstore,
|
||||||
|
sqlschema: sqlschema,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (migration *updateTTLSettingForCustomRetention) Register(migrations *migrate.Migrations) error {
|
||||||
|
if err := migrations.Register(migration.Up, migration.Down); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (migration *updateTTLSettingForCustomRetention) Up(ctx context.Context, db *bun.DB) error {
|
||||||
|
tx, err := db.BeginTx(ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
_ = tx.Rollback()
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Get the table and its constraints
|
||||||
|
table, _, err := migration.sqlschema.GetTable(ctx, sqlschema.TableName("ttl_setting"))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Define the new column
|
||||||
|
column := &sqlschema.Column{
|
||||||
|
Name: sqlschema.ColumnName("condition"),
|
||||||
|
DataType: sqlschema.DataTypeText,
|
||||||
|
Nullable: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
sqls := migration.sqlschema.Operator().AddColumn(table, nil, column, nil)
|
||||||
|
for _, sql := range sqls {
|
||||||
|
if _, err := tx.ExecContext(ctx, string(sql)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tx.Commit(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (migration *updateTTLSettingForCustomRetention) Down(ctx context.Context, db *bun.DB) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@ -68,6 +68,7 @@ type TTLSetting struct {
|
|||||||
ColdStorageTTL int `bun:"cold_storage_ttl,notnull,default:0"`
|
ColdStorageTTL int `bun:"cold_storage_ttl,notnull,default:0"`
|
||||||
Status string `bun:"status,type:text,notnull"`
|
Status string `bun:"status,type:text,notnull"`
|
||||||
OrgID string `json:"-" bun:"org_id,notnull"`
|
OrgID string `json:"-" bun:"org_id,notnull"`
|
||||||
|
Condition string `bun:"condition,type:text"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type OrganizationStore interface {
|
type OrganizationStore interface {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user