Skip to content

Commit

Permalink
Re-Implementation of Term and VNC Features. (#161)
Browse files Browse the repository at this point in the history
  • Loading branch information
ricardoalcantara authored Sep 18, 2024
1 parent a68ae4a commit 9c47328
Show file tree
Hide file tree
Showing 10 changed files with 274 additions and 114 deletions.
13 changes: 10 additions & 3 deletions containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,18 @@ func (c *Container) Shutdown(ctx context.Context, force bool, timeout int) (task
return NewTask(upid, c.client), nil
}

func (c *Container) TermProxy(ctx context.Context) (vnc *VNC, err error) {
return vnc, c.client.Post(ctx, fmt.Sprintf("/nodes/%s/lxc/%d/termproxy", c.Node, c.VMID), nil, &vnc)
func (c *Container) TermProxy(ctx context.Context) (term *Term, err error) {
return term, c.client.Post(ctx, fmt.Sprintf("/nodes/%s/lxc/%d/termproxy", c.Node, c.VMID), nil, &term)
}

func (c *Container) VNCWebSocket(vnc *VNC) (chan string, chan string, chan error, func() error, error) {
func (c *Container) TermWebSocket(term *Term) (chan []byte, chan []byte, chan error, func() error, error) {
p := fmt.Sprintf("/nodes/%s/lxc/%d/vncwebsocket?port=%d&vncticket=%s",
c.Node, c.VMID, term.Port, url.QueryEscape(term.Ticket))

return c.client.TermWebSocket(p, term)
}

func (c *Container) VNCWebSocket(vnc *VNC) (chan []byte, chan []byte, chan error, func() error, error) {
p := fmt.Sprintf("/nodes/%s/lxc/%d/vncwebsocket?port=%d&vncticket=%s",
c.Node, c.VMID, vnc.Port, url.QueryEscape(vnc.Ticket))

Expand Down
13 changes: 10 additions & 3 deletions nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,19 @@ func (n *Node) Version(ctx context.Context) (version *Version, err error) {
return version, n.client.Get(ctx, fmt.Sprintf("/nodes/%s/version", n.Name), &version)
}

func (n *Node) TermProxy(ctx context.Context) (vnc *VNC, err error) {
return vnc, n.client.Post(ctx, fmt.Sprintf("/nodes/%s/termproxy", n.Name), nil, &vnc)
func (n *Node) TermProxy(ctx context.Context) (term *Term, err error) {
return term, n.client.Post(ctx, fmt.Sprintf("/nodes/%s/termproxy", n.Name), nil, &term)
}

func (n *Node) TermWebSocket(term *Term) (chan []byte, chan []byte, chan error, func() error, error) {
p := fmt.Sprintf("/nodes/%s/vncwebsocket?port=%d&vncticket=%s",
n.Name, term.Port, url.QueryEscape(term.Ticket))

return n.client.TermWebSocket(p, term)
}

// VNCWebSocket send, recv, errors, closer, error
func (n *Node) VNCWebSocket(vnc *VNC) (chan string, chan string, chan error, func() error, error) {
func (n *Node) VNCWebSocket(vnc *VNC) (chan []byte, chan []byte, chan error, func() error, error) {
p := fmt.Sprintf("/nodes/%s/vncwebsocket?port=%d&vncticket=%s",
n.Name, vnc.Port, url.QueryEscape(vnc.Ticket))

Expand Down
94 changes: 86 additions & 8 deletions proxmox.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (c *Client) handleResponse(res *http.Response, v interface{}) error {
return json.Unmarshal(body, &v) // assume passed in type fully supports response
}

func (c *Client) VNCWebSocket(path string, vnc *VNC) (chan string, chan string, chan error, func() error, error) {
func (c *Client) TermWebSocket(path string, term *Term) (chan []byte, chan []byte, chan error, func() error, error) {
if strings.HasPrefix(path, "/") {
path = strings.Replace(c.baseURL, "https://", "wss://", 1) + path
}
Expand All @@ -320,7 +320,7 @@ func (c *Client) VNCWebSocket(path string, vnc *VNC) (chan string, chan string,
}

// start the session by sending user@realm:ticket
if err := conn.WriteMessage(websocket.BinaryMessage, []byte(vnc.User+":"+vnc.Ticket+"\n")); err != nil {
if err := conn.WriteMessage(websocket.BinaryMessage, []byte(term.User+":"+term.Ticket+"\n")); err != nil {
return nil, nil, nil, nil, err
}

Expand Down Expand Up @@ -348,8 +348,8 @@ func (c *Client) VNCWebSocket(path string, vnc *VNC) (chan string, chan string,
return nil, nil, nil, nil, err
}

send := make(chan string)
recv := make(chan string)
send := make(chan []byte)
recv := make(chan []byte)
errs := make(chan error)
done := make(chan struct{})
ticker := time.NewTicker(30 * time.Second)
Expand Down Expand Up @@ -403,7 +403,7 @@ func (c *Client) VNCWebSocket(path string, vnc *VNC) (chan string, chan string,
}
errs <- err
}
recv <- string(msg)
recv <- msg
}
}
}()
Expand All @@ -428,12 +428,90 @@ func (c *Client) VNCWebSocket(path string, vnc *VNC) (chan string, chan string,
}
case msg := <-send:
c.log.Debugf("sending: %s", msg)
m := []byte(msg)
send := append([]byte(fmt.Sprintf("0:%d:", len(m))), m...)
send := append([]byte(fmt.Sprintf("0:%d:", len(msg))), msg...)
if err := conn.WriteMessage(websocket.BinaryMessage, send); err != nil {
errs <- err
}
if err := conn.WriteMessage(websocket.BinaryMessage, []byte("0:1:\n")); err != nil {
}
}
}()

return send, recv, errs, closer, nil
}

func (c *Client) VNCWebSocket(path string, vnc *VNC) (chan []byte, chan []byte, chan error, func() error, error) {
if strings.HasPrefix(path, "/") {
path = strings.Replace(c.baseURL, "https://", "wss://", 1) + path
}

var tlsConfig *tls.Config
transport := c.httpClient.Transport.(*http.Transport)
if transport != nil {
tlsConfig = transport.TLSClientConfig
}
c.log.Debugf("connecting to websocket: %s", path)
dialer := &websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: 30 * time.Second,
TLSClientConfig: tlsConfig,
}

dialerHeaders := http.Header{}
c.authHeaders(&dialerHeaders)

conn, _, err := dialer.Dial(path, dialerHeaders)

if err != nil {
return nil, nil, nil, nil, err
}

send := make(chan []byte)
recv := make(chan []byte)
errs := make(chan error)
done := make(chan struct{})

closer := func() error {
close(done)
time.Sleep(1 * time.Second)
close(send)
close(recv)
close(errs)

return conn.Close()
}

go func() {
for {
select {
case <-done:
return
default:
_, msg, err := conn.ReadMessage()
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") {
return
}
if !websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
return
}
errs <- err
}
recv <- msg
}
}
}()

go func() {
for {
select {
case <-done:
if err := conn.WriteMessage(websocket.CloseMessage, []byte{}); err != nil {
errs <- err
}
return
case msg := <-send:
c.log.Debugf("sending: %s", msg)
if err := conn.WriteMessage(websocket.BinaryMessage, msg); err != nil {
errs <- err
}
}
Expand Down
88 changes: 63 additions & 25 deletions tests/integration/nodes_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
//go:build nodes
// +build nodes

package integration

import (
"bytes"
"context"
"fmt"
"strings"
"testing"
Expand All @@ -12,53 +11,54 @@ import (
"github.com/luthermonson/go-proxmox"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNodes(t *testing.T) {
client := ClientFromLogins()
nodes, err := client.Nodes()
nodes, err := client.Nodes(context.TODO())
assert.Nil(t, err)
assert.GreaterOrEqual(t, len(nodes), 1)
for _, n := range nodes {
assert.NotEmpty(t, n.Node)
var node *proxmox.Node
t.Run("get status for node "+n.Node, func(t *testing.T) {
var err error
node, err = client.Node(n.Node)
node, err = client.Node(context.TODO(), n.Node)
assert.Nil(t, err)
assert.Equal(t, n.MaxMem, node.Memory.Total)
assert.Equal(t, n.Disk, node.RootFS.Used)
})

t.Run("get VMs for node "+n.Node, func(t *testing.T) {
_, err := node.VirtualMachines()
_, err := node.VirtualMachines(context.TODO())
assert.Nil(t, err)
})

break // only pull status from one node
}

_, err = client.Node("doesnt-exist")
_, err = client.Node(context.TODO(), "doesnt-exist")
assert.Contains(t, err.Error(), "500 hostname lookup 'doesnt-exist' failed - failed to get address info for: doesnt-exist:")
}

func TestNode(t *testing.T) {
client := ClientFromLogins()
node, err := client.Node(td.nodeName)
node, err := client.Node(context.TODO(), td.nodeName)
assert.Nil(t, err)
assert.Equal(t, node.Name, td.nodeName)
}

func TestContainers(t *testing.T) {
t.Run("get Containers for node "+td.node.Name, func(t *testing.T) {
_, err := td.node.Containers()
_, err := td.node.Containers(context.TODO())
assert.Nil(t, err)
})
}

func TestNode_Appliances(t *testing.T) {
t.Run("get Containers for node "+td.node.Name, func(t *testing.T) {
aplinfos, err := td.node.Appliances()
aplinfos, err := td.node.Appliances(context.TODO())
assert.Nil(t, err)
assert.GreaterOrEqual(t, len(aplinfos), 1)
})
Expand All @@ -68,13 +68,13 @@ func TestNode_DownloadAppliance(t *testing.T) {
var aplinfos proxmox.Appliances
t.Run("get Containers for node "+td.node.Name, func(t *testing.T) {
var err error
aplinfos, err = td.node.Appliances()
aplinfos, err = td.node.Appliances(context.TODO())
assert.Nil(t, err)
assert.GreaterOrEqual(t, len(aplinfos), 1)
})

t.Run("download non existing appliance template", func(t *testing.T) {
_, err := td.node.DownloadAppliance("doesnt-exist", td.nodeStorage)
_, err := td.node.DownloadAppliance(context.TODO(), "doesnt-exist", td.nodeStorage)
assert.NotNil(t, err)
assert.True(t, strings.Contains(err.Error(), "no such template"))
})
Expand All @@ -86,7 +86,7 @@ func TestNode_DownloadAppliance(t *testing.T) {
for _, a := range aplinfos {
if strings.HasPrefix(a.Template, td.appliancePrefix) {
td.appliance = a // set to use in later tests
ret, err := td.node.DownloadAppliance(a.Template, td.nodeStorage)
ret, err := td.node.DownloadAppliance(context.TODO(), a.Template, td.nodeStorage)
assert.Nil(t, err)
assert.True(t, strings.HasPrefix(ret, fmt.Sprintf("UPID:%s:", td.node.Name)))
}
Expand All @@ -95,7 +95,7 @@ func TestNode_DownloadAppliance(t *testing.T) {
}

func TestNode_Storages(t *testing.T) {
storages, err := td.node.Storages()
storages, err := td.node.Storages(context.TODO())
assert.Nil(t, err)
assert.True(t, len(storages) > 0)

Expand All @@ -110,27 +110,70 @@ func TestNode_Storages(t *testing.T) {
}

func TestNode_Storage(t *testing.T) {
_, err := td.node.Storage("doesnt-exist")
_, err := td.node.Storage(context.TODO(), "doesnt-exist")
assert.Contains(t, err.Error(), "No such storage.")

storage, err := td.node.Storage(td.nodeStorage)
storage, err := td.node.Storage(context.TODO(), td.nodeStorage)
assert.Nil(t, err)
assert.Equal(t, td.nodeStorage, storage.Name)
}

func TestNode_TermProxy(t *testing.T) {
vnc, err := td.node.TermProxy()
term, err := td.node.TermProxy(context.TODO())
assert.Nil(t, err)
send, recv, errs, close, err := td.node.TermWebSocket(term)
assert.Nil(t, err)
send, recv, errs, close, err := td.node.VNCWebSocket(vnc)
defer close()

go func() {
for {
select {
case msg := <-recv:
if len(msg) > 0 {
fmt.Println("MSG: " + string(msg))
}
case err := <-errs:
if err != nil {
fmt.Println("ERROR: " + err.Error())
return
}
}
}
}()

send <- []byte("ls -la\n")
time.Sleep(1 * time.Second)
send <- []byte("hostname\n")
time.Sleep(1 * time.Second)
send <- []byte("exit\n")
time.Sleep(1 * time.Second)
}

func TestNode_VncProxy(t *testing.T) {
assert.NotEqual(t, 0, td.vncVmId)

vm, err := td.node.VirtualMachine(context.TODO(), td.vncVmId)
require.NoError(t, err)

vnc, err := vm.VNCProxy(context.TODO(), nil)
require.NoError(t, err)

send, recv, errs, close, err := vm.VNCWebSocket(vnc)
assert.Nil(t, err)
defer close()

go func() {
for {
select {
case msg := <-recv:
if msg != "" {
fmt.Println("MSG: " + msg)
if len(msg) > 0 {
fmt.Printf("MSG: %s -> %v\n", string(msg), msg)
if strings.HasPrefix(string(msg), "RFB") {
send <- msg
}
if bytes.Equal(msg, []byte{0x01, 0x02}) {
fmt.Println("Success!")
}
}
case err := <-errs:
if err != nil {
Expand All @@ -141,10 +184,5 @@ func TestNode_TermProxy(t *testing.T) {
}
}()

send <- "ls -la"
time.Sleep(10 * time.Second)
send <- "hostname"
time.Sleep(10 * time.Second)
send <- "exit"
time.Sleep(5 * time.Second)
}
Loading

0 comments on commit 9c47328

Please sign in to comment.