Skip to content

Commit

Permalink
Added master/agent configuration to run on distributed systems
Browse files Browse the repository at this point in the history
  • Loading branch information
DheerendraRathor committed Sep 25, 2017
1 parent 5954081 commit 640f01b
Show file tree
Hide file tree
Showing 6 changed files with 538 additions and 1 deletion.
14 changes: 13 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

228 changes: 228 additions & 0 deletions net/agent/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
package main

import (
"encoding/json"
"flag"
"log"
"net/http"
"sync"
"time"

"strings"

"runtime"

"github.com/DheerendraRathor/GoTracer/models"
"github.com/DheerendraRathor/GoTracer/net/constants"
"github.com/DheerendraRathor/GoTracer/tracer"
"github.com/google/uuid"
"github.com/gorilla/websocket"
)

const (
pongTimeout = 60 * time.Second
pongWait = 20 * time.Second
pingMessage = "a"
)

type RenderingClient struct {
Conn *websocket.Conn
Results chan *models.Pixel
IsTracingInProgress bool
CloseChan chan bool
OperationId string
PongChannel chan bool
WaitGroup sync.WaitGroup
}

func (c *RenderingClient) Initialize() {
c.WaitGroup = sync.WaitGroup{}

c.Conn.SetPongHandler(
func(message string) error {
c.PongChannel <- true
return nil
},
)

go pingThread(c.Conn, c.PongChannel)

c.WaitGroup.Add(2)

go c.ReadHandler()
go c.ResultSender()

c.WaitGroup.Wait()
}

var addr = flag.String("addr", "", "http service address")

var upgrader = websocket.Upgrader{}
var mutex = &sync.Mutex{}

var activeConn *websocket.Conn = nil

func sendPing(conn *websocket.Conn) error {
err := conn.WriteControl(websocket.PingMessage, []byte(pingMessage), time.Now().Add(time.Hour))
return err
}

func pingThread(conn *websocket.Conn, pongChan <-chan bool) {
ticker := time.NewTicker(pongWait)
timer := time.NewTimer(pongTimeout)

for {
select {
case <-timer.C:
conn.WriteControl(websocket.CloseNoStatusReceived, []byte("No pong received"), time.Now().Add(time.Hour))
conn.Close()
return
case <-ticker.C:
err := sendPing(conn)
if err != nil {
ticker.Stop()
return
}
case <-pongChan:
if !timer.Stop() {
<-timer.C
}
timer.Reset(pongTimeout)
}
}
}

func (c *RenderingClient) ReadHandler() {
closeChan := make(chan bool)

defer func() {
c.WaitGroup.Done()
closeChan <- true
}()

var message messages.WebSocketMessage
var renderReqMsg messages.RenderRequestMessage
for {
_, msgStr, err := c.Conn.ReadMessage()
if err != nil {
log.Printf("Unable to understand message: %s", err)
break
}

json.Unmarshal(msgStr, &message)

if message.Type == messages.RenderRequest {
json.Unmarshal(msgStr, &renderReqMsg)
operationId := renderReqMsg.OperationId
if strings.TrimSpace(operationId) == "" {
byteOpId, _ := uuid.NewRandom()
operationId = byteOpId.String()
}
c.OperationId = operationId

responseMessage := messages.RenderRequestResponseMessage{
Type: messages.RenderRequestResponse,
OperationId: c.OperationId,
}

if !c.IsTracingInProgress {
go goTracer.GoTrace(&renderReqMsg.Data, c.Results, closeChan)
c.IsTracingInProgress = true
responseMessage.Code = messages.RenderRequestAccepted
} else {
responseMessage.Code = messages.RenderRequestTracingAlreadyInProgress
}

c.Conn.WriteJSON(message)
}
}
}

func (c *RenderingClient) ResultSender() {
defer c.WaitGroup.Done()

message := messages.WebSocketMessage{
Type: messages.PixelResult,
}

for pixel := range c.Results {
message.OperationId = c.OperationId
if pixel == nil {
message.Type = messages.RenderingCompleted
c.Conn.WriteJSON(message)
break
}

message.Data = pixel
c.Conn.WriteJSON(message)
}
}

func agentHandler(w http.ResponseWriter, r *http.Request) {
log.Println("Received a connection request")
shouldConnect := true

mutex.Lock()
if activeConn != nil {
log.Print("An active connection already exists")
http.Error(w, "An active connection already exists", http.StatusConflict)
shouldConnect = false
}

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Print("upgrade:", err)
shouldConnect = false
}
activeConn = conn
mutex.Unlock()

defer func() {
activeConn = nil
conn.Close()
}()

if !shouldConnect {
return
}

client := RenderingClient{
Conn: conn,
PongChannel: make(chan bool),
Results: make(chan *models.Pixel, 100),
IsTracingInProgress: false,
OperationId: "",
}

client.Initialize()
}

func agentStatusHandler(w http.ResponseWriter, r *http.Request) {
status := messages.AgentStatus{
Available: activeConn == nil,
Cores: runtime.NumCPU(),
}

jsonStatus, err := json.Marshal(status)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
w.Write(jsonStatus)
}

func main() {
flag.Parse()

if *addr == "" {
flag.Usage()
log.Fatalf("Agent address \"%s\" is invalid.", *addr)
}

log.SetFlags(0)
http.HandleFunc("/", agentHandler)
http.HandleFunc("/status", agentStatusHandler)
log.Fatal(http.ListenAndServe(*addr, nil))
}
8 changes: 8 additions & 0 deletions net/constants/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package messages

const (
RenderRequest = "Render"
PixelResult = "Pixel"
RenderingCompleted = "Rendered"
RenderRequestResponse = "RenderResponse"
)
41 changes: 41 additions & 0 deletions net/constants/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package messages

import (
"github.com/DheerendraRathor/GoTracer/models"
)

type RenderResponseCode string

const (
RenderRequestAccepted RenderResponseCode = "RenderRequestAccepted"
RenderRequestTracingAlreadyInProgress = "RenderRequestTracingAlreadyInProgress"
)

type WebSocketMessage struct {
Type string
Data interface{}
OperationId string
}

type RenderRequestMessage struct {
Type string
Data models.World
OperationId string
}

type RenderRequestResponseMessage struct {
Type string
Code RenderResponseCode
OperationId string
}

type PixelResultMessage struct {
Type string
Data models.Pixel
OperationId string
}

type AgentStatus struct {
Available bool
Cores int
}
5 changes: 5 additions & 0 deletions net/master/agents.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[
"localhost:9190",
"localhost:9191",
"localhost:9192"
]
Loading

0 comments on commit 640f01b

Please sign in to comment.