From be7bf2c3cf7ddbffad2e12542d75ed65d41829f3 Mon Sep 17 00:00:00 2001 From: Mohammed <79150699+mrpalide@users.noreply.github.com> Date: Sat, 27 Jan 2024 21:03:48 +0330 Subject: [PATCH] Skysocks Monitor (#47) * implement skysocks-lite-client * create dockerfile * implement skysocks-monitor * update docker build/push to dockerhub commands for skysocks-monitor * add dummy skysocks-monitor config on docker configs * modify Makefile * fix wrong api path * update ascii of cli app * fix typo and name issue on tpd monitor" * fix errors condition --- Makefile | 6 + cmd/skysocks-lite-client/README.md | 3 + .../skysocks-lite-client.go | 130 ++++++ cmd/skysocks-monitor/README.md | 16 + cmd/skysocks-monitor/commands/root.go | 128 ++++++ cmd/skysocks-monitor/skysocks-monitor.go | 8 + cmd/tpd-monitor/commands/root.go | 12 +- docker/config/skysocks-monitor.json | 42 ++ docker/docker_build.sh | 7 + docker/docker_clean.sh | 1 + docker/docker_push.sh | 1 + docker/images/skysocks-monitor/Dockerfile | 35 ++ internal/skysocks/client.go | 195 ++++++++ internal/skysocks/common.go | 24 + pkg/skysocks-monitor/api/api.go | 419 ++++++++++++++++++ pkg/tpd-monitor/api/api.go | 6 +- 16 files changed, 1024 insertions(+), 9 deletions(-) create mode 100644 cmd/skysocks-lite-client/README.md create mode 100644 cmd/skysocks-lite-client/skysocks-lite-client.go create mode 100644 cmd/skysocks-monitor/README.md create mode 100644 cmd/skysocks-monitor/commands/root.go create mode 100644 cmd/skysocks-monitor/skysocks-monitor.go create mode 100644 docker/config/skysocks-monitor.json create mode 100644 docker/images/skysocks-monitor/Dockerfile create mode 100644 internal/skysocks/client.go create mode 100644 internal/skysocks/common.go create mode 100644 pkg/skysocks-monitor/api/api.go diff --git a/Makefile b/Makefile index 55836ea8..4ce2498f 100644 --- a/Makefile +++ b/Makefile @@ -72,6 +72,8 @@ build: dep ## Build binaries ${OPTS} go build ${BUILD_OPTS} -o ./bin/dmsg-monitor ./cmd/dmsg-monitor ${OPTS} go build ${BUILD_OPTS} -o ./bin/tpd-monitor ./cmd/tpd-monitor ${OPTS} go build ${BUILD_OPTS} -o ./bin/vpn-monitor ./cmd/vpn-monitor + ${OPTS} go build ${BUILD_OPTS} -o ./bin/skysocks-monitor ./cmd/skysocks-monitor + ${OPTS} go build ${BUILD_OPTS} -o ./apps/skysocks-client ./cmd/skysocks-lite-client ${OPTS} go build ${BUILD_OPTS} -o ./bin/public-visor-monitor ./cmd/public-visor-monitor # yarn --cwd ./pkg/node-visualizer/web build # rm -rf ./pkg/node-visualizer/api/build/static @@ -90,6 +92,8 @@ build-deploy: ## Build for deployment Docker images go build ${BUILD_OPTS_DEPLOY} -mod=vendor -o /release/dmsg-monitor ./cmd/dmsg-monitor go build ${BUILD_OPTS_DEPLOY} -mod=vendor -o /release/tpd-monitor ./cmd/tpd-monitor go build ${BUILD_OPTS_DEPLOY} -mod=vendor -o /release/vpn-monitor ./cmd/vpn-monitor + go build ${BUILD_OPTS_DEPLOY} -mod=vendor -o /release/skysocks-monitor ./cmd/skysocks-monitor + go build ${BUILD_OPTS_DEPLOY} -mod=vendor -o /release/skysocks-client ./cmd/skysocks-lite-client go build ${BUILD_OPTS_DEPLOY} -mod=vendor -o /release/public-visor-monitor ./cmd/public-visor-monitor build-race: dep ## Build binaries @@ -106,6 +110,8 @@ build-race: dep ## Build binaries ${OPTS} go build ${BUILD_OPTS} -race -o ./bin/dmsg-monitor ./cmd/dmsg-monitor ${OPTS} go build ${BUILD_OPTS} -race -o ./bin/tpd-monitor ./cmd/tpd-monitor ${OPTS} go build ${BUILD_OPTS} -race -o ./bin/vpn-monitor ./cmd/vpn-monitor + ${OPTS} go build ${BUILD_OPTS} -race -o ./bin/skysocks-monitor ./cmd/skysocks-monitor + ${OPTS} go build ${BUILD_OPTS} -race -o ./bin/skysocks-client ./cmd/skysocks-lite-client ${OPTS} go build ${BUILD_OPTS} -race -o ./bin/public-visor-monitor ./cmd/public-visor-monitor install: ## Install route-finder, transport-discovery, address-resolver, sw-env, keys-gen, network-monitor, node-visualizer diff --git a/cmd/skysocks-lite-client/README.md b/cmd/skysocks-lite-client/README.md new file mode 100644 index 00000000..2901ddef --- /dev/null +++ b/cmd/skysocks-lite-client/README.md @@ -0,0 +1,3 @@ +# Skywire Skysocks lite client app + +`skysocks-lite-client` app implements lite client for the skysocks use in skysocks-monitor. diff --git a/cmd/skysocks-lite-client/skysocks-lite-client.go b/cmd/skysocks-lite-client/skysocks-lite-client.go new file mode 100644 index 00000000..8cbcb55d --- /dev/null +++ b/cmd/skysocks-lite-client/skysocks-lite-client.go @@ -0,0 +1,130 @@ +// Package main cmd/skysocks-lite-client/skysocks-lite-client.go +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "io" + "net" + "os" + "time" + + "github.com/skycoin/skywire-services/internal/skysocks" + "github.com/skycoin/skywire-utilities/pkg/buildinfo" + "github.com/skycoin/skywire-utilities/pkg/cipher" + "github.com/skycoin/skywire-utilities/pkg/netutil" + "github.com/skycoin/skywire/pkg/app" + "github.com/skycoin/skywire/pkg/app/appnet" + "github.com/skycoin/skywire/pkg/app/appserver" + "github.com/skycoin/skywire/pkg/routing" + "github.com/skycoin/skywire/pkg/visor/visorconfig" +) + +const ( + netType = appnet.TypeSkynet + socksPort = routing.Port(3) +) + +var r = netutil.NewRetrier(nil, time.Second, netutil.DefaultMaxBackoff, 0, 1) + +func dialServer(ctx context.Context, appCl *app.Client, pk cipher.PubKey) (net.Conn, error) { + appCl.SetDetailedStatus(appserver.AppDetailedStatusStarting) //nolint + var conn net.Conn + err := r.Do(ctx, func() error { + var err error + conn, err = appCl.Dial(appnet.Addr{ + Net: netType, + PubKey: pk, + Port: socksPort, + }) + return err + }) + if err != nil { + return nil, err + } + + return conn, nil +} + +func main() { + appCl := app.NewClient(nil) + defer appCl.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if _, err := buildinfo.Get().WriteTo(os.Stdout); err != nil { + print(fmt.Sprintf("Failed to output build info: %v\n", err)) + } + + var addr = flag.String("addr", visorconfig.SkysocksClientAddr, "Client address to listen on") + var serverPK = flag.String("srv", "", "PubKey of the server to connect to") + flag.Parse() + + if *serverPK == "" { + err := errors.New("Empty server PubKey. Exiting") + print(fmt.Sprintf("%v\n", err)) + setAppErr(appCl, err) + os.Exit(1) + } + + pk := cipher.PubKey{} + if err := pk.UnmarshalText([]byte(*serverPK)); err != nil { + print(fmt.Sprintf("Invalid server PubKey: %v\n", err)) + setAppErr(appCl, err) + os.Exit(1) + } + defer setAppStatus(appCl, appserver.AppDetailedStatusStopped) + setAppPort(appCl, appCl.Config().RoutingPort) + for { + conn, err := dialServer(ctx, appCl, pk) + if err != nil { + print(fmt.Sprintf("Failed to dial to a server: %v\n", err)) + setAppErr(appCl, err) + os.Exit(1) + } + + fmt.Printf("Connected to %v\n", pk) + client, err := skysocks.NewClient(conn, appCl) + if err != nil { + print(fmt.Sprintf("Failed to create a new client: %v\n", err)) + setAppErr(appCl, err) + os.Exit(1) + } + + fmt.Printf("Serving proxy client %v\n", *addr) + setAppStatus(appCl, appserver.AppDetailedStatusRunning) + + if err := client.ListenAndServe(*addr); err != nil { + print(fmt.Sprintf("Error serving proxy client: %v\n", err)) + } + + // need to filter this out, cause usually client failure means app conn is already closed + if err := conn.Close(); err != nil && err != io.ErrClosedPipe { + print(fmt.Sprintf("Error closing app conn: %v\n", err)) + } + + fmt.Println("Reconnecting to skysocks server") + setAppStatus(appCl, appserver.AppDetailedStatusReconnecting) + } +} + +func setAppErr(appCl *app.Client, err error) { + if appErr := appCl.SetError(err.Error()); appErr != nil { + print(fmt.Sprintf("Failed to set error %v: %v\n", err, appErr)) + } +} + +func setAppStatus(appCl *app.Client, status appserver.AppDetailedStatus) { + if err := appCl.SetDetailedStatus(string(status)); err != nil { + print(fmt.Sprintf("Failed to set status %v: %v\n", status, err)) + } +} + +func setAppPort(appCl *app.Client, port routing.Port) { + if err := appCl.SetAppPort(port); err != nil { + print(fmt.Sprintf("Failed to set port %v: %v\n", port, err)) + } +} diff --git a/cmd/skysocks-monitor/README.md b/cmd/skysocks-monitor/README.md new file mode 100644 index 00000000..944b7d6e --- /dev/null +++ b/cmd/skysocks-monitor/README.md @@ -0,0 +1,16 @@ +# Skysocks Monitor + +## API endpoints + +### GET `/health`
+Gets the health info of the service. e.g. +``` +{ + "build_info": { + "version": "v1.0.1-267-ge1617c5b", + "commit": "e1617c5b0121182cfd2b610dc518e4753e56440e", + "date": "2022-10-25T11:01:52Z" + }, + "started_at": "2022-10-25T11:10:45.152629597Z" +} +``` diff --git a/cmd/skysocks-monitor/commands/root.go b/cmd/skysocks-monitor/commands/root.go new file mode 100644 index 00000000..009bff0d --- /dev/null +++ b/cmd/skysocks-monitor/commands/root.go @@ -0,0 +1,128 @@ +// Package commands cmd/skysocks-monitor/commands/root.go +package commands + +import ( + "context" + "log" + "os" + "time" + + cc "github.com/ivanpirog/coloredcobra" + "github.com/skycoin/skywire-utilities/pkg/buildinfo" + "github.com/skycoin/skywire-utilities/pkg/cipher" + "github.com/skycoin/skywire-utilities/pkg/cmdutil" + "github.com/skycoin/skywire-utilities/pkg/logging" + "github.com/skycoin/skywire-utilities/pkg/tcpproxy" + "github.com/spf13/cobra" + + "github.com/skycoin/skywire-services/pkg/skysocks-monitor/api" +) + +var ( + confPath string + addr string + tag string + sleepDeregistration time.Duration +) + +func init() { + RootCmd.Flags().StringVarP(&addr, "addr", "a", ":9081", "address to bind to.\033[0m") + RootCmd.Flags().DurationVarP(&sleepDeregistration, "sleep-deregistration", "s", 10, "Sleep time for derigstration process in minutes\033[0m") + RootCmd.Flags().StringVarP(&confPath, "config", "c", "skysocks-monitor.json", "config file location.\033[0m") + RootCmd.Flags().StringVar(&tag, "tag", "skysocks_monitor", "logging tag\033[0m") + var helpflag bool + RootCmd.SetUsageTemplate(help) + RootCmd.PersistentFlags().BoolVarP(&helpflag, "help", "h", false, "help for skysocks-monitor") + RootCmd.SetHelpCommand(&cobra.Command{Hidden: true}) + RootCmd.PersistentFlags().MarkHidden("help") //nolint +} + +// RootCmd contains the root command +var RootCmd = &cobra.Command{ + Use: "skysocksmon", + Short: "Skysocks monitor.", + Long: ` + ┌─┐┬┌─┬ ┬┌─┐┌─┐┌─┐┬┌─┌─┐ ┌┬┐┌─┐┌┐┌┬┌┬┐┌─┐┬─┐ + └─┐├┴┐└┬┘└─┐│ ││ ├┴┐└─┐───││││ │││││ │ │ │├┬┘ + └─┘┴ ┴ ┴ └─┘└─┘└─┘┴ ┴└─┘ ┴ ┴└─┘┘└┘┴ ┴ └─┘┴└─`, + SilenceErrors: true, + SilenceUsage: true, + DisableSuggestions: true, + DisableFlagsInUseLine: true, + Version: buildinfo.Version(), + Run: func(_ *cobra.Command, _ []string) { + visorBuildInfo := buildinfo.Get() + if _, err := visorBuildInfo.WriteTo(os.Stdout); err != nil { + log.Printf("Failed to output build info: %v", err) + } + + mLogger := logging.NewMasterLogger() + conf := api.InitConfig(confPath, mLogger) + + srvURLs := api.ServicesURLs{ + SD: conf.Launcher.ServiceDisc, + UT: conf.UptimeTracker.Addr, + } + + logger := mLogger.PackageLogger("skysocks_monitor") + + logger.WithField("addr", addr).Info("Serving discovery API...") + + smSign, _ := cipher.SignPayload([]byte(conf.PK.Hex()), conf.SK) //nolint + + smConfig := api.Config{ + PK: conf.PK, + SK: conf.SK, + Sign: smSign, + } + + smAPI := api.New(logger, srvURLs, smConfig) + + ctx, cancel := cmdutil.SignalContext(context.Background(), logger) + defer cancel() + + go smAPI.InitDeregistrationLoop(ctx, conf, sleepDeregistration) + + go func() { + if err := tcpproxy.ListenAndServe(addr, smAPI); err != nil { + logger.Errorf("serve: %v", err) + cancel() + } + }() + + <-ctx.Done() + if err := smAPI.Visor.Close(); err != nil { + logger.WithError(err).Error("Visor closed with error.") + } + }, +} + +// Execute executes root CLI command. +func Execute() { + cc.Init(&cc.Config{ + RootCmd: RootCmd, + Headings: cc.HiBlue + cc.Bold, //+ cc.Underline, + Commands: cc.HiBlue + cc.Bold, + CmdShortDescr: cc.HiBlue, + Example: cc.HiBlue + cc.Italic, + ExecName: cc.HiBlue + cc.Bold, + Flags: cc.HiBlue + cc.Bold, + //FlagsDataType: cc.HiBlue, + FlagsDescr: cc.HiBlue, + NoExtraNewlines: true, + NoBottomNewline: true, + }) + if err := RootCmd.Execute(); err != nil { + log.Fatal("Failed to execute command: ", err) + } +} + +const help = "Usage:\r\n" + + " {{.UseLine}}{{if .HasAvailableSubCommands}}{{end}} {{if gt (len .Aliases) 0}}\r\n\r\n" + + "{{.NameAndAliases}}{{end}}{{if .HasAvailableSubCommands}}\r\n\r\n" + + "Available Commands:{{range .Commands}}{{if (or .IsAvailableCommand)}}\r\n " + + "{{rpad .Name .NamePadding }} {{.Short}}{{end}}{{end}}{{end}}{{if .HasAvailableLocalFlags}}\r\n\r\n" + + "Flags:\r\n" + + "{{.LocalFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}{{if .HasAvailableInheritedFlags}}\r\n\r\n" + + "Global Flags:\r\n" + + "{{.InheritedFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}\r\n\r\n" diff --git a/cmd/skysocks-monitor/skysocks-monitor.go b/cmd/skysocks-monitor/skysocks-monitor.go new file mode 100644 index 00000000..a5d92ba7 --- /dev/null +++ b/cmd/skysocks-monitor/skysocks-monitor.go @@ -0,0 +1,8 @@ +// Package main cmd/skysocks-monitor/skysocks-monitor.go +package main + +import "github.com/skycoin/skywire-services/cmd/skysocks-monitor/commands" + +func main() { + commands.Execute() +} diff --git a/cmd/tpd-monitor/commands/root.go b/cmd/tpd-monitor/commands/root.go index 13df606e..f88273b9 100644 --- a/cmd/tpd-monitor/commands/root.go +++ b/cmd/tpd-monitor/commands/root.go @@ -35,7 +35,7 @@ var ( func init() { RootCmd.Flags().StringVarP(&addr, "addr", "a", ":9080", "address to bind to.\033[0m") RootCmd.Flags().DurationVarP(&sleepDeregistration, "sleep-deregistration", "s", 10, "Sleep time for deregistration process in minutes\033[0m") - RootCmd.Flags().StringVarP(&confPath, "config", "c", "dmsg-monitor.json", "config file location.\033[0m") + RootCmd.Flags().StringVarP(&confPath, "config", "c", "tpd-monitor.json", "config file location.\033[0m") RootCmd.Flags().StringVarP(&logLvl, "loglvl", "l", "info", "set log level one of: info, error, warn, debug, trace, panic") RootCmd.Flags().StringVar(&dmsgURL, "dmsg-url", "", "url to dmsg data.\033[0m") RootCmd.Flags().StringVar(&tpdURL, "tpd-url", "", "url to transport discovery.\033[0m") @@ -105,26 +105,26 @@ var RootCmd = &cobra.Command{ monitorSign, _ := cipher.SignPayload([]byte(conf.PK.Hex()), conf.SK) //nolint - var monitorConfig api.DMSGMonitorConfig + var monitorConfig api.TpdMonitorConfig monitorConfig.PK = conf.PK monitorConfig.Sign = monitorSign - dmsgMonitorAPI := api.New(logger, srvURLs, monitorConfig) + tpdMonitorAPI := api.New(logger, srvURLs, monitorConfig) ctx, cancel := cmdutil.SignalContext(context.Background(), logger) defer cancel() - go dmsgMonitorAPI.InitDeregistrationLoop(ctx, conf, sleepDeregistration) + go tpdMonitorAPI.InitDeregistrationLoop(ctx, conf, sleepDeregistration) go func() { - if err := tcpproxy.ListenAndServe(addr, dmsgMonitorAPI); err != nil { + if err := tcpproxy.ListenAndServe(addr, tpdMonitorAPI); err != nil { logger.Errorf("serve: %v", err) cancel() } }() <-ctx.Done() - if err := dmsgMonitorAPI.Visor.Close(); err != nil { + if err := tpdMonitorAPI.Visor.Close(); err != nil { logger.WithError(err).Error("Visor closed with error.") } }, diff --git a/docker/config/skysocks-monitor.json b/docker/config/skysocks-monitor.json new file mode 100644 index 00000000..b8e6b13e --- /dev/null +++ b/docker/config/skysocks-monitor.json @@ -0,0 +1,42 @@ +{ + "version": "v0.2.0-1392-geff996ba", + "sk": "7cc6c5f12cf79e4cd003e2f49e8bc9a8a12c6b0a9353c296d6b6a4d400749d2d", + "pk": "0216a2b84c5ec409c1aef3fc7c73770d2182cfda8d51ee1dc6bfd507424894b038", + "dmsg": { + "discovery": "http://dmsg-discovery:9090", + "sessions_count": 1, + "servers": [] + }, + "transport": { + "discovery": "http://transport-discovery:9091", + "address_resolver": "http://address-resolver:9093", + "public_autoconnect": false, + "transport_setup_nodes": null + }, + "routing": { + "setup_nodes": [ + "02603d53d49b6575a0b8cee05b70dd23c86e42cd6cba99af769d61a6196ea2bcb1" + ], + "route_finder": "http://route-finder:9092", + "route_finder_timeout": "10s", + "min_hops": 0 + }, + "uptime_tracker": { + "addr": "http://uptime-tracker:9096" + }, + "launcher": { + "service_discovery": "http://service-discovery:9098", + "apps": [], + "server_addr": "localhost:5510", + "bin_path": "./release" + }, + "hypervisors": [], + "cli_addr": "localhost:3435", + "log_level": "info", + "local_path": "./local/network-monitor", + "stun_servers": null, + "shutdown_timeout": "10s", + "restart_check_delay": "1s", + "is_public": false, + "persistent_transports": null +} diff --git a/docker/docker_build.sh b/docker/docker_build.sh index 7e6eed99..76276b34 100755 --- a/docker/docker_build.sh +++ b/docker/docker_build.sh @@ -247,6 +247,13 @@ DOCKER_BUILDKIT="$bldkit" docker build -f docker/images/tpd-monitor/Dockerfile \ --build-arg image_tag="$image_tag" \ -t "$registry"/tpd-monitor:"$image_tag" . +echo "building skysocks monitor image" +DOCKER_BUILDKIT="$bldkit" docker build -f docker/images/skysocks-monitor/Dockerfile \ + --build-arg base_image="$base_image" \ + --build-arg build_opts="$go_buildopts" \ + --build-arg image_tag="$image_tag" \ + -t "$registry"/skysocks-monitor:"$image_tag" . + echo "building transport setup image" DOCKER_BUILDKIT="$bldkit" docker build -f docker/images/transport-setup/Dockerfile \ --build-arg base_image="$base_image" \ diff --git a/docker/docker_clean.sh b/docker/docker_clean.sh index b5f0f1b5..18dbf62f 100755 --- a/docker/docker_clean.sh +++ b/docker/docker_clean.sh @@ -25,6 +25,7 @@ declare -a images_arr=( "skycoin/dmsg-monitor:${image_tag}" "skycoin/tpd-monitor:${image_tag}" "skycoin/transport-setup:${image_tag}" + "skycoin/skysocks-monitor:${image_tag}" ) for i in "${images_arr[@]}"; do diff --git a/docker/docker_push.sh b/docker/docker_push.sh index ec53be1a..0bc7f43e 100755 --- a/docker/docker_push.sh +++ b/docker/docker_push.sh @@ -28,6 +28,7 @@ declare -a images_arr=( "dmsg-monitor" "tpd-monitor" "transport-setup" + "skysocks-monitor" ) echo "Pushing to $registry using tag: $tag" diff --git a/docker/images/skysocks-monitor/Dockerfile b/docker/images/skysocks-monitor/Dockerfile new file mode 100644 index 00000000..a894590d --- /dev/null +++ b/docker/images/skysocks-monitor/Dockerfile @@ -0,0 +1,35 @@ +ARG image_tag +ARG base_image + +FROM ${base_image} as builder + +ARG build_opts + +COPY . /skywire-services +WORKDIR /skywire-services + +RUN go build "${build_opts}" -o /release/skysocks-monitor ./cmd/skysocks-monitor && \ + go build "${build_opts}" -o /release/skysocks-client ./cmd/skysocks-lite-client + +FROM alpine as prod +COPY --from=builder /release/skysocks-client /apps/skysocks-client +COPY --from=builder /release/skysocks-monitor /release/skysocks-monitor +ENTRYPOINT ["/release/skysocks-monitor"] + +FROM prod as test + +# OS image +FROM alpine as e2e +WORKDIR /release + +COPY ./docker/common/install-prequisites.sh /release/install-prequisites.sh +RUN sh -c /release/install-prequisites.sh cert-only \ + && rm -rf /release/install-prequisites.sh + +COPY --from=builder /release/skysocks-client /release/skysocks-client +COPY --from=builder /release/skysocks-monitor /release/skysocks-monitor +ENTRYPOINT ["/release/skysocks-monitor"] + +FROM e2e as integration + +FROM ${image_tag} diff --git a/internal/skysocks/client.go b/internal/skysocks/client.go new file mode 100644 index 00000000..c1b85b8d --- /dev/null +++ b/internal/skysocks/client.go @@ -0,0 +1,195 @@ +// Package skysocks client.go +package skysocks + +import ( + "fmt" + "io" + "net" + "sync" + "time" + + "github.com/hashicorp/yamux" + ipc "github.com/james-barrow/golang-ipc" + + "github.com/skycoin/skywire/pkg/app" + "github.com/skycoin/skywire/pkg/router" + "github.com/skycoin/skywire/pkg/skyenv" +) + +// Client implement multiplexing proxy client using yamux. +type Client struct { + appCl *app.Client + session *yamux.Session + listener net.Listener + once sync.Once + closeC chan struct{} +} + +// NewClient constructs a new Client. +func NewClient(conn net.Conn, appCl *app.Client) (*Client, error) { + c := &Client{ + appCl: appCl, + closeC: make(chan struct{}), + } + + sessionCfg := yamux.DefaultConfig() + sessionCfg.EnableKeepAlive = false + session, err := yamux.Client(conn, sessionCfg) + if err != nil { + return nil, fmt.Errorf("error creating client: yamux: %w", err) + } + + c.session = session + + go c.sessionKeepAliveLoop() + + return c, nil +} + +// ListenAndServe start tcp listener on addr and proxies incoming +// connection to a remote proxy server. +func (c *Client) ListenAndServe(addr string) error { + l, err := net.Listen("tcp", addr) + if err != nil { + if c.appCl != nil { + c.setAppError(err) + } + return fmt.Errorf("listen: %w", err) + } + + fmt.Printf("Listening skysocks client on %s", addr) + + c.listener = l + + for { + select { + case <-c.closeC: + return nil + default: + } + + conn, err := l.Accept() + if err != nil { + fmt.Printf("Error accepting: %v\n", err) + return fmt.Errorf("accept: %w", err) + } + + fmt.Println("Accepted skysocks client") + + stream, err := c.session.Open() + if err != nil { + c.close() + + return fmt.Errorf("error opening yamux stream: %w", err) + } + + fmt.Println("Opened session skysocks client") + + go c.handleStream(conn, stream) + } +} + +func (c *Client) sessionKeepAliveLoop() { + ticker := time.NewTicker(router.DefaultRouteKeepAlive / 2) + defer ticker.Stop() + + for { + select { + case <-c.closeC: + return + case <-ticker.C: + if c.session.IsClosed() { + c.close() + + return + } + } + } +} + +func (c *Client) handleStream(conn, stream net.Conn) { + const errorCount = 2 + + errCh := make(chan error, errorCount) + + go func() { + _, err := io.Copy(stream, conn) + errCh <- err + }() + + go func() { + _, err := io.Copy(conn, stream) + errCh <- err + }() + + var connClosed, streamClosed bool + + for err := range errCh { + if !connClosed { + if err := conn.Close(); err != nil { + fmt.Printf("Failed to close connection: %v\n", err) + } + + connClosed = true + } + + if !streamClosed { + if err := stream.Close(); err != nil { + fmt.Printf("Failed to close stream: %v\n", err) + } + + streamClosed = true + } + + if err != nil { + print(fmt.Sprintf("Copy error: %v\n", err)) + } + } + + close(errCh) + + if c.session.IsClosed() { + c.close() + } +} + +func (c *Client) close() { + print("Session failed, closing skysocks client") + if err := c.Close(); err != nil { + print(fmt.Sprintf("Error closing skysocks client: %v\n", err)) + } +} + +// ListenIPC starts named-pipe based connection server for windows or unix socket for other OSes +func (c *Client) ListenIPC(client *ipc.Client) { + listenIPC(client, skyenv.SkychatName+"-client", func() { + client.Close() + if err := c.Close(); err != nil { + print(fmt.Sprintf("Error closing skysocks-client: %v\n", err)) + } + }) +} + +func (c *Client) setAppError(appErr error) { + if err := c.appCl.SetError(appErr.Error()); err != nil { + print(fmt.Sprintf("Failed to set error %v: %v\n", appErr, err)) + } +} + +// Close implement io.Closer. +func (c *Client) Close() error { + if c == nil { + return nil + } + + var err error + c.once.Do(func() { + fmt.Println("Closing proxy client") + + close(c.closeC) + + err = c.listener.Close() + }) + + return err +} diff --git a/internal/skysocks/common.go b/internal/skysocks/common.go new file mode 100644 index 00000000..3d70aa8b --- /dev/null +++ b/internal/skysocks/common.go @@ -0,0 +1,24 @@ +// Package skysocks internal/skysocks/common.go +package skysocks + +import ( + "fmt" + + ipc "github.com/james-barrow/golang-ipc" + + "github.com/skycoin/skywire/pkg/skyenv" +) + +func listenIPC(ipcClient *ipc.Client, appName string, onClose func()) { + for { + m, err := ipcClient.Read() + if err != nil { + fmt.Printf("%s IPC received error: %v", appName, err) + } + if m.MsgType == skyenv.IPCShutdownMessageType { + fmt.Println("Stopping " + appName + " via IPC") + break + } + } + onClose() +} diff --git a/pkg/skysocks-monitor/api/api.go b/pkg/skysocks-monitor/api/api.go new file mode 100644 index 00000000..88ae6813 --- /dev/null +++ b/pkg/skysocks-monitor/api/api.go @@ -0,0 +1,419 @@ +// Package api pkg/skysocks-monitor/api/api.go +package api + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "math/rand" + "net/http" + "net/url" + "sync/atomic" + "time" + + "github.com/go-chi/chi/v5" + "github.com/go-chi/chi/v5/middleware" + "github.com/sirupsen/logrus" + "github.com/skycoin/skywire-utilities/pkg/buildinfo" + "github.com/skycoin/skywire-utilities/pkg/cipher" + "github.com/skycoin/skywire-utilities/pkg/httputil" + "github.com/skycoin/skywire-utilities/pkg/logging" + utilenv "github.com/skycoin/skywire-utilities/pkg/skyenv" + "github.com/skycoin/skywire/pkg/app/appserver" + "github.com/skycoin/skywire/pkg/servicedisc" + "github.com/skycoin/skywire/pkg/skyenv" + "github.com/skycoin/skywire/pkg/transport/network" + "github.com/skycoin/skywire/pkg/visor" + "github.com/skycoin/skywire/pkg/visor/visorconfig" + + "github.com/skycoin/skywire-services/internal/vpn" +) + +// API register all the API endpoints. +// It implements a net/http.Handler. +type API struct { + http.Handler + Config + ServicesURLs + + Visor *visor.Visor + + skysocksKeys []cipher.PubKey + deadSkysockss []string + logger logging.Logger + startedAt time.Time +} + +// Config is struct for keys and sign value of VM +type Config struct { + PK cipher.PubKey + SK cipher.SecKey + Sign cipher.Sig +} + +// ServicesURLs is struct for organizing URLs of services +type ServicesURLs struct { + SD string + UT string +} + +// HealthCheckResponse is struct of /health endpoint +type HealthCheckResponse struct { + BuildInfo *buildinfo.Info `json:"build_info,omitempty"` + StartedAt time.Time `json:"started_at,omitempty"` +} + +// Error is the object returned to the client when there's an error. +type Error struct { + Error string `json:"error"` +} + +// New returns a new *chi.Mux object, which can be started as a server +func New(logger *logging.Logger, srvURLs ServicesURLs, vmConfig Config) *API { + + api := &API{ + Config: vmConfig, + ServicesURLs: srvURLs, + logger: *logger, + startedAt: time.Now(), + } + r := chi.NewRouter() + + r.Use(middleware.RequestID) + r.Use(middleware.RealIP) + r.Use(middleware.Logger) + r.Use(middleware.Recoverer) + r.Use(httputil.SetLoggerMiddleware(logger)) + r.Get("/health", api.health) + api.Handler = r + + return api +} + +func (api *API) health(w http.ResponseWriter, r *http.Request) { + info := buildinfo.Get() + api.writeJSON(w, r, http.StatusOK, HealthCheckResponse{ + BuildInfo: info, + StartedAt: api.startedAt, + }) +} + +func (api *API) writeJSON(w http.ResponseWriter, r *http.Request, code int, object interface{}) { + jsonObject, err := json.Marshal(object) + if err != nil { + api.log(r).WithError(err).Errorf("failed to encode json response") + w.WriteHeader(http.StatusInternalServerError) + + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + + _, err = w.Write(jsonObject) + if err != nil { + api.log(r).WithError(err).Errorf("failed to write json response") + } +} + +func (api *API) log(r *http.Request) logrus.FieldLogger { + return httputil.GetLogger(r) +} + +// InitDeregistrationLoop is function which runs periodic background tasks of API. +func (api *API) InitDeregistrationLoop(ctx context.Context, conf *visorconfig.V1, sleepDeregistration time.Duration) { + // Start a visor + api.startVisor(ctx, conf) + + for { + select { + case <-ctx.Done(): + return + default: + api.deregister() + time.Sleep(sleepDeregistration * time.Minute) + } + } +} + +// deregister dead Skysockss entries in service discovery +func (api *API) deregister() { + api.logger.Info("Skysocks Deregistration started.") + + // reload keys + api.getSkysocksKeys() + + // monitoring Skysockss + onlineSkysocksCount := int64(0) + api.deadSkysockss = []string{} + var allDeadSkysockss []string + if len(api.skysocksKeys) == 0 { + api.logger.Warn("No Skysocks keys found") + } else { + for _, key := range api.skysocksKeys { + api.testSkysocks(key, &onlineSkysocksCount) + if len(api.deadSkysockss) >= 10 { + api.skysocksDeregister(api.deadSkysockss) + allDeadSkysockss = append(allDeadSkysockss, api.deadSkysockss...) + api.deadSkysockss = []string{} + } + } + api.logger.WithField("count", onlineSkysocksCount).Info("Skysockss online.") + + // deregister dead Skysockss + if len(api.deadSkysockss) > 0 { + api.skysocksDeregister(api.deadSkysockss) + } + } + + api.logger.WithField("Number of dead Skysockss", len(allDeadSkysockss)).WithField("PKs", allDeadSkysockss).Info("Skysocks Deregistration completed.") +} + +func (api *API) testSkysocks(key cipher.PubKey, onlineSkysocksCount *int64) { + + online := api.isOnline(key) + + if online { + atomic.AddInt64(onlineSkysocksCount, 1) + } + + if !online { + api.deadSkysockss = append(api.deadSkysockss, key.Hex()) + } +} + +func (api *API) isOnline(key cipher.PubKey) (ok bool) { + transport := network.DMSG + + tp, err := api.Visor.AddTransport(key, string(transport), time.Second*10) + if err != nil { + api.logger.WithError(err).Warnf("Failed to establish %v transport", transport) + return false + } + + var latency time.Duration + api.logger.Infof("Established %v transport to %v", transport, key) + // We use the name skysocks-client and not skysocks-lite-client here to get around the constraint that + // -srv flag can only be set for skysocks-client and skysocks-client. + // And due to this the binary should also be named as skysocks-client and not skysocks-client-lite + sum, skysocksErr := RunSkysocksClient(api.Visor, key, skyenv.SkysocksClientName) + time.Sleep(time.Second * 4) + ok = true + + switch skysocksErr { + case nil: + if len(sum) > 0 { + latency = sum[0].Latency + } + case vpn.ErrNotPermitted: + api.logger.WithError(skysocksErr).Infof("Skysocks error on %v transport of %v.", transport, key) + default: + api.logger.WithError(skysocksErr).Infof("Skysocks error on %v transport of %v.", transport, key) + ok = false + } + + err = api.Visor.RemoveTransport(tp.ID) + if err != nil { + api.logger.Warnf("Error removing %v transport of %v: %v", transport, key, err) + } + + if ok && latency != 0 { + return ok + } + + return ok +} + +func (api *API) skysocksDeregister(keys []string) { + err := api.deregisterRequest(keys, fmt.Sprintf(api.ServicesURLs.SD+"/api/services/deregister/skysocks")) + if err != nil { + api.logger.Warn(err) + return + } + api.logger.Info("Deregister request send to SD") +} + +// deregisterRequest is deregistration handler for all services +func (api *API) deregisterRequest(keys []string, rawReqURL string) error { + reqURL, err := url.Parse(rawReqURL) + if err != nil { + return fmt.Errorf("error on parsing deregistration URL : %v", err) + } + + jsonData, err := json.Marshal(keys) + if err != nil { + return fmt.Errorf("error on parsing deregistration keys : %v", err) + } + body := bytes.NewReader(jsonData) + + req := &http.Request{ + Method: "DELETE", + URL: reqURL, + Header: map[string][]string{ + "NM-PK": {api.Config.PK.Hex()}, + "NM-Sign": {api.Config.Sign.Hex()}, + }, + Body: io.NopCloser(body), + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("error on send deregistration request : %s", err) + } + defer func(Body io.ReadCloser) { + _ = Body.Close() //nolint + }(res.Body) + + if res.StatusCode != http.StatusOK { + return fmt.Errorf("error deregistering skysocks keys: status code %v", res.StatusCode) + } + + return nil +} + +type skysocksList []servicedisc.Service + +func getSkysockss(sdURL string) (data skysocksList, err error) { + res, err := http.Get(sdURL + "/api/services?type=skysocks") //nolint + + if err != nil { + return nil, err + } + + body, err := io.ReadAll(res.Body) + + if err != nil { + return nil, err + } + + err = json.Unmarshal(body, &data) + if err != nil { + return nil, err + } + return data, nil +} + +func (api *API) getSkysocksKeys() { + skysockss, err := getSkysockss(api.ServicesURLs.SD) + if err != nil { + api.logger.Warn("Error while fetching skysockss: %v", err) + return + } + if len(skysockss) == 0 { + api.logger.Warn("No skysockss found... Trying again") + } + //randomize the order of the skysocks entries + rand.Shuffle(len(skysockss), func(i, j int) { + skysockss[i], skysockss[j] = skysockss[j], skysockss[i] + }) + api.skysocksKeys = []cipher.PubKey{} + for _, skysocksEntry := range skysockss { + api.skysocksKeys = append(api.skysocksKeys, skysocksEntry.Addr.PubKey()) + } + + api.logger.WithField("skysockss", len(skysockss)).Info("Skysocks keys updated.") +} + +func (api *API) startVisor(ctx context.Context, conf *visorconfig.V1) { + conf.SetLogger(logging.NewMasterLogger()) + v, ok := visor.NewVisor(ctx, conf) + if !ok { + api.logger.Fatal("Failed to start visor.") + } + api.Visor = v +} + +// RunSkysocksClient runs a skysocks-client which connects to skysocks server +func RunSkysocksClient(v *visor.Visor, serverPK cipher.PubKey, appName string) ([]appserver.ConnectionSummary, error) { + err := v.SetAppPK(appName, serverPK) + if err != nil { + return []appserver.ConnectionSummary{}, err + } + err = v.StartApp(appName) + if err != nil { + return []appserver.ConnectionSummary{}, err + } + + time.Sleep(time.Second * 15) + appErr, _ := v.GetAppError(appName) //nolint + if appErr == vpn.ErrNotPermitted.Error() { + return []appserver.ConnectionSummary{}, vpn.ErrNotPermitted + } + if appErr == vpn.ErrServerOffline.Error() { + return []appserver.ConnectionSummary{}, vpn.ErrServerOffline + } + sum, err := v.GetAppConnectionsSummary(appName) + if err != nil { + return []appserver.ConnectionSummary{}, err + } + time.Sleep(time.Second * 2) + err = v.StopApp(appName) + if err != nil { + return []appserver.ConnectionSummary{}, err + } + + return sum, nil +} + +// InitConfig to initialize config +func InitConfig(confPath string, mLog *logging.MasterLogger) *visorconfig.V1 { + log := mLog.PackageLogger("skysocks_monitor:config") + log.Info("Reading config from file.") + log.WithField("filepath", confPath).Info() + + oldConf, err := visorconfig.ReadFile(confPath) + if err != nil { + log.WithError(err).Fatal("Failed to read config file.") + } + var testEnv bool + if oldConf.Dmsg.Discovery == utilenv.TestDmsgDiscAddr { + testEnv = true + } + // have same services as old config + services := &visorconfig.Services{ + DmsgDiscovery: oldConf.Dmsg.Discovery, + TransportDiscovery: oldConf.Transport.Discovery, + AddressResolver: oldConf.Transport.AddressResolver, + RouteFinder: oldConf.Routing.RouteFinder, + RouteSetupNodes: oldConf.Routing.RouteSetupNodes, + UptimeTracker: oldConf.UptimeTracker.Addr, + ServiceDiscovery: oldConf.Launcher.ServiceDisc, + } + // update old config + conf, err := visorconfig.MakeDefaultConfig(mLog, &oldConf.SK, false, false, testEnv, false, false, confPath, "", services) + if err != nil { + log.WithError(err).Fatal("Failed to create config.") + } + + // have the same apps that the old config had + var newConfLauncherApps []appserver.AppConfig + for _, app := range conf.Launcher.Apps { + for _, oldApp := range oldConf.Launcher.Apps { + if app.Name == oldApp.Name { + newConfLauncherApps = append(newConfLauncherApps, oldApp) + } + } + } + conf.Launcher.Apps = newConfLauncherApps + + conf.Version = oldConf.Version + conf.LocalPath = oldConf.LocalPath + conf.Launcher.BinPath = oldConf.Launcher.BinPath + conf.Launcher.ServerAddr = oldConf.Launcher.ServerAddr + conf.CLIAddr = oldConf.CLIAddr + conf.Transport.TransportSetupPKs = oldConf.Transport.TransportSetupPKs + + // following services are not needed + conf.STCP = nil + conf.Dmsgpty = nil + conf.Transport.PublicAutoconnect = false + + // save the config file + if err := conf.Flush(); err != nil { + log.WithError(err).Fatal("Failed to flush config to file.") + } + + return conf +} diff --git a/pkg/tpd-monitor/api/api.go b/pkg/tpd-monitor/api/api.go index 599f827a..60fce30b 100644 --- a/pkg/tpd-monitor/api/api.go +++ b/pkg/tpd-monitor/api/api.go @@ -47,8 +47,8 @@ type API struct { whitelistedPKs map[string]bool } -// DMSGMonitorConfig is struct for Keys and Sign value of dmsg monitor -type DMSGMonitorConfig struct { +// TpdMonitorConfig is struct for Keys and Sign value of tpd monitor +type TpdMonitorConfig struct { PK cipher.PubKey Sign cipher.Sig } @@ -72,7 +72,7 @@ type Error struct { } // New returns a new *chi.Mux object, which can be started as a server -func New(logger *logging.Logger, srvURLs ServicesURLs, monitorConfig DMSGMonitorConfig) *API { +func New(logger *logging.Logger, srvURLs ServicesURLs, monitorConfig TpdMonitorConfig) *API { api := &API{ dmsgURL: srvURLs.DMSG,