Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#26902] Add switch for supporting cross compile arm64 for dataflow #27649

Merged
merged 2 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"os"
"strings"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
Expand All @@ -47,7 +48,12 @@ func Execute(ctx context.Context, raw *pipepb.Pipeline, opts *JobOptions, worker
} else {
// Cross-compile as last resort.

worker, err := runnerlib.BuildTempWorkerBinary(ctx)
var copts runnerlib.CompileOpts
if strings.HasPrefix(opts.MachineType, "t2a") {
copts.Arch = "arm64"
}

worker, err := runnerlib.BuildTempWorkerBinary(ctx, copts)
if err != nil {
return presult, err
}
Expand Down
24 changes: 20 additions & 4 deletions sdks/go/pkg/beam/runners/universal/runnerlib/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,20 @@ func IsWorkerCompatibleBinary() (string, bool) {

var unique int32

// CompileOpts are additional options for dynamic compiles of the local code
// for development purposes. Production runs should build the worker binary
// separately for the target environment.
// See https://beam.apache.org/documentation/sdks/go-cross-compilation/ for details.
type CompileOpts struct {
OS, Arch string
}

// BuildTempWorkerBinary creates a local worker binary in the tmp directory
// for linux/amd64. Caller responsible for deleting the binary.
func BuildTempWorkerBinary(ctx context.Context) (string, error) {
func BuildTempWorkerBinary(ctx context.Context, opts CompileOpts) (string, error) {
id := atomic.AddInt32(&unique, 1)
filename := filepath.Join(os.TempDir(), fmt.Sprintf("worker-%v-%v", id, time.Now().UnixNano()))
if err := buildWorkerBinary(ctx, filename); err != nil {
if err := buildWorkerBinary(ctx, filename, opts); err != nil {
return "", err
}
return filename, nil
Expand All @@ -59,7 +67,7 @@ func BuildTempWorkerBinary(ctx context.Context) (string, error) {
// * /Users/herohde/go/src/github.com/apache/beam/sdks/go/examples/wordcount/wordcount.go (skip: 3)
// /usr/local/go/src/runtime/proc.go (skip: 4) // not always present
// /usr/local/go/src/runtime/asm_amd64.s (skip: 4 or 5)
func buildWorkerBinary(ctx context.Context, filename string) error {
func buildWorkerBinary(ctx context.Context, filename string, opts CompileOpts) error {
program := ""
var isTest bool
for i := 3; ; i++ {
Expand All @@ -77,9 +85,17 @@ func buildWorkerBinary(ctx context.Context, filename string) error {
}
goos := "linux"
goarch := "amd64"

if opts.OS != "" {
goos = opts.OS
}
if opts.Arch != "" {
goarch = opts.Arch
}

cgo := "0"

log.Infof(ctx, "Cross-compiling %v with GOOS=%s GOARCH=%s CGO_ENABLED=%s as %v", goos, goarch, cgo, program, filename)
log.Infof(ctx, "Cross-compiling %v with GOOS=%s GOARCH=%s CGO_ENABLED=%s as %v", program, goos, goarch, cgo, filename)

// Cross-compile given go program. Not awesome.
program = program[:strings.LastIndex(program, "/")+1]
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobO
} else {
// Cross-compile as last resort.

worker, err := BuildTempWorkerBinary(ctx)
worker, err := BuildTempWorkerBinary(ctx, CompileOpts{})
if err != nil {
return presult, err
}
Expand Down
24 changes: 24 additions & 0 deletions sdks/go/test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,30 @@ task dataflowValidatesRunner() {
}
}

// ValidatesRunner tests for Dataflow. Runs tests in the integration directory
// with Dataflow to validate that the runner behaves as expected, on arm64 machines.
task dataflowValidatesRunnerARM64() {
group = "Verification"

dependsOn ":sdks:go:test:goBuild"
dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar"

doLast {
def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags.
"--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
"--machine_type=t2a-standard-1",
]
def options = [
"--runner dataflow",
"--pipeline_opts \"${pipelineOptions.join(' ')}\"",
]
exec {
executable "sh"
args "-c", "./run_validatesrunner_tests.sh ${options.join(' ')}"
}
}
}

// ValidatesRunner tests for Flink. Runs tests in the integration directory
// with Flink to validate that the runner behaves as expected.
task flinkValidatesRunner {
Expand Down