"""dataflow.py -- seed a local Delta table with polars, then stream
sales-order JSON through Kafka (aiokafka) and upsert it into the table."""

import asyncio
import json
from datetime import datetime

import polars as pl
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from deltalake import DeltaTable, Schema

from poc_python_deltars_upsert.sales_data import sales_data


def dummy_dataflow() -> None:
    """Seed data/sales_orders with sample rows, then upsert a small batch."""
    df = pl.DataFrame(
        {
            "sales_order_id": ["1000", "1001", "1002", "1003"],
            "product": ["bike", "scooter", "car", "motorcycle"],
            "order_date": [
                datetime(2023, 1, 1),
                datetime(2023, 1, 5),
                datetime(2023, 1, 10),
                datetime(2023, 2, 1),
            ],
            "sales_price": [120.25, 2400.0, 32000.0, 9000.0],
            "paid_by_customer": [True, False, False, True],
        }
    )
    print(df)
    df.write_delta("data/sales_orders", mode="append")

    new_data = pl.DataFrame(
        {
            "sales_order_id": ["1002", "1004"],
            "product": ["car", "car"],
            "order_date": [datetime(2023, 1, 10), datetime(2023, 2, 5)],
            "sales_price": [30000.0, 40000.0],
            "paid_by_customer": [True, True],
        }
    )
    dt = DeltaTable("data/sales_orders")
    source = new_data.to_arrow()
    # Round-trip the Arrow schema through a Delta schema so the source batch
    # uses types the merge accepts (polars emits e.g. large_string, which the
    # Delta schema normalizes to string).
    delta_schema = Schema.from_pyarrow(source.schema).to_pyarrow()
    source = source.cast(delta_schema)
    (
        dt.merge(
            source=source,
            predicate="s.sales_order_id = t.sales_order_id",
            source_alias="s",
            target_alias="t",
        )
        .when_matched_update_all()
        .when_not_matched_insert_all()
        .execute()
    )
    print(pl.read_delta("data/sales_orders"))


def ingest_json_kafka_dataflow(kafka_server: str, topic: str) -> None:
    """Produce the sample sales records to Kafka as JSON messages."""

    async def send() -> None:
        producer = AIOKafkaProducer(bootstrap_servers=kafka_server)
        await producer.start()
        try:
            for sale_data in sales_data:
                # datetimes are not JSON-serializable by default, so encode
                # them as ISO-8601 strings via serialize_datetime.
                await producer.send_and_wait(
                    topic,
                    json.dumps(sale_data, default=serialize_datetime).encode("utf-8"),
                )
        finally:
            await producer.stop()

    asyncio.run(send())


def read_kafka_sink_delta(kafka_server: str, topic: str, consumer_group: str) -> None:
    """Consume JSON messages from Kafka and upsert each one into the Delta table."""
    dt = DeltaTable("data/sales_orders")

    async def consume() -> None:
        consumer = AIOKafkaConsumer(
            topic,
            bootstrap_servers=kafka_server,
            group_id=consumer_group,
            auto_offset_reset="earliest",
        )
        await consumer.start()
        try:
            async for msg in consumer:
                print(
                    "consumed:",
                    msg.topic,
                    msg.partition,
                    msg.offset,
                    msg.key,
                    msg.value,
                    msg.timestamp,
                )
                # Revive ISO-8601 strings back into datetimes while decoding.
                data = [json.loads(msg.value.decode("utf-8"), object_hook=datetime_parser)]
                new_data = pl.from_dicts(data)
                print(new_data)
                source = new_data.to_arrow()
                # Same schema round-trip as in dummy_dataflow: align the Arrow
                # types with what the Delta table expects before merging.
                delta_schema = Schema.from_pyarrow(source.schema).to_pyarrow()
                source = source.cast(delta_schema)
                (
                    dt.merge(
                        source=source,
                        predicate="s.sales_order_id = t.sales_order_id",
                        source_alias="s",
                        target_alias="t",
                    )
                    .when_matched_update_all()
                    .when_not_matched_insert_all()
                    .execute()
                )
        finally:
            await consumer.stop()

    asyncio.run(consume())


def serialize_datetime(obj):
    """json.dumps default hook: encode datetimes as ISO-8601 strings."""
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError("Type not serializable")


def datetime_parser(json_dict):
    """json.loads object_hook: turn ISO-8601 strings back into datetimes."""
    for key, value in json_dict.items():
        if isinstance(value, str):
            try:
                json_dict[key] = datetime.fromisoformat(value)
            except ValueError:
                # Leave non-datetime strings untouched.
                pass
    return json_dict
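

# A minimal usage sketch. The broker address, topic, and consumer group below
# are illustrative assumptions (nothing in this module defines them); point
# them at your own Kafka setup before running. dummy_dataflow() also creates
# data/sales_orders, which read_kafka_sink_delta() expects to exist.
if __name__ == "__main__":
    dummy_dataflow()
    ingest_json_kafka_dataflow("localhost:9092", "sales-orders")
    read_kafka_sink_delta("localhost:9092", "sales-orders", "sales-sink")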