Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: drop is-ip by representing IPs internally as octets #214

Merged
merged 6 commits into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@
"debug": "^4.3.1",
"dgram": "^1.0.1",
"err-code": "^3.0.1",
"ip6addr": "^0.2.3",
"is-ip": "^4.0.0",
"rlp": "^2.2.6",
"strict-event-emitter-types": "^2.0.0",
"varint": "^6.0.0"
Expand Down
16 changes: 0 additions & 16 deletions src/message/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
RequestId,
IPingMessage,
MessageType,
IPongMessage,
IFindNodeMessage,
INodesMessage,
ITalkReqMessage,
Expand All @@ -25,21 +24,6 @@ export function createPingMessage(enrSeq: SequenceNumber): IPingMessage {
};
}

export function createPongMessage(
id: RequestId,
enrSeq: SequenceNumber,
recipientIp: string,
recipientPort: number
): IPongMessage {
return {
type: MessageType.PONG,
id,
enrSeq,
recipientIp,
recipientPort,
};
}

export function createFindNodeMessage(distances: number[]): IFindNodeMessage {
return {
type: MessageType.FINDNODE,
Expand Down
19 changes: 10 additions & 9 deletions src/message/decode.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import * as RLP from "rlp";
import { convertToString } from "@multiformats/multiaddr/convert";
import { toBigIntBE } from "bigint-buffer";
import * as ip6addr from "ip6addr";
import {
IPingMessage,
IPongMessage,
Expand All @@ -17,7 +15,7 @@ import {
ITalkRespMessage,
} from "./types.js";
import { ENR } from "../enr/index.js";
import { toNewUint8Array } from "../util/index.js";
import { ipFromBytes } from "../util/ip.js";

const ERR_INVALID_MESSAGE = "invalid message";

Expand Down Expand Up @@ -66,21 +64,24 @@ function decodePong(data: Buffer): IPongMessage {
if (!Array.isArray(rlpRaw) || rlpRaw.length !== 4) {
throw new Error(ERR_INVALID_MESSAGE);
}
let stringIpAddr = convertToString("ip4", toNewUint8Array(rlpRaw[2]));
const parsedIp = ip6addr.parse(stringIpAddr);
if (parsedIp.kind() === "ipv4") {
stringIpAddr = parsedIp.toString({ format: "v4" });

const ip = ipFromBytes(rlpRaw[2]);
// IP must be 4 or 16 bytes
if (ip === undefined) {
throw new Error(ERR_INVALID_MESSAGE);
}

// recipientPort is a uint16 (2 bytes)
if (rlpRaw[3].length > 2) {
throw new Error(ERR_INVALID_MESSAGE);
}
const port = rlpRaw[3].length ? rlpRaw[3].readUIntBE(0, rlpRaw[3].length) : 0;

return {
type: MessageType.PONG,
id: toBigIntBE(rlpRaw[0]),
enrSeq: toBigIntBE(rlpRaw[1]),
recipientIp: stringIpAddr,
recipientPort: rlpRaw[3].length ? rlpRaw[3].readUIntBE(0, rlpRaw[3].length) : 0,
addr: { ip, port },
};
}

Expand Down
18 changes: 9 additions & 9 deletions src/message/encode.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import * as RLP from "rlp";
import { multiaddr } from "@multiformats/multiaddr";
import { isIPv4 } from "is-ip";
import { ipToBytes } from "../util/ip.js";

import {
IPingMessage,
Expand Down Expand Up @@ -56,17 +55,18 @@ export function encodePingMessage(m: IPingMessage): Buffer {
}

export function encodePongMessage(m: IPongMessage): Buffer {
const ipMultiaddr = multiaddr(`/${isIPv4(m.recipientIp) ? "ip4" : "ip6"}/${m.recipientIp}`);
const tuple = ipMultiaddr.tuples()[0][1];
if (!tuple) {
throw new Error("invalid address for encoding");
}
if (m.recipientPort < 0 || m.recipientPort > 65535) {
if (m.addr.port < 0 || m.addr.port > 65535) {
throw new Error("invalid port for encoding");
}
return Buffer.concat([
Buffer.from([MessageType.PONG]),
RLP.encode([toBuffer(m.id), toBuffer(m.enrSeq), tuple, m.recipientPort]),
RLP.encode([
//
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some comments here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this to force prettier into placing the arguments vertically

toBuffer(m.id),
toBuffer(m.enrSeq),
ipToBytes(m.addr.ip),
m.addr.port,
]),
]);
}

Expand Down
10 changes: 8 additions & 2 deletions src/message/types.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import { SequenceNumber, ENR } from "../enr/index.js";
import { SocketAddress } from "../util/ip.js";

export type RequestId = bigint;

export type NodeAddressIP = {
family: 4 | 6;
octets: number[];
port: number;
};

export enum MessageType {
PING = 1,
PONG = 2,
Expand Down Expand Up @@ -46,8 +53,7 @@ export interface IPongMessage {
type: MessageType.PONG;
id: RequestId;
enrSeq: SequenceNumber;
recipientIp: string;
recipientPort: number;
addr: SocketAddress;
}

export interface IFindNodeMessage {
Expand Down
49 changes: 27 additions & 22 deletions src/service/addrVotes.ts
Original file line number Diff line number Diff line change
@@ -1,54 +1,52 @@
import { isIPv4 } from "is-ip";
import { NodeId } from "../enr/index.js";
import { SocketAddress } from "../util/ip.js";

type MultiaddrStr = string;
/** Serialized representation of the IP:port vote from the Pong message */
type VoteID = string;

const MAX_VOTES = 200;

export class AddrVotes {
/** Bounded by `MAX_VOTES`, on new votes evicts the oldest votes */
private readonly votes = new Map<NodeId, { multiaddrStr: MultiaddrStr; unixTsMs: number }>();
/** Bounded by votes, if the vote count of some `MultiaddrStr` reaches 0, its key is deleted */
private readonly tallies = new Map<MultiaddrStr, number>();
private readonly votes = new Map<NodeId, { socketAddrStr: VoteID; unixTsMs: number }>();
/** Bounded by votes, if the vote count reaches 0, its key is deleted */
private readonly tallies = new Map<VoteID, number>();

constructor(private readonly addrVotesToUpdateEnr: number) {}

/**
* Adds vote to a given `recipientIp` and `recipientPort`. If the votes for this addr are greater than `votesToWin`,
* this function returns the winning `multiaddrStr` and clears existing votes, restarting the process.
* Adds vote to a given IP:port tuple from a Pong message. If the votes for this addr are greater than `votesToWin`,
* @returns true if the added vote is the winning vote. In that case clears all existing votes.
*/
addVote(
voter: NodeId,
{ recipientIp, recipientPort }: { recipientIp: string; recipientPort: number }
): { multiaddrStr: string } | undefined {
const multiaddrStr = `/${isIPv4(recipientIp) ? "ip4" : "ip6"}/${recipientIp}/udp/${recipientPort}`;
addVote(voter: NodeId, ip: SocketAddress): boolean {
const socketAddrStr = serializeSocketAddr(ip);

const prevVote = this.votes.get(voter);
if (prevVote?.multiaddrStr === multiaddrStr) {
if (prevVote?.socketAddrStr === socketAddrStr) {
// Same vote, ignore
return;
return false;
} else if (prevVote !== undefined) {
// If there was a previous vote, remove from tally
const prevVoteTally = (this.tallies.get(prevVote.multiaddrStr) ?? 0) - 1;
const prevVoteTally = (this.tallies.get(prevVote.socketAddrStr) ?? 0) - 1;
if (prevVoteTally <= 0) {
this.tallies.delete(prevVote.multiaddrStr);
this.tallies.delete(prevVote.socketAddrStr);
} else {
this.tallies.set(prevVote.multiaddrStr, prevVoteTally);
this.tallies.set(prevVote.socketAddrStr, prevVoteTally);
}
}

const currentTally = (this.tallies.get(multiaddrStr) ?? 0) + 1;
const currentTally = (this.tallies.get(socketAddrStr) ?? 0) + 1;

// Conclude vote period if there are enough votes for an option
if (currentTally >= this.addrVotesToUpdateEnr) {
// If enough peers vote on the same multiaddrStr conclude the vote
// If enough peers vote the same conclude the vote
this.clear();
return { multiaddrStr };
return true;
}

// Persist vote
this.tallies.set(multiaddrStr, currentTally);
this.votes.set(voter, { multiaddrStr, unixTsMs: Date.now() });
this.tallies.set(socketAddrStr, currentTally);
this.votes.set(voter, { socketAddrStr: socketAddrStr, unixTsMs: Date.now() });

// If there are too many votes, remove the oldest
if (this.votes.size > MAX_VOTES) {
Expand All @@ -59,10 +57,17 @@ export class AddrVotes {
}
}
}

return false;
}

clear(): void {
this.votes.clear();
this.tallies.clear();
}
}

/** Arbitrary serialization of SocketAddr, used only to tally votes */
function serializeSocketAddr(addr: SocketAddress): string {
return `${addr.ip.type}-${Buffer.from(addr.ip.octets).toString("hex")}:${addr.port}`;
}
49 changes: 32 additions & 17 deletions src/service/service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { EventEmitter } from "events";
import debug from "debug";
import { randomBytes } from "@libp2p/crypto";
import { Multiaddr, multiaddr } from "@multiformats/multiaddr";
import { Multiaddr } from "@multiformats/multiaddr";
import { PeerId } from "@libp2p/interface-peer-id";

import { ITransportService, UDPTransportService } from "../transport/index.js";
Expand Down Expand Up @@ -29,7 +29,6 @@ import {
IPongMessage,
IPingMessage,
requestMatchesResponse,
createPongMessage,
ITalkReqMessage,
ITalkRespMessage,
createTalkRequestMessage,
Expand All @@ -51,6 +50,13 @@ import {
INodesResponse,
} from "./types.js";
import { RateLimiter, RateLimiterOpts } from "../rateLimit/index.js";
import {
getSocketAddressOnENR,
multiaddrFromSocketAddress,
isEqualSocketAddress,
multiaddrToSocketAddress,
setSocketAddressOnENR,
} from "../util/ip.js";

const log = debug("discv5:service");

Expand Down Expand Up @@ -718,14 +724,19 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
}
}

const ipUDP = multiaddrToSocketAddress(nodeAddr.socketAddr);

const pongMessage: IPongMessage = {
type: MessageType.PONG,
id: message.id,
enrSeq: this.enr.seq,
addr: ipUDP,
};

// build the Pong response
log("Sending PONG response to node: %o", nodeAddr);
try {
const srcOpts = nodeAddr.socketAddr.toOptions();
this.sessionService.sendResponse(
nodeAddr,
createPongMessage(message.id, this.enr.seq, srcOpts.host, srcOpts.port)
);
this.sessionService.sendResponse(nodeAddr, pongMessage);
this.metrics?.sentMessageCount.inc({ type: MessageType[MessageType.PONG] });
} catch (e) {
log("Failed to send Pong. Error %s", (e as Error).message);
Expand Down Expand Up @@ -846,16 +857,20 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
log("Received a PONG response from %o", nodeAddr);

if (this.config.enrUpdate) {
const winningVote = this.addrVotes.addVote(nodeAddr.nodeId, message);
const currentAddr = this.enr.getLocationMultiaddr("udp");
if (winningVote && (!currentAddr || winningVote.multiaddrStr !== currentAddr.toString())) {
log("Local ENR (IP & UDP) updated: %s", winningVote.multiaddrStr);
const votedAddr = multiaddr(winningVote.multiaddrStr);
this.enr.setLocationMultiaddr(votedAddr);
this.emit("multiaddrUpdated", votedAddr);

// publish update to all connected peers
this.pingConnectedPeers();
const isWinningVote = this.addrVotes.addVote(nodeAddr.nodeId, message.addr);

if (isWinningVote) {
const currentAddr = getSocketAddressOnENR(this.enr);
const winningAddr = message.addr;
if (!currentAddr || !isEqualSocketAddress(currentAddr, winningAddr)) {
log("Local ENR (IP & UDP) updated: %s", isWinningVote);
// Set new IP and port
setSocketAddressOnENR(this.enr, winningAddr);
this.emit("multiaddrUpdated", multiaddrFromSocketAddress(winningAddr));

// publish update to all connected peers
this.pingConnectedPeers();
}
}
}

Expand Down
Loading