diff --git a/bleak/formatter.py b/bleak/formatter.py index c340b59e..7859c000 100644 --- a/bleak/formatter.py +++ b/bleak/formatter.py @@ -6,6 +6,20 @@ from binascii import hexlify +class FormatIncompleteError(Exception): + def __init__(self, *args): + if args: + self.message = args[0] + else: + self.message = None + + def __str__(self): + if self.message: + return 'FormatIncompleteError, {0} '.format(self.message) + else: + return 'FormatIncompleteError has been raised' + + def encode_FLOAT_ieee11073(value, precision=1, debug=False): """ Binary representation of float value as IEEE-11073:20601 32-bit FLOAT @@ -14,7 +28,7 @@ def encode_FLOAT_ieee11073(value, precision=1, debug=False): - https://community.hiveeyes.org/t/implementing-ble-gatt-ess-characteristics-with-micropython/2413/3 """ - assert abs(value * (10 ** precision)) < 2**23, 'Mantissa too big' + assert abs(value * (10 ** precision)) <= 2**23, 'Mantissa too big' encoded = int(value * (10 ** precision)).to_bytes(3, 'little', signed=True) + struct.pack(' 2**4 - 1 for v in [val, val2]]) is False, 'Nibble value too big, only values (0-15) allowed' + + # shift 4 bits to the left + fullbyte = (val2 << 4) + val + return struct.pack('B', fullbyte) + + +def decode_nibbles(bb): + """This needs 1 byte to decode two nibbles""" + fullbyte, = struct.unpack('B', bb) + # shift 4 bits to the right + val2 = fullbyte >> 4 + # Mask 4 bits on the left + val = fullbyte & 0b1111 + return(val, val2) + + +def encode_2_uint12(val, val2): + """ + Format two values as two unsigned 12 bit integers + 2_uint12 len: 3 bytes + Format string: 'o' + """ + full3bytes = (val << 12) + val2 + return full3bytes.to_bytes(3, 'little', signed=False) + + +def decode_2_uint12(bb): + """ + Decode 3 bytes as two unsigned 12 bit integers + 2_uint12 len: 3 bytes + Format string: 'o' + """ + _bb_int = int.from_bytes(bb, byteorder='little', signed=False) + val = _bb_int >> 12 + val2 = _bb_int & 0b111111111111 # 2**12-1 + return (val, val2) + + +def encode_uint24(val): + """ + Format a value as a unsigned 24 bit integer + uint24 len: 3 bytes + Format string: 'k' + """ + return val.to_bytes(3, 'little', signed=False) + + +def decode_uint24(bb): + """ + Decode 3 bytes as a unsigned 24 bit integer + uint24 len: 3 bytes + Format string: 'k' + """ + return int.from_bytes(bb, byteorder='little', signed=False) + + +def encode_sint24(val): + """ + Format a value as a signed 24 bit integer + uint24 len: 3 bytes + Format string: 'K' + """ + return val.to_bytes(3, 'little', signed=True) + + +def decode_sint24(bb): + """ + Decode 3 bytes as a signed 24 bit integer + uint24 len: 3 bytes + Format string: 'K' + """ + return int.from_bytes(bb, byteorder='little', signed=True) + + +def encode_uint40(val): + """ + Format a value as an unsigned 40 bit integer + uint40 len: 5 bytes + Format string: 'j' + """ + return val.to_bytes(5, 'little', signed=False) + + +def decode_uint40(bb): + """ + Decode 5 bytes as an unsigned 40 bit integer + uint40 len: 5 bytes + Format string: 'j' + """ + return int.from_bytes(bb, byteorder='little') + + +def encode_uint48(val): + """ + Format a value as an unsigned 48 bit integer + uint48 len: 6 bytes + Format string: 'J' + """ + return val.to_bytes(6, 'little', signed=False) + + +def decode_uint48(bb): + """ + Decode 6 bytes as an unsigned 48 bit integer + uint48 len: 6 bytes + Format string: 'J' + """ + return int.from_bytes(bb, byteorder='little', signed=False) + + +def encode_uint128(val): + """ + Format a value as a unsigned 128 bit integer + uint128 len: 16 bytes + Format string: 'z' + """ + return val.to_bytes(16, 'little', signed=False) + + +def decode_uint128(bb): + """ + Decode 16 bytes as a unsigned 128 bit integer + uint128 len: 16 bytes + Format string: 'z' + """ + return int.from_bytes(bb, byteorder='little', signed=False) + + class SuperStruct: def __init__(self): - self._version = 'Struct class ieee11073 compliant' + self._version = 'Struct class Bluetooth SIG compliant' + self.spec_formats = ['F', 'S', 'Y', 'j', 'J', 'k', 'K', 'z', 'o'] self.len_F = 4 # bytes # 8 bit * 4 --> (32 bit) self.len_SF = 2 # bytes # 8 bit * 2 --> (16 bit) - self.spec_formats = ['F', 'S'] + self.len_nibble = 1/2 # bytes 8 bit * 1/2 --> (4 bit) + self.len_uint12 = 1 + (1/2) # bytes 8 bit * 3/2 --> (12 bit) + self.len_uint24 = 3 # bytes # 8 bit * 3 --> (24 bit) + self.len_sint24 = 3 # bytes # 8 bit * 3 --> (24 bit) + self.len_uint40 = 5 # bytes # 8 bit * 5 --> (40 bit) + self.len_uint48 = 6 # bytes # 8 bit * 6 --> (48 bit) + self.len_uint128 = 16 # bytes # 8 bit * 16 --> (128 bit) def __repr__(self): return(self._version) @@ -155,9 +312,21 @@ def unpack(self, fmt, bb): else: return struct.unpack(fmt, bb) + def pack(self, fmt, *args): + assert len(fmt) == len([*args]), 'pack expected {} items for packing (got {})'.format(len(fmt), len([*args])) + if any([f in self.spec_formats for f in fmt]): + # values, index = self._get_all_index_bytes(fmt, *args) + # return tuple(values) + return self._put_all_index_bytes(fmt, *args) + + else: + return struct.pack(fmt, *args) + def _get_all_index_bytes(self, fmt_string, bb): indexes = [] intermediate_fmt_string = "" + intermediate_nibble_fmt_string = "" + intermediate_uint12_fmt_string = "" values = [] index = 0 expected_size = self._get_overall_size(fmt_string) @@ -176,28 +345,174 @@ def _get_all_index_bytes(self, fmt_string, bb): val_F = decode_FLOAT_ieee11073(bb[index:index+self.len_F]) values.append(val_F) index += self.len_F + elif s == 'S': val_S = decode_SFLOAT_ieee11073( bb[index:index+self.len_SF]) values.append(val_S) index += self.len_SF + + elif s == 'Y': + if intermediate_nibble_fmt_string == "": + intermediate_nibble_fmt_string += s + elif intermediate_nibble_fmt_string == 'Y': + val, val2 = decode_nibbles( + bb[index:index+int(self.len_nibble*2)]) + values.append(val) + values.append(val2) + index += int(self.len_nibble*2) + intermediate_nibble_fmt_string = "" + + elif s == 'o': + if intermediate_uint12_fmt_string == "": + intermediate_uint12_fmt_string += s + elif intermediate_uint12_fmt_string == 'o': + val, val2 = decode_2_uint12( + bb[index:index+int(self.len_uint12*2)]) + values.append(val) + values.append(val2) + index += int(self.len_uint12*2) + intermediate_uint12_fmt_string = "" + + elif s == 'k': + val_uint24 = decode_uint24(bb[index:index+self.len_uint24]) + values.append(val_uint24) + index += self.len_uint24 + + elif s == 'K': + val_sint24 = decode_sint24(bb[index:index+self.len_sint24]) + values.append(val_sint24) + index += self.len_sint24 + + elif s == 'j': + val_uint40 = decode_uint40(bb[index:index+self.len_uint40]) + values.append(val_uint40) + index += self.len_uint40 + + elif s == 'J': + val_uint48 = decode_uint48(bb[index:index+self.len_uint48]) + values.append(val_uint48) + index += self.len_uint48 + elif s == 'z': + val_uint128 = decode_uint128( + bb[index:index+self.len_uint128]) + values.append(val_uint128) + index += self.len_uint128 + intermediate_fmt_string = "" else: intermediate_fmt_string += s if intermediate_fmt_string: val = struct.unpack( - intermediate_fmt_string, bb[index:index + struct.calcsize(intermediate_fmt_string)]) + intermediate_fmt_string, bb[index:index+struct.calcsize(intermediate_fmt_string)]) for v in val: values.append(v) return (values, indexes) + def _put_all_index_bytes(self, fmt_string, *args): + expected_size = self.calcsize(fmt_string) + indexes = [] + intermediate_fmt_string = "" + intermediate_values = [] + intermediate_nibble_fmt_string = "" + intermediate_nibble_values = [] + intermediate_uint12_fmt_string = "" + intermediate_uint12_values = [] + values = [*args] + bb = b'' + index = 0 + for s in fmt_string: + if s in self.spec_formats: + if intermediate_fmt_string: + val = struct.pack( + intermediate_fmt_string, *intermediate_values) + bb += val + intermediate_fmt_string = "" + intermediate_values = [] + if s == 'F': + _precision = 0 + if '.' in str(values[index]): + _precision = len(str(values[index]).split('.')[-1]) + val_F = encode_FLOAT_ieee11073(values[index], + precision=_precision) + bb += val_F + + elif s == 'S': + _precision = 0 + if '.' in str(values[index]): + _precision = len(str(values[index]).split('.')[-1]) + val_S = encode_SFLOAT_ieee11073(values[index], + precision=_precision) + bb += val_S + + elif s == 'Y': + if intermediate_nibble_fmt_string == "": + intermediate_nibble_fmt_string += s + intermediate_nibble_values.append(values[index]) + elif intermediate_nibble_fmt_string == 'Y': + intermediate_nibble_values.append(values[index]) + val_nibbles = encode_nibbles(*intermediate_nibble_values) + bb += val_nibbles + intermediate_nibble_fmt_string = "" + intermediate_nibble_values = [] + + elif s == 'o': + if intermediate_uint12_fmt_string == "": + intermediate_uint12_fmt_string += s + intermediate_uint12_values.append(values[index]) + elif intermediate_uint12_fmt_string == 'o': + intermediate_uint12_values.append(values[index]) + val_2_uint12 = encode_2_uint12(*intermediate_uint12_values) + bb += val_2_uint12 + intermediate_uint12_fmt_string = "" + intermediate_uint12_values = [] + + elif s == 'k': + val_uint24 = encode_uint24(values[index]) + bb += val_uint24 + + elif s == 'K': + val_sint24 = encode_sint24(values[index]) + bb += val_sint24 + + elif s == 'j': + val_uint40 = encode_uint40(values[index]) + bb += val_uint40 + + elif s == 'J': + val_uint48 = encode_uint48(values[index]) + bb += val_uint48 + elif s == 'z': + val_uint128 = encode_uint128(values[index]) + bb += val_uint128 + + index += 1 + else: + intermediate_fmt_string += s + intermediate_values.append(values[index]) + index += 1 + + if intermediate_fmt_string: + val = struct.pack( + intermediate_fmt_string, *intermediate_values) + bb += val + + return bb + def _get_overall_size(self, fmt_string): intermediate_fmt_string = "" size_value = 0 + is_nibble_now = False + is_uint12_now = False index = 0 for s in fmt_string: + if is_nibble_now and s != 'Y': + raise FormatIncompleteError("'Y' format (nibble) must come in pairs") + if is_uint12_now and s != 'o': + raise FormatIncompleteError("'o' format (uint12) must come in pairs") + if s in self.spec_formats: if intermediate_fmt_string: size_value += struct.calcsize(intermediate_fmt_string) @@ -206,13 +521,30 @@ def _get_overall_size(self, fmt_string): size_value += self.len_F elif s == 'S': size_value += self.len_SF + elif s == 'Y': + is_nibble_now = not is_nibble_now + size_value += self.len_nibble + elif s == 'o': + is_uint12_now = not is_uint12_now + size_value += self.len_uint12 + elif s == 'k': + size_value += self.len_uint24 + elif s == 'K': + size_value += self.len_sint24 + elif s == 'j': + size_value += self.len_uint40 + elif s == 'J': + size_value += self.len_uint48 + elif s == 'z': + size_value += self.len_uint128 intermediate_fmt_string = "" else: intermediate_fmt_string += s if intermediate_fmt_string: size_value += struct.calcsize(intermediate_fmt_string) - return size_value + + return int(size_value) def calcsize(self, fmt): return self._get_overall_size(fmt) diff --git a/bleak/utils.py b/bleak/utils.py index 985a7a8f..fe3c23d1 100644 --- a/bleak/utils.py +++ b/bleak/utils.py @@ -126,11 +126,11 @@ "uint8": "B", "sint16": "h", "uint16": "H", - "uint24": "I", + "uint24": "k", "sint32": "i", "uint32": "I", - "uint40": "Q", - "uint48": "Q", + "uint40": "j", + "uint48": "J", "utf8s": "utf8", "8bit": "B", "16bit": "h", @@ -140,14 +140,14 @@ "boolean": "?", "32bit": "I", "FLOAT": "F", - "24bit": "I", + "24bit": "k", "SFLOAT": "S", - "sint24": "I", - "nibble": "B", + "sint24": "K", + "nibble": "Y", "2bit": "B", - "uint128": "2Q", - "uint12": "H", - "4bit": "B", + "uint128": "z", + "uint12": "o", + "4bit": "Y", "characteristic": "X", } @@ -461,6 +461,7 @@ def _autobitmask(val, total_size, index, size, keymap): _bitmask = eval( "0b{}".format("0" * (total_size - (index + size)) + (size * "1") + "0" * index) ) + key = (val & _bitmask) >> index key_str = str(key) mapped_val = keymap[key_str] @@ -511,6 +512,7 @@ def _autoformat(char, val, field_to_unpack=None): size=size, keymap=key_map, ) + break return fields else: