Skip to content

Commit

Permalink
fix: guide/epg now accurately reflects state of on-demand channels
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisbenincasa committed Dec 3, 2024
1 parent c3251bf commit dbfe027
Show file tree
Hide file tree
Showing 10 changed files with 239 additions and 116 deletions.
38 changes: 28 additions & 10 deletions server/src/api/debugApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { ChannelLineupQuery } from '@tunarr/types/api';
import { ChannelLineupSchema } from '@tunarr/types/schemas';
import dayjs from 'dayjs';
import { jsonArrayFrom } from 'kysely/helpers/sqlite';
import { map, reject, some } from 'lodash-es';
import { isUndefined, map, reject, some } from 'lodash-es';
import os from 'node:os';
import z from 'zod';

Expand Down Expand Up @@ -41,30 +41,48 @@ export const debugApi: RouterPluginAsyncCallback = async (fastify) => {
});

fastify.get(
'/debug/helpers/current_program',
'/debug/helpers/playing_at',
{
schema: ChannelQuerySchema,
schema: {
querystring: ChannelQuerySchema.querystring.extend({
ts: z.coerce.number().optional(),
}),
},
},
async (req, res) => {
const channel = await req.serverCtx.channelDB.getChannelAndPrograms(
req.query.channelId,
);
const channelAndLineup =
await req.serverCtx.channelDB.loadChannelAndLineup(req.query.channelId);

if (!channel) {
if (!channelAndLineup) {
return res
.status(404)
.send({ error: 'No channel with ID ' + req.query.channelId });
}

const result = req.serverCtx
const { channel, lineup } = channelAndLineup;

if (
lineup.onDemandConfig?.state === 'paused' &&
isUndefined(req.query.ts)
) {
req.query.ts = channel.startTime + lineup.onDemandConfig.cursor;
}

const result = await req.serverCtx
.streamProgramCalculator()
.getCurrentLineupItem({
startTime: new Date().getTime(),
startTime: req.query.ts ?? +dayjs(),
channelId: req.query.channelId,
allowSkip: true,
});

return res.send(result);
return result
.map((lineupItem) => {
return res.send(lineupItem);
})
.getOrElse(() => {
return res.status(500).send();
});
},
);

Expand Down
30 changes: 27 additions & 3 deletions server/src/db/ChannelDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import {
partition,
reduce,
reject,
sum,
sumBy,
take,
uniq,
Expand Down Expand Up @@ -98,7 +99,10 @@ import {
} from './schema/Channel.ts';
import { programExternalIdString } from './schema/Program.ts';
import { ChannelTranscodingSettings } from './schema/base.ts';
import { ChannelWithPrograms as RawChannelWithPrograms } from './schema/derivedTypes.js';
import {
ChannelWithPrograms,
ChannelWithPrograms as RawChannelWithPrograms,
} from './schema/derivedTypes.js';

dayjs.extend(duration);

Expand Down Expand Up @@ -508,6 +512,24 @@ export class ChannelDB {
.executeTakeFirst();
}

async syncChannelDuration(id: string) {
const channelAndLineup = await this.loadChannelAndLineup(id);
if (!channelAndLineup) {
return false;
}
const { channel, lineup } = channelAndLineup;
const lineupDuration = sum(map(lineup.items, (item) => item.durationMs));
if (lineupDuration !== channel.duration) {
await getDatabase()
.updateTable('channel')
.where('channel.uuid', '=', id)
.set('duration', lineupDuration)
.executeTakeFirst();
return true;
}
return false;
}

async deleteChannel(
channelId: string,
blockOnLineupUpdates: boolean = false,
Expand Down Expand Up @@ -936,8 +958,10 @@ export class ChannelDB {
};
}

async loadDirectChannelAndLineup(channelId: string) {
const channel = await this.getChannel(channelId);
async loadChannelWithProgamsAndLineup(
channelId: string,
): Promise<{ channel: ChannelWithPrograms; lineup: Lineup } | null> {
const channel = await this.getChannelAndPrograms(channelId);
if (isNil(channel)) {
return null;
}
Expand Down
78 changes: 49 additions & 29 deletions server/src/services/OnDemandChannelService.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import { ChannelDB } from '@/db/ChannelDB.ts';
import { OnDemandChannelConfig } from '@/db/derived_types/Lineup.ts';
import { serverContext } from '@/serverContext.ts';
import { UpdateXmlTvTask } from '@/tasks/UpdateXmlTvTask.ts';
import { LoggerFactory } from '@/util/logging/LoggerFactory.js';
import { MutexMap } from '@/util/mutexMap.js';
import dayjs from 'dayjs';
import { isNull, isUndefined } from 'lodash-es';
import { GlobalScheduler } from './Scheduler.ts';

export class OnDemandChannelService {
#logger = LoggerFactory.child({ className: this.constructor.name });
Expand All @@ -22,22 +26,11 @@ export class OnDemandChannelService {
}

async pauseAllChannels() {
const allConfigs = await this.channelDB.loadAllLineupConfigs();
const now = dayjs().unix() * 1000;
for (const [channelId, { lineup }] of Object.entries(allConfigs)) {
if (isUndefined(lineup.onDemandConfig)) {
continue;
}
const channels = await this.channelDB.getAllChannels();
const now = +dayjs();

if (lineup.onDemandConfig.state === 'paused') {
continue;
}

await this.channelDB.updateLineupConfig(channelId, 'onDemandConfig', {
...lineup.onDemandConfig,
state: 'paused',
lastPaused: now,
});
for (const channel of channels) {
await this.pauseChannel(channel.uuid, now);
}
}

Expand Down Expand Up @@ -73,13 +66,20 @@ export class OnDemandChannelService {
: (lineup.onDemandConfig.cursor + elapsed - rewindMs) %
channel.duration;

return await this.channelDB
await this.channelDB
.updateLineupConfig(id, 'onDemandConfig', {
...(lineup.onDemandConfig ?? {}),
state: 'paused',
lastPaused: pauseTime,
cursor: nextCursor,
})
.then(() => {
GlobalScheduler.scheduleOneOffTask(
`Pause_Channel_Update_Guide_${id}`,
dayjs().add(1000),
UpdateXmlTvTask.create(serverContext(), id),
);
})
.finally(() => {
this.#logger.debug(
'Paused on-demand channel %s (at = %s)',
Expand Down Expand Up @@ -112,7 +112,7 @@ export class OnDemandChannelService {
// and skip it if it's a commercial.

const now = dayjs();
return await this.channelDB
await this.channelDB
.updateLineupConfig(id, 'onDemandConfig', {
...(lineup.onDemandConfig ?? {}),
state: 'playing',
Expand All @@ -125,10 +125,36 @@ export class OnDemandChannelService {
now.format(),
);
});

GlobalScheduler.scheduleOneOffTask(
`Resume_Channel_Update_Guide_${id}`,
dayjs().add(1000),
UpdateXmlTvTask.create(serverContext(), id),
);
});
}

async getLiveTimestamp(channelId: string, requestTime: number) {
getLiveTimestampForConfig(
onDemandConfig: OnDemandChannelConfig,
channelStartTime: number,
requestTime: number,
): number {
let sinceResume = dayjs(requestTime).diff(
dayjs(onDemandConfig.lastResumed),
);

// Don't skip milliseconds
if (sinceResume < 1_000) {
sinceResume = 0;
}

return channelStartTime + onDemandConfig.cursor + sinceResume;
}

async getLiveTimestamp(
channelId: string,
requestTime: number,
): Promise<number> {
const channelAndLineup = await this.loadOnDemandChannelLineup(channelId);

if (isUndefined(channelAndLineup)) {
Expand All @@ -141,21 +167,15 @@ export class OnDemandChannelService {
return requestTime;
}

let sinceResume = dayjs(requestTime).diff(
dayjs(lineup.onDemandConfig.lastResumed),
return this.getLiveTimestampForConfig(
lineup.onDemandConfig,
channel.startTime,
requestTime,
);

// Don't skip milliseconds
if (sinceResume < 1_000) {
sinceResume = 0;
}

return channel.startTime + lineup.onDemandConfig.cursor + sinceResume;
}

private async loadOnDemandChannelLineup(id: string) {
const channelAndLineup =
await this.channelDB.loadDirectChannelAndLineup(id);
const channelAndLineup = await this.channelDB.loadChannelAndLineup(id);
if (isNull(channelAndLineup)) {
return;
}
Expand Down
Loading

0 comments on commit dbfe027

Please sign in to comment.