Skip to content

Commit

Permalink
introduce journalbeat/pkg in order to provide reusable shared code (e…
Browse files Browse the repository at this point in the history
…lastic#19581)

This PR moves some functionality into journalbeat/pkg for reuse:
- field conversion
- adding field filters
- low-level reader

The change is mostly moving code around with minor cleanups (e.g. by introducing more dedicated types and tests). Functionality from original read is split into: reader, matchers, and conversion
  • Loading branch information
Steffen Siering authored Jul 2, 2020
1 parent 794ed81 commit da1e55f
Show file tree
Hide file tree
Showing 18 changed files with 1,159 additions and 832 deletions.
422 changes: 211 additions & 211 deletions NOTICE.txt

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ require (
github.com/stretchr/testify v1.6.1
github.com/tsg/go-daemon v0.0.0-20200207173439-e704b93fd89b
github.com/tsg/gopacket v0.0.0-20200626092518-2ab8e397a786
github.com/urso/sderr v0.0.0-20200210124243-c2a16f3d43ec
github.com/vmware/govmomi v0.0.0-20170802214208-2cad15190b41
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
github.com/yuin/gopher-lua v0.0.0-20170403160031-b402f3114ec7 // indirect
Expand Down
38 changes: 0 additions & 38 deletions journalbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,56 +21,18 @@
package config

import (
"fmt"

"github.com/elastic/beats/v7/libbeat/common"
)

// SeekMode is specifies how a journal is read
type SeekMode uint8

// Config stores the configuration of Journalbeat
type Config struct {
Inputs []*common.Config `config:"inputs"`
RegistryFile string `config:"registry_file"`
}

const (
// SeekInvalid is an invalid value for seek
SeekInvalid SeekMode = iota
// SeekHead option seeks to the head of a journal
SeekHead
// SeekTail option seeks to the tail of a journal
SeekTail
// SeekCursor option seeks to the position specified in the cursor
SeekCursor

seekHeadStr = "head"
seekTailStr = "tail"
seekCursorStr = "cursor"
)

var (
// DefaultConfig are the defaults of a Journalbeat instance
DefaultConfig = Config{
RegistryFile: "registry",
}

seekModes = map[string]SeekMode{
seekHeadStr: SeekHead,
seekTailStr: SeekTail,
seekCursorStr: SeekCursor,
}
)

// Unpack validates and unpack "seek" config option
func (m *SeekMode) Unpack(value string) error {
mode, ok := seekModes[value]
if !ok {
return fmt.Errorf("invalid seek mode '%s'", value)
}

*m = mode

return nil
}
94 changes: 0 additions & 94 deletions journalbeat/config/config_test.go

This file was deleted.

13 changes: 7 additions & 6 deletions journalbeat/input/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ package input
import (
"time"

"github.com/elastic/beats/v7/journalbeat/config"
"github.com/elastic/beats/v7/journalbeat/pkg/journalfield"
"github.com/elastic/beats/v7/journalbeat/pkg/journalread"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/processors"
Expand All @@ -38,11 +39,11 @@ type Config struct {
// MaxBackoff is the limit of the backoff time.
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
// Seek is the method to read from journals.
Seek config.SeekMode `config:"seek"`
Seek journalread.SeekMode `config:"seek"`
// CursorSeekFallback sets where to seek if registry file is not available.
CursorSeekFallback config.SeekMode `config:"cursor_seek_fallback"`
CursorSeekFallback journalread.SeekMode `config:"cursor_seek_fallback"`
// Matches store the key value pairs to match entries.
Matches []string `config:"include_matches"`
Matches []journalfield.Matcher `config:"include_matches"`
// SaveRemoteHostname defines if the original source of the entry needs to be saved.
SaveRemoteHostname bool `config:"save_remote_hostname"`

Expand All @@ -59,8 +60,8 @@ var (
DefaultConfig = Config{
Backoff: 1 * time.Second,
MaxBackoff: 20 * time.Second,
Seek: config.SeekCursor,
CursorSeekFallback: config.SeekHead,
Seek: journalread.SeekCursor,
CursorSeekFallback: journalread.SeekHead,
SaveRemoteHostname: false,
}
)
119 changes: 119 additions & 0 deletions journalbeat/pkg/journalfield/conv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package journalfield

import (
"fmt"
"strconv"
"strings"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)

// FieldConversion provides the mappings and conversion rules for raw fields of journald entries.
type FieldConversion map[string]Conversion

// Conversion configures the conversion rules for a field.
type Conversion struct {
Names []string
IsInteger bool
Dropped bool
}

// Converter applis configured conversion rules to journald entries, producing
// a new common.MapStr.
type Converter struct {
log *logp.Logger
conversions FieldConversion
}

// NewConverter creates a new Converter from the given conversion rules. If
// conversions is nil, internal default conversion rules will be applied.
func NewConverter(log *logp.Logger, conversions FieldConversion) *Converter {
if conversions == nil {
conversions = journaldEventFields
}

return &Converter{log: log, conversions: conversions}
}

// Convert creates a common.MapStr from the raw fields by applying the
// configured conversion rules.
// Field type conversion errors are logged to at debug level and the original
// value is added to the map.
func (c *Converter) Convert(entryFields map[string]string) common.MapStr {
fields := common.MapStr{}
var custom common.MapStr

for entryKey, v := range entryFields {
if fieldConversionInfo, ok := c.conversions[entryKey]; !ok {
if custom == nil {
custom = common.MapStr{}
}
normalized := strings.ToLower(strings.TrimLeft(entryKey, "_"))
custom.Put(normalized, v)
} else if !fieldConversionInfo.Dropped {
value, err := convertValue(fieldConversionInfo, v)
if err != nil {
value = v
c.log.Debugf("Journald mapping error: %v", err)
}
for _, name := range fieldConversionInfo.Names {
fields.Put(name, value)
}
}
}

if len(custom) != 0 {
fields.Put("journald.custom", custom)
}

return fields
}

func convertValue(fc Conversion, value string) (interface{}, error) {
if fc.IsInteger {
v, err := strconv.ParseInt(value, 10, 64)
if err != nil {
// On some versions of systemd the 'syslog.pid' can contain the username
// appended to the end of the pid. In most cases this does not occur
// but in the cases that it does, this tries to strip ',\w*' from the
// value and then perform the conversion.
s := strings.Split(value, ",")
v, err = strconv.ParseInt(s[0], 10, 64)
if err != nil {
return value, fmt.Errorf("failed to convert field %s \"%v\" to int: %v", fc.Names[0], value, err)
}
}
return v, nil
}
return value, nil
}

// helpers for creating a field conversion table.

var ignoredField = Conversion{Dropped: true}

func text(names ...string) Conversion {
return Conversion{Names: names, IsInteger: false, Dropped: false}
}

func integer(names ...string) Conversion {
return Conversion{Names: names, IsInteger: true, Dropped: false}
}
Loading

0 comments on commit da1e55f

Please sign in to comment.