Skip to content

Commit

Permalink
fix: squashing bugs
Browse files Browse the repository at this point in the history
- fix tag handling
- add some error handling
- improve response dict handling
  • Loading branch information
darkobas committed Dec 18, 2022
1 parent b869a0b commit f818a34
Showing 1 changed file with 86 additions and 60 deletions.
146 changes: 86 additions & 60 deletions swarmsync.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#!/usr/bin/env python3
# encoding: utf-8
from tqdm import tqdm
import time, sys, logging, os, json, mimetypes, math, argparse, aiohttp, aiofiles, asyncio
import re
from itertools import cycle, islice
from pathlib import Path
from secrets import token_hex
from termcolor import colored
from collections import OrderedDict

__version__ = '0.0.4.r1'
__version__ = '0.0.5.r3'

### init paths and homedir
home=Path.home() / '.swarmsync'
Expand Down Expand Up @@ -53,6 +55,13 @@ def toJSON(self):
return json.dump(self, default=lambda o: o.__dict__,
sort_keys=True, indent=4)

class q_dict(dict):
def __str__(self):
return json.dumps(self, ensure_ascii=False)

def __repr__(self):
return json.dumps(self, ensure_ascii=False)

def prepare():
global pin,stamp
pin=args.pin
Expand Down Expand Up @@ -122,29 +131,33 @@ def get_size():
get = read_dict(RESPONSES)
calc=[]
for x in get:
for y in x['item']:
calc.append(y['size'])
calc.append(x['size'])
total = sum(calc)
print('Total size of uploaded data: ', convert_size(total))

def response_dict(file, a_dict):
l_dict = [a_dict]
o_dict = read_dict(file)
if o_dict is not None:
o_dict.append(a_dict)
write_dict(file, str(o_dict).replace("'",'"'))
else:
write_dict(file, str(l_dict).replace("'",'"'))
for i in range(len(o_dict)):
o_dict[i] = q_dict(o_dict[i])
o_dict.append(q_dict(a_dict))
write_dict(file, str(o_dict))

async def create_tag():
global address
params = json.dumps({ "address": address })
headers = { "Content-Type": "application/json" }
async with aiohttp.ClientSession() as session:
async with session.post(normalize_url(args.beeurl, 'tags'), headers=headers, data=params) as resp:
if 200 <= resp.status <= 300:
tag = await resp.json()
print(tag)
try:
async with aiohttp.ClientSession() as session:
async with session.post(normalize_url(args.beeurl, 'tags'), headers=headers, data=params) as resp:
if 200 <= resp.status <= 300:
tag = await resp.json()
return(tag)
else:
print('Can not create tag!')
quit()
except Exception as e:
# handle error(s) according to your needs
print(e)

async def aioget(ref, url: str, session: aiohttp.ClientSession, sem):
global display
Expand All @@ -155,8 +168,8 @@ async def aioget(ref, url: str, session: aiohttp.ClientSession, sem):
response = await res.json()
result = response['isRetrievable']
quoted_result = f'{result}'
resp_dict = { "item": [ { "reference": ref,
"isRetrievable": quoted_result, } ] }
resp_dict = { "reference": ref,
"isRetrievable": quoted_result, }
response_dict(RETRIEVABLE, resp_dict)
if result != True:
all_errors.append({ "reference": ref, "isRetrievable": quoted_result, })
Expand Down Expand Up @@ -189,16 +202,16 @@ async def aiodownload(ref, file: str, url: str, session: aiohttp.ClientSession,

async def aioupload(file: FileManager, url: str, session: aiohttp.ClientSession, sem):
global scheduled,todo
resp_dict = []
resp_dict = {}
(MIME,_ )=mimetypes.guess_type(file.name, strict=False)
if MIME is None:
MIME = "application/octet-stream"

headers={"Content-Type": MIME, "swarm-deferred-upload": "false",
"swarm-postage-batch-id": stamp }
if tag:
await create_tag()
headers.update({ "swarm-tag": json.dumps(tag['uid']) })
ntag = await create_tag()
headers.update({ "swarm-tag": json.dumps(ntag['uid']) })
if args.encrypt:
headers.update({ "swarm-encrypt": "True" })
if args.pin:
Expand All @@ -209,20 +222,29 @@ async def aioupload(file: FileManager, url: str, session: aiohttp.ClientSession,
headers=headers, data=file.file_reader()) as res:
scheduled.remove(file.name)
if 200 <= res.status <= 300:
response = await res.json()
ref = response['reference']
if len(ref) == 64 and not args.reupload:
# if we have a reference we can asume upload was sucess
# so remove from todo list
todo.remove({"file": file.name })
write_list(TODO, todo)
if len(ref) > 64:
# if we have a reference and its longer than 64 then we can asume its encrypted upload
resp_dict = { "item": [ { "file": file.name, "reference": ref[:64], "decrypt": ref[64:], "size": file.size} ] }
else:
resp_dict = { "item": [ { "file": file.name, "reference": ref, "size": file.size} ] }
#else:
#print(res.status)
response = await res.json()
ref = response['reference']
if len(ref) == 64 and not args.reupload:
# if we have a reference we can asume upload was sucess
# so remove from todo list
todo.remove({ "file": file.name })
write_list(TODO, todo)

#cant handle quotes in responses dict
if len(ref) > 64:
# if we have a reference and its longer than 64 then we can asume its encrypted upload
resp_dict = { "file": file.name, "reference": ref[:64], "decrypt": ref[64:], "size": file.size }
if len(ref) == 64:
resp_dict = { "file": file.name, "reference": ref, "size": file.size }
if len(ref) < 64:
#something is wrong
print('Lenght of response is not correct! ', res.status)
quit()
else:
print('\n\n An error occured: ', res.status)
# better quit on error
quit()
#everything passed, write response
response_dict(RESPONSES, resp_dict)
return res
except Exception as e:
Expand All @@ -232,15 +254,17 @@ async def aioupload(file: FileManager, url: str, session: aiohttp.ClientSession,
sem.release()

async def async_check(scheduled, url: str):
global display
global display,args
sem = asyncio.Semaphore(args.count)
session_timeout=aiohttp.ClientTimeout(total=14400)
async with sem, aiohttp.ClientSession(timeout=session_timeout) as session:
res = await asyncio.gather(*[aioget(ref, url, session, sem) for ref in scheduled])
display.close()
cleanup(RETRIEVABLE)
return res

async def async_upload(scheduled, urll):
global args
l_url = list(islice(cycle(urll), len(scheduled)))
scheduled = [FileManager(file) for file in scheduled]
sem = asyncio.Semaphore(args.count)
Expand All @@ -250,7 +274,7 @@ async def async_upload(scheduled, urll):
print(f'\nitems uploaded ({len(res)})')

async def async_download(references, paths, urll):
global display
global display,args
l_url = list(islice(cycle(urll), len(references)))
sem = asyncio.Semaphore(args.count)
session_timeout=aiohttp.ClientTimeout(total=14400)
Expand All @@ -263,27 +287,29 @@ def lst_to_dict(lst):
res_dct = {}
length=len(lst)
for x in range(length):
jsd=json.dumps(lst[x])
jsd=lst[x]
res_dct[jsd]=jsd
return res_dct

def clean_responses():
get = read_dict(RESPONSES)
if get:
clean=lst_to_dict(get)
clean=clean.values()
clean=str(clean).replace("dict_values(", '')
clean=str(clean).replace(")", '')
clean=str(clean).replace("'", "")
write_dict(RESPONSES, clean)
def clean_responses(file):
data = read_dict(file)
for i in range(len(data)):
data[i] = q_dict(data[i])
if data:
clean = OrderedDict((frozenset(item.items()),item) for item in data).values()
clean=str(clean).replace("odict_values(", "")
clean=str(clean).replace(")", "")
write_dict(file, str(clean))

def cleanup(file):
#sanitze responses if there was a failure
clean = read_dict(file)
for i in range(len(clean)):
clean[i] = q_dict(clean[i])
if clean is not None:
clean = str_list = list(filter(None, clean))
write_dict(file, str(clean).replace("'",'"'))
clean_responses();
clean = str_list = list(filter(None, clean))
write_dict(file, str(clean))
clean_responses(file);

def normalize_url(base: str, path: str):
url = os.path.join(base, '')
Expand Down Expand Up @@ -311,7 +337,7 @@ async def get_tag(url: str, addr: str):
if Path(TAG).is_file():
tag = read_dict(TAG)
else:
tag = create_tag()
tag = await create_tag()
return tag

def main():
Expand Down Expand Up @@ -417,8 +443,7 @@ def check():
checklist = read_dict(RESPONSES)
scheduled=[]
for x in checklist:
for y in x['item']:
scheduled.append(y['reference'])
scheduled.append(x['reference'])
print('\n\n\n')
print('Checking stewardship...')
display=tqdm(
Expand All @@ -435,7 +460,7 @@ def check():
retry=[]
for i in all_errors:
for x in checklist:
for y in x['item']:
for y in x:
if y['reference'] == i['reference']:
retry.append(y['file'])
if retry != []:
Expand All @@ -454,14 +479,13 @@ def download():
references=[]
paths=[]
for x in download:
for y in x['item']:
if 'decrypt' in y:
l_ref = y['reference'] + y['decrypt']
references.append(y['reference'] + y['decrypt'])
else:
references.append(y['reference'])
#append decrypt key if it exists
paths.append(y['file'])
if 'decrypt' in x:
l_ref = x['reference'] + x['decrypt']
references.append(x['reference'] + x['decrypt'])
else:
references.append(x['reference'])
#append decrypt key if it exists
paths.append(x['file'])

display=tqdm(
total=len(references),
Expand All @@ -484,6 +508,8 @@ def download():
# init file
if not Path(RETRIEVABLE).is_file():
write_dict(RETRIEVABLE, '[]')
if not Path(RESPONSES).is_file():
write_dict(RESPONSES, '[]')

# Initialize parser
parser = argparse.ArgumentParser()
Expand Down

0 comments on commit f818a34

Please sign in to comment.