Skip to content

Commit

Permalink
Merge pull request #2 from JanYork/core
Browse files Browse the repository at this point in the history
Pull core
  • Loading branch information
JanYork authored May 16, 2024
2 parents 3247c61 + 93eb5ef commit 38be1e8
Show file tree
Hide file tree
Showing 115 changed files with 851 additions and 2,336 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ nohup.out
dist/
coverage/
.idea/

!index.ts
1 change: 1 addition & 0 deletions .npmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
index.ts
26 changes: 5 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,6 @@
- 统一的日志记录器注入,方便调试。

## 注意
项目使用了`module-alias`,所以必须要在门面文件中引入`require('module-alias/register')`,并且在`tsconfig.json`中配置`baseUrl``paths`

```json
{
"compilerOptions": {
"baseUrl": ".",
"paths": {
"@/*": ["src/*"]
}
}
}
```

对于Macos or Linux,`build`操作失败,可以尝试使用`sudo npm run build`

对于Windows,`build`操作失败,可以尝试使用“以管理员身份运行”命令行。
Expand All @@ -43,7 +30,7 @@ npm install
# 初始化 grpc 代码
npm run build
# 安装rocketmq nodejs 客户端
npm i rocketmq-grpc-client
npm i rocketmq-grpc
```

开启 grpc-js 的调试日志:
Expand All @@ -59,9 +46,7 @@ GRPC_TRACE=compression GRPC_VERBOSITY=debug GRPC_TRACE=all npm run xxx or node x
发送消息

```ts
require('module-alias/register');

import { Producer } from '@/producer';
import {Producer} from "rocketmq-grpc";

const simpleProducer = new Producer({
endpoints: 'localhost:8081'
Expand All @@ -87,14 +72,13 @@ console.log('checkout:simpleProducer init success!');
});
});
})();

```

消费消息

```ts
require('module-alias/register');

import { SimpleConsumer } from '@/consumer';
import {SimpleConsumer} from "rocketmq-grpc";

const consumer = new SimpleConsumer({
consumerGroup: 'checkout-group',
Expand Down Expand Up @@ -155,7 +139,7 @@ async function startAndConsumeMessages() {

startAndConsumeMessages().catch(console.error);
```
更多的示例可以参考[这里](./src/examples)
更多的示例可以参考[这里](https://github.com/JanYork/rocketmq-client-node/tree/core/examples)

### 消息类型

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { IGrpcClient } from "@/server/client";
import * as grpc from "@grpc/grpc-js";
import { MessagingServiceClient } from "@/rpc/apache/rocketmq/v2/service_grpc_pb";
import { Endpoints } from "@/model";
import { ChannelCredentials } from "@grpc/grpc-js";
import * as grpc from '@grpc/grpc-js';
import { ChannelCredentials } from '@grpc/grpc-js';
import { MessagingServiceClient } from '../../rpc/apache/rocketmq/v2/service_grpc_pb';
import { IGrpcClient } from '../interface';
import { Endpoints } from '../../model';

/**
* gRPC 客户端的抽象基类,实现基本的客户端功能。
Expand Down Expand Up @@ -40,7 +40,7 @@ export default abstract class BaseGrpcClient implements IGrpcClient {
*/
abstract unaryCall<TRequest, TResponse>(
method: string,
requestData: TRequest,
requestData: TRequest
): Promise<TResponse>;

/**
Expand All @@ -51,7 +51,7 @@ export default abstract class BaseGrpcClient implements IGrpcClient {
*/
abstract serverStreamingCall<TRequest, TResponse>(
method: string,
requestData: TRequest,
requestData: TRequest
): grpc.ClientReadableStream<TResponse>;

/**
Expand All @@ -60,7 +60,7 @@ export default abstract class BaseGrpcClient implements IGrpcClient {
* @param method
*/
abstract clientStreamingCall<TRequest>(
method: string,
method: string
): grpc.ClientWritableStream<TRequest>;

/**
Expand All @@ -69,7 +69,7 @@ export default abstract class BaseGrpcClient implements IGrpcClient {
* @param method
*/
abstract duplexStreamingCall<TRequest, TResponse>(
method: string,
method: string
): grpc.ClientDuplexStream<TRequest, TResponse>;

/**
Expand Down
File renamed without changes.
10 changes: 5 additions & 5 deletions src/server/client/common/setting.ts → client/common/setting.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import {
ClientType,
Settings as SettingsPB,
} from "@/rpc/apache/rocketmq/v2/definition_pb";
import { Endpoints } from "@/model";
import { RetryPolicy } from "@/retry";
Settings as SettingsPB
} from '../../rpc/apache/rocketmq/v2/definition_pb';
import { Endpoints } from '../../model';
import { RetryPolicy } from '../../retry';

/**
* 抽象类 Settings 定义了客户端设置的基本结构和方法。
Expand Down Expand Up @@ -59,7 +59,7 @@ export abstract class Setting {
clientType: ClientType,
accessPoint: Endpoints,
requestTimeout: number,
retryPolicy?: RetryPolicy,
retryPolicy?: RetryPolicy
) {
this.clientId = clientId;
this.clientType = clientType;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { ClientDuplexStream } from "@grpc/grpc-js";
import { TelemetryCommand } from "@/rpc/apache/rocketmq/v2/service_pb";
import { Endpoints } from "@/model";
import type { RpcBaseClient } from "@/server/client";
import Logger from "@/logger";
import { ClientDuplexStream } from '@grpc/grpc-js';
import { TelemetryCommand } from '../../rpc/apache/rocketmq/v2/service_pb';
import { RpcBaseClient } from '../rpc-base-client';
import Logger from '../../logger';
import { Endpoints } from '../../model';

/**
* 遥测会话
Expand Down Expand Up @@ -48,11 +48,11 @@ export class TelemetrySession {
*/
release() {
this.#logger.info({
message: "Begin to release telemetry session",
message: 'Begin to release telemetry session',
context: {
endpoints: this.#endpoints,
clientId: this.#client.id,
},
clientId: this.#client.id
}
});

this.#stream.end();
Expand Down Expand Up @@ -84,9 +84,9 @@ export class TelemetrySession {
*/
#renewStream(inited: boolean) {
this.#stream = this.#client.createTelemetryStream(this.#endpoints);
this.#stream.on("data", this.#onData.bind(this));
this.#stream.once("error", this.#onError.bind(this));
this.#stream.once("end", this.#onEnd.bind(this));
this.#stream.on('data', this.#onData.bind(this));
this.#stream.once('error', this.#onError.bind(this));
this.#stream.once('end', this.#onEnd.bind(this));

if (!inited) {
this.syncSetting();
Expand All @@ -106,71 +106,71 @@ export class TelemetrySession {
switch (commandCase) {
case TelemetryCommand.CommandCase.SETTINGS:
this.#logger.info({
message: "Receive settings from remote",
message: 'Receive settings from remote',
context: {
endpoints,
clientId,
},
clientId
}
});

this.#client.onSettingsCommand(endpoints, command.getSettings()!);
break;
case TelemetryCommand.CommandCase.RECOVER_ORPHANED_TRANSACTION_COMMAND: {
this.#logger.info({
message: "Receive orphaned transaction recovery command from remote",
message: 'Receive orphaned transaction recovery command from remote',
context: {
endpoints,
clientId,
},
clientId
}
});

this.#client.onRecoverOrphanedTransactionCommand(
endpoints,
command.getRecoverOrphanedTransactionCommand()!,
command.getRecoverOrphanedTransactionCommand()!
);
break;
}
case TelemetryCommand.CommandCase.VERIFY_MESSAGE_COMMAND: {
this.#logger.info({
message: "Receive message verification command from remote",
message: 'Receive message verification command from remote',
context: {
endpoints,
clientId,
},
clientId
}
});

this.#client.onVerifyMessageCommand(
endpoints,
command.getVerifyMessageCommand()!,
command.getVerifyMessageCommand()!
);
break;
}
case TelemetryCommand.CommandCase.PRINT_THREAD_STACK_TRACE_COMMAND: {
this.#logger.info({
message: "Receive thread stack print command from remote",
message: 'Receive thread stack print command from remote',
context: {
endpoints,
clientId,
},
clientId
}
});

this.#client.onPrintThreadStackTraceCommand(
endpoints,
command.getPrintThreadStackTraceCommand()!,
command.getPrintThreadStackTraceCommand()!
);
break;
}
default: {
const commandBO = command.toObject();

this.#logger.warn({
message: "Receive unrecognized command from remote",
message: 'Receive unrecognized command from remote',
context: {
endpoints,
commandCase,
command: commandBO,
clientId,
},
clientId
}
});

// 如果遥测会话启动失败,需要通知客户端
Expand All @@ -187,12 +187,12 @@ export class TelemetrySession {
*/
#onError(err: Error) {
this.#logger.error({
message: "Exception raised from stream response observer",
message: 'Exception raised from stream response observer',
context: {
endpoints: this.#endpoints,
clientId: this.#client.id,
error: err,
},
error: err
}
});

this.release();
Expand All @@ -209,11 +209,11 @@ export class TelemetrySession {
*/
#onEnd() {
this.#logger.info({
message: "Receive completion for stream response observer",
message: 'Receive completion for stream response observer',
context: {
endpoints: this.#endpoints,
clientId: this.#client.id,
},
clientId: this.#client.id
}
});

this.release();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import os from "node:os";
import path from "node:path";
import { readFileSync } from "node:fs";
import { UA, Language } from "@/rpc/apache/rocketmq/v2/definition_pb";
import os from 'node:os';
import path from 'node:path';
import { readFileSync } from 'node:fs';
import { Language, UA } from '../../rpc/apache/rocketmq/v2/definition_pb';

// 从 package.json 文件中读取版本号
const VERSION: string = JSON.parse(
readFileSync(path.join(__dirname, "../../../../../package.json"), "utf-8"),
readFileSync(path.join(__dirname, '../../../package.json'), 'utf-8')
).version;

/**
Expand All @@ -27,7 +27,7 @@ export class UserAgent {
static readonly INSTANCE: UserAgent = new UserAgent(
VERSION,
os.platform(),
os.hostname(),
os.hostname()
);

/**
Expand Down
11 changes: 11 additions & 0 deletions client/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
export * from './abstract/client-base.rpc';
export * from './common/setting';
export * from './common/telemetry-session';
export * from './common/user-agent';
export * from './common/client-flag.helper';
export * from './interface/grpc-client.interface';
export * from './interface/session-credential.interface';
export * from './interface/base-client-option.interface';
export * from './rpc-client';
export * from './rpc-client-manger';
export * from './rpc-base-client';
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { SessionCredential } from "@/server/client";
import Logger from "@/logger";
import { ISessionCredential } from './session-credential.interface';
import Logger from '../../logger';

/**
* RocketMQ 客户端基本配置参数
Expand Down Expand Up @@ -28,7 +28,7 @@ export interface BaseClientOption {
/**
* 会话凭证,如果启用了 SSL,必须提供会话凭证
*/
sessionCredential?: SessionCredential;
sessionCredential?: ISessionCredential;

/**
* 请求超时时间,单位:毫秒z
Expand Down
File renamed without changes.
3 changes: 3 additions & 0 deletions client/interface/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from './base-client-option.interface';
export * from './grpc-client.interface';
export * from './session-credential.interface';
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* @email <747945307@qq.com>
* @date 2024/4/30 17:57
*/
export interface SessionCredential {
export interface ISessionCredential {
/**
* 访问密钥(AccessKey)
*/
Expand Down
Loading

0 comments on commit 38be1e8

Please sign in to comment.