Skip to content

Commit

Permalink
Replace go channel placeholders with a channel package (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
prathamesh0 authored May 23, 2023
1 parent 65a0016 commit e9776ff
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 41 deletions.
1 change: 1 addition & 0 deletions packages/nitro-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
},
"dependencies": {
"@libp2p/mdns": "^8.0.0",
"@nodeguy/channel": "^1.0.2",
"@types/debug": "^4.1.7",
"debug": "^4.3.4",
"ethers": "^6.4.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { AddressLike } from 'ethers';
import { GoReceivingChannelPlaceholder } from '../../../go-channel';
import type { ReadChannel } from '@nodeguy/channel';

import { ChainTransaction } from '../../../protocols/interfaces';

// ChainEvent dictates which methods all chain events must implement
Expand All @@ -9,7 +10,7 @@ export interface ChainEvent {

// TODO: Add eth chainservice implementation
export interface ChainService {
eventFeed (): GoReceivingChannelPlaceholder<ChainEvent>;
eventFeed (): ReadChannel<ChainEvent>;

// TODO: Use protocols chain transaction type
// TODO: Can throw an error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ import {
AddressLike, Log, TransactionLike, ethers,
} from 'ethers';
import debug from 'debug';
import type { ReadChannel, ReadWriteChannel } from '@nodeguy/channel';

import { NitroAdjudicator } from './adjudicator/nitro-adjudicator';
import { ChainService, ChainEvent } from './chainservice';
import { GoChannelPlaceholder, GoReceivingChannelPlaceholder } from '../../../go-channel';
import { ChainTransaction } from '../../../protocols/interfaces';

interface EthChain {
Expand Down Expand Up @@ -34,7 +34,7 @@ export class EthChainService implements ChainService {

private txSigner: TransactionLike;

private out: GoChannelPlaceholder<Event>;
private out: ReadWriteChannel<ChainEvent>;

private logger: debug.Debugger;

Expand All @@ -49,7 +49,7 @@ export class EthChainService implements ChainService {
consensusAppAddress: AddressLike,
virtualPaymentAppAddress: AddressLike,
txSigner: TransactionLike,
out: GoChannelPlaceholder<Event>,
out: ReadWriteChannel<ChainEvent>,
logger: debug.Debugger,
ctx: AbortController,
cancel: () => void,
Expand Down Expand Up @@ -104,8 +104,8 @@ export class EthChainService implements ChainService {

// eventFeed returns the out chan, and narrows the type so that external consumers may only receive on it.
// TODO: Implement
eventFeed(): GoReceivingChannelPlaceholder<ChainEvent> {
return new GoReceivingChannelPlaceholder<ChainEvent>();
eventFeed(): ReadChannel<ChainEvent> {
return this.out.readOnly();
}

// TODO: Implement
Expand Down
37 changes: 18 additions & 19 deletions packages/nitro-client/src/client/engine/engine.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { AddressLike, ethers } from 'ethers';
import { GoChannelPlaceholder, GoReceivingChannelPlaceholder } from '../../go-channel';
import createChannel from '@nodeguy/channel';
import type { ReadChannel, ReadWriteChannel } from '@nodeguy/channel';

import { MessageService } from './messageservice/messageservice';
import { ChainService, ChainEvent } from './chainservice/chainservice';
import { Store } from './store/store';
Expand All @@ -20,19 +22,19 @@ export type PaymentRequest = {
export class EngineEvent {}

export class Engine {
objectiveRequestsFromAPI: GoChannelPlaceholder<ObjectiveRequest>;
objectiveRequestsFromAPI: ReadWriteChannel<ObjectiveRequest>;

paymentRequestsFromAPI: GoChannelPlaceholder<PaymentRequest>;
paymentRequestsFromAPI: ReadWriteChannel<PaymentRequest>;

private fromChain: GoReceivingChannelPlaceholder<ChainEvent>;
private fromChain: ReadChannel<ChainEvent>;

private fromMsg: GoReceivingChannelPlaceholder<Message>;
private fromMsg: ReadChannel<Message>;

private fromLedger: GoChannelPlaceholder<Proposal>;
private fromLedger: ReadWriteChannel<Proposal>;

private _toApi: GoChannelPlaceholder<EngineEvent>;
private _toApi: ReadWriteChannel<EngineEvent>;

private stop: GoChannelPlaceholder<void>;
private stop: ReadWriteChannel<void>;

private msg: MessageService;

Expand All @@ -59,22 +61,19 @@ export class Engine {
) {
this.store = store;

// TODO: Use buffered channel
this.fromLedger = new GoChannelPlaceholder<Proposal>();

// bind to inbound chans
this.objectiveRequestsFromAPI = new GoReceivingChannelPlaceholder<ObjectiveRequest>();
this.paymentRequestsFromAPI = new GoReceivingChannelPlaceholder<PaymentRequest>();
this.stop = new GoChannelPlaceholder();
this.fromLedger = createChannel<Proposal>(100);
// bind to inbound channels
this.objectiveRequestsFromAPI = createChannel<ObjectiveRequest>();
this.paymentRequestsFromAPI = createChannel<PaymentRequest>();
this.stop = createChannel();

this.fromChain = chain.eventFeed();
this.fromMsg = msg.out();

this.chain = chain;
this.msg = msg;

// TODO: Use buffered channel
this._toApi = new GoChannelPlaceholder<EngineEvent>();
this._toApi = createChannel<EngineEvent>(100);

// logging.ConfigureZeroLogger()
// e.logger = zerolog.New(logDestination).With().Timestamp().Str("engine", e.store.GetAddress().String()[0:8]).Caller().Logger()
Expand All @@ -91,8 +90,8 @@ export class Engine {
// e.metrics = NewMetricsRecorder(*e.store.GetAddress(), metricsApi)
}

toApi(): GoChannelPlaceholder<EngineEvent> {
return this._toApi;
get toApi(): ReadChannel<EngineEvent> {
return this._toApi.readOnly();
}

// TODO: Can throw an error
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { GoReceivingChannelPlaceholder } from '../../../go-channel';
import { ReadChannel } from '@nodeguy/channel';

import { Message } from '../../../protocols/messages';

// TODO: Add p2p implementation
Expand All @@ -7,7 +8,7 @@ export interface MessageService {
// TODO: Update comments

// Out returns a chan for receiving messages from the message service
out (): GoReceivingChannelPlaceholder<Message>;
out (): ReadChannel<Message>;

// Send is for sending messages with the message service
// TODO: Use protocols message type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import debug from 'debug';
// https://github.com/microsoft/TypeScript/issues/49721
// @ts-expect-error
import type { Libp2p } from 'libp2p';
import type { ReadChannel, ReadWriteChannel } from '@nodeguy/channel';

// @ts-expect-error
import type { PrivateKey } from '@libp2p/crypto';
Expand All @@ -14,7 +15,6 @@ import type { Stream } from '@libp2p/interface-connection';
// @ts-expect-error
import type { Multiaddr } from '@multiformats/multiaddr';

import { GoChannelPlaceholder, GoReceivingChannelPlaceholder } from '../../../../go-channel';
import { SyncMap } from '../../../../internal/safesync/safesync';
import { Message } from '../../../../protocols/messages';
import { Address } from '../../../../types/types';
Expand All @@ -36,7 +36,7 @@ interface PeerInfo {
// P2PMessageService is a rudimentary message service that uses TCP to send and receive messages.
export class P2PMessageService {
// For forwarding processed messages to the engine
private toEngine: GoChannelPlaceholder<Message>;
private toEngine: ReadWriteChannel<Message>;

private peers: SyncMap<BasicPeerInfo>;

Expand All @@ -48,18 +48,18 @@ export class P2PMessageService {

private mdns: (components: MulticastDNSComponents) => PeerDiscovery;

private newPeerInfo: GoChannelPlaceholder<BasicPeerInfo>;
private newPeerInfo: ReadWriteChannel<BasicPeerInfo>;

private logger: debug.Debugger;

constructor(
toEngine: GoChannelPlaceholder<Message>,
toEngine: ReadWriteChannel<Message>,
peers: SyncMap<BasicPeerInfo>,
me: Address,
key: PrivateKey,
p2pHost: Libp2p,
mdns: (components: MulticastDNSComponents) => PeerDiscovery,
newPeerInfo: GoChannelPlaceholder<BasicPeerInfo>,
newPeerInfo: ReadWriteChannel<BasicPeerInfo>,
logger: debug.Debugger,
) {
this.toEngine = toEngine;
Expand Down Expand Up @@ -117,15 +117,15 @@ export class P2PMessageService {

// out returns a channel that can be used to receive messages from the message service
// TODO: Implement and remove void
out(): GoReceivingChannelPlaceholder<Message> | void {}
out(): ReadChannel<Message> | void {}

// Closes the P2PMessageService
// TODO: Implement and remove void
close(): Error | void {}

// peerInfoReceived returns a channel that receives a PeerInfo when a peer is discovered
// TODO: Implement and remove void
peerInfoReceived(): GoReceivingChannelPlaceholder<BasicPeerInfo> | void {}
peerInfoReceived(): ReadChannel<BasicPeerInfo> | void {}

// AddPeers adds the peers to the message service.
// We ignore peers that are ourselves.
Expand Down
6 changes: 0 additions & 6 deletions packages/nitro-client/src/go-channel.ts

This file was deleted.

5 changes: 5 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2141,6 +2141,11 @@
resolved "https://registry.yarnpkg.com/@noble/secp256k1/-/secp256k1-1.7.1.tgz#b251c70f824ce3ca7f8dc3df08d58f005cc0507c"
integrity sha512-hOUk6AyBFmqVrv7k5WAw/LpszxVbj9gGN4JRkIX52fdFAj1UA61KXmZDvqVEm+pOyec3+fIeZB02LYa/pWOArw==

"@nodeguy/channel@^1.0.2":
version "1.0.2"
resolved "https://registry.yarnpkg.com/@nodeguy/channel/-/channel-1.0.2.tgz#def667bf81a8d3472310e5cbb191dde81e64a063"
integrity sha512-ljUGxFKwMy1mxo+Y8tGBzanRGHQTK2qnL2hReugCTdJlGm6Ew/I4Ujz1wAztk/2XCgvt1AoiqziQ5sETYOsGyA==

"@nodelib/fs.scandir@2.1.5":
version "2.1.5"
resolved "https://registry.yarnpkg.com/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz#7619c2eb21b25483f6d167548b4cfd5a7488c3d5"
Expand Down

0 comments on commit e9776ff

Please sign in to comment.