Skip to content

Commit

Permalink
Replace fsouza go-docker client with engine-api client in cluster up
Browse files Browse the repository at this point in the history
  • Loading branch information
csrwng committed Jun 19, 2017
1 parent fb8ef23 commit d1777b0
Show file tree
Hide file tree
Showing 16 changed files with 755 additions and 245 deletions.
285 changes: 285 additions & 0 deletions pkg/bootstrap/docker/dockerhelper/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
package dockerhelper

import (
"fmt"
"io"
"io/ioutil"
"time"

"golang.org/x/net/context"

"github.com/docker/docker/pkg/stdcopy"
"github.com/docker/engine-api/client"
"github.com/docker/engine-api/types"
"github.com/docker/engine-api/types/container"
"github.com/docker/engine-api/types/network"
)

const (
// defaultDockerOpTimeout is the default timeout of short running docker operations.
defaultDockerOpTimeout = 10 * time.Minute
)

// NewClient creates an instance of the client Interface, given a docker engine
// client
func NewClient(endpoint string, client *client.Client) Interface {
return &dockerClient{
endpoint: endpoint,
client: client,
}
}

type dockerClient struct {
endpoint string
client *client.Client
}

type operationTimeout struct {
err error
}

func (e operationTimeout) Error() string {
return fmt.Sprintf("operation timeout: %v", e.err)
}

func (c *dockerClient) Endpoint() string {
return c.endpoint
}

func (c *dockerClient) ServerVersion() (*types.Version, error) {
ctx, cancel := defaultContext()
defer cancel()
version, err := c.client.ServerVersion(ctx)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
return &version, err
}

func (c *dockerClient) Info() (*types.Info, error) {
ctx, cancel := defaultContext()
defer cancel()
info, err := c.client.Info(ctx)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
return &info, err
}

func (c *dockerClient) ContainerList(options types.ContainerListOptions) ([]types.Container, error) {
ctx, cancel := defaultContext()
defer cancel()
containers, err := c.client.ContainerList(ctx, options)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
return containers, err
}

func (c *dockerClient) ContainerCreate(config *container.Config, hostConfig *container.HostConfig, networkingConfig *network.NetworkingConfig, name string) (*types.ContainerCreateResponse, error) {
ctx, cancel := defaultContext()
defer cancel()
response, err := c.client.ContainerCreate(ctx, config, hostConfig, networkingConfig, name)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
return &response, err
}

func (c *dockerClient) ContainerInspect(container string) (*types.ContainerJSON, error) {
ctx, cancel := defaultContext()
defer cancel()
response, err := c.client.ContainerInspect(ctx, container)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
return &response, err
}

func (c *dockerClient) ContainerRemove(container string, options types.ContainerRemoveOptions) error {
ctx, cancel := defaultContext()
defer cancel()
err := c.client.ContainerRemove(ctx, container, options)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
return err
}

func (c *dockerClient) ContainerLogs(container string, options types.ContainerLogsOptions, stdOut, stdErr io.Writer) error {
ctx, cancel := defaultContext()
defer cancel()
response, err := c.client.ContainerLogs(ctx, container, options)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
if err != nil {
return err
}
defer response.Close()
return redirectResponseToOutputStream(stdOut, stdErr, response)
}

func (c *dockerClient) ContainerStart(container string) error {
ctx, cancel := defaultContext()
defer cancel()
err := c.client.ContainerStart(ctx, container)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
return err
}

func (c *dockerClient) ContainerStop(container string, timeout int) error {
ctx, cancel := defaultContext()
defer cancel()
err := c.client.ContainerStop(ctx, container, timeout)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
return err
}

func (c *dockerClient) ContainerWait(container string) (int, error) {
ctx, cancel := defaultContext()
defer cancel()
rc, err := c.client.ContainerWait(ctx, container)
if ctxErr := contextError(ctx); ctxErr != nil {
return 0, ctxErr
}
return rc, err
}

func (c *dockerClient) CopyToContainer(container string, dest string, src io.Reader, options types.CopyToContainerOptions) error {
return c.client.CopyToContainer(context.Background(), container, dest, src, options)
}

func (c *dockerClient) CopyFromContainer(container string, src string, dest io.Writer) error {
response, _, err := c.client.CopyFromContainer(context.Background(), container, src)
if err != nil {
return err
}
defer response.Close()
_, err = io.Copy(dest, response)
return err
}

func (c *dockerClient) ContainerExecCreate(container string, config types.ExecConfig) (*types.ContainerExecCreateResponse, error) {
ctx, cancel := defaultContext()
defer cancel()
response, err := c.client.ContainerExecCreate(ctx, container, config)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
return &response, err
}

func (c *dockerClient) ContainerExecAttach(execID string, stdIn io.Reader, stdOut, stdErr io.Writer) error {
ctx, cancel := defaultContext()
defer cancel()
response, err := c.client.ContainerExecAttach(ctx, execID, types.ExecConfig{
AttachStdin: stdIn != nil,
AttachStdout: true,
AttachStderr: true,
})
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
if err != nil {
return err
}
defer response.Close()
return holdHijackedConnection(stdIn, stdOut, stdErr, response)
}

func (c *dockerClient) ContainerExecInspect(execID string) (*types.ContainerExecInspect, error) {
ctx, cancel := defaultContext()
defer cancel()
response, err := c.client.ContainerExecInspect(ctx, execID)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
return &response, err
}

func (c *dockerClient) ImageInspectWithRaw(imageID string, getSize bool) (*types.ImageInspect, []byte, error) {
ctx, cancel := defaultContext()
defer cancel()
image, raw, err := c.client.ImageInspectWithRaw(ctx, imageID, getSize)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, nil, ctxErr
}
return &image, raw, err
}

func (c *dockerClient) ImagePull(ref string, options types.ImagePullOptions, writer io.Writer) error {
ctx := context.Background()
response, err := c.client.ImagePull(ctx, ref, options)
if err != nil {
return err
}
defer response.Close()
_, err = io.Copy(writer, response)
return err
}

func defaultContext() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), defaultDockerOpTimeout)
}

func contextError(ctx context.Context) error {
if ctx.Err() == context.DeadlineExceeded {
return operationTimeout{err: ctx.Err()}
}
return ctx.Err()
}

// redirectResponseToOutputStream redirect the response stream to stdout and stderr.
func redirectResponseToOutputStream(outputStream, errorStream io.Writer, resp io.Reader) error {
if outputStream == nil {
outputStream = ioutil.Discard
}
if errorStream == nil {
errorStream = ioutil.Discard
}
_, err := stdcopy.StdCopy(outputStream, errorStream, resp)
return err
}

// holdHijackedConnection holds the HijackedResponse, redirects the inputStream to the connection, and redirects the response
// stream to stdout and stderr.
func holdHijackedConnection(inputStream io.Reader, outputStream, errorStream io.Writer, resp types.HijackedResponse) error {
receiveStdout := make(chan error)
if outputStream != nil || errorStream != nil {
go func() {
receiveStdout <- redirectResponseToOutputStream(outputStream, errorStream, resp.Reader)
}()
}

sendStdin := make(chan error)
go func() {
defer resp.CloseWrite()
if inputStream != nil {
_, err := io.Copy(resp.Conn, inputStream)
sendStdin <- err
return
}
sendStdin <- nil
}()

select {
case err := <-receiveStdout:
return err
case sendErr := <-sendStdin:
if sendErr != nil {
return sendErr
}
if outputStream != nil || errorStream != nil {
return <-receiveStdout
}
}
return nil
}
25 changes: 8 additions & 17 deletions pkg/bootstrap/docker/dockerhelper/filetransfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import (
"path/filepath"
"strings"

docker "github.com/fsouza/go-dockerclient"

"github.com/docker/engine-api/types"
s2itar "github.com/openshift/source-to-image/pkg/tar"
s2iutil "github.com/openshift/source-to-image/pkg/util"
)
Expand Down Expand Up @@ -61,30 +60,22 @@ func (adapter removeLeadingDirectoryAdapter) Next() (*tar.Header, error) {
}
}

func newContainerDownloader(client *docker.Client, container, path string) io.ReadCloser {
func newContainerDownloader(client Interface, container, path string) io.ReadCloser {
r, w := io.Pipe()

go func() {
opts := docker.DownloadFromContainerOptions{
Path: path,
OutputStream: w,
}
w.CloseWithError(client.DownloadFromContainer(container, opts))
w.CloseWithError(client.CopyFromContainer(container, path, w))
}()

return r
}

func newContainerUploader(client *docker.Client, container, path string) (io.WriteCloser, <-chan error) {
func newContainerUploader(client Interface, container, path string) (io.WriteCloser, <-chan error) {
r, w := io.Pipe()
errch := make(chan error, 1)

go func() {
opts := docker.UploadToContainerOptions{
Path: path,
InputStream: r,
}
errch <- client.UploadToContainer(container, opts)
errch <- client.CopyToContainer(container, path, r, types.CopyToContainerOptions{})
}()

return w, errch
Expand All @@ -97,7 +88,7 @@ type readCloser struct {

// StreamFileFromContainer returns an io.ReadCloser from which the contents of a
// file in a remote container can be read.
func StreamFileFromContainer(client *docker.Client, container, src string) (io.ReadCloser, error) {
func StreamFileFromContainer(client Interface, container, src string) (io.ReadCloser, error) {
downloader := newContainerDownloader(client, container, src)
tarReader := tar.NewReader(downloader)

Expand All @@ -113,7 +104,7 @@ func StreamFileFromContainer(client *docker.Client, container, src string) (io.R

// DownloadDirFromContainer downloads an entire directory of files from a remote
// container.
func DownloadDirFromContainer(client *docker.Client, container, src, dst string) error {
func DownloadDirFromContainer(client Interface, container, src, dst string) error {
downloader := newContainerDownloader(client, container, src)
defer downloader.Close()
tarReader := &removeLeadingDirectoryAdapter{Reader: tar.NewReader(downloader)}
Expand All @@ -123,7 +114,7 @@ func DownloadDirFromContainer(client *docker.Client, container, src, dst string)
}

// UploadFileToContainer uploads a file to a remote container.
func UploadFileToContainer(client *docker.Client, container, src, dest string) error {
func UploadFileToContainer(client Interface, container, src, dest string) error {
uploader, errch := newContainerUploader(client, container, path.Dir(dest))

t := s2itar.New(s2iutil.NewFileSystem())
Expand Down
Loading

0 comments on commit d1777b0

Please sign in to comment.