optimize index_all_data on Virtuoso and GraphDB by pre-fetching all ids
- Before optimization
    - fs ⇒ 15.224490000051446s
    - ag ⇒ 19.238805999979377s
    - vo ⇒ 42.95274499990046s
    - gb ⇒ 33.52821200003382s
- After optimization
    - fs ⇒ 15.369778999942355s
    - ag ⇒ 17.367580000078306s
    - vo ⇒ 16.564614000031725s
    - gb ⇒ 15.431716999970376s
syphax-bouazzouni committed Mar 3, 2024 · 1 parent 1274090 · commit 46ecccb
Showing 1 changed file with 82 additions and 112 deletions.
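In outline, the change replaces LIMIT/OFFSET paging over the full ?id ?p ?v result set with a two-phase scheme: first page through the distinct subject ids alone (a much cheaper projection), then fetch and index triples in parallel over fixed 100-id slices, pinning each query to the known subjects with a FILTER. Judging by the commit title, the vo and gb timings above are presumably Virtuoso and GraphDB; fs and ag are the other two backends in the benchmark. A minimal sketch of the strategy (simplified from the diff below, where the real fetch_sorted_ids, fetch_triples, and index_sorted_ids live; the committed code also tracks counts and timings per page):

    require 'parallel'

    # Sketch only: mirrors the two phases of the diff's methods.
    def index_all_data_sketch(conn, ontology, logger)
      page = 1
      loop do
        ids = fetch_sorted_ids(1_000, page)             # phase 1: subject ids only
        break if ids.empty?
        logger.info("page #{page}: #{ids.size} ids")

        Parallel.each(ids.each_slice(100), in_threads: 10) do |slice|
          docs, _count = fetch_triples(slice, ontology)  # phase 2: FILTER on known ids
          conn.index_document(docs.values, commit: false)
        end
        page += 1
      end
      conn.index_commit                                  # single commit at the end
    end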
@@ -21,6 +21,80 @@ def self.included(base)
         base.extend(ClassMethods)
       end
 
+      def index_sorted_ids(ids, ontology, conn, logger, commit = true)
+        total_triples = Parallel.map(ids.each_slice(100), in_threads: 10) do |ids_slice|
+          index_ids = 0
+          triples_count = 0
+          documents = {}
+          time = Benchmark.realtime do
+            documents, triples_count = fetch_triples(ids_slice, ontology)
+          end
+
+          return if documents.empty?
+
+          logger.info("Worker #{Parallel.worker_number} > Fetched #{triples_count} triples of #{id} in #{time} sec.") if triples_count.positive?
+
+          time = Benchmark.realtime do
+            conn.index_document(documents.values, commit: false)
+            conn.index_commit if commit
+            index_ids = documents.size
+            documents = {}
+          end
+          logger.info("Worker #{Parallel.worker_number} > Indexed #{index_ids} ids of #{id} in #{time} sec. Total #{documents.size} ids.")
+          triples_count
+        end
+        total_triples.sum
+      end
+
+      def index_all_data(logger, commit = true)
+        page = 1
+        size = 1000
+        count_ids = 0
+        total_time = 0
+        total_triples = 0
+        old_count = -1
+
+        ontology = self.bring(:ontology).ontology
+                       .bring(:acronym).acronym
+        conn = init_search_collection(ontology)
+
+        ids = {}
+
+        while count_ids != old_count
+          old_count = count_ids
+          count = 0
+          time = Benchmark.realtime do
+            ids = fetch_sorted_ids(size, page)
+            count = ids.size
+          end
+
+          count_ids += count
+          total_time += time
+          page += 1
+
+          next unless count.positive?
+
+          logger.info("Fetched #{count} ids of #{id} page: #{page} in #{time} sec.")
+
+          total_triples += index_sorted_ids(ids, ontology, conn, logger, commit)
+
+        end
+        logger.info("Completed indexing all ontology data: #{self.id} in #{total_time} sec. (#{count_ids} ids / #{total_triples} triples)")
+        logger.flush
+      end
+
+      private
+
+      def fetch_sorted_ids(size, page)
+        query = Goo.sparql_query_client.select(:id)
+                   .distinct
+                   .from(RDF::URI.new(self.id))
+                   .where(%i[id p v])
+                   .limit(size)
+                   .offset((page - 1) * size)
+
+        query.each_solution.map(&:id).sort
+      end
+
       def update_doc(doc, property, new_val)
         unescaped_prop = property.gsub('___', '://')
@@ -42,31 +116,26 @@ def update_doc(doc, property, new_val)
         doc
       end
 
-
-
       def init_search_collection(ontology)
         self.class.clear_indexed_content(ontology)
       end
 
-
-      def fetch_triples(ids, ontology, page, size, all_ids)
+      def fetch_triples(ids_slice, ontology)
+        documents = {}
+        count = 0
+        filter = ids_slice.map { |x| "?id = <#{x}>" }.join(' || ')
         query = Goo.sparql_query_client.select(:id, :p, :v)
                    .distinct
                    .from(RDF::URI.new(self.id))
                    .where(%i[id p v])
-                   .limit(size)
-                   .offset((page - 1) * size)
-        count = 0
+                   .filter(filter)
         query.each_solution do |sol|
           count += 1
-          all_ids << sol[:id].to_s
-          doc = ids[sol[:id].to_s]
+          doc = documents[sol[:id].to_s]
           doc ||= {
             id: "#{sol[:id]}_#{ontology}", submission_id_t: self.id.to_s,
            ontology_t: ontology, resource_model: self.class.model_name,
            resource_id: sol[:id].to_s
          }
-
           property = sol[:p].to_s
           value = sol[:v]
-
@@ -75,110 +144,11 @@ def fetch_triples(ids, ontology, page, size, all_ids)
           else
             update_doc(doc, property, value)
           end
-          ids[sol[:id].to_s] = doc
+          documents[sol[:id].to_s] = doc
         end
-        count
+        [documents, count]
       end
 
-      def index_ids(ids, indexed_ids, conn)
-        new_to_index = []
-        already_indexed = {}
-        ids.each do |k, doc|
-          if indexed_ids.include?(k)
-            already_indexed[k.to_s] = doc
-          else
-            indexed_ids << k
-            new_to_index << doc
-          end
-        end
-
-        conn.index_document(new_to_index, commit: false)
-        new_to_index = new_to_index.size
-        new_to_index += Parallel.map(already_indexed.each_slice(1000), in_threads: 10) do |indexed|
-          to_index = fetch_index_documents(indexed, conn)
-          conn.index_document(to_index, commit: false)
-          to_index.size
-        end.sum
-        new_to_index
-      end
-
-      def fetch_index_documents(indexed, conn)
-        indexed = indexed.to_h
-        response = conn.submit_search_query('*', { fq: indexed.keys.map { |x| "resource_id:\"#{x}\"" }.join(' OR '),
-                                                   rows: indexed.size })
-
-        response['response']['docs'].each do |old_doc|
-          id = old_doc['resource_id'].to_s
-
-          old_doc.each do |k, v|
-            next if %w[submission_id_t ontology_t].include?(k)
-
-            if k.end_with?('_t')
-              prop = k.split('_t').first
-            elsif k.end_with?('_txt')
-              prop = k.split('_txt').first
-            else
-              next
-            end
-            update_doc(indexed[id], prop, v)
-          end
-        end
-        indexed.values
-      end
-
-      def index_all_data(logger, commit = true, optimize = true)
-        page = 1
-        size = 10000
-        count_ids = 0
-        total_time = 0
-        old_count = -1
-        ontology = self.bring(:ontology).ontology
-                       .bring(:acronym).acronym
-
-        conn = init_search_collection(ontology)
-
-        indexed_ids = Set.new
-        all_ids = Set.new
-        index_ids = 0
-        ids = {}
-
-        while count_ids != old_count
-          old_count = count_ids
-          count = 0
-          time = Benchmark.realtime do
-            count = fetch_triples(ids, ontology, page, size, all_ids)
-          end
-
-          logger.info("Fetched #{count} triples of #{id} page: #{page} in #{time} sec.") if count.positive?
-
-          count_ids += count
-          total_time += time
-
-          if ids.size >= 100
-            time = Benchmark.realtime do
-              index_ids(ids, indexed_ids, conn)
-              conn.index_commit if commit
-              index_ids = ids.size
-              ids = {}
-            end
-            logger.info("Index #{index_ids} ids of #{id} in #{time} sec. Total #{all_ids.size} ids.")
-            total_time += time
-          end
-
-          page += 1
-        end
-
-        unless ids.empty?
-          time = Benchmark.realtime do
-            index_ids(ids, indexed_ids, conn)
-            conn.index_commit if commit
-          end
-          logger.info("Index #{index_ids} ids of #{id} in #{time} sec. Total #{all_ids.size} ids.")
-          total_time += time
-        end
-        logger.info("Completed indexing all ontology data: #{self.id} in #{total_time} sec. #{all_ids.size} ids.")
-        logger.flush
-      end
     end
   end
 end
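For reference, the approximate SPARQL the two phases issue, shown here as Ruby heredocs. This is an assumed rendering of the Goo client calls above, not output captured from the library, and <graph> stands in for the submission's RDF graph URI. Deep OFFSET pagination forces some stores to compute and discard everything before the window on every page; the FILTER form only ever touches the hundred subjects named in each request:

    # Old approach: one big paged query; OFFSET grows with every page.
    OLD_PAGE = <<~SPARQL
      SELECT DISTINCT ?id ?p ?v FROM <graph>
      WHERE { ?id ?p ?v } LIMIT 10000 OFFSET 40000
    SPARQL

    # New, phase 1 (fetch_sorted_ids): page over subject ids only.
    NEW_IDS_PAGE = <<~SPARQL
      SELECT DISTINCT ?id FROM <graph>
      WHERE { ?id ?p ?v } LIMIT 1000 OFFSET 0
    SPARQL

    # New, phase 2 (fetch_triples): unpaged query pinned to 100 known subjects.
    NEW_SLICE = <<~SPARQL
      SELECT DISTINCT ?id ?p ?v FROM <graph>
      WHERE { ?id ?p ?v
              FILTER(?id = <http://example.org/A> || ?id = <http://example.org/B>) }
    SPARQL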