Skip to content

Commit

Permalink
feat: implemented tunnelling
Browse files Browse the repository at this point in the history
  • Loading branch information
DeeStarks committed Aug 12, 2022
1 parent 670012f commit 2ed338c
Show file tree
Hide file tree
Showing 6 changed files with 308 additions and 50 deletions.
60 changes: 41 additions & 19 deletions app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,52 +13,54 @@ import (
)

type Server struct {
apps IServices
services IServices
defaultDB *sql.DB
host string
port string
openConns chan<- net.Conn
}

func NewServer() *Server {
// Connect to the default db
defaultDB, err := sql.Open("sqlite3", config.DEFAULT_DB)
if err != nil {
log.Panicln("Could not connect DB:", err)
}

func NewServer(connCh chan<- net.Conn, defaultDB *sql.DB) *Server {
// initialize and start running Services
apps := InitServices(defaultDB)
apps.ServeServices()
services := InitServices(defaultDB)

return &Server{
apps: apps,
services: services,
defaultDB: defaultDB,
openConns: connCh,
}
}

func (s *Server) process(conn net.Conn) {
// Get the servers
addrs := s.apps.GetServiceServers(conn.RemoteAddr().String())
addrs := s.services.GetServiceServers(conn.RemoteAddr().String())
// log.Println(conn.RemoteAddr().String())
if len(addrs) <= 0 {
// If the remote address is unknown, redirect to the welcome server
addrs = s.apps.GetServiceServers("[::1]:80")
addrs = s.services.GetServiceServers(fmt.Sprintf("%s:%s", s.host, s.port))
}

// Get next server from load balancer
// TODO:
// The idea for the load balancer isn't fully formed yet.
// For now, it's always going to select the first server for every connection
lb := tools.NewLoadBalancer(addrs)
addr := lb.GetNextServer()

// Connect to the available server
localConn, err := s.apps.ConnectToServer(addr)
localConn, err := s.services.ConnectToServer(addr)
if err != nil {
log.Println(err)
return
}

// Add local conn to open connections channel
s.openConns <- localConn

// Establish a point-to-point connection between conoid server and app's local server
go func() {
for {
_, err = io.Copy(localConn, conn)
if err != nil {
log.Println("Failed to read from remote connection:", err)
break
}
}
Expand All @@ -68,7 +70,6 @@ func (s *Server) process(conn net.Conn) {
for {
_, err = io.Copy(conn, localConn)
if err != nil {
log.Println("Failed to write to remote connection:", err)
break
}
}
Expand All @@ -77,21 +78,42 @@ func (s *Server) process(conn net.Conn) {

func (s *Server) Serve() {
// Start the server and wait for connections
listener, err := net.Listen("tcp", fmt.Sprintf("[::]:%d", config.TCP_PORT))
listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", config.TCP_PORT))
if err != nil {
log.Println(err)
return
}
host, port, err := net.SplitHostPort(listener.Addr().String())
if err != nil {
log.Println(err)
return
}
log.Printf("Conoid started and listening on port %d\n", config.TCP_PORT)
s.host = host
s.port = port
log.Printf("Conoid listening on host: %s, port %s\n", host, port)

// Start running services
s.services.ServeServices(host, port, s.openConns)

// Record connections to ensure it doesn't exceed the max size
connsCh := make(chan int, config.MAX_CONN_COUNT)

for {
// Block if connections count it full
connsCh <- 1

// Establish a point-to-point connection between the client and server
conn, err := listener.Accept()
if err != nil {
log.Println("Connection failed:", err)
// Remove record
<-connsCh
continue
}

// Add to open connections
s.openConns <- conn

// Handle connection in a new goroutine
go s.process(conn)
}
Expand Down
86 changes: 57 additions & 29 deletions app/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"net"
"net/http"

// "github.com/DeeStarks/conoid/app/tools"
"github.com/DeeStarks/conoid/app/tools"
port "github.com/DeeStarks/conoid/domain/ports"
)

Expand All @@ -23,11 +23,11 @@ type (
}

IServices interface {
ServeServices() // Retrieve all Services and serve
GetRunningServices() RunningServices // Get all running Services
GetServiceServers(string) []string // Get all servers' address that a service runs on
ConnectToServer(string) (net.Conn, error) // Connect to a service running locally
ServeStatic(string) int // Serve static Services, and return their port numbers
ServeServices(string, string, chan<- net.Conn) // Retrieve all Services and serve
GetRunningServices() RunningServices // Get all running Services
GetServiceServers(string) []string // Get all servers' address that a service runs on
ConnectToServer(string) (net.Conn, error) // Connect to a service running locally
ServeStatic(string) (string, string) // Serve static Services, and return their port numbers
}
)

Expand All @@ -40,52 +40,78 @@ func InitServices(defaultDB *sql.DB) IServices {
}

// Retrieve all Services and serve
func (s *Services) ServeServices() {
func (s *Services) ServeServices(conoidHost, conoidPort string, connCh chan<- net.Conn) {
// Serve the welcome page
welcomePort := s.ServeStatic("./assets/welcome/")
// The welcome page will be served by default on port 80
s.running["[::1]:80"] = []string{fmt.Sprintf("[::1]:%d", welcomePort)}
s.running["localhost:80"] = []string{fmt.Sprintf("[::1]:%d", welcomePort)}
s.running["localhost"] = []string{fmt.Sprintf("[::1]:%d", welcomePort)}
s.running["127.0.0.1:80"] = []string{fmt.Sprintf("[::1]:%d", welcomePort)}
s.running["127.0.0.1"] = []string{fmt.Sprintf("[::1]:%d", welcomePort)}

// Serve
welcomeHost, welcomePort := s.ServeStatic("./assets/welcome/")
// Set the welcome page as the werver's default page
s.running[fmt.Sprintf("%s:%s", conoidHost, conoidPort)] = []string{fmt.Sprintf("%s:%s", welcomeHost, welcomePort)}

// Serve registered running services
dbPort := port.NewDomainPort(s.defaultDB)
services, err := dbPort.ServiceProcesses().RetrieveRunning()
if err != nil {
log.Println("Could not serve apps:", err)
log.Println("Could not serve:", err)
return
}

for _, service := range services {
// Addresses the service is running on
var serverAddrs []string

if service.Type == "static" {
// Serve static
portNo := s.ServeStatic(service.RootDirectory)
addr := fmt.Sprintf("127.0.0.1:%d", portNo)
host, port := s.ServeStatic(service.RootDirectory)
addr := fmt.Sprintf("%s:%s", host, port)
_, err := dbPort.ServiceProcesses().Update(service.Name, map[string]interface{}{
"listeners": addr,
})
if err != nil {
log.Println("Could not update service state:", err)
}
s.running[service.RemoteServer] = []string{addr}
} else if service.Type == "server" {
serverAddrs = []string{addr}
} else {
servers := []string{}
// Connect to all listening servers
for _, addr := range service.Listeners {
_, err := s.ConnectToServer(addr)
if err != nil {
log.Printf("Could not connect to server address: %s; Error: %v\n", addr, err)
log.Printf("Could not connect to: %s; Stopping...\n", addr)
// Update service state
dbPort.ServiceProcesses().Update(service.Name, map[string]interface{}{
"status": 0,
})
continue
}
// Append servers to listening servers
servers = append(servers, addr)
}
s.running[service.RemoteServer] = servers

serverAddrs = servers
}

// Tunnelling
if service.Tunnelled {
tunnel := tools.NewTunnel(service.Name, connCh)
host, err := tunnel.AllocateHost()
if err != nil {
log.Println("Error allocating tunnel remote host. Ensure your device is connected to the internet")
continue
}

for i := 0; i < host.MaxConnectionCount(); i++ {
go host.OpenTunnel(fmt.Sprintf("%s:%s", conoidHost, conoidPort), serverAddrs)
}

// Update service's remote_server
_, err = dbPort.ServiceProcesses().Update(service.Name, map[string]interface{}{
"remote_server": host.FullURL(),
})
if err != nil {
log.Println("Error updating tunnel state:", err)
continue
}
}
}
}

Expand All @@ -109,23 +135,25 @@ func (s *Services) ConnectToServer(addr string) (net.Conn, error) {
}

// Serve static Services, and return their port numbers
func (s *Services) ServeStatic(dir string) int {
func (s *Services) ServeStatic(dir string) (string, string) {
fs := http.FileServer(http.Dir(dir))
mux := http.NewServeMux()
mux.Handle("/", fs)

// Get and listen on the next port number
portNo := s.nextPN
host := "0.0.0.0"
port := s.nextPN
for {
// Dial the port number to see if it's available
_, err := net.Dial("tcp", fmt.Sprintf("[::]:%d", portNo))
_, err := net.Dial("tcp", fmt.Sprintf("%s:%d", host, port))
if err != nil {
go http.ListenAndServe(fmt.Sprintf(":%d", portNo), mux)
// If it's not in use, serve
go http.ListenAndServe(fmt.Sprintf("%s:%d", host, port), mux)
break
}
// If it's already in use, try the next port
portNo++
port++
}
s.nextPN = portNo
return portNo
s.nextPN = port
return host, fmt.Sprintf("%d", port)
}
Loading

0 comments on commit 2ed338c

Please sign in to comment.