Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions hiveproxy/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ RUN go mod download

# Now build the proxy executable.
ADD . /source
RUN go build -o hive-proxy ./tool
RUN go build -o /bin/hiveproxy ./tool

# Pull the executable into a fresh image.
FROM alpine:latest
COPY --from=builder /source/hive-proxy .
COPY --from=builder /bin/hiveproxy .
EXPOSE 8081/tcp
ENTRYPOINT ./hive-proxy --addr :8081
ENTRYPOINT ./hiveproxy --addr :8081
24 changes: 16 additions & 8 deletions hiveproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,18 @@ func (p *Proxy) launchRPC(stream net.Conn) {
// to the backend.
//
// All communication with the backend runs over the given r,w streams.
func RunFrontend(r io.Reader, w io.WriteCloser, listener net.Listener) *Proxy {
mux, _ := yamux.Client(rwCombo{r, w}, muxcfg)
func RunFrontend(r io.Reader, w io.WriteCloser, listener net.Listener) (*Proxy, error) {
p := newProxy(true)
mux, err := yamux.Client(rwCombo{r, w}, muxcfg)
if err != nil {
return nil, err
}

// Launch RPC handler.
rpcConn, err := mux.Open()
if err != nil {
panic(err)
mux.Close()
return nil, err
}
p.launchRPC(rpcConn)
p.rpc.RegisterName("proxy", new(proxyFunctions))
Expand All @@ -144,28 +148,32 @@ func RunFrontend(r io.Reader, w io.WriteCloser, listener net.Listener) *Proxy {
Transport: transport,
}
go p.serve(listener)
return p
return p, nil
}

// RunBackend starts the proxy backend, i.e. the side which handles HTTP requests proxied
// by the frontend.
//
// All communication with the frontend runs over the given r,w streams.
func RunBackend(r io.Reader, w io.WriteCloser, h http.Handler) *Proxy {
mux, _ := yamux.Server(rwCombo{r, w}, muxcfg)
func RunBackend(r io.Reader, w io.WriteCloser, h http.Handler) (*Proxy, error) {
p := newProxy(false)
mux, err := yamux.Server(rwCombo{r, w}, muxcfg)
if err != nil {
return nil, err
}

// Start RPC client.
rpcConn, err := mux.Accept()
if err != nil {
panic(err)
mux.Close()
return nil, err
}
p.launchRPC(rpcConn)

// Start HTTP server.
p.httpsrv.Handler = h
go p.serve(mux)
return p
return p, nil
}

type rwCombo struct {
Expand Down
34 changes: 23 additions & 11 deletions hiveproxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestProxyCheckLive(t *testing.T) {
defer tl.Close()

// Run CheckLive on the backend side.
addr := tl.l.Addr().(*net.TCPAddr)
addr := tl.lis.Addr().(*net.TCPAddr)
if err := p.back.CheckLive(context.Background(), addr); err != nil {
t.Fatal("CheckLive did not work:", err)
}
Expand Down Expand Up @@ -80,15 +80,27 @@ func runProxyPair(t *testing.T, h http.Handler) proxyPair {
t.Fatal(err)
}
p.lis = l
frontStarted := make(chan struct{})
frontStarted := make(chan error, 1)
go func() {
p.front = RunFrontend(cr, cw, l)
close(frontStarted)
var err error
p.front, err = RunFrontend(cr, cw, l)
frontStarted <- err
}()

// Run the backend.
p.back = RunBackend(sr, sw, h)
<-frontStarted
p.back, err = RunBackend(sr, sw, h)
if err != nil {
<-frontStarted
if p.front != nil {
p.front.Close()
}
t.Fatal(err)
}

if err := <-frontStarted; err != nil {
p.back.Close()
t.Fatal(err)
}
return p
}

Expand All @@ -98,30 +110,30 @@ func (p proxyPair) close() {
}

type testListener struct {
l net.Listener
wg sync.WaitGroup
lis net.Listener
wg sync.WaitGroup
}

func runTestListener(addr string) (*testListener, error) {
l, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
tl := &testListener{l: l}
tl := &testListener{lis: l}
tl.wg.Add(1)
go tl.acceptLoop()
return tl, nil
}

func (tl *testListener) Close() {
tl.l.Close()
tl.lis.Close()
tl.wg.Wait()
}

func (tl *testListener) acceptLoop() {
defer tl.wg.Done()
for {
c, err := tl.l.Accept()
c, err := tl.lis.Accept()
if err != nil {
return
}
Expand Down
10 changes: 9 additions & 1 deletion internal/libdocker/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,15 @@ func (cb *ContainerBackend) ServeAPI(ctx context.Context, h http.Handler) (libhi
return nil, err
}

proxy := hiveproxy.RunBackend(outR, inW, h)
proxy, err := hiveproxy.RunBackend(outR, inW, h)
if err != nil {
cb.DeleteContainer(id)
return nil, err
}

// Register proxy in ContainerBackend, so it can be used for CheckLive.
cb.proxy = proxy

srv := &proxyContainer{
cb: cb,
containerID: id,
Expand Down