Skip to content

Commit

Permalink
application startup should fail if initial download of a single list …
Browse files Browse the repository at this point in the history
…failed (#310) (#313)

application startup should fail if initial download of a single list failed
  • Loading branch information
kwitsch authored Oct 13, 2021
1 parent c22292e commit e5b44f4
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 85 deletions.
2 changes: 1 addition & 1 deletion cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func startServer(_ *cobra.Command, _ []string) {
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

srv, err := server.NewServer(config.GetConfig())
util.FatalOnError("cant start server: ", err)
util.FatalOnError("cant start server: ", err...)

srv.Start()

Expand Down
96 changes: 62 additions & 34 deletions lists/list_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (b *ListCache) Configuration() (result []string) {

// NewListCache creates new list instance
func NewListCache(t ListCacheType, groupToLinks map[string][]string, refreshPeriod time.Duration,
downloadTimeout time.Duration) *ListCache {
downloadTimeout time.Duration) (*ListCache, []error) {
groupCaches := make(map[string]cache)

b := &ListCache{
Expand All @@ -91,11 +91,11 @@ func NewListCache(t ListCacheType, groupToLinks map[string][]string, refreshPeri
downloadTimeout: downloadTimeout,
listType: t,
}
b.Refresh()

go periodicUpdate(b)

return b
initError := b.refresh(true)
if len(initError) == 0 {
go periodicUpdate(b)
}
return b, initError
}

// periodicUpdate triggers periodical refresh (and download) of list entries
Expand All @@ -115,12 +115,17 @@ func logger() *logrus.Entry {
return log.PrefixedLog("list_cache")
}

type groupCache struct {
cache []string
errors []error
}

// downloads and reads files with domain names and creates cache for them
func (b *ListCache) createCacheForGroup(links []string) cache {
func (b *ListCache) createCacheForGroup(links []string) (cache, []error) {
var wg sync.WaitGroup

c := make(chan []string, len(links))

c := make(chan groupCache, len(links))
err := []error{}
// loop over links (http/local) or inline definitions
for _, link := range links {
wg.Add(1)
Expand All @@ -136,10 +141,13 @@ Loop:
for {
select {
case res := <-c:
if res == nil {
return nil
if len(res.errors) > 0 {
err = append(err, res.errors...)
}
if res.cache == nil {
return nil, err
}
for _, entry := range res {
for _, entry := range res.cache {
factory.addEntry(entry)
}
default:
Expand All @@ -148,7 +156,7 @@ Loop:
}
}

return factory.create()
return factory.create(), err
}

// Match matches passed domain name against cached list entries
Expand All @@ -167,24 +175,37 @@ func (b *ListCache) Match(domain string, groupsToCheck []string) (found bool, gr

// Refresh triggers the refresh of a list
func (b *ListCache) Refresh() {
b.refresh(false)
}
func (b *ListCache) refresh(init bool) []error {
res := []error{}
for group, links := range b.groupToLinks {
cacheForGroup := b.createCacheForGroup(links)

cacheForGroup, errors := b.createCacheForGroup(links)
if len(errors) > 0 {
res = append(res, errors...)
}
if cacheForGroup != nil {
b.lock.Lock()
b.groupCaches[group] = cacheForGroup
b.lock.Unlock()
} else {
logger().Warn("Populating of group cache failed, leaving items from last successful download in cache")
if init {
msg := "Populating group cache failed for group " + group
logger().Warn(msg)
} else {
logger().Warn("Populating of group cache failed, leaving items from last successful download in cache")
}
}
if b.groupCaches[group] != nil {
evt.Bus().Publish(evt.BlockingCacheGroupChanged, b.listType, group, b.groupCaches[group].elementCount())

evt.Bus().Publish(evt.BlockingCacheGroupChanged, b.listType, group, b.groupCaches[group].elementCount())

logger().WithFields(logrus.Fields{
"group": group,
"total_count": b.groupCaches[group].elementCount(),
}).Info("group import finished")
logger().WithFields(logrus.Fields{
"group": group,
"total_count": b.groupCaches[group].elementCount(),
}).Info("group import finished")
}
}
return res
}

func (b *ListCache) downloadFile(link string) (io.ReadCloser, error) {
Expand All @@ -205,22 +226,27 @@ func (b *ListCache) downloadFile(link string) (io.ReadCloser, error) {
if resp, err = client.Get(link); err == nil {
if resp.StatusCode == http.StatusOK {
return resp.Body, nil
} else {
logger().WithField("link", link).WithField("attempt",
attempt).Warnf("Got status code %d", resp.StatusCode)
}

_ = resp.Body.Close()

return nil, fmt.Errorf("couldn't download url '%s', got status code %d", link, resp.StatusCode)
err = fmt.Errorf("couldn't download url '%s', got status code %d", link, resp.StatusCode)
}

var netErr net.Error
var dnsErr *net.DNSError
if errors.As(err, &netErr) && (netErr.Timeout() || netErr.Temporary()) {
logger().WithField("link", link).WithField("attempt",
attempt).Warnf("Temporary network error / Timeout occurred, retrying... %s", netErr)
time.Sleep(time.Second)
attempt++
} else {
return nil, err
} else if errors.As(err, &dnsErr) {
logger().WithField("link", link).WithField("attempt",
attempt).Warnf("Name resolution error, retrying... %s", dnsErr.Err)
}
time.Sleep(time.Second)
attempt++
}

return nil, err
Expand All @@ -234,10 +260,13 @@ func readFile(file string) (io.ReadCloser, error) {
}

// downloads file (or reads local file) and writes file content as string array in the channel
func (b *ListCache) processFile(link string, ch chan<- []string, wg *sync.WaitGroup) {
func (b *ListCache) processFile(link string, ch chan<- groupCache, wg *sync.WaitGroup) {
defer wg.Done()

result := make([]string, 0)
result := groupCache{
cache: []string{},
errors: []error{},
}

var r io.ReadCloser

Expand All @@ -257,14 +286,13 @@ func (b *ListCache) processFile(link string, ch chan<- []string, wg *sync.WaitGr

if err != nil {
logger().Warn("error during file processing: ", err)

result.errors = append(result.errors, err)
var netErr net.Error
if errors.As(err, &netErr) && (netErr.Timeout() || netErr.Temporary()) {
// put nil to indicate the temporary error
ch <- nil
return
result.cache = nil
}
ch <- []string{}
ch <- result

return
}
Expand All @@ -278,7 +306,7 @@ func (b *ListCache) processFile(link string, ch chan<- []string, wg *sync.WaitGr
line := strings.TrimSpace(scanner.Text())
// skip comments
if line := processLine(line); line != "" {
result = append(result, line)
result.cache = append(result.cache, line)

count++
}
Expand Down
26 changes: 13 additions & 13 deletions lists/list_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var _ = Describe("ListCache", func() {
lists := map[string][]string{
"gr0": {emptyFile.Name()},
}
sut := NewListCache(ListCacheTypeBlacklist, lists, 0, 30*time.Second)
sut, _ := NewListCache(ListCacheTypeBlacklist, lists, 0, 30*time.Second)

found, group := sut.Match("", []string{"gr0"})
Expect(found).Should(BeFalse())
Expand All @@ -59,7 +59,7 @@ var _ = Describe("ListCache", func() {
lists := map[string][]string{
"gr1": {emptyFile.Name()},
}
sut := NewListCache(ListCacheTypeBlacklist, lists, 0, 30*time.Second)
sut, _ := NewListCache(ListCacheTypeBlacklist, lists, 0, 30*time.Second)

found, group := sut.Match("google.com", []string{"gr1"})
Expect(found).Should(BeFalse())
Expand All @@ -85,7 +85,7 @@ var _ = Describe("ListCache", func() {
"gr1": {s.URL},
}

sut := NewListCache(ListCacheTypeBlacklist, lists, 0, 100*time.Millisecond)
sut, _ := NewListCache(ListCacheTypeBlacklist, lists, 0, 100*time.Millisecond)
time.Sleep(time.Second)
found, group := sut.Match("blocked1.com", []string{"gr1"})
Expect(found).Should(BeTrue())
Expand All @@ -111,7 +111,7 @@ var _ = Describe("ListCache", func() {
"gr1": {s.URL, emptyFile.Name()},
}

sut := NewListCache(ListCacheTypeBlacklist, lists, 4*time.Hour, 100*time.Millisecond)
sut, _ := NewListCache(ListCacheTypeBlacklist, lists, 4*time.Hour, 100*time.Millisecond)
time.Sleep(time.Second)
By("Lists loaded without timeout", func() {
found, group := sut.Match("blocked1.com", []string{"gr1"})
Expand Down Expand Up @@ -147,7 +147,7 @@ var _ = Describe("ListCache", func() {
"gr1": {s.URL},
}

sut := NewListCache(ListCacheTypeBlacklist, lists, 0, 30*time.Second)
sut, _ := NewListCache(ListCacheTypeBlacklist, lists, 0, 30*time.Second)
time.Sleep(time.Second)
By("Lists loaded without error", func() {
found, group := sut.Match("blocked1.com", []string{"gr1"})
Expand All @@ -171,7 +171,7 @@ var _ = Describe("ListCache", func() {
"gr2": {server3.URL},
}

sut := NewListCache(ListCacheTypeBlacklist, lists, 0, 30*time.Second)
sut, _ := NewListCache(ListCacheTypeBlacklist, lists, 0, 30*time.Second)

found, group := sut.Match("blocked1.com", []string{"gr1", "gr2"})
Expect(found).Should(BeTrue())
Expand All @@ -192,7 +192,7 @@ var _ = Describe("ListCache", func() {
"withDeadLink": {"http://wrong.host.name"},
}

sut := NewListCache(ListCacheTypeBlacklist, lists, 0, 30*time.Second)
sut, _ := NewListCache(ListCacheTypeBlacklist, lists, 0, 30*time.Second)

found, group := sut.Match("blocked1.com", []string{})
Expect(found).Should(BeFalse())
Expand All @@ -211,7 +211,7 @@ var _ = Describe("ListCache", func() {
resultCnt = cnt
})

sut := NewListCache(ListCacheTypeBlacklist, lists, 0, 30*time.Second)
sut, _ := NewListCache(ListCacheTypeBlacklist, lists, 0, 30*time.Second)

found, group := sut.Match("blocked1.com", []string{})
Expect(found).Should(BeFalse())
Expand All @@ -226,7 +226,7 @@ var _ = Describe("ListCache", func() {
"gr2": {"file://" + file3.Name()},
}

sut := NewListCache(ListCacheTypeBlacklist, lists, 0, 0)
sut, _ := NewListCache(ListCacheTypeBlacklist, lists, 0, 0)

found, group := sut.Match("blocked1.com", []string{"gr1", "gr2"})
Expect(found).Should(BeTrue())
Expand All @@ -247,7 +247,7 @@ var _ = Describe("ListCache", func() {
"gr1": {"inlinedomain1.com\n#some comment\n#inlinedomain2.com"},
}

sut := NewListCache(ListCacheTypeBlacklist, lists, 0, 0)
sut, _ := NewListCache(ListCacheTypeBlacklist, lists, 0, 0)

found, group := sut.Match("inlinedomain1.com", []string{"gr1"})
Expect(found).Should(BeTrue())
Expand All @@ -264,7 +264,7 @@ var _ = Describe("ListCache", func() {
"gr1": {"/^apple\\.(de|com)$/\n"},
}

sut := NewListCache(ListCacheTypeBlacklist, lists, 0, 0)
sut, _ := NewListCache(ListCacheTypeBlacklist, lists, 0, 0)

found, group := sut.Match("apple.com", []string{"gr1"})
Expect(found).Should(BeTrue())
Expand All @@ -284,7 +284,7 @@ var _ = Describe("ListCache", func() {
"gr2": {"inline\ndefinition\n"},
}

sut := NewListCache(ListCacheTypeBlacklist, lists, 0, 0)
sut, _ := NewListCache(ListCacheTypeBlacklist, lists, 0, 0)

c := sut.Configuration()
Expect(c).Should(HaveLen(11))
Expand All @@ -296,7 +296,7 @@ var _ = Describe("ListCache", func() {
"gr1": {"file1", "file2"},
}

sut := NewListCache(ListCacheTypeBlacklist, lists, -1, 0)
sut, _ := NewListCache(ListCacheTypeBlacklist, lists, -1, 0)

c := sut.Configuration()
Expect(c).Should(ContainElement("refresh: disabled"))
Expand Down
15 changes: 11 additions & 4 deletions resolver/blocking_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,21 @@ type BlockingResolver struct {
}

// NewBlockingResolver returns a new configured instance of the resolver
func NewBlockingResolver(cfg config.BlockingConfig) ChainedResolver {
func NewBlockingResolver(cfg config.BlockingConfig) (ChainedResolver, []error) {
blockHandler := createBlockHandler(cfg)
refreshPeriod := time.Duration(cfg.RefreshPeriod)
timeout := time.Duration(cfg.DownloadTimeout)
blacklistMatcher := lists.NewListCache(lists.ListCacheTypeBlacklist, cfg.BlackLists, refreshPeriod, timeout)
whitelistMatcher := lists.NewListCache(lists.ListCacheTypeWhitelist, cfg.WhiteLists, refreshPeriod, timeout)
blacklistMatcher, blErr := lists.NewListCache(lists.ListCacheTypeBlacklist, cfg.BlackLists, refreshPeriod, timeout)
whitelistMatcher, wlErr := lists.NewListCache(lists.ListCacheTypeWhitelist, cfg.WhiteLists, refreshPeriod, timeout)
whitelistOnlyGroups := determineWhitelistOnlyGroups(&cfg)

var initErrors []error
if len(blErr) > 0 {
initErrors = append(initErrors, blErr...)
}
if len(wlErr) > 0 {
initErrors = append(initErrors, wlErr...)
}
res := &BlockingResolver{
blockHandler: blockHandler,
cfg: cfg,
Expand All @@ -100,7 +107,7 @@ func NewBlockingResolver(cfg config.BlockingConfig) ChainedResolver {
},
}

return res
return res, initErrors
}

// RefreshLists triggers the refresh of all black and white lists in the cache
Expand Down
8 changes: 5 additions & 3 deletions resolver/blocking_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ badcnamedomain.com`)
JustBeforeEach(func() {
m = &resolverMock{}
m.On("Resolve", mock.Anything).Return(&Response{Res: mockAnswer}, nil)
sut = NewBlockingResolver(sutConfig).(*BlockingResolver)
tmp, _ := NewBlockingResolver(sutConfig)
sut = tmp.(*BlockingResolver)
sut.Next(m)
sut.RefreshLists()
})
Expand Down Expand Up @@ -90,7 +91,8 @@ badcnamedomain.com`)
Expect(err).Should(Succeed())

// recreate to trigger a reload
sut = NewBlockingResolver(sutConfig).(*BlockingResolver)
tmp, _ := NewBlockingResolver(sutConfig)
sut = tmp.(*BlockingResolver)

time.Sleep(time.Second)

Expand Down Expand Up @@ -769,7 +771,7 @@ badcnamedomain.com`)

Log().ExitFunc = func(int) { fatal = true }

_ = NewBlockingResolver(config.BlockingConfig{
_, _ = NewBlockingResolver(config.BlockingConfig{
BlockType: "wrong",
})

Expand Down
Loading

0 comments on commit e5b44f4

Please sign in to comment.