Skip to content
This repository has been archived by the owner on Nov 23, 2022. It is now read-only.

feat(relay, lib): add stack to relay and store/shard improvements #86

Merged
merged 2 commits into from
Mar 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion packages/explorer/src/main.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ export const App: React.FC = () => {
let stop = () => {}

const run = async () => {
const stack = await Stack.create({ namespace })
const stack = await Stack.create({
namespace,
relay: localStorage['relay']
})
window.stack = stack
window.Shard = Shard
window.ShardKind = ShardKind
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@
"license": "GPL-3.0",
"name": "@dstack-js/ipfs",
"repository": "https://github.com/dstack-js/dstack.git",
"type": "module",
"type": "commonjs",
"version": "0.2.47"
}
7 changes: 6 additions & 1 deletion packages/ipfs/src/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ export const bootstrap = async (
const query = gql`
query Bootstrap($protocol: Protocol!, $hostname: String!, $port: Int!) {
listen(protocol: $protocol, hostname: $hostname, port: $port)
peers(randomize: true)
peers(
randomize: true
protocol: $protocol
hostname: $hostname
port: $port
)
}
`

Expand Down
9 changes: 7 additions & 2 deletions packages/ipfs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ import { bootstrap } from './bootstrap'
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
import WebRTCStar from '@dstack-js/transport'
import type { IPFSOptions } from 'ipfs-core/src/components/storage'

export interface Options {
namespace: string;
relay?: string;
wrtc?: any;
privateKey?: string;
repo?: IPFSOptions['repo'];
}

export { CID, PeerId }
Expand All @@ -17,12 +19,14 @@ export const create = async ({
namespace,
relay,
wrtc,
privateKey
privateKey,
repo
}: Options): Promise<IPFS> => {
const { listen, peers } = await bootstrap(namespace, relay)

return IPFSCreate({
init: { privateKey },
repo,
config: {
Discovery: {
webRTCStar: { Enabled: true }
Expand Down Expand Up @@ -57,7 +61,8 @@ export const create = async ({
relay: {
enabled: true,
hop: {
enabled: true
enabled: true,
active: true
}
}
})
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"module": "ESNext",
"module": "CommonJS",
"forceConsistentCasingInFileNames": true,
"strict": true,
"noImplicitOverride": true,
Expand Down
37 changes: 27 additions & 10 deletions packages/lib/src/stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import all from 'it-all'
import type { CID } from 'ipfs-core'
import { PeerUnreachableError, Store } from '.'
import { PubSub } from './pubsub'
import { Storage } from './storage'
import { create } from '@dstack-js/ipfs'
import { InMemoryStorage, Storage } from './storage'
import { create, Options } from '@dstack-js/ipfs'

export interface Peer {
id: string;
Expand Down Expand Up @@ -33,19 +33,29 @@ export interface StackOptions {
*
* No need to provide it unless you want to use DStack in non browser environment
*/
wrtc?: any;
wrtc?: Options['wrtc'];
/**
* Relay GraphQL Endpoint
*
* Defaults to DStack Cloud
*/
relay?: string;
relay?: Options['relay'];
/**
* Storage implementation
*
* No need to provide it unless you want custom storage implementation to be used
*/
storage?: Storage;
/**
* A path to store IPFS repo
*
* No need to provide it unless you to create a more than one Stack instance
*/
repo?: Options['repo'];
/**
* Preload shard on store replication
*/
loadOnReplicate?: boolean;
}

export class Stack {
Expand All @@ -55,9 +65,14 @@ export class Stack {
private announceInterval?: ReturnType<typeof setTimeout>
public announce = true

constructor(public namespace: CID, public ipfs: IPFS, storage: Storage) {
private constructor(
public namespace: CID,
public ipfs: IPFS,
storage: Storage,
loadOnReplicate?: boolean
) {
this.pubsub = new PubSub(ipfs, namespace.toString())
this.store = new Store(this, storage)
this.store = new Store(this, storage, loadOnReplicate)
}

/**
Expand Down Expand Up @@ -100,16 +115,18 @@ export class Stack {
ipfs,
storage,
relay,
wrtc
wrtc,
repo,
loadOnReplicate
}: StackOptions) {
if (!ipfs) {
ipfs = await create({ namespace, relay, wrtc })
ipfs = await create({ namespace, relay, wrtc, repo })
}

const cid = await ipfs.dag.put({ namespace })
storage = storage || new Storage(namespace)
storage = storage || new InMemoryStorage(namespace)

const stack = new Stack(cid, ipfs, storage)
const stack = new Stack(cid, ipfs, storage, loadOnReplicate)
await stack.start()

return stack
Expand Down
12 changes: 11 additions & 1 deletion packages/lib/src/storage.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
export class Storage<T = { value: string; date: Date }> {
/* eslint-disable @typescript-eslint/no-misused-new */
export interface Storage<T = { value: string; date: Date }> {
namespace: string;

keys(): Promise<string[]>;
get(key: string): Promise<T | null>;
set(key: string, value: T): Promise<void>;
}

export class InMemoryStorage<T = { value: string; date: Date }>
implements Storage<T> {
private data: { [key: string]: T } = {}

constructor(public namespace: string) {}
Expand Down
22 changes: 18 additions & 4 deletions packages/lib/src/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ export type StackMessage = ReplicateMessage | ReplicateRequestMessage;

export class Store {
private pubsub: PubSub<StackMessage>
private storage: Storage

constructor(private stack: Stack, storage: Storage) {
constructor(
private stack: Stack,
private storage: Storage,
private loadOnReplicate: boolean = false
) {
this.pubsub = stack.pubsub.create('$$store')
this.storage = storage
}

private async onReplicate(msg: Message<StackMessage>): Promise<void> {
Expand All @@ -37,10 +39,22 @@ export class Store {
value: msg.data.value,
date
})

if (this.loadOnReplicate && msg.data.value.startsWith('/shard/')) {
Shard.from(this.stack, msg.data.value)
.then((shard) => {
console.debug('shard replicated', shard)
})
.catch((err) => {
console.warn('Failed to load on replicate', msg.data, err)
})
}
}

private watchShard(key: string, watch: Shard) {
watch.on('update', async (shard): Promise<void> => {
console.log(shard.toString())

this.storage.set(key, {
value: shard.toString(),
date: new Date()
Expand Down Expand Up @@ -93,7 +107,7 @@ export class Store {
await this.pubsub.subscribe('replicateRequest', async (msg) => {
if (msg.data.kind !== 'replicateRequest') return
const keys = await this.storage.keys()
await Promise.all(keys.map(this.replicate))
await Promise.all(keys.map((key) => this.replicate(key)))
})

await this.pubsub.publish('replicateRequest', { kind: 'replicateRequest' })
Expand Down
4 changes: 4 additions & 0 deletions packages/relay/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@
"dstack-relay": "src/index.js"
},
"dependencies": {
"@dstack-js/lib": "latest",
"@dstack-js/wrtc": "^0.4.8",
"@libp2p/webrtc-star-protocol": "1.0.1",
"fastify": "3.27.4",
"fastify-cors": "6.0.3",
"fastify-socket.io": "3.0.0",
"graphql": "16.3.0",
"graphql-playground-html": "1.6.30",
"lru-cache": "^7.7.1",
"mercurius": "9.3.6",
"nexus": "1.3.0",
"prom-client": "14.0.1",
"redis": "4.0.4",
"socket.io": "4.4.1"
},
"devDependencies": {
"@types/lru-cache": "^7.6.0",
"ts-node": "10.7.0",
"tsconfig-paths": "3.14.1"
},
Expand Down
11 changes: 8 additions & 3 deletions packages/relay/src/services/signaling/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
joinsFailureTotal,
joinsTotal
} from '../metrics'
import { getStack } from '../stack'

const handle = (socket: WebRTCStarSocket, cachePrefix: string) => {
console.log('signaling', 'handle', socket.id)
Expand Down Expand Up @@ -104,9 +105,13 @@ export const setSocket = async (server: FastifyInstance) => {

await server.ready()
server.io.on('connection', (socket) => {
const cachePrefix = socket.handshake.auth['namespace']
? socket.handshake.auth['namespace']
: ''
let cachePrefix = ''

if (socket.handshake.auth['namespace']) {
cachePrefix = socket.handshake.auth['namespace']

getStack(socket.handshake.auth['namespace']).catch()
}

// @ts-expect-error: incompatible types
return handle(socket, cachePrefix)
Expand Down
88 changes: 88 additions & 0 deletions packages/relay/src/services/stack.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import { join } from 'path'
import { Stack, Storage } from '@dstack-js/lib'
// @ts-expect-error: no-types
import wrtc from '@dstack-js/wrtc'
import LRUCache from 'lru-cache'
import { tmpdir } from 'os'
import { redis } from './cache'

export class RedisStorage<T = { value: string; date: Date }>
implements Storage<T> {
private prefix: string

constructor(public namespace: string) {
this.prefix = `s!${this.namespace}#`
}

public async set(key: string, value: T): Promise<void> {
await redis.set(`${this.prefix}${key}`, JSON.stringify(value))
redis.expire(`${this.prefix}${key}`, 3600)
}

public async get(key: string): Promise<T | null> {
const data = await redis.get(key)
if (!data) return null

return JSON.parse(data)
}

public async keys(): Promise<string[]> {
const keys = await redis.keys(`${this.prefix}*`)
return keys.map((key) => key.replace(this.prefix, ''))
}
}

const cache = new LRUCache<string, 'allocating' | Stack>({
ttl: 60000,
ttlAutopurge: true,
updateAgeOnGet: true,
dispose: async (stack, namespace) => {
if (stack === 'allocating') return

await stack
.stop()
.then(() => {
console.log(namespace, 'stack deallocated')
})
.catch((error) => {
console.warn(
'error happened while deallocation stack in',
namespace,
error
)
})
}
})

export const getStack = async (namespace: string) => {
let stack = cache.get(namespace)

if (!stack) {
try {
cache.set(namespace, 'allocating')
stack = await Stack.create({
namespace,
wrtc,
relay: `http://127.0.0.1:${process.env['PORT'] || 13579}/graphql`,
repo: join(tmpdir(), '.dstack', namespace),
storage: new RedisStorage(namespace)
})

cache.set(namespace, stack)
console.log(namespace, 'stack allocated')

return stack
} catch (error) {
console.warn(
'error happened while allocation stack in',
namespace,
error
)
throw error
}
}

if (stack === 'allocating') return null

return stack
}
17 changes: 14 additions & 3 deletions packages/relay/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,21 @@ const peers = queryField('peers', {
description:
'Get addresses to bootstrap for IPFS, use `randomize` argument to get 3 random peers',
args: {
randomize: booleanArg()
randomize: booleanArg(),
protocol: Protocol.asArg(),
hostname: stringArg(),
port: intArg()
},
async resolve(_, { randomize }, { namespace }) {
const peers = await getPeers(namespace)
async resolve(_, { randomize, protocol, hostname, port }, { namespace }) {
let peers = await getPeers(namespace || '')

if (protocol && hostname && port) {
peers = peers.map((peer) => {
return getListenAddress(protocol, hostname, port).concat(
peer.split('/').slice(-2).join('/')
)
})
}

if (randomize) {
return peers
Expand Down
Loading