-
Notifications
You must be signed in to change notification settings - Fork 6
/
helpers.py
66 lines (61 loc) · 2.17 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import duckdb
from icedb import IceDBv3, CompressionCodec
from icedb.log import S3Client
def get_local_ddb():
    """
    Open an in-memory DuckDB connection with httpfs loaded and S3 settings applied.

    The S3 settings point at a non-SSL, path-style endpoint on localhost:9000
    using fixed credentials. The statements run in the order listed, then the
    connection is returned.
    """
    setup_statements = (
        "install httpfs",
        "load httpfs",
        "SET s3_region='us-east-1'",
        "SET s3_access_key_id='user'",
        "SET s3_secret_access_key='password'",
        "SET s3_endpoint='localhost:9000'",
        "SET s3_use_ssl='false'",
        "SET s3_url_style='path'",
    )
    conn = duckdb.connect(":memory:")
    for statement in setup_statements:
        conn.execute(statement)
    return conn
def get_local_s3_client(prefix="example"):
    """
    Build an S3Client for bucket "testbucket" on the local endpoint.

    Args:
        prefix: Value passed as ``s3prefix``; defaults to "example".

    Returns:
        The constructed S3Client.
    """
    local_settings = {
        "s3prefix": prefix,
        "s3bucket": "testbucket",
        "s3region": "us-east-1",
        "s3endpoint": "http://localhost:9000",
        "s3accesskey": "user",
        "s3secretkey": "password",
    }
    return S3Client(**local_settings)
def delete_all_s3(s3c: S3Client):
    """
    Delete every object under the ``<s3prefix>/_log`` key prefix of the client's bucket.

    Despite the name, only keys beneath that prefix are listed and removed;
    other objects in the bucket are left alone.

    Args:
        s3c: Client wrapper exposing ``s3`` (used for ``list_objects_v2`` and
            ``delete_object``), ``s3bucket`` and ``s3prefix``.

    Returns:
        None. Prints the number of deleted objects, or returns silently when
        the listing found nothing.
    """
    list_kwargs = {
        'Bucket': s3c.s3bucket,
        'MaxKeys': 1000,
        'Prefix': '/'.join([s3c.s3prefix, '_log']),
    }
    s3_files: list[dict] = []
    while True:
        res = s3c.s3.list_objects_v2(**list_kwargs)
        # A page may carry no 'Contents'; keep what was already collected
        # instead of abandoning the whole deletion.
        s3_files += res.get('Contents', [])
        if not res.get('IsTruncated'):
            break
        # ContinuationToken is only sent on follow-up requests.
        list_kwargs['ContinuationToken'] = res['NextContinuationToken']
    if not s3_files:
        return
    for file in s3_files:
        s3c.s3.delete_object(
            Bucket=s3c.s3bucket,
            Key=file['Key']
        )
    print(f"deleted {len(s3_files)} files")
def get_ice(s3_client, part_func):
    """
    Construct an IceDBv3 instance wired to the local S3 endpoint.

    Args:
        s3_client: Passed through to IceDBv3 as its S3 client argument.
        part_func: Passed through to IceDBv3 as its first positional argument.

    Returns:
        The constructed IceDBv3 instance.
    """
    # Sort by event first, then by the event's timestamp within each data part.
    sort_order = ['event', 'ts']
    # Everything below targets the local minio setup.
    region = "us-east-1"
    access_key = "user"
    secret_key = "password"
    endpoint = "http://localhost:9000"
    # NOTE(review): presumably a host/writer identifier - confirm against IceDBv3.
    host_id = "dan-mbp"
    return IceDBv3(
        part_func,
        sort_order,
        region,
        access_key,
        secret_key,
        endpoint,
        s3_client,
        host_id,
        s3_use_path=True,  # path-style addressing is needed for local minio
        # Ask for ZSTD explicitly; the original note says the default is SNAPPY.
        compression_codec=CompressionCodec.ZSTD,
    )