Skip to content
This repository has been archived by the owner on Jan 28, 2022. It is now read-only.

various updates to capture & materialization protocols #5

Merged
merged 4 commits into from
Sep 7, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions materialize/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,141 @@ func (m *ApplyResponse) UnmarshalJSON(b []byte) error {
return jsonpb.Unmarshal(bytes.NewReader(b), m)
}

func (m *TransactionRequest) Validate() error {
var count int
if m.Open != nil {
if err := m.Open.Validate(); err != nil {
return pb.ExtendContext(err, "Open")
}
count += 1
}
if m.Load != nil {
if err := m.Load.Validate(); err != nil {
return pb.ExtendContext(err, "Load")
}
count += 1
}
if m.Prepare != nil {
if err := m.Prepare.Validate(); err != nil {
return pb.ExtendContext(err, "Prepare")
}
count += 1
}
if m.Store != nil {
if err := m.Store.Validate(); err != nil {
return pb.ExtendContext(err, "Store")
}
count += 1
}
if m.Commit != nil {
if err := m.Commit.Validate(); err != nil {
return pb.ExtendContext(err, "Commit")
}
count += 1
}

if count != 1 {
return pb.NewValidationError("expected one of Open, Load, Prepare, Store, Commit")
}
Comment on lines +189 to +191
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a common go idiom for treating a struct like an enum?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I've seen enough of it to call it idiomatic. Mostly I don't see it checked, and it makes me ☹️ .

Generics are finally coming to Go, but I'd dearly love me some Rust-like algebraic enums and pattern matching....

return nil
}

func (m *TransactionRequest_Open) Validate() error {
if err := m.Materialization.Validate(); err != nil {
return pb.ExtendContext(err, "Materialization")
} else if m.Version == "" {
return pb.NewValidationError("expected Version")
} else if m.KeyBegin > m.KeyEnd {
return pb.NewValidationError("invalid KeyBegin / KeyEnd range")
}
return nil
}

func (m *TransactionRequest_Load) Validate() error {
if len(m.PackedKeys) == 0 {
return pb.NewValidationError("expected PackedKeys")
}
return nil
}

func (m *TransactionRequest_Prepare) Validate() error {
if len(m.FlowCheckpoint) == 0 {
return pb.NewValidationError("expected FlowCheckpoint")
}
return nil
}

func (m *TransactionRequest_Store) Validate() error {
if ll := len(m.DocsJson); ll == 0 {
return pb.NewValidationError("expected DocsJson")
} else if lr := len(m.PackedKeys); ll != lr {
return pb.NewValidationError("expected PackedKeys length to match DocsJson: %d vs %d", ll, lr)
} else if lr = len(m.PackedValues); ll != lr {
return pb.NewValidationError("expected PackedValues length to match DocsJson: %d vs %d", ll, lr)
} else if lr := len(m.Exists); ll != lr {
return pb.NewValidationError("expected Exists length to match DocsJson: %d vs %d", ll, lr)
}
return nil
}

func (m *TransactionRequest_Commit) Validate() error {
return nil
}

func (m *TransactionResponse) Validate() error {
var count int
if m.Opened != nil {
if err := m.Opened.Validate(); err != nil {
return pb.ExtendContext(err, "Opened")
}
count += 1
}
if m.Loaded != nil {
if err := m.Loaded.Validate(); err != nil {
return pb.ExtendContext(err, "Loaded")
}
count += 1
}
if m.Prepared != nil {
if err := m.Prepared.Validate(); err != nil {
return pb.ExtendContext(err, "Prepared")
}
count += 1
}
if m.Committed != nil {
if err := m.Committed.Validate(); err != nil {
return pb.ExtendContext(err, "Committed")
}
count += 1
}

if count != 1 {
return pb.NewValidationError("expected one of Opened, Loaded, Prepared, Committed")
}
return nil
}

func (m *TransactionResponse_Opened) Validate() error {
// FlowCheckpoint may be empty.
return nil
}

func (m *TransactionResponse_Loaded) Validate() error {
if len(m.DocsJson) == 0 {
return pb.NewValidationError("expected DocsJson")
}
return nil
}

func (m *TransactionResponse_Prepared) Validate() error {
// DriverCheckpointMergePatchJson may be empty.
return nil
}

func (m *TransactionResponse_Committed) Validate() error {
return nil
}

func (m *TransactionRequest_Open) MarshalJSON() ([]byte, error) {
var b bytes.Buffer
var err = (&jsonpb.Marshaler{}).Marshal(&b, m)
Expand Down