Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ws-manager] Retry controlPort on conflict #7079

Merged
merged 1 commit into from
Dec 6, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 75 additions & 79 deletions components/ws-manager/pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"

common_grpc "github.com/gitpod-io/gitpod/common-go/grpc"
Expand Down Expand Up @@ -487,103 +488,98 @@ func (m *Manager) ControlPort(ctx context.Context, req *api.ControlPortRequest)
tracing.ApplyOWI(span, log.OWI("", "", req.Id))
defer tracing.FinishSpan(span, &err)

pod, err := m.findWorkspacePod(ctx, req.Id)
if err != nil {
return nil, xerrors.Errorf("cannot find workspace: %w", err)
}
if pod == nil {
return nil, status.Errorf(codes.NotFound, "workspace %s does not exist", req.Id)
}
tracing.ApplyOWI(span, wsk8s.GetOWIFromObject(&pod.ObjectMeta))

servicePrefix, ok := pod.Annotations[servicePrefixAnnotation]
if !ok || servicePrefix == "" {
return nil, xerrors.Errorf("workspace pod %s has no service prefix annotation", pod.Name)
}
// dunno why in k8s IP ports are int32 not uint16
port := req.Spec.Port

notifyStatusChange := func() error {
// by modifying the ports service we have changed the workspace status. However, this status change is not propagated
// through the regular monitor mechanism as we did not modify the pod itself. We have to send out a status update
// outselves. Doing it ourselves lets us synchronize the status update with probing for actual availability, not just
// the service modification in Kubernetes.
wso := workspaceObjects{Pod: pod}
err := m.completeWorkspaceObjects(ctx, &wso)
err = retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) {
pod, err := m.findWorkspacePod(ctx, req.Id)
if err != nil {
return xerrors.Errorf("cannot update status: %w", err)
return xerrors.Errorf("cannot find workspace: %w", err)
}
status, err := m.getWorkspaceStatus(wso)
if err != nil {
return xerrors.Errorf("cannot update status: %w", err)
if pod == nil {
return status.Errorf(codes.NotFound, "workspace %s does not exist", req.Id)
}
tracing.ApplyOWI(span, wsk8s.GetOWIFromObject(&pod.ObjectMeta))

exposedPorts := extractExposedPorts(pod)
existingPortSpecIdx := -1
for i, p := range exposedPorts.Ports {
if p.Port == port {
existingPortSpecIdx = i
break
}
}
m.OnChange(ctx, status)

return nil
}
servicePrefix, ok := pod.Annotations[servicePrefixAnnotation]
if !ok || servicePrefix == "" {
return xerrors.Errorf("workspace pod %s has no service prefix annotation", pod.Name)
}

// dunno why in k8s IP ports are int32 not uint16
port := req.Spec.Port
if req.Expose && existingPortSpecIdx < 0 {
// port is not exposed yet - patch the pod
url, err := config.RenderWorkspacePortURL(m.Config.WorkspacePortURLTemplate, config.PortURLContext{
Host: m.Config.GitpodHostURL,
ID: req.Id,
IngressPort: fmt.Sprint(port),
Prefix: servicePrefix,
WorkspacePort: fmt.Sprint(port),
})
if err != nil {
return xerrors.Errorf("cannot render public URL for %d: %w", port, err)
}

exposedPorts := extractExposedPorts(pod)
portSpec := &api.PortSpec{
Port: uint32(port),
Visibility: req.Spec.Visibility,
Url: url,
}

existingPortSpecIdx := -1
for i, p := range exposedPorts.Ports {
if p.Port == port {
existingPortSpecIdx = i
break
exposedPorts.Ports = append(exposedPorts.Ports, portSpec)
} else if req.Expose && existingPortSpecIdx >= 0 {
exposedPorts.Ports[existingPortSpecIdx].Visibility = req.Spec.Visibility
} else if !req.Expose && existingPortSpecIdx < 0 {
// port isn't exposed already - we're done here
return nil
} else if !req.Expose && existingPortSpecIdx >= 0 {
// port is exposed but shouldn't be - remove it from the port list
exposedPorts.Ports = append(exposedPorts.Ports[:existingPortSpecIdx], exposedPorts.Ports[existingPortSpecIdx+1:]...)
}
}

if req.Expose && existingPortSpecIdx < 0 {
// port is not exposed yet - patch the pod
url, err := config.RenderWorkspacePortURL(m.Config.WorkspacePortURLTemplate, config.PortURLContext{
Host: m.Config.GitpodHostURL,
ID: req.Id,
IngressPort: fmt.Sprint(port),
Prefix: servicePrefix,
WorkspacePort: fmt.Sprint(port),
})
// update pod annotation
data, err := exposedPorts.ToBase64()
if err != nil {
return nil, xerrors.Errorf("cannot render public URL for %d: %w", port, err)
}

portSpec := &api.PortSpec{
Port: uint32(port),
Visibility: req.Spec.Visibility,
Url: url,
return xerrors.Errorf("cannot update status: %w", err)
}

exposedPorts.Ports = append(exposedPorts.Ports, portSpec)
} else if req.Expose && existingPortSpecIdx >= 0 {
exposedPorts.Ports[existingPortSpecIdx].Visibility = req.Spec.Visibility
} else if !req.Expose && existingPortSpecIdx < 0 {
// port isn't exposed already - we're done here
return &api.ControlPortResponse{}, nil
} else if !req.Expose && existingPortSpecIdx >= 0 {
// port is exposed but shouldn't be - remove it from the port list
exposedPorts.Ports = append(exposedPorts.Ports[:existingPortSpecIdx], exposedPorts.Ports[existingPortSpecIdx+1:]...)
}
if pod.Annotations[wsk8s.WorkspaceExposedPorts] != data {
log.WithField("ports", exposedPorts).Debug("updating exposed ports")
pod.Annotations[wsk8s.WorkspaceExposedPorts] = data

// update pod annotation
data, err := exposedPorts.ToBase64()
if err != nil {
return nil, xerrors.Errorf("cannot update status: %w", err)
}

if pod.Annotations[wsk8s.WorkspaceExposedPorts] != data {
log.WithField("ports", exposedPorts).Debug("updating exposed ports")
pod.Annotations[wsk8s.WorkspaceExposedPorts] = data
// update pod
err = m.Clientset.Update(ctx, pod)
if err != nil {
// do not wrap error so we don't break the retry mechanism
return err
}
}

// update pod
err = m.Clientset.Update(ctx, pod)
// by modifying the ports service we have changed the workspace status. However, this status change is not propagated
// through the regular monitor mechanism as we did not modify the pod itself. We have to send out a status update
// outselves. Doing it ourselves lets us synchronize the status update with probing for actual availability, not just
// the service modification in Kubernetes.
wso := workspaceObjects{Pod: pod}
err = m.completeWorkspaceObjects(ctx, &wso)
if err != nil {
return nil, xerrors.Errorf("cannot update workspace pod: %w", err)
return xerrors.Errorf("cannot update status: %w", err)
}
}
status, err := m.getWorkspaceStatus(wso)
if err != nil {
return xerrors.Errorf("cannot update status: %w", err)
}
m.OnChange(ctx, status)

err = notifyStatusChange()
if err != nil {
return nil, err
}
return nil
})

return &api.ControlPortResponse{}, nil
}
Expand Down