Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .github/workflows/unit-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,20 @@ jobs:
- name: docker compose down
run: |
docker compose down
- name: docker compose up
if: ${{ matrix.enable_new_paths_format }}
run: |
docker compose up -d
- name: run new_paths_format tests
if: ${{ matrix.enable_new_paths_format }}
run: |
while [ "$(docker inspect -f {{.State.Health.Status}} local-ydbcp)" != "healthy" ]; do
echo "Waiting for container to become healthy..."
sleep 1
done
echo "Starting new_paths_format tests!"
docker exec local-ydbcp sh -c './test_new_paths_format'
- name: docker compose down
if: ${{ matrix.enable_new_paths_format }}
run: |
docker compose down
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ RUN go build -o . ./cmd/ydbcp/main.go
RUN go build -o ./make_backup ./cmd/integration/make_backup/main.go
RUN go build -o ./list_entities ./cmd/integration/list_entities/main.go
RUN go build -o ./orm ./cmd/integration/orm/main.go
RUN go build -o ./test_new_paths_format ./cmd/integration/new_paths_format/main.go

# Command to run the executable
CMD ["./main", "--config=local_config.yaml"]
Expand Down
203 changes: 203 additions & 0 deletions cmd/integration/new_paths_format/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package main

import (
"context"
"log"
"time"
"ydbcp/cmd/integration/common"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"

"google.golang.org/grpc"
)

const (
containerID = "abcde"
databaseName = "/local"
ydbcpEndpoint = "0.0.0.0:50051"
databaseEndpoint = "grpcs://local-ydb:2135"
)

type backupScenario struct {
name string
request *pb.MakeBackupRequest
expectedRootPath string
expectedSourcePaths []string
}

type negativeBackupScenario struct {
name string
request *pb.MakeBackupRequest
}

func newMakeBackupRequest(rootPath string, sourcePaths []string) *pb.MakeBackupRequest {
return &pb.MakeBackupRequest{
ContainerId: containerID,
DatabaseName: databaseName,
DatabaseEndpoint: databaseEndpoint,
RootPath: rootPath,
SourcePaths: sourcePaths,
}
}

func runBackupScenario(ctx context.Context, backupClient pb.BackupServiceClient, opClient pb.OperationServiceClient, scenario backupScenario) {
tbwr, err := backupClient.MakeBackup(ctx, scenario.request)
if err != nil {
log.Panicf("scenario %s: failed to make backup: %v", scenario.name, err)
}
op, err := opClient.GetOperation(
ctx, &pb.GetOperationRequest{
Id: tbwr.Id,
},
)
if err != nil {
log.Panicf("scenario %s: failed to get operation: %v", scenario.name, err)
}

if op.GetRootPath() != scenario.expectedRootPath {
log.Panicf("scenario %s: expected root path %q, got %q", scenario.name, scenario.expectedRootPath, op.GetRootPath())
}

if !equalStringSlices(op.GetSourcePaths(), scenario.expectedSourcePaths) {
log.Panicf("scenario %s: expected source paths %v, got %v", scenario.name, scenario.expectedSourcePaths, op.GetSourcePaths())
}

log.Printf("scenario %s: passed", scenario.name)
}

func runNegativeBackupScenario(ctx context.Context, backupClient pb.BackupServiceClient, opClient pb.OperationServiceClient, scenario negativeBackupScenario) {
// MakeBackup should succeed and return an operation
tbwr, err := backupClient.MakeBackup(ctx, scenario.request)
if err != nil {
log.Panicf("scenario %s: MakeBackup should succeed but got error: %v", scenario.name, err)
}

operationID := tbwr.Id
log.Printf("scenario %s: operation created with ID %s, waiting for it to fail...", scenario.name, operationID)

// Wait for the operation to process and fail
maxWait := 10 * time.Second
deadline := time.Now().Add(maxWait)
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

var op *pb.Operation
for time.Now().Before(deadline) {
<-ticker.C
op, err = opClient.GetOperation(ctx, &pb.GetOperationRequest{Id: operationID})
if err != nil {
log.Panicf("scenario %s: failed to get operation: %v", scenario.name, err)
}

// Check if operation is in ERROR state
if op.GetStatus() == pb.Operation_ERROR {

log.Printf("scenario %s: passed", scenario.name)
return
}

// If operation completed successfully, that's unexpected
if op.GetStatus() == pb.Operation_DONE {
log.Panicf("scenario %s: operation completed successfully but expected ERROR status", scenario.name)
}
}

// Timeout - operation didn't reach ERROR state
log.Panicf("scenario %s: timeout waiting for operation to reach ERROR status. Current status: %v, message: %q", scenario.name, op.GetStatus(), op.GetMessage())
}

func equalStringSlices(a, b []string) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}

func main() {
conn := common.CreateGRPCClient(ydbcpEndpoint)
defer func(conn *grpc.ClientConn) {
err := conn.Close()
if err != nil {
log.Panicln("failed to close connection")
}
}(conn)
backupClient := pb.NewBackupServiceClient(conn)
opClient := pb.NewOperationServiceClient(conn)

ctx := context.Background()

scenarios := []backupScenario{
{
name: "full backup",
request: newMakeBackupRequest("", nil),
expectedRootPath: "",
expectedSourcePaths: []string{},
},
{
name: "full backup with specified root path",
request: newMakeBackupRequest("stocks", nil),
expectedRootPath: "stocks",
expectedSourcePaths: []string{},
},
{
name: "partial backup",
request: newMakeBackupRequest("", []string{"kv_test"}),
expectedRootPath: "",
expectedSourcePaths: []string{"kv_test"},
},
{
name: "partial backup with specified root path",
request: newMakeBackupRequest("stocks", []string{"orders", "orderLines"}),
expectedRootPath: "stocks",
expectedSourcePaths: []string{"orders", "orderLines"},
},
}

for _, scenario := range scenarios {
runBackupScenario(ctx, backupClient, opClient, scenario)
time.Sleep(2 * time.Second)
}

negativeScenarios := []negativeBackupScenario{
{
name: "non-existing root path",
request: newMakeBackupRequest("non_existing_path", nil),
},
{
name: "non-existing source path",
request: newMakeBackupRequest("", []string{"non_existing_table"}),
},
{
name: "non-existing source paths with root path",
request: newMakeBackupRequest("stocks", []string{"non_existing_table1", "non_existing_table2"}),
},
{
name: "mixed existing and non-existing source paths",
request: newMakeBackupRequest("", []string{"kv_test", "non_existing_table"}),
},
{
name: "absolute root path",
request: newMakeBackupRequest("/local/stocks", nil),
},
{
name: "absolute sorce path",
request: newMakeBackupRequest("", []string{"/local/stocks/orders"}),
},
{
name: "absolute source path with root path",
request: newMakeBackupRequest("stocks", []string{"/local/stocks/orders"}),
},
{
name: "source path relative to db root with root path",
request: newMakeBackupRequest("stocks", []string{"stocks/orders"}),
},
}

for _, scenario := range negativeScenarios {
runNegativeBackupScenario(ctx, backupClient, opClient, scenario)
}
}
11 changes: 7 additions & 4 deletions cmd/integration/orm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ package main

import (
"context"
table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
"log"
"reflect"
"time"
Expand All @@ -16,6 +12,11 @@ import (
"ydbcp/internal/metrics"
"ydbcp/internal/types"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"

table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

const (
Expand Down Expand Up @@ -44,6 +45,7 @@ func OperationsToInsert() []types.TakeBackupWithRetryOperation {
Endpoint: ydbcpEndpoint,
DatabaseName: databaseName,
},
RootPath: "root",
SourcePaths: []string{"path"},
SourcePathsToExclude: []string{"exclude"},
Audit: &pb.AuditInfo{
Expand All @@ -68,6 +70,7 @@ func OperationsToInsert() []types.TakeBackupWithRetryOperation {
Endpoint: ydbcpEndpoint,
DatabaseName: databaseName,
},
RootPath: "root",
SourcePaths: []string{"path"},
SourcePathsToExclude: []string{"exclude"},
Audit: &pb.AuditInfo{
Expand Down
14 changes: 13 additions & 1 deletion init_db/create_tables.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,16 @@

# create and fill user table kv_test
./ydb -e ${YDB_ENDPOINT} -d /local workload kv init
./ydb -e ${YDB_ENDPOINT} -d /local workload kv run upsert --rows 100
./ydb -e ${YDB_ENDPOINT} -d /local workload kv run upsert --rows 100

# create and fill user tables: stock, orders, orderLines
./ydb -e ${YDB_ENDPOINT} -d /local workload stock init -p 10 -q 10 -o 10

# create directory for user tables
./ydb -e ${YDB_ENDPOINT} -d /local scheme mkdir stocks

# move user tables (stock, orders, orderLines) to separate directory
./ydb -e ${YDB_ENDPOINT} -d /local tools rename \
--item source=/local/stock,destination=/local/stocks/stock \
--item source=/local/orders,destination=/local/stocks/orders \
--item source=/local/orderLines,destination=/local/stocks/orderLines
12 changes: 11 additions & 1 deletion internal/backup_operations/make_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type MakeBackupInternalRequest struct {
ContainerID string
DatabaseEndpoint string
DatabaseName string
RootPath string
SourcePaths []string
SourcePathsToExclude []string
ScheduleID *string
Expand All @@ -42,6 +43,7 @@ func FromBackupSchedule(schedule *types.BackupSchedule) MakeBackupInternalReques
ContainerID: schedule.ContainerID,
DatabaseEndpoint: schedule.DatabaseEndpoint,
DatabaseName: schedule.DatabaseName,
RootPath: schedule.RootPath,
SourcePaths: schedule.SourcePaths,
SourcePathsToExclude: schedule.SourcePathsToExclude,
ScheduleID: &schedule.ID,
Expand All @@ -57,6 +59,7 @@ func FromTBWROperation(tbwr *types.TakeBackupWithRetryOperation) MakeBackupInter
ContainerID: tbwr.ContainerID,
DatabaseEndpoint: tbwr.YdbConnectionParams.Endpoint,
DatabaseName: tbwr.YdbConnectionParams.DatabaseName,
RootPath: tbwr.RootPath,
SourcePaths: tbwr.SourcePaths,
SourcePathsToExclude: tbwr.SourcePathsToExclude,
ScheduleID: tbwr.ScheduleID,
Expand Down Expand Up @@ -141,9 +144,14 @@ func ValidateSourcePaths(
if req.ScheduleID != nil {
ctx = xlog.With(ctx, zap.String("ScheduleID", *req.ScheduleID))
}
basePath, ok := SafePathJoin(req.DatabaseName, req.RootPath)
if !ok {
xlog.Error(ctx, "incorrect root path", zap.String("path", req.RootPath))
return nil, status.Errorf(codes.InvalidArgument, "incorrect root path %s", req.RootPath)
}
sourcePaths := make([]string, 0, len(req.SourcePaths))
for _, p := range req.SourcePaths {
fullPath, ok := SafePathJoin(req.DatabaseName, p)
fullPath, ok := SafePathJoin(basePath, p)
if !ok {
xlog.Error(ctx, "incorrect source path", zap.String("path", p))
return nil, status.Errorf(codes.InvalidArgument, "incorrect source path %s", p)
Expand Down Expand Up @@ -345,6 +353,7 @@ func MakeBackup(
SecretKey: secretKey,
Description: "ydbcp backup", // TODO: the description shoud be better
NumberOfRetries: 10, // TODO: get it from configuration
RootPath: req.RootPath,
SourcePaths: pathsForExport,
DestinationPrefix: destinationPrefix,
S3ForcePathStyle: s3.S3ForcePathStyle,
Expand Down Expand Up @@ -393,6 +402,7 @@ func MakeBackup(
Endpoint: req.DatabaseEndpoint,
DatabaseName: req.DatabaseName,
},
RootPath: req.RootPath,
SourcePaths: req.SourcePaths,
SourcePathsToExclude: req.SourcePathsToExclude,
Audit: &pb.AuditInfo{
Expand Down
2 changes: 1 addition & 1 deletion internal/connectors/client/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (d *ClientYdbConnector) ExportToS3(
}

if featureFlags.EnableNewPathsFormat {
exportRequest.Settings.SourcePath = clientDb.Name()
exportRequest.Settings.SourcePath = path.Join(clientDb.Name(), s3Settings.RootPath)
exportRequest.Settings.DestinationPrefix = s3Settings.DestinationPrefix
}

Expand Down
Loading
Loading