diff --git a/internal/runner/meta.go b/internal/runner/meta.go index 4c3e815dd..31cb777b8 100644 --- a/internal/runner/meta.go +++ b/internal/runner/meta.go @@ -2,7 +2,6 @@ package runner import ( "context" - "errors" "log/slog" "net/netip" "time" @@ -44,6 +43,8 @@ type RunnerMetaAgentPool struct { Name string `json:"name"` // Agent pool's organization. OrganizationName string `json:"organization-name"` + // ID of agent token that was used to authenticate runner. + TokenID resource.ID `json:"token-id"` } type registerOptions struct { @@ -64,20 +65,21 @@ type registerOptions struct { CurrentJobs []resource.ID `json:"current-jobs,omitempty"` } -func (m *RunnerMeta) register(opts registerOptions) error { - if m.ID != resource.EmptyID { - return errors.New("runner has already registered") +// register registers an unregistered runner, constructing a RunnerMeta which +// provides info about the newly registered runner. +func register(runner *unregistered, opts registerOptions) (*RunnerMeta, error) { + meta := &RunnerMeta{ + ID: resource.NewID(resource.RunnerKind), + Name: opts.Name, + Version: opts.Version, + MaxJobs: opts.Concurrency, + AgentPool: runner.pool, } - m.ID = resource.NewID(resource.RunnerKind) - m.Name = opts.Name - m.Version = opts.Version - m.MaxJobs = opts.Concurrency - - if err := m.setStatus(RunnerIdle, true); err != nil { - return err + if err := meta.setStatus(RunnerIdle, true); err != nil { + return nil, err } if opts.IPAddress != nil { - m.IPAddress = *opts.IPAddress + meta.IPAddress = *opts.IPAddress } else { // IP address not provided: try to get local IP address used for // outbound comms, and if that fails, use localhost @@ -85,10 +87,10 @@ func (m *RunnerMeta) register(opts registerOptions) error { if err != nil { ip = netip.IPv6Loopback() } - m.IPAddress = ip + meta.IPAddress = ip } - return nil + return meta, nil } func (m *RunnerMeta) setStatus(status RunnerStatus, ping bool) error { @@ -138,6 +140,7 @@ func (m *RunnerMetaAgentPool) LogValue() slog.Value { slog.String("id", m.ID.String()), slog.String("name", m.Name), slog.String("organization", m.OrganizationName), + slog.String("token-id", m.TokenID.String()), ) } diff --git a/internal/runner/runner.go b/internal/runner/runner.go index fc1ed97eb..b0d19fa67 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -117,9 +117,15 @@ func newRunner( func (r *Runner) Start(ctx context.Context) error { r.logger.V(r.v).Info("starting runner", "version", internal.Version) - // initialize terminator + // Initialize terminator, which is responsible for terminating jobs in + // response to cancelation signals. terminator := &terminator{mapping: make(map[resource.ID]cancelable)} + // Authenticate as unregistered runner with the registration endpoint. This + // is only necessary for the server runner; the agent runner relies on + // middleware to authenticate as an unregistered runner on the server. + ctx = authz.AddSubjectToContext(ctx, &unregistered{}) + // register runner with server, which responds with an updated runner // registrationMetadata, including a unique ID. registrationMetadata, err := r.client.register(ctx, registerOptions{ @@ -151,10 +157,10 @@ func (r *Runner) Start(ctx context.Context) error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() + r.logger.V(r.v).Info("sending final status update before shutting down") + if updateErr := r.client.updateStatus(ctx, registrationMetadata.ID, RunnerExited); updateErr != nil { err = fmt.Errorf("sending final status update: %w", updateErr) - } else { - r.logger.V(r.v).Info("sent final status update", "status", "exited") } }() diff --git a/internal/runner/runner_test.go b/internal/runner/runner_test.go new file mode 100644 index 000000000..5abd455fb --- /dev/null +++ b/internal/runner/runner_test.go @@ -0,0 +1,67 @@ +package runner + +import ( + "context" + "testing" + + "github.com/leg100/otf/internal/logr" + "github.com/leg100/otf/internal/resource" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRunner(t *testing.T) { + updates := make(chan RunnerStatus) + wantID := resource.NewID(resource.RunnerKind) + + r, err := newRunner( + logr.Discard(), + &fakeRunnerClient{registeredID: wantID, updates: updates}, + &fakeOperationSpawner{}, + false, + Config{}, + ) + require.NoError(t, err) + + // Terminate runner at end of test + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + startErr := make(chan error) + go func() { + startErr <- r.Start(ctx) + }() + + // Test that runner registers itself + assert.Equal(t, &RunnerMeta{ID: wantID}, <-r.registered) + // Terminate runner + cancel() + // Test that runner sends final status update + assert.Equal(t, RunnerExited, <-updates) +} + +type fakeRunnerClient struct { + client + + registeredID resource.ID + updates chan RunnerStatus +} + +func (f *fakeRunnerClient) register(ctx context.Context, opts registerOptions) (*RunnerMeta, error) { + return &RunnerMeta{ID: f.registeredID}, nil +} + +func (f *fakeRunnerClient) getJobs(ctx context.Context, agentID resource.ID) ([]*Job, error) { + // Block until context canceled + <-ctx.Done() + return nil, nil +} + +func (f *fakeRunnerClient) updateStatus(ctx context.Context, agentID resource.ID, status RunnerStatus) error { + f.updates <- status + return nil +} + +type fakeOperationSpawner struct { + operationSpawner +} diff --git a/internal/runner/server.go b/internal/runner/server.go index 64a1ffc95..36d196286 100644 --- a/internal/runner/server.go +++ b/internal/runner/server.go @@ -1,9 +1,6 @@ package runner import ( - "context" - - "github.com/leg100/otf/internal/authz" "github.com/leg100/otf/internal/logr" "github.com/leg100/otf/internal/releases" ) @@ -24,13 +21,8 @@ type ServerRunnerOptions struct { Jobs operationJobsClient } -// ServerRunner is a runner built into the otfd server prcess. -type ServerRunner struct { - *Runner -} - // NewServerRunner constructs a server runner. -func NewServerRunner(opts ServerRunnerOptions) (*ServerRunner, error) { +func NewServerRunner(opts ServerRunnerOptions) (*Runner, error) { daemon, err := newRunner( opts.Logger, opts.Runners, @@ -53,15 +45,7 @@ func NewServerRunner(opts ServerRunnerOptions) (*ServerRunner, error) { if err != nil { return nil, err } - return &ServerRunner{Runner: daemon}, nil -} - -// Start the server runner daemon. -func (d *ServerRunner) Start(ctx context.Context) error { - // Authenticate as runner with server endpoints. - ctx = authz.AddSubjectToContext(ctx, d.RunnerMeta) - - return d.Runner.Start(ctx) + return daemon, nil } type localOperationSpawner struct { diff --git a/internal/runner/service.go b/internal/runner/service.go index f16c81546..58b4637ec 100644 --- a/internal/runner/service.go +++ b/internal/runner/service.go @@ -24,10 +24,7 @@ import ( "github.com/leg100/otf/internal/workspace" ) -var ( - ErrInvalidStateTransition = errors.New("invalid runner state transition") - ErrUnauthorizedRegistration = errors.New("unauthorized runner registration") -) +var ErrInvalidStateTransition = errors.New("invalid runner state transition") type ( Service struct { @@ -129,12 +126,14 @@ func NewService(opts ServiceOptions) *Service { // Register with auth middleware the agent token kind and a means of // retrieving the appropriate runner corresponding to the agent token ID opts.TokensService.RegisterKind(resource.AgentTokenKind, func(ctx context.Context, tokenID resource.ID) (authz.Subject, error) { + // Fetch agent pool corresponding to the provided token. This + // effectively authenticates the token. pool, err := svc.db.getPoolByTokenID(ctx, tokenID) if err != nil { return nil, err } - // if the runner has registered then it should be sending its ID in an - // http header + // if the runner has already registered then it should be sending its ID + // in an http header headers, err := otfhttp.HeadersFromContext(ctx) if err != nil { return nil, err @@ -152,10 +151,11 @@ func NewService(opts ServiceOptions) *Service { } // Agent runner hasn't registered yet, so set subject to a runner with a // agent pool info, which will be used when registering the runner below. - return &RunnerMeta{AgentPool: &RunnerMetaAgentPool{ + return &unregistered{pool: &RunnerMetaAgentPool{ ID: pool.ID, Name: pool.Name, OrganizationName: pool.Organization, + TokenID: tokenID, }}, nil }) // create jobs when a plan or apply is enqueued @@ -206,17 +206,22 @@ func (s *Service) WatchJobs(ctx context.Context) (<-chan pubsub.Event[*Job], fun func (s *Service) register(ctx context.Context, opts registerOptions) (*RunnerMeta, error) { runner, err := func() (*RunnerMeta, error) { - runner, err := runnerFromContext(ctx) + subject, err := authz.SubjectFromContext(ctx) if err != nil { - return nil, ErrUnauthorizedRegistration + return nil, err } - if err := runner.register(opts); err != nil { + unregistered, ok := subject.(*unregistered) + if !ok { + return nil, internal.ErrAccessNotPermitted + } + registered, err := register(unregistered, opts) + if err != nil { return nil, err } - if err := s.db.create(ctx, runner); err != nil { + if err := s.db.create(ctx, registered); err != nil { return nil, err } - return runner, nil + return registered, nil }() if err != nil { s.Error(err, "registering runner") diff --git a/internal/runner/unregistered.go b/internal/runner/unregistered.go new file mode 100644 index 000000000..5d6b5365e --- /dev/null +++ b/internal/runner/unregistered.go @@ -0,0 +1,14 @@ +package runner + +import "github.com/leg100/otf/internal/authz" + +// unregistered describes a runner that is not yet registered. +type unregistered struct { + // unregistered is a subject only for the purposes of satisfying the + // token-handling middleware which doesn't call any of the interface + // methods. + authz.Subject + + // pool is non-nil if the runner is an agent. + pool *RunnerMetaAgentPool +}