Skip to content

Commit

Permalink
chore(rosenet-node): refactor file structure
Browse files Browse the repository at this point in the history
Update file names, paths, variable names and more.
  • Loading branch information
mkermani144 committed Apr 28, 2024
1 parent f884c39 commit 8c2ce80
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 23 deletions.
5 changes: 5 additions & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@
"terminal.integrated.defaultProfile.linux": "zsh",
"terminal.integrated.profiles.linux": { "zsh": { "path": "/bin/zsh" } },
},
"extensions": ["hashicorp.terraform"],
},
},
"features": {
"ghcr.io/robbert229/devcontainer-features/opentofu:1": {},
"ghcr.io/devcontainers/features/docker-in-docker:2": {},
},

// Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
// "remoteUser": "root"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { AbstractLogger } from '@rosen-bridge/logger-interface';

const RoseNetNodeTools = {
const RoseNetNodeContext = {
logger: console as AbstractLogger,
init(logger: AbstractLogger) {
this.logger = logger;
},
};

export default RoseNetNodeTools;
export default RoseNetNodeContext;
27 changes: 15 additions & 12 deletions packages/rosenet-node/lib/createRoseNetNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import {
privateKeyToPeerId,
} from '@rosen-bridge/rosenet-utils';

import { decode, encode } from './codec';
import getStreamAndPushable from './getStreamAndPushable';
import RoseNetNodeTools from './RoseNetNodeTools';
import { decode, encode } from './utils/codec';
import streamService from './stream/stream-service';
import RoseNetNodeContext from './context/RoseNetNodeContext';

import RoseNetNodeError from './errors/RoseNetNodeError';

Expand All @@ -36,7 +36,7 @@ const createRoseNetNode = async ({ logger, ...config }: RoseNetNodeConfig) => {
throw new RoseNetNodeError('Cannot start a RoseNet node without a relay');
}

RoseNetNodeTools.init(logger);
RoseNetNodeContext.init(logger);

/**
* return if a peer is unauthorized, i.e. not whitelisted
Expand All @@ -47,7 +47,7 @@ const createRoseNetNode = async ({ logger, ...config }: RoseNetNodeConfig) => {

const peerId = await privateKeyToPeerId(config.privateKey);

RoseNetNodeTools.logger.debug(`PeerId ${peerId.toString()} generated`);
RoseNetNodeContext.logger.debug(`PeerId ${peerId.toString()} generated`);

const node = await createLibp2p({
peerId,
Expand Down Expand Up @@ -85,20 +85,23 @@ const createRoseNetNode = async ({ logger, ...config }: RoseNetNodeConfig) => {
pubsub: gossipsub({ allowPublishToZeroPeers: true }),
},
});
RoseNetNodeTools.logger.debug('RoseNet node created');
RoseNetNodeContext.logger.debug('RoseNet node created');

addEventListeners(node, RoseNetNodeTools.logger);
addEventListeners(node, RoseNetNodeContext.logger);

return {
start: async () => node.start(),
sendMessage: async (to: string, message: string) => {
const { stream, pushable } = await getStreamAndPushable(to, node);
const { stream, pushable } = await streamService.getStreamAndPushable(
to,
node,
);

pipe(pushable, (source) => map(source, encode), stream);
pushable.push(message);
await Promise.resolve();

RoseNetNodeTools.logger.debug('message piped through created stream', {
RoseNetNodeContext.logger.debug('message piped through created stream', {
message,
});
},
Expand All @@ -108,7 +111,7 @@ const createRoseNetNode = async ({ logger, ...config }: RoseNetNodeConfig) => {
node.handle(
ROSENET_DIRECT_PROTOCOL_V1,
async ({ connection, stream }) => {
RoseNetNodeTools.logger.debug(
RoseNetNodeContext.logger.debug(
`incoming connection stream with protocol ${ROSENET_DIRECT_PROTOCOL_V1}`,
{
remoteAddress: connection.remoteAddr.toString(),
Expand All @@ -118,7 +121,7 @@ const createRoseNetNode = async ({ logger, ...config }: RoseNetNodeConfig) => {
pipe(stream, decode, async (source) => {
for await (const message of source) {
await handler(connection.remotePeer.toString(), message);
RoseNetNodeTools.logger.debug(
RoseNetNodeContext.logger.debug(
'incoming message handled successfully',
{
message,
Expand All @@ -129,7 +132,7 @@ const createRoseNetNode = async ({ logger, ...config }: RoseNetNodeConfig) => {
},
{ runOnTransientConnection: true },
);
RoseNetNodeTools.logger.debug(
RoseNetNodeContext.logger.debug(
`handler for ${ROSENET_DIRECT_PROTOCOL_V1} protocol set`,
);
},
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ import { peerIdFromString } from '@libp2p/peer-id';
import { shuffle } from 'fast-shuffle';
import { Libp2p } from 'libp2p';

import RoseNetNodeTools from './RoseNetNodeTools';
import createPushable, { Pushable } from './pushable';
import RoseNetNodeContext from '../context/RoseNetNodeContext';
import createPushable, { Pushable } from './createPushable';

import { ROSENET_DIRECT_PROTOCOL_V1 } from './constants';
import { ROSENET_DIRECT_PROTOCOL_V1 } from '../constants';

const cache = new Map<
string,
Expand All @@ -33,7 +33,7 @@ const isStreamWritable = (stream: Stream) =>
async function getStreamAndPushable(to: string, node: Libp2p) {
const cacheHit = cache.get(to);
if (cacheHit && isStreamWritable(cacheHit.stream)) {
RoseNetNodeTools.logger.debug(
RoseNetNodeContext.logger.debug(
`Found existing stream and pushable in the cache to peer ${to}`,
{
stream: {
Expand All @@ -55,7 +55,7 @@ async function getStreamAndPushable(to: string, node: Libp2p) {
);
const connection = possibleOpenConnectionToPeer ?? (await node.dial(peerId));

RoseNetNodeTools.logger.debug(
RoseNetNodeContext.logger.debug(
possibleOpenConnectionToPeer
? `Found an open connection to peer ${to}`
: `Established a new connection to peer ${to}`,
Expand All @@ -80,7 +80,7 @@ async function getStreamAndPushable(to: string, node: Libp2p) {
runOnTransientConnection: true,
}));

RoseNetNodeTools.logger.debug(
RoseNetNodeContext.logger.debug(
possibleWritableStream
? `Found an open stream to peer ${to}`
: `Created a new stream to peer ${to}`,
Expand All @@ -106,4 +106,6 @@ async function getStreamAndPushable(to: string, node: Libp2p) {
return pair;
}

export default getStreamAndPushable;
export default {
getStreamAndPushable,
};
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { it, describe, expect } from 'vitest';

import createPushable, { Pushable } from '../lib/pushable';
import createPushable, { Pushable } from '../../lib/stream/createPushable';

/**
* delegate to the pushable parameter, essentially returning an iterator from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import first from 'it-first';
import { Uint8ArrayList } from 'uint8arraylist';
import { describe, expect, it } from 'vitest';

import { decode, encode } from '../lib/codec';
import { decode, encode } from '../../lib/utils/codec';

const message = 'hello world';
const lengthPrefixByteArray = Uint8Array.from([message.length]);
Expand Down

0 comments on commit 8c2ce80

Please sign in to comment.