Skip to content

Commit

Permalink
Update to qPython 2.0 and add support for utf-8
Browse files Browse the repository at this point in the history
  • Loading branch information
atf1206 committed Jul 13, 2020
1 parent 82eb623 commit ace42e9
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 78 deletions.
2 changes: 1 addition & 1 deletion QCon.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def __init__(self, host, port, username, password, name=None, hdb=False):
self.port = port
self.username = username
self.password = password
self.q = qconnection.QConnection(host = self.host, port = self.getPort(), username = self.username, password = self.password)
self.q = qconnection.QConnection(host = self.host, port = self.getPort(), username = self.username, password = self.password, encoding = 'UTF-8')

@classmethod
def fromH(cls, h):
Expand Down
4 changes: 2 additions & 2 deletions qpython/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
__all__ = ['qconnection', 'qtype', 'qtemporal', 'qcollection']


__version__ = '1.1.0'
__version__ = '2.0.0'



Expand Down Expand Up @@ -66,4 +66,4 @@ def union_dict(self, **kw):
numpy_temporals = False,
pandas = False,
single_char_strings = False
)
)
49 changes: 28 additions & 21 deletions qpython/_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from collections import OrderedDict

from import MetaData
from . import MetaData
from .qreader import QReader, QReaderException
from .qcollection import QDictionary, qlist
from .qwriter import QWriter, QWriterException
Expand All @@ -32,7 +32,8 @@

class PandasQReader(QReader):

parse = Mapper(QReader._reader_map)
_reader_map = dict.copy(QReader._reader_map)
parse = Mapper(_reader_map)

@parse(QDICTIONARY)
def _read_dictionary(self, qtype = QDICTIONARY):
Expand All @@ -57,8 +58,8 @@ def _read_dictionary(self, qtype = QDICTIONARY):

return table
else:
keys = keys if not isinstance(keys, pandas.Series) else keys.as_matrix()
values = values if not isinstance(values, pandas.Series) else values.as_matrix()
keys = keys if not isinstance(keys, pandas.Series) else keys.values
values = values if not isinstance(values, pandas.Series) else values.values
return QDictionary(keys, values)
else:
return QReader._read_dictionary(self, qtype = qtype)
Expand All @@ -71,7 +72,8 @@ def _read_table(self, qtype = QTABLE):
self._buffer.skip() # ignore dict type stamp

columns = self._read_object()
data = self._read_object()
self._buffer.skip() # ignore generic list type indicator
data = QReader._read_general_list(self, qtype)

odict = OrderedDict()
meta = MetaData(qtype = QTABLE)
Expand Down Expand Up @@ -106,34 +108,36 @@ def _read_list(self, qtype):
if self._options.pandas:
self._options.numpy_temporals = True

list = QReader._read_list(self, qtype = qtype)
qlist = QReader._read_list(self, qtype = qtype)

if self._options.pandas:
if -abs(qtype) not in [QMONTH, QDATE, QDATETIME, QMINUTE, QSECOND, QTIME, QTIMESTAMP, QTIMESPAN, QSYMBOL]:
null = QNULLMAP[-abs(qtype)][1]
ps = pandas.Series(data = list).replace(null, numpy.NaN)
ps = pandas.Series(data = qlist).replace(null, numpy.NaN)
else:
ps = pandas.Series(data = list)
ps = pandas.Series(data = qlist)

ps.meta = MetaData(qtype = qtype)
return ps
else:
return list
return qlist


@parse(QGENERAL_LIST)
def _read_general_list(self, qtype = QGENERAL_LIST):
list = QReader._read_general_list(self, qtype)
qlist = QReader._read_general_list(self, qtype)
if self._options.pandas:
return [numpy.nan if isinstance(element, basestring) and element == b' ' else element for element in list]
return [numpy.nan if isinstance(element, basestring) and element == b' ' else element for element in qlist]
else:
return list
return qlist



class PandasQWriter(QWriter):

serialize = Mapper(QWriter._writer_map)
_writer_map = dict.copy(QWriter._writer_map)
serialize = Mapper(_writer_map)


@serialize(pandas.Series)
def _write_pandas_series(self, data, qtype = None):
Expand Down Expand Up @@ -164,19 +168,19 @@ def _write_pandas_series(self, data, qtype = None):
raise QWriterException('Unable to serialize pandas series %s' % data)

if qtype == QGENERAL_LIST:
self._write_generic_list(data.as_matrix())
self._write_generic_list(data.values)
elif qtype == QCHAR:
self._write_string(data.replace(numpy.nan, ' ').as_matrix().astype(numpy.string_).tostring())
self._write_string(data.replace(numpy.nan, ' ').values.astype(numpy.string_).tostring())
elif data.dtype.type not in (numpy.datetime64, numpy.timedelta64):
data = data.fillna(QNULLMAP[-abs(qtype)][1])
data = data.as_matrix()
data = data.values

if PY_TYPE[qtype] != data.dtype:
data = data.astype(PY_TYPE[qtype])

self._write_list(data, qtype = qtype)
else:
data = data.as_matrix()
data = data.values
data = data.astype(TEMPORAL_Q_TYPE[qtype])
self._write_list(data, qtype = qtype)

Expand Down Expand Up @@ -207,7 +211,10 @@ def _write_pandas_data_frame(self, data, qtype = None):

@serialize(tuple, list)
def _write_generic_list(self, data):
self._buffer.write(struct.pack('=bxi', QGENERAL_LIST, len(data)))
for element in data:
# assume nan represents a string null
self._write(' ' if type(element) in [float, numpy.float32, numpy.float64] and numpy.isnan(element) else element)
if self._options.pandas:
self._buffer.write(struct.pack('=bxi', QGENERAL_LIST, len(data)))
for element in data:
# assume nan represents a string null
self._write(' ' if type(element) in [float, numpy.float32, numpy.float64] and numpy.isnan(element) else element)
else:
QWriter._write_generic_list(self, data)
53 changes: 39 additions & 14 deletions qpython/qconnection.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ class QConnection(object):
- `username` (`string` or `None`) - username for q authentication/authorization
- `password` (`string` or `None`) - password for q authentication/authorization
- `timeout` (`nonnegative float` or `None`) - set a timeout on blocking socket operations
- `encoding` (`string`) - string encoding for data deserialization
- `reader_class` (subclass of `QReader`) - data deserializer
- `writer_class` (subclass of `QWriter`) - data serializer
:Options:
- `raw` (`boolean`) - if ``True`` returns raw data chunk instead of parsed
data, **Default**: ``False``
Expand All @@ -74,19 +77,37 @@ class QConnection(object):
strings are encoded as q strings instead of chars, **Default**: ``False``
'''

def __init__(self, host, port, username = None, password = None, timeout = None, **options):

def __init__(self, host, port, username = None, password = None, timeout = None, encoding = 'latin-1', reader_class = None, writer_class = None, **options):
self.host = host
self.port = port
self.username = username
self.password = password

self._connection = None
self._connection_file = None
self._protocol_version = None

self.timeout = timeout

self._encoding = encoding

self._options = MetaData(**CONVERSION_OPTIONS.union_dict(**options))

try:
from qpython._pandas import PandasQReader, PandasQWriter
self._reader_class = PandasQReader
self._writer_class = PandasQWriter
except ImportError:
self._reader_class = QReader
self._writer_class = QWriter

if reader_class:
self._reader_class = reader_class

if writer_class:
self._writer_class = writer_class


def __enter__(self):
self.open()
Expand Down Expand Up @@ -122,8 +143,8 @@ def open(self):
self._init_socket()
self._initialize()

self._writer = QWriter(self._connection, protocol_version = self._protocol_version)
self._reader = QReader(self._connection.makefile('b'))
self._writer = self._writer_class(self._connection, protocol_version = self._protocol_version, encoding = self._encoding)
self._reader = self._reader_class(self._connection_file, encoding = self._encoding)


def _init_socket(self):
Expand All @@ -132,14 +153,18 @@ def _init_socket(self):
self._connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._connection.connect((self.host, self.port))
self._connection.settimeout(self.timeout)
self._connection_file = self._connection.makefile('b')
except:
self._connection = None
self._connection_file = None
raise


def close(self):
'''Closes connection with the q service.'''
if self._connection:
self._connection_file.close()
self._connection_file = None
self._connection.close()
self._connection = None

Expand All @@ -160,7 +185,7 @@ def is_connected(self):
def _initialize(self):
'''Performs a IPC protocol handshake.'''
credentials = (self.username if self.username else '') + ':' + (self.password if self.password else '')
credentials = credentials.encode('latin-1')
credentials = credentials.encode(self._encoding)
self._connection.send(credentials + b'\3\0')
response = self._connection.recv(1)

Expand Down Expand Up @@ -220,7 +245,7 @@ def query(self, msg_type, query, *parameters, **options):
self._writer.write([query] + list(parameters), msg_type, **self._options.union_dict(**options))


def sync(self, query, *parameters, **options):
def sendSync(self, query, *parameters, **options):
'''Performs a synchronous query against a q service and returns parsed
data.
Expand All @@ -230,23 +255,23 @@ def sync(self, query, *parameters, **options):
Executes a q expression:
>>> print(q.sync('til 10'))
>>> print(q.sendSync('til 10'))
[0 1 2 3 4 5 6 7 8 9]
Executes an anonymous q function with a single parameter:
>>> print(q.sync('{til x}', 10))
>>> print(q.sendSync('{til x}', 10))
[0 1 2 3 4 5 6 7 8 9]
Executes an anonymous q function with two parameters:
>>> print(q.sync('{y + til x}', 10, 1))
>>> print(q.sendSync('{y + til x}', 10, 1))
[ 1 2 3 4 5 6 7 8 9 10]
>>> print(q.sync('{y + til x}', *[10, 1]))
>>> print(q.sendSync('{y + til x}', *[10, 1]))
[ 1 2 3 4 5 6 7 8 9 10]
The :func:`.sync` is called from the overloaded :func:`.__call__`
The :func:`.sendSync` is called from the overloaded :func:`.__call__`
function. This allows :class:`.QConnection` instance to be called as
a function:
Expand Down Expand Up @@ -284,7 +309,7 @@ def sync(self, query, *parameters, **options):
raise QReaderException('Received message of type: %s where response was expected')


def async(self, query, *parameters, **options):
def sendAsync(self, query, *parameters, **options):
'''Performs an asynchronous query and returns **without** retrieving of
the response.
Expand All @@ -294,11 +319,11 @@ def async(self, query, *parameters, **options):
Calls a anonymous function with a single parameter:
>>> q.async('{til x}', 10)
>>> q.sendAsync('{til x}', 10)
Executes a q expression:
>>> q.async('til 10')
>>> q.sendAsync('til 10')
:Parameters:
- `query` (`string`) - query to be executed
Expand Down Expand Up @@ -357,4 +382,4 @@ def receive(self, data_only = True, **options):


def __call__(self, *parameters, **options):
return self.sync(parameters[0], *parameters[1:], **options)
return self.sendSync(parameters[0], *parameters[1:], **options)
33 changes: 15 additions & 18 deletions qpython/qreader.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,28 +99,21 @@ class QReader(object):
:Parameters:
- `stream` (`file object` or `None`) - data input stream
- `encoding` (`string`) - encoding for characters parsing
:Attrbutes:
- `_reader_map` - stores mapping between q types and functions
responsible for parsing into Python objects
'''

_reader_map = {}
parse = Mapper(_reader_map)


def __new__(cls, *args, **kwargs):
if cls is QReader:
# try to load optional pandas binding
try:
from qpython._pandas import PandasQReader
return super(QReader, cls).__new__(PandasQReader)
except ImportError:
return super(QReader, cls).__new__(QReader)
else:
#return super(QReader, cls).__new__(cls)
return super().__new__(cls) #komsit fix


def __init__(self, stream):
def __init__(self, stream, encoding = 'latin-1'):
self._stream = stream
self._buffer = QReader.BytesBuffer()
self._encoding = encoding


def read(self, source = None, **options):
Expand Down Expand Up @@ -207,7 +200,7 @@ def read_data(self, message_size, is_compressed = False, **options):
uncompressed_size = -8 + self._buffer.get_int()
compressed_data = self._read_bytes(message_size - 12) if self._stream else self._buffer.raw(message_size - 12)

raw_data = numpy.fromstring(compressed_data, dtype = numpy.uint8)
raw_data = numpy.frombuffer(compressed_data, dtype = numpy.uint8)
if uncompressed_size <= 0:
raise QReaderException('Error while data decompression.')

Expand All @@ -226,7 +219,7 @@ def read_data(self, message_size, is_compressed = False, **options):
def _read_object(self):
qtype = self._buffer.get_byte()

reader = QReader._reader_map.get(qtype, None)
reader = self._get_reader(qtype)

if reader:
return reader(self, qtype)
Expand All @@ -238,6 +231,10 @@ def _read_object(self):
raise QReaderException('Unable to deserialize q type: %s' % hex(qtype))


def _get_reader(self, qtype):
return self._reader_map.get(qtype, None)


@parse(QERROR)
def _read_error(self, qtype = QERROR):
raise QException(self._read_symbol())
Expand All @@ -257,7 +254,7 @@ def _read_symbol(self, qtype = QSYMBOL):

@parse(QCHAR)
def _read_char(self, qtype = QCHAR):
return chr(self._read_atom(QCHAR)).encode('latin-1')
return chr(self._read_atom(QCHAR)).encode(self._encoding)


@parse(QGUID)
Expand Down Expand Up @@ -299,7 +296,7 @@ def _read_list(self, qtype):
return qlist(data, qtype = qtype, adjust_dtype = False)
elif conversion:
raw = self._buffer.raw(length * ATOM_SIZE[qtype])
data = numpy.fromstring(raw, dtype = conversion)
data = numpy.frombuffer(raw, dtype = conversion)
if not self._is_native:
data.byteswap(True)

Expand Down
4 changes: 2 additions & 2 deletions qpython/qtype.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@
_QNULL2 = numpy.int16(-2**15)
_QNULL4 = numpy.int32(-2**31)
_QNULL8 = numpy.int64(-2**63)
_QNAN32 = numpy.fromstring(b'\x00\x00\xc0\x7f', dtype=numpy.float32)[0]
_QNAN64 = numpy.fromstring(b'\x00\x00\x00\x00\x00\x00\xf8\x7f', dtype=numpy.float64)[0]
_QNAN32 = numpy.frombuffer(b'\x00\x00\xc0\x7f', dtype=numpy.float32)[0]
_QNAN64 = numpy.frombuffer(b'\x00\x00\x00\x00\x00\x00\xf8\x7f', dtype=numpy.float64)[0]
_QNULL_BOOL = numpy.bool_(False)
_QNULL_SYM = numpy.string_('')
_QNULL_GUID = uuid.UUID('00000000-0000-0000-0000-000000000000')
Expand Down
Loading

0 comments on commit ace42e9

Please sign in to comment.