Merged
23 changes: 21 additions & 2 deletions src/datacustomcode/cli.py
@@ -69,6 +69,15 @@ def configure(
).update_ini(profile=profile)


@cli.command()
@click.argument("path", default="payload")
def zip(path: str):
from datacustomcode.deploy import zip

logger.debug("Zipping project")
zip(path)


@cli.command()
@click.option("--profile", default="default")
@click.option("--path", default="payload")
@@ -127,8 +136,11 @@ def init(directory: str):
@click.argument("filename")
@click.option("--config")
@click.option("--dry-run", is_flag=True)
def scan(filename: str, config: str, dry_run: bool):
from datacustomcode.scan import dc_config_json_from_file
@click.option(
"--no-requirements", is_flag=True, help="Skip generating requirements.txt file"
)
def scan(filename: str, config: str, dry_run: bool, no_requirements: bool):
from datacustomcode.scan import dc_config_json_from_file, write_requirements_file

config_location = config or os.path.join(os.path.dirname(filename), "config.json")
click.echo(
@@ -143,6 +155,13 @@ def scan(filename: str, config: str, dry_run: bool):
with open(config_location, "w") as f:
json.dump(config_json, f, indent=2)

if not no_requirements:
requirements_path = write_requirements_file(filename)
click.echo(
"Generated requirements file: "
+ click.style(requirements_path, fg="blue", bold=True)
)


@cli.command()
@click.argument("entrypoint")
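Both new commands can be exercised without a shell via click's test runner. A minimal sketch, assuming the `cli` group above is importable and that a `payload/entrypoint.py` script exists (the entrypoint path is illustrative):

from click.testing import CliRunner

from datacustomcode.cli import cli

runner = CliRunner()
# Zip the payload directory without deploying it
runner.invoke(cli, ["zip", "payload"])
# Scan an entrypoint for config generation, skipping requirements.txt
runner.invoke(cli, ["scan", "payload/entrypoint.py", "--no-requirements"])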
85 changes: 70 additions & 15 deletions src/datacustomcode/deploy.py
@@ -169,25 +169,14 @@ def prepare_dependency_archive(directory: str) -> None:
archive_file = os.path.join(archives_dir, DEPENDENCIES_ARCHIVE_NAME)
with tarfile.open(archive_file, "w:gz") as tar:
for file in os.listdir(temp_dir):
# Exclude requirements.txt from the archive
if file == "requirements.txt":
continue
tar.add(os.path.join(temp_dir, file), arcname=file)

logger.debug(f"Dependencies downloaded and archived to {archive_file}")


def zip_and_upload_directory(directory: str, file_upload_url: str) -> None:
file_upload_url = unescape(file_upload_url)

logger.debug(f"Zipping directory... {directory}")
shutil.make_archive(ZIP_FILE_NAME.rstrip(".zip"), "zip", directory)

logger.debug(f"Uploading deployment to {file_upload_url}")
with open(ZIP_FILE_NAME, "rb") as zip_file:
response = requests.put(
file_upload_url, data=zip_file, headers={"Content-Type": "application/zip"}
)
response.raise_for_status()


class DeploymentsResponse(BaseModel):
deploymentStatus: str

@@ -325,6 +314,71 @@ def create_data_transform(
return response


def has_nonempty_requirements_file(directory: str) -> bool:
"""
Check if requirements.txt exists in the given directory and has at least
one non-comment line.
Args:
directory (str): The directory to check for requirements.txt.
Returns:
bool: True if requirements.txt exists and has a non-comment line,
False otherwise.
"""
# Look for requirements.txt in the parent directory of the given directory
requirements_path = os.path.join(os.path.dirname(directory), "requirements.txt")

try:
if os.path.isfile(requirements_path):
with open(requirements_path, "r", encoding="utf-8") as f:
for line in f:
# Consider non-empty if any line is not a comment (ignoring
# leading whitespace)
if line.strip() and not line.lstrip().startswith("#"):
return True
except Exception as e:
logger.error(f"Error reading requirements.txt: {e}")
return False


def upload_zip(file_upload_url: str) -> None:
file_upload_url = unescape(file_upload_url)
with open(ZIP_FILE_NAME, "rb") as zip_file:
response = requests.put(
file_upload_url, data=zip_file, headers={"Content-Type": "application/zip"}
)
response.raise_for_status()


def zip(directory: str) -> None:
    # Create a zip file excluding .DS_Store files
    import zipfile

    # Prepare the dependency archive only if requirements.txt is non-empty
if has_nonempty_requirements_file(directory):
prepare_dependency_archive(directory)
else:
logger.info(
f"Skipping dependency archive: requirements.txt is missing or empty "
f"in {directory}"
)

logger.debug(f"Zipping directory... {directory}")

with zipfile.ZipFile(ZIP_FILE_NAME, "w", zipfile.ZIP_DEFLATED) as zipf:
for root, dirs, files in os.walk(directory):
# Skip .DS_Store files when adding to zip
for file in files:
if file != ".DS_Store":
file_path = os.path.join(root, file)
# Preserve relative path structure in the zip file
arcname = os.path.relpath(file_path, directory)
zipf.write(file_path, arcname)

logger.debug(f"Created zip file: {ZIP_FILE_NAME}")


def deploy_full(
directory: str,
metadata: TransformationJobMetadata,
@@ -340,7 +394,8 @@ def deploy_full(

# create deployment and upload payload
deployment = create_deployment(access_token, metadata)
zip_and_upload_directory(directory, deployment.fileUploadUrl)
zip(directory)
upload_zip(deployment.fileUploadUrl)
wait_for_deployment(access_token, metadata, callback)

# create data transform
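The upload path is now split into composable pieces: `zip` builds the archive locally, and `upload_zip` PUTs it to the presigned URL returned by `create_deployment`. A minimal local sketch of the first half, assuming a `payload` directory whose parent holds the generated requirements.txt (the directory name is illustrative):

from datacustomcode.deploy import zip

# Writes ZIP_FILE_NAME in the current directory; .DS_Store files are
# skipped, and a dependency archive is bundled only when requirements.txt
# in the parent of "payload" has at least one non-comment line.
zip("payload")

`upload_zip` is omitted here because it requires a live fileUploadUrl from an actual deployment.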
134 changes: 134 additions & 0 deletions src/datacustomcode/scan.py
@@ -15,9 +15,12 @@
from __future__ import annotations

import ast
import os
from typing import (
Any,
ClassVar,
Dict,
Set,
Union,
)

@@ -131,6 +134,137 @@ def found(self) -> DataAccessLayerCalls:
)


class ImportVisitor(ast.NodeVisitor):
"""AST Visitor that extracts external package imports from Python code."""

# Standard library modules that should be excluded from requirements
STANDARD_LIBS: ClassVar[set[str]] = {
"abc",
"argparse",
"ast",
"asyncio",
"base64",
"collections",
"configparser",
"contextlib",
"copy",
"csv",
"datetime",
"enum",
"functools",
"glob",
"hashlib",
"http",
"importlib",
"inspect",
"io",
"itertools",
"json",
"logging",
"math",
"os",
"pathlib",
"pickle",
"random",
"re",
"shutil",
"site",
"socket",
"sqlite3",
"string",
"subprocess",
"sys",
"tempfile",
"threading",
"time",
"traceback",
"typing",
"uuid",
"warnings",
"xml",
"zipfile",
}

# Additional packages to exclude from requirements.txt
EXCLUDED_PACKAGES: ClassVar[set[str]] = {
"datacustomcode", # Internal package
"pyspark", # Provided by the runtime environment
}

def __init__(self) -> None:
self.imports: Set[str] = set()

def visit_Import(self, node: ast.Import) -> None:
"""Visit an import statement (e.g., import os, sys)."""
for name in node.names:
# Get the top-level package name
package = name.name.split(".")[0]
if (
package not in self.STANDARD_LIBS
and package not in self.EXCLUDED_PACKAGES
and not package.startswith("_")
):
self.imports.add(package)
self.generic_visit(node)

def visit_ImportFrom(self, node: ast.ImportFrom) -> None:
"""Visit a from-import statement (e.g., from os import path)."""
if node.module is not None:
# Get the top-level package
package = node.module.split(".")[0]
if (
package not in self.STANDARD_LIBS
and package not in self.EXCLUDED_PACKAGES
and not package.startswith("_")
):
self.imports.add(package)
self.generic_visit(node)


def scan_file_for_imports(file_path: str) -> Set[str]:
"""Scan a Python file for external package imports."""
with open(file_path, "r") as f:
code = f.read()
tree = ast.parse(code)
visitor = ImportVisitor()
visitor.visit(tree)
return visitor.imports


def write_requirements_file(file_path: str) -> str:
"""
Scan a Python file for imports and write them to requirements.txt.

Args:
file_path: Path to the Python file to scan

Returns:
Path to the generated requirements.txt file
"""
imports = scan_file_for_imports(file_path)

# Write requirements.txt in the parent directory of the Python file
file_dir = os.path.dirname(file_path)
parent_dir = os.path.dirname(file_dir) if file_dir else "."
requirements_path = os.path.join(parent_dir, "requirements.txt")

# If the file exists, read existing requirements and merge with new ones
existing_requirements = set()
if os.path.exists(requirements_path):
with open(requirements_path, "r") as f:
existing_requirements = {line.strip() for line in f if line.strip()}

# Merge existing requirements with newly discovered ones
all_requirements = existing_requirements.union(imports)

# Write the combined requirements
with open(requirements_path, "w") as f:
for package in sorted(all_requirements):
f.write(f"{package}\n")

return requirements_path


def scan_file(file_path: str) -> DataAccessLayerCalls:
"""Scan a single Python file for Client read/write method calls."""
with open(file_path, "r") as f:
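A usage sketch for the new scanner helpers (the entrypoint path is hypothetical):

from datacustomcode.scan import scan_file_for_imports, write_requirements_file

imports = scan_file_for_imports("payload/entrypoint.py")
print(imports)  # e.g. {"pandas"}; stdlib, pyspark, and datacustomcode are filtered out

# Writes requirements.txt in the parent of the file's directory (here, the
# current working directory), merging with any lines already present.
path = write_requirements_file("payload/entrypoint.py")

Note that merging is by exact line, so a bare "pandas" from the scan and a pinned "pandas==2.2.0" already in the file will both survive.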
86 changes: 86 additions & 0 deletions src/datacustomcode/templates/account.ipynb
@@ -0,0 +1,86 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "0",
"metadata": {},
"outputs": [],
"source": [
"from datacustomcode.client import Client\n",
"from datacustomcode.io.writer.base import WriteMode\n",
"from pyspark.sql.functions import col, upper"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1",
"metadata": {},
"outputs": [],
"source": [
"client = Client()\n",
"\n",
"df = client.read_dlo(\"Account_Home__dll\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2",
"metadata": {},
"outputs": [],
"source": [
"# Perform transformations on the DataFrame\n",
"df_upper1 = df.withColumn(\"Description__c\", upper(col(\"Description__c\")))\n",
"\n",
"# Drop specific columns related to relationships\n",
"df_upper1 = df_upper1.drop(\"KQ_ParentId__c\")\n",
"df_upper1 = df_upper1.drop(\"KQ_Id__c\")\n",
"\n",
"df_upper1.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3",
"metadata": {},
"outputs": [],
"source": [
"# Save the transformed DataFrame\n",
"dlo_name = \"Account_Home_copy__dll\"\n",
"client.write_to_dlo(dlo_name, df_upper1, write_mode=WriteMode.APPEND)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.11"
}
},
"nbformat": 4,
"nbformat_minor": 5
}