Skip to content

Commit

Permalink
Cache refresh memory consumption optimization (#470)
Browse files Browse the repository at this point in the history
  • Loading branch information
0xERR0R committed May 12, 2022
1 parent 6772438 commit e471585
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 90 deletions.
21 changes: 11 additions & 10 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,16 +474,17 @@ type ConditionalUpstreamMapping struct {

// BlockingConfig configuration for query blocking
type BlockingConfig struct {
BlackLists map[string][]string `yaml:"blackLists"`
WhiteLists map[string][]string `yaml:"whiteLists"`
ClientGroupsBlock map[string][]string `yaml:"clientGroupsBlock"`
BlockType string `yaml:"blockType" default:"ZEROIP"`
BlockTTL Duration `yaml:"blockTTL" default:"6h"`
DownloadTimeout Duration `yaml:"downloadTimeout" default:"60s"`
DownloadAttempts uint `yaml:"downloadAttempts" default:"3"`
DownloadCooldown Duration `yaml:"downloadCooldown" default:"1s"`
RefreshPeriod Duration `yaml:"refreshPeriod" default:"4h"`
FailStartOnListError bool `yaml:"failStartOnListError" default:"false"`
BlackLists map[string][]string `yaml:"blackLists"`
WhiteLists map[string][]string `yaml:"whiteLists"`
ClientGroupsBlock map[string][]string `yaml:"clientGroupsBlock"`
BlockType string `yaml:"blockType" default:"ZEROIP"`
BlockTTL Duration `yaml:"blockTTL" default:"6h"`
DownloadTimeout Duration `yaml:"downloadTimeout" default:"60s"`
DownloadAttempts uint `yaml:"downloadAttempts" default:"3"`
DownloadCooldown Duration `yaml:"downloadCooldown" default:"1s"`
RefreshPeriod Duration `yaml:"refreshPeriod" default:"4h"`
FailStartOnListError bool `yaml:"failStartOnListError" default:"false"`
ProcessingConcurrency uint `yaml:"processingConcurrency" default:"4"`
}

// ClientLookupConfig configuration for the client lookup
Expand Down
13 changes: 13 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,19 @@ downloaded or opened. Default value is `false`.
failStartOnListError: false
```

### Concurrency

Blocky downloads and processes the links of a single group concurrently. The parameter `processingConcurrency` controls
how many links are processed at the same time. A higher value can reduce the overall list refresh time, but more parallel
download and processing jobs require more RAM. Consider reducing this value on systems with limited memory. Default value is 4.

!!! example

```yaml
blocking:
processingConcurrency: 10
```

## Caching

Each DNS response has a TTL (Time-to-live) value. This value defines, how long is the record valid in seconds. The
Expand Down
120 changes: 64 additions & 56 deletions lists/list_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ import (
"github.com/0xERR0R/blocky/log"
)

const (
defaultProcessingConcurrency = 4
chanCap = 1000
)

// ListCacheType represents the type of cached list ENUM(
// blacklist // is a list with blocked domains
// whitelist // is a list with whitelisted domains / IPs
Expand All @@ -43,10 +48,11 @@ type ListCache struct {
groupCaches map[string]stringcache.StringCache
lock sync.RWMutex

groupToLinks map[string][]string
refreshPeriod time.Duration
downloader FileDownloader
listType ListCacheType
groupToLinks map[string][]string
refreshPeriod time.Duration
downloader FileDownloader
listType ListCacheType
processingConcurrency uint
}

// Configuration returns current configuration and stats
Expand Down Expand Up @@ -89,15 +95,20 @@ func (b *ListCache) Configuration() (result []string) {

// NewListCache creates new list instance
func NewListCache(t ListCacheType, groupToLinks map[string][]string, refreshPeriod time.Duration,
downloader FileDownloader) (*ListCache, error) {
downloader FileDownloader, processingConcurrency uint) (*ListCache, error) {
groupCaches := make(map[string]stringcache.StringCache)

if processingConcurrency == 0 {
processingConcurrency = defaultProcessingConcurrency
}

b := &ListCache{
groupToLinks: groupToLinks,
groupCaches: groupCaches,
refreshPeriod: refreshPeriod,
downloader: downloader,
listType: t,
groupToLinks: groupToLinks,
groupCaches: groupCaches,
refreshPeriod: refreshPeriod,
downloader: downloader,
listType: t,
processingConcurrency: processingConcurrency,
}
initError := b.refresh(true)

Expand Down Expand Up @@ -125,46 +136,57 @@ func logger() *logrus.Entry {
return log.PrefixedLog("list_cache")
}

type groupCache struct {
cache []string
err error
}

// downloads and reads files with domain names and creates cache for them
func (b *ListCache) createCacheForGroup(links []string) (stringcache.StringCache, error) {
var wg sync.WaitGroup

var err error

c := make(chan groupCache, len(links))
// loop over links (http/local) or inline definitions
for _, link := range links {
wg.Add(1)
factory := stringcache.NewChainedCacheFactory()

go b.processFile(link, c, &wg)
}
fileLinesChan := make(chan string, chanCap)
errChan := make(chan error, chanCap)

wg.Wait()
workerDoneChan := make(chan bool, len(links))

factory := stringcache.NewChainedCacheFactory()
// guard channel is used to limit the number of concurrent executions of the function
guard := make(chan struct{}, b.processingConcurrency)

processingLinkJobs := len(links)

// loop over links (http/local) or inline definitions
// start a new goroutine for each link, but limit to max. number (see processingConcurrency)
for _, link := range links {
go func(link string) {
// try to write to this channel -> this will block if the max number of goroutines are already executing
guard <- struct{}{}

defer func() {
// remove from guard channel to allow other blocked goroutines to continue
<-guard
workerDoneChan <- true
}()
b.processFile(link, fileLinesChan, errChan)
}(link)
}

Loop:
for {
select {
case res := <-c:
if res.err != nil {
err = multierror.Append(err, res.err)
}
if res.cache == nil {
return nil, err
}
for _, entry := range res.cache {
factory.AddEntry(entry)
case line := <-fileLinesChan:
factory.AddEntry(line)
case e := <-errChan:
var transientErr *TransientError

if errors.As(e, &transientErr) {
return nil, e
}
default:
close(c)
err = multierror.Append(err, e)
case <-workerDoneChan:
processingLinkJobs--

break Loop
default:
if processingLinkJobs == 0 {
break Loop
}
}
}

Expand Down Expand Up @@ -231,14 +253,8 @@ func readFile(file string) (io.ReadCloser, error) {
return os.Open(file)
}

// downloads file (or reads local file) and writes file content as string array in the channel
func (b *ListCache) processFile(link string, ch chan<- groupCache, wg *sync.WaitGroup) {
defer wg.Done()

result := groupCache{
cache: []string{},
}

// downloads file (or reads local file) and writes each line in the file to the result channel
func (b *ListCache) processFile(link string, resultCh chan<- string, errCh chan<- error) {
var r io.ReadCloser

var err error
Expand All @@ -247,15 +263,7 @@ func (b *ListCache) processFile(link string, ch chan<- groupCache, wg *sync.Wait

if err != nil {
logger().Warn("error during file processing: ", err)
result.err = multierror.Append(result.err, err)

var transientErr *TransientError

if errors.As(err, &transientErr) {
// put nil to indicate the temporary err
result.cache = nil
}
ch <- result
errCh <- err

return
}
Expand All @@ -269,21 +277,21 @@ func (b *ListCache) processFile(link string, ch chan<- groupCache, wg *sync.Wait
line := strings.TrimSpace(scanner.Text())
// skip comments
if line := processLine(line); line != "" {
result.cache = append(result.cache, line)
resultCh <- line

count++
}
}

if err := scanner.Err(); err != nil {
// don't propagate error here. If some lines are not parsable (e.g. too long), it is ok
logger().Warn("can't parse file: ", err)
} else {
logger().WithFields(logrus.Fields{
"source": link,
"count": count,
}).Info("file imported")
}
ch <- result
}

func (b *ListCache) getLinkReader(link string) (r io.ReadCloser, err error) {
Expand Down
22 changes: 22 additions & 0 deletions lists/list_cache_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package lists

import (
"testing"
)

// BenchmarkRefresh measures the time and memory allocations needed to rebuild
// the blacklist cache from three pre-generated list files in a single group.
func BenchmarkRefresh(b *testing.B) {
	// generate three list files of different sizes as benchmark input
	file1 := createTestListFile(b.TempDir(), 100000)
	file2 := createTestListFile(b.TempDir(), 150000)
	file3 := createTestListFile(b.TempDir(), 130000)
	lists := map[string][]string{
		"gr1": {file1, file2, file3},
	}

	// refreshPeriod -1 disables the periodic background refresh;
	// processingConcurrency 5 is the configuration under test
	cache, err := NewListCache(ListCacheTypeBlacklist, lists, -1, NewDownloader(), 5)
	if err != nil {
		b.Fatalf("creating list cache: %v", err)
	}

	b.ReportAllocs()
	// exclude the expensive setup (file generation + initial load in the
	// constructor) from the measured time
	b.ResetTimer()

	for n := 0; n < b.N; n++ {
		cache.Refresh()
	}
}
Loading

0 comments on commit e471585

Please sign in to comment.