diff --git a/pkg/querier/postprocess.go b/pkg/querier/postprocess.go
index af8dc001f715..d001d96ea87d 100644
--- a/pkg/querier/postprocess.go
+++ b/pkg/querier/postprocess.go
@@ -30,7 +30,7 @@ func getqueryInfo(spec any) queryInfo {
 	case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
 		return queryInfo{Name: s.Name, Disabled: s.Disabled, Step: s.StepInterval}
 	case qbtypes.QueryBuilderFormula:
-		return queryInfo{Name: s.Name, Disabled: false}
+		return queryInfo{Name: s.Name, Disabled: s.Disabled}
 	case qbtypes.PromQuery:
 		return queryInfo{Name: s.Name, Disabled: s.Disabled, Step: s.Step}
 	case qbtypes.ClickHouseQuery:
diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go
index 4b74113726c9..4007afc57a95 100644
--- a/pkg/query-service/app/http_handler.go
+++ b/pkg/query-service/app/http_handler.go
@@ -9,6 +9,7 @@ import (
 	"fmt"
 	"io"
 	"math"
+	"math/rand/v2"
 	"net/http"
 	"net/url"
 	"regexp"
@@ -29,6 +30,7 @@ import (
 	"github.com/SigNoz/signoz/pkg/query-service/app/cloudintegrations/services"
 	"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
 	"github.com/SigNoz/signoz/pkg/query-service/app/metricsexplorer"
+	"github.com/SigNoz/signoz/pkg/query-service/transition"
 	"github.com/SigNoz/signoz/pkg/signoz"
 	"github.com/SigNoz/signoz/pkg/valuer"
 	"github.com/prometheus/prometheus/promql"
@@ -64,6 +66,7 @@ import (
 	"github.com/SigNoz/signoz/pkg/types/licensetypes"
 	"github.com/SigNoz/signoz/pkg/types/opamptypes"
 	"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
+	"github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
 	ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
 	traceFunnels "github.com/SigNoz/signoz/pkg/types/tracefunneltypes"
@@ -4877,6 +4880,45 @@ func (aH *APIHandler) queryRangeV4(ctx context.Context, queryRangeParams *v3.Que
 		Result: result,
 	}
 
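+	// Sample roughly 1 in 30 eligible requests (skipping list/trace panels)
+	// and log the equivalent v5 request/response pair so the v3 -> v5
+	// conversion can be verified offline against real traffic.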
"github.com/SigNoz/signoz/pkg/query-service/transition" + "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes" "github.com/SigNoz/signoz/pkg/valuer" @@ -52,6 +55,9 @@ type ThresholdRule struct { // used for attribute metadata enrichment for logs and traces logsKeys map[string]v3.AttributeKey spansKeys map[string]v3.AttributeKey + + // internal use + triggerCnt int } func NewThresholdRule( @@ -349,12 +355,53 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, return resultVector, nil } + shouldLog := false + for _, series := range queryResult.Series { smpl, shouldAlert := r.ShouldAlert(*series) if shouldAlert { + shouldLog = true resultVector = append(resultVector, smpl) } } + + if (shouldLog && r.triggerCnt < 100) || rand.Float64() < (1.0/30.0) { + func(ts time.Time) { + r.triggerCnt++ + defer func() { + if rr := recover(); rr != nil { + zap.L().Warn("unexpected panic while converting to v5", + zap.Any("panic", rr), + zap.String("ruleid", r.ID()), + ) + } + }() + v5Req, err := transition.ConvertV3ToV5(params) + if err != nil { + zap.L().Warn("unable to convert to v5 request payload", zap.Error(err), zap.String("ruleid", r.ID())) + return + } + v5ReqJSON, _ := json.Marshal(v5Req) + + v3Resp := v3.QueryRangeResponse{ + Result: results, + } + + v5Resp, err := transition.ConvertV3ResponseToV5(&v3Resp, querybuildertypesv5.RequestTypeTimeSeries) + if err != nil { + zap.L().Warn("unable to convert to v5 response payload", zap.Error(err), zap.String("ruleid", r.ID())) + return + } + + v5RespJSON, _ := json.Marshal(v5Resp) + zap.L().Info("v5 request and expected response for triggered alert", + zap.String("request_payload", string(v5ReqJSON)), + zap.String("response_payload", string(v5RespJSON)), + zap.String("ruleid", r.ID()), + ) + }(ts) + } + return resultVector, nil } diff --git a/pkg/query-service/transition/v3_to_v5_req.go b/pkg/query-service/transition/v3_to_v5_req.go new file mode 100644 index 000000000000..e4da53e89ec1 --- /dev/null +++ b/pkg/query-service/transition/v3_to_v5_req.go @@ -0,0 +1,683 @@ +package transition + +import ( + "fmt" + "strings" + "time" + + "github.com/SigNoz/signoz/pkg/types/metrictypes" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" + + "github.com/SigNoz/signoz/pkg/query-service/constants" + v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" + "github.com/SigNoz/signoz/pkg/query-service/utils" + + v5 "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" +) + +func ConvertV3ToV5(params *v3.QueryRangeParamsV3) (*v5.QueryRangeRequest, error) { + v3Params := params.Clone() + + if v3Params == nil || v3Params.CompositeQuery == nil { + return nil, fmt.Errorf("v3 params or composite query is nil") + } + + varItems := map[string]v5.VariableItem{} + + for name, value := range v3Params.Variables { + varItems[name] = v5.VariableItem{ + Type: v5.QueryVariableType, // doesn't matter at the moment + Value: value, + } + } + + v5Request := &v5.QueryRangeRequest{ + SchemaVersion: "v5", + Start: uint64(v3Params.Start), + End: uint64(v3Params.End), + RequestType: convertPanelTypeToRequestType(v3Params.CompositeQuery.PanelType), + Variables: varItems, + CompositeQuery: v5.CompositeQuery{ + Queries: []v5.QueryEnvelope{}, + }, + FormatOptions: &v5.FormatOptions{ + FormatTableResultForUI: v3Params.FormatForWeb, + FillGaps: v3Params.CompositeQuery.FillGaps, + }, + } + + // Convert based on query type + switch v3Params.CompositeQuery.QueryType { 
+func ConvertV3ToV5(params *v3.QueryRangeParamsV3) (*v5.QueryRangeRequest, error) {
+	v3Params := params.Clone()
+
+	if v3Params == nil || v3Params.CompositeQuery == nil {
+		return nil, fmt.Errorf("v3 params or composite query is nil")
+	}
+
+	varItems := map[string]v5.VariableItem{}
+
+	for name, value := range v3Params.Variables {
+		varItems[name] = v5.VariableItem{
+			Type:  v5.QueryVariableType, // doesn't matter at the moment
+			Value: value,
+		}
+	}
+
+	v5Request := &v5.QueryRangeRequest{
+		SchemaVersion: "v5",
+		Start:         uint64(v3Params.Start),
+		End:           uint64(v3Params.End),
+		RequestType:   convertPanelTypeToRequestType(v3Params.CompositeQuery.PanelType),
+		Variables:     varItems,
+		CompositeQuery: v5.CompositeQuery{
+			Queries: []v5.QueryEnvelope{},
+		},
+		FormatOptions: &v5.FormatOptions{
+			FormatTableResultForUI: v3Params.FormatForWeb,
+			FillGaps:               v3Params.CompositeQuery.FillGaps,
+		},
+	}
+
+	// Convert based on query type
+	switch v3Params.CompositeQuery.QueryType {
+	case v3.QueryTypeBuilder:
+		if err := convertBuilderQueries(v3Params.CompositeQuery.BuilderQueries, &v5Request.CompositeQuery); err != nil {
+			return nil, err
+		}
+	case v3.QueryTypeClickHouseSQL:
+		if err := convertClickHouseQueries(v3Params.CompositeQuery.ClickHouseQueries, &v5Request.CompositeQuery); err != nil {
+			return nil, err
+		}
+	case v3.QueryTypePromQL:
+		if err := convertPromQueries(v3Params.CompositeQuery.PromQueries, v3Params.Step, &v5Request.CompositeQuery); err != nil {
+			return nil, err
+		}
+	default:
+		return nil, fmt.Errorf("unsupported query type: %s", v3Params.CompositeQuery.QueryType)
+	}
+
+	return v5Request, nil
+}
+
+func convertPanelTypeToRequestType(panelType v3.PanelType) v5.RequestType {
+	switch panelType {
+	case v3.PanelTypeValue, v3.PanelTypeTable:
+		return v5.RequestTypeScalar
+	case v3.PanelTypeGraph:
+		return v5.RequestTypeTimeSeries
+	case v3.PanelTypeList, v3.PanelTypeTrace:
+		return v5.RequestTypeRaw
+	default:
+		return v5.RequestTypeUnknown
+	}
+}
+
+func convertBuilderQueries(v3Queries map[string]*v3.BuilderQuery, v5Composite *v5.CompositeQuery) error {
+	for name, query := range v3Queries {
+		if query == nil {
+			continue
+		}
+
+		// Handle formula queries
+		if query.Expression != "" && query.Expression != name {
+			v5Envelope := v5.QueryEnvelope{
+				Type: v5.QueryTypeFormula,
+				Spec: v5.QueryBuilderFormula{
+					Name:       name,
+					Expression: query.Expression,
+					Disabled:   query.Disabled,
+					Order:      convertOrderBy(query.OrderBy, query),
+					Limit:      int(query.Limit),
+					Having:     convertHaving(query.Having, query),
+					Functions:  convertFunctions(query.Functions),
+				},
+			}
+			v5Composite.Queries = append(v5Composite.Queries, v5Envelope)
+			continue
+		}
+
+		// Regular builder query
+		envelope, err := convertSingleBuilderQuery(name, query)
+		if err != nil {
+			return err
+		}
+		v5Composite.Queries = append(v5Composite.Queries, envelope)
+	}
+	return nil
+}
+
+func convertSingleBuilderQuery(name string, v3Query *v3.BuilderQuery) (v5.QueryEnvelope, error) {
+	v5Envelope := v5.QueryEnvelope{
+		Type: v5.QueryTypeBuilder,
+	}
+
+	switch v3Query.DataSource {
+	case v3.DataSourceTraces:
+		v5Query := v5.QueryBuilderQuery[v5.TraceAggregation]{
+			Name:         name,
+			Signal:       telemetrytypes.SignalTraces,
+			Disabled:     v3Query.Disabled,
+			StepInterval: v5.Step{Duration: time.Duration(v3Query.StepInterval) * time.Second},
+			Filter:       convertFilter(v3Query.Filters),
+			GroupBy:      convertGroupBy(v3Query.GroupBy),
+			Order:        convertOrderBy(v3Query.OrderBy, v3Query),
+			Limit:        int(v3Query.Limit),
+			Offset:       int(v3Query.Offset),
+			Having:       convertHaving(v3Query.Having, v3Query),
+			Functions:    convertFunctions(v3Query.Functions),
+			SelectFields: convertSelectColumns(v3Query.SelectColumns),
+		}
+
+		// Convert trace aggregations
+		if v3Query.AggregateOperator != v3.AggregateOperatorNoOp {
+			v5Query.Aggregations = []v5.TraceAggregation{
+				{
+					Expression: buildTraceAggregationExpression(v3Query),
+					Alias:      "",
+				},
+			}
+		}
+
+		v5Envelope.Spec = v5Query
+
+	case v3.DataSourceLogs:
+		v5Query := v5.QueryBuilderQuery[v5.LogAggregation]{
+			Name:         name,
+			Signal:       telemetrytypes.SignalLogs,
+			Disabled:     v3Query.Disabled,
+			StepInterval: v5.Step{Duration: time.Duration(v3Query.StepInterval) * time.Second},
+			Filter:       convertFilter(v3Query.Filters),
+			GroupBy:      convertGroupBy(v3Query.GroupBy),
+			Order:        convertOrderBy(v3Query.OrderBy, v3Query),
+			Limit:        int(v3Query.PageSize),
+			Offset:       int(v3Query.Offset),
+			Having:       convertHaving(v3Query.Having, v3Query),
+			Functions:    convertFunctions(v3Query.Functions),
+		}
+
+		// Convert log aggregations
+		if v3Query.AggregateOperator != v3.AggregateOperatorNoOp {
+			v5Query.Aggregations = []v5.LogAggregation{
+				{
+					Expression: buildLogAggregationExpression(v3Query),
+					Alias:      "",
+				},
+			}
+		}
+
+		v5Envelope.Spec = v5Query
+
+	case v3.DataSourceMetrics:
+		v5Query := v5.QueryBuilderQuery[v5.MetricAggregation]{
+			Name:         name,
+			Signal:       telemetrytypes.SignalMetrics,
+			Disabled:     v3Query.Disabled,
+			StepInterval: v5.Step{Duration: time.Duration(v3Query.StepInterval) * time.Second},
+			Filter:       convertFilter(v3Query.Filters),
+			GroupBy:      convertGroupBy(v3Query.GroupBy),
+			Order:        convertOrderBy(v3Query.OrderBy, v3Query),
+			Limit:        int(v3Query.Limit),
+			Offset:       int(v3Query.Offset),
+			Having:       convertHaving(v3Query.Having, v3Query),
+			Functions:    convertFunctions(v3Query.Functions),
+		}
+
+		if v3Query.AggregateAttribute.Key != "" {
+			v5Query.Aggregations = []v5.MetricAggregation{
+				{
+					MetricName:       v3Query.AggregateAttribute.Key,
+					Temporality:      convertTemporality(v3Query.Temporality),
+					TimeAggregation:  convertTimeAggregation(v3Query.TimeAggregation),
+					SpaceAggregation: convertSpaceAggregation(v3Query.SpaceAggregation),
+				},
+			}
+		}
+
+		v5Envelope.Spec = v5Query
+
+	default:
+		return v5Envelope, fmt.Errorf("unsupported data source: %s", v3Query.DataSource)
+	}
+
+	return v5Envelope, nil
+}
+
+func buildTraceAggregationExpression(v3Query *v3.BuilderQuery) string {
+	switch v3Query.AggregateOperator {
+	case v3.AggregateOperatorCount:
+		if v3Query.AggregateAttribute.Key != "" {
+			return fmt.Sprintf("count(%s)", v3Query.AggregateAttribute.Key)
+		}
+		return "count()"
+	case v3.AggregateOperatorCountDistinct:
+		if v3Query.AggregateAttribute.Key != "" {
+			return fmt.Sprintf("countDistinct(%s)", v3Query.AggregateAttribute.Key)
+		}
+		return "countDistinct()"
+	case v3.AggregateOperatorSum:
+		return fmt.Sprintf("sum(%s)", v3Query.AggregateAttribute.Key)
+	case v3.AggregateOperatorAvg:
+		return fmt.Sprintf("avg(%s)", v3Query.AggregateAttribute.Key)
+	case v3.AggregateOperatorMin:
+		return fmt.Sprintf("min(%s)", v3Query.AggregateAttribute.Key)
+	case v3.AggregateOperatorMax:
+		return fmt.Sprintf("max(%s)", v3Query.AggregateAttribute.Key)
+	case v3.AggregateOperatorP05:
+		return fmt.Sprintf("p05(%s)", v3Query.AggregateAttribute.Key)
+	case v3.AggregateOperatorP10:
+		return fmt.Sprintf("p10(%s)", v3Query.AggregateAttribute.Key)
+	case v3.AggregateOperatorP20:
+		return fmt.Sprintf("p20(%s)", v3Query.AggregateAttribute.Key)
+	case v3.AggregateOperatorP25:
+		return fmt.Sprintf("p25(%s)", v3Query.AggregateAttribute.Key)
+	case v3.AggregateOperatorP50:
+		return fmt.Sprintf("p50(%s)", v3Query.AggregateAttribute.Key)
+	case v3.AggregateOperatorP75:
+		return fmt.Sprintf("p75(%s)", v3Query.AggregateAttribute.Key)
+	case v3.AggregateOperatorP90:
+		return fmt.Sprintf("p90(%s)", v3Query.AggregateAttribute.Key)
+	case v3.AggregateOperatorP95:
+		return fmt.Sprintf("p95(%s)", v3Query.AggregateAttribute.Key)
+	case v3.AggregateOperatorP99:
+		return fmt.Sprintf("p99(%s)", v3Query.AggregateAttribute.Key)
+	case v3.AggregateOperatorRate:
+		return "rate()"
+	case v3.AggregateOperatorRateSum:
+		return fmt.Sprintf("rate_sum(%s)", v3Query.AggregateAttribute.Key)
+	case v3.AggregateOperatorRateAvg:
+		return fmt.Sprintf("rate_avg(%s)", v3Query.AggregateAttribute.Key)
+	case v3.AggregateOperatorRateMin:
+		return fmt.Sprintf("rate_min(%s)", v3Query.AggregateAttribute.Key)
+	case v3.AggregateOperatorRateMax:
+		return fmt.Sprintf("rate_max(%s)", v3Query.AggregateAttribute.Key)
+	default:
+		return "count()"
+	}
+}
+
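+// buildLogAggregationExpression renders the aggregation expression for a log
+// builder query; the v3 operator set is shared with traces, so it delegates.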
+func buildLogAggregationExpression(v3Query *v3.BuilderQuery) string {
+	// Similar to traces
+	return buildTraceAggregationExpression(v3Query)
+}
+
+func convertFilter(v3Filter *v3.FilterSet) *v5.Filter {
+	if v3Filter == nil || len(v3Filter.Items) == 0 {
+		return nil
+	}
+
+	expressions := []string{}
+	for _, item := range v3Filter.Items {
+		expr := buildFilterExpression(item)
+		if expr != "" {
+			expressions = append(expressions, expr)
+		}
+	}
+
+	if len(expressions) == 0 {
+		return nil
+	}
+
+	operator := "AND"
+	if v3Filter.Operator == "OR" {
+		operator = "OR"
+	}
+
+	return &v5.Filter{
+		Expression: strings.Join(expressions, fmt.Sprintf(" %s ", operator)),
+	}
+}
+
+func buildFilterExpression(item v3.FilterItem) string {
+	key := item.Key.Key
+	value := item.Value
+
+	switch item.Operator {
+	case v3.FilterOperatorEqual:
+		return fmt.Sprintf("%s = %s", key, formatValue(value))
+	case v3.FilterOperatorNotEqual:
+		return fmt.Sprintf("%s != %s", key, formatValue(value))
+	case v3.FilterOperatorGreaterThan:
+		return fmt.Sprintf("%s > %s", key, formatValue(value))
+	case v3.FilterOperatorGreaterThanOrEq:
+		return fmt.Sprintf("%s >= %s", key, formatValue(value))
+	case v3.FilterOperatorLessThan:
+		return fmt.Sprintf("%s < %s", key, formatValue(value))
+	case v3.FilterOperatorLessThanOrEq:
+		return fmt.Sprintf("%s <= %s", key, formatValue(value))
+	case v3.FilterOperatorIn:
+		return fmt.Sprintf("%s IN %s", key, formatValue(value))
+	case v3.FilterOperatorNotIn:
+		return fmt.Sprintf("%s NOT IN %s", key, formatValue(value))
+	case v3.FilterOperatorContains:
+		return fmt.Sprintf("%s LIKE '%%%v%%'", key, value)
+	case v3.FilterOperatorNotContains:
+		return fmt.Sprintf("%s NOT LIKE '%%%v%%'", key, value)
+	case v3.FilterOperatorRegex:
+		return fmt.Sprintf("%s REGEXP %s", key, formatValue(value))
+	case v3.FilterOperatorNotRegex:
+		return fmt.Sprintf("%s NOT REGEXP %s", key, formatValue(value))
+	case v3.FilterOperatorExists:
+		return fmt.Sprintf("%s EXISTS", key)
+	case v3.FilterOperatorNotExists:
+		return fmt.Sprintf("%s NOT EXISTS", key)
+	default:
+		return ""
+	}
+}
+
+func formatValue(value interface{}) string {
+	return utils.ClickHouseFormattedValue(value)
+}
+
+func convertGroupBy(v3GroupBy []v3.AttributeKey) []v5.GroupByKey {
+	v5GroupBy := []v5.GroupByKey{}
+	for _, key := range v3GroupBy {
+		v5GroupBy = append(v5GroupBy, v5.GroupByKey{
+			TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
+				Name:          key.Key,
+				FieldDataType: convertDataType(key.DataType),
+				FieldContext:  convertAttributeType(key.Type),
+				Materialized:  key.IsColumn,
+			},
+		})
+	}
+	return v5GroupBy
+}
+
+func convertOrderBy(v3OrderBy []v3.OrderBy, v3Query *v3.BuilderQuery) []v5.OrderBy {
+	v5OrderBy := []v5.OrderBy{}
+	for _, order := range v3OrderBy {
+		direction := v5.OrderDirectionAsc
+		if order.Order == v3.DirectionDesc {
+			direction = v5.OrderDirectionDesc
+		}
+
+		var orderByName string
+		if order.ColumnName == "#SIGNOZ_VALUE" {
+			if v3Query.DataSource == v3.DataSourceLogs || v3Query.DataSource == v3.DataSourceTraces {
+				orderByName = buildTraceAggregationExpression(v3Query)
+			} else {
+				if v3Query.Expression != v3Query.QueryName {
+					orderByName = v3Query.Expression
+				} else {
+					orderByName = fmt.Sprintf("%s(%s)", v3Query.SpaceAggregation, v3Query.AggregateAttribute.Key)
+				}
+			}
+		} else {
+			orderByName = order.ColumnName
+		}
+
+		v5OrderBy = append(v5OrderBy, v5.OrderBy{
+			Key: v5.OrderByKey{
+				TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
+					Name:         orderByName,
+					Materialized: order.IsColumn,
+				},
+			},
+			Direction: direction,
+		})
+	}
+	return v5OrderBy
+}
+
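+// convertHaving rewrites each v3 having clause to reference the rendered v5
+// aggregation expression, since the v3 column name has no direct v5 equivalent.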
+func convertHaving(v3Having []v3.Having, v3Query *v3.BuilderQuery) *v5.Having {
+	if len(v3Having) == 0 {
+		return nil
+	}
+
+	expressions := []string{}
+	for _, h := range v3Having {
+		var expr string
+
+		if v3Query.DataSource == v3.DataSourceLogs || v3Query.DataSource == v3.DataSourceTraces {
+			h.ColumnName = buildTraceAggregationExpression(v3Query)
+		} else {
+			if v3Query.Expression != v3Query.QueryName {
+				h.ColumnName = v3Query.Expression
+			} else {
+				h.ColumnName = fmt.Sprintf("%s(%s)", v3Query.SpaceAggregation, v3Query.AggregateAttribute.Key)
+			}
+		}
+		expr = buildHavingExpression(h)
+
+		if expr != "" {
+			expressions = append(expressions, expr)
+		}
+	}
+
+	if len(expressions) == 0 {
+		return nil
+	}
+
+	return &v5.Having{
+		Expression: strings.Join(expressions, " AND "),
+	}
+}
+
+func buildHavingExpression(having v3.Having) string {
+	switch having.Operator {
+	case v3.HavingOperatorEqual:
+		return fmt.Sprintf("%s = %s", having.ColumnName, formatValue(having.Value))
+	case v3.HavingOperatorNotEqual:
+		return fmt.Sprintf("%s != %s", having.ColumnName, formatValue(having.Value))
+	case v3.HavingOperatorGreaterThan:
+		return fmt.Sprintf("%s > %s", having.ColumnName, formatValue(having.Value))
+	case v3.HavingOperatorGreaterThanOrEq:
+		return fmt.Sprintf("%s >= %s", having.ColumnName, formatValue(having.Value))
+	case v3.HavingOperatorLessThan:
+		return fmt.Sprintf("%s < %s", having.ColumnName, formatValue(having.Value))
+	case v3.HavingOperatorLessThanOrEq:
+		return fmt.Sprintf("%s <= %s", having.ColumnName, formatValue(having.Value))
+	case v3.HavingOperatorIn:
+		return fmt.Sprintf("%s IN %s", having.ColumnName, formatValue(having.Value))
+	case v3.HavingOperatorNotIn:
+		return fmt.Sprintf("%s NOT IN %s", having.ColumnName, formatValue(having.Value))
+	default:
+		return ""
+	}
+}
+
+func convertFunctions(v3Functions []v3.Function) []v5.Function {
+	v5Functions := []v5.Function{}
+	for _, fn := range v3Functions {
+		v5Fn := v5.Function{
+			Name: convertFunctionName(fn.Name),
+			Args: []v5.FunctionArg{},
+		}
+
+		for _, arg := range fn.Args {
+			v5Fn.Args = append(v5Fn.Args, v5.FunctionArg{
+				Value: arg,
+			})
+		}
+
+		for name, value := range fn.NamedArgs {
+			v5Fn.Args = append(v5Fn.Args, v5.FunctionArg{
+				Name:  name,
+				Value: value,
+			})
+		}
+
+		v5Functions = append(v5Functions, v5Fn)
+	}
+	return v5Functions
+}
+
+func convertFunctionName(v3Name v3.FunctionName) v5.FunctionName {
+	switch v3Name {
+	case v3.FunctionNameCutOffMin:
+		return v5.FunctionNameCutOffMin
+	case v3.FunctionNameCutOffMax:
+		return v5.FunctionNameCutOffMax
+	case v3.FunctionNameClampMin:
+		return v5.FunctionNameClampMin
+	case v3.FunctionNameClampMax:
+		return v5.FunctionNameClampMax
+	case v3.FunctionNameAbsolute:
+		return v5.FunctionNameAbsolute
+	case v3.FunctionNameRunningDiff:
+		return v5.FunctionNameRunningDiff
+	case v3.FunctionNameLog2:
+		return v5.FunctionNameLog2
+	case v3.FunctionNameLog10:
+		return v5.FunctionNameLog10
+	case v3.FunctionNameCumSum:
+		return v5.FunctionNameCumulativeSum
+	case v3.FunctionNameEWMA3:
+		return v5.FunctionNameEWMA3
+	case v3.FunctionNameEWMA5:
+		return v5.FunctionNameEWMA5
+	case v3.FunctionNameEWMA7:
+		return v5.FunctionNameEWMA7
+	case v3.FunctionNameMedian3:
+		return v5.FunctionNameMedian3
+	case v3.FunctionNameMedian5:
+		return v5.FunctionNameMedian5
+	case v3.FunctionNameMedian7:
+		return v5.FunctionNameMedian7
+	case v3.FunctionNameTimeShift:
+		return v5.FunctionNameTimeShift
+	case v3.FunctionNameAnomaly:
+		return v5.FunctionNameAnomaly
+	default:
+		return v5.FunctionName{}
+	}
+}
+
+func convertSelectColumns(cols []v3.AttributeKey) []telemetrytypes.TelemetryFieldKey {
+	fields := []telemetrytypes.TelemetryFieldKey{}
+
+	for _, key := range cols {
+		newKey := telemetrytypes.TelemetryFieldKey{
+			Name: key.Key,
+		}
+
+		if _, exists := constants.NewStaticFieldsTraces[key.Key]; exists {
+			fields = append(fields, newKey)
+			continue
+		}
+
+		if _, exists := constants.DeprecatedStaticFieldsTraces[key.Key]; exists {
+			fields = append(fields, newKey)
+			continue
+		}
+
+		if _, exists := constants.StaticFieldsLogsV3[key.Key]; exists {
+			fields = append(fields, newKey)
+			continue
+		}
+
+		newKey.FieldDataType = convertDataType(key.DataType)
+		newKey.FieldContext = convertAttributeType(key.Type)
+		newKey.Materialized = key.IsColumn
+		// append the enriched key as well; without this, non-static select
+		// columns would be silently dropped from the converted query
+		fields = append(fields, newKey)
+	}
+	return fields
+}
+
+func convertDataType(v3Type v3.AttributeKeyDataType) telemetrytypes.FieldDataType {
+	switch v3Type {
+	case v3.AttributeKeyDataTypeString:
+		return telemetrytypes.FieldDataTypeString
+	case v3.AttributeKeyDataTypeInt64:
+		return telemetrytypes.FieldDataTypeInt64
+	case v3.AttributeKeyDataTypeFloat64:
+		return telemetrytypes.FieldDataTypeFloat64
+	case v3.AttributeKeyDataTypeBool:
+		return telemetrytypes.FieldDataTypeBool
+	case v3.AttributeKeyDataTypeArrayString:
+		return telemetrytypes.FieldDataTypeArrayString
+	case v3.AttributeKeyDataTypeArrayInt64:
+		return telemetrytypes.FieldDataTypeArrayInt64
+	case v3.AttributeKeyDataTypeArrayFloat64:
+		return telemetrytypes.FieldDataTypeArrayFloat64
+	case v3.AttributeKeyDataTypeArrayBool:
+		return telemetrytypes.FieldDataTypeArrayBool
+	default:
+		return telemetrytypes.FieldDataTypeUnspecified
+	}
+}
+
+func convertAttributeType(v3Type v3.AttributeKeyType) telemetrytypes.FieldContext {
+	switch v3Type {
+	case v3.AttributeKeyTypeTag:
+		return telemetrytypes.FieldContextAttribute
+	case v3.AttributeKeyTypeResource:
+		return telemetrytypes.FieldContextResource
+	case v3.AttributeKeyTypeInstrumentationScope:
+		return telemetrytypes.FieldContextScope
+	default:
+		return telemetrytypes.FieldContextUnspecified
+	}
+}
+
+func convertTemporality(v3Temp v3.Temporality) metrictypes.Temporality {
+	switch v3Temp {
+	case v3.Delta:
+		return metrictypes.Delta
+	case v3.Cumulative:
+		return metrictypes.Cumulative
+	default:
+		return metrictypes.Unspecified
+	}
+}
+
+func convertTimeAggregation(v3TimeAgg v3.TimeAggregation) metrictypes.TimeAggregation {
+	switch v3TimeAgg {
+	case v3.TimeAggregationAnyLast:
+		return metrictypes.TimeAggregationLatest
+	case v3.TimeAggregationSum:
+		return metrictypes.TimeAggregationSum
+	case v3.TimeAggregationAvg:
+		return metrictypes.TimeAggregationAvg
+	case v3.TimeAggregationMin:
+		return metrictypes.TimeAggregationMin
+	case v3.TimeAggregationMax:
+		return metrictypes.TimeAggregationMax
+	case v3.TimeAggregationCount:
+		return metrictypes.TimeAggregationCount
+	case v3.TimeAggregationCountDistinct:
+		return metrictypes.TimeAggregationCountDistinct
+	case v3.TimeAggregationRate:
+		return metrictypes.TimeAggregationRate
+	case v3.TimeAggregationIncrease:
+		return metrictypes.TimeAggregationIncrease
+	default:
+		return metrictypes.TimeAggregationUnspecified
+	}
+}
+
+func convertSpaceAggregation(v3SpaceAgg v3.SpaceAggregation) metrictypes.SpaceAggregation {
+	switch v3SpaceAgg {
+	case v3.SpaceAggregationSum:
+		return metrictypes.SpaceAggregationSum
+	case v3.SpaceAggregationAvg:
+		return metrictypes.SpaceAggregationAvg
+	case v3.SpaceAggregationMin:
+		return metrictypes.SpaceAggregationMin
+	case v3.SpaceAggregationMax:
+		return metrictypes.SpaceAggregationMax
+	case v3.SpaceAggregationCount:
+		return metrictypes.SpaceAggregationCount
+	case v3.SpaceAggregationPercentile50:
+		return metrictypes.SpaceAggregationPercentile50
+	case v3.SpaceAggregationPercentile75:
+		return metrictypes.SpaceAggregationPercentile75
+	case v3.SpaceAggregationPercentile90:
+		return metrictypes.SpaceAggregationPercentile90
+	case v3.SpaceAggregationPercentile95:
+		return metrictypes.SpaceAggregationPercentile95
+	case v3.SpaceAggregationPercentile99:
+		return metrictypes.SpaceAggregationPercentile99
+	default:
+		return metrictypes.SpaceAggregationUnspecified
+	}
+}
+
+func convertClickHouseQueries(v3Queries map[string]*v3.ClickHouseQuery, v5Composite *v5.CompositeQuery) error {
+	for name, query := range v3Queries {
+		if query == nil {
+			continue
+		}
+
+		v5Envelope := v5.QueryEnvelope{
+			Type: v5.QueryTypeClickHouseSQL,
+			Spec: v5.ClickHouseQuery{
+				Name:     name,
+				Query:    query.Query,
+				Disabled: query.Disabled,
+			},
+		}
+		v5Composite.Queries = append(v5Composite.Queries, v5Envelope)
+	}
+	return nil
+}
+
+func convertPromQueries(v3Queries map[string]*v3.PromQuery, step int64, v5Composite *v5.CompositeQuery) error {
+	for name, query := range v3Queries {
+		if query == nil {
+			continue
+		}
+
+		v5Envelope := v5.QueryEnvelope{
+			Type: v5.QueryTypePromQL,
+			Spec: v5.PromQuery{
+				Name:     name,
+				Query:    query.Query,
+				Disabled: query.Disabled,
+				Step:     v5.Step{Duration: time.Duration(step) * time.Second},
+				Stats:    query.Stats != "",
+			},
+		}
+		v5Composite.Queries = append(v5Composite.Queries, v5Envelope)
+	}
+	return nil
+}
diff --git a/pkg/query-service/transition/v3_to_v5_resp.go b/pkg/query-service/transition/v3_to_v5_resp.go
new file mode 100644
index 000000000000..31dd7354e6ef
--- /dev/null
+++ b/pkg/query-service/transition/v3_to_v5_resp.go
@@ -0,0 +1,442 @@
+package transition
+
+import (
+	"encoding/json"
+	"fmt"
+	"sort"
+	"strings"
+
+	v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
+	v5 "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
+	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
+)
+
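+// ConvertV3ResponseToV5 maps a v3 QueryRangeResponse onto the v5 response
+// shape for the given request type (time series, scalar, or raw).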
+func ConvertV3ResponseToV5(v3Response *v3.QueryRangeResponse, requestType v5.RequestType) (*v5.QueryRangeResponse, error) {
+	if v3Response == nil {
+		return nil, fmt.Errorf("v3 response is nil")
+	}
+
+	v5Response := &v5.QueryRangeResponse{
+		Type: requestType,
+	}
+
+	switch requestType {
+	case v5.RequestTypeTimeSeries:
+		data, err := convertToTimeSeriesData(v3Response.Result)
+		if err != nil {
+			return nil, err
+		}
+		v5Response.Data = data
+
+	case v5.RequestTypeScalar:
+		data, err := convertToScalarData(v3Response.Result)
+		if err != nil {
+			return nil, err
+		}
+		v5Response.Data = data
+
+	case v5.RequestTypeRaw:
+		data, err := convertToRawData(v3Response.Result)
+		if err != nil {
+			return nil, err
+		}
+		v5Response.Data = data
+
+	default:
+		return nil, fmt.Errorf("unsupported request type: %v", requestType)
+	}
+
+	return v5Response, nil
+}
+
+func convertToTimeSeriesData(v3Results []*v3.Result) ([]*v5.TimeSeriesData, error) {
+	v5Data := []*v5.TimeSeriesData{}
+
+	for _, result := range v3Results {
+		if result == nil {
+			continue
+		}
+
+		tsData := &v5.TimeSeriesData{
+			QueryName:    result.QueryName,
+			Aggregations: []*v5.AggregationBucket{},
+		}
+
+		if len(result.Series) > 0 {
+			bucket := &v5.AggregationBucket{
+				Index:  0,
+				Alias:  "",
+				Series: convertSeries(result.Series),
+			}
+			tsData.Aggregations = append(tsData.Aggregations, bucket)
+		}
+
+		v5Data = append(v5Data, tsData)
+	}
+
+	return v5Data, nil
+}
+
+func convertSeries(v3Series []*v3.Series) []*v5.TimeSeries {
+	v5Series := []*v5.TimeSeries{}
+
+	for _, series := range v3Series {
+		if series == nil {
+			continue
+		}
+
+		v5TimeSeries := &v5.TimeSeries{
+			Labels: convertLabels(series.Labels),
+			Values: convertPoints(series.Points),
+		}
+
+		v5Series = append(v5Series, v5TimeSeries)
+	}
+
+	return v5Series
+}
+
+func convertLabels(v3Labels map[string]string) []*v5.Label {
+	v5Labels := []*v5.Label{}
+
+	keys := make([]string, 0, len(v3Labels))
+	for k := range v3Labels {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+
+	for _, key := range keys {
+		v5Labels = append(v5Labels, &v5.Label{
+			Key: telemetrytypes.TelemetryFieldKey{
+				Name: key,
+			},
+			Value: v3Labels[key],
+		})
+	}
+
+	return v5Labels
+}
+
+func convertPoints(v3Points []v3.Point) []*v5.TimeSeriesValue {
+	v5Values := []*v5.TimeSeriesValue{}
+
+	for _, point := range v3Points {
+		v5Values = append(v5Values, &v5.TimeSeriesValue{
+			Timestamp: point.Timestamp,
+			Value:     point.Value,
+		})
+	}
+
+	return v5Values
+}
+
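+// convertToScalarData flattens v3 table results into one v5 scalar payload:
+// columns are copied in table order (group vs aggregation decided by
+// IsValueColumn) and rows are emitted row-major, with nil for missing cells.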
+func convertToScalarData(v3Results []*v3.Result) (*v5.ScalarData, error) {
+	scalarData := &v5.ScalarData{
+		Columns: []*v5.ColumnDescriptor{},
+		Data:    [][]any{},
+	}
+
+	for _, result := range v3Results {
+		if result == nil || result.Table == nil {
+			continue
+		}
+
+		for _, col := range result.Table.Columns {
+			columnType := v5.ColumnTypeGroup
+			if col.IsValueColumn {
+				columnType = v5.ColumnTypeAggregation
+			}
+
+			scalarData.Columns = append(scalarData.Columns, &v5.ColumnDescriptor{
+				TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
+					Name: col.Name,
+				},
+				QueryName:        col.QueryName,
+				AggregationIndex: 0,
+				Type:             columnType,
+			})
+		}
+
+		for _, row := range result.Table.Rows {
+			rowData := []any{}
+			for _, col := range result.Table.Columns {
+				if val, ok := row.Data[col.Name]; ok {
+					rowData = append(rowData, val)
+				} else {
+					rowData = append(rowData, nil)
+				}
+			}
+			scalarData.Data = append(scalarData.Data, rowData)
+		}
+	}
+
+	return scalarData, nil
+}
+
+func convertToRawData(v3Results []*v3.Result) ([]*v5.RawData, error) {
+	v5Data := []*v5.RawData{}
+
+	for _, result := range v3Results {
+		if result == nil {
+			continue
+		}
+
+		rawData := &v5.RawData{
+			QueryName:  result.QueryName,
+			NextCursor: "",
+			Rows:       []*v5.RawRow{},
+		}
+
+		for _, row := range result.List {
+			if row == nil {
+				continue
+			}
+
+			dataMap := make(map[string]*any)
+			for k, v := range row.Data {
+				val := v
+				dataMap[k] = &val
+			}
+
+			rawData.Rows = append(rawData.Rows, &v5.RawRow{
+				Timestamp: row.Timestamp,
+				Data:      dataMap,
+			})
+		}
+
+		v5Data = append(v5Data, rawData)
+	}
+
+	return v5Data, nil
+}
+
+func LogV5Response(response *v5.QueryRangeResponse, logger func(string)) {
+	if response == nil {
+		logger("Response: nil")
+		return
+	}
+
+	logger(fmt.Sprintf("[%s] Meta{rows:%d bytes:%d ms:%d}",
+		response.Type, response.Meta.RowsScanned, response.Meta.BytesScanned, response.Meta.DurationMS))
+
+	switch response.Type {
+	case v5.RequestTypeTimeSeries:
+		logTimeSeriesDataCompact(response.Data, logger)
+	case v5.RequestTypeScalar:
+		logScalarDataCompact(response.Data, logger)
+	case v5.RequestTypeRaw:
+		logRawDataCompact(response.Data, logger)
+	default:
+		logger(fmt.Sprintf("Unknown response type: %v", response.Type))
+	}
+}
+
+func logTimeSeriesDataCompact(data any, logger func(string)) {
+	tsData, ok := data.([]*v5.TimeSeriesData)
+	if !ok {
+		logger("ERROR: Failed to cast data to TimeSeriesData")
+		return
+	}
+
+	sort.Slice(tsData, func(i, j int) bool {
+		return tsData[i].QueryName < tsData[j].QueryName
+	})
+
+	for _, ts := range tsData {
+		allSeries := flattenSeries(ts.Aggregations)
+
+		sort.Slice(allSeries, func(i, j int) bool {
+			return createLabelSignature(allSeries[i].Labels) < createLabelSignature(allSeries[j].Labels)
+		})
+
+		for _, series := range allSeries {
+			labels := []string{}
+			for _, label := range series.Labels {
+				labels = append(labels, fmt.Sprintf("%s:%v", label.Key.Name, label.Value))
+			}
+			labelStr := strings.Join(labels, ",")
+
+			values := make([]*v5.TimeSeriesValue, len(series.Values))
+			copy(values, series.Values)
+			sort.Slice(values, func(i, j int) bool {
+				return values[i].Timestamp < values[j].Timestamp
+			})
+
+			valueStrs := []string{}
+			for _, val := range values {
+				relTime := val.Timestamp
+				if len(values) > 0 && values[0].Timestamp > 0 {
+					relTime = (val.Timestamp - values[0].Timestamp) / 1000 // Convert to seconds
+				}
+				valueStrs = append(valueStrs, fmt.Sprintf("%d:%.2f", relTime, val.Value))
+			}
+
+			logger(fmt.Sprintf("%s {%s} [%s]", ts.QueryName, labelStr, strings.Join(valueStrs, " ")))
+		}
+	}
+}
+
+func createLabelSignature(labels []*v5.Label) string {
+	parts := []string{}
+	for _, label := range labels {
+		parts = append(parts, fmt.Sprintf("%s=%v", label.Key.Name, label.Value))
+	}
+	sort.Strings(parts)
+	return strings.Join(parts, ",")
+}
+
+func logScalarDataCompact(data any, logger func(string)) {
+	scalar, ok := data.(*v5.ScalarData)
+	if !ok {
+		logger("ERROR: Failed to cast data to ScalarData")
+		return
+	}
+
+	colNames := []string{}
+	for _, col := range scalar.Columns {
+		colNames = append(colNames, col.Name)
+	}
+
+	logger(fmt.Sprintf("SCALAR [%s]", strings.Join(colNames, "|")))
+
+	for i, row := range scalar.Data {
+		rowVals := []string{}
+		for _, val := range row {
+			rowVals = append(rowVals, fmt.Sprintf("%v", val))
+		}
+		logger(fmt.Sprintf(" %d: [%s]", i, strings.Join(rowVals, "|")))
+	}
+}
+
+func flattenSeries(buckets []*v5.AggregationBucket) []*v5.TimeSeries {
+	var allSeries []*v5.TimeSeries
+	for _, bucket := range buckets {
+		allSeries = append(allSeries, bucket.Series...)
+	}
+	return allSeries
+}
+
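+// logRawDataCompact prints raw rows in a stable order: queries sorted by
+// name, rows by timestamp, and the union of field names alphabetically;
+// missing values are rendered as "-".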
+func logRawDataCompact(data any, logger func(string)) {
+	rawData, ok := data.([]*v5.RawData)
+	if !ok {
+		logger("ERROR: Failed to cast data to RawData")
+		return
+	}
+
+	sort.Slice(rawData, func(i, j int) bool {
+		return rawData[i].QueryName < rawData[j].QueryName
+	})
+
+	for _, rd := range rawData {
+		logger(fmt.Sprintf("RAW %s (rows:%d cursor:%s)", rd.QueryName, len(rd.Rows), rd.NextCursor))
+
+		rows := make([]*v5.RawRow, len(rd.Rows))
+		copy(rows, rd.Rows)
+		sort.Slice(rows, func(i, j int) bool {
+			return rows[i].Timestamp.Before(rows[j].Timestamp)
+		})
+
+		allFields := make(map[string]bool)
+		for _, row := range rows {
+			for k := range row.Data {
+				allFields[k] = true
+			}
+		}
+
+		fieldNames := []string{}
+		for k := range allFields {
+			fieldNames = append(fieldNames, k)
+		}
+		sort.Strings(fieldNames)
+
+		logger(fmt.Sprintf(" Fields: [%s]", strings.Join(fieldNames, "|")))
+
+		for i, row := range rows {
+			vals := []string{}
+			for _, field := range fieldNames {
+				if val, exists := row.Data[field]; exists && val != nil {
+					vals = append(vals, fmt.Sprintf("%v", *val))
+				} else {
+					vals = append(vals, "-")
+				}
+			}
+			tsStr := row.Timestamp.Format("15:04:05")
+			logger(fmt.Sprintf(" %d@%s: [%s]", i, tsStr, strings.Join(vals, "|")))
+		}
+	}
+}
+
+func LogV5ResponseJSON(response *v5.QueryRangeResponse, logger func(string)) {
+	sortedResponse := sortV5ResponseForLogging(response)
+
+	jsonBytes, err := json.MarshalIndent(sortedResponse, "", " ")
+	if err != nil {
+		logger(fmt.Sprintf("ERROR: Failed to marshal response: %v", err))
+		return
+	}
+
+	logger(string(jsonBytes))
+}
+
+func sortV5ResponseForLogging(response *v5.QueryRangeResponse) *v5.QueryRangeResponse {
+	if response == nil {
+		return nil
+	}
+
+	responseCopy := &v5.QueryRangeResponse{
+		Type: response.Type,
+		Meta: response.Meta,
+	}
+
+	switch response.Type {
+	case v5.RequestTypeTimeSeries:
+		if tsData, ok := response.Data.([]*v5.TimeSeriesData); ok {
+			sortedData := make([]*v5.TimeSeriesData, len(tsData))
+			for i, ts := range tsData {
+				sortedData[i] = &v5.TimeSeriesData{
+					QueryName:    ts.QueryName,
+					Aggregations: make([]*v5.AggregationBucket, len(ts.Aggregations)),
+				}
+
+				for j, bucket := range ts.Aggregations {
+					sortedBucket := &v5.AggregationBucket{
+						Index:  bucket.Index,
+						Alias:  bucket.Alias,
+						Series: make([]*v5.TimeSeries, len(bucket.Series)),
+					}
+
+					for k, series := range bucket.Series {
+						sortedSeries := &v5.TimeSeries{
+							Labels: series.Labels,
+							Values: make([]*v5.TimeSeriesValue, len(series.Values)),
+						}
+						copy(sortedSeries.Values, series.Values)
+
+						sort.Slice(sortedSeries.Values, func(i, j int) bool {
+							return sortedSeries.Values[i].Timestamp < sortedSeries.Values[j].Timestamp
+						})
+
+						sortedBucket.Series[k] = sortedSeries
+					}
+
+					sort.Slice(sortedBucket.Series, func(i, j int) bool {
+						return createLabelSignature(sortedBucket.Series[i].Labels) <
+							createLabelSignature(sortedBucket.Series[j].Labels)
+					})
+
+					sortedData[i].Aggregations[j] = sortedBucket
+				}
+			}
+
+			sort.Slice(sortedData, func(i, j int) bool {
+				return sortedData[i].QueryName < sortedData[j].QueryName
+			})
+
+			responseCopy.Data = sortedData
+		}
+	default:
+		responseCopy.Data = response.Data
+	}
+
+	return responseCopy
+}
diff --git a/pkg/telemetrymetadata/metadata.go b/pkg/telemetrymetadata/metadata.go
index 8b00054c639b..5198a3d98bc2 100644
--- a/pkg/telemetrymetadata/metadata.go
+++ b/pkg/telemetrymetadata/metadata.go
@@ -417,14 +417,17 @@ func (t *telemetryMetaStore) getMetricsKeys(ctx context.Context, fieldKeySelecto
 	} else {
 		fieldConds = append(fieldConds, sb.Like("attr_name", "%"+fieldKeySelector.Name+"%"))
 	}
+	fieldConds = append(fieldConds, sb.NotLike("attr_name", "\\_\\_%"))
 
-	if fieldKeySelector.FieldContext != telemetrytypes.FieldContextUnspecified {
-		fieldConds = append(fieldConds, sb.E("attr_type", fieldKeySelector.FieldContext.TagType()))
-	}
+	// note: type and datatype do not have much significance in metrics
 
-	if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
-		fieldConds = append(fieldConds, sb.E("attr_datatype", fieldKeySelector.FieldDataType.TagDataType()))
-	}
+	// if fieldKeySelector.FieldContext != telemetrytypes.FieldContextUnspecified {
+	// 	fieldConds = append(fieldConds, sb.E("attr_type", fieldKeySelector.FieldContext.TagType()))
+	// }
+
+	// if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
+	// 	fieldConds = append(fieldConds, sb.E("attr_datatype", fieldKeySelector.FieldDataType.TagDataType()))
+	// }
 
 	if fieldKeySelector.MetricContext != nil {
 		fieldConds = append(fieldConds, sb.E("metric_name", fieldKeySelector.MetricContext.MetricName))
@@ -966,18 +969,15 @@ func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, metricNa
 	// Note: The columns are mixed in the current data - temporality column contains metric_name
 	// and metric_name column contains temporality value, so we use the correct mapping
 	sb := sqlbuilder.Select(
-		"temporality as metric_name",
-		"argMax(attr_string_value, last_reported_unix_milli) as temporality_value",
+		"metric_name",
+		"argMax(temporality, last_reported_unix_milli) as temporality",
 	).From(t.metricsDBName + "." + t.metricsFieldsTblName)
 
 	// Filter by metric names (in the temporality column due to data mix-up)
-	sb.Where(sb.In("temporality", metricNames))
-
-	// Only fetch temporality metadata rows (where attr_name = '__temporality__')
-	sb.Where(sb.E("attr_name", "__temporality__"))
+	sb.Where(sb.In("metric_name", metricNames))
 
 	// Group by metric name to get one temporality per metric
-	sb.GroupBy("temporality")
+	sb.GroupBy("metric_name")
 
 	query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
diff --git a/pkg/telemetrytraces/const.go b/pkg/telemetrytraces/const.go
index f49da5ac4228..d961016a88a5 100644
--- a/pkg/telemetrytraces/const.go
+++ b/pkg/telemetrytraces/const.go
@@ -1,5 +1,7 @@
 package telemetrytraces
 
+import "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
+
 var (
 	IntrinsicFields = []string{
 		"trace_id",
@@ -53,4 +55,37 @@ var (
 	}
 	SpanSearchScopeRoot       = "isroot"
 	SpanSearchScopeEntryPoint = "isentrypoint"
+
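+	// DefaultFields lists the span columns a list query selects when the
+	// request does not set SelectFields explicitly.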
+	DefaultFields = []telemetrytypes.TelemetryFieldKey{
+		{
+			Name:         "timestamp",
+			FieldContext: telemetrytypes.FieldContextSpan,
+		},
+		{
+			Name:         "span_id",
+			FieldContext: telemetrytypes.FieldContextSpan,
+		},
+		{
+			Name:         "trace_id",
+			FieldContext: telemetrytypes.FieldContextSpan,
+		},
+		{
+			Name:         "name",
+			FieldContext: telemetrytypes.FieldContextSpan,
+		},
+		{
+			Name:          "service.name",
+			FieldContext:  telemetrytypes.FieldContextResource,
+			FieldDataType: telemetrytypes.FieldDataTypeString,
+			Materialized:  true,
+		},
+		{
+			Name:         "duration_nano",
+			FieldContext: telemetrytypes.FieldContextSpan,
+		},
+		{
+			Name:         "response_status_code",
+			FieldContext: telemetrytypes.FieldContextSpan,
+		},
+	}
 )
diff --git a/pkg/telemetrytraces/statement_builder.go b/pkg/telemetrytraces/statement_builder.go
index 02189f4cd57d..9cb15d0ed89a 100644
--- a/pkg/telemetrytraces/statement_builder.go
+++ b/pkg/telemetrytraces/statement_builder.go
@@ -171,19 +171,14 @@ func (b *traceQueryStatementBuilder) buildListQuery(
 		cteArgs = append(cteArgs, args)
 	}
 
-	// Select default columns
-	sb.Select(
-		"timestamp",
-		"trace_id",
-		"span_id",
-		"name",
-		sqlbuilder.Escape("resource_string_service$$name"),
-		"duration_nano",
-		"response_status_code",
-	)
+	selectedFields := query.SelectFields
+
+	if len(selectedFields) == 0 {
+		selectedFields = DefaultFields
+	}
 
 	// TODO: should we deprecate `SelectFields` and return everything from a span like we do for logs?
-	for _, field := range query.SelectFields {
+	for _, field := range selectedFields {
 		colExpr, err := b.fm.ColumnExpressionFor(ctx, &field, keys)
 		if err != nil {
 			return nil, err
diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/formula.go b/pkg/types/querybuildertypes/querybuildertypesv5/formula.go
index e31c987e3f57..6a21f80a4f35 100644
--- a/pkg/types/querybuildertypes/querybuildertypesv5/formula.go
+++ b/pkg/types/querybuildertypes/querybuildertypesv5/formula.go
@@ -20,6 +20,8 @@ type QueryBuilderFormula struct {
 	// expression to apply to the query
 	Expression string `json:"expression"`
 
+	Disabled bool `json:"disabled,omitempty"`
+
 	// order by keys and directions
 	Order []OrderBy `json:"order,omitempty"`