Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 38 additions & 17 deletions pycardano/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
__all__ = [
"default_encoder",
"IndefiniteList",
"IndefiniteFrozenList",
"Primitive",
"CBORBase",
"CBORSerializable",
Expand Down Expand Up @@ -199,7 +200,9 @@ def decode_array(self, subtype: int) -> Sequence[Any]:
length = self._decode_length(subtype, allow_indefinite=True)

if length is None:
return IndefiniteList(cast(Primitive, self.decode_array(subtype=subtype)))
ret = IndefiniteFrozenList(list(self.decode_array(subtype=subtype)))
ret.freeze()
return ret
else:
return self.decode_array(subtype=subtype)

Expand Down Expand Up @@ -323,20 +326,27 @@ def _dfs(value, freeze=False):
return _set
elif isinstance(value, tuple):
return tuple(_dfs(v, freeze) for v in value)
elif isinstance(value, list):
elif isinstance(
value, (IndefiniteFrozenList, FrozenList, IndefiniteList, list)
):
_list = [_dfs(v, freeze) for v in value]
if freeze:
fl = FrozenList(_list)
fl.freeze()
return fl
return _list
elif isinstance(value, IndefiniteList):
_list = [_dfs(v, freeze) for v in value]
if freeze:
fl = IndefiniteFrozenList(_list)
fl.freeze()
return fl
return IndefiniteList(_list)

already_frozen = isinstance(value, (IndefiniteFrozenList, FrozenList))
should_freeze = already_frozen or freeze

if not should_freeze:
return (
IndefiniteList(_list)
if isinstance(value, IndefiniteList)
else _list
)

is_indefinite = isinstance(
value, (IndefiniteFrozenList, IndefiniteList)
)
fl = IndefiniteFrozenList(_list) if is_indefinite else FrozenList(_list)
fl.freeze()
return fl
elif isinstance(value, CBORTag):
return CBORTag(value.tag, _dfs(value.value, freeze))
else:
Expand Down Expand Up @@ -1186,13 +1196,21 @@ def __eq__(self, other: object) -> bool:
def __repr__(self) -> str:
return f"{self.__class__.__name__}({list(self)})"

def to_shallow_primitive(self) -> Union[CBORTag, Union[List[T], IndefiniteList]]:
def to_shallow_primitive(
self,
) -> Union[CBORTag, List[T], IndefiniteList, FrozenList, IndefiniteFrozenList]:
fields: Union[IndefiniteFrozenList, FrozenList]
if self._is_indefinite_list:
fields = IndefiniteFrozenList(list(self))
else:
fields = FrozenList(self)
fields.freeze()
if self._use_tag:
return CBORTag(
258,
IndefiniteList(list(self)) if self._is_indefinite_list else list(self),
fields,
)
return IndefiniteList(list(self)) if self._is_indefinite_list else list(self)
return fields

@classmethod
def from_primitive(
Expand Down Expand Up @@ -1223,6 +1241,9 @@ def from_primitive(
def __deepcopy__(self, memo):
return self.__class__(deepcopy(list(self), memo), use_tag=self._use_tag)

def __hash__(self):
return hash(self.to_shallow_primitive())


class NonEmptyOrderedSet(OrderedSet[T]):
def __init__(
Expand Down
78 changes: 74 additions & 4 deletions test/pycardano/test_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import cbor2
import pytest
from cbor2 import CBORTag
from frozenlist import FrozenList

from pycardano import (
CBORBase,
Datum,
IndefiniteFrozenList,
MultiAsset,
Primitive,
RawPlutusData,
Expand Down Expand Up @@ -618,8 +620,8 @@ def test_ordered_set():
# Test serialization without tag
s = OrderedSet([1, 2, 3], use_tag=False)
primitive = s.to_primitive()
assert isinstance(primitive, list)
assert primitive == [1, 2, 3]
assert isinstance(primitive, (list, FrozenList))
assert list(primitive) == [1, 2, 3]

# Test serialization with tag
s = OrderedSet([1, 2, 3], use_tag=True)
Expand Down Expand Up @@ -695,8 +697,10 @@ def test_non_empty_ordered_set():
# Test serialization without tag
s = NonEmptyOrderedSet([1, 2, 3], use_tag=False)
primitive = s.to_primitive()
assert isinstance(primitive, list)
assert primitive == [1, 2, 3]
from frozenlist import FrozenList

assert isinstance(primitive, (list, FrozenList))
assert list(primitive) == [1, 2, 3]

# Test serialization with tag
s = NonEmptyOrderedSet([1, 2, 3], use_tag=True)
Expand Down Expand Up @@ -1050,3 +1054,69 @@ def to_shallow_primitive(self) -> Union[Primitive, CBORSerializable]:

with pytest.raises(IOError):
test1.save(f.name)


def test_ordered_set_as_key_in_dict():
a = NonEmptyOrderedSet([1, 2, 3])

class MyTest(DictCBORSerializable):
KEY_TYPE = NonEmptyOrderedSet
VALUE_TYPE = int

d = MyTest()
d[a] = 1

check_two_way_cbor(d)


def test_indefinite_list_highjacking_does_not_break_cbor2():
ls = IndefiniteFrozenList(["hello"])
ls.freeze()
a = {ls: 1}
encoded = cbor2.dumps(a, default=default_encoder)
decoded = cbor2.loads(encoded)
assert isinstance(list(decoded.keys())[0], IndefiniteList)


def test_definite_list_highjacking_does_not_break_cbor2():
ls = FrozenList(["hello"])
ls.freeze()
a = {ls: 1}
encoded = cbor2.dumps(a, default=default_encoder)
decoded = cbor2.loads(encoded)
assert isinstance(list(decoded.keys())[0], (list, tuple))


def test_indefinite_list_highjacking_does_not_break_cbor2_datum():
ls = IndefiniteFrozenList(["hello"])
ls.freeze()
datum = CBORTag(251, ls)
a = {datum: 1}
encoded = cbor2.dumps(a, default=default_encoder)
decoded = cbor2.loads(encoded)
assert isinstance(list(decoded.keys())[0], CBORTag)
assert isinstance(list(decoded.keys())[0].value, IndefiniteList)


def test_definite_list_highjacking_does_not_break_cbor2_datum():
ls = FrozenList(["hello"])
ls.freeze()
datum = CBORTag(251, ls)
a = {datum: 1}
encoded = cbor2.dumps(a, default=default_encoder)
decoded = cbor2.loads(encoded)
assert isinstance(list(decoded.keys())[0], CBORTag)
assert isinstance(list(decoded.keys())[0].value, (list, tuple))


def test_ordered_set_as_key_in_dict_indefinite_list():
a = NonEmptyOrderedSet(IndefiniteList([1, 2, 3]))

class MyTest(DictCBORSerializable):
KEY_TYPE = NonEmptyOrderedSet
VALUE_TYPE = int

d = MyTest()
d[a] = 1

check_two_way_cbor(d)
46 changes: 28 additions & 18 deletions test/pycardano/test_txbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import pytest
from cbor2 import CBORTag
from frozenlist import FrozenList

from pycardano import (
AssetName,
Expand Down Expand Up @@ -69,6 +70,13 @@
from pycardano.witness import TransactionWitnessSet, VerificationKeyWitness


def frozen_list(items):
"""Helper function to create a frozen list from items."""
fl = FrozenList(items)
fl.freeze()
return fl


def test_tx_builder(chain_context):
tx_builder = TransactionBuilder(chain_context, [RandomImproveMultiAsset([0, 0])])
sender = "addr_test1vrm9x2zsux7va6w892g38tvchnzahvcd9tykqf3ygnmwtaqyfg52x"
Expand All @@ -82,7 +90,7 @@ def test_tx_builder(chain_context):
tx_body = tx_builder.build(change_address=sender_address)

expected = {
0: CBORTag(258, [[b"11111111111111111111111111111111", 0]]),
0: CBORTag(258, frozen_list([[b"11111111111111111111111111111111", 0]])),
1: [
# First output
[sender_address.to_primitive(), 500000],
Expand Down Expand Up @@ -126,7 +134,7 @@ def test_tx_builder_with_certain_input(chain_context):
tx_body = tx_builder.build(change_address=sender_address)

expected = {
0: CBORTag(258, [[b"2" * 32, 1]]),
0: CBORTag(258, frozen_list([[b"2" * 32, 1]])),
1: [
# First output
[sender_address.to_primitive(), 500000],
Expand Down Expand Up @@ -296,7 +304,7 @@ def test_tx_builder_with_potential_inputs(chain_context):
tx_body = tx_builder.build(change_address=sender_address)

expect = {
0: CBORTag(258, [[b"11111111111111111111111111111111", 3]]),
0: CBORTag(258, frozen_list([[b"11111111111111111111111111111111", 3]])),
1: [
# First output
[sender_address.to_primitive(), 2500000],
Expand Down Expand Up @@ -397,10 +405,12 @@ def test_tx_builder_mint_multi_asset(chain_context):
expected = {
0: CBORTag(
258,
[
[b"11111111111111111111111111111111", 0],
[b"22222222222222222222222222222222", 1],
],
frozen_list(
[
[b"11111111111111111111111111111111", 0],
[b"22222222222222222222222222222222", 1],
]
),
),
1: [
# First output
Expand All @@ -420,7 +430,7 @@ def test_tx_builder_mint_multi_asset(chain_context):
3: 123456789,
8: 1000,
9: mint,
14: CBORTag(258, [sender_address.payment_part.to_primitive()]),
14: CBORTag(258, frozen_list([sender_address.payment_part.to_primitive()])),
}

assert expected == tx_body.to_primitive()
Expand Down Expand Up @@ -1310,7 +1320,7 @@ def test_excluded_input(chain_context):
tx_body = tx_builder.build(change_address=sender_address)

expected = {
0: CBORTag(258, [[b"22222222222222222222222222222222", 1]]),
0: CBORTag(258, frozen_list([[b"22222222222222222222222222222222", 1]])),
1: [
# First output
[sender_address.to_primitive(), 500000],
Expand Down Expand Up @@ -1448,7 +1458,7 @@ def test_tx_builder_exact_fee_no_change(chain_context):
tx = tx_builder.build_and_sign([SK])

expected = {
0: CBORTag(258, [[b"11111111111111111111111111111111", 3]]),
0: CBORTag(258, frozen_list([[b"11111111111111111111111111111111", 3]])),
1: [
[sender_address.to_primitive(), 9835951],
],
Expand Down Expand Up @@ -1484,7 +1494,7 @@ def test_tx_builder_certificates(chain_context):
tx_body = tx_builder.build(change_address=sender_address)

expected = {
0: CBORTag(258, [[b"11111111111111111111111111111111", 0]]),
0: CBORTag(258, frozen_list([[b"11111111111111111111111111111111", 0]])),
1: [
# First output
[sender_address.to_primitive(), 500000],
Expand Down Expand Up @@ -1603,7 +1613,7 @@ def test_tx_builder_stake_pool_registration(chain_context, pool_params):
tx_body = tx_builder.build(change_address=sender_address)

expected = {
0: CBORTag(258, [[b"22222222222222222222222222222222", 2]]),
0: CBORTag(258, frozen_list([[b"22222222222222222222222222222222", 2]])),
1: [
[
b"`\xf6S(P\xe1\xbc\xce\xe9\xc7*\x91\x13\xad\x98\xbc\xc5\xdb\xb3\r*\xc9`&$D\xf6\xe5\xf4",
Expand Down Expand Up @@ -1659,7 +1669,7 @@ def test_tx_builder_withdrawal(chain_context):
tx_body = tx_builder.build(change_address=sender_address)

expected = {
0: CBORTag(258, [[b"11111111111111111111111111111111", 0]]),
0: CBORTag(258, frozen_list([[b"11111111111111111111111111111111", 0]])),
1: [
# First output
[sender_address.to_primitive(), 500000],
Expand Down Expand Up @@ -1694,7 +1704,7 @@ def test_tx_builder_no_output(chain_context):
)

expected = {
0: CBORTag(258, [[b"11111111111111111111111111111111", 3]]),
0: CBORTag(258, frozen_list([[b"11111111111111111111111111111111", 3]])),
1: [
[sender_address.to_primitive(), 9835951],
],
Expand Down Expand Up @@ -1724,7 +1734,7 @@ def test_tx_builder_merge_change_to_output(chain_context):
)

expected = {
0: CBORTag(258, [[b"11111111111111111111111111111111", 3]]),
0: CBORTag(258, frozen_list([[b"11111111111111111111111111111111", 3]])),
1: [
[sender_address.to_primitive(), 9835951],
],
Expand Down Expand Up @@ -1758,7 +1768,7 @@ def test_tx_builder_merge_change_to_output_2(chain_context):
)

expected = {
0: CBORTag(258, [[b"11111111111111111111111111111111", 3]]),
0: CBORTag(258, frozen_list([[b"11111111111111111111111111111111", 3]])),
1: [
[sender_address.to_primitive(), 10000],
[receiver_address.to_primitive(), 10000],
Expand Down Expand Up @@ -1790,7 +1800,7 @@ def test_tx_builder_merge_change_to_zero_amount_output(chain_context):
)

expected = {
0: CBORTag(258, [[b"11111111111111111111111111111111", 3]]),
0: CBORTag(258, frozen_list([[b"11111111111111111111111111111111", 3]])),
1: [
[sender_address.to_primitive(), 9835951],
],
Expand Down Expand Up @@ -1820,7 +1830,7 @@ def test_tx_builder_merge_change_smaller_than_min_utxo(chain_context):
)

expected = {
0: CBORTag(258, [[b"11111111111111111111111111111111", 3]]),
0: CBORTag(258, frozen_list([[b"11111111111111111111111111111111", 3]])),
1: [
[sender_address.to_primitive(), 9835951],
],
Expand Down