Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: Pause application webhooks #7247

Merged
merged 4 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ For details about compatibility between different releases, see the **Commitment
- `ListGatewaysRequest` and `ListEndDevicesRequest` RPCs have a new `Filter` field that supports an `updated_since` timestamp.
- Preparation for universal rights assigned to users.
- This requires a database schema migration (`ttn-lw-stack is-db migrate`).
- Option to pause application webhooks.

### Deprecated

Expand Down
1 change: 1 addition & 0 deletions api/ttn/lorawan/v3/api.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<a name="top"></a>

Check warning on line 1 in api/ttn/lorawan/v3/api.md

View workflow job for this annotation

GitHub Actions / Check Mergeability

api/ttn/lorawan/v3/api.md has a conflict when merging TheThingsIndustries/lorawan-stack:v3.32.

# API Documentation

Expand Down Expand Up @@ -2073,6 +2073,7 @@
| `service_data` | [`ApplicationWebhook.Message`](#ttn.lorawan.v3.ApplicationWebhook.Message) | | |
| `health_status` | [`ApplicationWebhookHealth`](#ttn.lorawan.v3.ApplicationWebhookHealth) | | |
| `field_mask` | [`google.protobuf.FieldMask`](#google.protobuf.FieldMask) | | |
| `paused` | [`bool`](#bool) | | Set to temporarily pause forwarding uplink data to this end point and receiving downlinks from this end point. |

#### Field Rules

Expand Down
8 changes: 8 additions & 0 deletions api/ttn/lorawan/v3/api.swagger.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{

Check warning on line 1 in api/ttn/lorawan/v3/api.swagger.json

View workflow job for this annotation

GitHub Actions / Check Mergeability

api/ttn/lorawan/v3/api.swagger.json has a conflict when merging TheThingsIndustries/lorawan-stack:v3.32.
"swagger": "2.0",
"info": {
"title": "The Things Stack for LoRaWAN v3 API",
Expand Down Expand Up @@ -20844,6 +20844,10 @@
},
"field_mask": {
"type": "string"
},
"paused": {
"type": "boolean",
"description": "Set to temporarily pause forwarding uplink data to this end point and receiving downlinks from this end point."
}
}
},
Expand Down Expand Up @@ -20983,6 +20987,10 @@
},
"field_mask": {
"type": "string"
},
"paused": {
"type": "boolean",
"description": "Set to temporarily pause forwarding uplink data to this end point and receiving downlinks from this end point."
}
}
},
Expand Down
5 changes: 4 additions & 1 deletion api/ttn/lorawan/v3/applicationserver_web.proto
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,10 @@ message ApplicationWebhook {
reserved 23;
reserved "queue";

// next: 24
// Set to temporarily pause forwarding uplink data to this end point and receiving downlinks from this end point.
bool paused = 24;

// next: 25
}

message ApplicationWebhooks {
Expand Down
14 changes: 13 additions & 1 deletion pkg/applicationserver/io/web/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var webhookFanOutFieldMask = []string{
"join_accept",
"location_solved",
"service_data",
"paused",
"uplink_message",
"uplink_normalized",
}
Expand Down Expand Up @@ -135,6 +136,12 @@ func (w *webhooks) handleUp(ctx context.Context, msg *ttnpb.ApplicationUp) error
Health: hook.HealthStatus,
})
ctx = log.NewContextWithField(ctx, "hook", hook.Ids.WebhookId)

if hook.Paused {
log.FromContext(ctx).Debug("Webhook is paused")
continue
}

f := func(ctx context.Context) error {
req, err := NewRequest(ctx, w.downlinks, msg, hook)
if err != nil {
Expand Down Expand Up @@ -185,7 +192,7 @@ func (w *webhooks) handleDown(
"webhook_id", hookID.WebhookId,
))

hook, err := w.registry.Get(ctx, hookID, []string{"format"})
hook, err := w.registry.Get(ctx, hookID, []string{"format", "paused"})
if err != nil {
webhandlers.Error(res, req, err)
return
Expand All @@ -194,6 +201,11 @@ func (w *webhooks) handleDown(
webhandlers.Error(res, req, errWebhookNotFound.New())
return
}
if hook.Paused {
logger.Debug("Webhook is paused")
res.WriteHeader(http.StatusNotAcceptable)
return
}
format, ok := formats[hook.Format]
if !ok {
webhandlers.Error(res, req, errFormatNotFound.WithAttributes("format", hook.Format))
Expand Down
201 changes: 201 additions & 0 deletions pkg/applicationserver/io/web/webhooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func TestWebhooks(t *testing.T) {
ServiceData: &ttnpb.ApplicationWebhook_Message{
Path: tc.prefix + "service/data",
},
Paused: false,
FieldMask: ttnpb.FieldMask(
"correlation_ids",
"end_device_ids",
Expand All @@ -162,6 +163,7 @@ func TestWebhooks(t *testing.T) {
"up.downlink_sent",
"up.join_accept",
"up.location_solved",
"up.paused",
"up.service_data",
"up.uplink_message",
"up.uplink_normalized",
Expand All @@ -183,6 +185,7 @@ func TestWebhooks(t *testing.T) {
"join_accept",
"location_solved",
"service_data",
"paused",
"uplink_message",
"uplink_normalized",
}, nil
Expand Down Expand Up @@ -574,4 +577,202 @@ func TestWebhooks(t *testing.T) {
}
})
})

//nolint:paralleltest
t.Run("PausedUplink", func(t *testing.T) {
_, ctx := test.New(t)

// Create an active webhook.
_, err := registry.Set(ctx, ids, nil,
func(_ *ttnpb.ApplicationWebhook) (*ttnpb.ApplicationWebhook, []string, error) {
return &ttnpb.ApplicationWebhook{
Ids: ids,
BaseUrl: "https://myapp.com/api/ttn/v3/{/appID,devID}",
Format: "json",
Paused: false,
UplinkMessage: &ttnpb.ApplicationWebhook_Message{Path: "/"},
},
[]string{
"ids.application_ids",
"ids.webhook_id",
"base_url",
"format",
"paused",
"uplink_message",
"field_mask",
}, nil
})
if err != nil {
t.Fatalf("Failed to set webhook in registry: %s", err)
}

sinkCh := make(chan *http.Request, 1)
testSink := mocksink.New(sinkCh)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

c := componenttest.NewComponent(t, &component.Config{})
componenttest.StartComponent(t, c)
defer c.Close()

as := mock.NewServer(c)
_, err = web.NewWebhooks(ctx, as, registry, testSink, downlinks)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}

message := &ttnpb.ApplicationUp{
EndDeviceIds: registeredDeviceID,
Up: &ttnpb.ApplicationUp_UplinkMessage{
UplinkMessage: &ttnpb.ApplicationUplink{
SessionKeyId: []byte{0x11},
FPort: 42,
FCnt: 42,
FrmPayload: []byte{0x1, 0x2, 0x3},
},
},
}

a := assertions.New(t)
err = as.Publish(ctx, message)
if !a.So(err, should.BeNil) {
t.FailNow()
}

var req *http.Request
select {
case req = <-sinkCh:
case <-time.After(timeout):
t.Fatal("Expected message but nothing received")
}

actualBody, err := stdio.ReadAll(req.Body)
if !a.So(err, should.BeNil) {
t.FailNow()
}
expectedBody, err := formatters.JSON.FromUp(message)
if !a.So(err, should.BeNil) {
t.FailNow()
}
a.So(actualBody, should.Resemble, expectedBody)

// Pause the webhook.
_, err = registry.Set(ctx, ids, nil,
func(_ *ttnpb.ApplicationWebhook) (*ttnpb.ApplicationWebhook, []string, error) {
return &ttnpb.ApplicationWebhook{
Ids: ids,
BaseUrl: "https://myapp.com/api/ttn/v3/{/appID,devID}",
Format: "json",
Paused: true,
},
[]string{
"ids.application_ids",
"ids.webhook_id",
"base_url",
"format",
"paused",
"field_mask",
}, nil
})
if err != nil {
t.Fatalf("Failed to set webhook in registry: %s", err)
}

err = as.Publish(ctx, message)
if !a.So(err, should.BeNil) {
t.FailNow()
}

select {
case req = <-sinkCh:
t.Fatalf("Did not expect message but received: %v", req)
case <-time.After(timeout):
// Webhook was not received.
}
})
johanstokking marked this conversation as resolved.
Show resolved Hide resolved

//nolint:paralleltest
t.Run("PausedDownlink", func(t *testing.T) {
is, isAddr, closeIS := mockis.New(ctx)
defer closeIS()

is.ApplicationRegistry().Add(ctx, registeredApplicationID, registeredApplicationKey,
ttnpb.Right_RIGHT_APPLICATION_SETTINGS_BASIC,
ttnpb.Right_RIGHT_APPLICATION_DEVICES_READ,
ttnpb.Right_RIGHT_APPLICATION_DEVICES_WRITE,
ttnpb.Right_RIGHT_APPLICATION_TRAFFIC_READ,
ttnpb.Right_RIGHT_APPLICATION_TRAFFIC_DOWN_WRITE)
conf := &component.Config{
ServiceBase: config.ServiceBase{
GRPC: config.GRPC{
Listen: ":0",
AllowInsecureForCredentials: true,
},
Cluster: cluster.Config{
IdentityServer: isAddr,
},
},
}

_, err := registry.Set(ctx, ids, nil,
func(_ *ttnpb.ApplicationWebhook) (*ttnpb.ApplicationWebhook, []string, error) {
return &ttnpb.ApplicationWebhook{
Ids: ids,
BaseUrl: "https://myapp.com/api/ttn/v3/{/appID,devID}",
Format: "json",
Paused: true,
UplinkMessage: &ttnpb.ApplicationWebhook_Message{Path: "/"},
},
[]string{
"ids.application_ids",
"ids.webhook_id",
"base_url",
"format",
"paused",
"uplink_message",
"field_mask",
}, nil
})
if err != nil {
t.Fatalf("Failed to set webhook in registry: %s", err)
}

sinkCh := make(chan *http.Request, 1)
testSink := mocksink.New(sinkCh)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

c := componenttest.NewComponent(t, conf)
as := mock.NewServer(c)
w, err := web.NewWebhooks(ctx, as, registry, testSink, downlinks)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
c.RegisterWeb(w)
componenttest.StartComponent(t, c)
defer c.Close()

mustHavePeer(ctx, c, ttnpb.ClusterRole_ENTITY_REGISTRY)

// Check the status error code when scheduling downlink to a paused webhook.
url := fmt.Sprintf("/api/v3/as/applications/%s/webhooks/%s/devices/%s/down/replace",
ids.ApplicationIds.ApplicationId, ids.WebhookId, registeredDeviceID.DeviceId,
)
body := bytes.NewReader([]byte(`{"downlinks":[]}`))
req := httptest.NewRequest(http.MethodPost, url, body)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", registeredApplicationKey))

res := httptest.NewRecorder()
c.ServeHTTP(res, req)
a.So(res.Code, should.Equal, http.StatusNotAcceptable)
downlinks, err := as.DownlinkQueueList(ctx, registeredDeviceID)
if !a.So(err, should.BeNil) {
t.FailNow()
}

a.So(downlinks, should.Resemble, []*ttnpb.ApplicationDownlink{})
})
}
Loading
Loading