Skip to content

Commit

Permalink
feat: add optional middleware to workers (#33)
Browse files Browse the repository at this point in the history
* add optional middleware to handlers

* move route middleware assignment to route definition
  • Loading branch information
HomelessDinosaur authored Aug 14, 2024
1 parent 414a388 commit b80998e
Show file tree
Hide file tree
Showing 12 changed files with 251 additions and 81 deletions.
8 changes: 6 additions & 2 deletions lib/src/api/bucket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ class Bucket {

/// Create a blob event subscription triggered on the [blobEventType] filtered by files that match the [keyPrefixFilter].
Future<void> on(BlobEventType blobEventType, String keyPrefixFilter,
FileEventHandler handler) async {
FileEventHandler handler,
{List<FileEventHandler> middlewares = const []}) async {
// Create the request to register the Storage listener with the membrane
final eventType = switch (blobEventType) {
BlobEventType.write => $p.BlobEventType.Created,
Expand All @@ -68,7 +69,10 @@ class Bucket {
blobEventType: eventType,
);

var worker = FileEventWorker(registrationRequest, handler, this,
final composedHandler =
composeMiddleware([...middlewares, handler], FileEventContext.fromCtx);

var worker = FileEventWorker(registrationRequest, composedHandler, this,
client: _storageListenerClient);

await worker.start();
Expand Down
43 changes: 36 additions & 7 deletions lib/src/context/blobevent.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,27 @@ enum BlobEventType { write, delete }
/// The context of a Blob event request/response.
class BlobEventContext
extends TriggerContext<BlobEventRequest, BlobEventResponse> {
BlobEventContext(super.id, super.req, super.resp);
late BlobEventHandler _nextHandler;

BlobEventContext(super.id, super.req, super.resp,
{next = _defaultHandler<BlobEventContext>}) {
_nextHandler = next;
}

/// Create a Blob Event context from a server message.
BlobEventContext.fromRequest($bp.ServerMessage msg)
: this(msg.id, BlobEventRequest(msg.blobEventRequest.blobEvent.key),
BlobEventResponse());
: this(
msg.id,
BlobEventRequest(msg.blobEventRequest.blobEvent.key),
BlobEventResponse(),
);

BlobEventContext.fromCtx(BlobEventContext ctx, BlobEventHandler next)
: this(ctx.id, ctx.req, ctx.res, next: next);

Future<BlobEventContext> next() async {
return await _nextHandler(this);
}

/// Converts the context to a gRPC client response.
$bp.ClientMessage toResponse() {
Expand All @@ -21,14 +36,28 @@ class BlobEventContext
/// The context of a Blob event request/response.
class FileEventContext
extends TriggerContext<FileEventRequest, BlobEventResponse> {
FileEventContext(super.id, super.req, super.resp);
late FileEventHandler _nextHandler;

FileEventContext(super.id, super.req, super.resp,
{next = _defaultHandler<FileEventContext>}) {
_nextHandler = next;
}

/// Create a Blob Event context from a server message.
FileEventContext.fromRequest($bp.ServerMessage msg, Bucket bucket)
: this(
msg.id,
FileEventRequest(bucket.file(msg.blobEventRequest.blobEvent.key)),
BlobEventResponse());
msg.id,
FileEventRequest(bucket.file(msg.blobEventRequest.blobEvent.key)),
BlobEventResponse(),
);

/// Call the next middleware in the middleware chain
FileEventContext.fromCtx(FileEventContext ctx, FileEventHandler next)
: this(ctx.id, ctx.req, ctx.res, next: next);

Future<FileEventContext> next() async {
return await _nextHandler(this);
}

/// Converts the context to a gRPC client response.
$bp.ClientMessage toResponse() {
Expand Down
15 changes: 14 additions & 1 deletion lib/src/context/http.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ part of './common.dart';

/// The context of a HTTP request/response triggered by a request to an API.
class HttpContext extends TriggerContext<HttpRequest, HttpResponse> {
HttpContext(super.id, super.req, super.resp);
late HttpHandler _nextHandler;

HttpContext(super.id, super.req, super.resp,
{next = _defaultHandler<HttpContext>}) {
_nextHandler = next;
}

/// Create a HTTP context from a server message.
HttpContext.fromRequest($ap.ServerMessage msg)
Expand All @@ -21,6 +26,14 @@ class HttpContext extends TriggerContext<HttpRequest, HttpResponse> {
HttpResponse(),
);

HttpContext.fromCtx(HttpContext ctx, HttpHandler next)
: this(ctx.id, ctx.req, ctx.res, next: next);

/// Call the next middleware in the middleware chain
Future<HttpContext> next() async {
return await _nextHandler(this);
}

/// Converts the context to a gRPC client response.
$ap.ClientMessage toResponse() {
return $ap.ClientMessage(id: id, httpResponse: res.toWire());
Expand Down
22 changes: 18 additions & 4 deletions lib/src/context/interval.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,28 @@ part of './common.dart';
/// The context for a scheduled interval request/response.
class IntervalContext
extends TriggerContext<IntervalRequest, IntervalResponse> {
IntervalContext(super.id, super.req, super.resp);
late IntervalHandler _nextHandler;

IntervalContext(super.id, super.req, super.resp,
{next = _defaultHandler<IntervalContext>}) {
_nextHandler = next;
}

/// Create an Interval context from a server message.
IntervalContext.fromRequest($sp.ServerMessage msg)
: this(
msg.id,
IntervalRequest(scheduleName: msg.intervalRequest.scheduleName),
IntervalResponse());
msg.id,
IntervalRequest(scheduleName: msg.intervalRequest.scheduleName),
IntervalResponse(),
);

IntervalContext.fromCtx(IntervalContext ctx, IntervalHandler next)
: this(ctx.id, ctx.req, ctx.res, next: next);

/// Call the next middleware in the middleware chain
Future<IntervalContext> next() async {
return await _nextHandler(this);
}

/// Converts the context to a gRPC client response.
$sp.ClientMessage toResponse() {
Expand Down
28 changes: 21 additions & 7 deletions lib/src/context/message.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,33 @@ part of './common.dart';

/// The context for a topic message for a subscription.
class MessageContext extends TriggerContext<MessageRequest, MessageResponse> {
MessageContext(super.id, super.req, super.resp);
late MessageHandler _nextHandler;

MessageContext(super.id, super.req, super.resp,
{next = _defaultHandler<MessageContext>}) {
_nextHandler = next;
}

/// Create an Event context from a server message.
factory MessageContext.fromRequest($ep.ServerMessage msg) {
var payload = Proto.mapFromStruct(msg.messageRequest.message.structPayload);

return MessageContext(
msg.id,
MessageRequest(
msg.messageRequest.topicName,
payload,
),
MessageResponse());
msg.id,
MessageRequest(
msg.messageRequest.topicName,
payload,
),
MessageResponse(),
);
}

MessageContext.fromCtx(MessageContext ctx, MessageHandler next)
: this(ctx.id, ctx.req, ctx.res, next: next);

/// Call the next middleware in the middleware chain
Future<MessageContext> next() async {
return await _nextHandler(this);
}

/// Converts the context to a gRPC client response.
Expand Down
14 changes: 14 additions & 0 deletions lib/src/context/middleware.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,17 @@ typedef MessageHandler = Handler<MessageContext>;
typedef BlobEventHandler = Handler<BlobEventContext>;
typedef FileEventHandler = Handler<FileEventContext>;
typedef WebsocketHandler = Handler<WebsocketContext>;

Future<T> _defaultHandler<T extends TriggerContext>(T ctx) async => ctx;

Handler<T> composeMiddleware<T extends TriggerContext>(
List<Handler<T>> handlers,
T Function(T ctx, Handler<T> next) converter) =>
(T ctx) async {
final Handler<T> composedHandler = handlers.reversed
.fold((ctx) async => ctx, (nextHandler, currHandler) {
return (ctx) => currHandler(converter(ctx, nextHandler));
});

return await composedHandler(ctx);
};
32 changes: 23 additions & 9 deletions lib/src/context/websocket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ enum WebsocketEvent { connect, disconnect, message }
/// Base context for a websocket based request/response.
class WebsocketContext
extends TriggerContext<WebsocketRequest, WebsocketResponse> {
WebsocketContext(super.id, super.req, super.resp);
late WebsocketHandler _nextHandler;

WebsocketContext(super.id, super.req, super.resp,
{next = _defaultHandler<WebsocketContext>}) {
_nextHandler = next;
}

factory WebsocketContext.fromRequest($wp.ServerMessage msg) {
var eventType = WebsocketEvent.connect;
Expand All @@ -27,14 +32,23 @@ class WebsocketContext
}

return WebsocketContext(
msg.id,
WebsocketRequest(
msg.websocketEventRequest.socketName,
msg.websocketEventRequest.connectionId,
eventType,
queryParams,
message),
WebsocketResponse());
msg.id,
WebsocketRequest(
msg.websocketEventRequest.socketName,
msg.websocketEventRequest.connectionId,
eventType,
queryParams,
message),
WebsocketResponse(),
);
}

WebsocketContext.fromCtx(WebsocketContext ctx, WebsocketHandler next)
: this(ctx.id, ctx.req, ctx.res, next: next);

/// Call the next middleware in the middleware chain
Future<WebsocketContext> next() async {
return await _nextHandler(this);
}

$wp.ClientMessage toResponse() {
Expand Down
Loading

0 comments on commit b80998e

Please sign in to comment.