diff --git a/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go b/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go index 56889ef827ff..29fa4cd3f891 100644 --- a/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go +++ b/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go @@ -2,8 +2,12 @@ package logparsingpipeline import ( "fmt" + "slices" "strings" + "github.com/antonmedv/expr" + "github.com/antonmedv/expr/ast" + "github.com/antonmedv/expr/parser" "github.com/pkg/errors" "go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/queryBuilderToExpr" @@ -81,12 +85,16 @@ func getOperators(ops []PipelineOperator) ([]PipelineOperator, error) { } if operator.Type == "regex_parser" { - parseFromParts := strings.Split(operator.ParseFrom, ".") - parseFromPath := strings.Join(parseFromParts, "?.") + parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom) + if err != nil { + return nil, fmt.Errorf( + "couldn't generate nil check for parseFrom of regex op %s: %w", operator.Name, err, + ) + } operator.If = fmt.Sprintf( - `%s != nil && %s matches "%s"`, - parseFromPath, - parseFromPath, + `%s && %s matches "%s"`, + parseFromNotNilCheck, + operator.ParseFrom, strings.ReplaceAll( strings.ReplaceAll(operator.Regex, `\`, `\\`), `"`, `\"`, @@ -94,37 +102,71 @@ func getOperators(ops []PipelineOperator) ([]PipelineOperator, error) { ) } else if operator.Type == "json_parser" { - parseFromParts := strings.Split(operator.ParseFrom, ".") - parseFromPath := strings.Join(parseFromParts, "?.") - operator.If = fmt.Sprintf(`%s != nil && %s matches "^\\s*{.*}\\s*$"`, parseFromPath, parseFromPath) + parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom) + if err != nil { + return nil, fmt.Errorf( + "couldn't generate nil check for parseFrom of json parser op %s: %w", operator.Name, err, + ) + } + operator.If = fmt.Sprintf( + `%s && %s matches "^\\s*{.*}\\s*$"`, parseFromNotNilCheck, operator.ParseFrom, + ) + + } else if operator.Type == "add" { + if strings.HasPrefix(operator.Value, "EXPR(") && strings.HasSuffix(operator.Value, ")") { + expression := strings.TrimSuffix(strings.TrimPrefix(operator.Value, "EXPR("), ")") + fieldsNotNilCheck, err := fieldsReferencedInExprNotNilCheck(expression) + if err != nil { + return nil, fmt.Errorf( + "could'nt generate nil check for fields referenced in value expr of add operator %s: %w", + operator.Name, err, + ) + } + if fieldsNotNilCheck != "" { + operator.If = fieldsNotNilCheck + } + } } else if operator.Type == "move" || operator.Type == "copy" { - fromParts := strings.Split(operator.From, ".") - fromPath := strings.Join(fromParts, "?.") - operator.If = fmt.Sprintf(`%s != nil`, fromPath) + fromNotNilCheck, err := fieldNotNilCheck(operator.From) + if err != nil { + return nil, fmt.Errorf( + "couldn't generate nil check for From field of %s op %s: %w", operator.Type, operator.Name, err, + ) + } + operator.If = fromNotNilCheck } else if operator.Type == "remove" { - fieldParts := strings.Split(operator.Field, ".") - fieldPath := strings.Join(fieldParts, "?.") - operator.If = fmt.Sprintf(`%s != nil`, fieldPath) + fieldNotNilCheck, err := fieldNotNilCheck(operator.Field) + if err != nil { + return nil, fmt.Errorf( + "couldn't generate nil check for field to be removed by op %s: %w", operator.Name, err, + ) + } + operator.If = fieldNotNilCheck } else if operator.Type == "trace_parser" { cleanTraceParser(&operator) } else if operator.Type == "time_parser" { - parseFromParts := strings.Split(operator.ParseFrom, ".") - parseFromPath := strings.Join(parseFromParts, "?.") - - operator.If = fmt.Sprintf(`%s != nil`, parseFromPath) + parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom) + if err != nil { + return nil, fmt.Errorf( + "couldn't generate nil check for parseFrom of time parser op %s: %w", operator.Name, err, + ) + } + operator.If = parseFromNotNilCheck if operator.LayoutType == "strptime" { regex, err := RegexForStrptimeLayout(operator.Layout) if err != nil { - return nil, fmt.Errorf("could not generate time_parser processor: %w", err) + return nil, fmt.Errorf( + "couldn't generate layout regex for time_parser %s: %w", operator.Name, err, + ) } operator.If = fmt.Sprintf( - `%s && %s matches "%s"`, operator.If, parseFromPath, regex, + `%s && %s matches "%s"`, operator.If, operator.ParseFrom, regex, ) } else if operator.LayoutType == "epoch" { valueRegex := `^\\s*[0-9]+\\s*$` @@ -133,19 +175,22 @@ func getOperators(ops []PipelineOperator) ([]PipelineOperator, error) { } operator.If = fmt.Sprintf( - `%s && string(%s) matches "%s"`, operator.If, parseFromPath, valueRegex, + `%s && string(%s) matches "%s"`, operator.If, operator.ParseFrom, valueRegex, ) } // TODO(Raj): Maybe add support for gotime too eventually } else if operator.Type == "severity_parser" { - parseFromParts := strings.Split(operator.ParseFrom, ".") - parseFromPath := strings.Join(parseFromParts, "?.") - + parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom) + if err != nil { + return nil, fmt.Errorf( + "couldn't generate nil check for parseFrom of severity parser %s: %w", operator.Name, err, + ) + } operator.If = fmt.Sprintf( - `%s != nil && ( type(%s) == "string" || ( type(%s) in ["int", "float"] && %s == float(int(%s)) ) )`, - parseFromPath, parseFromPath, parseFromPath, parseFromPath, parseFromPath, + `%s && ( type(%s) == "string" || ( type(%s) in ["int", "float"] && %s == float(int(%s)) ) )`, + parseFromNotNilCheck, operator.ParseFrom, operator.ParseFrom, operator.ParseFrom, operator.ParseFrom, ) } @@ -169,3 +214,151 @@ func cleanTraceParser(operator *PipelineOperator) { operator.TraceFlags = nil } } + +// Generates an expression checking that `fieldPath` has a non-nil value in a log record. +func fieldNotNilCheck(fieldPath string) (string, error) { + _, err := expr.Compile(fieldPath) + if err != nil { + return "", fmt.Errorf("invalid fieldPath %s: %w", fieldPath, err) + } + + // helper for turning `.` into `?.` in field paths. + // Eg: a.b?.c.d -> a?.b?.c?.d + optionalChainedPath := func(path string) string { + return strings.ReplaceAll( + strings.ReplaceAll(path, "?.", "."), ".", "?.", + ) + } + + // Optional chaining before membership ops is not supported by expr. + // Eg: The field `attributes.test["a.b"].value["c.d"].e` can't be checked using + // the nil check `attributes.test?.["a.b"]?.value?.["c.d"]?.e != nil` + // This needs to be worked around by checking that the target of membership op is not nil first. + // Eg: attributes.test != nil && attributes.test["a.b"]?.value != nil && attributes.test["a.b"].value["c.d"]?.e != nil + + // Split once from the right to include the rightmost membership op and everything after it. + // Eg: `attributes.test["a.b"].value["c.d"].e` would result in `attributes.test["a.b"].value` and `["c.d"].e` + parts := rSplitAfterN(fieldPath, "[", 2) + if len(parts) < 2 { + // there is no [] access in fieldPath + return fmt.Sprintf("%s != nil", optionalChainedPath(fieldPath)), nil + } + + // recursively generate nil check for target of the rightmost membership op (attributes.test["a.b"].value) + // should come out to be (attributes.test != nil && attributes.test["a.b"]?.value != nil) + collectionNotNilCheck, err := fieldNotNilCheck(parts[0]) + if err != nil { + return "", fmt.Errorf("couldn't generate nil check for %s: %w", parts[0], err) + } + + // generate nil check for entire path. + suffixParts := strings.SplitAfter(parts[1], "]") // ["c.d"], ".e" + fullPath := parts[0] + suffixParts[0] + if len(suffixParts) > 1 { + // attributes.test["a.b"].value["c.d"]?.e + fullPath += optionalChainedPath(suffixParts[1]) + } + fullPathCheck := fmt.Sprintf("%s != nil", fullPath) + + // If the membership op is for array/slice indexing, add check ensuring array is long enough + // attributes.test[3] -> len(attributes.test) > 3 && attributes.test[3] != nil + if !(strings.Contains(suffixParts[0], "'") || strings.Contains(suffixParts[0], `"`)) { + fullPathCheck = fmt.Sprintf( + "len(%s) > %s && %s", + parts[0], suffixParts[0][1:len(suffixParts[0])-1], fullPathCheck, + ) + } + + // If prefix is `attributes` or `resource` there is no need to add a nil check for + // the prefix since all log records have non nil `attributes` and `resource` fields. + if slices.Contains([]string{"attributes", "resource"}, parts[0]) { + return fullPathCheck, nil + } + + return fmt.Sprintf("%s && %s", collectionNotNilCheck, fullPathCheck), nil +} + +// Split `str` after `sep` from the right to create up to `n` parts. +// rSplitAfterN("a.b.c.d", ".", 3) -> ["a.b", ".c", ".d"] +func rSplitAfterN(str string, sep string, n int) []string { + reversedStr := reverseString(str) + parts := strings.SplitAfterN(reversedStr, sep, n) + slices.Reverse(parts) + result := []string{} + for _, p := range parts { + result = append(result, reverseString(p)) + } + return result +} + +func reverseString(s string) string { + r := []rune(s) + for i := 0; i < len(r)/2; i++ { + j := len(s) - 1 - i + r[i], r[j] = r[j], r[i] + } + return string(r) +} + +// Generate expression for checking that all fields referenced in `expr` have a non nil value in log record. +// Eg: `attributes.x + len(resource.y)` will return the expression `attributes.x != nil && resource.y != nil` +func fieldsReferencedInExprNotNilCheck(expr string) (string, error) { + referencedFields, err := logFieldsReferencedInExpr(expr) + if err != nil { + return "", fmt.Errorf("couldn't extract log fields referenced in expr %s: %w", expr, err) + } + + // Generating nil check for deepest fields takes care of their prefixes too. + // Eg: `attributes.test.value + len(attributes.test)` needs a nil check only for `attributes.test.value` + deepestFieldRefs := []string{} + for _, field := range referencedFields { + isPrefixOfAnotherReferencedField := slices.ContainsFunc( + referencedFields, func(e string) bool { + return len(e) > len(field) && strings.HasPrefix(e, field) + }, + ) + if !isPrefixOfAnotherReferencedField { + deepestFieldRefs = append(deepestFieldRefs, field) + } + } + + fieldExprChecks := []string{} + for _, field := range deepestFieldRefs { + checkExpr, err := fieldNotNilCheck(field) + if err != nil { + return "", fmt.Errorf("could not create nil check for %s: %w", field, err) + } + fieldExprChecks = append(fieldExprChecks, fmt.Sprintf("(%s)", checkExpr)) + } + + return strings.Join(fieldExprChecks, " && "), nil +} + +// Expr AST visitor for extracting referenced log fields +// See more at https://github.com/expr-lang/expr/blob/master/ast/visitor.go +type logFieldsInExprExtractor struct { + referencedFields []string +} + +func (v *logFieldsInExprExtractor) Visit(node *ast.Node) { + if n, ok := (*node).(*ast.MemberNode); ok { + memberRef := n.String() + if strings.HasPrefix(memberRef, "attributes") || strings.HasPrefix(memberRef, "resource") { + v.referencedFields = append(v.referencedFields, memberRef) + } + } +} + +func logFieldsReferencedInExpr(expr string) ([]string, error) { + // parse abstract syntax tree for expr + exprAst, err := parser.Parse(expr) + if err != nil { + return nil, fmt.Errorf("could not parse expr: %w", err) + } + + // walk ast for expr to collect all member references. + v := &logFieldsInExprExtractor{} + ast.Walk(&exprAst.Node, v) + + return v.referencedFields, nil +} diff --git a/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go b/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go index 2eeffaed2ff4..222eb4f9e01f 100644 --- a/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go +++ b/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go @@ -608,6 +608,104 @@ func TestAttributePathsContainingDollarDoNotBreakCollector(t *testing.T) { require.Equal("test", result[0].Attributes_string["$test1"]) } +func TestMembershipOpInProcessorFieldExpressions(t *testing.T) { + require := require.New(t) + + testLogs := []model.SignozLog{ + makeTestSignozLog("test log", map[string]interface{}{ + "http.method": "GET", + "order.products": `{"ids": ["pid0", "pid1"]}`, + }), + } + + testPipeline := Pipeline{ + OrderId: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + Filter: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "http.method", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: "=", + Value: "GET", + }, + }, + }, + Config: []PipelineOperator{ + { + ID: "move", + Type: "move", + Enabled: true, + Name: "move", + From: `attributes["http.method"]`, + To: `attributes["test.http.method"]`, + }, { + ID: "json", + Type: "json_parser", + Enabled: true, + Name: "json", + ParseFrom: `attributes["order.products"]`, + ParseTo: `attributes["order.products"]`, + }, { + ID: "move1", + Type: "move", + Enabled: true, + Name: "move1", + From: `attributes["order.products"].ids`, + To: `attributes["order.product_ids"]`, + }, { + ID: "move2", + Type: "move", + Enabled: true, + Name: "move2", + From: `attributes.test?.doesnt_exist`, + To: `attributes["test.doesnt_exist"]`, + }, { + ID: "add", + Type: "add", + Enabled: true, + Name: "add", + Field: `attributes["order.pids"].missing_field`, + Value: `EXPR(attributes.a["b.c"].d[4].e + resource.f)`, + }, { + ID: "add2", + Type: "add", + Enabled: true, + Name: "add2", + Field: `attributes["order.pids.pid0"]`, + Value: `EXPR(attributes["order.product_ids"][0])`, + }, { + ID: "add3", + Type: "add", + Enabled: true, + Name: "add3", + Field: `attributes["attrs.test.value"]`, + Value: `EXPR(attributes.test?.value)`, + }, + }, + } + + result, collectorWarnAndErrorLogs, err := SimulatePipelinesProcessing( + context.Background(), + []Pipeline{testPipeline}, + testLogs, + ) + require.Nil(err) + require.Equal(0, len(collectorWarnAndErrorLogs), strings.Join(collectorWarnAndErrorLogs, "\n")) + require.Equal(1, len(result)) + + _, methodAttrExists := result[0].Attributes_string["http.method"] + require.False(methodAttrExists) + require.Equal("GET", result[0].Attributes_string["test.http.method"]) + require.Equal("pid0", result[0].Attributes_string["order.pids.pid0"]) +} + func TestTemporaryWorkaroundForSupportingAttribsContainingDots(t *testing.T) { // TODO(Raj): Remove this after dots are supported