From bb8e3b97ca217b59809b1e113ef4e1d7a7a7fb6f Mon Sep 17 00:00:00 2001 From: s-shiraki Date: Sun, 27 Oct 2024 00:30:53 +0900 Subject: [PATCH] feat: Implement delete expired index job (#2702) * feat: Implement delete expired index job * fix: Replace agent Remove RPC to Vald Remove RPC * fix: service name * fix: typo * fix: log method * fix: variable name * fix: use internal package * fix: Change struct field name --- .cspell.json | 1 + .gitfiles | 1 + .../detect-docker-image-tags/action.yaml | 1 + .github/workflows/dockers-image-scan.yaml | 4 + .../dockers-index-deletion-image.yaml | 80 ++++ .github/workflows/dockers-index-deletion.yaml | 80 ++++ .../dockers-release-branch-image.yaml | 6 + Makefile | 1 + Makefile.d/build.mk | 10 + Makefile.d/docker.mk | 13 + cmd/index/job/deletion/main.go | 59 +++ cmd/index/job/deletion/sample.yaml | 230 ++++++++++ dockers/index/job/deletion/Dockerfile | 87 ++++ hack/actions/gen/main.go | 5 + hack/cspell/main.go | 1 + hack/docker/gen/main.go | 4 + internal/config/index_deleter.go | 61 +++ k8s/index/job/deletion/configmap.yaml | 415 ++++++++++++++++++ k8s/index/job/deletion/cronjob.yaml | 146 ++++++ pkg/index/job/deletion/config/config.go | 71 +++ pkg/index/job/deletion/service/deleter.go | 219 +++++++++ pkg/index/job/deletion/service/options.go | 48 ++ pkg/index/job/deletion/usecase/deletion.go | 214 +++++++++ 23 files changed, 1757 insertions(+) create mode 100644 .github/workflows/dockers-index-deletion-image.yaml create mode 100644 .github/workflows/dockers-index-deletion.yaml create mode 100644 cmd/index/job/deletion/main.go create mode 100644 cmd/index/job/deletion/sample.yaml create mode 100644 dockers/index/job/deletion/Dockerfile create mode 100644 internal/config/index_deleter.go create mode 100644 k8s/index/job/deletion/configmap.yaml create mode 100644 k8s/index/job/deletion/cronjob.yaml create mode 100644 pkg/index/job/deletion/config/config.go create mode 100644 pkg/index/job/deletion/service/deleter.go create mode 100644 pkg/index/job/deletion/service/options.go create mode 100644 pkg/index/job/deletion/usecase/deletion.go diff --git a/.cspell.json b/.cspell.json index fd2e1e380d..4c77f84aa5 100644 --- a/.cspell.json +++ b/.cspell.json @@ -41,6 +41,7 @@ "**/cmd/gateway/mirror/mirror", "**/cmd/index/job/correction/index-correction", "**/cmd/index/job/creation/index-creation", + "**/cmd/index/job/deletion/index-deletion", "**/cmd/index/job/readreplica/rotate/readreplica-rotate", "**/cmd/index/job/save/index-save", "**/cmd/index/operator/index-operator", diff --git a/.gitfiles b/.gitfiles index 3461f07a1f..7a541e7829 100644 --- a/.gitfiles +++ b/.gitfiles @@ -96,6 +96,7 @@ .github/workflows/dockers-image-scan.yaml .github/workflows/dockers-index-correction-image.yaml .github/workflows/dockers-index-creation-image.yaml +.github/workflows/dockers-index-deletion-image.yaml .github/workflows/dockers-index-operator-image.yaml .github/workflows/dockers-index-save-image.yaml .github/workflows/dockers-loadtest-image.yaml diff --git a/.github/actions/detect-docker-image-tags/action.yaml b/.github/actions/detect-docker-image-tags/action.yaml index e38088b07a..3e889d360c 100644 --- a/.github/actions/detect-docker-image-tags/action.yaml +++ b/.github/actions/detect-docker-image-tags/action.yaml @@ -48,6 +48,7 @@ runs: ["vdaas/vald-mirror-gateway"]="gateway.mirror.image.tag" ["vdaas/vald-manager-index"]="manager.index.image.tag" ["vdaas/vald-index-creation"]="manager.index.creator.image.tag" + ["vdaas/vald-index-deletion"]="manager.index.delete.image.tag" ["vdaas/vald-index-save"]="manager.index.saver.image.tag" ["vdaas/vald-readreplica-rotate"]="manager.index.readreplica.rotator.image.tag" ["vdaas/vald-helm-operator"]="image.tag" diff --git a/.github/workflows/dockers-image-scan.yaml b/.github/workflows/dockers-image-scan.yaml index a2c6c2baf6..0c46dd41bb 100644 --- a/.github/workflows/dockers-image-scan.yaml +++ b/.github/workflows/dockers-image-scan.yaml @@ -70,6 +70,10 @@ jobs: uses: ./.github/workflows/_docker-image-scan.yaml with: target: index-creation + index-deletion: + uses: ./.github/workflows/_docker-image-scan.yaml + with: + target: index-deletion index-save: uses: ./.github/workflows/_docker-image-scan.yaml with: diff --git a/.github/workflows/dockers-index-deletion-image.yaml b/.github/workflows/dockers-index-deletion-image.yaml new file mode 100644 index 0000000000..31c741d9e5 --- /dev/null +++ b/.github/workflows/dockers-index-deletion-image.yaml @@ -0,0 +1,80 @@ +# +# Copyright (C) 2019-2024 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# DO_NOT_EDIT this workflow file is generated by https://github.com/vdaas/vald/blob/main/hack/actions/gen/main.go + +name: 'Build docker image: index-deletion' +on: + push: + branches: + - main + - release/v*.* + - '!release/v*.*.*' + tags: + - '*.*.*' + - v*.*.* + - '*.*.*-*' + - v*.*.*-* + pull_request: + paths: + - hack/docker/gen/main.go + - dockers/index/job/deletion/Dockerfile + - hack/actions/gen/main.go + - .github/workflows/dockers-index-deletion-image.yaml + - .github/actions/docker-build/action.yaml + - .github/workflows/_docker-image.yaml + - cmd/index/job/deletion/** + - pkg/index/job/deletion/** + - apis/grpc/** + - apis/proto/** + - go.mod + - go.sum + - versions/GO_VERSION + - internal/** + - '!internal/**/*_test.go' + - '!internal/**/*_mock.go' + - '!internal/db/**' + - '!internal/k8s/**' + - Makefile + - Makefile.d/** + pull_request_target: + paths: + - hack/docker/gen/main.go + - dockers/index/job/deletion/Dockerfile + - hack/actions/gen/main.go + - .github/workflows/dockers-index-deletion-image.yaml + - .github/actions/docker-build/action.yaml + - .github/workflows/_docker-image.yaml + - cmd/index/job/deletion/** + - pkg/index/job/deletion/** + - apis/grpc/** + - apis/proto/** + - go.mod + - go.sum + - versions/GO_VERSION + - internal/** + - '!internal/**/*_test.go' + - '!internal/**/*_mock.go' + - '!internal/db/**' + - '!internal/k8s/**' + - Makefile + - Makefile.d/** +jobs: + build: + uses: ./.github/workflows/_docker-image.yaml + with: + target: index-deletion + secrets: inherit diff --git a/.github/workflows/dockers-index-deletion.yaml b/.github/workflows/dockers-index-deletion.yaml new file mode 100644 index 0000000000..e30db57644 --- /dev/null +++ b/.github/workflows/dockers-index-deletion.yaml @@ -0,0 +1,80 @@ +# +# Copyright (C) 2019-2024 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# DO_NOT_EDIT this workflow file is generated by https://github.com/vdaas/vald/blob/main/hack/actions/gen/main.go + +name: "Build docker image: index-deletion" +on: + push: + branches: + - main + - release/v*.* + - "!release/v*.*.*" + tags: + - "*.*.*" + - v*.*.* + - "*.*.*-*" + - v*.*.*-* + pull_request: + paths: + - hack/docker/gen/main.go + - dockers/index/job/deletion/Dockerfile + - hack/actions/gen/main.go + - .github/workflows/dockers-index-deletion-image.yaml + - .github/actions/docker-build/action.yaml + - .github/workflows/_docker-image.yaml + - cmd/index/job/deletion/** + - pkg/index/job/deletion/** + - apis/grpc/** + - apis/proto/** + - go.mod + - go.sum + - versions/GO_VERSION + - internal/** + - "!internal/**/*_test.go" + - "!internal/**/*_mock.go" + - "!internal/db/**" + - "!internal/k8s/**" + - Makefile + - Makefile.d/** + pull_request_target: + paths: + - hack/docker/gen/main.go + - dockers/index/job/deletion/Dockerfile + - hack/actions/gen/main.go + - .github/workflows/dockers-index-deletion-image.yaml + - .github/actions/docker-build/action.yaml + - .github/workflows/_docker-image.yaml + - cmd/index/job/deletion/** + - pkg/index/job/deletion/** + - apis/grpc/** + - apis/proto/** + - go.mod + - go.sum + - versions/GO_VERSION + - internal/** + - "!internal/**/*_test.go" + - "!internal/**/*_mock.go" + - "!internal/db/**" + - "!internal/k8s/**" + - Makefile + - Makefile.d/** +jobs: + build: + uses: ./.github/workflows/_docker-image.yaml + with: + target: index-deletion + secrets: inherit diff --git a/.github/workflows/dockers-release-branch-image.yaml b/.github/workflows/dockers-release-branch-image.yaml index 6c0b15a382..ac841f6e3b 100644 --- a/.github/workflows/dockers-release-branch-image.yaml +++ b/.github/workflows/dockers-release-branch-image.yaml @@ -88,6 +88,12 @@ jobs: with: target: index-creation secrets: inherit + index-deletion: + needs: [dump-contexts-to-log] + uses: ./.github/workflows/_docker-image.yaml + with: + target: index-deletion + secrets: inherit index-save: needs: [dump-contexts-to-log] uses: ./.github/workflows/_docker-image.yaml diff --git a/Makefile b/Makefile index 06dbc5bf4b..99ed60bb73 100644 --- a/Makefile +++ b/Makefile @@ -41,6 +41,7 @@ FILTER_GATEWAY_IMAGE = $(NAME)-filter-gateway HELM_OPERATOR_IMAGE = $(NAME)-helm-operator INDEX_CORRECTION_IMAGE = $(NAME)-index-correction INDEX_CREATION_IMAGE = $(NAME)-index-creation +INDEX_DELETION_IMAGE = $(NAME)-index-deletion INDEX_OPERATOR_IMAGE = $(NAME)-index-operator INDEX_SAVE_IMAGE = $(NAME)-index-save LB_GATEWAY_IMAGE = $(NAME)-lb-gateway diff --git a/Makefile.d/build.mk b/Makefile.d/build.mk index 074ed42cce..029ada10fe 100644 --- a/Makefile.d/build.mk +++ b/Makefile.d/build.mk @@ -24,6 +24,7 @@ binary/build: \ cmd/gateway/mirror/mirror \ cmd/index/job/correction/index-correction \ cmd/index/job/creation/index-creation \ + cmd/index/job/deletion/index-deletion \ cmd/index/job/readreplica/rotate/readreplica-rotate \ cmd/index/job/save/index-save \ cmd/index/operator/index-operator \ @@ -80,6 +81,10 @@ cmd/index/job/creation/index-creation: $(eval CGO_ENABLED = 0) $(call go-build,index/job/creation,,-static,,,$@) +cmd/index/job/deletion/index-deletion: + $(eval CGO_ENABLED = 0) + $(call go-build,index/job/deletion,,-static,,,$@) + cmd/index/job/save/index-save: $(eval CGO_ENABLED = 0) $(call go-build,index/job/save,,-static,,,$@) @@ -128,6 +133,7 @@ binary/build/zip: \ artifacts/vald-filter-gateway-$(GOOS)-$(GOARCH).zip \ artifacts/vald-index-correction-$(GOOS)-$(GOARCH).zip \ artifacts/vald-index-creation-$(GOOS)-$(GOARCH).zip \ + artifacts/vald-index-deletion-$(GOOS)-$(GOARCH).zip \ artifacts/vald-index-operator-$(GOOS)-$(GOARCH).zip \ artifacts/vald-index-save-$(GOOS)-$(GOARCH).zip \ artifacts/vald-lb-gateway-$(GOOS)-$(GOARCH).zip \ @@ -187,6 +193,10 @@ artifacts/vald-index-creation-$(GOOS)-$(GOARCH).zip: cmd/index/job/creation/inde $(call mkdir, $(dir $@)) zip --junk-paths $@ $< +artifacts/vald-index-deletion-$(GOOS)-$(GOARCH).zip: cmd/index/job/deletion/index-deletion + $(call mkdir, $(dir $@)) + zip --junk-paths $@ $< + artifacts/vald-index-save-$(GOOS)-$(GOARCH).zip: cmd/index/job/save/index-save $(call mkdir, $(dir $@)) zip --junk-paths $@ $< diff --git a/Makefile.d/docker.mk b/Makefile.d/docker.mk index d23324f4fb..793ef66e27 100644 --- a/Makefile.d/docker.mk +++ b/Makefile.d/docker.mk @@ -36,6 +36,7 @@ docker/build: \ docker/build/helm-operator \ docker/build/index-correction \ docker/build/index-creation \ + docker/build/index-deletion \ docker/build/index-operator \ docker/build/index-save \ docker/build/loadtest \ @@ -63,6 +64,7 @@ docker/xpanes/build: docker/build/gateway-mirror \ docker/build/index-correction \ docker/build/index-creation \ + docker/build/index-deletion \ docker/build/index-operator \ docker/build/index-save \ docker/build/loadtest \ @@ -341,6 +343,17 @@ docker/build/index-save: IMAGE=$(INDEX_SAVE_IMAGE) \ docker/build/image +.PHONY: docker/name/index-deletion +docker/name/index-deletion: + @echo "$(ORG)/$(INDEX_DELETION_IMAGE)" + +.PHONY: docker/build/index-deletion +## build index-deletion image +docker/build/index-deletion: + @make DOCKERFILE="$(ROOTDIR)/dockers/index/job/deletion/Dockerfile" \ + IMAGE=$(INDEX_DELETION_IMAGE) \ + docker/build/image + .PHONY: docker/name/index-operator docker/name/index-operator: @echo "$(ORG)/$(INDEX_OPERATOR_IMAGE)" diff --git a/cmd/index/job/deletion/main.go b/cmd/index/job/deletion/main.go new file mode 100644 index 0000000000..b91c091aa7 --- /dev/null +++ b/cmd/index/job/deletion/main.go @@ -0,0 +1,59 @@ +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package main + +import ( + "context" + + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/info" + "github.com/vdaas/vald/internal/log" + "github.com/vdaas/vald/internal/runner" + "github.com/vdaas/vald/internal/safety" + "github.com/vdaas/vald/pkg/index/job/deletion/config" + "github.com/vdaas/vald/pkg/index/job/deletion/usecase" +) + +const ( + maxVersion = "v0.0.10" + minVersion = "v0.0.0" + name = "index deletion job" +) + +func main() { + if err := safety.RecoverFunc(func() error { + return runner.Do( + context.Background(), + runner.WithName(name), + runner.WithVersion(info.Version, maxVersion, minVersion), + runner.WithConfigLoader(func(path string) (any, *config.GlobalConfig, error) { + cfg, err := config.NewConfig(path) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to load "+name+"'s configuration") + } + return cfg, &cfg.GlobalConfig, nil + }), + runner.WithDaemonInitializer(func(cfg any) (runner.Runner, error) { + c, ok := cfg.(*config.Data) + if !ok { + return nil, errors.ErrInvalidConfig + } + return usecase.New(c) + }), + ) + })(); err != nil { + log.Fatal(err, info.Get()) + return + } +} diff --git a/cmd/index/job/deletion/sample.yaml b/cmd/index/job/deletion/sample.yaml new file mode 100644 index 0000000000..b11bb11901 --- /dev/null +++ b/cmd/index/job/deletion/sample.yaml @@ -0,0 +1,230 @@ +# +# Copyright (C) 2019-2024 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +version: v0.0.0 +time_zone: JST +logging: + format: raw + level: info + logger: glg +server_config: + servers: + - name: grpc + host: 0.0.0.0 + port: 8081 + grpc: + bidirectional_stream_concurrency: 20 + connection_timeout: "" + header_table_size: 0 + initial_conn_window_size: 0 + initial_window_size: 0 + interceptors: [] + keepalive: + max_conn_age: "" + max_conn_age_grace: "" + max_conn_idle: "" + time: "" + timeout: "" + max_header_list_size: 0 + max_receive_message_size: 0 + max_send_message_size: 0 + read_buffer_size: 0 + write_buffer_size: 0 + mode: GRPC + probe_wait_time: 3s + restart: true + health_check_servers: + - name: readiness + host: 0.0.0.0 + port: 3001 + http: + handler_timeout: "" + idle_timeout: "" + read_header_timeout: "" + read_timeout: "" + shutdown_duration: 0s + write_timeout: "" + mode: "" + probe_wait_time: 3s + metrics_servers: + startup_strategy: + - grpc + - readiness + full_shutdown_duration: 600s + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + key: /path/to/key +deleter: + index_id: "sample" + agent_port: 8081 + agent_name: "vald-agent-ngt" + agent_dns: vald-agent-ngt.default.svc.cluster.local + agent_namespace: "default" + node_name: "" + concurrency: 1 + discoverer: + duration: 500ms + client: + addrs: + - vald-discoverer.default.svc.cluster.local:8081 + health_check_duration: "1s" + connection_pool: + enable_dns_resolver: true + enable_rebalance: true + old_conn_close_duration: 3s + rebalance_duration: 30m + size: 3 + backoff: + backoff_factor: 1.1 + backoff_time_limit: 5s + enable_error_log: true + initial_duration: 5ms + jitter_limit: 100ms + maximum_duration: 5s + retry_count: 100 + call_option: + max_recv_msg_size: 0 + max_retry_rpc_buffer_size: 0 + max_send_msg_size: 0 + wait_for_ready: true + dial_option: + backoff_base_delay: 1s + backoff_jitter: 0.2 + backoff_max_delay: 120s + backoff_multiplier: 1.6 + enable_backoff: false + initial_connection_window_size: 0 + initial_window_size: 0 + insecure: true + keepalive: + permit_without_stream: false + time: "" + timeout: "" + max_msg_size: 0 + min_connection_timeout: 20s + read_buffer_size: 0 + tcp: + dialer: + dual_stack_enabled: true + keepalive: "" + timeout: "" + dns: + cache_enabled: true + cache_expiration: 1h + refresh_duration: 30m + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + key: /path/to/key + timeout: "" + write_buffer_size: 0 + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + key: /path/to/key + agent_client_options: + addrs: [] + health_check_duration: "1s" + connection_pool: + enable_dns_resolver: true + enable_rebalance: true + old_conn_close_duration: 3s + rebalance_duration: 30m + size: 3 + backoff: + backoff_factor: 1.1 + backoff_time_limit: 5s + enable_error_log: true + initial_duration: 5ms + jitter_limit: 100ms + maximum_duration: 5s + retry_count: 100 + call_option: + max_recv_msg_size: 0 + max_retry_rpc_buffer_size: 0 + max_send_msg_size: 0 + wait_for_ready: true + dial_option: + write_buffer_size: 0 + read_buffer_size: 0 + initial_window_size: 0 + initial_connection_window_size: 0 + max_msg_size: 0 + backoff_max_delay: "120s" + backoff_base_delay: "1s" + backoff_multiplier: 1.6 + backoff_jitter: 0.2 + min_connection_timeout: "20s" + enable_backoff: false + insecure: true + timeout: "" + tcp: + dns: + cache_enabled: true + cache_expiration: 1h + refresh_duration: 30m + dialer: + timeout: "" + keepalive: "15m" + dual_stack_enabled: true + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + key: /path/to/key + keepalive: + permit_without_stream: false + time: "" + timeout: "" + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + key: /path/to/key +observability: + enabled: false + otlp: + collector_endpoint: "otel-collector.monitoring.svc.cluster.local:4317" + trace_batch_timeout: "1s" + trace_export_timeout: "1m" + trace_max_export_batch_size: 1024 + trace_max_queue_size: 256 + metrics_export_interval: "1s" + metrics_export_timeout: "1m" + attribute: + namespace: "_MY_POD_NAMESPACE_" + pod_name: "_MY_POD_NAME_" + node_name: "_MY_NODE_NAME_" + service_name: "vald-index-deletion" + metrics: + enable_cgo: true + enable_goroutine: true + enable_memory: true + enable_version_info: true + version_info_labels: + - vald_version + - server_name + - git_commit + - build_time + - go_version + - go_os + - go_arch + - algorithm_info + trace: + enabled: true diff --git a/dockers/index/job/deletion/Dockerfile b/dockers/index/job/deletion/Dockerfile new file mode 100644 index 0000000000..cd1d29eac8 --- /dev/null +++ b/dockers/index/job/deletion/Dockerfile @@ -0,0 +1,87 @@ +# syntax = docker/dockerfile:latest +# +# Copyright (C) 2019-2024 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# DO_NOT_EDIT this Dockerfile is generated by https://github.com/vdaas/vald/blob/main/hack/docker/gen/main.go +ARG UPX_OPTIONS=-9 +# skipcq: DOK-DL3026,DOK-DL3007 +FROM ghcr.io/vdaas/vald/vald-buildbase:nightly AS builder +LABEL maintainer="vdaas.org vald team " +# skipcq: DOK-DL3002 +USER root:root +ARG TARGETARCH +ARG TARGETOS +ARG GO_VERSION +ARG RUST_VERSION +ENV APP_NAME=index-deletion +ENV DEBIAN_FRONTEND=noninteractive +ENV GO111MODULE=on +ENV GOPATH=/go +ENV GOROOT=/opt/go +ENV HOME=/root +ENV INITRD=No +ENV LANG=en_US.UTF-8 +ENV LANGUAGE=en_US.UTF-8 +ENV LC_ALL=en_US.UTF-8 +ENV ORG=vdaas +ENV PKG=index/job/deletion +ENV REPO=vald +ENV TZ=Etc/UTC +ENV USER=root +ENV PATH=${GOPATH}/bin:${GOROOT}/bin:/usr/local/bin:${PATH} +WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO} +SHELL ["/bin/bash", "-o", "pipefail", "-c"] +#skipcq: DOK-W1001, DOK-SC2046, DOK-SC2086, DOK-DL3008 +RUN --mount=type=bind,target=.,rw \ + --mount=type=tmpfs,target=/tmp \ + --mount=type=cache,target=/var/lib/apt,sharing=locked,id=${APP_NAME} \ + --mount=type=cache,target=/var/cache/apt,sharing=locked,id=${APP_NAME} \ + --mount=type=cache,target="${GOPATH}/pkg",id="go-build-${TARGETARCH}" \ + --mount=type=cache,target="${HOME}/.cache/go-build",id="go-build-${TARGETARCH}" \ + --mount=type=tmpfs,target="${GOPATH}/src" \ + set -ex \ + && echo 'Binary::apt::APT::Keep-Downloaded-Packages "true";' > /etc/apt/apt.conf.d/keep-cache \ + && echo 'APT::Install-Recommends "false";' > /etc/apt/apt.conf.d/no-install-recommends \ + && apt-get clean \ + && apt-get update -y \ + && apt-get upgrade -y \ + && apt-get install -y --no-install-recommends --fix-missing \ + build-essential \ + ca-certificates \ + curl \ + tzdata \ + locales \ + git \ + && ldconfig \ + && echo "${LANG} UTF-8" > /etc/locale.gen \ + && ln -fs /usr/share/zoneinfo/${TZ} /etc/localtime \ + && locale-gen ${LANGUAGE} \ + && update-locale LANG=${LANGUAGE} \ + && dpkg-reconfigure -f noninteractive tzdata \ + && apt-get clean \ + && apt-get autoclean -y \ + && apt-get autoremove -y \ + && make GOPATH="${GOPATH}" GOROOT="${GOROOT}" GO_VERSION="${GO_VERSION}" go/install \ + && make GOPATH="${GOPATH}" GOROOT="${GOROOT}" GO_VERSION="${GO_VERSION}" go/download \ + && make GOARCH="${TARGETARCH}" GOOS="${TARGETOS}" REPO="${ORG}" NAME="${REPO}" cmd/${PKG}/${APP_NAME} \ + && mv "cmd/${PKG}/${APP_NAME}" "/usr/bin/${APP_NAME}" +# skipcq: DOK-DL3026,DOK-DL3007 +FROM gcr.io/distroless/static:nonroot +LABEL maintainer="vdaas.org vald team " +COPY --from=builder /usr/bin/index-deletion /usr/bin/index-deletion +# skipcq: DOK-DL3002 +USER nonroot:nonroot +ENTRYPOINT ["/usr/bin/index-deletion"] diff --git a/hack/actions/gen/main.go b/hack/actions/gen/main.go index 1a63a285a3..621807598d 100644 --- a/hack/actions/gen/main.go +++ b/hack/actions/gen/main.go @@ -337,6 +337,7 @@ const ( indexCorrection = "index-correction" indexCreation = "index-creation" + indexDeletion = "index-deletion" indexSave = "index-save" indexOperator = "index-operator" @@ -568,6 +569,10 @@ func main() { AppName: "index-creation", PackageDir: "index/job/creation", }, + "vald-index-deletion": { + AppName: "index-deletion", + PackageDir: "index/job/deletion", + }, "vald-index-save": { AppName: "index-save", PackageDir: "index/job/save", diff --git a/hack/cspell/main.go b/hack/cspell/main.go index 075bb26076..f475f4a0fe 100644 --- a/hack/cspell/main.go +++ b/hack/cspell/main.go @@ -90,6 +90,7 @@ var ( "**/cmd/gateway/mirror/mirror", "**/cmd/index/job/correction/index-correction", "**/cmd/index/job/creation/index-creation", + "**/cmd/index/job/creation/index-deletion", "**/cmd/index/job/readreplica/rotate/readreplica-rotate", "**/cmd/index/job/save/index-save", "**/cmd/index/operator/index-operator", diff --git a/hack/docker/gen/main.go b/hack/docker/gen/main.go index f5ceb0c201..2e3487a779 100644 --- a/hack/docker/gen/main.go +++ b/hack/docker/gen/main.go @@ -613,6 +613,10 @@ func main() { AppName: "index-save", PackageDir: "index/job/save", }, + "vald-index-deletion": { + AppName: "index-deletion", + PackageDir: "index/job/deletion", + }, "vald-readreplica-rotate": { AppName: "readreplica-rotate", PackageDir: "index/job/readreplica/rotate", diff --git a/internal/config/index_deleter.go b/internal/config/index_deleter.go new file mode 100644 index 0000000000..5b0fcf3b33 --- /dev/null +++ b/internal/config/index_deleter.go @@ -0,0 +1,61 @@ +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package config + +// IndexDeleter represents the configurations for index deletion. +type IndexDeleter struct { + // IndexID represent target delete ID + IndexID string `json:"index_id" yaml:"index_id"` + + // AgentPort represent agent port number + AgentPort int `json:"agent_port" yaml:"agent_port"` + + // AgentName represent agents meta_name for service discovery + AgentName string `json:"agent_name" yaml:"agent_name"` + + // AgentNamespace represent agent namespace location + AgentNamespace string `json:"agent_namespace" yaml:"agent_namespace"` + + // AgentDNS represent agents dns A record for service discovery + AgentDNS string `json:"agent_dns" yaml:"agent_dns"` + + // NodeName represents node name + NodeName string `json:"node_name" yaml:"node_name"` + + // Concurrency represents indexing concurrency. + Concurrency int `json:"concurrency" yaml:"concurrency"` + + // DeletionPoolSize represents batch pool size for indexing. + DeletionPoolSize uint32 `json:"deletion_pool_size" yaml:"deletion_pool_size"` + + // TargetAddrs represents indexing target addresses. + TargetAddrs []string `json:"target_addrs" yaml:"target_addrs"` + + // Discoverer represents agent discoverer service configuration. + Discoverer *DiscovererClient `json:"discoverer" yaml:"discoverer"` +} + +func (ic *IndexDeleter) Bind() *IndexDeleter { + ic.IndexID = GetActualValue(ic.IndexID) + ic.AgentName = GetActualValue(ic.AgentName) + ic.AgentNamespace = GetActualValue(ic.AgentNamespace) + ic.AgentDNS = GetActualValue(ic.AgentDNS) + ic.NodeName = GetActualValue(ic.NodeName) + ic.TargetAddrs = GetActualValues(ic.TargetAddrs) + + if ic.Discoverer != nil { + ic.Discoverer.Bind() + } + return ic +} diff --git a/k8s/index/job/deletion/configmap.yaml b/k8s/index/job/deletion/configmap.yaml new file mode 100644 index 0000000000..164fabb120 --- /dev/null +++ b/k8s/index/job/deletion/configmap.yaml @@ -0,0 +1,415 @@ +# +# Copyright (C) 2019-2024 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +apiVersion: v1 +kind: ConfigMap +metadata: + name: vald-index-deletion-config + labels: + app.kubernetes.io/name: vald + helm.sh/chart: vald-v1.7.14 + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/instance: release-name + app.kubernetes.io/version: v1.7.14 + app.kubernetes.io/component: vald-index-deletion +data: + config.yaml: | + --- + version: v0.0.0 + time_zone: UTC + logging: + format: raw + level: debug + logger: glg + server_config: + servers: + - name: grpc + host: 0.0.0.0 + port: 8081 + grpc: + bidirectional_stream_concurrency: 20 + connection_timeout: "" + enable_admin: true + enable_channelz: true + enable_reflection: true + header_table_size: 0 + initial_conn_window_size: 2097152 + initial_window_size: 1048576 + interceptors: + - RecoverInterceptor + keepalive: + max_conn_age: "" + max_conn_age_grace: "" + max_conn_idle: "" + min_time: 10m + permit_without_stream: false + time: 3h + timeout: 60s + max_concurrent_streams: 0 + max_header_list_size: 0 + max_receive_message_size: 0 + max_send_message_size: 0 + num_stream_workers: 0 + read_buffer_size: 0 + shared_write_buffer: false + wait_for_handlers: true + write_buffer_size: 0 + mode: GRPC + network: tcp + probe_wait_time: 3s + restart: true + socket_option: + ip_recover_destination_addr: false + ip_transparent: false + reuse_addr: true + reuse_port: true + tcp_cork: false + tcp_defer_accept: false + tcp_fast_open: false + tcp_no_delay: false + tcp_quick_ack: false + socket_path: "" + health_check_servers: + - name: liveness + host: 0.0.0.0 + port: 3000 + http: + handler_timeout: "" + http2: + enabled: false + handler_limit: 0 + max_concurrent_streams: 0 + max_decoder_header_table_size: 4096 + max_encoder_header_table_size: 4096 + max_read_frame_size: 0 + max_upload_buffer_per_connection: 0 + max_upload_buffer_per_stream: 0 + permit_prohibited_cipher_suites: true + idle_timeout: "" + read_header_timeout: "" + read_timeout: "" + shutdown_duration: 5s + write_timeout: "" + mode: REST + network: tcp + probe_wait_time: 3s + restart: true + socket_option: + ip_recover_destination_addr: false + ip_transparent: false + reuse_addr: true + reuse_port: true + tcp_cork: false + tcp_defer_accept: false + tcp_fast_open: true + tcp_no_delay: true + tcp_quick_ack: true + socket_path: "" + - name: readiness + host: 0.0.0.0 + port: 3001 + http: + handler_timeout: "" + http2: + enabled: false + handler_limit: 0 + max_concurrent_streams: 0 + max_decoder_header_table_size: 4096 + max_encoder_header_table_size: 4096 + max_read_frame_size: 0 + max_upload_buffer_per_connection: 0 + max_upload_buffer_per_stream: 0 + permit_prohibited_cipher_suites: true + idle_timeout: "" + read_header_timeout: "" + read_timeout: "" + shutdown_duration: 0s + write_timeout: "" + mode: REST + network: tcp + probe_wait_time: 3s + restart: true + socket_option: + ip_recover_destination_addr: false + ip_transparent: false + reuse_addr: true + reuse_port: true + tcp_cork: false + tcp_defer_accept: false + tcp_fast_open: true + tcp_no_delay: true + tcp_quick_ack: true + socket_path: "" + metrics_servers: + - name: pprof + host: 0.0.0.0 + port: 6060 + http: + handler_timeout: 5s + http2: + enabled: false + handler_limit: 0 + max_concurrent_streams: 0 + max_decoder_header_table_size: 4096 + max_encoder_header_table_size: 4096 + max_read_frame_size: 0 + max_upload_buffer_per_connection: 0 + max_upload_buffer_per_stream: 0 + permit_prohibited_cipher_suites: true + idle_timeout: 2s + read_header_timeout: 1s + read_timeout: 1s + shutdown_duration: 5s + write_timeout: 1m + mode: REST + network: tcp + probe_wait_time: 3s + restart: true + socket_option: + ip_recover_destination_addr: false + ip_transparent: false + reuse_addr: true + reuse_port: true + tcp_cork: true + tcp_defer_accept: false + tcp_fast_open: false + tcp_no_delay: false + tcp_quick_ack: false + socket_path: "" + startup_strategy: + - liveness + - pprof + - grpc + - readiness + shutdown_strategy: + - readiness + - grpc + - pprof + - liveness + full_shutdown_duration: 600s + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + insecure_skip_verify: false + key: /path/to/key + observability: + enabled: false + otlp: + collector_endpoint: "" + trace_batch_timeout: "1s" + trace_export_timeout: "1m" + trace_max_export_batch_size: 1024 + trace_max_queue_size: 256 + metrics_export_interval: "1s" + metrics_export_timeout: "1m" + attribute: + namespace: "_MY_POD_NAMESPACE_" + pod_name: "_MY_POD_NAME_" + node_name: "_MY_NODE_NAME_" + service_name: "vald-index-deletion" + metrics: + enable_cgo: true + enable_goroutine: true + enable_memory: true + enable_version_info: true + version_info_labels: + - vald_version + - server_name + - git_commit + - build_time + - go_version + - go_os + - go_arch + - algorithm_info + trace: + enabled: false + deleter: + index_id: "sample" + agent_port: 8081 + agent_name: "vald-agent" + agent_dns: vald-agent.default.svc.cluster.local + agent_namespace: "_MY_POD_NAMESPACE_" + node_name: "" + concurrency: 1 + target_addrs: [] + discoverer: + duration: 500ms + client: + addrs: + - vald-discoverer.default.svc.cluster.local:8081 + health_check_duration: "1s" + connection_pool: + enable_dns_resolver: true + enable_rebalance: true + old_conn_close_duration: 2m + rebalance_duration: 30m + size: 3 + backoff: + backoff_factor: 1.1 + backoff_time_limit: 5s + enable_error_log: true + initial_duration: 5ms + jitter_limit: 100ms + maximum_duration: 5s + retry_count: 100 + circuit_breaker: + closed_error_rate: 0.7 + closed_refresh_timeout: 10s + half_open_error_rate: 0.5 + min_samples: 1000 + open_timeout: 1s + call_option: + content_subtype: "" + max_recv_msg_size: 0 + max_retry_rpc_buffer_size: 0 + max_send_msg_size: 0 + wait_for_ready: true + dial_option: + authority: "" + backoff_base_delay: 1s + backoff_jitter: 0.2 + backoff_max_delay: 120s + backoff_multiplier: 1.6 + disable_retry: false + enable_backoff: false + idle_timeout: 1h + initial_connection_window_size: 2097152 + initial_window_size: 1048576 + insecure: true + interceptors: [] + keepalive: + permit_without_stream: false + time: "" + timeout: 30s + max_call_attempts: 0 + max_header_list_size: 0 + max_msg_size: 0 + min_connection_timeout: 20s + net: + dialer: + dual_stack_enabled: true + keepalive: "" + timeout: "" + dns: + cache_enabled: true + cache_expiration: 1h + refresh_duration: 30m + socket_option: + ip_recover_destination_addr: false + ip_transparent: false + reuse_addr: true + reuse_port: true + tcp_cork: false + tcp_defer_accept: false + tcp_fast_open: false + tcp_no_delay: false + tcp_quick_ack: false + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + insecure_skip_verify: false + key: /path/to/key + read_buffer_size: 0 + shared_write_buffer: false + timeout: "" + user_agent: Vald-gRPC + write_buffer_size: 0 + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + insecure_skip_verify: false + key: /path/to/key + agent_client_options: + addrs: [] + health_check_duration: "1s" + connection_pool: + enable_dns_resolver: true + enable_rebalance: true + old_conn_close_duration: 2m + rebalance_duration: 30m + size: 3 + backoff: + backoff_factor: 1.1 + backoff_time_limit: 5s + enable_error_log: true + initial_duration: 5ms + jitter_limit: 100ms + maximum_duration: 5s + retry_count: 100 + circuit_breaker: + closed_error_rate: 0.7 + closed_refresh_timeout: 10s + half_open_error_rate: 0.5 + min_samples: 1000 + open_timeout: 1s + call_option: + content_subtype: "" + max_recv_msg_size: 0 + max_retry_rpc_buffer_size: 0 + max_send_msg_size: 0 + wait_for_ready: true + dial_option: + write_buffer_size: 0 + read_buffer_size: 0 + initial_window_size: 1.048576e+06 + initial_connection_window_size: 2.097152e+06 + max_msg_size: 0 + backoff_max_delay: "120s" + backoff_base_delay: "1s" + backoff_multiplier: 1.6 + backoff_jitter: 0.2 + min_connection_timeout: "20s" + enable_backoff: false + insecure: true + timeout: "" + interceptors: [] + net: + dns: + cache_enabled: true + cache_expiration: 1h + refresh_duration: 30m + dialer: + timeout: "" + keepalive: "15m" + dual_stack_enabled: true + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + insecure_skip_verify: false + key: /path/to/key + socket_option: + ip_recover_destination_addr: false + ip_transparent: false + reuse_addr: true + reuse_port: true + tcp_cork: false + tcp_defer_accept: false + tcp_fast_open: false + tcp_no_delay: false + tcp_quick_ack: false + keepalive: + permit_without_stream: false + time: "" + timeout: 30s + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + insecure_skip_verify: false + key: /path/to/key diff --git a/k8s/index/job/deletion/cronjob.yaml b/k8s/index/job/deletion/cronjob.yaml new file mode 100644 index 0000000000..ad1ff548f8 --- /dev/null +++ b/k8s/index/job/deletion/cronjob.yaml @@ -0,0 +1,146 @@ +# +# Copyright (C) 2019-2024 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +apiVersion: batch/v1 +kind: CronJob +metadata: + name: vald-index-deletion + labels: + app: vald-index-deletion + app.kubernetes.io/name: vald + helm.sh/chart: vald-v1.7.14 + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/instance: release-name + app.kubernetes.io/component: vald-index-deletion + app.kubernetes.io/version: v1.7.14 +spec: + schedule: "* * * * *" + concurrencyPolicy: Forbid + suspend: false + startingDeadlineSeconds: 43200 + jobTemplate: + spec: + ttlSecondsAfterFinished: 86400 + template: + metadata: + labels: + app: vald-index-deletion + app.kubernetes.io/name: vald + helm.sh/chart: vald-v1.7.14 + app.kubernetes.io/managed-by: Helm + app.kubernetes.io/instance: release-name + app.kubernetes.io/component: vald-index-deletion + app.kubernetes.io/version: v1.7.14 + annotations: + pyroscope.io/scrape: "true" + pyroscope.io/application-name: vald-index-deletion + pyroscope.io/profile-cpu-enabled: "true" + pyroscope.io/profile-mem-enabled: "true" + pyroscope.io/port: "6060" + spec: + initContainers: + - name: wait-for-agent + image: busybox:stable + imagePullPolicy: Always + command: + - /bin/sh + - -e + - -c + - | + until [ "$(wget --server-response --spider --quiet http://vald-agent.default.svc.cluster.local:3001/readiness 2>&1 | awk 'NR==1{print $2}')" == "200" ]; do + echo "waiting for agent to be ready..." + sleep 2; + done + - name: wait-for-discoverer + image: busybox:stable + imagePullPolicy: Always + command: + - /bin/sh + - -e + - -c + - | + until [ "$(wget --server-response --spider --quiet http://vald-discoverer.default.svc.cluster.local:3001/readiness 2>&1 | awk 'NR==1{print $2}')" == "200" ]; do + echo "waiting for discoverer to be ready..." + sleep 2; + done + containers: + - name: vald-index-deletion + image: "vdaas/vald-index-deletion:nightly" + imagePullPolicy: Always + volumeMounts: + - name: vald-index-deletion-config + mountPath: /etc/server/ + livenessProbe: + failureThreshold: 2 + httpGet: + path: /liveness + port: liveness + scheme: HTTP + initialDelaySeconds: 5 + periodSeconds: 3 + successThreshold: 1 + timeoutSeconds: 2 + readinessProbe: + failureThreshold: 2 + httpGet: + path: /readiness + port: readiness + scheme: HTTP + initialDelaySeconds: 10 + periodSeconds: 3 + successThreshold: 1 + timeoutSeconds: 2 + startupProbe: + failureThreshold: 30 + httpGet: + path: /liveness + port: liveness + scheme: HTTP + initialDelaySeconds: 5 + periodSeconds: 5 + successThreshold: 1 + timeoutSeconds: 2 + ports: + - name: liveness + protocol: TCP + containerPort: 3000 + - name: readiness + protocol: TCP + containerPort: 3001 + - name: grpc + protocol: TCP + containerPort: 8081 + - name: pprof + protocol: TCP + containerPort: 6060 + env: + - name: MY_NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + - name: MY_POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: MY_POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + restartPolicy: OnFailure + volumes: + - name: vald-index-deletion-config + configMap: + defaultMode: 420 + name: vald-index-deletion-config diff --git a/pkg/index/job/deletion/config/config.go b/pkg/index/job/deletion/config/config.go new file mode 100644 index 0000000000..3528abd4fb --- /dev/null +++ b/pkg/index/job/deletion/config/config.go @@ -0,0 +1,71 @@ +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package config + +import ( + "github.com/vdaas/vald/internal/config" + "github.com/vdaas/vald/internal/errors" +) + +// GlobalConfig is a type alias of config.GlobalConfig representing application base configurations. +type GlobalConfig = config.GlobalConfig + +// Data represents the application configurations. +type Data struct { + // GlobalConfig represents application base configurations. + config.GlobalConfig `json:",inline" yaml:",inline"` + + // Server represent all server configurations + Server *config.Servers `json:"server_config" yaml:"server_config"` + + // Observability represents observability configurations. + Observability *config.Observability `json:"observability" yaml:"observability"` + + // Deletion represents auto indexing service configurations. + Deletion *config.IndexDeleter `json:"deleter" yaml:"deleter"` +} + +// NewConfig load configurations from file path. +func NewConfig(path string) (cfg *Data, err error) { + cfg = new(Data) + + if err = config.Read(path, &cfg); err != nil { + return nil, err + } + + if cfg != nil { + _ = cfg.GlobalConfig.Bind() + } else { + return nil, errors.ErrInvalidConfig + } + + if cfg.Server != nil { + _ = cfg.Server.Bind() + } else { + return nil, errors.ErrInvalidConfig + } + + if cfg.Observability != nil { + _ = cfg.Observability.Bind() + } else { + cfg.Observability = new(config.Observability).Bind() + } + + if cfg.Deletion != nil { + _ = cfg.Deletion.Bind() + } else { + return nil, errors.ErrInvalidConfig + } + return cfg, nil +} diff --git a/pkg/index/job/deletion/service/deleter.go b/pkg/index/job/deletion/service/deleter.go new file mode 100644 index 0000000000..ce7fbfc044 --- /dev/null +++ b/pkg/index/job/deletion/service/deleter.go @@ -0,0 +1,219 @@ +package service + +import ( + "context" + "reflect" + "sync" + + "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/apis/grpc/v1/vald" + "github.com/vdaas/vald/internal/client/v1/client/discoverer" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" + "github.com/vdaas/vald/internal/net/grpc" + "github.com/vdaas/vald/internal/net/grpc/codes" + "github.com/vdaas/vald/internal/net/grpc/status" + "github.com/vdaas/vald/internal/observability/trace" + "github.com/vdaas/vald/internal/strings" +) + +const ( + apiName = "vald/index/job/delete" + grpcMethodName = "vald.v1.Remove/" + vald.RemoveRPCName +) + +// Deleter represents an interface for deleting. +type Deleter interface { + StartClient(ctx context.Context) (<-chan error, error) + Start(ctx context.Context) error +} + +var defaultOpts = []Option{ + WithIndexingConcurrency(1), +} + +type index struct { + client discoverer.Client + targetAddrs []string + targetIndexID string + + concurrency int +} + +// New returns Deleter object if no error occurs. +func New(opts ...Option) (Deleter, error) { + idx := new(index) + for _, opt := range append(defaultOpts, opts...) { + if err := opt(idx); err != nil { + oerr := errors.ErrOptionFailed(err, reflect.ValueOf(opt)) + e := &errors.ErrCriticalOption{} + if errors.As(oerr, &e) { + log.Error(err) + return nil, oerr + } + log.Warn(oerr) + } + } + idx.targetAddrs = delDuplicateAddrs(idx.targetAddrs) + return idx, nil +} + +func delDuplicateAddrs(targetAddrs []string) []string { + addrs := make([]string, 0, len(targetAddrs)) + exist := make(map[string]bool) + + for _, addr := range targetAddrs { + if !exist[addr] { + addrs = append(addrs, addr) + exist[addr] = true + } + } + return addrs +} + +// StartClient starts the gRPC client. +func (idx *index) StartClient(ctx context.Context) (<-chan error, error) { + return idx.client.Start(ctx) +} + +func (idx *index) Start(ctx context.Context) error { + ctx, span := trace.StartSpan(ctx, apiName+"/service/index.Delete") + defer func() { + if span != nil { + span.End() + } + }() + + err := idx.doDeleteIndex(ctx, + func(ctx context.Context, rc vald.RemoveClient, copts ...grpc.CallOption) (*payload.Object_Location, error) { + return rc.Remove(ctx, &payload.Remove_Request{ + Id: &payload.Object_ID{ + Id: idx.targetIndexID, + }, + }, copts...) + }, + ) + if err != nil { + var attrs trace.Attributes + switch { + case errors.Is(err, errors.ErrGRPCClientConnNotFound("*")): + err = status.WrapWithInternal( + vald.RemoveRPCName+" API connection not found", err, + ) + attrs = trace.StatusCodeInternal(err.Error()) + case errors.Is(err, errors.ErrGRPCTargetAddrNotFound): + err = status.WrapWithInternal( + vald.RemoveRPCName+" API connection target address \""+strings.Join(idx.targetAddrs, ",")+"\" not found", err, + ) + attrs = trace.StatusCodeInternal(err.Error()) + default: + var ( + st *status.Status + msg string + ) + st, msg, err = status.ParseError(err, codes.Internal, + "failed to parse "+vald.RemoveRPCName+" gRPC error response", + ) + attrs = trace.FromGRPCStatus(st.Code(), msg) + } + log.Warn(err) + if span != nil { + span.RecordError(err) + span.SetAttributes(attrs...) + span.SetStatus(trace.StatusError, err.Error()) + } + return err + } + return nil +} + +func (idx *index) doDeleteIndex( + ctx context.Context, + fn func(_ context.Context, _ vald.RemoveClient, _ ...grpc.CallOption) (*payload.Object_Location, error), +) (errs error) { + ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, grpcMethodName), apiName+"/service/index.doDeleteIndex") + defer func() { + if span != nil { + span.End() + } + }() + + targetAddrs := idx.client.GetAddrs(ctx) + if len(idx.targetAddrs) != 0 { + // If target addresses is specified, that addresses are used in priority. + for _, addr := range idx.targetAddrs { + log.Infof("connect to target agent (%s)", addr) + if _, err := idx.client.GetClient().Connect(ctx, addr); err != nil { + return err + } + } + targetAddrs = idx.targetAddrs + } + log.Infof("target agent addrs: %v", targetAddrs) + + var emu sync.Mutex + err := idx.client.GetClient().OrderedRangeConcurrent(ctx, targetAddrs, idx.concurrency, + func(ctx context.Context, target string, conn *grpc.ClientConn, copts ...grpc.CallOption) error { + ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "OrderedRangeConcurrent/"+target), vald.RemoveRPCName+"/"+target) + defer func() { + if span != nil { + span.End() + } + }() + _, err := fn(ctx, vald.NewRemoveClient(conn), copts...) + if err != nil { + var attrs trace.Attributes + switch { + case errors.Is(err, context.Canceled): + err = status.WrapWithCanceled( + vald.RemoveRPCName+" API canceled", err, + ) + attrs = trace.StatusCodeCancelled(err.Error()) + case errors.Is(err, context.DeadlineExceeded): + err = status.WrapWithCanceled( + vald.RemoveRPCName+" API deadline exceeded", err, + ) + attrs = trace.StatusCodeDeadlineExceeded(err.Error()) + case errors.Is(err, errors.ErrGRPCClientConnNotFound("*")): + err = status.WrapWithInternal( + vald.RemoveRPCName+" API connection not found", err, + ) + attrs = trace.StatusCodeInternal(err.Error()) + case errors.Is(err, errors.ErrTargetNotFound): + err = status.WrapWithInvalidArgument( + vald.RemoveRPCName+" API target not found", err, + ) + attrs = trace.StatusCodeInternal(err.Error()) + default: + var ( + st *status.Status + msg string + ) + st, msg, err = status.ParseError(err, codes.Internal, + "failed to parse "+vald.RemoveRPCName+" gRPC error response", + ) + if st != nil && err != nil && st.Code() == codes.FailedPrecondition { + log.Warnf("DeleteIndex of %s skipped, indexID:%s, message: %s, err: %v", target, idx.targetIndexID, st.Message(), errors.Join(st.Err(), err)) + return nil + } + if st != nil && err != nil && st.Code() == codes.NotFound { + log.Warnf("DeleteIndex of %s skipped, indexID: %s, message: %s, err: %v", target, idx.targetIndexID, st.Message(), errors.Join(st.Err(), err)) + return nil + } + attrs = trace.FromGRPCStatus(st.Code(), msg) + } + log.Warnf("an error occurred in (%s) deleting index(%s): %v", target, idx.targetIndexID, err) + if span != nil { + span.RecordError(err) + span.SetAttributes(attrs...) + span.SetStatus(trace.StatusError, err.Error()) + } + emu.Lock() + errs = errors.Join(errs, err) + emu.Unlock() + } + return err + }, + ) + return errors.Join(err, errs) +} diff --git a/pkg/index/job/deletion/service/options.go b/pkg/index/job/deletion/service/options.go new file mode 100644 index 0000000000..ccc6ecffed --- /dev/null +++ b/pkg/index/job/deletion/service/options.go @@ -0,0 +1,48 @@ +package service + +import ( + "github.com/vdaas/vald/internal/client/v1/client/discoverer" + "github.com/vdaas/vald/internal/errors" +) + +type Option func(_ *index) error + +// WithDiscoverer returns Option that sets discoverer client. +func WithDiscoverer(client discoverer.Client) Option { + return func(idx *index) error { + if client == nil { + return errors.NewErrCriticalOption("discoverer", client) + } + idx.client = client + return nil + } +} + +// WithIndexingConcurrency returns Option that sets indexing concurrency. +func WithIndexingConcurrency(num int) Option { + return func(idx *index) error { + if num <= 0 { + return errors.NewErrInvalidOption("indexingConcurrency", num) + } + idx.concurrency = num + return nil + } +} + +// WithTargetAddrs returns Option that sets indexing target addresses. +func WithTargetAddrs(addrs ...string) Option { + return func(idx *index) error { + if len(addrs) != 0 { + idx.targetAddrs = append(idx.targetAddrs, addrs...) + } + return nil + } +} + +// WithTargetIndexID returns Option that sets target deleting index ID. +func WithTargetIndexID(indexID string) Option { + return func(idx *index) error { + idx.targetIndexID = indexID + return nil + } +} diff --git a/pkg/index/job/deletion/usecase/deletion.go b/pkg/index/job/deletion/usecase/deletion.go new file mode 100644 index 0000000000..84302c1067 --- /dev/null +++ b/pkg/index/job/deletion/usecase/deletion.go @@ -0,0 +1,214 @@ +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package usecase + +import ( + "context" + "os" + "syscall" + + "github.com/vdaas/vald/internal/client/v1/client/discoverer" + iconfig "github.com/vdaas/vald/internal/config" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" + "github.com/vdaas/vald/internal/net/grpc" + "github.com/vdaas/vald/internal/net/grpc/interceptor/server/recover" + "github.com/vdaas/vald/internal/observability" + "github.com/vdaas/vald/internal/runner" + "github.com/vdaas/vald/internal/safety" + "github.com/vdaas/vald/internal/servers/server" + "github.com/vdaas/vald/internal/servers/starter" + "github.com/vdaas/vald/internal/sync/errgroup" + "github.com/vdaas/vald/pkg/index/job/deletion/config" + "github.com/vdaas/vald/pkg/index/job/deletion/service" +) + +type run struct { + eg errgroup.Group + cfg *config.Data + observability observability.Observability + server starter.Server + indexer service.Deleter +} + +// New returns Runner instance. +func New(cfg *config.Data) (_ runner.Runner, err error) { + eg := errgroup.Get() + + dOpts, err := cfg.Deletion.Discoverer.Client.Opts() + if err != nil { + return nil, err + } + // skipcq: CRT-D0001 + dOpts = append(dOpts, grpc.WithErrGroup(eg)) + + acOpts, err := cfg.Deletion.Discoverer.AgentClientOptions.Opts() + if err != nil { + return nil, err + } + // skipcq: CRT-D0001 + acOpts = append(acOpts, grpc.WithErrGroup(eg)) + + discoverer, err := discoverer.New( + discoverer.WithAutoConnect(true), + discoverer.WithName(cfg.Deletion.AgentName), + discoverer.WithNamespace(cfg.Deletion.AgentNamespace), + discoverer.WithPort(cfg.Deletion.AgentPort), + discoverer.WithServiceDNSARecord(cfg.Deletion.AgentDNS), + discoverer.WithDiscovererClient(grpc.New(dOpts...)), + discoverer.WithDiscoverDuration(cfg.Deletion.Discoverer.Duration), + discoverer.WithOptions(acOpts...), + discoverer.WithNodeName(cfg.Deletion.NodeName), + discoverer.WithOnDiscoverFunc(func(ctx context.Context, c discoverer.Client, addrs []string) error { + last := len(addrs) - 1 + for i := 0; i < len(addrs)/2; i++ { + addrs[i], addrs[last-i] = addrs[last-i], addrs[i] + } + return nil + }), + ) + if err != nil { + return nil, err + } + + indexer, err := service.New( + service.WithDiscoverer(discoverer), + service.WithIndexingConcurrency(cfg.Deletion.Concurrency), + service.WithTargetAddrs(cfg.Deletion.TargetAddrs...), + service.WithTargetIndexID(cfg.Deletion.IndexID), + ) + if err != nil { + return nil, err + } + + srv, err := starter.New( + starter.WithConfig(cfg.Server), + starter.WithGRPC(func(cfg *iconfig.Server) []server.Option { + return []server.Option{ + server.WithGRPCOption( + grpc.ChainUnaryInterceptor(recover.RecoverInterceptor()), + grpc.ChainStreamInterceptor(recover.RecoverStreamInterceptor()), + ), + } + }), + ) + if err != nil { + return nil, err + } + + var obs observability.Observability + if cfg.Observability.Enabled { + obs, err = observability.NewWithConfig( + cfg.Observability, + ) + if err != nil { + return nil, err + } + } + + return &run{ + eg: eg, + cfg: cfg, + observability: obs, + server: srv, + indexer: indexer, + }, nil +} + +// PreStart is a method called before execution of Start, and it invokes the PreStart method of observability. +func (r *run) PreStart(ctx context.Context) error { + if r.observability != nil { + return r.observability.PreStart(ctx) + } + return nil +} + +// Start is a method used to initiate an operation in the run, and it returns a channel for receiving errors +// during the operation and an error representing any initialization errors. +func (r *run) Start(ctx context.Context) (<-chan error, error) { + ech := make(chan error, 4) + var sech, oech <-chan error + if r.observability != nil { + oech = r.observability.Start(ctx) + } + sech = r.server.ListenAndServe(ctx) + cech, err := r.indexer.StartClient(ctx) + if err != nil { + close(ech) + return nil, err + } + + r.eg.Go(safety.RecoverFunc(func() (err error) { + defer func() { + p, err := os.FindProcess(os.Getpid()) + if err != nil { + // using Fatal to avoid this process to be zombie + // skipcq: RVV-A0003 + log.Fatalf("failed to find my pid to kill %v", err) + return + } + log.Info("sending SIGTERM to myself to stop this job") + if err := p.Signal(syscall.SIGTERM); err != nil { + log.Error(err) + } + }() + return r.indexer.Start(ctx) + })) + + r.eg.Go(safety.RecoverFunc(func() (err error) { + defer close(ech) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err = <-oech: + case err = <-sech: + case err = <-cech: + } + if err != nil { + select { + case <-ctx.Done(): + return errors.Join(ctx.Err(), err) + case ech <- err: + } + } + } + })) + return ech, nil +} + +// PreStop is a method called before execution of Stop. +func (*run) PreStop(_ context.Context) error { + return nil +} + +// Stop is a method used to stop an operation in the run. +func (r *run) Stop(ctx context.Context) (errs error) { + if r.observability != nil { + if err := r.observability.Stop(ctx); err != nil { + errs = errors.Join(errs, err) + } + } + if r.server != nil { + if err := r.server.Shutdown(ctx); err != nil { + errs = errors.Join(errs, err) + } + } + return errs +} + +// PostStop is a method called after execution of Stop. +func (*run) PostStop(_ context.Context) error { + return nil +}