-
Notifications
You must be signed in to change notification settings - Fork 5
/
inc_import.py
54 lines (36 loc) · 1.69 KB
/
inc_import.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from car_framework.base_import import BaseImport
from car_framework.context import context
from car_framework.util import IncrementalImportNotPossible, RecoverableFailure
from car_framework.car_service import CarDbStatus
class BaseIncrementalImport(BaseImport):
    """Skeleton of an incremental import driven by ``run``.

    Subclasses supply the hook methods below (each raises
    ``NotImplementedError`` here); ``run`` calls them in a fixed order.
    ``get_last_model_state_id``, ``wait_for_completion_of_import_jobs`` and
    ``save_new_model_state_id`` are used by ``run`` but not defined in this
    class -- presumably inherited from ``BaseImport``; verify against it.
    """

    def __init__(self):
        # Adds no state of its own; only delegates to the parent initializer.
        super().__init__()

    def get_new_model_state_id(self):
        """Hook: return the id of the current model state.

        ``run`` treats a falsy return value as "not available".
        """
        raise NotImplementedError

    def get_data_for_delta(self, last_model_state_id, new_model_state_id):
        """Hook: called by ``run`` with the previous and the current state id.

        ``run`` ignores the return value. Presumably this gathers the changes
        between the two states -- confirm against a concrete subclass.
        """
        raise NotImplementedError

    def import_vertices(self):
        """Hook: first import step invoked by ``run``."""
        raise NotImplementedError

    def import_edges(self):
        """Hook: invoked by ``run`` after the vertex import jobs were awaited."""
        raise NotImplementedError

    def delete_vertices(self):
        """Hook: invoked by ``run`` after the edge import jobs were awaited."""
        raise NotImplementedError

    def run(self):
        """Execute one incremental import pass.

        Raises:
            RecoverableFailure: the CAR service reports ``CarDbStatus.FAILURE``.
            IncrementalImportNotPossible: the database status is
                ``CarDbStatus.NEWLY_CREATED``, or either the last or the new
                model state id is falsy.
        """
        # Guard 1: the database has to be in a usable, already-populated state.
        status = context().car_service.get_db_status()
        if status == CarDbStatus.FAILURE:
            raise RecoverableFailure('Database is not ready.')
        elif status == CarDbStatus.NEWLY_CREATED:
            raise IncrementalImportNotPossible('Newly created CAR database is detected.')

        # Guard 2: both ends of the delta must be known.
        previous_state = self.get_last_model_state_id()
        if not previous_state:
            raise IncrementalImportNotPossible('"Last known model state" is not available.')

        current_state = self.get_new_model_state_id()
        if not current_state:
            raise IncrementalImportNotPossible('Current model state is not available.')

        self.get_data_for_delta(previous_state, current_state)

        # Each import phase is followed by a wait on its import jobs.
        self.import_vertices()
        self.wait_for_completion_of_import_jobs()

        self.import_edges()
        self.wait_for_completion_of_import_jobs()

        self.delete_vertices()

        # The new state id is stored only after every step above returned.
        self.save_new_model_state_id(current_state)