From 45dd946d0656d38a2e41d7962db29287677fde45 Mon Sep 17 00:00:00 2001 From: Florian Strzelecki Date: Tue, 25 Jan 2022 22:40:01 +0100 Subject: [PATCH 1/2] db: adapt SopelDB to SQLAlchemy 2.x new style SQLAlchemy 1.4 (used by Sopel) defines a cross-compatible new style to query the database using its ORM, its engine, and its connection: * Engine cannot execute SQL directly * Connection can be used as context manager * Session can be used as context manager * Instead of doing *.query, use *.execute(stmt) * SQL statements are easier to create with select, update, and delete function from `sqlalchemy.sql` * And many other things One of the key change here is the usage of session object as context manager instead of using try/except block: less code, same feature! We can't set `future=True` on the Engine yet, because we need the non-compatible and now deprecated behavior of direct SQL execution from the Engine object: `SopelDB.execute` has been deprecated and should be removed in Sopel 8.1, so we can make Sopel compatible with both SQLAlchemy 1.4 and 2.0, as planned by SQLAlchemy migration strategy. In order to lower dgw's blood pressure, there are new tests, existing tests have been updated as well, and the coverage has increased from ~70% to ~85%. Also, more doc, because I always write more doc. Co-authored-by: dgw --- sopel/db.py | 433 ++++++++++++++++++------------------ test/test_db.py | 566 +++++++++++++++++++++++++++++++++--------------- 2 files changed, 620 insertions(+), 379 deletions(-) diff --git a/sopel/db.py b/sopel/db.py index eefc759d3e..d410c95b5d 100644 --- a/sopel/db.py +++ b/sopel/db.py @@ -1,3 +1,18 @@ +"""Sopel database module: management and tools around Sopel's datamodel. + +This module defines a datamodel, using `SQLAlchemy ORM's mapping`__: + +* :class:`NickIDs` and :class:`Nicknames` are used to track users +* :class:`NickValues` is used to store arbitrary values for users +* :class:`ChannelValues` is used to store arbitrary values for channels +* :class:`PluginValues` is used to store arbitrary values for plugins + +These models are made available through the :class:`SopelDB` class and its +convenience methods, such as :meth:`~SopelDB.get_nick_value` or +:meth:`~SopelDB.get_channel_value`. + +.. __: https://docs.sqlalchemy.org/en/14/orm/tutorial.html#declare-a-mapping +""" from __future__ import annotations import errno @@ -9,9 +24,9 @@ from sqlalchemy import Column, create_engine, ForeignKey, Integer, String from sqlalchemy.engine.url import make_url, URL -from sqlalchemy.exc import OperationalError, SQLAlchemyError -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import scoped_session, sessionmaker +from sqlalchemy.exc import OperationalError +from sqlalchemy.orm import declarative_base, scoped_session, sessionmaker +from sqlalchemy.sql import delete, func, select, update from sopel.tools import deprecated from sopel.tools.identifiers import Identifier @@ -94,8 +109,8 @@ class SopelDB: :type: Callable[[:class:`str`], :class:`str`] This defines a simplified interface for basic, common operations on the - bot's database. Direct access to the database is also available, to serve - more complex plugins' needs. + bot's database. Direct access to the database is also available through + its :attr:`engine` attribute, to serve more complex plugins' needs. When configured to use SQLite with a relative filename, the file is assumed to be in the directory named by the core setting ``homedir``. @@ -115,6 +130,12 @@ class SopelDB: :class:`~sopel.tools.identifiers.Identifier` when dealing with Nick or Channel names. + .. seealso:: + + For any advanced usage of the ORM, refer to the + `SQLAlchemy documentation`__. + + .. __: https://docs.sqlalchemy.org/en/14/ """ def __init__( @@ -185,6 +206,29 @@ def __init__( database=db_name, query=query) self.engine = create_engine(self.url, pool_recycle=3600) + """SQLAlchemy Engine used to connect to Sopel's database. + + .. seealso:: + + Read `SQLAlchemy engine`__'s documentation to know how to use it. + + .. __: https://docs.sqlalchemy.org/en/14/core/connections.html + + .. important:: + + Introduced in Sopel 7, Sopel uses SQLAlchemy 1.4+. This version of + SQLAlchemy deprecates various behaviors and methods, to prepare the + migration to its future 2.0 version, and the new 2.x style. + + Sopel doesn't enforce the new 2.x style yet. This will be modified + in Sopel 9 by using the ``future=True`` flag on the engine. + + You can read more about the `migration guide from 1.x to 2.x`__, as + Sopel will ensure in a future version that it is compatible with + the new style. + + .. __: https://docs.sqlalchemy.org/en/14/changelog/migration_20.html + """ # Catch any errors connecting to database try: @@ -196,7 +240,8 @@ def __init__( # Create our tables BASE.metadata.create_all(self.engine) - self.ssession = scoped_session(sessionmaker(bind=self.engine)) + self.ssession = scoped_session( + sessionmaker(bind=self.engine, future=True)) def connect(self): """Get a direct database connection. @@ -250,6 +295,7 @@ def session(self): """ return self.ssession() + @deprecated('Use SopelDB.engine directly', version='8.0', removed_in='9.0') def execute(self, *args, **kwargs): """Execute an arbitrary SQL query against the database. @@ -258,10 +304,37 @@ def execute(self, *args, **kwargs): The ``Result`` object returned is a wrapper around a ``Cursor`` object as specified by :pep:`249`. + + .. deprecated:: 8.0 + + This method will be removed in Sopel 9, following the deprecation + of SQLAlchemy's :meth:`sqlalchemy.engine.Engine.execute`. + + To perform a raw SQL query, use the :class:`~SopelDB.engine` + attribute as per the migration guide from SQLAlchemy:: + + from sqlalchemy.sql import text + + def my_command(bot, trigger): + raw_sql = ' ... ' # your raw SQL + # get a connection as a context manager + with bot.db.engine.connect() as conn: + res = conn.execute(text(raw_sql)) + data = res.fetchall() + + # do something with your data here + + .. seealso:: + + Read the `migration guide from 1.x style to 2.x style`__ by + SQLAlchemy to learn more about using SQLALchemy's engine and + connection. + + .. __: https://docs.sqlalchemy.org/en/14/changelog/migration_20.html """ return self.engine.execute(*args, **kwargs) - def get_uri(self): + def get_uri(self) -> URL: """Return a direct URL for the database. :return: the database connection URI @@ -298,18 +371,19 @@ def get_nick_id(self, nick: str, create: bool = False) -> int: :meth:`forget_nick_group`. """ - session = self.ssession() slug = self.make_identifier(nick).lower() - try: - nickname = session.query(Nicknames) \ - .filter(Nicknames.slug == slug) \ - .one_or_none() + with self.session() as session: + nickname = session.execute( + select(Nicknames).where(Nicknames.slug == slug) + ).scalar_one_or_none() if nickname is None: # see if it needs case-mapping migration - nickname = session.query(Nicknames) \ - .filter(Nicknames.slug == Identifier._lower_swapped(nick)) \ - .one_or_none() + nickname = session.execute( + select(Nicknames) + .where(Nicknames.slug == Identifier._lower_swapped(nick)) + ).scalar_one_or_none() + if nickname is not None: # it does! nickname.slug = slug @@ -332,11 +406,6 @@ def get_nick_id(self, nick: str, create: bool = False) -> int: session.add(nickname) session.commit() return nickname.nick_id - except SQLAlchemyError: - session.rollback() - raise - finally: - self.ssession.remove() def alias_nick(self, nick: str, alias: str) -> None: """Create an alias for a nick. @@ -356,12 +425,12 @@ def alias_nick(self, nick: str, alias: str) -> None: """ slug = self.make_identifier(alias).lower() nick_id = self.get_nick_id(nick, create=True) - session = self.ssession() - try: - result = session.query(Nicknames) \ - .filter(Nicknames.slug == slug) \ - .filter(Nicknames.canonical == alias) \ - .one_or_none() + with self.session() as session: + result = session.execute( + select(Nicknames) + .where(Nicknames.slug == slug) + .where(Nicknames.canonical == alias) + ).scalar_one_or_none() if result: raise ValueError('Alias already exists.') nickname = Nicknames( @@ -371,11 +440,6 @@ def alias_nick(self, nick: str, alias: str) -> None: ) session.add(nickname) session.commit() - except SQLAlchemyError: - session.rollback() - raise - finally: - self.ssession.remove() def set_nick_value(self, nick: str, key: str, value: typing.Any) -> None: """Set or update a value in the key-value store for ``nick``. @@ -400,12 +464,13 @@ def set_nick_value(self, nick: str, key: str, value: typing.Any) -> None: """ value = json.dumps(value, ensure_ascii=False) nick_id = self.get_nick_id(nick, create=True) - session = self.ssession() - try: - result = session.query(NickValues) \ - .filter(NickValues.nick_id == nick_id) \ - .filter(NickValues.key == key) \ - .one_or_none() + with self.session() as session: + result = session.execute( + select(NickValues) + .where(NickValues.nick_id == nick_id) + .where(NickValues.key == key) + ).scalar_one_or_none() + # NickValue exists, update if result: result.value = value @@ -419,11 +484,6 @@ def set_nick_value(self, nick: str, key: str, value: typing.Any) -> None: ) session.add(new_nickvalue) session.commit() - except SQLAlchemyError: - session.rollback() - raise - finally: - self.ssession.remove() def delete_nick_value(self, nick: str, key: str) -> None: """Delete a value from the key-value store for ``nick``. @@ -446,21 +506,16 @@ def delete_nick_value(self, nick: str, key: str) -> None: # there's nothing to do if the nick doesn't exist return - session = self.ssession() - try: - result = session.query(NickValues) \ - .filter(NickValues.nick_id == nick_id) \ - .filter(NickValues.key == key) \ - .one_or_none() + with self.session() as session: + result = session.execute( + select(NickValues) + .where(NickValues.nick_id == nick_id) + .where(NickValues.key == key) + ).scalar_one_or_none() # NickValue exists, delete if result: session.delete(result) session.commit() - except SQLAlchemyError: - session.rollback() - raise - finally: - self.ssession.remove() def get_nick_value( self, @@ -490,23 +545,20 @@ def get_nick_value( """ slug = self.make_identifier(nick).lower() - session = self.ssession() - try: - result = session.query(NickValues) \ - .filter(Nicknames.nick_id == NickValues.nick_id) \ - .filter(Nicknames.slug == slug) \ - .filter(NickValues.key == key) \ - .one_or_none() + with self.session() as session: + result = session.execute( + select(NickValues) + .where(Nicknames.nick_id == NickValues.nick_id) + .where(Nicknames.slug == slug) + .where(NickValues.key == key) + ).scalar_one_or_none() + if result is not None: result = result.value elif default is not None: result = default + return _deserialize(result) - except SQLAlchemyError: - session.rollback() - raise - finally: - self.ssession.remove() def unalias_nick(self, alias: str) -> None: """Remove an alias. @@ -525,20 +577,19 @@ def unalias_nick(self, alias: str) -> None: """ slug = self.make_identifier(alias).lower() nick_id = self.get_nick_id(alias) - session = self.ssession() - try: - count = session.query(Nicknames) \ - .filter(Nicknames.nick_id == nick_id) \ - .count() + with self.session() as session: + count = session.scalar( + select(func.count()).select_from(Nicknames) + .where(Nicknames.nick_id == nick_id) + ) if count <= 1: raise ValueError('Given alias is the only entry in its group.') - session.query(Nicknames).filter(Nicknames.slug == slug).delete() + session.execute( + delete(Nicknames) + .where(Nicknames.slug == slug) + .execution_options(synchronize_session="fetch") + ) session.commit() - except SQLAlchemyError: - session.rollback() - raise - finally: - self.ssession.remove() def forget_nick_group(self, nick: str) -> None: """Remove a nickname, all of its aliases, and all of its stored values. @@ -554,16 +605,18 @@ def forget_nick_group(self, nick: str) -> None: """ nick_id = self.get_nick_id(nick) - session = self.ssession() - try: - session.query(Nicknames).filter(Nicknames.nick_id == nick_id).delete() - session.query(NickValues).filter(NickValues.nick_id == nick_id).delete() + with self.session() as session: + session.execute( + delete(Nicknames) + .where(Nicknames.nick_id == nick_id) + .execution_options(synchronize_session="fetch") + ) + session.execute( + delete(NickValues) + .where(NickValues.nick_id == nick_id) + .execution_options(synchronize_session="fetch") + ) session.commit() - except SQLAlchemyError: - session.rollback() - raise - finally: - self.ssession.remove() @deprecated( version='8.0', @@ -595,28 +648,36 @@ def merge_nick_groups(self, first_nick: str, second_nick: str): """ first_id = self.get_nick_id(first_nick, create=True) second_id = self.get_nick_id(second_nick, create=True) - session = self.ssession() - try: + with self.session() as session: # Get second_id's values - res = session.query(NickValues).filter(NickValues.nick_id == second_id).all() + results = session.execute( + select(NickValues).where(NickValues.nick_id == second_id) + ).scalars() + # Update first_id with second_id values if first_id doesn't have that key - for row in res: - first_res = session.query(NickValues) \ - .filter(NickValues.nick_id == first_id) \ - .filter(NickValues.key == row.key) \ - .one_or_none() + for row in results: + first_res = session.execute( + select(NickValues) + .where(NickValues.nick_id == first_id) + .where(NickValues.key == row.key) + ).scalar_one_or_none() + if not first_res: - self.set_nick_value(first_nick, row.key, _deserialize(row.value)) - session.query(NickValues).filter(NickValues.nick_id == second_id).delete() - session.query(Nicknames) \ - .filter(Nicknames.nick_id == second_id) \ - .update({'nick_id': first_id}) + self.set_nick_value( + first_nick, row.key, _deserialize(row.value)) + + session.execute( + delete(NickValues) + .where(NickValues.nick_id == second_id) + .execution_options(synchronize_session="fetch") + ) + session.execute( + update(Nicknames) + .where(Nicknames.nick_id == second_id) + .values(nick_id=first_id) + .execution_options(synchronize_session="fetch") + ) session.commit() - except SQLAlchemyError: - session.rollback() - raise - finally: - self.ssession.remove() # CHANNEL FUNCTIONS @@ -632,28 +693,18 @@ def get_channel_slug(self, chan: str) -> str: different clients and/or servers on the network. """ slug = self.make_identifier(chan).lower() - session = self.ssession() - try: - count = session.query(ChannelValues) \ - .filter(ChannelValues.channel == slug) \ - .count() - if count == 0: - # see if it needs case-mapping migration - old_rows = session.query(ChannelValues) \ - .filter(ChannelValues.channel == Identifier._lower_swapped(chan)) - old_count = old_rows.count() - if old_count > 0: - # it does! - old_rows.update({ChannelValues.channel: slug}) - session.commit() + with self.session() as session: + # Always migrate from old casemapping + session.execute( + update(ChannelValues) + .where(ChannelValues.channel == Identifier._lower_swapped(chan)) + .values(channel=slug) + .execution_options(synchronize_session="fetch") + ) + session.commit() - return slug - except SQLAlchemyError: - session.rollback() - raise - finally: - self.ssession.remove() + return slug def set_channel_value( self, @@ -683,12 +734,13 @@ def set_channel_value( """ channel = self.get_channel_slug(channel) value = json.dumps(value, ensure_ascii=False) - session = self.ssession() - try: - result = session.query(ChannelValues) \ - .filter(ChannelValues.channel == channel)\ - .filter(ChannelValues.key == key) \ - .one_or_none() + with self.session() as session: + result = session.execute( + select(ChannelValues) + .where(ChannelValues.channel == channel) + .where(ChannelValues.key == key) + ).scalar_one_or_none() + # ChannelValue exists, update if result: result.value = value @@ -702,11 +754,6 @@ def set_channel_value( ) session.add(new_channelvalue) session.commit() - except SQLAlchemyError: - session.rollback() - raise - finally: - self.ssession.remove() def delete_channel_value(self, channel: str, key: str) -> None: """Delete a value from the key-value store for ``channel``. @@ -724,21 +771,15 @@ def delete_channel_value(self, channel: str, key: str) -> None: """ channel = self.get_channel_slug(channel) - session = self.ssession() - try: - result = session.query(ChannelValues) \ - .filter(ChannelValues.channel == channel)\ - .filter(ChannelValues.key == key) \ - .one_or_none() - # ChannelValue exists, delete - if result: - session.delete(result) - session.commit() - except SQLAlchemyError: - session.rollback() - raise - finally: - self.ssession.remove() + with self.session() as session: + session.execute( + delete(ChannelValues) + .where( + ChannelValues.channel == channel, + ChannelValues.key == key + ).execution_options(synchronize_session="fetch") + ) + session.commit() def get_channel_value( self, @@ -768,22 +809,17 @@ def get_channel_value( """ channel = self.get_channel_slug(channel) - session = self.ssession() - try: - result = session.query(ChannelValues) \ - .filter(ChannelValues.channel == channel)\ - .filter(ChannelValues.key == key) \ - .one_or_none() + with self.session() as session: + result = session.execute( + select(ChannelValues) + .where(ChannelValues.channel == channel) + .where(ChannelValues.key == key) + ).scalar_one_or_none() if result is not None: result = result.value elif default is not None: result = default return _deserialize(result) - except SQLAlchemyError: - session.rollback() - raise - finally: - self.ssession.remove() def forget_channel(self, channel: str) -> None: """Remove all of a channel's stored values. @@ -797,15 +833,12 @@ def forget_channel(self, channel: str) -> None: """ channel = self.get_channel_slug(channel) - session = self.ssession() - try: - session.query(ChannelValues).filter(ChannelValues.channel == channel).delete() + with self.session() as session: + session.execute( + delete(ChannelValues) + .where(ChannelValues.channel == channel) + ) session.commit() - except SQLAlchemyError: - session.rollback() - raise - finally: - self.ssession.remove() # PLUGIN FUNCTIONS @@ -837,12 +870,12 @@ def set_plugin_value( """ plugin = plugin.lower() value = json.dumps(value, ensure_ascii=False) - session = self.ssession() - try: - result = session.query(PluginValues) \ - .filter(PluginValues.plugin == plugin)\ - .filter(PluginValues.key == key) \ - .one_or_none() + with self.session() as session: + result = session.execute( + select(PluginValues) + .where(PluginValues.plugin == plugin) + .where(PluginValues.key == key) + ).scalar_one_or_none() # PluginValue exists, update if result: result.value = value @@ -852,11 +885,6 @@ def set_plugin_value( new_pluginvalue = PluginValues(plugin=plugin, key=key, value=value) session.add(new_pluginvalue) session.commit() - except SQLAlchemyError: - session.rollback() - raise - finally: - self.ssession.remove() def delete_plugin_value(self, plugin: str, key: str) -> None: """Delete a value from the key-value store for ``plugin``. @@ -874,21 +902,16 @@ def delete_plugin_value(self, plugin: str, key: str) -> None: """ plugin = plugin.lower() - session = self.ssession() - try: - result = session.query(PluginValues) \ - .filter(PluginValues.plugin == plugin)\ - .filter(PluginValues.key == key) \ - .one_or_none() + with self.session() as session: + result = session.execute( + select(PluginValues) + .where(PluginValues.plugin == plugin) + .where(PluginValues.key == key) + ).scalar_one_or_none() # PluginValue exists, update if result: session.delete(result) session.commit() - except SQLAlchemyError: - session.rollback() - raise - finally: - self.ssession.remove() def get_plugin_value( self, @@ -918,22 +941,18 @@ def get_plugin_value( """ plugin = plugin.lower() - session = self.ssession() - try: - result = session.query(PluginValues) \ - .filter(PluginValues.plugin == plugin)\ - .filter(PluginValues.key == key) \ - .one_or_none() + with self.session() as session: + result = session.execute( + select(PluginValues) + .where(PluginValues.plugin == plugin) + .where(PluginValues.key == key) + ).scalar_one_or_none() + if result is not None: result = result.value elif default is not None: result = default return _deserialize(result) - except SQLAlchemyError: - session.rollback() - raise - finally: - self.ssession.remove() def forget_plugin(self, plugin: str) -> None: """Remove all of a plugin's stored values. @@ -947,15 +966,11 @@ def forget_plugin(self, plugin: str) -> None: """ plugin = plugin.lower() - session = self.ssession() - try: - session.query(PluginValues).filter(PluginValues.plugin == plugin).delete() + with self.session() as session: + session.execute( + delete(PluginValues).where(PluginValues.plugin == plugin) + ) session.commit() - except SQLAlchemyError: - session.rollback() - raise - finally: - self.ssession.remove() # NICK AND CHANNEL FUNCTIONS diff --git a/test/test_db.py b/test/test_db.py index 18dfc8e2e0..e13ef3a6e1 100644 --- a/test/test_db.py +++ b/test/test_db.py @@ -6,10 +6,10 @@ from __future__ import annotations import json -import os -import tempfile import pytest +from sqlalchemy.engine import make_url +from sqlalchemy.sql import func, select, text from sopel.db import ( ChannelValues, @@ -22,9 +22,6 @@ from sopel.tools import Identifier -db_filename = tempfile.mkstemp()[1] - - TMP_CONFIG = """ [core] owner = Embolalia @@ -33,22 +30,57 @@ @pytest.fixture -def db(configfactory): - content = TMP_CONFIG.format(db_filename=db_filename) - settings = configfactory('default.cfg', content) - db = SopelDB(settings) +def tmpconfig(configfactory, tmpdir): + content = TMP_CONFIG.format(db_filename=tmpdir.join('test.sqlite')) + return configfactory('default.cfg', content) + + +@pytest.fixture +def db(tmpconfig): + db = SopelDB(tmpconfig) # TODO add tests to ensure db creation works properly, too. return db -def teardown_function(function): - os.remove(db_filename) +# Test execute + +def test_execute(db: SopelDB): + # todo: remove in Sopel 8.1 + results = db.execute('SELECT * FROM nicknames') + assert results.fetchall() == [] + + results = db.execute(text('SELECT * FROM nicknames')) + assert results.fetchall() == [] + +# Test connect -def test_get_nick_id(db): +def test_connect(db: SopelDB): + """Test it's possible to get a raw connection and to use it properly.""" + nick_id = db.get_nick_id('Exirel', create=True) + connection = db.connect() + + try: + cursor_obj = connection.cursor() + cursor_obj.execute("SELECT nick_id, canonical, slug FROM nicknames") + results = cursor_obj.fetchall() + cursor_obj.close() + assert results == [(nick_id, 'Exirel', 'exirel')] + finally: + connection.close() + + +# Test url + +def test_get_uri(db: SopelDB, tmpconfig): + assert db.get_uri() == make_url('sqlite:///' + tmpconfig.core.db_filename) + + +# Test nick value + +def test_get_nick_id(db: SopelDB): """Test get_nick_id does not create NickID by default.""" nick = Identifier('Exirel') - session = db.session() # Attempt to get nick ID: it is not created by default with pytest.raises(ValueError): @@ -58,14 +90,13 @@ def test_get_nick_id(db): nick_id = db.get_nick_id(nick, create=True) # Check that one and only one nickname exists with that ID - nickname = session.query(Nicknames).filter( - Nicknames.nick_id == nick_id, - ).one() # will raise if not one and exactly one + with db.session() as session: + nickname = session.execute( + select(Nicknames).where(Nicknames.nick_id == nick_id) + ).scalar_one() # will raise if not one and exactly one assert nickname.canonical == 'Exirel' assert nickname.slug == nick.lower() - session.close() - @pytest.mark.parametrize('name, slug, variant', ( # Check case insensitive with ASCII only @@ -75,17 +106,18 @@ def test_get_nick_id(db): # Unicode, just in case ('EmbölaliÅ', 'embölaliÅ', 'EMBöLALIÅ'), )) -def test_get_nick_id_casemapping(db, name, slug, variant): +def test_get_nick_id_casemapping(db: SopelDB, name, slug, variant): """Test get_nick_id is case-insensitive through an Identifier.""" - session = db.session() nick = Identifier(name) # Create the nick ID nick_id = db.get_nick_id(nick, create=True) - registered = session.query(Nicknames) \ - .filter(Nicknames.canonical == name) \ - .all() + with db.session() as session: + registered = session.execute( + select(Nicknames).where(Nicknames.canonical == name) + ).scalars().fetchall() + assert len(registered) == 1 assert registered[0].slug == slug assert registered[0].canonical == name @@ -99,17 +131,56 @@ def test_get_nick_id_casemapping(db, name, slug, variant): # And no other nick IDs are created (even with create=True) assert nick_id == db.get_nick_id(name, create=True) assert nick_id == db.get_nick_id(variant, create=True) - assert 1 == session.query(NickIDs).count() + + with db.session() as session: + assert 1 == session.scalar( + select(func.count()).select_from(NickIDs) + ) # But a truly different name means a new nick ID new_nick_id = db.get_nick_id(name + '_test', create=True) assert new_nick_id != nick_id - assert 2 == session.query(NickIDs).count() - session.close() + with db.session() as session: + assert 2 == session.scalar( + select(func.count()).select_from(NickIDs) + ) + + +def test_get_nick_id_migration(db: SopelDB): + """Test nicks with wrong casemapping are properly migrated.""" + nick = 'Test[User]' + old_nick = Identifier._lower_swapped(nick) + + # sanity check + assert Identifier(nick).lower() != old_nick, ( + 'Previous casemapping should be different from the new one') + + # insert old version + with db.session() as session: + nickname = Nicknames( + nick_id=42, + slug=Identifier._lower_swapped(nick), + canonical=nick, + ) + session.add(nickname) + session.commit() + + assert db.get_nick_id(nick) == 42, 'Old nick must be converted.' + with db.session() as session: + nicknames = session.execute( + select(Nicknames) + ).scalars().fetchall() + assert len(nicknames) == 1, ( + 'There should be only one instance of Nicknames.') + nickname_found = nicknames[0] + assert nickname_found.nick_id == 42 + assert nickname_found.slug == Identifier(nick).lower() + assert nickname_found.canonical == nick -def test_alias_nick(db): + +def test_alias_nick(db: SopelDB): nick = 'Embolalia' aliases = ['EmbölaliÅ', 'Embo`work', 'Embo'] @@ -129,105 +200,154 @@ def test_alias_nick(db): db.alias_nick(nick, nick) -def test_set_nick_value(db): - session = db.ssession() - nick = 'Embolalia' - data = { - 'key': 'value', - 'number_key': 1234, - 'unicode': 'EmbölaliÅ', - } - - def check(): - for key, value in data.items(): - db.set_nick_value(nick, key, value) - - # no `create` because that should be handled in `set_nick_value()` - nick_id = db.get_nick_id(nick) - - for key, value in data.items(): - found_value = session.query(NickValues.value) \ - .filter(NickValues.nick_id == nick_id) \ - .filter(NickValues.key == key) \ - .scalar() - assert json.loads(str(found_value)) == value - - return nick_id - - nid = check() - - # Test updates - data['number_key'] = 'not a number anymore!' - data['unicode'] = 'This is different toö!' - assert nid == check() - session.close() +@pytest.mark.parametrize('value', ( + 'string-value', + 123456789, + 'unicode-välûé', + ['structured', 'value'], +)) +def test_set_nick_value(db: SopelDB, value): + nick = 'TestNick' + db.set_nick_value(nick, 'testkey', value) + assert db.get_nick_value(nick, 'testkey') == value, ( + 'The value retrieved must be exactly what was stored.') -def test_get_nick_value(db): - session = db.ssession() - nick = 'Embolalia' - nick_id = db.get_nick_id(nick, create=True) - data = { - 'key': 'value', - 'number_key': 1234, - 'unicode': 'EmbölaliÅ', - } - - for key, value in data.items(): - nv = NickValues(nick_id=nick_id, key=key, value=json.dumps(value, ensure_ascii=False)) - session.add(nv) - session.commit() +def test_set_nick_value_update(db: SopelDB): + """Test set_nick_value can update an existing value.""" + db.set_nick_value('TestNick', 'testkey', 'first-value') + db.set_nick_value('TestNick', 'otherkey', 'other-value') + db.set_nick_value('OtherNick', 'testkey', 'other-nick-value') - for key, value in data.items(): - found_value = db.get_nick_value(nick, key) - assert found_value == value - session.close() + # sanity check: ensure every (nick, key, value) is correct + assert db.get_nick_value('TestNick', 'testkey') == 'first-value' + assert db.get_nick_value('TestNick', 'otherkey') == 'other-value' + assert db.get_nick_value('OtherNick', 'testkey') == 'other-nick-value' + # update only one key + db.set_nick_value('TestNick', 'testkey', 'new-value') -def test_get_nick_value_default(db): - assert db.get_nick_value("TestUser", "DoesntExist") is None - assert db.get_nick_value("TestUser", "DoesntExist", "MyDefault") == "MyDefault" + # check new value while old values are still the same + assert db.get_nick_value('TestNick', 'testkey') == 'new-value' + assert db.get_nick_value('TestNick', 'otherkey') == 'other-value' + assert db.get_nick_value('OtherNick', 'testkey') == 'other-nick-value' -def test_delete_nick_value(db): - nick = 'Embolalia' - db.set_nick_value(nick, 'wasd', 'uldr') - assert db.get_nick_value(nick, 'wasd') == 'uldr' - db.delete_nick_value(nick, 'wasd') - assert db.get_nick_value(nick, 'wasd') is None +def test_delete_nick_value(db: SopelDB): + nick = 'TestUser' + db.set_nick_value(nick, 'testkey', 'test-value') + # sanity check + assert db.get_nick_value(nick, 'testkey') == 'test-value', ( + 'Check set_nick_value: this key must contain the correct value.') -def test_unalias_nick(db): - session = db.ssession() + # delete key + db.delete_nick_value(nick, 'testkey') + assert db.get_nick_value(nick, 'testkey') is None + + +def test_delete_nick_value_none(db: SopelDB): + """Test method doesn't raise an error when there is nothing to delete.""" + nick = 'TestUser' + + # this user doesn't even exist + db.delete_nick_value(nick, 'testkey') + assert db.get_nick_value(nick, 'testkey') is None, ( + 'Trying to delete a key must not create it.') + + # create a key + db.set_nick_value(nick, 'otherkey', 'value') + + # delete another key for that user + db.delete_nick_value(nick, 'testkey') + assert db.get_nick_value(nick, 'testkey') is None, ( + 'Trying to delete a key must not create it.') + + # the nick still exists, and its key as well + assert db.get_nick_value(nick, 'otherkey') == 'value', ( + 'This key must not be deleted by error.') + + +@pytest.mark.parametrize('value', ( + 'string-value', + 123456789, + 'unicode-välûé', + ['structured', 'value'], +)) +def test_get_nick_value(db: SopelDB, value): + nick = 'TestUser' + nick_id = db.get_nick_id(nick, create=True) + + with db.session() as session: + nick_value = NickValues( + nick_id=nick_id, + key='testkey', + value=json.dumps(value, ensure_ascii=False), + ) + session.add(nick_value) + session.commit() + + assert db.get_nick_value(nick, 'testkey') == value + assert db.get_nick_value(nick, 'otherkey') is None + assert db.get_nick_value('NotTestUser', 'testkey') is None, ( + 'This key must be defined for TestUser only.') + + +def test_get_nick_value_default(db: SopelDB): + assert db.get_nick_value("TestUser", "nokey") is None + assert db.get_nick_value("TestUser", "nokey", "default") == "default" + + +def test_unalias_nick(db: SopelDB): nick = 'Embolalia' nick_id = 42 - nn = Nicknames(nick_id=nick_id, slug=Identifier(nick).lower(), canonical=nick) - session.add(nn) - session.commit() - - aliases = ['EmbölaliÅ', 'Embo`work', 'Embo'] - for alias in aliases: - nn = Nicknames(nick_id=nick_id, slug=Identifier(alias).lower(), canonical=alias) + with db.session() as session: + nn = Nicknames( + nick_id=nick_id, + slug=Identifier(nick).lower(), + canonical=nick, + ) session.add(nn) session.commit() + aliases = ['EmbölaliÅ', 'Embo`work', 'Embo'] + with db.session() as session: + for alias in aliases: + nn = Nicknames( + nick_id=nick_id, + slug=Identifier(alias).lower(), + canonical=alias, + ) + session.add(nn) + session.commit() + for alias in aliases: db.unalias_nick(alias) - for alias in aliases: - found = session.query(Nicknames) \ - .filter(Nicknames.nick_id == nick_id) \ - .all() - assert len(found) == 1 + with db.session() as session: + found = session.scalar( + select(func.count()) + .select_from(Nicknames) + .where(Nicknames.nick_id == nick_id) + ) + assert found == 1 + +def test_unalias_nick_one_or_none(db: SopelDB): + # this will create the first version of the nick + db.get_nick_id('Exirel', create=True) + + # assert you can't unalias a unique nick with pytest.raises(ValueError): - db.unalias_nick('Mister_Bradshaw') + db.unalias_nick('Exirel') - session.close() + # and you can't either with a non-existing nick + with pytest.raises(ValueError): + db.unalias_nick('xnaas') -def test_forget_nick_group(db): +def test_forget_nick_group(db: SopelDB): session = db.ssession() aliases = ['Embolalia', 'Embo'] nick_id = 42 @@ -252,7 +372,7 @@ def test_forget_nick_group(db): session.close() -def test_merge_nick_groups(db): +def test_merge_nick_groups(db: SopelDB): session = db.ssession() aliases = ['Embolalia', 'Embo'] for nick_id, alias in enumerate(aliases): @@ -283,84 +403,143 @@ def test_merge_nick_groups(db): session.close() -def test_set_channel_value(db): - session = db.ssession() - db.set_channel_value('#asdf', 'qwer', 'zxcv') - result = session.query(ChannelValues.value) \ - .filter(ChannelValues.channel == '#asdf') \ - .filter(ChannelValues.key == 'qwer') \ - .scalar() - assert result == '"zxcv"' - session.close() +# Test channel value +def test_get_channel_slug(db: SopelDB): + assert db.get_channel_slug('#channel') == '#channel' + assert db.get_channel_slug('#CHANNEL') == '#channel' + assert db.get_channel_slug('#[channel]') == '#{channel}', ( + 'Default casemapping should be rfc-1459') -def test_delete_channel_value(db): - db.set_channel_value('#asdf', 'wasd', 'uldr') - assert db.get_channel_value('#asdf', 'wasd') == 'uldr' - db.delete_channel_value('#asdf', 'wasd') - assert db.get_channel_value('#asdf', 'wasd') is None +def test_get_channel_slug_with_migration(db: SopelDB): + channel = db.make_identifier('#[channel]') + db.set_channel_value(channel, 'testkey', 'cval') + assert db.get_channel_slug(channel) == channel.lower() + assert db.get_channel_value(channel, 'testkey') == 'cval' -def test_get_channel_value(db): - session = db.ssession() + # insert a value with the wrong casemapping + old_channel = Identifier._lower_swapped('#[channel]') + assert old_channel == '#[channel]' + assert channel.lower() == '#{channel}' - cv = ChannelValues(channel='#asdf', key='qwer', value='\"zxcv\"') - session.add(cv) - session.commit() + with db.session() as session: + channel_value = ChannelValues( + channel=old_channel, + key='oldkey', + value='"value"' # result from json.dumps + ) + session.add(channel_value) + session.commit() - result = db.get_channel_value('#asdf', 'qwer') - assert result == 'zxcv' - session.close() + assert db.get_channel_slug(old_channel) == channel.lower(), ( + 'Channel with previous casemapping must return the new version.') + assert db.get_channel_value(old_channel, 'oldkey') == 'value', ( + 'Key associated to an old version must be migrated to the new one') -def test_forget_channel(db): - db.set_channel_value('#testchan', 'wasd', 'uldr') - db.set_channel_value('#testchan', 'asdf', 'hjkl') - assert db.get_channel_value('#testchan', 'wasd') == 'uldr' - assert db.get_channel_value('#testchan', 'asdf') == 'hjkl' - db.forget_channel('#testchan') - assert db.get_channel_value('#testchan', 'wasd') is None - assert db.get_channel_value('#testchan', 'asdf') is None +def test_set_channel_value(db: SopelDB): + # set new value + db.set_channel_value('#channel', 'testkey', 'channel-value') + with db.session() as session: + result = session.query(ChannelValues.value) \ + .filter(ChannelValues.channel == '#channel') \ + .filter(ChannelValues.key == 'testkey') \ + .scalar() + assert result == '"channel-value"' -def test_get_channel_value_default(db): - assert db.get_channel_value("TestChan", "DoesntExist") is None - assert db.get_channel_value("TestChan", "DoesntExist", "MyDefault") == "MyDefault" + # update pre-existing value + db.set_channel_value('#channel', 'testkey', 'new_channel-value') + with db.session() as session: + result = session.query(ChannelValues.value) \ + .filter(ChannelValues.channel == '#channel') \ + .filter(ChannelValues.key == 'testkey') \ + .scalar() + assert result == '"new_channel-value"' -def test_get_nick_or_channel_value(db): - db.set_nick_value('asdf', 'qwer', 'poiu') - db.set_channel_value('#asdf', 'qwer', '/.,m') - assert db.get_nick_or_channel_value('asdf', 'qwer') == 'poiu' - assert db.get_nick_or_channel_value('#asdf', 'qwer') == '/.,m' +def test_delete_channel_value(db: SopelDB): + # assert you can delete a non-existing key (without error) + db.delete_channel_value('#channel', 'testkey') -def test_get_nick_or_channel_value_default(db): - assert db.get_nick_or_channel_value("Test", "DoesntExist") is None - assert db.get_nick_or_channel_value("Test", "DoesntExist", "MyDefault") == "MyDefault" + # assert you can delete an existing key + db.set_channel_value('#channel', 'testkey', 'channel-value') + db.delete_channel_value('#channel', 'testkey') + assert db.get_channel_value('#channel', 'testkey') is None -def test_get_preferred_value(db): - db.set_nick_value('asdf', 'qwer', 'poiu') - db.set_channel_value('#asdf', 'qwer', '/.,m') - db.set_channel_value('#asdf', 'lkjh', '1234') - names = ['asdf', '#asdf'] - assert db.get_preferred_value(names, 'qwer') == 'poiu' - assert db.get_preferred_value(names, 'lkjh') == '1234' +def test_get_channel_value(db: SopelDB): + with db.session() as session: + channel_value = ChannelValues( + channel='#channel', + key='testkey', + value='\"value\"', + ) + session.add(channel_value) + session.commit() + result = db.get_channel_value('#channel', 'testkey') + assert result == 'value' -def test_set_plugin_value(db): - session = db.ssession() - db.set_plugin_value('plugname', 'qwer', 'zxcv') - result = session.query(PluginValues.value) \ - .filter(PluginValues.plugin == 'plugname') \ - .filter(PluginValues.key == 'qwer') \ - .scalar() - assert result == '"zxcv"' - session.close() +def test_get_channel_value_default(db: SopelDB): + assert db.get_channel_value("#channel", "nokey") is None + assert db.get_channel_value("#channel", "nokey", "value") == "value" -def test_get_plugin_value(db): + +def test_forget_channel(db: SopelDB): + db.set_channel_value('#channel', 'testkey1', 'value1') + db.set_channel_value('#channel', 'testkey2', 'value2') + assert db.get_channel_value('#channel', 'testkey1') == 'value1' + assert db.get_channel_value('#channel', 'testkey2') == 'value2' + db.forget_channel('#channel') + assert db.get_channel_value('#channel', 'wasd') is None + assert db.get_channel_value('#channel', 'asdf') is None + + +# Test plugin value + +def test_set_plugin_value(db: SopelDB): + # set new value + db.set_plugin_value('plugname', 'qwer', 'zxcv') + with db.session() as session: + result = session.query(PluginValues.value) \ + .filter(PluginValues.plugin == 'plugname') \ + .filter(PluginValues.key == 'qwer') \ + .scalar() + assert result == '"zxcv"' + + # update pre-existing value + db.set_plugin_value('plugname', 'qwer', 'new_zxcv') + + with db.session() as session: + result = session.query(PluginValues.value) \ + .filter(PluginValues.plugin == 'plugname') \ + .filter(PluginValues.key == 'qwer') \ + .scalar() + assert result == '"new_zxcv"' + + +def test_delete_plugin_value(db: SopelDB): + db.set_plugin_value('plugin', 'testkey', 'todelete') + db.set_plugin_value('plugin', 'nodelete', 'tokeep') + assert db.get_plugin_value('plugin', 'testkey') == 'todelete' + assert db.get_plugin_value('plugin', 'nodelete') == 'tokeep' + db.delete_plugin_value('plugin', 'testkey') + assert db.get_plugin_value('plugin', 'testkey') is None + assert db.get_plugin_value('plugin', 'nodelete') == 'tokeep' + + +def test_delete_plugin_value_none(db: SopelDB): + """Test you can delete a key even if it is not defined""" + assert db.get_plugin_value('plugin', 'testkey') is None + db.delete_plugin_value('plugin', 'testkey') + assert db.get_plugin_value('plugin', 'testkey') is None + + +def test_get_plugin_value(db: SopelDB): session = db.ssession() pv = PluginValues(plugin='plugname', key='qwer', value='\"zxcv\"') @@ -372,23 +551,70 @@ def test_get_plugin_value(db): session.close() -def test_get_plugin_value_default(db): +def test_get_plugin_value_default(db: SopelDB): assert db.get_plugin_value("TestPlugin", "DoesntExist") is None assert db.get_plugin_value("TestPlugin", "DoesntExist", "MyDefault") == "MyDefault" -def test_delete_plugin_value(db): +def test_forget_plugin(db: SopelDB): db.set_plugin_value('plugin', 'wasd', 'uldr') + db.set_plugin_value('plugin', 'asdf', 'hjkl') assert db.get_plugin_value('plugin', 'wasd') == 'uldr' - db.delete_plugin_value('plugin', 'wasd') + assert db.get_plugin_value('plugin', 'asdf') == 'hjkl' + db.forget_plugin('plugin') assert db.get_plugin_value('plugin', 'wasd') is None + assert db.get_plugin_value('plugin', 'asdf') is None -def test_forget_plugin(db): - db.set_plugin_value('plugin', 'wasd', 'uldr') - db.set_plugin_value('plugin', 'asdf', 'hjkl') - assert db.get_plugin_value('plugin', 'wasd') == 'uldr' - assert db.get_plugin_value('plugin', 'asdf') == 'hjkl' +def test_forget_plugin_none(db: SopelDB): + """Test forget_plugin works even if there is nothing to forget.""" db.forget_plugin('plugin') assert db.get_plugin_value('plugin', 'wasd') is None assert db.get_plugin_value('plugin', 'asdf') is None + + +# Test nick & channel + +def test_get_nick_or_channel_value(db: SopelDB): + db.set_nick_value('asdf', 'qwer', 'poiu') + db.set_channel_value('#asdf', 'qwer', '/.,m') + assert db.get_nick_or_channel_value('asdf', 'qwer') == 'poiu' + assert db.get_nick_or_channel_value('#asdf', 'qwer') == '/.,m' + + +def test_get_nick_or_channel_value_identifier(db: SopelDB): + db.set_nick_value('testuser', 'testkey', 'user-value') + db.set_channel_value('#channel', 'testkey', 'channel-value') + + nick = Identifier('testuser') + channel = Identifier('#channel') + assert db.get_nick_or_channel_value(nick, 'testkey') == 'user-value' + assert db.get_nick_or_channel_value(nick, 'nokey') is None + assert db.get_nick_or_channel_value(nick, 'nokey', 'default') == 'default' + assert db.get_nick_or_channel_value(channel, 'testkey') == 'channel-value' + assert db.get_nick_or_channel_value( + channel, 'nokey', 'default' + ) == 'default' + + +def test_get_nick_or_channel_value_default(db: SopelDB): + assert db.get_nick_or_channel_value("Test", "DoesntExist") is None + assert db.get_nick_or_channel_value("Test", "DoesntExist", "MyDefault") == "MyDefault" + + +def test_get_preferred_value(db: SopelDB): + db.set_nick_value('asdf', 'qwer', 'poiu') + db.set_channel_value('#asdf', 'qwer', '/.,m') + db.set_channel_value('#asdf', 'lkjh', '1234') + names = ['asdf', '#asdf'] + assert db.get_preferred_value(names, 'qwer') == 'poiu' + assert db.get_preferred_value(names, 'lkjh') == '1234' + + +def test_get_preferred_value_none(db: SopelDB): + """Test method when there is no preferred value""" + db.set_nick_value('testuser', 'userkey', 'uservalue') + db.set_channel_value('#channel', 'channelkey', 'channelvalue') + names = ['notuser', '#notchannel'] + assert db.get_preferred_value(names, 'userkey') is None + assert db.get_preferred_value(names, 'channelkey') is None From d4189c27c2e858e6ae02c24e9281cf1cafa529bc Mon Sep 17 00:00:00 2001 From: Florian Strzelecki Date: Fri, 22 Apr 2022 19:29:32 +0200 Subject: [PATCH 2/2] test: replace nicks by Monty Python references As noted by dgw, it's better to replace Sopel's owner/dev/users nicks by more neutral references to Monty Python characters and actors. Is that a dead parrot? --- test/test_db.py | 62 ++++++++++++++++++++++++------------------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/test/test_db.py b/test/test_db.py index e13ef3a6e1..ce6525d44f 100644 --- a/test/test_db.py +++ b/test/test_db.py @@ -24,7 +24,7 @@ TMP_CONFIG = """ [core] -owner = Embolalia +owner = Pepperpots db_filename = {db_filename} """ @@ -57,7 +57,7 @@ def test_execute(db: SopelDB): def test_connect(db: SopelDB): """Test it's possible to get a raw connection and to use it properly.""" - nick_id = db.get_nick_id('Exirel', create=True) + nick_id = db.get_nick_id('MrEricPraline', create=True) connection = db.connect() try: @@ -65,7 +65,7 @@ def test_connect(db: SopelDB): cursor_obj.execute("SELECT nick_id, canonical, slug FROM nicknames") results = cursor_obj.fetchall() cursor_obj.close() - assert results == [(nick_id, 'Exirel', 'exirel')] + assert results == [(nick_id, 'MrEricPraline', 'mrericpraline')] finally: connection.close() @@ -80,7 +80,7 @@ def test_get_uri(db: SopelDB, tmpconfig): def test_get_nick_id(db: SopelDB): """Test get_nick_id does not create NickID by default.""" - nick = Identifier('Exirel') + nick = Identifier('MrEricPraline') # Attempt to get nick ID: it is not created by default with pytest.raises(ValueError): @@ -94,17 +94,17 @@ def test_get_nick_id(db: SopelDB): nickname = session.execute( select(Nicknames).where(Nicknames.nick_id == nick_id) ).scalar_one() # will raise if not one and exactly one - assert nickname.canonical == 'Exirel' + assert nickname.canonical == 'MrEricPraline' assert nickname.slug == nick.lower() @pytest.mark.parametrize('name, slug, variant', ( # Check case insensitive with ASCII only - ('Embolalia', 'embolalia', 'eMBOLALIA'), + ('MrEricPraline', 'mrericpraline', 'mRErICPraLINE'), # Ensures case conversion is handled properly ('[][]', '{}{}', '[}{]'), # Unicode, just in case - ('EmbölaliÅ', 'embölaliÅ', 'EMBöLALIÅ'), + ('MrÉrïcPrâliné', 'mrÉrïcprâliné', 'MRÉRïCPRâLINé'), )) def test_get_nick_id_casemapping(db: SopelDB, name, slug, variant): """Test get_nick_id is case-insensitive through an Identifier.""" @@ -181,8 +181,8 @@ def test_get_nick_id_migration(db: SopelDB): def test_alias_nick(db: SopelDB): - nick = 'Embolalia' - aliases = ['EmbölaliÅ', 'Embo`work', 'Embo'] + nick = 'MrEricPraline' + aliases = ['MrÉrïcPrâliné', 'John`Cleese', 'DeadParrot'] nick_id = db.get_nick_id(nick, create=True) for alias in aliases: @@ -207,7 +207,7 @@ def test_alias_nick(db: SopelDB): ['structured', 'value'], )) def test_set_nick_value(db: SopelDB, value): - nick = 'TestNick' + nick = 'Pepperpots' db.set_nick_value(nick, 'testkey', value) assert db.get_nick_value(nick, 'testkey') == value, ( 'The value retrieved must be exactly what was stored.') @@ -215,26 +215,26 @@ def test_set_nick_value(db: SopelDB, value): def test_set_nick_value_update(db: SopelDB): """Test set_nick_value can update an existing value.""" - db.set_nick_value('TestNick', 'testkey', 'first-value') - db.set_nick_value('TestNick', 'otherkey', 'other-value') - db.set_nick_value('OtherNick', 'testkey', 'other-nick-value') + db.set_nick_value('Pepperpots', 'testkey', 'first-value') + db.set_nick_value('Pepperpots', 'otherkey', 'other-value') + db.set_nick_value('Vikings', 'testkey', 'other-nick-value') # sanity check: ensure every (nick, key, value) is correct - assert db.get_nick_value('TestNick', 'testkey') == 'first-value' - assert db.get_nick_value('TestNick', 'otherkey') == 'other-value' - assert db.get_nick_value('OtherNick', 'testkey') == 'other-nick-value' + assert db.get_nick_value('Pepperpots', 'testkey') == 'first-value' + assert db.get_nick_value('Pepperpots', 'otherkey') == 'other-value' + assert db.get_nick_value('Vikings', 'testkey') == 'other-nick-value' # update only one key - db.set_nick_value('TestNick', 'testkey', 'new-value') + db.set_nick_value('Pepperpots', 'testkey', 'new-value') # check new value while old values are still the same - assert db.get_nick_value('TestNick', 'testkey') == 'new-value' - assert db.get_nick_value('TestNick', 'otherkey') == 'other-value' - assert db.get_nick_value('OtherNick', 'testkey') == 'other-nick-value' + assert db.get_nick_value('Pepperpots', 'testkey') == 'new-value' + assert db.get_nick_value('Pepperpots', 'otherkey') == 'other-value' + assert db.get_nick_value('Vikings', 'testkey') == 'other-nick-value' def test_delete_nick_value(db: SopelDB): - nick = 'TestUser' + nick = 'TerryGilliam' db.set_nick_value(nick, 'testkey', 'test-value') # sanity check @@ -248,7 +248,7 @@ def test_delete_nick_value(db: SopelDB): def test_delete_nick_value_none(db: SopelDB): """Test method doesn't raise an error when there is nothing to delete.""" - nick = 'TestUser' + nick = 'TerryGilliam' # this user doesn't even exist db.delete_nick_value(nick, 'testkey') @@ -275,7 +275,7 @@ def test_delete_nick_value_none(db: SopelDB): ['structured', 'value'], )) def test_get_nick_value(db: SopelDB, value): - nick = 'TestUser' + nick = 'TerryGilliam' nick_id = db.get_nick_id(nick, create=True) with db.session() as session: @@ -290,12 +290,12 @@ def test_get_nick_value(db: SopelDB, value): assert db.get_nick_value(nick, 'testkey') == value assert db.get_nick_value(nick, 'otherkey') is None assert db.get_nick_value('NotTestUser', 'testkey') is None, ( - 'This key must be defined for TestUser only.') + 'This key must be defined for TerryGilliam only.') def test_get_nick_value_default(db: SopelDB): - assert db.get_nick_value("TestUser", "nokey") is None - assert db.get_nick_value("TestUser", "nokey", "default") == "default" + assert db.get_nick_value("TerryGilliam", "nokey") is None + assert db.get_nick_value("TerryGilliam", "nokey", "default") == "default" def test_unalias_nick(db: SopelDB): @@ -336,20 +336,20 @@ def test_unalias_nick(db: SopelDB): def test_unalias_nick_one_or_none(db: SopelDB): # this will create the first version of the nick - db.get_nick_id('Exirel', create=True) + db.get_nick_id('MrEricPraline', create=True) # assert you can't unalias a unique nick with pytest.raises(ValueError): - db.unalias_nick('Exirel') + db.unalias_nick('MrEricPraline') # and you can't either with a non-existing nick with pytest.raises(ValueError): - db.unalias_nick('xnaas') + db.unalias_nick('gumbys') def test_forget_nick_group(db: SopelDB): session = db.ssession() - aliases = ['Embolalia', 'Embo'] + aliases = ['MrEricPraline', 'Praline'] nick_id = 42 for alias in aliases: nn = Nicknames(nick_id=nick_id, slug=Identifier(alias).lower(), canonical=alias) @@ -374,7 +374,7 @@ def test_forget_nick_group(db: SopelDB): def test_merge_nick_groups(db: SopelDB): session = db.ssession() - aliases = ['Embolalia', 'Embo'] + aliases = ['MrEricPraline', 'Praline'] for nick_id, alias in enumerate(aliases): nn = Nicknames(nick_id=nick_id, slug=Identifier(alias).lower(), canonical=alias) session.add(nn)