diff --git a/packages/bottender/src/bot/Bot.ts b/packages/bottender/src/bot/Bot.ts index 655ca6262..dae886e0e 100644 --- a/packages/bottender/src/bot/Bot.ts +++ b/packages/bottender/src/bot/Bot.ts @@ -183,56 +183,62 @@ export default class Bot { const body = camelcaseKeysDeep(inputBody) as B; - const { platform } = this._connector; - const sessionKey = this._connector.getUniqueSessionKey( - body, - requestContext - ); - - // Create or retrieve session if possible - let sessionId: string | undefined; - let session: Session | undefined; - if (sessionKey) { - sessionId = `${platform}:${sessionKey}`; - - session = - (await this._sessions.read(sessionId)) || - (Object.create(null) as Session); - - debugSessionRead(`Read session: ${sessionId}`); - debugSessionRead(JSON.stringify(session, null, 2)); - - Object.defineProperty(session, 'id', { - configurable: false, - enumerable: true, - writable: false, - value: session.id || sessionId, - }); + const events = this._connector.mapRequestToEvents(body); - if (!session.platform) session.platform = platform; + const contexts = await pMap( + events, + async event => { + const { platform } = this._connector; + const sessionKey = this._connector.getUniqueSessionKey( + // TODO: may deprecating passing request body in v2 + events.length === 1 ? body : event, + requestContext + ); + + // Create or retrieve session if possible + let sessionId: string | undefined; + let session: Session | undefined; + if (sessionKey) { + sessionId = `${platform}:${sessionKey}`; + + session = + (await this._sessions.read(sessionId)) || + (Object.create(null) as Session); + + debugSessionRead(`Read session: ${sessionId}`); + debugSessionRead(JSON.stringify(session, null, 2)); + + Object.defineProperty(session, 'id', { + configurable: false, + enumerable: true, + writable: false, + value: session.id || sessionId, + }); - Object.defineProperty(session, 'platform', { - configurable: false, - enumerable: true, - writable: false, - value: session.platform, - }); + if (!session.platform) session.platform = platform; - await this._connector.updateSession(session, body); - } + Object.defineProperty(session, 'platform', { + configurable: false, + enumerable: true, + writable: false, + value: session.platform, + }); - const events = this._connector.mapRequestToEvents(body); + await this._connector.updateSession( + session, + // TODO: may deprecating passing request body in v2 + events.length === 1 ? body : event + ); + } - const contexts = await pMap( - events, - event => - this._connector.createContext({ + return this._connector.createContext({ event, session, initialState: this._initialState, requestContext, emitter: this._emitter, - }), + }); + }, { concurrency: 5, } @@ -252,6 +258,8 @@ export default class Bot { } const handler: Action = this._handler; const errorHandler: Action | null = this._errorHandler; + + // TODO: only run concurrently for different session id const promises = Promise.all( contexts.map(context => Promise.resolve() @@ -277,17 +285,24 @@ export default class Bot { if (this._sync) { try { await promises; - if (sessionId && session) { - session.lastActivity = Date.now(); - contexts.forEach(context => { + + await Promise.all( + contexts.map(async context => { context.isSessionWritten = true; - }); - debugSessionWrite(`Write session: ${sessionId}`); - debugSessionWrite(JSON.stringify(session, null, 2)); + const { session } = context; - await this._sessions.write(sessionId, session); - } + if (session) { + session.lastActivity = Date.now(); + + debugSessionWrite(`Write session: ${session.id}`); + debugSessionWrite(JSON.stringify(session, null, 2)); + + // eslint-disable-next-line no-await-in-loop + await this._sessions.write(session.id, session); + } + }) + ); } catch (err) { console.error(err); } @@ -301,19 +316,27 @@ export default class Bot { return response; } promises - .then((): Promise | void => { - if (sessionId && session) { - session.lastActivity = Date.now(); - contexts.forEach(context => { - context.isSessionWritten = true; - }); + .then( + async (): Promise => { + await Promise.all( + contexts.map(async context => { + context.isSessionWritten = true; - debugSessionWrite(`Write session: ${sessionId}`); - debugSessionWrite(JSON.stringify(session, null, 2)); + const { session } = context; - return this._sessions.write(sessionId, session); + if (session) { + session.lastActivity = Date.now(); + + debugSessionWrite(`Write session: ${session.id}`); + debugSessionWrite(JSON.stringify(session, null, 2)); + + // eslint-disable-next-line no-await-in-loop + await this._sessions.write(session.id, session); + } + }) + ); } - }) + ) .catch(console.error); }; } diff --git a/packages/bottender/src/bot/Connector.ts b/packages/bottender/src/bot/Connector.ts index 82e3561f6..43623410f 100644 --- a/packages/bottender/src/bot/Connector.ts +++ b/packages/bottender/src/bot/Connector.ts @@ -1,16 +1,20 @@ import EventEmitter from 'events'; import Session from '../session/Session'; +import { Event } from '../context/Event'; import { RequestContext } from '../types'; export interface Connector { client: C; platform: string; - getUniqueSessionKey(body: B, requestContext?: RequestContext): string | null; - updateSession(session: Session, body: B): Promise; - mapRequestToEvents(body: B): any[]; + getUniqueSessionKey( + bodyOrEvent: B | Event, + requestContext?: RequestContext + ): string | null; + updateSession(session: Session, bodyOrEvent: B | Event): Promise; + mapRequestToEvents(body: B): Event[]; createContext(params: { - event: any; + event: Event; session?: Session | null; initialState?: Record | null; requestContext?: RequestContext; diff --git a/packages/bottender/src/bot/__tests__/Bot.spec.ts b/packages/bottender/src/bot/__tests__/Bot.spec.ts index 9da1a983e..460d4daaf 100644 --- a/packages/bottender/src/bot/__tests__/Bot.spec.ts +++ b/packages/bottender/src/bot/__tests__/Bot.spec.ts @@ -5,7 +5,6 @@ function setup({ connector = { platform: 'any', getUniqueSessionKey: jest.fn(), - shouldSessionUpdate: jest.fn(), updateSession: jest.fn(), mapRequestToEvents: jest.fn(() => [{}]), createContext: jest.fn(() => ({})), @@ -16,7 +15,7 @@ function setup({ read: jest.fn(), write: jest.fn(), }, - sync = false, + sync = true, mapPageToAccessToken, }) { const bot = new Bot({ connector, sessionStore, sync, mapPageToAccessToken }); @@ -85,7 +84,6 @@ describe('#createRequestHandler', () => { const { bot, connector, sessionStore } = setup({}); connector.getUniqueSessionKey.mockReturnValue('__id__'); - connector.shouldSessionUpdate.mockReturnValue(true); sessionStore.read.mockResolvedValue(null); const handler = () => {}; @@ -105,7 +103,6 @@ describe('#createRequestHandler', () => { const connector = { platform: 'any', getUniqueSessionKey: jest.fn(), - shouldSessionUpdate: jest.fn(), updateSession: jest.fn(), mapRequestToEvents: jest.fn(() => [event]), createContext: jest.fn(() => ({ event, session })), @@ -129,10 +126,12 @@ describe('#createRequestHandler', () => { expect(receivedContext).toEqual({ event: {}, + isSessionWritten: true, session: expect.objectContaining({ id: 'any:__id__', platform: 'any', user: {}, + lastActivity: expect.any(Number), }), }); }); @@ -152,7 +151,6 @@ describe('#createRequestHandler', () => { const connector = { platform: 'any', getUniqueSessionKey: jest.fn(), - shouldSessionUpdate: jest.fn(), updateSession: jest.fn(), mapRequestToEvents: jest.fn(() => [event]), createContext: jest.fn(() => context), @@ -195,7 +193,6 @@ describe('#createRequestHandler', () => { const connector = { platform: 'any', getUniqueSessionKey: jest.fn(), - shouldSessionUpdate: jest.fn(), updateSession: jest.fn(), mapRequestToEvents: jest.fn(() => [event]), createContext: jest.fn(() => ({ event, session: undefined })), @@ -223,7 +220,6 @@ describe('#createRequestHandler', () => { const connector = { platform: 'any', getUniqueSessionKey: jest.fn(), - shouldSessionUpdate: jest.fn(), updateSession: jest.fn(), mapRequestToEvents: jest.fn(() => [event]), createContext: jest.fn(() => ({ event, session: undefined })), @@ -256,7 +252,6 @@ describe('#createRequestHandler', () => { const connector = { platform: 'any', getUniqueSessionKey: jest.fn(), - shouldSessionUpdate: jest.fn(), updateSession: jest.fn(), mapRequestToEvents: jest.fn(() => [event]), createContext: jest.fn(() => ({ event, session: 123456 })), @@ -290,7 +285,6 @@ describe('#createRequestHandler', () => { const connector = { platform: 'any', getUniqueSessionKey: jest.fn(), - shouldSessionUpdate: jest.fn(), updateSession: jest.fn(), mapRequestToEvents: jest.fn(() => [event]), createContext: jest.fn(() => ({ event, session: 123456 })), @@ -327,7 +321,6 @@ describe('#createRequestHandler', () => { const connector = { platform: 'any', getUniqueSessionKey: jest.fn(), - shouldSessionUpdate: jest.fn(), updateSession: jest.fn(), mapRequestToEvents: jest.fn(() => [event]), createContext: jest.fn(() => ({ event, session })), @@ -353,6 +346,65 @@ describe('#createRequestHandler', () => { }); Date.now = _now; }); + + it('should work with multiple events in one request', async () => { + const event1 = {}; + const event2 = {}; + const connector = { + platform: 'any', + getUniqueSessionKey: jest.fn(), + updateSession: jest.fn(), + mapRequestToEvents: jest.fn(() => [event1, event2]), + createContext: jest.fn(({ event, session }) => ({ event, session })), + }; + const { bot, sessionStore } = setup({ connector }); + + connector.getUniqueSessionKey + .mockReturnValueOnce('1') + .mockReturnValueOnce('2'); + sessionStore.read.mockResolvedValue(null); + + const handler = () => {}; + bot.onEvent(handler); + + const requestHandler = bot.createRequestHandler(); + + const body = {}; + await requestHandler(body); + + expect(sessionStore.read).toBeCalledWith('any:1'); + expect(sessionStore.read).toBeCalledWith('any:2'); + + expect(connector.getUniqueSessionKey).toBeCalledWith(event1, undefined); + expect(connector.getUniqueSessionKey).toBeCalledWith(event2, undefined); + + expect(connector.createContext).toBeCalledWith( + expect.objectContaining({ + event: event1, + session: expect.objectContaining({ id: 'any:1', platform: 'any' }), + }) + ); + expect(connector.createContext).toBeCalledWith( + expect.objectContaining({ + event: event2, + session: expect.objectContaining({ id: 'any:2', platform: 'any' }), + }) + ); + + expect(connector.updateSession).toBeCalledWith(expect.any(Object), body); + expect(connector.updateSession).toBeCalledWith(expect.any(Object), body); + + expect(sessionStore.write).toBeCalledWith('any:1', { + id: 'any:1', + platform: 'any', + lastActivity: expect.any(Number), + }); + expect(sessionStore.write).toBeCalledWith('any:2', { + id: 'any:2', + platform: 'any', + lastActivity: expect.any(Number), + }); + }); }); describe('#use', () => { @@ -361,7 +413,6 @@ describe('#use', () => { const connector = { platform: 'any', getUniqueSessionKey: jest.fn(), - shouldSessionUpdate: jest.fn(), updateSession: jest.fn(), mapRequestToEvents: jest.fn(() => [event]), createContext: jest.fn(() => ({ event, session: undefined })), @@ -377,7 +428,6 @@ describe('#use', () => { const connector = { platform: 'any', getUniqueSessionKey: jest.fn(), - shouldSessionUpdate: jest.fn(), updateSession: jest.fn(), mapRequestToEvents: jest.fn(() => [event]), createContext: jest.fn(() => ({ event, session: undefined })), @@ -411,7 +461,6 @@ describe('#onEvent', () => { const connector = { platform: 'any', getUniqueSessionKey: jest.fn(), - shouldSessionUpdate: jest.fn(), updateSession: jest.fn(), mapRequestToEvents: jest.fn(() => [event]), createContext: jest.fn(() => ({ event, session: undefined })), @@ -429,7 +478,6 @@ describe('#onError', () => { const connector = { platform: 'any', getUniqueSessionKey: jest.fn(), - shouldSessionUpdate: jest.fn(), updateSession: jest.fn(), mapRequestToEvents: jest.fn(() => [event]), createContext: ({ emitter }) => @@ -468,7 +516,6 @@ describe('#onError', () => { const connector = { platform: 'any', getUniqueSessionKey: jest.fn(), - shouldSessionUpdate: jest.fn(), updateSession: jest.fn(), mapRequestToEvents: jest.fn(() => [event]), createContext: jest.fn(() => ({ event, session: undefined })), @@ -486,7 +533,6 @@ describe('#setInitialState', () => { const connector = { platform: 'any', getUniqueSessionKey: jest.fn(), - shouldSessionUpdate: jest.fn(), updateSession: jest.fn(), mapRequestToEvents: jest.fn(() => [event]), createContext: jest.fn(() => ({ event, session: undefined })), @@ -502,7 +548,6 @@ describe('#setInitialState', () => { const connector = { platform: 'any', getUniqueSessionKey: jest.fn(), - shouldSessionUpdate: jest.fn(), updateSession: jest.fn(), mapRequestToEvents: jest.fn(() => [event]), createContext: jest.fn(() => ({ event, session: undefined })), @@ -545,7 +590,6 @@ describe('request context', () => { const connector = { platform: 'any', getUniqueSessionKey: jest.fn(), - shouldSessionUpdate: jest.fn(), updateSession: jest.fn(), mapRequestToEvents: jest.fn(() => [event]), createContext: jest.fn(() => ({ event, session: undefined })), @@ -578,7 +622,6 @@ describe('context lifecycle', () => { const connector = { platform: 'any', getUniqueSessionKey: jest.fn(), - shouldSessionUpdate: jest.fn(), updateSession: jest.fn(), mapRequestToEvents: jest.fn(() => [event]), createContext: jest.fn(() => ({ diff --git a/packages/bottender/src/line/LineConnector.ts b/packages/bottender/src/line/LineConnector.ts index 62e5492a5..a52d8a9ed 100644 --- a/packages/bottender/src/line/LineConnector.ts +++ b/packages/bottender/src/line/LineConnector.ts @@ -113,8 +113,14 @@ export default class LineConnector return this._client; } - getUniqueSessionKey(body: LineRequestBody): string { - const { source } = body.events[0]; + getUniqueSessionKey(bodyOrEvent: LineRequestBody | LineEvent): string { + const rawEvent = + bodyOrEvent instanceof LineEvent + ? bodyOrEvent.rawEvent + : bodyOrEvent.events[0]; + + const { source } = rawEvent; + if (source.type === 'user') { return source.userId; } @@ -129,8 +135,16 @@ export default class LineConnector ); } - async updateSession(session: Session, body: LineRequestBody): Promise { - const { source } = body.events[0]; + async updateSession( + session: Session, + bodyOrEvent: LineRequestBody | LineEvent + ): Promise { + const rawEvent = + bodyOrEvent instanceof LineEvent + ? bodyOrEvent.rawEvent + : bodyOrEvent.events[0]; + + const { source } = rawEvent; if (!session.type) { session.type = source.type; diff --git a/packages/bottender/src/line/__tests__/LineConnector.spec.ts b/packages/bottender/src/line/__tests__/LineConnector.spec.ts index 8f64f043c..e3bd0cf42 100644 --- a/packages/bottender/src/line/__tests__/LineConnector.spec.ts +++ b/packages/bottender/src/line/__tests__/LineConnector.spec.ts @@ -131,7 +131,7 @@ describe('#getUniqueSessionKey', () => { expect(senderId).toBe('U206d25c2ea6bd87c17655609a1c37cb8'); }); - it('extract groupId from user source', () => { + it('extract groupId from group source', () => { const { connector } = setup(); const senderId = connector.getUniqueSessionKey({ events: [ @@ -154,7 +154,7 @@ describe('#getUniqueSessionKey', () => { expect(senderId).toBe('U206d25c2ea6bd87c17655609a1c37cb8'); }); - it('extract roomId from user source', () => { + it('extract roomId from room source', () => { const { connector } = setup(); const senderId = connector.getUniqueSessionKey({ events: [ @@ -177,6 +177,27 @@ describe('#getUniqueSessionKey', () => { expect(senderId).toBe('U206d25c2ea6bd87c17655609a1c37cb8'); }); + it('extract from line event', () => { + const { connector } = setup(); + const senderId = connector.getUniqueSessionKey( + new LineEvent({ + replyToken: 'nHuyWiB7yP5Zw52FIkcQobQuGDXCTA', + type: 'message', + timestamp: 1462629479859, + source: { + type: 'user', + userId: 'U206d25c2ea6bd87c17655609a1c37cb8', + }, + message: { + id: '325708', + type: 'text', + text: 'Hello, world', + }, + }) + ); + expect(senderId).toBe('U206d25c2ea6bd87c17655609a1c37cb8'); + }); + it('should throw error if source.type is not user, group or room', () => { const { connector } = setup(); let error; diff --git a/packages/bottender/src/messenger/MessengerConnector.ts b/packages/bottender/src/messenger/MessengerConnector.ts index c2e34e92e..1499d4067 100644 --- a/packages/bottender/src/messenger/MessengerConnector.ts +++ b/packages/bottender/src/messenger/MessengerConnector.ts @@ -231,8 +231,13 @@ export default class MessengerConnector return this._verifyToken; } - getUniqueSessionKey(body: MessengerRequestBody): string | null { - const rawEvent = this._getRawEventsFromRequest(body)[0]; + getUniqueSessionKey( + bodyOrEvent: MessengerRequestBody | MessengerEvent + ): string | null { + const rawEvent = + bodyOrEvent instanceof MessengerEvent + ? bodyOrEvent.rawEvent + : this._getRawEventsFromRequest(bodyOrEvent)[0]; if ( rawEvent && rawEvent.message && @@ -249,12 +254,17 @@ export default class MessengerConnector async updateSession( session: Session, - body: MessengerRequestBody + bodyOrEvent: MessengerRequestBody | MessengerEvent ): Promise { if (!session.user || this._profilePicExpired(session.user)) { - const senderId = this.getUniqueSessionKey(body); + const senderId = this.getUniqueSessionKey(bodyOrEvent); - const rawEvent = this._getRawEventsFromRequest(body)[0]; + const rawEvent = + bodyOrEvent instanceof MessengerEvent + ? bodyOrEvent.rawEvent + : this._getRawEventsFromRequest(bodyOrEvent)[0]; + + // TODO: use this info from event const pageId = this._getPageIdFromRawEvent(rawEvent); let customAccessToken; diff --git a/packages/bottender/src/messenger/__tests__/MessengerConnector.spec.ts b/packages/bottender/src/messenger/__tests__/MessengerConnector.spec.ts index 04c715fcc..e916bad14 100644 --- a/packages/bottender/src/messenger/__tests__/MessengerConnector.spec.ts +++ b/packages/bottender/src/messenger/__tests__/MessengerConnector.spec.ts @@ -260,6 +260,27 @@ describe('#getUniqueSessionKey', () => { const senderId = connector.getUniqueSessionKey(webhookTestRequest.body); expect(senderId).toBe(null); }); + + it('extract from messenger event', () => { + const { connector } = setup(); + const senderId = connector.getUniqueSessionKey( + new MessengerEvent({ + sender: { + id: '1412611362105802', + }, + recipient: { + id: '1895382890692545', + }, + timestamp: 1486464322190, + message: { + mid: 'mid.1486464322190:cb04e5a654', + seq: 339979, + text: 'text', + }, + }) + ); + expect(senderId).toBe('1412611362105802'); + }); }); describe('#updateSession', () => {