Skip to content

Commit

Permalink
[#24789] Make Prism the default Go SDK runner. (#27703)
Browse files Browse the repository at this point in the history
* [prism] Make Prism the default Go SDK runner.

* ws lint

* add maincall to ptest test.

* update comments on prism test filters.

* Update playground script to use go1.20

* fix typo

---------

Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com>
  • Loading branch information
lostluck and lostluck authored Aug 2, 2023
1 parent 929a26d commit 9c80b49
Show file tree
Hide file tree
Showing 11 changed files with 65 additions and 25 deletions.
Empty file modified playground/infrastructure/cloudbuild/playground_ci_examples.sh
100644 → 100755
Empty file.
1 change: 1 addition & 0 deletions sdks/go/examples/large_wordcount/large_wordcount.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ import (
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dot"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/spark"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal"
Expand Down
6 changes: 3 additions & 3 deletions sdks/go/examples/minimal_wordcount/minimal_wordcount.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ import (

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"

_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
Expand Down Expand Up @@ -119,6 +119,6 @@ func main() {
// formatted strings) to a text file.
textio.Write(s, "wordcounts.txt", formatted)

// Run the pipeline on the direct runner.
direct.Execute(context.Background(), p)
// Run the pipeline on the prism runner.
prism.Execute(context.Background(), p)
}
2 changes: 1 addition & 1 deletion sdks/go/examples/snippets/10metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func queryMetrics(pr beam.PipelineResult, ns, n string) metrics.QueryResults {

// [END metrics_query]

var runner = "direct"
var runner = "prism"

// [START metrics_pipeline]

Expand Down
4 changes: 0 additions & 4 deletions sdks/go/pkg/beam/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
)

// TODO(herohde) 7/6/2017: do we want to make the selected runner visible to
// transformations? That would allow runner-dependent operations or
// verification, but require that it is stored in Init and used for Run.

var (
runners = make(map[string]func(ctx context.Context, p *Pipeline) (PipelineResult, error))
)
Expand Down
3 changes: 3 additions & 0 deletions sdks/go/pkg/beam/runners/direct/direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

// Package direct contains the direct runner for running single-bundle
// pipelines in the current process. Useful for testing.
//
// Deprecated: Use prism as a local runner instead.
// Reliance on the direct runner leads to non-portable pipelines.
package direct

import (
Expand Down
20 changes: 10 additions & 10 deletions sdks/go/pkg/beam/runners/prism/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ single machine use.

For Go SDK users:
- `import "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"`
- Short term: set runner to "prism" to use it, or invoke directly.
- Medium term: switch the default from "direct" to "prism".
- Short term: set runner to "prism" to use it, or invoke directly. &#x2611;
- Medium term: switch the default from "direct" to "prism". &#x2611;
- Long term: alias "direct" to "prism", and delete legacy Go direct runner.

Prisms allow breaking apart and separating a beam of light into
Expand Down Expand Up @@ -118,7 +118,7 @@ can have features selectively disabled to ensure

## Current Limitations

* Experimental and testing use only.
* Testing use only.
* Executing docker containers isn't yet implemented.
* This precludes running the Java and Python SDKs, or their transforms for Cross Language.
* Loopback execution only.
Expand All @@ -127,7 +127,6 @@ can have features selectively disabled to ensure
* Not yet suitable for larger jobs, which may have intermediate data that exceeds memory bounds.
* Doesn't yet support sufficient intermediate data garbage collection for indefinite stream processing.
* Doesn't yet execute all beam pipeline features.
* No UI for job status inspection.

## Implemented so far.

Expand All @@ -140,18 +139,24 @@ can have features selectively disabled to ensure
* Global Window
* Interval Windowing
* Session Windows.
* CoGBKs
* Combines lifted and unlifted.
* Expands Splittable DoFns
* Process Continuations (AKA Streaming transform support)
* Limited support for Process Continuations
* Residuals are rescheduled for execution immeadiately.
* The transform must be finite (and eventually return a stop process continuation)
* Basic Metrics support
* Stand alone execution support
* Web UI available when run as a standalone command.
* Progess tracking
* Channel Splitting
* Dynamic Splitting

## Next feature short list (unordered)

See https://github.com/apache/beam/issues/24789 for current status.

* Resolve watermark advancement for Process Continuations
* Test Stream
* Triggers & Complex Windowing Strategy execution.
* State
Expand All @@ -162,11 +167,6 @@ See https://github.com/apache/beam/issues/24789 for current status.
* FnAPI Optimizations
* Fusion
* Data with ProcessBundleRequest & Response
* Progess tracking
* Channel Splitting
* Dynamic Splitting
* Stand alone execution support
* UI reporting of in progress jobs

This is not a comprehensive feature set, but a set of goals to best
support users of the Go SDK in testing their pipelines.
Expand Down
9 changes: 5 additions & 4 deletions sdks/go/pkg/beam/testing/ptest/ptest.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners" // common runner flag.

// ptest uses the direct runner to execute pipelines by default.
// ptest uses the prism runner to execute pipelines by default.
// but includes the direct runner for legacy fallback reasons to
// support users overriding the default back to the direct runner.
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
)

// TODO(herohde) 7/10/2017: add hooks to verify counters, logs, etc.

// Create creates a pipeline and a PCollection with the given values.
func Create(values []any) (*beam.Pipeline, beam.Scope, beam.PCollection) {
p := beam.NewPipeline()
Expand Down Expand Up @@ -65,7 +66,7 @@ func CreateList2(a, b any) (*beam.Pipeline, beam.Scope, beam.PCollection, beam.P
// to function.
var (
Runner = runners.Runner
defaultRunner = "direct"
defaultRunner = "prism"
mainCalled = false
)

Expand Down
4 changes: 4 additions & 0 deletions sdks/go/pkg/beam/testing/ptest/ptest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
)

func TestMain(m *testing.M) {
Main(m)
}

func TestCreate(t *testing.T) {
inputs := []any{"a", "b", "c"}
p, s, col := Create(inputs)
Expand Down
5 changes: 3 additions & 2 deletions sdks/go/pkg/beam/x/beamx/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@ import (
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dot"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/spark"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal"
)

var (
runner = runners.Runner
defaultRunner = "direct"
defaultRunner = "prism"
)

func getRunner() string {
Expand All @@ -51,7 +52,7 @@ func getRunner() string {
}

// Run invokes beam.Run with the runner supplied by the flag "runner". It
// defaults to the direct runner, but all beam-distributed runners and textio
// defaults to the prism runner, but all beam-distributed runners and textio
// filesystems are implicitly registered.
func Run(ctx context.Context, p *beam.Pipeline) error {
_, err := beam.Run(ctx, getRunner(), p)
Expand Down
36 changes: 35 additions & 1 deletion sdks/go/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,38 @@ var portableFilters = []string{
"TestSetStateClear",
}

// TODO(lostluck): set up a specific run for these.
var prismFilters = []string{
// The prism runner does not support the TestStream primitive
"TestTestStream.*",
// The trigger and pane tests uses TestStream
"TestTrigger.*",
"TestPanes",

// TODO(https://github.com/apache/beam/issues/21058): Xlang ios don't yet work on prism.
"TestKafkaIO.*",
// TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners.
"TestBigQueryIO.*",
"TestSpannerIO.*",
// The prsim runner does not support pipeline drain for SDF.
"TestDrain",
// FhirIO currently only supports Dataflow runner
"TestFhirIO.*",
// OOMs currently only lead to heap dumps on Dataflow runner
"TestOomParDo",
// The prism runner does not support user state.
"TestValueState",
"TestValueStateWindowed",
"TestValueStateClear",
"TestBagState",
"TestBagStateClear",
"TestCombiningState",
"TestMapState",
"TestMapStateClear",
"TestSetState",
"TestSetStateClear",
}

var flinkFilters = []string{
// TODO(https://github.com/apache/beam/issues/20723): Flink tests timing out on reads.
"TestXLang_Combine.*",
Expand Down Expand Up @@ -249,7 +281,7 @@ var dataflowFilters = []string{
"TestCheckpointing",
// TODO(21761): This test needs to provide GCP project to expansion service.
"TestBigQueryIO_BasicWriteQueryRead",
// Can't handle the test spanner container or access a local spanner.
// Can't handle the test spanner container or access a local spanner.
"TestSpannerIO.*",
// Dataflow does not drain jobs by itself.
"TestDrain",
Expand Down Expand Up @@ -292,6 +324,8 @@ func CheckFilters(t *testing.T) {
switch runner {
case "direct", "DirectRunner":
filters = directFilters
case "prism", "PrismRunner":
filters = prismFilters
case "portable", "PortableRunner":
filters = portableFilters
case "flink", "FlinkRunner":
Expand Down

0 comments on commit 9c80b49

Please sign in to comment.