-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathhive_to_rdbms_operator.py
71 lines (59 loc) · 2.04 KB
/
hive_to_rdbms_operator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# -*- coding: utf-8 -*-
#
#
"""
Export data from Hive to PostgreSQL or MySQL.
"""
import os
import signal
from hooks.hive_to_rdbms_hook import Hive2RDBMSHook
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
class Hive2RDBMSOperator(BaseOperator):
    """
    Export the result of a Hive query into a relational database.

    All of the actual work is delegated to ``Hive2RDBMSHook``; this operator
    only collects the arguments and forwards them. Per the original author's
    notes the pipeline is:

    1. hive -e "select a,b,c from tbl where xxx limit 123"
    2. datax text reader and mysql writer
    3. delete text

    https://github.com/alibaba/DataX

    :param hive_query_sql: Hive query statement (templated)
    :param rdbms_conn_id: connection used for the export
    :param rdbms_table: table the data is exported to
    :param rdbms_column: columns to export, e.g. ['id', 'name']
    :param rdbms_presql: SQL executed before the export (templated)
    """

    template_fields = ('hive_query_sql', 'rdbms_presql')
    ui_color = '#d7f3a7'

    @apply_defaults
    def __init__(self,
                 hive_query_sql,
                 rdbms_conn_id,
                 rdbms_table,
                 rdbms_column,
                 rdbms_presql,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.hive_query_sql = hive_query_sql
        self.rdbms_conn_id = rdbms_conn_id
        self.rdbms_table = rdbms_table
        self.rdbms_column = rdbms_column
        self.rdbms_presql = rdbms_presql
        # Created lazily in execute(); defined here so on_kill() can safely
        # run even when execute() never got far enough to build the hook.
        self.hook = None

    def execute(self, context):
        """
        Build the hook for this task instance and run it.

        :param context: Airflow task context; ``context['task_instance']``
            supplies the ``dag_id`` and ``task_id`` used to label the hook.
        """
        task_instance = context['task_instance']
        # "<dag_id>#<task_id>" is the identifier handed to the hook.
        task_id = task_instance.dag_id + "#" + task_instance.task_id
        self.hook = Hive2RDBMSHook(
            task_id=task_id,
            hive_query_sql=self.hive_query_sql,
            rdbms_conn_id=self.rdbms_conn_id,
            rdbms_table=self.rdbms_table,
            rdbms_column=self.rdbms_column,
            rdbms_presql=self.rdbms_presql,
        )
        self.hook.execute(context=context)

    def on_kill(self):
        """
        Send SIGTERM to the process group of the hook's subprocess.

        Does nothing (beyond logging) when no hook or subprocess exists yet,
        or when the subprocess has already exited.
        """
        # The hook is only assigned in execute(), and its ``sp`` attribute is
        # assumed to be the child process handle (it must expose ``pid``).
        # Either may be missing if the kill arrives early.
        sub_process = getattr(getattr(self, 'hook', None), 'sp', None)
        if sub_process is None:
            self.log.info('No running subprocess found; nothing to terminate')
            return

        self.log.info('Sending SIGTERM signal to bash process group')
        try:
            os.killpg(os.getpgid(sub_process.pid), signal.SIGTERM)
        except ProcessLookupError:
            # The child finished between the check above and the signal.
            self.log.info('Subprocess already exited; nothing to terminate')