# XXX Should eventually contribute this in some form -- probably adding an arg to
# XXX update() instead, though. Note that this comments out code to strip it down
# XXX to just what we need (no tracers, no arcs, no validation code) to make it
# XXX perhaps less sensitive to breakage in future coverage versions. Contributed
# XXX code would need to include the part where all arcs are removed for the
# XXX contexts in other_data.
def replace_context_update(self, other_data, aliases=None):
    """Merge data from ``other_data``, replacing all data for its contexts.

    Unlike ``CoverageData.update()`` (of which this is a stripped-down
    copy -- see the XXX note above), any line data previously recorded for
    the contexts found in ``other_data`` is deleted first, so the new data
    entirely replaces the old data for those contexts.

    Arguments:
        self: a ``coverage.sqldata.CoverageData`` to merge into; this is
            written as a free function, so ``self`` is passed explicitly.
        other_data: the ``CoverageData`` whose contexts and line data are
            copied in.
        aliases: optional ``coverage.sqldata.PathAliases`` used to remap
            file paths read from ``other_data``; when omitted, an empty
            ``PathAliases()`` is used (paths map to themselves).
    """
    # Local import -- presumably so ``coverage`` is only required when this
    # function is actually called, not at module import time.
    import coverage
    assert isinstance(self, coverage.sqldata.CoverageData)
    #if self._debug.should("dataop"):
        #self._debug.write("Replace-updating with data from {!r}".format(
            #getattr(other_data, "_filename", "???"),
        #))
    #if self._has_lines and other_data._has_arcs:
        #raise coverage.sqldata.DataError("Can't combine arc data with line data")
    #if self._has_arcs and other_data._has_lines:
        #raise coverage.sqldata.DataError("Can't combine line data with arc data")
    aliases = aliases or coverage.sqldata.PathAliases()

    # Force the database we're writing to to exist before we start nesting contexts.
    self._start_using()

    # Collector for all arcs, lines and tracers
    other_data.read()
    with other_data._connect() as con:
        # Get files data: map each path in other_data through the aliases.
        with con.execute("select path from file") as cur:
            files = {path: aliases.map(path) for (path,) in cur}

        # Get contexts data: the contexts whose data will be replaced.
        with con.execute("select context from context") as cur:
            contexts = [context for (context,) in cur]

        ## Get arc data.
        #with con.execute(
            #"select file.path, context.context, arc.fromno, arc.tono " +
            #"from arc " +
            #"inner join file on file.id = arc.file_id " +
            #"inner join context on context.id = arc.context_id"
        #) as cur:
            #arcs = [
                #(files[path], context, fromno, tono)
                #for (path, context, fromno, tono) in cur
            #]

        # Get line data, keyed by (aliased path, context).  Two source
        # paths can alias to the same key, so union their numbits.
        with con.execute(
            "select file.path, context.context, line_bits.numbits " +
            "from line_bits " +
            "inner join file on file.id = line_bits.file_id " +
            "inner join context on context.id = line_bits.context_id"
        ) as cur:
            #lines: Dict[Tuple[str, str], bytes] = {}
            lines = {}
            for path, context, numbits in cur:
                key = (files[path], context)
                if key in lines:
                    numbits = coverage.sqldata.numbits_union(lines[key], numbits)
                lines[key] = numbits

        ## Get tracer data.
        #with con.execute(
            #"select file.path, tracer " +
            #"from tracer " +
            #"inner join file on file.id = tracer.file_id"
        #) as cur:
            #tracers = {files[path]: tracer for (path, tracer) in cur}

    with self._connect() as con:
        assert con.con is not None
        # NOTE(review): "IMMEDIATE" makes sqlite3 begin write transactions
        # eagerly, taking the write lock up front for the delete+insert below.
        con.con.isolation_level = "IMMEDIATE"

        ## Get all tracers in the DB. Files not in the tracers are assumed
        ## to have an empty string tracer. Since Sqlite does not support
        ## full outer joins, we have to make two queries to fill the
        ## dictionary.
        #with con.execute("select path from file") as cur:
            #this_tracers = {path: "" for path, in cur}
        #with con.execute(
            #"select file.path, tracer from tracer " +
            #"inner join file on file.id = tracer.file_id"
        #) as cur:
            #this_tracers.update({
                #aliases.map(path): tracer
                #for path, tracer in cur
            #})

        # Create all file and context rows in the DB, then re-read them to
        # learn the ids assigned (or already present) for each.
        con.executemany_void(
            "insert or ignore into file (path) values (?)",
            ((file,) for file in files.values())
        )
        with con.execute("select id, path from file") as cur:
            file_ids = {path: id for id, path in cur}
        # Keep CoverageData's in-memory path->id cache in sync with the DB.
        self._file_map.update(file_ids)
        con.executemany_void(
            "insert or ignore into context (context) values (?)",
            ((context,) for context in contexts)
        )
        with con.execute("select id, context from context") as cur:
            context_ids = {context: id for id, context in cur}

        ## Prepare tracers and fail, if a conflict is found.
        ## tracer_paths is used to ensure consistency over the tracer data
        ## and tracer_map tracks the tracers to be inserted.
        #tracer_map = {}
        #for path in files.values():
            #this_tracer = this_tracers.get(path)
            #other_tracer = tracers.get(path, "")
            ## If there is no tracer, there is always the None tracer.
            #if this_tracer is not None and this_tracer != other_tracer:
                #raise coverage.sqldata.DataError(
                    #"Conflicting file tracer name for '{}': {!r} vs {!r}".format(
                        #path, this_tracer, other_tracer
                    #)
                #)
            #tracer_map[path] = other_tracer

        #if arcs:
            #self._choose_lines_or_arcs(arcs=True)
            #con.executemany_void(
                #"delete from arc where context_id = ?",
                #((context_ids[context], ) for context in contexts)
            #)
            #con.executemany_void(
                #"insert or ignore into arc " +
                #"(file_id, context_id, fromno, tono) values (?, ?, ?, ?)",
                #(
                    #(file_ids[file], context_ids[context], fromno, tono)
                    #for file, context, fromno, tono in arcs
                #)
            #)

        if lines:
            self._choose_lines_or_arcs(lines=True)
            # The "replace" part: wipe existing line data for these contexts
            # before inserting, instead of unioning as update() does.
            con.executemany_void(
                "delete from line_bits where context_id = ?",
                ((context_ids[context], ) for context in contexts)
            )
            con.executemany_void(
                "insert into line_bits " +
                "(file_id, context_id, numbits) values (?, ?, ?)",
                (
                    (file_ids[file], context_ids[context], numbits)
                    for (file, context), numbits in lines.items()
                )
            )

        #con.executemany_void(
            #"insert or ignore into tracer (file_id, tracer) values (?, ?)",
            #((file_ids[filename], tracer) for filename, tracer in tracer_map.items())
        #)

    if not self._no_disk:
        # Update all internal cache data by re-reading the merged database.
        self._reset()
        self.read()