feat: Pipelines are now versioned
In order to eventually have canary-able deployments in Gofer we must
first support versioned pipelines.

This allows us to:
* Have a well-defined target to roll back and forward to.
* Understand what we are gaining and losing on each change.
* Track each update as it happens.

This is not easy, though, as pipelines have parts that are easy to version
(namely the config) and parts that are much harder to version (how do
we handle cutting over triggers?).

Because of this nuance, we've had to redesign many of our earlier
assumptions about how Gofer's models work. This was a major refactor, and
since I was here I made a few other large, sweeping changes.

* Full storage package refactor: The storage layer was hard to use,
brittle, and inflexible. I've refactored it, leaning a bit more on
sqlx and going back to basics. I tried to make the storage package
work in the past by keeping the domain models to a minimum. I've since
learned this does not work once things become reasonably complicated. One
of the main refactors to the storage package is the introduction of
dedicated storage models. This means that I have to write a bunch of
boilerplate code to switch from in-memory models to the storage ones,
but the looser coupling is worth it. More storage refactors coming
as I learn what works at large scale and what doesn't.
https://github.com/go-jet/jet looks interesting.
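
As a rough illustration of the dedicated-storage-model idea (all type and field names here are hypothetical, not Gofer's actual schema), the conversion boilerplate might look like:

```go
package main

import "fmt"

// Sketch of the dedicated-storage-model idea: the domain model stays
// expressive while the storage model is flat and tagged for sqlx.
// All names here are illustrative, not Gofer's actual types.

// Pipeline is the in-memory domain model.
type Pipeline struct {
	Namespace string
	ID        string
	State     PipelineState
}

type PipelineState string

const (
	PipelineStateActive   PipelineState = "ACTIVE"
	PipelineStateDisabled PipelineState = "DISABLED"
)

// storagePipeline is the storage model: flat, stringly-typed, shaped for
// the database rather than for the application.
type storagePipeline struct {
	Namespace string `db:"namespace"`
	ID        string `db:"id"`
	State     string `db:"state"`
}

// The boilerplate the commit message mentions: explicit conversion in
// both directions keeps the two layers loosely coupled.
func toStorage(p Pipeline) storagePipeline {
	return storagePipeline{Namespace: p.Namespace, ID: p.ID, State: string(p.State)}
}

func fromStorage(s storagePipeline) Pipeline {
	return Pipeline{Namespace: s.Namespace, ID: s.ID, State: PipelineState(s.State)}
}

func main() {
	p := Pipeline{Namespace: "default", ID: "my_pipeline", State: PipelineStateActive}
	round := fromStorage(toStorage(p))
	fmt.Println(round == p) // prints "true"
}
```

The cost is a round-trip function pair per model; the payoff is that schema changes stop rippling through the domain layer.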

* Removal of Triggers as Pipeline configuration: I desperately wanted
to have pipeline configurations encompass everything a pipeline would
have to offer, so that it was easy to look at a config and know exactly
what was going on in a particular pipeline. Triggers were a pain in the
ass though. Triggers unfortunately occupy a very special place in Gofer's
architecture. Without triggers nothing really gets done. And so allowing
the user to apply all the same functionality to triggers as they would
with any other deployment was short-sighted. Triggers don't make a lot
of sense as a canary deployment. Triggers aren't ephemeral; they are
either on or they're off. No in-between.

Instead, triggers can now be added to your pipeline via the command line.
This way trigger subscriptions aren't tied to the lifecycle of
configuration changes.

* Pipelines are now versioned: Not only have we added versions to pipelines,
but pipelines now have deployments, configurations, metadata, and a lot
of smaller, loosely coupled parts so that they aren't a huge data monolith.
This means a lot of breaking changes for outward-facing (and inward-facing,
for that matter) APIs.
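
A minimal sketch of what this decomposition could look like (type names and fields are assumptions for illustration, not the real API):

```go
package main

import "fmt"

// Illustrative sketch of breaking the pipeline "data monolith" into
// smaller, loosely coupled parts. All names are hypothetical.

type PipelineMetadata struct {
	Namespace string
	ID        string
	Created   int64
}

type PipelineConfig struct {
	Version     int64
	Parallelism int64
	Tasks       []string
}

type PipelineDeployment struct {
	ID           int64
	StartVersion int64
	EndVersion   int64
	State        string
}

// A "pipeline" is now an aggregate of its parts rather than one struct.
type Pipeline struct {
	Metadata    PipelineMetadata
	Configs     []PipelineConfig
	Deployments []PipelineDeployment
}

// latestConfig returns the highest-versioned configuration, giving a
// concrete target to roll back and forward to.
func (p *Pipeline) latestConfig() *PipelineConfig {
	var latest *PipelineConfig
	for i := range p.Configs {
		if latest == nil || p.Configs[i].Version > latest.Version {
			latest = &p.Configs[i]
		}
	}
	return latest
}

func main() {
	p := Pipeline{Configs: []PipelineConfig{{Version: 1}, {Version: 3}, {Version: 2}}}
	fmt.Println(p.latestConfig().Version) // prints "3"
}
```

Keeping configs and deployments as separate histories is what makes rollbacks and canaries tractable: each is an append-only record rather than a mutation of one big object.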

* Just lots of general breakages everywhere: A large percentage
of things just aren't the same anymore.
clintjedwards committed Jan 30, 2023
1 parent 3f514c6 commit d90ca94
Showing 135 changed files with 15,005 additions and 11,749 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -44,7 +44,7 @@ build-protos:
> protoc --proto_path=proto --go_out=proto/go --go_opt=paths=source_relative \
--go-grpc_out=proto/go --go-grpc_opt=paths=source_relative proto/*.proto
> cd proto/rust
-> cargo build
+> cargo build --release
.PHONY: build-protos

## build-sdk: build rust sdk
7 changes: 4 additions & 3 deletions README.md
@@ -18,11 +18,12 @@ Its purpose is to run short-term jobs such as: code linters, build tools, tests

## Features:

-- Deploy it as a single static binary.
-- Write your pipelines in a programming language you're familiar with. (**Go** or **Rust** for now).
+- Deploy Gofer as a single static binary, manage Gofer through the included command line interface.
+- Write your pipelines in a programming language you're familiar with; stop cobbling together unfamiliar yaml. (**Go** or **Rust** for now).
- Test and run your pipelines locally; no more <i>"commit it and see"</i> testing.
- Pluggable: Write your own triggers, shared tasks, and more in any language (through GRPC).
- DAG(Directed Acyclic Graph) support.
-- Reliability tooling: A/B test, version, and canary new pipelines.
+- Reliability tooling: Version, Blue/Green deploy, and canary deploy updated versions of your pipelines.
- Bring your own everything! Secret store, object store, container scheduler. Gofer has the interfaces to support all of them.

## Demo:
90 changes: 68 additions & 22 deletions TODO.md
@@ -4,6 +4,11 @@

We need a common way to alert on a PR or something that a task has succeeded or failed or similar.

### Canaried pipelines

- With versioned pipelines we now need the ability for Gofer to roll out your pipeline for you and watch telemetry on it.
- We need to add "update methods" to pipeline settings, which will control the manner in which we roll out updates. Runs will need to include which version of the pipeline has run.
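
One possible shape for those update methods (the strategy names and fields are assumptions, not Gofer's actual settings):

```go
package main

import "fmt"

// Hypothetical sketch of the "update methods" a pipeline's settings
// might carry to control how new versions roll out.

type DeployStrategy string

const (
	// Cut all traffic over to the new version at once.
	DeployStrategyImmediate DeployStrategy = "IMMEDIATE"
	// Run old and new side by side, then promote or roll back.
	DeployStrategyBlueGreen DeployStrategy = "BLUE_GREEN"
	// Shift a growing fraction of runs to the new version while
	// watching telemetry, rolling back on regression.
	DeployStrategyCanary DeployStrategy = "CANARY"
)

// Run records which pipeline version it executed against, so telemetry
// can be compared across versions during a rollout.
type Run struct {
	ID              int64
	PipelineVersion int64
	Strategy        DeployStrategy
}

func main() {
	r := Run{ID: 1, PipelineVersion: 4, Strategy: DeployStrategyCanary}
	fmt.Printf("run %d executed pipeline version %d (%s)\n", r.ID, r.PipelineVersion, r.Strategy)
}
```

Tagging each run with its pipeline version is the prerequisite for any of the strategies: without it there is nothing to compare telemetry against.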

### API

- Write/Ensure proper validation for all endpoints.
@@ -37,14 +42,15 @@ We need a common way to alert on a PR or something that a task has succeeded or

### Common Tasks

-- It would be nice to create a common task with some basic "the user wants to do something when this condition
-  happens". What is the best way to do this?
-  - If pipeline fails 3 runs in a row.
-  - If pipeline failure rate ever dives below certain percentage.
-  - If total time of a run exceeds a given duration.
-  - When a run finishes.
-  - When a run fails.
-  - If a particular task run fails or succeeds.
+Eventually we will need to create some common tasks that are based around alerting. It would be nice to send the user
+a notification around one or more of the following events, should the user want it:
+
+- If the pipeline fails 3 runs in a row.
+- If the pipeline failure rate ever rises above a certain percentage.
+- If the total time of a run exceeds a given duration.
+- When a run finishes.
+- When a run fails.
+- If a particular task run fails or succeeds.
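
The first condition above could be checked with something like this (the status type and threshold are illustrative, not Gofer's actual models):

```go
package main

import "fmt"

// Minimal sketch of one alert condition from the list above: fire when a
// pipeline fails several runs in a row.

type RunStatus string

const (
	RunStatusSuccessful RunStatus = "SUCCESSFUL"
	RunStatusFailed     RunStatus = "FAILED"
)

// shouldAlert reports whether the run history (newest run last) ends in
// at least `threshold` consecutive failures.
func shouldAlert(history []RunStatus, threshold int) bool {
	streak := 0
	for i := len(history) - 1; i >= 0; i-- {
		if history[i] != RunStatusFailed {
			break
		}
		streak++
	}
	return streak >= threshold
}

func main() {
	history := []RunStatus{RunStatusSuccessful, RunStatusFailed, RunStatusFailed, RunStatusFailed}
	fmt.Println(shouldAlert(history, 3)) // prints "true"
}
```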

### CLI

@@ -61,6 +67,7 @@ We need a common way to alert on a PR or something that a task has succeeded or
- Create an init function for both rust and golang (simply prompts you for your language) and then creates a new default vanilla pipeline. (similar to the old "config init" command)
- Inspiration for CLI design: https://github.com/bensadeh/circumflex
- Look into bubble tea for some interactions.
- A diff command might be awesome.

### Scheduler

Expand Down Expand Up @@ -141,24 +148,63 @@ We need a common way to alert on a PR or something that a task has succeeded or
These are prefixed with `gofer_plugin_config_{var}`
- Gofer then passes them another set of env vars from the user's own config.
These are prefixed with `gofer_plugin_param_{var}`
- Write better documentation on how to spin Gofer up locally so you can test out your pipeline.

### On the floor

- Triggers need to be removed from pipeline configuration. They should instead be set up globally for the pipeline
by the user through the command line. This is because they are fairly inelastic. If we were canarying a pipeline,
how would we determine which pipeline triggers go to which versions of the pipeline? This is almost impossible.
Treating triggers as global things actually makes all the code much easier to work with.
- Create a container for custom use that has gofer-cli already packed in and possibly allows
- Think about making a new task type that you can pass commands to that automatically uses the gofer container. So users can get zero to code ultra-fast.
- Fixing Pipeline updates and rolling out versioned pipelines.
- Gofer needs versioned pipelines as a first step toward supporting the possibility of canarying pipelines.
- We need to make a user settable limit for pipeline versions. Delete older versions.
- Several things need to get done
1. We need to figure out how to support versioned pipelines in the context of the database and data models. We'll probably need to change the schema quite a bit.
2. Clean up how trigger sub/un-subs work. Hitting the upgrade endpoint for your pipeline should return immediately and update the pipeline's status to updating.
- (We'll probably need to add statuses to pipeline [Ready, Updating])
- During this "updating" time Gofer will remove triggers and subscribe triggers as necessary.
- If this process fails Gofer will rollback to old trigger state.
- If this fails Gofer will mark the pipeline as paused(or better) for a specific reason.
- Clients of the API will kick off the update by passing the proto as usual and then listen for pipeline updates which can then be relayed to the user.
- Once the pipeline finishes updating, it will switch back to the Ready state, but the API will not
auto-switch it back to active.
3. We need to add "update methods" to pipeline settings, which will control the manner in which we roll out updates. Runs will need to include which version of the pipeline has run.
- During this "updating" time Gofer will remove triggers and subscribe triggers as necessary.
- If this process fails Gofer will rollback to old trigger state.
- If this fails Gofer will mark the pipeline as disabled(or better) for a specific reason(note a pipeline error here).
- Clients of the API will kick off the update by passing the proto as usual and then listen for pipeline updates which can then be relayed to the user via events.
- In the event of a rollback, when we mark a pipeline as active after it has been deprecated, we should clear the deprecation timestamp.
- We can use this deprecation timestamp to find the oldest version.
- We need the ability for the user to trigger a pipeline rollback (with an option to return to a non-new version).
- The CLI should give the user the ability to roll forward and backward by simply specifying a version number.
- We can do this by splitting the actions into two. We have two endpoints:
- Register pipeline
- Deploy pipeline
- This allows us to separate the two in a way that makes it much like long-running jobs. We first upload the new version,
then we control how we deploy it.
- Errors at this step are the only ones that occur without the user's knowledge, so the split makes sense.
- This also gives us the chance to rewrite the startup for pipelines. Instead of Gofer trying its best to reconnect
pipelines to their triggers, we can now launch a deployment for each pipeline and compartmentalize what
happens there. This gives the user a nice interface for understanding pipeline failures in relation to Gofer.
- Deployments should have types: The type we're currently doing is immediate cutover.
- We need to redo restoreTriggerSubscriptions with the new tao of what a trigger is.
- Update the CLI to work with all of this.
- Clean up both github triggers and add a github common task.
- A common task we can throw in as a parallel task at the start of each pipeline. It will consume the github commit, inform github that the pipeline is pending, and then query Gofer to see when the run has ended. When the run ends, the task will inform github that the run finished with a particular state.
- Think about making a new task type that you can pass commands to that automatically uses the gofer container. So users can get zero to code ultra-fast.
- We need to clean up the storage layer. There is way too much that we're doing on our own and it's prone to mistakes,
mostly when we convert structs right before inserting them into the database.
- We need to move to three layers of models: Proto -> Internal -> Storage.
- When we need to execute a transaction, we can create a WithTx function that takes a callback. We can put the work that needs to be wrapped in a transaction inside that callback,
and then we won't need to do weird nil handling.
- Get rid of FromProtos and hide the models package. It's a domain-level package so it should not be exposed anyway. The only interface will be protos.
- When we fail a trigger during deployment we should give a better error; currently we give a generic one.
- We might want to think about wrapping Pipeline, so we can treat it like an object more. This would enable us to
perform a lot of actions straightforwardly and have it all work out in the backend. For example, unsubscribing a trigger
for a pipeline would save things to the database naturally, publish the event, etc and we get that consistent behavior
no matter what.
- Think more about event naming; right now the naming is kinda weird because we're tying events to the objects that we normally
work with, like pipelines and runs. Maybe that grouping should be in documentation only? Including it in the name can make for
awkward, confusing names.
- For instance: EventStartedDeployPipeline instead of EventStartedPipelineDeploy
- To support spreading traffic to different versions of a pipeline, we can first check if the pipeline has a deployment;
if that deployment is going out, then the deployment has a type. We then take the versions involved in the deployment and
share traffic based on that type.
- pipeline_trigger_status: state/status should be subscribed, disabled, or failed. Disabled means it's still subscribed, but any events will be ignored.
- For the To and From conversion functions in the API, remove the errors; they should all be Must-style functions for ease of use.
- Also write tests for all of these functions.
- Go back and remove all panics that aren't data related.
- The api layer needs a package that wraps the storage package and makes it easier to use.
- CLI: pipeline[register, list, get] and trigger[subscribe, unsubscribe] files need work.
- Update pipeline getting started documentation, triggers are handled differently now.
- If the CLI detects this is the first time we've pushed to this pipeline, we then need to automatically deploy it.
- The CLI should also mention that now is the time to add triggers.
48 changes: 34 additions & 14 deletions containers/triggers/cron/main.go
@@ -19,15 +19,21 @@ const ParameterExpression = "expression"
var checkInterval = time.Minute

type subscription struct {
-	pipelineTriggerLabel string
-	pipeline             string
	namespace            string
+	pipeline             string
+	pipelineTriggerLabel string
timeframe avail.Timeframe
}

type subscriptionID struct {
namespace string
pipeline string
pipelineTriggerLabel string
}

type trigger struct {
events chan *proto.TriggerWatchResponse
-	subscriptions []subscription
+	subscriptions map[subscriptionID]*subscription
}

func newTrigger() *trigger {
@@ -66,31 +72,45 @@ func (t *trigger) Subscribe(ctx context.Context, request *proto.TriggerSubscribe
return nil, fmt.Errorf("could not parse expression: %q", err)
}

subID := subscriptionID{
request.NamespaceId,
request.PipelineId,
request.PipelineTriggerLabel,
}

// It is perfectly possible for Gofer to attempt to subscribe an already subscribed pipeline. In this case,
// we can simply ignore the request.
_, exists = t.subscriptions[subID]
if exists {
log.Debug().Str("namespace_id", request.NamespaceId).Str("trigger_label", request.PipelineTriggerLabel).
Str("pipeline_id", request.PipelineId).Msg("pipeline already subscribed; ignoring request")
return &proto.TriggerSubscribeResponse{}, nil
}

// While it might result in a faster check to start a goroutine for each subscription the interval
// for most of these expressions should be on the order of minutes. So one event loop checking the
// result for all of them should still result in no missed checks.
-	t.subscriptions = append(t.subscriptions, subscription{
-		pipelineTriggerLabel: request.PipelineTriggerLabel,
-		pipeline:             request.PipelineId,
-		namespace:            request.NamespaceId,
-		timeframe:            timeframe,
-	})
+	t.subscriptions[subID] = &subscription{
+		namespace:            request.NamespaceId,
+		pipeline:             request.PipelineId,
+		pipelineTriggerLabel: request.PipelineTriggerLabel,
+		timeframe:            timeframe,
+	}

log.Debug().Str("trigger_label", request.PipelineTriggerLabel).Str("pipeline_id", request.PipelineId).
Str("namespace_id", request.NamespaceId).Msg("subscribed pipeline")
return &proto.TriggerSubscribeResponse{}, nil
}

func (t *trigger) Unsubscribe(ctx context.Context, request *proto.TriggerUnsubscribeRequest) (*proto.TriggerUnsubscribeResponse, error) {
-	for index, subscription := range t.subscriptions {
-		if subscription.pipelineTriggerLabel == request.PipelineTriggerLabel &&
-			subscription.namespace == request.NamespaceId &&
-			subscription.pipeline == request.PipelineId {
-			t.subscriptions = append(t.subscriptions[:index], t.subscriptions[index+1:]...)
-			return &proto.TriggerUnsubscribeResponse{}, nil
-		}
-	}
+	subID := subscriptionID{
+		namespace:            request.NamespaceId,
+		pipeline:             request.PipelineId,
+		pipelineTriggerLabel: request.PipelineTriggerLabel,
+	}
+
+	delete(t.subscriptions, subID)

log.Debug().Str("trigger_label", request.PipelineTriggerLabel).Str("pipeline_id", request.PipelineId).
Str("namespace_id", request.NamespaceId).Msg("unsubscribed pipeline")
return &proto.TriggerUnsubscribeResponse{}, nil
59 changes: 40 additions & 19 deletions containers/triggers/interval/main.go
@@ -55,9 +55,9 @@ const (
// This structure is to keep details about those subscriptions so that we may perform the trigger's duties on those
// pipeline subscriptions.
type subscription struct {
-	pipelineTriggerLabel string
-	pipeline             string
	namespace            string
+	pipeline             string
+	pipelineTriggerLabel string
quit context.CancelFunc
}

@@ -66,9 +66,9 @@ type subscription struct {
// of this unique key. That is because, when relevant, triggers should expect that pipelines might
// want to subscribe more than once.
type subscriptionID struct {
-	pipelineTriggerLabel string
-	pipeline             string
	namespace            string
+	pipeline             string
+	pipelineTriggerLabel string
}

// Trigger is a structure that every Gofer trigger should have. It is essentially the God struct. It contains
@@ -124,7 +124,8 @@ func newTrigger() (*trigger, error) {

// startInterval is the main logic of what enables the interval trigger to work. Each pipeline that is subscribed runs
// this function which simply waits for the set duration and then pushes a "WatchResponse" event into the trigger's main channel.
-func (t *trigger) startInterval(ctx context.Context, pipeline, namespace, pipelineTriggerLabel string, duration time.Duration) {
+func (t *trigger) startInterval(ctx context.Context, namespace, pipeline string, pipelineTriggerLabel string, duration time.Duration,
+) {
for {
select {
case <-ctx.Done():
@@ -165,16 +166,33 @@ func (t *trigger) Subscribe(ctx context.Context, request *proto.TriggerSubscribe
return nil, fmt.Errorf("durations cannot be less than %s", t.minDuration)
}

-	subctx, quit := context.WithCancel(t.parentContext)
-	t.subscriptions[subscriptionID{
-		request.PipelineTriggerLabel,
-		request.PipelineId,
-		request.NamespaceId,
-	}] = &subscription{request.PipelineTriggerLabel, request.NamespaceId, request.PipelineId, quit}
+	subID := subscriptionID{
+		request.NamespaceId,
+		request.PipelineId,
+		request.PipelineTriggerLabel,
+	}

-	go t.startInterval(subctx, request.PipelineId, request.NamespaceId, request.PipelineTriggerLabel, duration)
+	// It is perfectly possible for Gofer to attempt to subscribe an already subscribed pipeline. In this case,
+	// we can simply ignore the request.
+	_, exists = t.subscriptions[subID]
+	if exists {
+		log.Debug().Str("namespace_id", request.NamespaceId).Str("trigger_label", request.PipelineTriggerLabel).
+			Str("pipeline_id", request.PipelineId).Msg("pipeline already subscribed; ignoring request")
+		return &proto.TriggerSubscribeResponse{}, nil
+	}

-	log.Debug().Str("namespace_id", request.NamespaceId).Str("trigger_label", request.PipelineTriggerLabel).Str("pipeline_id", request.PipelineId).Msg("subscribed pipeline")
+	subctx, quit := context.WithCancel(t.parentContext)
+	t.subscriptions[subID] = &subscription{
+		namespace:            request.NamespaceId,
+		pipeline:             request.PipelineId,
+		pipelineTriggerLabel: request.PipelineTriggerLabel,
+		quit:                 quit,
+	}
+
+	go t.startInterval(subctx, request.NamespaceId, request.PipelineId, request.PipelineTriggerLabel, duration)
+
+	log.Debug().Str("namespace_id", request.NamespaceId).Str("trigger_label", request.PipelineTriggerLabel).
+		Str("pipeline_id", request.PipelineId).Msg("subscribed pipeline")
return &proto.TriggerSubscribeResponse{}, nil
}

@@ -194,21 +212,24 @@ func (t *trigger) Watch(ctx context.Context, request *proto.TriggerWatchRequest)
// previously subscribed.
func (t *trigger) Unsubscribe(ctx context.Context, request *proto.TriggerUnsubscribeRequest) (*proto.TriggerUnsubscribeResponse, error) {
subscription, exists := t.subscriptions[subscriptionID{
-		pipelineTriggerLabel: request.PipelineTriggerLabel,
-		pipeline:             request.PipelineId,
		namespace:            request.NamespaceId,
+		pipeline:             request.PipelineId,
+		pipelineTriggerLabel: request.PipelineTriggerLabel,
}]

// It is perfectly possible for Gofer to attempt to unsubscribe an already unsubscribed pipeline. In this case,
// we can simply ignore the request.
if !exists {
-		return &proto.TriggerUnsubscribeResponse{},
-			fmt.Errorf("could not find subscription for trigger %s pipeline %s namespace %s",
-				request.PipelineTriggerLabel, request.PipelineId, request.NamespaceId)
+		log.Debug().Str("namespace_id", request.NamespaceId).Str("trigger_label", request.PipelineTriggerLabel).
+			Str("pipeline_id", request.PipelineId).Msg("no subscription found for pipeline")
+		return &proto.TriggerUnsubscribeResponse{}, nil
}

subscription.quit()
delete(t.subscriptions, subscriptionID{
-		pipelineTriggerLabel: request.PipelineTriggerLabel,
-		pipeline:             request.PipelineId,
		namespace:            request.NamespaceId,
+		pipeline:             request.PipelineId,
+		pipelineTriggerLabel: request.PipelineTriggerLabel,
})
return &proto.TriggerUnsubscribeResponse{}, nil
}
2 changes: 1 addition & 1 deletion documentation/src/ref/triggers/README.md
@@ -15,7 +15,7 @@ You can [create](#how-to-add-new-triggers) your own triggers, but Gofer provides
Triggers must first be installed by Gofer administrators before they can be used. They can be installed by the CLI. For more information on how to install a specific trigger run:

```bash
-gofer triggers install -h
+gofer trigger install -h
```

## How do I configure a Trigger?
15 changes: 0 additions & 15 deletions examplePipelines/go/trigger/go.mod

This file was deleted.

