Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dmsg hypervisor #563

Open
wants to merge 22 commits into
base: mainnet-milestone2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
5ba0e57
still need to integrate transport discovery and setup node
ivcosla Sep 6, 2019
b6eb2d1
removed unused code
ivcosla Sep 6, 2019
5512d1b
Began work to fix bug where visor node restart does not reestablish r…
Sep 6, 2019
add2e38
Changed behaviour of setup.
Sep 8, 2019
0ddde97
Merge pull request #564 from evanlinjin/fix/m1-visor-restart
Sep 8, 2019
660908f
Re-added initTransports for transport.Manager
Sep 8, 2019
bc35458
Merge pull request #565 from evanlinjin/fix/reenable-init-transports
Sep 8, 2019
5a66ea6
fixed tests, moved network listener to cmd, renamed disc address
ivcosla Sep 9, 2019
c7dbfda
fixed minor error, updated postman and added retrial
ivcosla Sep 10, 2019
b8aa779
Merge remote-tracking branch 'origin/mainnet-milestone2' into feature…
Sep 11, 2019
fe5d2fa
Merge mainnet-milestone2
Sep 11, 2019
1e5a8fb
added rpcclientdialer
ivcosla Sep 11, 2019
095dbff
removed retrier
ivcosla Sep 11, 2019
e1017d7
Merge branch 'feature/dmsg-hypervisor' of github.com:ivcosla/skywire …
ivcosla Sep 11, 2019
34a7f58
added unfinished test
ivcosla Sep 11, 2019
2941695
moved to SkycoinProject
ivcosla Sep 17, 2019
5856b42
imported SkycoinProject dmsg, builds and pass tests
ivcosla Sep 17, 2019
25482a2
Merge remote-tracking branch 'skycoinproject/master' into mainnet-mil…
ivcosla Sep 18, 2019
d44422d
fixed imports
ivcosla Sep 18, 2019
dff1bda
Merge pull request #1 from SkycoinProject/mainnet-milestone1
ivcosla Sep 19, 2019
107f38b
Merge branch 'master' into mainnet-milestone2
ivcosla Sep 19, 2019
0033949
moved to new repo
ivcosla Sep 19, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions cmd/hypervisor/commands/root.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package commands

import (
"context"
"fmt"
"net"
"net/http"
"os"

"github.com/skycoin/skycoin/src/util/logging"
"github.com/spf13/cobra"

"github.com/skycoin/skywire/pkg/hypervisor"
"github.com/skycoin/skywire/pkg/snet"
"github.com/skycoin/skywire/pkg/util/pathutil"
"github.com/spf13/cobra"
)

const configEnv = "SW_HYPERVISOR_CONFIG"
Expand Down Expand Up @@ -62,7 +62,27 @@ var rootCmd = &cobra.Command{

log.Infof("serving RPC on '%s'", rpcAddr)
go func() {
l, err := net.Listen("tcp", rpcAddr)
_, rpcPort, err := config.Interfaces.SplitRPCAddr()
if err != nil {
log.Fatalln("Failed to parse rpc port from rpc address:", err)
}

dmsgConf := snet.Config{
PubKey: config.PK,
SecKey: config.SK,
TpNetworks: []string{snet.DmsgType},

DmsgDiscAddr: config.DmsgDiscovery,
DmsgMinSrvs: 1,
}

network := snet.New(dmsgConf)
evanlinjin marked this conversation as resolved.
Show resolved Hide resolved

ctx := context.Background()
if err := network.Init(ctx); err != nil {
log.Fatalln("failed to init network: %v", err)
}
l, err := network.Listen(snet.DmsgType, rpcPort)
if err != nil {
log.Fatalln("Failed to bind tcp port:", err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ require (
)

// Uncomment for tests with alternate branches of 'dmsg'
//replace github.com/skycoin/dmsg => ../dmsg
// replace github.com/skycoin/dmsg => ../dmsg
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 h1:4y9KwBHBgBNwDbtu44R5o1fdOCQUEXhbk/P4A9WmJq0=
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190627182818-9947fec5c3ab/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
Expand Down
30 changes: 24 additions & 6 deletions pkg/hypervisor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"encoding/hex"
"encoding/json"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"time"

"github.com/skycoin/dmsg/cipher"
Expand Down Expand Up @@ -35,12 +37,13 @@ func (hk *Key) UnmarshalText(text []byte) error {

// Config configures the hypervisor.
type Config struct {
PK cipher.PubKey `json:"public_key"`
SK cipher.SecKey `json:"secret_key"`
DBPath string `json:"db_path"` // Path to store database file.
EnableAuth bool `json:"enable_auth"` // Whether to enable user management.
Cookies CookieConfig `json:"cookies"` // Configures cookies (for session management).
Interfaces InterfaceConfig `json:"interfaces"` // Configures exposed interfaces.
PK cipher.PubKey `json:"public_key"`
SK cipher.SecKey `json:"secret_key"`
DBPath string `json:"db_path"` // Path to store database file.
EnableAuth bool `json:"enable_auth"` // Whether to enable user management.
Cookies CookieConfig `json:"cookies"` // Configures cookies (for session management).
Interfaces InterfaceConfig `json:"interfaces"` // Configures exposed interfaces.
DmsgDiscovery string `json:"dmsg_discovery"` // DmsgDiscovery address for dmsg usage
}

func makeConfig() Config {
Expand Down Expand Up @@ -134,3 +137,18 @@ func (c *InterfaceConfig) FillDefaults() {
c.HTTPAddr = ":8080"
c.RPCAddr = ":7080"
}

// SplitRPCAddr returns host and port and whatever error results from parsing the rpc address interface
func (c *InterfaceConfig) SplitRPCAddr() (host string, port uint16, err error) {
addr, err := url.Parse(c.RPCAddr)
if err != nil {
return
}

uint64port, err := strconv.ParseUint(addr.Port(), 10, 16)
if err != nil {
return
}

return addr.Host, uint16(uint64port), nil
}
25 changes: 13 additions & 12 deletions pkg/hypervisor/hypervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"math/rand"
"net"
"net/http"
"net/rpc"
"strconv"
Expand All @@ -16,9 +15,10 @@ import (
"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
"github.com/google/uuid"
"github.com/skycoin/dmsg"
"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/dmsg/noise"
"github.com/skycoin/skycoin/src/util/logging"
"github.com/skycoin/skywire/pkg/snet"

"github.com/skycoin/skywire/pkg/app"
"github.com/skycoin/skywire/pkg/httputil"
Expand All @@ -32,7 +32,7 @@ var (
)

type appNodeConn struct {
Addr *noise.Addr
Addr dmsg.Addr
Client visor.RPCClient
}

Expand Down Expand Up @@ -61,13 +61,14 @@ func NewNode(config Config) (*Node, error) {
}

// ServeRPC serves RPC of a Node.
func (m *Node) ServeRPC(lis net.Listener) error {
func (m *Node) ServeRPC(lis *snet.Listener) error {

for {
conn, err := noise.WrapListener(lis, m.c.PK, m.c.SK, false, noise.HandshakeXK).Accept()
conn, err := lis.AcceptConn()
if err != nil {
return err
}
addr := conn.RemoteAddr().(*noise.Addr)
addr := conn.RemoteAddr().(dmsg.Addr)
m.mu.Lock()
m.nodes[addr.PK] = appNodeConn{
Addr: addr,
Expand Down Expand Up @@ -100,9 +101,9 @@ func (m *Node) AddMockData(config MockConfig) error {
}
m.mu.Lock()
m.nodes[pk] = appNodeConn{
Addr: &noise.Addr{
Addr: dmsg.Addr{
PK: pk,
Addr: mockAddr(fmt.Sprintf("0.0.0.0:%d", i)),
Port: uint16(i),
},
Client: client,
}
Expand Down Expand Up @@ -246,7 +247,7 @@ func (m *Node) getNodes() http.HandlerFunc {
summary = &visor.Summary{PubKey: pk}
}
summaries = append(summaries, summaryResp{
TCPAddr: c.Addr.Addr.String(),
TCPAddr: c.Addr.String(),
Summary: summary,
})
}
Expand All @@ -264,7 +265,7 @@ func (m *Node) getNode() http.HandlerFunc {
return
}
httputil.WriteJSON(w, r, http.StatusOK, summaryResp{
TCPAddr: ctx.Addr.Addr.String(),
TCPAddr: ctx.Addr.String(),
Summary: summary,
})
})
Expand Down Expand Up @@ -573,7 +574,7 @@ func (m *Node) getLoops() http.HandlerFunc {
<<< Helper functions >>>
*/

func (m *Node) client(pk cipher.PubKey) (*noise.Addr, visor.RPCClient, bool) {
func (m *Node) client(pk cipher.PubKey) (dmsg.Addr, visor.RPCClient, bool) {
m.mu.RLock()
conn, ok := m.nodes[pk]
m.mu.RUnlock()
Expand All @@ -583,7 +584,7 @@ func (m *Node) client(pk cipher.PubKey) (*noise.Addr, visor.RPCClient, bool) {
type httpCtx struct {
// Node
PK cipher.PubKey
Addr *noise.Addr
Addr dmsg.Addr
RPC visor.RPCClient

// App
Expand Down
2 changes: 1 addition & 1 deletion pkg/visor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func ensureDir(path string) (string, error) {
// HypervisorConfig represents hypervisor configuration.
type HypervisorConfig struct {
PubKey cipher.PubKey `json:"public_key"`
Addr string `json:"address"`
Port uint16 `json:"port"`
}

// DmsgConfig represents dmsg configuration.
Expand Down
31 changes: 8 additions & 23 deletions pkg/visor/visor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/skycoin/dmsg"
"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/dmsg/noise"
"github.com/skycoin/skycoin/src/util/logging"

"github.com/skycoin/skywire/pkg/app"
Expand Down Expand Up @@ -109,7 +108,6 @@ type Node struct {
pidMu sync.Mutex

rpcListener net.Listener
rpcDialers []*noise.RPCClientDialer
}

// NewNode constructs new Node.
Expand Down Expand Up @@ -207,15 +205,6 @@ func NewNode(config *Config, masterLogger *logging.MasterLogger) (*Node, error)
}
node.rpcListener = l
}
node.rpcDialers = make([]*noise.RPCClientDialer, len(config.Hypervisors))
for i, entry := range config.Hypervisors {
node.rpcDialers[i] = noise.NewRPCClientDialer(entry.Addr, noise.HandshakeXK, noise.Config{
LocalPK: pk,
LocalSK: sk,
RemotePK: entry.PubKey,
Initiator: true,
})
}

return node, err
}
Expand Down Expand Up @@ -247,12 +236,15 @@ func (node *Node) Start() error {
node.logger.Info("Starting RPC interface on ", node.rpcListener.Addr())
go rpcSvr.Accept(node.rpcListener)
}
for _, dialer := range node.rpcDialers {
go func(dialer *noise.RPCClientDialer) {
if err := dialer.Run(rpcSvr, time.Second); err != nil {
node.logger.Errorf("Dialer exited with error: %v", err)
for _, hypervisor := range node.config.Hypervisors {
go func(hypervisor HypervisorConfig) {
conn, err := node.n.Dial(snet.DmsgType, hypervisor.PubKey, hypervisor.Port)
if err != nil {
node.logger.Errorf("Hypervisor dmsg Dial exited with error: %v", err)
} else {
rpcSvr.ServeConn(conn)
}
}(dialer)
}(hypervisor)
}

node.logger.Info("Starting packet router")
Expand Down Expand Up @@ -334,13 +326,6 @@ func (node *Node) Close() (err error) {
node.logger.Info("RPC interface stopped successfully")
}
}
for i, dialer := range node.rpcDialers {
if err = dialer.Close(); err != nil {
node.logger.WithError(err).Errorf("(%d) failed to stop RPC dialer", i)
} else {
node.logger.Infof("(%d) RPC dialer closed successfully", i)
}
}
node.startedMu.Lock()
for a, bind := range node.startedApps {
if err = node.stopApp(a, bind); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ github.com/sirupsen/logrus/hooks/syslog
github.com/skycoin/dmsg/cipher
github.com/skycoin/dmsg
github.com/skycoin/dmsg/disc
github.com/skycoin/dmsg/noise
github.com/skycoin/dmsg/ioutil
github.com/skycoin/dmsg/noise
# github.com/skycoin/skycoin v0.26.0
github.com/skycoin/skycoin/src/util/logging
github.com/skycoin/skycoin/src/cipher
Expand Down