Skip to content
This repository has been archived by the owner on Oct 1, 2020. It is now read-only.

londiste.repair: add option to dump in parallel #48

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/londiste.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ def init_optparse(self, parser=None):
help = "add: ignore table differences, repair: ignore lag")
g.add_option("--apply", action = "store_true",
help="repair: apply fixes automatically")
g.add_option("--parallel", action = "store_true",
help="repair: dump source and target in parallel")
g.add_option("--count-only", action="store_true",
help="compare: just count rows, do not compare data")
p.add_option_group(g)
Expand Down
20 changes: 15 additions & 5 deletions python/londiste/repair.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""

import sys, os, skytools, subprocess
from threading import Thread

from londiste.syncer import Syncer

Expand All @@ -30,6 +31,7 @@ def init_optparse(self, p=None):
"""Initialize cmdline switches."""
p = super(Repairer, self).init_optparse(p)
p.add_option("--apply", action="store_true", help="apply fixes")
p.add_option("--parallel", action="store_true", help="dump source and target in parallel")
return p

def process_sync(self, t1, t2, src_db, dst_db):
Expand Down Expand Up @@ -64,11 +66,18 @@ def process_sync(self, t1, t2, src_db, dst_db):
src_where = dst_where

self.log.info("Dumping src table: %s", src_tbl)
self.dump_table(src_tbl, src_curs, dump_src, src_where)
src_db.commit()
src_dumper = Thread(target=self.dump_table, args=(src_tbl,src_curs,dump_src,src_where,src_db,))
src_dumper.daemon = True
src_dumper.start()
if not self.options.parallel:
src_dumper.join()
self.log.info("Dumping dst table: %s", dst_tbl)
self.dump_table(dst_tbl, dst_curs, dump_dst, dst_where)
dst_db.commit()
dst_dumper = Thread(target=self.dump_table, args=(dst_tbl, dst_curs, dump_dst, dst_where, dst_db,))
dst_dumper.daemon = True
dst_dumper.start()
if self.options.parallel:
src_dumper.join()
dst_dumper.join()

self.log.info("Sorting src table: %s", dump_src)
self.do_sort(dump_src, dump_src_sorted)
Expand Down Expand Up @@ -132,7 +141,7 @@ def load_common_columns(self, src_tbl, dst_tbl, src_curs, dst_curs):
cols = ",".join(fqlist)
self.log.debug("using columns: %s", cols)

def dump_table(self, tbl, curs, fn, whr):
def dump_table(self, tbl, curs, fn, whr, db):
"""Dump table to disk."""
cols = ','.join(self.fq_common_fields)
if len(whr) == 0:
Expand All @@ -144,6 +153,7 @@ def dump_table(self, tbl, curs, fn, whr):
size = f.tell()
f.close()
self.log.info('%s: Got %d bytes', tbl, size)
db.commit()

def get_row(self, ln):
"""Parse a row into dict."""
Expand Down