Merge pull request #72 from arabcoders/dev
Dev
arabcoders authored Feb 27, 2024
2 parents 4cf4b72 + 0548e5e commit 0dbbd6a
Showing 7 changed files with 83 additions and 57 deletions.
5 changes: 3 additions & 2 deletions .vscode/launch.json
@@ -23,15 +23,16 @@
         },
         {
             "name": "Python: main.py",
-            "type": "python",
+            "type": "debugpy",
             "request": "launch",
             "program": "app/main.py",
             "console": "internalConsole",
             "env": {
                 "YTP_CONFIG_PATH": "${workspaceFolder}/var/config",
                 "YTP_DOWNLOAD_PATH": "${workspaceFolder}/var/downloads",
                 "YTP_TEMP_PATH": "${workspaceFolder}/var/tmp",
-                "YTP_URL_HOST": "http://localhost:8081"
+                "YTP_URL_HOST": "http://localhost:8081",
+                "YTP_LOG_LEVEL": "DEBUG"
             }
         }
     ]
6 changes: 3 additions & 3 deletions README.md
@@ -71,9 +71,9 @@ Certain values can be set via environment variables, using the `-e` parameter on
 * __YTP_KEEP_ARCHIVE__: Whether to keep a history of downloaded videos to prevent downloading the same file multiple times. Defaults to `true`.
 * __YTP_YTDL_DEBUG__: Whether to turn on debug logging for the internal `yt-dlp` package. Defaults to `false`.
 * __YTP_ALLOW_MANIFESTLESS__: Allow `yt-dlp` to download live stream videos which are yet to be processed by YouTube. Defaults to `false`.
-* __YTP_HOST__: Which ip address to bind to. Defaults to `0.0.0.0`.
+* __YTP_HOST__: Which IP address to bind to. Defaults to `0.0.0.0`.
 * __YTP_PORT__: Which port to bind to. Defaults to `8081`.
-* __YTP_LOGGING_LEVEL__: Logging level. Defaults to `info`.
+* __YTP_LOG_LEVEL__: Log level. Defaults to `info`.
 * __YTP_MAX_WORKERS__: How many workers to use for downloads. Defaults to `1`.

## Running behind a reverse proxy
@@ -130,7 +130,7 @@ Once there, you can use the yt-dlp command freely.

 ## Building and running locally
 
-Make sure you have node.js and Python 3.11 installed.
+Make sure you have `node.js` and Python `3.11+` installed.
 
 ```bash
 cd ytptube/frontend
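As a side note on how the variables documented above are typically consumed, here is a minimal sketch of reading the renamed `YTP_LOG_LEVEL` (and neighbours) with the README's documented defaults. The `env_str` helper is hypothetical and not part of this repository:

```python
import os

def env_str(key: str, default: str) -> str:
    # Hypothetical helper: read an environment variable, falling back to the documented default.
    return os.environ.get(key, default)

# Defaults taken from the README table above.
host = env_str('YTP_HOST', '0.0.0.0')
port = int(env_str('YTP_PORT', '8081'))
log_level = env_str('YTP_LOG_LEVEL', 'info')
max_workers = int(env_str('YTP_MAX_WORKERS', '1'))
print(host, port, log_level, max_workers)
```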
66 changes: 39 additions & 27 deletions app/AsyncPool.py
@@ -16,21 +16,23 @@ def __init__(self, loop, num_workers: int, name: str, logger, worker_co, load_fa
         number of items of work is in the queue). `worker_co` will be called
         against each item retrieved from the queue. If any exceptions are raised out of
         worker_co, self.exceptions will be set to True.
-        @param loop: asyncio loop to use
-        @param num_workers: number of async tasks which will pull from the internal queue
-        @param name: name of the worker pool (used for logging)
-        @param logger: logger to use
-        @param worker_co: async coroutine to call when an item is retrieved from the queue
-        @param load_factor: multiplier used for number of items in queue
-        @param job_accept_duration: maximum number of seconds from first push to last push before a TimeoutError will be thrown.
+        :param loop: asyncio loop to use
+        :param num_workers: number of async tasks which will pull from the internal queue
+        :param name: name of the worker pool (used for logging)
+        :param logger: logger to use
+        :param worker_co: async coroutine to call when an item is retrieved from the queue
+        :param load_factor: multiplier used for number of items in queue
+        :param job_accept_duration: maximum number of seconds from first push to last push before a TimeoutError will be thrown.
             Set to None for no limit. Note this does not get reset on aenter/aexit.
-        @param max_task_time: maximum time allowed for each task before a CancelledError is raised in the task.
+        :param max_task_time: maximum time allowed for each task before a CancelledError is raised in the task.
             Set to None for no limit.
-        @param return_futures: set to reture to return a future for each `push` (imposes CPU overhead)
-        @param raise_on_join: raise on join if any exceptions have occurred, default is False
-        @param log_every_n: (optional) set to number of `push`s each time a log statement should be printed (default does not print every-n pushes)
-        @param expected_total: (optional) expected total number of jobs (used for `log_event_n` logging)
-        @return: instance of AsyncWorkerPool
+        :param return_futures: set to `True` to return a future for each `push` (imposes CPU overhead)
+        :param raise_on_join: raise on join if any exceptions have occurred, default is False
+        :param log_every_n: (optional) set to number of `push`s each time a log statement should be printed (default does not print every-n pushes)
+        :param expected_total: (optional) expected total number of jobs (used for `log_event_n` logging)
+        :return: instance of AsyncWorkerPool
         """
         loop = loop if loop else asyncio.get_event_loop()
         self._loop = loop
@@ -117,25 +119,28 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
         await self.join()
 
     async def push(self, *args, **kwargs) -> asyncio.Future:
-        """ Method to push work to `worker_co` passed to `__init__`.
+        """
+        Method to push work to `worker_co` passed initially to `__init__`.
         :param args: positional arguments to be passed to `worker_co`
         :param kwargs: keyword arguments to be passed to `worker_co`
-        :return: future of result """
+        :return: future of result.
+        """
         if self._first_push_dt is None:
             self._first_push_dt = self._time()
 
         if self._job_accept_duration is not None and (self._time() - self._first_push_dt) > self._job_accept_duration:
-            raise TimeoutError("Maximum lifetime of {} seconds of AsyncWorkerPool: {} exceeded".format(
-                self._job_accept_duration, self._name))
+            raise TimeoutError(f"Max lifetime of {self._job_accept_duration}s exceeded for {self._name} pool.")
 
-        future = asyncio.futures.Future(
-            loop=self._loop) if self._return_futures else None
+        future = asyncio.futures.Future(loop=self._loop) if self._return_futures else None
         await self._queue.put((future, args, kwargs))
         self._total_queued += 1
 
         if self._log_every_n is not None and (self._total_queued % self._log_every_n) == 0:
-            self._logger.info("pushed {}/{} items to {} AsyncWorkerPool".format(
-                self._total_queued, self._expected_total, self._name))
+            self._logger.info(f"pushed {self._total_queued}/{self._expected_total} items to {self._name} pool.")
+
+        self._logger.debug(f"'{self._name}' pool has received a new job. {args} {kwargs}")
 
         return future

@@ -144,15 +149,22 @@ def start(self):
         assert self._workers is None
         self._exceptions = False
 
-        self._workers = [asyncio.ensure_future(
-            self._worker_loop(), loop=self._loop) for _ in range(self._num_workers)]
+        self._workers = []
+        for _ in range(self._num_workers):
+            self._workers.append(
+                asyncio.ensure_future(
+                    coro_or_future=self._worker_loop(),
+                    loop=self._loop
+                )
+            )
 
     async def join(self):
         # no-op if workers aren't running
         if not self._workers:
             return
 
-        self._logger.info('Joining {}'.format(self._name))
+        self._logger.info(f'Joining {self._name}')
 
         # The Terminators will kick each worker from being blocked against the _queue.get() and allow
         # each one to exit
         for _ in range(self._num_workers):
@@ -162,13 +174,13 @@ async def join(self):
             await asyncio.gather(*self._workers)
             self._workers = None
         except:
-            self._logger.exception('Exception joining {}'.format(self._name))
+            self._logger.exception(f'Exception joining {self._name}')
             raise
         finally:
-            self._logger.info('Completed {}'.format(self._name))
+            self._logger.info(f'Completed {self._name}')
 
         if self._exceptions and self._raise_on_join:
-            raise Exception("Exception occurred in pool {}".format(self._name))
+            raise Exception(f"Exception occurred in {self._name} pool")
 
     def _time(self):
         # utcnow returns a naive datetime, so we have to set the timezone manually <sigh>
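For context on the API touched above, here is a minimal usage sketch of the pool, matching how `DownloadQueue` drives it later in this diff (`async with` to start and join, keyword arguments forwarded to `worker_co` via `push`). The worker body and pool size are illustrative assumptions, not project code:

```python
import asyncio
import logging

from AsyncPool import AsyncPool  # the class modified in this diff

async def work(id: str, entry: str):
    # Illustrative worker coroutine; the real pool downloads files.
    await asyncio.sleep(0.1)
    logging.getLogger('demo_pool').info(f'processed {id}: {entry}')

async def main():
    logging.basicConfig(level=logging.DEBUG)
    async with AsyncPool(
        loop=asyncio.get_running_loop(),
        num_workers=2,
        name='demo_pool',
        logger=logging.getLogger('demo_pool'),
        worker_co=work,
    ) as pool:
        for i in range(4):
            await pool.push(id=str(i), entry=f'job-{i}')
    # __aexit__ joins the pool, so all pushed jobs complete before this line.

asyncio.run(main())
```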
6 changes: 3 additions & 3 deletions app/Config.py
@@ -33,7 +33,7 @@ class Config:

     base_path: str = ''
 
-    logging_level: str = 'info'
+    log_level: str = 'info'
 
     allow_manifestless: bool = False

@@ -110,9 +110,9 @@ def __init__(self):
         if not self.url_prefix.endswith('/'):
             self.url_prefix += '/'
 
-        numeric_level = getattr(logging, self.logging_level.upper(), None)
+        numeric_level = getattr(logging, self.log_level.upper(), None)
         if not isinstance(numeric_level, int):
-            raise ValueError(f"Invalid log level: {self.logging_level}")
+            raise ValueError(f"Invalid log level: {self.log_level}")
 
         coloredlogs.install(
             level=numeric_level,
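The rename is mechanical, but the resolution logic it touches is worth seeing in isolation: `getattr` maps a level name such as the `YTP_LOG_LEVEL=DEBUG` set in launch.json above onto the numeric constants in `logging`, and anything unrecognized fails fast. A standalone sketch of that pattern (variable names illustrative):

```python
import logging
import os

log_level = os.environ.get('YTP_LOG_LEVEL', 'info')

# getattr() yields e.g. logging.DEBUG (10) for 'debug', or None for junk input.
numeric_level = getattr(logging, log_level.upper(), None)
if not isinstance(numeric_level, int):
    raise ValueError(f"Invalid log level: {log_level}")

logging.basicConfig(level=numeric_level)
logging.getLogger('demo').debug('debug logging is active')
```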
57 changes: 35 additions & 22 deletions app/DownloadQueue.py
@@ -11,7 +11,7 @@
 from DataStore import DataStore
 from Utils import Notifier, ObjectSerializer, calcDownloadPath, ExtractInfo, isDownloaded, mergeConfig
 from AsyncPool import AsyncPool
-
+from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
 LOG = logging.getLogger('DownloadQueue')
 TYPE_DONE: str = 'done'
 TYPE_QUEUE: str = 'queue'
@@ -217,18 +217,26 @@ async def add(
             else:
                 already.add(url)
                 try:
-                    entry = await asyncio.get_running_loop().run_in_executor(
-                        None,
-                        ExtractInfo,
-                        mergeConfig(self.config.ytdl_options, ytdlp_config),
-                        url,
-                        bool(self.config.ytdl_debug)
-                    )
+                    with ThreadPoolExecutor(thread_name_prefix='extract_info') as pool:
+                        LOG.debug(f'extracting info from {url=}')
+                        entry = await asyncio.get_running_loop().run_in_executor(
+                            pool,
+                            ExtractInfo,
+                            mergeConfig(self.config.ytdl_options, ytdlp_config),
+                            url,
+                            bool(self.config.ytdl_debug)
+                        )
+
+                        if not entry:
+                            if not self.config.keep_archive:
+                                return {
+                                    'status': 'error',
+                                    'msg': 'No metadata, most likely video has been downloaded before.' if self.config.keep_archive else 'Unable to extract info check logs.'
+                                }
 
                 if not entry:
                     if self.config.keep_archive:
                         LOG.debug(f'No metadata, Rechecking with archive disabled. {url=}')
                         entry = await asyncio.get_running_loop().run_in_executor(
-                            None,
+                            pool,
                             ExtractInfo,
                             mergeConfig(self.config.ytdl_options, ytdlp_config),
                             url,
@@ -246,11 +254,6 @@ async def add(
                     'msg': f'[{entry.get("id")}: {entry.get("title")}]: has been downloaded already.'
                 }
 
-            return {
-                'status': 'error',
-                'msg': 'No metadata, most likely video has been downloaded before.' if self.config.keep_archive else 'Unable to extract info check logs.'
-            }
-
         if self.isDownloaded(entry):
             raise yt_dlp.utils.ExistingVideoReached()

@@ -331,32 +334,42 @@ async def __download_pool(self):
             loop=asyncio.get_running_loop(),
             num_workers=self.config.max_workers,
             worker_co=self.__downloadFile,
-            name='WorkerPool',
+            name='download_pool',
             logger=logging.getLogger('WorkerPool'),
         ) as executor:
+            lastLog = time.time()
 
             while True:
                 while True:
-                    if executor.has_open_workers() is False:
-                        await asyncio.sleep(1)
-                    else:
+                    if executor.has_open_workers() is True:
                         break
+                    if time.time() - lastLog > 120:
+                        lastLog = time.time()
+                        LOG.info(f'Waiting for workers to be free.')
+                    await asyncio.sleep(1)
 
+                LOG.debug(f"Has '{executor.get_available_workers()}' free workers.")
 
                 while not self.queue.hasDownloads():
-                    LOG.info(f'Waiting for item to download. [{executor.get_available_workers()}] workers available.')
+                    LOG.info(f'Waiting for item to download.')
                     await self.event.wait()
                     self.event.clear()
+                    LOG.debug(f"Cleared wait event.")
 
                 entry = self.queue.getNextDownload()
                 await asyncio.sleep(0.2)
 
+                LOG.debug(f"Pushing {entry=} to executor.")
+
                 if entry.started() is False and entry.is_canceled() is False:
                     await executor.push(id=entry.info._id, entry=entry)
+                    LOG.debug(f"Pushed {entry=} to executor.")
                     await asyncio.sleep(1)
 
     async def __download(self):
         while True:
             while self.queue.empty():
-                LOG.info('waiting for item to download.')
+                LOG.info('Waiting for item to download.')
                 await self.event.wait()
                 self.event.clear()

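The core change in `add()` above is running the blocking `ExtractInfo` call on an explicitly named `ThreadPoolExecutor` rather than the loop's default executor, which keeps the event loop responsive and makes the extraction threads identifiable in thread dumps. A minimal sketch of that pattern with a stand-in blocking function (`slow_probe` is hypothetical):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def slow_probe(url: str) -> dict:
    # Stand-in for a blocking call such as yt-dlp metadata extraction.
    time.sleep(0.5)
    return {'url': url, 'ok': True}

async def main():
    with ThreadPoolExecutor(thread_name_prefix='extract_info') as pool:
        # run_in_executor off-loads the blocking call to a named worker thread.
        entry = await asyncio.get_running_loop().run_in_executor(
            pool, slow_probe, 'https://example.com/video'
        )
    print(entry)

asyncio.run(main())
```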
Binary file modified sc_full.png
Binary file modified sc_short.png
