Skip to content
This repository has been archived by the owner on Apr 29, 2020. It is now read-only.

Commit

Permalink
Fix #42, Fix #43, Fix #44, Fix #47
Browse files Browse the repository at this point in the history
Major changes:

- Output is now streamed from the daemon rather than being dumped at the end.
- At the end of the run, data is dumped to IPFS and you are provided an IFPS link to the output.
- `golint` now passes sucessfully
- The server now responds with 401 to unauthorized users
- User can now specify what they want to run
- Config is now in JSON format
- Config is generated as JSON on first load
  • Loading branch information
FrankPetrilli committed Jan 17, 2018
1 parent 2761a2c commit ad69959
Showing 1 changed file with 95 additions and 50 deletions.
145 changes: 95 additions & 50 deletions ipfs-cluster/trigger-server/triggerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,88 +8,133 @@ import (
"fmt"
"io/ioutil"
"math/rand"
"strconv"
"os"
"github.com/ipfs/go-ipfs-api"
"encoding/json"
"io"
"bytes"
)

const configPath = "/etc/trigger-server"
// port is HTTP API listen port
const port = 8082
var runnerPath = os.Getenv("GOPATH") + "src/github.com/ipfs/kubernetes-ipfs/ipfs-cluster/"
// Runner takes two args
const runnerNumNodes = "5"
const runnerNumPins = "5"
// TriggerConfig is server configuration JSON format
type TriggerConfig struct {
Secret string
Port int
RunnerPath string
Runner string
RunnerArgs string
}

// Secret is set by main
var secret = ""
const configPath = "/etc/trigger-server.json"
var config TriggerConfig

// IPFS Shell
var sh = shell.NewShell("localhost:5001")

func run(rw http.ResponseWriter, req *http.Request) {
if req.Method == "POST" {
if req.Header.Get("X-Auth") != secret {
log.Println("User failed authentication")
fmt.Fprintf(rw, "Invalid authentication\n")
} else {
// This command takes a while, so we'll hang out here while it runs
cmd := exec.Command("bash", "runner.sh", runnerNumNodes, runnerNumPins)
cmd.Dir = runnerPath
out, err := cmd.CombinedOutput()
// Add the output to IPFS
addr, ipfsErr := sh.Add(bytes.NewReader(out))
// Make it an IPFS link
addr = "/ipfs/" + addr
// Send the link to the client
fmt.Fprintf(rw, addr)
// Handle errors that may have occurred
if req.Method != "POST" {
fmt.Fprintf(rw, `Kubernetes-IPFS Trigger Server
https://github.com/ipfs/kubernetes-ipfs/
`)
return
}
if req.Header.Get("X-Auth") != config.Secret {
log.Println("User failed authentication")
rw.WriteHeader(http.StatusUnauthorized)
fmt.Fprintf(rw, "Invalid authentication\n")
return
}
// This command takes a while, so we'll hang out here while it runs
cmd := exec.Command("bash", "-c", fmt.Sprintf("%s%s %s", config.RunnerPath, config.Runner, config.RunnerArgs))
cmd.Dir = config.RunnerPath
pipeReader, pipeWriter := io.Pipe()
out := bytes.NewBufferString("")
cmd.Stdout = pipeWriter
cmd.Stderr = pipeWriter
go writeOutput(rw, out, pipeReader)
if err := cmd.Run(); err != nil {
log.Printf("Failed to start runner (%s/%s %s): %s", config.RunnerPath, config.Runner, config.RunnerArgs, err)
return
}
pipeWriter.Close()
log.Println("Finished execution")
// Add the output to IPFS
addr, ipfsErr := sh.Add(bytes.NewReader(out.Bytes()))
// Make it an IPFS link
addr = "/ipfs/" + addr
// Send the link to the client
fmt.Fprintf(rw, addr)
// Handle errors that may have occurred
if ipfsErr != nil {
log.Println("IPFS Error adding output")
log.Println(ipfsErr)
}
}

func writeOutput(res http.ResponseWriter, out *bytes.Buffer, pipeReader *io.PipeReader) {
buffer := make([]byte, 4096)
for {
n, err := pipeReader.Read(buffer)
if err != nil {
log.Println("Error from runner")
log.Println(err)
pipeReader.Close()
break
}
if ipfsErr != nil {
log.Println("IPFS Error adding output")
log.Println(ipfsErr)

data := buffer[0:n]
res.Write(data)
out.Write(data)
if f, ok := res.(http.Flusher); ok {
f.Flush()
}
//reset buffer
for i := 0; i < n; i++ {
buffer[i] = 0
}
}
}

func init() {
rand.Seed(time.Now().UnixNano())
configFile, readErr := ioutil.ReadFile(configPath)
if readErr != nil || len(configFile) < 1 {
log.Printf("Missing configuration in %s\n", configPath)
cwd, _ := os.Getwd()
config.Secret = randLetters(30)
config.Port = 8082
config.RunnerPath = cwd
config.RunnerArgs = ""
configData, marshalErr := json.Marshal(config)
if marshalErr != nil {
log.Printf("Couldn't create config file: %s\n", marshalErr)
os.Exit(1)
}
err := ioutil.WriteFile(configPath, configData, 0644)
if err != nil {
log.Printf("Couldn't write config file: %s\n", err)
os.Exit(1)
}
log.Printf("Generated configuration in %s\n", configPath)
} else {
// For non-POST requests, tell the user what we are.
fmt.Fprintf(rw, `Kubernetes-IPFS Trigger Server
https://github.com/ipfs/kubernetes-ipfs/
`)
json.Unmarshal(configFile, &config)
}
}

func main() {
secretBytes, readErr := ioutil.ReadFile(configPath)
if readErr != nil {
log.Printf("Missing secret configuration in %s. We'll generate one for you...\n", configPath)
rand.Seed(time.Now().UnixNano())
secretBytes = randLetters(30)
ioErr := ioutil.WriteFile(configPath, secretBytes, 0644)
if ioErr != nil {
log.Fatal("Unable to write new secret file; check permissions and try again")
}
}
secret = string(secretBytes)
log.Printf("Starting with secret as %s on port %d\n", secret, port)
log.Printf("Starting on port %d\n", config.Port)
http.HandleFunc("/run", run)
http.HandleFunc("/", func(rw http.ResponseWriter, req *http.Request) {
fmt.Fprintf(rw, `Kubernetes-IPFS Trigger Server
https://github.com/ipfs/kubernetes-ipfs/
`)
})
log.Println(http.ListenAndServe(":" + strconv.FormatInt(port, 10), nil))
log.Println(http.ListenAndServe(fmt.Sprintf(":%d", config.Port), nil))
}

var letters = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

func randLetters(n int) []byte {
func randLetters(n int) string {
b := make([]byte, n)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return b
return string(b)
}

0 comments on commit ad69959

Please sign in to comment.