2
2
from datahub .emitter .mcp import MetadataChangeProposalWrapper
3
3
from datahub .metadata .schema_classes import DatasetPropertiesClass , SchemaMetadataClass
4
4
from datahub .emitter .rest_emitter import DatahubRestEmitter
5
+ from datahub .ingestion .graph .client import DatahubClientConfig , DataHubGraph
6
+ from datahub .metadata .schema_classes import UpstreamLineageClass
7
+ from collections import defaultdict
5
8
import requests
6
9
7
10
@@ -15,6 +18,7 @@ def __init__(self, gms_server="http://localhost:8080", extra_headers={}):
15
18
gms_server = gms_server , extra_headers = extra_headers
16
19
)
17
20
self .datahub_graph = self .emitter .to_graph ()
21
+ self .gms_server = gms_server
18
22
19
23
def _is_valid_gms_server (self , gms_server ):
20
24
# GMS 서버 주소의 유효성을 검사하는 로직 추가
@@ -60,10 +64,231 @@ def get_column_names_and_descriptions(self, urn):
60
64
columns = []
61
65
if schema_metadata :
62
66
for field in schema_metadata .fields :
67
+
68
+ # nativeDataType이 없거나 빈 문자열인 경우 None 처리
69
+ native_type = getattr (field , "nativeDataType" , None )
70
+ column_type = (
71
+ native_type if native_type and native_type .strip () else None
72
+ )
73
+
63
74
columns .append (
64
75
{
65
76
"column_name" : field .fieldPath ,
66
77
"column_description" : field .description ,
78
+ "column_type" : column_type ,
67
79
}
68
80
)
69
81
return columns
82
+
83
+ def get_table_lineage (
84
+ self ,
85
+ urn ,
86
+ counts = 100 ,
87
+ direction = "DOWNSTREAM" ,
88
+ degree_values = None ,
89
+ ):
90
+ # URN에 대한 DOWNSTREAM/UPSTREAM lineage entity를 counts 만큼 가져오는 함수
91
+ # degree_values에 따라 lineage depth가 결정
92
+ """
93
+ Fetches downstream/upstream lineage entities for a given dataset URN using DataHub's GraphQL API.
94
+
95
+ Args:
96
+ urn (str): Dataset URN to fetch lineage for.
97
+ count (int): Maximum number of entities to fetch (default=100).
98
+ direction (str): DOWNSTREAM or UPSTREAM.
99
+ degree_values (List[str]): Degree filter values like ["1", "2", "3+"]. Defaults to ["1", "2"].
100
+
101
+ Returns:
102
+ List[str, dict]: A list containing the dataset URN and its lineage result.
103
+ """
104
+
105
+ if degree_values is None :
106
+ degree_values = ["1" , "2" ]
107
+
108
+ graph = DataHubGraph (DatahubClientConfig (server = self .gms_server ))
109
+
110
+ query = """
111
+ query scrollAcrossLineage($input: ScrollAcrossLineageInput!) {
112
+ scrollAcrossLineage(input: $input) {
113
+ searchResults {
114
+ degree
115
+ entity {
116
+ urn
117
+ type
118
+ }
119
+ }
120
+ }
121
+ }
122
+ """
123
+ variables = {
124
+ "input" : {
125
+ "query" : "*" ,
126
+ "urn" : urn ,
127
+ "count" : counts ,
128
+ "direction" : direction ,
129
+ "orFilters" : [
130
+ {
131
+ "and" : [
132
+ {
133
+ "condition" : "EQUAL" ,
134
+ "negated" : "false" ,
135
+ "field" : "degree" ,
136
+ "values" : degree_values ,
137
+ }
138
+ ]
139
+ }
140
+ ],
141
+ }
142
+ }
143
+
144
+ result = graph .execute_graphql (query = query , variables = variables )
145
+ return urn , result
146
+
147
+ def get_column_lineage (self , urn ):
148
+ # URN에 대한 UPSTREAM lineage의 column source를 가져오는 함수
149
+ """
150
+ Fetches fine-grained column-level lineage grouped by upstream datasets.
151
+
152
+ Args:
153
+ urn (str): Dataset URN to fetch lineage for.
154
+
155
+ Returns:
156
+ dict: {
157
+ 'downstream_dataset': str,
158
+ 'lineage_by_upstream_dataset': List[{
159
+ 'upstream_dataset': str,
160
+ 'columns': List[{'upstream_column': str, 'downstream_column': str}]
161
+ }]
162
+ }
163
+ """
164
+
165
+ # DataHub 연결 및 lineage 가져오기
166
+ graph = DataHubGraph (DatahubClientConfig (server = self .gms_server ))
167
+ result = graph .get_aspect (entity_urn = urn , aspect_type = UpstreamLineageClass )
168
+
169
+ # downstream dataset (URN 테이블명) 파싱
170
+ try :
171
+ down_dataset = urn .split ("," )[1 ]
172
+ table_name = down_dataset .split ("." )[1 ]
173
+
174
+ except IndexError :
175
+ # URN이 유효하지 않는 경우
176
+ print (f"[ERROR] Invalid URN format: { urn } " )
177
+ return {}
178
+
179
+ # upstream_dataset별로 column lineage
180
+ upstream_map = defaultdict (list )
181
+
182
+ if not result :
183
+ return {"downstream_dataset" : table_name , "lineage_by_upstream_dataset" : []}
184
+
185
+ for fg in result .fineGrainedLineages or []:
186
+ confidence_score = (
187
+ fg .confidenceScore if fg .confidenceScore is not None else 1.0
188
+ )
189
+ for down in fg .downstreams :
190
+ down_column = down .split ("," )[- 1 ].replace (")" , "" )
191
+ for up in fg .upstreams :
192
+ up_dataset = up .split ("," )[1 ]
193
+ up_dataset = up_dataset .split ("." )[1 ]
194
+ up_column = up .split ("," )[- 1 ].replace (")" , "" )
195
+
196
+ upstream_map [up_dataset ].append (
197
+ {
198
+ "upstream_column" : up_column ,
199
+ "downstream_column" : down_column ,
200
+ "confidence" : confidence_score ,
201
+ }
202
+ )
203
+
204
+ # 최종 결과 구조 생성
205
+ parsed_lineage = {
206
+ "downstream_dataset" : table_name ,
207
+ "lineage_by_upstream_dataset" : [],
208
+ }
209
+
210
+ for up_dataset , column_mappings in upstream_map .items ():
211
+ parsed_lineage ["lineage_by_upstream_dataset" ].append (
212
+ {"upstream_dataset" : up_dataset , "columns" : column_mappings }
213
+ )
214
+
215
+ return parsed_lineage
216
+
217
+ def min_degree_lineage (self , lineage_result ):
218
+ # lineage 중 최소 degree만 가져오는 함수
219
+ """
220
+ Returns the minimum degree from the lineage result (fetched by get_table_lineage().)
221
+
222
+ Args:
223
+ lineage_result : (List[str, dict]): Result from get_table_lineage().
224
+
225
+ Returns:
226
+ dict : {table_name : minimum_degree}
227
+ """
228
+
229
+ table_degrees = {}
230
+
231
+ urn , lineage_data = lineage_result
232
+
233
+ for item in lineage_data ["scrollAcrossLineage" ]["searchResults" ]:
234
+ table = item ["entity" ]["urn" ].split ("," )[1 ]
235
+ table_name = table .split ("." )[1 ]
236
+ degree = item ["degree" ]
237
+ table_degrees [table_name ] = min (
238
+ degree , table_degrees .get (table_name , float ("inf" ))
239
+ )
240
+
241
+ return table_degrees
242
+
243
+ def build_table_metadata (self , urn , max_degree = 2 , sort_by_degree = True ):
244
+ # 테이블 단위로 테이블 이름, 설명, 컬럼, 테이블 별 리니지(downstream/upstream), 컬럼 별 리니지(upstream)이 포함된 메타데이터 생성 함수
245
+ """
246
+ Builds table metadata including description, columns, and lineage info.
247
+
248
+ Args:
249
+ urn (str): Dataset URN
250
+ max_degree (int): Max lineage depth to include (filtering)
251
+ sort_by_degree (bool): Whether to sort downstream/upstream tables by degree
252
+
253
+ Returns:
254
+ dict: Table metadata
255
+ """
256
+ metadata = {
257
+ "table_name" : self .get_table_name (urn ),
258
+ "description" : self .get_table_description (urn ),
259
+ "columns" : self .get_column_names_and_descriptions (urn ),
260
+ "lineage" : {},
261
+ }
262
+
263
+ def process_lineage (direction ):
264
+ # direction : DOWNSTREAM/UPSTREAM 별로 degree가 최소인 lineage를 가져오는 함수
265
+
266
+ # 테이블 lineage 가져오기
267
+ lineage_result = self .get_table_lineage (urn , direction = direction )
268
+ table_degrees = self .min_degree_lineage (lineage_result )
269
+ current_table_name = metadata ["table_name" ]
270
+
271
+ # degree 필터링
272
+ filtered_lineage = [
273
+ {"table" : table , "degree" : degree }
274
+ for table , degree in table_degrees .items ()
275
+ if degree <= max_degree and table != current_table_name
276
+ ]
277
+
278
+ # degree 기준 정렬
279
+ if sort_by_degree :
280
+ filtered_lineage .sort (key = lambda x : x ["degree" ])
281
+
282
+ return filtered_lineage
283
+
284
+ # DOWNSTREAM / UPSTREAM 링크 추가
285
+ metadata ["lineage" ]["downstream" ] = process_lineage ("DOWNSTREAM" )
286
+ metadata ["lineage" ]["upstream" ] = process_lineage ("UPSTREAM" )
287
+
288
+ # 컬럼 단위 lineage 추가
289
+ column_lineage = self .get_column_lineage (urn )
290
+ metadata ["lineage" ]["upstream_columns" ] = column_lineage .get (
291
+ "lineage_by_upstream_dataset" , []
292
+ )
293
+
294
+ return metadata
0 commit comments