Skip to content

Commit

Permalink
chore: simple in memory provisioning scheduler (#2815)
Browse files Browse the repository at this point in the history
Rough domain model for provisioning deployments. 
- `Task` is a single executable task to bring the infra from one state
to another
- `Deployment` is a list of sequentially executable tasks
- `Provisioner` is an interface for calling provisioners, wrapping calls
to provisioner plugins

This is a WIP, and there are a lot of `TODO`s around, but I wanted to
get this out to keep PRs small.

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
jvmakine and github-actions[bot] committed Sep 25, 2024
1 parent 93de931 commit 1297472
Show file tree
Hide file tree
Showing 6 changed files with 461 additions and 6 deletions.
133 changes: 133 additions & 0 deletions backend/provisioner/deployment/deployment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package deployment

import (
"context"
"fmt"

"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner"
)

type TaskState string

const (
TaskStatePending TaskState = ""
TaskStateRunning TaskState = "running"
TaskStateDone TaskState = "done"
TaskStateFailed TaskState = "failed"
)

// Task is a unit of work for a deployment
type Task struct {
Handler Provisioner
Module string
State TaskState
Desired []*provisioner.Resource
Existing []*provisioner.Resource
// populated only when the task is done
Output []*provisioner.Resource

// set if the task is currently running
RunningToken string
}

func (t *Task) Start(ctx context.Context) error {
if t.State != TaskStatePending {
return fmt.Errorf("task state is not pending: %s", t.State)
}
t.State = TaskStateRunning
token, err := t.Handler.Provision(ctx, t.Module, t.constructResourceContext(t.Desired), t.Existing)
if err != nil {
t.State = TaskStateFailed
return fmt.Errorf("error provisioning resources: %w", err)
}
if token == "" {
// no changes
t.State = TaskStateDone
t.Output = t.Desired
}
t.RunningToken = token
return nil
}

func (t *Task) constructResourceContext(r []*provisioner.Resource) []*provisioner.ResourceContext {
result := make([]*provisioner.ResourceContext, len(r))
for i, res := range r {
result[i] = &provisioner.ResourceContext{
Resource: res,
// TODO: Collect previously constructed resources from a dependency graph here
}
}
return result
}

func (t *Task) Progress(ctx context.Context) error {
if t.State != TaskStateRunning {
return fmt.Errorf("task state is not running: %s", t.State)
}
state, output, err := t.Handler.State(ctx, t.RunningToken, t.Desired)
if err != nil {
return fmt.Errorf("error getting state: %w", err)
}
if state == TaskStateDone {
t.State = TaskStateDone
t.Output = output
}
return nil
}

// Deployment is a single deployment of resources for a single module
type Deployment struct {
Module string
Tasks []*Task
}

// next running or pending task. Nil if all tasks are done.
func (d *Deployment) next() *Task {
for _, t := range d.Tasks {
if t.State == TaskStatePending || t.State == TaskStateRunning || t.State == TaskStateFailed {
return t
}
}
return nil
}

// Progress the deployment. Returns true if there are still tasks running or pending.
func (d *Deployment) Progress(ctx context.Context) (bool, error) {
next := d.next()
if next == nil {
return false, nil
}

if next.State == TaskStatePending {
err := next.Start(ctx)
if err != nil {
return true, err
}
}
err := next.Progress(ctx)
return d.next() != nil, err
}

type DeploymentState struct {
Pending []*Task
Running *Task
Failed *Task
Done []*Task
}

func (d *Deployment) State() *DeploymentState {
result := &DeploymentState{}
for _, t := range d.Tasks {
switch t.State {
case TaskStatePending:
result.Pending = append(result.Pending, t)
case TaskStateRunning:
result.Running = t
case TaskStateFailed:
result.Failed = t
case TaskStateDone:
result.Done = append(result.Done, t)
}
}
return result
}
92 changes: 92 additions & 0 deletions backend/provisioner/deployment/deployment_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package deployment_test

import (
"context"
"testing"

"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner"
"github.com/TBD54566975/ftl/backend/provisioner/deployment"
"github.com/alecthomas/assert/v2"
)

// MockProvisioner is a mock implementation of the Provisioner interface
type MockProvisioner struct {
Token string
stateCalls int
}

var _ deployment.Provisioner = (*MockProvisioner)(nil)

func (m *MockProvisioner) Provision(
ctx context.Context,
module string,
desired []*provisioner.ResourceContext,
existing []*provisioner.Resource,
) (string, error) {
return m.Token, nil
}

func (m *MockProvisioner) State(
ctx context.Context,
token string,
desired []*provisioner.Resource,
) (deployment.TaskState, []*provisioner.Resource, error) {
m.stateCalls++
if m.stateCalls <= 1 {
return deployment.TaskStateRunning, nil, nil
}
return deployment.TaskStateDone, desired, nil
}

func TestDeployment_Progress(t *testing.T) {
ctx := context.Background()

t.Run("no tasks", func(t *testing.T) {
deployment := &deployment.Deployment{}
progress, err := deployment.Progress(ctx)
assert.NoError(t, err)
assert.False(t, progress)
})

t.Run("progresses each provisioner in order", func(t *testing.T) {
registry := deployment.ProvisionerRegistry{}
registry.Register(&MockProvisioner{Token: "foo"}, deployment.ResourceTypePostgres)
registry.Register(&MockProvisioner{Token: "bar"}, deployment.ResourceTypeMysql)

dpl := registry.CreateDeployment(
"test-module",
[]*provisioner.Resource{{
ResourceId: "a",
Resource: &provisioner.Resource_Mysql{},
}, {
ResourceId: "b",
Resource: &provisioner.Resource_Postgres{},
}},
[]*provisioner.Resource{},
)

assert.Equal(t, 2, len(dpl.State().Pending))

_, err := dpl.Progress(ctx)
assert.NoError(t, err)
assert.Equal(t, 1, len(dpl.State().Pending))
assert.NotZero(t, dpl.State().Running)

_, err = dpl.Progress(ctx)
assert.NoError(t, err)
assert.Equal(t, 1, len(dpl.State().Pending))
assert.Zero(t, dpl.State().Running)
assert.Equal(t, 1, len(dpl.State().Done))

_, err = dpl.Progress(ctx)
assert.NoError(t, err)
assert.Equal(t, 0, len(dpl.State().Pending))
assert.NotZero(t, dpl.State().Running)
assert.Equal(t, 1, len(dpl.State().Done))

running, err := dpl.Progress(ctx)
assert.NoError(t, err)
assert.Equal(t, 2, len(dpl.State().Done))
assert.False(t, running)
})
}
171 changes: 171 additions & 0 deletions backend/provisioner/deployment/provisioner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package deployment

import (
"context"
"fmt"

"connectrpc.com/connect"

"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner/provisionerconnect"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/common/plugin"
"github.com/TBD54566975/ftl/internal/log"
)

// ResourceType is a type of resource used to configure provisioners
type ResourceType string

const (
ResourceTypeUnknown ResourceType = "unknown"
ResourceTypePostgres ResourceType = "postgres"
ResourceTypeMysql ResourceType = "mysql"
)

// Provisioner is a runnable process to provision resources
type Provisioner interface {
Provision(ctx context.Context, module string, desired []*provisioner.ResourceContext, existing []*provisioner.Resource) (string, error)
State(ctx context.Context, token string, desired []*provisioner.Resource) (TaskState, []*provisioner.Resource, error)
}

type provisionerConfig struct {
provisioner Provisioner
types []ResourceType
}

// ProvisionerRegistry contains all known resource handlers in the order they should be executed
type ProvisionerRegistry struct {
Provisioners []*provisionerConfig
}

// Register to the registry, to be executed after all the previously added handlers
func (reg *ProvisionerRegistry) Register(handler Provisioner, types ...ResourceType) {
reg.Provisioners = append(reg.Provisioners, &provisionerConfig{
provisioner: handler,
types: types,
})
}

// CreateDeployment to take the system to the desired state
func (reg *ProvisionerRegistry) CreateDeployment(module string, desiredResources, existingResources []*provisioner.Resource) *Deployment {
var result []*Task

existingByHandler := reg.groupByProvisioner(existingResources)
desiredByHandler := reg.groupByProvisioner(desiredResources)

for handler, desired := range desiredByHandler {
existing := existingByHandler[handler]
result = append(result, &Task{
Handler: handler,
Desired: desired,
Existing: existing,
})
}
return &Deployment{Tasks: result, Module: module}
}

// ExtractResources from a module schema
func ExtractResources(sch *schema.Module) ([]*provisioner.Resource, error) {
var result []*provisioner.Resource
for _, decl := range sch.Decls {
if db, ok := decl.(*schema.Database); ok {
if db.Type == "postgres" {
result = append(result, &provisioner.Resource{
ResourceId: decl.GetName(),
Resource: &provisioner.Resource_Postgres{},
})
} else if db.Type == "mysql" {
result = append(result, &provisioner.Resource{
ResourceId: decl.GetName(),
Resource: &provisioner.Resource_Mysql{},
})
} else {
return nil, fmt.Errorf("unknown db type: %s", db.Type)
}
}
}
return result, nil
}

func (reg *ProvisionerRegistry) groupByProvisioner(resources []*provisioner.Resource) map[Provisioner][]*provisioner.Resource {
result := map[Provisioner][]*provisioner.Resource{}
for _, r := range resources {
for _, cfg := range reg.Provisioners {
for _, t := range cfg.types {
typed := typeOf(r)
if t == typed {
result[cfg.provisioner] = append(result[cfg.provisioner], r)
break
}
}
}
}
return result
}

func typeOf(r *provisioner.Resource) ResourceType {
if _, ok := r.Resource.(*provisioner.Resource_Mysql); ok {
return ResourceTypeMysql
} else if _, ok := r.Resource.(*provisioner.Resource_Postgres); ok {
return ResourceTypePostgres
}
return ResourceTypeUnknown
}

// PluginProvisioner delegates provisioning to an external plugin
type PluginProvisioner struct {
cmdCtx context.Context
client *plugin.Plugin[provisionerconnect.ProvisionerPluginServiceClient]
}

func NewPluginProvisioner(ctx context.Context, name, dir, exe string) (*PluginProvisioner, error) {
client, cmdCtx, err := plugin.Spawn(
ctx,
log.Debug,
name,
dir,
exe,
provisionerconnect.NewProvisionerPluginServiceClient,
)
if err != nil {
return nil, fmt.Errorf("error spawning plugin: %w", err)
}

return &PluginProvisioner{
cmdCtx: cmdCtx,
client: client,
}, nil
}

func (p *PluginProvisioner) Provision(ctx context.Context, module string, desired []*provisioner.ResourceContext, existing []*provisioner.Resource) (string, error) {
resp, err := p.client.Client.Provision(ctx, connect.NewRequest(&provisioner.ProvisionRequest{
DesiredResources: desired,
ExistingResources: existing,
FtlClusterId: "ftl",
Module: module,
}))
if err != nil {
return "", fmt.Errorf("error calling plugin: %w", err)
}
if resp.Msg.Status != provisioner.ProvisionResponse_SUBMITTED {
return resp.Msg.ProvisioningToken, nil
}
return "", nil
}

func (p *PluginProvisioner) State(ctx context.Context, token string, desired []*provisioner.Resource) (TaskState, []*provisioner.Resource, error) {
resp, err := p.client.Client.Status(ctx, connect.NewRequest(&provisioner.StatusRequest{
ProvisioningToken: token,
}))
if err != nil {
return "", nil, fmt.Errorf("error getting status from plugin: %w", err)
}
if failed, ok := resp.Msg.Status.(*provisioner.StatusResponse_Failed); ok {
return TaskStateFailed, nil, fmt.Errorf("provisioning failed: %s", failed.Failed.ErrorMessage)
} else if success, ok := resp.Msg.Status.(*provisioner.StatusResponse_Success); ok {
return TaskStateDone, success.Success.UpdatedResources, nil
}
return TaskStateRunning, nil, nil
}

var _ Provisioner = (*PluginProvisioner)(nil)
Loading

0 comments on commit 1297472

Please sign in to comment.