Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

feat: add peer tagging #12

Merged
merged 2 commits into from
Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
"clean": "aegir clean",
"lint": "aegir lint",
"dep-check": "aegir dep-check",
"generate": "protons src/pb/peer.proto",
"generate": "protons src/pb/peer.proto src/pb/tags.proto",
"build": "aegir build",
"test": "aegir test",
"test:chrome": "aegir test -t browser --cov",
Expand All @@ -142,7 +142,7 @@
"@libp2p/components": "^2.0.0",
"@libp2p/interface-peer-id": "^1.0.2",
"@libp2p/interface-peer-info": "^1.0.1",
"@libp2p/interface-peer-store": "^1.0.0",
"@libp2p/interface-peer-store": "^1.1.0",
"@libp2p/interface-record": "^1.0.1",
"@libp2p/interfaces": "^3.0.2",
"@libp2p/logger": "^2.0.0",
Expand All @@ -165,8 +165,9 @@
"@libp2p/interface-compliance-tests": "^3.0.1",
"@libp2p/peer-id-factory": "^1.0.0",
"@libp2p/utils": "^2.0.0",
"aegir": "^37.3.0",
"aegir": "^37.4.0",
"datastore-core": "^7.0.1",
"delay": "^5.0.0",
"p-defer": "^4.0.0",
"p-wait-for": "^4.1.0",
"protons": "^3.0.4",
Expand Down
70 changes: 69 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import { PeerStoreKeyBook } from './key-book.js'
import { PeerStoreMetadataBook } from './metadata-book.js'
import { PeerStoreProtoBook } from './proto-book.js'
import { PersistentStore, Store } from './store.js'
import type { PeerStore, AddressBook, KeyBook, MetadataBook, ProtoBook, PeerStoreEvents, PeerStoreInit, Peer } from '@libp2p/interface-peer-store'
import type { PeerStore, AddressBook, KeyBook, MetadataBook, ProtoBook, PeerStoreEvents, PeerStoreInit, Peer, TagOptions } from '@libp2p/interface-peer-store'
import type { PeerId } from '@libp2p/interface-peer-id'
import { Components, Initializable } from '@libp2p/components'
import errCode from 'err-code'
import { Tag, Tags } from './pb/tags.js'

const log = logger('libp2p:peer-store')

Expand Down Expand Up @@ -115,4 +117,70 @@ export class PersistentPeerStore extends EventEmitter<PeerStoreEvents> implement
release()
}
}

async tagPeer (peerId: PeerId, tag: string, options: TagOptions = {}) {
const providedValue = options.value ?? 0
const value = Math.round(providedValue)
const ttl = options.ttl ?? undefined

if (value !== providedValue || value < 0 || value > 100) {
throw errCode(new Error('Tag value must be between 0-100'), 'ERR_TAG_VALUE_OUT_OF_BOUNDS')
}

const buf = await this.metadataBook.getValue(peerId, 'tags')
let tags: Tag[] = []

if (buf != null) {
tags = Tags.decode(buf).tags
}

for (const t of tags) {
if (t.name === tag) {
throw errCode(new Error('Peer already tagged'), 'ERR_DUPLICATE_TAG')
}
}

tags.push({
name: tag,
value,
expiry: ttl == null ? undefined : BigInt(Date.now() + ttl)
})

await this.metadataBook.setValue(peerId, 'tags', Tags.encode({ tags }))
}

async unTagPeer (peerId: PeerId, tag: string) {
const buf = await this.metadataBook.getValue(peerId, 'tags')
let tags: Tag[] = []

if (buf != null) {
tags = Tags.decode(buf).tags
}

tags = tags.filter(t => t.name !== tag)

await this.metadataBook.setValue(peerId, 'tags', Tags.encode({ tags }))
}

async getTags (peerId: PeerId) {
const buf = await this.metadataBook.getValue(peerId, 'tags')
let tags: Tag[] = []

if (buf != null) {
tags = Tags.decode(buf).tags
}

const now = BigInt(Date.now())
const unexpiredTags = tags.filter(tag => tag.expiry == null || tag.expiry > now)

if (unexpiredTags.length !== tags.length) {
// remove any expired tags
await this.metadataBook.setValue(peerId, 'tags', Tags.encode({ tags: unexpiredTags }))
}

return unexpiredTags.map(t => ({
name: t.name,
value: t.value ?? 0
}))
}
}
11 changes: 11 additions & 0 deletions src/pb/tags.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
syntax = "proto3";

message Tags {
repeated Tag tags = 1;
}

message Tag {
string name = 1; // e.g. 'priority'
optional uint32 value = 2; // tag value 0-100
optional uint64 expiry = 3; // ms timestamp after which the tag is no longer valid
}
49 changes: 49 additions & 0 deletions src/pb/tags.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/* eslint-disable import/export */
/* eslint-disable @typescript-eslint/no-namespace */

import { encodeMessage, decodeMessage, message, string, uint32, uint64 } from 'protons-runtime'
import type { Codec } from 'protons-runtime'

export interface Tags {
tags: Tag[]
}

export namespace Tags {
export const codec = (): Codec<Tags> => {
return message<Tags>({
1: { name: 'tags', codec: Tag.codec(), repeats: true }
})
}

export const encode = (obj: Tags): Uint8Array => {
return encodeMessage(obj, Tags.codec())
}

export const decode = (buf: Uint8Array): Tags => {
return decodeMessage(buf, Tags.codec())
}
}

export interface Tag {
name: string
value?: number
expiry?: bigint
}

export namespace Tag {
export const codec = (): Codec<Tag> => {
return message<Tag>({
1: { name: 'name', codec: string },
2: { name: 'value', codec: uint32, optional: true },
3: { name: 'expiry', codec: uint64, optional: true }
})
}

export const encode = (obj: Tag): Uint8Array => {
return encodeMessage(obj, Tag.codec())
}

export const decode = (buf: Uint8Array): Tag => {
return decodeMessage(buf, Tag.codec())
}
}
104 changes: 104 additions & 0 deletions test/peer-store.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { MemoryDatastore } from 'datastore-core/memory'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import type { PeerId } from '@libp2p/interface-peer-id'
import { Components } from '@libp2p/components'
import delay from 'delay'

const addr1 = new Multiaddr('/ip4/127.0.0.1/tcp/8000')
const addr2 = new Multiaddr('/ip4/127.0.0.1/tcp/8001')
Expand Down Expand Up @@ -214,4 +215,107 @@ describe('peer-store', () => {
expect(peerData.metadata.get(metadataKey)).to.equalBytes(metadataValue)
})
})

describe('tags', () => {
let peerStore: PersistentPeerStore

beforeEach(() => {
peerStore = new PersistentPeerStore()
peerStore.init(new Components({ peerId: peerIds[4], datastore: new MemoryDatastore() }))
})

it('tags a peer', async () => {
const name = 'a-tag'
await peerStore.tagPeer(peerIds[0], name)

await expect(peerStore.getTags(peerIds[0]), 'PeerStore did not contain tag for peer')
.to.eventually.deep.include.members([{
name,
value: 0
}])
})

it('tags a peer with a value', async () => {
const name = 'a-tag'
const value = 50
await peerStore.tagPeer(peerIds[0], name, {
value
})

await expect(peerStore.getTags(peerIds[0]), 'PeerStore did not contain tag for peer with a value')
.to.eventually.deep.include.members([{
name,
value
}])
})

it('tags a peer with a valid value', async () => {
const name = 'a-tag'

await expect(peerStore.tagPeer(peerIds[0], name, {
value: -1
}), 'PeerStore contain tag for peer where value was too small')
.to.eventually.be.rejected().with.property('code', 'ERR_TAG_VALUE_OUT_OF_BOUNDS')

await expect(peerStore.tagPeer(peerIds[0], name, {
value: 101
}), 'PeerStore contain tag for peer where value was too large')
.to.eventually.be.rejected().with.property('code', 'ERR_TAG_VALUE_OUT_OF_BOUNDS')

await expect(peerStore.tagPeer(peerIds[0], name, {
value: 5.5
}), 'PeerStore contain tag for peer where value was not an integer')
.to.eventually.be.rejected().with.property('code', 'ERR_TAG_VALUE_OUT_OF_BOUNDS')
})

it('tags a peer with an expiring value', async () => {
const name = 'a-tag'
const value = 50
await peerStore.tagPeer(peerIds[0], name, {
value,
ttl: 50
})

await expect(peerStore.getTags(peerIds[0]))
.to.eventually.deep.include.members([{
name,
value
}], 'PeerStore did not contain expiring value')

await delay(100)

await expect(peerStore.getTags(peerIds[0]))
.to.eventually.not.deep.include.members([{
name,
value
}], 'PeerStore contained expired value')
})

it('does not tag a peer twice', async () => {
const name = 'a-tag'
await peerStore.tagPeer(peerIds[0], name)

await expect(peerStore.tagPeer(peerIds[0], name), 'PeerStore allowed duplicate tags')
.to.eventually.be.rejected().with.property('code', 'ERR_DUPLICATE_TAG')
})

it('untags a peer', async () => {
const name = 'a-tag'
await peerStore.tagPeer(peerIds[0], name)

await expect(peerStore.getTags(peerIds[0]), 'PeerStore did not contain tag')
.to.eventually.deep.include.members([{
name,
value: 0
}])

await peerStore.unTagPeer(peerIds[0], name)

await expect(peerStore.getTags(peerIds[0]), 'PeerStore contained untagged tag')
.to.eventually.not.deep.include.members([{
name,
value: 0
}])
})
})
})