Skip to content

Commit

Permalink
fix(Bot, LineConnector, MessengerConnector): when receiving multiple …
Browse files Browse the repository at this point in the history
…events, construct session with event instead of request (#621)
  • Loading branch information
chentsulin committed Jan 8, 2020
1 parent 0494661 commit d47628a
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 92 deletions.
139 changes: 81 additions & 58 deletions packages/bottender/src/bot/Bot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,56 +183,62 @@ export default class Bot<B extends Body, C extends Client, E extends Event> {

const body = camelcaseKeysDeep(inputBody) as B;

const { platform } = this._connector;
const sessionKey = this._connector.getUniqueSessionKey(
body,
requestContext
);

// Create or retrieve session if possible
let sessionId: string | undefined;
let session: Session | undefined;
if (sessionKey) {
sessionId = `${platform}:${sessionKey}`;

session =
(await this._sessions.read(sessionId)) ||
(Object.create(null) as Session);

debugSessionRead(`Read session: ${sessionId}`);
debugSessionRead(JSON.stringify(session, null, 2));

Object.defineProperty(session, 'id', {
configurable: false,
enumerable: true,
writable: false,
value: session.id || sessionId,
});
const events = this._connector.mapRequestToEvents(body);

if (!session.platform) session.platform = platform;
const contexts = await pMap(
events,
async event => {
const { platform } = this._connector;
const sessionKey = this._connector.getUniqueSessionKey(
// TODO: may deprecating passing request body in v2
events.length === 1 ? body : event,
requestContext
);

// Create or retrieve session if possible
let sessionId: string | undefined;
let session: Session | undefined;
if (sessionKey) {
sessionId = `${platform}:${sessionKey}`;

session =
(await this._sessions.read(sessionId)) ||
(Object.create(null) as Session);

debugSessionRead(`Read session: ${sessionId}`);
debugSessionRead(JSON.stringify(session, null, 2));

Object.defineProperty(session, 'id', {
configurable: false,
enumerable: true,
writable: false,
value: session.id || sessionId,
});

Object.defineProperty(session, 'platform', {
configurable: false,
enumerable: true,
writable: false,
value: session.platform,
});
if (!session.platform) session.platform = platform;

await this._connector.updateSession(session, body);
}
Object.defineProperty(session, 'platform', {
configurable: false,
enumerable: true,
writable: false,
value: session.platform,
});

const events = this._connector.mapRequestToEvents(body);
await this._connector.updateSession(
session,
// TODO: may deprecating passing request body in v2
events.length === 1 ? body : event
);
}

const contexts = await pMap(
events,
event =>
this._connector.createContext({
return this._connector.createContext({
event,
session,
initialState: this._initialState,
requestContext,
emitter: this._emitter,
}),
});
},
{
concurrency: 5,
}
Expand All @@ -252,6 +258,8 @@ export default class Bot<B extends Body, C extends Client, E extends Event> {
}
const handler: Action<C, E> = this._handler;
const errorHandler: Action<C, E> | null = this._errorHandler;

// TODO: only run concurrently for different session id
const promises = Promise.all(
contexts.map(context =>
Promise.resolve()
Expand All @@ -277,17 +285,24 @@ export default class Bot<B extends Body, C extends Client, E extends Event> {
if (this._sync) {
try {
await promises;
if (sessionId && session) {
session.lastActivity = Date.now();
contexts.forEach(context => {

await Promise.all(
contexts.map(async context => {
context.isSessionWritten = true;
});

debugSessionWrite(`Write session: ${sessionId}`);
debugSessionWrite(JSON.stringify(session, null, 2));
const { session } = context;

await this._sessions.write(sessionId, session);
}
if (session) {
session.lastActivity = Date.now();

debugSessionWrite(`Write session: ${session.id}`);
debugSessionWrite(JSON.stringify(session, null, 2));

// eslint-disable-next-line no-await-in-loop
await this._sessions.write(session.id, session);
}
})
);
} catch (err) {
console.error(err);
}
Expand All @@ -301,19 +316,27 @@ export default class Bot<B extends Body, C extends Client, E extends Event> {
return response;
}
promises
.then((): Promise<any> | void => {
if (sessionId && session) {
session.lastActivity = Date.now();
contexts.forEach(context => {
context.isSessionWritten = true;
});
.then(
async (): Promise<any> => {
await Promise.all(
contexts.map(async context => {
context.isSessionWritten = true;

debugSessionWrite(`Write session: ${sessionId}`);
debugSessionWrite(JSON.stringify(session, null, 2));
const { session } = context;

return this._sessions.write(sessionId, session);
if (session) {
session.lastActivity = Date.now();

debugSessionWrite(`Write session: ${session.id}`);
debugSessionWrite(JSON.stringify(session, null, 2));

// eslint-disable-next-line no-await-in-loop
await this._sessions.write(session.id, session);
}
})
);
}
})
)
.catch(console.error);
};
}
Expand Down
12 changes: 8 additions & 4 deletions packages/bottender/src/bot/Connector.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import EventEmitter from 'events';

import Session from '../session/Session';
import { Event } from '../context/Event';
import { RequestContext } from '../types';

export interface Connector<B, C> {
client: C;
platform: string;
getUniqueSessionKey(body: B, requestContext?: RequestContext): string | null;
updateSession(session: Session, body: B): Promise<void>;
mapRequestToEvents(body: B): any[];
getUniqueSessionKey(
bodyOrEvent: B | Event<any>,
requestContext?: RequestContext
): string | null;
updateSession(session: Session, bodyOrEvent: B | Event<any>): Promise<void>;
mapRequestToEvents(body: B): Event<any>[];
createContext(params: {
event: any;
event: Event<any>;
session?: Session | null;
initialState?: Record<string, any> | null;
requestContext?: RequestContext;
Expand Down
Loading

0 comments on commit d47628a

Please sign in to comment.