Skip to content

Commit

Permalink
Merge pull request #12 from Solratic/feature/checksum
Browse files Browse the repository at this point in the history
Feature/checksum
  • Loading branch information
alan890104 authored Sep 16, 2023
2 parents c2a1fe1 + df8187a commit 0760217
Show file tree
Hide file tree
Showing 9 changed files with 305 additions and 93 deletions.
26 changes: 26 additions & 0 deletions decodex/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,41 @@
from . import constant
from . import convert
from . import decode
from . import installer
from . import search
from . import translate
from . import type
from . import utils

installer.download_github_file(
save_path=str(constant.DECODEX_DIR.joinpath("ethereum", "tags.json")),
org="brianleect",
repo="etherscan-labels",
branch="main",
path="data/etherscan/combined/combinedAllLabels.json",
is_lfs=False,
verify_ssl=False,
use_tempfile=False,
)


installer.download_github_file(
save_path=str(constant.DECODEX_DIR.joinpath("ethereum", "signatures.csv")),
org="Solratic",
repo="function-signature-registry",
branch="main",
path="data/ethereum/func_sign.csv.gz",
is_lfs=True,
verify_ssl=False,
use_tempfile=True,
)

__all__ = [
"convert",
"decode",
"search",
"translate",
"type",
"utils",
"installer",
]
46 changes: 35 additions & 11 deletions decodex/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@
from jinja2 import Template
from tabulate import tabulate

from .installer import download_and_save_csv
from .installer import download_and_save_json
from decodex.constant import DECODEX_DIR
from decodex.installer import download_github_file
from decodex.translate import Translator
from decodex.utils import fmt_addr
from decodex.utils import fmt_blktime
Expand Down Expand Up @@ -44,24 +43,49 @@ def cli():

@cli.command(help="download tags and signatures for a chain")
@click.argument("chain", default="ethereum", type=click.Choice(["ethereum"]))
def download(chain: str):
@click.option("--verify-ssl", is_flag=True, help="Verify SSL", default=True)
def download(chain: str, verify_ssl: bool):
chain = chain.lower()
dir = DECODEX_DIR.joinpath(chain)
dir.mkdir(parents=True, exist_ok=True)
parents = DECODEX_DIR.joinpath(chain)

if chain == "ethereum":
download_and_save_json(
"https://raw.githubusercontent.com/brianleect/etherscan-labels/main/data/etherscan/combined/combinedAllLabels.json",
dir.joinpath("tags.json"),
download_github_file(
save_path=str(parents.joinpath("tags.json")),
org="brianleect",
repo="etherscan-labels",
branch="main",
path="data/etherscan/combined/combinedAllLabels.json",
is_lfs=False,
verify_ssl=verify_ssl,
use_tempfile=False,
)
download_and_save_csv(
chain="ethereum",
save_path=dir.joinpath("signatures.csv"),
download_github_file(
save_path=str(parents.joinpath("signatures.csv")),
org="Solratic",
repo="function-signature-registry",
branch="main",
path="data/ethereum/func_sign.csv.gz",
is_lfs=True,
verify_ssl=verify_ssl,
use_tempfile=True,
)
else:
raise ValueError(f"Chain {chain} is not yet supported.")


@cli.command(help="Remove downloaded tags and signatures for a chain")
@click.argument("chain", default="ethereum", type=click.Choice(["ethereum"]))
def clean(chain: str):
chain = chain.lower()
dir = DECODEX_DIR.joinpath(chain)
if dir.exists():
for f in dir.iterdir():
f.unlink()
dir.rmdir()
else:
print(f"{chain} not found")


@cli.command(help="Explain the transaction by the given hash")
@click.option("--txhash", type=str, help="Hash of the transaction", default=None)
@click.option("--from-addr", type=str, help="Address of the sender", default=None)
Expand Down
79 changes: 0 additions & 79 deletions decodex/installer.py

This file was deleted.

6 changes: 6 additions & 0 deletions decodex/installer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .installer import download_github_file


__all__ = [
"download_github_file",
]
144 changes: 144 additions & 0 deletions decodex/installer/callbacks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import gzip
import hashlib
import json
from abc import abstractstaticmethod
from io import BytesIO
from pathlib import Path

import requests

from .utils import get_github_url


class BaseBeforeCallback:
@abstractstaticmethod
def before_download(save_path: str, **kwargs) -> str:
pass


class BasePostCallback:
@abstractstaticmethod
def post_download(src_path: str, save_path: str, **kwargs):
pass


class GithubRawBeforeCallback(BaseBeforeCallback):
"""
kwargs : Dict
The arguments to pass to get_github_url.
- org : str
The name of the organization or user.
- repo : str
The name of the repository.
- branch : str
The name of the branch.
- path : str
The path to the file in the repository.
"""

def before_download(save_path: str, **kwargs) -> str:
url = get_github_url(use_api=False, **kwargs)
return url


class GithubLFSBeforeCallback(BaseBeforeCallback):
@staticmethod
def before_download(save_path: str, **kwargs) -> str:
"""
Get spec from Github API then check the checksum of the file to download.
Parameters
----------
save_path : str
The path to save the file to.
kwargs : Dict
The arguments to pass to get_github_lfs_url.
- org : str
The name of the organization or user.
- repo : str
The name of the repository.
- branch : str
The name of the branch.
- path : str
The path to the file in the repository.
Returns
-------
str
The modified URL.
Raises
------
FileExistsError
If the file already exists and is up to date.
"""
spec_url = get_github_url(use_api=True, **kwargs)

spec = requests.request(method="GET", url=spec_url)

spec.raise_for_status()
spec = json.loads(spec.text)
download_url = spec["download_url"]
checksum = spec["sha"]

# Check if the file already exists
path = Path(save_path)
if path.exists() and path.with_suffix(".checksum").exists():
# Check if the checksum is the same
with path.with_suffix(".checksum").open("r") as cf:
if cf.read() == checksum:
raise FileExistsError("File already exists and is up to date")
# Save the checksum
path.with_suffix(".checksum").write_text(checksum)
return download_url


class GzipPostCallback(BasePostCallback):
@staticmethod
def post_download(src_path: str, save_path: str, **config):
"""
Decompress a gzip file.
Parameters
----------
src_path : str
The path to the file to decompress.
save_path : str
The path to save the decompressed file to.
"""
in_path = Path(src_path)
out_path = Path(save_path)
with in_path.open("rb") as f, out_path.open("wb") as g:
compressed_data = BytesIO(f.read())
decompressed_file = gzip.GzipFile(fileobj=compressed_data)
g.write(decompressed_file.read())


class ChecksumPostCallback(BasePostCallback):
@staticmethod
def post_download(src_path: str, save_path: str, **config):
"""
Calculate the checksum of a file and save it to {save_path}.checksum.
Parameters
----------
src_path : str
The path to the file to check.
save_path : str
The path to save the file to.
"""
# Define the path objects
checksum_path = Path(save_path).with_suffix(".checksum")

# Calculate the checksum of the file
hash_algorithm = config.get("hash_algorithm", config.get("algorithm", "sha256"))
hash_object = hashlib.new(hash_algorithm)

with Path(src_path).open("rb") as f:
while chunk := f.read(4096):
hash_object.update(chunk)

checksum = hash_object.hexdigest()

# Save the checksum to the specified path
with checksum_path.open("w") as g:
g.write(checksum)
Loading

0 comments on commit 0760217

Please sign in to comment.