
Commit f3e84e7

feat: add BigQueryWriteClient where append_rows returns a helper for writing rows (#284)
* WIP: write client sample
* add sample with nullable types
* add schema for all supported types
* add complex types to code sample
* refactor sample so that it can be tested
* make test assertions more thorough
* fix lint error
* remove done TODO
* address reviewer comments
* fix tag mismatch
* test on multiple regions
* correct comments about why offset exists
* upgrade g-c-b
* WIP: invert stream using BiDi class
* WIP: attempt to use Future for send instead
* WIP: use futures, populated by background consumer
* make sure stream is actually open before returning from open
* copy close implementation from pub/sub
* support extra metadata
* process exceptions, add open timeout
* sort imports
* WIP: unit tests
* drain futures when stream closes
* update docs
* add callbacks to detect when a stream fails
* add unit tests
* add sleep to loop waiting for RPC to be active
* don't freeze if initial RPC fails
* add needed initializations so done() functions
* fail fast when there is a problem with the initial request
* don't inherit concurrent.futures. It's unnecessary and kept resulting in stuff getting stuck.
* add unit test for open timeout
* 🦉 Updates from OwlBot. See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* add manual client to docs
* typo in sample comments
* force timeout and metadata to be kwargs
* unify interface for sending row data
* pull stream name from merged request
* require newer proto-plus for copy_from method

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent e7cd2df commit f3e84e7
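In outline, the new writer.AppendRowsStream helper wraps the bidirectional AppendRows RPC: you build a request template carrying the stream name and writer schema, construct the helper from a BigQueryWriteClient, and each send() returns a future that resolves to the server's response. The following condensed sketch of that flow mirrors the full sample in this commit; the project, dataset, and table names are placeholders, and row_pb2 is a hypothetical module standing in for code generated by protoc from your own message definition.

import row_pb2  # hypothetical module generated by protoc from your .proto

from google.cloud import bigquery_storage_v1beta2
from google.cloud.bigquery_storage_v1beta2 import types, writer
from google.protobuf import descriptor_pb2

write_client = bigquery_storage_v1beta2.BigQueryWriteClient()
parent = write_client.table_path("my-project", "my_dataset", "my_table")

# Create a PENDING stream so rows only become visible once committed.
write_stream = types.WriteStream()
write_stream.type_ = types.WriteStream.Type.PENDING
write_stream = write_client.create_write_stream(
    parent=parent, write_stream=write_stream
)

# The template supplies the stream name and writer schema that the first
# request on the connection must carry.
request_template = types.AppendRowsRequest()
request_template.write_stream = write_stream.name
proto_descriptor = descriptor_pb2.DescriptorProto()
row_pb2.Row.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = types.ProtoSchema(proto_descriptor=proto_descriptor)
request_template.proto_rows = proto_data

# The helper added by this commit: every send() returns a future.
append_rows_stream = writer.AppendRowsStream(write_client, request_template)

proto_rows = types.ProtoRows()
proto_rows.serialized_rows.append(row_pb2.Row(name="hello").SerializeToString())
request = types.AppendRowsRequest()
request.offset = 0
request.proto_rows = types.AppendRowsRequest.ProtoData(rows=proto_rows)

future = append_rows_stream.send(request)
print(future.result())  # blocks until the server acknowledges the append
append_rows_stream.close()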

File tree

11 files changed: +1277 −0 lines changed

Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-
#
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Lines changed: 249 additions & 0 deletions
@@ -0,0 +1,249 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START bigquerystorage_append_rows_raw_proto2]
"""
This code sample demonstrates using the low-level generated client for Python.
"""

import datetime
import decimal

from google.cloud import bigquery_storage_v1beta2
from google.cloud.bigquery_storage_v1beta2 import types
from google.cloud.bigquery_storage_v1beta2 import writer
from google.protobuf import descriptor_pb2

# If you make updates to the sample_data.proto protocol buffers definition,
# run:
#
#   protoc --python_out=. sample_data.proto
#
# from the samples/snippets directory to generate the sample_data_pb2 module.
from . import sample_data_pb2


def append_rows_proto2(project_id: str, dataset_id: str, table_id: str):
    """Create a write stream, write some sample data, and commit the stream."""
    write_client = bigquery_storage_v1beta2.BigQueryWriteClient()
    parent = write_client.table_path(project_id, dataset_id, table_id)
    write_stream = types.WriteStream()

    # When creating the stream, choose the type. Use the PENDING type to wait
    # until the stream is committed before it is visible. See:
    # https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta2#google.cloud.bigquery.storage.v1beta2.WriteStream.Type
    write_stream.type_ = types.WriteStream.Type.PENDING
    write_stream = write_client.create_write_stream(
        parent=parent, write_stream=write_stream
    )
    stream_name = write_stream.name

    # Create a template with fields needed for the first request.
    request_template = types.AppendRowsRequest()

    # The initial request must contain the stream name.
    request_template.write_stream = stream_name

    # So that BigQuery knows how to parse the serialized_rows, generate a
    # protocol buffer representation of your message descriptor.
    proto_schema = types.ProtoSchema()
    proto_descriptor = descriptor_pb2.DescriptorProto()
    sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
    proto_schema.proto_descriptor = proto_descriptor
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.writer_schema = proto_schema
    request_template.proto_rows = proto_data

    # Some stream types support an unbounded number of requests. Construct an
    # AppendRowsStream to send an arbitrary number of requests to a stream.
    append_rows_stream = writer.AppendRowsStream(write_client, request_template)

    # Create a batch of row data by appending proto2 serialized bytes to the
    # serialized_rows repeated field.
    proto_rows = types.ProtoRows()

    row = sample_data_pb2.SampleData()
    row.row_num = 1
    row.bool_col = True
    row.bytes_col = b"Hello, World!"
    row.float64_col = float("+inf")
    row.int64_col = 123
    row.string_col = "Howdy!"
    proto_rows.serialized_rows.append(row.SerializeToString())

    row = sample_data_pb2.SampleData()
    row.row_num = 2
    row.bool_col = False
    proto_rows.serialized_rows.append(row.SerializeToString())

    row = sample_data_pb2.SampleData()
    row.row_num = 3
    row.bytes_col = b"See you later!"
    proto_rows.serialized_rows.append(row.SerializeToString())

    row = sample_data_pb2.SampleData()
    row.row_num = 4
    row.float64_col = 1000000.125
    proto_rows.serialized_rows.append(row.SerializeToString())

    row = sample_data_pb2.SampleData()
    row.row_num = 5
    row.int64_col = 67000
    proto_rows.serialized_rows.append(row.SerializeToString())

    row = sample_data_pb2.SampleData()
    row.row_num = 6
    row.string_col = "Auf Wiedersehen!"
    proto_rows.serialized_rows.append(row.SerializeToString())

    # Set an offset to allow resuming this stream if the connection breaks.
    # Keep track of which requests the server has acknowledged and resume the
    # stream at the first non-acknowledged message. If the server has already
    # processed a message with that offset, it will return an ALREADY_EXISTS
    # error, which can be safely ignored.
    #
    # The first request must always have an offset of 0.
    request = types.AppendRowsRequest()
    request.offset = 0
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.rows = proto_rows
    request.proto_rows = proto_data

    response_future_1 = append_rows_stream.send(request)

    # Create a batch of rows containing scalar values that don't directly
    # correspond to a protocol buffers scalar type. See the documentation for
    # the expected data formats:
    # https://cloud.google.com/bigquery/docs/write-api#data_type_conversions
    proto_rows = types.ProtoRows()

    row = sample_data_pb2.SampleData()
    row.row_num = 7
    date_value = datetime.date(2021, 8, 12)
    epoch_value = datetime.date(1970, 1, 1)
    delta = date_value - epoch_value
    row.date_col = delta.days
    proto_rows.serialized_rows.append(row.SerializeToString())

    row = sample_data_pb2.SampleData()
    row.row_num = 8
    datetime_value = datetime.datetime(2021, 8, 12, 9, 46, 23, 987456)
    row.datetime_col = datetime_value.strftime("%Y-%m-%d %H:%M:%S.%f")
    proto_rows.serialized_rows.append(row.SerializeToString())

    row = sample_data_pb2.SampleData()
    row.row_num = 9
    row.geography_col = "POINT(-122.347222 47.651111)"
    proto_rows.serialized_rows.append(row.SerializeToString())

    row = sample_data_pb2.SampleData()
    row.row_num = 10
    numeric_value = decimal.Decimal("1.23456789101112e+6")
    row.numeric_col = str(numeric_value)
    bignumeric_value = decimal.Decimal("-1.234567891011121314151617181920e+16")
    row.bignumeric_col = str(bignumeric_value)
    proto_rows.serialized_rows.append(row.SerializeToString())

    row = sample_data_pb2.SampleData()
    row.row_num = 11
    time_value = datetime.time(11, 7, 48, 123456)
    row.time_col = time_value.strftime("%H:%M:%S.%f")
    proto_rows.serialized_rows.append(row.SerializeToString())

    row = sample_data_pb2.SampleData()
    row.row_num = 12
    timestamp_value = datetime.datetime(
        2021, 8, 12, 16, 11, 22, 987654, tzinfo=datetime.timezone.utc
    )
    epoch_value = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    delta = timestamp_value - epoch_value
    row.timestamp_col = int(delta.total_seconds()) * 1000000 + int(delta.microseconds)
    proto_rows.serialized_rows.append(row.SerializeToString())

    # Since this is the second request, you only need to include the row data.
    # The stream name and the protocol buffers DESCRIPTOR are only needed in
    # the first request.
    request = types.AppendRowsRequest()
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.rows = proto_rows
    request.proto_rows = proto_data

    # Offset must equal the number of rows that were previously sent.
    request.offset = 6

    response_future_2 = append_rows_stream.send(request)

    # Create a batch of rows with STRUCT and ARRAY BigQuery data types. In
    # protocol buffers, these correspond to nested messages and repeated
    # fields, respectively.
    proto_rows = types.ProtoRows()

    row = sample_data_pb2.SampleData()
    row.row_num = 13
    row.int64_list.append(1)
    row.int64_list.append(2)
    row.int64_list.append(3)
    proto_rows.serialized_rows.append(row.SerializeToString())

    row = sample_data_pb2.SampleData()
    row.row_num = 14
    row.struct_col.sub_int_col = 7
    proto_rows.serialized_rows.append(row.SerializeToString())

    row = sample_data_pb2.SampleData()
    row.row_num = 15
    sub_message = sample_data_pb2.SampleData.SampleStruct()
    sub_message.sub_int_col = -1
    row.struct_list.append(sub_message)
    sub_message = sample_data_pb2.SampleData.SampleStruct()
    sub_message.sub_int_col = -2
    row.struct_list.append(sub_message)
    sub_message = sample_data_pb2.SampleData.SampleStruct()
    sub_message.sub_int_col = -3
    row.struct_list.append(sub_message)
    proto_rows.serialized_rows.append(row.SerializeToString())

    request = types.AppendRowsRequest()
    request.offset = 12
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.rows = proto_rows
    request.proto_rows = proto_data

    # For each request sent, the server returns exactly one response. This
    # sample sends 3 requests, so it waits on 3 response futures.
    response_future_3 = append_rows_stream.send(request)

    # All three requests are in flight; wait for them to finish being processed
    # before finalizing the stream.
    print(response_future_1.result())
    print(response_future_2.result())
    print(response_future_3.result())

    # Shut down background threads and close the streaming connection.
    append_rows_stream.close()

    # A PENDING type stream must be "finalized" before being committed. No new
    # records can be written to the stream after this method has been called.
    write_client.finalize_write_stream(name=write_stream.name)

    # Commit the stream you created earlier.
    batch_commit_write_streams_request = types.BatchCommitWriteStreamsRequest()
    batch_commit_write_streams_request.parent = parent
    batch_commit_write_streams_request.write_streams = [write_stream.name]
    write_client.batch_commit_write_streams(batch_commit_write_streams_request)

    print(f"Writes to stream: '{write_stream.name}' have been committed.")


# [END bigquerystorage_append_rows_raw_proto2]
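The offset comments in the sample above imply a recovery pattern: if the connection drops, reopen the stream and resend from the first unacknowledged offset, treating an ALREADY_EXISTS error for an already-processed offset as success. A minimal sketch of that idea follows; it assumes (an assumption, not something the sample demonstrates) that the duplicate-offset error surfaces as a google.api_core.exceptions.AlreadyExists raised from the returned future.

from google.api_core import exceptions


def send_ignoring_duplicates(append_rows_stream, request):
    """Send one AppendRowsRequest and treat a duplicate offset as success.

    Assumes the server's ALREADY_EXISTS status is raised from the future;
    this is an illustration of the retry guidance, not part of the sample.
    """
    future = append_rows_stream.send(request)
    try:
        return future.result()
    except exceptions.AlreadyExists:
        # The server already processed a request at this offset, so the rows
        # are durable and the error can be safely ignored.
        return None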
Lines changed: 126 additions & 0 deletions
@@ -0,0 +1,126 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import decimal
import pathlib
import random

from google.cloud import bigquery
import pytest

from . import append_rows_proto2


DIR = pathlib.Path(__file__).parent


regions = ["US", "non-US"]


@pytest.fixture(params=regions)
def sample_data_table(
    request: pytest.FixtureRequest,
    bigquery_client: bigquery.Client,
    project_id: str,
    dataset_id: str,
    dataset_id_non_us: str,
) -> str:
    dataset = dataset_id
    if request.param != "US":
        dataset = dataset_id_non_us
    schema = bigquery_client.schema_from_json(str(DIR / "sample_data_schema.json"))
    table_id = f"append_rows_proto2_{random.randrange(10000)}"
    full_table_id = f"{project_id}.{dataset}.{table_id}"
    table = bigquery.Table(full_table_id, schema=schema)
    table = bigquery_client.create_table(table, exists_ok=True)
    yield full_table_id
    bigquery_client.delete_table(table, not_found_ok=True)


def test_append_rows_proto2(
    capsys: pytest.CaptureFixture,
    bigquery_client: bigquery.Client,
    sample_data_table: str,
):
    project_id, dataset_id, table_id = sample_data_table.split(".")
    append_rows_proto2.append_rows_proto2(
        project_id=project_id, dataset_id=dataset_id, table_id=table_id
    )
    out, _ = capsys.readouterr()
    assert "have been committed" in out

    rows = bigquery_client.query(
        f"SELECT * FROM `{project_id}.{dataset_id}.{table_id}`"
    ).result()
    row_items = [
        # Convert to sorted tuple of items, omitting NULL values, to make
        # searching for expected rows easier.
        tuple(
            sorted(
                item for item in row.items() if item[1] is not None and item[1] != []
            )
        )
        for row in rows
    ]

    assert (
        ("bool_col", True),
        ("bytes_col", b"Hello, World!"),
        ("float64_col", float("+inf")),
        ("int64_col", 123),
        ("row_num", 1),
        ("string_col", "Howdy!"),
    ) in row_items
    assert (("bool_col", False), ("row_num", 2)) in row_items
    assert (("bytes_col", b"See you later!"), ("row_num", 3)) in row_items
    assert (("float64_col", 1000000.125), ("row_num", 4)) in row_items
    assert (("int64_col", 67000), ("row_num", 5)) in row_items
    assert (("row_num", 6), ("string_col", "Auf Wiedersehen!")) in row_items
    assert (("date_col", datetime.date(2021, 8, 12)), ("row_num", 7)) in row_items
    assert (
        ("datetime_col", datetime.datetime(2021, 8, 12, 9, 46, 23, 987456)),
        ("row_num", 8),
    ) in row_items
    assert (
        ("geography_col", "POINT(-122.347222 47.651111)"),
        ("row_num", 9),
    ) in row_items
    assert (
        ("bignumeric_col", decimal.Decimal("-1.234567891011121314151617181920e+16")),
        ("numeric_col", decimal.Decimal("1.23456789101112e+6")),
        ("row_num", 10),
    ) in row_items
    assert (
        ("row_num", 11),
        ("time_col", datetime.time(11, 7, 48, 123456)),
    ) in row_items
    assert (
        ("row_num", 12),
        (
            "timestamp_col",
            datetime.datetime(
                2021, 8, 12, 16, 11, 22, 987654, tzinfo=datetime.timezone.utc
            ),
        ),
    ) in row_items
    assert (("int64_list", [1, 2, 3]), ("row_num", 13)) in row_items
    assert (("row_num", 14), ("struct_col", {"sub_int_col": 7}),) in row_items
    assert (
        ("row_num", 15),
        (
            "struct_list",
            [{"sub_int_col": -1}, {"sub_int_col": -2}, {"sub_int_col": -3}],
        ),
    ) in row_items
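The test above relies on shared pytest fixtures (bigquery_client, project_id, dataset_id, dataset_id_non_us) defined in this commit's conftest, which is not shown here. Purely for orientation, a hypothetical sketch of what such fixtures might look like; the names come from the test signature, but the bodies and scopes are assumptions rather than the committed conftest.

# Hypothetical conftest.py sketch; the fixtures committed here may differ.
import random

from google.cloud import bigquery
import pytest


@pytest.fixture(scope="session")
def bigquery_client() -> bigquery.Client:
    return bigquery.Client()


@pytest.fixture(scope="session")
def project_id(bigquery_client: bigquery.Client) -> str:
    return bigquery_client.project


@pytest.fixture(scope="session")
def dataset_id(bigquery_client: bigquery.Client, project_id: str):
    # dataset_id_non_us would follow the same pattern, creating the dataset
    # with a non-US location.
    dataset_id = f"bqstorage_samples_{random.randrange(10000)}"
    bigquery_client.create_dataset(f"{project_id}.{dataset_id}")
    yield dataset_id
    bigquery_client.delete_dataset(
        f"{project_id}.{dataset_id}", delete_contents=True, not_found_ok=True
    )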

0 commit comments
