Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to read in chunks + support for capacity byte #84

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions Jenkisfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
pipeline {
agent {
docker {
image 'research/python-qpython-env:99fb7f9cbe9720187984dc160e78742876bc9007'
}
}
stages {
stage('Build') {
steps {
echo ">> Building..."
}
}
stage('Test') {
steps {
sh 'tox'
}
post {
always {
junit 'test-reports/results.xml'
}
}
}
stage('Publish') {
steps {
echo ">> Publishing..."
}
}
}
}
12 changes: 8 additions & 4 deletions qpython/_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,20 +82,24 @@ def _read_table(self, qtype = QTABLE):
if isinstance(data[i], str):
# convert character list (represented as string) to numpy representation
meta[column_name] = QSTRING
odict[column_name] = pandas.Series(list(data[i]), dtype = numpy.str).replace(b' ', numpy.nan)
odict[column_name] = pandas.Series(list(data[i]), dtype = str).replace(b' ', numpy.nan)
elif isinstance(data[i], bytes):
# convert character list (represented as string) to numpy representation
meta[column_name] = QSTRING
odict[column_name] = pandas.Series(list(data[i].decode()), dtype = numpy.str).replace(b' ', numpy.nan)
odict[column_name] = pandas.Series(list(data[i].decode()), dtype = str).replace(b' ', numpy.nan)
elif isinstance(data[i], (list, tuple)):
meta[column_name] = QGENERAL_LIST
tarray = numpy.ndarray(shape = len(data[i]), dtype = numpy.dtype('O'))
for j in range(len(data[i])):
tarray[j] = data[i][j]
odict[column_name] = tarray
else:
meta[column_name] = data[i].meta.qtype
odict[column_name] = data[i]
meta[column_name] = data[i].meta.qtype
if data[i].meta.qtype in {abs(QSYMBOL)}:
odict[column_name] = data[i].map(lambda x: x.decode('UTF-8'))
else:
odict[column_name] = data[i]


df = pandas.DataFrame(odict)
df.meta = meta
Expand Down
4 changes: 2 additions & 2 deletions qpython/fastutils.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import numpy
cimport numpy

DTYPE = numpy.int
ctypedef numpy.int_t DTYPE_t
DTYPE = numpy.uint
ctypedef numpy.uint DTYPE_t
DTYPE8 = numpy.int
ctypedef numpy.uint8_t DTYPE8_t

Expand Down
25 changes: 15 additions & 10 deletions qpython/qcollection.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ class QList(numpy.ndarray):
def _meta_init(self, **meta):
'''Initialises the meta-information.'''
self.meta = MetaData(**meta)

#This behaviour is different from ndarray which returns an array companring each element with the input
#This causes issues in pandas when it tries to create an array mask by using ==
# def __eq__(self, other):
# return numpy.array_equal(self, other)

def __eq__(self, other):
return numpy.array_equal(self, other)

def __ne__(self, other):
return not self.__eq__(other)
# def __ne__(self, other):
# return not self.__eq__(other)

def __hash__(self):
return hash((self.dtype, self.meta.qtype, self.tostring()))
Expand Down Expand Up @@ -294,11 +296,12 @@ class QTable(numpy.recarray):
def _meta_init(self, **meta):
self.meta = MetaData(**meta)

def __eq__(self, other):
return numpy.array_equal(self, other)
#Behaviour different from recarray: returns a bool istead of array of same length
# def __eq__(self, other):
# return numpy.array_equal(self, other)

def __ne__(self, other):
return not self.__eq__(other)
# def __ne__(self, other):
# return not self.__eq__(other)

def __array_finalize__(self, obj):
self.meta = MetaData() if obj is None else getattr(obj, 'meta', MetaData())
Expand Down Expand Up @@ -437,7 +440,9 @@ def __str__(self, *args, **kwargs):
return '%s!%s' % (self.keys, self.values)

def __eq__(self, other):
return isinstance(other, QKeyedTable) and numpy.array_equal(self.keys, other.keys) and numpy.array_equal(self.values, other.values)
if not isinstance(other, QKeyedTable):
return False
return (self.keys == other.keys) & (self.values == other.values)

def __ne__(self, other):
return not self.__eq__(other)
Expand Down
7 changes: 5 additions & 2 deletions qpython/qconnection.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class QConnection(object):
- `encoding` (`string`) - string encoding for data deserialization
- `reader_class` (subclass of `QReader`) - data deserializer
- `writer_class` (subclass of `QWriter`) - data serializer
- `capacity_byte` (byte) - handshake capacity byte. Use it only if you
want to override the default. See https://code.kx.com/v2/basics/ipc/#handshake
:Options:
- `raw` (`boolean`) - if ``True`` returns raw data chunk instead of parsed
data, **Default**: ``False``
Expand All @@ -78,11 +80,12 @@ class QConnection(object):
'''


def __init__(self, host, port, username = None, password = None, timeout = None, encoding = 'latin-1', reader_class = None, writer_class = None, **options):
def __init__(self, host, port, username = None, password = None, timeout = None, encoding = 'latin-1', reader_class = None, writer_class = None, capacity_byte = b'\3', **options):
self.host = host
self.port = port
self.username = username
self.password = password
self.capacity_byte = capacity_byte

self._connection = None
self._connection_file = None
Expand Down Expand Up @@ -186,7 +189,7 @@ def _initialize(self):
'''Performs a IPC protocol handshake.'''
credentials = (self.username if self.username else '') + ':' + (self.password if self.password else '')
credentials = credentials.encode(self._encoding)
self._connection.send(credentials + b'\3\0')
self._connection.send(credentials + self.capacity_byte + b'\0')
response = self._connection.recv(1)

if len(response) != 1:
Expand Down
55 changes: 42 additions & 13 deletions qpython/qreader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

import struct
import sys
if sys.version > '3':
from sys import intern
unicode = str

from sys import intern
unicode = str

from qpython import MetaData, CONVERSION_OPTIONS
from qpython.qtype import * # @UnusedWildImport
Expand All @@ -30,7 +30,7 @@
except:
from qpython.utils import uncompress


CHUNK_SIZE = 4096

class QReaderException(Exception):
'''
Expand Down Expand Up @@ -167,7 +167,7 @@ def read_header(self, source = None):
# skip 1 byte
self._buffer.skip()

message_size = self._buffer.get_int()
message_size = self._buffer.get_size()
return QMessage(None, message_type, message_size, message_compressed)


Expand Down Expand Up @@ -197,14 +197,14 @@ def read_data(self, message_size, is_compressed = False, **options):
if is_compressed:
if self._stream:
self._buffer.wrap(self._read_bytes(4))
uncompressed_size = -8 + self._buffer.get_int()
uncompressed_size = -8 + self._buffer.get_size()
compressed_data = self._read_bytes(message_size - 12) if self._stream else self._buffer.raw(message_size - 12)

raw_data = numpy.frombuffer(compressed_data, dtype = numpy.uint8)
if uncompressed_size <= 0:
raise QReaderException('Error while data decompression.')

raw_data = uncompress(raw_data, numpy.intc(uncompressed_size))
raw_data = uncompress(raw_data, numpy.uintc(uncompressed_size))
raw_data = numpy.ndarray.tostring(raw_data)
self._buffer.wrap(raw_data)
elif self._stream:
Expand Down Expand Up @@ -243,7 +243,7 @@ def _read_error(self, qtype = QERROR):
@parse(QSTRING)
def _read_string(self, qtype = QSTRING):
self._buffer.skip() # ignore attributes
length = self._buffer.get_int()
length = self._buffer.get_size()
return self._buffer.raw(length) if length > 0 else b''


Expand Down Expand Up @@ -284,7 +284,7 @@ def _read_temporal(self, qtype):

def _read_list(self, qtype):
self._buffer.skip() # ignore attributes
length = self._buffer.get_int()
length = self._buffer.get_size()
conversion = PY_TYPE.get(-qtype, None)

if qtype == QSYMBOL_LIST:
Expand Down Expand Up @@ -333,7 +333,7 @@ def _read_table(self, qtype = QTABLE):
@parse(QGENERAL_LIST)
def _read_general_list(self, qtype = QGENERAL_LIST):
self._buffer.skip() # ignore attributes
length = self._buffer.get_int()
length = self._buffer.get_size()

return [self._read_object() for x in range(length)]

Expand Down Expand Up @@ -373,7 +373,7 @@ def _read_adverb_function(self, qtype = QADVERB_FUNC_106):

@parse(QPROJECTION)
def _read_projection(self, qtype = QPROJECTION):
length = self._buffer.get_int()
length = self._buffer.get_size()
parameters = [ self._read_object() for x in range(length) ]
return QProjection(parameters)

Expand All @@ -384,13 +384,33 @@ def _read_bytes(self, length):

if length == 0:
return b''
else:
elif length <= CHUNK_SIZE:
data = self._stream.read(length)
else:
data = self._read_in_chunks(length)

if len(data) == 0:
if not data:
raise QReaderException('Error while reading data')
return data

def _read_in_chunks(self, length):
from io import StringIO ## for Python 3

# for large messages, read from the stream in chunks
remaining = length
buff = StringIO()

while remaining > 0:
chunk = self._stream.read(min(remaining, CHUNK_SIZE))

if chunk:
remaining = remaining - len(chunk)
buff.write(chunk)
else:
break

return buff.getvalue()



class BytesBuffer(object):
Expand Down Expand Up @@ -499,6 +519,15 @@ def get_int(self):
'''
return self.get('i')


def get_size(self):
'''
Gets a single 32-bit unsinged integer from the buffer.

:returns: single unsigned integer
'''
return self.get('I')


def get_symbol(self):
'''
Expand Down
6 changes: 4 additions & 2 deletions qpython/qwriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,12 @@ def _write_string(self, data):
if not self._options.single_char_strings and len(data) == 1:
self._write_atom(ord(data), QCHAR)
else:
self._buffer.write(struct.pack('=bxi', QSTRING, len(data)))
if isinstance(data, str):
self._buffer.write(data.encode(self._encoding))
encoded_data = data.encode(self._encoding)
self._buffer.write(struct.pack('=bxi', QSTRING, len(encoded_data)))
self._buffer.write(encoded_data)
else:
self._buffer.write(struct.pack('=bxi', QSTRING, len(data)))
self._buffer.write(data)


Expand Down
14 changes: 7 additions & 7 deletions qpython/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@


def uncompress(data, uncompressed_size):
_0 = numpy.intc(0)
_1 = numpy.intc(1)
_2 = numpy.intc(2)
_128 = numpy.intc(128)
_255 = numpy.intc(255)
_0 = numpy.uintc(0)
_1 = numpy.uintc(1)
_2 = numpy.uintc(2)
_128 = numpy.uintc(128)
_255 = numpy.uintc(255)

n, r, s, p = _0, _0, _0, _0
i, d = _1, _1
f = _255 & data[_0]

ptrs = numpy.zeros(256, dtype = numpy.intc)
ptrs = numpy.zeros(256, dtype = numpy.uintc)
uncompressed = numpy.zeros(uncompressed_size, dtype = numpy.uint8)
idx = numpy.arange(uncompressed_size, dtype = numpy.intc)
idx = numpy.arange(uncompressed_size, dtype = numpy.uintc)

while s < uncompressed_size:
pp = p + _1
Expand Down
12 changes: 6 additions & 6 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pytest>=2.5.0
numpy>=1.8.0
pandas>=0.14.0
cython>=0.20
twisted>=13.2.0
mock>=1.0.1
pytest>=3.6.0
numpy>=1.20.0
pandas>=1.3.0
cython>=0.28
twisted>=20.3.0
mock>=4.0.0
9 changes: 2 additions & 7 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.
#

from distutils.core import setup
from setuptools import setup
from qpython import __version__

import os
Expand Down Expand Up @@ -71,13 +71,8 @@ def read(fname):
'Operating System :: POSIX',
'Operating System :: Unix',
'Programming Language :: Python',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Topic :: Database :: Front-Ends',
'Topic :: Scientific/Engineering',
'Topic :: Software Development',
Expand Down
5 changes: 3 additions & 2 deletions tests/pandas_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import binascii
import struct
import os
import sys
try:
from cStringIO import BytesIO
Expand All @@ -29,7 +30,7 @@
from qpython.qcollection import qlist, QList, QTemporalList, QDictionary
from qpython.qtemporal import QTemporal


TEST_DATA_DIR = os.path.join(os.path.dirname(__file__), 'test_data')

try:
import pandas
Expand Down Expand Up @@ -251,7 +252,7 @@ def init():
global BINARY
BINARY = OrderedDict()

with open('tests/QExpressions3.out', 'rb') as f:
with open(os.path.join(TEST_DATA_DIR, 'QExpressions3.out'), 'rb') as f:
while True:
query = f.readline().strip()
binary = f.readline().strip()
Expand Down
Loading