This repository has been archived by the owner on Nov 15, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain_index.py
90 lines (81 loc) · 2.6 KB
/
main_index.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
"""
index a pre-downloaded corpus into elasticsearch.
"""
from typing import Dict
from elasticsearch_dsl import Document
from metaflow import FlowSpec, step, Parameter
from storyteller.connectors import connect_to_es
from storyteller.elastic.indexer import Indexer
from storyteller.elastic.docs import (
GK, SC, MR, BS, DS,
SFC, KESS, KJ, KCSS,
SFKE, KSNS, KC, KETS,
KEPT, News, KUNIV
)
class IndexFlow(FlowSpec):
index_name = Parameter("index_name", type=str,
default="mr_story")
batch_size = Parameter("batch_size", type=int,
default=1000)
name2doc: Dict[str, Document]
@step
def start(self):
"""
prints out the parameters
"""
print(self.__dict__)
self.next(self.update)
@step
def update(self):
"""
check if an index with given name already exists.
If it does exists, delete it so that we overwrite the index in the following steps
"""
with connect_to_es() as es:
if es.indices.exists(index=self.index_name):
r = es.indices.delete(index=self.index_name)
print(f"Deleted {self.index_name} - {r}")
self.next(self.validate)
@step
def validate(self):
"""
validate index_name
"""
self.name2doc = {
GK.Index.name: GK,
SC.Index.name: SC,
MR.Index.name: MR,
BS.Index.name: BS,
DS.Index.name: DS,
SFC.Index.name: SFC,
KESS.Index.name: KESS,
KJ.Index.name: KJ,
KCSS.Index.name: KCSS,
SFKE.Index.name: SFKE,
KSNS.Index.name: KSNS,
KC.Index.name: KC,
KETS.Index.name: KETS,
KEPT.Index.name: KEPT,
News.Index.name: News,
KUNIV.Index.name: KUNIV,
}
if self.index_name not in self.name2doc.keys():
raise ValueError(f"Invalid index: {self.index_name}")
self.next(self.end)
@step
def end(self):
"""
Index Stories.
"""
self.index_name: str
self.batch_size: int
stories = self.name2doc[self.index_name].stream_from_corpus()
with connect_to_es() as es:
self.name2doc[self.index_name].init(using=es)
# --- init an indexer with the Stories --- #
indexer = Indexer(es, stories, self.index_name, self.batch_size)
# --- index the corpus --- #
indexer()
print(f"indexing {self.index_name} finished")
if __name__ == '__main__':
IndexFlow()