Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion go-sdk/Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ default:
@just --list

# Build all components
build: build-celery build-examples
build: build-celery build-edge build-examples

# Build the worker binary
build-celery:
go build -o bin/airflow-go-celery ./celery/cmd

build-edge:
go build -o bin/airflow-go-edge ./edge/cmd

# Build all example bundles
build-examples:
@just example/bundle/build
Expand Down
3 changes: 2 additions & 1 deletion go-sdk/bundle/bundlev1/bundlev1server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/apache/airflow/go-sdk/bundle/bundlev1"
"github.com/apache/airflow/go-sdk/bundle/bundlev1/bundlev1server/impl"
"github.com/apache/airflow/go-sdk/pkg/bundles/shared"
"github.com/apache/airflow/go-sdk/pkg/config"
)

var versionInfo *bool = flag.Bool("bundle-metadata", false, "show the embedded bundle info")
Expand All @@ -56,7 +57,7 @@ type ServerConfig struct{}
// Zero or more options to configure the server may also be passed. There are no options yet, this is to allow
// future changes without breaking compatibility
func Serve(bundle bundlev1.BundleProvider, opts ...ServeOpt) error {
shared.SetupViper("")
config.SetupViper("")

hcLogger := hclog.New(&hclog.LoggerOptions{
Level: hclog.Trace,
Expand Down
11 changes: 10 additions & 1 deletion go-sdk/celery/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,17 @@

package main

import "github.com/apache/airflow/go-sdk/celery/commands"
import (
"fmt"
"os"

"github.com/apache/airflow/go-sdk/celery/commands"
)

func main() {
if os.Getenv("AIRFLOW_BUNDLE_MAGIC_COOKIE") != "" {
fmt.Println("(We're not a bundle plugin)")
os.Exit(0)
}
commands.Execute()
}
57 changes: 5 additions & 52 deletions go-sdk/celery/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,13 @@ package commands

import (
"context"
"log/slog"
"os"

"github.com/MatusOllah/slogcolor"
"github.com/fatih/color"
cc "github.com/ivanpirog/coloredcobra"
"github.com/spf13/cobra"

"github.com/apache/airflow/go-sdk/pkg/bundles/shared"
"github.com/apache/airflow/go-sdk/pkg/logging/shclog"
"github.com/apache/airflow/go-sdk/pkg/config"
)

var cfgFile string

// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
Use: "airflow-go-celery",
Expand All @@ -42,61 +35,21 @@ var rootCmd = &cobra.Command{
All options (other than ` + "`--config`" + `) can be specified in the config file using
the same name as the CLI argument but without the ` + "`--`" + ` prefix.`,
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
return initializeConfig(cmd)
return config.Configure(cmd)
},
}

// Execute is the main entrypoint, and runs the Celery broker app and listens for Celery Tasks
func Execute() {
cc.Init(&cc.Config{
RootCmd: rootCmd,
Headings: cc.Bold,
Commands: cc.Yellow + cc.Bold,
Example: cc.Italic,
ExecName: cc.HiMagenta + cc.Bold,
Flags: cc.Green,
FlagsDataType: cc.Italic + cc.White,
})
err := rootCmd.ExecuteContext(context.Background())
if err != nil {
os.Exit(1)
}
}

func init() {
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "",
"config file (default is $HOME/airflow/go-sdk.yaml)")
config.InitColor(rootCmd)
rootCmd.PersistentFlags().
String("config", "", "config file (default is $HOME/airflow/go-sdk.yaml)")
rootCmd.AddCommand(runCmd)
}

// initConfig reads in config file and ENV variables if set.
func initializeConfig(cmd *cobra.Command) error {
if err := shared.SetupViper(cfgFile); err != nil {
return err
}
// Bind the current command's flags to viper
shared.BindFlagsToViper(cmd)

logger := makeLogger()
slog.SetDefault(logger)

return nil
}

func makeLogger() *slog.Logger {
opts := *slogcolor.DefaultOptions
leveler := &slog.LevelVar{}
leveler.Set(shclog.SlogLevelTrace)

opts.Level = leveler
opts.LevelTags = map[slog.Level]string{
shclog.SlogLevelTrace: color.New(color.FgHiGreen).Sprint("TRACE"),
slog.LevelDebug: color.New(color.BgCyan, color.FgHiWhite).Sprint("DEBUG"),
slog.LevelInfo: color.New(color.BgGreen, color.FgHiWhite).Sprint("INFO "),
slog.LevelWarn: color.New(color.BgYellow, color.FgHiWhite).Sprint("WARN "),
slog.LevelError: color.New(color.BgRed, color.FgHiWhite).Sprint("ERROR"),
}

log := slog.New(slogcolor.NewHandler(os.Stderr, &opts))
return log
}
37 changes: 37 additions & 0 deletions go-sdk/edge/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package main

import (
"context"
"fmt"
"os"

"github.com/apache/airflow/go-sdk/edge/commands"
)

func main() {
if os.Getenv("AIRFLOW_BUNDLE_MAGIC_COOKIE") != "" {
fmt.Println("(We're not a bundle plugin)")
os.Exit(0)
}
err := commands.Root.ExecuteContext(context.Background())
if err != nil {
os.Exit(1)
}
}
45 changes: 45 additions & 0 deletions go-sdk/edge/commands/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package commands

import (
"github.com/spf13/cobra"

"github.com/apache/airflow/go-sdk/pkg/config"
)

// Root represents the base command when called without any subcommands
var Root = &cobra.Command{
Use: "airflow-go-edge",
Short: "Airflow worker for running Go workloads sent via Edge Worker API.",
Long: `Airflow worker for running Go workloads sent via Edge Worker API.

All options (other than ` + "`--config`" + `) can be specified in the config file using
the same name as the CLI argument but without the ` + "`--`" + ` prefix.`,
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
return config.Configure(cmd)
},
SilenceUsage: true,
}

func init() {
Root.PersistentFlags().
String("config", "", "config file (default is $HOME/airflow/go-sdk.yaml)")
Root.AddCommand(runCmd)
config.InitColor(Root)
}
68 changes: 68 additions & 0 deletions go-sdk/edge/commands/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package commands

import (
"context"

"github.com/spf13/cobra"

"github.com/apache/airflow/go-sdk/edge"
)

var runCmd = &cobra.Command{
Use: "run",
Short: "Connect to Edge Executor API and run Airflow workloads",
Long: "Connect to Edge Executor API and run Airflow workloads",

RunE: func(cmd *cobra.Command, args []string) error {
return edge.Run(context.Background())
},
}

func init() {
flags := runCmd.Flags()
flags.StringP(
"execution-api-url",
"",
"http://localhost:8080/execution/",
"Execution API to connect to",
)
flags.StringSliceP(
"queues",
"q",
[]string{"default"},
"Comma delimited list of queues to serve, serve all queues if not provided.",
)
flags.StringP(
"api-url",
"",
"",
"URL endpoint on which the Airflow code edge API is accessible from edge worker.",
)
flags.StringP(
"hostname",
"H",
"",
"Set the hostname of worker if you have multiple workers on a single machine.",
)

runCmd.MarkFlagRequired("api-url")
flags.SetAnnotation("api-url", "viper-mapping", []string{"edge.api_url"})
flags.SetAnnotation("hostname", "viper-mapping", []string{"edge.hostname"})
}
Loading