diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go index 67ed337bed57..9a1641e314d1 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go @@ -21,6 +21,7 @@ import ( "context" "encoding/json" "os" + "strings" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx" @@ -47,7 +48,12 @@ func Execute(ctx context.Context, raw *pipepb.Pipeline, opts *JobOptions, worker } else { // Cross-compile as last resort. - worker, err := runnerlib.BuildTempWorkerBinary(ctx) + var copts runnerlib.CompileOpts + if strings.HasPrefix(opts.MachineType, "t2a") { + copts.Arch = "arm64" + } + + worker, err := runnerlib.BuildTempWorkerBinary(ctx, copts) if err != nil { return presult, err } diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/compile.go b/sdks/go/pkg/beam/runners/universal/runnerlib/compile.go index 8fbbc0846007..b21362ba9fde 100644 --- a/sdks/go/pkg/beam/runners/universal/runnerlib/compile.go +++ b/sdks/go/pkg/beam/runners/universal/runnerlib/compile.go @@ -41,12 +41,20 @@ func IsWorkerCompatibleBinary() (string, bool) { var unique int32 +// CompileOpts are additional options for dynamic compiles of the local code +// for development purposes. Production runs should build the worker binary +// separately for the target environment. +// See https://beam.apache.org/documentation/sdks/go-cross-compilation/ for details. +type CompileOpts struct { + OS, Arch string +} + // BuildTempWorkerBinary creates a local worker binary in the tmp directory // for linux/amd64. Caller responsible for deleting the binary. -func BuildTempWorkerBinary(ctx context.Context) (string, error) { +func BuildTempWorkerBinary(ctx context.Context, opts CompileOpts) (string, error) { id := atomic.AddInt32(&unique, 1) filename := filepath.Join(os.TempDir(), fmt.Sprintf("worker-%v-%v", id, time.Now().UnixNano())) - if err := buildWorkerBinary(ctx, filename); err != nil { + if err := buildWorkerBinary(ctx, filename, opts); err != nil { return "", err } return filename, nil @@ -59,7 +67,7 @@ func BuildTempWorkerBinary(ctx context.Context) (string, error) { // * /Users/herohde/go/src/github.com/apache/beam/sdks/go/examples/wordcount/wordcount.go (skip: 3) // /usr/local/go/src/runtime/proc.go (skip: 4) // not always present // /usr/local/go/src/runtime/asm_amd64.s (skip: 4 or 5) -func buildWorkerBinary(ctx context.Context, filename string) error { +func buildWorkerBinary(ctx context.Context, filename string, opts CompileOpts) error { program := "" var isTest bool for i := 3; ; i++ { @@ -77,9 +85,17 @@ func buildWorkerBinary(ctx context.Context, filename string) error { } goos := "linux" goarch := "amd64" + + if opts.OS != "" { + goos = opts.OS + } + if opts.Arch != "" { + goarch = opts.Arch + } + cgo := "0" - log.Infof(ctx, "Cross-compiling %v with GOOS=%s GOARCH=%s CGO_ENABLED=%s as %v", goos, goarch, cgo, program, filename) + log.Infof(ctx, "Cross-compiling %v with GOOS=%s GOARCH=%s CGO_ENABLED=%s as %v", program, goos, goarch, cgo, filename) // Cross-compile given go program. Not awesome. program = program[:strings.LastIndex(program, "/")+1] diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go index 5b49d2f94739..eb854dbfcdba 100644 --- a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go +++ b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go @@ -48,7 +48,7 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobO } else { // Cross-compile as last resort. - worker, err := BuildTempWorkerBinary(ctx) + worker, err := BuildTempWorkerBinary(ctx, CompileOpts{}) if err != nil { return presult, err } diff --git a/sdks/go/test/build.gradle b/sdks/go/test/build.gradle index d1048b49c0bd..4aa0c5a48a70 100644 --- a/sdks/go/test/build.gradle +++ b/sdks/go/test/build.gradle @@ -45,6 +45,30 @@ task dataflowValidatesRunner() { } } +// ValidatesRunner tests for Dataflow. Runs tests in the integration directory +// with Dataflow to validate that the runner behaves as expected, on arm64 machines. +task dataflowValidatesRunnerARM64() { + group = "Verification" + + dependsOn ":sdks:go:test:goBuild" + dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar" + + doLast { + def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags. + "--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + "--machine_type=t2a-standard-1", + ] + def options = [ + "--runner dataflow", + "--pipeline_opts \"${pipelineOptions.join(' ')}\"", + ] + exec { + executable "sh" + args "-c", "./run_validatesrunner_tests.sh ${options.join(' ')}" + } + } +} + // ValidatesRunner tests for Flink. Runs tests in the integration directory // with Flink to validate that the runner behaves as expected. task flinkValidatesRunner {