Skip to content

Commit

Permalink
channels
Browse files Browse the repository at this point in the history
  • Loading branch information
aldor007 committed Nov 1, 2017
1 parent 845abc2 commit 4b3349a
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 15 deletions.
4 changes: 2 additions & 2 deletions cmd/mort.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func main() {
logger, _ := zap.NewDevelopment()
zap.ReplaceGlobals(logger)
log.RegisterLogger(logger.Sugar())
rp := mort.NewRequestProcessor(5)

imgConfig := config.GetInstance()
imgConfig.Load(*configPath)
Expand All @@ -40,8 +41,7 @@ func main() {
return ctx.NoContent(400)
}

// dodac placeholder
res := mort.Process(ctx, obj)
res := rp.Process(ctx, obj)
res.SetDebug(ctx.Request().Header.Get("X-Mort-Debug"))
res.WriteHeaders(ctx.Response())
defer res.Close()
Expand Down
55 changes: 43 additions & 12 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,59 @@ import (
"mort/transforms"
"mort/log"
"strconv"
"fmt"
"time"
)

const S3_LOCATION_STR = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><LocationConstraint xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">EU</LocationConstraint>"

type struct RequestProcessor {
responseChan chan<- response.Response
func NewRequestProcessor(max int) RequestProcessor{
rp := RequestProcessor{}
rp.Init(max)
return rp
}

type requestMessage struct {
responseChan chan *response.Response
ctx echo.Context
obj *object.FileObject
}

type RequestProcessor struct {
queue chan requestMessage
}

func (r *RequestProcessor) Init(max int) {
r.queue = make(chan requestMessage, max)
}

func (r *RequestProcessor) Process(ctx echo.Context, obj *object.FileObject) *response.Response{

msg := requestMessage{}
msg.ctx = ctx
msg.obj = obj
msg.responseChan = make(chan *response.Response)

func Process(ctx echo.Context, obj *object.FileObject) *response.Response{
go r.processChan()
r.queue <- msg

select {
case res := <-queue:
//case <-ctx.Done():
// return response.NewBuf(504, "timeout")
case res := <-msg.responseChan:
return res
case <-time.After(time.Second * 60):
return response.NewBuf(504, "timeout")
return response.NewBuf(504, []byte("timeout"))
}
}

func (r *RequestProcessor) processChan() {
msg := <- r.queue
res := r.process(msg.ctx, msg.obj)
msg.responseChan <- res
}


func process(ctx echo.Context, obj *object.FileObject) *response.Response {
func (r *RequestProcessor) process(ctx echo.Context, obj *object.FileObject) *response.Response {
switch ctx.Request().Method {
case "GET":
return hanldeGET(ctx, obj)
Expand Down Expand Up @@ -78,6 +106,14 @@ func hanldeGET(ctx echo.Context, obj *object.FileObject) *response.Response {
}
}



// check if object is on storage
res = updateHeaders(storage.Get(obj))
if res.StatusCode == 200 {
return res
}

// get parent from storage
if parentObj != nil {
parentRes = updateHeaders(storage.Get(parentObj))
Expand All @@ -87,11 +123,6 @@ func hanldeGET(ctx echo.Context, obj *object.FileObject) *response.Response {
}
}

// check if object is on storage
res = updateHeaders(storage.Get(obj))
if res.StatusCode == 200 {
return res
}

defer parentRes.Close()

Expand Down
2 changes: 1 addition & 1 deletion response/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (r *Response) SetDebug(debug string) {
}

r.debug = false
r.Headers["Cache-Control"] = "no-cache"
r.Set("Cache-Control", "no-cache")
}

func (r *Response) HasError() bool {
Expand Down

0 comments on commit 4b3349a

Please sign in to comment.