diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index ae989811..64ea3ddd 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -18,7 +18,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v5 with: - go-version: "1.22.5" + go-version: "1.22.0" - shell: bash run: | make build @@ -34,6 +34,12 @@ jobs: # CLI test suites { group: "cmd", name: "cmd", path: "" }, # providers suites, some of the providers are too heavy to run as single test + { + group: "pkg/providers", + name: "container", + path: "container", + container: true, + }, { group: "pkg/providers", name: "yt", path: "yt", yt: true }, { group: "pkg/providers", @@ -54,7 +60,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v5 with: - go-version: "1.22.5" + go-version: "1.22.0" - shell: bash run: | go install gotest.tools/gotestsum@latest @@ -69,6 +75,14 @@ jobs: - shell: bash run: | pg_dump --version + - uses: engineerd/setup-kind@v0.6.0 + if: matrix.suite.container + with: + version: "v0.26.0" + # Handled by the test code + skipClusterCreation: true + skipClusterDeletion: true + skipClusterLogsExport: true - shell: bash if: matrix.suite.yt name: prepare local YT @@ -80,6 +94,8 @@ jobs: - shell: bash run: | make run-tests SUITE_GROUP="${{ matrix.suite.group }}" SUITE_PATH="${{ matrix.suite.path }}" SUITE_NAME="${{ matrix.suite.name }}" + env: + TEST_KUBERNETES_INTEGRATION: ${{ matrix.suite.container == true && '1' || '' }} - name: Upload Test Results uses: actions/upload-artifact@v4 if: always() @@ -156,7 +172,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v5 with: - go-version: "1.22.5" + go-version: "1.22.0" - shell: bash run: | go install gotest.tools/gotestsum@latest diff --git a/.mapping.json b/.mapping.json index 339c1829..8f7f5261 100644 --- a/.mapping.json +++ b/.mapping.json @@ -789,6 +789,18 @@ "pkg/connection/connections.go":"transfer_manager/go/pkg/connection/connections.go", "pkg/connection/resolver.go":"transfer_manager/go/pkg/connection/resolver.go", "pkg/connection/stub_resolver.go":"transfer_manager/go/pkg/connection/stub_resolver.go", + "pkg/container/README.md":"transfer_manager/go/pkg/container/README.md", + "pkg/container/client.go":"transfer_manager/go/pkg/container/client.go", + "pkg/container/container.go":"transfer_manager/go/pkg/container/container.go", + "pkg/container/container_opts.go":"transfer_manager/go/pkg/container/container_opts.go", + "pkg/container/docker.go":"transfer_manager/go/pkg/container/docker.go", + "pkg/container/docker_mocks.go":"transfer_manager/go/pkg/container/docker_mocks.go", + "pkg/container/docker_options.go":"transfer_manager/go/pkg/container/docker_options.go", + "pkg/container/docker_test.go":"transfer_manager/go/pkg/container/docker_test.go", + "pkg/container/kubernetes.go":"transfer_manager/go/pkg/container/kubernetes.go", + "pkg/container/kubernetes_mocks.go":"transfer_manager/go/pkg/container/kubernetes_mocks.go", + "pkg/container/kubernetes_options.go":"transfer_manager/go/pkg/container/kubernetes_options.go", + "pkg/container/kubernetes_test.go":"transfer_manager/go/pkg/container/kubernetes_test.go", "pkg/contextutil/contextutil.go":"transfer_manager/go/pkg/contextutil/contextutil.go", "pkg/coordinator/s3coordinator/coordinator_s3.go":"transfer_manager/go/pkg/coordinator/s3coordinator/coordinator_s3.go", "pkg/coordinator/s3coordinator/coordinator_s3_recipe.go":"transfer_manager/go/pkg/coordinator/s3coordinator/coordinator_s3_recipe.go", @@ -938,10 +950,6 @@ "pkg/debezium/ydb/tests/emitter_vals_test.go":"transfer_manager/go/pkg/debezium/ydb/tests/emitter_vals_test.go", "pkg/debezium/ydb/tests/stub.go":"transfer_manager/go/pkg/debezium/ydb/tests/stub.go", "pkg/debezium/ydb/tests/testdata/emitter_vals_test__canon_change_item.txt":"transfer_manager/go/pkg/debezium/ydb/tests/testdata/emitter_vals_test__canon_change_item.txt", - "pkg/docker/client.go":"transfer_manager/go/pkg/docker/client.go", - "pkg/docker/docker.go":"transfer_manager/go/pkg/docker/docker.go", - "pkg/docker/docker_test.go":"transfer_manager/go/pkg/docker/docker_test.go", - "pkg/docker/mocks.go":"transfer_manager/go/pkg/docker/mocks.go", "pkg/errors/README.md":"transfer_manager/go/pkg/errors/README.md", "pkg/errors/categories/category.go":"transfer_manager/go/pkg/errors/categories/category.go", "pkg/errors/categorized.go":"transfer_manager/go/pkg/errors/categorized.go", diff --git a/go.mod b/go.mod index d77fea0d..266ecee7 100644 --- a/go.mod +++ b/go.mod @@ -93,7 +93,9 @@ require ( google.golang.org/protobuf v1.34.2 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 + k8s.io/api v0.29.2 k8s.io/apimachinery v0.29.2 + k8s.io/client-go v0.29.2 sigs.k8s.io/yaml v1.4.0 ) @@ -149,6 +151,7 @@ require ( github.com/distribution/reference v0.6.0 // indirect github.com/dlclark/regexp2 v1.11.0 // indirect github.com/docker/go-units v0.5.0 // indirect + github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/emicklei/proto v1.11.0 // indirect github.com/emirpasic/gods v1.18.1 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect @@ -160,11 +163,17 @@ require ( github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.2.7-0.20211215081658-ee6c8cce8e87 // indirect + github.com/go-openapi/jsonpointer v0.21.0 // indirect + github.com/go-openapi/jsonreference v0.21.0 // indirect + github.com/go-openapi/swag v0.23.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v4 v4.5.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v23.5.26+incompatible // indirect + github.com/google/gnostic v0.7.0 // indirect + github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect + github.com/google/gofuzz v1.2.0 // indirect github.com/google/martian/v3 v3.3.3 // indirect github.com/google/s2a-go v0.1.7 // indirect github.com/google/tink/go v1.7.0 // indirect @@ -180,13 +189,16 @@ require ( github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/jonboulle/clockwork v0.4.0 // indirect + github.com/josharian/intern v1.0.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/kevinburke/ssh_config v1.2.0 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/lib/pq v1.10.9 // indirect github.com/lucasb-eyer/go-colorful v1.2.0 // indirect github.com/lufia/plan9stats v0.0.0-20220913051719-115f729f3c8c // indirect github.com/magiconair/properties v1.8.7 // indirect + github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect github.com/mattn/go-sqlite3 v1.14.24 // indirect github.com/microcosm-cc/bluemonday v1.0.27 // indirect @@ -196,10 +208,15 @@ require ( github.com/moby/sys/sequential v0.5.0 // indirect github.com/moby/sys/user v0.1.0 // indirect github.com/moby/term v0.5.0 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/mpvl/unique v0.0.0-20150818121801-cbe035fff7de // indirect github.com/muesli/reflow v0.3.0 // indirect github.com/muesli/termenv v0.15.3-0.20240618155329-98d742f6907a // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/onsi/ginkgo/v2 v2.19.0 // indirect + github.com/onsi/gomega v1.34.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/paulmach/orb v0.11.1 // indirect @@ -258,6 +275,11 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect + k8s.io/klog/v2 v2.120.1 // indirect + k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect + k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect + sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect + sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect ) exclude github.com/keybase/go.dbus v0.0.0-20220506165403-5aa21ea2c23a diff --git a/go.sum b/go.sum index 9e377892..84c51a1d 100644 --- a/go.sum +++ b/go.sum @@ -1809,6 +1809,8 @@ github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkg github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/emicklei/go-restful/v3 v3.8.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g= +github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/emicklei/proto v1.11.0 h1:XcDEsxxv5xBp0jeZ4rt7dj1wuv/GQ4cSAe4BHbhrRXY= github.com/emicklei/proto v1.11.0/go.mod h1:rn1FgRS/FANiZdD2djyH7TMA9jdRDcYQ9IEN9yvjX0A= github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= @@ -1835,6 +1837,7 @@ github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBD github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= @@ -1910,10 +1913,16 @@ github.com/go-ole/go-ole v1.2.7-0.20211215081658-ee6c8cce8e87 h1:GylhL0uJeyEQjzF github.com/go-ole/go-ole v1.2.7-0.20211215081658-ee6c8cce8e87/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= +github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ= +github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY= github.com/go-openapi/jsonreference v0.19.3/go.mod h1:rjx6GuL8TTa9VaixXglHmQmIL98+wF9xc8zWvFonSJ8= github.com/go-openapi/jsonreference v0.20.0/go.mod h1:Ag74Ico3lPc+zR+qjn4XBUmXymS4zJbYVCZmcgkasdo= +github.com/go-openapi/jsonreference v0.21.0 h1:Rs+Y7hSXT83Jacb7kFyjn4ijOuVGSvOdF2+tg1TRrwQ= +github.com/go-openapi/jsonreference v0.21.0/go.mod h1:LmZmgsrTkVg9LG4EaHeY8cBDslNPMo06cago5JNLkm4= github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ= +github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+GrE= +github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ= github.com/go-pdf/fpdf v0.5.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M= github.com/go-pdf/fpdf v0.6.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M= github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= @@ -1926,7 +1935,10 @@ github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LB github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 h1:p104kn46Q8WdvHunIJ9dAyjPVtrBPhSr3KT2yUst43I= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= @@ -2005,6 +2017,10 @@ github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6 github.com/google/flatbuffers v23.5.26+incompatible h1:M9dgRyhJemaM4Sw8+66GHBu8ioaQmyPLg1b8VwK5WJg= github.com/google/flatbuffers v23.5.26+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/gnostic v0.5.7-v3refs/go.mod h1:73MKFl6jIHelAJNaBGFzt3SPtZULs9dYrGFt8OiIsHQ= +github.com/google/gnostic v0.7.0 h1:d7EpuFp8vVdML+y0JJJYiKeOLjKTdH/GvVkLOBWqJpw= +github.com/google/gnostic v0.7.0/go.mod h1:IAcUyMl6vtC95f60EZ8oXyqTsOersP6HbwjeG7EyDPM= +github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 h1:0VpGH+cDhbDtdcweoyCVsF3fhN8kejK6rFe/2FFX2nU= +github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49/go.mod h1:BkkQ4L1KS1xMt2aWSPStnn55ChGC0DPOn2FQYj+f25M= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.2.1-0.20190312032427-6f77996f0c42/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -2054,6 +2070,8 @@ github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo= +github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg= +github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/s2a-go v0.1.0/go.mod h1:OJpEgntRZo8ugHpF9hkoLJbS5dSI20XZeXJ9JVywLlM= github.com/google/s2a-go v0.1.3/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A= @@ -2219,11 +2237,13 @@ github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUB github.com/jonboulle/clockwork v0.3.0/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4= github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc= +github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= @@ -2294,6 +2314,8 @@ github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3v github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/marstr/guid v1.1.0/go.mod h1:74gB1z2wpxxInTG6yaqA7KrtM0NZ+RbrcqDvYHefzho= github.com/matryer/is v1.2.0 h1:92UTHpy8CDwaJ08GqLDzhhuixiBUUD1p3AU6PHddz4A= github.com/matryer/is v1.2.0/go.mod h1:2fLPjFQM9rhQ15aVEtbuwhJinnOqrmgXPNdZsdwlWXA= @@ -2375,9 +2397,11 @@ github.com/moby/term v0.0.0-20220808134915-39b0c02b01ae/go.mod h1:E2VnQOmVuvZB6U github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= @@ -2392,6 +2416,7 @@ github.com/muesli/reflow v0.3.0/go.mod h1:pbwTDkVPibjO2kyvBQRBxTWEEGDGq0FlB1BIKt github.com/muesli/termenv v0.15.3-0.20240618155329-98d742f6907a h1:2MaM6YC3mGu54x+RKAA6JiFFHlHDY1UbkxqppT7wYOg= github.com/muesli/termenv v0.15.3-0.20240618155329-98d742f6907a/go.mod h1:hxSnBBYLK21Vtq/PHd0S2FYCxBXzBua8ov5s1RobyRQ= github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= @@ -2421,6 +2446,8 @@ github.com/onsi/ginkgo/v2 v2.1.4/go.mod h1:um6tUpWM/cxCK3/FK8BXqEiUMUwRgSM4JXG47 github.com/onsi/ginkgo/v2 v2.1.6/go.mod h1:MEH45j8TBi6u9BMogfbp0stKC5cdGjumZj5Y7AG4VIk= github.com/onsi/ginkgo/v2 v2.3.0/go.mod h1:Eew0uilEqZmIEZr8JrvYlvOM7Rr6xzTmMV8AyFNU9d0= github.com/onsi/ginkgo/v2 v2.4.0/go.mod h1:iHkDK1fKGcBoEHT5W7YBq4RFWaQulw+caOMkAt4OrFo= +github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= +github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/gomega v0.0.0-20151007035656-2152b45fa28a/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= @@ -2436,6 +2463,8 @@ github.com/onsi/gomega v1.20.1/go.mod h1:DtrZpjmvpn2mPm4YWQa0/ALMDj9v4YxLgojwPeR github.com/onsi/gomega v1.21.1/go.mod h1:iYAIXgPSaDHak0LCMA+AWBpIKBr8WZicMxnE8luStNc= github.com/onsi/gomega v1.22.1/go.mod h1:x6n7VNe4hw0vkyYUM4mjIXx3JbLiPaBPNgB7PRQ1tuM= github.com/onsi/gomega v1.23.0/go.mod h1:Z/NWtiqwBrwUt4/2loMmHL63EDLnYHmVbuBpDr2vQAg= +github.com/onsi/gomega v1.34.0 h1:eSSPsPNp6ZpsG8X1OVmOTxig+CblTc4AxpPBykhe2Os= +github.com/onsi/gomega v1.34.0/go.mod h1:MIKI8c+f+QLWk+hxbePD4i0LMJSExPaZOVfkoex4cAo= github.com/opencontainers/go-digest v0.0.0-20170106003457-a6d0ee40d420/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= github.com/opencontainers/go-digest v0.0.0-20180430190053-c9281466c8b2/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= @@ -2724,6 +2753,7 @@ github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gi github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonschema v0.0.0-20180618132009-1d523034197f/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs= +github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xitongsys/parquet-go v1.5.1/go.mod h1:xUxwM8ELydxh4edHGegYq1pA8NnMKDx0K/GyB0o2bww= @@ -3069,6 +3099,7 @@ golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210825183410-e898025ed96a/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= @@ -3687,6 +3718,7 @@ google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633/go.mod h1:UUQDJDOl google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd/go.mod h1:UUQDJDOlWu4KYeJZffbWgBkS1YFobzKbLVfK69pe0Ak= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= google.golang.org/genproto v0.0.0-20230525234025-438c736192d0/go.mod h1:9ExIQyXL5hZrHzQceCwuSYwZZ5QZBazOcprJ5rgs3lY= +google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54/go.mod h1:zqTuNwFlFRsw5zIts5VnzLQxSRqh+CGOTVMlYbY0Eyk= google.golang.org/genproto v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:zqTuNwFlFRsw5zIts5VnzLQxSRqh+CGOTVMlYbY0Eyk= google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:xZnkP7mREFX5MORlOPEzLMr+90PPZQ2QWzrVTWfAq64= google.golang.org/genproto v0.0.0-20230629202037-9506855d4529/go.mod h1:xZnkP7mREFX5MORlOPEzLMr+90PPZQ2QWzrVTWfAq64= @@ -3870,10 +3902,12 @@ honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= +k8s.io/api v0.26.1 h1:f+SWYiPd/GsiWwVRz+NbFyCgvv75Pk9NK6dlkZgpCRQ= k8s.io/api v0.26.1/go.mod h1:xd/GBNgR0f707+ATNyPmQ1oyKSgndzXij81FzWGsejg= k8s.io/apimachinery v0.26.1 h1:8EZ/eGJL+hY/MYCNwhmDzVqq2lPl3N3Bo8rvweJwXUQ= k8s.io/apimachinery v0.26.1/go.mod h1:tnPmbONNJ7ByJNz9+n9kMjNP8ON+1qoAIIC70lztu74= k8s.io/apiserver v0.26.1/go.mod h1:wr75z634Cv+sifswE9HlAo5FQ7UoUauIICRlOE+5dCg= +k8s.io/client-go v0.26.1 h1:87CXzYJnAMGaa/IDDfRdhTzxk/wzGZ+/HUQpqgVSZXU= k8s.io/client-go v0.26.1/go.mod h1:IWNSglg+rQ3OcvDkhY6+QLeasV4OYHDjdqeWkDQZwGE= k8s.io/code-generator v0.26.1/go.mod h1:OMoJ5Dqx1wgaQzKgc+ZWaZPfGjdRq/Y3WubFrZmeI3I= k8s.io/component-base v0.26.1/go.mod h1:VHrLR0b58oC035w6YQiBSbtsf0ThuSwXP+p5dD/kAWU= @@ -3886,13 +3920,19 @@ k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= k8s.io/klog/v2 v2.4.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= k8s.io/klog/v2 v2.30.0/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= +k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= +k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kms v0.26.1/go.mod h1:ReC1IEGuxgfN+PDCIpR6w8+XMmDE7uJhxcCwMZFdIYc= k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280/go.mod h1:+Axhij7bCpeqhklhUTe3xmOn6bWxolyZEeyaFpjGtl4= +k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag= +k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98= k8s.io/kubernetes v1.13.0/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk= k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20221107191617-1a15be271d1d/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI= +k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= lukechampine.com/uint128 v1.1.1/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk= lukechampine.com/uint128 v1.2.0/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk= lukechampine.com/uint128 v1.3.0/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk= @@ -3959,8 +3999,12 @@ rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35/go.mod h1:WxjusMwXlKzfAs4p9km6XJRndVt2FROgMVCE4cdohFo= sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= +sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= +sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/structured-merge-diff/v4 v4.1.2/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4= sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E= +sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= +sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= diff --git a/pkg/container/README.md b/pkg/container/README.md new file mode 100644 index 00000000..6fb02f7e --- /dev/null +++ b/pkg/container/README.md @@ -0,0 +1,36 @@ +This package runs both unit and integration tests. Integration tests against Kubernetes (using kind) are executed only if the environment variable TEST_KUBERNETES_INTEGRATION is set. + +## Prerequisites + +- Go (v1.18+) +- Docker +- kind (for Kubernetes integration tests) + +## Installing kind + +To install kind, run: + +go install sigs.k8s.io/kind@v0.26.0 + +Make sure your GOPATH/bin is in your PATH. + +More information on kind can be found [here](https://kind.sigs.k8s.io/). + +## Running Tests + +### Unit Tests + +By default, integration tests are skipped. Run all tests with: + +go test -v ./... + +### Integration Tests + +To run integration tests: + +1. Set the environment variable: +   export TEST_KUBERNETES_INTEGRATION=1 +2. Run the tests: +   go test -v ./... + +The integration tests will create a temporary kind cluster, perform the tests, and then clean up the cluster. diff --git a/pkg/docker/client.go b/pkg/container/client.go similarity index 68% rename from pkg/docker/client.go rename to pkg/container/client.go index 497f692b..1da52144 100644 --- a/pkg/docker/client.go +++ b/pkg/container/client.go @@ -1,4 +1,4 @@ -package docker +package container import ( "context" @@ -8,6 +8,8 @@ import ( "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/network" v1 "github.com/opencontainers/image-spec/specs-go/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) type DockerClient interface { @@ -21,3 +23,10 @@ type DockerClient interface { Ping(ctx context.Context) (types.Ping, error) ContainerKill(ctx context.Context, containerID, signal string) error } + +type KubernetesClient interface { + CreatePod(ctx context.Context, namespace string, pod *corev1.Pod) (*corev1.Pod, error) + GetPodLogs(ctx context.Context, namespace, podName, containerName string, opts *corev1.PodLogOptions) (io.ReadCloser, error) + GetPod(ctx context.Context, namespace, podName string) (*corev1.Pod, error) + DeletePod(ctx context.Context, namespace, podName string, opts *metav1.DeleteOptions) error +} diff --git a/pkg/container/container.go b/pkg/container/container.go new file mode 100644 index 00000000..b857d0fc --- /dev/null +++ b/pkg/container/container.go @@ -0,0 +1,36 @@ +package container + +import ( + "context" + "io" + "os" + + "github.com/docker/docker/api/types" + "github.com/doublecloud/transfer/library/go/core/xerrors" + "go.ytsaurus.tech/library/go/core/log" +) + +type ContainerImpl interface { + Run(context.Context, ContainerOpts) (io.Reader, io.Reader, error) + Pull(context.Context, string, types.ImagePullOptions) error +} + +func NewContainerImpl(l log.Logger) (ContainerImpl, error) { + if isRunningInKubernetes() { + k8sClient, err := NewK8sWrapper() + if err != nil { + return nil, xerrors.Errorf("unable to init k8s wrapper: %w", err) + } + return k8sClient, nil + } + dockerClient, err := NewDockerWrapper(l) + if err != nil { + return nil, xerrors.Errorf("unable to init docker wrapper: %w", err) + } + return dockerClient, nil +} + +func isRunningInKubernetes() bool { + _, exists := os.LookupEnv("KUBERNETES_SERVICE_HOST") + return exists +} diff --git a/pkg/container/container_opts.go b/pkg/container/container_opts.go new file mode 100644 index 00000000..f8bcf513 --- /dev/null +++ b/pkg/container/container_opts.go @@ -0,0 +1,130 @@ +package container + +import ( + "time" + + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/mount" + corev1 "k8s.io/api/core/v1" +) + +type Volume struct { + Name string + HostPath string + ContainerPath string + VolumeType string + ReadOnly bool +} + +type ContainerOpts struct { + Env map[string]string + LogOptions map[string]string + Namespace string + RestartPolicy corev1.RestartPolicy + PodName string + Image string + LogDriver string + Network string + ContainerName string + Volumes []Volume + Command []string + Args []string + Timeout time.Duration + AttachStdout bool + AttachStderr bool + AutoRemove bool +} + +func (c *ContainerOpts) String() string { + if isRunningInKubernetes() { + return c.ToK8sOpts().String() + } + + return c.ToDockerOpts().String() +} + +func (c *ContainerOpts) ToDockerOpts() DockerOpts { + var envSlice []string + for key, value := range c.Env { + envSlice = append(envSlice, key+"="+value) + } + + var mounts []mount.Mount + for _, vol := range c.Volumes { + mounts = append(mounts, mount.Mount{ + Type: mount.Type(vol.VolumeType), + Source: vol.HostPath, + Target: vol.ContainerPath, + ReadOnly: vol.ReadOnly, + }) + } + + var restartPolicy container.RestartPolicy + switch c.RestartPolicy { + case corev1.RestartPolicyAlways: + restartPolicy.Name = container.RestartPolicyAlways + case corev1.RestartPolicyOnFailure: + restartPolicy.Name = container.RestartPolicyOnFailure + case corev1.RestartPolicyNever: + restartPolicy.Name = container.RestartPolicyDisabled + } + + return DockerOpts{ + RestartPolicy: restartPolicy, + Mounts: mounts, + LogDriver: c.LogDriver, + LogOptions: c.LogOptions, + Image: c.Image, + Network: c.Network, + ContainerName: c.ContainerName, + Command: c.Command, + Env: envSlice, + Timeout: c.Timeout, + AutoRemove: c.AutoRemove, + AttachStdout: c.AttachStdout, + AttachStderr: c.AttachStderr, + } +} + +func (c *ContainerOpts) ToK8sOpts() K8sOpts { + var envVars []corev1.EnvVar + for key, value := range c.Env { + envVars = append(envVars, corev1.EnvVar{ + Name: key, + Value: value, + }) + } + + var k8sVolumes []corev1.Volume + var volumeMounts []corev1.VolumeMount + for _, vol := range c.Volumes { + volumeName := vol.Name + k8sVolumes = append(k8sVolumes, corev1.Volume{ + Name: volumeName, + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: vol.HostPath, + }, + }, + }) + + volumeMounts = append(volumeMounts, corev1.VolumeMount{ + Name: volumeName, + MountPath: vol.ContainerPath, + ReadOnly: vol.ReadOnly, + }) + } + + return K8sOpts{ + Namespace: c.Namespace, + PodName: c.PodName, + Image: c.Image, + RestartPolicy: c.RestartPolicy, + Command: c.Command, + Args: c.Args, + Env: envVars, + Volumes: k8sVolumes, + VolumeMounts: volumeMounts, + Timeout: c.Timeout, + } +} diff --git a/pkg/docker/docker.go b/pkg/container/docker.go similarity index 63% rename from pkg/docker/docker.go rename to pkg/container/docker.go index 9a750300..f1b4f5bb 100644 --- a/pkg/docker/docker.go +++ b/pkg/container/docker.go @@ -1,19 +1,15 @@ -package docker +package container import ( "bytes" "context" - "fmt" "io" "os" "os/exec" - "sort" - "strings" "time" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" - "github.com/docker/docker/api/types/mount" "github.com/docker/docker/api/types/network" "github.com/docker/docker/client" "github.com/docker/docker/pkg/stdcopy" @@ -71,7 +67,11 @@ func (d *DockerWrapper) Pull(ctx context.Context, image string, opts types.Image return nil } -func (d *DockerWrapper) Run(ctx context.Context, opts DockerOpts) (stdout io.Reader, stderr io.Reader, err error) { +func (d *DockerWrapper) Run(ctx context.Context, opts ContainerOpts) (stdout io.Reader, stderr io.Reader, err error) { + return d.RunContainer(ctx, opts.ToDockerOpts()) +} + +func (d *DockerWrapper) RunContainer(ctx context.Context, opts DockerOpts) (stdout io.Reader, stderr io.Reader, err error) { if d.cli == nil { return nil, nil, xerrors.Errorf("docker unavailable") } @@ -80,16 +80,6 @@ func (d *DockerWrapper) Run(ctx context.Context, opts DockerOpts) (stdout io.Rea return nil, nil, err } - var mountsList []mount.Mount - for hostPath, containerPath := range opts.Volumes { - mountsList = append(mountsList, mount.Mount{ - Type: mount.TypeBind, - Source: hostPath, - Target: containerPath, - }) - } - mountsList = append(mountsList, opts.Mounts...) - containerConfig := &container.Config{ Image: opts.Image, Cmd: opts.Command, @@ -99,11 +89,15 @@ func (d *DockerWrapper) Run(ctx context.Context, opts DockerOpts) (stdout io.Rea } hostConfig := &container.HostConfig{ - Mounts: mountsList, + Mounts: opts.Mounts, AutoRemove: opts.AutoRemove, LogConfig: container.LogConfig{Type: opts.LogDriver, Config: opts.LogOptions}, } + if opts.RestartPolicy.Name != "" { + hostConfig.RestartPolicy = opts.RestartPolicy + } + networkingConfig := &network.NetworkingConfig{} if opts.Network != "" { networkingConfig.EndpointsConfig = map[string]*network.EndpointSettings{ @@ -176,6 +170,11 @@ func (d *DockerWrapper) Run(ctx context.Context, opts DockerOpts) (stdout io.Rea func (d *DockerWrapper) ensureDocker(supervisorConfigPath string, timeout time.Duration) error { if supervisorConfigPath == "" { + cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + if err != nil { + return xerrors.Errorf("unable to init docker cli: %w", err) + } + d.cli = cli // no supervisor, assume docker is already running. if !d.isDockerReady() { return xerrors.New("docker is not ready") @@ -237,125 +236,3 @@ func (d *DockerWrapper) ensureDocker(supervisorConfigPath string, timeout time.D return xerrors.Errorf("timeout: %v waiting for Docker to be ready", timeout) } } - -type DockerOpts struct { - Volumes map[string]string - Mounts []mount.Mount - LogDriver string - LogOptions map[string]string - Image string - Network string - ContainerName string - Command []string - Env []string - Timeout time.Duration - AutoRemove bool - AttachStdout bool - AttachStderr bool -} - -func (o *DockerOpts) String() string { - var args []string - - // AutoRemove - if o.AutoRemove { - args = append(args, "--rm") - } - - // ContainerName - if o.ContainerName != "" { - args = append(args, "--name", o.ContainerName) - } - - // Network - if o.Network != "" { - args = append(args, "--network", o.Network) - } - - // Volumes (handled with -v) - if len(o.Volumes) > 0 { - var volumeKeys []string - for hostPath := range o.Volumes { - volumeKeys = append(volumeKeys, hostPath) - } - sort.Strings(volumeKeys) - for _, hostPath := range volumeKeys { - containerPath := o.Volumes[hostPath] - args = append(args, "-v", fmt.Sprintf("%s:%s", hostPath, containerPath)) - } - } - - // Mounts (handled with --mount) - if len(o.Mounts) > 0 { - sort.Slice(o.Mounts, func(i, j int) bool { - ikey := fmt.Sprintf("%s/%s/%s", o.Mounts[i].Type, o.Mounts[i].Source, o.Mounts[i].Target) - jkey := fmt.Sprintf("%s/%s/%s", o.Mounts[j].Type, o.Mounts[j].Source, o.Mounts[j].Target) - return ikey < jkey - }) - for _, m := range o.Mounts { - mountOpts := []string{ - fmt.Sprintf("type=%s", m.Type), - fmt.Sprintf("source=%s", m.Source), - fmt.Sprintf("target=%s", m.Target), - } - if m.ReadOnly { - mountOpts = append(mountOpts, "readonly") - } - - args = append(args, "--mount", strings.Join(mountOpts, ",")) - } - } - - // Environment Variables - if len(o.Env) > 0 { - sort.Strings(o.Env) - for _, envVar := range o.Env { - args = append(args, "-e", envVar) - } - } - - // Log Driver and Options - if o.LogDriver != "" { - args = append(args, "--log-driver", o.LogDriver) - if len(o.LogOptions) > 0 { - var logOptKeys []string - for key := range o.LogOptions { - logOptKeys = append(logOptKeys, key) - } - sort.Strings(logOptKeys) - for _, key := range logOptKeys { - value := o.LogOptions[key] - args = append(args, "--log-opt", fmt.Sprintf("%s=%s", key, value)) - } - } - } - - // Attach options - var attachOptions []string - if o.AttachStderr { - attachOptions = append(attachOptions, "stderr") - } - if o.AttachStdout { - attachOptions = append(attachOptions, "stdout") - } - if len(attachOptions) > 0 { - sort.Strings(attachOptions) - for _, attach := range attachOptions { - args = append(args, "--attach", attach) - } - } - - // Image - if o.Image != "" { - args = append(args, o.Image) - } - - // Command - if len(o.Command) > 0 { - args = append(args, o.Command...) - } - - cmd := append([]string{"docker", "run"}, args...) - - return strings.Join(cmd, " ") -} diff --git a/pkg/docker/mocks.go b/pkg/container/docker_mocks.go similarity index 99% rename from pkg/docker/mocks.go rename to pkg/container/docker_mocks.go index 85437682..e2a000d1 100644 --- a/pkg/docker/mocks.go +++ b/pkg/container/docker_mocks.go @@ -1,4 +1,4 @@ -package docker +package container import ( "context" diff --git a/pkg/container/docker_options.go b/pkg/container/docker_options.go new file mode 100644 index 00000000..33090ca8 --- /dev/null +++ b/pkg/container/docker_options.go @@ -0,0 +1,125 @@ +package container + +import ( + "fmt" + "sort" + "strings" + "time" + + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/mount" +) + +type DockerOpts struct { + RestartPolicy container.RestartPolicy + Mounts []mount.Mount + LogDriver string + LogOptions map[string]string + Image string + Network string + ContainerName string + Command []string + Env []string + Timeout time.Duration + AutoRemove bool + AttachStdout bool + AttachStderr bool +} + +func (o DockerOpts) String() string { + var args []string + + // AutoRemove + if o.AutoRemove { + args = append(args, "--rm") + } + + // ContainerName + if o.ContainerName != "" { + args = append(args, "--name", o.ContainerName) + } + + // Network + if o.Network != "" { + args = append(args, "--network", o.Network) + } + + // Mounts + if len(o.Mounts) > 0 { + sort.Slice(o.Mounts, func(i, j int) bool { + ikey := fmt.Sprintf("%s/%s/%s", o.Mounts[i].Type, o.Mounts[i].Source, o.Mounts[i].Target) + jkey := fmt.Sprintf("%s/%s/%s", o.Mounts[j].Type, o.Mounts[j].Source, o.Mounts[j].Target) + return ikey < jkey + }) + for _, m := range o.Mounts { + mountOpts := []string{ + fmt.Sprintf("type=%s", m.Type), + fmt.Sprintf("source=%s", m.Source), + fmt.Sprintf("target=%s", m.Target), + } + if m.ReadOnly { + mountOpts = append(mountOpts, "readonly") + } + + args = append(args, "--mount", strings.Join(mountOpts, ",")) + } + } + + // Environment Variables + if len(o.Env) > 0 { + sort.Strings(o.Env) + for _, envVar := range o.Env { + args = append(args, "-e", envVar) + } + } + + // Log Driver and options + if o.LogDriver != "" { + args = append(args, "--log-driver", o.LogDriver) + if len(o.LogOptions) > 0 { + var logOptKeys []string + for key := range o.LogOptions { + logOptKeys = append(logOptKeys, key) + } + sort.Strings(logOptKeys) + for _, key := range logOptKeys { + value := o.LogOptions[key] + args = append(args, "--log-opt", fmt.Sprintf("%s=%s", key, value)) + } + } + } + + // Attach + var attachOptions []string + if o.AttachStderr { + attachOptions = append(attachOptions, "stderr") + } + if o.AttachStdout { + attachOptions = append(attachOptions, "stdout") + } + if len(attachOptions) > 0 { + sort.Strings(attachOptions) + for _, attach := range attachOptions { + args = append(args, "--attach", attach) + } + } + + // Image + if o.Image != "" { + args = append(args, o.Image) + } + + // Command + if len(o.Command) > 0 { + args = append(args, o.Command...) + } + + // RestartPolicy + if o.RestartPolicy.Name != "" { + args = append(args, "--restart", string(o.RestartPolicy.Name)) + } + + cmd := append([]string{"docker", "run"}, args...) + + return strings.Join(cmd, " ") +} diff --git a/pkg/docker/docker_test.go b/pkg/container/docker_test.go similarity index 84% rename from pkg/docker/docker_test.go rename to pkg/container/docker_test.go index 38539abe..028a1877 100644 --- a/pkg/docker/docker_test.go +++ b/pkg/container/docker_test.go @@ -1,4 +1,4 @@ -package docker +package container import ( "bufio" @@ -40,22 +40,23 @@ func TestDockerOptsString(t *testing.T) { expected: "docker run --name my-container nginx", }, { - name: "WithNetwork", + name: "WithRestartPolicy", opts: DockerOpts{ - Network: "my-network", - Image: "ubuntu", + ContainerName: "my-container", + Image: "nginx", + RestartPolicy: container.RestartPolicy{ + Name: container.RestartPolicyAlways, + }, }, - expected: "docker run --network my-network ubuntu", + expected: "docker run --name my-container nginx --restart always", }, { - name: "WithVolumes", + name: "WithNetwork", opts: DockerOpts{ - Volumes: map[string]string{ - "/host/data": "/container/data", - }, - Image: "busybox", + Network: "my-network", + Image: "ubuntu", }, - expected: "docker run -v /host/data:/container/data busybox", + expected: "docker run --network my-network ubuntu", }, { name: "WithEnvVariables", @@ -97,19 +98,15 @@ func TestDockerOptsString(t *testing.T) { AutoRemove: true, ContainerName: "full-container", Network: "full-network", - Volumes: map[string]string{ - "/host/vol1": "/container/vol1", - "/host/vol2": "/container/vol2", - }, - Env: []string{"ENV1=val1", "ENV2=val2"}, - LogDriver: "syslog", - LogOptions: map[string]string{"syslog-address": "tcp://192.168.0.42:123"}, - AttachStdout: true, - AttachStderr: false, - Image: "full-image", - Command: []string{"bash", "-c", "echo Full Test"}, + Env: []string{"ENV1=val1", "ENV2=val2"}, + LogDriver: "syslog", + LogOptions: map[string]string{"syslog-address": "tcp://192.168.0.42:123"}, + AttachStdout: true, + AttachStderr: false, + Image: "full-image", + Command: []string{"bash", "-c", "echo Full Test"}, }, - expected: "docker run --rm --name full-container --network full-network -v /host/vol1:/container/vol1 -v /host/vol2:/container/vol2 -e ENV1=val1 -e ENV2=val2 --log-driver syslog --log-opt syslog-address=tcp://192.168.0.42:123 --attach stdout full-image bash -c echo Full Test", + expected: "docker run --rm --name full-container --network full-network -e ENV1=val1 -e ENV2=val2 --log-driver syslog --log-opt syslog-address=tcp://192.168.0.42:123 --attach stdout full-image bash -c echo Full Test", }, { name: "WithMountsOnly", @@ -131,23 +128,6 @@ func TestDockerOptsString(t *testing.T) { }, expected: "docker run --mount type=bind,source=/host/config,target=/container/config,readonly --mount type=tmpfs,source=,target=/container/tmp nginx", }, - { - name: "WithVolumesAndMounts", - opts: DockerOpts{ - Volumes: map[string]string{ - "/host/data": "/container/data", - }, - Mounts: []mount.Mount{ - { - Type: mount.TypeVolume, - Source: "myvolume", - Target: "/container/volume", - }, - }, - Image: "postgres", - }, - expected: "docker run -v /host/data:/container/data --mount type=volume,source=myvolume,target=/container/volume postgres", - }, { name: "WithAttachStdoutOnly", opts: DockerOpts{ @@ -186,18 +166,6 @@ func TestDockerOptsString(t *testing.T) { }, expected: "docker run -e A_VAR=first -e M_VAR=middle -e Z_VAR=last env-test", }, - { - name: "WithMultipleVolumesUnordered", - opts: DockerOpts{ - Volumes: map[string]string{ - "/host/c": "/container/c", - "/host/a": "/container/a", - "/host/b": "/container/b", - }, - Image: "vol-test", - }, - expected: "docker run -v /host/a:/container/a -v /host/b:/container/b -v /host/c:/container/c vol-test", - }, { name: "WithMultipleMountsUnordered", opts: DockerOpts{ @@ -371,7 +339,7 @@ func TestDockerWrapper_Run_Success(t *testing.T) { AttachStderr: true, } - stdout, stderr, err := dw.Run(context.Background(), opts) + stdout, stderr, err := dw.RunContainer(context.Background(), opts) if tc.expectError { assert.Error(t, err) } else { diff --git a/pkg/container/kubernetes.go b/pkg/container/kubernetes.go new file mode 100644 index 00000000..36fe77dd --- /dev/null +++ b/pkg/container/kubernetes.go @@ -0,0 +1,126 @@ +package container + +import ( + "bytes" + "context" + "io" + "time" + + "github.com/docker/docker/api/types" + "github.com/doublecloud/transfer/library/go/core/xerrors" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +type K8sWrapper struct { + client kubernetes.Interface +} + +func NewK8sWrapper() (*K8sWrapper, error) { + config, err := rest.InClusterConfig() + if err != nil { + return nil, xerrors.Errorf("failed to load in-cluster config: %w", err) + } + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, xerrors.Errorf("failed to create k8s client: %w", err) + } + return &K8sWrapper{client: clientset}, nil +} + +func (w *K8sWrapper) Run(ctx context.Context, opts ContainerOpts) (io.Reader, io.Reader, error) { + // Unfortunately, Kubernetes does not provide a way to demux stdout and stderr + b, err := w.RunPod(ctx, opts.ToK8sOpts()) + + return b, nil, err +} + +func (w *K8sWrapper) Pull(_ context.Context, _ string, _ types.ImagePullOptions) error { + // No need to pull images in k8s + return nil +} + +func (w *K8sWrapper) RunPod(ctx context.Context, opts K8sOpts) (*bytes.Buffer, error) { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: opts.PodName, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: opts.PodName, + Image: opts.Image, + Command: opts.Command, + Args: opts.Args, + Env: opts.Env, + VolumeMounts: opts.VolumeMounts, + }, + }, + Volumes: opts.Volumes, + RestartPolicy: opts.RestartPolicy, + }, + } + + _, err := w.client.CoreV1().Pods(opts.Namespace).Create(ctx, pod, metav1.CreateOptions{}) + if err != nil { + return nil, xerrors.Errorf("failed to create pod: %w", err) + } + + timeout := time.After(opts.Timeout) + tick := time.NewTicker(2 * time.Second) + defer tick.Stop() + +waitLoop: + for { + select { + case <-timeout: + // If timed out, clean up. + _ = w.client.CoreV1().Pods(opts.Namespace).Delete(ctx, opts.PodName, metav1.DeleteOptions{}) + return nil, xerrors.Errorf("timeout waiting for pod %s to complete", opts.PodName) + case <-tick.C: + p, err := w.client.CoreV1().Pods(opts.Namespace).Get(ctx, opts.PodName, metav1.GetOptions{}) + if err != nil { + return nil, xerrors.Errorf("failed to get pod info: %w", err) + } + phase := p.Status.Phase + if phase == corev1.PodSucceeded || phase == corev1.PodFailed { + break waitLoop + } + } + } + + logOpts := &corev1.PodLogOptions{ + Container: opts.PodName, + } + rc := w.client.CoreV1().Pods(opts.Namespace).GetLogs(opts.PodName, logOpts) + stream, err := rc.Stream(ctx) + if err != nil { + return nil, xerrors.Errorf("failed to stream pod logs: %w", err) + } + defer stream.Close() + + stdout := new(bytes.Buffer) + + _, err = io.Copy(stdout, stream) + if err != nil { + return stdout, xerrors.Errorf("failed copying pod logs: %w", err) + } + + _ = w.client.CoreV1().Pods(opts.Namespace).Delete(ctx, opts.PodName, metav1.DeleteOptions{}) + return stdout, nil +} + +func NewK8sWrapperFromKubeconfig(kubeconfigPath string) (*K8sWrapper, error) { + config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath) + if err != nil { + return nil, xerrors.Errorf("unable to build kubeconfig: %w", err) + } + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, xerrors.Errorf("unable to connect to k8s: %w", err) + } + return &K8sWrapper{client: clientset}, nil +} diff --git a/pkg/container/kubernetes_mocks.go b/pkg/container/kubernetes_mocks.go new file mode 100644 index 00000000..72b98633 --- /dev/null +++ b/pkg/container/kubernetes_mocks.go @@ -0,0 +1,34 @@ +package container + +import ( + "context" + "io" + + "github.com/stretchr/testify/mock" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type MockKubernetesClient struct { + mock.Mock +} + +func (m *MockKubernetesClient) CreatePod(ctx context.Context, namespace string, pod *corev1.Pod) (*corev1.Pod, error) { + args := m.Called(ctx, namespace, pod) + return args.Get(0).(*corev1.Pod), args.Error(1) +} + +func (m *MockKubernetesClient) GetPodLogs(ctx context.Context, namespace, podName, containerName string, opts *corev1.PodLogOptions) (io.ReadCloser, error) { + args := m.Called(ctx, namespace, podName, containerName, opts) + return args.Get(0).(io.ReadCloser), args.Error(1) +} + +func (m *MockKubernetesClient) GetPod(ctx context.Context, namespace, podName string) (*corev1.Pod, error) { + args := m.Called(ctx, namespace, podName) + return args.Get(0).(*corev1.Pod), args.Error(1) +} + +func (m *MockKubernetesClient) DeletePod(ctx context.Context, namespace, podName string, opts *metav1.DeleteOptions) error { + args := m.Called(ctx, namespace, podName, opts) + return args.Error(0) +} diff --git a/pkg/container/kubernetes_options.go b/pkg/container/kubernetes_options.go new file mode 100644 index 00000000..c7c8f243 --- /dev/null +++ b/pkg/container/kubernetes_options.go @@ -0,0 +1,56 @@ +package container + +import ( + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/yaml" +) + +type K8sOpts struct { + Namespace string + PodName string + Image string + RestartPolicy corev1.RestartPolicy + Command []string + Args []string + Env []corev1.EnvVar + Volumes []corev1.Volume + VolumeMounts []corev1.VolumeMount + Timeout time.Duration +} + +func (k K8sOpts) String() string { + pod := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: k.PodName, + Namespace: k.Namespace, + }, + Spec: corev1.PodSpec{ + RestartPolicy: k.RestartPolicy, + Containers: []corev1.Container{ + { + Name: k.PodName, + Image: k.Image, + Command: k.Command, + Args: k.Args, + Env: k.Env, + VolumeMounts: k.VolumeMounts, + }, + }, + Volumes: k.Volumes, + }, + } + + b, err := yaml.Marshal(pod) + if err != nil { + return fmt.Sprintf("error marshalling pod to YAML: %v", err) + } + return string(b) +} diff --git a/pkg/container/kubernetes_test.go b/pkg/container/kubernetes_test.go new file mode 100644 index 00000000..cece487c --- /dev/null +++ b/pkg/container/kubernetes_test.go @@ -0,0 +1,377 @@ +package container + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "os/exec" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/yaml" +) + +type K8sWrapperTest struct { + client KubernetesClient +} + +func (w *K8sWrapperTest) RunPod(ctx context.Context, opts K8sOpts) (stdout bytes.Buffer, err error) { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: opts.PodName, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: opts.PodName, + Image: opts.Image, + Command: opts.Command, + Args: opts.Args, + Env: opts.Env, + VolumeMounts: opts.VolumeMounts, + }, + }, + Volumes: opts.Volumes, + RestartPolicy: opts.RestartPolicy, + }, + } + + _, err = w.client.CreatePod(ctx, opts.Namespace, pod) + if err != nil { + return stdout, fmt.Errorf("failed to create pod: %w", err) + } + + timeout := time.After(opts.Timeout) + tick := time.NewTicker(500 * time.Millisecond) + defer tick.Stop() +waitingLoop: + for { + select { + case <-timeout: + _ = w.client.DeletePod(ctx, opts.Namespace, opts.PodName, &metav1.DeleteOptions{}) + return stdout, fmt.Errorf("timeout waiting for pod %s to complete", opts.PodName) + case <-tick.C: + p, err := w.client.GetPod(ctx, opts.Namespace, opts.PodName) + if err != nil { + return stdout, fmt.Errorf("failed to get pod info: %w", err) + } + if p.Status.Phase == corev1.PodSucceeded || p.Status.Phase == corev1.PodFailed { + break waitingLoop + } + } + } + + logOpts := &corev1.PodLogOptions{ + Container: opts.PodName, + } + rc, err := w.client.GetPodLogs(ctx, opts.Namespace, opts.PodName, opts.PodName, logOpts) + if err != nil { + return stdout, fmt.Errorf("failed to get pod logs: %w", err) + } + defer rc.Close() + + _, err = io.Copy(&stdout, rc) + if err != nil { + return stdout, fmt.Errorf("failed copying pod logs: %w", err) + } + + _ = w.client.DeletePod(ctx, opts.Namespace, opts.PodName, &metav1.DeleteOptions{}) + return stdout, nil +} + +func TestK8sOptsString(t *testing.T) { + opts := K8sOpts{ + Namespace: "default", + PodName: "example-pod", + Image: "nginx:latest", + RestartPolicy: corev1.RestartPolicyAlways, + Command: []string{"nginx"}, + Args: []string{"-g", "daemon off;"}, + Env: []corev1.EnvVar{ + { + Name: "ENV_VAR", + Value: "value", + }, + }, + Volumes: []corev1.Volume{ + { + Name: "data", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "data", + MountPath: "/data", + }, + }, + Timeout: 30 * time.Second, + } + + yamlData := opts.String() + + var pod corev1.Pod + if err := yaml.Unmarshal([]byte(yamlData), &pod); err != nil { + t.Fatalf("Unmarshalling pod YAML failed: %v", err) + } + + // Verify that the pod metadata matches. + if pod.ObjectMeta.Name != opts.PodName { + t.Errorf("Expected pod name %q, got %q", opts.PodName, pod.ObjectMeta.Name) + } + if pod.ObjectMeta.Namespace != opts.Namespace { + t.Errorf("Expected namespace %q, got %q", opts.Namespace, pod.ObjectMeta.Namespace) + } + + // Verify restart policy. + if pod.Spec.RestartPolicy != opts.RestartPolicy { + t.Errorf("Expected restart policy %q, got %q", opts.RestartPolicy, pod.Spec.RestartPolicy) + } + + // Verify that there's one container and its fields match. + if len(pod.Spec.Containers) != 1 { + t.Fatalf("Expected exactly one container, got %d", len(pod.Spec.Containers)) + } + + container := pod.Spec.Containers[0] + if container.Image != opts.Image { + t.Errorf("Expected image %q, got %q", opts.Image, container.Image) + } + if len(container.Command) != len(opts.Command) { + t.Errorf("Expected %d command(s), got %d", len(opts.Command), len(container.Command)) + } + for i, v := range container.Command { + if v != opts.Command[i] { + t.Errorf("Expected command at index %d to be %q, got %q", i, opts.Command[i], v) + } + } + if len(container.Args) != len(opts.Args) { + t.Errorf("Expected %d arg(s), got %d", len(opts.Args), len(container.Args)) + } + for i, v := range container.Args { + if v != opts.Args[i] { + t.Errorf("Expected arg at index %d to be %q, got %q", i, opts.Args[i], v) + } + } + + // Verify environment variables. + if len(container.Env) != len(opts.Env) { + t.Errorf("Expected %d environment variables, got %d", len(opts.Env), len(container.Env)) + } else { + for i, envVar := range container.Env { + if envVar.Name != opts.Env[i].Name || envVar.Value != opts.Env[i].Value { + t.Errorf("Expected env var at index %d to be %+v, got %+v", i, opts.Env[i], envVar) + } + } + } + + // Verify VolumeMounts in container. + if len(container.VolumeMounts) != len(opts.VolumeMounts) { + t.Errorf("Expected %d volumeMount(s), got %d", len(opts.VolumeMounts), len(container.VolumeMounts)) + } else { + for i, vm := range container.VolumeMounts { + if vm.Name != opts.VolumeMounts[i].Name || vm.MountPath != opts.VolumeMounts[i].MountPath { + t.Errorf("Expected volumeMount at index %d to be %+v, got %+v", i, opts.VolumeMounts[i], vm) + } + } + } + + // Verify Volumes at pod spec level. + if len(pod.Spec.Volumes) != len(opts.Volumes) { + t.Errorf("Expected %d volume(s), got %d", len(opts.Volumes), len(pod.Spec.Volumes)) + } else { + for i, vol := range pod.Spec.Volumes { + if vol.Name != opts.Volumes[i].Name { + t.Errorf("Expected volume at index %d to have name %q, got %q", i, opts.Volumes[i].Name, vol.Name) + } + } + } +} + +func TestK8sOptsString_MarshalError(t *testing.T) { + opts := K8sOpts{ + Namespace: "default", + PodName: "test-pod", + Image: "busybox:latest", + RestartPolicy: corev1.RestartPolicyOnFailure, + Command: []string{"sleep"}, + Args: []string{"3600"}, + Env: []corev1.EnvVar{}, + Volumes: []corev1.Volume{}, + VolumeMounts: []corev1.VolumeMount{}, + Timeout: 10 * time.Second, + } + + yamlStr := opts.String() + if yamlStr == "" { + t.Error("Expected a non-empty YAML string") + } + var pod corev1.Pod + if err := yaml.Unmarshal([]byte(yamlStr), &pod); err != nil { + t.Errorf("Expected valid YAML, but got error: %v", err) + } +} + +func TestK8sWrapper_RunPod_Success(t *testing.T) { + mockClient := new(MockKubernetesClient) + wrapper := &K8sWrapperTest{client: mockClient} + ctx := context.Background() + opts := K8sOpts{ + Namespace: "default", + PodName: "test-pod", + Image: "alpine", + Command: []string{"echo", "Hello, Kubernetes!"}, + Timeout: 3 * time.Second, + RestartPolicy: corev1.RestartPolicyNever, + } + + // Expected pod returned from CreatePod. + createdPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: opts.PodName, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodPending, + }, + } + + // Setup mock expectations. + mockClient.On("CreatePod", ctx, opts.Namespace, mock.AnythingOfType("*v1.Pod")).Return(createdPod, nil).Once() + + // Simulate GetPod returning a succeeded pod. + succeededPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: opts.PodName, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodSucceeded, + }, + } + // During polling, GetPod will be called repeatedly. For simplicity, always return the succeeded pod. + mockClient.On("GetPod", ctx, opts.Namespace, opts.PodName).Return(succeededPod, nil).Maybe() + + // Setup GetPodLogs to return a reader with log content. + logContent := "Pod execution log\n" + mockClient.On("GetPodLogs", ctx, opts.Namespace, opts.PodName, opts.PodName, mock.AnythingOfType("*v1.PodLogOptions")).Return(io.NopCloser(strings.NewReader(logContent)), nil).Once() + + // Setup DeletePod expectation. + mockClient.On("DeletePod", ctx, opts.Namespace, opts.PodName, mock.Anything).Return(nil).Maybe() + + stdout, err := wrapper.RunPod(ctx, opts) + assert.NoError(t, err) + assert.Equal(t, logContent, stdout.String()) + + mockClient.AssertExpectations(t) +} + +func TestK8sWrapper_RunPod_Timeout(t *testing.T) { + mockClient := new(MockKubernetesClient) + wrapper := &K8sWrapperTest{client: mockClient} + ctx := context.Background() + opts := K8sOpts{ + Namespace: "default", + PodName: "timeout-pod", + Image: "alpine", + Command: []string{"sleep", "100"}, + Timeout: 1 * time.Second, + RestartPolicy: corev1.RestartPolicyNever, + } + + createdPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: opts.PodName, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodPending, + }, + } + mockClient.On("CreatePod", ctx, opts.Namespace, mock.AnythingOfType("*v1.Pod")).Return(createdPod, nil).Once() + // Always return the pending pod. + mockClient.On("GetPod", ctx, opts.Namespace, opts.PodName).Return(createdPod, nil).Maybe() + // DeletePod is expected to be called when timing out. + mockClient.On("DeletePod", ctx, opts.Namespace, opts.PodName, mock.Anything).Return(nil).Once() + + stdout, err := wrapper.RunPod(ctx, opts) + assert.Error(t, err) + assert.Contains(t, err.Error(), "timeout") + assert.Equal(t, "", stdout.String()) + + mockClient.AssertExpectations(t) +} + +func TestIntegration_RunPod(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + if os.Getenv("TEST_KUBERNETES_INTEGRATION") == "" { + t.Skip("Skipping integration test. To run, set TEST_KUBERNETES_INTEGRATION=1") + } + + kindClusterName := "integration-test-cluster" + + // Create a new Kind cluster. + createCmd := exec.Command("kind", "create", "cluster", "--name", kindClusterName) + createOut, err := createCmd.CombinedOutput() + require.NoError(t, err, "Failed to create Kind cluster: %s", string(createOut)) + + // Ensure that we delete the cluster upon test completion. + defer func() { + deleteCmd := exec.Command("kind", "delete", "cluster", "--name", kindClusterName) + deleteOut, err := deleteCmd.CombinedOutput() + if err != nil { + t.Logf("Failed to delete Kind cluster: %s. Output: %s", err, string(deleteOut)) + } + }() + + // Wait a bit for the cluster to be fully up. + time.Sleep(15 * time.Second) + + // Retrieve the kubeconfig for the new Kind cluster. + kubeconfigCmd := exec.Command("kind", "get", "kubeconfig", "--name", kindClusterName) + kubeconfigData, err := kubeconfigCmd.Output() + require.NoError(t, err, "Failed to retrieve kubeconfig for Kind cluster") + + tmpFile, err := os.CreateTemp("", "kind-kubeconfig-") + require.NoError(t, err, "Failed to create temporary kubeconfig file") + _, err = tmpFile.Write(kubeconfigData) + require.NoError(t, err, "Failed to write kubeconfig data to file") + err = tmpFile.Close() + require.NoError(t, err, "Failed to close temporary kubeconfig file") + defer os.Remove(tmpFile.Name()) + + // Create a new Kubernetes client using the test kubeconfig. + wrapper, err := NewK8sWrapperFromKubeconfig(tmpFile.Name()) + require.NoError(t, err, "Failed to create K8sWrapper from kubeconfig") + + opts := K8sOpts{ + Namespace: "default", + PodName: "integration-pod", + Image: "busybox", // Using busybox as a lightweight image. + Command: []string{"sh", "-c"}, + Args: []string{"echo Integration test successful"}, + RestartPolicy: corev1.RestartPolicyNever, + Timeout: 60 * time.Second, + } + + ctx := context.Background() + stdout, err := wrapper.RunPod(ctx, opts) + require.NoError(t, err, "Failed to run pod in integration test") + + // Read the pod's output. + outputData, err := io.ReadAll(stdout) + require.NoError(t, err, "Failed to read pod output") + + expectedOutput := "Integration test successful" + require.Contains(t, string(outputData), expectedOutput, "Pod output did not contain expected message") +} diff --git a/pkg/providers/airbyte/storage.go b/pkg/providers/airbyte/storage.go index c6d86812..a0c2e1db 100644 --- a/pkg/providers/airbyte/storage.go +++ b/pkg/providers/airbyte/storage.go @@ -16,7 +16,7 @@ import ( "github.com/doublecloud/transfer/pkg/abstract" "github.com/doublecloud/transfer/pkg/abstract/coordinator" "github.com/doublecloud/transfer/pkg/abstract/model" - "github.com/doublecloud/transfer/pkg/docker" + "github.com/doublecloud/transfer/pkg/container" "github.com/doublecloud/transfer/pkg/format" "github.com/doublecloud/transfer/pkg/stats" "github.com/doublecloud/transfer/pkg/util" @@ -38,7 +38,7 @@ type Storage struct { transfer *model.Transfer state map[string]*coordinator.TransferStateData - dw *docker.DockerWrapper + cw container.ContainerImpl } func (a *Storage) Close() {} @@ -367,28 +367,36 @@ func (a *Storage) discover() error { return nil } -func (a *Storage) baseOpts() docker.DockerOpts { - return docker.DockerOpts{ - Volumes: map[string]string{ - a.config.DataDir(): "/data", +func (a *Storage) baseOpts() container.ContainerOpts { + return container.ContainerOpts{ + Env: map[string]string{ + "AWS_EC2_METADATA_DISABLED": "true", }, - Mounts: nil, - LogDriver: "local", LogOptions: map[string]string{ "max-size": "100m", "max-file": "3", }, + Namespace: "", + RestartPolicy: "Never", + PodName: "", Image: a.config.DockerImage(), + LogDriver: "local", Network: "host", ContainerName: "", - Command: nil, - Env: []string{ - "AWS_EC2_METADATA_DISABLED=true", + Volumes: []container.Volume{ + { + Name: "data", + HostPath: a.config.DataDir(), + ContainerPath: "/data", + VolumeType: "bind", + }, }, + Command: nil, + Args: nil, Timeout: 0, - AutoRemove: true, AttachStdout: true, AttachStderr: true, + AutoRemove: true, } } @@ -400,7 +408,7 @@ func (a *Storage) runRawCommand(args ...string) (io.Reader, io.Reader, error) { a.logger.Info(opts.String()) - return a.dw.Run(ctx, opts) + return a.cw.Run(ctx, opts) } func (a *Storage) runCommand(args ...string) ([]byte, error) { @@ -482,7 +490,7 @@ func NewStorage(lgr log.Logger, registry metrics.Registry, cp coordinator.Coordi lgr.Info("airbyte storage constructed with state", log.Any("state", state)) } - dockerWrapper, err := docker.NewDockerWrapper(lgr) + containerImpl, err := container.NewContainerImpl(lgr) if err != nil { return nil, xerrors.Errorf("unable to ensure dockerd running, please ensure you have specified supervisord with it: %w", err) } @@ -496,6 +504,6 @@ func NewStorage(lgr log.Logger, registry metrics.Registry, cp coordinator.Coordi metrics: stats.NewSourceStats(registry), transfer: transfer, state: state, - dw: dockerWrapper, + cw: containerImpl, }, nil } diff --git a/pkg/transformer/registry/dbt/runner.go b/pkg/transformer/registry/dbt/runner.go index cb73d09c..db87f7d3 100644 --- a/pkg/transformer/registry/dbt/runner.go +++ b/pkg/transformer/registry/dbt/runner.go @@ -11,12 +11,13 @@ import ( "github.com/doublecloud/transfer/internal/logger" "github.com/doublecloud/transfer/library/go/core/xerrors" "github.com/doublecloud/transfer/pkg/abstract/model" - "github.com/doublecloud/transfer/pkg/docker" + "github.com/doublecloud/transfer/pkg/container" "github.com/doublecloud/transfer/pkg/runtime/shared/pod" "github.com/go-git/go-git/v5" "github.com/go-git/go-git/v5/plumbing" "go.ytsaurus.tech/library/go/core/log" "gopkg.in/yaml.v3" + v1 "k8s.io/api/core/v1" ) type runner struct { @@ -25,11 +26,11 @@ type runner struct { transfer *model.Transfer - dw *docker.DockerWrapper + cw container.ContainerImpl } func newRunner(dst SupportedDestination, cfg *Config, transfer *model.Transfer) (*runner, error) { - dockerWrapper, err := docker.NewDockerWrapper(logger.Log) + containerImpl, err := container.NewContainerImpl(logger.Log) if err != nil { return nil, err } @@ -40,7 +41,7 @@ func newRunner(dst SupportedDestination, cfg *Config, transfer *model.Transfer) transfer: transfer, - dw: dockerWrapper, + cw: containerImpl, }, nil } @@ -60,7 +61,7 @@ func (r *runner) Run(ctx context.Context) error { } func (r *runner) initializeDocker(ctx context.Context) error { - if err := r.dw.Pull(ctx, r.fullImageID(), types.ImagePullOptions{}); err != nil { + if err := r.cw.Pull(ctx, r.fullImageID(), types.ImagePullOptions{}); err != nil { return xerrors.Errorf("docker initialization failed: %w", err) } return nil @@ -162,41 +163,46 @@ func (r *runner) cleanupConfiguration() { } func (r *runner) run(ctx context.Context) error { - opts := docker.DockerOpts{ - Volumes: map[string]string{}, - Mounts: []mount.Mount{ - { - Type: mount.TypeBind, - Source: pathProject(), - Target: "/usr/app", - }, - { - Type: mount.TypeBind, - Source: pathProfiles(), - Target: "/root/.dbt/profiles.yml", - }, + opts := container.ContainerOpts{ + Env: map[string]string{ + "AWS_EC2_METADATA_DISABLED": "true", }, - LogDriver: "local", LogOptions: map[string]string{ "max-size": "100m", "max-file": "3", }, + Namespace: "", + RestartPolicy: v1.RestartPolicyNever, + PodName: "", Image: r.fullImageID(), + LogDriver: "local", Network: "host", ContainerName: "", + Volumes: []container.Volume{ + { + Name: "project", + VolumeType: string(mount.TypeBind), + HostPath: pathProject(), + ContainerPath: "/usr/app", + }, + { + Name: "profiles", + VolumeType: string(mount.TypeBind), + HostPath: pathProfiles(), + ContainerPath: "/root/.dbt/profiles.yml", + }, + }, Command: []string{ r.cfg.Operation, }, - Env: []string{ - "AWS_EC2_METADATA_DISABLED=true", - }, + Args: nil, Timeout: 0, - AutoRemove: true, AttachStdout: true, AttachStderr: true, + AutoRemove: true, } - if _, _, err := r.dw.Run(ctx, opts); err != nil { + if _, _, err := r.cw.Run(ctx, opts); err != nil { return xerrors.Errorf("docker run failed: %w", err) }