Skip to content

refactor: Made PendingEventsStore interface asynchronous #513

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 72 additions & 52 deletions packages/event-processor/__tests__/pendingEventsDispatcher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ describe('LocalStoragePendingEventsDispatcher', () => {
localStorage.clear()
})

it('should properly send the events to the passed in eventDispatcher, when callback statusCode=200', () => {
it('should properly send the events to the passed in eventDispatcher, when callback statusCode=200', (done) => {
const callback = jest.fn()
const eventV1Request: EventV1Request = {
url: 'http://cdn.com',
Expand All @@ -61,22 +61,27 @@ describe('LocalStoragePendingEventsDispatcher', () => {

pendingEventsDispatcher.dispatchEvent(eventV1Request, callback)

expect(callback).not.toHaveBeenCalled()
// manually invoke original eventDispatcher callback
const internalDispatchCall = ((originalEventDispatcher.dispatchEvent as unknown) as jest.Mock)
.mock.calls[0]
internalDispatchCall[1]({ statusCode: 200 })
setTimeout(() => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you add this setTimeout?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because i want to detect call dispatchEvent on the original event dispatcher which was called synchronously earlier but now its being called asynchronously after the set promise resolves.

expect(callback).not.toHaveBeenCalled()
// manually invoke original eventDispatcher callback
const internalDispatchCall = ((originalEventDispatcher.dispatchEvent as unknown) as jest.Mock)
.mock.calls[0]
internalDispatchCall[1]({ statusCode: 200 })

// assert that the original dispatch function was called with the request
expect((originalEventDispatcher.dispatchEvent as unknown) as jest.Mock).toBeCalledTimes(1)
expect(internalDispatchCall[0]).toEqual(eventV1Request)
// assert that the original dispatch function was called with the request
expect((originalEventDispatcher.dispatchEvent as unknown) as jest.Mock).toBeCalledTimes(1)
expect(internalDispatchCall[0]).toEqual(eventV1Request)

// assert that the passed in callback to pendingEventsDispatcher was called
expect(callback).toHaveBeenCalledTimes(1)
expect(callback).toHaveBeenCalledWith({ statusCode: 200 })
setTimeout(() => {
// assert that the passed in callback to pendingEventsDispatcher was called
expect(callback).toHaveBeenCalledTimes(1)
expect(callback).toHaveBeenCalledWith({ statusCode: 200 })
done()
})
})
})

it('should properly send the events to the passed in eventDispatcher, when callback statusCode=400', () => {
it('should properly send the events to the passed in eventDispatcher, when callback statusCode=400', (done) => {
const callback = jest.fn()
const eventV1Request: EventV1Request = {
url: 'http://cdn.com',
Expand All @@ -86,19 +91,24 @@ describe('LocalStoragePendingEventsDispatcher', () => {

pendingEventsDispatcher.dispatchEvent(eventV1Request, callback)

expect(callback).not.toHaveBeenCalled()
// manually invoke original eventDispatcher callback
const internalDispatchCall = ((originalEventDispatcher.dispatchEvent as unknown) as jest.Mock)
.mock.calls[0]
internalDispatchCall[1]({ statusCode: 400 })
setTimeout(() => {
expect(callback).not.toHaveBeenCalled()
// manually invoke original eventDispatcher callback
const internalDispatchCall = ((originalEventDispatcher.dispatchEvent as unknown) as jest.Mock)
.mock.calls[0]
internalDispatchCall[1]({ statusCode: 400 })

// assert that the original dispatch function was called with the request
expect((originalEventDispatcher.dispatchEvent as unknown) as jest.Mock).toBeCalledTimes(1)
expect(internalDispatchCall[0]).toEqual(eventV1Request)
// assert that the original dispatch function was called with the request
expect((originalEventDispatcher.dispatchEvent as unknown) as jest.Mock).toBeCalledTimes(1)
expect(internalDispatchCall[0]).toEqual(eventV1Request)

// assert that the passed in callback to pendingEventsDispatcher was called
expect(callback).toHaveBeenCalledTimes(1)
expect(callback).toHaveBeenCalledWith({ statusCode: 400})
setTimeout(() => {
// assert that the passed in callback to pendingEventsDispatcher was called
expect(callback).toHaveBeenCalledTimes(1)
expect(callback).toHaveBeenCalledWith({ statusCode: 400})
done()
})
})
})
})

Expand Down Expand Up @@ -129,7 +139,7 @@ describe('PendingEventsDispatcher', () => {

describe('dispatch', () => {
describe('when the dispatch is successful', () => {
it('should save the pendingEvent to the store and remove it once dispatch is completed', () => {
it('should save the pendingEvent to the store and remove it once dispatch is completed', async (done) => {
const callback = jest.fn()
const eventV1Request: EventV1Request = {
url: 'http://cdn.com',
Expand All @@ -139,8 +149,8 @@ describe('PendingEventsDispatcher', () => {

pendingEventsDispatcher.dispatchEvent(eventV1Request, callback)

expect(store.values()).toHaveLength(1)
expect(store.get('uuid')).toEqual({
expect(await store.values()).toHaveLength(1)
expect(await store.get('uuid')).toEqual({
uuid: 'uuid',
timestamp: 1,
request: eventV1Request,
Expand All @@ -158,16 +168,19 @@ describe('PendingEventsDispatcher', () => {
).toBeCalledTimes(1)
expect(internalDispatchCall[0]).toEqual(eventV1Request)

// assert that the passed in callback to pendingEventsDispatcher was called
expect(callback).toHaveBeenCalledTimes(1)
expect(callback).toHaveBeenCalledWith({ statusCode: 200 })
setTimeout(async () => {
// assert that the passed in callback to pendingEventsDispatcher was called
expect(callback).toHaveBeenCalledTimes(1)
expect(callback).toHaveBeenCalledWith({ statusCode: 200 })

expect(store.values()).toHaveLength(0)
expect(await store.values()).toHaveLength(0)
done()
})
})
})

describe('when the dispatch is unsuccessful', () => {
it('should save the pendingEvent to the store and remove it once dispatch is completed', () => {
it('should save the pendingEvent to the store and remove it once dispatch is completed', async (done) => {
const callback = jest.fn()
const eventV1Request: EventV1Request = {
url: 'http://cdn.com',
Expand All @@ -177,8 +190,8 @@ describe('PendingEventsDispatcher', () => {

pendingEventsDispatcher.dispatchEvent(eventV1Request, callback)

expect(store.values()).toHaveLength(1)
expect(store.get('uuid')).toEqual({
expect(await store.values()).toHaveLength(1)
expect(await store.get('uuid')).toEqual({
uuid: 'uuid',
timestamp: 1,
request: eventV1Request,
Expand All @@ -196,28 +209,31 @@ describe('PendingEventsDispatcher', () => {
).toBeCalledTimes(1)
expect(internalDispatchCall[0]).toEqual(eventV1Request)

// assert that the passed in callback to pendingEventsDispatcher was called
expect(callback).toHaveBeenCalledTimes(1)
expect(callback).toHaveBeenCalledWith({ statusCode: 400 })
setTimeout(async () => {
// assert that the passed in callback to pendingEventsDispatcher was called
expect(callback).toHaveBeenCalledTimes(1)
expect(callback).toHaveBeenCalledWith({ statusCode: 400 })

expect(store.values()).toHaveLength(0)
expect(await store.values()).toHaveLength(0)
done()
})
})
})
})

describe('sendPendingEvents', () => {
describe('when no pending events are in the store', () => {
it('should not invoked dispatch', () => {
expect(store.values()).toHaveLength(0)
it('should not invoked dispatch', async () => {
expect(await store.values()).toHaveLength(0)

pendingEventsDispatcher.sendPendingEvents()
expect(originalEventDispatcher.dispatchEvent).not.toHaveBeenCalled()
})
})

describe('when there are multiple pending events in the store', () => {
it('should dispatch all of the pending events, and remove them from store', () => {
expect(store.values()).toHaveLength(0)
it('should dispatch all of the pending events, and remove them from store', async (done) => {
expect(await store.values()).toHaveLength(0)

const callback = jest.fn()
const eventV1Request1: EventV1Request = {
Expand All @@ -232,29 +248,33 @@ describe('PendingEventsDispatcher', () => {
params: ({ id: 'event2' } as unknown) as EventV1,
}

store.set('uuid1', {
await store.set('uuid1', {
uuid: 'uuid1',
timestamp: 1,
request: eventV1Request1,
})
store.set('uuid2', {
await store.set('uuid2', {
uuid: 'uuid2',
timestamp: 2,
request: eventV1Request2,
})

expect(store.values()).toHaveLength(2)
expect(await store.values()).toHaveLength(2)

pendingEventsDispatcher.sendPendingEvents()
expect(originalEventDispatcher.dispatchEvent).toHaveBeenCalledTimes(2)

// manually invoke original eventDispatcher callback
const internalDispatchCalls = ((originalEventDispatcher.dispatchEvent as unknown) as jest.Mock)
.mock.calls
internalDispatchCalls[0][1]({ statusCode: 200 })
internalDispatchCalls[1][1]({ statusCode: 200 })
setTimeout(async () => {
expect(originalEventDispatcher.dispatchEvent).toHaveBeenCalledTimes(2)

expect(store.values()).toHaveLength(0)
// manually invoke original eventDispatcher callback
const internalDispatchCalls = ((originalEventDispatcher.dispatchEvent as unknown) as jest.Mock)
.mock.calls
internalDispatchCalls[0][1]({ statusCode: 200 })
internalDispatchCalls[1][1]({ statusCode: 200 })

expect(await store.values()).toHaveLength(0)
done()
})
})
})
})
Expand Down
45 changes: 23 additions & 22 deletions packages/event-processor/__tests__/pendingEventsStore.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,105 +36,106 @@ describe('LocalStorageStore', () => {
localStorage.clear()
})

it('should get, set and remove items', () => {
store.set('1', {
it('should get, set and remove items', async () => {
await store.set('1', {
uuid: '1',
timestamp: 1,
value: 'first',
})

expect(store.get('1')).toEqual({
expect(await store.get('1')).toEqual({
uuid: '1',
timestamp: 1,
value: 'first',
})

store.set('1', {
await store.set('1', {
uuid: '1',
timestamp: 2,
value: 'second',
})

expect(store.get('1')).toEqual({
expect(await store.get('1')).toEqual({
uuid: '1',
timestamp: 2,
value: 'second',
})

expect(store.values()).toHaveLength(1)
expect(await store.values()).toHaveLength(1)

store.remove('1')
await store.remove('1')

expect(store.values()).toHaveLength(0)
expect(await store.values()).toHaveLength(0)
})

it('should allow replacement of the entire map', () => {
store.set('1', {
it('should allow replacement of the entire map', async () => {
await store.set('1', {
uuid: '1',
timestamp: 1,
value: 'first',
})

store.set('2', {
await store.set('2', {
uuid: '2',
timestamp: 2,
value: 'second',
})

store.set('3', {
await store.set('3', {
uuid: '3',
timestamp: 3,
value: 'third',
})

expect(store.values()).toEqual([
expect(await store.values()).toEqual([
{ uuid: '1', timestamp: 1, value: 'first' },
{ uuid: '2', timestamp: 2, value: 'second' },
{ uuid: '3', timestamp: 3, value: 'third' },
])

const newMap = {}
store.values().forEach(item => {
const theItems = await store.values()
theItems.forEach(item => {
newMap[item.uuid] = {
...item,
value: 'new',
}
})
store.replace(newMap)
await store.replace(newMap)

expect(store.values()).toEqual([
expect(await store.values()).toEqual([
{ uuid: '1', timestamp: 1, value: 'new' },
{ uuid: '2', timestamp: 2, value: 'new' },
{ uuid: '3', timestamp: 3, value: 'new' },
])
})

it(`shouldn't allow more than the configured maxValues, using timestamp to remove the oldest entries`, () => {
store.set('2', {
it(`shouldn't allow more than the configured maxValues, using timestamp to remove the oldest entries`, async () => {
await store.set('2', {
uuid: '2',
timestamp: 2,
value: 'second',
})

store.set('3', {
await store.set('3', {
uuid: '3',
timestamp: 3,
value: 'third',
})

store.set('1', {
await store.set('1', {
uuid: '1',
timestamp: 1,
value: 'first',
})

store.set('4', {
await store.set('4', {
uuid: '4',
timestamp: 4,
value: 'fourth',
})

expect(store.values()).toEqual([
expect(await store.values()).toEqual([
{ uuid: '2', timestamp: 2, value: 'second' },
{ uuid: '3', timestamp: 3, value: 'third' },
{ uuid: '4', timestamp: 4, value: 'fourth' },
Expand Down
24 changes: 11 additions & 13 deletions packages/event-processor/src/pendingEventsDispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,21 @@ export class PendingEventsDispatcher implements EventDispatcher {
}

sendPendingEvents(): void {
const pendingEvents = this.store.values()

logger.debug('Sending %s pending events from previous page', pendingEvents.length)

pendingEvents.forEach(item => {
try {
this.send(item, () => {})
} catch (e) {}
this.store.values().then((pendingEvents) => {
Copy link
Contributor

@mjc1283 mjc1283 Jun 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Did you test this in the browser to see how its behavior compares to the old version? I'm curious if the additional async step would affect how much of this is allowed by the browser to execute during a page close.
  • If close is called while values() is in progress, I think we need to stop and not send the events. To the extent possible, we should stop all additional work after close is called.

logger.debug('Sending %s pending events from previous page', pendingEvents.length)
pendingEvents.forEach(item => {
try {
this.send(item, () => {})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does MAP guarantee the fifo order in JS? Otherwise, we lose the order of events dispatched.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, orders are concerned in other cases as well:

  • if we have any queued events, all the following events dispatched should be queued as well instead of sending them first. We may have queued events from the previous session or new events queued from the last failed dispatch.

} catch (e) {}
})
})
}

protected send(entry: DispatcherEntry, callback: EventDispatcherCallback): void {
this.store.set(entry.uuid, entry)

this.dispatcher.dispatchEvent(entry.request, response => {
this.store.remove(entry.uuid)
callback(response)
this.store.set(entry.uuid, entry).then(() => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above RE: doing things after close: If close is called while set(...) is in progress, we need to stop and not send the events.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed this offline w/Zeeshan - this part is OK as-is, disregard.

this.dispatcher.dispatchEvent(entry.request, response => {
this.store.remove(entry.uuid).then(() => callback(response))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need a protection from one packet stuck in the queue forever (cannot send to the server successfully for JSON format error etc). This can be an issue when we change them in FIFO order.

})
})
}
}
Expand Down
Loading