Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Codereorg: Part 5, state #1492

Merged
merged 31 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
a885e1f
format with prettier 3.0
kradalby Jul 14, 2023
2f7cbc8
Replace the timestamp based state system
kradalby Jun 21, 2023
47a75db
Split up MapResponse
kradalby Jun 29, 2023
a52db30
Add missing return in shutdown
kradalby Jul 7, 2023
8e973f4
add script to run integration tests
kradalby Jun 22, 2023
b6593dc
rearrange channel closing defers
kradalby Jul 14, 2023
3b8c4c9
add annoying linter to golangci
kradalby Jul 17, 2023
809db0c
add less/jq to hs debug container
kradalby Jul 17, 2023
1013e5f
add debug option to save all map responses
kradalby Jul 17, 2023
3ee0179
disable online map by default for now
kradalby Jul 17, 2023
3bfc123
introduce rw lock for db, ish...
kradalby Jul 17, 2023
c054fb5
additional debug logging, use mapper pointer
kradalby Jul 24, 2023
52a85bc
only send lite map responses when omitpeers
kradalby Jul 26, 2023
eb1a556
fix lint
kradalby Jul 26, 2023
d84ea0e
add maprequest to all mapper calls
kradalby Jul 26, 2023
6b620ca
remove retries for pings in tsic
kradalby Jul 26, 2023
6b49dad
filter out peers without endpoints
kradalby Jul 30, 2023
841e3c5
rearrange poll, lock, notify
kradalby Jul 26, 2023
7c2c9b0
Update packetfilter when peers change
kradalby Aug 9, 2023
1e74117
Remove database from Mapper
kradalby Aug 9, 2023
fdaf9da
move MapResponse peer logic into function and reuse
kradalby Aug 9, 2023
db5864d
give ci more tollerance for timeouts
kradalby Sep 10, 2023
df1f4b3
Upgrade go and debian in headscale docker
kradalby Sep 11, 2023
b6d64e1
add lock around saving ts clients
kradalby Sep 10, 2023
8882825
gitignore infolder tailscale
kradalby Sep 11, 2023
4b4b032
order path
kradalby Sep 11, 2023
db8761f
add pprof endpoint
kradalby Sep 11, 2023
ec39145
Remove LastSuccessfulUpdate from Machine
kradalby Sep 11, 2023
decbeb7
improve debug logging, rw lock for notifier
kradalby Sep 11, 2023
1f4d51d
Return simple responses immediatly
kradalby Sep 11, 2023
bc9fc67
handle route updates correctly
kradalby Sep 11, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
ignored/
tailscale/

# Binaries for programs and plugins
*.exe
Expand Down
2 changes: 2 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ issues:
linters:
enable-all: true
disable:
- depguard

- exhaustivestruct
- revive
- lll
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Builder image
FROM docker.io/golang:1.20-bullseye AS build
FROM docker.io/golang:1.21-bookworm AS build
ARG VERSION=dev
ENV GOPATH /go
WORKDIR /go/src/headscale
Expand All @@ -14,7 +14,7 @@ RUN strip /go/bin/headscale
RUN test -e /go/bin/headscale

# Production image
FROM docker.io/debian:bullseye-slim
FROM docker.io/debian:bookworm-slim

RUN apt-get update \
&& apt-get install -y ca-certificates \
Expand Down
4 changes: 4 additions & 0 deletions Dockerfile.debug
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ FROM docker.io/golang:1.20.0-bullseye
COPY --from=build /go/bin/headscale /bin/headscale
ENV TZ UTC

RUN apt-get update \
&& apt-get install --no-install-recommends --yes less jq \
&& rm -rf /var/lib/apt/lists/* \
&& apt-get clean
RUN mkdir -p /var/run/headscale

# Need to reset the entrypoint or everything will run as a busybox script
Expand Down
2 changes: 1 addition & 1 deletion cmd/headscale/cli/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func initConfig() {

cfg, err := types.GetHeadscaleConfig()
if err != nil {
log.Fatal().Caller().Err(err)
log.Fatal().Caller().Err(err).Msg("Failed to get headscale configuration")
}

machineOutput := HasMachineOutputFlag()
Expand Down
6 changes: 3 additions & 3 deletions cmd/headscale/cli/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,17 @@ func SuccessOutput(result interface{}, override string, outputFormat string) {
case "json":
jsonBytes, err = json.MarshalIndent(result, "", "\t")
if err != nil {
log.Fatal().Err(err)
log.Fatal().Err(err).Msg("failed to unmarshal output")
}
case "json-line":
jsonBytes, err = json.Marshal(result)
if err != nil {
log.Fatal().Err(err)
log.Fatal().Err(err).Msg("failed to unmarshal output")
}
case "yaml":
jsonBytes, err = yaml.Marshal(result)
if err != nil {
log.Fatal().Err(err)
log.Fatal().Err(err).Msg("failed to unmarshal output")
}
default:
//nolint
Expand Down
6 changes: 3 additions & 3 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

# When updating go.mod or go.sum, a new sha will need to be calculated,
# update this if you have a mismatch after doing a change to thos files.
vendorSha256 = "sha256-9Hol8w8HB28AlulshMYYQwOgvGzR47qxzyPrB8G0XSQ=";
vendorSha256 = "sha256-dNE5wgR3oWXlYzPNXp0v/GGwY0/hvhOB5JWCb5EIbg8=";

ldflags = ["-s" "-w" "-X github.com/juanfont/headscale/cmd/headscale/cli.Version=v${version}"];
};
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ require (
github.com/containerd/console v1.0.3 // indirect
github.com/containerd/continuity v0.3.0 // indirect
github.com/docker/cli v23.0.5+incompatible // indirect
github.com/docker/docker v23.0.5+incompatible // indirect
github.com/docker/docker v24.0.4+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ github.com/deckarep/golang-set/v2 v2.3.0 h1:qs18EKUfHm2X9fA50Mr/M5hccg2tNnVqsiBI
github.com/deckarep/golang-set/v2 v2.3.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4=
github.com/docker/cli v23.0.5+incompatible h1:ufWmAOuD3Vmr7JP2G5K3cyuNC4YZWiAsuDEvFVVDafE=
github.com/docker/cli v23.0.5+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8=
github.com/docker/docker v23.0.5+incompatible h1:DaxtlTJjFSnLOXVNUBU1+6kXGz2lpDoEAH6QoxaSg8k=
github.com/docker/docker v23.0.5+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/docker v24.0.4+incompatible h1:s/LVDftw9hjblvqIeTiGYXBCD95nOEEl7qRsRrIOuQI=
github.com/docker/docker v24.0.4+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
Expand Down
128 changes: 35 additions & 93 deletions hscontrol/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
"io"
"net"
"net/http"
_ "net/http/pprof" //nolint
"os"
"os/signal"
"sort"
"runtime"
"strconv"
"strings"
"sync"
Expand All @@ -20,19 +21,19 @@ import (
"github.com/coreos/go-oidc/v3/oidc"
"github.com/gorilla/mux"
grpcMiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
grpcRuntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/juanfont/headscale"
v1 "github.com/juanfont/headscale/gen/go/headscale/v1"
"github.com/juanfont/headscale/hscontrol/db"
"github.com/juanfont/headscale/hscontrol/derp"
derpServer "github.com/juanfont/headscale/hscontrol/derp/server"
"github.com/juanfont/headscale/hscontrol/notifier"
"github.com/juanfont/headscale/hscontrol/policy"
"github.com/juanfont/headscale/hscontrol/types"
"github.com/juanfont/headscale/hscontrol/util"
"github.com/patrickmn/go-cache"
zerolog "github.com/philip-bui/grpc-zerolog"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/puzpuzpuz/xsync/v2"
zl "github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"golang.org/x/crypto/acme"
Expand Down Expand Up @@ -84,7 +85,7 @@ type Headscale struct {

ACLPolicy *policy.ACLPolicy

lastStateChange *xsync.MapOf[string, time.Time]
nodeNotifier *notifier.Notifier

oidcProvider *oidc.Provider
oauth2Config *oauth2.Config
Expand All @@ -93,12 +94,13 @@ type Headscale struct {

shutdownChan chan struct{}
pollNetMapStreamWG sync.WaitGroup

stateUpdateChan chan struct{}
cancelStateUpdateChan chan struct{}
}

func NewHeadscale(cfg *types.Config) (*Headscale, error) {
if _, enableProfile := os.LookupEnv("HEADSCALE_PROFILING_ENABLED"); enableProfile {
runtime.SetBlockProfileRate(1)
}

privateKey, err := readOrCreatePrivateKey(cfg.PrivateKeyPath)
if err != nil {
return nil, fmt.Errorf("failed to read or create private key: %w", err)
Expand Down Expand Up @@ -158,19 +160,14 @@ func NewHeadscale(cfg *types.Config) (*Headscale, error) {
noisePrivateKey: noisePrivateKey,
registrationCache: registrationCache,
pollNetMapStreamWG: sync.WaitGroup{},
lastStateChange: xsync.NewMapOf[time.Time](),

stateUpdateChan: make(chan struct{}),
cancelStateUpdateChan: make(chan struct{}),
nodeNotifier: notifier.NewNotifier(),
}

go app.watchStateChannel()

database, err := db.NewHeadscaleDatabase(
cfg.DBtype,
dbString,
app.dbDebug,
app.stateUpdateChan,
app.nodeNotifier,
cfg.IPPrefixes,
cfg.BaseDomain)
if err != nil {
Expand Down Expand Up @@ -203,7 +200,11 @@ func NewHeadscale(cfg *types.Config) (*Headscale, error) {

if cfg.DERP.ServerEnabled {
// TODO(kradalby): replace this key with a dedicated DERP key.
embeddedDERPServer, err := derpServer.NewDERPServer(cfg.ServerURL, key.NodePrivate(*privateKey), &cfg.DERP)
embeddedDERPServer, err := derpServer.NewDERPServer(
cfg.ServerURL,
key.NodePrivate(*privateKey),
&cfg.DERP,
)
if err != nil {
return nil, err
}
Expand All @@ -230,10 +231,14 @@ func (h *Headscale) expireEphemeralNodes(milliSeconds int64) {

// expireExpiredMachines expires machines that have an explicit expiry set
// after that expiry time has passed.
func (h *Headscale) expireExpiredMachines(milliSeconds int64) {
ticker := time.NewTicker(time.Duration(milliSeconds) * time.Millisecond)
func (h *Headscale) expireExpiredMachines(intervalMs int64) {
interval := time.Duration(intervalMs) * time.Millisecond
ticker := time.NewTicker(interval)

lastCheck := time.Unix(0, 0)

for range ticker.C {
h.db.ExpireExpiredMachines(h.getLastStateChange())
lastCheck = h.db.ExpireExpiredMachines(lastCheck)
}
}

Expand All @@ -258,7 +263,10 @@ func (h *Headscale) scheduledDERPMapUpdateWorker(cancelChan <-chan struct{}) {
h.DERPMap.Regions[region.RegionID] = &region
}

h.setLastStateChangeToNow()
h.nodeNotifier.NotifyAll(types.StateUpdate{
Type: types.StateDERPUpdated,
DERPMap: *h.DERPMap,
})
}
}
}
Expand Down Expand Up @@ -433,8 +441,9 @@ func (h *Headscale) ensureUnixSocketIsAbsent() error {
return os.Remove(h.cfg.UnixSocket)
}

func (h *Headscale) createRouter(grpcMux *runtime.ServeMux) *mux.Router {
func (h *Headscale) createRouter(grpcMux *grpcRuntime.ServeMux) *mux.Router {
router := mux.NewRouter()
router.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux)

router.HandleFunc(ts2021UpgradePath, h.NoiseUpgradeHandler).Methods(http.MethodPost)

Expand Down Expand Up @@ -541,7 +550,7 @@ func (h *Headscale) Serve() error {
return fmt.Errorf("failed change permission of gRPC socket: %w", err)
}

grpcGatewayMux := runtime.NewServeMux()
grpcGatewayMux := grpcRuntime.NewServeMux()

// Make the grpc-gateway connect to grpc over socket
grpcGatewayConn, err := grpc.Dial(
Expand Down Expand Up @@ -722,7 +731,9 @@ func (h *Headscale) Serve() error {
Str("path", aclPath).
Msg("ACL policy successfully reloaded, notifying nodes of change")

h.setLastStateChangeToNow()
h.nodeNotifier.NotifyAll(types.StateUpdate{
Type: types.StateFullUpdate,
})
}

default:
Expand Down Expand Up @@ -760,10 +771,6 @@ func (h *Headscale) Serve() error {
// Stop listening (and unlink the socket if unix type):
socketListener.Close()

<-h.cancelStateUpdateChan
close(h.stateUpdateChan)
close(h.cancelStateUpdateChan)

// Close db connections
err = h.db.Close()
if err != nil {
Expand All @@ -775,6 +782,8 @@ func (h *Headscale) Serve() error {

// And we're done:
cancel()

return
}
}
}
Expand Down Expand Up @@ -859,73 +868,6 @@ func (h *Headscale) getTLSSettings() (*tls.Config, error) {
}
}

// TODO(kradalby): baby steps, make this more robust.
func (h *Headscale) watchStateChannel() {
for {
select {
case <-h.stateUpdateChan:
h.setLastStateChangeToNow()

case <-h.cancelStateUpdateChan:
return
}
}
}

func (h *Headscale) setLastStateChangeToNow() {
var err error

now := time.Now().UTC()

users, err := h.db.ListUsers()
if err != nil {
log.Error().
Caller().
Err(err).
Msg("failed to fetch all users, failing to update last changed state.")
}

for _, user := range users {
lastStateUpdate.WithLabelValues(user.Name, "headscale").Set(float64(now.Unix()))
if h.lastStateChange == nil {
h.lastStateChange = xsync.NewMapOf[time.Time]()
}
h.lastStateChange.Store(user.Name, now)
}
}

func (h *Headscale) getLastStateChange(users ...types.User) time.Time {
times := []time.Time{}

// getLastStateChange takes a list of users as a "filter", if no users
// are past, then use the entier list of users and look for the last update
if len(users) > 0 {
for _, user := range users {
if lastChange, ok := h.lastStateChange.Load(user.Name); ok {
times = append(times, lastChange)
}
}
} else {
h.lastStateChange.Range(func(key string, value time.Time) bool {
times = append(times, value)

return true
})
}

sort.Slice(times, func(i, j int) bool {
return times[i].After(times[j])
})

log.Trace().Msgf("Latest times %#v", times)

if len(times) == 0 {
return time.Now().UTC()
} else {
return times[0]
}
}

func notFoundHandler(
writer http.ResponseWriter,
req *http.Request,
Expand Down
Loading
Loading