Skip to content

Commit

Permalink
Init journald input (#19619)
Browse files Browse the repository at this point in the history
  • Loading branch information
Steffen Siering authored Jul 3, 2020
1 parent d7ef9af commit d0b1aa2
Show file tree
Hide file tree
Showing 7 changed files with 527 additions and 84 deletions.
167 changes: 167 additions & 0 deletions dev-tools/mage/pkgdeps.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package mage

import (
"fmt"

"github.com/magefile/mage/sh"
"github.com/pkg/errors"
)

type PackageInstaller struct {
table map[PlatformDescription][]PackageDependency
}

type PlatformDescription struct {
Name string
Arch string
DefaultTag string
}

type PackageDependency struct {
archTag string
dependencies []string
}

var (
Linux386 = PlatformDescription{Name: "linux/386", Arch: "i386", DefaultTag: "i386"}
LinuxAMD64 = PlatformDescription{Name: "linux/amd64", Arch: "", DefaultTag: ""} // builders run on amd64 platform
LinuxARM64 = PlatformDescription{Name: "linux/arm64", Arch: "arm64", DefaultTag: "arm64"}
LinuxARM5 = PlatformDescription{Name: "linux/arm5", Arch: "armel", DefaultTag: "armel"}
LinuxARM6 = PlatformDescription{Name: "linux/arm6", Arch: "armel", DefaultTag: "armel"}
LinuxARM7 = PlatformDescription{Name: "linux/arm7", Arch: "armhf", DefaultTag: "armhf"}
LinuxMIPS = PlatformDescription{Name: "linux/mips", Arch: "mips", DefaultTag: "mips"}
LinuxMIPSLE = PlatformDescription{Name: "linux/mipsle", Arch: "mipsel", DefaultTag: "mipsel"}
LinuxMIPS64LE = PlatformDescription{Name: "linux/mips64le", Arch: "mips64el", DefaultTag: "mips64el"}
LinuxPPC64LE = PlatformDescription{Name: "linux/ppc64le", Arch: "ppc64el", DefaultTag: "ppc64el"}
LinuxS390x = PlatformDescription{Name: "linux/s390x", Arch: "s390x", DefaultTag: "s390x"}
)

func NewPackageInstaller() *PackageInstaller {
return &PackageInstaller{}
}

func (i *PackageInstaller) AddEach(ps []PlatformDescription, names ...string) *PackageInstaller {
for _, p := range ps {
i.Add(p, names...)
}
return i
}

func (i *PackageInstaller) Add(p PlatformDescription, names ...string) *PackageInstaller {
i.AddPackages(p, p.Packages(names...))
return i
}

func (i *PackageInstaller) AddPackages(p PlatformDescription, details ...PackageDependency) *PackageInstaller {
if i.table == nil {
i.table = map[PlatformDescription][]PackageDependency{}
}
i.table[p] = append(i.table[p], details...)
return i
}

func (i *PackageInstaller) Installer(name string) func() error {
var platform PlatformDescription
for p := range i.table {
if p.Name == name {
platform = p
}
}

if platform.Name == "" {
return func() error { return nil }
}

return func() error {
return i.Install(platform)
}
}

func (i *PackageInstaller) Install(p PlatformDescription) error {
packages := map[string]struct{}{}
for _, details := range i.table[p] {
for _, name := range details.List() {
packages[name] = struct{}{}
}
}

j, lst := 0, make([]string, len(packages))
for name := range packages {
lst[j], j = name, j+1
}

return installDependencies(p.Arch, lst...)
}

func installDependencies(arch string, pkgs ...string) error {
if arch != "" {
err := sh.Run("dpkg", "--add-architecture", arch)
if err != nil {
return errors.Wrap(err, "error while adding architecture")
}
}

if err := sh.Run("apt-get", "update"); err != nil {
return err
}

params := append([]string{"install", "-y",
"--no-install-recommends",

// Journalbeat is built with old versions of Debian that don't update
// their repositories, so they have expired keys.
// Allow unauthenticated packages.
// This was not enough: "-o", "Acquire::Check-Valid-Until=false",
"--allow-unauthenticated",
}, pkgs...)
return sh.Run("apt-get", params...)
}

func (p PlatformDescription) Packages(names ...string) PackageDependency {
return PackageDependency{}.WithTag(p.DefaultTag).Add(names...)
}

func (p PackageDependency) Add(deps ...string) PackageDependency {
if len(deps) == 0 {
return p
}

// always copy to ensure that we never share or overwrite slices due to capacity being too large
p.dependencies = append(make([]string, 0, len(p.dependencies)+len(deps)), p.dependencies...)
p.dependencies = append(p.dependencies, deps...)
return p
}

func (p PackageDependency) WithTag(tag string) PackageDependency {
p.archTag = tag
return p
}

func (p PackageDependency) List() []string {
if p.archTag == "" {
return p.dependencies
}

names := make([]string, len(p.dependencies))
for i, name := range p.dependencies {
names[i] = fmt.Sprintf("%v:%v", name, p.archTag)
}
return names
}
1 change: 1 addition & 0 deletions filebeat/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ FROM golang:1.13.10
RUN \
apt-get update \
&& apt-get install -y --no-install-recommends \
libsystemd-dev \
netcat \
rsync \
python3 \
Expand Down
72 changes: 72 additions & 0 deletions filebeat/input/journald/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build linux,cgo

package journald

import (
"errors"
"time"

"github.com/elastic/beats/v7/journalbeat/pkg/journalfield"
"github.com/elastic/beats/v7/journalbeat/pkg/journalread"
)

// Config stores the options of a journald input.
type config struct {
// Paths stores the paths to the journal files to be read.
Paths []string `config:"paths"`

// Backoff is the current interval to wait before
// attemting to read again from the journal.
Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`

// MaxBackoff is the limit of the backoff time.
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`

// Seek is the method to read from journals.
Seek journalread.SeekMode `config:"seek"`

// CursorSeekFallback sets where to seek if registry file is not available.
CursorSeekFallback journalread.SeekMode `config:"cursor_seek_fallback"`

// Matches store the key value pairs to match entries.
Matches []journalfield.Matcher `config:"include_matches"`

// SaveRemoteHostname defines if the original source of the entry needs to be saved.
SaveRemoteHostname bool `config:"save_remote_hostname"`
}

var errInvalidSeekFallback = errors.New("invalid setting for cursor_seek_fallback")

func defaultConfig() config {
return config{
Backoff: 1 * time.Second,
MaxBackoff: 20 * time.Second,
Seek: journalread.SeekCursor,
CursorSeekFallback: journalread.SeekHead,
SaveRemoteHostname: false,
}
}

func (c *config) Validate() error {
if c.CursorSeekFallback != journalread.SeekHead && c.CursorSeekFallback != journalread.SeekTail {
return errInvalidSeekFallback
}
return nil
}
55 changes: 55 additions & 0 deletions filebeat/input/journald/conv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package journald

import (
"time"

"github.com/elastic/beats/v7/journalbeat/pkg/journalfield"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
)

func eventFromFields(
log *logp.Logger,
timestamp uint64,
entryFields map[string]string,
saveRemoteHostname bool,
) beat.Event {
created := time.Now()
c := journalfield.NewConverter(log, nil)
fields := c.Convert(entryFields)
fields.Put("event.kind", "event")

// if entry is coming from a remote journal, add_host_metadata overwrites the source hostname, so it
// has to be copied to a different field
if saveRemoteHostname {
remoteHostname, err := fields.GetValue("host.hostname")
if err == nil {
fields.Put("log.source.address", remoteHostname)
}
}

fields.Put("event.created", created)
receivedByJournal := time.Unix(0, int64(timestamp)*1000)

return beat.Event{
Timestamp: receivedByJournal,
Fields: fields,
}
}
Loading

0 comments on commit d0b1aa2

Please sign in to comment.