Skip to content

Commit

Permalink
feat(bigtable): Support copy backup in admin client (#9005)
Browse files Browse the repository at this point in the history
* feat(bigtable): Support copy backup in admin client
  • Loading branch information
bhshkh authored Dec 1, 2023
1 parent afe7c98 commit 834c47f
Show file tree
Hide file tree
Showing 6 changed files with 304 additions and 32 deletions.
16 changes: 12 additions & 4 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,14 @@ intend only to run integration tests on a single package.

#### GCP Setup

To run the integrations tests, creation and configuration of two projects in
To run the integrations tests, creation and configuration of three projects in
the Google Developers Console is required: one specifically for Firestore
integration tests, and another for all other integration tests. We'll refer to
these projects as "general project" and "Firestore project".
integration tests, one specifically for Bigtable integration tests, and another
for all other integration tests. We'll refer to these projects as
"Firestore project", "Bigtable project" and "general project".

Note: You can skip setting up Bigtable project if you do not plan working on or running a few Bigtable
tests that require a secondary project

After creating each project, you must [create a service account](https://developers.google.com/identity/protocols/OAuth2ServiceAccount#creatinganaccount)
for each project. Ensure the project-level **Owner**
Expand Down Expand Up @@ -118,7 +122,7 @@ Finally, in the general project, create an API key for the translate API:

#### Local Setup

Once the two projects are created and configured, set the following environment
Once the three projects are created and configured, set the following environment
variables:

- `GCLOUD_TESTS_GOLANG_PROJECT_ID`: Developers Console project's ID (e.g.
Expand All @@ -132,6 +136,7 @@ project's service account.
- `GCLOUD_TESTS_GOLANG_FIRESTORE_KEY`: The path to the JSON key file of the
Firestore project's service account.
- `GCLOUD_TESTS_API_KEY`: API key for using the Translate API created above.
- `GCLOUD_TESTS_GOLANG_SECONDARY_BIGTABLE_PROJECT_ID`: Developers Console project's ID (e.g. doorway-cliff-677) for Bigtable optional secondary project. This can be same as Firestore project or any project other than the general project.

As part of the setup that follows, the following variables will be configured:

Expand Down Expand Up @@ -219,6 +224,9 @@ For instance, in `.zshrc`:
# Developers Console project's ID (e.g. bamboo-shift-455) for the general project.
export GCLOUD_TESTS_GOLANG_PROJECT_ID=your-project

# Developers Console project's ID (e.g. bamboo-shift-455) for the Bigtable project.
export GCLOUD_TESTS_GOLANG_SECONDARY_BIGTABLE_PROJECT_ID=your-bigtable-optional-secondary-project

# The path to the JSON key file of the general project's service account.
export GCLOUD_TESTS_GOLANG_KEY=~/directory/your-project-abcd1234.json

Expand Down
29 changes: 28 additions & 1 deletion bigtable/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,11 @@ func (ac *AdminClient) Close() error {
}

func (ac *AdminClient) instancePrefix() string {
return fmt.Sprintf("projects/%s/instances/%s", ac.project, ac.instance)
return instancePrefix(ac.project, ac.instance)
}

func instancePrefix(project, instance string) string {
return fmt.Sprintf("projects/%s/instances/%s", project, instance)
}

func (ac *AdminClient) backupPath(cluster, instance, backup string) string {
Expand Down Expand Up @@ -1920,6 +1924,27 @@ func (ac *AdminClient) CreateBackup(ctx context.Context, table, cluster, backup
return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp)
}

// CopyBackup copies the specified source backup with the user-provided expire time.
func (ac *AdminClient) CopyBackup(ctx context.Context, sourceCluster, sourceBackup,
destProject, destInstance, destCluster, destBackup string, expireTime time.Time) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
sourceBackupPath := ac.backupPath(sourceCluster, ac.instance, sourceBackup)
destPrefix := instancePrefix(destProject, destInstance)
req := &btapb.CopyBackupRequest{
Parent: destPrefix + "/clusters/" + destCluster,
BackupId: destBackup,
SourceBackup: sourceBackupPath,
ExpireTime: timestamppb.New(expireTime),
}

op, err := ac.tClient.CopyBackup(ctx, req)
if err != nil {
return err
}
resp := btapb.Backup{}
return longrunning.InternalNewOperation(ac.lroClient, op).Wait(ctx, &resp)
}

// Backups returns a BackupIterator for iterating over the backups in a cluster.
// To list backups across all of the clusters in the instance specify "-" as the cluster.
func (ac *AdminClient) Backups(ctx context.Context, cluster string) *BackupIterator {
Expand Down Expand Up @@ -1991,6 +2016,7 @@ func newBackupInfo(backup *btapb.Backup) (*BackupInfo, error) {
bi := BackupInfo{
Name: name,
SourceTable: tableID,
SourceBackup: backup.SourceBackup,
SizeBytes: backup.SizeBytes,
StartTime: startTime,
EndTime: endTime,
Expand Down Expand Up @@ -2030,6 +2056,7 @@ func (it *BackupIterator) Next() (*BackupInfo, error) {
type BackupInfo struct {
Name string
SourceTable string
SourceBackup string
SizeBytes int64
StartTime time.Time
EndTime time.Time
Expand Down
36 changes: 36 additions & 0 deletions bigtable/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ import (
"testing"
"time"

"cloud.google.com/go/internal/pretty"
"cloud.google.com/go/internal/testutil"
longrunning "cloud.google.com/go/longrunning/autogen/longrunningpb"
"github.com/google/go-cmp/cmp"
btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
)

type mockTableAdminClock struct {
Expand All @@ -35,6 +38,9 @@ type mockTableAdminClock struct {
updateTableReq *btapb.UpdateTableRequest
createTableResp *btapb.Table
updateTableError error

copyBackupReq *btapb.CopyBackupRequest
copyBackupError error
}

func (c *mockTableAdminClock) CreateTable(
Expand All @@ -56,6 +62,14 @@ func (c *mockTableAdminClock) UpdateTable(
}, c.updateTableError
}

func (c *mockTableAdminClock) CopyBackup(
ctx context.Context, in *btapb.CopyBackupRequest, opts ...grpc.CallOption,
) (*longrunning.Operation, error) {
c.copyBackupReq = in
c.copyBackupError = fmt.Errorf("Mock error from client API")
return nil, c.copyBackupError
}

func setupTableClient(t *testing.T, ac btapb.BigtableTableAdminClient) *AdminClient {
ctx := context.Background()
c, err := NewAdminClient(ctx, "my-cool-project", "my-cool-instance")
Expand Down Expand Up @@ -123,6 +137,28 @@ func TestTableAdmin_CreateTableFromConf_ChangeStream_Valid(t *testing.T) {
}
}

func TestTableAdmin_CopyBackup_ErrorFromClient(t *testing.T) {
mock := &mockTableAdminClock{}
c := setupTableClient(t, mock)

currTime := time.Now()
err := c.CopyBackup(context.Background(), "source-cluster", "source-backup", "dest-project", "dest-instance", "dest-cluster", "dest-backup", currTime)
if err == nil {
t.Errorf("CopyBackup got: nil, want: non-nil error")
}

got := mock.copyBackupReq
want := &btapb.CopyBackupRequest{
Parent: "projects/dest-project/instances/dest-instance/clusters/dest-cluster",
BackupId: "dest-backup",
SourceBackup: "projects/my-cool-project/instances/my-cool-instance/clusters/source-cluster/backups/source-backup",
ExpireTime: timestamppb.New(currTime),
}
if diff := testutil.Diff(got, want, cmp.AllowUnexported(btapb.CopyBackupRequest{})); diff != "" {
t.Errorf("CopyBackupRequest \ngot:\n%v,\nwant:\n%v,\ndiff:\n%v", pretty.Value(got), pretty.Value(want), diff)
}
}

func TestTableAdmin_CreateTableFromConf_ChangeStream_Disable(t *testing.T) {
mock := &mockTableAdminClock{}
c := setupTableClient(t, mock)
Expand Down
46 changes: 33 additions & 13 deletions bigtable/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func init() {
flag.StringVar(&c.AdminEndpoint, "it.admin-endpoint", "", "Admin api host and port")
flag.StringVar(&c.DataEndpoint, "it.data-endpoint", "", "Data api host and port")
flag.StringVar(&c.Project, "it.project", "", "Project to use for integration test")
flag.StringVar(&c.Project2, "it.project2", "", "Optional secondary project to use for copy backup integration test")
flag.StringVar(&c.Instance, "it.instance", "", "Bigtable instance to use")
flag.StringVar(&c.Cluster, "it.cluster", "", "Bigtable cluster to use")
flag.StringVar(&c.Table, "it.table", "", "Bigtable table to create")
Expand Down Expand Up @@ -83,6 +84,7 @@ type IntegrationTestConfig struct {
AdminEndpoint string
DataEndpoint string
Project string
Project2 string
Instance string
Cluster string
Table string
Expand All @@ -94,6 +96,7 @@ type IntegrationTestConfig struct {
// The environment can be implemented using production or an emulator
type IntegrationEnv interface {
Config() IntegrationTestConfig
AdminClientOptions() (context.Context, []option.ClientOption, error) // Client options to be used in creating client
NewAdminClient() (*AdminClient, error)
// NewInstanceAdminClient will return nil if instance administration is unsupported in this environment
NewInstanceAdminClient() (*InstanceAdminClient, error)
Expand All @@ -110,6 +113,9 @@ func NewIntegrationEnv() (IntegrationEnv, error) {
if c.Project == "" {
c.Project = os.Getenv("GCLOUD_TESTS_GOLANG_PROJECT_ID")
}
if c.Project2 == "" {
c.Project2 = os.Getenv("GCLOUD_TESTS_GOLANG_SECONDARY_BIGTABLE_PROJECT_ID")
}
if c.Instance == "" {
c.Instance = os.Getenv("GCLOUD_TESTS_BIGTABLE_INSTANCE")
}
Expand Down Expand Up @@ -188,11 +194,10 @@ func (e *EmulatedEnv) Config() IntegrationTestConfig {

var headersInterceptor = testutil.DefaultHeadersEnforcer()

// NewAdminClient builds a new connected admin client for this environment
func (e *EmulatedEnv) NewAdminClient() (*AdminClient, error) {
func (e *EmulatedEnv) AdminClientOptions() (context.Context, []option.ClientOption, error) {
o, err := btopt.DefaultClientOptions(e.server.Addr, e.server.Addr, AdminScope, clientUserAgent)
if err != nil {
return nil, err
return nil, nil, err
}
// Add gRPC client interceptors to supply Google client information.
//
Expand All @@ -208,11 +213,18 @@ func (e *EmulatedEnv) NewAdminClient() (*AdminClient, error) {
o = append(o, option.WithGRPCDialOption(grpc.WithBlock()))
conn, err := gtransport.DialInsecure(ctx, o...)
if err != nil {
return nil, err
return nil, nil, err
}
return ctx, []option.ClientOption{option.WithGRPCConn(conn)}, nil
}

return NewAdminClient(ctx, e.config.Project, e.config.Instance,
option.WithGRPCConn(conn))
// NewAdminClient builds a new connected admin client for this environment
func (e *EmulatedEnv) NewAdminClient() (*AdminClient, error) {
ctx, options, err := e.AdminClientOptions()
if err != nil {
return nil, err
}
return NewAdminClient(ctx, e.config.Project, e.config.Instance, options...)
}

// NewInstanceAdminClient returns nil for the emulated environment since the API is not implemented.
Expand Down Expand Up @@ -287,22 +299,30 @@ func (e *ProdEnv) Config() IntegrationTestConfig {
return e.config
}

// NewAdminClient builds a new connected admin client for this environment
func (e *ProdEnv) NewAdminClient() (*AdminClient, error) {
func (e *ProdEnv) AdminClientOptions() (context.Context, []option.ClientOption, error) {
clientOpts := headersInterceptor.CallOptions()
if endpoint := e.config.AdminEndpoint; endpoint != "" {
clientOpts = append(clientOpts, option.WithEndpoint(endpoint))
}
return NewAdminClient(context.Background(), e.config.Project, e.config.Instance, clientOpts...)
return context.Background(), clientOpts, nil
}

// NewAdminClient builds a new connected admin client for this environment
func (e *ProdEnv) NewAdminClient() (*AdminClient, error) {
ctx, options, err := e.AdminClientOptions()
if err != nil {
return nil, err
}
return NewAdminClient(ctx, e.config.Project, e.config.Instance, options...)
}

// NewInstanceAdminClient returns a new connected instance admin client for this environment
func (e *ProdEnv) NewInstanceAdminClient() (*InstanceAdminClient, error) {
clientOpts := headersInterceptor.CallOptions()
if endpoint := e.config.AdminEndpoint; endpoint != "" {
clientOpts = append(clientOpts, option.WithEndpoint(endpoint))
ctx, options, err := e.AdminClientOptions()
if err != nil {
return nil, err
}
return NewInstanceAdminClient(context.Background(), e.config.Project, clientOpts...)
return NewInstanceAdminClient(ctx, e.config.Project, options...)
}

// NewClient builds a connected data client for this environment
Expand Down
Loading

0 comments on commit 834c47f

Please sign in to comment.