Skip to content
This repository has been archived by the owner on Nov 23, 2022. It is now read-only.

Commit

Permalink
feat(lib): improve store data fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
0x77dev committed Jan 27, 2022
1 parent b8a320f commit e3a56c6
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 45 deletions.
20 changes: 20 additions & 0 deletions packages/lib/src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,26 @@ export class PubSub<T = unknown> {
return JSON.parse(data)
}

public async topics(ignoreInternals = true): Promise<string[]> {
const topic = await this.ipfs.pubsub.ls()

return topic
.filter((value) => {
if (ignoreInternals && value.includes('$')) {
return false
}

return true
})
.map((topic) => topic.replace(`${this.namespace}/`, ''))
}

public async peers(topic: string): Promise<number> {
const peers = await this.ipfs.pubsub.peers(this.getTopic(topic))

return peers.length
}

public async subscribe(topic: string, listener: (msg: Message<T>) => void): Promise<void> {
await this.ipfs.pubsub.subscribe(this.getTopic(topic), (message) => {
listener({
Expand Down
99 changes: 54 additions & 45 deletions packages/lib/src/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
import type { IPFS } from "ipfs-core"
import { Buffer } from "buffer"
import { CID } from "multiformats/cid"
import drain from "it-drain"
import LRUCache, { Options as LRUOptions } from "lru-cache"
import { PubSub } from "./pubsub"

export interface Link {
Expand Down Expand Up @@ -83,77 +81,88 @@ export class Shard<T = any> {
}

export class Store {
private cache: LRUCache<string, CID>
private map: { [key: string]: CID } = {}

constructor(public ipfs: IPFS, public namespace: string, private pubsub: PubSub, cache: LRUOptions<string, CID> = { max: 1000000, maxAge: 1000 }) {
this.cache = new LRUCache(cache)
}
constructor(public ipfs: IPFS, public namespace: string, private pubsub: PubSub) { }

public async start() {
await this.pubsub.subscribe('$store', (msg) => {
const { key, value } = msg.data as { key: string; value: string }
this.cache.set(key, CID.parse(value))
this.map[key] = CID.parse(value)
})

await this.pubsub.handleRequest('$store.get', ([key], reply) => {
const value = this.cache.get(key)
await this.pubsub.subscribe('$store.lookup', async (msg) => {
const { key } = msg.data as { key: string }
const value = this.map[key]

if (value) {
reply(value.toString())
await this.pubsub.publish('$store', { key, value: value.toString() })
await this.pubsub.publish(`$store/${key}`, { key, value: value.toString() })
}
})
}

private getDHTKey(key: string): Buffer {
return Buffer.from(`${this.namespace}/${key}`)
}

public async emitUpdate(key: string, value: CID): Promise<void> {
await Promise.all([
drain(this.ipfs.dht.put(this.getDHTKey(key), value.bytes, { minPeers: 0 } as any)),
this.pubsub.publish('$store', { key, value: value.toString() })
]).catch(console.warn)
private async emitUpdate(key: string, value: CID): Promise<void> {
this.map[key] = value
await this.pubsub.publish('$store', { key, value: value.toString() })
}

/**
* Set data to store
*
* @param key key
* @param data value
* @returns Shard
*/
public async set<T = any>(key: string, data: T): Promise<Shard<T>> {
const shard = await Shard.create(this.ipfs, data, { namespace: key, store: this })

this.cache.set(key, shard.cid)
this.map[key] = shard.cid
await this.emitUpdate(key, shard.cid)

return shard
}

public async getFromDHT<T = any>(key: string, timeout = 1000): Promise<Shard<T>> {
// eslint-disable-next-line no-async-promise-executor
return new Promise<Shard>(async (resolve, reject) => {
const interval = setTimeout(() => reject('timeout'), timeout)
/**
* Get data from Store by key
*
* @param key key
* @param timeout in ms
* @returns Shard
*/
public async get<T = any>(key: string, timeout = 2000): Promise<Shard<T> | void> {
const value = this.map[key]
if (value) return Shard.from(this.ipfs, value, { namespace: key, store: this })

for await (const event of this.ipfs.dht.get(this.getDHTKey(key))) {
if (event.name === "VALUE") {
clearTimeout(interval)
const peers = await this.pubsub.peers('$store.lookup')

const cid = CID.decode(event.value)
this.cache.set(key, cid)
if (!peers) {
console.warn('While trying to get', key, 'no peers was found to ask for a value')
return
}

resolve(Shard.from(this.ipfs, cid, { namespace: key, store: this }))
}
}
})
}
// eslint-disable-next-line no-async-promise-executor
return new Promise<Shard<T> | void>(async (resolve) => {
await this.pubsub.subscribe(`$store/${key}`, (msg) => {
const data = msg.data as { key: string; value: string }

public async getFromPubSub<T = any>(key: string, timeout = 1000): Promise<Shard<T>> {
const value = await this.pubsub.request('$store.get', [key], timeout)
resolve(Shard.from(this.ipfs, CID.parse(data.value), { namespace: key, store: this }))
})

return Shard.from(this.ipfs, CID.parse(value), { namespace: key, store: this })
}
await this.pubsub.subscribe('$store', (msg) => {
const data = msg.data as { key: string; value: string }

if (data.key === key) {
resolve(Shard.from(this.ipfs, CID.parse(data.value), { namespace: key, store: this }))
}
})

public async get<T = any>(key: string, timeout = 1000): Promise<Shard<T> | void> {
const cached = this.cache.get(key)
if (cached) return Shard.from(this.ipfs, cached, { namespace: key, store: this })
await this.pubsub.publish('$store.lookup', { key })

return Promise.race([
this.getFromPubSub(key, timeout).catch(console.warn),
this.getFromDHT(key, timeout).catch(console.warn)
])
setTimeout(() => {
console.warn('While trying to get', key, 'timeout happened')
resolve()
}, timeout)
})
}
}

0 comments on commit e3a56c6

Please sign in to comment.