Skip to content

Commit

Permalink
device: reorder code in settings
Browse files Browse the repository at this point in the history
  • Loading branch information
pfps committed Feb 25, 2024
1 parent e8dadcd commit 67be689
Showing 1 changed file with 150 additions and 148 deletions.
298 changes: 150 additions & 148 deletions lib/logitech_receiver/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,153 +64,6 @@ def bool_or_toggle(current, new):
return None


# moved first for dependency reasons
class Validator:
@classmethod
def build(cls, setting_class, device, **kwargs):
return cls(**kwargs)

@classmethod
def to_string(cls, value):
return str(value)

def compare(self, args, current):
if len(args) != 1:
return False
return args[0] == current


class BooleanValidator(Validator):
__slots__ = ("true_value", "false_value", "read_skip_byte_count", "write_prefix_bytes", "mask", "needs_current_value")

kind = KIND.toggle
default_true = 0x01
default_false = 0x00
# mask specifies all the affected bits in the value
default_mask = 0xFF

def __init__(
self,
true_value=default_true,
false_value=default_false,
mask=default_mask,
read_skip_byte_count=0,
write_prefix_bytes=b"",
):
if isinstance(true_value, int):
assert isinstance(false_value, int)
if mask is None:
mask = self.default_mask
else:
assert isinstance(mask, int)
assert true_value & false_value == 0
assert true_value & mask == true_value
assert false_value & mask == false_value
self.needs_current_value = mask != self.default_mask
elif isinstance(true_value, bytes):
if false_value is None or false_value == self.default_false:
false_value = b"\x00" * len(true_value)
else:
assert isinstance(false_value, bytes)
if mask is None or mask == self.default_mask:
mask = b"\xFF" * len(true_value)
else:
assert isinstance(mask, bytes)
assert len(mask) == len(true_value) == len(false_value)
tv = _bytes2int(true_value)
fv = _bytes2int(false_value)
mv = _bytes2int(mask)
assert tv != fv # true and false might be something other than bit values
assert tv & mv == tv
assert fv & mv == fv
self.needs_current_value = any(m != 0xFF for m in mask)
else:
raise Exception("invalid mask '%r', type %s" % (mask, type(mask)))

self.true_value = true_value
self.false_value = false_value
self.mask = mask
self.read_skip_byte_count = read_skip_byte_count
self.write_prefix_bytes = write_prefix_bytes

def validate_read(self, reply_bytes):
reply_bytes = reply_bytes[self.read_skip_byte_count :]
if isinstance(self.mask, int):
reply_value = ord(reply_bytes[:1]) & self.mask
if logger.isEnabledFor(logging.DEBUG):
logger.debug("BooleanValidator: validate read %r => %02X", reply_bytes, reply_value)
if reply_value == self.true_value:
return True
if reply_value == self.false_value:
return False
logger.warning(
"BooleanValidator: reply %02X mismatched %02X/%02X/%02X",
reply_value,
self.true_value,
self.false_value,
self.mask,
)
return False

count = len(self.mask)
mask = _bytes2int(self.mask)
reply_value = _bytes2int(reply_bytes[:count]) & mask

true_value = _bytes2int(self.true_value)
if reply_value == true_value:
return True

false_value = _bytes2int(self.false_value)
if reply_value == false_value:
return False

logger.warning(
"BooleanValidator: reply %r mismatched %r/%r/%r", reply_bytes, self.true_value, self.false_value, self.mask
)
return False

def prepare_write(self, new_value, current_value=None):
if new_value is None:
new_value = False
else:
assert isinstance(new_value, bool), "New value %s for boolean setting is not a boolean" % new_value

to_write = self.true_value if new_value else self.false_value

if isinstance(self.mask, int):
if current_value is not None and self.needs_current_value:
to_write |= ord(current_value[:1]) & (0xFF ^ self.mask)
if current_value is not None and to_write == ord(current_value[:1]):
return None
to_write = bytes([to_write])
else:
to_write = bytearray(to_write)
count = len(self.mask)
for i in range(0, count):
b = ord(to_write[i : i + 1])
m = ord(self.mask[i : i + 1])
assert b & m == b
# b &= m
if current_value is not None and self.needs_current_value:
b |= ord(current_value[i : i + 1]) & (0xFF ^ m)
to_write[i] = b
to_write = bytes(to_write)

if current_value is not None and to_write == current_value[: len(to_write)]:
return None

if logger.isEnabledFor(logging.DEBUG):
logger.debug("BooleanValidator: prepare_write(%s, %s) => %r", new_value, current_value, to_write)

return self.write_prefix_bytes + to_write

def acceptable(self, args, current):
if len(args) != 1:
return None
val = bool_or_toggle(current, args[0])
return [val] if val is not None else None


class Setting:
"""A setting descriptor. Needs to be instantiated for each specific device."""

Expand All @@ -219,7 +72,7 @@ class Setting:
min_version = 0
persist = True
rw_options = {}
validator_class = BooleanValidator
validator_class = None
validator_options = {}

def __init__(self, device, rw, validator):
Expand Down Expand Up @@ -836,6 +689,152 @@ def write(self, device, key, data_bytes):
return reply if not self.no_reply else True


class Validator:
@classmethod
def build(cls, setting_class, device, **kwargs):
return cls(**kwargs)

@classmethod
def to_string(cls, value):
return str(value)

def compare(self, args, current):
if len(args) != 1:
return False
return args[0] == current


class BooleanValidator(Validator):
__slots__ = ("true_value", "false_value", "read_skip_byte_count", "write_prefix_bytes", "mask", "needs_current_value")

kind = KIND.toggle
default_true = 0x01
default_false = 0x00
# mask specifies all the affected bits in the value
default_mask = 0xFF

def __init__(
self,
true_value=default_true,
false_value=default_false,
mask=default_mask,
read_skip_byte_count=0,
write_prefix_bytes=b"",
):
if isinstance(true_value, int):
assert isinstance(false_value, int)
if mask is None:
mask = self.default_mask
else:
assert isinstance(mask, int)
assert true_value & false_value == 0
assert true_value & mask == true_value
assert false_value & mask == false_value
self.needs_current_value = mask != self.default_mask
elif isinstance(true_value, bytes):
if false_value is None or false_value == self.default_false:
false_value = b"\x00" * len(true_value)
else:
assert isinstance(false_value, bytes)
if mask is None or mask == self.default_mask:
mask = b"\xFF" * len(true_value)
else:
assert isinstance(mask, bytes)
assert len(mask) == len(true_value) == len(false_value)
tv = _bytes2int(true_value)
fv = _bytes2int(false_value)
mv = _bytes2int(mask)
assert tv != fv # true and false might be something other than bit values
assert tv & mv == tv
assert fv & mv == fv
self.needs_current_value = any(m != 0xFF for m in mask)
else:
raise Exception("invalid mask '%r', type %s" % (mask, type(mask)))

self.true_value = true_value
self.false_value = false_value
self.mask = mask
self.read_skip_byte_count = read_skip_byte_count
self.write_prefix_bytes = write_prefix_bytes

def validate_read(self, reply_bytes):
reply_bytes = reply_bytes[self.read_skip_byte_count :]
if isinstance(self.mask, int):
reply_value = ord(reply_bytes[:1]) & self.mask
if logger.isEnabledFor(logging.DEBUG):
logger.debug("BooleanValidator: validate read %r => %02X", reply_bytes, reply_value)
if reply_value == self.true_value:
return True
if reply_value == self.false_value:
return False
logger.warning(
"BooleanValidator: reply %02X mismatched %02X/%02X/%02X",
reply_value,
self.true_value,
self.false_value,
self.mask,
)
return False

count = len(self.mask)
mask = _bytes2int(self.mask)
reply_value = _bytes2int(reply_bytes[:count]) & mask

true_value = _bytes2int(self.true_value)
if reply_value == true_value:
return True

false_value = _bytes2int(self.false_value)
if reply_value == false_value:
return False

logger.warning(
"BooleanValidator: reply %r mismatched %r/%r/%r", reply_bytes, self.true_value, self.false_value, self.mask
)
return False

def prepare_write(self, new_value, current_value=None):
if new_value is None:
new_value = False
else:
assert isinstance(new_value, bool), "New value %s for boolean setting is not a boolean" % new_value

to_write = self.true_value if new_value else self.false_value

if isinstance(self.mask, int):
if current_value is not None and self.needs_current_value:
to_write |= ord(current_value[:1]) & (0xFF ^ self.mask)
if current_value is not None and to_write == ord(current_value[:1]):
return None
to_write = bytes([to_write])
else:
to_write = bytearray(to_write)
count = len(self.mask)
for i in range(0, count):
b = ord(to_write[i : i + 1])
m = ord(self.mask[i : i + 1])
assert b & m == b
# b &= m
if current_value is not None and self.needs_current_value:
b |= ord(current_value[i : i + 1]) & (0xFF ^ m)
to_write[i] = b
to_write = bytes(to_write)

if current_value is not None and to_write == current_value[: len(to_write)]:
return None

if logger.isEnabledFor(logging.DEBUG):
logger.debug("BooleanValidator: prepare_write(%s, %s) => %r", new_value, current_value, to_write)

return self.write_prefix_bytes + to_write

def acceptable(self, args, current):
if len(args) != 1:
return None
val = bool_or_toggle(current, args[0])
return [val] if val is not None else None


class BitFieldValidator(Validator):
__slots__ = ("byte_count", "options")

Expand Down Expand Up @@ -1569,3 +1568,6 @@ def apply_all_settings(device):
ignore = sensitives.get(s.name, False)
if ignore != SENSITIVITY_IGNORE:
s.apply()


Setting.validator_class = BooleanValidator

0 comments on commit 67be689

Please sign in to comment.