Skip to content

Commit

Permalink
route finding timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
riccardobl committed Jan 4, 2025
1 parent 5ea4ffa commit 9ab8828
Showing 1 changed file with 77 additions and 71 deletions.
148 changes: 77 additions & 71 deletions src/Gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export default class Gateway extends Peer {
private gates: Array<Gate> = [];
private refreshId: number = 0;
private routeFilter?: (routingEntry: RoutingEntry) => Promise<boolean>;
private routeFindingTimeout: number = 5 * 60 * 1000; // 5 minutes

constructor(secret: string, listenOnAddr: string, routeFilter?: (routingEntry: RoutingEntry) => Promise<boolean>, opts?: object) {
super(secret, true, opts);
Expand Down Expand Up @@ -266,87 +267,92 @@ export default class Gateway extends Peer {

// look for a route
const findRoute = async () => {
console.log("Looking for route");
while (true) {
const route = this.getRoute(gatePort);
if (!route) {
// no route: kill channel
channel.close?.();
break;
}
try {
console.log("Looking for route");
const routeFindingStartedAt = Date.now();

while (true) {
const route:Buffer = this.getRoute(gatePort);

// send open request and wait for response
try {
await new Promise((res, rej) => {
console.log("Test route", b4a.toString(route, "hex"));
this.send(
route,
Message.create(MessageActions.open, {
channelPort: channelPort,
gatePort: gatePort,
}),
);
const timeout = setTimeout(() => rej("timeout"), 5000); // timeout open request
this.addMessageHandler((peer, msg) => {
if (msg.actionId == MessageActions.open && msg.channelPort == channelPort) {
if (msg.error) {
console.log("Received error", msg.error);
rej(msg.error);
return true; // error, detach
}
console.log("Received confirmation");
channel.route = route;
channel.accepted = true;
clearTimeout(timeout);
res(channel);
return true; // detach listener
}
return !channel.alive; // detach when channel dies
});
});

// send open request and wait for response
try {
await new Promise((res, rej) => {
console.log("Test route", b4a.toString(route, "hex"));
this.send(
route,
Message.create(MessageActions.open, {
channelPort: channelPort,
gatePort: gatePort,
}),
// found a route
console.log(
"New gate channel opened:",
channelPort,
" tot: ",
gate.channels.length,
gate.protocol,
"\nInitiated from ",
socket.remoteAddress,
":",
socket.remotePort,
"\n To",
socket.localAddress,
":",
socket.localPort,
);
const timeout = setTimeout(() => rej("timeout"), 5000); // timeout open request

// pipe route data to gate
this.addMessageHandler((peer, msg) => {
if (msg.actionId == MessageActions.open && msg.channelPort == channelPort) {
if (msg.error) {
console.log("Received error", msg.error);
rej(msg.error);
return true; // error, detach
}
console.log("Received confirmation");
channel.route = route;
channel.accepted = true;
clearTimeout(timeout);
res(channel);
// everytime data is piped, reset expiration
channel.expire = Date.now() + channel.duration;

// pipe
if (msg.actionId == MessageActions.stream && msg.channelPort == channelPort && msg.data) {
socket.write(msg.data);
return false;
} else if (msg.actionId == MessageActions.close && (!msg.channelPort || msg.channelPort == channelPort || msg.channelPort <= 0) /* close all */) {
channel.close?.();
return true; // detach listener
}
return !channel.alive; // detach when channel dies
});
});

// found a route
console.log(
"New gate channel opened:",
channelPort,
" tot: ",
gate.channels.length,
gate.protocol,
"\nInitiated from ",
socket.remoteAddress,
":",
socket.remotePort,
"\n To",
socket.localAddress,
":",
socket.localPort,
);

// pipe route data to gate
this.addMessageHandler((peer, msg) => {
// everytime data is piped, reset expiration
channel.expire = Date.now() + channel.duration;

// pipe
if (msg.actionId == MessageActions.stream && msg.channelPort == channelPort && msg.data) {
socket.write(msg.data);
return false; // detach listener
} else if (msg.actionId == MessageActions.close && (!msg.channelPort || msg.channelPort == channelPort || msg.channelPort <= 0) /* close all */) {
channel.close?.();
return true; // detach listener
}
return !channel.alive; // detach when channel dies
});

// pipe pending buffered data
channel.pipeData?.();
// pipe pending buffered data
channel.pipeData?.();

// exit route finding mode, now everything is ready
break;
} catch (e) {
console.error(e);
await new Promise((res) => setTimeout(res, 100)); // wait 100 ms
// exit route finding mode, now everything is ready
break;
} catch (e) {
console.error(e);
if(Date.now() - routeFindingStartedAt > this.routeFindingTimeout){
throw new Error("Route finding timeout");
}
await new Promise((res) => setTimeout(res, 100)); // wait 100 ms
}
}
} catch (e) {
channel.close?.();
throw e;
}
};
findRoute().catch(console.error);
Expand Down

0 comments on commit 9ab8828

Please sign in to comment.