Skip to content

Commit

Permalink
Add type annotations; use Value type in RD annotations
Browse files Browse the repository at this point in the history
Signed-off-by: Marcela Melara <marcela.melara@intel.com>
  • Loading branch information
marcelamelara committed Jun 30, 2023
1 parent 0dc6e30 commit 2a8a992
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 12 deletions.
21 changes: 15 additions & 6 deletions python/in_toto_attestation/v1/resource_descriptor.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,37 @@
# Wrapper class for in-toto attestation ResourceDescriptor protos.

import in_toto_attestation.v1.resource_descriptor_pb2 as rdpb
from google.protobuf.struct_pb2 import Value
import google.protobuf.json_format as pb_json
import json

class ResourceDescriptor:
def __init__(self, name='', uri='', digest={}, content=bytes(), download_location='', media_type='', annotations={}):
def __init__(self, name='', uri='', digest=None, content=bytes(), download_location='', media_type='', annotations=None) -> None:
self.pb = rdpb.ResourceDescriptor()
self.pb.name = name
self.pb.uri = uri
self.pb.digest.update(digest)
if digest:
self.pb.digest.update(digest)
self.pb.content = content
self.pb.downloadLocation = download_location
self.pb.mediaType = media_type

# HACK
for k,v in annotations.items():
self.pb.annotations[k].update(v)
if annotations:
# Value type fields are considered Messages in Python.
# this is needed to set Value fields into the annotations map
# without having to know its value type a priori.
for k, v in annotations.items():
val_pb = pb_json.Parse(json.dumps(v), Value())
self.pb.annotations[k].CopyFrom(val_pb)

@staticmethod
def copy_from_pb(proto):
def copy_from_pb(proto: type[rdpb.ResourceDescriptor]) -> 'ResourceDescriptor':
rd = ResourceDescriptor()
rd.pb.CopyFrom(proto)
return rd

def validate(self):
def validate(self) -> None:
# at least one of name, URI or digest are required
if self.pb.name == '' and self.pb.uri == '' and len(self.pb.digest) == 0:
raise ValueError("At least one of name, URI, or digest need to be set")
8 changes: 4 additions & 4 deletions python/in_toto_attestation/v1/statement.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@
STATEMENT_TYPE_URI = 'https://in-toto.io/Statement/v1'

class Statement:
def __init__(self, subjects=[], predicate_type='', predicate={}):
def __init__(self, subjects: list, predicate_type: str, predicate: dict) -> None:
self.pb = spb.Statement()
self.pb.type = STATEMENT_TYPE_URI
self.pb.subject.extend(subjects)
self.pb.predicateType = predicate_type
self.pb.predicate.update(predicate)

@staticmethod
def copy_from_pb(proto):
stmt = Statement()
def copy_from_pb(proto: type[spb.Statement]) -> 'Statement':
stmt = Statement([], '', {})
stmt.pb.CopyFrom(proto)
return stmt

def validate(self):
def validate(self) -> None:
if self.pb.type != STATEMENT_TYPE_URI:
raise ValueError('Wrong statement type')

Expand Down
4 changes: 2 additions & 2 deletions tests/python/test_resource_descriptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from in_toto_attestation.v1.resource_descriptor import ResourceDescriptor

def create_test_desc():
desc = ResourceDescriptor('theName', 'https://example.com', {'alg':'abc123'}, b'bytescontent', 'https://example.com/test.zip', 'theMediaType', {'a1':{'keyStr': 'val1', 'keyNum': 13}, 'a2': {'keyObj': {'subKey': 'subVal'}}})
desc = ResourceDescriptor('theName', 'https://example.com', {'alg':'abc123'}, b'bytescontent', 'https://example.com/test.zip', 'theMediaType', {'keyStr': 'val1', 'keyNum': 13, 'keyObj': {'subKey': 'subVal'}})

return desc

Expand All @@ -19,7 +19,7 @@ def test_create_resource_descriptor(self):
test_rd.validate()

def test_json_parse_resource_descriptor(self):
full_rd = '{"name":"theName","uri":"https://example.com","digest":{"alg":"abc123"},"content":"Ynl0ZXNjb250ZW50","downloadLocation":"https://example.com/test.zip","mediaType":"theMediaType","annotations":{"a1":{"keyNum": 13,"keyStr":"val1"},"a2":{"keyObj":{"subKey":"subVal"}}}}'
full_rd = '{"name":"theName","uri":"https://example.com","digest":{"alg":"abc123"},"content":"Ynl0ZXNjb250ZW50","downloadLocation":"https://example.com/test.zip","mediaType":"theMediaType","annotations":{"keyNum": 13,"keyStr":"val1","keyObj":{"subKey":"subVal"}}}'
got_pb = pb_json.Parse(full_rd, rdpb.ResourceDescriptor())
got = got_pb.SerializeToString(deterministic=True)

Expand Down

0 comments on commit 2a8a992

Please sign in to comment.