Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/dev' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
kellyiscute committed Jan 13, 2022
2 parents 0a1a6d0 + 71821c7 commit cd732a3
Show file tree
Hide file tree
Showing 7 changed files with 327 additions and 12 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "wechaty-puppet-whatsapp",
"version": "1.11.26",
"version": "1.11.27",
"description": "Wechaty Puppet for WhatsApp",
"type": "module",
"exports": {
Expand Down
3 changes: 1 addition & 2 deletions src/config.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/// <reference path="./typings.d.ts" />
import { FileBox } from 'file-box'
import { log } from 'wechaty-puppet'
import { log, FileBox } from 'wechaty-puppet'
import { packageJson } from './package-json.js'

const VERSION = packageJson.version || '0.0.0'
Expand Down
29 changes: 20 additions & 9 deletions src/puppet-whatsapp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
} from './whatsapp.js'
import WAWebJS, { ClientOptions, GroupChat, MessageContent, MessageMedia, MessageTypes } from 'whatsapp-web.js'
import { parseVcard } from './pure-function-helpers/vcard-parser.js'
import { Manager } from './work/manager.js'
// @ts-ignore
// import { MessageTypes } from 'whatsapp-web.js'
// import { Attachment } from './mock/user/types'
Expand All @@ -53,6 +54,7 @@ class PuppetWhatsapp extends PUPPET.Puppet {
private roomStore: { [id: string]: WhatsappContact }
private roomInvitationStore: { [id: string]: Partial<WAWebJS.InviteV4Data>}
private whatsapp: undefined | WhatsApp
private manager: undefined | Manager

constructor (
override options: PuppetWhatsAppOptions = {},
Expand All @@ -64,15 +66,19 @@ class PuppetWhatsapp extends PUPPET.Puppet {
this.contactStore = {}
this.roomStore = {}
this.roomInvitationStore = {}

}

override async start (): Promise<void> {
log.verbose('PuppetWhatsApp', 'onStart()')

if (this.state.on()) {
await this.state.ready('on')
return
}
const session = await this.memory.get(MEMORY_SLOT)
const whatsapp = await getWhatsApp(this.options['puppeteerOptions'] as ClientOptions, session)
this.whatsapp = whatsapp

this.manager = new Manager(whatsapp)
this.state.on('pending')
this.initWhatsAppEvents(whatsapp)

/**
Expand All @@ -93,10 +99,11 @@ class PuppetWhatsapp extends PUPPET.Puppet {
/**
* Huan(202102): Wait for Puppeteer to be inited before resolve start() for robust state management
*/
const { state } = this
const future = new Promise<void>(resolve => {
function check () {
if (whatsapp.pupBrowser) {
resolve()
resolve(state.on(true))
} else {
// process.stdout.write('.')
setTimeout(check, 100)
Expand All @@ -106,23 +113,27 @@ class PuppetWhatsapp extends PUPPET.Puppet {
check()
})

await Promise.race([
return Promise.race([
future,
this.state.ready('off'),
])
}

override async stop (): Promise<void> {
log.verbose('PuppetWhatsApp', 'onStop()')

if (this.state.off()) {
await this.state.ready('off')
return
}
if (!this.whatsapp) {
log.error('PuppetWhatsApp', 'stop() this.whatsapp is undefined!')
return
}

this.state.off('pending')
const whatsapp = this.whatsapp
this.whatsapp = undefined
await whatsapp.destroy()
this.state.off(true)
}

private initWhatsAppEvents (
Expand All @@ -146,7 +157,7 @@ class PuppetWhatsapp extends PUPPET.Puppet {
whatsapp.on('ready', () => {
(async () => {
// this.id = whatsapp.info.wid.user
// this.state.active(true)
// await this.state.on(true)
const contacts: WhatsappContact[] = await whatsapp.getContacts()
const nonBroadcast = contacts.filter(c => c.id.server !== 'broadcast')
for (const contact of nonBroadcast) {
Expand Down Expand Up @@ -292,7 +303,7 @@ class PuppetWhatsapp extends PUPPET.Puppet {

override async contactSelfName (name: string): Promise<void> {
log.verbose('PuppetWhatsApp', 'contactSelfName(%s)', name)
await this.whatsapp!.setDisplayName(name)
await this.manager!.setNickname(name)
}

override async contactSelfSignature (signature: string): Promise<void> {
Expand Down
20 changes: 20 additions & 0 deletions src/work/manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { RequestManager } from './request/requestManager.js'
import type { Client as WhatsApp } from 'whatsapp-web.js'

export class Manager {

whatsapp: WhatsApp
requestManager: RequestManager

constructor (whatsapp: WhatsApp) {

this.whatsapp = whatsapp
void this.whatsapp.initialize()
this.requestManager = new RequestManager(this.whatsapp)
}

setNickname (nickname: string) {
return this.requestManager.setNickname(nickname)
}

}
116 changes: 116 additions & 0 deletions src/work/request/rateManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import { EventEmitter } from 'events'
import { log } from '../../config.js'
import { sleep } from '../utils.js'

interface FunctionObj {
func: () => any,
resolve: (data: any) => void,
reject: (e: any) => void,
delayBefore?: number,
delayAfter?: number,
uniqueKey?: string,
}

export interface RateOptions {
queueId?: string,
delayBefore?: number,
delayAfter?: number,
uniqueKey?: string,
}

type RateManagerEvents = 'error'

const MAX_QUEUE_SIZE = 5000

export class RateManager extends EventEmitter {

private counter = 0

public override emit(event: 'error', error: string): boolean
public override emit(event: never, ...args: never[]): never
public override emit (event: RateManagerEvents, ...args: any[]): boolean {
return super.emit(event, ...args)
}

public override on(event: 'error', listener: (error: string) => void): this
public override on(event: never, listener: never): never
public override on (event: RateManagerEvents, listener: (...args: any[]) => void): this {
super.on(event, listener)
return this
}

private functionQueueMap: { [id: string]: FunctionObj[] } = {}
private runningMap: { [id: string]: boolean } = {}

public getQueueLength (queueId: string) {
if (!this.functionQueueMap[queueId]) {
return 0
}
return this.functionQueueMap[queueId]!.length
}

public async exec<T> (func: () => T, options: RateOptions = {}) {
const queueId = options.queueId || 'default'
const { delayAfter, delayBefore, uniqueKey } = options

if (!this.functionQueueMap[queueId]) {
this.functionQueueMap[queueId] = []
}

if (this.functionQueueMap[queueId]!.length > MAX_QUEUE_SIZE) {
if (this.counter % MAX_QUEUE_SIZE === 0) {
log.error(`EXCEED_QUEUE_SIZE: Max queue size for id: ${queueId} reached: ${this.functionQueueMap[queueId]!.length} > ${MAX_QUEUE_SIZE}(max queue size). Drop these tasks.`)
this.counter = 0
}
this.counter++
}

return new Promise<T>((resolve, reject) => {
this.functionQueueMap[queueId]!.push({ delayAfter, delayBefore, func, reject, resolve, uniqueKey })
if (!this.runningMap[queueId]) {
this.runningMap[queueId] = true
void this.execNext(queueId)
}
})
}

private async execNext (queueId: string) {
const queue = this.functionQueueMap[queueId]
if (!queue) {
return
}

const funcObj = queue.shift()
if (!funcObj) {
throw new Error(`can not get funcObj from queue with id: ${queueId}.`)
}
const { delayAfter, delayBefore, func, resolve, reject, uniqueKey } = funcObj
await sleep(delayBefore)
try {
const result = await func()
resolve(result)
/**
* If uniqueKey is given, will resolve functions with same key in the queue
*/
if (uniqueKey) {
const sameFuncIndexes = queue.map((f, index) => ({ func: f, index }))
.filter(o => o.func.uniqueKey === uniqueKey)
.map(o => o.index)
.sort((a, b) => b - a)
for (const index of sameFuncIndexes) {
const [sameFunc] = queue.splice(index, 1)
sameFunc!.resolve(result)
}
}
} catch (e) {
reject(e)
}
await sleep(delayAfter)
if (queue.length > 0) {
await this.execNext(queueId)
} else {
delete this.runningMap[queueId]
}
}

}
Loading

0 comments on commit cd732a3

Please sign in to comment.