Skip to content
This repository has been archived by the owner on Nov 23, 2022. It is now read-only.

Commit

Permalink
feat(relay, lib): add stack to relay and store/shard improvements (#86)
Browse files Browse the repository at this point in the history
* feat(relay, lib): add stack to relay and store/shard improvements

#26; #64

* chore(relay): move wrtc to dependencies
  • Loading branch information
0x77dev authored Mar 26, 2022
1 parent 0271aa5 commit a38c875
Show file tree
Hide file tree
Showing 13 changed files with 311 additions and 37 deletions.
5 changes: 4 additions & 1 deletion packages/explorer/src/main.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ export const App: React.FC = () => {
let stop = () => {}

const run = async () => {
const stack = await Stack.create({ namespace })
const stack = await Stack.create({
namespace,
relay: localStorage['relay']
})
window.stack = stack
window.Shard = Shard
window.ShardKind = ShardKind
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@
"license": "GPL-3.0",
"name": "@dstack-js/ipfs",
"repository": "https://github.com/dstack-js/dstack.git",
"type": "module",
"type": "commonjs",
"version": "0.2.47"
}
7 changes: 6 additions & 1 deletion packages/ipfs/src/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ export const bootstrap = async (
const query = gql`
query Bootstrap($protocol: Protocol!, $hostname: String!, $port: Int!) {
listen(protocol: $protocol, hostname: $hostname, port: $port)
peers(randomize: true)
peers(
randomize: true
protocol: $protocol
hostname: $hostname
port: $port
)
}
`

Expand Down
9 changes: 7 additions & 2 deletions packages/ipfs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ import { bootstrap } from './bootstrap'
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
import WebRTCStar from '@dstack-js/transport'
import type { IPFSOptions } from 'ipfs-core/src/components/storage'

export interface Options {
namespace: string;
relay?: string;
wrtc?: any;
privateKey?: string;
repo?: IPFSOptions['repo'];
}

export { CID, PeerId }
Expand All @@ -17,12 +19,14 @@ export const create = async ({
namespace,
relay,
wrtc,
privateKey
privateKey,
repo
}: Options): Promise<IPFS> => {
const { listen, peers } = await bootstrap(namespace, relay)

return IPFSCreate({
init: { privateKey },
repo,
config: {
Discovery: {
webRTCStar: { Enabled: true }
Expand Down Expand Up @@ -57,7 +61,8 @@ export const create = async ({
relay: {
enabled: true,
hop: {
enabled: true
enabled: true,
active: true
}
}
})
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"module": "ESNext",
"module": "CommonJS",
"forceConsistentCasingInFileNames": true,
"strict": true,
"noImplicitOverride": true,
Expand Down
37 changes: 27 additions & 10 deletions packages/lib/src/stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import all from 'it-all'
import type { CID } from 'ipfs-core'
import { PeerUnreachableError, Store } from '.'
import { PubSub } from './pubsub'
import { Storage } from './storage'
import { create } from '@dstack-js/ipfs'
import { InMemoryStorage, Storage } from './storage'
import { create, Options } from '@dstack-js/ipfs'

export interface Peer {
id: string;
Expand Down Expand Up @@ -33,19 +33,29 @@ export interface StackOptions {
*
* No need to provide it unless you want to use DStack in non browser environment
*/
wrtc?: any;
wrtc?: Options['wrtc'];
/**
* Relay GraphQL Endpoint
*
* Defaults to DStack Cloud
*/
relay?: string;
relay?: Options['relay'];
/**
* Storage implementation
*
* No need to provide it unless you want custom storage implementation to be used
*/
storage?: Storage;
/**
* A path to store IPFS repo
*
* No need to provide it unless you to create a more than one Stack instance
*/
repo?: Options['repo'];
/**
* Preload shard on store replication
*/
loadOnReplicate?: boolean;
}

export class Stack {
Expand All @@ -55,9 +65,14 @@ export class Stack {
private announceInterval?: ReturnType<typeof setTimeout>
public announce = true

constructor(public namespace: CID, public ipfs: IPFS, storage: Storage) {
private constructor(
public namespace: CID,
public ipfs: IPFS,
storage: Storage,
loadOnReplicate?: boolean
) {
this.pubsub = new PubSub(ipfs, namespace.toString())
this.store = new Store(this, storage)
this.store = new Store(this, storage, loadOnReplicate)
}

/**
Expand Down Expand Up @@ -100,16 +115,18 @@ export class Stack {
ipfs,
storage,
relay,
wrtc
wrtc,
repo,
loadOnReplicate
}: StackOptions) {
if (!ipfs) {
ipfs = await create({ namespace, relay, wrtc })
ipfs = await create({ namespace, relay, wrtc, repo })
}

const cid = await ipfs.dag.put({ namespace })
storage = storage || new Storage(namespace)
storage = storage || new InMemoryStorage(namespace)

const stack = new Stack(cid, ipfs, storage)
const stack = new Stack(cid, ipfs, storage, loadOnReplicate)
await stack.start()

return stack
Expand Down
12 changes: 11 additions & 1 deletion packages/lib/src/storage.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
export class Storage<T = { value: string; date: Date }> {
/* eslint-disable @typescript-eslint/no-misused-new */
export interface Storage<T = { value: string; date: Date }> {
namespace: string;

keys(): Promise<string[]>;
get(key: string): Promise<T | null>;
set(key: string, value: T): Promise<void>;
}

export class InMemoryStorage<T = { value: string; date: Date }>
implements Storage<T> {
private data: { [key: string]: T } = {}

constructor(public namespace: string) {}
Expand Down
22 changes: 18 additions & 4 deletions packages/lib/src/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ export type StackMessage = ReplicateMessage | ReplicateRequestMessage;

export class Store {
private pubsub: PubSub<StackMessage>
private storage: Storage

constructor(private stack: Stack, storage: Storage) {
constructor(
private stack: Stack,
private storage: Storage,
private loadOnReplicate: boolean = false
) {
this.pubsub = stack.pubsub.create('$$store')
this.storage = storage
}

private async onReplicate(msg: Message<StackMessage>): Promise<void> {
Expand All @@ -37,10 +39,22 @@ export class Store {
value: msg.data.value,
date
})

if (this.loadOnReplicate && msg.data.value.startsWith('/shard/')) {
Shard.from(this.stack, msg.data.value)
.then((shard) => {
console.debug('shard replicated', shard)
})
.catch((err) => {
console.warn('Failed to load on replicate', msg.data, err)
})
}
}

private watchShard(key: string, watch: Shard) {
watch.on('update', async (shard): Promise<void> => {
console.log(shard.toString())

this.storage.set(key, {
value: shard.toString(),
date: new Date()
Expand Down Expand Up @@ -93,7 +107,7 @@ export class Store {
await this.pubsub.subscribe('replicateRequest', async (msg) => {
if (msg.data.kind !== 'replicateRequest') return
const keys = await this.storage.keys()
await Promise.all(keys.map(this.replicate))
await Promise.all(keys.map((key) => this.replicate(key)))
})

await this.pubsub.publish('replicateRequest', { kind: 'replicateRequest' })
Expand Down
4 changes: 4 additions & 0 deletions packages/relay/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@
"dstack-relay": "src/index.js"
},
"dependencies": {
"@dstack-js/lib": "latest",
"@dstack-js/wrtc": "^0.4.8",
"@libp2p/webrtc-star-protocol": "1.0.1",
"fastify": "3.27.4",
"fastify-cors": "6.0.3",
"fastify-socket.io": "3.0.0",
"graphql": "16.3.0",
"graphql-playground-html": "1.6.30",
"lru-cache": "^7.7.1",
"mercurius": "9.3.6",
"nexus": "1.3.0",
"prom-client": "14.0.1",
"redis": "4.0.4",
"socket.io": "4.4.1"
},
"devDependencies": {
"@types/lru-cache": "^7.6.0",
"ts-node": "10.7.0",
"tsconfig-paths": "3.14.1"
},
Expand Down
11 changes: 8 additions & 3 deletions packages/relay/src/services/signaling/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
joinsFailureTotal,
joinsTotal
} from '../metrics'
import { getStack } from '../stack'

const handle = (socket: WebRTCStarSocket, cachePrefix: string) => {
console.log('signaling', 'handle', socket.id)
Expand Down Expand Up @@ -104,9 +105,13 @@ export const setSocket = async (server: FastifyInstance) => {

await server.ready()
server.io.on('connection', (socket) => {
const cachePrefix = socket.handshake.auth['namespace']
? socket.handshake.auth['namespace']
: ''
let cachePrefix = ''

if (socket.handshake.auth['namespace']) {
cachePrefix = socket.handshake.auth['namespace']

getStack(socket.handshake.auth['namespace']).catch()
}

// @ts-expect-error: incompatible types
return handle(socket, cachePrefix)
Expand Down
88 changes: 88 additions & 0 deletions packages/relay/src/services/stack.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import { join } from 'path'
import { Stack, Storage } from '@dstack-js/lib'
// @ts-expect-error: no-types
import wrtc from '@dstack-js/wrtc'
import LRUCache from 'lru-cache'
import { tmpdir } from 'os'
import { redis } from './cache'

export class RedisStorage<T = { value: string; date: Date }>
implements Storage<T> {
private prefix: string

constructor(public namespace: string) {
this.prefix = `s!${this.namespace}#`
}

public async set(key: string, value: T): Promise<void> {
await redis.set(`${this.prefix}${key}`, JSON.stringify(value))
redis.expire(`${this.prefix}${key}`, 3600)
}

public async get(key: string): Promise<T | null> {
const data = await redis.get(key)
if (!data) return null

return JSON.parse(data)
}

public async keys(): Promise<string[]> {
const keys = await redis.keys(`${this.prefix}*`)
return keys.map((key) => key.replace(this.prefix, ''))
}
}

const cache = new LRUCache<string, 'allocating' | Stack>({
ttl: 60000,
ttlAutopurge: true,
updateAgeOnGet: true,
dispose: async (stack, namespace) => {
if (stack === 'allocating') return

await stack
.stop()
.then(() => {
console.log(namespace, 'stack deallocated')
})
.catch((error) => {
console.warn(
'error happened while deallocation stack in',
namespace,
error
)
})
}
})

export const getStack = async (namespace: string) => {
let stack = cache.get(namespace)

if (!stack) {
try {
cache.set(namespace, 'allocating')
stack = await Stack.create({
namespace,
wrtc,
relay: `http://127.0.0.1:${process.env['PORT'] || 13579}/graphql`,
repo: join(tmpdir(), '.dstack', namespace),
storage: new RedisStorage(namespace)
})

cache.set(namespace, stack)
console.log(namespace, 'stack allocated')

return stack
} catch (error) {
console.warn(
'error happened while allocation stack in',
namespace,
error
)
throw error
}
}

if (stack === 'allocating') return null

return stack
}
17 changes: 14 additions & 3 deletions packages/relay/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,21 @@ const peers = queryField('peers', {
description:
'Get addresses to bootstrap for IPFS, use `randomize` argument to get 3 random peers',
args: {
randomize: booleanArg()
randomize: booleanArg(),
protocol: Protocol.asArg(),
hostname: stringArg(),
port: intArg()
},
async resolve(_, { randomize }, { namespace }) {
const peers = await getPeers(namespace)
async resolve(_, { randomize, protocol, hostname, port }, { namespace }) {
let peers = await getPeers(namespace || '')

if (protocol && hostname && port) {
peers = peers.map((peer) => {
return getListenAddress(protocol, hostname, port).concat(
peer.split('/').slice(-2).join('/')
)
})
}

if (randomize) {
return peers
Expand Down
Loading

0 comments on commit a38c875

Please sign in to comment.