-
Notifications
You must be signed in to change notification settings - Fork 0
/
simple_retrieve.py
41 lines (31 loc) · 1.02 KB
/
simple_retrieve.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# Copyright (c) 2020, eQualit.ie inc.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
from datetime import datetime
from pyspark.sql import functions as F
from es_retriever.config import Config
from es_retriever.es.storage import EsStorage
if __name__ == '__main__':
    # Example run: fetch a short time window through EsStorage, show its
    # schema, and dump the rows as JSON.

    # NOTE(review): the five string arguments look like placeholders that
    # must be replaced with real connection settings -- confirm.
    es_config = Config('HOST', 'USERNAME', 'PASSWORD', 'INDEX', 'TYPE')
    es_storage = EsStorage(es_config)

    # Ten-minute window on 2018-04-11: start inclusive, end exclusive.
    window_start = datetime(2018, 4, 11, 10, 0, 0)
    window_end = datetime(2018, 4, 11, 10, 10, 0)

    # Row-level filter on the '@timestamp' column matching the same window.
    timestamp = F.col('@timestamp')
    in_window = (timestamp >= window_start) & (timestamp < window_end)

    # Retrieve the data for the window; persist it because it is used twice
    # below (schema print and write).
    frame = es_storage.get(
        window_start,
        window_end,
        filter_condition=in_window,
    )
    frame = frame.persist()

    frame.printSchema()

    # Write as JSON to the storage's base resource, replacing any previous
    # output at that location.
    json_writer = frame.write.mode('overwrite')
    json_writer.json(es_storage.base_resource)