Skip to content

Commit

Permalink
update append triples function to catch errors and log them
Browse files Browse the repository at this point in the history
  • Loading branch information
syphax-bouazzouni committed Jan 17, 2025
1 parent 82e9e08 commit ed23245
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 34 deletions.
4 changes: 2 additions & 2 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
GIT
remote: https://github.com/ontoportal-lirmm/sparql-client.git
revision: 59251e59346c9a69a67c88552ba55a1244eec602
revision: 24bccbd0f4a5150fa6ce2af50d7c378c681027ea
branch: development
specs:
sparql-client (3.2.2)
Expand Down Expand Up @@ -102,7 +102,7 @@ GEM
rexml (~> 3.2)
redis (5.3.0)
redis-client (>= 0.22.0)
redis-client (0.23.1)
redis-client (0.23.2)
connection_pool
request_store (1.7.0)
rack (>= 1.4)
Expand Down
88 changes: 56 additions & 32 deletions lib/goo/sparql/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ class Client < RSPARQL::Client
"text/x-nquads" => "nquads"
}


def status_based_sleep_time(operation)
sleep(0.5)
st = self.status
Expand Down Expand Up @@ -43,65 +42,90 @@ def initialize(g, silent: false)
@caching_options = { :graph => @graph.to_s }
@silent = silent
end

def to_s
"DROP #{@silent ? 'SILENT' : ''} GRAPH <#{@graph.to_s}>"
end

def options
#Returns the caching option
# Returns the caching option
@caching_options
end
end

def bnodes_filter_file(file_path,mime_type)
def bnodes_filter_file(file_path, mime_type)
mime_type = "application/rdf+xml" if mime_type.nil?
format = MIMETYPE_RAPPER_MAP[mime_type]
if format.nil?
raise Exception, "mime_type #{mime_type} not supported in slicing"
end
dir = Dir.mktmpdir("file_nobnodes")
dst_path = File.join(dir,"data.nt")
dst_path_bnodes_out = File.join(dir,"data_no_bnodes.nt")
dst_path = File.join(dir, "data.nt")
dst_path_bnodes_out = File.join(dir, "data_no_bnodes.nt")
out_format = format == "nquads" ? "nquads" : "ntriples"
rapper_command_call = "rapper -i #{format} -o #{out_format} #{file_path} > #{dst_path}"
stdout,stderr,status = Open3.capture3(rapper_command_call)
stdout, stderr, status = Open3.capture3(rapper_command_call)
if not status.success?
raise Exception, "Rapper cannot parse #{format} file at #{file_path}: #{stderr}"
end
filter_command =
"LANG=C grep -v '_:genid' #{dst_path} > #{dst_path_bnodes_out}"
stdout,stderr,status = Open3.capture3(filter_command)
stdout, stderr, status = Open3.capture3(filter_command)
if not status.success?
raise Exception, "could not `#{filter_command}`: #{stderr}"
end
return dst_path_bnodes_out,dir
return dst_path_bnodes_out, dir
end

def delete_data_graph(graph)
Goo.sparql_update_client.update(DropGraph.new(graph, silent: Goo.backend_vo?))
end

def append_triples_no_bnodes(graph,file_path,mime_type_in)
bnodes_filter = nil
def append_triples_batch(graph, triples, mime_type_in, current_line = 0)
begin
puts "Appending triples in batch of #{triples.size} triples from line #{current_line}"
execute_append_request graph, triples.join, mime_type_in
rescue RestClient::Exception => e
puts "Error in appending triples request: #{e.response}"
if triples.size < 100
triples.each_with_index do |line, i|
begin
execute_append_request graph, line, mime_type_in
rescue RestClient::Exception => e
puts "Error in append request: #{e.response} line #{i + current_line}: #{line}"
end
end
else
half = triples.size / 2
append_triples_batch(graph, triples[0..half], mime_type_in, current_line)
append_triples_batch(graph, triples[half..-1], mime_type_in, current_line + half)
end

end
end

def append_triples_no_bnodes(graph, file_path, mime_type_in)
dir = nil
response = nil
if file_path.end_with?('ttl')
if file_path.end_with?('ttl') || file_path.end_with?('nt') || file_path.end_with?('n3')
bnodes_filter = file_path
else
bnodes_filter, dir = bnodes_filter_file(file_path, mime_type_in)
end
chunk_lines = 500_000 # number of line
chunk_lines = 10_000 # number of line
file = File.foreach(bnodes_filter)
lines = []
line_count = 0
file.each_entry do |line|
lines << line
if lines.size == chunk_lines
response = execute_append_request graph, lines.join, mime_type_in
response = append_triples_batch(graph, lines, mime_type_in, line_count)
line_count += lines.size
lines.clear
end
end

response = execute_append_request graph, lines.join, mime_type_in unless lines.empty?

response = append_triples_batch(graph, lines, mime_type_in, line_count) unless lines.empty?

unless dir.nil?
File.delete(bnodes_filter)
Expand All @@ -115,43 +139,43 @@ def append_triples_no_bnodes(graph,file_path,mime_type_in)
response
end

def append_data_triples(graph,data,mime_type)
def append_data_triples(graph, data, mime_type)
f = Tempfile.open('data_triple_store')
f.write(data)
f.close()
res = append_triples_no_bnodes(graph,f.path,mime_type)
res = append_triples_no_bnodes(graph, f.path, mime_type)
return res
end

def put_triples(graph,file_path,mime_type=nil)
def put_triples(graph, file_path, mime_type = nil)
delete_graph(graph)
result = append_triples_no_bnodes(graph,file_path,mime_type)
Goo.sparql_query_client.cache_invalidate_graph(graph)
result = append_triples_no_bnodes(graph, file_path, mime_type)
Goo.sparql_query_client.cache.invalidate(graph)
result
end

def append_triples(graph,data,mime_type=nil)
result = append_data_triples(graph,data,mime_type)
Goo.sparql_query_client.cache_invalidate_graph(graph)
def append_triples(graph, data, mime_type = nil)
result = append_data_triples(graph, data, mime_type)
Goo.sparql_query_client.cache.invalidate(graph)
result
end

def append_triples_from_file(graph,file_path,mime_type=nil)
def append_triples_from_file(graph, file_path, mime_type = nil)
if mime_type == "text/nquads" && !graph.instance_of?(Array)
raise Exception, "Nquads need a list of graphs, #{graph} provided"
end
result = append_triples_no_bnodes(graph,file_path,mime_type)
Goo.sparql_query_client.cache_invalidate_graph(graph)
result = append_triples_no_bnodes(graph, file_path, mime_type)
Goo.sparql_query_client.cache.invalidate(graph)
result
end

def delete_graph(graph)
result = delete_data_graph(graph)
Goo.sparql_query_client.cache_invalidate_graph(graph)
Goo.sparql_query_client.cache.invalidate(graph)
return result
end

def extract_number_from(i,text)
def extract_number_from(i, text)
res = []
while (text[i] != '<')
res << text[i]
Expand All @@ -167,7 +191,7 @@ def status
resp_text = nil

begin
resp_text = Net::HTTP.get(URI(status_url))
resp_text = Net::HTTP.get(URI(status_url))
rescue StandardError => e
resp[:exception] = "Error connecting to triple store: #{e.class}: #{e.message}\n#{e.backtrace.join("\n\t")}"
return resp
Expand All @@ -192,16 +216,16 @@ def params_for_backend(graph, data_file, mime_type_in, method = :post)
graph = "http://data.bogus.graph/uri"
end

params = {method: method, url: "#{url.to_s}", headers: {"content-type" => mime_type, "mime-type" => mime_type}, timeout: nil}
params = { method: method, url: "#{url.to_s}", headers: { "content-type" => mime_type, "mime-type" => mime_type }, timeout: nil }

if Goo.backend_4s?
params[:payload] = {
graph: graph.to_s,
data: data_file,
'mime-type' => mime_type
}
#for some reason \\\\ breaks parsing
params[:payload][:data] = params[:payload][:data].split("\n").map { |x| x.sub("\\\\","") }.join("\n")
# for some reason \\\\ breaks parsing
params[:payload][:data] = params[:payload][:data].split("\n").map { |x| x.sub("\\\\", "") }.join("\n")
elsif Goo.backend_vo?
params[:url] = "http://localhost:8890/sparql-graph-crud?graph=#{CGI.escape(graph.to_s)}"
params[:payload] = data_file
Expand Down

0 comments on commit ed23245

Please sign in to comment.