diff --git a/docs/user/02-logs.md b/docs/user/02-logs.md index e5ccffc1c..24b39caf4 100644 --- a/docs/user/02-logs.md +++ b/docs/user/02-logs.md @@ -454,7 +454,6 @@ The `unsupportedMode` attribute of a LogPipeline indicates that you are using a You cannot enable the following plugins, because they potentially harm the stability: -- Multiline Filter - Kubernetes Filter - Rewrite_Tag Filter diff --git a/internal/fluentbit/config/builder/config_builder.go b/internal/fluentbit/config/builder/config_builder.go index bcb2a75eb..5577c4d83 100644 --- a/internal/fluentbit/config/builder/config_builder.go +++ b/internal/fluentbit/config/builder/config_builder.go @@ -38,9 +38,12 @@ func BuildFluentBitConfig(pipeline *telemetryv1alpha1.LogPipeline, config Builde var sb strings.Builder sb.WriteString(createInputSection(pipeline, includePath, excludePath)) + // skip if the filter is a multiline filter, multiline filter should be first filter in the pipeline filter chain + // see for more details https://docs.fluentbit.io/manual/pipeline/filters/multiline-stacktrace + sb.WriteString(createCustomFilters(pipeline, multilineFilter)) sb.WriteString(createRecordModifierFilter(pipeline)) sb.WriteString(createKubernetesFilter(pipeline)) - sb.WriteString(createCustomFilters(pipeline)) + sb.WriteString(createCustomFilters(pipeline, nonMultilineFilter)) sb.WriteString(createLuaDedotFilter(pipeline)) sb.WriteString(createOutputSection(pipeline, config.PipelineDefaults)) diff --git a/internal/fluentbit/config/builder/config_builder_test.go b/internal/fluentbit/config/builder/config_builder_test.go index 4d2e709a4..be9e0a58b 100644 --- a/internal/fluentbit/config/builder/config_builder_test.go +++ b/internal/fluentbit/config/builder/config_builder_test.go @@ -82,13 +82,18 @@ func TestMergeSectionsConfig(t *testing.T) { db /data/flb_foo.db exclude_path /var/log/containers/telemetry-fluent-bit-*_kyma-system_fluent-bit-*.log,/var/log/containers/*_*_container1-*.log,/var/log/containers/*_*_container2-*.log mem_buf_limit 5MB - multiline.parser docker, cri, go, python, java + multiline.parser cri path /var/log/containers/*_*_*-*.log read_from_head true skip_long_lines on storage.type filesystem tag foo.* +[FILTER] + name multiline + match foo.* + multiline.parser java + [FILTER] name record_modifier match foo.* @@ -151,6 +156,12 @@ func TestMergeSectionsConfig(t *testing.T) { regex log aa `, }, + { + Custom: ` + name multiline + multiline.parser java + `, + }, }, Output: telemetryv1alpha1.Output{ HTTP: &telemetryv1alpha1.HTTPOutput{ @@ -181,7 +192,7 @@ func TestMergeSectionsConfigCustomOutput(t *testing.T) { alias foo db /data/flb_foo.db mem_buf_limit 5MB - multiline.parser docker, cri, go, python, java + multiline.parser cri path /var/log/containers/*_*_*-*.log read_from_head true skip_long_lines on diff --git a/internal/fluentbit/config/builder/custom_filter.go b/internal/fluentbit/config/builder/custom_filter.go index f286c8c2b..d0d782171 100644 --- a/internal/fluentbit/config/builder/custom_filter.go +++ b/internal/fluentbit/config/builder/custom_filter.go @@ -5,14 +5,26 @@ import ( "strings" telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1" + "github.com/kyma-project/telemetry-manager/internal/fluentbit/config" ) -func createCustomFilters(pipeline *telemetryv1alpha1.LogPipeline) string { +const ( + multilineFilter = "multiline" + nonMultilineFilter = "non-multiline" +) + +func createCustomFilters(pipeline *telemetryv1alpha1.LogPipeline, filterType string) string { var filters []string for _, filter := range pipeline.Spec.Filters { - builder := NewFilterSectionBuilder() customFilterParams := parseMultiline(filter.Custom) + isMultiline := isMultilineFilter(customFilterParams) + + if (filterType == multilineFilter && !isMultiline) || (filterType == nonMultilineFilter && isMultiline) { + continue + } + + builder := NewFilterSectionBuilder() for _, p := range customFilterParams { builder.AddConfigParam(p.Key, p.Value) } @@ -20,5 +32,9 @@ func createCustomFilters(pipeline *telemetryv1alpha1.LogPipeline) string { filters = append(filters, builder.Build()) } - return strings.Join(filters, "") + return strings.Join(filters, "\n") +} + +func isMultilineFilter(filter config.ParameterList) bool { + return filter.ContainsKey("name") && strings.Compare(filter.GetByKey("name").Value, "multiline") == 0 } diff --git a/internal/fluentbit/config/builder/custom_filter_test.go b/internal/fluentbit/config/builder/custom_filter_test.go index e647fb226..87ca2253e 100644 --- a/internal/fluentbit/config/builder/custom_filter_test.go +++ b/internal/fluentbit/config/builder/custom_filter_test.go @@ -1 +1,57 @@ package builder + +import ( + "testing" + + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1" +) + +func TestCreateCustomFilters(t *testing.T) { + testPipeline := &telemetryv1alpha1.LogPipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: telemetryv1alpha1.LogPipelineSpec{ + Filters: []telemetryv1alpha1.Filter{ + { + Custom: ` + name multiline + `, + }, + { + Custom: ` + name grep + `, + }, + }, + }, + } + + tests := []struct { + name string + pipeline *telemetryv1alpha1.LogPipeline + filterType string + want string + }{ + { + name: "Test Multiline Filter", + pipeline: testPipeline, + filterType: multilineFilter, + want: "[FILTER]\n name multiline\n match foo.*\n\n", + }, + { + name: "Test Non-Multiline Filter", + pipeline: testPipeline, + filterType: nonMultilineFilter, + want: "[FILTER]\n name grep\n match foo.*\n\n", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + filterConf := createCustomFilters(tt.pipeline, tt.filterType) + require.Equal(t, filterConf, tt.want) + }) + } +} diff --git a/internal/fluentbit/config/builder/input.go b/internal/fluentbit/config/builder/input.go index 1f90f15b1..bd737d3ae 100644 --- a/internal/fluentbit/config/builder/input.go +++ b/internal/fluentbit/config/builder/input.go @@ -14,7 +14,7 @@ func createInputSection(pipeline *telemetryv1alpha1.LogPipeline, includePath, ex inputBuilder.AddConfigParam("alias", pipeline.Name) inputBuilder.AddConfigParam("path", includePath) inputBuilder.AddIfNotEmpty("exclude_path", excludePath) - inputBuilder.AddConfigParam("multiline.parser", "docker, cri, go, python, java") + inputBuilder.AddConfigParam("multiline.parser", "cri") inputBuilder.AddConfigParam("tag", fmt.Sprintf("%s.*", pipeline.Name)) inputBuilder.AddConfigParam("skip_long_lines", "on") inputBuilder.AddConfigParam("db", fmt.Sprintf("/data/flb_%s.db", pipeline.Name)) diff --git a/internal/fluentbit/config/builder/input_test.go b/internal/fluentbit/config/builder/input_test.go index ec2759098..5858e4d63 100644 --- a/internal/fluentbit/config/builder/input_test.go +++ b/internal/fluentbit/config/builder/input_test.go @@ -19,7 +19,7 @@ func TestCreateInput(t *testing.T) { db /data/flb_test-logpipeline.db exclude_path /var/log/containers/telemetry-fluent-bit-*_kyma-system_fluent-bit-*.log mem_buf_limit 5MB - multiline.parser docker, cri, go, python, java + multiline.parser cri path /var/log/containers/*.log read_from_head true skip_long_lines on diff --git a/main.go b/main.go index 0f6c70796..f97cc75c4 100644 --- a/main.go +++ b/main.go @@ -255,7 +255,7 @@ func main() { flag.StringVar(&metricGatewayDynamicMemoryRequest, "metric-gateway-dynamic-memory-request", "0", "Additional memory request for metrics OpenTelemetry Collector per MetricPipeline") flag.IntVar(&maxMetricPipelines, "metric-gateway-pipelines", 3, "Maximum number of MetricPipelines to be created. If 0, no limit is applied.") - flag.StringVar(&fluentBitDeniedFilterPlugins, "fluent-bit-denied-filter-plugins", "kubernetes,rewrite_tag,multiline", "Comma separated list of denied filter plugins even if allowUnsupportedPlugins is enabled. If empty, all filter plugins are allowed.") + flag.StringVar(&fluentBitDeniedFilterPlugins, "fluent-bit-denied-filter-plugins", "kubernetes,rewrite_tag", "Comma separated list of denied filter plugins even if allowUnsupportedPlugins is enabled. If empty, all filter plugins are allowed.") flag.StringVar(&fluentBitDeniedOutputPlugins, "fluent-bit-denied-output-plugins", "", "Comma separated list of denied output plugins even if allowUnsupportedPlugins is enabled. If empty, all output plugins are allowed.") flag.StringVar(&fluentBitMemoryBufferLimit, "fluent-bit-memory-buffer-limit", "10M", "Fluent Bit memory buffer limit per log pipeline") flag.StringVar(&fluentBitFsBufferLimit, "fluent-bit-filesystem-buffer-limit", "1G", "Fluent Bit filesystem buffer limit per log pipeline") diff --git a/webhook/dryrun/testdata/expected/pipelines/dynamic/logpipeline-1.conf b/webhook/dryrun/testdata/expected/pipelines/dynamic/logpipeline-1.conf index 871eef7b1..bda9f94bc 100644 --- a/webhook/dryrun/testdata/expected/pipelines/dynamic/logpipeline-1.conf +++ b/webhook/dryrun/testdata/expected/pipelines/dynamic/logpipeline-1.conf @@ -4,7 +4,7 @@ db /data/flb_logpipeline-1.db exclude_path /var/log/containers/telemetry-fluent-bit-*_kyma-system_fluent-bit-*.log,/var/log/containers/*_kyma-system_*-*.log,/var/log/containers/*_kube-system_*-*.log,/var/log/containers/*_istio-system_*-*.log,/var/log/containers/*_compass-system_*-*.log mem_buf_limit 5MB - multiline.parser docker, cri, go, python, java + multiline.parser cri path /var/log/containers/*_*_*-*.log read_from_head true skip_long_lines on diff --git a/webhook/dryrun/testdata/expected/pipelines/fluent-bit.conf b/webhook/dryrun/testdata/expected/pipelines/fluent-bit.conf index 92e034ce7..2444a654d 100644 --- a/webhook/dryrun/testdata/expected/pipelines/fluent-bit.conf +++ b/webhook/dryrun/testdata/expected/pipelines/fluent-bit.conf @@ -11,7 +11,7 @@ [INPUT] Name tail Path /var/log/containers/*.log - multiline.parser docker, cri, go, python, java + multiline.parser cri Tag tele.* Mem_Buf_Limit 5MB Skip_Long_Lines On diff --git a/webhook/dryrun/testdata/given/fluent-bit-configmap.yaml b/webhook/dryrun/testdata/given/fluent-bit-configmap.yaml index 470cc7b32..298706702 100644 --- a/webhook/dryrun/testdata/given/fluent-bit-configmap.yaml +++ b/webhook/dryrun/testdata/given/fluent-bit-configmap.yaml @@ -21,7 +21,7 @@ data: [INPUT] Name tail Path /var/log/containers/*.log - multiline.parser docker, cri, go, python, java + multiline.parser cri Tag tele.* Mem_Buf_Limit 5MB Skip_Long_Lines On