Update examples for open source release (#28)
ehsantn authored Dec 10, 2024
1 parent 506776e commit b33ac75
Showing 69 changed files with 3,565 additions and 10,697 deletions.
File renamed without changes.
@@ -5,15 +5,17 @@
Set the environment variable `BODO_NUM_WORKERS` to limit the number of cores used.
"""
import time

import argparse
import bodo
import time

import pandas as pd

import bodo


@bodo.jit(cache=True)
def run_queries(data_folder):

# Load the data
t1 = time.time()
lineitem = load_lineitem(data_folder)
@@ -924,11 +926,13 @@ def q19(lineitem, part):
& (jn.L_QUANTITY <= 25)
& (jn.P_SIZE <= 10)
)
|((jn.P_BRAND == Brand43)
& (jn.P_CONTAINER.isin([LGBOX, LGCASE, LGPACK, LGPKG]))
& (jn.L_QUANTITY >= 26)
& (jn.L_QUANTITY <= 36)
& (jn.P_SIZE <= 15))
| (
(jn.P_BRAND == Brand43)
& (jn.P_CONTAINER.isin([LGBOX, LGCASE, LGPACK, LGPKG]))
& (jn.L_QUANTITY >= 26)
& (jn.L_QUANTITY <= 36)
& (jn.P_SIZE <= 15)
)
)
jn = jn[jnsel]
total = (jn.L_EXTENDEDPRICE * (1.0 - jn.L_DISCOUNT)).sum()
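For context, the driver above JIT-compiles the whole query pipeline with `bodo.jit(cache=True)` and caps parallelism through the `BODO_NUM_WORKERS` environment variable mentioned in the docstring. A minimal sketch of that pattern follows; the parquet path, script name, and query body are illustrative assumptions, not the benchmark's actual code:

# Minimal sketch of the driver pattern; paths and the filter are
# illustrative assumptions, not the benchmark's code.
# Run as, e.g.:  BODO_NUM_WORKERS=4 python my_query_script.py
import time

import pandas as pd

import bodo


@bodo.jit(cache=True)  # compile once; later runs reuse the cached binary
def filtered_revenue(path):
    t1 = time.time()
    df = pd.read_parquet(path)  # Bodo reads and processes this in parallel
    # compound boolean mask, same style as the q19 predicate above
    sel = (df.L_QUANTITY >= 26) & (df.L_QUANTITY <= 36)
    total = (df.L_EXTENDEDPRICE * (1.0 - df.L_DISCOUNT))[sel].sum()
    print("time:", time.time() - t1)
    return total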
@@ -1,18 +1,17 @@
"""
Copyright (C) 2021 Bodo Inc. All rights reserved
This script generates TPC-H data in parquet format for any scale factor
Can use very little memory regardless of SCALE_FACTOR (and very little disk
space when uploading directly to s3). Simply adjust the number of pieces as needed.
"""

import os
import argparse
import os
import shutil
import subprocess
from multiprocessing import Pool, set_start_method
import pyarrow.parquet as pq


import pyarrow.parquet as pq
from loader import (
load_customer,
load_lineitem_with_date,
@@ -27,6 +26,7 @@
# Change location of tpch-dbgen if not in same place as this script
tpch_dbgen_location = "./tpch-dbgen"


# First element is the table single character short-hand understood by dbgen
# Second element is the number of pieces we want the parquet dataset to have for that table
# Third element is the function that reads generated CSV to a pandas dataframe
@@ -79,7 +79,6 @@ def to_parquet(args):
def generate(
tables, SCALE_FACTOR, folder, upload_to_s3, validate_dataset, num_processes
):

if upload_to_s3:
assert "AWS_ACCESS_KEY_ID" in os.environ, "AWS credentials not set"
else:
@@ -94,7 +93,6 @@ def generate(
fs = s3fs.S3FileSystem()

for table_name, (table_short, num_pieces, load_func) in tables.items():

if upload_to_s3:
output_prefix = f"s3://{folder}/{table_name}.pq"
else:
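For reference, this script shells out to dbgen and converts each pipe-separated CSV piece to one parquet file, dispatching the pieces through a multiprocessing Pool. A condensed sketch of the per-piece conversion step; the function name and paths are hypothetical, chosen only for illustration:

# Condensed sketch of the CSV-to-parquet conversion done per piece;
# the function name and paths are illustrative assumptions.
import pyarrow as pa
import pyarrow.parquet as pq

from loader import load_lineitem_with_date


def csv_piece_to_parquet(csv_path, out_path):
    # read one pipe-separated dbgen piece into pandas
    df = load_lineitem_with_date(csv_path)
    # convert to Arrow and write a single parquet file for this piece
    pq.write_table(pa.Table.from_pandas(df), out_path)


# csv_piece_to_parquet("lineitem.tbl.1", "lineitem.pq/part-0.parquet")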
278 changes: 278 additions & 0 deletions benchmarks/tpch/loader.py
@@ -0,0 +1,278 @@
"""
Copyright (C) 2021 Bodo Inc. All rights reserved
This script reads TPC-H data in csv format.
"""

import numpy as np
import pandas as pd


def load_lineitem(fpath):
cols_names = [
"L_ORDERKEY",
"L_PARTKEY",
"L_SUPPKEY",
"L_LINENUMBER",
"L_QUANTITY",
"L_EXTENDEDPRICE",
"L_DISCOUNT",
"L_TAX",
"L_RETURNFLAG",
"L_LINESTATUS",
"L_SHIPDATE",
"L_COMMITDATE",
"L_RECEIPTDATE",
"L_SHIPINSTRUCT",
"L_SHIPMODE",
"L_COMMENT",
]
cols = {
"L_ORDERKEY": np.int64,
"L_PARTKEY": np.int64,
"L_SUPPKEY": np.int64,
"L_LINENUMBER": np.int64,
"L_QUANTITY": np.float64,
"L_EXTENDEDPRICE": np.float64,
"L_DISCOUNT": np.float64,
"L_TAX": np.float64,
"L_RETURNFLAG": str,
"L_LINESTATUS": str,
"L_SHIPDATE": str,
"L_COMMITDATE": str,
"L_RECEIPTDATE": str,
"L_SHIPINSTRUCT": str,
"L_SHIPMODE": str,
"L_COMMENT": str,
}
rel = pd.read_csv(
fpath,
sep="|",
header=None,
names=cols_names,
dtype=cols,
parse_dates=[10, 11, 12],
)
return rel


def load_lineitem_with_date(fpath):
cols_names = [
"L_ORDERKEY",
"L_PARTKEY",
"L_SUPPKEY",
"L_LINENUMBER",
"L_QUANTITY",
"L_EXTENDEDPRICE",
"L_DISCOUNT",
"L_TAX",
"L_RETURNFLAG",
"L_LINESTATUS",
"L_SHIPDATE",
"L_COMMITDATE",
"L_RECEIPTDATE",
"L_SHIPINSTRUCT",
"L_SHIPMODE",
"L_COMMENT",
]
cols = {
"L_ORDERKEY": np.int64,
"L_PARTKEY": np.int64,
"L_SUPPKEY": np.int64,
"L_LINENUMBER": np.int64,
"L_QUANTITY": np.float64,
"L_EXTENDEDPRICE": np.float64,
"L_DISCOUNT": np.float64,
"L_TAX": np.float64,
"L_RETURNFLAG": str,
"L_LINESTATUS": str,
"L_SHIPDATE": str,
"L_COMMITDATE": str,
"L_RECEIPTDATE": str,
"L_SHIPINSTRUCT": str,
"L_SHIPMODE": str,
"L_COMMENT": str,
}
rel = pd.read_csv(
fpath,
sep="|",
header=None,
names=cols_names,
dtype=cols,
parse_dates=[10, 11, 12],
)
rel["L_SHIPDATE"] = [time.date() for time in rel["L_SHIPDATE"]]
rel["L_COMMITDATE"] = [time.date() for time in rel["L_COMMITDATE"]]
rel["L_RECEIPTDATE"] = [time.date() for time in rel["L_RECEIPTDATE"]]
return rel


def load_part(fpath):
cols_names = [
"P_PARTKEY",
"P_NAME",
"P_MFGR",
"P_BRAND",
"P_TYPE",
"P_SIZE",
"P_CONTAINER",
"P_RETAILPRICE",
"P_COMMENT",
]
cols = {
"P_PARTKEY": np.int64,
"P_NAME": str,
"P_MFGR": str,
"P_BRAND": str,
"P_TYPE": str,
"P_SIZE": np.int64,
"P_CONTAINER": str,
"P_RETAILPRICE": np.float64,
"P_COMMENT": str,
}
rel = pd.read_csv(fpath, sep="|", header=None, names=cols_names, dtype=cols)
return rel


def load_orders(fpath):
cols_names = [
"O_ORDERKEY",
"O_CUSTKEY",
"O_ORDERSTATUS",
"O_TOTALPRICE",
"O_ORDERDATE",
"O_ORDERPRIORITY",
"O_CLERK",
"O_SHIPPRIORITY",
"O_COMMENT",
]
cols = {
"O_ORDERKEY": np.int64,
"O_CUSTKEY": np.int64,
"O_ORDERSTATUS": str,
"O_TOTALPRICE": np.float64,
"O_ORDERDATE": np.int64,
"O_ORDERPRIORITY": str,
"O_CLERK": str,
"O_SHIPPRIORITY": np.int64,
"O_COMMENT": str,
}
rel = pd.read_csv(
fpath, sep="|", header=None, names=cols_names, dtype=cols, parse_dates=[4]
)
return rel


def load_orders_with_date(fpath):
cols_names = [
"O_ORDERKEY",
"O_CUSTKEY",
"O_ORDERSTATUS",
"O_TOTALPRICE",
"O_ORDERDATE",
"O_ORDERPRIORITY",
"O_CLERK",
"O_SHIPPRIORITY",
"O_COMMENT",
]
cols = {
"O_ORDERKEY": np.int64,
"O_CUSTKEY": np.int64,
"O_ORDERSTATUS": str,
"O_TOTALPRICE": np.float64,
"O_ORDERDATE": np.int64,
"O_ORDERPRIORITY": str,
"O_CLERK": str,
"O_SHIPPRIORITY": np.int64,
"O_COMMENT": str,
}
rel = pd.read_csv(
fpath, sep="|", header=None, names=cols_names, dtype=cols, parse_dates=[4]
)
rel["O_ORDERDATE"] = [time.date() for time in rel["O_ORDERDATE"]]
return rel


def load_customer(fpath):
cols_names = [
"C_CUSTKEY",
"C_NAME",
"C_ADDRESS",
"C_NATIONKEY",
"C_PHONE",
"C_ACCTBAL",
"C_MKTSEGMENT",
"C_COMMENT",
]
cols = {
"C_CUSTKEY": np.int64,
"C_NAME": str,
"C_ADDRESS": str,
"C_NATIONKEY": np.int64,
"C_PHONE": str,
"C_ACCTBAL": np.float64,
"C_MKTSEGMENT": str,
"C_COMMENT": str,
}
rel = pd.read_csv(fpath, sep="|", header=None, names=cols_names, dtype=cols)
return rel


def load_nation(fpath):
cols_names = ["N_NATIONKEY", "N_NAME", "N_REGIONKEY", "N_COMMENT"]
cols = {
"N_NATIONKEY": np.int64,
"N_NAME": str,
"N_REGIONKEY": np.int64,
"N_COMMENT": str,
}
rel = pd.read_csv(fpath, sep="|", header=None, names=cols_names, dtype=cols)
return rel


def load_region(fpath):
cols_names = ["R_REGIONKEY", "R_NAME", "R_COMMENT"]
cols = {"R_REGIONKEY": np.int64, "R_NAME": str, "R_COMMENT": str}
rel = pd.read_csv(fpath, sep="|", header=None, names=cols_names, dtype=cols)
return rel


def load_supplier(fpath):
cols_names = [
"S_SUPPKEY",
"S_NAME",
"S_ADDRESS",
"S_NATIONKEY",
"S_PHONE",
"S_ACCTBAL",
"S_COMMENT",
]
cols = {
"S_SUPPKEY": np.int64,
"S_NAME": str,
"S_ADDRESS": str,
"S_NATIONKEY": np.int64,
"S_PHONE": str,
"S_ACCTBAL": np.float64,
"S_COMMENT": str,
}
rel = pd.read_csv(fpath, sep="|", header=None, names=cols_names, dtype=cols)
return rel


def load_partsupp(fpath):
cols_names = [
"PS_PARTKEY",
"PS_SUPPKEY",
"PS_AVAILQTY",
"PS_SUPPLYCOST",
"PS_COMMENT",
]
cols = {
"PS_PARTKEY": np.int64,
"PS_SUPPKEY": np.int64,
"PS_AVAILQTY": np.int64,
"PS_SUPPLYCOST": np.float64,
"PS_COMMENT": str,
}
rel = pd.read_csv(fpath, sep="|", header=None, names=cols_names, dtype=cols)
return rel
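For reference, each loader returns a plain pandas DataFrame, so the functions can be exercised outside Bodo. A small usage sketch, assuming a pipe-separated file that contains exactly the 16 lineitem columns (the path is hypothetical):

# Usage sketch; the path below is an illustrative assumption.
from loader import load_lineitem

lineitem = load_lineitem("tpch-data/lineitem.csv")
print(lineitem.dtypes)  # the three *DATE columns come back as datetime64[ns]
print(len(lineitem), "rows")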
