Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#24789][prism] internal/worker + tentative data #25478

Merged
merged 3 commits into from
Feb 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package engine

// TentativeData is where data for in progress bundles is put
// until the bundle executes successfully.
type TentativeData struct {
Raw map[string][][]byte
}

// WriteData adds data to a given global collectionID.
func (d *TentativeData) WriteData(colID string, data []byte) {
if d.Raw == nil {
d.Raw = map[string][][]byte{}
}
d.Raw[colID] = append(d.Raw[colID], data)
}
114 changes: 114 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package worker

import (
"sync"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
"golang.org/x/exp/slog"
)

// B represents an extant ProcessBundle instruction sent to an SDK worker.
// Generally manipulated by another package to interact with a worker.
type B struct {
InstID string // ID for the instruction processing this bundle.
PBDID string // ID for the ProcessBundleDescriptor

// InputTransformID is data being sent to the SDK.
InputTransformID string
InputData [][]byte // Data specifically for this bundle.

// TODO change to a single map[tid] -> map[input] -> map[window] -> struct { Iter data, MultiMap data } instead of all maps.
// IterableSideInputData is a map from transformID, to inputID, to window, to data.
IterableSideInputData map[string]map[string]map[typex.Window][][]byte
// MultiMapSideInputData is a map from transformID, to inputID, to window, to data key, to data values.
MultiMapSideInputData map[string]map[string]map[typex.Window]map[string][][]byte

// OutputCount is the number of data outputs this bundle has.
// We need to see this many closed data channels before the bundle is complete.
OutputCount int
// dataWait is how we determine if a bundle is finished, by waiting for each of
// a Bundle's DataSinks to produce their last output.
// After this point we can "commit" the bundle's output for downstream use.
dataWait sync.WaitGroup
OutputData engine.TentativeData
Resp chan *fnpb.ProcessBundleResponse

SinkToPCollection map[string]string

// TODO: Metrics for this bundle, can be handled after the fact.
}

// Init initializes the bundle's internal state for waiting on all
// data and for relaying a response back.
func (b *B) Init() {
// We need to see final data signals that match the number of
// outputs the stage this bundle executes posesses
b.dataWait.Add(b.OutputCount)
b.Resp = make(chan *fnpb.ProcessBundleResponse, 1)
}

func (b *B) LogValue() slog.Value {
return slog.GroupValue(
slog.String("ID", b.InstID),
slog.String("stage", b.PBDID))
}

// ProcessOn executes the given bundle on the given W, blocking
// until all data is complete.
//
// Assumes the bundle is initialized (all maps are non-nil, and data waitgroup is set, response channel initialized)
// Assumes the bundle descriptor is already registered with the W.
//
// While this method mostly manipulates a W, putting it on a B avoids mixing the workers
// public GRPC APIs up with local calls.
func (b *B) ProcessOn(wk *W) {
wk.mu.Lock()
wk.bundles[b.InstID] = b
wk.mu.Unlock()

slog.Debug("processing", "bundle", b, "worker", wk)

// Tell the SDK to start processing the bundle.
wk.InstReqs <- &fnpb.InstructionRequest{
InstructionId: b.InstID,
Request: &fnpb.InstructionRequest_ProcessBundle{
ProcessBundle: &fnpb.ProcessBundleRequest{
ProcessBundleDescriptorId: b.PBDID,
},
},
}

// TODO: make batching decisions.
for i, d := range b.InputData {
wk.DataReqs <- &fnpb.Elements{
Data: []*fnpb.Elements_Data{
{
InstructionId: b.InstID,
TransformId: b.InputTransformID,
Data: d,
IsLast: i+1 == len(b.InputData),
},
},
}
}

slog.Debug("waiting on data", "bundle", b)
b.dataWait.Wait() // Wait until data is ready.
}
52 changes: 52 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/worker/bundle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package worker

import (
"bytes"
"sync"
"testing"
)

func TestBundle_ProcessOn(t *testing.T) {
wk := New("test")
b := &B{
InstID: "testInst",
PBDID: "testPBDID",
OutputCount: 1,
InputData: [][]byte{{1, 2, 3}},
}
b.Init()
var completed sync.WaitGroup
completed.Add(1)
go func() {
b.ProcessOn(wk)
completed.Done()
}()
b.dataWait.Done()
gotData := <-wk.DataReqs
if got, want := gotData.GetData()[0].GetData(), []byte{1, 2, 3}; !bytes.EqualFold(got, want) {
t.Errorf("ProcessOn(): data not sent; got %v, want %v", got, want)
}

gotInst := <-wk.InstReqs
if got, want := gotInst.GetInstructionId(), b.InstID; got != want {
t.Errorf("ProcessOn(): bad instruction ID; got %v, want %v", got, want)
}
if got, want := gotInst.GetProcessBundle().GetProcessBundleDescriptorId(), b.PBDID; got != want {
t.Errorf("ProcessOn(): bad process bundle descriptor ID; got %v, want %v", got, want)
}
}
Loading