diff --git a/09 - Multithreading/09 - Multithreading.ipynb b/09 - Multithreading/09 - Multithreading.ipynb index 305a68a..3d046cf 100644 --- a/09 - Multithreading/09 - Multithreading.ipynb +++ b/09 - Multithreading/09 - Multithreading.ipynb @@ -1154,7 +1154,69 @@ "metadata": {}, "source": [ "\n", - "## asyncio\n", + "# Асинхронно програмиране с `asyncio`" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Какво е \"асинхронно програмиране\"?\n", + "\n", + "Асинхронното програмиране е парадигма, която има имплементация във все повече и повече езици за програмиране (напр. C#, Swift, Javascript, ...).\n", + "\n", + "Идеята е проста: докато чакаме приключването на някаква [I/O операция (т.е. такава, която не е CPU-bound)](https://stackoverflow.com/a/868577) да дадем възможност на други задачи да се изпълняват. Това може да се управлява от т.нар. \"run loop\", който да върви на **една единствена нишка**.\n", + "\n", + "Аналогия за асинхронност от реалния свят може да бъде например една дюнерджийница. В този пример IO-bound операцията е стоплянето на храната на скарата - докато това се случва, дюнерджията (нишката) може да се заеме с други задачи - да приеме поръчки, плащания, да приготвя други поръчки и т.н. Без тази асинхронност 5 поръчки щяха да отнемат 5 пъти повече време (като игнорираме добавения overhead от плащане, слагане на салати и т.н.)." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Какво са примери за IO операции и за CPU-bound операции?\n", + "\n", + "* IO операции:\n", + " * Четене и писане на файлове (в т.ч. бази данни)\n", + " * Мрежови заявки\n", + " * Изпълнение на команди към операционната система\n", + " * всичко друго, което не ангажира процесора през цялото време\n", + "* CPU-bound операции:\n", + " * Математически изчисления\n", + " * Сортиране на списъци\n", + " * Джуркане на матрици\n", + " * всичко друго, което ангажира процесора през цялото време\n", + "\n", + "Асинхронното програмиране **няма как да забърза CPU-bound операциите**. За тях са необходими многонишкови решения, или конкретно в Python заради GIL - единственото решение е използването на много паралелни процеси (multiprocessing).\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Какви са съставните части на асинхронното програмиране?\n", + "\n", + "* run loop: това е \"двигателят\" на асинхронното програмиране. Той следи за събития (напр. приключване на IO операция) и изпълнява кода, който е свързан с тях.\n", + "* корутини (coroutines), `Task`-ове и `Future`-и: с тях дефинираме \"задачите\", които се изпълняват асинхронно:\n", + " * корутините са функции, които могат да бъдат спрени и продължени по време на изпълнение (подобно на генераторите в Python, но с известна разлика)\n", + " * `Task`-овете са обектите, които съдържат корутините и се изпълняват от run loop-а\n", + " * `Future`-ите е общият интерфейс за асинхронните операции\n", + "* ключовите думи `async` и `await`:\n", + " * с `async` декларираме, че дадена функция е корутина (в Python също служи и за деклариране на асинхронни цикли и контекстни мениджъри)\n", + " * с `await` предаваме контрола на run loop-а, когато стигнем до IO операция. Изполва се само в `async` среда.\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## `asyncio`" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ "https://docs.python.org/3.10/library/asyncio.html#module-asyncio" ] }, @@ -1162,207 +1224,748 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Една от по-интересните (и полезни) функционалности в Python, е поддържката на `async/await` синтаксиса. Благодарение на него, можем да реализираме асинхронни IO операции. Модулът `asyncio` ни дава множество функции и обекти свързани с async IO в Python." + "Асинхронното програмиране в Python е възможно във **версии >= 3.4** посредством вградения модул `asyncio` и ключовите думи `async` и `await`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "### Какво са async IO и coroutines ?" + "⚠️ Имайте предвид, че (особено между версии 3.4 и 3.7) има много промени в синтаксиса и функционалностите на `asyncio`, така че не вярвайте сляпо на ChatGPT или StackOverflow. Ако пише нещо от рода на `@asyncio.couroutine` или `yield from ...` - това не важи вече. Също има и промени в имената на функции и т.н. Винаги double-check-вайте в официалната документация, ако не сте сигурни за нещо." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Async IO е вид конкурентно изпълнение на функции - асинхроността ни дава възможност да \"спрем\" изпълнението на дадена функция в един момент докато те изчакват някакъв ресурс, и да дадем контрола на друга такава функция. Тези функции, които имат възможността да бъдат спирани и продължавани се наричат корутини (coroutines). Основната разлика с многонишковото и многопроцесорното програмиране е, че тук използваме един процес и една нишка.\n", + "### Корутини" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Теоретически най-коректната дефиниця на корутина е, че тя е генератор, който освен да генерира стойности, може и да получава стойности. \n", "\n", - "Async IO ни позволява да изпълняваме coroutines в т.нар. event loop - докато една coroutine чака, event loop-а може да пусне друга да се изпълнява." + "Няма да навливаме в повече детайли от по-ниско ниво. За целите на asyncio и асинхронното програмиране можем да разглеждаме в Python корутините като функции или генератори (понеже е възможно да има и `yield` в тялото им), декларирани с `async def`, които могат да предават контрола на run loop-a (за изпълнение на други future-и докато се чака) чрез `await`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Нека разгледаме една проста функция, и нейния coroutine еквивалент:" + "Нека си направим един прост coroutine, който просто да чака изпълнението на някаква IO операция между два `print`-а. \n", + "\n", + "За простота ще използваме `asyncio.sleep`, но можете да лесно да си представете, че на негово място примерно чете огромен файл или чака отговор от сървър например." ] }, { "cell_type": "code", - "execution_count": 32, + "execution_count": 5, "metadata": {}, "outputs": [], "source": [ - "def add(x: int, y: int) -> int:\n", - " return x + y\n", + "import asyncio # нужно ни е за фунциите вътре, иначе ако само пишем async и await ключови думи няма нужда от импорт" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "async def some_coroutine():\n", + " print(\"Starting the operation\")\n", + " await asyncio.sleep(5)\n", + " print(\"Operation finished\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "⚠️ Не използвайте `time.sleep` в корутини! Това ще блокира целия run loop и ще забави изпълнението на всички останали корутина. Винаги използвайте `asyncio.sleep` за асинхронно чакане." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Хубаво, пише `async` и `await`, но каква е разликата с това да ги няма?" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "import time\n", "\n", - "async def add(x: int, y: int) -> int:\n", - " return x + y" + "def some_function():\n", + " print(\"Starting the operation\")\n", + " time.sleep(5)\n", + " print(\"Operation finished\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "За да изпълним дадената функция, трябва да я добавим към текущия event loop. Може да си представим event loop-а като безкраен цикъл, който изпълнява нашите функции в дадено време. \n", + "Ако просто изчакваме корутините подред, няма да има разлика във времето за изпълнение:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Starting the operation\n", + "Operation finished\n", + "Starting the operation\n", + "Operation finished\n" + ] + } + ], + "source": [ + "await some_coroutine()\n", + "await some_coroutine()\n", + "\n", + "# Забележка: ако това го напишем като top-level code във файл, няма да можем да го изпълним просто така.\n", + "# Тогава ще трябва да го сложим в друг coroutine (async def) и да го извикаме с `asyncio.run(името_на_новия_coroutine())`\n", "\n", - "От нас се изисква да пуснем event loop-а, да добавим нашата coroutine в списъка за изпълнение и да спрем loop-а след като функцията ни е изпълнена - това може да бъде изпълнено чрез ключовата дума `await`" + "# Jupyter notebooks обаче ни позволяваt имплицитна top-level async среда." ] }, { "cell_type": "code", - "execution_count": 36, + "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "5\n" + "Starting the operation\n", + "Operation finished\n", + "Starting the operation\n", + "Operation finished\n" ] } ], "source": [ - "import asyncio\n", + "some_function()\n", + "some_function()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "И в двата случая отнема 10 секунди, защото всеки coroutine чака 5 секунди, и ги извикваме **един след друг**." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### `asyncio.gather`" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Ако искаме да ги пуснем едновременно, един начин е чрез `asyncio.gather`:" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Starting the operation\n", + "Starting the operation\n", + "Operation finished\n", + "Operation finished\n" + ] + }, + { + "data": { + "text/plain": [ + "[None, None]" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "await asyncio.gather(some_coroutine(), some_coroutine())" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "`asyncio.gather` обединява множество future-и в един и накрая връща списък с резултатите им (**в реда, в който са подадени**)." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "⚠️ Обърнете внимание, че подаваме не просто имената на корутините, а ги извикваме с `()`. Това е защото един `async def` на практика е **coroutine function**, и чак като бъде извикан, тогава връща самия **coroutine object**. Терминологията обаче обикновено се смесва и двете понятия се наричат \"coroutine\".\n", "\n", - "async def add(x: int, y: int) -> int:\n", - " return x + y\n", + "Чак след указване на `await` върху coroutine object-a започва изпълнението му (правете аналогия с генераторите и `next`)." + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "some_coroutine()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Полезна илюстрация за това е coroutine, който приема аргументи:" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "metadata": {}, + "outputs": [], + "source": [ + "async def tagged_coroutine(tag: str | int, sleep_secs: int):\n", + " print(f\"Starting the operation with tag {tag}, sleeping for {sleep_secs} seconds\")\n", + " await asyncio.sleep(sleep_secs)\n", + " print(f\"Operation with tag {tag} finished\")\n", + " return f\"dummy_result_of_tag_{tag}\"\n" + ] + }, + { + "cell_type": "code", + "execution_count": 31, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Starting the operation with tag 0, sleeping for 5 seconds\n", + "Starting the operation with tag 1, sleeping for 4 seconds\n", + "Starting the operation with tag 2, sleeping for 3 seconds\n", + "Starting the operation with tag 3, sleeping for 2 seconds\n", + "Starting the operation with tag 4, sleeping for 1 seconds\n", + "Operation with tag 4 finished\n", + "Operation with tag 3 finished\n", + "Operation with tag 2 finished\n", + "Operation with tag 1 finished\n", + "Operation with tag 0 finished\n" + ] + }, + { + "data": { + "text/plain": [ + "['dummy_result_of_tag_0',\n", + " 'dummy_result_of_tag_1',\n", + " 'dummy_result_of_tag_2',\n", + " 'dummy_result_of_tag_3',\n", + " 'dummy_result_of_tag_4']" + ] + }, + "execution_count": 31, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "coroutines = (tagged_coroutine(tag=i, sleep_secs=5-i) for i in range(5))\n", + "await asyncio.gather(*coroutines)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Обърнете внимание, че ако сложим `await` в comprehension, няма да има concurrency:" + ] + }, + { + "cell_type": "code", + "execution_count": 45, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Starting the operation with tag 0, sleeping for 3 seconds\n", + "Operation with tag 0 finished\n", + "Starting the operation with tag 1, sleeping for 2 seconds\n", + "Operation with tag 1 finished\n", + "Starting the operation with tag 2, sleeping for 1 seconds\n", + "Operation with tag 2 finished\n" + ] + }, + { + "data": { + "text/plain": [ + "['dummy_result_of_tag_0', 'dummy_result_of_tag_1', 'dummy_result_of_tag_2']" + ] + }, + "execution_count": 45, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "coroutines = [tagged_coroutine(tag=i, sleep_secs=3-i) for i in range(3)]\n", + "results = [await result for result in coroutines]\n", + "results" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### `async for` и асинхронни генератори" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Concurrency няма да има и с итериране на асинхронни генератори. Тях итерираме с `async for`. Можем да ги създаваме по тва начина:\n", + "* функционален: с `async def`, в койto има `yield`\n", + "* ООП: с дефиниране на `__aiter__` и `__anext__` дъндърите" + ] + }, + { + "cell_type": "code", + "execution_count": 46, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Starting the operation with tag 0, sleeping for 3 seconds\n", + "Operation with tag 0 finished\n", + "Starting the operation with tag 1, sleeping for 2 seconds\n", + "Operation with tag 1 finished\n", + "Starting the operation with tag 2, sleeping for 1 seconds\n", + "Operation with tag 2 finished\n", + "['dummy_result_of_tag_0', 'dummy_result_of_tag_1', 'dummy_result_of_tag_2']\n" + ] + } + ], + "source": [ + "async def async_generator(n: int):\n", + " for i in range(n):\n", + " result = await tagged_coroutine(tag=i, sleep_secs=n-i)\n", + " yield result\n", "\n", - "result = await add(2, 3)\n", - "print(result)" + "resluts = [result async for result in async_generator(3)]\n", + "print(resluts)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "С помощта на `await`, ние казваме - добави тази функция в event loop-а, и когато се изпълни, вземи резултата, и го принтирай на екрана." + "### `asyncio.as_completed`\n", + "\n", + "Ако искаме да обработваме резултатите от future-и **в реда на приключването им**, можем да използваме `asyncio.as_completed`. Връща ни Future, който можем да `await`-нем на всяка итерация:" + ] + }, + { + "cell_type": "code", + "execution_count": 49, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Starting the operation with tag 1, sleeping for 4 seconds\n", + "Starting the operation with tag 0, sleeping for 5 seconds\n", + "Starting the operation with tag 2, sleeping for 3 seconds\n", + "Starting the operation with tag 3, sleeping for 2 seconds\n", + "Starting the operation with tag 4, sleeping for 1 seconds\n", + "Operation with tag 4 finished\n", + "Operation with tag 3 finished\n", + "Operation with tag 2 finished\n", + "Operation with tag 1 finished\n", + "Operation with tag 0 finished\n" + ] + }, + { + "data": { + "text/plain": [ + "['dummy_result_of_tag_4',\n", + " 'dummy_result_of_tag_3',\n", + " 'dummy_result_of_tag_2',\n", + " 'dummy_result_of_tag_1',\n", + " 'dummy_result_of_tag_0']" + ] + }, + "execution_count": 49, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from asyncio import as_completed\n", + "\n", + "coroutines = [tagged_coroutine(tag=i, sleep_secs=5-i) for i in range(5)]\n", + "results = [await result for result in as_completed(coroutines)]\n", + "\n", + "results" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Нека разгледаме следните две функции - `fast_hi` и `slow_hi` - едната ще изчака 1 секунда, и ще изпише \"Hi\", а другата ще изчака 3 секунди, преди да изпише \"Hi\"" + "### `asyncio.create_task`\n", + "\n", + "Ако не ни трябва да ги изчакваме в текущия coroutine, можем да ги schedule-нем с `create_task`.\n", + "\n", + "⚠️ При този начин е важно полученият `Task` обект да му запазим референция някъде извън текущия scope (например в атрибут на клас или като глобална променлива), защото иначе ще бъде зачистен от garbage collector-a и прекратен." ] }, { "cell_type": "code", - "execution_count": 41, + "execution_count": 59, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "Starting fast hi\n", - "Fast said hi\n", - "Starting slow hi\n", - "Slow said hi\n" + "I'm a sync function.\n", + "I shall not wait for the tasks to finish. Or to even start.\n", + "Starting the operation with tag 0, sleeping for 5 seconds\n", + "Starting the operation with tag 1, sleeping for 4 seconds\n", + "Starting the operation with tag 2, sleeping for 3 seconds\n", + "Starting the operation with tag 3, sleeping for 2 seconds\n", + "Starting the operation with tag 4, sleeping for 1 seconds\n", + "Operation with tag 4 finished\n", + "Operation with tag 3 finished\n", + "Operation with tag 2 finished\n", + "Operation with tag 1 finished\n", + "Operation with tag 0 finished\n", + "If you don't await the tasks, at least clean them afterwards, jeez!\n" ] } ], "source": [ - "import asyncio\n", + "from asyncio import create_task\n", "\n", - "async def fast_hi():\n", - " print(\"Starting fast hi\")\n", - " await asyncio.sleep(1)\n", - " print(\"Fast said hi\")\n", + "tasks = set()\n", + "\n", + "def sync_function():\n", + " print(\"I'm a sync function.\")\n", + " for i in range(5):\n", + " task = create_task(tagged_coroutine(tag=i, sleep_secs=5-i))\n", + " tasks.add(task)\n", + " print(\"I shall not wait for the tasks to finish. Or to even start.\")\n", + "\n", + "sync_function()\n", + "\n", + "# ...\n", "\n", - "async def slow_hi():\n", - " print(\"Starting slow hi\")\n", - " await asyncio.sleep(3)\n", - " print(\"Slow said hi\")\n", + "await asyncio.sleep(8)\n", "\n", - "await fast_hi()\n", - "await slow_hi()" + "if len(tasks) > 0:\n", + " print(\"If you don't await the tasks, at least clean them afterwards, jeez!\")\n", + " for task in tasks:\n", + " if not task.done():\n", + " task.cancel()\n", + " tasks = set()" + ] + }, + { + "cell_type": "code", + "execution_count": 64, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Starting the operation with tag 0, sleeping for 5 seconds\n", + "Starting the operation with tag 1, sleeping for 4 seconds\n", + "Starting the operation with tag 2, sleeping for 3 seconds\n", + "Starting the operation with tag 3, sleeping for 2 seconds\n", + "Starting the operation with tag 4, sleeping for 1 seconds\n", + "Operation with tag 4 finished\n", + "Operation with tag 3 finished\n", + "Operation with tag 2 finished\n", + "Operation with tag 1 finished\n", + "Operation with tag 0 finished\n" + ] + }, + { + "data": { + "text/plain": [ + "['dummy_result_of_tag_2',\n", + " 'dummy_result_of_tag_4',\n", + " 'dummy_result_of_tag_3',\n", + " 'dummy_result_of_tag_1',\n", + " 'dummy_result_of_tag_0']" + ] + }, + "execution_count": 64, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "tasks = set()\n", + "\n", + "for i in range(5):\n", + " task = create_task(tagged_coroutine(tag=i, sleep_secs=5-i))\n", + " tasks.add(task)\n", + "\n", + "[await task for task in tasks] # tasks can be awaited too" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "В текущото изпълнение, двете ни функции работят последователно. Но можем да ги накараме да работят конкурентно, като ги превърнем в задачи `Task`." + "`Task`-овете могат да бъдат спирани с `.cancel()`. Един `task` има метод `.done()`, който връща `True`, ако задачата е приключила.\n", + "\n", + "Приключването на `Task` Може да стане по три начина:\n", + "1. Успешно изпълнение\n", + "2. Грешка\n", + "3. Отменяне (чрез `cancel` отвън или вдигане на `CancellationError` отвътре)\n", + "\n", + "С `.result()`, `.exception()` и `.cancelled()` можем да вземем резултата, грешката или да проверим дали е била отменена задачата, като ако състоянието е различно от очакваното се вдига подходяща грешка. Повече инфо [тук](https://docs.python.org/3/library/asyncio-task.html#task-status)." ] }, { "cell_type": "code", - "execution_count": 44, + "execution_count": null, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "Starting slow hi\n", - "Starting fast hi\n", - "Fast said hi\n", - "Slow said hi\n" + "0 seconds: task.done() = False\n", + "0 seconds: task.result() raised asyncio.InvalidStateError\n", + "Starting the operation with tag long coro, sleeping for 69420 seconds\n", + "1 seconds: task.done() = False\n", + "1 seconds: task.result() raised asyncio.InvalidStateError\n", + "2 seconds: task.done() = False\n", + "2 seconds: task.result() raised asyncio.InvalidStateError\n", + "3 seconds: task.done() = False\n", + "3 seconds: task.result() raised asyncio.InvalidStateError\n", + "4 seconds: task.done() = False\n", + "4 seconds: task.result() raised asyncio.InvalidStateError\n", + "5 seconds: task.done() = False\n", + "5 seconds: task.result() raised asyncio.InvalidStateError\n", + "6 seconds: task.done() = True\n", + "6 seconds: task.result() raised asyncio.CancelledError\n" ] } ], "source": [ - "import asyncio\n", + "def debug_task(task: Task, seconds: int):\n", + " header = f\"{seconds} seconds: \"\n", + "\n", + " print(f\"{header}{task.done() = }\")\n", + " try:\n", + " print(f\"{header}{task.result() = }\")\n", + " except asyncio.InvalidStateError:\n", + " print(f\"{header}task.result() raised asyncio.InvalidStateError\")\n", + " except asyncio.CancelledError:\n", + " print(f\"{header}task.result() raised asyncio.CancelledError\")\n", + " except Exception as e: \n", + " print(f\"{header}task.result() raised {e}\")\n", "\n", - "async def fast_hi():\n", - " print(\"Starting fast hi\")\n", + "\n", + "task = create_task(tagged_coroutine(tag=\"long coro\", sleep_secs=69420))\n", + "\n", + "for second in range(5):\n", + " debug_task(task, second)\n", " await asyncio.sleep(1)\n", - " print(\"Fast said hi\")\n", "\n", - "async def slow_hi():\n", - " print(\"Starting slow hi\")\n", - " await asyncio.sleep(3)\n", - " print(\"Slow said hi\")\n", + "task.cancel()\n", + "debug_task(task, 5)\n", "\n", - "slow_task = asyncio.create_task(slow_hi())\n", - "fast_task = asyncio.create_task(fast_hi())\n", + "await asyncio.sleep(1) # it takes some time to cancel the task!\n", + "debug_task(task, 6)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Асинхронни контекст мениджъри (`async with`)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "В много библиотеки, които използват asyncio, се изисква да се използва `async with` за да се гарантира правилното отваряне и затваряне на ресурси (файлове, сокети, т.н.).\n", "\n", - "await slow_task\n", - "await fast_task" + "Както с `with` използваме context manager, така и с `async with` използваме асинхронен контекстен мениджър, без друга разлика в синтаксиса." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "За превръщането на coroutine в `Task` използваме `asyncio.create_task`. При текущата подредба, `slow_task` започва първа изпълнението си, и достига до 3-секундното чакане. Вместо да седи и да не прави нищо, изпълнението се сменя върху `fast_task`, докато изчакваме нашия 3-секунден `sleep` да приключи." + "Единият (ООП) начин да си направим наш си асинхронен контекстен мениджър, като дефинираме `__aenter__` и `__aexit__` методите:" + ] + }, + { + "cell_type": "code", + "execution_count": 38, + "metadata": {}, + "outputs": [], + "source": [ + "from asyncio import sleep\n", + "\n", + "\n", + "class SomeResource:\n", + " @classmethod\n", + " async def acquire(cls) -> 'SomeResource':\n", + " print(\"Acquiring the resource...\", end=\" \")\n", + " await sleep(1)\n", + " print(\"OK.\")\n", + " return cls()\n", + "\n", + " async def some_method(self):\n", + " print(\"Doing something with the resource...\", end=\" \")\n", + " await sleep(1)\n", + " print(\"OK.\")\n", + "\n", + " async def release(self):\n", + " print(\"Releasing the resource...\", end=\" \")\n", + " await sleep(1)\n", + " print(\"OK.\")" + ] + }, + { + "cell_type": "code", + "execution_count": 39, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Acquiring the resource... OK.\n", + "Doing something with the resource... OK.\n", + "Releasing the resource... OK.\n" + ] + } + ], + "source": [ + "\n", + "class AsyncContextManager:\n", + " def __init__(self):\n", + " self._resource: SomeResource | None = None\n", + " \n", + " async def __aenter__(self) -> SomeResource:\n", + " if self._resource is not None:\n", + " raise IOError(\"Resource already acquired.\")\n", + " \n", + " res = await SomeResource.acquire()\n", + " self._resource = res\n", + " return res\n", + "\n", + " async def __aexit__(self, exc_type, exc, tb): # just like __exit__ it accepts the exception type, exception and traceback, if any.\n", + " if self._resource is not None:\n", + " await self._resource.release()\n", + " self._resource = None\n", + "\n", + "\n", + "# Usage example:\n", + "\n", + "async with AsyncContextManager() as res:\n", + " await res.some_method()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "До момента трябваше ръчно да създаваме coroutines и техните Task-ове. Но всъщност ние можем и динамично да правим извиквания към вече дефинираната ни coroutine. За целта използваме `asyncio.gather`. Нека имаме coroutine, който има за цел да умножи две числа и да върне резултата. Също така, нека имаме даден наброй входа. Искаме да извършим операциите върху тях асинхронно." + "Другият (функционален) начин е чрез `contextlib.asynccontextmanager` декоратора. С него трябва да декорираме асинхронен генератор, който yield-ва един единствен път. Кодът преди yield-а се изпълнява като `__aenter__`, а след yield-а - като `__aexit__`." ] }, { "cell_type": "code", - "execution_count": 45, + "execution_count": 41, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "[16, 12, 8, -9]\n" + "Acquiring the resource... OK.\n", + "Doing something with the resource... OK.\n", + "Releasing the resource... OK.\n" ] } ], "source": [ - "import asyncio\n", + "from contextlib import asynccontextmanager\n", "\n", - "async def multiply(a: int, b: int) -> int:\n", - " return a * b\n", "\n", - "inputs = [(8, 2), (1, 12), (-2, -4), (3, -3)]\n", + "@asynccontextmanager\n", + "async def async_context_manager():\n", + " res = await SomeResource.acquire()\n", + " try:\n", + " yield res\n", + " finally:\n", + " await res.release()\n", "\n", - "coroutines = [multiply(first, second) for first, second in inputs]\n", "\n", - "results = await asyncio.gather(*coroutines)\n", "\n", - "print(results)" + "# Usage example:\n", + "\n", + "async with async_context_manager() as res:\n", + " await res.some_method()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Литература за asyncio" ] }, { @@ -1371,8 +1974,9 @@ "source": [ "`asyncio` е обширна тема, която няма да покрием изцяло в курса. Оставяме няколко полезни линка, за тези които искат да навлязат по-надълбоко с `asyncio`.\n", "\n", + "- https://docs.python.org/3.13/library/asyncio.html#module-asyncio\n", + "- https://realpython.com/async-io-python/#the-asyncio-package-and-asyncawait\n", "- https://realpython.com/async-io-python/#async-io-in-context\n", - "- https://docs.python.org/3.10/library/asyncio.html#module-asyncio\n", "- https://www.goodreads.com/book/show/50083143-using-asyncio-in-python" ] }, @@ -1380,7 +1984,24 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Примери" + "## Популярни библиотеки, използващи asyncio" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "* [aiofiles](https://github.com/Tinche/aiofiles): за работа с файлове (локални)\n", + "* [aiohttp](https://github.com/aio-libs/aiohttp): за мрежови заявки (с интерфейс, подобен на [`requests`](../10%20-%20requests/10%20-%20requests.ipynb))\n", + "* доста Python SDKs (за Azure например) или Python имплементации на API-та ([`discord.py`](https://github.com/Rapptz/discord.py) например)\n", + "* много съществуващи библиотеки за работа с определени бази данни или сървъри (като psycopg, boto3, т.н.) имат асинхронни версии (съотв. aiopg, aioboto3, т.н.). Някои от тях са по-поддържани, други - не толкова, трети - въобще не." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Примери след лекцията" ] }, { @@ -1582,11 +2203,123 @@ "\n", " semaphore.release()" ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Пример 3" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Напишете клас `Scheduler`, който да приема coroutine function (за улеснение приемете, че тя не приема аргументи), и да започне да го изпълнява на всеки `interval` секунди, след като бъде извикан метода `start`. Scheduler-ът да може да бъде спиран от метод `stop`.\n", + "\n", + "Интерфейс:\n", + "```python\n", + "from typing import Callable, Coroutine\n", + "\n", + "\n", + "class Scheduler:\n", + " \"\"\"Schedule a coroutine to start running periodically with a fixed time interval.\"\"\"\n", + "\n", + " def __init__(self, coroutine_func: Callable[[], Coroutine], interval: int):\n", + " ...\n", + "\n", + " def start(self):\n", + " ...\n", + "\n", + " def stop(self):\n", + " ...\n", + "\n", + "```\n", + "\n", + "Примерна употреба:\n", + "\n", + "```python\n", + "scheduler = Scheduler(some_coroutine, 10)\n", + "\n", + "scheduler.start()\n", + "await asyncio.sleep(30)\n", + "scheduler.stop()\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Решение на пример 3" + ] + }, + { + "cell_type": "code", + "execution_count": 68, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Starting the operation\n", + "Operation finished\n", + "Starting the operation\n", + "Operation finished\n", + "Starting the operation\n", + "Operation finished\n" + ] + } + ], + "source": [ + "from asyncio import Task, create_task, sleep\n", + "from typing import Callable, Coroutine\n", + "\n", + "\n", + "class Scheduler:\n", + " \"\"\"Schedule a coroutine to start running periodically with a fixed time interval.\"\"\"\n", + "\n", + " def __init__(self, coroutine_func: Callable[[], Coroutine], interval: int):\n", + " self.interval = interval\n", + " self._coroutine_func = coroutine_func\n", + " self._scheduler_task: Task | None = None\n", + " self._coroutine_task: Task | None = None\n", + "\n", + " def start(self):\n", + " self._scheduler_task = create_task(self._run())\n", + "\n", + " async def _run(self):\n", + " while True:\n", + " self._running_task = create_task(self._coroutine_func())\n", + " # primitive and error-prone but just for illustration purposes\n", + " await sleep(self.interval)\n", + "\n", + " def stop(self):\n", + " if self._scheduler_task is not None:\n", + " self._scheduler_task.cancel()\n", + " if self._coroutine_task is not None:\n", + " self._coroutine_task.cancel()\n", + "\n", + "\n", + "# Usage example:\n", + "\n", + "async def some_coroutine():\n", + " print(\"Starting the operation\")\n", + " await sleep(5)\n", + " print(\"Operation finished\")\n", + "\n", + "scheduler = Scheduler(some_coroutine, 10)\n", + "\n", + "scheduler.start()\n", + "await asyncio.sleep(30)\n", + "scheduler.stop()" + ] } ], "metadata": { "kernelspec": { - "display_name": "Python 3.10.7 64-bit", + "display_name": "Python 3", "language": "python", "name": "python3" }, @@ -1600,12 +2333,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.12" - }, - "vscode": { - "interpreter": { - "hash": "e7370f93d1d0cde622a1f8e1c04877d8463912d04d973331ad4851f04de6915a" - } + "version": "3.11.9" } }, "nbformat": 4,