From d6eed8e79dae5281839b461f46c1fbffe44b8bca Mon Sep 17 00:00:00 2001 From: Piyush Singariya Date: Mon, 14 Jul 2025 18:48:01 +0530 Subject: [PATCH 1/2] feat: JSON Flattening in logs pipelines (#8227) * feat: introducing JSON Flattening * fix: removed bug and tested * test: removed testing test * feat: additional severity levels, and some clearing * chore: minor changes * test: added tests for processJSONParser * test: added check for OnError * fix: review from ellipsis * fix: variablise max flattening depth * Update pkg/query-service/app/logparsingpipeline/pipelineBuilder.go Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com> * Update pkg/errors/errors.go Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com> * fix: quoted JSON strings fix * test: updating otel collector for testing * test: update collector's reference * chore: change with new error package * chore: set flattening depth equal to 1 * fix: fallback for depth * fix: change in errors package * fix: tests * fix: test * chore: update collector version * fix: go.sum --------- Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com> Co-authored-by: Nityananda Gohain --- go.mod | 2 +- go.sum | 4 +- pkg/errors/errors.go | 24 ++ pkg/errors/type.go | 1 + .../app/logparsingpipeline/error.go | 8 + .../app/logparsingpipeline/pipelineBuilder.go | 229 +++++++++++++++--- .../pipelineBuilder_test.go | 146 +++++++++++ .../severity_parser_test.go | 4 +- pkg/query-service/constants/constants.go | 8 + pkg/types/pipelinetypes/pipeline.go | 84 ++++++- .../pipelinetypes/postable_pipeline_test.go | 6 +- 11 files changed, 473 insertions(+), 43 deletions(-) create mode 100644 pkg/query-service/app/logparsingpipeline/error.go diff --git a/go.mod b/go.mod index 91a48e27154b..d3a75ad6152b 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/ClickHouse/clickhouse-go/v2 v2.36.0 github.com/DATA-DOG/go-sqlmock v1.5.2 github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd - github.com/SigNoz/signoz-otel-collector v0.111.43-aded056 + github.com/SigNoz/signoz-otel-collector v0.128.1 github.com/antlr4-go/antlr/v4 v4.13.1 github.com/antonmedv/expr v1.15.3 github.com/cespare/xxhash/v2 v2.3.0 diff --git a/go.sum b/go.sum index e48fcf1320c0..e48341f9919e 100644 --- a/go.sum +++ b/go.sum @@ -98,8 +98,8 @@ github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd h1:Bk43AsDYe0fhkbj57eGXx8H3ZJ4zhmQXBnrW523ktj8= github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd/go.mod h1:nxRcH/OEdM8QxzH37xkGzomr1O0JpYBRS6pwjsWW6Pc= -github.com/SigNoz/signoz-otel-collector v0.111.43-aded056 h1:lJ7262JHZlHX7KuUlQa8vpWCdgZKwlZ2P6sUmZEqNLE= -github.com/SigNoz/signoz-otel-collector v0.111.43-aded056/go.mod h1:AHfJ2N/74IXsrbYEPAlqfJeKg006VTt63vBZglUK3jY= +github.com/SigNoz/signoz-otel-collector v0.128.1 h1:D0bKMrRNgcKreKKYoakCr5jTWj1srupbNwGIvpHMihw= +github.com/SigNoz/signoz-otel-collector v0.128.1/go.mod h1:vFQLsJFzQwVkO1ltIMH+z9KKuTZTn/P0lKu2mNYDBpE= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod 
h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 764b1632c39c..8d8cd867d7d0 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -79,6 +79,22 @@ func Wrapf(cause error, t typ, code Code, format string, args ...interface{}) *b } } +// WithAdditional wraps an existing base error with a new formatted message. +// It is used when the original error already contains type and code. +func WithAdditional(cause error, format string, args ...interface{}) *base { + t, c, m, e, u, a := Unwrapb(cause) + b := &base{ + t: t, + c: c, + m: m, + e: e, + u: u, + a: a, + } + + return b.WithAdditional(append(a, fmt.Sprintf(format, args...))...) +} + // WithUrl adds a url to the base error and returns a new base error. func (b *base) WithUrl(u string) *base { return &base{ @@ -169,3 +185,11 @@ func WrapInvalidInputf(cause error, code Code, format string, args ...interface{ func NewInvalidInputf(code Code, format string, args ...interface{}) *base { return Newf(TypeInvalidInput, code, format, args...) } + +func WrapUnexpectedf(cause error, code Code, format string, args ...interface{}) *base { + return Wrapf(cause, TypeInvalidInput, code, format, args...) +} + +func NewUnexpectedf(code Code, format string, args ...interface{}) *base { + return Newf(TypeInvalidInput, code, format, args...) +} diff --git a/pkg/errors/type.go b/pkg/errors/type.go index 3663f9df667c..80d0dbbefa01 100644 --- a/pkg/errors/type.go +++ b/pkg/errors/type.go @@ -11,6 +11,7 @@ var ( TypeForbidden = typ{"forbidden"} TypeCanceled = typ{"canceled"} TypeTimeout = typ{"timeout"} + TypeUnexpected = typ{"unexpected"} // Generic mismatch of expectations ) // Defines custom error types diff --git a/pkg/query-service/app/logparsingpipeline/error.go b/pkg/query-service/app/logparsingpipeline/error.go new file mode 100644 index 000000000000..bd127a69cc3e --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/error.go @@ -0,0 +1,8 @@ +package logparsingpipeline + +import "github.com/SigNoz/signoz/pkg/errors" + +var ( + CodeInvalidOperatorType = errors.MustNewCode("operator_type_mismatch") + CodeFieldNilCheckType = errors.MustNewCode("operator_field_nil_check") +) diff --git a/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go b/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go index e8b4bf1fb980..913678d0cb9d 100644 --- a/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go +++ b/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go @@ -6,13 +6,15 @@ import ( "slices" "strings" + signozstanzahelper "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator/helper" + "github.com/SigNoz/signoz/pkg/errors" "github.com/SigNoz/signoz/pkg/query-service/constants" "github.com/SigNoz/signoz/pkg/query-service/queryBuilderToExpr" "github.com/SigNoz/signoz/pkg/types/pipelinetypes" "github.com/antonmedv/expr" "github.com/antonmedv/expr/ast" "github.com/antonmedv/expr/parser" - "github.com/pkg/errors" + "github.com/google/uuid" ) const ( @@ -38,7 +40,7 @@ func PreparePipelineProcessor(gettablePipelines []pipelinetypes.GettablePipeline operators, err := getOperators(v.Config) if err != nil { - return nil, nil, errors.Wrap(err, "failed to prepare operators") + return nil, nil, err } if len(operators) == 0 { @@ -47,7 +49,7 @@ func PreparePipelineProcessor(gettablePipelines []pipelinetypes.GettablePipeline filterExpr, err := queryBuilderToExpr.Parse(v.Filter) if err != nil { - return nil, nil, errors.Wrap(err, "failed to 
parse pipeline filter") + return nil, nil, err } router := []pipelinetypes.PipelineOperator{ @@ -93,10 +95,6 @@ func getOperators(ops []pipelinetypes.PipelineOperator) ([]pipelinetypes.Pipelin filteredOp := []pipelinetypes.PipelineOperator{} for i, operator := range ops { if operator.Enabled { - if len(filteredOp) > 0 { - filteredOp[len(filteredOp)-1].Output = operator.ID - } - if operator.Type == "regex_parser" { parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom) if err != nil { @@ -124,16 +122,13 @@ func getOperators(ops []pipelinetypes.PipelineOperator) ([]pipelinetypes.Pipelin operator.If = parseFromNotNilCheck } else if operator.Type == "json_parser" { - parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom) + operators, err := processJSONParser(&operator) if err != nil { - return nil, fmt.Errorf( - "couldn't generate nil check for parseFrom of json parser op %s: %w", operator.Name, err, - ) + return nil, fmt.Errorf("couldn't process json_parser op %s: %s", operator.Name, err) } - operator.If = fmt.Sprintf( - `%s && ((type(%s) == "string" && %s matches "^\\s*{.*}\\s*$" ) || type(%s) == "map")`, - parseFromNotNilCheck, operator.ParseFrom, operator.ParseFrom, operator.ParseFrom, - ) + + filteredOp = append(filteredOp, operators...) + continue // Continue here to skip deduplication of json_parser operator } else if operator.Type == "add" { if strings.HasPrefix(operator.Value, "EXPR(") && strings.HasSuffix(operator.Value, ")") { expression := strings.TrimSuffix(strings.TrimPrefix(operator.Value, "EXPR("), ")") @@ -148,7 +143,6 @@ func getOperators(ops []pipelinetypes.PipelineOperator) ([]pipelinetypes.Pipelin operator.If = fieldsNotNilCheck } } - } else if operator.Type == "move" || operator.Type == "copy" { fromNotNilCheck, err := fieldNotNilCheck(operator.From) if err != nil { @@ -157,7 +151,6 @@ func getOperators(ops []pipelinetypes.PipelineOperator) ([]pipelinetypes.Pipelin ) } operator.If = fromNotNilCheck - } else if operator.Type == "remove" { fieldNotNilCheck, err := fieldNotNilCheck(operator.Field) if err != nil { @@ -166,10 +159,8 @@ func getOperators(ops []pipelinetypes.PipelineOperator) ([]pipelinetypes.Pipelin ) } operator.If = fieldNotNilCheck - } else if operator.Type == "trace_parser" { cleanTraceParser(&operator) - } else if operator.Type == "time_parser" { parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom) if err != nil { @@ -202,19 +193,11 @@ func getOperators(ops []pipelinetypes.PipelineOperator) ([]pipelinetypes.Pipelin } // TODO(Raj): Maybe add support for gotime too eventually - } else if operator.Type == "severity_parser" { - parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom) + err := processSeverityParser(&operator) if err != nil { - return nil, fmt.Errorf( - "couldn't generate nil check for parseFrom of severity parser %s: %w", operator.Name, err, - ) + return nil, err } - operator.If = fmt.Sprintf( - `%s && ( type(%s) == "string" || ( type(%s) in ["int", "float"] && %s == float(int(%s)) ) )`, - parseFromNotNilCheck, operator.ParseFrom, operator.ParseFrom, operator.ParseFrom, operator.ParseFrom, - ) - } filteredOp = append(filteredOp, operator) @@ -222,9 +205,193 @@ func getOperators(ops []pipelinetypes.PipelineOperator) ([]pipelinetypes.Pipelin filteredOp[len(filteredOp)-1].Output = "" } } + + for idx := range filteredOp { + if idx > 0 { + filteredOp[idx-1].Output = filteredOp[idx].ID + } + } return filteredOp, nil } +func processSeverityParser(operator *pipelinetypes.PipelineOperator) error { + if 
operator.Type != "severity_parser" { + return errors.NewUnexpectedf(CodeInvalidOperatorType, "operator type received %s", operator.Type) + } + + parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom) + if err != nil { + return errors.WrapInvalidInputf(err, CodeFieldNilCheckType, + "couldn't generate nil check for parseFrom of severity parser %s", operator.Name, + ) + } + operator.If = fmt.Sprintf( + `%s && ( type(%s) == "string" || ( type(%s) in ["int", "float"] && %s == float(int(%s)) ) )`, + parseFromNotNilCheck, operator.ParseFrom, operator.ParseFrom, operator.ParseFrom, operator.ParseFrom, + ) + + return nil +} + +// processJSONParser converts simple JSON parser operator into multiple operators for JSONMapping of default variables +func processJSONParser(parent *pipelinetypes.PipelineOperator) ([]pipelinetypes.PipelineOperator, error) { + if parent.Type != "json_parser" { + return nil, errors.NewUnexpectedf(CodeInvalidOperatorType, "operator type received %s", parent.Type) + } + + parseFromNotNilCheck, err := fieldNotNilCheck(parent.ParseFrom) + if err != nil { + return nil, errors.WrapInvalidInputf(err, CodeFieldNilCheckType, + "couldn't generate nil check for parseFrom of json parser op %s: %s", parent.Name, err, + ) + } + parent.If = fmt.Sprintf( + `%s && ((type(%s) == "string" && isJSON(%s) && type(fromJSON(unquote(%s))) == "map" ) || type(%s) == "map")`, + parseFromNotNilCheck, parent.ParseFrom, parent.ParseFrom, parent.ParseFrom, parent.ParseFrom, + ) + if parent.EnableFlattening { + parent.MaxFlatteningDepth = constants.MaxJSONFlatteningDepth + } + + // return if no mapping available + if parent.Mapping == nil { + return []pipelinetypes.PipelineOperator{*parent}, nil + } + + mapping := parent.Mapping + children := []pipelinetypes.PipelineOperator{} + + // cloning since the same function is used when saving pipelines (POST request) hence reversing + // the same array inplace ends up with saving mapping in a reversed order in database + cloneAndReverse := func(input []string) []string { + cloned := slices.Clone(input) + slices.Reverse(cloned) + + return cloned + } + + generateCustomID := func() string { + return fmt.Sprintf("%s-json-parser", uuid.NewString()) // json-parser helps in identifying processors part of JSON Parser + } + + // reusable move operator function + generateMoveOperators := func(keywords []string, to string) error { + for _, keyword := range cloneAndReverse(keywords) { + operator := pipelinetypes.PipelineOperator{ + Type: "move", + ID: generateCustomID(), + OnError: signozstanzahelper.SendOnErrorQuiet, + From: fmt.Sprintf(`%s["%s"]`, parent.ParseTo, keyword), + To: to, + } + + fromNotNilCheck, err := fieldNotNilCheck(operator.From) + if err != nil { + return err + } + + operator.If = fromNotNilCheck + children = append(children, operator) + } + + return nil + } + + // JSONMapping: host + err = generateMoveOperators(mapping[pipelinetypes.Host], `resource["host.name"]`) + if err != nil { + return nil, err + } + + // JSONMapping: service + err = generateMoveOperators(mapping[pipelinetypes.Service], `resource["service.name"]`) + if err != nil { + return nil, err + } + + // JSONMapping: trace_id + for _, keyword := range cloneAndReverse(mapping[pipelinetypes.TraceID]) { + operator := pipelinetypes.PipelineOperator{ + Type: "trace_parser", + ID: generateCustomID(), + OnError: signozstanzahelper.SendOnErrorQuiet, + TraceParser: &pipelinetypes.TraceParser{ + TraceId: &pipelinetypes.ParseFrom{ + ParseFrom: fmt.Sprintf(`%s["%s"]`, parent.ParseTo, keyword), + 
}, + }, + } + + children = append(children, operator) + } + + // JSONMapping: span_id + for _, keyword := range cloneAndReverse(mapping[pipelinetypes.SpanID]) { + operator := pipelinetypes.PipelineOperator{ + Type: "trace_parser", + ID: generateCustomID(), + OnError: signozstanzahelper.SendOnErrorQuiet, + TraceParser: &pipelinetypes.TraceParser{ + SpanId: &pipelinetypes.ParseFrom{ + ParseFrom: fmt.Sprintf(`%s["%s"]`, parent.ParseTo, keyword), + }, + }, + } + + children = append(children, operator) + } + + // JSONMapping: trace_flags + for _, keyword := range cloneAndReverse(mapping[pipelinetypes.TraceFlags]) { + operator := pipelinetypes.PipelineOperator{ + Type: "trace_parser", + ID: generateCustomID(), + OnError: signozstanzahelper.SendOnErrorQuiet, + TraceParser: &pipelinetypes.TraceParser{ + TraceFlags: &pipelinetypes.ParseFrom{ + ParseFrom: fmt.Sprintf(`%s["%s"]`, parent.ParseTo, keyword), + }, + }, + } + + children = append(children, operator) + } + + // JSONMapping: severity + for _, keyword := range cloneAndReverse(mapping[pipelinetypes.Severity]) { + operator := pipelinetypes.PipelineOperator{ + Type: "severity_parser", + ID: generateCustomID(), + OnError: signozstanzahelper.SendOnErrorQuiet, + ParseFrom: fmt.Sprintf(`%s["%s"]`, parent.ParseTo, keyword), + } + err := processSeverityParser(&operator) + if err != nil { + return nil, err + } + + operator.Mapping = pipelinetypes.DefaultSeverityMapping + children = append(children, operator) + } + + // JSONMapping: environment + err = generateMoveOperators(mapping[pipelinetypes.Environment], `resource["deployment.environment.name"]`) + if err != nil { + return nil, err + } + + // JSONMapping: body + err = generateMoveOperators(mapping[pipelinetypes.Message], `body`) + if err != nil { + return nil, err + } + + // removed mapping reference so it doesn't appear in Collector's config + parent.Mapping = nil + return append(append([]pipelinetypes.PipelineOperator{}, *parent), children...), nil +} + +// TODO: (Piyush) remove this in future func cleanTraceParser(operator *pipelinetypes.PipelineOperator) { if operator.TraceId != nil && len(operator.TraceId.ParseFrom) < 1 { operator.TraceId = nil @@ -241,7 +408,7 @@ func cleanTraceParser(operator *pipelinetypes.PipelineOperator) { func fieldNotNilCheck(fieldPath string) (string, error) { _, err := expr.Compile(fieldPath) if err != nil { - return "", fmt.Errorf("invalid fieldPath %s: %w", fieldPath, err) + return "", errors.WrapInvalidInputf(err, CodeFieldNilCheckType, "invalid fieldPath %s", fieldPath) } // helper for turning `.` into `?.` in field paths. @@ -270,7 +437,7 @@ func fieldNotNilCheck(fieldPath string) (string, error) { // should come out to be (attributes.test != nil && attributes.test["a.b"]?.value != nil) collectionNotNilCheck, err := fieldNotNilCheck(parts[0]) if err != nil { - return "", fmt.Errorf("couldn't generate nil check for %s: %w", parts[0], err) + return "", errors.WithAdditional(err, "couldn't generate nil check for %s", parts[0]) } // generate nil check for entire path. 
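Note: the depth-limited flattening itself is performed by the signoz-otel-collector json_parser operator; the builder code above only toggles enable_flattening and caps the depth via constants.MaxJSONFlatteningDepth (default 1, overridable through MAX_JSON_FLATTENING_DEPTH, see constants.go below). As a rough standalone sketch of what depth-limited flattening means for a parsed log body — illustrative only, not the collector's implementation; the actual key joining and path-prefix handling may differ:

package main

import (
	"encoding/json"
	"fmt"
)

// flatten merges nested objects into their parent using dotted keys, but only
// up to maxDepth levels of nesting, mirroring the idea behind max_flattening_depth.
func flatten(m map[string]any, prefix string, depth, maxDepth int) map[string]any {
	out := map[string]any{}
	for k, v := range m {
		key := k
		if prefix != "" {
			key = prefix + "." + k
		}
		if child, ok := v.(map[string]any); ok && depth < maxDepth {
			for ck, cv := range flatten(child, key, depth+1, maxDepth) {
				out[ck] = cv
			}
			continue
		}
		out[key] = v
	}
	return out
}

func main() {
	var body map[string]any
	_ = json.Unmarshal([]byte(`{"level":"info","kubernetes":{"pod":{"name":"api-0"}}}`), &body)

	// With maxDepth 1 the "kubernetes" object is merged one level up, while the
	// deeper "pod" object stays nested under the dotted key.
	fmt.Println(flatten(body, "", 0, 1))
	// map[kubernetes.pod:map[name:api-0] level:info]
}

In other words, with the default depth of 1 only the first level of nested objects is merged into the target field; anything deeper remains an object under its dotted key.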
diff --git a/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go b/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go index 5791cf98d79e..0e60dd9e1b6a 100644 --- a/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go +++ b/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go @@ -7,12 +7,14 @@ import ( "testing" "time" + signozstanzahelper "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator/helper" "github.com/SigNoz/signoz/pkg/query-service/model" v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" "github.com/SigNoz/signoz/pkg/query-service/utils" "github.com/SigNoz/signoz/pkg/types" "github.com/SigNoz/signoz/pkg/types/pipelinetypes" "github.com/SigNoz/signoz/pkg/valuer" + "github.com/google/uuid" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" . "github.com/smartystreets/goconvey/convey" "github.com/stretchr/testify/require" @@ -841,3 +843,147 @@ func TestContainsFilterIsCaseInsensitive(t *testing.T) { _, test2Exists := result[0].Attributes_string["test2"] require.False(test2Exists) } + +func TestProcessJSONParser_WithFlatteningAndMapping(t *testing.T) { + parserID := uuid.NewString() + outputID := uuid.NewString() + + parent := &pipelinetypes.PipelineOperator{ + Type: "json_parser", + ID: parserID, + Name: "Parse JSON", + OrderId: 1, + Enabled: true, + ParseFrom: "body", + ParseTo: "attributes", + Output: outputID, + EnableFlattening: true, + EnablePaths: false, + PathPrefix: "", + Mapping: map[string][]string{ + pipelinetypes.Host: {"host", "hostname"}, + pipelinetypes.Service: {"service", "syslog.appname"}, + pipelinetypes.Severity: {"status", "severity", "level", "syslog.severity"}, + pipelinetypes.TraceID: {"trace_id"}, + pipelinetypes.SpanID: {"span_id"}, + pipelinetypes.Message: {"message", "msg", "log"}, + pipelinetypes.TraceFlags: {"flags"}, + pipelinetypes.Environment: {"service.env"}, + }, + } + + // Total children generated = sum(len(mapping values)) + severity_parser + trace_parser ops + expectedMoveOps := len(parent.Mapping[pipelinetypes.Host]) + + len(parent.Mapping[pipelinetypes.Service]) + + len(parent.Mapping[pipelinetypes.Message]) + + len(parent.Mapping[pipelinetypes.Environment]) + expectedTraceOps := len(parent.Mapping[pipelinetypes.TraceID]) + + len(parent.Mapping[pipelinetypes.SpanID]) + + len(parent.Mapping[pipelinetypes.TraceFlags]) + expectedSeverityOps := len(parent.Mapping[pipelinetypes.Severity]) // severity_parser + + totalOps := expectedMoveOps + expectedTraceOps + expectedSeverityOps + + ops, err := processJSONParser(parent) + require.NoError(t, err) + require.NotEmpty(t, ops) + + // Parent is always first + parentOp := ops[0] + require.Equal(t, "json_parser", parentOp.Type) + require.Equal(t, 1, parentOp.MaxFlatteningDepth) + require.Nil(t, parentOp.Mapping) // Mapping should be removed + require.Nil(t, parent.Mapping) // Mapping should be removed + require.Contains(t, parentOp.If, `isJSON(body)`) + require.Contains(t, parentOp.If, `type(body)`) + + require.Equal(t, 1+totalOps, len(ops)) + + var traceParserCount, moveCount, severityParserCount int + for _, op := range ops[1:] { + require.NotEmpty(t, op.ID) + require.Equal(t, op.OnError, signozstanzahelper.SendOnErrorQuiet) + + switch op.Type { + case "move": + require.NotEmpty(t, op.From) + require.NotEmpty(t, op.To) + moveCount++ + case "trace_parser": + require.NotNil(t, op.TraceParser) + traceParserCount++ + case "severity_parser": + require.NotEmpty(t, op.ParseFrom) + 
require.NotEmpty(t, op.If) + severityParserCount++ + default: + t.Errorf("unexpected operator type: %s", op.Type) + } + } + + require.Equal(t, expectedMoveOps, moveCount) + require.Equal(t, expectedTraceOps, traceParserCount) + require.Equal(t, expectedSeverityOps, severityParserCount) +} + +func TestProcessJSONParser_WithoutMapping(t *testing.T) { + parent := &pipelinetypes.PipelineOperator{ + Type: "json_parser", + ID: uuid.NewString(), + Name: "Parse JSON", + OrderId: 1, + Enabled: true, + ParseFrom: "body", + ParseTo: "attributes", + EnableFlattening: true, + EnablePaths: true, + PathPrefix: "parsed", + Mapping: nil, // No mapping + } + + ops, err := processJSONParser(parent) + require.NoError(t, err) + require.Len(t, ops, 1) // Only the parent operator should exist + + op := ops[0] + require.Equal(t, "json_parser", op.Type) + require.Equal(t, 1, op.MaxFlatteningDepth) + require.True(t, op.EnableFlattening) + require.True(t, op.EnablePaths) + require.Equal(t, "parsed", op.PathPrefix) + require.Contains(t, op.If, `isJSON(body)`) +} + +func TestProcessJSONParser_Simple(t *testing.T) { + parent := &pipelinetypes.PipelineOperator{ + Type: "json_parser", + ID: uuid.NewString(), + Name: "Parse JSON", + OrderId: 1, + Enabled: true, + ParseFrom: "body", + ParseTo: "attributes", + } + + ops, err := processJSONParser(parent) + require.NoError(t, err) + require.Len(t, ops, 1) // Only the parent operator should exist + + op := ops[0] + require.Equal(t, "json_parser", op.Type) + require.Equal(t, 0, op.MaxFlatteningDepth) + require.False(t, op.EnableFlattening) + require.False(t, op.EnablePaths) + require.Equal(t, "", op.PathPrefix) + require.Contains(t, op.If, `isJSON(body)`) +} + +func TestProcessJSONParser_InvalidType(t *testing.T) { + parent := &pipelinetypes.PipelineOperator{ + Type: "copy", // Invalid type + } + + _, err := processJSONParser(parent) + require.Error(t, err) + require.Contains(t, err.Error(), "operator type received copy") +} diff --git a/pkg/query-service/app/logparsingpipeline/severity_parser_test.go b/pkg/query-service/app/logparsingpipeline/severity_parser_test.go index ec707dfb95f9..4ff9b53de4c1 100644 --- a/pkg/query-service/app/logparsingpipeline/severity_parser_test.go +++ b/pkg/query-service/app/logparsingpipeline/severity_parser_test.go @@ -183,7 +183,7 @@ func TestNoCollectorErrorsFromSeverityParserForMismatchedLogs(t *testing.T) { Enabled: true, Name: "severity parser", ParseFrom: "attributes.test_severity", - SeverityMapping: map[string][]string{ + Mapping: map[string][]string{ "debug": {"debug"}, }, OverwriteSeverityText: true, @@ -199,7 +199,7 @@ func TestNoCollectorErrorsFromSeverityParserForMismatchedLogs(t *testing.T) { Enabled: true, Name: "severity parser", ParseFrom: "attributes.test_severity", - SeverityMapping: map[string][]string{ + Mapping: map[string][]string{ "debug": {"debug"}, }, OverwriteSeverityText: true, diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 3be0bef5b90e..0b784192672a 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -604,6 +604,7 @@ var StaticFieldsTraces = map[string]v3.AttributeKey{} var IsDotMetricsEnabled = false var PreferSpanMetrics = false +var MaxJSONFlatteningDepth = 1 func init() { StaticFieldsTraces = maps.Clone(NewStaticFieldsTraces) @@ -614,6 +615,12 @@ func init() { if GetOrDefaultEnv("USE_SPAN_METRICS", "false") == "true" { PreferSpanMetrics = true } + + // set max flattening depth + depth, err := 
strconv.Atoi(GetOrDefaultEnv(maxJSONFlatteningDepth, "1")) + if err == nil { + MaxJSONFlatteningDepth = depth + } } const TRACE_V4_MAX_PAGINATION_LIMIT = 10000 @@ -641,3 +648,4 @@ func GetDefaultSiteURL() string { } const DotMetricsEnabled = "DOT_METRICS_ENABLED" +const maxJSONFlatteningDepth = "MAX_JSON_FLATTENING_DEPTH" diff --git a/pkg/types/pipelinetypes/pipeline.go b/pkg/types/pipelinetypes/pipeline.go index f441d9574feb..31fff6e7f4be 100644 --- a/pkg/types/pipelinetypes/pipeline.go +++ b/pkg/types/pipelinetypes/pipeline.go @@ -14,6 +14,32 @@ import ( "github.com/uptrace/bun" ) +type JSONMappingType = string + +const ( + Host JSONMappingType = "host" + Service JSONMappingType = "service" + Environment JSONMappingType = "environment" + Severity JSONMappingType = "severity" + TraceID JSONMappingType = "trace_id" + SpanID JSONMappingType = "span_id" + TraceFlags JSONMappingType = "trace_flags" + Message JSONMappingType = "message" +) + +var DefaultSeverityMapping = map[string][]string{ + "trace": {"TRACE", "Trace", "trace", "trc", "Trc"}, + "debug": {"DEBUG", "Debug", "debug", "dbg", "Dbg"}, + "info": {"INFO", "Info", "info"}, + "warn": {"WARN", "Warn", "warn", "warning", "Warning", "wrn", "Wrn"}, + "error": {"ERROR", "Error", "error", "err", "Err", "ERR", "fail", "Fail", "FAIL"}, + "fatal": {"FATAL", "Fatal", "fatal", "critical", "Critical", "CRITICAL", "crit", "Crit", "CRIT", + "panic", "Panic", "PANIC"}, +} + +var validMappingLevels = []string{"trace", "debug", "info", "warn", "error", "fatal"} +var validMappingVariableTypes = []string{Host, Service, Environment, Severity, TraceID, SpanID, TraceFlags, Message} + type StoreablePipeline struct { bun.BaseModel `bun:"table:pipelines,alias:p"` @@ -91,9 +117,54 @@ type PipelineOperator struct { Layout string `json:"layout,omitempty" yaml:"layout,omitempty"` LayoutType string `json:"layout_type,omitempty" yaml:"layout_type,omitempty"` + // json_parser fields + EnableFlattening bool `json:"enable_flattening,omitempty" yaml:"enable_flattening,omitempty"` + MaxFlatteningDepth int `json:"-" yaml:"max_flattening_depth,omitempty"` // MaxFlatteningDepth is not configurable from User's side + EnablePaths bool `json:"enable_paths,omitempty" yaml:"enable_paths,omitempty"` + PathPrefix string `json:"path_prefix,omitempty" yaml:"path_prefix,omitempty"` + + // Used in Severity Parsing and JSON Flattening mapping + Mapping map[string][]string `json:"mapping,omitempty" yaml:"mapping,omitempty"` // severity parser fields - SeverityMapping map[string][]string `json:"mapping,omitempty" yaml:"mapping,omitempty"` - OverwriteSeverityText bool `json:"overwrite_text,omitempty" yaml:"overwrite_text,omitempty"` + OverwriteSeverityText bool `json:"overwrite_text,omitempty" yaml:"overwrite_text,omitempty"` +} + +func (op PipelineOperator) MarshalJSON() ([]byte, error) { + type Alias PipelineOperator + + p := Alias(op) + if p.TraceParser != nil { + if p.TraceId != nil && len(p.TraceId.ParseFrom) < 1 { + p.TraceId = nil + } + if p.SpanId != nil && len(p.SpanId.ParseFrom) < 1 { + p.SpanId = nil + } + if p.TraceFlags != nil && len(p.TraceFlags.ParseFrom) < 1 { + p.TraceFlags = nil + } + } + + return json.Marshal(p) +} + +func (op PipelineOperator) MarshalYAML() (interface{}, error) { + type Alias PipelineOperator + alias := Alias(op) + + if alias.TraceParser != nil { + if alias.TraceParser.TraceId != nil && len(alias.TraceParser.TraceId.ParseFrom) < 1 { + alias.TraceParser.TraceId = nil + } + if alias.TraceParser.SpanId != nil && len(alias.TraceParser.SpanId.ParseFrom) < 
1 { + alias.TraceParser.SpanId = nil + } + if alias.TraceParser.TraceFlags != nil && len(alias.TraceParser.TraceFlags.ParseFrom) < 1 { + alias.TraceParser.TraceFlags = nil + } + } + + return alias, nil } type TimestampParser struct { @@ -206,6 +277,12 @@ func isValidOperator(op PipelineOperator) error { if op.ParseFrom == "" && op.ParseTo == "" { return fmt.Errorf("parse from and parse to of %s json operator cannot be empty", op.ID) } + + for k := range op.Mapping { + if !slices.Contains(validMappingVariableTypes, strings.ToLower(k)) { + return fmt.Errorf("%s is not a valid mapping type in processor %s", k, op.ID) + } + } case "grok_parser": if op.Pattern == "" { return fmt.Errorf("pattern of %s grok operator cannot be empty", op.ID) @@ -306,8 +383,7 @@ func isValidOperator(op PipelineOperator) error { return fmt.Errorf("parse from of severity parsing processor %s cannot be empty", op.ID) } - validMappingLevels := []string{"trace", "debug", "info", "warn", "error", "fatal"} - for k := range op.SeverityMapping { + for k := range op.Mapping { if !slices.Contains(validMappingLevels, strings.ToLower(k)) { return fmt.Errorf("%s is not a valid severity in processor %s", k, op.ID) } diff --git a/pkg/types/pipelinetypes/postable_pipeline_test.go b/pkg/types/pipelinetypes/postable_pipeline_test.go index 832a7860084b..4713ecedb2a4 100644 --- a/pkg/types/pipelinetypes/postable_pipeline_test.go +++ b/pkg/types/pipelinetypes/postable_pipeline_test.go @@ -332,7 +332,7 @@ var operatorTest = []struct { ID: "severity", Type: "severity_parser", ParseFrom: "attributes.test_severity", - SeverityMapping: map[string][]string{ + Mapping: map[string][]string{ "trace": {"test_trace"}, "fatal": {"test_fatal"}, }, @@ -344,7 +344,7 @@ var operatorTest = []struct { Operator: PipelineOperator{ ID: "severity", Type: "severity_parser", - SeverityMapping: map[string][]string{}, + Mapping: map[string][]string{}, OverwriteSeverityText: true, }, IsValid: false, @@ -354,7 +354,7 @@ var operatorTest = []struct { ID: "severity", Type: "severity_parser", ParseFrom: "attributes.test", - SeverityMapping: map[string][]string{ + Mapping: map[string][]string{ "not-a-level": {"bad-level"}, }, OverwriteSeverityText: true, From 5e0bf930d69c6071fd0c18885f816a9440507c5e Mon Sep 17 00:00:00 2001 From: Abhi kumar Date: Tue, 15 Jul 2025 12:38:53 +0530 Subject: [PATCH 2/2] chore: added new frontend maintainers (#8530) --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index cc0925878a77..5dfc9d661bf8 100644 --- a/README.md +++ b/README.md @@ -231,6 +231,8 @@ Not sure how to get started? Just ping us on `#contributing` in our [slack commu - [Shaheer Kochai](https://github.com/ahmadshaheer) - [Amlan Kumar Nandy](https://github.com/amlannandy) - [Sahil Khan](https://github.com/sawhil) +- [Aditya Singh](https://github.com/aks07) +- [Abhi Kumar](https://github.com/ahrefabhi) #### DevOps