-
-
Notifications
You must be signed in to change notification settings - Fork 7.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(microservice): add TLS over TCP support #7516
Changes from 7 commits
8bbf128
567c902
b2883c7
ae124a9
0cf63a3
18896a5
18eb60d
4dd99be
2d49664
a6e2b5f
4589984
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -25,6 +25,7 @@ import { | |||||||||||||||
RedisOptions, | ||||||||||||||||
RmqOptions, | ||||||||||||||||
TcpClientOptions, | ||||||||||||||||
TcpTlsClientOptions, | ||||||||||||||||
WritePacket, | ||||||||||||||||
} from '../interfaces'; | ||||||||||||||||
import { ProducerDeserializer } from '../interfaces/deserializer.interface'; | ||||||||||||||||
|
@@ -128,7 +129,7 @@ export abstract class ClientProxy { | |||||||||||||||
|
||||||||||||||||
protected getOptionsProp< | ||||||||||||||||
T extends ClientOptions['options'], | ||||||||||||||||
K extends keyof T | ||||||||||||||||
K extends keyof T, | ||||||||||||||||
>(obj: T, prop: K, defaultValue: T[K] = undefined) { | ||||||||||||||||
return (obj && obj[prop]) || defaultValue; | ||||||||||||||||
} | ||||||||||||||||
|
@@ -140,26 +141,32 @@ export abstract class ClientProxy { | |||||||||||||||
protected initializeSerializer(options: ClientOptions['options']) { | ||||||||||||||||
this.serializer = | ||||||||||||||||
(options && | ||||||||||||||||
(options as | ||||||||||||||||
| RedisOptions['options'] | ||||||||||||||||
| NatsOptions['options'] | ||||||||||||||||
| MqttOptions['options'] | ||||||||||||||||
| TcpClientOptions['options'] | ||||||||||||||||
| RmqOptions['options'] | ||||||||||||||||
| KafkaOptions['options']).serializer) || | ||||||||||||||||
( | ||||||||||||||||
options as | ||||||||||||||||
| RedisOptions['options'] | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what do you think about extracting these types into a Union type declared, something like :
Suggested change
|
||||||||||||||||
| NatsOptions['options'] | ||||||||||||||||
| MqttOptions['options'] | ||||||||||||||||
| TcpClientOptions['options'] | ||||||||||||||||
| TcpTlsClientOptions['options'] | ||||||||||||||||
| RmqOptions['options'] | ||||||||||||||||
| KafkaOptions['options'] | ||||||||||||||||
).serializer) || | ||||||||||||||||
new IdentitySerializer(); | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
protected initializeDeserializer(options: ClientOptions['options']) { | ||||||||||||||||
this.deserializer = | ||||||||||||||||
(options && | ||||||||||||||||
(options as | ||||||||||||||||
| RedisOptions['options'] | ||||||||||||||||
| NatsOptions['options'] | ||||||||||||||||
| MqttOptions['options'] | ||||||||||||||||
| TcpClientOptions['options'] | ||||||||||||||||
| RmqOptions['options'] | ||||||||||||||||
| KafkaOptions['options']).deserializer) || | ||||||||||||||||
( | ||||||||||||||||
options as | ||||||||||||||||
| RedisOptions['options'] | ||||||||||||||||
| NatsOptions['options'] | ||||||||||||||||
| MqttOptions['options'] | ||||||||||||||||
| TcpClientOptions['options'] | ||||||||||||||||
| TcpTlsClientOptions['options'] | ||||||||||||||||
| RmqOptions['options'] | ||||||||||||||||
| KafkaOptions['options'] | ||||||||||||||||
).deserializer) || | ||||||||||||||||
new IncomingResponseDeserializer(); | ||||||||||||||||
} | ||||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,32 +1,55 @@ | ||
import { Logger } from '@nestjs/common'; | ||
import * as net from 'net'; | ||
import { EmptyError, lastValueFrom } from 'rxjs'; | ||
import { EmptyError, lastValueFrom, Observable } from 'rxjs'; | ||
import { share, tap } from 'rxjs/operators'; | ||
import * as tls from 'tls'; | ||
import { | ||
CLOSE_EVENT, | ||
ECONNREFUSED, | ||
ERROR_EVENT, | ||
MESSAGE_EVENT, | ||
TCP_DEFAULT_HOST, | ||
TCP_DEFAULT_PORT, | ||
TCP_DEFAULT_USE_TLS, | ||
TLS_CONNECT_EVENT, | ||
} from '../constants'; | ||
import { JsonSocket } from '../helpers/json-socket'; | ||
import { PacketId, ReadPacket, WritePacket } from '../interfaces'; | ||
import { TcpClientOptions } from '../interfaces/client-metadata.interface'; | ||
import { | ||
TcpClientOptions, | ||
TcpTlsClientOptions, | ||
} from '../interfaces/client-metadata.interface'; | ||
import { ClientProxy } from './client-proxy'; | ||
|
||
export class ClientTCP extends ClientProxy { | ||
protected connection: Promise<any>; | ||
private readonly logger = new Logger(ClientTCP.name); | ||
private readonly port: number; | ||
private readonly host: string; | ||
private readonly useTls: boolean; | ||
private isConnected = false; | ||
private socket: JsonSocket; | ||
|
||
constructor(options: TcpClientOptions['options']) { | ||
/** | ||
* The underling netSocket used by the TLS Socket | ||
*/ | ||
private netSocket: net.Socket | null = null; | ||
|
||
constructor(); | ||
constructor(options: TcpClientOptions['options']); | ||
constructor(options: TcpTlsClientOptions['options']); | ||
constructor( | ||
private readonly options?: | ||
| TcpClientOptions['options'] | ||
| TcpTlsClientOptions['options'], | ||
) { | ||
super(); | ||
if (options === undefined) { | ||
this.options = {}; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the reason for this change? |
||
this.port = this.getOptionsProp(options, 'port') || TCP_DEFAULT_PORT; | ||
this.host = this.getOptionsProp(options, 'host') || TCP_DEFAULT_HOST; | ||
this.useTls = this.getOptionsProp(options, 'useTls') || TCP_DEFAULT_USE_TLS; | ||
|
||
this.initializeSerializer(options); | ||
this.initializeDeserializer(options); | ||
|
@@ -39,17 +62,35 @@ export class ClientTCP extends ClientProxy { | |
this.socket = this.createSocket(); | ||
this.bindEvents(this.socket); | ||
|
||
const source$ = this.connect$(this.socket.netSocket).pipe( | ||
tap(() => { | ||
this.isConnected = true; | ||
this.socket.on(MESSAGE_EVENT, (buffer: WritePacket & PacketId) => | ||
this.handleResponse(buffer), | ||
); | ||
}), | ||
share(), | ||
); | ||
|
||
this.socket.connect(this.port, this.host); | ||
let source$: Observable<any>; | ||
|
||
if (this.useTls) { | ||
this.netSocket.connect(this.port, this.host); | ||
source$ = this.connect$( | ||
this.socket.netSocket, | ||
ERROR_EVENT, | ||
TLS_CONNECT_EVENT, | ||
).pipe( | ||
tap(() => { | ||
this.isConnected = true; | ||
this.socket.on(MESSAGE_EVENT, (buffer: WritePacket & PacketId) => | ||
this.handleResponse(buffer), | ||
); | ||
}), | ||
share(), | ||
); | ||
} else { | ||
source$ = this.connect$(this.socket.netSocket).pipe( | ||
tap(() => { | ||
this.isConnected = true; | ||
this.socket.on(MESSAGE_EVENT, (buffer: WritePacket & PacketId) => | ||
this.handleResponse(buffer), | ||
); | ||
}), | ||
share(), | ||
); | ||
this.socket.connect(this.port, this.host); | ||
} | ||
Comment on lines
+71
to
+97
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the only difference here is whether to use |
||
this.connection = lastValueFrom(source$).catch(err => { | ||
if (err instanceof EmptyError) { | ||
return; | ||
|
@@ -81,7 +122,20 @@ export class ClientTCP extends ClientProxy { | |
} | ||
|
||
public createSocket(): JsonSocket { | ||
return new JsonSocket(new net.Socket()); | ||
let socket: net.Socket | tls.TLSSocket = new net.Socket(); | ||
|
||
/** | ||
* TLS enabled, "upgrade" the TCP Socket to TLS | ||
*/ | ||
if (this.useTls === true) { | ||
/** | ||
* Options are TcpTlsClientOptions | ||
*/ | ||
const options = this.options as TcpTlsClientOptions['options']; | ||
this.netSocket = socket; | ||
socket = tls.connect({ ...options, socket }); | ||
} | ||
return new JsonSocket(socket); | ||
} | ||
|
||
public close() { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As this condition seems to be used only for type-checking purposes, can we just force cast
options
to the appropriate type and remove thisif
?