Skip to content

Commit

Permalink
Merge pull request buildbot#8192 from p12tic/fix-copydb-tests
Browse files Browse the repository at this point in the history
More fixes to copydb tests
  • Loading branch information
p12tic authored Nov 4, 2024
2 parents 595ea79 + fa4b310 commit 60e9fdc
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 27 deletions.
2 changes: 1 addition & 1 deletion master/buildbot/db/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def setServiceParent(self, p):

@defer.inlineCallbacks
def setup(self, check_version=True, verbose=True):
super().setup()
yield super().setup()
db_url = self.configured_url

log.msg(f"Setting up database with URL {util.stripUrlPassword(db_url)!r}")
Expand Down
24 changes: 22 additions & 2 deletions master/buildbot/scripts/copydb.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ def tdh_query_all_column_rows(conn, column):
ids.add(getattr(row, column.name))
return ids

got_error = False

def thd_write(conn):
max_column_id = 0

Expand Down Expand Up @@ -226,6 +228,16 @@ def thd_write(conn):

except queue.Empty:
continue
except Exception:
nonlocal got_error
got_error = True
# unblock queue
try:
rows_queue.get(timeout=1)
rows_queue.task_done()
except queue.Empty:
pass
raise

try:
written_count[0] += len(rows)
Expand All @@ -246,19 +258,27 @@ def thd_read(conn):
total_count[0] = conn.execute(q).scalar()

result = conn.execute(sa.select(table))
while True:
while not got_error:
chunk = result.fetchmany(10000)
if not chunk:
break
rows_queue.put(chunk)

rows_queue.put(None)

error: Exception | None = None
tasks = [src_db.pool.do(thd_read), dst_db.pool.do(thd_write)]
yield defer.gatherResults(tasks)
for d in tasks:
try:
yield d
except Exception as e:
error = e

rows_queue.join()

if error is not None:
raise error


@defer.inlineCallbacks
def _copy_database_with_db(src_db, dst_db, ignore_fk_error_rows, print_log):
Expand Down
6 changes: 3 additions & 3 deletions master/buildbot/test/unit/scripts/test_copydb.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class TestCopyDbRealDb(misc.StdoutAssertionsMixin, RunMasterBase, dirs.DirsMixin

def setUp(self):
self.setUpDirs('basedir')
# self.setUpStdoutAssertions() # comment out to see stdout from script
self.setUpStdoutAssertions() # comment out to see stdout from script
write_buildbot_tac(os.path.join('basedir', 'buildbot.tac'))

def tearDown(self):
Expand Down Expand Up @@ -161,8 +161,8 @@ def resolve_db_url(self):

def drop_database_tables(self, db_url):
engine = enginestrategy.create_engine(db_url, basedir='basedir')
conn = engine.connect()
db.thd_clean_database(conn)
with engine.connect() as conn:
db.thd_clean_database(conn)

@async_to_deferred
async def test_full(self):
Expand Down
52 changes: 31 additions & 21 deletions master/buildbot/test/util/integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from twisted.internet import defer
from twisted.internet import reactor
from twisted.python import log
from twisted.python.filepath import FilePath
from twisted.trial import unittest
from zope.interface import implementer
Expand Down Expand Up @@ -93,15 +94,21 @@ async def create_master(self, case, reactor, config_dict, basedir=None):
async def shutdown(self):
if self.is_master_shutdown:
return
await self.master.stopService()
await self.master.db.pool.shutdown()
try:
await self.master.stopService()
except Exception as e:
log.err(e)
try:
await self.master.db.pool.shutdown()
except Exception as e:
log.err(e)
self.is_master_shutdown = True


def print_test_log(log, out):
print(" " * 8 + f"*********** LOG: {log['name']} *********", file=out)
if log['type'] == 's':
for line in log['contents']['content'].splitlines():
def print_test_log(l, out):
print(" " * 8 + f"*********** LOG: {l['name']} *********", file=out)
if l['type'] == 's':
for line in l['contents']['content'].splitlines():
linetype = line[0]
line = line[1:]
if linetype == 'h':
Expand All @@ -112,7 +119,7 @@ def print_test_log(log, out):
line = "\x1b[31m" + line + "\x1b[0m"
print(" " * 8 + line)
else:
print("" + log['contents']['content'], file=out)
print("" + l['contents']['content'], file=out)
print(" " * 8 + "********************************", file=out)


Expand All @@ -129,10 +136,10 @@ async def enrich_build(
for step in build["steps"]:
step['logs'] = await master.data.get(("steps", step['stepid'], "logs"))
step["logs"] = list(step['logs'])
for log in step["logs"]:
log['contents'] = await master.data.get((
for l in step["logs"]:
l['contents'] = await master.data.get((
"logs",
log['logid'],
l['logid'],
"contents",
))

Expand Down Expand Up @@ -161,10 +168,10 @@ async def print_build(build, master: BuildMaster, out=sys.stdout, with_logs=Fals
)
for url in step['urls']:
print(f" url:{url['name']} ({url['url']})", file=out)
for log in step['logs']:
print(f" log:{log['name']} ({log['num_lines']})", file=out)
for l in step['logs']:
print(f" log:{l['name']} ({l['num_lines']})", file=out)
if step['results'] != SUCCESS or with_logs:
print_test_log(log, out)
print_test_log(l, out)


class RunFakeMasterTestCase(unittest.TestCase, TestReactorMixin, DebugIntegrationLogsMixin):
Expand Down Expand Up @@ -211,19 +218,19 @@ def assertStepStateString(self, step_id, state_string):
def assertLogs(self, build_id, exp_logs):
got_logs = {}
data_logs = yield self.master.data.get(('builds', build_id, 'steps', 1, 'logs'))
for log in data_logs:
self.assertTrue(log['complete'])
for l in data_logs:
self.assertTrue(l['complete'])
log_contents = yield self.master.data.get((
'builds',
build_id,
'steps',
1,
'logs',
log['slug'],
l['slug'],
'contents',
))

got_logs[log['name']] = log_contents['content']
got_logs[l['name']] = log_contents['content']

self.assertEqual(got_logs, exp_logs)

Expand Down Expand Up @@ -367,12 +374,15 @@ async def shutdown(self):
return

if isinstance(self.worker, SandboxedWorker):
await self.worker.shutdownWorker()
try:
await self.worker.shutdownWorker()
except Exception as e:
log.err(e)
await super().shutdown()

@async_to_deferred
async def dump_data_if_failed(self):
if self.case is not None and not self.case._passed:
if self.case is not None and not self.case._passed and not self.is_master_shutdown:
dump = StringIO()
print("FAILED! dumping build db for debug", file=dump)
builds = await self.master.data.get(("builds",))
Expand Down Expand Up @@ -490,8 +500,8 @@ def checkBuildStepLogExist(self, build, expectedLog, onlyStdout=False, regex=Fal
build, self.master, want_steps=True, want_properties=True, want_logs=True
)
for step in build['steps']:
for log in step['logs']:
for line in log['contents']['content'].splitlines():
for l in step['logs']:
for line in l['contents']['content'].splitlines():
if onlyStdout and line[0] != 'o':
continue
expectedLog = self._match_patterns_consume(line, expectedLog, is_regex=regex)
Expand Down

0 comments on commit 60e9fdc

Please sign in to comment.