Skip to content

Commit 51f9358

Browse files
committed
feat(make_backup): support root_path parameter for MakeBackup
1 parent 90dad61 commit 51f9358

File tree

23 files changed

+526
-187
lines changed

23 files changed

+526
-187
lines changed

.github/workflows/unit-test.yml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,3 +93,20 @@ jobs:
9393
- name: docker compose down
9494
run: |
9595
docker compose down
96+
- name: docker compose up
97+
if: ${{ matrix.enable_new_paths_format }}
98+
run: |
99+
docker compose up -d
100+
- name: run new_paths_format tests
101+
if: ${{ matrix.enable_new_paths_format }}
102+
run: |
103+
while [ "$(docker inspect -f {{.State.Health.Status}} local-ydbcp)" != "healthy" ]; do
104+
echo "Waiting for container to become healthy..."
105+
sleep 1
106+
done
107+
echo "Starting new_paths_format tests!"
108+
docker exec local-ydbcp sh -c './test_new_paths_format'
109+
- name: docker compose down
110+
if: ${{ matrix.enable_new_paths_format }}
111+
run: |
112+
docker compose down

Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ RUN go build -o . ./cmd/ydbcp/main.go
2020
RUN go build -o ./make_backup ./cmd/integration/make_backup/main.go
2121
RUN go build -o ./list_entities ./cmd/integration/list_entities/main.go
2222
RUN go build -o ./orm ./cmd/integration/orm/main.go
23+
RUN go build -o ./test_new_paths_format ./cmd/integration/new_paths_format/main.go
2324

2425
# Command to run the executable
2526
CMD ["./main", "--config=local_config.yaml"]
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"log"
6+
"time"
7+
"ydbcp/cmd/integration/common"
8+
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
9+
10+
"google.golang.org/grpc"
11+
)
12+
13+
const (
14+
containerID = "abcde"
15+
databaseName = "/local"
16+
ydbcpEndpoint = "0.0.0.0:50051"
17+
databaseEndpoint = "grpcs://local-ydb:2135"
18+
)
19+
20+
type backupScenario struct {
21+
name string
22+
request *pb.MakeBackupRequest
23+
expectedRootPath string
24+
expectedSourcePaths []string
25+
}
26+
27+
type negativeBackupScenario struct {
28+
name string
29+
request *pb.MakeBackupRequest
30+
}
31+
32+
func newMakeBackupRequest(rootPath string, sourcePaths []string) *pb.MakeBackupRequest {
33+
return &pb.MakeBackupRequest{
34+
ContainerId: containerID,
35+
DatabaseName: databaseName,
36+
DatabaseEndpoint: databaseEndpoint,
37+
RootPath: rootPath,
38+
SourcePaths: sourcePaths,
39+
}
40+
}
41+
42+
func runBackupScenario(ctx context.Context, backupClient pb.BackupServiceClient, opClient pb.OperationServiceClient, scenario backupScenario) {
43+
tbwr, err := backupClient.MakeBackup(ctx, scenario.request)
44+
if err != nil {
45+
log.Panicf("scenario %s: failed to make backup: %v", scenario.name, err)
46+
}
47+
op, err := opClient.GetOperation(
48+
ctx, &pb.GetOperationRequest{
49+
Id: tbwr.Id,
50+
},
51+
)
52+
if err != nil {
53+
log.Panicf("scenario %s: failed to get operation: %v", scenario.name, err)
54+
}
55+
56+
if op.GetRootPath() != scenario.expectedRootPath {
57+
log.Panicf("scenario %s: expected root path %q, got %q", scenario.name, scenario.expectedRootPath, op.GetRootPath())
58+
}
59+
60+
if !equalStringSlices(op.GetSourcePaths(), scenario.expectedSourcePaths) {
61+
log.Panicf("scenario %s: expected source paths %v, got %v", scenario.name, scenario.expectedSourcePaths, op.GetSourcePaths())
62+
}
63+
64+
log.Printf("scenario %s: passed", scenario.name)
65+
}
66+
67+
func runNegativeBackupScenario(ctx context.Context, backupClient pb.BackupServiceClient, opClient pb.OperationServiceClient, scenario negativeBackupScenario) {
68+
// MakeBackup should succeed and return an operation
69+
tbwr, err := backupClient.MakeBackup(ctx, scenario.request)
70+
if err != nil {
71+
log.Panicf("scenario %s: MakeBackup should succeed but got error: %v", scenario.name, err)
72+
}
73+
74+
operationID := tbwr.Id
75+
log.Printf("scenario %s: operation created with ID %s, waiting for it to fail...", scenario.name, operationID)
76+
77+
// Wait for the operation to process and fail
78+
maxWait := 10 * time.Second
79+
deadline := time.Now().Add(maxWait)
80+
ticker := time.NewTicker(1 * time.Second)
81+
defer ticker.Stop()
82+
83+
var op *pb.Operation
84+
for time.Now().Before(deadline) {
85+
<-ticker.C
86+
op, err = opClient.GetOperation(ctx, &pb.GetOperationRequest{Id: operationID})
87+
if err != nil {
88+
log.Panicf("scenario %s: failed to get operation: %v", scenario.name, err)
89+
}
90+
91+
// Check if operation is in ERROR state
92+
if op.GetStatus() == pb.Operation_ERROR {
93+
94+
log.Printf("scenario %s: passed", scenario.name)
95+
return
96+
}
97+
98+
// If operation completed successfully, that's unexpected
99+
if op.GetStatus() == pb.Operation_DONE {
100+
log.Panicf("scenario %s: operation completed successfully but expected ERROR status", scenario.name)
101+
}
102+
}
103+
104+
// Timeout - operation didn't reach ERROR state
105+
log.Panicf("scenario %s: timeout waiting for operation to reach ERROR status. Current status: %v, message: %q", scenario.name, op.GetStatus(), op.GetMessage())
106+
}
107+
108+
func equalStringSlices(a, b []string) bool {
109+
if len(a) != len(b) {
110+
return false
111+
}
112+
for i := range a {
113+
if a[i] != b[i] {
114+
return false
115+
}
116+
}
117+
return true
118+
}
119+
120+
func main() {
121+
conn := common.CreateGRPCClient(ydbcpEndpoint)
122+
defer func(conn *grpc.ClientConn) {
123+
err := conn.Close()
124+
if err != nil {
125+
log.Panicln("failed to close connection")
126+
}
127+
}(conn)
128+
backupClient := pb.NewBackupServiceClient(conn)
129+
opClient := pb.NewOperationServiceClient(conn)
130+
131+
ctx := context.Background()
132+
133+
scenarios := []backupScenario{
134+
{
135+
name: "full backup",
136+
request: newMakeBackupRequest("", nil),
137+
expectedRootPath: "",
138+
expectedSourcePaths: []string{},
139+
},
140+
{
141+
name: "full backup with specified root path",
142+
request: newMakeBackupRequest("stocks", nil),
143+
expectedRootPath: "stocks",
144+
expectedSourcePaths: []string{},
145+
},
146+
{
147+
name: "partial backup",
148+
request: newMakeBackupRequest("", []string{"kv_test"}),
149+
expectedRootPath: "",
150+
expectedSourcePaths: []string{"kv_test"},
151+
},
152+
{
153+
name: "partial backup with specified root path",
154+
request: newMakeBackupRequest("stocks", []string{"orders", "orderLines"}),
155+
expectedRootPath: "stocks",
156+
expectedSourcePaths: []string{"orders", "orderLines"},
157+
},
158+
}
159+
160+
for _, scenario := range scenarios {
161+
runBackupScenario(ctx, backupClient, opClient, scenario)
162+
time.Sleep(2 * time.Second)
163+
}
164+
165+
negativeScenarios := []negativeBackupScenario{
166+
{
167+
name: "non-existing root path",
168+
request: newMakeBackupRequest("non_existing_path", nil),
169+
},
170+
{
171+
name: "non-existing source path",
172+
request: newMakeBackupRequest("", []string{"non_existing_table"}),
173+
},
174+
{
175+
name: "non-existing source paths with root path",
176+
request: newMakeBackupRequest("stocks", []string{"non_existing_table1", "non_existing_table2"}),
177+
},
178+
{
179+
name: "mixed existing and non-existing source paths",
180+
request: newMakeBackupRequest("", []string{"kv_test", "non_existing_table"}),
181+
},
182+
{
183+
name: "absolute root path",
184+
request: newMakeBackupRequest("/local/stocks", nil),
185+
},
186+
{
187+
name: "absolute sorce path",
188+
request: newMakeBackupRequest("", []string{"/local/stocks/orders"}),
189+
},
190+
{
191+
name: "absolute source path with root path",
192+
request: newMakeBackupRequest("stocks", []string{"/local/stocks/orders"}),
193+
},
194+
{
195+
name: "source path relative to db root with root path",
196+
request: newMakeBackupRequest("stocks", []string{"stocks/orders"}),
197+
},
198+
}
199+
200+
for _, scenario := range negativeScenarios {
201+
runNegativeBackupScenario(ctx, backupClient, opClient, scenario)
202+
}
203+
}

init_db/create_tables.sh

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,16 @@
33

44
# create and fill user table kv_test
55
./ydb -e ${YDB_ENDPOINT} -d /local workload kv init
6-
./ydb -e ${YDB_ENDPOINT} -d /local workload kv run upsert --rows 100
6+
./ydb -e ${YDB_ENDPOINT} -d /local workload kv run upsert --rows 100
7+
8+
# create and fill user tables: stock, orders, orderLines
9+
./ydb -e ${YDB_ENDPOINT} -d /local workload stock init -p 10 -q 10 -o 10
10+
11+
# create directory for user tables
12+
./ydb -e ${YDB_ENDPOINT} -d /local scheme mkdir stocks
13+
14+
# move user tables (stock, orders, orderLines) to separate directory
15+
./ydb -e ${YDB_ENDPOINT} -d /local tools rename \
16+
--item source=/local/stock,destination=/local/stocks/stock \
17+
--item source=/local/orders,destination=/local/stocks/orders \
18+
--item source=/local/orderLines,destination=/local/stocks/orderLines

internal/backup_operations/make_backup.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type MakeBackupInternalRequest struct {
3030
ContainerID string
3131
DatabaseEndpoint string
3232
DatabaseName string
33+
RootPath string
3334
SourcePaths []string
3435
SourcePathsToExclude []string
3536
ScheduleID *string
@@ -42,6 +43,7 @@ func FromBackupSchedule(schedule *types.BackupSchedule) MakeBackupInternalReques
4243
ContainerID: schedule.ContainerID,
4344
DatabaseEndpoint: schedule.DatabaseEndpoint,
4445
DatabaseName: schedule.DatabaseName,
46+
RootPath: schedule.RootPath,
4547
SourcePaths: schedule.SourcePaths,
4648
SourcePathsToExclude: schedule.SourcePathsToExclude,
4749
ScheduleID: &schedule.ID,
@@ -57,6 +59,7 @@ func FromTBWROperation(tbwr *types.TakeBackupWithRetryOperation) MakeBackupInter
5759
ContainerID: tbwr.ContainerID,
5860
DatabaseEndpoint: tbwr.YdbConnectionParams.Endpoint,
5961
DatabaseName: tbwr.YdbConnectionParams.DatabaseName,
62+
RootPath: tbwr.RootPath,
6063
SourcePaths: tbwr.SourcePaths,
6164
SourcePathsToExclude: tbwr.SourcePathsToExclude,
6265
ScheduleID: tbwr.ScheduleID,
@@ -141,9 +144,14 @@ func ValidateSourcePaths(
141144
if req.ScheduleID != nil {
142145
ctx = xlog.With(ctx, zap.String("ScheduleID", *req.ScheduleID))
143146
}
147+
basePath, ok := SafePathJoin(req.DatabaseName, req.RootPath)
148+
if !ok {
149+
xlog.Error(ctx, "incorrect root path", zap.String("path", req.RootPath))
150+
return nil, status.Errorf(codes.InvalidArgument, "incorrect root path %s", req.RootPath)
151+
}
144152
sourcePaths := make([]string, 0, len(req.SourcePaths))
145153
for _, p := range req.SourcePaths {
146-
fullPath, ok := SafePathJoin(req.DatabaseName, p)
154+
fullPath, ok := SafePathJoin(basePath, p)
147155
if !ok {
148156
xlog.Error(ctx, "incorrect source path", zap.String("path", p))
149157
return nil, status.Errorf(codes.InvalidArgument, "incorrect source path %s", p)
@@ -345,6 +353,7 @@ func MakeBackup(
345353
SecretKey: secretKey,
346354
Description: "ydbcp backup", // TODO: the description shoud be better
347355
NumberOfRetries: 10, // TODO: get it from configuration
356+
RootPath: req.RootPath,
348357
SourcePaths: pathsForExport,
349358
DestinationPrefix: destinationPrefix,
350359
S3ForcePathStyle: s3.S3ForcePathStyle,
@@ -393,6 +402,7 @@ func MakeBackup(
393402
Endpoint: req.DatabaseEndpoint,
394403
DatabaseName: req.DatabaseName,
395404
},
405+
RootPath: req.RootPath,
396406
SourcePaths: req.SourcePaths,
397407
SourcePathsToExclude: req.SourcePathsToExclude,
398408
Audit: &pb.AuditInfo{

internal/connectors/client/connector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ func (d *ClientYdbConnector) ExportToS3(
268268
}
269269

270270
if featureFlags.EnableNewPathsFormat {
271-
exportRequest.Settings.SourcePath = clientDb.Name()
271+
exportRequest.Settings.SourcePath = path.Join(clientDb.Name(), s3Settings.RootPath)
272272
exportRequest.Settings.DestinationPrefix = s3Settings.DestinationPrefix
273273
}
274274

internal/connectors/db/process_result_set.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ func ReadOperationFromResultSet(res query.Row) (types.Operation, error) {
143143
ydbOperationId *string
144144
operationStateBuf *string
145145
message *string
146+
rootPath *string
146147
sourcePaths *string
147148
sourcePathsToExclude *string
148149
creator *string
@@ -168,6 +169,7 @@ func ReadOperationFromResultSet(res query.Row) (types.Operation, error) {
168169
query.Named("operation_id", &ydbOperationId),
169170
query.Named("status", &operationStateBuf),
170171
query.Named("message", &message),
172+
query.Named("root_path", &rootPath),
171173
query.Named("paths", &sourcePaths),
172174
query.Named("paths_to_exclude", &sourcePathsToExclude),
173175

@@ -223,6 +225,7 @@ func ReadOperationFromResultSet(res query.Row) (types.Operation, error) {
223225
Endpoint: databaseEndpoint,
224226
DatabaseName: databaseName,
225227
},
228+
RootPath: StringOrEmpty(rootPath),
226229
SourcePaths: sourcePathsSlice,
227230
SourcePathsToExclude: sourcePathsToExcludeSlice,
228231
YdbOperationId: StringOrEmpty(ydbOperationId),
@@ -301,6 +304,7 @@ func ReadOperationFromResultSet(res query.Row) (types.Operation, error) {
301304
Endpoint: databaseEndpoint,
302305
DatabaseName: databaseName,
303306
},
307+
RootPath: StringOrEmpty(rootPath),
304308
SourcePaths: sourcePathsSlice,
305309
SourcePathsToExclude: sourcePathsToExcludeSlice,
306310
Audit: auditFromDb(creator, createdAt, completedAt),
@@ -330,6 +334,7 @@ func ReadBackupScheduleFromResultSet(res query.Row, withRPOInfo bool) (*types.Ba
330334
createdAt *time.Time
331335
name *string
332336
ttl *time.Duration
337+
rootPath *string
333338
sourcePaths *string
334339
sourcePathsToExclude *string
335340
recoveryPointObjective *time.Duration
@@ -351,6 +356,7 @@ func ReadBackupScheduleFromResultSet(res query.Row, withRPOInfo bool) (*types.Ba
351356
query.Named("created_at", &createdAt),
352357
query.Named("name", &name),
353358
query.Named("ttl", &ttl),
359+
query.Named("root_path", &rootPath),
354360
query.Named("paths", &sourcePaths),
355361
query.Named("paths_to_exclude", &sourcePathsToExclude),
356362
query.Named("recovery_point_objective", &recoveryPointObjective),
@@ -400,6 +406,7 @@ func ReadBackupScheduleFromResultSet(res query.Row, withRPOInfo bool) (*types.Ba
400406
ContainerID: containerID,
401407
DatabaseName: databaseName,
402408
DatabaseEndpoint: databaseEndpoint,
409+
RootPath: StringOrEmpty(rootPath),
403410
SourcePaths: sourcePathsSlice,
404411
SourcePathsToExclude: sourcePathsToExcludeSlice,
405412
Audit: auditFromDb(initiated, createdAt, nil),

0 commit comments

Comments
 (0)