Demo refactor updates #1658

Merged · 7 commits · Dec 13, 2021
demo/web/src/main/notebooks/00 The Deephaven IDE.md (2 changes: 1 addition & 1 deletion)
@@ -44,7 +44,7 @@ static_table = newTable(
```python
from deephaven.TableTools import timeTable
import random
updating_table = timeTable('00:00:00.400').updateView("Row = i", "Some_Int = random.randint(0,100)").reverse()
updating_table = timeTable('00:00:00.400').updateView("Row = i", "Some_Int = (int)random.randint(0,100)").reverse()
```
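
The only functional change in this notebook is the explicit `(int)` cast on the `random.randint` call, presumably so the resulting column is typed as a primitive int rather than a generic object. A minimal sketch of the same pattern (hypothetical column names, not part of this PR):

```python
from deephaven.TableTools import timeTable
import random

# Hypothetical example of the same casting pattern: the (int) cast tells the
# query engine the type of the value returned by the Python call.
typed_table = timeTable('00:00:01').updateView("Roll = (int)random.randint(1, 6)")
```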


demo/web/src/main/notebooks/01 Tables, Updates, and the Engine.md (31 changes: 16 additions & 15 deletions)
@@ -9,16 +9,16 @@ You can quickly see streaming data in a UI and do table operations, interactively
For example, you can listen to a Kafka stream of cryptocurrency trades sourced from their native exchanges (like the ones below, built using the [XChange library](https://github.com/knowm/XChange)).

```python
from deephaven import KafkaTools as kt
from deephaven import ConsumeKafka as ck

def get_trades_stream():
return kt.consumeToTable(
return ck.consumeToTable(
{ 'bootstrap.servers' : 'demo-kafka.c.deephaven-oss.internal:9092',
'schema.registry.url' : 'http://demo-kafka.c.deephaven-oss.internal:8081' },
'io.deephaven.crypto.kafka.TradesTopic',
key = kt.IGNORE,
value = kt.avro('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
offsets=kt.ALL_PARTITIONS_SEEK_TO_END,
key = ck.IGNORE,
value = ck.avro('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
offsets=ck.ALL_PARTITIONS_SEEK_TO_END,
table_type='append')

trades_stream = get_trades_stream()
@@ -29,9 +29,6 @@ trades_stream = get_trades_stream()
To keep the most recent ticks within view, you could sort the table descending by timestamp. Alternatively, you can reverse the table.

```python
# Not doing this:
# t = t.sortDescending("Timestamp")

trades_stream = trades_stream.reverse()
```
\
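
For reference, the sort-based alternative mentioned above (and removed from this notebook by the PR) would look roughly like the sketch below; it assumes the stream's timestamp column is named `KafkaTimestamp`, as in the consumer above.

```python
# Sketch only, not part of this PR: keep the newest ticks on top by sorting
# descending on the timestamp column instead of reversing the table.
trades_stream_sorted = trades_stream.sortDescending("KafkaTimestamp")
```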
@@ -83,14 +80,18 @@ row_count_by_instrument = trades_stream_cleaner.countBy("Tot_Rows", "Instrument"
Counts are informative, but often you'll be interested in other aggregations. The script below shows both how to [bin data by time](https://deephaven.io/core/docs/reference/cheat-sheets/datetime-cheat-sheet/#downsampling-temporal-data-via-time-binning) and to [do multiple aggregations](https://deephaven.io/core/docs/how-to-guides/combined-aggregations/).

```python
from deephaven import ComboAggregateFactory as caf
from deephaven import Aggregation as agg, as_list

agg_list = as_list([
agg.AggCount("Trade_Count"),
agg.AggSum("Total_Size = Size"),
agg.AggAvg("Avg_Size = Size", "Avg_Price = Price"),
agg.AggMin("Low_Price = Price"),
agg.AggMax("High_Price = Price")
])

multi_agg = trades_stream_cleaner.updateView("TimeBin = upperBin(KafkaTimestamp, MINUTE)")\
.by(caf.AggCombo(
caf.AggCount("Trade_Count"),
caf.AggSum("Total_Size = Size"),
caf.AggAvg("Avg_Size = Size", "Avg_Price = Price"),
caf.AggMin("Low_Price = Price"),
caf.AggMax("High_Price = Price")),"TimeBin", "Instrument")\
.aggBy(agg_list, "TimeBin", "Instrument")\
.sortDescending("TimeBin", "Trade_Count")\
.formatColumnWhere("Instrument", "Instrument = `BTC/USD`", "CYAN")
```
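
As a quick variation (not part of this PR), the same `agg_list` can be reused at a coarser time bin; the sketch below assumes the block above has already run.

```python
# Sketch: the same multi-aggregation, binned to 5-minute buckets instead of 1 minute.
multi_agg_5min = trades_stream_cleaner\
    .updateView("TimeBin = upperBin(KafkaTimestamp, 5 * MINUTE)")\
    .aggBy(agg_list, "TimeBin", "Instrument")\
    .sortDescending("TimeBin", "Trade_Count")
```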
demo/web/src/main/notebooks/02 Stream and Batch Together.md (33 changes: 22 additions & 11 deletions)
@@ -12,16 +12,16 @@ Below you’ll do calculations and aggregations on stream and batch data using i
First, hook up a Kafka stream. (This is the same script from the first notebook.) Our [how-to guide](https://deephaven.io/core/docs/how-to-guides/kafka-stream/) provides detail on the integration.

```python
from deephaven import KafkaTools as kt
from deephaven import ConsumeKafka as ck

def get_trades_stream():
return kt.consumeToTable(
return ck.consumeToTable(
{ 'bootstrap.servers' : 'demo-kafka.c.deephaven-oss.internal:9092',
'schema.registry.url' : 'http://demo-kafka.c.deephaven-oss.internal:8081' },
'io.deephaven.crypto.kafka.TradesTopic',
key = kt.IGNORE,
value = kt.avro('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
offsets=kt.ALL_PARTITIONS_SEEK_TO_END,
key = ck.IGNORE,
value = ck.avro('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
offsets=ck.ALL_PARTITIONS_SEEK_TO_END,
table_type='append')

trades_stream = get_trades_stream()
@@ -64,8 +64,8 @@ Let's return to our crypto data.
Read in a CSV of batch crypto data sourced on 09/22/2021.

```python
from deephaven.TableTools import readCsv
trades_batch_view = readCsv("/data/large/crypto/CryptoTrades_20210922.csv")
from deephaven import read_csv
trades_batch_view = read_csv("/data/large/crypto/CryptoTrades_20210922.csv")
```

\
@@ -93,15 +93,26 @@ The following scripts will demonstrate much the same with two examples:

```python
# the table decoration
from deephaven.DBTimeUtils import formatDate
from deephaven.DateTimeUtils import formatDate

add_column_streaming = trades_stream_view.updateView("Date = formatDate(KafkaTimestamp, TZ_NY)")
add_column_batch = trades_batch_view .updateView("Date = formatDate(Timestamp, TZ_NY)")

# the table aggregation
from deephaven import ComboAggregateFactory as caf
agg_streaming = add_column_streaming.by(caf.AggCombo(caf.AggFirst("Price"), caf.AggAvg("Avg_Price = Price")), "Date", "Exchange", "Instrument")
agg_batch = add_column_batch .by(caf.AggCombo(caf.AggFirst("Price"), caf.AggAvg("Avg_Price = Price")), "Date", "Exchange", "Instrument")
from deephaven import Aggregation as agg, as_list

agg_list = as_list([
agg.AggFirst("Price"),
agg.AggAvg("Avg_Price = Price"),
])

agg_streaming = add_column_streaming.aggBy(
agg_list, "Date", "Exchange", "Instrument"
)

agg_batch = add_column_batch.aggBy(
agg_list, "Date", "Exchange", "Instrument"
)
```

\
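
A possible follow-up (not part of this PR): with both aggregated tables in hand, one way to put stream and batch side by side is a join keyed on exchange and instrument. This is only a sketch, assuming both blocks above have run and that the legacy `naturalJoin(table, columnsToMatch, columnsToAdd)` signature applies.

```python
# Sketch: line up the batch averages against the live (streaming) averages.
# Joining on Exchange and Instrument only, since the Date values differ.
batch_vs_stream = agg_batch.naturalJoin(
    agg_streaming, "Exchange,Instrument", "Live_Avg_Price = Avg_Price"
)
```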
demo/web/src/main/notebooks/03 Kafka Stream vs Append.md (20 changes: 12 additions & 8 deletions)
@@ -17,7 +17,7 @@ Start by importing some requisite packages. There is documentation on [installin
[ComboAggregateFactory](https://deephaven.io/core/docs/reference/table-operations/group-and-aggregate/AggCombo/), [emptyTable](https://deephaven.io/core/docs/how-to-guides/empty-table/#related-documentation), and [merge](https://deephaven.io/core/docs/how-to-guides/merge-tables/#merge-tables).

```python
from deephaven import KafkaTools as kt, ComboAggregateFactory as caf
from deephaven import ConsumeKafka as ck, Aggregation as agg, combo_agg
from deephaven.TableTools import emptyTable, merge
```
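
`emptyTable` and `merge` are imported here but not exercised in the hunks shown above; a minimal, hypothetical sketch of what they do (not part of this PR):

```python
# emptyTable(n) creates an n-row table with no columns; update() adds computed columns.
static_demo = emptyTable(5).update("X = i", "Y = X * 2")

# merge() stacks tables that share a schema into one table.
merged_demo = merge(static_demo, static_demo)
```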

@@ -32,12 +32,12 @@ This demo will demonstrate the impact of choices related to `offsets` and `table

```python
def get_trades(*, offsets, table_type):
return kt.consumeToTable(
return ck.consumeToTable(
{ 'bootstrap.servers' : 'demo-kafka.c.deephaven-oss.internal:9092',
'schema.registry.url' : 'http://demo-kafka.c.deephaven-oss.internal:8081' },
'io.deephaven.crypto.kafka.TradesTopic',
key = kt.IGNORE,
value = kt.avro('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
key = ck.IGNORE,
value = ck.avro('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
offsets=offsets,
table_type=table_type)
```
@@ -50,7 +50,7 @@ In this demo, imagine you want to start your Kafka feed "1 million events ago" (
Create a Deephaven table that listens to current records (i.e., crypto trades happening now).

```python
latest_offset = get_trades(offsets=kt.ALL_PARTITIONS_SEEK_TO_END, table_type='stream')
latest_offset = get_trades(offsets=ck.ALL_PARTITIONS_SEEK_TO_END, table_type='stream')
```

\
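
Since this notebook contrasts stream and append table types, the append-flavored counterpart would presumably be created with the same helper; a sketch (not part of this PR):

```python
# Sketch: same starting offset, but consumed rows accumulate (append)
# instead of being treated as a transient stream.
latest_offset_append = get_trades(offsets=ck.ALL_PARTITIONS_SEEK_TO_END, table_type='append')
```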
@@ -100,9 +100,13 @@ Define a [table aggregation function](https://deephaven.io/core/docs/reference/t

```python
def trades_agg(table):
return table.by(caf.AggCombo(caf.AggCount("Trade_Count"),caf.AggSum("Total_Size = Size")),"Exchange", "Instrument")\
.sort("Exchange", "Instrument")\
.formatColumnWhere("Instrument", "Instrument.startsWith(`BTC`)", "IVORY")
agg_list = combo_agg([
agg.AggCount("Trade_Count"),
agg.AggSum("Total_Size = Size"),
])
return table.aggBy(agg_list, "Exchange", "Instrument").\
sort("Exchange", "Instrument").\
formatColumnWhere("Instrument", "Instrument.startsWith(`BTC`)", "IVORY")
```

\
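
A usage sketch (not part of this PR), assuming the `latest_offset` table created earlier in the notebook:

```python
# Apply the aggregation helper to the streaming table.
latest_offset_agg = trades_agg(latest_offset)
```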