Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
Merge pull request #972 from LiskHQ/969-use_socket_cluster_types_defi…
Browse files Browse the repository at this point in the history
…nitions

Use socket cluster types definitions - Closes#969
  • Loading branch information
shuse2 authored Jan 9, 2019
2 parents 12ec2b2 + 31f8918 commit 4236990
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 61 deletions.
101 changes: 101 additions & 0 deletions packages/lisk-p2p/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions packages/lisk-p2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
"@types/semver": "5.5.0",
"@types/sinon": "5.0.7",
"@types/sinon-chai": "3.2.1",
"@types/socketcluster-client": "13.0.0",
"@types/socketcluster-server": "14.2.0",
"@types/validator": "9.4.4",
"@types/verror": "1.10.3",
"@types/ws": "6.0.1",
Expand Down
39 changes: 26 additions & 13 deletions packages/lisk-p2p/src/p2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { EventEmitter } from 'events';
import http, { Server } from 'http';
import { platform } from 'os';
import querystring from 'querystring';
import socketClusterServer from 'socketcluster-server';
import { attach, SCServer, SCServerSocket } from 'socketcluster-server';

import { Peer, PeerInfo } from './peer';

Expand All @@ -33,6 +33,12 @@ import {
ProtocolPeerList,
} from './p2p_types';

type SCServerCloseCallback = () => void;

type SCServerUpdated = {
readonly close: (sCServerCloseCallback: SCServerCloseCallback) => void;
} & SCServer;

import { PeerPool } from './peer_pool';

export const EVENT_NEW_INBOUND_PEER = 'newInboundPeer';
Expand All @@ -45,7 +51,7 @@ export class P2P extends EventEmitter {
private readonly _config: P2PConfig;
private readonly _peerPool: PeerPool;
private readonly _httpServer: Server;
private readonly _scServer: any;
private readonly _scServer: SCServerUpdated; // Until we get comeplete definition for SCServer
private readonly _newPeers: Set<PeerInfo>;
// TODO ASAP: private readonly _triedPeers: Set<PeerInfo>;
private _nodeStatus: P2PNodeStatus;
Expand All @@ -64,7 +70,7 @@ export class P2P extends EventEmitter {

this._peerPool = new PeerPool();
this._httpServer = http.createServer();
this._scServer = socketClusterServer.attach(this._httpServer);
this._scServer = attach(this._httpServer) as SCServerUpdated;
}

/* tslint:disable:next-line: prefer-function-over-method */
Expand Down Expand Up @@ -102,7 +108,12 @@ export class P2P extends EventEmitter {
private async _startPeerServer(): Promise<void> {
this._scServer.on(
'connection',
(socket: any): void => {
(socket: SCServerSocket): void => {
if (!socket.request.url) {
super.emit(EVENT_FAILED_TO_ADD_INBOUND_PEER);

return;
}
const queryObject = querystring.parse(socket.request.url);
if (
typeof queryObject.wsPort !== 'string' ||
Expand All @@ -115,15 +126,17 @@ export class P2P extends EventEmitter {
const peerId = Peer.constructPeerId(socket.remoteAddress, wsPort);
const existingPeer = this._peerPool.getPeer(peerId);
if (existingPeer === undefined) {
const peer = new Peer({
inboundSocket: socket,
ipAddress: socket.remoteAddress,
os: queryObject.os,
version: queryObject.version,
wsPort,
nodeStatus: this._nodeStatus,
height: queryObject.height ? +queryObject.height : 0,
});
const peer = new Peer(
{
ipAddress: socket.remoteAddress,
os: queryObject.os,
version: queryObject.version,
wsPort,
nodeStatus: this._nodeStatus,
height: queryObject.height ? +queryObject.height : 0,
},
socket,
);
this._peerPool.addPeer(peer);
super.emit(EVENT_NEW_INBOUND_PEER, peer);
super.emit(EVENT_NEW_PEER, peer);
Expand Down
79 changes: 45 additions & 34 deletions packages/lisk-p2p/src/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import {
P2PResponsePacket,
} from './p2p_types';

import socketClusterClient from 'socketcluster-client';

import { create, SCClientSocket } from 'socketcluster-client';
import { SCServerSocket } from 'socketcluster-server';
import { processPeerListFromResponse } from './response_handler_sanitization';

export interface PeerInfo {
Expand All @@ -31,7 +31,6 @@ export interface PeerInfo {
readonly nodeStatus?: P2PNodeStatus; // TODO DELEEETETETE
readonly clock?: Date;
readonly height: number;
readonly inboundSocket?: any; // TODO: Type SCServerSocket
readonly os: string;
readonly version: string;
}
Expand All @@ -53,43 +52,46 @@ export class Peer {
private readonly _id: string;
private readonly _peerInfo: PeerInfo;
private readonly _height: number;
private _inboundSocket: any;
private _outboundSocket: any;
private _inboundSocket: SCServerSocket | undefined;
private _outboundSocket: SCClientSocket | undefined;
private readonly _ipAddress: string;
private readonly _wsPort: number;
private _nodeStatus: P2PNodeStatus | undefined;

public constructor(peerInfo: PeerInfo) {
public constructor(peerInfo: PeerInfo, inboundSocket?: SCServerSocket) {
this._peerInfo = peerInfo;
this._ipAddress = peerInfo.ipAddress;
this._wsPort = peerInfo.wsPort;
this._id = Peer.constructPeerId(this._ipAddress, this._wsPort);
this._inboundSocket = peerInfo.inboundSocket;
this._inboundSocket = inboundSocket;
this._height = peerInfo.height ? peerInfo.height : 0;
}

private _createOutboundSocket(): any {
if (!this._outboundSocket) {
this._outboundSocket = socketClusterClient.create({
hostname: this._ipAddress,
port: this._wsPort,
query: this._nodeStatus,
autoConnect: false,
});
}
private _createOutboundSocket(): SCClientSocket {
const query = JSON.stringify(this._nodeStatus);

return create({
hostname: this._ipAddress,
port: this._wsPort,
query,
autoConnect: false,
});
}

public connect(): void {
this._createOutboundSocket();
this._outboundSocket.connect();
if (!this.outboundSocket) {
this.outboundSocket = this._createOutboundSocket();
}

this.outboundSocket.connect();
}

public disconnect(code: number = 1000, reason?: string): void {
if (this._inboundSocket) {
this._inboundSocket.disconnect(code, reason);
if (this.inboundSocket) {
this.inboundSocket.disconnect(code, reason);
}
if (this._outboundSocket) {
this._outboundSocket.disconnect(code, reason);
if (this.outboundSocket) {
this.outboundSocket.disconnect(code, reason);
}
}

Expand All @@ -101,8 +103,10 @@ export class Peer {
resolve: (result: P2PResponsePacket) => void,
reject: (result: Error) => void,
): void => {
this._createOutboundSocket();
this._outboundSocket.emit(
if (!this.outboundSocket) {
this.outboundSocket = this._createOutboundSocket();
}
this.outboundSocket.emit(
'rpc-request',
{
type: '/RPCRequest',
Expand Down Expand Up @@ -150,8 +154,15 @@ export class Peer {
}

public send<T>(packet: P2PMessagePacket<T>): void {
this._createOutboundSocket();
this._outboundSocket.emit(packet.event, {
if (!this.outboundSocket) {
this.outboundSocket = this._createOutboundSocket();
this.outboundSocket.emit(packet.event, {
data: packet.data,
});

return;
}
this.outboundSocket.emit(packet.event, {
data: packet.data,
});
}
Expand All @@ -176,19 +187,19 @@ export class Peer {
return this._nodeStatus;
}

public set inboundSocket(value: any) {
public set inboundSocket(value: SCServerSocket | undefined) {
this._inboundSocket = value;
}

public get inboundSocket(): any {
public get inboundSocket(): SCServerSocket | undefined {
return this._inboundSocket;
}

public set outboundSocket(value: any) {
public set outboundSocket(value: SCClientSocket | undefined) {
this._outboundSocket = value;
}

public get outboundSocket(): any {
public get outboundSocket(): SCClientSocket | undefined {
return this._outboundSocket;
}

Expand All @@ -197,13 +208,13 @@ export class Peer {
}

public get state(): PeerConnectionState {
const inbound = this._inboundSocket
? this._inboundSocket.state === this._inboundSocket.OPEN
const inbound = this.inboundSocket
? this.inboundSocket.state === this.inboundSocket.OPEN
? ConnectionState.CONNECTED
: ConnectionState.DISCONNECTED
: ConnectionState.DISCONNECTED;
const outbound = this._outboundSocket
? this._outboundSocket.state === this._outboundSocket.OPEN
const outbound = this.outboundSocket
? this.outboundSocket.state === this.outboundSocket.OPEN
? ConnectionState.CONNECTED
: ConnectionState.DISCONNECTED
: ConnectionState.DISCONNECTED;
Expand Down
1 change: 0 additions & 1 deletion packages/lisk-p2p/test/unit/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ describe('peer', () => {
ipAddress: '12.12.12.12',
wsPort: 5001,
height: 545776,
inboundSocket: undefined,
os: 'darwin',
version: '1.1.0',
};
Expand Down
Loading

0 comments on commit 4236990

Please sign in to comment.