From 186ab6b6bc1c2bc2d6cf856b0b1518639f7d84e1 Mon Sep 17 00:00:00 2001 From: Tarvi Pillessaar Date: Wed, 2 Sep 2015 16:13:32 +0300 Subject: [PATCH] londiste.repair: add option to dump in parallel --- python/londiste.py | 2 ++ python/londiste/repair.py | 20 +++++++++++++++----- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/python/londiste.py b/python/londiste.py index 8860a313..3013309c 100755 --- a/python/londiste.py +++ b/python/londiste.py @@ -157,6 +157,8 @@ def init_optparse(self, parser=None): help = "add: ignore table differences, repair: ignore lag") g.add_option("--apply", action = "store_true", help="repair: apply fixes automatically") + g.add_option("--parallel", action = "store_true", + help="repair: dump source and target in parallel") g.add_option("--count-only", action="store_true", help="compare: just count rows, do not compare data") p.add_option_group(g) diff --git a/python/londiste/repair.py b/python/londiste/repair.py index 3e2e8b16..8bf9c138 100644 --- a/python/londiste/repair.py +++ b/python/londiste/repair.py @@ -5,6 +5,7 @@ """ import sys, os, skytools, subprocess +from threading import Thread from londiste.syncer import Syncer @@ -30,6 +31,7 @@ def init_optparse(self, p=None): """Initialize cmdline switches.""" p = super(Repairer, self).init_optparse(p) p.add_option("--apply", action="store_true", help="apply fixes") + p.add_option("--parallel", action="store_true", help="dump source and target in parallel") return p def process_sync(self, t1, t2, src_db, dst_db): @@ -64,11 +66,18 @@ def process_sync(self, t1, t2, src_db, dst_db): src_where = dst_where self.log.info("Dumping src table: %s", src_tbl) - self.dump_table(src_tbl, src_curs, dump_src, src_where) - src_db.commit() + src_dumper = Thread(target=self.dump_table, args=(src_tbl,src_curs,dump_src,src_where,src_db,)) + src_dumper.daemon = True + src_dumper.start() + if not self.options.parallel: + src_dumper.join() self.log.info("Dumping dst table: %s", dst_tbl) - self.dump_table(dst_tbl, dst_curs, dump_dst, dst_where) - dst_db.commit() + dst_dumper = Thread(target=self.dump_table, args=(dst_tbl, dst_curs, dump_dst, dst_where, dst_db,)) + dst_dumper.daemon = True + dst_dumper.start() + if self.options.parallel: + src_dumper.join() + dst_dumper.join() self.log.info("Sorting src table: %s", dump_src) self.do_sort(dump_src, dump_src_sorted) @@ -132,7 +141,7 @@ def load_common_columns(self, src_tbl, dst_tbl, src_curs, dst_curs): cols = ",".join(fqlist) self.log.debug("using columns: %s", cols) - def dump_table(self, tbl, curs, fn, whr): + def dump_table(self, tbl, curs, fn, whr, db): """Dump table to disk.""" cols = ','.join(self.fq_common_fields) if len(whr) == 0: @@ -144,6 +153,7 @@ def dump_table(self, tbl, curs, fn, whr): size = f.tell() f.close() self.log.info('%s: Got %d bytes', tbl, size) + db.commit() def get_row(self, ln): """Parse a row into dict."""