-
Notifications
You must be signed in to change notification settings - Fork 486
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'main' into fix-udp-syslogtarget
- Loading branch information
Showing
32 changed files
with
1,537 additions
and
200 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
package internal | ||
|
||
import ( | ||
"encoding/binary" | ||
"fmt" | ||
"hash/crc32" | ||
) | ||
|
||
var ( | ||
markerHeaderV1 = []byte{'0', '1'} | ||
) | ||
|
||
// EncodeMarkerV1 encodes the segment number, from whom we need to create a marker, in the marker file format, | ||
// which in v1 includes the segment number and a trailing CRC code of the first 10 bytes. | ||
func EncodeMarkerV1(segment uint64) ([]byte, error) { | ||
// marker format v1 | ||
// marker [ 0 , 1 ] - HEADER, which is used to track version | ||
// marker [ 2 , 9 ] - encoded unit 64 which is the content of the marker, the last "consumed" segment | ||
// marker [ 10, 13 ] - CRC32 of the first 10 bytes of the marker, using IEEE polynomial | ||
bs := make([]byte, 14) | ||
// write header with marker format version | ||
bs[0] = markerHeaderV1[0] | ||
bs[1] = markerHeaderV1[1] | ||
// write actual marked segment number | ||
binary.BigEndian.PutUint64(bs[2:10], segment) | ||
// checksum is the IEEE CRC32 checksum of the first 10 bytes of the marker record | ||
checksum := crc32.ChecksumIEEE(bs[0:10]) | ||
binary.BigEndian.PutUint32(bs[10:], checksum) | ||
|
||
return bs, nil | ||
} | ||
|
||
// DecodeMarkerV1 decodes the segment number from a segment marker, encoded with EncodeMarkerV1. | ||
func DecodeMarkerV1(bs []byte) (uint64, error) { | ||
// first check that read byte stream has expected length | ||
if len(bs) != 14 { | ||
return 0, fmt.Errorf("bad length %d", len(bs)) | ||
} | ||
|
||
// check CRC first | ||
expectedCrc := crc32.ChecksumIEEE(bs[0:10]) | ||
gotCrc := binary.BigEndian.Uint32(bs[len(bs)-4:]) | ||
if expectedCrc != gotCrc { | ||
return 0, fmt.Errorf("corrupted WAL marker") | ||
} | ||
|
||
// check expected version header | ||
header := bs[:2] | ||
if !(header[0] == markerHeaderV1[0] && header[1] == markerHeaderV1[1]) { | ||
return 0, fmt.Errorf("wrong WAL marker header") | ||
} | ||
|
||
// lastly, decode marked segment number | ||
return binary.BigEndian.Uint64(bs[2:10]), nil | ||
} |
50 changes: 50 additions & 0 deletions
50
component/common/loki/client/internal/marker_encoding_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
package internal | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestMarkerEncodingV1(t *testing.T) { | ||
t.Run("encode and decode", func(t *testing.T) { | ||
segment := uint64(123) | ||
bs, err := EncodeMarkerV1(segment) | ||
require.NoError(t, err) | ||
|
||
gotSegment, err := DecodeMarkerV1(bs) | ||
require.NoError(t, err) | ||
require.Equal(t, segment, gotSegment) | ||
}) | ||
|
||
t.Run("decoding errors", func(t *testing.T) { | ||
t.Run("bad checksum", func(t *testing.T) { | ||
segment := uint64(123) | ||
bs, err := EncodeMarkerV1(segment) | ||
require.NoError(t, err) | ||
|
||
// change last byte | ||
bs[13] = '5' | ||
|
||
_, err = DecodeMarkerV1(bs) | ||
require.Error(t, err) | ||
}) | ||
|
||
t.Run("bad length", func(t *testing.T) { | ||
_, err := DecodeMarkerV1(make([]byte, 15)) | ||
require.Error(t, err) | ||
}) | ||
|
||
t.Run("bad header", func(t *testing.T) { | ||
segment := uint64(123) | ||
bs, err := EncodeMarkerV1(segment) | ||
require.NoError(t, err) | ||
|
||
// change first header byte | ||
bs[0] = '5' | ||
|
||
_, err = DecodeMarkerV1(bs) | ||
require.Error(t, err) | ||
}) | ||
}) | ||
} |
101 changes: 101 additions & 0 deletions
101
component/common/loki/client/internal/marker_file_handler.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
package internal | ||
|
||
import ( | ||
"bytes" | ||
"fmt" | ||
"os" | ||
"path/filepath" | ||
|
||
"github.com/go-kit/log" | ||
"github.com/go-kit/log/level" | ||
"github.com/grafana/agent/component/common/loki/wal" | ||
"github.com/natefinch/atomic" | ||
) | ||
|
||
const ( | ||
MarkerFolderName = "remote" | ||
MarkerFileName = "segment_marker" | ||
|
||
MarkerFolderMode os.FileMode = 0o700 | ||
MarkerFileMode os.FileMode = 0o600 | ||
) | ||
|
||
// MarkerFileHandler is a file-backed wal.Marker, that also allows one to write to the backing store as particular | ||
// segment number as the last one marked. | ||
type MarkerFileHandler interface { | ||
wal.Marker | ||
|
||
// MarkSegment writes in the backing file-store that a particular segment is the last one marked. | ||
MarkSegment(segment int) | ||
} | ||
|
||
type markerFileHandler struct { | ||
logger log.Logger | ||
lastMarkedSegmentDir string | ||
lastMarkedSegmentFilePath string | ||
} | ||
|
||
var ( | ||
_ MarkerFileHandler = (*markerFileHandler)(nil) | ||
) | ||
|
||
// NewMarkerFileHandler creates a new markerFileHandler. | ||
func NewMarkerFileHandler(logger log.Logger, walDir string) (MarkerFileHandler, error) { | ||
markerDir := filepath.Join(walDir, MarkerFolderName) | ||
// attempt to create dir if doesn't exist | ||
if err := os.MkdirAll(markerDir, MarkerFolderMode); err != nil { | ||
return nil, fmt.Errorf("error creating segment marker folder %q: %w", markerDir, err) | ||
} | ||
|
||
mfh := &markerFileHandler{ | ||
logger: logger, | ||
lastMarkedSegmentDir: filepath.Join(markerDir), | ||
lastMarkedSegmentFilePath: filepath.Join(markerDir, MarkerFileName), | ||
} | ||
|
||
return mfh, nil | ||
} | ||
|
||
// LastMarkedSegment implements wlog.Marker. | ||
func (mfh *markerFileHandler) LastMarkedSegment() int { | ||
bs, err := os.ReadFile(mfh.lastMarkedSegmentFilePath) | ||
if os.IsNotExist(err) { | ||
level.Warn(mfh.logger).Log("msg", "marker segment file does not exist", "file", mfh.lastMarkedSegmentFilePath) | ||
return -1 | ||
} else if err != nil { | ||
level.Error(mfh.logger).Log("msg", "could not access segment marker file", "file", mfh.lastMarkedSegmentFilePath, "err", err) | ||
return -1 | ||
} | ||
|
||
savedSegment, err := DecodeMarkerV1(bs) | ||
if err != nil { | ||
level.Error(mfh.logger).Log("msg", "could not decode segment marker file", "file", mfh.lastMarkedSegmentFilePath, "err", err) | ||
return -1 | ||
} | ||
|
||
return int(savedSegment) | ||
} | ||
|
||
// MarkSegment implements MarkerHandler. | ||
func (mfh *markerFileHandler) MarkSegment(segment int) { | ||
encodedMarker, err := EncodeMarkerV1(uint64(segment)) | ||
if err != nil { | ||
level.Error(mfh.logger).Log("msg", "failed to encode marker when marking segment", "err", err) | ||
return | ||
} | ||
|
||
if err := mfh.atomicallyWriteMarker(encodedMarker); err != nil { | ||
level.Error(mfh.logger).Log("msg", "could not replace segment marker file", "file", mfh.lastMarkedSegmentFilePath, "err", err) | ||
return | ||
} | ||
|
||
level.Debug(mfh.logger).Log("msg", "updated segment marker file", "file", mfh.lastMarkedSegmentFilePath, "segment", segment) | ||
} | ||
|
||
// atomicallyWriteMarker attempts to perform an atomic write of the marker contents. This is delegated to | ||
// https://github.com/natefinch/atomic/blob/master/atomic.go, that first handles atomic file renaming for UNIX and | ||
// Windows systems. Also, atomic.WriteFile will first write the contents to a temporal file, and then perform the atomic | ||
// rename, swapping the marker, or not at all. | ||
func (mfh *markerFileHandler) atomicallyWriteMarker(bs []byte) error { | ||
return atomic.WriteFile(mfh.lastMarkedSegmentFilePath, bytes.NewReader(bs)) | ||
} |
66 changes: 66 additions & 0 deletions
66
component/common/loki/client/internal/marker_file_handler_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
package internal | ||
|
||
import ( | ||
"os" | ||
"path/filepath" | ||
"testing" | ||
|
||
"github.com/go-kit/log" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestMarkerFileHandler(t *testing.T) { | ||
logger := log.NewLogfmtLogger(os.Stdout) | ||
getTempDir := func(t *testing.T) string { | ||
dir := t.TempDir() | ||
return dir | ||
} | ||
|
||
t.Run("invalid last marked segment when there's no marker file", func(t *testing.T) { | ||
dir := getTempDir(t) | ||
fh, err := NewMarkerFileHandler(logger, dir) | ||
require.NoError(t, err) | ||
|
||
require.Equal(t, -1, fh.LastMarkedSegment()) | ||
}) | ||
|
||
t.Run("reads the last segment from existing marker file", func(t *testing.T) { | ||
dir := getTempDir(t) | ||
fh, err := NewMarkerFileHandler(logger, dir) | ||
require.NoError(t, err) | ||
|
||
// write first something to marker | ||
markerFile := filepath.Join(dir, MarkerFolderName, MarkerFileName) | ||
bs, err := EncodeMarkerV1(10) | ||
require.NoError(t, err) | ||
err = os.WriteFile(markerFile, bs, MarkerFileMode) | ||
require.NoError(t, err) | ||
|
||
require.Equal(t, 10, fh.LastMarkedSegment()) | ||
}) | ||
|
||
t.Run("marks segment, and then reads value from it", func(t *testing.T) { | ||
dir := getTempDir(t) | ||
fh, err := NewMarkerFileHandler(logger, dir) | ||
require.NoError(t, err) | ||
|
||
fh.MarkSegment(12) | ||
require.Equal(t, 12, fh.LastMarkedSegment()) | ||
}) | ||
|
||
t.Run("marker file and directory is created with correct permissions", func(t *testing.T) { | ||
dir := getTempDir(t) | ||
fh, err := NewMarkerFileHandler(logger, dir) | ||
require.NoError(t, err) | ||
|
||
fh.MarkSegment(12) | ||
// check folder first | ||
stats, err := os.Stat(filepath.Join(dir, MarkerFolderName)) | ||
require.NoError(t, err) | ||
require.Equal(t, MarkerFolderMode, stats.Mode().Perm()) | ||
// then file | ||
stats, err = os.Stat(filepath.Join(dir, MarkerFolderName, MarkerFileName)) | ||
require.NoError(t, err) | ||
require.Equal(t, MarkerFileMode, stats.Mode().Perm()) | ||
}) | ||
} |
Oops, something went wrong.