Skip to content

Commit

Permalink
Merge pull request #2106 from keboola/fix-notifier-guarantees
Browse files Browse the repository at this point in the history
fix: Do not obtain notifier before writeRow
  • Loading branch information
Matovidlo authored Oct 23, 2024
2 parents 56e0efe + e9219a4 commit cded779
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package csv

import (
"context"
"io"
"sync"

Expand All @@ -20,7 +21,7 @@ type Encoder struct {
columns column.Columns
writersPool *fastcsv.WritersPool
valuesPool *sync.Pool
notifier func() *notify.Notifier
notifier func(ctx context.Context) *notify.Notifier
}

var columnRenderer = column.NewRenderer() //nolint:gochecknoglobals // contains Jsonnet VMs sync.Pool
Expand All @@ -29,7 +30,13 @@ var columnRenderer = column.NewRenderer() //nolint:gochecknoglobals // contains
// The order of the lines is not preserved, because we use the writers pool,
// but also because there are several source nodes with a load balancer in front of them.
// If the encoder receives a CSV row that is too big, it returns an error.
func NewEncoder(concurrency int, rowSizeLimit datasize.ByteSize, mapping any, out io.Writer, notifier func() *notify.Notifier) (*Encoder, error) {
func NewEncoder(
concurrency int,
rowSizeLimit datasize.ByteSize,
mapping any,
out io.Writer,
notifier func(ctx context.Context) *notify.Notifier,
) (*Encoder, error) {
tableMapping, ok := mapping.(table.Mapping)
if !ok {
return nil, errors.Errorf("csv encoder supports only table mapping, given %v", mapping)
Expand All @@ -49,7 +56,6 @@ func NewEncoder(concurrency int, rowSizeLimit datasize.ByteSize, mapping any, ou
}

func (w *Encoder) WriteRecord(record recordctx.Context) (result.WriteRecordResult, error) {
writeRecordResult := result.NewNotifierWriteRecordResult(w.notifier())
// Reduce memory allocations
values := w.valuesPool.Get().(*[]any)
defer w.valuesPool.Put(values)
Expand All @@ -58,29 +64,30 @@ func (w *Encoder) WriteRecord(record recordctx.Context) (result.WriteRecordResul
for i, col := range w.columns {
value, err := columnRenderer.CSVValue(col, record)
if err != nil {
return writeRecordResult, errors.PrefixErrorf(err, "cannot convert column %q to CSV value", col)
return result.WriteRecordResult{}, errors.PrefixErrorf(err, "cannot convert column %q to CSV value", col)
}
(*values)[i] = value
}

// Encode the values to CSV format
n, err := w.writersPool.WriteRow(values)
writeRecordResult.N = n
if err != nil {
var valErr fastcsv.ValueError
if errors.As(err, &valErr) {
columnName := w.columns[valErr.ColumnIndex].ColumnName()
return writeRecordResult, errors.Errorf(`cannot convert value of the column "%s" to the string: %w`, columnName, err)
return result.WriteRecordResult{}, errors.Errorf(`cannot convert value of the column "%s" to the string: %w`, columnName, err)
}
var limitErr fastcsv.LimitError
if errors.As(err, &limitErr) {
columnName := w.columns[limitErr.ColumnIndex].ColumnName()
return writeRecordResult, svcerrors.NewPayloadTooLargeError(errors.Errorf(`too big CSV row, column: "%s", row limit: %s`, columnName, limitErr.Limit.HumanReadable()))
return result.WriteRecordResult{}, svcerrors.NewPayloadTooLargeError(errors.Errorf(`too big CSV row, column: "%s", row limit: %s`, columnName, limitErr.Limit.HumanReadable()))
}

return writeRecordResult, err
return result.WriteRecordResult{}, err
}

// Get the notifier only after the record has been successfully written.
writeRecordResult := result.NewNotifierWriteRecordResult(n, w.notifier(record.Ctx()))
// Buffers can be released
// Important: values slice contains reference to the body []byte buffer, so it can be released sooner.
record.ReleaseBuffers()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package csv_test

import (
"compress/gzip"
"context"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -33,6 +34,40 @@ type fileCompression struct {
DisableValidation bool
}

// unknown is a test column fixture whose ColumnType ("unknown") is not a
// registered column type; the test below uses it to make record mapping fail.
type unknown struct {
	Name string `json:"name" validate:"required"` // column name returned by ColumnName
}

// staticNotifier hands out one shared notifier instance, letting a test
// observe whether (and when) the encoder requested a notifier.
type staticNotifier struct {
	notifier *notify.Notifier // lazily created on first createStaticNotifier call
}

// newStaticNotifier returns an empty staticNotifier; its shared notifier is
// created lazily on the first createStaticNotifier call.
func newStaticNotifier() *staticNotifier {
	var s staticNotifier
	return &s
}

// ColumnType returns the type name "unknown", which no real column type uses,
// so renderers reject this column.
func (v unknown) ColumnType() column.Type {
	const typeName = "unknown"
	return column.Type(typeName)
}

// ColumnName returns the configured name of this test column.
func (v unknown) ColumnName() string {
	name := v.Name
	return name
}

// IsPrimaryKey always marks this test column as part of the primary key;
// a fixed value is sufficient for the fixture's purpose.
func (v unknown) IsPrimaryKey() bool {
	return true
}

// createStaticNotifier lazily creates the shared notifier on the first call
// and returns the same instance on every subsequent call; the context is
// ignored (the signature only matches the encoder's notifier callback).
// NOTE(review): not guarded by a mutex, so it is only safe when called from a
// single goroutine — fine for this test's usage, confirm before reuse.
func (s *staticNotifier) createStaticNotifier(_ context.Context) *notify.Notifier {
	if s.notifier == nil {
		s.notifier = notify.New()
	}
	return s.notifier
}

// nolint:tparallel // false positive
func TestCSVWriter(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -97,7 +132,10 @@ func TestCSVWriterAboveLimit(t *testing.T) {
column.Body{Name: "body"},
},
}
csvEncoder, err := csv.NewEncoder(0, 40*datasize.B, columns, io.Discard, notify.New)
newNotifier := func(ctx context.Context) *notify.Notifier {
return notify.New()
}
csvEncoder, err := csv.NewEncoder(0, 40*datasize.B, columns, io.Discard, newNotifier)
require.NoError(t, err)
record := recordctx.FromHTTP(
utctime.MustParse("2000-01-01T03:00:00.000Z").Time(),
Expand All @@ -114,6 +152,41 @@ func TestCSVWriterAboveLimit(t *testing.T) {
assert.Equal(t, "too big CSV row, column: \"body\", row limit: 40 B", err.Error())
}

// TestCSVWriterDoNotGetNotifierBeforeWrite guarantees that the notifier is always obtained after a successful writeRow,
// not before writeRow while the CSV columns are being mapped.
func TestCSVWriterDoNotGetNotifierBeforeWrite(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	// Input rows: the "unknown" column type is presumably unsupported by the
	// column renderer, so mapping must fail before any row is written — the
	// write below is required to error out.
	columns := table.Mapping{
		Columns: column.Columns{
			column.Datetime{Name: "datetime"},
			column.Body{Name: "body"},
			unknown{Name: "path"},
		},
	}
	// The static notifier returns the same instance on every call, which lets
	// the test detect whether the encoder asked for a notifier at all.
	staticNotifier := newStaticNotifier()
	csvEncoder, err := csv.NewEncoder(0, 40*datasize.B, columns, io.Discard, staticNotifier.createStaticNotifier)
	require.NoError(t, err)
	record := recordctx.FromHTTP(
		utctime.MustParse("2000-01-01T03:00:00.000Z").Time(),
		&http.Request{Body: io.NopCloser(strings.NewReader("foobar"))},
	)
	// Pre-create the shared notifier and mark it done, so a Wait on it would
	// return immediately if the encoder (incorrectly) handed it out.
	notifier := staticNotifier.createStaticNotifier(ctx)
	notifier.Done(nil)
	r, err := csvEncoder.WriteRecord(record)
	require.Error(t, err)

	// It is expected that no notifier is returned before writing actual record, but after writing record.
	// If the assertion fails, a notifier leaked out despite the failed write:
	// wait on it (it was marked done above, so this returns promptly) and bail out.
	if !assert.Nil(t, r.Notifier) {
		err = r.Notifier.Wait(ctx)
		require.NoError(t, err)
		return
	}
}

func newTestCase(comp fileCompression, syncMode writesync.Mode, syncWait bool, parallelWrite bool) *testcase.WriterTestCase {
// Input rows
columns := column.Columns{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package encoder

import (
"context"
"io"

"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding/encoder/csv"
Expand All @@ -9,12 +10,12 @@ import (
)

type Factory interface {
NewEncoder(cfg Config, mapping any, out io.Writer, notifier func() *notify.Notifier) (Encoder, error)
NewEncoder(cfg Config, mapping any, out io.Writer, notifier func(ctx context.Context) *notify.Notifier) (Encoder, error)
}

type DefaultFactory struct{}

func (DefaultFactory) NewEncoder(cfg Config, mapping any, out io.Writer, notifier func() *notify.Notifier) (Encoder, error) {
func (DefaultFactory) NewEncoder(cfg Config, mapping any, out io.Writer, notifier func(ctx context.Context) *notify.Notifier) (Encoder, error) {
switch cfg.Type {
case TypeCSV:
return csv.NewEncoder(cfg.Concurrency, cfg.RowSizeLimit, mapping, out, notifier)
Expand All @@ -23,14 +24,14 @@ func (DefaultFactory) NewEncoder(cfg Config, mapping any, out io.Writer, notifie
}
}

func FactoryFn(fn func(cfg Config, mapping any, out io.Writer, notifier func() *notify.Notifier) (Encoder, error)) Factory {
// FactoryFn adapts a plain function into a Factory implementation.
func FactoryFn(fn func(cfg Config, mapping any, out io.Writer, notifier func(ctx context.Context) *notify.Notifier) (Encoder, error)) Factory {
	wrapped := factoryFn{Fn: fn}
	return wrapped
}

type factoryFn struct {
Fn func(cfg Config, mapping any, out io.Writer, notifier func() *notify.Notifier) (Encoder, error)
Fn func(cfg Config, mapping any, out io.Writer, notifier func(ctx context.Context) *notify.Notifier) (Encoder, error)
}

func (f factoryFn) NewEncoder(cfg Config, mapping any, out io.Writer, notifier func() *notify.Notifier) (Encoder, error) {
// NewEncoder satisfies the Factory interface by delegating to the wrapped function.
func (f factoryFn) NewEncoder(cfg Config, mapping any, out io.Writer, notifier func(ctx context.Context) *notify.Notifier) (Encoder, error) {
	enc, err := f.Fn(cfg, mapping, out, notifier)
	return enc, err
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ type WriteRecordResult struct {
Notifier *notify.Notifier
}

func NewNotifierWriteRecordResult(notifier *notify.Notifier) WriteRecordResult {
// NewNotifierWriteRecordResult builds a WriteRecordResult carrying the write
// count n together with the notifier for the next sync.
func NewNotifierWriteRecordResult(n int, notifier *notify.Notifier) WriteRecordResult {
	var out WriteRecordResult
	out.N = n
	out.Notifier = notifier
	return out
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestEncodingPipeline_FlushError(t *testing.T) {
d, _ := dependencies.NewMockedSourceScope(t, ctx)

slice := test.NewSlice()
slice.Encoding.Encoder.OverrideEncoderFactory = encoder.FactoryFn(func(cfg encoder.Config, mapping any, out io.Writer, notifier func() *notify.Notifier) (encoder.Encoder, error) {
slice.Encoding.Encoder.OverrideEncoderFactory = encoder.FactoryFn(func(cfg encoder.Config, mapping any, out io.Writer, notifier func(ctx context.Context) *notify.Notifier) (encoder.Encoder, error) {
w := newDummyEncoder(out, nil, notifier)
w.FlushError = errors.New("some error")
return w, nil
Expand All @@ -98,7 +98,7 @@ func TestEncodingPipeline_CloseError(t *testing.T) {
d, _ := dependencies.NewMockedSourceScope(t, ctx)

slice := test.NewSlice()
slice.Encoding.Encoder.OverrideEncoderFactory = encoder.FactoryFn(func(cfg encoder.Config, mapping any, out io.Writer, notifier func() *notify.Notifier) (encoder.Encoder, error) {
slice.Encoding.Encoder.OverrideEncoderFactory = encoder.FactoryFn(func(cfg encoder.Config, mapping any, out io.Writer, notifier func(ctx context.Context) *notify.Notifier) (encoder.Encoder, error) {
w := newDummyEncoder(out, nil, notifier)
w.CloseError = errors.New("some error")
return w, nil
Expand Down Expand Up @@ -227,21 +227,22 @@ foo3
{"level":"debug","message":"starting sync to disk"}
{"level":"debug","message":"flushing writers"}
{"level":"debug","message":"chunk completed, aligned = true, size = \"10B\""}
{"level":"debug","message":"writers flushed"}
{"level":"debug","message":"sync to disk done"}
{"level":"info","message":"TEST: write unblocked"}
{"level":"info","message":"TEST: write unblocked"}
{"level":"debug","message":"starting sync to disk"}
{"level":"debug","message":"flushing writers"}
{"level":"debug","message":"chunk completed, aligned = true, size = \"5B\""}
{"level":"debug","message":"writers flushed"}
{"level":"debug","message":"sync to disk done"}
{"level":"debug","message":"sync to disk done"}
{"level":"info","message":"TEST: write unblocked"}
{"level":"info","message":"TEST: write unblocked"}
{"level":"debug","message":"notifier obtained"}
{"level":"debug","message":"starting sync to disk"}
{"level":"debug","message":"flushing writers"}
{"level":"debug","message":"chunk completed, aligned = true, size = \"5B\""}
{"level":"debug","message":"writers flushed"}
{"level":"debug","message":"sync to disk done"}
{"level":"info","message":"TEST: write unblocked"}
{"level":"debug","message":"closing encoding pipeline"}
{"level":"debug","message":"stopping syncer"}
{"level":"debug","message":"starting sync to disk"}
{"level":"debug","message":"flushing writers"}
{"level":"debug","message":"chunk completed, aligned = true, size = \"5B\""}
{"level":"debug","message":"starting sync to disk"}
{"level":"debug","message":"flushing writers"}
{"level":"debug","message":"chunk completed, aligned = true, size = \"5B\""}
{"level":"debug","message":"writers flushed"}
{"level":"debug","message":"sync to disk done"}
{"level":"debug","message":"syncer stopped"}
Expand Down Expand Up @@ -340,8 +341,8 @@ foo3
{"level":"info","message":"TEST: write unblocked"}
{"level":"debug","message":"closing encoding pipeline"}
{"level":"debug","message":"stopping syncer"}
{"level":"debug","message":"starting sync to cache"}
{"level":"debug","message":"flushing writers"}
{"level":"debug","message":"starting sync to cache"}
{"level":"debug","message":"flushing writers"}
{"level":"debug","message":"chunk completed, aligned = true, size = \"5B\""}
{"level":"debug","message":"writers flushed"}
{"level":"debug","message":"sync to cache done"}
Expand Down Expand Up @@ -575,7 +576,7 @@ func (tc *encodingTestCase) AssertLogs(expected string) bool {
return tc.Logger.AssertJSONMessages(tc.T, expected)
}

func (h *writerSyncHelper) NewEncoder(cfg encoder.Config, mapping any, out io.Writer, notifier func() *notify.Notifier) (encoder.Encoder, error) {
func (h *writerSyncHelper) NewEncoder(cfg encoder.Config, mapping any, out io.Writer, notifier func(ctx context.Context) *notify.Notifier) (encoder.Encoder, error) {
return newDummyEncoder(out, h.writeDone, notifier), nil
}

Expand Down Expand Up @@ -626,35 +627,33 @@ func (h *writerSyncHelper) TriggerSync(tb testing.TB) {
type dummyEncoder struct {
out io.Writer
writeDone chan struct{}
notifier func() *notify.Notifier
notifier func(ctx context.Context) *notify.Notifier
FlushError error
CloseError error
}

func dummyEncoderFactory(cfg encoder.Config, mapping any, out io.Writer, notifier func() *notify.Notifier) (encoder.Encoder, error) {
// dummyEncoderFactory adapts newDummyEncoder to the encoder factory signature;
// the writeDone channel is nil, so writes never signal a test barrier.
func dummyEncoderFactory(cfg encoder.Config, mapping any, out io.Writer, notifier func(ctx context.Context) *notify.Notifier) (encoder.Encoder, error) {
	return newDummyEncoder(out, nil, notifier), nil
}

func newDummyEncoder(out io.Writer, writeDone chan struct{}, notifier func() *notify.Notifier) *dummyEncoder {
// newDummyEncoder constructs a dummyEncoder writing to out; writeDone may be
// nil when no per-write signal is needed.
func newDummyEncoder(out io.Writer, writeDone chan struct{}, notifier func(ctx context.Context) *notify.Notifier) *dummyEncoder {
	enc := &dummyEncoder{
		out:       out,
		writeDone: writeDone,
		notifier:  notifier,
	}
	return enc
}

func (w *dummyEncoder) WriteRecord(record recordctx.Context) (result.WriteRecordResult, error) {
wrr := result.NewNotifierWriteRecordResult(w.notifier())

body, err := record.BodyBytes()
if err != nil {
return wrr, err
return result.WriteRecordResult{}, err
}

body = append(body, '\n')

n, err := w.out.Write(body)
wrr := result.NewNotifierWriteRecordResult(n, w.notifier(record.Ctx()))
if err == nil && w.writeDone != nil {
w.writeDone <- struct{}{}
}

wrr.N = n
return wrr, err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,14 @@ func NewSyncer(
}

// Notifier to wait for the next sync.
func (s *Syncer) Notifier() *notify.Notifier {
func (s *Syncer) Notifier(ctx context.Context) *notify.Notifier {
// Wait is disabled: return a nil notifier; calling Wait on a (*notify.Notifier)(nil) is a valid no-op.
if !s.config.Wait {
return nil
}

s.notifierLock.RLock()
s.logger.Debug(ctx, "notifier obtained")
notifier := s.notifier
s.notifierLock.RUnlock()
return notifier
Expand Down
Loading

0 comments on commit cded779

Please sign in to comment.