Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stock ticker dataset #303

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dbldatagen/datasets/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
from .dataset_provider import DatasetProvider, dataset_definition
from .basic_geometries import BasicGeometriesProvider
from .basic_process_historian import BasicProcessHistorianProvider
from .basic_stock_ticker import BasicStockTickerProvider
from .basic_telematics import BasicTelematicsProvider
from .basic_user import BasicUserProvider
from .benchmark_groupby import BenchmarkGroupByProvider
from .multi_table_sales_order_provider import MultiTableSalesOrderProvider
from .multi_table_telephony_provider import MultiTableTelephonyProvider

# Public submodules of the datasets package, exposed for `from dbldatagen.datasets import *`.
# NOTE: entries are the submodule names (snake_case), not the provider class names —
# the classes themselves are re-exported by the explicit imports above.
__all__ = ["dataset_provider",
           "basic_geometries",
           "basic_process_historian",
           "basic_stock_ticker",
           "basic_telematics",
           "basic_user",
           "benchmark_groupby",
           "multi_table_sales_order_provider",
           "multi_table_telephony_provider"
           ]
24 changes: 18 additions & 6 deletions dbldatagen/datasets/basic_geometries.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,18 @@ class BasicGeometriesProvider(DatasetProvider.NoAssociatedDatasetsMixin, Dataset
"""
MIN_LOCATION_ID = 1000000
MAX_LOCATION_ID = 9223372036854775807
DEFAULT_MIN_LAT = -90.0
DEFAULT_MAX_LAT = 90.0
DEFAULT_MIN_LON = -180.0
DEFAULT_MAX_LON = 180.0
COLUMN_COUNT = 2
ALLOWED_OPTIONS = [
"geometryType",
"maxVertices",
"minLatitude",
"maxLatitude",
"minLongitude",
"maxLongitude",
"random"
]

Expand All @@ -45,6 +53,10 @@ def getTableGenerator(self, sparkSession, *, tableName=None, rows=-1, partitions
generateRandom = options.get("random", False)
geometryType = options.get("geometryType", "point")
maxVertices = options.get("maxVertices", 1 if geometryType == "point" else 3)
minLatitude = options.get("minLatitude", self.DEFAULT_MIN_LAT)
maxLatitude = options.get("maxLatitude", self.DEFAULT_MAX_LAT)
minLongitude = options.get("minLongitude", self.DEFAULT_MIN_LON)
maxLongitude = options.get("maxLongitude", self.DEFAULT_MAX_LON)

assert tableName is None or tableName == DatasetProvider.DEFAULT_TABLE_NAME, "Invalid table name"
if rows is None or rows < 0:
Expand All @@ -62,9 +74,9 @@ def getTableGenerator(self, sparkSession, *, tableName=None, rows=-1, partitions
if maxVertices > 1:
w.warn('Ignoring property maxVertices for point geometries')
df_spec = (
df_spec.withColumn("lat", "float", minValue=-90.0, maxValue=90.0,
df_spec.withColumn("lat", "float", minValue=minLatitude, maxValue=maxLatitude,
step=1e-5, random=generateRandom, omit=True)
.withColumn("lon", "float", minValue=-180.0, maxValue=180.0,
.withColumn("lon", "float", minValue=minLongitude, maxValue=maxLongitude,
step=1e-5, random=generateRandom, omit=True)
.withColumn("wkt", "string", expr="concat('POINT(', lon, ' ', lat, ')')")
)
Expand All @@ -75,9 +87,9 @@ def getTableGenerator(self, sparkSession, *, tableName=None, rows=-1, partitions
j = 0
while j < maxVertices:
df_spec = (
df_spec.withColumn(f"lat_{j}", "float", minValue=-90.0, maxValue=90.0,
df_spec.withColumn(f"lat_{j}", "float", minValue=minLatitude, maxValue=maxLatitude,
step=1e-5, random=generateRandom, omit=True)
.withColumn(f"lon_{j}", "float", minValue=-180.0, maxValue=180.0,
.withColumn(f"lon_{j}", "float", minValue=minLongitude, maxValue=maxLongitude,
step=1e-5, random=generateRandom, omit=True)
)
j = j + 1
Expand All @@ -93,9 +105,9 @@ def getTableGenerator(self, sparkSession, *, tableName=None, rows=-1, partitions
j = 0
while j < maxVertices:
df_spec = (
df_spec.withColumn(f"lat_{j}", "float", minValue=-90.0, maxValue=90.0,
df_spec.withColumn(f"lat_{j}", "float", minValue=minLatitude, maxValue=maxLatitude,
step=1e-5, random=generateRandom, omit=True)
.withColumn(f"lon_{j}", "float", minValue=-180.0, maxValue=180.0,
.withColumn(f"lon_{j}", "float", minValue=minLongitude, maxValue=maxLongitude,
step=1e-5, random=generateRandom, omit=True)
)
j = j + 1
Expand Down
103 changes: 103 additions & 0 deletions dbldatagen/datasets/basic_stock_ticker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from random import random

from .dataset_provider import DatasetProvider, dataset_definition


@dataset_definition(name="basic/stock_ticker",
                    summary="Stock ticker dataset",
                    autoRegister=True,
                    supportsStreaming=True)
class BasicStockTickerProvider(DatasetProvider.NoAssociatedDatasetsMixin, DatasetProvider):
    """
    Basic Stock Ticker Dataset
    ==========================

    This is a basic stock ticker dataset with time-series `symbol`, `open`, `close`, `high`, `low`,
    `adj_close`, and `volume` values.

    It takes the following options when retrieving the table:
        - rows : number of rows to generate
        - partitions: number of partitions to use
        - numSymbols: number of unique stock ticker symbols
        - startDate: first date for stock ticker data

    As the data specification is a DataGenerator object, you can add further columns to the data set and
    add constraints (when the feature is available)

    Note that this dataset does not use any features that would prevent it from being used as a source for a
    streaming dataframe, and so the flag `supportsStreaming` is set to True.
    """
    DEFAULT_NUM_SYMBOLS = 100
    DEFAULT_START_DATE = "2024-10-01"
    COLUMN_COUNT = 8
    ALLOWED_OPTIONS = [
        "numSymbols",
        "startDate"
    ]

    @DatasetProvider.allowed_options(options=ALLOWED_OPTIONS)
    def getTableGenerator(self, sparkSession, *, tableName=None, rows=-1, partitions=-1, **options):
        """Return a DataGenerator spec producing synthetic stock ticker time-series data.

        :param sparkSession: Spark session used to build the DataGenerator
        :param tableName: must be None or the provider's default table name
        :param rows: number of rows to generate (defaults applied when None or negative)
        :param partitions: number of partitions (auto-computed when None or negative)
        :param options: supports ``numSymbols`` (int > 0) and ``startDate`` (ISO date string)
        :raises ValueError: if ``numSymbols`` is not positive
        """
        import dbldatagen as dg

        numSymbols = options.get("numSymbols", self.DEFAULT_NUM_SYMBOLS)
        startDate = options.get("startDate", self.DEFAULT_START_DATE)

        assert tableName is None or tableName == DatasetProvider.DEFAULT_TABLE_NAME, "Invalid table name"
        if rows is None or rows < 0:
            rows = DatasetProvider.DEFAULT_ROWS
        if partitions is None or partitions < 0:
            partitions = self.autoComputePartitions(rows, self.COLUMN_COUNT)
        if numSymbols <= 0:
            raise ValueError("'numSymbols' must be > 0")

        # One distinct start value / growth rate / volatility is drawn per bucket of ~10 symbols.
        # Guard with max(1, ...) so the `values` lists are never empty when numSymbols < 10
        # (int(numSymbols / 10) would be 0 in that case).
        # NOTE(review): these lists use the unseeded `random()` at spec-build time, so the
        # candidate values vary between runs despite randomSeedMethod="hash_fieldname" —
        # presumably intentional; confirm if full reproducibility is required.
        numValueBuckets = max(1, numSymbols // 10)

        df_spec = (
            dg.DataGenerator(sparkSession=sparkSession, rows=rows,
                             partitions=partitions, randomSeedMethod="hash_fieldname")
            # symbol_id starts at 676 (= 26^2) so the base-26 conversion yields >= 3 letters
            .withColumn("symbol_id", "long", minValue=676, maxValue=676 + numSymbols - 1)
            .withColumn("rand_value", "float", minValue=0.0, maxValue=1.0, step=0.1,
                        baseColumn="symbol_id", omit=True)
            # Map the id to an alphabetic ticker symbol by converting to base 26 and
            # shifting each base-26 digit into the range 'A'-'Z'
            .withColumn("symbol", "string",
                        expr="""concat_ws('', transform(split(conv(symbol_id, 10, 26), ''),
                        x -> case when x < 10 then char(ascii(x) - 48 + 65) else char(ascii(x) + 10) end))""")
            # Rows cycle through the symbols; every numSymbols rows advances the date by one day
            .withColumn("days_from_start_date", "int", expr=f"floor(id / {numSymbols})", omit=True)
            .withColumn("post_date", "date", expr=f"date_add(cast('{startDate}' as date), days_from_start_date)")
            .withColumn("start_value", "decimal(11,2)",
                        values=[1.0 + 199.0 * random() for _ in range(numValueBuckets)], omit=True)
            .withColumn("growth_rate", "float", values=[-0.1 + 0.35 * random() for _ in range(numValueBuckets)],
                        baseColumn="symbol_id")
            .withColumn("volatility", "float", values=[0.0075 * random() for _ in range(numValueBuckets)],
                        baseColumn="symbol_id", omit=True)
            # sin(... % 17) gives a deterministic pseudo-oscillation; the sign flips the
            # direction of the intra-day move for the previous vs. current day
            .withColumn("prev_modifier_sign", "float",
                        expr=f"case when sin((id - {numSymbols}) % 17) > 0 then -1.0 else 1.0 end",
                        omit=True)
            .withColumn("modifier_sign", "float",
                        expr="case when sin(id % 17) > 0 then -1.0 else 1.0 end",
                        omit=True)
            # open is yesterday's close: same formula as close_base, offset by one day
            # (id - numSymbols) and with the sign inverted via prev_modifier_sign
            .withColumn("open_base", "decimal(11,2)",
                        expr=f"""start_value
                        + (volatility * prev_modifier_sign * start_value * sin((id - {numSymbols}) % 17))
                        + (growth_rate * start_value * (days_from_start_date - 1) / 365)""",
                        omit=True)
            .withColumn("close_base", "decimal(11,2)",
                        expr="""start_value
                        + (volatility * start_value * sin(id % 17))
                        + (growth_rate * start_value * days_from_start_date / 365)""",
                        omit=True)
            .withColumn("high_base", "decimal(11,2)",
                        expr="greatest(open_base, close_base) + rand() * volatility * open_base",
                        omit=True)
            .withColumn("low_base", "decimal(11,2)",
                        expr="least(open_base, close_base) - rand() * volatility * open_base",
                        omit=True)
            # Clamp all published prices at zero so a large negative drift never
            # produces negative prices
            .withColumn("open", "decimal(11,2)", expr="greatest(open_base, 0.0)")
            .withColumn("close", "decimal(11,2)", expr="greatest(close_base, 0.0)")
            .withColumn("high", "decimal(11,2)", expr="greatest(high_base, 0.0)")
            .withColumn("low", "decimal(11,2)", expr="greatest(low_base, 0.0)")
            .withColumn("dividend", "decimal(4,2)", expr="0.05 * rand_value * close", omit=True)
            .withColumn("adj_close", "decimal(11,2)", expr="greatest(close - dividend, 0.0)")
            .withColumn("volume", "long", minValue=100000, maxValue=5000000, random=True)
        )

        return df_spec
Loading
Loading