Skip to content

Commit

Permalink
Add console lifecycle event recording (gocardless#247)
Browse files Browse the repository at this point in the history
* Add console lifecycle event recording

This changeset adds the ability to publish events from a consoles'
lifecycle to a Google Pub/Sub topic.

This is disabled by default, and can be enabled selectively by
providing both the id for the project containing the topic, alongside
the topic id.

---

This change additionally bumps the version of the google api client
used. This requires a corresponding update in the base path of the
requests to use subdomains specific to each google api set.
e.g. www.googleapis.com to admin.googleapis.com

* Migrate Console request call

This change fixes an issue in the previous commit that referenced the
console before it was created, therefore having an empty name and
creation timestamp.
  • Loading branch information
jackatbancast authored Nov 30, 2021
1 parent 8b41717 commit d95e2ea
Show file tree
Hide file tree
Showing 14 changed files with 836 additions and 85 deletions.
29 changes: 18 additions & 11 deletions apis/workloads/v1alpha1/console_attach_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,31 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/gocardless/theatre/v3/pkg/logging"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

"github.com/gocardless/theatre/v3/pkg/logging"
)

// +kubebuilder:object:generate=false
type ConsoleAttachObserverWebhook struct {
client client.Client
recorder record.EventRecorder
logger logr.Logger
decoder *admission.Decoder
requestTimeout time.Duration
client client.Client
recorder record.EventRecorder
lifecycleRecorder LifecycleEventRecorder
logger logr.Logger
decoder *admission.Decoder
requestTimeout time.Duration
}

func NewConsoleAttachObserverWebhook(c client.Client, recorder record.EventRecorder, logger logr.Logger, requestTimeout time.Duration) *ConsoleAttachObserverWebhook {
func NewConsoleAttachObserverWebhook(c client.Client, recorder record.EventRecorder, lifecycleRecorder LifecycleEventRecorder, logger logr.Logger, requestTimeout time.Duration) *ConsoleAttachObserverWebhook {
return &ConsoleAttachObserverWebhook{
client: c,
recorder: recorder,
logger: logger,
requestTimeout: requestTimeout,
client: c,
recorder: recorder,
lifecycleRecorder: lifecycleRecorder,
logger: logger,
requestTimeout: requestTimeout,
}
}

Expand Down Expand Up @@ -127,6 +130,10 @@ func (c *ConsoleAttachObserverWebhook) Handle(ctx context.Context, req admission
),
"event", "ConsoleAttach",
)
err := c.lifecycleRecorder.ConsoleAttach(ctx, csl, req.UserInfo.Username, attachOptions.Container)
if err != nil {
logging.WithNoRecord(logger).Error(err, "failed to record event")
}

return admission.Allowed("attachment observed")
}
10 changes: 6 additions & 4 deletions apis/workloads/v1alpha1/console_authenticator_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ import (

// +kubebuilder:object:generate=false
type ConsoleAuthenticatorWebhook struct {
logger logr.Logger
decoder *admission.Decoder
lifecycleRecorder LifecycleEventRecorder
logger logr.Logger
decoder *admission.Decoder
}

func NewConsoleAuthenticatorWebhook(logger logr.Logger) *ConsoleAuthenticatorWebhook {
func NewConsoleAuthenticatorWebhook(lifecycleRecorder LifecycleEventRecorder, logger logr.Logger) *ConsoleAuthenticatorWebhook {
return &ConsoleAuthenticatorWebhook{
logger: logger,
lifecycleRecorder: lifecycleRecorder,
logger: logger,
}
}

Expand Down
20 changes: 14 additions & 6 deletions apis/workloads/v1alpha1/console_authorisation_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,23 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

"github.com/gocardless/theatre/v3/pkg/logging"
rbacutils "github.com/gocardless/theatre/v3/pkg/rbac"
)

// +kubebuilder:object:generate=false
type ConsoleAuthorisationWebhook struct {
client client.Client
logger logr.Logger
decoder *admission.Decoder
client client.Client
lifecycleRecorder LifecycleEventRecorder
logger logr.Logger
decoder *admission.Decoder
}

func NewConsoleAuthorisationWebhook(c client.Client, logger logr.Logger) *ConsoleAuthorisationWebhook {
func NewConsoleAuthorisationWebhook(c client.Client, lifecycleRecorder LifecycleEventRecorder, logger logr.Logger) *ConsoleAuthorisationWebhook {
return &ConsoleAuthorisationWebhook{
client: c,
logger: logger,
client: c,
lifecycleRecorder: lifecycleRecorder,
logger: logger,
}
}

Expand Down Expand Up @@ -75,6 +78,11 @@ func (c *ConsoleAuthorisationWebhook) Handle(ctx context.Context, req admission.
}

logger.Info("authorisation successful", "event", "authorisation.success")
err = c.lifecycleRecorder.ConsoleAuthorise(ctx, csl, user)
if err != nil {
logging.WithNoRecord(logger).Error(err, "failed to record event", "event", "console.authorise")
}

return admission.ValidationResponse(true, "")
}

Expand Down
147 changes: 147 additions & 0 deletions apis/workloads/v1alpha1/lifecycle_recorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package v1alpha1

import (
"context"
"time"

"github.com/go-logr/logr"
"github.com/gocardless/theatre/v3/pkg/workloads/console/events"
)

func CommonEventFromConsole(ctx string, eventKind events.EventKind, csl *Console) events.CommonEvent {
return events.CommonEvent{
Version: "v1alpha1",
Kind: events.KindConsole,
Event: eventKind,
ObservedAt: time.Now().UTC(),
Id: events.NewConsoleEventID(
ctx, csl.Namespace, csl.Name,
csl.CreationTimestamp.Time,
),
Annotations: map[string]string{},
}
}

// +kubebuilder:object:generate=false
type LifecycleEventRecorder interface {
ConsoleRequest(context.Context, *Console) error
ConsoleAuthorise(context.Context, *Console, string) error
ConsoleStart(context.Context, *Console, string) error
ConsoleAttach(context.Context, *Console, string, string) error
ConsoleTerminate(context.Context, *Console, bool) error
}

var _ LifecycleEventRecorder = &lifecycleEventRecorderImpl{}

// lifecycleEventRecorderImpl implements the interface to record lifecycle events given
//
// +kubebuilder:object:generate=false
type lifecycleEventRecorderImpl struct {
// context name for the kubernetes cluster where this recorder runs
contextName string

logger logr.Logger
publisher events.Publisher
}

func NewLifecycleEventRecorder(contextName string, logger logr.Logger, publisher events.Publisher) LifecycleEventRecorder {
return &lifecycleEventRecorderImpl{
contextName: contextName,
logger: logger,
publisher: publisher,
}
}

func (l *lifecycleEventRecorderImpl) ConsoleRequest(ctx context.Context, csl *Console) error {
event := &events.ConsoleRequestEvent{
CommonEvent: CommonEventFromConsole(l.contextName, events.EventRequest, csl),
Spec: events.ConsoleRequestSpec{
Reason: csl.Spec.Reason,
Username: csl.Spec.User,
Context: l.contextName,
Namespace: csl.Namespace,
ConsoleTemplate: csl.Spec.ConsoleTemplateRef.Name,
Console: csl.Name,
Timestamp: csl.CreationTimestamp.Time,
Labels: csl.Labels,
},
}

id, err := l.publisher.Publish(ctx, event)
if err != nil {
return err
}

l.logger.Info("event recorded", "id", id, "event", events.EventRequest)
return nil
}

func (l *lifecycleEventRecorderImpl) ConsoleAuthorise(ctx context.Context, csl *Console, username string) error {
event := &events.ConsoleAuthoriseEvent{
CommonEvent: CommonEventFromConsole(l.contextName, events.EventAuthorise, csl),
Spec: events.ConsoleAuthoriseSpec{
Username: username,
},
}

id, err := l.publisher.Publish(ctx, event)
if err != nil {
return err
}

l.logger.Info("event recorded", "id", id, "event", events.EventAuthorise)
return nil
}

func (l *lifecycleEventRecorderImpl) ConsoleStart(ctx context.Context, csl *Console, jobName string) error {
event := &events.ConsoleStartEvent{
CommonEvent: CommonEventFromConsole(l.contextName, events.EventStart, csl),
Spec: events.ConsoleStartSpec{
Job: jobName,
},
}

id, err := l.publisher.Publish(ctx, event)
if err != nil {
return err
}

l.logger.Info("event recorded", "id", id, "event", events.EventStart)
return nil
}

func (l *lifecycleEventRecorderImpl) ConsoleAttach(ctx context.Context, csl *Console, username string, containerName string) error {
event := &events.ConsoleAttachEvent{
CommonEvent: CommonEventFromConsole(l.contextName, events.EventAttach, csl),
Spec: events.ConsoleAttachSpec{
Username: username,
Pod: csl.Status.PodName,
Container: containerName,
},
}

id, err := l.publisher.Publish(ctx, event)
if err != nil {
return err
}

l.logger.Info("event recorded", "id", id, "event", events.EventAttach)
return nil
}

func (l *lifecycleEventRecorderImpl) ConsoleTerminate(ctx context.Context, csl *Console, timedOut bool) error {
event := &events.ConsoleTerminatedEvent{
CommonEvent: CommonEventFromConsole(l.contextName, events.EventTerminated, csl),
Spec: events.ConsoleTerminatedSpec{
TimedOut: timedOut,
},
}

id, err := l.publisher.Publish(ctx, event)
if err != nil {
return err
}

l.logger.Info("event recorded", "id", id, "event", events.EventTerminated)
return nil
}
30 changes: 26 additions & 4 deletions cmd/workloads-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ import (
"github.com/gocardless/theatre/v3/cmd"
consolecontroller "github.com/gocardless/theatre/v3/controllers/workloads/console"
"github.com/gocardless/theatre/v3/pkg/signals"
"github.com/gocardless/theatre/v3/pkg/workloads/console/events"
)

var (
scheme = runtime.NewScheme()

app = kingpin.New("workloads-manager", "Manages workloads.crd.gocardless.com resources").Version(cmd.VersionStanza())
app = kingpin.New("workloads-manager", "Manages workloads.crd.gocardless.com resources").Version(cmd.VersionStanza())
contextName = app.Flag("context-name", "Distinct name for the context this controller runs within. Usually the user-facing name of the kubernetes context for the cluster").Envar("CONTEXT_NAME").String()
pubsubProjectId = app.Flag("pubsub-project-id", "ID for the project containing the Pub/Sub topic for console event publishing").Envar("PUBSUB_PROJECT_ID").String()
pubsubTopicId = app.Flag("pubsub-topic-id", "ID of the topic to publish lifecycle event messages").Envar("PUBSUB_TOPIC_ID").String()

commonOpts = cmd.NewCommonOptions(app).WithMetrics(app)
)
Expand All @@ -40,6 +44,20 @@ func main() {
ctx, cancel := signals.SetupSignalHandler()
defer cancel()

// Create publisher sink for console lifecycle events
var publisher events.Publisher
var err error
if len(*pubsubProjectId) > 0 && len(*pubsubTopicId) > 0 {
publisher, err = events.NewGooglePubSubPublisher(ctx, *pubsubProjectId, *pubsubTopicId)
if err != nil {
app.Fatalf("failed to create publisher for %s/%s", *pubsubProjectId, *pubsubTopicId)
}
defer publisher.(*events.GooglePubSubPublisher).Stop()
} else { // Default to a nop publisher
publisher = events.NewNopPublisher()
}
lifecycleRecorder := workloadsv1alpha1.NewLifecycleEventRecorder(*contextName, logger, publisher)

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
MetricsBindAddress: fmt.Sprintf("%s:%d", commonOpts.MetricAddress, commonOpts.MetricPort),
Port: 443,
Expand All @@ -53,16 +71,18 @@ func main() {

// controller
if err = (&consolecontroller.ConsoleReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("console"),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
LifecycleRecorder: lifecycleRecorder,
Log: ctrl.Log.WithName("controllers").WithName("console"),
Scheme: mgr.GetScheme(),
}).SetupWithManager(ctx, mgr); err != nil {
app.Fatalf("failed to create controller: %v", err)
}

// console authenticator webhook
mgr.GetWebhookServer().Register("/mutate-consoles", &admission.Webhook{
Handler: workloadsv1alpha1.NewConsoleAuthenticatorWebhook(
lifecycleRecorder,
logger.WithName("webhooks").WithName("console-authenticator"),
),
})
Expand All @@ -71,6 +91,7 @@ func main() {
mgr.GetWebhookServer().Register("/validate-consoleauthorisations", &admission.Webhook{
Handler: workloadsv1alpha1.NewConsoleAuthorisationWebhook(
mgr.GetClient(),
lifecycleRecorder,
logger.WithName("webhooks").WithName("console-authorisation"),
),
})
Expand All @@ -95,6 +116,7 @@ func main() {
Handler: workloadsv1alpha1.NewConsoleAttachObserverWebhook(
mgr.GetClient(),
mgr.GetEventRecorderFor("console-attach-observer"),
lifecycleRecorder,
logger.WithName("webhooks").WithName("console-attach-observer"),
10*time.Second,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ var _ = Describe("NewGoogleDirectory", func() {
})

JustBeforeEach(func() {
gock.New("https://www.googleapis.com/admin/directory/v1/groups/platform%40gocardless.com/members").
gock.New("https://admin.googleapis.com/admin/directory/v1/groups/platform%40gocardless.com/members").
Reply(200).
JSON(membersResponse)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var _ = Describe("NewGoogleDirectory", func() {
})

JustBeforeEach(func() {
url := "https://www.googleapis.com/admin/directory/v1/groups/platform%40gocardless.com/members"
url := "https://admin.googleapis.com/admin/directory/v1/groups/platform%40gocardless.com/members"

gock.New(url).Times(1).Reply(200).JSON(pageOne)
gock.New(url).Times(1).MatchParam("pageToken", pageOne.NextPageToken).Reply(200).JSON(pageTwo)
Expand Down
Loading

0 comments on commit d95e2ea

Please sign in to comment.