This repository has been archived by the owner on Jul 2, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy path: blacklist.go
239 lines (212 loc) · 6.75 KB
/
blacklist.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
package blacklist
import (
"sync"
"time"
"github.com/activecm/rita-bl/database"
"github.com/activecm/rita-bl/list"
"github.com/activecm/rita-bl/sources/rpc"
)
// Blacklist is the main controller for rita-blacklist. It bundles the
// database handle, the configured list sources, the remote procedure calls
// grouped by entry type, and the callback that receives errors.
type Blacklist struct {
	// db is the handle to the backing blacklist database.
	db database.Handle
	// lists holds the sources most recently passed to SetLists.
	lists []list.List
	// rpcs groups the checks registered through SetRPCs by the entry type
	// each one reports.
	rpcs map[list.BlacklistedEntryType][]rpc.RPC
	// errorHandler is called with each error raised by Update and
	// CheckEntries.
	errorHandler func(error)
}
// NewBlacklist creates a new blacklist controller backed by the given
// database handle. The handle is stored as passed; no connection is opened
// here. errorHandler is kept and later called with the errors encountered by
// Update and CheckEntries. The controller starts with no lists and no RPCs
// registered.
func NewBlacklist(db database.Handle, errorHandler func(error)) *Blacklist {
	controller := new(Blacklist)
	controller.db = db
	controller.lists = make([]list.List, 0)
	controller.rpcs = make(map[list.BlacklistedEntryType][]rpc.RPC)
	controller.errorHandler = errorHandler
	return controller
}
// SetLists loads a set of blacklist sources into the blacklist controller.
// The given sources replace whatever was set before, and the variadic slice
// is stored as passed, without copying.
func (b *Blacklist) SetLists(l ...list.List) {
b.lists = l
}
// SetRPCs registers remote procedure calls, filing each one under the entry
// type it reports via GetType. This is meant for querying web services and
// outside programs. The registered calls are run when CheckEntries is called
// for the matching entry type. Calls are appended to those already
// registered; nothing is replaced.
func (b *Blacklist) SetRPCs(rpcs ...rpc.RPC) {
	for i := range rpcs {
		entryType := rpcs[i].GetType()
		b.rpcs[entryType] = append(b.rpcs[entryType], rpcs[i])
	}
}
// Update updates the blacklist database with the latest information pulled
// from the registered sources. Lists registered in the database but no longer
// configured are removed, lists already registered are refreshed, and lists
// not yet registered are created. Every error is passed to the controller's
// error handler; Update itself returns nothing.
func (b *Blacklist) Update() {
	// Errors are sent over a channel to a goroutine that runs the handler.
	// On exit, close the channel first and then wait for that goroutine to
	// signal that it has drained it.
	done := make(chan struct{})
	errs := createErrorChannel(b.errorHandler, done)
	defer func() {
		close(errs)
		<-done
	}()

	// Without the registered lists there is nothing to compare against.
	registered, err := b.db.GetRegisteredLists()
	if err != nil {
		errs <- err
		return
	}

	// Drop lists that are in the database but no longer configured. A failed
	// removal is reported and the remaining removals still run.
	for _, stale := range getListsToRemove(b.lists, registered) {
		if removeErr := b.db.RemoveList(stale); removeErr != nil {
			errs <- removeErr
		}
	}

	known, fresh := findExistingLists(b.lists, registered)
	updateExistingLists(known, b.db, errs)
	createNewLists(fresh, b.db, errs)
}
// CheckEntries checks entries of the given type against the blacklist
// database and against every remote procedure call registered for that type.
//
// The returned map is keyed by index. An index whose database lookup fails is
// reported to the error handler and gets no database results. Results from
// remote procedure calls are appended after the database results; a failing
// call is reported to the error handler and skipped.
func (b *Blacklist) CheckEntries(entryType list.BlacklistedEntryType, indexes ...string) map[string][]database.BlacklistResult {
	results := make(map[string][]database.BlacklistResult, len(indexes))

	// Check against the cached blacklists.
	for _, index := range indexes {
		entries, err := b.db.FindEntries(entryType, index)
		if err != nil {
			b.errorHandler(err)
			continue
		}
		results[index] = entries
	}

	// Run the remote procedure calls. The loop variable is deliberately not
	// named rpc so that it does not shadow the imported rpc package.
	for _, remoteCheck := range b.rpcs[entryType] {
		// Each call checks all of the indexes at once.
		rpcResults, err := remoteCheck.Check(indexes...)
		if err != nil {
			b.errorHandler(err)
			continue
		}
		// Merge this call's results into the overall results.
		for index, entry := range rpcResults {
			results[index] = append(results[index], entry)
		}
	}
	return results
}
// createErrorChannel starts a goroutine that passes every error sent on the
// returned channel to errHandler, one at a time and in the order received.
//
// The caller owns the returned channel and must close it when no more errors
// will be sent. Once the channel is closed and drained, the goroutine sends a
// single value on finished and exits. Both channels are unbuffered here, so
// the caller has to receive from finished for the goroutine to terminate.
func createErrorChannel(errHandler func(error), finished chan<- struct{}) chan<- error {
	errorChannel := make(chan error)
	// The goroutine reads from the receive-only view it is handed rather than
	// from the captured bidirectional channel.
	go func(incoming <-chan error) {
		for err := range incoming {
			errHandler(err)
		}
		finished <- struct{}{}
	}(errorChannel)
	return errorChannel
}
// getListsToRemove returns the metadata of every list in remoteMetas whose
// name does not match the name of any list in loadedLists. The result keeps
// the order of remoteMetas and is nil when nothing needs removing.
func getListsToRemove(loadedLists []list.List, remoteMetas []list.Metadata) []list.Metadata {
	var stale []list.Metadata
nextRemote:
	for _, remote := range remoteMetas {
		for _, loaded := range loadedLists {
			if remote.Name == loaded.GetMetadata().Name {
				// Still configured, so keep it and move to the next remote.
				continue nextRemote
			}
		}
		stale = append(stale, remote)
	}
	return stale
}
// findExistingLists splits loadedLists into the lists whose name appears in
// remoteMetas and the lists whose name does not, returned in that order.
// Lists are matched by metadata name, and the first matching remote entry
// wins.
//
// Note: this function has the side effect of copying the LastUpdate field of
// the matching remote metadata into each existing list via SetMetadata.
func findExistingLists(loadedLists []list.List, remoteMetas []list.Metadata) ([]list.List, []list.List) {
	var existingLists []list.List
	var listsToAdd []list.List
	for _, loadedList := range loadedLists {
		// Remember the match by index rather than by taking the address of
		// the range variable, so the reference never aliases loop state.
		matched := -1
		for i := range remoteMetas {
			if loadedList.GetMetadata().Name == remoteMetas[i].Name {
				matched = i
				break
			}
		}
		if matched < 0 {
			listsToAdd = append(listsToAdd, loadedList)
			continue
		}

		// Update the local metadata with the field stored in the database.
		localMeta := loadedList.GetMetadata()
		localMeta.LastUpdate = remoteMetas[matched].LastUpdate
		loadedList.SetMetadata(localMeta)
		existingLists = append(existingLists, loadedList)
	}
	return existingLists, listsToAdd
}
// updateExistingLists refreshes each list in existingLists for which
// list.ShouldFetch reports true. For such a list it clears the cached
// entries, fetches and inserts the entries again, and then stores the
// metadata with LastUpdate set to the current Unix time.
//
// Errors are sent on errorsOut. A failure to clear the cache skips that list;
// the remaining lists are still processed.
func updateExistingLists(existingLists []list.List,
	dbHandle database.Handle, errorsOut chan<- error) {
	for _, current := range existingLists {
		meta := current.GetMetadata()
		if !list.ShouldFetch(meta) {
			continue
		}

		// Delete all existing entries before re-adding the list.
		if err := dbHandle.ClearCache(meta); err != nil {
			errorsOut <- err
			continue
		}

		// Fetching yields one channel per entry type; each channel is handed
		// to its own inserter goroutine.
		channels := list.FetchAndValidateEntries(current, errorsOut)
		var pending sync.WaitGroup
		for entryType, entries := range channels {
			pending.Add(1)
			go dbHandle.InsertEntries(entryType, entries, &pending, errorsOut)
		}
		// InsertEntries only finishes if FetchAndValidateEntries finishes.
		pending.Wait()

		meta.LastUpdate = time.Now().Unix()
		if err := dbHandle.UpdateListMetadata(meta); err != nil {
			errorsOut <- err
		}
	}
}
// createNewLists registers and imports each list in listsToAdd for which
// list.ShouldFetch reports true. A list is first registered with LastUpdate
// and CacheTime zeroed, then its entries are fetched and inserted, and
// finally its real metadata is stored with LastUpdate set to the current
// Unix time.
//
// Errors are sent on errorsOut. A failure to register skips that list; the
// remaining lists are still processed.
func createNewLists(listsToAdd []list.List,
	dbHandle database.Handle, errorsOut chan<- error) {
	for _, listToAdd := range listsToAdd {
		// Read the metadata once and reuse it for the fetch decision, the
		// same way updateExistingLists does.
		meta := listToAdd.GetMetadata()
		if !list.ShouldFetch(meta) {
			continue
		}

		// Register the list with the cache marked invalid, so that if the
		// import errors part-way the list will be reimported later.
		preWriteMetaCopy := meta
		preWriteMetaCopy.LastUpdate = 0
		preWriteMetaCopy.CacheTime = 0
		if err := dbHandle.RegisterList(preWriteMetaCopy); err != nil {
			errorsOut <- err
			continue
		}

		// Fetching yields one channel per entry type; each channel is handed
		// to its own inserter goroutine.
		entryMap := list.FetchAndValidateEntries(listToAdd, errorsOut)
		var wg sync.WaitGroup
		for entryType, entryChannel := range entryMap {
			wg.Add(1)
			go dbHandle.InsertEntries(entryType, entryChannel, &wg, errorsOut)
		}
		// InsertEntries only finishes if FetchAndValidateEntries finishes.
		wg.Wait()

		// Mark the cache as valid by storing the real metadata.
		meta.LastUpdate = time.Now().Unix()
		if err := dbHandle.UpdateListMetadata(meta); err != nil {
			errorsOut <- err
		}
	}
}