From b2bc95efa8154e97171d508e4d36b117debb2beb Mon Sep 17 00:00:00 2001
From: Marko Man
Date: Thu, 24 Nov 2022 12:10:30 +0100
Subject: [PATCH] feat: add download all files option

---
 swarmsync.py | 72 +++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 71 insertions(+), 1 deletion(-)

diff --git a/swarmsync.py b/swarmsync.py
index 778fd72..9f78fea 100644
--- a/swarmsync.py
+++ b/swarmsync.py
@@ -149,6 +149,24 @@ async def aioget(ref, url: str, session: aiohttp.ClientSession, sem):
         sem.release()
         display.update()
 
+async def aiodownload(ref, file: str, url: str, session: aiohttp.ClientSession, sem):
+    global display
+    try:
+        async with sem, session.get(url + '/' + ref + '/') as res:
+            r_data = await res.read()
+            if not 200 <= res.status <= 201:
+                print(f"Download failed: {res.status}")
+                return res
+            async with aiofiles.open(file, mode='wb') as f:
+                await f.write(r_data)
+            return res
+    except Exception as e:
+        # handle error(s) according to your needs
+        print(e)
+    finally:
+        sem.release()
+        display.update()
+
 async def aioupload(file: FileManager, url: str, session: aiohttp.ClientSession, sem):
     resp_dict = []
     (MIME,_ )=mimetypes.guess_type(file.name, strict=False)
@@ -203,6 +221,16 @@ async def async_upload(scheduled, urll):
         res = await asyncio.gather(*[aioupload(file, url, session, sem) for file, url in zip(scheduled, l_url)])
     print(f'items uploaded ({len(res)})')
 
+async def async_download(references, paths, urll):
+    global display
+    l_url = list(islice(cycle(urll), len(references)))
+    sem = asyncio.Semaphore(args.count)
+    session_timeout=aiohttp.ClientTimeout(total=14400)
+    async with sem, aiohttp.ClientSession(timeout=session_timeout) as session:
+        res = await asyncio.gather(*[aiodownload(reference, file, url, session, sem) for reference, file, url in zip(references, paths, l_url)])
+    display.close()
+    print(f'items downloaded ({len(res)})')
+
 def lst_to_dict(lst):
     res_dct = {}
     length=len(lst)
@@ -311,7 +339,6 @@ def upload():
         print('Error: Enter an address as argument')
     if address:
         tag = asyncio.run(get_tag(args.beeurl, address))
-#        write_list(TAG, json.dumps(tag))
         write_list(ADDRESS, address)
         print ("saving address: ", address)
     else:
@@ -367,6 +394,41 @@ def check():
         leave=True)
     asyncio.run(async_check(scheduled, url))
 
+def download():
+    global display
+    if args.count:
+        print ("count: ", args.count)
+    if args.beeurl:
+        urls = args.beeurl.split(",")
+        for l in urls:
+            urll.append(normalize_url(l, 'bzz'))
+        print ("url: ", urll)
+    download = read_dict(RESPONSES)
+    references=[]
+    paths=[]
+    for x in download:
+        for y in x['item']:
+            references.append(y['reference'])
+            paths.append(y['file'])
+
+    display=tqdm(
+        total=len(references),
+        desc='Downloading',
+        unit='item',
+        colour='#ff8c00',
+        leave=True)
+    print('\n\n\n')
+    print('Starting download...')
+    get_size()
+    start = time.time()
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+    loop.run_until_complete(async_download(references, paths, urll))
+    end = time.time()
+    loop.run_until_complete(asyncio.sleep(0.250))
+    loop.close()
+    print('Time spent downloading:', time.strftime("%H:%M:%S", time.gmtime(end-start)))
+
 # init file
 if not Path(RETRIEVABLE).is_file():
     write_dict(RETRIEVABLE, '[]')
@@ -386,6 +448,14 @@
                          metavar='', default='responses')
 parser_show.set_defaults(func=show)
 
+parser_download = subparsers.add_parser('download',
+    help='download everything from responses list')
+parser_download.add_argument("-u", "--beeurl", type=str, help = """enter http address of bee.
+    ie. http://0:1633""", default="http://0:1633/stewardship/")
+parser_download.add_argument("-c", "--count", type=int, required=False,
+    help = "number of concurrent download", default=10)
+parser_download.set_defaults(func=download)
+
 #parser_test = subparsers.add_parser('test', help='test')
 #parser_test.set_defaults(func=test)
 