Skip to content

Commit

Permalink
[WIP] feat: Implement vald index-importer
Browse files Browse the repository at this point in the history
  • Loading branch information
highpon committed Jan 7, 2025
1 parent 64e70c0 commit f335516
Show file tree
Hide file tree
Showing 8 changed files with 804 additions and 8 deletions.
38 changes: 36 additions & 2 deletions cmd/index/job/importation/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,17 @@
// limitations under the License.
package main

import "fmt"
import (
"context"
"log"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/info"
"github.com/vdaas/vald/internal/runner"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/pkg/index/job/importation/config"
"github.com/vdaas/vald/pkg/index/job/importation/usecase"
)

const (
maxVersion = "v0.0.10"
Expand All @@ -22,5 +32,29 @@ const (
)

func main() {
fmt.Println("hello world")
if err := safety.RecoverFunc(func() error {
return runner.Do(
context.Background(),
runner.WithName(name),
runner.WithVersion(info.Version, maxVersion, minVersion),
runner.WithConfigLoader(func(path string) (any, *config.GlobalConfig, error) {
// cfg, err := config.NewConfig(path)
cfg, err := config.NewConfig("cmd/index/job/importation/sample.yaml")
if err != nil {
return nil, nil, errors.Wrap(err, "failed to load "+name+"'s configuration")
}
return cfg, &cfg.GlobalConfig, nil
}),
runner.WithDaemonInitializer(func(cfg any) (runner.Runner, error) {
c, ok := cfg.(*config.Data)
if !ok {
return nil, errors.ErrInvalidConfig
}
return usecase.New(c)
}),
)
})(); err != nil {
log.Fatal(err, info.Get())
return
}
}
191 changes: 191 additions & 0 deletions cmd/index/job/importation/sample.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
#
# Copyright (C) 2019-2024 vdaas.org vald team <vald@vdaas.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
version: v0.0.0
time_zone: JST
logging:
format: raw
level: info
logger: glg
server_config:
servers:
- name: grpc
host: 0.0.0.0
port: 8081
grpc:
bidirectional_stream_concurrency: 20
connection_timeout: ""
header_table_size: 0
initial_conn_window_size: 0
initial_window_size: 0
interceptors: []
keepalive:
max_conn_age: ""
max_conn_age_grace: ""
max_conn_idle: ""
time: ""
timeout: ""
max_header_list_size: 0
max_receive_message_size: 0
max_send_message_size: 0
read_buffer_size: 0
write_buffer_size: 0
mode: GRPC
probe_wait_time: 3s
restart: true
health_check_servers:
- name: readiness
host: 0.0.0.0
port: 3001
http:
handler_timeout: ""
idle_timeout: ""
read_header_timeout: ""
read_timeout: ""
shutdown_duration: 0s
write_timeout: ""
mode: ""
probe_wait_time: 3s
metrics_servers:
startup_strategy:
- grpc
- readiness
full_shutdown_duration: 600s
tls:
ca: /path/to/ca
cert: /path/to/cert
enabled: false
key: /path/to/key
importer:
concurrency: 1
index_path: "/var/export/index/1733964783.db"
gateway:
addrs:
- localhost:20000
health_check_duration: "1s"
connection_pool:
enable_dns_resolver: true
enable_rebalance: true
old_conn_close_duration: 2m
rebalance_duration: 30m
size: 3
backoff:
backoff_factor: 1.1
backoff_time_limit: 5s
enable_error_log: true
initial_duration: 5ms
jitter_limit: 100ms
maximum_duration: 5s
retry_count: 100
circuit_breaker:
closed_error_rate: 0.7
closed_refresh_timeout: 10s
half_open_error_rate: 0.5
min_samples: 1000
open_timeout: 1s
call_option:
content_subtype: ""
max_recv_msg_size: 0
max_retry_rpc_buffer_size: 0
max_send_msg_size: 0
wait_for_ready: true
dial_option:
authority: ""
backoff_base_delay: 1s
backoff_jitter: 0.2
backoff_max_delay: 120s
backoff_multiplier: 1.6
disable_retry: false
enable_backoff: false
idle_timeout: 1h
initial_connection_window_size: 2097152
initial_window_size: 1048576
insecure: true
interceptors: []
keepalive:
permit_without_stream: false
time: ""
timeout: 30s
max_call_attempts: 0
max_header_list_size: 0
max_msg_size: 0
min_connection_timeout: 20s
net:
dialer:
dual_stack_enabled: true
keepalive: ""
timeout: ""
dns:
cache_enabled: true
cache_expiration: 1h
refresh_duration: 30m
socket_option:
ip_recover_destination_addr: false
ip_transparent: false
reuse_addr: true
reuse_port: true
tcp_cork: false
tcp_defer_accept: false
tcp_fast_open: false
tcp_no_delay: false
tcp_quick_ack: false
tls:
ca: /path/to/ca
cert: /path/to/cert
enabled: false
insecure_skip_verify: false
key: /path/to/key
read_buffer_size: 0
shared_write_buffer: false
timeout: ""
user_agent: Vald-gRPC
write_buffer_size: 0
tls:
ca: /path/to/ca
cert: /path/to/cert
enabled: false
insecure_skip_verify: false
key: /path/to/key
observability:
enabled: false
otlp:
collector_endpoint: "otel-collector.monitoring.svc.cluster.local:4317"
trace_batch_timeout: "1s"
trace_export_timeout: "1m"
trace_max_export_batch_size: 1024
trace_max_queue_size: 256
metrics_export_interval: "1s"
metrics_export_timeout: "1m"
attribute:
namespace: "_MY_POD_NAMESPACE_"
pod_name: "_MY_POD_NAME_"
node_name: "_MY_NODE_NAME_"
service_name: "vald-index-deletion"
metrics:
enable_cgo: true
enable_goroutine: true
enable_memory: true
enable_version_info: true
version_info_labels:
- vald_version
- server_name
- git_commit
- build_time
- go_version
- go_os
- go_arch
- algorithm_info
trace:
enabled: true
35 changes: 35 additions & 0 deletions internal/config/index_importer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (C) 2019-2024 vdaas.org vald team <vald@vdaas.org>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package config

// IndexImporter represents the configurations for index importation.
type IndexImporter struct {
// Concurrency represents indexing concurrency.
Concurrency int `json:"concurrency" yaml:"concurrency"`

// IndexPath represents the export index file path
IndexPath string `json:"index_path,omitempty" yaml:"index_path"`

// Gateway represent gateway service configuration
Gateway *GRPCClient `json:"gateway" yaml:"gateway"`
}

func (e *IndexImporter) Bind() *IndexImporter {
e.IndexPath = GetActualValue(e.IndexPath)

if e.Gateway != nil {
e.Gateway = e.Gateway.Bind()
}
return e
}
12 changes: 6 additions & 6 deletions internal/net/grpc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type (
// It receives messages from the stream, calls the function with the received message, and sends the returned message to the stream.
// It limits the number of concurrent calls to the function with the concurrency integer.
// It records errors and returns them as a single error.
func BidirectionalStream[Q any, R any](
func BidirectionalStream[Q, R any](
ctx context.Context,
stream ServerStream,
concurrency int,
Expand Down Expand Up @@ -167,8 +167,8 @@ func BidirectionalStream[Q any, R any](
}

// BidirectionalStreamClient is gRPC client stream.
func BidirectionalStreamClient(
stream ClientStream, dataProvider, newData func() any, f func(any, error),
func BidirectionalStreamClient[S, R any](
stream ClientStream, sendDataProvider func() *S, callBack func(*R, error),
) (err error) {
if stream == nil {
return errors.ErrGRPCClientStreamNotFound
Expand All @@ -183,13 +183,13 @@ func BidirectionalStreamClient(
case <-ctx.Done():
return ctx.Err()
default:
res := newData()
res := new(R)
err = stream.RecvMsg(res)
if err == io.EOF || errors.Is(err, io.EOF) {
cancel()
return nil
}
f(res, err)
callBack(res, err)
}
}
}))
Expand All @@ -208,7 +208,7 @@ func BidirectionalStreamClient(
case <-ctx.Done():
return eg.Wait()
default:
data := dataProvider()
data := sendDataProvider()
if data == nil {
err = stream.CloseSend()
cancel()
Expand Down
66 changes: 66 additions & 0 deletions pkg/index/job/importation/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (C) 2019-2024 vdaas.org vald team <vald@vdaas.org>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package config

import (
"github.com/vdaas/vald/internal/config"
"github.com/vdaas/vald/internal/errors"
)

// GlobalConfig is a type alias of config.GlobalConfig representing application base configurations.
type GlobalConfig = config.GlobalConfig

// Data represents the application configurations.
type Data struct {
// GlobalConfig represents application base configurations.
config.GlobalConfig `json:",inline" yaml:",inline"`

// Server represent all server configurations
Server *config.Servers `json:"server_config" yaml:"server_config"`

// Observability represents observability configurations.
Observability *config.Observability `json:"observability" yaml:"observability"`

// Importer represents auto indexing service configurations.
Importer *config.IndexImporter `json:"importer" yaml:"importer"`
}

// NewConfig load configurations from file path.
func NewConfig(path string) (cfg *Data, err error) {
cfg = new(Data)

if err = config.Read(path, &cfg); err != nil {
return nil, err
}

if cfg != nil {
cfg.Bind()
} else {
return nil, errors.ErrInvalidConfig
}

if cfg.Observability != nil {
_ = cfg.Observability.Bind()
} else {
cfg.Observability = new(config.Observability).Bind()
}

if cfg.Importer != nil {
cfg.Importer = cfg.Importer.Bind()
} else {
return nil, errors.ErrInvalidConfig
}

return cfg, nil
}
Loading

0 comments on commit f335516

Please sign in to comment.