Skip to content

Commit

Permalink
chore: some more cleanup and update pipelines e2e/integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
raj-k-singh committed Apr 14, 2024
1 parent cbef7be commit 61b97ec
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 208 deletions.
182 changes: 7 additions & 175 deletions pkg/query-service/app/logparsingpipeline/pipelineBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []s
// To work around this, we add statements before and after the actual pipeline
// operator statements, that add and remove a pipeline specific marker, ensuring
// all operators in a pipeline get to act on the log even if an op changes the filter referenced fields.
pipelineMarker := uuid.NewString()
pipelineMarker := fmt.Sprintf("signoz-log-pipeline-%s", pipeline.Alias)
addMarkerStatement := fmt.Sprintf(`set(attributes["__signoz_pipeline_marker__"], "%s")`, pipelineMarker)
if len(filterExpr) > 0 {
addMarkerStatement += fmt.Sprintf(" where %s", toOttlExpr(filterExpr))
Expand All @@ -135,24 +135,18 @@ func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []s

appendStatement := func(statement string, additionalFilter string) {
whereConditions := []string{pipelineMarkerWhereClause}
// if len(filterExpr) > 0 {
// whereConditions = append(
// whereConditions,
// toOttlExpr(filterExpr),
// )
// }

if len(additionalFilter) > 0 {
whereConditions = append(whereConditions, additionalFilter)
}

// if len(whereConditions) > 0 {
statement = fmt.Sprintf(
`%s where %s`, statement, strings.Join(whereConditions, " and "),
)
// }

ottlStatements = append(ottlStatements, statement)
}

appendDeleteStatement := func(path string) {
fieldPath := logTransformPathToOttlPath(path)
fieldPathParts := rSplitAfterN(fieldPath, "[", 2)
Expand All @@ -170,11 +164,6 @@ func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []s
)
}

// ensureIntermediateMaps := func(path string, additionalFilter string) {
// parts := pathParts(path)
//
// }

appendMapExtractStatements := func(
filterClause string,
mapGenerator string,
Expand Down Expand Up @@ -214,11 +203,10 @@ func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []s
value = toOttlExpr(expression)
fieldsNotNilCheck, err := fieldsReferencedInExprNotNilCheck(expression)
if err != nil {
panic(err)
// return nil, fmt.Errorf(
// "could'nt generate nil check for fields referenced in value expr of add operator %s: %w",
// operator.Name, err,
// )
return nil, nil, fmt.Errorf(
"could'nt generate nil check for fields referenced in value expr of add operator %s: %w",
operator.Name, err,
)
}
if fieldsNotNilCheck != "" {
condition = toOttlExpr(fieldsNotNilCheck)
Expand Down Expand Up @@ -285,14 +273,6 @@ func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []s
fmt.Sprintf(`IsMatch(%s, "^\\s*{.*}\\s*$")`, logTransformPathToOttlPath(operator.ParseFrom)),
}, " and ")

// appendStatement(
// fmt.Sprintf(
// `merge_maps(%s, ParseJSON(%s), "upsert")`,
// logTransformPathToOttlPath(operator.ParseTo),
// logTransformPathToOttlPath(operator.ParseFrom),
// ),
// whereClause,
// )
appendMapExtractStatements(
whereClause,
fmt.Sprintf("ParseJSON(%s)", logTransformPathToOttlPath(operator.ParseFrom)),
Expand Down Expand Up @@ -472,154 +452,6 @@ func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []s
return processors, names, nil
}

// getOperators returns the subset of ops that are enabled, rewiring each
// kept operator's Output to point at the next enabled operator so the
// stanza pipeline chain stays intact, and attaching an `If` guard to
// parser/transform operators so they only act on log records where the
// fields they reference are present (and, where the operator requires it,
// match the expected format).
//
// Returns an error if a nil-check expression can't be generated for a
// field referenced by one of the operators.
func getOperators(ops []PipelineOperator) ([]PipelineOperator, error) {
	filteredOp := []PipelineOperator{}
	for i, operator := range ops {
		if !operator.Enabled {
			// If the trailing operator(s) are disabled, the last enabled
			// operator becomes the terminal one in the chain.
			if i == len(ops)-1 && len(filteredOp) != 0 {
				filteredOp[len(filteredOp)-1].Output = ""
			}
			continue
		}

		// Chain the previously kept operator to this one.
		if len(filteredOp) > 0 {
			filteredOp[len(filteredOp)-1].Output = operator.ID
		}

		switch operator.Type {
		case "regex_parser":
			parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
			if err != nil {
				return nil, fmt.Errorf(
					"couldn't generate nil check for parseFrom of regex op %s: %w", operator.Name, err,
				)
			}
			// Only run the regex parser when the source field actually
			// matches the regex; the pattern must be escaped for embedding
			// in the expr string literal.
			operator.If = fmt.Sprintf(
				`%s && %s matches "%s"`,
				parseFromNotNilCheck,
				operator.ParseFrom,
				strings.ReplaceAll(
					strings.ReplaceAll(operator.Regex, `\`, `\\`),
					`"`, `\"`,
				),
			)

		case "grok_parser":
			parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
			if err != nil {
				return nil, fmt.Errorf(
					"couldn't generate nil check for parseFrom of grok op %s: %w", operator.Name, err,
				)
			}
			operator.If = parseFromNotNilCheck

		case "json_parser":
			parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
			if err != nil {
				return nil, fmt.Errorf(
					"couldn't generate nil check for parseFrom of json parser op %s: %w", operator.Name, err,
				)
			}
			// Guard against non-JSON values: source must look like a JSON object.
			operator.If = fmt.Sprintf(
				`%s && %s matches "^\\s*{.*}\\s*$"`, parseFromNotNilCheck, operator.ParseFrom,
			)

		case "add":
			// Only EXPR(...) values reference other fields and need a guard;
			// plain literal values are always safe to add.
			if strings.HasPrefix(operator.Value, "EXPR(") && strings.HasSuffix(operator.Value, ")") {
				expression := strings.TrimSuffix(strings.TrimPrefix(operator.Value, "EXPR("), ")")
				fieldsNotNilCheck, err := fieldsReferencedInExprNotNilCheck(expression)
				if err != nil {
					return nil, fmt.Errorf(
						"couldn't generate nil check for fields referenced in value expr of add operator %s: %w",
						operator.Name, err,
					)
				}
				if fieldsNotNilCheck != "" {
					operator.If = fieldsNotNilCheck
				}
			}

		case "move", "copy":
			fromNotNilCheck, err := fieldNotNilCheck(operator.From)
			if err != nil {
				return nil, fmt.Errorf(
					"couldn't generate nil check for From field of %s op %s: %w", operator.Type, operator.Name, err,
				)
			}
			operator.If = fromNotNilCheck

		case "remove":
			// Named to avoid shadowing the fieldNotNilCheck helper.
			removedFieldNotNilCheck, err := fieldNotNilCheck(operator.Field)
			if err != nil {
				return nil, fmt.Errorf(
					"couldn't generate nil check for field to be removed by op %s: %w", operator.Name, err,
				)
			}
			operator.If = removedFieldNotNilCheck

		case "trace_parser":
			cleanTraceParser(&operator)

		case "time_parser":
			parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
			if err != nil {
				return nil, fmt.Errorf(
					"couldn't generate nil check for parseFrom of time parser op %s: %w", operator.Name, err,
				)
			}
			operator.If = parseFromNotNilCheck

			if operator.LayoutType == "strptime" {
				// Also require the value to match the layout, so bad
				// timestamps don't error out the whole pipeline.
				regex, err := RegexForStrptimeLayout(operator.Layout)
				if err != nil {
					return nil, fmt.Errorf(
						"couldn't generate layout regex for time_parser %s: %w", operator.Name, err,
					)
				}

				operator.If = fmt.Sprintf(
					`%s && %s matches "%s"`, operator.If, operator.ParseFrom, regex,
				)
			} else if operator.LayoutType == "epoch" {
				// Epoch layouts containing "." expect fractional seconds.
				valueRegex := `^\\s*[0-9]+\\s*$`
				if strings.Contains(operator.Layout, ".") {
					valueRegex = `^\\s*[0-9]+\\.[0-9]+\\s*$`
				}

				operator.If = fmt.Sprintf(
					`%s && string(%s) matches "%s"`, operator.If, operator.ParseFrom, valueRegex,
				)

			}
			// TODO(Raj): Maybe add support for gotime too eventually

		case "severity_parser":
			parseFromNotNilCheck, err := fieldNotNilCheck(operator.ParseFrom)
			if err != nil {
				return nil, fmt.Errorf(
					"couldn't generate nil check for parseFrom of severity parser %s: %w", operator.Name, err,
				)
			}
			// Severity can be parsed from strings or from whole numbers
			// (ints, or floats with no fractional part).
			operator.If = fmt.Sprintf(
				`%s && ( type(%s) == "string" || ( type(%s) in ["int", "float"] && %s == float(int(%s)) ) )`,
				parseFromNotNilCheck, operator.ParseFrom, operator.ParseFrom, operator.ParseFrom, operator.ParseFrom,
			)

		}

		filteredOp = append(filteredOp, operator)
	}
	return filteredOp, nil
}

// cleanTraceParser drops trace-parser sub-configs that have no ParseFrom
// source configured, so only the fields the user actually mapped get
// emitted into the generated collector config.
func cleanTraceParser(operator *PipelineOperator) {
	if tid := operator.TraceId; tid != nil && len(tid.ParseFrom) == 0 {
		operator.TraceId = nil
	}
	if sid := operator.SpanId; sid != nil && len(sid.ParseFrom) == 0 {
		operator.SpanId = nil
	}
	if tf := operator.TraceFlags; tf != nil && len(tf.ParseFrom) == 0 {
		operator.TraceFlags = nil
	}
}

// Generates an expression checking that `fieldPath` has a non-nil value in a log record.
func fieldNotNilCheck(fieldPath string) (string, error) {
_, err := expr.Compile(fieldPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,10 @@ var prepareProcessorTestData = []struct {
// TestPreparePipelineProcessor runs table-driven cases from
// prepareProcessorTestData through getOperators using goconvey.
//
// NOTE(review): this span looks like merged diff residue — both the old
// assertions and their commented-out copies (plus a deliberately failing
// require.NotNil tombstone) appear together. Confirm against the actual
// file before relying on it.
func TestPreparePipelineProcessor(t *testing.T) {
	for _, test := range prepareProcessorTestData {
		Convey(test.Name, t, func() {
			res, err := getOperators(test.Operators)
			So(err, ShouldBeNil)
			So(res, ShouldResemble, test.Output)
			// require.NotNil(t, nil, ...) always fails; presumably a marker
			// left to flag these tests for migration — TODO confirm intent.
			require.NotNil(t, nil, "TODO(Raj): maybe use these tests in new config generation")
			// res, err := getOperators(test.Operators)
			// So(err, ShouldBeNil)
			// So(res, ShouldResemble, test.Output)
		})
	}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/query-service/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ var ReservedColumnTargetAliases = map[string]struct{}{
}

// logsPPLPfx is a short constant for logsPipelinePrefix
const LogsPPLPfx = "logstransform/pipeline_"
const LogsPPLPfx = "signoztransform/logs-pipeline"

const IntegrationPipelineIdPrefix = "integration"

Expand Down
42 changes: 13 additions & 29 deletions pkg/query-service/tests/integration/logparsingpipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ import (
"go.signoz.io/signoz/pkg/query-service/dao"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/queryBuilderToExpr"
"go.signoz.io/signoz/pkg/query-service/utils"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
)

func TestLogPipelinesLifecycle(t *testing.T) {
Expand Down Expand Up @@ -628,10 +626,14 @@ func assertPipelinesRecommendedInRemoteConfig(
t.Fatalf("could not unmarshal config file sent to opamp client: %v", err)
}

// Each pipeline is expected to become its own processor
// in the logs service in otel collector config.
// Validate expected collector config processors for log pipelines
// are present in config recommended to opamp client

expectedProcessors, expectedProcNames, err := logparsingpipeline.PreparePipelineProcessor(pipelines)

collectorConfSvcs := collectorConfSentToClient["service"].(map[string]interface{})
collectorConfLogsSvc := collectorConfSvcs["pipelines"].(map[string]interface{})["logs"].(map[string]interface{})

collectorConfLogsSvcProcessorNames := collectorConfLogsSvc["processors"].([]interface{})
collectorConfLogsPipelineProcNames := []string{}
for _, procNameVal := range collectorConfLogsSvcProcessorNames {
Expand All @@ -643,43 +645,25 @@ func assertPipelinesRecommendedInRemoteConfig(
)
}
}

_, expectedLogProcessorNames, err := logparsingpipeline.PreparePipelineProcessor(pipelines)
require.Equal(
t, expectedLogProcessorNames, collectorConfLogsPipelineProcNames,
t, expectedProcNames, collectorConfLogsPipelineProcNames,
"config sent to opamp client doesn't contain expected log pipelines",
)

collectorConfProcessors := collectorConfSentToClient["processors"].(map[string]interface{})
for _, procName := range expectedLogProcessorNames {
for _, procName := range expectedProcNames {
pipelineProcessorInConf, procExists := collectorConfProcessors[procName]
require.True(t, procExists, fmt.Sprintf(
"%s processor not found in config sent to opamp client", procName,
))

// Validate that filter expr in collector conf is as expected.

// extract expr present in collector conf processor
pipelineProcOps := pipelineProcessorInConf.(map[string]interface{})["operators"].([]interface{})
procInConfYaml, err := yaml.Parser().Marshal(pipelineProcessorInConf.(map[string]interface{}))
require.Nil(t, err)

routerOpIdx := slices.IndexFunc(
pipelineProcOps,
func(op interface{}) bool { return op.(map[string]interface{})["id"] == "router_signoz" },
)
require.GreaterOrEqual(t, routerOpIdx, 0)
routerOproutes := pipelineProcOps[routerOpIdx].(map[string]interface{})["routes"].([]interface{})
pipelineFilterExpr := routerOproutes[0].(map[string]interface{})["expr"].(string)

// find logparsingpipeline.Pipeline whose processor is being validated here
pipelineIdx := slices.IndexFunc(
pipelines, func(p logparsingpipeline.Pipeline) bool {
return logparsingpipeline.CollectorConfProcessorName(p) == procName
},
)
require.GreaterOrEqual(t, pipelineIdx, 0)
expectedExpr, err := queryBuilderToExpr.Parse(pipelines[pipelineIdx].Filter)
expectedProcYaml, err := yaml.Parser().Marshal(expectedProcessors[procName].(map[string]interface{}))
require.Nil(t, err)
require.Equal(t, expectedExpr, pipelineFilterExpr)

require.Equal(t, procInConfYaml, expectedProcYaml)
}
}

Expand Down

0 comments on commit 61b97ec

Please sign in to comment.