Skip to content

Commit

Permalink
Merge pull request #9 from ozeidan/broken-pipe-fix
Browse files Browse the repository at this point in the history
Fixed server getting stuck on channel close
  • Loading branch information
Omar Zeidan authored Jun 3, 2019
2 parents b4a7de7 + ab91aae commit 78c486a
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 3 deletions.
6 changes: 5 additions & 1 deletion internal/database/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,11 @@ func sendResults(results resulter, req request.Request) {
}

for i := startIndex; i < startIndex+maxResults; i++ {
req.ResponseChannel <- results.Result(i)
select {
case req.ResponseChannel <- results.Result(i):
case <-req.Done:
return
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions internal/request/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type Request struct {
// ResponseChannel is the channel
// on which the database will send back the results
ResponseChannel chan string `json:"-"`
// Done is used to signal to the database
// that no more results are needed
Done chan struct{} `json:"-"`
}

type Settings struct {
Expand Down Expand Up @@ -101,11 +104,13 @@ func serve(c net.Conn, requestReceiver chan<- Request) {
err := json.NewDecoder(c).Decode(&request)

if err != nil {
// TODO: send error back
log.Println("failed to decode request:", err)
return
}

request.ResponseChannel = make(chan string)
request.Done = make(chan struct{})
requestReceiver <- request

for response := range request.ResponseChannel {
Expand All @@ -116,6 +121,7 @@ func serve(c net.Conn, requestReceiver chan<- Request) {
}
if err != nil {
log.Println("failed to write to unix domain socket:", err)
request.Done <- struct{}{}
break
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ func SearchRequest(searchQuery string, options ...Option) (<-chan string, error)
defer c.Close()
reader := bufio.NewReader(c)
for {
bytes, err := reader.ReadBytes('\n')
line, err := reader.ReadString('\n')
if err != nil {
// TODO: handle this error
return
}
responseChan <- string(bytes)
responseChan <- line
}
}()

Expand Down

0 comments on commit 78c486a

Please sign in to comment.