
Adding Bigtable Table.read_rows().
dhermes committed Feb 19, 2016
1 parent 112ff67 commit 72e9186
Showing 2 changed files with 115 additions and 3 deletions.
58 changes: 55 additions & 3 deletions gcloud/bigtable/table.py
@@ -25,6 +25,7 @@
from gcloud.bigtable.column_family import ColumnFamily
from gcloud.bigtable.row import Row
from gcloud.bigtable.row_data import PartialRowData
from gcloud.bigtable.row_data import PartialRowsData


class Table(object):
@@ -255,6 +256,59 @@ def read_row(self, row_key, filter_=None):
raise ValueError('The row remains partial / is not committed.')
return result

def read_rows(self, start_key=None, end_key=None,
allow_row_interleaving=None, limit=None, filter_=None):
"""Read rows from this table.
:type start_key: bytes
:param start_key: (Optional) The beginning of a range of row keys to
read from. The range will include ``start_key``. If
left empty, will be interpreted as the empty string.
:type end_key: bytes
:param end_key: (Optional) The end of a range of row keys to read from.
The range will not include ``end_key``. If left empty,
will be interpreted as an infinite string.
:type allow_row_interleaving: bool
:param allow_row_interleaving: (Optional) By default, rows are read
sequentially, producing results which
are guaranteed to arrive in increasing
row order. Setting
``allow_row_interleaving`` to
:data:`True` allows multiple rows to be
interleaved in the response stream,
which increases throughput but breaks
this guarantee, and may force the
client to use more memory to buffer
partially-received rows.
:type limit: int
:param limit: (Optional) The read will terminate after committing to N
rows' worth of results. The default (zero) is to return
all results. Note that if ``allow_row_interleaving`` is
set to :data:`True`, partial results may be returned for
more than N rows. However, only N ``commit_row`` chunks
will be sent.
:type filter_: :class:`.row.RowFilter`
:param filter_: (Optional) The filter to apply to the contents of the
specified row(s). If unset, reads every column in
each row.
:rtype: :class:`.PartialRowsData`
:returns: A :class:`.PartialRowsData` convenience wrapper for consuming
the streamed results.
"""
request_pb = _create_row_request(
self.name, start_key=start_key, end_key=end_key, filter_=filter_,
allow_row_interleaving=allow_row_interleaving, limit=limit)
client = self._cluster._client
response_iterator = client._data_stub.ReadRows(request_pb,
client.timeout_seconds)
# We expect an iterator of `data_messages_pb2.ReadRowsResponse`
return PartialRowsData(response_iterator)
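A hedged usage sketch (not part of the commit): how a caller might drive the new method end to end. It assumes a `table` built from a client/cluster in the usual way, and that `PartialRowsData` exposes a `consume_all()` method and a `rows` mapping of `PartialRowData` objects, as suggested by `gcloud.bigtable.row_data`.

```python
# Illustrative only -- `table` is assumed to have been created via the
# project's normal client/cluster factories.
rows_data = table.read_rows(start_key=b'row-aaa', end_key=b'row-zzz',
                            limit=10)
rows_data.consume_all()  # Assumed helper that drains the response stream.

# `rows` is assumed to map row keys to PartialRowData instances.
for row_key, row in sorted(rows_data.rows.items()):
    print(row_key, row.cells)
```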

def sample_row_keys(self):
"""Read a sample of row keys in the table.
@@ -314,9 +368,7 @@ def _create_row_request(table_name, row_key=None, start_key=None, end_key=None,
The range will not include ``end_key``. If left empty,
will be interpreted as an infinite string.
-:type filter_: :class:`.row.RowFilter`, :class:`.row.RowFilterChain`,
-               :class:`.row.RowFilterUnion` or
-               :class:`.row.ConditionalRowFilter`
+:type filter_: :class:`.row.RowFilter`
:param filter_: (Optional) The filter to apply to the contents of the
specified row(s). If unset, reads the entire table.
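For orientation, a minimal sketch of what a `_create_row_request` helper along these lines might build. The module paths and protobuf field names (`ReadRowsRequest`, `RowRange`, `allow_row_interleaving`, `num_rows_limit`) are assumptions based on the v1 Bigtable data messages, not a copy of the repository's helper.

```python
# Sketch only: generated-module paths and field names are assumptions.
from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
from gcloud.bigtable._generated import (
    bigtable_service_messages_pb2 as messages_pb2)


def _create_row_request_sketch(table_name, start_key=None, end_key=None,
                               filter_=None, allow_row_interleaving=None,
                               limit=None):
    """Build a ReadRowsRequest from the keyword arguments used above."""
    request_kwargs = {'table_name': table_name}
    if start_key is not None or end_key is not None:
        range_kwargs = {}
        if start_key is not None:
            range_kwargs['start_key'] = start_key
        if end_key is not None:
            range_kwargs['end_key'] = end_key
        # Row range is half-open: start_key inclusive, end_key exclusive.
        request_kwargs['row_range'] = data_pb2.RowRange(**range_kwargs)
    if filter_ is not None:
        request_kwargs['filter'] = filter_.to_pb()
    if allow_row_interleaving is not None:
        request_kwargs['allow_row_interleaving'] = allow_row_interleaving
    if limit is not None:
        request_kwargs['num_rows_limit'] = limit
    return messages_pb2.ReadRowsRequest(**request_kwargs)
```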
60 changes: 60 additions & 0 deletions gcloud/bigtable/test_table.py
@@ -370,6 +370,66 @@ def test_read_row_still_partial(self):
with self.assertRaises(ValueError):
self._read_row_helper(chunks)

def test_read_rows(self):
from gcloud._testing import _Monkey
from gcloud.bigtable._testing import _FakeStub
from gcloud.bigtable.row_data import PartialRowsData
from gcloud.bigtable import table as MUT

project_id = 'project-id'
zone = 'zone'
cluster_id = 'cluster-id'
table_id = 'table-id'
timeout_seconds = 1111
client = _Client(timeout_seconds=timeout_seconds)
cluster_name = ('projects/' + project_id + '/zones/' + zone +
'/clusters/' + cluster_id)
cluster = _Cluster(cluster_name, client=client)
table = self._makeOne(table_id, cluster)

# Create request_pb
request_pb = object() # Returned by our mock.
mock_created = []

def mock_create_row_request(table_name, **kwargs):
mock_created.append((table_name, kwargs))
return request_pb

# Create response_iterator
response_iterator = object()

# Patch the stub used by the API method.
client._data_stub = stub = _FakeStub(response_iterator)

# Create expected_result.
expected_result = PartialRowsData(response_iterator)

# Perform the method and check the result.
start_key = b'start-key'
end_key = b'end-key'
filter_obj = object()
allow_row_interleaving = True
limit = 22
with _Monkey(MUT, _create_row_request=mock_create_row_request):
result = table.read_rows(
start_key=start_key, end_key=end_key, filter_=filter_obj,
allow_row_interleaving=allow_row_interleaving, limit=limit)

self.assertEqual(result, expected_result)
self.assertEqual(stub.method_calls, [(
'ReadRows',
(request_pb, timeout_seconds),
{},
)])
created_kwargs = {
'start_key': start_key,
'end_key': end_key,
'filter_': filter_obj,
'allow_row_interleaving': allow_row_interleaving,
'limit': limit,
}
self.assertEqual(mock_created, [(table.name, created_kwargs)])
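The test above leans on two helpers from the repository's test utilities: `_Monkey`, which temporarily swaps module attributes, and `_FakeStub`, which stands in for the gRPC stub. Below is a minimal sketch of the kind of fake stub that would satisfy the assertions; it is an assumption about the helper's shape, not the actual `gcloud.bigtable._testing` code.

```python
# Assumed shape of a fake stub sufficient for the assertions above.
class _FakeStubSketch(object):

    def __init__(self, *results):
        self.results = results
        self.method_calls = []

    def __getattr__(self, name):
        # Any attribute access (e.g. stub.ReadRows) returns a callable that
        # records the call and hands back the first canned result.
        def mock_rpc(*args, **kwargs):
            self.method_calls.append((name, args, kwargs))
            return self.results[0]
        return mock_rpc
```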

def test_sample_row_keys(self):
from gcloud.bigtable._generated import (
bigtable_service_messages_pb2 as messages_pb2)
