Skip to content

Commit

Permalink
feat(js-sdk): Impl PubSub API
Browse files Browse the repository at this point in the history
  • Loading branch information
fengmk2 committed Oct 30, 2021
1 parent 811f6e5 commit 277fc92
Show file tree
Hide file tree
Showing 24 changed files with 448 additions and 14 deletions.
8 changes: 8 additions & 0 deletions configs/config_integration_redis_etcd.json
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,14 @@
}
}
},
"pub_subs": {
"redis": {
"metadata": {
"redisHost": "localhost:6380",
"redisPassword": ""
}
}
},
"app": {
"app_id": "app1",
"grpc_callback_port": 9999
Expand Down
17 changes: 17 additions & 0 deletions sdk/js-sdk/demo/pubsub/client.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { strict as assert } from 'assert';
import { Client } from 'layotto';

const client = new Client();
assert(client);

async function main() {
const pubsubName = 'redis';
const topic = 'topic1';
const value = `bar, from js-sdk, ${Date()}`;

await client.pubsub.publish({
pubsubName, topic, data: { value },
});
}

main();
31 changes: 31 additions & 0 deletions sdk/js-sdk/demo/pubsub/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2021 Layotto Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { strict as assert } from 'assert'
import { Client } from 'layotto';

const client = new Client();
assert(client);

async function main() {
const pubsubName = 'redis';
const topic = 'topic1';
const value = `bar, from js-sdk, ${Date()}`;

await client.pubsub.publish({
pubsubName, topic, data: { value },
});
}

main();
12 changes: 12 additions & 0 deletions sdk/js-sdk/demo/pubsub/server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { Server } from 'layotto';

async function main() {
const server = new Server();
server.pubsub.subscribe('redis', 'topic1', async (data) => {
console.log('topic1 event data: %j', data);
});

await server.start();
}

main();
2 changes: 2 additions & 0 deletions sdk/js-sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"homepage": "https://github.com/mosn/layotto#readme",
"devDependencies": {
"@eggjs/tsconfig": "^1.0.0",
"@types/google-protobuf": "^3.15.5",
"@types/jest": "^27.0.2",
"egg-ci": "^1.19.0",
"grpc-tools": "^1.11.2",
Expand All @@ -46,6 +47,7 @@
"ts-jest": "^27.0.7",
"ts-node": "^10.3.0",
"tslint": "^6.1.3",
"tslint-config-prettier": "^1.18.0",
"typescript": "^4.4.4"
},
"dependencies": {
Expand Down
3 changes: 2 additions & 1 deletion sdk/js-sdk/src/client/API.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ export class API {

createMetadata(request: RequestWithMeta): Metadata {
const metadata = new Metadata();
for (const key in request.requestMeta) {
if (!request.requestMeta) return metadata;
for (const key of Object.keys(request.requestMeta)) {
metadata.add(key, request.requestMeta[key]);
}
return metadata;
Expand Down
9 changes: 8 additions & 1 deletion sdk/js-sdk/src/client/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import Invoker from './Invoker';
import Lock from './Lock';
import Sequencer from './Sequencer';
import Configuration from './Configuration';
import PubSub from './PubSub';

const debug = debuglog('layotto:client');
const debug = debuglog('layotto:client:main');

export default class Client {
readonly host: string;
Expand All @@ -34,6 +35,7 @@ export default class Client {
private _lock: Lock;
private _sequencer: Sequencer;
private _configuration: Configuration;
private _pubsub: PubSub;

constructor(port: string = process.env.runtime_GRPC_PORT || '34904',
host: string = process.env.runtime_GRPC_HOST || '127.0.0.1') {
Expand Down Expand Up @@ -73,4 +75,9 @@ export default class Client {
if (!this._configuration) this._configuration = new Configuration(this._runtime);
return this._configuration;
}

get pubsub() {
if (!this._pubsub) this._pubsub = new PubSub(this._runtime);
return this._pubsub;
}
}
2 changes: 1 addition & 1 deletion sdk/js-sdk/src/client/Configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import {
} from '../types/Configuration';
import { convertArrayToKVString } from '../types/common';

const debug = debuglog('layotto:configuration');
const debug = debuglog('layotto:client:configuration');

export default class Configuration extends API {
// GetConfiguration gets configuration from configuration store.
Expand Down
22 changes: 16 additions & 6 deletions sdk/js-sdk/src/client/Invoker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,24 @@ export default class Invoker extends API {
this.runtime.invokeService(req, this.createMetadata(request), (err, res: InvokeResponsePB) => {
if (err) return reject(err);
const contentType = res.getContentType().split(';', 1)[0].toLowerCase();
const content = res.getData() as Buffer;
const response: InvokeResponse = { contentType, content };
const rawData = res.getData();
let content;
if (contentType === 'application/json') {
response.content = JSON.parse(content.toString());
}
if (contentType === 'text/plain') {
response.content = content.toString();
if (rawData) {
content = JSON.parse(Buffer.from(rawData.getValue_asU8()).toString());
} else {
content = {};
}
} else if (contentType === 'text/plain') {
if (rawData) {
content = Buffer.from(rawData.getValue_asU8()).toString();
} else {
content = '';
}
} else {
content = rawData ? rawData.getValue_asU8() : [];
}
const response: InvokeResponse = { contentType, content };
resolve(response);
});
});
Expand Down
39 changes: 39 additions & 0 deletions sdk/js-sdk/src/client/PubSub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2021 Layotto Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {
PublishEventRequest as PublishEventRequestPB,
} from '../../proto/runtime_pb';
import { API } from './API';
import { PublishEventRequest } from '../types/PubSub';

export default class PubSub extends API {
async publish(request: PublishEventRequest): Promise<void> {
const req = new PublishEventRequestPB();
req.setPubsubName(request.pubsubName);
req.setTopic(request.topic);
// https://mosn.io/layotto/#/zh/design/pubsub/pubsub-api-and-compability-with-dapr-component
// PublishRequest.Data 和 NewMessage.Data 里面放符合 CloudEvent 1.0 规范的 json 数据(能反序列化放进 map[string]interface{})
req.setData(Buffer.from(JSON.stringify(request.data)));
req.setDataContentType('application/json');
// FIXME: metadata

return new Promise((resolve, reject) => {
this.runtime.publishEvent(req, this.createMetadata(request), (err) => {
if (err) return reject(err);
resolve();
});
});
}
}
2 changes: 2 additions & 0 deletions sdk/js-sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
* limitations under the License.
*/
import Client from './client/Client';
import Server from './server/Server';
import * as utils from './utils';
import * as RumtimeTypes from '../proto/runtime_pb';

export {
Client,
Server,
utils,
RumtimeTypes,
}
86 changes: 86 additions & 0 deletions sdk/js-sdk/src/server/GRPCServerImpl.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { debuglog } from 'node:util';
import * as grpc from '@grpc/grpc-js';
import { Empty } from 'google-protobuf/google/protobuf/empty_pb';
import { IAppCallbackServer } from '../../proto/appcallback_grpc_pb';
import {
ListTopicSubscriptionsResponse,
TopicSubscription,
TopicEventRequest,
TopicEventResponse,
} from '../../proto/appcallback_pb';
import { PubSubCallback } from '../types/PubSub';

const debug = debuglog('layotto:server:grpc');

// @ts-ignore
export default class GRPCServerImpl implements IAppCallbackServer {
private readonly _handlersTopics: { [key: string]: PubSubCallback };
constructor() {
this._handlersTopics = {};
}

private createPubSubHandlerKey(pubsubName: string, topic: string): string {
return `${pubsubName}|${topic}`.toLowerCase();
}

registerPubSubSubscriptionHandler(pubsubName: string, topic: string, callback: PubSubCallback): void {
const handlerKey = this.createPubSubHandlerKey(pubsubName, topic);
if (this._handlersTopics[handlerKey]) {
throw new Error(`Topic: "${handlerKey}" handler was exists`);
}
this._handlersTopics[handlerKey] = callback;
debug('PubSub Event from topic: "%s" is registered', handlerKey);
}

async onTopicEvent(call: grpc.ServerUnaryCall<TopicEventRequest, TopicEventResponse>,
callback: grpc.sendUnaryData<TopicEventResponse>): Promise<void> {
const req = call.request;
const res = new TopicEventResponse();
const handlerKey = this.createPubSubHandlerKey(req.getPubsubName(), req.getTopic());

const handler = this._handlersTopics[handlerKey];
if (!handler) {
debug('PubSub Event from topic: "%s" was not handled, drop now', handlerKey);
// FIXME: should retry?
res.setStatus(TopicEventResponse.TopicEventResponseStatus.DROP);
return callback(null, res);
}

// https://mosn.io/layotto/#/zh/design/pubsub/pubsub-api-and-compability-with-dapr-component
// PublishRequest.Data 和 NewMessage.Data 里面放符合 CloudEvent 1.0 规范的 json 数据(能反序列化放进 map[string]interface{})
const rawData = Buffer.from(req.getData_asU8()).toString();
debug('PubSub Event from topic: "%s" raw data: %j, typeof %s', handlerKey, rawData, typeof rawData);
let data;
try {
data = JSON.parse(rawData);
} catch {
data = rawData;
}

try {
await handler(data);
res.setStatus(TopicEventResponse.TopicEventResponseStatus.SUCCESS);
} catch (e) {
// FIXME: should retry?
debug('PubSub Event from topic: "%s" handler throw error: %s, drop now', handlerKey, e);
res.setStatus(TopicEventResponse.TopicEventResponseStatus.DROP);
}

callback(null, res);
}

async listTopicSubscriptions(_call: grpc.ServerUnaryCall<Empty, ListTopicSubscriptionsResponse>,
callback: grpc.sendUnaryData<ListTopicSubscriptionsResponse>): Promise<void> {
const res = new ListTopicSubscriptionsResponse();
const subscriptionsList = Object.keys(this._handlersTopics).map(key => {
const splits = key.split('|');
const sub = new TopicSubscription();
sub.setPubsubName(splits[0]);
sub.setTopic(splits[1]);
return sub;
});
debug('listTopicSubscriptions call: %j', subscriptionsList);
res.setSubscriptionsList(subscriptionsList);
callback(null, res);
}
}
18 changes: 18 additions & 0 deletions sdk/js-sdk/src/server/PubSub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { debuglog } from 'node:util';
import { PubSubCallback } from '../types/PubSub';
import GRPCServerImpl from './GRPCServerImpl';

const debug = debuglog('layotto:server:pubsub');

export default class PubSub {
readonly server: GRPCServerImpl;

constructor(server: GRPCServerImpl) {
this.server = server;
}

async subscribe(pubsubName: string, topic: string, cb: PubSubCallback): Promise<void> {
debug('Registering onTopicEvent Handler: PubSub = %s, Topic = %s', pubsubName, topic);
this.server.registerPubSubSubscriptionHandler(pubsubName, topic, cb);
}
}
Loading

0 comments on commit 277fc92

Please sign in to comment.