Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CERT-7012 Add New Listing Check #9

Merged
merged 6 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ProposalTools/Checks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .diff import DiffCheck
from .global_variables import GlobalVariableCheck
from .feed_price import FeedPriceCheck
from .new_listing import NewListingCheck

all = ["DiffCheck", "GlobalVariableCheck", "FeedPriceCheck"]
all = ["DiffCheck", "GlobalVariableCheck", "FeedPriceCheck", "NewListingCheck"]
2 changes: 1 addition & 1 deletion ProposalTools/Checks/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def __init__(self, customer: str, chain: Chain, proposal_address: str, source_co
self.check_folder.mkdir(parents=True, exist_ok=True)


def _write_to_file(self, path: str | Path, data: dict | str) -> None:
def _write_to_file(self, path: str | Path, data: dict | str | list) -> None:
Copy link
Contributor

@yoav-el-certora yoav-el-certora Sep 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case I think Any should be more suitable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer to be specific for now if there are more types in the future i will change that.

"""
Writes data to a specified file, creating the file and its parent directories if they do not exist.

Expand Down
215 changes: 215 additions & 0 deletions ProposalTools/Checks/new_listing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
from solidity_parser.parser import Node
from pydantic import BaseModel

from ProposalTools.Checks.check import Check
import ProposalTools.Utils.pretty_printer as pp


class ListingDetails(BaseModel):
asset: str
assetSymbol: str = None
priceFeedAddress: str = None


class FunctionCallDetails(BaseModel):
pool: str
asset: str
asset_seed: str


class NewListingCheck(Check):
def new_listing_check(self) -> None:
"""
Checks if the proposal address is a new listing on the blockchain.

This method retrieves functions from the source codes and checks if there are any new listings.
If new listings are detected, it handles them accordingly. Otherwise, it prints a message indicating
no new listings were found.
"""
functions = self._get_functions_from_source_codes()

if "newListings" in functions or "newListingsCustom" in functions:
self._handle_new_listings(functions)
else:
pp.pretty_print(f"No new listings detected for {self.proposal_address}", pp.Colors.INFO)

def _get_functions_from_source_codes(self) -> dict:
"""
Retrieves functions from the source codes.

This method iterates over the source codes and collects all functions defined in them.

Returns:
dict: A dictionary where keys are function names and values are function nodes.
"""
functions = {}
for source_code in self.source_codes:
functions.update(source_code.get_functions() if source_code.get_functions() else {})
return functions

def _handle_new_listings(self, functions: dict) -> None:
"""
Handles new listings detected in the functions.

This method extracts listings from the function node and checks for approval and supply calls
related to the listings. It prints messages indicating the status of these calls.

Args:
functions (dict): A dictionary of functions retrieved from the source codes.
"""
pp.pretty_print(f"New listing detected for {self.proposal_address}", pp.Colors.WARNING)
listings = self.__extract_listings_from_function(
functions.get("newListings", functions.get("newListingsCustom"))._node
)
if listings:
pp.pretty_print(f"Found {len(listings)} new listings", pp.Colors.SUCCESS)
else:
pp.pretty_print(f"Failed to extract listings from function", pp.Colors.FAILURE)

approval_calls, supply_calls = self.__extract_approval_and_supply_calls(
functions.get("_postExecute")._node
)
approval_calls = {call.asset: call for call in approval_calls}
supply_calls = {call.asset: call for call in supply_calls}

for listing in listings:
self._check_listing_calls(listing, approval_calls, supply_calls)

def _check_listing_calls(self, listing: ListingDetails, approval_calls: dict, supply_calls: dict) -> None:
"""
Checks the approval and supply calls for a given listing.

This method verifies if there are approval and supply calls for the given listing and prints
messages indicating the status of these calls.

Args:
listing (ListingDetails): The details of the listing to check.
approval_calls (dict): A dictionary of approval calls.
supply_calls (dict): A dictionary of supply calls.
"""
pp.pretty_print(f"Listing: {listing}", pp.Colors.WARNING)
if listing.asset not in approval_calls:
pp.pretty_print(f"Missing approval call for {listing.asset}", pp.Colors.FAILURE)
self._write_to_file("missing_approval_calls.json", listing.dict())
else:
pp.pretty_print(f"Found approval call for {listing.asset}", pp.Colors.SUCCESS)
self._write_to_file("found_approval_calls.json", listing.dict())
if listing.asset not in supply_calls:
pp.pretty_print(f"Missing supply call for {listing.asset}", pp.Colors.FAILURE)
self._write_to_file("missing_supply_calls.json", listing.dict())
else:
pp.pretty_print(f"Found supply call for {listing.asset}", pp.Colors.SUCCESS)
self._write_to_file("found_supply_calls.json", listing.dict())

def __extract_listings_from_function(self, function_node: Node) -> list[ListingDetails]:
"""
Extracts new listings information from the function node.

This method simplifies the extraction of new listings by checking the function node for
variable declarations related to listings and extracting the relevant details.

Args:
function_node (Node): The function node to extract listings from.

Returns:
list[ListingDetails]: A list of ListingDetails objects representing the new listings.
"""
if function_node.get('type') != 'FunctionDefinition':
return []

new_listings = []
for statement in function_node.get('body', {}).get('statements', []):
if statement.get('type') == 'VariableDeclarationStatement':
for var in statement.get('variables', []):
if var.get('typeName', {}).get('baseTypeName', {}).get('namePath') == 'IAaveV3ConfigEngine.Listing':
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this is specific "IAaveV3ConfigEngine"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the structure of the TAC when trying to fetch the listing variables.

new_listings.extend(self._extract_listings_from_statements(function_node))
return new_listings

def _extract_listings_from_statements(self, function_node: Node) -> list[ListingDetails]:
"""
Extracts listings from the statements in the function node.

This method iterates over the statements in the function node and extracts listing details
from the relevant expressions.

Args:
function_node (Node): The function node to extract listings from.

Returns:
list[ListingDetails]: A list of ListingDetails objects representing the new listings.
"""
new_listings = []
for expr_stmt in function_node.get('body', {}).get('statements', []):
if expr_stmt.get('type') == 'ExpressionStatement':
expr = expr_stmt.get('expression', {})
if expr.get('type') == 'BinaryOperation' and expr.get('operator') == '=':
left = expr.get('left', {})
if left.get('type') == 'IndexAccess' and left.get('base', {}).get('name') == 'listings':
listing_details = self.__extract_listing_details(expr.get('right', {}).get('arguments', []))
if listing_details:
new_listings.append(listing_details)
return new_listings

def __extract_listing_details(self, arguments: list[Node]) -> ListingDetails:
"""
Extracts listing details from function arguments.

This method extracts the asset, asset symbol, and price feed address from the function arguments
and returns a ListingDetails object.

Args:
arguments (list[Node]): The list of function arguments to extract details from.

Returns:
ListingDetails: An object containing the extracted listing details.
"""
listing_info = {}
if arguments:
listing_info['asset'] = arguments[0].get('name')
listing_info['assetSymbol'] = arguments[1].get('value') if len(arguments) > 1 else None
listing_info['priceFeedAddress'] = arguments[2].get('number') if len(arguments) > 2 else None
return ListingDetails(**listing_info)

def __extract_approval_and_supply_calls(self, function_node: Node) -> tuple[list[FunctionCallDetails], list[FunctionCallDetails]]:
"""
Extracts approval and supply calls from the function node.

This method iterates over the statements in the function node and extracts details of approval
and supply calls.

Args:
function_node (Node): The function node to extract calls from.

Returns:
tuple[list[FunctionCallDetails], list[FunctionCallDetails]]: Two lists containing the details
of approval and supply calls respectively.
"""
approval_calls, supply_calls = [], []
for statement in function_node.get('body', {}).get('statements', []):
if statement.get('type') == 'ExpressionStatement':
expression = statement.get('expression', {})
if expression.get('type') == 'FunctionCall':
function_name = expression.get('expression', {}).get('name', "")
Comment on lines +188 to +192
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pattern or a similar one seems to be repetitive in 3 (or more) functions, can we maybe combine this to 1 flow?
I haven't had the chance to dig deep into details, but it seems like collecting the data can be done in 1 loop? or am I'm wrong here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can write every thing in one huge function, but it will frustrating to keep up with what happens there and the lack of documentations.

if "approve" in function_name or "forceApprove" in function_name:
approval_calls.append(self._extract_function_call_details(expression))
elif 'supply' in function_name:
supply_calls.append(self._extract_function_call_details(expression))
return approval_calls, supply_calls

def _extract_function_call_details(self, expression: dict) -> FunctionCallDetails:
"""
Extracts details of a function call.

This method extracts the pool, asset, and asset seed from the function call expression
and returns a FunctionCallDetails object.

Args:
expression (dict): The function call expression to extract details from.

Returns:
FunctionCallDetails: An object containing the extracted function call details.
"""
pool = expression['arguments'][0]['expression']['name']
asset = expression['arguments'][1]['name']
asset_seed = expression['arguments'][2]['name']
return FunctionCallDetails(pool=pool, asset=asset, asset_seed=asset_seed)
6 changes: 3 additions & 3 deletions ProposalTools/Utils/source_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def _parse_source_code(self) -> None:
self._parsed_contract = ast_obj.contracts[contract_name]
except Exception as e:
pp.pretty_print(f"Error parsing source code for {self.file_name}: {e}\n"
f"global const or immutable check wont apply to this contract!!!",
f"Some of the checks will not apply to this contract!!!",
pp.Colors.FAILURE)

def get_constructor(self) -> dict | None:
Expand Down Expand Up @@ -76,12 +76,12 @@ def get_events(self) -> list | None:
return self._parsed_contract.events
return None

def get_functions(self) -> list | None:
def get_functions(self) -> dict | None:
"""
Retrieves the functions from the Solidity contract.

Returns:
(list | None): List of functions or None if not found.
(dict | None): List of functions or None if not found.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this always returning a dict?
If so please change the txt as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

"""
if self._parsed_contract:
return self._parsed_contract.functions
Expand Down
4 changes: 3 additions & 1 deletion ProposalTools/check_proposal.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ def proposals_check(customer: str, chain_name: str, proposal_addresses: list[str

# Feed price check
Checks.FeedPriceCheck(customer, chain, proposal_address, missing_files).verify_feed_price()


# New listing check
Checks.NewListingCheck(customer, chain, proposal_address, missing_files).new_listing_check()

def main() -> None:
"""
Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
20240827.140931.710128
20240828.104045.704958