diff --git a/pkg/chunkenc/gzip.go b/pkg/chunkenc/gzip.go index fd8580fdcd46..cff591c964e3 100644 --- a/pkg/chunkenc/gzip.go +++ b/pkg/chunkenc/gzip.go @@ -314,7 +314,15 @@ func (c *MemChunk) SpaceFor(*logproto.Entry) bool { // Append implements Chunk. func (c *MemChunk) Append(entry *logproto.Entry) error { - if err := c.head.append(entry.Timestamp.UnixNano(), entry.Line); err != nil { + entryTimestamp := entry.Timestamp.UnixNano() + + // If the head block is empty but there are cut blocks, we have to make + // sure the new entry is not out of order compared to the previous block + if c.head.isEmpty() && len(c.blocks) > 0 && c.blocks[len(c.blocks)-1].maxt > entryTimestamp { + return ErrOutOfOrder + } + + if err := c.head.append(entryTimestamp, entry.Line); err != nil { return err } diff --git a/pkg/chunkenc/gzip_test.go b/pkg/chunkenc/gzip_test.go index 5e1363e08b11..a2573fda577b 100644 --- a/pkg/chunkenc/gzip_test.go +++ b/pkg/chunkenc/gzip_test.go @@ -10,6 +10,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/grafana/loki/pkg/logproto" "github.com/stretchr/testify/require" ) @@ -216,6 +218,47 @@ func TestGZIPChunkFilling(t *testing.T) { require.Equal(t, int64(lines), i) } +func TestMemChunk_AppendOutOfOrder(t *testing.T) { + t.Parallel() + + type tester func(t *testing.T, chk *MemChunk) + + tests := map[string]tester{ + "append out of order in the same block": func(t *testing.T, chk *MemChunk) { + assert.NoError(t, chk.Append(logprotoEntry(5, "test"))) + assert.NoError(t, chk.Append(logprotoEntry(6, "test"))) + + assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error()) + }, + "append out of order in a new block right after cutting the previous one": func(t *testing.T, chk *MemChunk) { + assert.NoError(t, chk.Append(logprotoEntry(5, "test"))) + assert.NoError(t, chk.Append(logprotoEntry(6, "test"))) + assert.NoError(t, chk.cut()) + + assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error()) + }, + "append out of order in a new block after multiple cuts": func(t *testing.T, chk *MemChunk) { + assert.NoError(t, chk.Append(logprotoEntry(5, "test"))) + assert.NoError(t, chk.cut()) + + assert.NoError(t, chk.Append(logprotoEntry(6, "test"))) + assert.NoError(t, chk.cut()) + + assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error()) + }, + } + + for testName, tester := range tests { + tester := tester + + t.Run(testName, func(t *testing.T) { + t.Parallel() + + tester(t, NewMemChunk(EncGZIP)) + }) + } +} + var result []Chunk func BenchmarkWriteGZIP(b *testing.B) {