From ed232458b7292b2fae1a2ac071ef10942ee3826d Mon Sep 17 00:00:00 2001 From: Syphax Date: Fri, 17 Jan 2025 09:27:01 +0100 Subject: [PATCH] update append triples function to catch errors and log them --- Gemfile.lock | 4 +- lib/goo/sparql/client.rb | 88 +++++++++++++++++++++++++--------------- 2 files changed, 58 insertions(+), 34 deletions(-) diff --git a/Gemfile.lock b/Gemfile.lock index 706b5eef..3fe30043 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,6 +1,6 @@ GIT remote: https://github.com/ontoportal-lirmm/sparql-client.git - revision: 59251e59346c9a69a67c88552ba55a1244eec602 + revision: 24bccbd0f4a5150fa6ce2af50d7c378c681027ea branch: development specs: sparql-client (3.2.2) @@ -102,7 +102,7 @@ GEM rexml (~> 3.2) redis (5.3.0) redis-client (>= 0.22.0) - redis-client (0.23.1) + redis-client (0.23.2) connection_pool request_store (1.7.0) rack (>= 1.4) diff --git a/lib/goo/sparql/client.rb b/lib/goo/sparql/client.rb index cf958398..f442ff30 100644 --- a/lib/goo/sparql/client.rb +++ b/lib/goo/sparql/client.rb @@ -14,7 +14,6 @@ class Client < RSPARQL::Client "text/x-nquads" => "nquads" } - def status_based_sleep_time(operation) sleep(0.5) st = self.status @@ -43,65 +42,90 @@ def initialize(g, silent: false) @caching_options = { :graph => @graph.to_s } @silent = silent end + def to_s "DROP #{@silent ? 'SILENT' : ''} GRAPH <#{@graph.to_s}>" end + def options - #Returns the caching option + # Returns the caching option @caching_options end end - def bnodes_filter_file(file_path,mime_type) + def bnodes_filter_file(file_path, mime_type) mime_type = "application/rdf+xml" if mime_type.nil? format = MIMETYPE_RAPPER_MAP[mime_type] if format.nil? raise Exception, "mime_type #{mime_type} not supported in slicing" end dir = Dir.mktmpdir("file_nobnodes") - dst_path = File.join(dir,"data.nt") - dst_path_bnodes_out = File.join(dir,"data_no_bnodes.nt") + dst_path = File.join(dir, "data.nt") + dst_path_bnodes_out = File.join(dir, "data_no_bnodes.nt") out_format = format == "nquads" ? "nquads" : "ntriples" rapper_command_call = "rapper -i #{format} -o #{out_format} #{file_path} > #{dst_path}" - stdout,stderr,status = Open3.capture3(rapper_command_call) + stdout, stderr, status = Open3.capture3(rapper_command_call) if not status.success? raise Exception, "Rapper cannot parse #{format} file at #{file_path}: #{stderr}" end filter_command = "LANG=C grep -v '_:genid' #{dst_path} > #{dst_path_bnodes_out}" - stdout,stderr,status = Open3.capture3(filter_command) + stdout, stderr, status = Open3.capture3(filter_command) if not status.success? raise Exception, "could not `#{filter_command}`: #{stderr}" end - return dst_path_bnodes_out,dir + return dst_path_bnodes_out, dir end def delete_data_graph(graph) Goo.sparql_update_client.update(DropGraph.new(graph, silent: Goo.backend_vo?)) end - def append_triples_no_bnodes(graph,file_path,mime_type_in) - bnodes_filter = nil + def append_triples_batch(graph, triples, mime_type_in, current_line = 0) + begin + puts "Appending triples in batch of #{triples.size} triples from line #{current_line}" + execute_append_request graph, triples.join, mime_type_in + rescue RestClient::Exception => e + puts "Error in appending triples request: #{e.response}" + if triples.size < 100 + triples.each_with_index do |line, i| + begin + execute_append_request graph, line, mime_type_in + rescue RestClient::Exception => e + puts "Error in append request: #{e.response} line #{i + current_line}: #{line}" + end + end + else + half = triples.size / 2 + append_triples_batch(graph, triples[0..half], mime_type_in, current_line) + append_triples_batch(graph, triples[half..-1], mime_type_in, current_line + half) + end + + end + end + + def append_triples_no_bnodes(graph, file_path, mime_type_in) dir = nil response = nil - if file_path.end_with?('ttl') + if file_path.end_with?('ttl') || file_path.end_with?('nt') || file_path.end_with?('n3') bnodes_filter = file_path else bnodes_filter, dir = bnodes_filter_file(file_path, mime_type_in) end - chunk_lines = 500_000 # number of line + chunk_lines = 10_000 # number of line file = File.foreach(bnodes_filter) lines = [] + line_count = 0 file.each_entry do |line| lines << line if lines.size == chunk_lines - response = execute_append_request graph, lines.join, mime_type_in + response = append_triples_batch(graph, lines, mime_type_in, line_count) + line_count += lines.size lines.clear end end - response = execute_append_request graph, lines.join, mime_type_in unless lines.empty? - + response = append_triples_batch(graph, lines, mime_type_in, line_count) unless lines.empty? unless dir.nil? File.delete(bnodes_filter) @@ -115,43 +139,43 @@ def append_triples_no_bnodes(graph,file_path,mime_type_in) response end - def append_data_triples(graph,data,mime_type) + def append_data_triples(graph, data, mime_type) f = Tempfile.open('data_triple_store') f.write(data) f.close() - res = append_triples_no_bnodes(graph,f.path,mime_type) + res = append_triples_no_bnodes(graph, f.path, mime_type) return res end - def put_triples(graph,file_path,mime_type=nil) + def put_triples(graph, file_path, mime_type = nil) delete_graph(graph) - result = append_triples_no_bnodes(graph,file_path,mime_type) - Goo.sparql_query_client.cache_invalidate_graph(graph) + result = append_triples_no_bnodes(graph, file_path, mime_type) + Goo.sparql_query_client.cache.invalidate(graph) result end - def append_triples(graph,data,mime_type=nil) - result = append_data_triples(graph,data,mime_type) - Goo.sparql_query_client.cache_invalidate_graph(graph) + def append_triples(graph, data, mime_type = nil) + result = append_data_triples(graph, data, mime_type) + Goo.sparql_query_client.cache.invalidate(graph) result end - def append_triples_from_file(graph,file_path,mime_type=nil) + def append_triples_from_file(graph, file_path, mime_type = nil) if mime_type == "text/nquads" && !graph.instance_of?(Array) raise Exception, "Nquads need a list of graphs, #{graph} provided" end - result = append_triples_no_bnodes(graph,file_path,mime_type) - Goo.sparql_query_client.cache_invalidate_graph(graph) + result = append_triples_no_bnodes(graph, file_path, mime_type) + Goo.sparql_query_client.cache.invalidate(graph) result end def delete_graph(graph) result = delete_data_graph(graph) - Goo.sparql_query_client.cache_invalidate_graph(graph) + Goo.sparql_query_client.cache.invalidate(graph) return result end - def extract_number_from(i,text) + def extract_number_from(i, text) res = [] while (text[i] != '<') res << text[i] @@ -167,7 +191,7 @@ def status resp_text = nil begin - resp_text = Net::HTTP.get(URI(status_url)) + resp_text = Net::HTTP.get(URI(status_url)) rescue StandardError => e resp[:exception] = "Error connecting to triple store: #{e.class}: #{e.message}\n#{e.backtrace.join("\n\t")}" return resp @@ -192,7 +216,7 @@ def params_for_backend(graph, data_file, mime_type_in, method = :post) graph = "http://data.bogus.graph/uri" end - params = {method: method, url: "#{url.to_s}", headers: {"content-type" => mime_type, "mime-type" => mime_type}, timeout: nil} + params = { method: method, url: "#{url.to_s}", headers: { "content-type" => mime_type, "mime-type" => mime_type }, timeout: nil } if Goo.backend_4s? params[:payload] = { @@ -200,8 +224,8 @@ def params_for_backend(graph, data_file, mime_type_in, method = :post) data: data_file, 'mime-type' => mime_type } - #for some reason \\\\ breaks parsing - params[:payload][:data] = params[:payload][:data].split("\n").map { |x| x.sub("\\\\","") }.join("\n") + # for some reason \\\\ breaks parsing + params[:payload][:data] = params[:payload][:data].split("\n").map { |x| x.sub("\\\\", "") }.join("\n") elsif Goo.backend_vo? params[:url] = "http://localhost:8890/sparql-graph-crud?graph=#{CGI.escape(graph.to_s)}" params[:payload] = data_file