Skip to content

Commit

Permalink
Refine pfio.cache.HTTPCache (#293)
Browse files Browse the repository at this point in the history
* Add error handling

* Allow redirect or retry once

* Shorter timeout

* Don't pickle conn in HTTPCache

* Make isort happy

* Check self.conn is None
  • Loading branch information
y1r authored Dec 20, 2022
1 parent 7a83d74 commit 06d11b3
Showing 1 changed file with 62 additions and 14 deletions.
76 changes: 62 additions & 14 deletions pfio/cache/http_cache.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import logging
import os
import pickle
import time

import urllib3
import urllib3.exceptions

from pfio.cache import Cache

logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())


class HTTPCache(Cache):
"""HTTP-based cache system
Expand Down Expand Up @@ -66,7 +71,31 @@ def __init__(self,

self.do_pickle = do_pickle

self.conn = urllib3.poolmanager.PoolManager()
self.conn = None
self._prepare_conn()

self.pid = os.getpid()

@property
def is_forked(self):
return self.pid != os.getpid()

def _checkconn(self):
if self.is_forked or self.conn is None:
self._prepare_conn()
self.pid = os.getpid()

def _prepare_conn(self):
# Allow redirect or retry once
self.conn = urllib3.poolmanager.PoolManager(retries=1, timeout=3)

def __getstate__(self):
state = self.__dict__.copy()
state['conn'] = None
return state

def __setstate__(self, state):
self.__dict__ = state

def __len__(self):
return self.length
Expand All @@ -80,33 +109,52 @@ def multithread_safe(self):
return True

def put(self, i, data):
self._checkconn()
if i < 0 or self.length <= i:
raise IndexError("index {} out of range ([0, {}])"
.format(i, self.length - 1))
if self.do_pickle:
data = pickle.dumps(data)

res = self.conn.urlopen("PUT",
url=self._url(i),
headers=self._header_with_token(),
body=data)
return res.status == 201
try:
res = self.conn.urlopen("PUT",
url=self._url(i),
headers=self._header_with_token(),
body=data)
except urllib3.exceptions.RequestError as e:
logger.warning("put: {}".format(e))
return False

if res.status == 201:
return True

logger.warning("put: unexpected status code {}".format(res.status))
return False

def get(self, i):
self._checkconn()
if i < 0 or self.length <= i:
raise IndexError("index {} out of range ([0, {}])"
.format(i, self.length - 1))

res = self.conn.urlopen("GET",
url=self._url(i),
headers=self._header_with_token())
if res.status != 200 or res.data is None:
try:
res = self.conn.urlopen("GET",
url=self._url(i),
headers=self._header_with_token())
except urllib3.exceptions.RequestError as e:
logger.warning("get: {}".format(e))
return None

if self.do_pickle:
return pickle.loads(res.data)
else:
return res.data
if res.status == 200:
if self.do_pickle:
return pickle.loads(res.data)
else:
return res.data
elif res.status == 404:
return None

logger.warning("get: unexpected status code {}".format(res.status))
return None

def _url(self, i) -> str:
return self.url + str(i)
Expand Down

0 comments on commit 06d11b3

Please sign in to comment.