-
Notifications
You must be signed in to change notification settings - Fork 14
/
fifo-queue.sim.w
50 lines (43 loc) · 1.18 KB
/
fifo-queue.sim.w
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
bring expect;
bring util;
bring cloud;
bring "./api.w" as api;
struct FifoQueueMessage {
groupId: str;
message: str;
}
pub class FifoQueue_sim impl api.IFifoQueue {
queue: cloud.Queue;
counter: cloud.Counter;
new(){
this.queue = new cloud.Queue();
this.counter = new cloud.Counter();
}
pub setConsumer(handler: inflight (str): void, options: api.SetConsumerOptions?) {
let counter = this.counter;
this.queue.setConsumer(inflight (event: str) => {
let message = FifoQueueMessage.parseJson(event);
util.waitUntil(inflight () => {
let value = counter.peek(message.groupId);
if value == 0 {
let acquired = counter.inc(1, message.groupId);
if acquired == 0 {
return true;
} else {
counter.dec(1, message.groupId);
return false;
}
}
return false;
}, timeout: 30m);
try {
handler(message.message);
} finally {
counter.dec(1, message.groupId);
}
});
}
inflight pub push(message: str, options: api.PushOptions) {
this.queue.push(Json.stringify({ groupId: options.groupId, message: message }));
}
}