-
Notifications
You must be signed in to change notification settings - Fork 208
/
file_system_elasticsearch_json_loader.py
68 lines (53 loc) · 1.91 KB
/
file_system_elasticsearch_json_loader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import os
from pyhocon import ConfigTree
from databuilder.loader.base_loader import Loader
from databuilder.models.elasticsearch_document import ElasticsearchDocument
class FSElasticsearchJSONLoader(Loader):
    """
    Loader class to produce Elasticsearch bulk load file to Local FileSystem.

    Every record handed to ``load`` is serialized with its ``to_json()``
    method and written to the file configured under ``file_path``.
    """
    # Config key holding the path of the output file (required).
    FILE_PATH_CONFIG_KEY = 'file_path'
    # Config key holding the mode passed to ``open``; defaults to 'w'.
    FILE_MODE_CONFIG_KEY = 'mode'

    def init(self, conf: ConfigTree) -> None:
        """
        Read the file path and mode from the config, make sure the parent
        directory exists and open the output file.

        :param conf: config scoped to this loader; must contain ``file_path``
            and may contain ``mode``
        :return: None
        """
        self.conf = conf
        self.file_path = self.conf.get_string(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY)
        self.file_mode = self.conf.get_string(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY, 'w')

        # os.path.dirname yields '' for a bare file name (e.g. 'out.json') and
        # '/' for a root-level file; splitting on '/' by hand would instead
        # hand back the file name itself (or ''), causing a directory to be
        # created in place of the file or makedirs('') to fail.
        file_dir = os.path.dirname(self.file_path)
        if file_dir:
            self._ensure_directory_exists(file_dir)

        self.file_handler = open(self.file_path, self.file_mode)

    def _ensure_directory_exists(self, path: str) -> None:
        """
        Check to ensure file directory exists; create the directories otherwise.

        :param path: directory path to create, including any missing parents
        :return: None
        :raises FileExistsError: if ``path`` exists but is not a directory
        """
        # exist_ok=True makes this safe when the directory already exists or
        # is created concurrently between a check and the creation.
        os.makedirs(path, exist_ok=True)

    def load(self, record: ElasticsearchDocument) -> None:
        """
        Write a record in json format to file.

        Falsy records are skipped silently.

        :param record: document to serialize and append to the output file
        :return: None
        :raises Exception: if ``record`` is not an ``ElasticsearchDocument``
        """
        if not record:
            return

        if not isinstance(record, ElasticsearchDocument):
            raise Exception("Record not of type 'ElasticsearchDocument'!")

        # The serialized document is written verbatim; no separator is added
        # here, so to_json() is presumably expected to terminate each
        # document itself -- verify against ElasticsearchDocument.
        self.file_handler.write(record.to_json())
        # Flush after every record so written data does not sit in the
        # buffer until close().
        self.file_handler.flush()

    def close(self) -> None:
        """
        Close the file handler, if one was opened.

        :return: None
        """
        # getattr guards against close() being called when init() never ran
        # or failed before the file was opened.
        file_handler = getattr(self, 'file_handler', None)
        if file_handler:
            file_handler.close()

    def get_scope(self) -> str:
        """
        Return the config scope used by this loader.

        :return: the scope string 'loader.filesystem.elasticsearch'
        """
        return 'loader.filesystem.elasticsearch'