Skip to content

Commit

Permalink
Refactor Docker Init with ContainerRuntime Interface (#1748)
Browse files Browse the repository at this point in the history
  • Loading branch information
schnie authored Nov 26, 2024
1 parent 9fe6f52 commit 8316e9a
Show file tree
Hide file tree
Showing 16 changed files with 644 additions and 359 deletions.
91 changes: 0 additions & 91 deletions airflow/container_runtime_test.go

This file was deleted.

79 changes: 4 additions & 75 deletions airflow/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@ package airflow

import (
"bufio"
"bytes"
"context"
"fmt"
"io/fs"
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"text/tabwriter"
"time"

"github.com/astronomer/astro-cli/airflow/runtimes"

semver "github.com/Masterminds/semver/v3"
airflowTypes "github.com/astronomer/astro-cli/airflow/types"
astrocore "github.com/astronomer/astro-cli/astro-client-core"
Expand Down Expand Up @@ -45,15 +45,12 @@ const (
RuntimeImageLabel = "io.astronomer.docker.runtime.version"
AirflowImageLabel = "io.astronomer.docker.airflow.version"
componentName = "airflow"
podmanCmd = "podman"
dockerStateUp = "running"
dockerExitState = "exited"
defaultAirflowVersion = uint64(0x2) //nolint:gomnd
triggererAllowedRuntimeVersion = "4.0.0"
triggererAllowedAirflowVersion = "2.2.0"
pytestDirectory = "tests"
OpenCmd = "open"
dockerCmd = "docker"
registryUsername = "cli"
unknown = "unknown"
major = "major"
Expand Down Expand Up @@ -86,9 +83,7 @@ var (
exportSettings = settings.Export
envExportSettings = settings.EnvExport

openURL = browser.OpenURL
timeoutNum = 60
tickNum = 500
openURL = browser.OpenURL

majorUpdatesAirflowProviders = []string{}
minorUpdatesAirflowProviders = []string{}
Expand Down Expand Up @@ -200,18 +195,6 @@ func DockerComposeInit(airflowHome, envFile, dockerfile, imageName string) (*Doc
//
//nolint:gocognit
func (d *DockerCompose) Start(imageName, settingsFile, composeFile, buildSecretString string, noCache, noBrowser bool, waitTime time.Duration, envConns map[string]astrocore.EnvironmentObjectConnection) error {
// check if docker is up for macOS
containerRuntime, err := GetContainerRuntimeBinary()
if err != nil {
return err
}
if runtime.GOOS == "darwin" && containerRuntime == dockerCmd {
err := startDocker()
if err != nil {
return err
}
}

// Get project containers
psInfo, err := d.composeService.Ps(context.Background(), d.projectName, api.PsOptions{
All: true,
Expand Down Expand Up @@ -1103,7 +1086,7 @@ func (d *DockerCompose) Bash(container string) error {
}
}
// exec into container
containerRuntime, err := GetContainerRuntimeBinary()
containerRuntime, err := runtimes.GetContainerRuntimeBinary()
if err != nil {
return err
}
Expand Down Expand Up @@ -1416,57 +1399,3 @@ func checkServiceState(serviceState, expectedState string) bool {
scrubbedState := strings.Split(serviceState, " ")[0]
return scrubbedState == expectedState
}

func startDocker() error {
containerRuntime, err := GetContainerRuntimeBinary()
if err != nil {
return err
}

buf := new(bytes.Buffer)
err = cmdExec(containerRuntime, buf, buf, "ps")
if err != nil {
// open docker
fmt.Println("\nDocker is not running. Starting up the Docker engine…")
err = cmdExec(OpenCmd, buf, os.Stderr, "-a", dockerCmd)
if err != nil {
return err
}
fmt.Println("\nIf you don't see Docker Desktop starting, exit this command and start it manually.")
fmt.Println("If you don't have Docker Desktop installed, install it (https://www.docker.com/products/docker-desktop/) and try again.")
fmt.Println("If you are using Colima or another Docker alternative, start the engine manually.")
// poll for docker
err = waitForDocker()
if err != nil {
return err
}
}
return nil
}

func waitForDocker() error {
containerRuntime, err := GetContainerRuntimeBinary()
if err != nil {
return err
}

buf := new(bytes.Buffer)
timeout := time.After(time.Duration(timeoutNum) * time.Second)
ticker := time.NewTicker(time.Duration(tickNum) * time.Millisecond)
for {
select {
// Got a timeout! fail with a timeout error
case <-timeout:
return errors.New("timed out waiting for docker")
// Got a tick, we should check if docker is up & running
case <-ticker.C:
buf.Reset()
err := cmdExec(containerRuntime, buf, buf, "ps")
if err != nil {
continue
} else {
return nil
}
}
}
}
26 changes: 14 additions & 12 deletions airflow/docker_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"regexp"
"strings"

"github.com/astronomer/astro-cli/airflow/runtimes"

"github.com/astronomer/astro-cli/pkg/util"
cliCommand "github.com/docker/cli/cli/command"
cliConfig "github.com/docker/cli/cli/config"
Expand Down Expand Up @@ -71,7 +73,7 @@ func shouldAddPullFlag(dockerfilePath string) (bool, error) {
}

func (d *DockerImage) Build(dockerfilePath, buildSecretString string, buildConfig airflowTypes.ImageBuildConfig) error {
containerRuntime, err := GetContainerRuntimeBinary()
containerRuntime, err := runtimes.GetContainerRuntimeBinary()
if err != nil {
return err
}
Expand Down Expand Up @@ -134,7 +136,7 @@ func (d *DockerImage) Build(dockerfilePath, buildSecretString string, buildConfi

func (d *DockerImage) Pytest(pytestFile, airflowHome, envFile, testHomeDirectory string, pytestArgs []string, htmlReport bool, buildConfig airflowTypes.ImageBuildConfig) (string, error) {
// delete container
containerRuntime, err := GetContainerRuntimeBinary()
containerRuntime, err := runtimes.GetContainerRuntimeBinary()
if err != nil {
return "", err
}
Expand Down Expand Up @@ -245,7 +247,7 @@ func (d *DockerImage) Pytest(pytestFile, airflowHome, envFile, testHomeDirectory
}

func (d *DockerImage) ConflictTest(workingDirectory, testHomeDirectory string, buildConfig airflowTypes.ImageBuildConfig) (string, error) {
containerRuntime, err := GetContainerRuntimeBinary()
containerRuntime, err := runtimes.GetContainerRuntimeBinary()
if err != nil {
return "", err
}
Expand Down Expand Up @@ -321,7 +323,7 @@ func parseExitCode(logs string) string {
}

func (d *DockerImage) CreatePipFreeze(altImageName, pipFreezeFile string) error {
containerRuntime, err := GetContainerRuntimeBinary()
containerRuntime, err := runtimes.GetContainerRuntimeBinary()
if err != nil {
return err
}
Expand Down Expand Up @@ -349,7 +351,7 @@ func (d *DockerImage) CreatePipFreeze(altImageName, pipFreezeFile string) error
}

func (d *DockerImage) Push(remoteImage, username, token string) error {
containerRuntime, err := GetContainerRuntimeBinary()
containerRuntime, err := runtimes.GetContainerRuntimeBinary()
if err != nil {
return err
}
Expand Down Expand Up @@ -454,7 +456,7 @@ func (d *DockerImage) getRegistryToAuth(imageName string) (string, error) {
func (d *DockerImage) Pull(remoteImage, username, token string) error {
// Pulling image to registry
fmt.Println(pullingImagePrompt)
containerRuntime, err := GetContainerRuntimeBinary()
containerRuntime, err := runtimes.GetContainerRuntimeBinary()
if err != nil {
return err
}
Expand Down Expand Up @@ -490,7 +492,7 @@ var displayJSONMessagesToStream = func(responseBody io.ReadCloser, auxCallback f
}

func (d *DockerImage) GetLabel(altImageName, labelName string) (string, error) {
containerRuntime, err := GetContainerRuntimeBinary()
containerRuntime, err := runtimes.GetContainerRuntimeBinary()
if err != nil {
return "", err
}
Expand All @@ -516,7 +518,7 @@ func (d *DockerImage) GetLabel(altImageName, labelName string) (string, error) {
}

func (d *DockerImage) DoesImageExist(image string) error {
containerRuntime, err := GetContainerRuntimeBinary()
containerRuntime, err := runtimes.GetContainerRuntimeBinary()
if err != nil {
return err
}
Expand All @@ -533,7 +535,7 @@ func (d *DockerImage) DoesImageExist(image string) error {
func (d *DockerImage) ListLabels() (map[string]string, error) {
var labels map[string]string

containerRuntime, err := GetContainerRuntimeBinary()
containerRuntime, err := runtimes.GetContainerRuntimeBinary()
if err != nil {
return labels, err
}
Expand All @@ -556,7 +558,7 @@ func (d *DockerImage) ListLabels() (map[string]string, error) {
}

func (d *DockerImage) TagLocalImage(localImage string) error {
containerRuntime, err := GetContainerRuntimeBinary()
containerRuntime, err := runtimes.GetContainerRuntimeBinary()
if err != nil {
return err
}
Expand All @@ -569,7 +571,7 @@ func (d *DockerImage) TagLocalImage(localImage string) error {
}

func (d *DockerImage) Run(dagID, envFile, settingsFile, containerName, dagFile, executionDate string, taskLogs bool) error {
containerRuntime, err := GetContainerRuntimeBinary()
containerRuntime, err := runtimes.GetContainerRuntimeBinary()
if err != nil {
return err
}
Expand Down Expand Up @@ -689,7 +691,7 @@ var cmdExec = func(cmd string, stdout, stderr io.Writer, args ...string) error {

// When login and push do not work use bash to run docker commands, this function is for users using colima
func pushWithBash(authConfig *cliTypes.AuthConfig, image string) error {
containerRuntime, err := GetContainerRuntimeBinary()
containerRuntime, err := runtimes.GetContainerRuntimeBinary()
if err != nil {
return err
}
Expand Down
40 changes: 0 additions & 40 deletions airflow/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1694,46 +1694,6 @@ func (s *Suite) TestDockerComposeRunDAG() {

var errExecMock = errors.New("docker is not running")

func (s *Suite) TestStartDocker() {
s.Run("start docker success", func() {
counter := 0
cmdExec = func(cmd string, stdout, stderr io.Writer, args ...string) error {
switch cmd {
case "open":
return nil
case "docker":
if counter == 0 {
counter++
return errExecMock
}
return nil
default:
return errExecMock
}
}

err := startDocker()
s.NoError(err)
})

s.Run("start docker fail", func() {
timeoutNum = 5

cmdExec = func(cmd string, stdout, stderr io.Writer, args ...string) error {
switch cmd {
case "open":
return nil
case "docker":
return errExecMock
default:
return errExecMock
}
}
err := startDocker()
s.Contains(err.Error(), "timed out waiting for docker")
})
}

func (s *Suite) TestCreateDockerProject() {
fs := afero.NewMemMapFs()
configYaml := testUtil.NewTestConfig(testUtil.LocalPlatform)
Expand Down
Loading

0 comments on commit 8316e9a

Please sign in to comment.