Skip to content

Commit

Permalink
tpm: fix tpm get_quote()
Browse files Browse the repository at this point in the history
Signed-off-by: Ruoyu Ying <ruoyu.ying@intel.com>
  • Loading branch information
Ruoyu-y committed Jun 21, 2024
1 parent 301fffb commit e5e58e7
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 5 deletions.
16 changes: 15 additions & 1 deletion src/python/cc_quote_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,28 @@ def main():
help="Output format: raw/human. Default raw.",
type=out_format_validator
)
parser.add_argument(
"--pcr-selection",
type=str,
default="sha256:1,2,10",
help="PCR Selection to generate quote",
dest="pcr_selection"
)
parser.add_argument("--ak-context", type=str, help="Path to ak context", dest="ak_context")
args = parser.parse_args()

nonce = make_nounce()
LOG.info("demo random number in base64: %s", nonce.decode("utf-8"))
userdata = make_userdata()
LOG.info("demo user data in base64: %s", userdata.decode("utf-8"))
extra_args = {}
extra_args["pcr_selection"] = args.pcr_selection
extra_args["ak_context"] = args.ak_context

quote = CCTrustedVmSdk.inst().get_cc_report(nonce, userdata)
if ConfidentialVM.detect_cc_type() == CCTrustedApi.TYPE_CC_TPM:
quote = CCTrustedVmSdk.inst().get_cc_report(nonce, userdata, extra_args)
else:
quote = CCTrustedVmSdk.inst().get_cc_report(nonce, userdata)
if quote is not None:
quote.dump(args.out_format == OUT_FORMAT_RAW)
else:
Expand Down
93 changes: 89 additions & 4 deletions src/python/cctrusted_vm/cvm.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@
from cctrusted_base.tdx.common import TDX_VERSION_1_0, TDX_VERSION_1_5
from cctrusted_base.tdx.rtmr import TdxRTMR
from cctrusted_base.tdx.quote import TdxQuoteReq10, TdxQuoteReq15, TdxQuote, TdxQuoteReq
from cctrusted_base.tpm.pcr import TpmPCR
from cctrusted_base.tpm.quote import Tpm2Quote
from cctrusted_base.tdx.report import TdxReportReq10, TdxReportReq15
from tpm2_pytss import ESAPI
from tpm2_pytss.types import TPML_PCR_SELECTION, TPMS_CONTEXT, TPM2B_DATA


LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -144,6 +149,10 @@ def get_cc_report(self, nonce: bytearray, data: bytearray, extraArgs) -> CcRepor
Returns:
The ``CcReport`` object.
"""
# In tpm case, skip get report through configfs-tsm
if self.cc_type == CCTrustedApi.TYPE_CC_TPM:
return None

if not os.path.exists(self.tsm_prefix):
return None

Expand Down Expand Up @@ -248,9 +257,6 @@ def inst():
LOG.error("Fail to initialize the confidential VM.")
return ConfidentialVM._inst

from tpm2_pytss import ESAPI
from cctrusted_base.tpm.pcr import TpmPCR

class TpmVM(ConfidentialVM):

DEFAULT_TPM_DEVICE_NODE="/dev/tpm0"
Expand All @@ -265,6 +271,10 @@ def __init__(self, dev_node=DEFAULT_TPM_DEVICE_NODE):
def default_algo_id(self):
return TcgAlgorithmRegistry.TPM_ALG_SHA256

@property
def version(self):
return "TPM2"

def process_cc_report(self, report_data=None) -> bool:
"""
For TPM, we do not need to get integrited measurement register
Expand All @@ -276,15 +286,90 @@ def process_cc_report(self, report_data=None) -> bool:
return True

def process_eventlog(self) -> bool:
"""
Process TPM event logs from /sys/kernel/security/tpm0/binary_bios_measurements
"""
try:
with open(TpmVM.BIOS_MEAUSREMENT, "rb") as f:
self._boot_time_event_log = f.read()
assert len(self._boot_time_event_log) > 0
except (PermissionError, OSError):
LOG.error("Need root permission to open file %s", TdxVM.BIOS_MEAUSREMENT)
LOG.error("Need root permission to open file %s", TpmVM.BIOS_MEAUSREMENT)
return False
return True

def get_cc_report(self, nonce: bytearray, data: bytearray, extraArgs) -> CcReport:
"""
Fetch TPM quote with user provided attestation key
"""
pcr_selection = None
ak_context_path = None
ak_handle = None
input_data = None

# Get user defined params including attestation key context and PCRSelections
if extraArgs is not None and "pcr_selection" in extraArgs.keys() \
and "ak_context" in extraArgs.keys():
try:
pcr_selection = TPML_PCR_SELECTION.parse(extraArgs["pcr_selection"])
except ValueError:
LOG.error("Invalid PCRSelection provided for quote generation")
return None
ak_context_path = extraArgs["ak_context"]
else:
LOG.error(
"Required params(pcr_selection or ak_context) not provided for quote generation.")
return None

# Prepare user defined data which could include nonce
if nonce is not None:
nonce = base64.b64decode(nonce, validate=True)
if data is not None:
data = base64.b64decode(data, validate=True)

# the algorithm to concatenate nonce and user data will depend on
# algorithm defined in the pcr_selection
algo = extraArgs["pcr_selection"].split(":")[0]
if algo == "sha1":
hash_algo = hashlib.sha1()
elif algo == "sha256":
hash_algo = hashlib.sha256()
elif algo == "sha384":
hash_algo = hashlib.sha384()
elif algo == "sha512":
hash_algo = hashlib.sha512()

LOG.info("Calculate report data by nonce and user data")
if nonce is not None:
hash_algo.update(bytes(nonce))
if data is not None:
hash_algo.update(bytes(data))
input_data = hash_algo.digest()

# Open attestation key context from file
if os.path.exists(ak_context_path):
try:
with open(ak_context_path, 'rb') as ak_file:
ak_context = ak_file.read()
ak_handle = self._esapi.context_load(TPMS_CONTEXT.from_tools(ak_context))
except(PermissionError, OSError):
LOG.error("Lack of permission to open file %s for ak context.",
ak_context_path)
else:
LOG.error("ak_context not found under path %s", ak_context_path)

# Generate quote and signature
quote, signature = self._esapi.quote(ak_handle, pcr_selection, TPM2B_DATA(input_data))

# Flush the loaded context after use
self._esapi.flush_context(ak_handle)

# Save the tpm quote
structured_quote = Tpm2Quote(None, CCTrustedApi.TYPE_CC_TPM)
structured_quote.set_quoted_data(quote)
structured_quote.set_sig(signature)
return structured_quote

class TdxVM(ConfidentialVM):

DEVICE_NODE_PATH = {
Expand Down

0 comments on commit e5e58e7

Please sign in to comment.