-
Notifications
You must be signed in to change notification settings - Fork 106
/
Copy pathmongo_persister.rb
177 lines (149 loc) · 4.7 KB
/
mongo_persister.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
require 'mongo'
require 'uri'
require 'ghtorrent/adapters/base_adapter'
require 'ghtorrent/bson_orderedhash'
module GHTorrent
# A persistence adapter that saves data into a configurable MongoDB database.
class MongoPersister < GHTorrent::BaseAdapter
  include GHTorrent::Settings
  include GHTorrent::Logging

  # Supported configuration options: local option name => settings key.
  LOCALCONFIG = {
    :mongo_host     => "mongo.host",
    :mongo_port     => "mongo.port",
    :mongo_db       => "mongo.db",
    :mongo_username => "mongo.username",
    :mongo_passwd   => "mongo.password",
    :mongo_ssl      => "mongo.ssl",
    :mongo_replicas => "mongo.replicas"
  }

  # Collection name => ordered list of fields that make up the (ascending)
  # compound index created for that collection by #init_db.
  IDXS = {
    :events                => %w(id),
    :users                 => %w(login),
    :commits               => %w(sha),
    :commit_comments       => %w(commit_id id),
    :repos                 => %w(name owner.login),
    :repo_labels           => %w(repo owner),
    :repo_collaborators    => %w(repo owner login),
    :followers             => %w(follows login),
    :org_members           => %w(org),
    :watchers              => %w(repo owner login),
    :forks                 => %w(repo owner id),
    :pull_requests         => %w(repo owner number),
    :pull_request_comments => %w(repo owner pullreq_id id),
    :issues                => %w(repo owner number),
    :issue_events          => %w(repo owner issue_id id),
    :issue_comments        => %w(repo owner issue_id id),
    :geo_cache             => %w(key),
    :pull_request_commits  => %w(sha),
    :topics                => %w(repo owner)
  }

  attr_reader :settings

  # Creates a new instance of the MongoDB persistence adapter.
  # Expects a parsed YAML settings document as input.
  # No connection is opened here; the client is created lazily on first use
  # (see #mongo).
  def initialize(set)
    merge LOCALCONFIG
    @settings = set
    @uniq = config(:uniq_id)
  end

  # Inserts +data+ as a single document into the +entity+ collection.
  # Returns the string form of the driver's insert result.
  def store(entity, data = {})
    super
    mongo[entity].insert_one(data).to_s
  end

  # Returns every document in +entity+ matching +query+ as an Array of
  # plain hashes.
  def find(entity, query = {})
    super
    mongo[entity].find(query).map(&:to_h)
  end

  # Count the number of items returned by +query+
  def count(entity, query)
    super
    # The 2.x driver takes the filter as the first positional argument.
    # Wrapping it as {:query => query} would instead count documents whose
    # "query" field equals the filter.
    mongo[entity].count_documents(query)
  end

  # Deletes every document in +entity+ matching +query+ and returns the
  # number of documents removed.
  # Raises StandardError when +query+ is nil or empty, since such a filter
  # would wipe the whole collection.
  def del(entity, query)
    super
    if query.nil? || query.empty?
      raise StandardError, 'No filter was specified. Cowardly refusing to remove all entries'
    end
    mongo[entity].delete_many(query).n
  end

  # Applies +new_entry+ via "$set" to the first document in +entity+ that
  # matches +query+, inserting a document when nothing matches.
  # Returns whatever the driver's find_one_and_update returns.
  def upsert(entity, query = {}, new_entry)
    super
    mongo[entity].find_one_and_update(query, {"$set" => new_entry}, :upsert => true)
  end

  # Returns the underlying Mongo database handle, connecting if necessary.
  def get_underlying_connection
    mongo
  end

  # Disconnects the client, if one was opened, and forgets it so that the
  # next access reconnects.
  def close
    return if @mongo.nil?

    @mongo.disconnect!
    @mongo = nil
  end

  private

  # Lazily builds the Mongo client from the configured settings and returns
  # its database handle. On first connection, if the configured database is
  # not listed on the server, collections and indexes are created through
  # #init_db.
  def mongo
    return @mongo.database unless @mongo.nil?

    db    = config(:mongo_db)
    hosts = ["#{config(:mongo_host)}:#{config(:mongo_port)}"] + replica_hosts

    Mongo::Logger.logger.level = Logger::WARN

    @mongo = Mongo::Client.new(hosts,
                               :database      => db,
                               :password      => config(:mongo_passwd),
                               :user          => config(:mongo_username),
                               :auth_source   => 'admin',
                               # Honour the mongo.ssl setting.
                               :ssl           => ssl_enabled?,
                               :read          => { :mode => :secondary },
                               :retry_reads   => true,
                               :retry_writes  => true,
                               :write_concern => { :w => "majority", :j => true })

    init_db(@mongo.database) if @mongo.list_databases.none? { |d| d['name'] == db }

    @mongo.database
  end

  # Normalises the mongo.replicas setting into an Array of "host:port"
  # strings. An unset value yields no replicas and an Array is used as is.
  # A String is split on whitespace/commas -- NOTE(review): the intended
  # string format is not visible in this file; confirm against the config
  # template.
  def replica_hosts
    replicas = config(:mongo_replicas)
    return replicas.split(/[\s,]+/).reject(&:empty?) if replicas.is_a?(String)

    Array(replicas)
  end

  # True when mongo.ssl is configured as one of the accepted truthy
  # spellings ('true', 'True', 't' or boolean true).
  def ssl_enabled?
    ['true', 'True', 't', true].include?(config(:mongo_ssl))
  end

  # Creates any collection listed in ENTITIES that is missing from the
  # +mongo+ database, then makes sure each index described by IDXS exists.
  def init_db(mongo)
    # Fetch the collection list once instead of once per entity.
    known = mongo.list_collections.map { |c| c['name'] }

    ENTITIES.each do |entity|
      next if known.include?(entity.to_s)

      STDERR.puts "Creating collection #{entity}"
      mongo[entity].create
      known << entity.to_s
    end

    # Ensure that the necessary indexes exist
    IDXS.each do |collection, fields|
      col = mongo[collection.to_sym]
      # Matches the name MongoDB assigns by default to an ascending index,
      # e.g. "repo_1_owner_1".
      index_name = fields.join('_1_') + '_1'
      index_spec = fields.each_with_object({}) { |field, spec| spec[field] = 1 }

      # Each enumerated element is an index specification document; look it
      # up by its "name" field.
      existing = col.indexes.find { |idx| idx['name'] == index_name }

      if existing.nil?
        col.indexes.create_one(index_spec, :background => true)
        STDERR.puts "Creating index on #{col}(#{fields})"
      else
        STDERR.puts "Index on #{col}(#{fields}) exists"
      end
    end
  end
end
end