Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[API-282] Implement SQL service #390

Merged
merged 9 commits into from
Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/api/modules.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ API Documentation
predicate
proxy/modules
serialization
sql
transaction
util

4 changes: 4 additions & 0 deletions docs/api/sql.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
SQL
===========

.. automodule:: hazelcast.sql
5 changes: 5 additions & 0 deletions hazelcast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
)
from hazelcast.reactor import AsyncoreReactor
from hazelcast.serialization import SerializationServiceV1
from hazelcast.sql import _InternalSqlService, SqlService
from hazelcast.statistics import Statistics
from hazelcast.transaction import TWO_PHASE, TransactionManager
from hazelcast.util import AtomicInteger, RoundRobinLB
Expand Down Expand Up @@ -388,6 +389,10 @@ def __init__(self, **kwargs):
self._invocation_service.init(
self._internal_partition_service, self._connection_manager, self._listener_service
)
self._internal_sql_service = _InternalSqlService(
self._connection_manager, self._serialization_service, self._invocation_service
)
self.sql = SqlService(self._internal_sql_service)
self._init_context()
self._start()

Expand Down
39 changes: 29 additions & 10 deletions hazelcast/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,17 +146,22 @@ def add_listener(self, on_connection_opened=None, on_connection_closed=None):
def get_connection(self, member_uuid):
return self.active_connections.get(member_uuid, None)

def get_random_connection(self):
def get_random_connection(self, should_get_data_member=False):
if self._smart_routing_enabled:
member = self._load_balancer.next()
if member:
connection = self.get_connection(member.uuid)
if connection:
return connection

# We should not get to this point under normal circumstances.
# Therefore, copying the list should be OK.
for connection in list(six.itervalues(self.active_connections)):
connection = self._get_connection_from_load_balancer(should_get_data_member)
if connection:
return connection

# We should not get to this point under normal circumstances
# for the smart client. For uni-socket client, there would be
# a single connection in the dict. Therefore, copying the list
# should be acceptable.
for member_uuid, connection in list(six.iteritems(self.active_connections)):
if should_get_data_member:
member = self._cluster_service.get_member(member_uuid)
if not member or member.lite_member:
continue

return connection

return None
Expand Down Expand Up @@ -256,6 +261,20 @@ def check_invocation_allowed(self):
else:
raise IOError("No connection found to cluster")

def _get_connection_from_load_balancer(self, should_get_data_member):
load_balancer = self._load_balancer
member = None
if should_get_data_member:
if load_balancer.can_get_next_data_member():
member = load_balancer.next_data_member()
else:
member = load_balancer.next()

if not member:
return None

return self.get_connection(member.uuid)

def _get_or_connect_to_address(self, address):
for connection in list(six.itervalues(self.active_connections)):
if connection.remote_address == address:
Expand Down
277 changes: 276 additions & 1 deletion hazelcast/protocol/builtin.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import uuid
from datetime import date, time, datetime, timedelta
from decimal import Decimal

from hazelcast import six
from hazelcast.six.moves import range
Expand All @@ -11,7 +13,7 @@
NULL_FINAL_FRAME_BUF,
END_FINAL_FRAME_BUF,
)
from hazelcast.serialization import (
from hazelcast.serialization.bits import (
LONG_SIZE_IN_BYTES,
UUID_SIZE_IN_BYTES,
LE_INT,
Expand All @@ -23,8 +25,21 @@
LE_INT8,
UUID_MSB_SHIFT,
UUID_LSB_MASK,
BYTE_SIZE_IN_BYTES,
SHORT_SIZE_IN_BYTES,
LE_INT16,
FLOAT_SIZE_IN_BYTES,
LE_FLOAT,
LE_DOUBLE,
DOUBLE_SIZE_IN_BYTES,
)
from hazelcast.serialization.data import Data
from hazelcast.util import int_from_bytes, timezone

_LOCAL_DATE_SIZE_IN_BYTES = SHORT_SIZE_IN_BYTES + BYTE_SIZE_IN_BYTES * 2
_LOCAL_TIME_SIZE_IN_BYTES = BYTE_SIZE_IN_BYTES * 3 + INT_SIZE_IN_BYTES
_LOCAL_DATE_TIME_SIZE_IN_BYTES = _LOCAL_DATE_SIZE_IN_BYTES + _LOCAL_TIME_SIZE_IN_BYTES
_OFFSET_DATE_TIME_SIZE_IN_BYTES = _LOCAL_DATE_TIME_SIZE_IN_BYTES + INT_SIZE_IN_BYTES


class CodecUtil(object):
Expand Down Expand Up @@ -274,6 +289,49 @@ def decode_uuid(buf, offset):
)
return uuid.UUID(bytes=bytes(b))

@staticmethod
def decode_short(buf, offset):
return LE_INT16.unpack_from(buf, offset)[0]

@staticmethod
def decode_float(buf, offset):
return LE_FLOAT.unpack_from(buf, offset)[0]

@staticmethod
def decode_double(buf, offset):
return LE_DOUBLE.unpack_from(buf, offset)[0]

@staticmethod
def decode_local_date(buf, offset):
year = FixSizedTypesCodec.decode_short(buf, offset)
month = FixSizedTypesCodec.decode_byte(buf, offset + SHORT_SIZE_IN_BYTES)
day = FixSizedTypesCodec.decode_byte(buf, offset + SHORT_SIZE_IN_BYTES + BYTE_SIZE_IN_BYTES)

return date(year, month, day)

@staticmethod
def decode_local_time(buf, offset):
hour = FixSizedTypesCodec.decode_byte(buf, offset)
minute = FixSizedTypesCodec.decode_byte(buf, offset + BYTE_SIZE_IN_BYTES)
second = FixSizedTypesCodec.decode_byte(buf, offset + BYTE_SIZE_IN_BYTES * 2)
nano = FixSizedTypesCodec.decode_int(buf, offset + BYTE_SIZE_IN_BYTES * 3)

return time(hour, minute, second, int(nano / 1000.0))

@staticmethod
def decode_local_date_time(buf, offset):
date_value = FixSizedTypesCodec.decode_local_date(buf, offset)
time_value = FixSizedTypesCodec.decode_local_time(buf, offset + _LOCAL_DATE_SIZE_IN_BYTES)

return datetime.combine(date_value, time_value)

@staticmethod
def decode_offset_date_time(buf, offset):
datetime_value = FixSizedTypesCodec.decode_local_date_time(buf, offset)
offset_seconds = FixSizedTypesCodec.decode_int(buf, offset + _LOCAL_DATE_TIME_SIZE_IN_BYTES)

return datetime_value.replace(tzinfo=timezone(timedelta(seconds=offset_seconds)))


class ListIntegerCodec(object):
@staticmethod
Expand Down Expand Up @@ -496,3 +554,220 @@ def encode(buf, value, is_final=False):
@staticmethod
def decode(msg):
return msg.next_frame().buf.decode("utf-8")


class ListCNFixedSizeCodec(object):
_TYPE_NULL_ONLY = 1
_TYPE_NOT_NULL_ONLY = 2
_TYPE_MIXED = 3

_ITEMS_PER_BITMASK = 8

_HEADER_SIZE = BYTE_SIZE_IN_BYTES + INT_SIZE_IN_BYTES

@staticmethod
def decode(msg, item_size, decoder):
frame = msg.next_frame()
type = FixSizedTypesCodec.decode_byte(frame.buf, 0)
count = FixSizedTypesCodec.decode_int(frame.buf, 1)

if type == ListCNFixedSizeCodec._TYPE_NULL_ONLY:
return [None] * count
elif type == ListCNFixedSizeCodec._TYPE_NOT_NULL_ONLY:
header_size = ListCNFixedSizeCodec._HEADER_SIZE
return [decoder(frame.buf, header_size + i * item_size) for i in range(count)]
else:
response = [None] * count
position = ListCNFixedSizeCodec._HEADER_SIZE
read_count = 0
items_per_bitmask = ListCNFixedSizeCodec._ITEMS_PER_BITMASK

while read_count < count:
bitmask = FixSizedTypesCodec.decode_byte(frame.buf, position)
position += 1

batch_size = min(items_per_bitmask, count - read_count)
for i in range(batch_size):
mask = 1 << i
if (bitmask & mask) == mask:
response[read_count] = decoder(frame.buf, position)
position += item_size

read_count += 1

return response


class ListCNBooleanCodec(object):
@staticmethod
def decode(msg):
return ListCNFixedSizeCodec.decode(
msg, BOOLEAN_SIZE_IN_BYTES, FixSizedTypesCodec.decode_boolean
)


class ListCNByteCodec(object):
@staticmethod
def decode(msg):
return ListCNFixedSizeCodec.decode(msg, BYTE_SIZE_IN_BYTES, FixSizedTypesCodec.decode_byte)


class ListCNShortCodec(object):
@staticmethod
def decode(msg):
return ListCNFixedSizeCodec.decode(
msg, SHORT_SIZE_IN_BYTES, FixSizedTypesCodec.decode_short
)


class ListCNIntegerCodec(object):
@staticmethod
def decode(msg):
return ListCNFixedSizeCodec.decode(msg, INT_SIZE_IN_BYTES, FixSizedTypesCodec.decode_int)


class ListCNLongCodec(object):
@staticmethod
def decode(msg):
return ListCNFixedSizeCodec.decode(msg, LONG_SIZE_IN_BYTES, FixSizedTypesCodec.decode_long)


class ListCNFloatCodec(object):
@staticmethod
def decode(msg):
return ListCNFixedSizeCodec.decode(
msg, FLOAT_SIZE_IN_BYTES, FixSizedTypesCodec.decode_float
)


class ListCNDoubleCodec(object):
@staticmethod
def decode(msg):
return ListCNFixedSizeCodec.decode(
msg, DOUBLE_SIZE_IN_BYTES, FixSizedTypesCodec.decode_double
)


class ListCNLocalDateCodec(object):
@staticmethod
def decode(msg):
return ListCNFixedSizeCodec.decode(
msg, _LOCAL_DATE_SIZE_IN_BYTES, ListCNLocalDateCodec._decode_item
)

@staticmethod
def _decode_item(buf, offset):
return FixSizedTypesCodec.decode_local_date(buf, offset).isoformat()


class ListCNLocalTimeCodec(object):
@staticmethod
def decode(msg):
return ListCNFixedSizeCodec.decode(
msg, _LOCAL_TIME_SIZE_IN_BYTES, ListCNLocalTimeCodec._decode_item
)

@staticmethod
def _decode_item(buf, offset):
return FixSizedTypesCodec.decode_local_time(buf, offset).isoformat()


class ListCNLocalDateTimeCodec(object):
@staticmethod
def decode(msg):
return ListCNFixedSizeCodec.decode(
msg, _LOCAL_DATE_TIME_SIZE_IN_BYTES, ListCNLocalDateTimeCodec._decode_item
)

@staticmethod
def _decode_item(buf, offset):
return FixSizedTypesCodec.decode_local_date_time(buf, offset).isoformat()


class ListCNOffsetDateTimeCodec(object):
@staticmethod
def decode(msg):
return ListCNFixedSizeCodec.decode(
msg, _OFFSET_DATE_TIME_SIZE_IN_BYTES, ListCNOffsetDateTimeCodec._decode_item
)

@staticmethod
def _decode_item(buf, offset):
return FixSizedTypesCodec.decode_offset_date_time(buf, offset).isoformat()


class BigDecimalCodec(object):
@staticmethod
def decode(msg):
buf = msg.next_frame().buf
size = FixSizedTypesCodec.decode_int(buf, 0)
unscaled_value = int_from_bytes(buf[INT_SIZE_IN_BYTES : INT_SIZE_IN_BYTES + size])
scale = FixSizedTypesCodec.decode_int(buf, INT_SIZE_IN_BYTES + size)
sign = 0 if unscaled_value >= 0 else 1
return str(
Decimal((sign, tuple(int(digit) for digit in str(abs(unscaled_value))), -1 * scale))
)


class SqlPageCodec(object):
@staticmethod
def decode(msg):
from hazelcast.sql import SqlColumnType, _SqlPage

# begin frame
msg.next_frame()

# read the "last" flag
is_last = LE_INT8.unpack_from(msg.next_frame().buf, 0)[0] == 1

# read column types
column_type_ids = ListIntegerCodec.decode(msg)
column_count = len(column_type_ids)

# read columns
columns = [None] * column_count

for i in range(column_count):
column_type_id = column_type_ids[i]

if column_type_id == SqlColumnType.VARCHAR:
columns[i] = ListMultiFrameCodec.decode_contains_nullable(msg, StringCodec.decode)
elif column_type_id == SqlColumnType.BOOLEAN:
columns[i] = ListCNBooleanCodec.decode(msg)
elif column_type_id == SqlColumnType.TINYINT:
columns[i] = ListCNByteCodec.decode(msg)
elif column_type_id == SqlColumnType.SMALLINT:
columns[i] = ListCNShortCodec.decode(msg)
elif column_type_id == SqlColumnType.INTEGER:
columns[i] = ListCNIntegerCodec.decode(msg)
elif column_type_id == SqlColumnType.BIGINT:
columns[i] = ListCNLongCodec.decode(msg)
elif column_type_id == SqlColumnType.REAL:
columns[i] = ListCNFloatCodec.decode(msg)
elif column_type_id == SqlColumnType.DOUBLE:
columns[i] = ListCNDoubleCodec.decode(msg)
elif column_type_id == SqlColumnType.DATE:
columns[i] = ListCNLocalDateCodec.decode(msg)
elif column_type_id == SqlColumnType.TIME:
columns[i] = ListCNLocalTimeCodec.decode(msg)
elif column_type_id == SqlColumnType.TIMESTAMP:
columns[i] = ListCNLocalDateTimeCodec.decode(msg)
elif column_type_id == SqlColumnType.TIMESTAMP_WITH_TIME_ZONE:
columns[i] = ListCNOffsetDateTimeCodec.decode(msg)
elif column_type_id == SqlColumnType.DECIMAL:
columns[i] = ListMultiFrameCodec.decode_contains_nullable(
msg, BigDecimalCodec.decode
)
elif column_type_id == SqlColumnType.NULL:
frame = msg.next_frame()
size = FixSizedTypesCodec.decode_int(frame.buf, 0)
column = [None for _ in range(size)]
columns[i] = column
elif column_type_id == SqlColumnType.OBJECT:
columns[i] = ListMultiFrameCodec.decode_contains_nullable(msg, DataCodec.decode)
else:
raise ValueError("Unknown type %s" % column_type_id)

CodecUtil.fast_forward_to_end_frame(msg)

return _SqlPage(column_type_ids, columns, is_last)
Loading