-
Notifications
You must be signed in to change notification settings - Fork 17
/
buffered_pipe.go
100 lines (90 loc) · 2.27 KB
/
buffered_pipe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package gosh
import (
"bytes"
"io"
"sync"
)
type bufferedPipe struct {
cond *sync.Cond
buf bytes.Buffer
closed bool
}
var (
// Make sure the signatures are right, so that io.Copy can be faster.
_ io.WriterTo = (*bufferedPipe)(nil)
_ io.ReaderFrom = (*bufferedPipe)(nil)
)
// newBufferedPipe returns a new thread-safe pipe backed by an unbounded
// in-memory buffer. Writes on the pipe never block; reads on the pipe block
// until data is available.
func newBufferedPipe() io.ReadWriteCloser {
return &bufferedPipe{cond: sync.NewCond(&sync.Mutex{})}
}
// Read reads from the pipe.
func (p *bufferedPipe) Read(d []byte) (int, error) {
p.cond.L.Lock()
defer p.cond.L.Unlock()
for {
// Read any remaining data before checking whether the pipe is closed.
if p.buf.Len() > 0 {
return p.buf.Read(d)
}
if p.closed {
return 0, io.EOF
}
p.cond.Wait()
}
}
// WriteTo implements the io.WriterTo method; it is the fast version of Read
// used by io.Copy.
// Unlike Read, which returns io.EOF to signal that all data has been read,
// WriteTo blocks until all data has been written to w, and never returns
// io.EOF.
func (p *bufferedPipe) WriteTo(w io.Writer) (int64, error) {
p.cond.L.Lock()
defer p.cond.L.Unlock()
var written int64
for {
// Keep writing data until the pipe is closed.
n, err := p.buf.WriteTo(w)
written += n
if p.closed || err != nil {
return written, err
}
p.cond.Wait()
}
}
// Write writes to the pipe.
func (p *bufferedPipe) Write(d []byte) (int, error) {
p.cond.L.Lock()
defer p.cond.L.Unlock()
if p.closed {
return 0, io.ErrClosedPipe
}
defer p.cond.Signal()
return p.buf.Write(d)
}
// ReadFrom implements the io.ReaderFrom method; it is the fast version of Write
// used by io.Copy.
func (p *bufferedPipe) ReadFrom(r io.Reader) (int64, error) {
p.cond.L.Lock()
defer p.cond.L.Unlock()
if p.closed {
return 0, io.ErrClosedPipe
}
defer p.cond.Signal()
return p.buf.ReadFrom(r)
}
// Close closes the pipe.
func (p *bufferedPipe) Close() error {
p.cond.L.Lock()
defer p.cond.L.Unlock()
if !p.closed {
defer p.cond.Signal()
p.closed = true
}
return nil
}