Skip to content

Commit

Permalink
materialize: unexport & rename some message handling routines
Browse files Browse the repository at this point in the history
None of these needed to be exported anymore, so I unexported them. Also renamed
some of them to be more clear, for example `ReadFlush` didn't actually read
anything, but rather just validated that the last read message by the Load
iterator was a "flush".
  • Loading branch information
williamhbaker committed Oct 8, 2024
1 parent 35137b6 commit c43ddc2
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 17 deletions.
16 changes: 8 additions & 8 deletions go/protocols/materialize/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (it *StoreIterator) Err() error {
return it.err
}

func ReadAcknowledge(stream MaterializeStream, request *pm.Request) error {
func readAcknowledge(stream MaterializeStream, request *pm.Request) error {
if request.Open == nil && request.StartCommit == nil {
panic(fmt.Sprintf("expected prior request is Open or StartCommit, got %#v", request))
} else if err := recv(stream, request); err != nil {
Expand All @@ -180,7 +180,7 @@ func ReadAcknowledge(stream MaterializeStream, request *pm.Request) error {
return nil
}

func ReadFlush(request *pm.Request) error {
func validateIsFlush(request *pm.Request) error {
if request.Flush == nil {
return fmt.Errorf("protocol error (expected Flush, got %#v)", request)
} else if err := request.Validate_(); err != nil {
Expand All @@ -189,7 +189,7 @@ func ReadFlush(request *pm.Request) error {
return nil
}

func ReadStartCommit(request *pm.Request) (*pc.Checkpoint, error) {
func checkpointFromStartCommit(request *pm.Request) (*pc.Checkpoint, error) {
if request.StartCommit == nil {
return nil, fmt.Errorf("protocol error (expected StartCommit, got %#v)", request)
} else if err := request.Validate_(); err != nil {
Expand All @@ -198,7 +198,7 @@ func ReadStartCommit(request *pm.Request) (*pc.Checkpoint, error) {
return request.StartCommit.RuntimeCheckpoint, nil
}

func WriteOpened(stream MaterializeStream, opened *pm.Response_Opened) (pm.Response, error) {
func writeOpened(stream MaterializeStream, opened *pm.Response_Opened) (pm.Response, error) {
var response = pm.Response{Opened: opened}

if err := stream.Send(&response); err != nil {
Expand All @@ -207,7 +207,7 @@ func WriteOpened(stream MaterializeStream, opened *pm.Response_Opened) (pm.Respo
return response, nil
}

func WriteAcknowledged(stream MaterializeStream, state *pf.ConnectorState, response *pm.Response) error {
func writeAcknowledged(stream MaterializeStream, state *pf.ConnectorState, response *pm.Response) error {
if response.Opened == nil && response.StartedCommit == nil {
panic(fmt.Sprintf("expected prior response is Opened or StartedCommit, got %#v", response))
}
Expand All @@ -222,7 +222,7 @@ func WriteAcknowledged(stream MaterializeStream, state *pf.ConnectorState, respo
return nil
}

func WriteLoaded(
func writeLoaded(
stream MaterializeStream,
response *pm.Response,
binding int,
Expand All @@ -243,7 +243,7 @@ func WriteLoaded(
return nil
}

func WriteFlushed(stream MaterializeStream, response *pm.Response) error {
func writeFlushed(stream MaterializeStream, response *pm.Response) error {
if response.Acknowledged == nil && response.Loaded == nil {
panic(fmt.Sprintf("expected prior response is Acknowledged or Loaded, got %#v", response))
}
Expand All @@ -256,7 +256,7 @@ func WriteFlushed(stream MaterializeStream, response *pm.Response) error {
return nil
}

func WriteStartedCommit(
func writeStartedCommit(
stream MaterializeStream,
response *pm.Response,
checkpoint *pf.ConnectorState,
Expand Down
18 changes: 9 additions & 9 deletions go/protocols/materialize/transactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func RunTransactions(
opened pm.Response_Opened,
transactor Transactor,
) (_err error) {
defer func() { transactor.Destroy() }()
defer transactor.Destroy()

if err := open.Validate(); err != nil {
return fmt.Errorf("open is invalid: %w", err)
Expand All @@ -127,7 +127,7 @@ func RunTransactions(
}

var rxRequest = pm.Request{Open: &open}
var txResponse, err = WriteOpened(stream, &opened)
var txResponse, err = writeOpened(stream, &opened)
if err != nil {
return err
}
Expand Down Expand Up @@ -169,7 +169,7 @@ func RunTransactions(

if ackState, err := transactor.Acknowledge(ctx); err != nil {
return err
} else if err := WriteAcknowledged(stream, ackState, &txResponse); err != nil {
} else if err := writeAcknowledged(stream, ackState, &txResponse); err != nil {
return err
}

Expand Down Expand Up @@ -200,7 +200,7 @@ func RunTransactions(
}

loaded++
return WriteLoaded(stream, &txResponse, binding, doc)
return writeLoaded(stream, &txResponse, binding, doc)
})

if it.awaitDoneCh == nil && awaitErr != nil {
Expand All @@ -225,7 +225,7 @@ func RunTransactions(
loadIt = LoadIterator{stream: stream, request: &rxRequest, awaitDoneCh: awaitDoneCh, ctx: loadCtx}
)

if err = ReadAcknowledge(stream, &rxRequest); err != nil {
if err = readAcknowledge(stream, &rxRequest); err != nil {
return err
}

Expand Down Expand Up @@ -263,9 +263,9 @@ func RunTransactions(
return nil // Graceful shutdown.
}

if err = ReadFlush(&rxRequest); err != nil {
if err = validateIsFlush(&rxRequest); err != nil {
return err
} else if err = WriteFlushed(stream, &txResponse); err != nil {
} else if err = writeFlushed(stream, &txResponse); err != nil {
return err
}

Expand All @@ -279,7 +279,7 @@ func RunTransactions(
return fmt.Errorf("transactor.Store: %w", err)
}
var runtimeCheckpoint *pc.Checkpoint
if runtimeCheckpoint, err = ReadStartCommit(&rxRequest); err != nil {
if runtimeCheckpoint, err = checkpointFromStartCommit(&rxRequest); err != nil {
return err
}

Expand All @@ -304,7 +304,7 @@ func RunTransactions(
default:
}

if err = WriteStartedCommit(stream, &txResponse, stateUpdate); err != nil {
if err = writeStartedCommit(stream, &txResponse, stateUpdate); err != nil {
return err
}
}
Expand Down

0 comments on commit c43ddc2

Please sign in to comment.