diff --git a/go/protocols/materialize/lifecycle.go b/go/protocols/materialize/lifecycle.go index ba1cf0046..0ec5991fc 100644 --- a/go/protocols/materialize/lifecycle.go +++ b/go/protocols/materialize/lifecycle.go @@ -166,7 +166,7 @@ func (it *StoreIterator) Err() error { return it.err } -func ReadAcknowledge(stream MaterializeStream, request *pm.Request) error { +func readAcknowledge(stream MaterializeStream, request *pm.Request) error { if request.Open == nil && request.StartCommit == nil { panic(fmt.Sprintf("expected prior request is Open or StartCommit, got %#v", request)) } else if err := recv(stream, request); err != nil { @@ -180,7 +180,7 @@ func ReadAcknowledge(stream MaterializeStream, request *pm.Request) error { return nil } -func ReadFlush(request *pm.Request) error { +func validateIsFlush(request *pm.Request) error { if request.Flush == nil { return fmt.Errorf("protocol error (expected Flush, got %#v)", request) } else if err := request.Validate_(); err != nil { @@ -189,7 +189,7 @@ func ReadFlush(request *pm.Request) error { return nil } -func ReadStartCommit(request *pm.Request) (*pc.Checkpoint, error) { +func checkpointFromStartCommit(request *pm.Request) (*pc.Checkpoint, error) { if request.StartCommit == nil { return nil, fmt.Errorf("protocol error (expected StartCommit, got %#v)", request) } else if err := request.Validate_(); err != nil { @@ -198,7 +198,7 @@ func ReadStartCommit(request *pm.Request) (*pc.Checkpoint, error) { return request.StartCommit.RuntimeCheckpoint, nil } -func WriteOpened(stream MaterializeStream, opened *pm.Response_Opened) (pm.Response, error) { +func writeOpened(stream MaterializeStream, opened *pm.Response_Opened) (pm.Response, error) { var response = pm.Response{Opened: opened} if err := stream.Send(&response); err != nil { @@ -207,7 +207,7 @@ func WriteOpened(stream MaterializeStream, opened *pm.Response_Opened) (pm.Respo return response, nil } -func WriteAcknowledged(stream MaterializeStream, state *pf.ConnectorState, response *pm.Response) error { +func writeAcknowledged(stream MaterializeStream, state *pf.ConnectorState, response *pm.Response) error { if response.Opened == nil && response.StartedCommit == nil { panic(fmt.Sprintf("expected prior response is Opened or StartedCommit, got %#v", response)) } @@ -222,7 +222,7 @@ func WriteAcknowledged(stream MaterializeStream, state *pf.ConnectorState, respo return nil } -func WriteLoaded( +func writeLoaded( stream MaterializeStream, response *pm.Response, binding int, @@ -243,7 +243,7 @@ func WriteLoaded( return nil } -func WriteFlushed(stream MaterializeStream, response *pm.Response) error { +func writeFlushed(stream MaterializeStream, response *pm.Response) error { if response.Acknowledged == nil && response.Loaded == nil { panic(fmt.Sprintf("expected prior response is Acknowledged or Loaded, got %#v", response)) } @@ -256,7 +256,7 @@ func WriteFlushed(stream MaterializeStream, response *pm.Response) error { return nil } -func WriteStartedCommit( +func writeStartedCommit( stream MaterializeStream, response *pm.Response, checkpoint *pf.ConnectorState, diff --git a/go/protocols/materialize/transactor.go b/go/protocols/materialize/transactor.go index ca957f589..5b6a051d3 100644 --- a/go/protocols/materialize/transactor.go +++ b/go/protocols/materialize/transactor.go @@ -112,7 +112,7 @@ func RunTransactions( opened pm.Response_Opened, transactor Transactor, ) (_err error) { - defer func() { transactor.Destroy() }() + defer transactor.Destroy() if err := open.Validate(); err != nil { return fmt.Errorf("open is invalid: %w", err) @@ -127,7 +127,7 @@ func RunTransactions( } var rxRequest = pm.Request{Open: &open} - var txResponse, err = WriteOpened(stream, &opened) + var txResponse, err = writeOpened(stream, &opened) if err != nil { return err } @@ -169,7 +169,7 @@ func RunTransactions( if ackState, err := transactor.Acknowledge(ctx); err != nil { return err - } else if err := WriteAcknowledged(stream, ackState, &txResponse); err != nil { + } else if err := writeAcknowledged(stream, ackState, &txResponse); err != nil { return err } @@ -200,7 +200,7 @@ func RunTransactions( } loaded++ - return WriteLoaded(stream, &txResponse, binding, doc) + return writeLoaded(stream, &txResponse, binding, doc) }) if it.awaitDoneCh == nil && awaitErr != nil { @@ -225,7 +225,7 @@ func RunTransactions( loadIt = LoadIterator{stream: stream, request: &rxRequest, awaitDoneCh: awaitDoneCh, ctx: loadCtx} ) - if err = ReadAcknowledge(stream, &rxRequest); err != nil { + if err = readAcknowledge(stream, &rxRequest); err != nil { return err } @@ -263,9 +263,9 @@ func RunTransactions( return nil // Graceful shutdown. } - if err = ReadFlush(&rxRequest); err != nil { + if err = validateIsFlush(&rxRequest); err != nil { return err - } else if err = WriteFlushed(stream, &txResponse); err != nil { + } else if err = writeFlushed(stream, &txResponse); err != nil { return err } @@ -279,7 +279,7 @@ func RunTransactions( return fmt.Errorf("transactor.Store: %w", err) } var runtimeCheckpoint *pc.Checkpoint - if runtimeCheckpoint, err = ReadStartCommit(&rxRequest); err != nil { + if runtimeCheckpoint, err = checkpointFromStartCommit(&rxRequest); err != nil { return err } @@ -304,7 +304,7 @@ func RunTransactions( default: } - if err = WriteStartedCommit(stream, &txResponse, stateUpdate); err != nil { + if err = writeStartedCommit(stream, &txResponse, stateUpdate); err != nil { return err } }