Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
372 changes: 372 additions & 0 deletions spanner/tests/unit/test_keyset.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
import unittest


import six


class TestKeyRange(unittest.TestCase):

def _getTargetClass(self):
Expand All @@ -26,6 +29,15 @@ def _getTargetClass(self):
def _make_one(self, *args, **kwargs):
return self._getTargetClass()(*args, **kwargs)

def _check_unused_keys(self, krange, used_keys):

This comment was marked as spam.

key_types = ('start_closed',
'start_open',
'end_closed',
'end_open')
for key_type in key_types:
if key_type not in used_keys:
self.assertFalse(krange.to_pb().HasField(key_type))

def test_ctor_no_start_no_end(self):
with self.assertRaises(ValueError):
self._make_one()
Expand Down Expand Up @@ -119,6 +131,366 @@ def test_to_pb_w_start_open_and_end_closed(self):
self.assertEqual(len(krange_pb.end_closed), 1)
self.assertEqual(krange_pb.end_closed.values[0].string_value, KEY_2[0])

def test_ctor_empty_list_start_closed_and_key_end_closed(self):

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

KEY_1 = [u'key_1']
key_types = {'start_closed': [], 'end_closed': KEY_1}
krange = self._make_one(**key_types)
krange_pb = krange.to_pb()
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
self.assertEqual(getattr(krange, key_type), key_types[key_type])
self._check_unused_keys(krange, used_keys)

def test_ctor_empty_list_start_closed_and_key_end_open(self):
KEY_1 = [u'key_1']
key_types = {'start_closed': [], 'end_open': KEY_1}
krange = self._make_one(**key_types)
krange_pb = krange.to_pb()
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
self.assertEqual(getattr(krange, key_type), key_types[key_type])
self._check_unused_keys(krange, used_keys)

def test_ctor_empty_list_end_closed_and_key_start_closed(self):
KEY_1 = [u'key_1']
key_types = {'start_closed': KEY_1, 'end_closed': []}
krange = self._make_one(**key_types)
krange_pb = krange.to_pb()
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
self.assertEqual(getattr(krange, key_type), key_types[key_type])
self._check_unused_keys(krange, used_keys)

def test_ctor_empty_list_end_open_and_key_start_closed(self):
KEY_1 = [u'key_1']
key_types = {'start_closed': KEY_1, 'end_open': []}
krange = self._make_one(**key_types)
krange_pb = krange.to_pb()
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
self.assertEqual(getattr(krange, key_type), key_types[key_type])
self._check_unused_keys(krange, used_keys)

def test_ctor_empty_list_start_open_and_key_end_closed(self):
KEY_1 = [u'key_1']
key_types = {'start_open': [], 'end_closed': KEY_1}
krange = self._make_one(**key_types)
krange_pb = krange.to_pb()
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
self.assertEqual(getattr(krange, key_type), key_types[key_type])
self._check_unused_keys(krange, used_keys)

def test_ctor_empty_list_start_open_and_key_end_open(self):
KEY_1 = [u'key_1']
key_types = {'start_open': [], 'end_open': KEY_1}
krange = self._make_one(**key_types)
krange_pb = krange.to_pb()
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
self.assertEqual(getattr(krange, key_type), key_types[key_type])
self._check_unused_keys(krange, used_keys)

def test_ctor_empty_list_end_closed_and_key_start_open(self):
KEY_1 = [u'key_1']
key_types = {'start_open': KEY_1, 'end_closed': []}
krange = self._make_one(**key_types)
krange_pb = krange.to_pb()
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
self.assertEqual(getattr(krange, key_type), key_types[key_type])
self._check_unused_keys(krange, used_keys)

def test_ctor_empty_list_end_open_and_key_start_open(self):
KEY_1 = [u'key_1']
key_types = {'start_open': KEY_1, 'end_open': []}
krange = self._make_one(**key_types)
krange_pb = krange.to_pb()
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
self.assertEqual(getattr(krange, key_type), key_types[key_type])
self._check_unused_keys(krange, used_keys)

def test_to_pb_empty_list_start_closed_and_key_end_closed(self):
from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange

KEY_1 = [u'key_1']
KEY_2 = [u'key_2']
key_types = {'start_closed': [], 'end_closed': KEY_1}
krange = self._make_one(**key_types)
krange_pb = krange.to_pb()
self.assertIsInstance(krange_pb, KeyRange)
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
if key_types[key_type] == KEY_1:
self.assertEqual(len(getattr(krange_pb, key_type)), 1)
key1 = getattr(krange_pb, key_type).values[0].string_value
self.assertEqual(key1, KEY_1[0])
else:
self.assertEqual(len(getattr(krange_pb, key_type)), 0)
self._check_unused_keys(krange, used_keys)

def test_to_pb_empty_list_start_closed_and_key_end_open(self):
from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange

KEY_1 = [u'key_1']
KEY_2 = [u'key_2']
key_types = {'start_closed': [], 'end_open': KEY_1}
krange = self._make_one(**key_types)
krange_pb = krange.to_pb()
self.assertIsInstance(krange_pb, KeyRange)
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
if key_types[key_type] == KEY_1:
self.assertEqual(len(getattr(krange_pb, key_type)), 1)
key1 = getattr(krange_pb, key_type).values[0].string_value
self.assertEqual(key1, KEY_1[0])
else:
self.assertEqual(len(getattr(krange_pb, key_type)), 0)
self._check_unused_keys(krange, used_keys)

def test_to_pb_empty_list_end_closed_and_key_start_closed(self):
from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange

KEY_1 = [u'key_1']
KEY_2 = [u'key_2']
key_types = {'start_closed': KEY_1, 'end_closed': []}
krange = self._make_one(**key_types)
krange_pb = krange.to_pb()
self.assertIsInstance(krange_pb, KeyRange)
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
if key_types[key_type] == KEY_1:
self.assertEqual(len(getattr(krange_pb, key_type)), 1)
key1 = getattr(krange_pb, key_type).values[0].string_value
self.assertEqual(key1, KEY_1[0])
else:
self.assertEqual(len(getattr(krange_pb, key_type)), 0)
self._check_unused_keys(krange, used_keys)

def test_to_pb_empty_list_end_open_and_key_start_closed(self):
from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange

KEY_1 = [u'key_1']
KEY_2 = [u'key_2']
key_types = {'start_closed': KEY_1, 'end_open': []}
krange = self._make_one(**key_types)
krange_pb = krange.to_pb()
self.assertIsInstance(krange_pb, KeyRange)
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
if key_types[key_type] == KEY_1:
self.assertEqual(len(getattr(krange_pb, key_type)), 1)
key1 = getattr(krange_pb, key_type).values[0].string_value
self.assertEqual(key1, KEY_1[0])
else:
self.assertEqual(len(getattr(krange_pb, key_type)), 0)
self._check_unused_keys(krange, used_keys)

def test_to_pb_empty_list_start_open_and_key_end_closed(self):
from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange

KEY_1 = [u'key_1']
KEY_2 = [u'key_2']
key_types = {'start_open': [], 'end_closed': KEY_1}
krange = self._make_one(**key_types)
krange_pb = krange.to_pb()
self.assertIsInstance(krange_pb, KeyRange)
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
if key_types[key_type] == KEY_1:
self.assertEqual(len(getattr(krange_pb, key_type)), 1)
key1 = getattr(krange_pb, key_type).values[0].string_value
self.assertEqual(key1, KEY_1[0])
else:
self.assertEqual(len(getattr(krange_pb, key_type)), 0)
self._check_unused_keys(krange, used_keys)

def test_to_pb_empty_list_start_open_and_key_end_open(self):
from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange

KEY_1 = [u'key_1']
KEY_2 = [u'key_2']
key_types = {'start_open': [], 'end_open': KEY_1}
krange = self._make_one(**key_types)
krange_pb = krange.to_pb()
self.assertIsInstance(krange_pb, KeyRange)
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
if key_types[key_type] == KEY_1:
self.assertEqual(len(getattr(krange_pb, key_type)), 1)
key1 = getattr(krange_pb, key_type).values[0].string_value
self.assertEqual(key1, KEY_1[0])
else:
self.assertEqual(len(getattr(krange_pb, key_type)), 0)
self._check_unused_keys(krange, used_keys)

def test_to_pb_empty_list_end_closed_and_key_start_open(self):
from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange

KEY_1 = [u'key_1']
KEY_2 = [u'key_2']
key_types = {'start_open': KEY_1, 'end_closed': []}
krange = self._make_one(**key_types)
krange_pb = krange.to_pb()
self.assertIsInstance(krange_pb, KeyRange)
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
if key_types[key_type] == KEY_1:
self.assertEqual(len(getattr(krange_pb, key_type)), 1)
key1 = getattr(krange_pb, key_type).values[0].string_value
self.assertEqual(key1, KEY_1[0])
else:
self.assertEqual(len(getattr(krange_pb, key_type)), 0)
self._check_unused_keys(krange, used_keys)

def test_to_pb_empty_list_end_open_and_key_start_open(self):
from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange

KEY_1 = [u'key_1']
KEY_2 = [u'key_2']
key_types = {'start_open': KEY_1, 'end_open': []}
krange = self._make_one(**key_types)
krange_pb = krange.to_pb()
self.assertIsInstance(krange_pb, KeyRange)
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
if key_types[key_type] == KEY_1:
self.assertEqual(len(getattr(krange_pb, key_type)), 1)
key1 = getattr(krange_pb, key_type).values[0].string_value
self.assertEqual(key1, KEY_1[0])
else:
self.assertEqual(len(getattr(krange_pb, key_type)), 0)
self._check_unused_keys(krange, used_keys)

def test_ctor_keys_start_closed_end_closed(self):
KEY_1 = [u'key_1']
KEY_2 = [u'key_2']
key_types = {'start_closed': KEY_1, 'end_closed': KEY_2}
krange = self._make_one(**key_types)
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
self.assertEqual(getattr(krange, key_type), key_types[key_type])
self._check_unused_keys(krange, used_keys)

def test_ctor_keys_start_closed_end_open(self):
KEY_1 = [u'key_1']
KEY_2 = [u'key_2']
key_types = {'start_closed': KEY_1, 'end_open': KEY_2}
krange = self._make_one(**key_types)
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
self.assertEqual(getattr(krange, key_type), key_types[key_type])
self._check_unused_keys(krange, used_keys)

def test_ctor_keys_start_open_end_closed(self):
KEY_1 = [u'key_1']
KEY_2 = [u'key_2']
key_types = {'start_open': KEY_1, 'end_closed': KEY_2}
krange = self._make_one(**key_types)
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
self.assertEqual(getattr(krange, key_type), key_types[key_type])
self._check_unused_keys(krange, used_keys)

def test_ctor_keys_start_open_end_open(self):
KEY_1 = [u'key_1']
KEY_2 = [u'key_2']
key_types = {'start_open': KEY_1, 'end_open': KEY_2}
krange = self._make_one(**key_types)
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
self.assertEqual(getattr(krange, key_type), key_types[key_type])
self._check_unused_keys(krange, used_keys)

def test_to_pb_keys_start_closed_end_closed(self):
from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange
KEY_1 = [u'key_1']
KEY_2 = [u'key_2']
key_types = {'start_closed': KEY_1, 'end_closed': KEY_2}
krange = self._make_one(**key_types)
krange_pb = krange.to_pb()
self.assertIsInstance(krange_pb, KeyRange)
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
pb_key_type = getattr(krange_pb, key_type)
self.assertEqual(len(pb_key_type), 1)
self.assertEqual(pb_key_type.values[0].string_value,
key_types[key_type][0])
self._check_unused_keys(krange, used_keys)

def test_to_pb_keys_start_closed_end_open(self):
from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange
KEY_1 = [u'key_1']
KEY_2 = [u'key_2']
key_types = {'start_closed': KEY_1, 'end_open': KEY_2}
krange = self._make_one(**key_types)
krange_pb = krange.to_pb()
self.assertIsInstance(krange_pb, KeyRange)
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
pb_key_type = getattr(krange_pb, key_type)
self.assertEqual(len(pb_key_type), 1)
self.assertEqual(pb_key_type.values[0].string_value,
key_types[key_type][0])
self._check_unused_keys(krange, used_keys)

def test_to_pb_keys_start_open_end_closed(self):
from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange
KEY_1 = [u'key_1']
KEY_2 = [u'key_2']
key_types = {'start_open': KEY_1, 'end_closed': KEY_2}
krange = self._make_one(**key_types)
krange_pb = krange.to_pb()
self.assertIsInstance(krange_pb, KeyRange)
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
pb_key_type = getattr(krange_pb, key_type)
self.assertEqual(len(pb_key_type), 1)
self.assertEqual(pb_key_type.values[0].string_value,
key_types[key_type][0])
self._check_unused_keys(krange, used_keys)

def test_to_pb_keys_start_open_end_open(self):
from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange
KEY_1 = [u'key_1']
KEY_2 = [u'key_2']
key_types = {'start_open': KEY_1, 'end_open': KEY_2}
krange = self._make_one(**key_types)
krange_pb = krange.to_pb()
self.assertIsInstance(krange_pb, KeyRange)
used_keys = []
for key_type in six.iterkeys(key_types):
used_keys.append(key_type)
pb_key_type = getattr(krange_pb, key_type)
self.assertEqual(len(pb_key_type), 1)
self.assertEqual(pb_key_type.values[0].string_value,
key_types[key_type][0])
self._check_unused_keys(krange, used_keys)


class TestKeySet(unittest.TestCase):

Expand Down