Skip to content

Commit

Permalink
Merge branch 'SWARM-374'
Browse files Browse the repository at this point in the history
  • Loading branch information
EvgenyGri committed Feb 28, 2024
2 parents c78319c + 1813140 commit ba2e6ee
Show file tree
Hide file tree
Showing 18 changed files with 543 additions and 360 deletions.
21 changes: 21 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2023 The HBF Authors

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.9.1
1.9.2
48 changes: 42 additions & 6 deletions cmd/to-nft/internal/events-of-netlink.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package internal

import (
"context"
"fmt"
"sync"

"github.com/H-BF/sgroups/pkg/nl"

Expand All @@ -27,16 +29,50 @@ type ( // events from netlink

// NetlinkEventSource -
type NetlinkEventSource struct {
AgentSubj observer.Subject
Subject observer.Subject
nl.NetlinkWatcher

runOnce sync.Once
closeOnce sync.Once
stopped chan struct{}
}

// Close -
func (w *NetlinkEventSource) Close() error {
w.closeOnce.Do(func() {
w.runOnce.Do(func() {})
w.NetlinkWatcher.Close()
if w.stopped != nil {
<-w.stopped
}
})
return nil
}

// Run -
func (w *NetlinkEventSource) Run(ctx context.Context) error {
log := logger.FromContext(ctx).Named("net-conf-watcher")
const job = "net-conf-watcher"

var neverRun bool
w.runOnce.Do(func() {
neverRun = true
})
log := logger.FromContext(ctx).Named(job)
if !neverRun {
return fmt.Errorf("%s: it has been run or closed yet", job)
}
w.stopped = make(chan struct{})
log.Info("start")
defer log.Info("stop")
for stream := w.Stream(); ; {
defer func() {
log.Info("stop")
close(w.stopped)
}()
stream := w.Stream()
if stream == nil {
log.Info("will exit cause it has closed")
return nil
}
for {
select {
case <-ctx.Done():
log.Info("will exit cause it has canceled")
Expand All @@ -55,12 +91,12 @@ func (w *NetlinkEventSource) Run(ctx context.Context) error {
ev.Updates = append(ev.Updates, t)
case nl.ErrMsg:
log.Errorf("will exit cause %v", t)
w.AgentSubj.Notify(NetlinkError{ErrMsg: t})
w.Subject.Notify(NetlinkError{ErrMsg: t})
return t
}
}
if len(ev.Updates) > 0 {
w.AgentSubj.Notify(ev)
w.Subject.Notify(ev)
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions cmd/to-nft/internal/events-of-sync-status.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type (

// SyncStatusEventSource -
type SyncStatusEventSource struct {
AgentSubj observer.Subject
Subject observer.Subject
SGClient SGClient
CheckInterval time.Duration
UsePushModel bool
Expand Down Expand Up @@ -76,7 +76,7 @@ func (ss *SyncStatusEventSource) push(ctx context.Context, tc *time.Ticker, log
return
case <-tc.C:
syncStatus.Clear(func(t model.SyncStatus) {
ss.AgentSubj.Notify(SyncStatusValue{
ss.Subject.Notify(SyncStatusValue{
SyncStatus: t,
})
})
Expand All @@ -87,7 +87,7 @@ func (ss *SyncStatusEventSource) push(ctx context.Context, tc *time.Ticker, log
close(closeCh)
if err != nil {
log.Errorf("will exit cause %v", err)
ss.AgentSubj.Notify(SyncStatusError{error: err})
ss.Subject.Notify(SyncStatusError{error: err})
}
wg.Wait()
}()
Expand All @@ -109,7 +109,7 @@ func (ss *SyncStatusEventSource) pull(ctx context.Context, tc *time.Ticker, log
defer func() {
if err != nil {
log.Errorf("will exit cause %v", err)
ss.AgentSubj.Notify(SyncStatusError{error: err})
ss.Subject.Notify(SyncStatusError{error: err})
}
}()
for {
Expand All @@ -122,7 +122,7 @@ func (ss *SyncStatusEventSource) pull(ctx context.Context, tc *time.Ticker, log
return e
}
if st != nil {
ss.AgentSubj.Notify(SyncStatusValue{SyncStatus: *st})
ss.Subject.Notify(SyncStatusValue{SyncStatus: *st})
}
}
}
Expand Down
17 changes: 6 additions & 11 deletions cmd/to-nft/internal/host/net-conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,17 +196,12 @@ func (conf NetConf) Clone() NetConf {
// LocalIPs get effective local unique IP lists
func (conf NetConf) LocalIPs() (IPv4 []net.IP, IPv6 []net.IP) {
conf.IPAdresses.Iterate(func(_ IPAdressesMapKey, a *IpAddr) bool {
a.Links.Iterate(func(lid LinkID) bool {
if _, ok := conf.IpDevs.Get(lid); ok {
switch len(a.IP) {
case net.IPv4len:
IPv4 = append(IPv4, a.IP)
case net.IPv6len:
IPv6 = append(IPv6, a.IP)
}
}
return true
})
switch len(a.IP) {
case net.IPv4len:
IPv4 = append(IPv4, a.IP)
case net.IPv6len:
IPv6 = append(IPv6, a.IP)
}
return true
})
type iplist = []net.IP
Expand Down
148 changes: 79 additions & 69 deletions cmd/to-nft/internal/jobs/dns-refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,35 +40,57 @@ type (

// DnsRefresher -
DnsRefresher struct {
agentSubj observer.Subject
sema chan struct{}
close chan struct{}
stopped chan struct{}
onceClose sync.Once
onceRun sync.Once
subject observer.Subject
sema chan struct{}
stopped chan struct{}
onceClose sync.Once
onceRun sync.Once
que *queue.FIFO
activeQueries struct {
sync.Mutex
closed bool
dict.RBDict[Ask2ResolveDomainAddresses, *time.Timer]
}
}
)

// NewDnsRefresher -
func NewDnsRefresher() *DnsRefresher {
func NewDnsRefresher(sbj observer.Subject) *DnsRefresher {
const semaphoreCap = 4

ret := DnsRefresher{
agentSubj: internal.AgentSubject(),
sema: make(chan struct{}, semaphoreCap),
close: make(chan struct{}),
subject: sbj,
sema: make(chan struct{}, semaphoreCap),
que: queue.NewFIFO(),
}
for i := 0; i < cap(ret.sema); i++ {
ret.sema <- struct{}{}
}
return &ret
}

// MakeObserver -
func (rf *DnsRefresher) MakeObserver(ctx context.Context) observer.Observer {
return observer.NewObserver(func(ev observer.EventType) {
switch o := ev.(type) {
case Ask2ResolveDomainAddresses:
rf.onAsk2ResolveDomainAddresses(ctx, o)
}
}, false, Ask2ResolveDomainAddresses{})
}

// Close -
func (rf *DnsRefresher) Close() error {
rf.onceClose.Do(func() {
close(rf.close)
_ = rf.que.Close()
rf.onceRun.Do(func() {})
rf.activeQueries.Lock()
rf.activeQueries.closed = true
rf.activeQueries.Iterate(func(_ Ask2ResolveDomainAddresses, v *time.Timer) bool {
_ = v.Stop()
return true
})
rf.activeQueries.Unlock()
if rf.stopped != nil {
<-rf.stopped
}
Expand All @@ -78,47 +100,31 @@ func (rf *DnsRefresher) Close() error {

// Run -
func (rf *DnsRefresher) Run(ctx context.Context) (err error) {
const job = "dns-refresher"

var doRun bool
rf.onceRun.Do(func() { doRun = true })
if !doRun {
return errors.New("it has been run or closed yet")
return errors.Errorf("%s: it has been run or closed yet", job)
}

log := logger.FromContext(ctx).Named("dns").Named("refresher")
log := logger.FromContext(ctx).Named(job)
log.Info("start")

var activeQueries struct {
sync.Mutex
dict.RBDict[Ask2ResolveDomainAddresses, *time.Timer]
}
que := queue.NewFIFO()
queObs := observer.NewObserver(func(ev observer.EventType) {
_ = que.Put(ev)
}, false, Ask2ResolveDomainAddresses{})
rf.agentSubj.ObserversAttach(queObs)
rf.stopped = make(chan struct{})
defer func() {
que.Close()
rf.agentSubj.ObserversDetach(queObs)
queObs.Close()
activeQueries.Lock()
activeQueries.Iterate(func(_ Ask2ResolveDomainAddresses, v *time.Timer) bool {
_ = v.Stop()
return true
})
activeQueries.Unlock()
close(rf.stopped)
log.Info("stop")
}()
for events := que.Reader(); ; {
for events := rf.que.Reader(); ; {
select {
case <-ctx.Done():
log.Info("will exit cause it has canceled")
return ctx.Err()
case <-rf.close:
log.Infof("will exit cause it has closed")
return nil
case raw := <-events:
case raw, ok := <-events:
if !ok {
log.Infof("will exit cause it has closed")
return nil
}
switch ev := raw.(type) {
case DomainAddresses:
log1 := log.WithField("domain", ev.FQDN).WithField("IPv", ev.IpVersion)
Expand All @@ -129,38 +135,7 @@ func (rf *DnsRefresher) Run(ctx context.Context) (err error) {
"TTL", jsonview.Stringer(ev.DnsAnswer.TTL.Round(time.Second)),
"IP(s)", ev.DnsAnswer.IPs)
}
rf.agentSubj.Notify(ev)
case Ask2ResolveDomainAddresses:
activeQueries.Lock()
if activeQueries.At(ev) == nil {
now := time.Now()
ttl := ev.ValidBefore.Sub(now)
if ttl < time.Minute {
ttl = time.Minute
}
log.Debugw("ask-to-resolve",
"IPv", ev.IpVersion,
"domain", jsonview.Stringer(ev.FQDN),
"after", jsonview.Stringer(ttl.Round(time.Second)),
)
newTimer := time.AfterFunc(ttl, func() {
select {
case <-ctx.Done():
case <-rf.close:
case <-rf.sema:
defer func() {
rf.sema <- struct{}{}
}()
ret := rf.resolve(ctx, ev)
que.Put(ret)
}
activeQueries.Lock()
activeQueries.Del(ev)
activeQueries.Unlock()
})
activeQueries.Put(ev, newTimer)
}
activeQueries.Unlock()
rf.subject.Notify(ev)
}
}
}
Expand All @@ -186,6 +161,41 @@ func (rf *DnsRefresher) resolve(ctx context.Context, ask Ask2ResolveDomainAddres
return ret
}

func (rf *DnsRefresher) onAsk2ResolveDomainAddresses(ctx context.Context, ev Ask2ResolveDomainAddresses) {
log := logger.FromContext(ctx)
rf.activeQueries.Lock()
defer rf.activeQueries.Unlock()
if rf.activeQueries.At(ev) != nil || rf.activeQueries.closed {
return
}

now := time.Now()
ttl := ev.ValidBefore.Sub(now)
if ttl < time.Minute {
ttl = time.Minute
}
log.Debugw("ask-to-resolve",
"IPv", ev.IpVersion,
"domain", jsonview.Stringer(ev.FQDN),
"after", jsonview.Stringer(ttl.Round(time.Second)),
)
newTimer := time.AfterFunc(ttl, func() {
select {
case <-ctx.Done():
case <-rf.sema:
defer func() {
rf.sema <- struct{}{}
}()
ret := rf.resolve(ctx, ev)
rf.que.Put(ret)
}
rf.activeQueries.Lock()
defer rf.activeQueries.Unlock()
rf.activeQueries.Del(ev)
})
rf.activeQueries.Put(ev, newTimer)
}

// Cmp -
func (a Ask2ResolveDomainAddresses) Cmp(other Ask2ResolveDomainAddresses) int {
if a.IpVersion > other.IpVersion {
Expand Down
Loading

0 comments on commit ba2e6ee

Please sign in to comment.