Skip to content

Commit

Permalink
Merge branch 'develop' into logging-improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
islathehut committed May 15, 2024
2 parents 84696a7 + 785a8ea commit 97af3c7
Show file tree
Hide file tree
Showing 13 changed files with 151 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -262,20 +262,24 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
public async resume() {
this.logger.info('Resuming!')
await this.openSocket()
this.logger.info('Attempting to redial peers!')
const peersToDial = await this.getPeersOnResume()
this.libp2pService?.resume(peersToDial)
}

public async getPeersOnResume(): Promise<string[]> {
this.logger.info('Getting peers to redial')
if (this.peerInfo && (this.peerInfo?.connected.length !== 0 || this.peerInfo?.dialed.length !== 0)) {
this.logger.info('Dialing peers with info from pause: ', this.peerInfo)
await this.libp2pService?.redialPeers([...this.peerInfo.connected, ...this.peerInfo.dialed])
} else {
this.logger.info('Dialing peers from stored community (if exists)')
const community = await this.localDbService.getCurrentCommunity()
if (!community) {
this.logger.warn(`No community launched, can't redial`)
return
}
const sortedPeers = await this.localDbService.getSortedPeers(community.peerList ?? [])
await this.libp2pService?.redialPeers(sortedPeers)
this.logger.info('Found peer info from pause: ', this.peerInfo)
return [...this.peerInfo.connected, ...this.peerInfo.dialed]
}

this.logger.info('Getting peers from stored community (if exists)')
const community = await this.localDbService.getCurrentCommunity()
if (!community) {
this.logger.warn(`No community launched, no peers found`)
return []
}
return await this.localDbService.getSortedPeers(community.peerList ?? [])
}

// This method is only used on iOS through rn-bridge for reacting on lifecycle changes
Expand Down
23 changes: 17 additions & 6 deletions packages/backend/src/nest/libp2p/libp2p.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,21 @@ export class Libp2pService extends EventEmitter {
await this.hangUpPeers(peerInfo.dialed)
this.dialedPeers.clear()
this.connectedPeers.clear()
this.processInChunksService.pause()
return peerInfo
}

public resume = async (peersToDial: string[]): Promise<void> => {
this.processInChunksService.resume()
if (peersToDial.length === 0) {
this.logger.warn('No peers to redial!')
return
}

this.logger.info(`Redialing ${peersToDial.length} peers`)
await this.redialPeers(peersToDial)
}

public readonly createLibp2pAddress = (address: string, peerId: string): string => {
return createLibp2pAddress(address, peerId)
}
Expand Down Expand Up @@ -138,8 +150,7 @@ export class Libp2pService extends EventEmitter {
// TODO: Sort peers
await this.hangUpPeers(dialed)

this.processInChunksService.updateData(toDial)
await this.processInChunksService.process()
this.processInChunksService.updateQueue(toDial)
}

public async createInstance(params: Libp2pNodeParams): Promise<Libp2p> {
Expand Down Expand Up @@ -208,13 +219,12 @@ export class Libp2pService extends EventEmitter {
this.on(Libp2pEvents.DIAL_PEERS, async (addresses: string[]) => {
const nonDialedAddresses = addresses.filter(peerAddress => !this.dialedPeers.has(peerAddress))
this.logger.info('Dialing', nonDialedAddresses.length, 'addresses')
this.processInChunksService.updateData(nonDialedAddresses)
await this.processInChunksService.process()
this.processInChunksService.updateQueue(nonDialedAddresses)
})

this.logger.info(`Initializing libp2p for ${peerId.toString()}, bootstrapping with ${peers.length} peers`)
this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, ConnectionProcessInfo.INITIALIZING_LIBP2P)
this.processInChunksService.init(peers, this.dialPeer)
this.processInChunksService.init([], this.dialPeer)

this.libp2pInstance.addEventListener('peer:discovery', peer => {
this.logger.info(`${peerId.toString()} discovered ${peer.detail.id}`)
Expand Down Expand Up @@ -268,14 +278,15 @@ export class Libp2pService extends EventEmitter {
this.emit(Libp2pEvents.PEER_DISCONNECTED, peerStat)
})

await this.processInChunksService.process()
this.processInChunksService.updateQueue(peers)

this.logger.info(`Initialized libp2p for peer ${peerId.toString()}`)
}

public async close(): Promise<void> {
this.logger.info('Closing libp2p service')
await this.libp2pInstance?.stop()
this.processInChunksService.pause()
this.libp2pInstance = null
this.connectedPeers = new Map()
this.dialedPeers = new Set()
Expand Down
124 changes: 86 additions & 38 deletions packages/backend/src/nest/libp2p/process-in-chunks.service.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
import { EventEmitter } from 'events'
import fastq from 'fastq'
import type { queue, done } from 'fastq'
import fastq, { queueAsPromised } from 'fastq'

import { createLogger } from '../common/logger'
import { randomUUID } from 'crypto'

const DEFAULT_CHUNK_SIZE = 10
export const DEFAULT_NUM_TRIES = 2

type ProcessTask<T> = {
data: T
tries: number
taskId: string
}

export class ProcessInChunksService<T> extends EventEmitter {
private isActive: boolean
private data: Set<T> = new Set()
private chunkSize: number
private taskQueue: queue<ProcessTask<T>>
private taskQueue: queueAsPromised<ProcessTask<T>>
private deadLetterQueue: ProcessTask<T>[] = []
private processItem: (arg: T) => Promise<any>
private readonly logger = createLogger(ProcessInChunksService.name)
constructor() {
Expand All @@ -27,59 +28,106 @@ export class ProcessInChunksService<T> extends EventEmitter {
this.logger.info(`Initializing process-in-chunks.service with peers`, data)
this.processItem = processItem
this.chunkSize = chunkSize
this.taskQueue = fastq(this, this.processOneItem, this.chunkSize)
this.updateData(data)
this.addToTaskQueue()
this.taskQueue = fastq.promise(this, this.processOneItem, this.chunkSize)
this.isActive = true
this.updateQueue(data)
}

public updateData(items: T[]) {
this.logger.info(`Updating data with ${items.length} items`)
this.taskQueue.pause()
items.forEach(item => this.data.add(item))
this.addToTaskQueue()
public updateQueue(items: T[]) {
this.logger.info(`Adding ${items.length} items to the task queue`)
items.forEach(item => this.addToTaskQueue(item))
}

private addToTaskQueue() {
this.logger.info(`Adding ${this.data.size} items to the task queue`)
for (const item of this.data) {
if (item) {
this.logger.info(`Adding data ${item} to the task queue`)
this.data.delete(item)
try {
this.taskQueue.push({ data: item, tries: 0 } as ProcessTask<T>)
} catch (e) {
this.logger.error(`Error occurred while adding new task for item ${item} to the queue`, e)
this.data.add(item)
}
private async addToTaskQueue(task: ProcessTask<T>): Promise<void>
private async addToTaskQueue(item: T): Promise<void>
private async addToTaskQueue(itemOrTask: T | ProcessTask<T>): Promise<void> {
if (!itemOrTask) {
this.logger.error('Item/task is null or undefined, skipping!')
return
}

let task: ProcessTask<T>
if ((itemOrTask as ProcessTask<T>).taskId != null) {
task = itemOrTask as ProcessTask<T>
} else {
this.logger.info(`Creating new task for ${itemOrTask}`)
task = { data: itemOrTask as T, tries: 0, taskId: randomUUID() }
}

if (!this.isActive) {
this.logger.info(
'ProcessInChunksService is not active, adding tasks to the dead letter queue!\n\nWARNING: You must call "resume" on the ProcessInChunksService to process the dead letter queue!!!'
)
this.deadLetterQueue.push(task)
this.logger.info(`There are now ${this.deadLetterQueue.length} items in the dead letter queue`)
return
}

this.logger.info(`Adding task ${task.taskId} with data ${task.data} to the task queue`)
try {
const success = await this.pushToQueueAndRun(task)
if (!success) {
this.logger.warn(`Will try to re-attempt task ${task.taskId} with data ${task.data}`)
await this.pushToQueueAndRun({ ...task, tries: task.tries + 1 })
}
} catch (e) {
this.logger.error(`Error occurred while adding new task ${task.taskId} with data ${task.data} to the queue`, e)
}
}

public async processOneItem(task: ProcessTask<T>) {
public async processOneItem(task: ProcessTask<T>): Promise<boolean> {
let success: boolean = false
try {
this.logger.info(`Processing task with data ${task.data}`)
this.logger.info(`Processing task ${task.taskId} with data ${task.data}`)
await this.processItem(task.data)
success = true
} catch (e) {
this.logger.error(`Processing task with data ${task.data} failed`, e)
if (task.tries + 1 < DEFAULT_NUM_TRIES) {
this.logger.warn(`Will try to re-attempt task with data ${task.data}`)
this.taskQueue.push({ ...task, tries: task.tries + 1 })
}
this.logger.error(`Processing task ${task.taskId} with data ${task.data} failed`, e)
} finally {
this.logger.info(`Done attempting to process task with data ${task.data}`)
}
return success
}

public async process() {
this.logger.info(`Processing ${this.taskQueue.length()} items`)
this.taskQueue.resume()
private async pushToQueueAndRun(task: ProcessTask<T>): Promise<boolean> {
this.logger.info(
`Pushing task ${task.taskId} to queue, there will now be ${this.taskQueue.length() + 1} items in the queue`
)
const success = await this.taskQueue.push(task)
if (success) {
this.logger.info(`Task ${task.taskId} completed successfully`)
} else {
this.logger.warn(`Task ${task.taskId} failed`)
}
return success
}

public stop() {
public resume() {
if (this.isActive) {
this.logger.info('Stopping initial dial')
this.isActive = false
this.taskQueue.pause()
this.logger.warn('ProcessInChunksService is already active')
return
}

this.logger.info('Resuming ProcessInChunksService')
this.isActive = true
this.taskQueue.resume()
if (this.deadLetterQueue) {
this.logger.warn(`Adding ${this.deadLetterQueue.length} tasks from the dead letter queue to the task queue`)
this.deadLetterQueue.forEach(task => this.addToTaskQueue(task))
this.deadLetterQueue = []
}
}

public pause() {
if (!this.isActive) {
this.logger.warn('ProcessInChunksService is already paused')
return
}

this.logger.info('Pausing ProcessInChunksService')
this.isActive = false
this.deadLetterQueue = this.taskQueue.getQueue()
this.taskQueue.kill()
this.taskQueue.pause()
}
}
30 changes: 19 additions & 11 deletions packages/backend/src/nest/libp2p/process-in-chunks.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { ProcessInChunksService } from './process-in-chunks.service'
import waitForExpect from 'wait-for-expect'
import { TestModule } from '../common/test.module'
import { Test, TestingModule } from '@nestjs/testing'
import { sleep } from '../common/sleep'
import { createLogger } from '../common/logger'

const logger = createLogger('processInChunksService:test')
Expand All @@ -29,7 +30,6 @@ describe('ProcessInChunks', () => {
.mockResolvedValueOnce()
.mockRejectedValueOnce(new Error('Rejected 2'))
processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem)
await processInChunks.process()
await waitForExpect(() => {
expect(mockProcessItem).toBeCalledTimes(6)
})
Expand All @@ -43,9 +43,7 @@ describe('ProcessInChunks', () => {
.mockResolvedValueOnce()
.mockRejectedValueOnce(new Error('Rejected 1'))
processInChunks.init(['a', 'b'], mockProcessItem)
await processInChunks.process()
processInChunks.updateData(['e', 'f'])
await processInChunks.process()
processInChunks.updateQueue(['e', 'f'])
await waitForExpect(() => {
expect(mockProcessItem).toBeCalledTimes(5)
})
Expand All @@ -62,18 +60,28 @@ describe('ProcessInChunks', () => {
.mockRejectedValueOnce(new Error('Rejected 2'))
const chunkSize = 2
processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem, chunkSize)
await processInChunks.process()
await sleep(10000)
await waitForExpect(() => {
expect(mockProcessItem).toBeCalledTimes(2)
expect(mockProcessItem).toBeCalledTimes(6)
})
})

it.skip('does not process more data if stopped', async () => {
it('does not process more data if stopped', async () => {
const mockProcessItem = jest.fn(async () => {})
const processInChunks = new ProcessInChunksService()
processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem)
processInChunks.stop()
await processInChunks.process()
processInChunks.init([], mockProcessItem)
processInChunks.pause()
processInChunks.updateQueue(['a', 'b', 'c', 'd'])
expect(mockProcessItem).not.toBeCalled()
})

it('processes tasks after resuming from pause', async () => {
const mockProcessItem = jest.fn(async () => {})
processInChunks.init([], mockProcessItem)
processInChunks.pause()
processInChunks.updateQueue(['a', 'b', 'c', 'd'])
processInChunks.resume()
await waitForExpect(() => {
expect(mockProcessItem).toBeCalledTimes(4)
})
})
})
1 change: 1 addition & 0 deletions packages/desktop/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* Fix issues with recreating general channel when deleted while offline ([#2334](https://github.com/TryQuiet/quiet/issues/2334))
* Fix package.json license inconsistency
* Fixes issue with reconnecting to peers on resume on iOS ([#2424](https://github.com/TryQuiet/quiet/issues/2424))
* Reorder the closing of services, prevent sagas running multiple times and close backend server properly

[2.1.2]

Expand Down
4 changes: 2 additions & 2 deletions packages/desktop/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/desktop/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
},
"homepage": "https://github.com/TryQuiet",
"@comment version": "To build new version for specific platform, just replace platform in version tag to one of following linux, mac, windows",
"version": "2.2.0-alpha.4",
"version": "2.2.0-alpha.5",
"description": "Decentralized team chat",
"main": "dist/main/main.js",
"scripts": {
Expand Down
2 changes: 2 additions & 0 deletions packages/mobile/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
* Allow JPEG and GIF files as profile photos ([#2332](https://github.com/TryQuiet/quiet/issues/2332))
* Fix issues with recreating general channel when deleted while offline ([#2334](https://github.com/TryQuiet/quiet/issues/2334))
* Fix package.json license inconsistency
* Fixes issue with reconnecting to peers on resume on iOS ([#2424](https://github.com/TryQuiet/quiet/issues/2424))
* Reorder the closing of services, prevent sagas running multiple times and close backend server properly

[2.1.2]

Expand Down
4 changes: 2 additions & 2 deletions packages/mobile/android/app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ android {
applicationId "com.quietmobile"
minSdkVersion rootProject.ext.minSdkVersion
targetSdkVersion rootProject.ext.targetSdkVersion
versionCode 416
versionName "2.2.0-alpha.4"
versionCode 417
versionName "2.2.0-alpha.5"
resValue "string", "build_config_package", "com.quietmobile"
testBuildType System.getProperty('testBuildType', 'debug')
testInstrumentationRunner 'androidx.test.runner.AndroidJUnitRunner'
Expand Down
2 changes: 1 addition & 1 deletion packages/mobile/ios/Quiet/Info.plist
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
</dict>
</array>
<key>CFBundleVersion</key>
<string>373</string>
<string>374</string>
<key>ITSAppUsesNonExemptEncryption</key>
<false/>
<key>LSRequiresIPhoneOS</key>
Expand Down
Loading

0 comments on commit 97af3c7

Please sign in to comment.