Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refine pfio.cache.HTTPCache #293

Merged
merged 6 commits into from
Dec 20, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 62 additions & 14 deletions pfio/cache/http_cache.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import logging
import os
import pickle
import time

import urllib3
import urllib3.exceptions

from pfio.cache import Cache

logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())


class HTTPCache(Cache):
"""HTTP-based cache system
Expand Down Expand Up @@ -66,7 +71,31 @@ def __init__(self,

self.do_pickle = do_pickle

self.conn = urllib3.poolmanager.PoolManager()
self.conn = None
self._prepare_conn()

self.pid = os.getpid()

@property
def is_forked(self):
return self.pid != os.getpid()

def _checkconn(self):
if self.is_forked or self.conn is None:
self._prepare_conn()
self.pid = os.getpid()

def _prepare_conn(self):
# Allow redirect or retry once
self.conn = urllib3.poolmanager.PoolManager(retries=1, timeout=3)

def __getstate__(self):
state = self.__dict__.copy()
state['conn'] = None
return state

def __setstate__(self, state):
self.__dict__ = state

def __len__(self):
return self.length
Expand All @@ -80,33 +109,52 @@ def multithread_safe(self):
return True

def put(self, i, data):
self._checkconn()
if i < 0 or self.length <= i:
raise IndexError("index {} out of range ([0, {}])"
.format(i, self.length - 1))
if self.do_pickle:
data = pickle.dumps(data)

res = self.conn.urlopen("PUT",
url=self._url(i),
headers=self._header_with_token(),
body=data)
return res.status == 201
try:
res = self.conn.urlopen("PUT",
url=self._url(i),
headers=self._header_with_token(),
body=data)
except urllib3.exceptions.RequestError as e:
logger.warning("put: {}".format(e))
return False

if res.status == 201:
return True

logger.warning("put: unexpected status code {}".format(res.status))
return False

def get(self, i):
self._checkconn()
if i < 0 or self.length <= i:
raise IndexError("index {} out of range ([0, {}])"
.format(i, self.length - 1))

res = self.conn.urlopen("GET",
url=self._url(i),
headers=self._header_with_token())
if res.status != 200 or res.data is None:
try:
res = self.conn.urlopen("GET",
url=self._url(i),
headers=self._header_with_token())
except urllib3.exceptions.RequestError as e:
logger.warning("get: {}".format(e))
return None

if self.do_pickle:
return pickle.loads(res.data)
else:
return res.data
if res.status == 200:
if self.do_pickle:
return pickle.loads(res.data)
else:
return res.data
elif res.status == 404:
return None

logger.warning("get: unexpected status code {}".format(res.status))
return None

def _url(self, i) -> str:
return self.url + str(i)
Expand Down