Skip to content

Commit

Permalink
Request resources asynchronously when the socket connection fails (#1482
Browse files Browse the repository at this point in the history
)

* Request resources asynchronously when the socket connection fails

* Rename variable
  • Loading branch information
Andres Martinez Gotor authored Jan 28, 2020
1 parent 601d9d6 commit 0ff31ec
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 18 deletions.
1 change: 1 addition & 0 deletions dashboard/src/actions/kube.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ describe("getAndWatchResource", () => {
payload: {
ref,
handler: expect.any(Function),
onError: { onErrorHandler: expect.any(Function), closeTimer: expect.any(Function) },
},
},
];
Expand Down
41 changes: 33 additions & 8 deletions dashboard/src/actions/kube.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ export const receiveResourceError = createAction("RECEIVE_RESOURCE_ERROR", resol
// Takes a ResourceRef to open a WebSocket for and a handler to process messages
// from the socket.
export const openWatchResource = createAction("OPEN_WATCH_RESOURCE", resolve => {
return (ref: ResourceRef, handler: (e: MessageEvent) => void) => resolve({ ref, handler });
return (
ref: ResourceRef,
handler: (e: MessageEvent) => void,
onError: { closeTimer: () => void; onErrorHandler: () => void },
) => resolve({ ref, handler, onError });
});

export const closeWatchResource = createAction("CLOSE_WATCH_RESOURCE", resolve => {
Expand All @@ -37,6 +41,7 @@ export type KubeAction = ActionType<typeof allActions[number]>;

export function getResource(
ref: ResourceRef,
polling?: boolean,
): ThunkAction<Promise<void>, IStoreState, null, KubeAction> {
return async (dispatch, getState) => {
const key = ref.getResourceURL();
Expand All @@ -51,7 +56,11 @@ export function getResource(
return;
}

dispatch(requestResource(key));
// If it's not the first request, we can skip the request REDUX event
// to avoid the loading animation
if (!polling) {
dispatch(requestResource(key));
}
try {
const r = await ref.getResource();
dispatch(receiveResource({ key, resource: r }));
Expand All @@ -66,13 +75,29 @@ export function getAndWatchResource(
): ThunkAction<void, IStoreState, null, KubeAction> {
return dispatch => {
dispatch(getResource(ref));
let timer: NodeJS.Timeout;
dispatch(
openWatchResource(ref, (e: MessageEvent) => {
const msg = JSON.parse(e.data);
const resource: IResource = msg.object;
const key = ref.getResourceURL();
dispatch(receiveResource({ key, resource }));
}),
openWatchResource(
ref,
(e: MessageEvent) => {
const msg = JSON.parse(e.data);
const resource: IResource = msg.object;
const key = ref.getResourceURL();
dispatch(receiveResource({ key, resource }));
},
{
onErrorHandler: () => {
// If the Socket fails, create an interval to re-request the resource
// every 5 seconds. This interval needs to be closed calling closeTimer
timer = setInterval(async () => {
dispatch(getResource(ref, true));
}, 5000);
},
closeTimer: () => {
clearInterval(timer);
},
},
),
);
};
}
40 changes: 34 additions & 6 deletions dashboard/src/reducers/kube.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ describe("authReducer", () => {
payload: {
ref,
handler: jest.fn(),
onError: { onErrorHandler: jest.fn(), closeTimer: jest.fn() },
},
});
const socket = newState.sockets[ref.watchResourceURL()];
Expand All @@ -73,21 +74,23 @@ describe("authReducer", () => {

it("does not open a new socket if one exists in the state", () => {
const existingSocket = ref.watchResource();
const socket = { socket: existingSocket, closeTimer: jest.fn() };
const state = {
...initialState,
sockets: {
[ref.watchResourceURL()]: existingSocket,
[ref.watchResourceURL()]: socket,
},
};
const newState = kubeReducer(state, {
type: actionTypes.openWatchResource,
payload: {
ref,
handler: jest.fn(),
onError: { onErrorHandler: jest.fn(), closeTimer: jest.fn() },
},
});
expect(newState).toBe(state);
expect(newState.sockets[ref.watchResourceURL()]).toBe(existingSocket);
expect(newState.sockets[ref.watchResourceURL()]).toBe(socket);
});

it("adds the requested handler on the created socket", () => {
Expand All @@ -97,26 +100,47 @@ describe("authReducer", () => {
payload: {
ref,
handler: mock,
onError: { onErrorHandler: jest.fn(), closeTimer: jest.fn() },
},
});
const socket = newState.sockets[ref.watchResourceURL()];
const socket = newState.sockets[ref.watchResourceURL()].socket;
// listeners is a defined property on the mock-socket:
// https://github.com/thoov/mock-socket/blob/bed8c9237fa4b9c348a4cf5a22b59569c4cd10f2/index.d.ts#L7
const listener = (socket as any).listeners.message[0];
expect(listener).toBeDefined();
listener();
expect(mock).toHaveBeenCalled();
});

it("triggers the onError function if the socket emits an error", () => {
const mock = jest.fn();
const newState = kubeReducer(undefined, {
type: actionTypes.openWatchResource,
payload: {
ref,
handler: jest.fn(),
onError: { onErrorHandler: mock, closeTimer: jest.fn() },
},
});
const socket = newState.sockets[ref.watchResourceURL()].socket;
// listeners is a defined property on the mock-socket:
// https://github.com/thoov/mock-socket/blob/bed8c9237fa4b9c348a4cf5a22b59569c4cd10f2/index.d.ts#L7
const listener = (socket as any).listeners.error[0];
expect(listener).toBeDefined();
listener();
expect(mock).toHaveBeenCalled();
});
});

describe("closeWatchResource", () => {
it("closes the WebSocket for the requested resource and removes it from the state", () => {
it("closes the WebSocket and the timer for the requested resource and removes it from the state", () => {
const socket = ref.watchResource();
const timerMock = jest.fn();
const spy = jest.spyOn(socket, "close");
const state = {
...initialState,
sockets: {
[ref.watchResourceURL()]: socket,
[ref.watchResourceURL()]: { socket, closeTimer: timerMock },
},
};
const newState = kubeReducer(state, {
Expand All @@ -125,11 +149,15 @@ describe("authReducer", () => {
});
expect(spy).toHaveBeenCalled();
expect(newState.sockets).toEqual({});
expect(timerMock).toHaveBeenCalled();
});
});

it("does nothing if the socket doesn't exist", () => {
const state = { ...initialState, sockets: { dontdeleteme: {} as WebSocket } };
const state = {
...initialState,
sockets: { dontdeleteme: { socket: {} as WebSocket, closeTimer: jest.fn() } },
};
const newState = kubeReducer(state, {
type: actionTypes.closeWatchResource,
payload: ref,
Expand Down
9 changes: 6 additions & 3 deletions dashboard/src/reducers/kube.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,21 @@ const kubeReducer = (
};
return { ...state, items: { ...state.items, ...erroredItem } };
case getType(actions.kube.openWatchResource):
const { ref, handler } = action.payload;
const { ref, handler, onError } = action.payload;
key = ref.watchResourceURL();
if (state.sockets[key]) {
// Socket for this resource already open, do nothing
return state;
}
const socket = ref.watchResource();
socket.addEventListener("message", handler);
const { onErrorHandler, closeTimer } = onError;
socket.addEventListener("error", onErrorHandler);
return {
...state,
sockets: {
...state.sockets,
[key]: socket,
[key]: { socket, closeTimer },
},
};
// TODO(adnan): this won't handle cases where one component closes a socket
Expand All @@ -54,7 +56,8 @@ const kubeReducer = (
const { [key]: foundSocket, ...otherSockets } = sockets;
// close the socket if it exists
if (foundSocket !== undefined) {
foundSocket.close();
foundSocket.socket.close();
foundSocket.closeTimer();
}
return {
...state,
Expand Down
2 changes: 1 addition & 1 deletion dashboard/src/shared/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ export interface IKubeItem<T> {

export interface IKubeState {
items: { [s: string]: IKubeItem<IResource> };
sockets: { [s: string]: WebSocket };
sockets: { [s: string]: { socket: WebSocket; closeTimer: () => void } };
}

export interface IBasicFormParam {
Expand Down

0 comments on commit 0ff31ec

Please sign in to comment.