Skip to content

Commit

Permalink
dockercomposeservice: lift and shift the dockercompose Up into the re…
Browse files Browse the repository at this point in the history
…conciler (#5577)

This creates two codepaths:
- a "ForceApply" codepath that does the actual "docker compose up"
- a Reconciler codepath that updates the apiserver with the results
  • Loading branch information
nicks authored Mar 9, 2022
1 parent 097b7f5 commit 2cbc727
Show file tree
Hide file tree
Showing 17 changed files with 1,053 additions and 610 deletions.
10 changes: 5 additions & 5 deletions internal/cli/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions internal/controllers/apis/imagemap/imagemap.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package imagemap

import (
"context"
"fmt"
"os/exec"

"k8s.io/apimachinery/pkg/types"

apierrors "k8s.io/apimachinery/pkg/api/errors"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/tilt-dev/tilt/pkg/apis/core/v1alpha1"
"github.com/tilt-dev/tilt/pkg/model"
)
Expand Down Expand Up @@ -51,3 +55,23 @@ func InjectIntoLocalEnv(cmd *exec.Cmd, imageMapNames []string, imageMaps map[typ
}
return nil
}

// Populate a map with all the given imagemaps, skipping any that don't exist
func NamesToObjects(ctx context.Context, client ctrlclient.Client, names []string) (map[types.NamespacedName]*v1alpha1.ImageMap, error) {
imageMaps := make(map[types.NamespacedName]*v1alpha1.ImageMap)
for _, name := range names {
var im v1alpha1.ImageMap
nn := types.NamespacedName{Name: name}
err := client.Get(ctx, nn, &im)
if err != nil {
if apierrors.IsNotFound(err) {
// If the map isn't found, keep going
continue
}
return nil, err
}

imageMaps[nn] = &im
}
return imageMaps, nil
}
235 changes: 227 additions & 8 deletions internal/controllers/core/dockercomposeservice/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package dockercomposeservice

import (
"context"
"sync"

dtypes "github.com/docker/docker/api/types"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -14,20 +17,37 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/docker/go-connections/nat"

"github.com/tilt-dev/tilt/internal/controllers/apicmp"
"github.com/tilt-dev/tilt/internal/controllers/apis/configmap"
"github.com/tilt-dev/tilt/internal/controllers/indexer"
"github.com/tilt-dev/tilt/internal/docker"
"github.com/tilt-dev/tilt/internal/dockercompose"
"github.com/tilt-dev/tilt/internal/store"
"github.com/tilt-dev/tilt/pkg/apis"
"github.com/tilt-dev/tilt/pkg/apis/core/v1alpha1"
"github.com/tilt-dev/tilt/pkg/logger"
"github.com/tilt-dev/tilt/pkg/model"
)

type Reconciler struct {
dcc dockercompose.DockerComposeClient
dc docker.Client
st store.RStore
ctrlClient ctrlclient.Client
indexer *indexer.Indexer
requeuer *indexer.Requeuer
mu sync.Mutex

// Protected by the mutex.
results map[types.NamespacedName]*Result
}

func (r *Reconciler) CreateBuilder(mgr ctrl.Manager) (*builder.Builder, error) {
b := ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.DockerComposeService{}).
Watches(r.requeuer, handler.Funcs{}).
Watches(&source.Kind{Type: &v1alpha1.ImageMap{}},
handler.EnqueueRequestsFromMapFunc(r.indexer.Enqueue)).
Watches(&source.Kind{Type: &v1alpha1.ConfigMap{}},
Expand All @@ -36,15 +56,26 @@ func (r *Reconciler) CreateBuilder(mgr ctrl.Manager) (*builder.Builder, error) {
return b, nil
}

func NewReconciler(ctrlClient ctrlclient.Client, st store.RStore, scheme *runtime.Scheme) *Reconciler {
func NewReconciler(
ctrlClient ctrlclient.Client,
dcc dockercompose.DockerComposeClient,
dc docker.Client,
st store.RStore,
scheme *runtime.Scheme,
) *Reconciler {
return &Reconciler{
ctrlClient: ctrlClient,
dcc: dcc,
dc: dc.ForOrchestrator(model.OrchestratorDC),
indexer: indexer.NewIndexer(scheme, indexDockerComposeService),
st: st,
requeuer: indexer.NewRequeuer(),
results: make(map[types.NamespacedName]*Result),
}
}

// Reconcile manages namespace watches for the modified DockerComposeService object.
// Redeploy the docker compose service when its spec
// changes or any of its dependencies change.
func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
nn := request.NamespacedName

Expand All @@ -59,12 +90,58 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (
return ctrl.Result{}, nil
}

// Get configmap's disable status
ctx = store.MustObjectLogHandler(ctx, r.st, &obj)
disableStatus, err := configmap.MaybeNewDisableStatus(ctx, r.ctrlClient, obj.Spec.DisableSource, obj.Status.DisableStatus)
if err != nil {
return ctrl.Result{}, err
}

r.recordDisableStatus(nn, obj.Spec, *disableStatus)

// TODO(nick): Cleanup when the dockercompose service is disabled.
// TODO(nick): Deploy dockercompose services that aren't managed via buildcontrol

err = r.maybeUpdateStatus(ctx, nn, &obj)
if err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
}

// Apply the DockerCompose service spec, unconditionally.
//
// Update the apiserver when finished.
// Create a result object if necessary. Caller must hold the mutex.
func (r *Reconciler) ensureResultExists(nn types.NamespacedName) *Result {
existing, hasExisting := r.results[nn]
if hasExisting {
return existing
}

result := &Result{}
r.results[nn] = result
return result
}

// Record disable state of the service.
func (r *Reconciler) recordDisableStatus(
nn types.NamespacedName,
spec v1alpha1.DockerComposeServiceSpec,
disableStatus v1alpha1.DisableStatus) {
r.mu.Lock()
defer r.mu.Unlock()

result := r.ensureResultExists(nn)
if apicmp.DeepEqual(result.Status.DisableStatus, &disableStatus) {
return
}

update := result.Status.DeepCopy()
update.DisableStatus = &disableStatus
result.Status = *update
}

// Apply the DockerCompose service spec, unconditionally,
// and requeue the reconciler so that it updates the apiserver.
//
// We expose this as a public method as a hack! Currently, in Tilt, BuildController
// handles dependencies between resources. The API server doesn't know about build
Expand All @@ -74,9 +151,127 @@ func (r *Reconciler) ForceApply(
ctx context.Context,
nn types.NamespacedName,
spec v1alpha1.DockerComposeServiceSpec,
imageMaps map[types.NamespacedName]*v1alpha1.ImageMap) (v1alpha1.DockerComposeServiceStatus, error) {
// TK(nick): Fill this out.
return v1alpha1.DockerComposeServiceStatus{}, nil
imageMaps map[types.NamespacedName]*v1alpha1.ImageMap,
dcManagedBuild bool) v1alpha1.DockerComposeServiceStatus {
status := r.forceApplyHelper(ctx, nn, spec, imageMaps, dcManagedBuild)
r.requeuer.Add(nn)
return status
}

// Records status when an apply fail.
// This might mean the image build failed, if we're using dc-managed image builds.
// Does not necessarily clear the current running container.
func (r *Reconciler) recordApplyError(
nn types.NamespacedName,
spec v1alpha1.DockerComposeServiceSpec,
imageMaps map[types.NamespacedName]*v1alpha1.ImageMap,
err error,
startTime metav1.MicroTime,
) v1alpha1.DockerComposeServiceStatus {
r.mu.Lock()
defer r.mu.Unlock()

result := r.ensureResultExists(nn)
status := result.Status.DeepCopy()
status.LastApplyStartTime = startTime
status.LastApplyFinishTime = apis.NowMicro()
status.ApplyError = err.Error()
result.Status = *status
result.SetSpec(spec, imageMaps)
return *status
}

// Records status when an apply succeeds.
func (r *Reconciler) recordApplyStatus(
nn types.NamespacedName,
spec v1alpha1.DockerComposeServiceSpec,
imageMaps map[types.NamespacedName]*v1alpha1.ImageMap,
newStatus v1alpha1.DockerComposeServiceStatus,
) v1alpha1.DockerComposeServiceStatus {
r.mu.Lock()
defer r.mu.Unlock()

result := r.ensureResultExists(nn)
disableStatus := result.Status.DisableStatus
newStatus.DisableStatus = disableStatus
result.Status = newStatus
result.SetSpec(spec, imageMaps)

return newStatus
}

// A helper that applies the given specs to the cluster,
// tracking the state of the deploy in the results map.
func (r *Reconciler) forceApplyHelper(
ctx context.Context,
nn types.NamespacedName,
spec v1alpha1.DockerComposeServiceSpec,
imageMaps map[types.NamespacedName]*v1alpha1.ImageMap,
// TODO(nick): Figure out a better way to infer the dcManagedBuild setting.
dcManagedBuild bool,
) v1alpha1.DockerComposeServiceStatus {
startTime := apis.NowMicro()
stdout := logger.Get(ctx).Writer(logger.InfoLvl)
stderr := logger.Get(ctx).Writer(logger.InfoLvl)
err := r.dcc.Up(ctx, spec, dcManagedBuild, stdout, stderr)
if err != nil {
return r.recordApplyError(nn, spec, imageMaps, err, startTime)
}

// grab the initial container state
cid, err := r.dcc.ContainerID(ctx, spec)
if err != nil {
return r.recordApplyError(nn, spec, imageMaps, err, startTime)
}

containerJSON, err := r.dc.ContainerInspect(ctx, string(cid))
if err != nil {
logger.Get(ctx).Debugf("Error inspecting container %s: %v", cid, err)
}

var containerState *dtypes.ContainerState
if containerJSON.ContainerJSONBase != nil && containerJSON.ContainerJSONBase.State != nil {
containerState = containerJSON.ContainerJSONBase.State
}

var ports nat.PortMap
if containerJSON.NetworkSettings != nil {
ports = containerJSON.NetworkSettings.NetworkSettingsBase.Ports
}

status := dockercompose.ToServiceStatus(cid, containerState, ports)
status.LastApplyStartTime = startTime
status.LastApplyFinishTime = apis.NowMicro()
return r.recordApplyStatus(nn, spec, imageMaps, status)
}

// Update the status on the apiserver if necessary.
func (r *Reconciler) maybeUpdateStatus(ctx context.Context, nn types.NamespacedName, obj *v1alpha1.DockerComposeService) error {
newStatus := v1alpha1.DockerComposeServiceStatus{}
existing, ok := r.results[nn]
if ok {
newStatus = existing.Status
}

if apicmp.DeepEqual(obj.Status, newStatus) {
return nil
}

oldError := obj.Status.ApplyError
newError := newStatus.ApplyError
update := obj.DeepCopy()
update.Status = *(newStatus.DeepCopy())

err := r.ctrlClient.Status().Update(ctx, update)
if err != nil {
return err
}

// Print new errors on objects that aren't managed by the buildcontroller.
if newError != "" && oldError != newError && update.Annotations[v1alpha1.AnnotationManagedBy] == "" {
logger.Get(ctx).Errorf("dockercomposeservice %s: %s", obj.Name, newError)
}
return nil
}

var imGVK = v1alpha1.SchemeGroupVersion.WithKind("ImageMap")
Expand All @@ -94,3 +289,27 @@ func indexDockerComposeService(obj client.Object) []indexer.Key {

return result
}

// Keeps track of the state we currently know about.
type Result struct {
Spec v1alpha1.DockerComposeServiceSpec
ImageMapSpecs []v1alpha1.ImageMapSpec
ImageMapStatuses []v1alpha1.ImageMapStatus
Status v1alpha1.DockerComposeServiceStatus
}

func (r *Result) SetSpec(spec v1alpha1.DockerComposeServiceSpec, imageMaps map[types.NamespacedName]*v1alpha1.ImageMap) {
r.Spec = spec
r.ImageMapSpecs = nil
r.ImageMapStatuses = nil
for _, imageMapName := range r.Spec.ImageMaps {
im, ok := imageMaps[types.NamespacedName{Name: imageMapName}]
if !ok {
// this should never happen, but if it does, just continue quietly.
continue
}

r.ImageMapSpecs = append(r.ImageMapSpecs, im.Spec)
r.ImageMapStatuses = append(r.ImageMapStatuses, im.Status)
}
}
Loading

0 comments on commit 2cbc727

Please sign in to comment.