Skip to content

Commit

Permalink
Add support for downloading files
Browse files Browse the repository at this point in the history
  • Loading branch information
rojvv committed May 12, 2024
1 parent d378c3c commit 4606093
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 10 deletions.
2 changes: 1 addition & 1 deletion MTKruto
1 change: 1 addition & 0 deletions allowed_methods.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ export const ALLOWED_METHODS = [
"removeStoryFromHighlights",
"blockUser",
"unblockUser",
"download",
"downloadLiveStreamChunk",
"getLiveStreamChannels",
"getVideoChat",
Expand Down
12 changes: 12 additions & 0 deletions client_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import {
import { StorageDenoKV } from "mtkruto/storage/1_storage_deno_kv.ts";
import { transportProviderTcp } from "mtkruto/transport/3_transport_provider_tcp.ts";

import { DownloadManager } from "./download_manager.ts";

export interface ClientStats {
connected: boolean;
me: User;
Expand Down Expand Up @@ -87,6 +89,16 @@ export class ClientManager {
};
}

#downloadManagers = new Map<Client, DownloadManager>();
async download(id: string, fileId: string) {
const client = await this.getClient(id);
let downloadManager = this.#downloadManagers.get(client);
if (!downloadManager) {
downloadManager = new DownloadManager(client);
}
return downloadManager.download(fileId);
}

async getClient(id: string) {
{
const client = this.#clients.get(id);
Expand Down
111 changes: 111 additions & 0 deletions download_manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import * as path from "std/path/mod.ts";
import { exists, existsSync } from "std/fs/mod.ts";

import { Client } from "mtkruto/mod.ts";
import { Queue } from "mtkruto/1_utilities.ts";

export class DownloadManager {
#client: Client;
static DOWNLOADS_PATH = path.join(Deno.cwd(), ".downloads");

constructor(client: Client) {
this.#client = client;
if (!existsSync(DownloadManager.DOWNLOADS_PATH)) {
Deno.mkdirSync(DownloadManager.DOWNLOADS_PATH);
}
}

async *download(fileId: string) {
const dir = path.join(DownloadManager.DOWNLOADS_PATH, fileId);
if (!await exists(dir)) {
await Deno.mkdir(dir);
}
let n = 0;
let offset = 0;
const haveAllParts = await exists(path.join(dir, "_all"));
let partsAvailable = 0;
for await (const entry of Deno.readDir(dir)) {
if (entry.name.startsWith("_") || !entry.isFile) {
continue;
}
if (entry.name == partsAvailable + "") {
++partsAvailable;
const { size } = await Deno.stat(path.join(dir, entry.name));
offset += size;
}
}
let download: Download | undefined;
if (!haveAllParts) {
download = this.#startDownload(fileId, partsAvailable, offset);
}
for (let i = 0; i < partsAvailable; ++i) {
const part = await Deno.readFile(path.join(dir, i + ""));
offset += part.byteLength;
++n;
yield part;
}
if (download) {
while (true) {
if (download.partsAvailable > n) {
if (await exists(path.join(dir, n + ""))) {
const part = await Deno.readFile(path.join(dir, n + ""));
++n;
yield part;
}
} else if (download.haveAllParts) {
break;
}
await new Promise<void>((r) => {
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), 5000);
download.addEventListener("partAvailable", () => {
clearTimeout(timeout);
r();
}, { once: true, signal: controller.signal });
});
}
}
}

#downloadQueue = new Queue("downloads");
#downloads = new Map<string, Download>();
#startDownload(fileId: string, partsAvailable: number, offset: number) {
let download = this.#downloads.get(fileId);
if (!download) {
download = new Download(this.#client, fileId, partsAvailable, offset);
this.#downloads.set(fileId, download);
this.#downloadQueue.add(() =>
download!.start().finally(() => this.#downloads.delete(fileId))
);
}
return download;
}
}

class Download extends EventTarget {
haveAllParts = false;

constructor(
private client: Client,
private fileId: string,
public partsAvailable: number,
private offset: number,
) {
super();
}

async start() {
const dir = path.join(DownloadManager.DOWNLOADS_PATH, this.fileId);
if (!await exists(dir)) {
await Deno.mkdir(dir);
}
for await (
const chunk of this.client.download(this.fileId, { offset: this.offset })
) {
await Deno.writeFile(path.join(dir, "" + this.partsAvailable), chunk);
++this.partsAvailable;
this.dispatchEvent(new Event("partAvailable"));
}
await Deno.writeFile(path.join(dir, "_all"), new Uint8Array());
}
}
30 changes: 29 additions & 1 deletion main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,36 @@ async function handleMethod(
const result = await workers.call(worker, "serve", id, method, params);
if (result === "DROP") {
return drop();
} else {
} else if (Array.isArray(result)) {
return Response.json(...result);
} else {
const firstChunk = await workers.call(worker, "next", result.streamId);
if (firstChunk == null) {
return badRequest("Invalid stream ID");
}
return new Response(
new ReadableStream({
start(controller) {
controller.enqueue(firstChunk.value);
if (firstChunk.done) {
controller.close();
}
},
async pull(controller) {
const chunk = await workers.call(worker, "next", result.streamId);
if (chunk == null) {
controller.close();
} else {
if (chunk.value) {
controller.enqueue(chunk.value);
}
if (chunk.done) {
controller.close();
}
}
},
}),
);
}
}

Expand Down
49 changes: 41 additions & 8 deletions worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ import { InputError } from "mtkruto/0_errors.ts";
import { setLogVerbosity } from "mtkruto/1_utilities.ts";
import { functions, setLoggingProvider, types } from "mtkruto/mod.ts";

import { serialize } from "./tl_json.ts";
import { deserialize } from "./tl_json.ts";
import { transform } from "./transform.ts";
import { fileLogger } from "./file_logger.ts";
import { deserialize, serialize } from "./tl_json.ts";
import { isFunctionDisallowed } from "./disallowed_functions.ts";
import { ClientManager, ClientStats } from "./client_manager.ts";
import { ALLOWED_METHODS, AllowedMethod } from "./allowed_methods.ts";
Expand Down Expand Up @@ -82,6 +81,7 @@ const handlers = {
init,
clientCount,
serve,
next,
stats,
getUpdates,
invoke,
Expand Down Expand Up @@ -152,23 +152,56 @@ async function serve(
id: string,
method: AllowedMethod,
args: any[],
): Promise<"DROP" | Parameters<typeof Response["json"]>> {
): Promise<
"DROP" | Parameters<typeof Response["json"]> | { streamId: string }
> {
if (!id.trim() || !method.trim()) {
return "DROP";
}
if (!(ALLOWED_METHODS.includes(method))) {
return "DROP";
}
const client = await clientManager.getClient(id);
// deno-lint-ignore ban-ts-comment
// @ts-ignore
const result = transform(await client[method](...args));
let result;
if (method == "download") {
result = await clientManager.download(id, args[0]);
} else {
const client = await clientManager.getClient(id);
// deno-lint-ignore ban-ts-comment
// @ts-ignore
result = transform(await client[method](...args));
}
if (result !== undefined) {
return [result];
if (
typeof result === "object" && result != null &&
Symbol.asyncIterator in result
) {
return { streamId: getStreamId(result) };
} else {
return [result];
}
} else {
return [null];
}
}
const streams = new Map<string, AsyncIterator<Uint8Array>>();
function getStreamId(iterable: AsyncIterable<Uint8Array>) {
const id = crypto.randomUUID();
streams.set(id, iterable[Symbol.asyncIterator]());
return id;
}

async function next(streamId: string) {
const result = await streams.get(streamId)?.next();

if (result === undefined) {
return null;
} else {
if (result.done) {
streams.delete(streamId);
}
return result;
}
}

async function invoke(
id: string,
Expand Down

0 comments on commit 4606093

Please sign in to comment.