Skip to content
This repository has been archived by the owner on May 10, 2024. It is now read-only.

Commit

Permalink
Paddle-Signature verification can now handle multiple 'h1' values and…
Browse files Browse the repository at this point in the history
… multiple Secrets
  • Loading branch information
Invincibear committed Jan 27, 2024
1 parent ad9f328 commit 202d2d9
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 30 deletions.
121 changes: 104 additions & 17 deletions paddle_billing_python_sdk/Notification/PaddleSignature.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from src import log
from paddle_billing_python_sdk.Notification.Secret import Secret
from hmac import HMAC, compare_digest, new as hmac_new
from hashlib import sha256
from hashlib import sha256
from hmac import HMAC, compare_digest, new as hmac_new

from paddle_billing_python_sdk import log
from paddle_billing_python_sdk.Notification.Secret import Secret


class PaddleSignature:
Expand All @@ -11,10 +12,30 @@ def __init__(self):
"""


@property
def HASH_ALGORITHM_1(self): # noqa N802
return 'h1'

@property
def HEADER(self): # noqa N802
return 'Paddle-Signature'

@property
def TIMESTAMP(self): # noqa N802
return 'ts'


# @staticmethod
# def parse(signature_header: str) -> tuple:
# """
# Parse the Paddle-Signature header to extract the timestamp and signature
#
# @param signature_header: The Paddle-Signature key=value from the webhook event's headers
# @return: A tuple containing (timestamp, signature)
# """
# timestamp, signature = signature_header.split(";")
# return timestamp.lstrip(f"ts="), signature.lstrip(f"h1=")


@staticmethod
def parse(signature_header: str) -> tuple:
Expand All @@ -24,8 +45,26 @@ def parse(signature_header: str) -> tuple:
@param signature_header: The Paddle-Signature key=value from the webhook event's headers
@return: A tuple containing (timestamp, signature)
"""
timestamp, signature = signature_header.split(";")
return timestamp.lstrip(f"ts="), signature.lstrip(f"h1=")
components = {
PaddleSignature().TIMESTAMP: 0,
'hashes': {PaddleSignature().HASH_ALGORITHM_1: []},
}
# print(f"components: {components}")

for key_value_pair in signature_header.split(";"):
# print(f"key_value_pair: {key_value_pair}")
if '=' in key_value_pair:
key, value = key_value_pair.split('=')

if key == PaddleSignature().TIMESTAMP:
components[PaddleSignature().TIMESTAMP] = value
elif key == PaddleSignature().HASH_ALGORITHM_1:
components['hashes'][PaddleSignature().HASH_ALGORITHM_1].append(value)
else:
raise ValueError(f"Unrecognized Paddle-Signature key")

print(f"components: {components}")
return components[PaddleSignature().TIMESTAMP], components['hashes']


@staticmethod
Expand All @@ -34,24 +73,72 @@ def calculate_hmac(secret_key: str, data: bytes) -> HMAC:


@staticmethod
def verify(signature_header: str, raw_body: str, secret_key: Secret) -> bool:
def __do_comparison(generated_signature: str, signature: str) -> bool:
log.debug(f"Comparing received Paddle signature '{signature}' to our calculated signature: '{generated_signature}'")

return compare_digest(generated_signature, signature)


@staticmethod
def __do_verify(timestamp: str, signatures: list[str], raw_body: str, secret_key: Secret) -> bool:
"""
Verifies an individual secret key against a Paddle-Signature header.
Called by PaddleSignature.verify()
@param signatures: The Paddle-Signature header
@param raw_body: Raw body of the webhook request
@param secret_key: A Paddle secret key: https://developer.paddle.com/webhooks/signature-verification#get-secret-key
@return: True on verification success, False on verification failure
"""
new_body_to_verify = f"{timestamp}:{raw_body}".encode('utf-8')
generated_signature = PaddleSignature.calculate_hmac(secret_key.secret_key, new_body_to_verify).hexdigest()
integrity = False

for signature in signatures:
integrity_result = PaddleSignature.__do_comparison(generated_signature, signature)
if integrity_result is True:
integrity = True
break

log.info(f"Paddle signature integrity {'passed' if integrity else 'failed'}")
return integrity


@staticmethod
def verify(signature_header: str, raw_body: str, secrets: list[Secret] | Secret) -> bool:
"""
https://developer.paddle.com/webhooks/signature-verification
Performs an integrity check on a Paddle webhook's signature
Performs an integrity check on a Paddle webhook's signature against one or more Secrets
Handling multiple Secrets is needed because of key rotation situations
@param signature_header: The Paddle-Signature header
@param raw_body: Raw body of the webhook request
@param secret_key: Your Paddle secret key: https://developer.paddle.com/webhooks/signature-verification#get-secret-key
@return: True on verification success, False on verification failure
@param secrets: One or more Paddle secret key(s): https://developer.paddle.com/webhooks/signature-verification#get-secret-key
@return: True if any secret key passes verification success, False if all secret keys fail verification
"""
log.info(f"Verifying Paddle signature integrity")
is_list = type(secrets) is list
key_count = 'multiple secret keys' if is_list else 'one secret key'
log.info(f"Verifying Paddle signature integrity against {key_count}")

timestamp, signature = PaddleSignature.parse(signature_header)
new_body_to_verify = f"{timestamp}:{raw_body}".encode('utf-8')
generated_signature = PaddleSignature.calculate_hmac(secret_key.secret_key, new_body_to_verify).hexdigest()

integrity_result = compare_digest(generated_signature, signature)
log.debug(f"Comparing received Paddle signature '{signature}' to our calculated signature: '{generated_signature}'")
log.info(f"Paddle signature integrity {'passed' if integrity_result else 'failed'}")
if not is_list:
return PaddleSignature.__do_verify(
timestamp = timestamp,
signatures = signature[PaddleSignature().HASH_ALGORITHM_1],
raw_body = raw_body,
secret_key = secrets,
)

for secret in secrets:
verification_result = PaddleSignature.__do_verify(
timestamp = timestamp,
signatures = signature[PaddleSignature().HASH_ALGORITHM_1],
raw_body = raw_body,
secret_key = secret,
)
if verification_result is True:
return True

return integrity_result
# If we got this far then none of the provided secrets passed verification
return False
25 changes: 13 additions & 12 deletions paddle_billing_python_sdk/Notification/Verifier.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from time import time
from src import log
from paddle_billing_python_sdk.Notification import PaddleSignature, Secret
import requests

from time import time

from paddle_billing_python_sdk import log
from paddle_billing_python_sdk.Notification import PaddleSignature, Secret


class Verifier:
def __init__(self, maximum_variance: int = 5):
Expand All @@ -15,25 +17,24 @@ def maximum_variance(self) -> int:
return self.__maximum_variance


def verify(self, request: requests, secret_key: Secret):
def verify(self, request: requests, secrets: list[Secret] | Secret):
"""
:param request:
:param secret_key:
:return: True on verification success, False on verification failure
@param request: The request object to verify
@param secrets: One or more Secrets to use for verifying the request
@return: True on verification success, False on verification failure
"""
log.info(f"Attempting to verify the authenticity of a request")

signature_header = request.headers.get(PaddleSignature.HEADER, None)
signature_header = request.headers.get(PaddleSignature().HEADER, None)
if not signature_header:
log.critical(f"Unable to extract the '{PaddleSignature.HEADER}' header from the request")
log.critical(f"Unable to extract the '{PaddleSignature().HEADER}' header from the request")
return False

timestamp, signature = PaddleSignature.parse(signature_header)
if self.maximum_variance > 0 and time() > int(timestamp + self.maximum_variance):
log.critical(f"Too much time has elapsed between the request and this process")
return False

raw_body = request.body.decode('utf-8')
raw_body = request.body.decode('utf-8') if hasattr(request, 'body') else request.content.decode('utf-8')

return PaddleSignature.verify(signature_header, raw_body, secret_key)
return PaddleSignature.verify(signature_header, raw_body, secrets)
2 changes: 1 addition & 1 deletion paddle_billing_python_sdk/__VERSION__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__VERSION__ = '0.0.1a55'
__VERSION__ = '0.0.1a56'

0 comments on commit 202d2d9

Please sign in to comment.