Skip to content

Commit

Permalink
fix: initially fetch information before starting bee loops
Browse files Browse the repository at this point in the history
  • Loading branch information
felixauringer committed Oct 30, 2024
1 parent 0a1d9e0 commit 6b42264
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 12 deletions.
5 changes: 4 additions & 1 deletion cmd/bee/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ func run(bindAddress netip.Addr, disableNftables bool, beekeeperBasePath string)
}
defer forwarder.Close()

heartbeat := heartbeat.NewHeartbeat(bee, forwarder, bindAddress)
heartbeat, err := heartbeat.NewHeartbeat(ctx, bee, forwarder, bindAddress)
if err != nil {
return fmt.Errorf("creating heartbeat service: %w", err)
}

signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM)
Expand Down
37 changes: 26 additions & 11 deletions internal/heartbeat/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package heartbeat

import (
"context"
"fmt"
"net"
"net/netip"
"time"
Expand All @@ -23,18 +24,29 @@ type Heartbeat struct {
bindAddress netip.Addr
}

func NewHeartbeat(bee *apibee.Bee, forwarder *forward.Forwarder, bindAddress netip.Addr) *Heartbeat {
return &Heartbeat{
func NewHeartbeat(ctx context.Context, bee *apibee.Bee, forwarder *forward.Forwarder, bindAddress netip.Addr) (*Heartbeat, error) {
heartbeat := &Heartbeat{
bee,
forwarder,
bindAddress,
}

// updateForwardings fetches necessary information for the forwarder from the beekeeper.
if err := heartbeat.updateForwardings(ctx); err != nil {
return nil, fmt.Errorf("initializing heartbeat: %w", err)
}

return heartbeat, nil
}

func (h *Heartbeat) Run(ctx context.Context) error {
for {
h.ReportStats(ctx)
h.UpdateForwardings(ctx)
if err := h.reportStats(ctx); err != nil {
return fmt.Errorf("reporting stats: %w", err)
}
if err := h.updateForwardings(ctx); err != nil {
return fmt.Errorf("updating forwardings: %w", err)
}

select {
case <-ctx.Done():
Expand All @@ -44,26 +56,27 @@ func (h *Heartbeat) Run(ctx context.Context) error {
}
}

func (h *Heartbeat) ReportStats(ctx context.Context) {
func (h *Heartbeat) reportStats(ctx context.Context) error {
if err := h.bee.ReportStatistics(ctx, h.bindAddress.String()); err != nil {
log.WithError(err).Warn("Error during heartbeat")
return fmt.Errorf("contacting beekeeper: %w", err)
}
return nil
}

func (h *Heartbeat) UpdateForwardings(ctx context.Context) {
func (h *Heartbeat) updateForwardings(ctx context.Context) error {
info, err := h.bee.GetForwardingInformation(ctx)
if err != nil {
log.WithError(err).Error("Error getting forwarding information")
return
return fmt.Errorf("getting forwarding information: %w", err)
}

err = h.forwarder.SetDefaultBeehiveAddress(info.DefaultBeehive)
if err != nil {
log.WithError(err).Error("Error updating default beehive address")
return fmt.Errorf("updating default beehive address: %w", err)
} else {
log.WithField("beehive", info.DefaultBeehive).Debug("Updated default beehive")
}

// Errors are only logged here because some beehives may be operational.
newBeehives := make([]forward.WireguardBeehive, 0, len(info.Beehives))
for _, beehive := range info.Beehives {
log := log.WithField("beehive", beehive)
Expand Down Expand Up @@ -95,7 +108,7 @@ func (h *Heartbeat) UpdateForwardings(ctx context.Context) {
}

if err := h.forwarder.UpdateWireguardPeers(newBeehives); err != nil {
log.WithError(err).Error("updating wireguard peers")
return fmt.Errorf("updating wireguard peers: %w", err)
}

newForwardingRules := make([]forward.ForwardingRule, 0, len(info.Forwardings))
Expand All @@ -119,4 +132,6 @@ func (h *Heartbeat) UpdateForwardings(ctx context.Context) {
}

h.forwarder.UpdateForwardingRules(newForwardingRules)

return nil
}

0 comments on commit 6b42264

Please sign in to comment.