Skip to content

Commit

Permalink
iter content in requests seems to work better (both image service an…
Browse files Browse the repository at this point in the history
…d git)
  • Loading branch information
jrybicki-jsc committed Jan 23, 2023
1 parent e373366 commit f08948f
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 41 deletions.
20 changes: 7 additions & 13 deletions image_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def file_exist(sftp, name):
except:
return -1


def http2ssh(url: str, ssh_client, remote_name: str, force=True):
sftp_client = ssh_client.open_sftp()
size = file_exist(sftp=sftp_client, name=remote_name)
Expand All @@ -30,26 +31,19 @@ def http2ssh(url: str, ssh_client, remote_name: str, force=True):
dirname = os.path.dirname(remote_name)
ssh_client.exec_command(command=f"mkdir -p {dirname}")
ssh_client.exec_command(command=f"touch {remote_name}")

with requests.get(url, stream=True, verify=False, timeout=(2,3)) as r:
written = 0
with sftp_client.open(remote_name, 'w') as f:
with sftp_client.open(remote_name, 'wb') as f:
f.set_pipelined(pipelined=True)
while True:
chunk=r.raw.read(1024 * 1000)
if not chunk:
break
for chunk in r.iter_content(chunk_size=1024*1000):
written+=len(chunk)
content_to_write = memoryview(chunk)
f.write(content_to_write)
written+=len(chunk)
cl = r.headers.get('Content-Length', 0)
print(f"Written: {written} Content-Length: {cl}")
if cl!=written:
print('Content length mismatch')
return -1

print(f"Written {written} bytes")
return 0


@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def transfer_image():

Expand Down
25 changes: 2 additions & 23 deletions plainhttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,12 @@
from airflow.utils.dates import days_ago

from decors import get_connection, remove, setup
from image_transfer import file_exist
from image_transfer import http2ssh

default_args = {
'owner': 'airflow',
}

def http2ssh(url: str, ssh_client, remote_name: str, force=True):
sftp_client = ssh_client.open_sftp()
size = file_exist(sftp=sftp_client, name=remote_name)
if size>0:
print(f"File {remote_name} exists and has {size} bytes")
if force is not True:
return 0
print("Forcing overwrite")

dirname = os.path.dirname(remote_name)
ssh_client.exec_command(command=f"mkdir -p {dirname}")
ssh_client.exec_command(command=f"touch {remote_name}")

with requests.get(url, stream=True, verify=False, timeout=(2,3)) as r:
with sftp_client.open(remote_name, 'w') as f:
f.set_pipelined(pipelined=True)
for chunk in r.iter_content(chunk_size=1024*1000):
content_to_write = memoryview(chunk)
f.write(content_to_write)

return 0

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['wp4', 'http', 'ssh'])
def plainhttp2ssh():
Expand All @@ -46,7 +25,7 @@ def stream_upload(connection_id, **kwargs):
target = params.get('target', '/tmp/')
url = params.get('url', '')
if not url:
print('Provide valid url')
print('Provide a valid url')
return -1

print(f"Putting {url} --> {target}")
Expand Down
8 changes: 3 additions & 5 deletions tests/test_http2ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ def test_actual_cpy(self, exists, get):
my_client.exec_command = exec


get().__enter__().raw.read = MagicMock(side_effect=[b'blabla', None])
get().__enter__().headers.get = MagicMock(return_value=6)
get().__enter__().iter_content = MagicMock(return_value=[b'blabla'])
r = http2ssh(url='foo.bar', ssh_client=my_client, remote_name='/goo/bar', force=True)
self.assertEqual(r, 0)
exec.assert_called()
Expand All @@ -70,10 +69,9 @@ def test_missed_cpy(self, exists, get):
my_client.open_sftp.return_value = my_sftp
my_client.exec_command = exec

get().__enter__().iter_content = MagicMock(return_value=[b'blabla'])

get().__enter__().raw.read = MagicMock(side_effect=[b'blabla', None])
get().__enter__().headers.get = MagicMock(return_value=699)
r = http2ssh(url='foo.bar', ssh_client=my_client, remote_name='/goo/bar', force=True)
self.assertEqual(r, -1)
self.assertEqual(r, 0)
exec.assert_called()
wrt.assert_called_once_with(memoryview(b'blabla'))

0 comments on commit f08948f

Please sign in to comment.