diff --git a/pyproject.toml b/pyproject.toml index 25155a0..a87ea79 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ vulture = "^2.3" pytest = "^6.2.4" pytest-cov = "^2.12.1" redisgraph = "^2.4.0" +pathos = "^0.2.8" [build-system] requires = ["poetry-core>=1.0.0"] diff --git a/redisgraph_bulk_loader/bulk_insert.py b/redisgraph_bulk_loader/bulk_insert.py index 33077eb..f561fa9 100644 --- a/redisgraph_bulk_loader/bulk_insert.py +++ b/redisgraph_bulk_loader/bulk_insert.py @@ -145,6 +145,7 @@ def bulk_insert(graph, host, port, password, user, unix_socket_path, ssl_keyfile # Send all remaining tokens to Redis query_buf.send_buffer() + query_buf.wait_pool() end_time = timer() query_buf.report_completion(end_time - start_time) diff --git a/redisgraph_bulk_loader/label.py b/redisgraph_bulk_loader/label.py index 0ba4876..8b43d9f 100644 --- a/redisgraph_bulk_loader/label.py +++ b/redisgraph_bulk_loader/label.py @@ -56,7 +56,7 @@ def update_node_dictionary(self, identifier): def process_entities(self): entities_created = 0 - with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str) as reader: + with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str, update_min_steps=100) as reader: for row in reader: self.validate_row(row) diff --git a/redisgraph_bulk_loader/query_buffer.py b/redisgraph_bulk_loader/query_buffer.py index e3b2934..cad974f 100644 --- a/redisgraph_bulk_loader/query_buffer.py +++ b/redisgraph_bulk_loader/query_buffer.py @@ -1,3 +1,10 @@ +from pathos.pools import ThreadPool as Pool + +def run(client, graphname, args): + result = client.execute_command("GRAPH.BULK", graphname, *args) + stats = result.split(', '.encode()) + return stats + class QueryBuffer: def __init__(self, graphname, client, config): self.nodes = None @@ -30,7 +37,9 @@ def __init__(self, graphname, client, config): self.nodes_created = 0 # Total number of nodes created self.relations_created = 0 # Total number of relations created - # TODO consider using a queue to send commands asynchronously + self.pool = Pool(nodes=1) + self.tasks = [] + def send_buffer(self): """Send all pending inserts to Redis""" # Do nothing if we have no entities @@ -43,10 +52,8 @@ def send_buffer(self): args.insert(0, "BEGIN") self.initial_query = False - result = self.client.execute_command("GRAPH.BULK", self.graphname, *args) - stats = result.split(', '.encode()) - self.nodes_created += int(stats[0].split(' '.encode())[0]) - self.relations_created += int(stats[1].split(' '.encode())[0]) + task = self.pool.apipe(run, self.client, self.graphname, args) + self.add_task(task) self.clear_buffer() @@ -59,6 +66,23 @@ def clear_buffer(self): self.buffer_size = 0 self.node_count = 0 self.relation_count = 0 + + def add_task(self, task): + self.tasks.append(task) + if len(self.tasks) == 5: + task = self.tasks.pop(0) + stats = task.get() + self.update_stats(stats) + + def wait_pool(self): + for task in self.tasks: + stats = task.get() + self.update_stats(stats) + self.tasks.clear() + + def update_stats(self, stats): + self.nodes_created += int(stats[0].split(' '.encode())[0]) + self.relations_created += int(stats[1].split(' '.encode())[0]) def report_completion(self, runtime): print("Construction of graph '%s' complete: %d nodes created, %d relations created in %f seconds" diff --git a/redisgraph_bulk_loader/relation_type.py b/redisgraph_bulk_loader/relation_type.py index e917e46..72d0753 100644 --- a/redisgraph_bulk_loader/relation_type.py +++ b/redisgraph_bulk_loader/relation_type.py @@ -47,7 +47,7 @@ def post_process_header_with_schema(self, header): def process_entities(self): entities_created = 0 - with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str) as reader: + with click.progressbar(self.reader, length=self.entities_count, label=self.entity_str, update_min_steps=100) as reader: for row in reader: self.validate_row(row) try: