Skip to content

Commit

Permalink
[Go SDK + Protos] Fix Proto Spec for Pane encoding + Go SDK implement…
Browse files Browse the repository at this point in the history
…ation. (#33840)
  • Loading branch information
lostluck authored Feb 4, 2025
1 parent 14df78c commit 2712794
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 20 deletions.
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
* Fixed session window aggregation, which wasn't being performed per-key. ([#33542](https://github.com/apache/beam/issues/33542)).
* [Dataflow Streaming Appliance] Fixed commits failing with KeyCommitTooLargeException when a key outputs >180MB of results. [#33588](https://github.com/apache/beam/issues/33588).
* Fixed a Dataflow template creation issue that ignores template file creation errors (Java) ([#33636](https://github.com/apache/beam/issues/33636))

* Correctly documented Pane Encodings in the portability protocols ([#33840](https://github.com/apache/beam/issues/33840)).

## Security Fixes
* Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,9 @@ message BagStateSpec {
string element_coder_id = 1;
}

// OrderedListState values are encoded as the var-int encoded
// millis-since-epoch timestamp, followed by the value encoded by the
// provided coder.
// Be aware that this is not the standard timestamp value encoding.
message OrderedListStateSpec {
// NOTE(review): presumably the id of the "provided coder" mentioned above,
// i.e. the coder used for the value portion of each entry - confirm.
string element_coder_id = 1;
}
Expand Down Expand Up @@ -946,15 +949,15 @@ message StandardCoders {
// coder.
//
// pane - The first byte of the pane info determines which type of
// encoding is used, as well as the is_first, is_last, and timing
// encoding is used, as well as the is_first, is_last and timing
// fields. If this byte is bits [0 1 2 3 4 5 6 7], then:
// * bits [0 1 2 3] determine the encoding as follows:
// 0000 - The entire pane info is encoded as a single byte.
// The is_first, is_last, and timing fields are encoded
// as below, and the index and non-speculative index are
// both zero (and hence are not encoded here).
// 0001 - The pane info is encoded as this byte plus a single
// VarInt encoed integer representing the pane index. The
// VarInt encoded integer representing the pane index. The
// non-speculative index can be derived as follows:
// -1 if the pane is early, otherwise equal to index.
// 0010 - The pane info is encoded as this byte plus two VarInt
Expand All @@ -965,8 +968,10 @@ message StandardCoders {
// 01 - on time
// 10 - late
// 11 - unknown
// * bit 6 is 1 if this is the first pane, 0 otherwise.
// * bit 7 is 1 if this is the last pane, 0 otherwise.
// * bit 6 is 1 if this is the last pane, 0 otherwise.
// Commonly set with `byte |= 0x02`
// * bit 7 is 1 if this is the first pane, 0 otherwise.
// Commonly set with `byte |= 0x01`
//
// element - The element encoded using the supplied element coder.
//
Expand Down Expand Up @@ -1329,7 +1334,7 @@ message Trigger {
message AfterSynchronizedProcessingTime {
}

// The default trigger. Equivalent to Repeat { AfterEndOfWindow } but
// The default trigger. Equivalent to AfterEndOfWindow { Late: Always } but
// specially denoted to indicate the user did not alter the triggering.
message Default {
}
Expand All @@ -1339,12 +1344,12 @@ message Trigger {
int32 element_count = 1;
}

// Never ready. There will only be an ON_TIME output and a final
// output at window expiration.
// Never ready. There will only be an ON_TIME final output at window
// expiration.
message Never {
}

// Always ready. This can also be expressed as ElementCount(1) but
// Always ready. This can also be expressed as Repeat{ ElementCount(1) } but
// is more explicit.
message Always {
}
Expand Down
8 changes: 4 additions & 4 deletions sdks/go/pkg/beam/core/graph/coder/panes.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ func EncodePane(v typex.PaneInfo, w io.Writer) error {

pane := byte(0)
if v.IsFirst {
pane |= 0x02
pane |= 0x01
}
if v.IsLast {
pane |= 0x01
pane |= 0x02
}
pane |= byte(v.Timing << 2)

Expand Down Expand Up @@ -64,10 +64,10 @@ func EncodePane(v typex.PaneInfo, w io.Writer) error {
func NewPane(b byte) typex.PaneInfo {
pn := typex.NoFiringPane()

if !(b&0x02 == 2) {
if !(b&0x01 == 1) {
pn.IsFirst = false
}
if !(b&0x01 == 1) {
if !(b&0x02 == 2) {
pn.IsLast = false
}

Expand Down
54 changes: 48 additions & 6 deletions sdks/go/pkg/beam/core/graph/coder/panes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ func equalPanes(left, right typex.PaneInfo) bool {

func TestPaneCoder(t *testing.T) {
tests := []struct {
name string
timing typex.PaneTiming
first bool
last bool
index int64
nsIndex int64
name string
timing typex.PaneTiming
first bool
last bool
index int64
nsIndex int64
firstByte byte
}{
{
"false bools",
Expand All @@ -47,6 +48,7 @@ func TestPaneCoder(t *testing.T) {
false,
0,
0,
0b00001100,
},
{
"true bools",
Expand All @@ -55,6 +57,7 @@ func TestPaneCoder(t *testing.T) {
true,
0,
0,
0b00001111,
},
{
"first pane",
Expand All @@ -63,6 +66,7 @@ func TestPaneCoder(t *testing.T) {
false,
0,
0,
0b00001101,
},
{
"last pane",
Expand All @@ -71,6 +75,7 @@ func TestPaneCoder(t *testing.T) {
true,
0,
0,
0b00001110,
},
{
"on time, different index and non-speculative",
Expand All @@ -79,6 +84,7 @@ func TestPaneCoder(t *testing.T) {
false,
1,
2,
0b00100100,
},
{
"valid early pane",
Expand All @@ -87,6 +93,7 @@ func TestPaneCoder(t *testing.T) {
false,
math.MaxInt64,
-1,
0b00010001,
},
{
"on time, max non-speculative index",
Expand All @@ -95,6 +102,7 @@ func TestPaneCoder(t *testing.T) {
true,
0,
math.MaxInt64,
0b00100110,
},
{
"late pane, max index",
Expand All @@ -103,6 +111,7 @@ func TestPaneCoder(t *testing.T) {
false,
math.MaxInt64,
0,
0b00101000,
},
{
"on time, min non-speculative index",
Expand All @@ -111,6 +120,7 @@ func TestPaneCoder(t *testing.T) {
true,
0,
math.MinInt64,
0b00100110,
},
{
"late, min index",
Expand All @@ -119,6 +129,34 @@ func TestPaneCoder(t *testing.T) {
false,
math.MinInt64,
0,
0b00101000,
},
{
"last late firing",
typex.PaneLate,
false,
true,
2,
1,
0b00101010,
},
{
"encodeByte 41",
typex.PaneLate,
true,
false,
2,
1,
0b00101001, // 41
},
{
"encodeByte 18",
typex.PaneEarly,
false,
true,
0,
-1,
0b00010010, // 18
},
}
for _, test := range tests {
Expand All @@ -129,6 +167,10 @@ func TestPaneCoder(t *testing.T) {
if err != nil {
t.Fatalf("failed to encode pane %v, got %v", input, err)
}
first := buf.Bytes()[0]
if got, want := first, test.firstByte; got != want {
t.Errorf("Unexpected First Byte: got %#08b, want %#08b, for %v ", got, want, input)
}
got, err := DecodePane(&buf)
if err != nil {
t.Fatalf("failed to decode pane from buffer %v, got %v", &buf, err)
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func (t *NeverTrigger) String() string {
// trigger is an intentionally empty method.
// NOTE(review): presumably a marker that lets NeverTrigger satisfy an
// unexported trigger interface in this package - confirm against the
// interface definition.
func (t NeverTrigger) trigger() {}

// Never creates a Never Trigger that is never ready to fire.
// There will only be a single ON_TIME final output at window expiration + allowed lateness.
//
// It returns a pointer to a new, zero-valued NeverTrigger.
func Never() *NeverTrigger {
return &NeverTrigger{}
}
Expand Down
13 changes: 13 additions & 0 deletions sdks/go/pkg/beam/core/typex/special.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,19 @@ const (
PaneUnknown PaneTiming = 3
)

// String returns a lowercase, human-readable name for the pane timing:
// "early", "ontime" or "late". Every other value, including PaneUnknown,
// is reported as "unknown".
func (t PaneTiming) String() string {
	if t == PaneEarly {
		return "early"
	}
	if t == PaneOnTime {
		return "ontime"
	}
	if t == PaneLate {
		return "late"
	}
	// Anything that is not one of the three named timings falls through here.
	return "unknown"
}

// PaneInfo represents the output pane.
type PaneInfo struct {
Timing PaneTiming
Expand Down

0 comments on commit 2712794

Please sign in to comment.