Skip to content

Commit

Permalink
feat(core): Group interface to represent entire network or sub-sets o…
Browse files Browse the repository at this point in the history
…f it
  • Loading branch information
aholstenson committed Jun 24, 2021
1 parent 36d6e66 commit 061bd6b
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 21 deletions.
2 changes: 1 addition & 1 deletion packages/cli/src/commands/inspect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export const handler = async (args: any) => {
log(chalk.red(' Unavailable '), node.id);
});

logInfo('Joining', chalk.magenta(net.networkName), 'as', chalk.blue(net.networkId));
logInfo('Joining', chalk.magenta(net.name), 'as', chalk.blue(net.networkId));

await net.join();

Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/commands/listen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export const handler = async (args: any) => {
log(indent(2, message.data));
});

logInfo('Joining', chalk.magenta(net.networkName), 'as', chalk.blue(net.networkId));
logInfo('Joining', chalk.magenta(net.name), 'as', chalk.blue(net.networkId));

await net.join();

Expand Down
77 changes: 77 additions & 0 deletions packages/core/src/Group.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { Subscribable } from 'atvik';

import { MessageData } from './MessageData';
import { MessageType } from './MessageType';
import { MessageUnion } from './MessageUnion';
import { Node } from './Node';

/**
* Groups are a collection of nodes that can be reached in some way. This may
* either be a full network or a sub-set of one. Groups will emit events when
* membership changes via {@link onNodeAvailable} and {@link onNodeUnavailable}.
*
* Messages sent to nodes in the group can be listened to via {@link onMessage}
* and broadcasts can be sent via {@link broadcast}.
*
* ```javascript
* const group = ...;
*
* // Listen to messages
* group.onMessage(message => ...);
*
* // Join the group
* await group.join();
*
* // Leave the group
* await group.leave();
* ```
*/
export interface Group<MessageTypes extends object = any> {
/**
* Name of this group, useful for debugging purposes.
*/
readonly name: string;

/**
* Event emitted when a new node joins this exchange.
*/
readonly onNodeAvailable: Subscribable<this, [ node: Node<MessageTypes> ]>;

/**
* Event emitted when a node leaves this exchange.
*/
readonly onNodeUnavailable: Subscribable<this, [ node: Node<MessageTypes> ]>;

/**
* Event emitted when a message is received on this exchange.
*/
readonly onMessage: Subscribable<this, [ message: MessageUnion<MessageTypes> ]>;

/**
* Get all of the nodes that are part of this group.
*/
readonly nodes: ReadonlyArray<Node>;

/**
* Broadcast a message to all nodes that have joined this exchange.
*
* @param type -
* the type of message to send
* @param data -
* the data to send
*/
broadcast<T extends MessageType<MessageTypes>>(
type: T,
data: MessageData<MessageTypes, T>
): Promise<void>;

/**
* Join this exchange.
*/
join(): Promise<void>;

/**
* Leave this exchange.
*/
leave(): Promise<void>;
}
24 changes: 11 additions & 13 deletions packages/core/src/Network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Transport, generateId, encodeId } from 'ataraxia-transport';

import { Exchange } from './exchange/Exchange';
import { Exchanges } from './exchange/Exchanges';
import { Group } from './Group';
import { MessageData } from './MessageData';
import { MessageType } from './MessageType';
import { MessageUnion } from './MessageUnion';
Expand Down Expand Up @@ -148,7 +149,7 @@ export interface NetworkOptions {
* });
* ```
*/
export class Network<MessageTypes extends object = any> {
export class Network<MessageTypes extends object = any> implements Group<MessageTypes> {
/**
* Debugger for log messages.
*/
Expand All @@ -162,7 +163,7 @@ export class Network<MessageTypes extends object = any> {
/**
* The name of the network.
*/
public readonly networkName: string;
public readonly name: string;

/**
* If this node is connecting to the network as an endpoint.
Expand Down Expand Up @@ -226,7 +227,7 @@ export class Network<MessageTypes extends object = any> {
this.#debug = debug(debugNamespace);

this.networkIdBinary = generateId();
this.networkName = options.name;
this.name = options.name;
this.endpoint = options.endpoint || false;

this.#transports = [];
Expand Down Expand Up @@ -345,7 +346,7 @@ export class Network<MessageTypes extends object = any> {
if(this.#active) {
transport.start({
networkId: this.networkIdBinary,
networkName: this.networkName,
networkName: this.name,
endpoint: this.endpoint,
debugNamespace: this.#debug.namespace
})
Expand All @@ -362,14 +363,14 @@ export class Network<MessageTypes extends object = any> {
* promise that resolves when the network is started, the value will
* represent if the network was actually started or not.
*/
public async join(): Promise<boolean> {
if(this.#active) return false;
public async join(): Promise<void> {
if(this.#active) return;

this.#debug('About to join network as ' + this.networkId);

const options = {
networkId: this.networkIdBinary,
networkName: this.networkName,
networkName: this.name,
endpoint: this.endpoint,
debugNamespace: this.#debug.namespace
};
Expand All @@ -382,7 +383,6 @@ export class Network<MessageTypes extends object = any> {
// Start all the transports
try {
await Promise.all(this.#transports.map(t => t.start(options)));
return true;
} catch(err) {
// Stop the topology if an error occurs
await this.#topology.stop();
Expand All @@ -397,19 +397,17 @@ export class Network<MessageTypes extends object = any> {
* Leave the currently joined network.
*
* @returns
* promise that resolves when the network is stopped, the value will
* represent if the network was actually stopper or not.
* promise that resolves when the network is stopped
*/
public async leave(): Promise<boolean> {
if(! this.#active) return false;
public async leave(): Promise<void> {
if(! this.#active) return;

// Stop the topology
await this.#topology.stop();

// Stop all the transports
await Promise.all(this.#transports.map(t => t.stop()));
this.#active = false;
return true;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/exchange/Exchanges.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class Exchanges {

this.exchanges = new Map();

this.debug = debug('ataraxia:' + net.networkName + ':exchanges');
this.debug = debug('ataraxia:' + net.name + ':exchanges');

this.net.onMessage(this.handleMessage.bind(this));
this.net.onNodeAvailable(this.handleNodeAvailable.bind(this));
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/exchange/SharedExchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ export class SharedExchange {

this.nodes = new Map();

this.debug = debug('ataraxia:' + net.networkName + ':exchange:' + id);
this.debug = debug('ataraxia:' + net.name + ':exchange:' + id);

this.instances = new Set();
}
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/topology/Topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export class Topology {
* @param options -
* options to apply
*/
public constructor(parent: Pick<Network, 'networkName' | 'networkIdBinary'>, options: TopologyOptions) {
public constructor(parent: Pick<Network, 'name' | 'networkIdBinary'>, options: TopologyOptions) {
this.endpoint = options.endpoint || false;
this.broadcastTimeout = null;

Expand All @@ -101,7 +101,7 @@ export class Topology {
this.unavailableEvent = new Event(this);
this.dataEvent = new Event(this);

this.debug = debug('ataraxia:' + parent.networkName + ':topology');
this.debug = debug('ataraxia:' + parent.name + ':topology');

this.self = new TopologyNode(this, parent.networkIdBinary);
this.self.direct = true;
Expand Down
2 changes: 1 addition & 1 deletion packages/core/test/topology/TopologyTester.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export class TopologyTester {

topology: new Topology({
networkIdBinary: generatedId,
networkName: id
name: id
}, {})
};

Expand Down
2 changes: 1 addition & 1 deletion packages/services/src/Services.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ export class Services {
private readonly calls: RequestReplyHelper<any>;

public constructor(network: Network) {
this.debug = debug('ataraxia:' + network.networkName + ':services');
this.debug = debug('ataraxia:' + network.name + ':services');
this.nodes = new Map();

this.serviceAvailableEvent = new Event(this);
Expand Down

0 comments on commit 061bd6b

Please sign in to comment.