Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolver parallelization #225

Merged
merged 6 commits into from
Feb 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def main():
cmd.execute(**kwargs)
if cmd.needs_session:
session.db.commit()
session.db.close()
session.close()


class CreateDb(Command):
Expand Down
12 changes: 10 additions & 2 deletions koschei/backend/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from koschei.config import get_config
from koschei.backend import koji_util
from koschei.backend.koji_util import itercall
from koschei.db import Session
from koschei.db import Session, get_engine
from koschei.models import (
Build, UnappliedChange, KojiTask, Package, PackageGroup,
PackageGroupRelation, BasePackage, Collection, RepoMapping, LogEntry,
Expand All @@ -43,6 +43,7 @@
class KoscheiBackendSession(KoscheiSession):
def __init__(self):
super(KoscheiBackendSession, self).__init__()
self._db_connection = None
self._db = None
self._koji_sessions = {}
self._repo_cache = None
Expand All @@ -53,9 +54,16 @@ def log_user_action(self, message, **kwargs):
@property
def db(self):
if self._db is None:
self._db = Session()
self._db_connection = get_engine().connect()
self._db = Session(bind=self._db_connection)
return self._db

def close(self):
if self._db:
self._db.close()
if self._db_connection:
self._db_connection.close()

@property
def build_from_repo_id(self):
return get_config('koji_config').get('build_from_repo_id')
Expand Down
1 change: 0 additions & 1 deletion koschei/backend/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ def run_service(self):
self.main()
finally:
self.db.rollback()
self.db.close()
self.memory_check()
self.notify_watchdog()
time.sleep(interval)
Expand Down
25 changes: 18 additions & 7 deletions koschei/backend/services/build_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from sqlalchemy.orm.exc import ObjectDeletedError, StaleDataError

from koschei.config import get_config
from koschei.locks import pg_session_lock, Locked, LOCK_BUILD_RESOLVER
from koschei.models import (
Collection, Package, AppliedChange, Build,
)
Expand All @@ -48,6 +49,8 @@ def main(self):
def process_builds(self, collection):
"""
Processes builds in a single collection.
Can be executed concurrently from multiple processes. One process
always processes one repo ID.
Commits the transaction in increments.
"""
builds = (
Expand All @@ -61,13 +64,21 @@ def process_builds(self, collection):

# Group by repo_id to speed up processing (reuse the sack)
for repo_id, builds_group in groupby(builds, lambda b: b.repo_id):
if repo_id is None:
for build in builds_group:
# Builds with no repo id cannot be resolved
self.process_unresolved_build(build)
else:
self.process_builds_with_repo_id(collection, repo_id, list(builds_group))
self.db.commit()
try:
with pg_session_lock(self.db, LOCK_BUILD_RESOLVER, repo_id, block=False):
if repo_id is None:
for build in builds_group:
# Builds with no repo id cannot be resolved
self.process_unresolved_build(build)
else:
self.process_builds_with_repo_id(
collection,
repo_id,
list(builds_group),
)
self.db.commit()
except Locked:
continue

def process_builds_with_repo_id(self, collection, repo_id, builds):
"""
Expand Down
2 changes: 0 additions & 2 deletions koschei/backend/services/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,8 @@ def main(self):
self.log.info('Polling Koji packages...')
backend.refresh_packages(self.session)
self.db.commit()
self.db.close()
plugin.dispatch_event('polling_event', self.session)
self.db.commit()
self.db.close()
self.log.info('Polling latest real builds...')
backend.refresh_latest_builds(self.session)
self.db.commit()
Expand Down
11 changes: 10 additions & 1 deletion koschei/backend/services/repo_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from koschei.backend import koji_util
from koschei.plugin import dispatch_event
from koschei.util import stopwatch
from koschei.locks import pg_session_lock, Locked, LOCK_REPO_RESOLVER
from koschei.models import (
Package, UnappliedChange, ResolutionProblem, BuildrootProblem, RepoMapping,
ResolutionChange, Collection,
Expand All @@ -56,7 +57,15 @@ class RepoGenerationException(Exception):
class RepoResolver(Resolver):
def main(self):
for collection in self.db.query(Collection).all():
self.process_repo(collection)
try:
with pg_session_lock(
self.db, LOCK_REPO_RESOLVER, collection.id, block=False
):
self.process_repo(collection)
self.db.commit()
except Locked:
# Locked by another process
continue

def process_repo(self, collection):
"""
Expand Down
13 changes: 11 additions & 2 deletions koschei/backend/services/resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from sqlalchemy.orm import undefer
from sqlalchemy.sql import insert
from sqlalchemy.exc import IntegrityError

from koschei import util
from koschei.config import get_config
Expand Down Expand Up @@ -65,7 +66,7 @@ def _compact(self):
del self.nevras[(victim.name, victim.epoch, victim.version,
victim.release, victim.arch)]

def get_or_create_nevra(self, db, nevra):
def _get_or_create_nevra(self, db, nevra):
dep = self.nevras.get(nevra)
if dep is None:
dep = db.query(*Dependency.inevra)\
Expand All @@ -87,10 +88,18 @@ def get_or_create_nevra(self, db, nevra):
self._access(dep)
return dep

def get_or_create_nevra(self, db, nevra):
try:
with db.begin_nested():
return self._get_or_create_nevra(db, nevra)
except IntegrityError:
# If there was a concurrent insert, the next query must succeed
return self._get_or_create_nevra(db, nevra)

def get_or_create_nevras(self, db, nevras):
res = []
for nevra in nevras:
res.append(self.get_or_create_nevra(db, nevra))
res.append(self._get_or_create_nevra(db, nevra))
return res

@stopwatch(total_time, note='dependency cache')
Expand Down
11 changes: 11 additions & 0 deletions koschei/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,17 @@ def as_record(self):


class KoscheiDbSession(sqlalchemy.orm.session.Session):
def __init__(self, connection=None, *args, **kwargs):
self._connection = connection
if connection:
kwargs['bind'] = connection
super(KoscheiDbSession, self).__init__(*args, **kwargs)

def close(self):
super(KoscheiDbSession, self).close()
if self._connection:
self._connection.close()

def bulk_insert(self, objects):
"""
Inserts ORM objects using sqla-core bulk insert. Only handles simple flat
Expand Down
100 changes: 100 additions & 0 deletions koschei/locks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Copyright (C) 2018 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Author: Michael Simacek <msimacek@redhat.com>

from contextlib import contextmanager

from sqlalchemy.sql import func


LOCK_REPO_RESOLVER = 1
LOCK_BUILD_RESOLVER = 2


class Locked(Exception):
pass


def pg_lock(db, namespace, key, block=True, transaction=False, shared=False):
"""
Lock an arbitrary resource identified by an integer key using PostgreSQL
advisory lock.

:param: db Database session
:param: namespace Integer namespace identifier. Should use one of the constants
defined in this module.
:param: key Integer identifier of the lock.
:param: block Whether to block waiting for the lock.
:param: transaction Whether the lock should be scoped by the transaction
(unlocks when the transaction ends). Otherwise scoped by the
session.
:param: shared Whether the lock should be shared. Otherwise it is
exclusive.

:raises: Locked if in non blocking mode and failed to obtain the lock.

Exact semantics are described in PostgreSQL documentation:
https://www.postgresql.org/docs/9.2/static/explicit-locking.html#ADVISORY-LOCKS
https://www.postgresql.org/docs/9.2/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
"""
fn_name = 'pg_'
if not block:
fn_name += 'try_'
fn_name += 'advisory_'
if transaction:
fn_name += 'xact_'
fn_name += 'lock'
if shared:
fn_name += '_shared'
function = getattr(func, fn_name)
res = db.query(function(namespace, key)).scalar()
if not block and not res:
raise Locked()


def pg_unlock(db, namespace, key, shared=False):
"""
Unlocks given advisory session lock. Arguments have the same meaning as in pg_lock
"""
fn_name = 'pg_advisory_unlock'
if shared:
fn_name += '_shared'
function = getattr(func, fn_name)
db.query(function(namespace, key)).one()


def pg_unlock_all(db):
"""
Unlocks advisory session locks.
"""
db.query(func.pg_advisory_unlock_all()).one()


@contextmanager
def pg_session_lock(db, namespace, key, block=True, shared=False):
"""
Context manager for obtaining a session lock.
With block=True (default) blocks until the resource is locked.
"""
locked = False
try:
pg_lock(db, namespace, key, block=block, shared=shared)
locked = True
yield
finally:
if locked:
pg_unlock(db, namespace, key, shared=shared)
1 change: 0 additions & 1 deletion koschei/plugins/fedmsg_plugin/backend/services/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,4 @@ def main(self):
plugin.dispatch_event('fedmsg_event', self.session, topic, msg)
finally:
self.db.rollback()
self.db.close()
self.memory_check()
3 changes: 3 additions & 0 deletions koschei/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,6 @@ def cache(self, cache_id):
cache.configure(**get_config('caching.' + cache_id))
self._caches[cache_id] = cache
return self._caches[cache_id]

def close(self):
pass
19 changes: 10 additions & 9 deletions test/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ def get_json_data(name):
return json.load(fo)


class KoscheiSessionMock(KoscheiBackendSession):
class KoscheiBackendSessionMock(KoscheiBackendSession):
def __init__(self):
super(KoscheiSessionMock, self).__init__()
super(KoscheiBackendSessionMock, self).__init__()
self.koji_mock = KojiMock()
self.sec_koji_mock = KojiMock()
self.repo_cache_mock = RepoCacheMock()
Expand Down Expand Up @@ -151,7 +151,7 @@ def __init__(self, *args, **kwargs):
dest_tag='f25', build_tag="f25-build", priority_coefficient=1.0,
latest_repo_resolved=True, latest_repo_id=123,
)
self.session = KoscheiSessionMock()
self.session = None

@classmethod
def setUpClass(cls):
Expand All @@ -162,13 +162,10 @@ def setUpClass(cls):
DBTest.init_postgres()
DBTest.postgres_initialized = True

def get_session(self):
return Session()

def setUp(self):
super(DBTest, self).setUp()
if not DBTest.postgres_initialized:
self.skipTest("requires PostgreSQL")
super(DBTest, self).setUp()
conn = get_engine().connect()
for table in Base.metadata.non_materialized_view_tables:
conn.execute(table.delete())
Expand All @@ -177,13 +174,17 @@ def setUp(self):
for materialized_view in Base.metadata.materialized_views:
materialized_view.refresh(conn)
conn.close()
self.db = self.session._db = self.get_session()
self.session = self.create_session()
self.db = self.session.db
self.db.add(self.collection)
self.db.commit()

def create_session(self):
return KoscheiBackendSessionMock()

def tearDown(self):
super(DBTest, self).tearDown()
self.db.close()
self.session.close()

def ensure_base_package(self, package):
if not package.base_id:
Expand Down
6 changes: 3 additions & 3 deletions test/frontend_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from functools import wraps

from koschei.frontend import app
from koschei.frontend.base import db
from koschei.frontend.base import KoscheiFrontendSession, db
from test.common import DBTest


Expand Down Expand Up @@ -57,8 +57,8 @@ def __init__(self, *args, **kwargs):
super(FrontendTest, self).__init__(*args, **kwargs)
self.user = None

def get_session(self):
return db
def create_session(self):
return KoscheiFrontendSession()

def setUp(self):
super(FrontendTest, self).setUp()
Expand Down
Loading