-
Notifications
You must be signed in to change notification settings - Fork 4
/
debug.ts
55 lines (51 loc) · 2.35 KB
/
debug.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import { randomBytes } from 'crypto';
import LiftbridgeStream, { StartPosition } from './stream';
import LiftbridgeMessage, { AckPolicy } from './message';
import LiftbridgeClient from './index';
import { ErrorCodes } from './errors';
if (!module.parent) {
const subject = 'test7';
const streamName = 'test-stream-gp-7';
function msg() {
const key = 'KEY-' + randomBytes(10).toString('hex');
return new LiftbridgeMessage({ subject, key, value: `VALUE-ok-${key}`, ackPolicy: AckPolicy.ALL, partitionStrategy: 'key' });
}
const lbClient = new LiftbridgeClient(['localhost:9292']);
const stream = new LiftbridgeStream({ subject, name: streamName, partitions: 1 });
lbClient.connect().then((client) => {
console.log('connected to -> ', client.getChannel().getTarget());
lbClient.createStream(stream).then(response => {
console.log('response for create stream = ', response.toObject());
}).catch(err => {
if (err.code !== ErrorCodes.ERR_PARTITION_ALREADY_EXISTS) {
throw err;
}
}).finally(async () => {
console.log('going to publish', msg().toObject());
const pubres1 = await lbClient.publish(msg());
console.log('publish result 1 = ', pubres1.toObject());
const pubres2 = await lbClient.publish(msg());
console.log('publish result 2 = ', pubres2.toObject());
const pubres3 = await lbClient.publish(msg());
console.log('publish result 3 = ', pubres3.toObject());
await lbClient.publish(msg());
console.log('going to subscribe');
const sub = lbClient.subscribe(new LiftbridgeStream({ subject, name: streamName, startPosition: StartPosition.EARLIEST }));
sub.on('status', (data) => {
console.log('subscribe on status = ', data);
});
sub.on('data', (data) => {
console.log('subscribe on data = ', data.toObject());
});
sub.on('error', err => {
console.error('subscribe on error! ', err);
});
sub.on('close', () => {
console.log('subscribe on close!');
});
await lbClient.publish(msg());
await lbClient.publish(msg());
await lbClient.publish(msg());
});
});
}