Skip to content

Commit

Permalink
feat: implement bigtable io connector with write capabilities (#23411)
Browse files Browse the repository at this point in the history
  • Loading branch information
capthiron authored Nov 7, 2022
1 parent c4218e5 commit 9533fc3
Show file tree
Hide file tree
Showing 5 changed files with 498 additions and 2 deletions.
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@

## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Support for Bigtable sink (Write and WriteBatch) added (Go) ([#23324](https://github.com/apache/beam/issues/23324)).

## New Features / Improvements

Expand Down
8 changes: 8 additions & 0 deletions sdks/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ require (
gopkg.in/yaml.v2 v2.4.0
)

require cloud.google.com/go/bigtable v1.16.0

require (
cloud.google.com/go v0.104.0 // indirect
cloud.google.com/go/compute v1.10.0 // indirect
Expand All @@ -64,11 +66,17 @@ require (
github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 // indirect
github.com/apache/thrift v0.14.2 // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/census-instrumentation/opencensus-proto v0.2.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4 // indirect
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1 // indirect
github.com/containerd/cgroups v1.0.4 // indirect
github.com/containerd/containerd v1.6.8 // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/docker/docker v20.10.17+incompatible // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1 // indirect
github.com/envoyproxy/protoc-gen-validate v0.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/snappy v0.0.4 // indirect
Expand Down
10 changes: 9 additions & 1 deletion sdks/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4g
cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ=
cloud.google.com/go/bigquery v1.43.0 h1:u0fvz5ysJBe1jwUPI4LuPwAX+o+6fCUwf3ECeg6eDUQ=
cloud.google.com/go/bigquery v1.43.0/go.mod h1:ZMQcXHsl+xmU1z36G2jNGZmKp9zNY5BUua5wDgmNCfw=
cloud.google.com/go/bigtable v1.16.0 h1:sqJhhslzQOag49Mf2/uH3+u+NdfpPX0gjKAcgYpRUCU=
cloud.google.com/go/bigtable v1.16.0/go.mod h1:6f7WVXfeZaJz0xevUZoTA1s8sTmmrQqIAkRDVEHVg7I=
cloud.google.com/go/compute v0.1.0/go.mod h1:GAesmwr110a34z04OlxYkATPBEfVhkymfTBXtfbBFow=
cloud.google.com/go/compute v1.3.0/go.mod h1:cCZiE1NHEtai4wiufUhW8I8S1JKkAnhnQJWM7YD99wM=
cloud.google.com/go/compute v1.5.0/go.mod h1:9SMHyhJlzhlkJqrPAc839t2BZFTSk6Jdj6mkzQJeu0M=
Expand Down Expand Up @@ -185,10 +187,10 @@ github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInq
github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=
github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
Expand All @@ -209,11 +211,13 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4 h1:hzAQntlaYRkVSFEfj9OTWlVV1H155FMD8BTKktLv0QI=
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI=
github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1 h1:zH8ljVhhq7yC0MIeUL/IviMtY8hx2mK8cN9wEYb8ggw=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/cockroachdb/datadriven v0.0.0-20200714090401-bf6692d28da5/go.mod h1:h6jFvWxBdQXxjopDMZyH2UVceIRfR84bdzbkoKrsWNo=
Expand Down Expand Up @@ -403,7 +407,9 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1 h1:xvqufLtNVwAhN8NMyWklVgxnWohi+wtMGQMhtxexlm0=
github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
Expand Down Expand Up @@ -523,6 +529,7 @@ github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA=
github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU=
github.com/google/flatbuffers v1.11.0 h1:O7CEyB8Cb3/DmtxODGtLHcEvpr81Jm5qLg/hsHnxA2A=
github.com/google/flatbuffers v1.11.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
Expand Down Expand Up @@ -1722,6 +1729,7 @@ k8s.io/kubernetes v1.13.0/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk=
k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
rsc.io/binaryregexp v0.2.0 h1:HfqmD5MEmC0zvwBuF187nq9mdnXjXsSivRiXN7SmRkE=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
Expand Down
285 changes: 285 additions & 0 deletions sdks/go/pkg/beam/io/bigtableio/bigtable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package bigtableio provides transformations and utilities to interact with
// Google Bigtable. See also: https://cloud.google.com/bigtable/docs
package bigtableio

import (
"context"
"fmt"
"hash/fnv"
"reflect"

"cloud.google.com/go/bigtable"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

// init registers this package's DoFns and iterator type with beam's register
// package. The type parameters mirror the ProcessElement signatures of writeFn
// and writeBatchFn: (context.Context, int, func(*Mutation) bool) returning error.
func init() {
register.DoFn3x1[context.Context, int, func(*Mutation) bool, error](&writeFn{})
register.DoFn3x1[context.Context, int, func(*Mutation) bool, error](&writeBatchFn{})
// Registers the func(*Mutation) bool iterator type that both ProcessElement
// methods take as their third parameter.
register.Iter1[*Mutation]()
}

// Mutation is a serializable wrapper analogous to bigtable.Mutation. It holds
// the row key and the operations to be applied to that row.
type Mutation struct {
// RowKey is the key of the row the operations are applied to.
RowKey string
// Ops holds the recorded operations; Set appends to it.
Ops []Operation

// GroupKey is an optional custom key for beam.GroupByKey. When it is empty,
// addGroupKeyFn assigns the fixed key 1; otherwise it assigns a hash of GroupKey.
GroupKey string
}

// Operation represents a raw change to be applied within a Mutation. Its
// fields are passed unchanged to bigtable.Mutation.Set when the Mutation is
// converted by getBigtableMutation.
type Operation struct {
// Family is the column family argument of the set operation.
Family string
// Column is the column argument of the set operation.
Column string
// Ts is the timestamp argument of the set operation.
Ts bigtable.Timestamp
// Value is the cell value argument of the set operation.
Value []byte
}

// NewMutation creates an empty *Mutation for the given row key, mirroring
// bigtable.NewMutation(). The returned Mutation has no operations and no
// custom group key.
func NewMutation(rowKey string) *Mutation {
	m := new(Mutation)
	m.RowKey = rowKey
	return m
}

// Set records an operation that sets a value in the specified column family
// and column with the given timestamp, analogous to bigtable.Mutation.Set().
//
// The operation is only appended to m.Ops here; it is handed to
// bigtable.Mutation.Set when the Mutation is converted for writing. As with
// that method, the timestamp will be truncated to millisecond granularity and
// a timestamp of ServerTime means to use the server timestamp.
// The value slice is stored as given, without copying.
func (m *Mutation) Set(family, column string, ts bigtable.Timestamp, value []byte) {
	op := Operation{
		Family: family,
		Column: column,
		Ts:     ts,
		Value:  value,
	}
	m.Ops = append(m.Ops, op)
}

// WithGroupKey stores key as the custom group key that beam.GroupByKey will
// use for this mutation. It returns the receiver itself so that the call can
// be chained.
func (m *Mutation) WithGroupKey(key string) *Mutation {
	m.GroupKey = key

	return m
}

// Write writes the elements of the given PCollection<bigtableio.Mutation> to
// the bigtable table identified by project, instanceID and table.
//
// It panics if the element type of col is not bigtableio.Mutation. The
// mutations are keyed by addGroupKeyFn, grouped with beam.GroupByKey and then
// applied by writeFn.
func Write(s beam.Scope, project, instanceID, table string, col beam.PCollection) {
	elemType := col.Type().Type()
	if err := mustBeBigtableioMutation(elemType); err != nil {
		panic(err)
	}

	s = s.Scope("bigtable.Write")

	keyed := beam.ParDo(s, addGroupKeyFn, col)
	grouped := beam.GroupByKey(s, keyed)

	fn := &writeFn{
		Project:    project,
		InstanceID: instanceID,
		TableName:  table,
		Type:       beam.EncodedType{T: elemType},
	}
	beam.ParDo0(s, fn, grouped)
}

// WriteBatch writes the elements of the given PCollection<bigtableio.Mutation>
// to bigtable using bigtable.ApplyBulk().
//
// For the underlying bigtable.ApplyBulk function to work properly, the number
// of operations per bigtableio.Mutation of the input PCollection must not be
// greater than 100,000. For more information see
// https://cloud.google.com/bigtable/docs/writes#batch.
//
// It panics if the element type of col is not bigtableio.Mutation.
func WriteBatch(s beam.Scope, project, instanceID, table string, col beam.PCollection) {
	elemType := col.Type().Type()
	if err := mustBeBigtableioMutation(elemType); err != nil {
		panic(err)
	}

	s = s.Scope("bigtable.WriteBatch")

	keyed := beam.ParDo(s, addGroupKeyFn, col)
	grouped := beam.GroupByKey(s, keyed)

	fn := &writeBatchFn{
		Project:    project,
		InstanceID: instanceID,
		TableName:  table,
		Type:       beam.EncodedType{T: elemType},
	}
	beam.ParDo0(s, fn, grouped)
}

// addGroupKeyFn pairs a Mutation with the integer key it is grouped under.
// A Mutation with a custom GroupKey gets the hash of that key; every other
// Mutation gets the fixed key 1. The Mutation itself is returned unchanged.
func addGroupKeyFn(mutation Mutation) (int, Mutation) {
	key := 1
	if mutation.GroupKey != "" {
		key = hashStringToInt(mutation.GroupKey)
	}
	return key, mutation
}

// hashStringToInt maps s to an int by taking its 32-bit FNV-1a hash.
func hashStringToInt(s string) int {
	hasher := fnv.New32a()
	// The returned count and error are deliberately ignored: the hash.Hash
	// contract states that Write never returns an error.
	_, _ = hasher.Write([]byte(s))
	sum := hasher.Sum32()
	return int(sum)
}

// mustBeBigtableioMutation reports an error unless t is exactly the
// bigtableio.Mutation struct type (not a pointer to it).
func mustBeBigtableioMutation(t reflect.Type) error {
	want := reflect.TypeOf(Mutation{})
	if t == want {
		return nil
	}
	return fmt.Errorf("type must be bigtableio.Mutation but is: %v", t)
}

// writeFn is the DoFn used by Write. It applies the Mutations of each group
// to the table one at a time via bigtable.Table.Apply.
type writeFn struct {
// Project is the project passed to bigtable.NewClient.
Project string `json:"project"`
// InstanceID is the bigtable instance ID passed to bigtable.NewClient.
InstanceID string `json:"instanceId"`
// client is the bigtable.Client created in Setup and closed in Teardown.
client *bigtable.Client `json:"-"`
// TableName is the name of the table opened in Setup.
TableName string `json:"tableName"`
// table is the bigtable.Table handle opened in Setup and used by ProcessElement.
table *bigtable.Table `json:"-"`
// Type is the encoded element type of the input PCollection.
Type beam.EncodedType `json:"type"`
}

// Setup creates the bigtable data client for f.Project and f.InstanceID and
// opens a handle to f.TableName.
//
// It returns an error wrapping the cause (usable with errors.Is / errors.As)
// if the client cannot be created; in that case f.table is left untouched.
func (f *writeFn) Setup(ctx context.Context) error {
	var err error
	f.client, err = bigtable.NewClient(ctx, f.Project, f.InstanceID)
	if err != nil {
		return fmt.Errorf("could not create data operations client: %w", err)
	}

	f.table = f.client.Open(f.TableName)
	return nil
}

// Teardown closes the bigtable client created in Setup.
//
// It is a no-op when no client exists (for example when Setup failed or
// Teardown already ran), so it never dereferences a nil client. A failure to
// close is returned wrapping the underlying error.
func (f *writeFn) Teardown() error {
	if f.client == nil {
		return nil
	}

	err := f.client.Close()
	// Drop the handles regardless of the outcome so a repeated Teardown does
	// not close the same client twice.
	f.client, f.table = nil, nil
	if err != nil {
		return fmt.Errorf("could not close data operations client: %w", err)
	}
	return nil
}

// ProcessElement applies every Mutation of one group to the table, issuing
// one bigtable.Table.Apply call per Mutation in iteration order.
//
// key is the group key produced by addGroupKeyFn and is not used here; values
// iterates over the Mutations of the group. Processing stops at the first
// Mutation that fails validation or cannot be applied, and the returned error
// wraps the underlying cause. Mutations applied before the failure are not
// rolled back.
func (f *writeFn) ProcessElement(ctx context.Context, key int, values func(*Mutation) bool) error {
	var mutation Mutation
	for values(&mutation) {
		if err := validateMutation(mutation); err != nil {
			return fmt.Errorf("invalid bigtableio.Mutation: %w", err)
		}

		if err := f.table.Apply(ctx, mutation.RowKey, getBigtableMutation(mutation)); err != nil {
			return fmt.Errorf("could not apply mutation for row key='%s': %w", mutation.RowKey, err)
		}
	}

	return nil
}

// writeBatchFn is the DoFn used by WriteBatch. It collects the Mutations of
// each group into batches and sends them via bigtable.Table.ApplyBulk.
type writeBatchFn struct {
// Project is the project passed to bigtable.NewClient.
Project string `json:"project"`
// InstanceID is the bigtable instance ID passed to bigtable.NewClient.
InstanceID string `json:"instanceId"`
// client is the bigtable.Client created in Setup and closed in Teardown.
client *bigtable.Client `json:"-"`
// TableName is the name of the table opened in Setup.
TableName string `json:"tableName"`
// table is the bigtable.Table handle opened in Setup and used by ProcessElement.
table *bigtable.Table `json:"-"`
// Type is the encoded element type of the input PCollection.
Type beam.EncodedType `json:"type"`
}

// Setup creates the bigtable data client for f.Project and f.InstanceID and
// opens a handle to f.TableName.
//
// It returns an error wrapping the cause (usable with errors.Is / errors.As)
// if the client cannot be created; in that case f.table is left untouched.
func (f *writeBatchFn) Setup(ctx context.Context) error {
	var err error
	f.client, err = bigtable.NewClient(ctx, f.Project, f.InstanceID)
	if err != nil {
		return fmt.Errorf("could not create data operations client: %w", err)
	}

	f.table = f.client.Open(f.TableName)
	return nil
}

// Teardown closes the bigtable client created in Setup.
//
// It is a no-op when no client exists (for example when Setup failed or
// Teardown already ran), so it never dereferences a nil client. A failure to
// close is returned wrapping the underlying error.
func (f *writeBatchFn) Teardown() error {
	if f.client == nil {
		return nil
	}

	err := f.client.Close()
	// Drop the handles regardless of the outcome so a repeated Teardown does
	// not close the same client twice.
	f.client, f.table = nil, nil
	if err != nil {
		return fmt.Errorf("could not close data operations client: %w", err)
	}
	return nil
}

// ProcessElement writes the Mutations of one group to the table in batches
// using bigtable.Table.ApplyBulk.
//
// key is the group key produced by addGroupKeyFn and is not used here; values
// iterates over the Mutations of the group. Mutations are accumulated until
// adding the next one would push the batch past 100,000 operations; the batch
// is then sent and a new one is started. Whatever remains after the last
// Mutation is sent as a final batch.
//
// Processing stops at the first Mutation that fails validation (the returned
// error wraps the validation error) or at the first batch that fails.
// Batches sent before the failure are not rolled back.
func (f *writeBatchFn) ProcessElement(ctx context.Context, key int, values func(*Mutation) bool) error {
	// maxOpsPerBatch caps the number of operations sent in a single ApplyBulk call.
	const maxOpsPerBatch = 100000

	var (
		rowKeys    []string
		mutations  []*bigtable.Mutation
		opsInBatch int
	)

	// flush sends the pending batch, if any, and starts a fresh one.
	flush := func() error {
		if len(rowKeys) == 0 {
			return nil
		}
		if err := tryApplyBulk(f.table.ApplyBulk(ctx, rowKeys, mutations)); err != nil {
			return err
		}
		rowKeys, mutations, opsInBatch = nil, nil, 0
		return nil
	}

	var mutation Mutation
	for values(&mutation) {
		if err := validateMutation(mutation); err != nil {
			return fmt.Errorf("invalid bigtableio.Mutation: %w", err)
		}

		ops := len(mutation.Ops)
		// Send what has been collected so far if this Mutation would not fit.
		if opsInBatch+ops > maxOpsPerBatch {
			if err := flush(); err != nil {
				return err
			}
		}

		rowKeys = append(rowKeys, mutation.RowKey)
		mutations = append(mutations, getBigtableMutation(mutation))
		opsInBatch += ops
	}

	return flush()
}

// validateMutation checks that mutation does not carry more operations than
// a single Bigtable batch accepts. It returns nil for up to 100,000
// operations (including none) and an error otherwise.
func validateMutation(mutation Mutation) error {
	const maxOps = 100000
	if len(mutation.Ops) <= maxOps {
		return nil
	}
	return fmt.Errorf("one instance of bigtableio.Mutation must not have more than 100,000 operations/mutations, see https://cloud.google.com/bigtable/docs/writes#batch")
}

// tryApplyBulk folds the two results of bigtable.Table.ApplyBulk into a
// single error.
//
// processErr is the error of the bulk call as a whole and takes precedence.
// Otherwise the first non-nil entry of errs (the per-mutation errors) is
// reported. The returned error wraps the underlying one, so callers can
// inspect it with errors.Is / errors.As. It returns nil when processErr is
// nil and errs is empty or contains only nil entries.
func tryApplyBulk(errs []error, processErr error) error {
	if processErr != nil {
		return fmt.Errorf("bulk apply process failed: %w", processErr)
	}
	for _, err := range errs {
		if err != nil {
			return fmt.Errorf("could not apply mutation: %w", err)
		}
	}
	return nil
}

// getBigtableMutation converts a bigtableio.Mutation into a new
// *bigtable.Mutation by replaying each recorded Operation, in order, through
// bigtable.Mutation.Set. The row key is not part of the result.
func getBigtableMutation(mutation Mutation) *bigtable.Mutation {
	out := bigtable.NewMutation()
	for i := range mutation.Ops {
		op := &mutation.Ops[i]
		out.Set(op.Family, op.Column, op.Ts, op.Value)
	}
	return out
}
Loading

0 comments on commit 9533fc3

Please sign in to comment.