From b80998e4559cbbbbc304f8e3a1773b2caaa59c7b Mon Sep 17 00:00:00 2001 From: Ryan Cartwright <39504851+HomelessDinosaur@users.noreply.github.com> Date: Wed, 14 Aug 2024 14:32:09 +1000 Subject: [PATCH] feat: add optional middleware to workers (#33) * add optional middleware to handlers * move route middleware assignment to route definition --- lib/src/api/bucket.dart | 8 ++- lib/src/context/blobevent.dart | 43 ++++++++++-- lib/src/context/http.dart | 15 ++++- lib/src/context/interval.dart | 22 +++++-- lib/src/context/message.dart | 28 ++++++-- lib/src/context/middleware.dart | 14 ++++ lib/src/context/websocket.dart | 32 ++++++--- lib/src/resources/api.dart | 110 +++++++++++++++++++++---------- lib/src/resources/bucket.dart | 8 ++- lib/src/resources/schedule.dart | 20 ++++-- lib/src/resources/topic.dart | 8 ++- lib/src/resources/websocket.dart | 24 +++++-- 12 files changed, 251 insertions(+), 81 deletions(-) diff --git a/lib/src/api/bucket.dart b/lib/src/api/bucket.dart index 97d450d..2e456ee 100644 --- a/lib/src/api/bucket.dart +++ b/lib/src/api/bucket.dart @@ -55,7 +55,8 @@ class Bucket { /// Create a blob event subscription triggered on the [blobEventType] filtered by files that match the [keyPrefixFilter]. Future on(BlobEventType blobEventType, String keyPrefixFilter, - FileEventHandler handler) async { + FileEventHandler handler, + {List middlewares = const []}) async { // Create the request to register the Storage listener with the membrane final eventType = switch (blobEventType) { BlobEventType.write => $p.BlobEventType.Created, @@ -68,7 +69,10 @@ class Bucket { blobEventType: eventType, ); - var worker = FileEventWorker(registrationRequest, handler, this, + final composedHandler = + composeMiddleware([...middlewares, handler], FileEventContext.fromCtx); + + var worker = FileEventWorker(registrationRequest, composedHandler, this, client: _storageListenerClient); await worker.start(); diff --git a/lib/src/context/blobevent.dart b/lib/src/context/blobevent.dart index a07c54b..177c298 100644 --- a/lib/src/context/blobevent.dart +++ b/lib/src/context/blobevent.dart @@ -5,12 +5,27 @@ enum BlobEventType { write, delete } /// The context of a Blob event request/response. class BlobEventContext extends TriggerContext { - BlobEventContext(super.id, super.req, super.resp); + late BlobEventHandler _nextHandler; + + BlobEventContext(super.id, super.req, super.resp, + {next = _defaultHandler}) { + _nextHandler = next; + } /// Create a Blob Event context from a server message. BlobEventContext.fromRequest($bp.ServerMessage msg) - : this(msg.id, BlobEventRequest(msg.blobEventRequest.blobEvent.key), - BlobEventResponse()); + : this( + msg.id, + BlobEventRequest(msg.blobEventRequest.blobEvent.key), + BlobEventResponse(), + ); + + BlobEventContext.fromCtx(BlobEventContext ctx, BlobEventHandler next) + : this(ctx.id, ctx.req, ctx.res, next: next); + + Future next() async { + return await _nextHandler(this); + } /// Converts the context to a gRPC client response. $bp.ClientMessage toResponse() { @@ -21,14 +36,28 @@ class BlobEventContext /// The context of a Blob event request/response. class FileEventContext extends TriggerContext { - FileEventContext(super.id, super.req, super.resp); + late FileEventHandler _nextHandler; + + FileEventContext(super.id, super.req, super.resp, + {next = _defaultHandler}) { + _nextHandler = next; + } /// Create a Blob Event context from a server message. FileEventContext.fromRequest($bp.ServerMessage msg, Bucket bucket) : this( - msg.id, - FileEventRequest(bucket.file(msg.blobEventRequest.blobEvent.key)), - BlobEventResponse()); + msg.id, + FileEventRequest(bucket.file(msg.blobEventRequest.blobEvent.key)), + BlobEventResponse(), + ); + + /// Call the next middleware in the middleware chain + FileEventContext.fromCtx(FileEventContext ctx, FileEventHandler next) + : this(ctx.id, ctx.req, ctx.res, next: next); + + Future next() async { + return await _nextHandler(this); + } /// Converts the context to a gRPC client response. $bp.ClientMessage toResponse() { diff --git a/lib/src/context/http.dart b/lib/src/context/http.dart index 29fa3e9..85d1d67 100644 --- a/lib/src/context/http.dart +++ b/lib/src/context/http.dart @@ -2,7 +2,12 @@ part of './common.dart'; /// The context of a HTTP request/response triggered by a request to an API. class HttpContext extends TriggerContext { - HttpContext(super.id, super.req, super.resp); + late HttpHandler _nextHandler; + + HttpContext(super.id, super.req, super.resp, + {next = _defaultHandler}) { + _nextHandler = next; + } /// Create a HTTP context from a server message. HttpContext.fromRequest($ap.ServerMessage msg) @@ -21,6 +26,14 @@ class HttpContext extends TriggerContext { HttpResponse(), ); + HttpContext.fromCtx(HttpContext ctx, HttpHandler next) + : this(ctx.id, ctx.req, ctx.res, next: next); + + /// Call the next middleware in the middleware chain + Future next() async { + return await _nextHandler(this); + } + /// Converts the context to a gRPC client response. $ap.ClientMessage toResponse() { return $ap.ClientMessage(id: id, httpResponse: res.toWire()); diff --git a/lib/src/context/interval.dart b/lib/src/context/interval.dart index a35623d..7e096b0 100644 --- a/lib/src/context/interval.dart +++ b/lib/src/context/interval.dart @@ -3,14 +3,28 @@ part of './common.dart'; /// The context for a scheduled interval request/response. class IntervalContext extends TriggerContext { - IntervalContext(super.id, super.req, super.resp); + late IntervalHandler _nextHandler; + + IntervalContext(super.id, super.req, super.resp, + {next = _defaultHandler}) { + _nextHandler = next; + } /// Create an Interval context from a server message. IntervalContext.fromRequest($sp.ServerMessage msg) : this( - msg.id, - IntervalRequest(scheduleName: msg.intervalRequest.scheduleName), - IntervalResponse()); + msg.id, + IntervalRequest(scheduleName: msg.intervalRequest.scheduleName), + IntervalResponse(), + ); + + IntervalContext.fromCtx(IntervalContext ctx, IntervalHandler next) + : this(ctx.id, ctx.req, ctx.res, next: next); + + /// Call the next middleware in the middleware chain + Future next() async { + return await _nextHandler(this); + } /// Converts the context to a gRPC client response. $sp.ClientMessage toResponse() { diff --git a/lib/src/context/message.dart b/lib/src/context/message.dart index c5abd59..abf6878 100644 --- a/lib/src/context/message.dart +++ b/lib/src/context/message.dart @@ -2,19 +2,33 @@ part of './common.dart'; /// The context for a topic message for a subscription. class MessageContext extends TriggerContext { - MessageContext(super.id, super.req, super.resp); + late MessageHandler _nextHandler; + + MessageContext(super.id, super.req, super.resp, + {next = _defaultHandler}) { + _nextHandler = next; + } /// Create an Event context from a server message. factory MessageContext.fromRequest($ep.ServerMessage msg) { var payload = Proto.mapFromStruct(msg.messageRequest.message.structPayload); return MessageContext( - msg.id, - MessageRequest( - msg.messageRequest.topicName, - payload, - ), - MessageResponse()); + msg.id, + MessageRequest( + msg.messageRequest.topicName, + payload, + ), + MessageResponse(), + ); + } + + MessageContext.fromCtx(MessageContext ctx, MessageHandler next) + : this(ctx.id, ctx.req, ctx.res, next: next); + + /// Call the next middleware in the middleware chain + Future next() async { + return await _nextHandler(this); } /// Converts the context to a gRPC client response. diff --git a/lib/src/context/middleware.dart b/lib/src/context/middleware.dart index 7f20001..4265f7b 100644 --- a/lib/src/context/middleware.dart +++ b/lib/src/context/middleware.dart @@ -8,3 +8,17 @@ typedef MessageHandler = Handler; typedef BlobEventHandler = Handler; typedef FileEventHandler = Handler; typedef WebsocketHandler = Handler; + +Future _defaultHandler(T ctx) async => ctx; + +Handler composeMiddleware( + List> handlers, + T Function(T ctx, Handler next) converter) => + (T ctx) async { + final Handler composedHandler = handlers.reversed + .fold((ctx) async => ctx, (nextHandler, currHandler) { + return (ctx) => currHandler(converter(ctx, nextHandler)); + }); + + return await composedHandler(ctx); + }; diff --git a/lib/src/context/websocket.dart b/lib/src/context/websocket.dart index 2c47dd8..7942ac2 100644 --- a/lib/src/context/websocket.dart +++ b/lib/src/context/websocket.dart @@ -5,7 +5,12 @@ enum WebsocketEvent { connect, disconnect, message } /// Base context for a websocket based request/response. class WebsocketContext extends TriggerContext { - WebsocketContext(super.id, super.req, super.resp); + late WebsocketHandler _nextHandler; + + WebsocketContext(super.id, super.req, super.resp, + {next = _defaultHandler}) { + _nextHandler = next; + } factory WebsocketContext.fromRequest($wp.ServerMessage msg) { var eventType = WebsocketEvent.connect; @@ -27,14 +32,23 @@ class WebsocketContext } return WebsocketContext( - msg.id, - WebsocketRequest( - msg.websocketEventRequest.socketName, - msg.websocketEventRequest.connectionId, - eventType, - queryParams, - message), - WebsocketResponse()); + msg.id, + WebsocketRequest( + msg.websocketEventRequest.socketName, + msg.websocketEventRequest.connectionId, + eventType, + queryParams, + message), + WebsocketResponse(), + ); + } + + WebsocketContext.fromCtx(WebsocketContext ctx, WebsocketHandler next) + : this(ctx.id, ctx.req, ctx.res, next: next); + + /// Call the next middleware in the middleware chain + Future next() async { + return await _nextHandler(this); } $wp.ClientMessage toResponse() { diff --git a/lib/src/resources/api.dart b/lib/src/resources/api.dart index 8447f21..d49ad56 100644 --- a/lib/src/resources/api.dart +++ b/lib/src/resources/api.dart @@ -12,8 +12,12 @@ enum HttpMethod { class ApiOptions { List security; String basePath; + List middlewares; - ApiOptions({this.security = const [], this.basePath = ""}); + ApiOptions( + {this.security = const [], + this.basePath = "", + this.middlewares = const []}); } /// An API resource. @@ -51,64 +55,65 @@ class Api extends Resource { /// A GET request [handler] that [match]es a specific route. Future get(String match, HttpHandler handler, - {List? security}) async { - await Route(this, opts.basePath + match, - security: security ?? opts.security, apiClient: _apiClient) + {List? security, + List middlewares = const []}) async { + await route(match, security: security, middlewares: middlewares) .get(handler); } /// A POST request [handler] that [match]es a specific route. Future post(String match, HttpHandler handler, - {List? security}) async { - await Route(this, opts.basePath + match, - security: security ?? opts.security, apiClient: _apiClient) + {List? security, + List middlewares = const []}) async { + await route(match, security: security, middlewares: middlewares) .post(handler); } /// A PUT request [handler] that [match]es a specific route. Future put(String match, HttpHandler handler, - {List? security}) async { - await Route(this, opts.basePath + match, - security: security ?? opts.security, apiClient: _apiClient) + {List? security, + List middlewares = const []}) async { + await route(match, security: security, middlewares: middlewares) .put(handler); } /// A PATCH request [handler] that [match]es a specific route. Future patch(String match, HttpHandler handler, - {List? security}) async { - await Route(this, opts.basePath + match, - security: security ?? opts.security, apiClient: _apiClient) + {List? security, + List middlewares = const []}) async { + await route(match, security: security, middlewares: middlewares) .patch(handler); } /// A DELETE request [handler] that [match]es a specific route. Future delete(String match, HttpHandler handler, - {List? security}) async { - await Route(this, opts.basePath + match, - security: security ?? opts.security, apiClient: _apiClient) + {List? security, + List middlewares = const []}) async { + await route(match, security: security, middlewares: middlewares) .delete(handler); } /// A OPTIONS request [handler] that [match]es a specific route. Future options(String match, HttpHandler handler, - {List? security}) async { - await Route(this, opts.basePath + match, - security: security ?? opts.security, apiClient: _apiClient) + {List? security, + List middlewares = const []}) async { + await route(match, security: security, middlewares: middlewares) .options(handler); } /// A request [handler] that [match]es a specific route on all HTTP methods. Future all(String match, HttpHandler handler, - {List? security}) async { - await Route(this, opts.basePath + match, - security: security ?? opts.security, apiClient: _apiClient) + {List? security, + List middlewares = const []}) async { + await route(match, security: security, middlewares: middlewares) .all(handler); } - /// Create a route that [match]es on a specific path. - Route route(String match, {List? security}) { - return Route(this, opts.basePath + match, - security: security ?? opts.security, apiClient: _apiClient); + /// Create a route that [match]es on a specific path with optional [middlewares] and [security]. + Route route(String match, + {List? security, List middlewares = const []}) { + return Route(this, match, + security: security, apiClient: _apiClient, middlewares: middlewares); } } @@ -125,14 +130,25 @@ class Route { ApiClient? _apiClient; - Route(this.api, this.match, - {this.security = const [], ApiClient? apiClient}) { + late List _middlewares; + + Route(this.api, match, + {security, + ApiClient? apiClient, + List middlewares = const []}) + : match = api.opts.basePath + match, + security = security ?? api.opts.security { _apiClient = apiClient; + _middlewares = middlewares; } /// A GET request [handler] for this route. Future get(HttpHandler handler) async { - var worker = ApiWorker(this, handler, [HttpMethod.get], + final composedHandler = composeMiddleware( + [...api.opts.middlewares, ..._middlewares, handler], + HttpContext.fromCtx); + + var worker = ApiWorker(this, composedHandler, [HttpMethod.get], security: security, client: _apiClient); await worker.start(); @@ -140,7 +156,11 @@ class Route { /// A POST request [handler] for this route. Future post(HttpHandler handler) async { - var worker = ApiWorker(this, handler, [HttpMethod.post], + final composedHandler = composeMiddleware( + [...api.opts.middlewares, ..._middlewares, handler], + HttpContext.fromCtx); + + var worker = ApiWorker(this, composedHandler, [HttpMethod.post], security: security, client: _apiClient); await worker.start(); @@ -148,7 +168,11 @@ class Route { /// A PUT request [handler] for this route. Future put(HttpHandler handler) async { - var worker = ApiWorker(this, handler, [HttpMethod.put], + final composedHandler = composeMiddleware( + [...api.opts.middlewares, ..._middlewares, handler], + HttpContext.fromCtx); + + var worker = ApiWorker(this, composedHandler, [HttpMethod.put], security: security, client: _apiClient); await worker.start(); @@ -156,7 +180,11 @@ class Route { /// A PATCH request [handler] for this route. Future patch(HttpHandler handler) async { - var worker = ApiWorker(this, handler, [HttpMethod.patch], + final composedHandler = composeMiddleware( + [...api.opts.middlewares, ..._middlewares, handler], + HttpContext.fromCtx); + + var worker = ApiWorker(this, composedHandler, [HttpMethod.patch], security: security, client: _apiClient); await worker.start(); @@ -164,7 +192,11 @@ class Route { /// A DELETE request [handler] for this route. Future delete(HttpHandler handler) async { - var worker = ApiWorker(this, handler, [HttpMethod.delete], + final composedHandler = composeMiddleware( + [...api.opts.middlewares, ..._middlewares, handler], + HttpContext.fromCtx); + + var worker = ApiWorker(this, composedHandler, [HttpMethod.delete], security: security, client: _apiClient); await worker.start(); @@ -172,7 +204,11 @@ class Route { /// An OPTIONS request [handler] for this route. Future options(HttpHandler handler) async { - var worker = ApiWorker(this, handler, [HttpMethod.options], + final composedHandler = composeMiddleware( + [...api.opts.middlewares, ..._middlewares, handler], + HttpContext.fromCtx); + + var worker = ApiWorker(this, composedHandler, [HttpMethod.options], security: security, client: _apiClient); await worker.start(); @@ -180,7 +216,11 @@ class Route { /// A request [handler] for this route that matches all HTTP methods. Future all(HttpHandler handler) async { - var worker = ApiWorker(this, handler, HttpMethod.values, + final composedHandler = composeMiddleware( + [...api.opts.middlewares, ..._middlewares, handler], + HttpContext.fromCtx); + + var worker = ApiWorker(this, composedHandler, HttpMethod.values, security: security, client: _apiClient); await worker.start(); diff --git a/lib/src/resources/bucket.dart b/lib/src/resources/bucket.dart index f14c269..ae04a2c 100644 --- a/lib/src/resources/bucket.dart +++ b/lib/src/resources/bucket.dart @@ -51,7 +51,8 @@ class BucketResource extends SecureResource { /// Create a blob event subscription triggered on the [blobEventType] filtered by files that match the [keyPrefixFilter]. Future on(BlobEventType blobEventType, String keyPrefixFilter, - BlobEventHandler handler) async { + BlobEventHandler handler, + {List middlewares = const []}) async { // Create the request to register the Storage listener with the membrane final eventType = switch (blobEventType) { BlobEventType.write => $bp.BlobEventType.Created, @@ -64,7 +65,10 @@ class BucketResource extends SecureResource { blobEventType: eventType, ); - var worker = BlobEventWorker(registrationRequest, handler, + final composedHandler = + composeMiddleware([...middlewares, handler], BlobEventContext.fromCtx); + + var worker = BlobEventWorker(registrationRequest, composedHandler, client: _storageListenerClient); await worker.start(); diff --git a/lib/src/resources/schedule.dart b/lib/src/resources/schedule.dart index f722614..aab4d3e 100644 --- a/lib/src/resources/schedule.dart +++ b/lib/src/resources/schedule.dart @@ -23,25 +23,33 @@ class Schedule extends Resource { return $p.ResourceDeclareRequest(id: res); } - /// Run [middleware] at a certain interval defined by the [rate]. E.g. '7 days', '3 hours', '30 minutes'. - Future every(String rate, IntervalHandler middleware) async { + /// Run the [handler] at a certain interval defined by the [rate]. E.g. '7 days', '3 hours', '30 minutes'. + Future every(String rate, IntervalHandler handler, + {List middlewares = const []}) async { var registrationRequest = $sp.RegistrationRequest( scheduleName: name, every: $s.ScheduleEvery(rate: rate)); - var worker = IntervalWorker(registrationRequest, middleware, + final composedHandler = + composeMiddleware([...middlewares, handler], IntervalContext.fromCtx); + + var worker = IntervalWorker(registrationRequest, composedHandler, client: _schedulesClient); await worker.start(); } - /// Run [middleware] at a certain interval defined by the [cronExpression]. - Future cron(String cronExpression, IntervalHandler middleware) async { + /// Run the [handler] at a certain interval defined by the [cronExpression]. + Future cron(String cronExpression, IntervalHandler handler, + {List middlewares = const []}) async { var registrationRequest = $sp.RegistrationRequest( scheduleName: name, cron: $sp.ScheduleCron(expression: cronExpression), ); - var worker = IntervalWorker(registrationRequest, middleware, + final composedHandler = + composeMiddleware([...middlewares, handler], IntervalContext.fromCtx); + + var worker = IntervalWorker(registrationRequest, composedHandler, client: _schedulesClient); await worker.start(); diff --git a/lib/src/resources/topic.dart b/lib/src/resources/topic.dart index 816e9d2..274a55a 100644 --- a/lib/src/resources/topic.dart +++ b/lib/src/resources/topic.dart @@ -19,10 +19,14 @@ class Topic extends SecureResource { } /// Register a [handler] to subscribe to messages sent to the topic. - Future subscribe(MessageHandler middleware) async { + Future subscribe(MessageHandler handler, + {List middlewares = const []}) async { var registrationRequest = $tp.RegistrationRequest(topicName: name); - var worker = SubscriptionWorker(registrationRequest, middleware, + final composedHandler = + composeMiddleware([...middlewares, handler], MessageContext.fromCtx); + + var worker = SubscriptionWorker(registrationRequest, composedHandler, client: _subscriberClient); await worker.start(); diff --git a/lib/src/resources/websocket.dart b/lib/src/resources/websocket.dart index fe85e4a..2767cd7 100644 --- a/lib/src/resources/websocket.dart +++ b/lib/src/resources/websocket.dart @@ -49,33 +49,45 @@ class Websocket extends Resource { } /// Set a [handler] for connection requests to the socket. - Future onConnect(WebsocketHandler handler) async { + Future onConnect(WebsocketHandler handler, + {List middlewares = const []}) async { var registrationRequest = $wp.RegistrationRequest( eventType: $wp.WebsocketEventType.Connect, socketName: name); - var worker = WebsocketWorker(registrationRequest, handler, + final composedHandler = + composeMiddleware([...middlewares, handler], WebsocketContext.fromCtx); + + var worker = WebsocketWorker(registrationRequest, composedHandler, client: _websocketHandlerClient); await worker.start(); } /// Set a [handler] for disconnection requests to the socket. - Future onDisconnect(WebsocketHandler handler) async { + Future onDisconnect(WebsocketHandler handler, + {List middlewares = const []}) async { var registrationRequest = $wp.RegistrationRequest( eventType: $wp.WebsocketEventType.Disconnect, socketName: name); - var worker = WebsocketWorker(registrationRequest, handler, + final composedHandler = + composeMiddleware([...middlewares, handler], WebsocketContext.fromCtx); + + var worker = WebsocketWorker(registrationRequest, composedHandler, client: _websocketHandlerClient); await worker.start(); } /// Set a [handler] for messages to the socket. - Future onMessage(WebsocketHandler handler) async { + Future onMessage(WebsocketHandler handler, + {List middlewares = const []}) async { var registrationRequest = $wp.RegistrationRequest( eventType: $wp.WebsocketEventType.Message, socketName: name); - var worker = WebsocketWorker(registrationRequest, handler, + final composedHandler = + composeMiddleware([...middlewares, handler], WebsocketContext.fromCtx); + + var worker = WebsocketWorker(registrationRequest, composedHandler, client: _websocketHandlerClient); await worker.start();