Skip to content

Commit

Permalink
Replaced get_cookie_jar function with a CookieManager (#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazyscientist committed Jul 24, 2024
1 parent 463b6ef commit 69e0e3d
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 37 deletions.
3 changes: 3 additions & 0 deletions doc/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ Utilities
.. automodule:: osctiny.utils.conf
:members:

.. automodule:: osctiny.utils.cookies
:members:

.. automodule:: osctiny.utils.mapping
:members:

Expand Down
17 changes: 8 additions & 9 deletions osctiny/osc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from base64 import b64encode
import typing
import errno
from http.cookiejar import CookieJar
from http.cookiejar import CookieJar, LWPCookieJar
from io import BufferedReader, BytesIO, StringIO
import gc
import logging
Expand Down Expand Up @@ -42,7 +42,8 @@
from .models import ParamsType
from .utils.auth import HttpSignatureAuth
from .utils.backports import cached_property
from .utils.conf import BOOLEAN_PARAMS, get_credentials, get_cookie_jar
from .utils.conf import BOOLEAN_PARAMS, get_credentials
from .utils.cookies import CookieManager
from .utils.errors import OscError
from .utils.xml import get_xml_parser, get_objectified_xml

Expand Down Expand Up @@ -193,11 +194,7 @@ def session(self) -> Session:
if not session:
session = Session()
session.verify = self.verify or get_default_verify_paths().capath

cookies = get_cookie_jar()
if cookies is not None:
cookies.load()
session.cookies = cookies
session.cookies = CookieManager.get_jar()

if self.ssh_key is not None:
session.auth = HttpSignatureAuth(username=self.username, password=self.password,
Expand All @@ -217,12 +214,14 @@ def cookies(self) -> RequestsCookieJar:
return self.session.cookies

@cookies.setter
def cookies(self, value: typing.Union[CookieJar, dict]):
if not isinstance(value, (CookieJar, dict)):
def cookies(self, value: typing.Union[LWPCookieJar, dict, str]):
if not isinstance(value, (LWPCookieJar, dict, str)):
raise TypeError(f"Expected a cookie jar or dict. Got instead: {type(value)}")

if isinstance(value, CookieJar):
self.session.cookies = value
if isinstance(value, str):
CookieManager.set_cookie(jar=self.session.cookies, cookie=value)
else:
self.session.cookies = cookiejar_from_dict(value)

Expand Down
77 changes: 77 additions & 0 deletions osctiny/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
# -*- coding: utf-8 -*-
# pylint: disable=protected-access
import re
from base64 import b64encode
from bz2 import compress
from http.cookiejar import Cookie, LWPCookieJar
from unittest import TestCase, mock
from datetime import datetime
from io import StringIO
import os
from pathlib import Path
import sys
from tempfile import mkstemp
import time
from types import GeneratorType
import warnings

Expand All @@ -21,6 +24,7 @@
from ..utils.auth import HttpSignatureAuth
from ..utils.changelog import ChangeLog, Entry
from ..utils.conf import get_config_path, get_credentials
from ..utils.cookies import CookieManager
from ..utils.mapping import Mappable
from ..utils.errors import get_http_error_details

Expand Down Expand Up @@ -553,3 +557,76 @@ def test_get_http_error_details__bad_response(self):
self.assertIn("Start tag expected", str(emitted_warnings[-1].message))
else:
self.fail("No exception was raised")


@mock.patch("osctiny.utils.cookies._conf", new=None)
class TestCookies(TestCase):
@property
def mock_lwp_cookie_str(self) -> str:
today = datetime.today()
return '#LWP-Cookies-2.0\n'\
'Set-Cookie3: openSUSE_session=3f0471fef300491289c3fcf845d445bd; '\
'path="/"; domain=".suse.de"; path_spec; domain_dot; secure; '\
f'expires="{today.year + 1}-07-18 14:35:02Z"; HttpOnly=None; version=0\n'

def test_get_cookie_path(self, *_):
with self.subTest("XDG_STATE_HOME set"), \
mock.patch.dict(os.environ, {"XDG_STATE_HOME": "/foo/bar"}):
self.assertEqual(Path("/foo/bar/osc/cookiejar"), CookieManager.get_cookie_path())

with self.subTest("XDG_STATE_HOME not set"), mock.patch.dict(os.environ, {}, clear=True):
self.assertEqual(Path("~/.local/state/osc/cookiejar").expanduser(),
CookieManager.get_cookie_path())

def test_get_jar(self, *_):
cookie_path = Path(mkstemp()[1])

with cookie_path.open("w", encoding="utf-8") as handle:
handle.write(self.mock_lwp_cookie_str)

with self.subTest("Existing jar"), mock.patch.object(CookieManager, "get_cookie_path",
return_value=cookie_path):
jar = CookieManager.get_jar()
self.assertEqual(jar.filename, cookie_path.as_posix())
self.assertIn(".suse.de", jar._cookies)

does_not_exist = Path("/no/such/path")
with self.subTest("No jar"), mock.patch.object(CookieManager, "get_cookie_path",
return_value=does_not_exist):
self.assertFalse(does_not_exist.is_file())

jar = CookieManager.get_jar()
self.assertEqual(jar.filename, does_not_exist.as_posix())
self.assertEqual(0, len(jar._cookies))

def test_save_jar(self, *_):
cookie_path = Path(mkstemp()[1])
jar = LWPCookieJar(filename=str(cookie_path))
jar.set_cookie(cookie=Cookie(version=0, name='openSUSE_session',
value='3f0471fef300491289c3fcf845d445bd', port=None,
port_specified=False, domain='.suse.de', domain_specified=True,
domain_initial_dot=True, path='/', path_specified=True,
secure=True, expires=int(time.time()) + 3600, discard=False,
comment=None, comment_url=None, rest={'HttpOnly': 'None'},
rfc2109=False))
CookieManager.save_jar(jar=jar)

with cookie_path.open("r", encoding="utf-8") as handle:
lines = handle.readlines()
self.assertEqual(2, len(lines))
self.assertIn("openSUSE_session", lines[1])

def test_set_cookie(self, *_):
jar = LWPCookieJar()
CookieManager.set_cookie(jar=jar, cookie=self.mock_lwp_cookie_str)
self.assertIn(".suse.de", jar._cookies)

def test_get_cookie(self, *_):
cookie_path = Path(mkstemp()[1])
cookie_str = self.mock_lwp_cookie_str
with cookie_path.open("w", encoding="utf-8") as handle:
handle.write(cookie_str)

jar = LWPCookieJar(filename=str(cookie_path))
jar.load()
self.assertEqual(cookie_str, CookieManager.get_cookie(jar=jar))
28 changes: 0 additions & 28 deletions osctiny/utils/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from base64 import b64decode
from bz2 import decompress
from configparser import ConfigParser, NoSectionError
from http.cookiejar import LWPCookieJar
import os
from pathlib import Path

Expand Down Expand Up @@ -191,30 +190,3 @@ def get_credentials(url: typing.Optional[str] = None) \
raise ValueError(f"`osc` config provides no password or SSH key for URL {url}")

return username, password if sshkey is None else None, sshkey


def get_cookie_jar() -> typing.Optional[LWPCookieJar]:
"""
Get cookies from a persistent osc cookiejar
.. versionadded:: 0.8.0
"""
if _conf is not None:
try:
# New OSC config
path = _conf.config.cookiejar
except AttributeError:
# Backward compatibility to old OSC config style
path = _conf._identify_osccookiejar() # pylint: disable=protected-access
if os.path.isfile(path):
return LWPCookieJar(filename=path)

path_suffix = Path("osc", "cookiejar")
paths = [Path(os.getenv("XDG_STATE_HOME", "/tmp")).joinpath(path_suffix),
Path.home().joinpath(".local", "state").joinpath(path_suffix)]

for path in paths:
if path.is_file():
return LWPCookieJar(filename=str(path)) # compatibility for Python < 3.8

return None
80 changes: 80 additions & 0 deletions osctiny/utils/cookies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""
Utilities for cookies
^^^^^^^^^^^^^^^^^^^^^
.. versionadded:: {{ NEXT_RELEASE }}
"""
from http.cookiejar import LWPCookieJar
from io import StringIO
import os
from pathlib import Path

from .conf import _conf


class CookieManager:
"""
Simplify handling of cookies
"""
@staticmethod
def get_cookie_path() -> Path:
"""
Get path candidates where to expect a cookie file
"""
if _conf is not None:
try:
try:
# New OSC config
return Path(_conf.config.cookiejar)
except AttributeError:
# Backward compatibility to old OSC config style
# pylint: disable=protected-access
return Path(_conf._identify_osccookiejar())
except: # pylint: disable=broad-except,bare-except
# If `osc` raises an exception we pretend like it does not exist.
pass

path_suffix = Path("osc", "cookiejar")
return Path(os.getenv("XDG_STATE_HOME", "~/.local/state"))\
.joinpath(path_suffix).expanduser()

@classmethod
def get_jar(cls) -> LWPCookieJar:
"""
Get cookies from a persistent osc cookiejar, if it exists
.. versionchanged:: {{ NEXT_RELEASE }}
Converted from function ``get_cookie_jar``
"""
path = cls.get_cookie_path()
if path.is_file():
jar = LWPCookieJar(filename=str(path)) # compatibility for Python < 3.8
jar.load()
return jar

return LWPCookieJar(filename=str(path))

@classmethod
def save_jar(cls, jar: LWPCookieJar) -> None:
"""
Save cookies to a persistent osc cookiejar
"""
# compatibility for Python < 3.8
jar.save(filename=jar.filename or str(cls.get_cookie_path()))

@staticmethod
def set_cookie(jar: LWPCookieJar, cookie: str) -> None:
"""
Read cookie data from string instead of loading from file
"""
strio = StringIO(cookie)
strio.seek(0)
# pylint: disable=protected-access
jar._really_load(f=strio, filename="", ignore_discard=True, ignore_expires=True)

@staticmethod
def get_cookie(jar: LWPCookieJar) -> str:
"""
Return LWP cookie string with headers
"""
return f"#LWP-Cookies-2.0\n{jar.as_lwp_str()}"

0 comments on commit 69e0e3d

Please sign in to comment.