Skip to content

Commit

Permalink
Merge branch 'main' of github.com:MTKruto/server
Browse files Browse the repository at this point in the history
  • Loading branch information
rojvv committed Jun 10, 2024
2 parents 12b5dfd + bfe4738 commit d9f3fbf
Show file tree
Hide file tree
Showing 12 changed files with 251 additions and 427 deletions.
2 changes: 1 addition & 1 deletion MTKruto
Submodule MTKruto updated 114 files
1 change: 1 addition & 0 deletions allowed_methods.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ export const ALLOWED_METHODS = [
"removeStoryFromHighlights",
"blockUser",
"unblockUser",
"download",
"downloadLiveStreamChunk",
"getLiveStreamChannels",
"getVideoChat",
Expand Down
22 changes: 14 additions & 8 deletions client_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import * as path from "std/path/mod.ts";
import { existsSync } from "std/fs/exists.ts";
import { unreachable } from "std/assert/unreachable.ts";

import { InputError } from "mtkruto/0_errors.ts";
import { Mutex, Queue } from "mtkruto/1_utilities.ts";
import {
Client,
errors,
InputError,
InvokeErrorHandler,
NetworkStatistics,
Update,
Expand All @@ -36,6 +36,8 @@ import {
import { StorageDenoKV } from "mtkruto/storage/1_storage_deno_kv.ts";
import { transportProviderTcp } from "mtkruto/transport/3_transport_provider_tcp.ts";

import { DownloadManager } from "./download_manager.ts";

export interface ClientStats {
connected: boolean;
me: User;
Expand Down Expand Up @@ -87,6 +89,16 @@ export class ClientManager {
};
}

#downloadManagers = new Map<Client, DownloadManager>();
async download(id: string, fileId: string) {
const client = await this.getClient(id);
let downloadManager = this.#downloadManagers.get(client);
if (!downloadManager) {
downloadManager = new DownloadManager(client);
}
return downloadManager.download(fileId);
}

async getClient(id: string) {
{
const client = this.#clients.get(id);
Expand Down Expand Up @@ -226,17 +238,12 @@ export class ClientManager {
if (this.#webhooks.has(client)) {
throw new InputError("getUpdates is not allowed when a webhook is set.");
}

if (this.#polls.has(client)) {
{
const controller = this.#getUpdatesControllers.get(client);
if (controller) {
controller.abort();
}
this.#getUpdatesControllers.delete(client);
// just in case
this.#polls.delete(client);
this.#updateResolvers.get(client)?.();
this.#updateResolvers.delete(client);
}
this.#polls.add(client);
let controller: AbortController | null = null;
Expand Down Expand Up @@ -276,7 +283,6 @@ export class ClientManager {
return [];
}
} finally {
this.#updateResolvers.delete(client);
this.#polls.delete(client);
this.#lastGetUpdates.set(client, new Date());
if (timeout != null) {
Expand Down
5 changes: 2 additions & 3 deletions disallowed_functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

import { functions, name } from "mtkruto/mod.ts";
import { isMtprotoFunction } from "mtkruto/client/0_utilities.ts";

const DISALLOWED_FUNCTIONS = [
Expand Down Expand Up @@ -66,11 +65,11 @@ const DISALLOWED_FUNCTIONS = [
];

export function isFunctionDisallowed(function_: any) {
if (function_ instanceof functions.ping) {
if (function_._ == "ping") {
return false;
}

if (DISALLOWED_FUNCTIONS.includes(function_[name])) {
if (DISALLOWED_FUNCTIONS.includes(function_._)) {
return true;
}

Expand Down
131 changes: 131 additions & 0 deletions download_manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/**
* MTKruto Server
* Copyright (C) 2024 Roj <https://roj.im/>
*
* This file is part of MTKruto Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

import * as path from "std/path/mod.ts";
import { exists, existsSync } from "std/fs/mod.ts";

import { Client } from "mtkruto/mod.ts";
import { Queue } from "mtkruto/1_utilities.ts";

export class DownloadManager {
#client: Client;
static DOWNLOADS_PATH = path.join(Deno.cwd(), ".downloads");

constructor(client: Client) {
this.#client = client;
if (!existsSync(DownloadManager.DOWNLOADS_PATH)) {
Deno.mkdirSync(DownloadManager.DOWNLOADS_PATH);
}
}

async *download(fileId: string) {
const dir = path.join(DownloadManager.DOWNLOADS_PATH, fileId);
if (!await exists(dir)) {
await Deno.mkdir(dir);
}
let n = 0;
let offset = 0;
const haveAllParts = await exists(path.join(dir, "_all"));
let partsAvailable = 0;
for await (const entry of Deno.readDir(dir)) {
if (entry.name.startsWith("_") || !entry.isFile) {
continue;
}
if (entry.name == partsAvailable + "") {
++partsAvailable;
const { size } = await Deno.stat(path.join(dir, entry.name));
offset += size;
}
}
let download: Download | undefined;
if (!haveAllParts) {
download = this.#startDownload(fileId, partsAvailable, offset);
}
for (let i = 0; i < partsAvailable; ++i) {
const part = await Deno.readFile(path.join(dir, i + ""));
offset += part.byteLength;
++n;
yield part;
}
if (download) {
while (true) {
if (download.partsAvailable > n) {
if (await exists(path.join(dir, n + ""))) {
const part = await Deno.readFile(path.join(dir, n + ""));
++n;
yield part;
}
} else if (download.haveAllParts) {
break;
}
await new Promise<void>((r) => {
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), 5000);
download.addEventListener("partAvailable", () => {
clearTimeout(timeout);
r();
}, { once: true, signal: controller.signal });
});
}
}
}

#downloadQueue = new Queue("downloads");
#downloads = new Map<string, Download>();
#startDownload(fileId: string, partsAvailable: number, offset: number) {
let download = this.#downloads.get(fileId);
if (!download) {
download = new Download(this.#client, fileId, partsAvailable, offset);
this.#downloads.set(fileId, download);
this.#downloadQueue.add(() =>
download!.start().finally(() => this.#downloads.delete(fileId))
);
}
return download;
}
}

class Download extends EventTarget {
haveAllParts = false;

constructor(
private client: Client,
private fileId: string,
public partsAvailable: number,
private offset: number,
) {
super();
}

async start() {
const dir = path.join(DownloadManager.DOWNLOADS_PATH, this.fileId);
if (!await exists(dir)) {
await Deno.mkdir(dir);
}
for await (
const chunk of this.client.download(this.fileId, { offset: this.offset })
) {
await Deno.writeFile(path.join(dir, "" + this.partsAvailable), chunk);
++this.partsAvailable;
this.dispatchEvent(new Event("partAvailable"));
}
await Deno.writeFile(path.join(dir, "_all"), new Uint8Array());
}
}
30 changes: 29 additions & 1 deletion main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,36 @@ async function handleMethod(
const result = await workers.call(worker, "serve", id, method, params);
if (result === "DROP") {
return drop();
} else {
} else if (Array.isArray(result)) {
return Response.json(...result);
} else {
const firstChunk = await workers.call(worker, "next", result.streamId);
if (firstChunk == null) {
return badRequest("Invalid stream ID");
}
return new Response(
new ReadableStream({
start(controller) {
controller.enqueue(firstChunk.value);
if (firstChunk.done) {
controller.close();
}
},
async pull(controller) {
const chunk = await workers.call(worker, "next", result.streamId);
if (chunk == null) {
controller.close();
} else {
if (chunk.value) {
controller.enqueue(chunk.value);
}
if (chunk.done) {
controller.close();
}
}
},
}),
);
}
}

Expand Down
Loading

0 comments on commit d9f3fbf

Please sign in to comment.