Skip to content

Commit

Permalink
feat: support gRPC stream API (#855)
Browse files Browse the repository at this point in the history
  • Loading branch information
czy88840616 authored Feb 21, 2021
1 parent 8d90442 commit bd51c46
Show file tree
Hide file tree
Showing 19 changed files with 889 additions and 83 deletions.
28 changes: 24 additions & 4 deletions packages/decorator/src/microservice/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
saveModule,
MS_PROVIDER_KEY,
attachClassMetadata,
savePropertyMetadata,
MS_GRPC_METHOD_KEY,
MS_DUBBO_METHOD_KEY,
MS_HSF_METHOD_KEY,
Expand Down Expand Up @@ -38,14 +39,33 @@ export function Provider(
};
}

export function GrpcMethod(methodName?: string): MethodDecorator {
export enum GrpcStreamTypeEnum {
BASE = 'base',
DUPLEX = 'ServerDuplexStream',
READABLE = 'ServerReadableStream',
WRITEABLE = 'ServerWritableStream',
}

export function GrpcMethod(
methodOptions: {
methodName?: string;
type?: GrpcStreamTypeEnum;
onEnd?: string;
} = {}
): MethodDecorator {
return (target, propertyName, descriptor: PropertyDescriptor) => {
attachClassMetadata(
if (!methodOptions.type) {
methodOptions.type = GrpcStreamTypeEnum.BASE;
}
savePropertyMetadata(
MS_GRPC_METHOD_KEY,
{
methodName: methodName || propertyName,
methodName: methodOptions.methodName || propertyName,
type: methodOptions.type,
onEnd: methodOptions.onEnd,
},
target
target,
propertyName
);

return descriptor;
Expand Down
71 changes: 54 additions & 17 deletions packages/grpc/src/comsumer/clients.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import {
Autoload,
Config,
Init,
Logger,
Expand All @@ -8,12 +7,15 @@ import {
ScopeEnum,
} from '@midwayjs/decorator';
import { credentials, loadPackageDefinition } from '@grpc/grpc-js';
import { DefaultConfig } from '../interface';
import { DefaultConfig, IClientOptions } from '../interface';
import { loadProto } from '../util';
import * as camelCase from 'camelcase';
import { ILogger } from '@midwayjs/logger';
import { ClientUnaryRequest } from './type/unary-request';
import { ClientDuplexStreamRequest } from './type/duplex-request';
import { ClientReadableRequest } from './type/readable-request';
import { ClientWritableRequest } from './type/writeable-request';

@Autoload()
@Provide('clients')
@Scope(ScopeEnum.Singleton)
export class GRPCClients extends Map {
Expand All @@ -26,7 +28,7 @@ export class GRPCClients extends Map {
@Init()
async initService() {
if (!this.grpcConfig['services']) {
this.logger.error('Please set gRPC services in your config["grpc"]');
this.logger.debug('Please set gRPC services in your config["grpc"]');
return;
}
for (const cfg of this.grpcConfig['services']) {
Expand All @@ -46,19 +48,14 @@ export class GRPCClients extends Map {
);
for (const methodName of Object.keys(packageDefinition[definition])) {
const originMethod = connectionService[methodName];
connectionService[methodName] = async (...args) => {
return new Promise((resolve, reject) => {
originMethod.call(
connectionService,
args[0],
(err, response) => {
if (err) {
reject(err);
}
resolve(response);
}
);
});
connectionService[methodName] = (
clientOptions: IClientOptions = {}
) => {
return this.getClientRequestImpl(
connectionService,
originMethod,
clientOptions
);
};
connectionService[camelCase(methodName)] =
connectionService[methodName];
Expand All @@ -72,4 +69,44 @@ export class GRPCClients extends Map {
getService<T>(serviceName: string): T {
return this.get(serviceName);
}

getClientRequestImpl(client, originalFunction, options = {}) {
const genericFunctionSelector =
(originalFunction.requestStream ? 2 : 0) |
(originalFunction.responseStream ? 1 : 0);

let genericFunctionName;
switch (genericFunctionSelector) {
case 0:
genericFunctionName = new ClientUnaryRequest(
client,
originalFunction,
options
);
break;
case 1:
genericFunctionName = new ClientReadableRequest(
client,
originalFunction,
options
);
break;
case 2:
genericFunctionName = new ClientWritableRequest(
client,
originalFunction,
options
);
break;
case 3:
genericFunctionName = new ClientDuplexStreamRequest(
client,
originalFunction,
options
);
break;
}

return genericFunctionName;
}
}
4 changes: 3 additions & 1 deletion packages/grpc/src/comsumer/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Configuration, Logger } from '@midwayjs/decorator';
import { setLogger } from '@grpc/grpc-js';
import { ILogger } from '@midwayjs/logger';
import { join } from 'path';
import { IMidwayContainer } from '@midwayjs/core';

@Configuration({
namespace: 'grpc',
Expand All @@ -11,7 +12,8 @@ export class AutoConfiguration {
@Logger()
logger: ILogger;

async onReady() {
async onReady(container: IMidwayContainer) {
setLogger(this.logger);
await container.getAsync('grpc:clients');
}
}
98 changes: 98 additions & 0 deletions packages/grpc/src/comsumer/type/duplex-request.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import { Metadata } from '@grpc/grpc-js';
import { IClientDuplexStreamService } from '../../interface';

export class ClientDuplexStreamRequest<reqType, resType>
implements IClientDuplexStreamService<reqType, resType> {
correlationId: number;
timeout_message;
queue;
client;
metadata;
timeout;
stream;
promise;
messageKey: string;

static get MAX_INT32() {
return 2147483647;
}

constructor(
client,
original_function,
options: {
metadata?: Metadata;
timeout?: number;
timeoutMessage?: number;
messageKey?: string;
} = {}
) {
this.queue = {};
this.correlationId = 0;
this.timeout_message = options.timeoutMessage || 1000;
this.metadata = options.metadata || new Metadata();
this.messageKey = options.messageKey || 'id';

// Deadline is advisable to be set
// It should be a timestamp value in milliseconds
let deadline = undefined;
if (options.timeout !== undefined) {
deadline = Date.now() + options.timeout;
}
this.stream = original_function.call(client, this.metadata, {
deadline: deadline,
});

this.stream.on('error', () => {});
this.stream.on('data', data => {
if (this.queue[data[this.messageKey]]) {
clearTimeout(this.queue[data[this.messageKey]]['timeout']);
this.queue[data[this.messageKey]]['cb'](null, data);
delete this.queue[data[this.messageKey]];
}
});
}

_nextId() {
if (this.correlationId >= ClientDuplexStreamRequest.MAX_INT32) {
this.correlationId = 0;
}
return this.correlationId++;
}

sendMessage(content: reqType = {} as any): Promise<resType> {
return new Promise((resolve, reject) => {
const id = this._nextId();

if (this.stream.received_status) {
return reject('stream_closed');
}

const cb = (err: Error, response?) => {
if (err) {
reject(err);
} else {
resolve(response);
}
};

this.queue[id] = {
cb,
timeout: setTimeout(() => {
delete this.queue[id];
cb(new Error(`provider response timeout in ${this.timeout_message}`));
}, this.timeout_message),
};
content[this.messageKey] = id;
this.stream.write(content);
});
}

end(): void {
return this.stream.end();
}

getCall() {
return this.stream;
}
}
57 changes: 57 additions & 0 deletions packages/grpc/src/comsumer/type/readable-request.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { Metadata } from '@grpc/grpc-js';
import { IClientReadableStreamService } from '../../interface';

export class ClientReadableRequest<reqType, resType>
implements IClientReadableStreamService<reqType, resType> {
client;
metadata;
timeout;
stream;
queue;
original_function;

constructor(
client,
original_function,
options: {
metadata?: Metadata;
timeout?: number;
} = {}
) {
this.queue = [];
this.client = client;
this.metadata = options.metadata || new Metadata();
this.timeout = options.timeout || undefined;
this.original_function = original_function;
}

sendMessage(content: reqType): Promise<resType[]> {
return new Promise((resolve, reject) => {
// Deadline is advisable to be set
// It should be a timestamp value in milliseconds
let deadline = undefined;
if (this.timeout !== undefined) {
deadline = Date.now() + this.timeout;
}
this.stream = this.original_function.call(
this.client,
content,
this.metadata,
{ deadline: deadline }
);
this.stream.on('error', error => {
reject(error);
});
this.stream.on('data', data => {
this.queue.push(data);
});
this.stream.on('end', () => {
resolve(this.queue);
});
});
}

getCall() {
return this.stream;
}
}
68 changes: 68 additions & 0 deletions packages/grpc/src/comsumer/type/unary-request.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { Metadata, ClientUnaryCall } from '@grpc/grpc-js';
import { IClientUnaryService } from '../../interface';

export class ClientUnaryRequest<reqType, resType>
implements IClientUnaryService<reqType, resType> {
client;
metadata;
timeout;
original_function;

constructor(
client,
original_function,
options: {
metadata?: Metadata;
timeout?: number;
} = {}
) {
this.client = client;
this.metadata = options.metadata || new Metadata();
this.timeout = options.timeout || undefined;
this.original_function = original_function;
}

sendMessage(
content: reqType,
handler?: (call: ClientUnaryCall) => void
): Promise<resType> {
return new Promise<resType>((resolve, reject) => {
// Deadline is advisable to be set
// It should be a timestamp value in milliseconds
let deadline = undefined;
if (this.timeout !== undefined) {
deadline = Date.now() + this.timeout;
}
const call = this.original_function.call(
this.client,
content,
this.metadata,
{ deadline: deadline },
(error, response) => {
if (error) {
reject(error);
} else {
resolve(response);
}
}
);
handler && handler(call);
});
}

sendMessageWithCallback(content: reqType, callback): ClientUnaryCall {
// Deadline is advisable to be set
// It should be a timestamp value in milliseconds
let deadline = undefined;
if (this.timeout !== undefined) {
deadline = Date.now() + this.timeout;
}
return this.original_function.call(
this.client,
content,
this.metadata,
{ deadline: deadline },
callback
);
}
}
Loading

0 comments on commit bd51c46

Please sign in to comment.