Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for custom multiline filter #1259

Merged
merged 10 commits into from
Jul 10, 2024
1 change: 0 additions & 1 deletion docs/user/02-logs.md
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,6 @@ The `unsupportedMode` attribute of a LogPipeline indicates that you are using a

You cannot enable the following plugins, because they potentially harm the stability:

- Multiline Filter
- Kubernetes Filter
- Rewrite_Tag Filter

Expand Down
5 changes: 4 additions & 1 deletion internal/fluentbit/config/builder/config_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@ func BuildFluentBitConfig(pipeline *telemetryv1alpha1.LogPipeline, config Builde

var sb strings.Builder
sb.WriteString(createInputSection(pipeline, includePath, excludePath))
// skip if the filter is a multiline filter, multiline filter should be first filter in the pipeline filter chain
// see for more details https://docs.fluentbit.io/manual/pipeline/filters/multiline-stacktrace
sb.WriteString(createCustomFilters(pipeline, multilineFilter))
sb.WriteString(createRecordModifierFilter(pipeline))
sb.WriteString(createKubernetesFilter(pipeline))
sb.WriteString(createCustomFilters(pipeline))
sb.WriteString(createCustomFilters(pipeline, nonMultilineFilter))
sb.WriteString(createLuaDedotFilter(pipeline))
sb.WriteString(createOutputSection(pipeline, config.PipelineDefaults))

Expand Down
15 changes: 13 additions & 2 deletions internal/fluentbit/config/builder/config_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,18 @@ func TestMergeSectionsConfig(t *testing.T) {
db /data/flb_foo.db
exclude_path /var/log/containers/telemetry-fluent-bit-*_kyma-system_fluent-bit-*.log,/var/log/containers/*_*_container1-*.log,/var/log/containers/*_*_container2-*.log
mem_buf_limit 5MB
multiline.parser docker, cri, go, python, java
multiline.parser cri
path /var/log/containers/*_*_*-*.log
read_from_head true
skip_long_lines on
storage.type filesystem
tag foo.*

[FILTER]
name multiline
match foo.*
multiline.parser java

[FILTER]
name record_modifier
match foo.*
Expand Down Expand Up @@ -151,6 +156,12 @@ func TestMergeSectionsConfig(t *testing.T) {
regex log aa
`,
},
{
Custom: `
name multiline
multiline.parser java
`,
},
},
Output: telemetryv1alpha1.Output{
HTTP: &telemetryv1alpha1.HTTPOutput{
Expand Down Expand Up @@ -181,7 +192,7 @@ func TestMergeSectionsConfigCustomOutput(t *testing.T) {
alias foo
db /data/flb_foo.db
mem_buf_limit 5MB
multiline.parser docker, cri, go, python, java
multiline.parser cri
path /var/log/containers/*_*_*-*.log
read_from_head true
skip_long_lines on
Expand Down
22 changes: 19 additions & 3 deletions internal/fluentbit/config/builder/custom_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,36 @@ import (
"strings"

telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1"
"github.com/kyma-project/telemetry-manager/internal/fluentbit/config"
)

func createCustomFilters(pipeline *telemetryv1alpha1.LogPipeline) string {
const (
multilineFilter = "multiline"
nonMultilineFilter = "non-multiline"
)

func createCustomFilters(pipeline *telemetryv1alpha1.LogPipeline, filterType string) string {
var filters []string

for _, filter := range pipeline.Spec.Filters {
builder := NewFilterSectionBuilder()
customFilterParams := parseMultiline(filter.Custom)
isMultiline := isMultilineFilter(customFilterParams)

if (filterType == multilineFilter && !isMultiline) || (filterType == nonMultilineFilter && isMultiline) {
continue
}

builder := NewFilterSectionBuilder()
for _, p := range customFilterParams {
builder.AddConfigParam(p.Key, p.Value)
}
builder.AddConfigParam("match", fmt.Sprintf("%s.*", pipeline.Name))
filters = append(filters, builder.Build())
}

return strings.Join(filters, "")
return strings.Join(filters, "\n")
}

func isMultilineFilter(filter config.ParameterList) bool {
return filter.ContainsKey("name") && strings.Compare(filter.GetByKey("name").Value, "multiline") == 0
}
56 changes: 56 additions & 0 deletions internal/fluentbit/config/builder/custom_filter_test.go
Original file line number Diff line number Diff line change
@@ -1 +1,57 @@
package builder

import (
"testing"

"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1"
)

func TestCreateCustomFilters(t *testing.T) {
testPipeline := &telemetryv1alpha1.LogPipeline{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: telemetryv1alpha1.LogPipelineSpec{
Filters: []telemetryv1alpha1.Filter{
{
Custom: `
name multiline
`,
},
{
Custom: `
name grep
`,
},
},
},
}

tests := []struct {
name string
pipeline *telemetryv1alpha1.LogPipeline
filterType string
want string
}{
{
name: "Test Multiline Filter",
pipeline: testPipeline,
filterType: multilineFilter,
want: "[FILTER]\n name multiline\n match foo.*\n\n",
},
{
name: "Test Non-Multiline Filter",
pipeline: testPipeline,
filterType: nonMultilineFilter,
want: "[FILTER]\n name grep\n match foo.*\n\n",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
filterConf := createCustomFilters(tt.pipeline, tt.filterType)
require.Equal(t, filterConf, tt.want)
})
}
}
2 changes: 1 addition & 1 deletion internal/fluentbit/config/builder/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func createInputSection(pipeline *telemetryv1alpha1.LogPipeline, includePath, ex
inputBuilder.AddConfigParam("alias", pipeline.Name)
inputBuilder.AddConfigParam("path", includePath)
inputBuilder.AddIfNotEmpty("exclude_path", excludePath)
inputBuilder.AddConfigParam("multiline.parser", "docker, cri, go, python, java")
inputBuilder.AddConfigParam("multiline.parser", "cri")
inputBuilder.AddConfigParam("tag", fmt.Sprintf("%s.*", pipeline.Name))
inputBuilder.AddConfigParam("skip_long_lines", "on")
inputBuilder.AddConfigParam("db", fmt.Sprintf("/data/flb_%s.db", pipeline.Name))
Expand Down
2 changes: 1 addition & 1 deletion internal/fluentbit/config/builder/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestCreateInput(t *testing.T) {
db /data/flb_test-logpipeline.db
exclude_path /var/log/containers/telemetry-fluent-bit-*_kyma-system_fluent-bit-*.log
mem_buf_limit 5MB
multiline.parser docker, cri, go, python, java
multiline.parser cri
path /var/log/containers/*.log
read_from_head true
skip_long_lines on
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func main() {
flag.StringVar(&metricGatewayDynamicMemoryRequest, "metric-gateway-dynamic-memory-request", "0", "Additional memory request for metrics OpenTelemetry Collector per MetricPipeline")
flag.IntVar(&maxMetricPipelines, "metric-gateway-pipelines", 3, "Maximum number of MetricPipelines to be created. If 0, no limit is applied.")

flag.StringVar(&fluentBitDeniedFilterPlugins, "fluent-bit-denied-filter-plugins", "kubernetes,rewrite_tag,multiline", "Comma separated list of denied filter plugins even if allowUnsupportedPlugins is enabled. If empty, all filter plugins are allowed.")
flag.StringVar(&fluentBitDeniedFilterPlugins, "fluent-bit-denied-filter-plugins", "kubernetes,rewrite_tag", "Comma separated list of denied filter plugins even if allowUnsupportedPlugins is enabled. If empty, all filter plugins are allowed.")
flag.StringVar(&fluentBitDeniedOutputPlugins, "fluent-bit-denied-output-plugins", "", "Comma separated list of denied output plugins even if allowUnsupportedPlugins is enabled. If empty, all output plugins are allowed.")
flag.StringVar(&fluentBitMemoryBufferLimit, "fluent-bit-memory-buffer-limit", "10M", "Fluent Bit memory buffer limit per log pipeline")
flag.StringVar(&fluentBitFsBufferLimit, "fluent-bit-filesystem-buffer-limit", "1G", "Fluent Bit filesystem buffer limit per log pipeline")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
db /data/flb_logpipeline-1.db
exclude_path /var/log/containers/telemetry-fluent-bit-*_kyma-system_fluent-bit-*.log,/var/log/containers/*_kyma-system_*-*.log,/var/log/containers/*_kube-system_*-*.log,/var/log/containers/*_istio-system_*-*.log,/var/log/containers/*_compass-system_*-*.log
mem_buf_limit 5MB
multiline.parser docker, cri, go, python, java
multiline.parser cri
path /var/log/containers/*_*_*-*.log
read_from_head true
skip_long_lines on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
[INPUT]
Name tail
Path /var/log/containers/*.log
multiline.parser docker, cri, go, python, java
multiline.parser cri
Tag tele.*
Mem_Buf_Limit 5MB
Skip_Long_Lines On
Expand Down
2 changes: 1 addition & 1 deletion webhook/dryrun/testdata/given/fluent-bit-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ data:
[INPUT]
Name tail
Path /var/log/containers/*.log
multiline.parser docker, cri, go, python, java
multiline.parser cri
Tag tele.*
Mem_Buf_Limit 5MB
Skip_Long_Lines On
Expand Down
Loading