Skip to content

Commit

Permalink
feat: instant job update notifications using server-sent events
Browse files Browse the repository at this point in the history
This commit introduces a new feature that enables clients to receive
instant notifications for job status updates.

Details:

- A new endpoint `/jobs/{id}/status/subscribe` has been added, which
  implements instant notifications through Server-Sent Events (SSE).
- By subscribing to this endpoint, clients can efficiently keep track of
  job status changes without the need to repeatedly poll the server.
- A client reference implementation has been added to wfxctl.

Signed-off-by: Michael Adler <michael.adler@siemens.com>
  • Loading branch information
michaeladler committed Aug 23, 2023
1 parent 9dc0031 commit b997d97
Show file tree
Hide file tree
Showing 54 changed files with 2,827 additions and 16 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Instant job update notifications using server-sent events (see #11)

### Fixed

- Send HTTP status code 404 when attempting to access the file server while it is disabled
Expand Down
6 changes: 6 additions & 0 deletions api/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,9 @@ var WorkflowInvalid = model.Error{
Logref: "18f57adc70dd79c7fb4f1246be8a6e04",
Message: "Workflow validation failed",
}

var JobTerminalState = model.Error{
Code: "wfx.jobTerminalState",
Logref: "916f0a913a3e4a52a96bd271e029c201",
Message: "The request was invalid because the job is in a terminal state",
}
121 changes: 121 additions & 0 deletions api/job_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ import (
"context"
"fmt"
"net/http"
"sync"
"testing"
"time"

"github.com/siemens/wfx/generated/model"
"github.com/siemens/wfx/internal/handler/job"
"github.com/siemens/wfx/internal/handler/job/status"
"github.com/siemens/wfx/internal/handler/workflow"
"github.com/siemens/wfx/persistence"
"github.com/siemens/wfx/workflow/dau"
"github.com/steinfletcher/apitest"
jsonpath "github.com/steinfletcher/apitest-jsonpath"
Expand Down Expand Up @@ -118,3 +122,120 @@ func TestJobStatusUpdate(t *testing.T) {
Assert(jsonpath.Contains(`$.state`, "DOWNLOADING")).
End()
}

func TestJobStatusSubscribe(t *testing.T) {
db := newInMemoryDB(t)
wf := dau.DirectWorkflow()
_, err := workflow.CreateWorkflow(context.Background(), db, wf)
require.NoError(t, err)

north, south := createNorthAndSouth(t, db)

handlers := []http.Handler{north, south}
for i, name := range allAPIs {
handler := handlers[i]
t.Run(name, func(t *testing.T) {
jobReq := model.JobRequest{
ClientID: "foo",
Workflow: wf.Name,
}
job, err := job.CreateJob(context.Background(), db, &jobReq)
require.NoError(t, err)
jobPath := fmt.Sprintf("/api/wfx/v1/jobs/%s/status/subscribe", job.ID)

job, err = db.UpdateJob(context.Background(), job, persistence.JobUpdate{
Status: &model.JobStatus{State: "ACTIVATING"},
})
require.NoError(t, err)

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
if status.SubscriberCount() > 0 {
break
}
time.Sleep(time.Millisecond)
}
// give it some extra time, just to be safe
time.Sleep(500 * time.Millisecond)
// update job to terminal state
_, err = status.Update(context.Background(), db, job.ID, &model.JobStatus{State: "ACTIVATED"}, model.EligibleEnumCLIENT)
require.NoError(t, err)
}()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
apitest.New().
Handler(handler).
Get(jobPath).WithContext(ctx).
Expect(t).
Status(http.StatusOK).
Header("Content-Type", "text/event-stream").
Body(`data: {"state":"ACTIVATING"}
data: {"state":"ACTIVATED"}
`).
End()

wg.Wait()
status.ShutdownSubscribers()
})
}
}

func TestJobStatusSubscribe_NotFound(t *testing.T) {
db := newInMemoryDB(t)
north, south := createNorthAndSouth(t, db)

handlers := []http.Handler{north, south}
for i, name := range allAPIs {
handler := handlers[i]
t.Run(name, func(t *testing.T) {
apitest.New().
Handler(handler).
Get("/api/wfx/v1/jobs/42/status/subscribe").
Expect(t).
Status(http.StatusNotFound).
Header("Content-Type", "application/json").
Assert(jsonpath.Equal(`$.errors[0].code`, "wfx.jobNotFound")).
End()
})
}
}

func TestJobStatusSubscribe_TerminalState(t *testing.T) {
db := newInMemoryDB(t)

wf := dau.DirectWorkflow()
_, _ = workflow.CreateWorkflow(context.Background(), db, wf)

tmpJob := model.Job{
ClientID: "foo",
Workflow: wf,
Status: &model.JobStatus{State: "ACTIVATED"},
}

job, err := db.CreateJob(context.Background(), &tmpJob)
require.NoError(t, err)

jobPath := fmt.Sprintf("/api/wfx/v1/jobs/%s/status/subscribe", job.ID)

north, south := createNorthAndSouth(t, db)
handlers := []http.Handler{north, south}
for i, name := range allAPIs {
handler := handlers[i]
t.Run(name, func(t *testing.T) {
apitest.New().
Handler(handler).
Get(jobPath).
Expect(t).
Status(http.StatusBadRequest).
Header("Content-Type", "application/json").
Assert(jsonpath.Equal(`$.errors[0].code`, "wfx.jobTerminalState")).
End()
})
}
}
16 changes: 16 additions & 0 deletions api/northbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/siemens/wfx/internal/handler/job/tags"
"github.com/siemens/wfx/internal/handler/workflow"
"github.com/siemens/wfx/middleware/logging"
"github.com/siemens/wfx/middleware/responder/sse"
"github.com/siemens/wfx/middleware/responder/util"
"github.com/siemens/wfx/persistence"
)

Expand Down Expand Up @@ -263,5 +265,19 @@ func NewNorthboundAPI(storage persistence.Storage) (*operations.WorkflowExecutor
return northbound.NewDeleteJobsIDTagsOK().WithPayload(tags)
})

serverAPI.NorthboundGetJobsIDStatusSubscribeHandler = northbound.GetJobsIDStatusSubscribeHandlerFunc(
func(params northbound.GetJobsIDStatusSubscribeParams) middleware.Responder {
ctx := params.HTTPRequest.Context()
eventChan, err := status.AddSubscriber(ctx, storage, params.ID)
if ftag.Get(err) == ftag.NotFound {
return util.ForceJSONResponse(http.StatusNotFound,
&model.ErrorResponse{Errors: []*model.Error{&JobNotFound}})
} else if ftag.Get(err) == ftag.InvalidArgument {
return util.ForceJSONResponse(http.StatusBadRequest,
&model.ErrorResponse{Errors: []*model.Error{&JobTerminalState}})
}
return sse.Responder(ctx, eventChan)
})

return serverAPI, nil
}
16 changes: 16 additions & 0 deletions api/southbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/siemens/wfx/internal/handler/job/tags"
"github.com/siemens/wfx/internal/handler/workflow"
"github.com/siemens/wfx/middleware/logging"
"github.com/siemens/wfx/middleware/responder/sse"
"github.com/siemens/wfx/middleware/responder/util"
"github.com/siemens/wfx/persistence"
)

Expand Down Expand Up @@ -176,5 +178,19 @@ func NewSouthboundAPI(storage persistence.Storage) (*operations.WorkflowExecutor
return southbound.NewGetJobsIDTagsOK().WithPayload(tags)
})

serverAPI.SouthboundGetJobsIDStatusSubscribeHandler = southbound.GetJobsIDStatusSubscribeHandlerFunc(
func(params southbound.GetJobsIDStatusSubscribeParams) middleware.Responder {
ctx := params.HTTPRequest.Context()
eventChan, err := status.AddSubscriber(ctx, storage, params.ID)
if ftag.Get(err) == ftag.NotFound {
return util.ForceJSONResponse(http.StatusNotFound,
&model.ErrorResponse{Errors: []*model.Error{&JobNotFound}})
} else if ftag.Get(err) == ftag.InvalidArgument {
return util.ForceJSONResponse(http.StatusBadRequest,
&model.ErrorResponse{Errors: []*model.Error{&JobTerminalState}})
}
return sse.Responder(ctx, eventChan)
})

return serverAPI, nil
}
7 changes: 5 additions & 2 deletions api/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,11 @@ func createNorthAndSouth(t *testing.T, db persistence.Storage) (http.Handler, ht
}

func persistJob(t *testing.T, db persistence.Storage) *model.Job {
wf, err := workflow.CreateWorkflow(context.Background(), db, dau.DirectWorkflow())
require.NoError(t, err)
wf := dau.DirectWorkflow()
if found, _ := workflow.GetWorkflow(context.Background(), db, wf.Name); found == nil {
_, err := workflow.CreateWorkflow(context.Background(), db, wf)
require.NoError(t, err)
}

jobReq := model.JobRequest{
ClientID: "foo",
Expand Down
6 changes: 5 additions & 1 deletion cmd/wfx/cmd/root/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/rs/zerolog/log"
"github.com/siemens/wfx/cmd/wfx/metadata"
"github.com/siemens/wfx/internal/config"
"github.com/siemens/wfx/internal/handler/job/status"
"github.com/siemens/wfx/persistence"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -255,7 +256,10 @@ Examples of tasks are installation of firmware or other types of commands issued
}
}

// Create a context with a timeout to allow outstanding requests to complete
// shut down (disconnect) subscribers otherwise we cannot stop the web server due to open connections
status.ShutdownSubscribers()

// create a context with a timeout to allow outstanding requests to complete
var timeout time.Duration
k.Read(func(k *koanf.Koanf) {
timeout = k.Duration(gracefulTimeoutFlag)
Expand Down
2 changes: 2 additions & 0 deletions cmd/wfxctl/cmd/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/getstatus"
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/gettags"
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/query"
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/subscribestatus"
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/updatedefinition"
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/updatestatus"
"github.com/spf13/cobra"
Expand All @@ -43,4 +44,5 @@ func init() {
Command.AddCommand(addtags.Command)
Command.AddCommand(deltags.Command)
Command.AddCommand(gettags.Command)
Command.AddCommand(subscribestatus.Command)
}
19 changes: 19 additions & 0 deletions cmd/wfxctl/cmd/job/subscribestatus/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package subscribestatus

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <michael.adler@siemens.com>
*/

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
Loading

0 comments on commit b997d97

Please sign in to comment.