Skip to content

Commit

Permalink
fix(*): better timeout handling (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
garrappachc authored May 26, 2022
1 parent cd4238c commit f8c8590
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 44 deletions.
12 changes: 6 additions & 6 deletions src/commands/create-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
race,
take,
throwError,
timer,
timeout,
} from 'rxjs';

export const createChannel = async (
Expand All @@ -30,6 +30,11 @@ export const createChannel = async (
),
take(1),
map(channelState => channelState.channelId as number),
timeout({
first: CommandTimeout,
with: () =>
throwError(() => new CommandTimedOutError('createChannel')),
}),
),
socket.packet.pipe(
filterPacket(PermissionDenied),
Expand All @@ -38,11 +43,6 @@ export const createChannel = async (
throwError(() => new PermissionDeniedError(permissionDenied)),
),
),
timer(CommandTimeout).pipe(
concatMap(() =>
throwError(() => new CommandTimedOutError('createChannel')),
),
),
),
);

Expand Down
28 changes: 9 additions & 19 deletions src/commands/fetch-channel-permissions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,22 @@ import { CommandTimedOutError } from '@/errors';
import { MumbleSocket } from '@/mumble-socket';
import { filterPacket } from '@/rxjs-operators/filter-packet';
import { PermissionQuery } from '@tf2pickup-org/mumble-protocol';
import {
concatMap,
filter,
lastValueFrom,
race,
take,
throwError,
timer,
} from 'rxjs';
import { filter, lastValueFrom, take, throwError, timeout } from 'rxjs';

export const fetchChannelPermissions = async (
socket: MumbleSocket,
channelId: number,
): Promise<PermissionQuery> => {
const ret = lastValueFrom(
race(
socket.packet.pipe(
filterPacket(PermissionQuery),
filter(permissionQuery => permissionQuery.channelId === channelId),
take(1),
),
timer(CommandTimeout).pipe(
concatMap(() =>
socket.packet.pipe(
filterPacket(PermissionQuery),
filter(permissionQuery => permissionQuery.channelId === channelId),
take(1),
timeout({
first: CommandTimeout,
with: () =>
throwError(() => new CommandTimedOutError('fetchChannelPermissions')),
),
),
}),
),
);

Expand Down
12 changes: 6 additions & 6 deletions src/commands/link-channels.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
race,
take,
throwError,
timer,
timeout,
} from 'rxjs';

export const linkChannels = async (
Expand All @@ -26,6 +26,11 @@ export const linkChannels = async (
filter(channelSync => channelSync.channelId === channelId),
take(1),
map(() => void 0),
timeout({
first: CommandTimeout,
with: () =>
throwError(() => new CommandTimedOutError('linkChannels')),
}),
),
socket.packet.pipe(
filterPacket(PermissionDenied),
Expand All @@ -35,11 +40,6 @@ export const linkChannels = async (
throwError(() => new PermissionDeniedError(permissionDenied)),
),
),
timer(CommandTimeout).pipe(
concatMap(() =>
throwError(() => new CommandTimedOutError('linkChannels')),
),
),
),
);
socket.send(
Expand Down
9 changes: 8 additions & 1 deletion src/commands/move-user-to-channel.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { PermissionDeniedError } from '@/errors';
import { CommandTimeout } from '@/config';
import { CommandTimedOutError, PermissionDeniedError } from '@/errors';
import { MumbleSocket } from '@/mumble-socket';
import { filterPacket } from '@/rxjs-operators/filter-packet';
import { PermissionDenied, UserState } from '@tf2pickup-org/mumble-protocol';
Expand All @@ -10,6 +11,7 @@ import {
race,
take,
throwError,
timeout,
} from 'rxjs';

export const moveUserToChannel = async (
Expand All @@ -28,6 +30,11 @@ export const moveUserToChannel = async (
),
take(1),
map(() => void 0),
timeout({
first: CommandTimeout,
with: () =>
throwError(() => new CommandTimedOutError('moveUserToChannel')),
}),
),
socket.packet.pipe(
filterPacket(PermissionDenied),
Expand Down
12 changes: 6 additions & 6 deletions src/commands/remove-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
race,
take,
throwError,
timer,
timeout,
} from 'rxjs';

export const removeChannel = async (
Expand All @@ -28,6 +28,11 @@ export const removeChannel = async (
filter(channelRemove => channelRemove.channelId === channelId),
take(1),
map(channelRemove => channelRemove.channelId),
timeout({
first: CommandTimeout,
with: () =>
throwError(() => new CommandTimedOutError('removeChannel')),
}),
),
socket.packet.pipe(
filterPacket(PermissionDenied),
Expand All @@ -37,11 +42,6 @@ export const removeChannel = async (
throwError(() => new PermissionDeniedError(permissionDenied)),
),
),
timer(CommandTimeout).pipe(
concatMap(() =>
throwError(() => new CommandTimedOutError('removeChannel')),
),
),
),
);
socket.send(ChannelRemove, ChannelRemove.create({ channelId }));
Expand Down
12 changes: 6 additions & 6 deletions src/commands/unlink-channels.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
race,
take,
throwError,
timer,
timeout,
} from 'rxjs';

export const unlinkChannels = async (
Expand All @@ -26,6 +26,11 @@ export const unlinkChannels = async (
filter(channelSync => channelSync.channelId === channelId),
take(1),
map(() => void 0),
timeout({
first: CommandTimeout,
with: () =>
throwError(() => new CommandTimedOutError('unlinkChannels')),
}),
),
socket.packet.pipe(
filterPacket(PermissionDenied),
Expand All @@ -35,11 +40,6 @@ export const unlinkChannels = async (
throwError(() => new PermissionDeniedError(permissionDenied)),
),
),
timer(CommandTimeout).pipe(
concatMap(() =>
throwError(() => new CommandTimedOutError('linkChannels')),
),
),
),
);
socket.send(
Expand Down

0 comments on commit f8c8590

Please sign in to comment.