-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnect.py
215 lines (187 loc) · 8.85 KB
/
connect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
from functools import singledispatch
import pathlib
from typing import Iterable, List, Union, Tuple, Dict
from sqlalchemy import Table, MetaData
from mara_db import dbs, formats
import mara_db.sqlalchemy_engine
import mara_storage.storages
from mara_pipelines.pipelines import Command
from mara_pipelines.commands.sql import ExecuteSQL
from ._pyarrow import pyarrow_filesystem
from .catalog import StorageCatalog
from .ddl import create_external_table
from .formats import DeltaFormat
from .table import detect_table_schema
@singledispatch
def format_to_ddl_format(db: Union[str, dbs.DB], table_format: formats.Format) -> Tuple[str, Dict[str, str]]:
    """
    Converts a format into a ddl specific format_name and format_options.

    Args:
        db: the database as alias or class for which the format_name and options dict
            shall be created
        table_format: the mara format with the custom settings

    Returns:
        A tuple which holds the type name for the specific database and a dictionary with
        db specific options which are used when accessing the table.

    Raises:
        NotImplementedError: when no implementation is registered for this
            db / format combination.
    """
    # Fixed: the message previously pointed to a non-existent function name
    # ("mara_format_to_tuple") instead of this dispatch function.
    raise NotImplementedError(
        f'Please implement format_to_ddl_format for types "{db.__class__.__name__}" and "{table_format.__class__.__name__}"'
    )
@format_to_ddl_format.register(str)
def __(db_alias: str, table_format: formats.Format) -> Tuple[str, Dict[str, str]]:
    """Resolves a db alias to its configured instance, then dispatches on its type."""
    db = dbs.db(db_alias)
    return format_to_ddl_format(db, table_format=table_format)
@format_to_ddl_format.register(dbs.SQLServerDB)
def __(db: dbs.SQLServerDB, table_format: formats.Format) -> Tuple[str, Dict[str, str]]:
    """
    Maps a mara format to the name of a SQL Server external file format.

    SQL Server dialects (Polybase, Azure Synapse etc.) define file formats with
    a separate CREATE EXTERNAL FILE FORMAT statement; those file formats are
    assumed to already exist under their canonical names.
    """
    if isinstance(table_format, formats.ParquetFormat):
        return ('PARQUET', {})
    if isinstance(table_format, formats.OrcFormat):
        return ('ORC', {})
    # JSON is not supported by SQL Server external file formats
    if isinstance(table_format, DeltaFormat):
        return ('DELTA', {})
    if isinstance(table_format, formats.CsvFormat):
        if table_format.header:
            raise ValueError('A CSV format with header is not supported for SQL Server')
        # only the two canonical delimiters have predefined file format names
        format_name_by_delimiter = {',': 'CSV', '\t': 'TSV'}
        format_name = format_name_by_delimiter.get(table_format.delimiter_char)
        if format_name is None:
            raise ValueError('Only CSV with delimiter_char , or \\t is supported for SQL Server')
        return (format_name, {})
    raise NotImplementedError(f'The format {table_format} is not supported for SQLServerDB')
@format_to_ddl_format.register(dbs.DatabricksDB)
def __(db: dbs.DatabricksDB, table_format: formats.Format) -> Tuple[str, Dict[str, str]]:
    """Maps a mara format to a Databricks data source name plus reader options."""
    # formats that map 1:1 to a data source name without options
    plain_formats = [
        (formats.ParquetFormat, 'PARQUET'),
        (formats.OrcFormat, 'ORC'),
        (formats.JsonlFormat, 'JSON'),
        (DeltaFormat, 'DELTA'),
    ]
    for format_class, format_name in plain_formats:
        if isinstance(table_format, format_class):
            return (format_name, {})

    if not isinstance(table_format, formats.CsvFormat):
        raise NotImplementedError(f'The format {table_format} is not supported for DatabricksDB')

    # CSV: translate each customized setting into a reader option
    options: Dict[str, str] = {}
    if table_format.header is not None:
        options['header'] = 'true' if table_format.header else 'false'
    if table_format.delimiter_char is not None:
        options['sep'] = f"'{table_format.delimiter_char}'"
    if table_format.quote_char is not None:
        if table_format.quote_char == "'":
            options['quote'] = "'\''"
        elif table_format.quote_char == '':
            options['quote'] = 'null'
        else:
            options['quote'] = f"'{table_format.quote_char}'"
    if table_format.null_value_string:
        options['nullValue'] = f"'{table_format.null_value_string}'"
    return ('CSV', options)
@format_to_ddl_format.register(dbs.SnowflakeDB)
def __(db: dbs.SnowflakeDB, table_format: formats.Format) -> Tuple[str, Dict[str, str]]:
    """
    Maps a mara format to a Snowflake type name and option dictionary.

    For CSV the individual settings are rendered into an inline
    ``FILE_FORMAT = ( ... )`` specification.
    """
    if isinstance(table_format, formats.CsvFormat):
        options = {
            'TYPE': 'CSV'
        }
        if table_format.header:
            options['SKIP_HEADER'] = '1'
        if table_format.delimiter_char is not None:
            options['FIELD_DELIMITER'] = f"'{table_format.delimiter_char}'"
        if table_format.quote_char:
            # Snowflake's option for the quote char is FIELD_OPTIONALLY_ENCLOSED_BY
            # (previously a non-existent 'quote' option was emitted); a literal
            # single quote must be given as the hex code 0x27.
            if table_format.quote_char == '\'':
                options['FIELD_OPTIONALLY_ENCLOSED_BY'] = "0x27"
            else:
                options['FIELD_OPTIONALLY_ENCLOSED_BY'] = f"'{table_format.quote_char}'"
        if table_format.null_value_string:
            options['NULL_IF'] = f"('{table_format.null_value_string}')"
        # NOTE: a stray `options['CSV']` expression here always raised KeyError
        # as soon as the CSV branch was taken; it was removed.
        return ('CSV', {'FILE_FORMAT': '( '
                        + ', '.join([f'{k} = {v}' for k, v in options.items()])
                        + ' )'})
    elif isinstance(table_format, formats.JsonlFormat):
        return ('JSON', {})
    elif isinstance(table_format, formats.AvroFormat):
        return ('AVRO', {})
    elif isinstance(table_format, formats.OrcFormat):
        return ('ORC', {})
    elif isinstance(table_format, formats.ParquetFormat):
        return ('PARQUET', {})
    else:
        raise NotImplementedError(f'The format {table_format} is not supported for SnowflakeDB')
def connect_catalog_mara_commands(
    catalog: Union[str, StorageCatalog],
    db_alias: str,
    or_replace: bool = False
) -> Iterable[Command]:
    """
    Yields commands which connect the tables of a storage catalog as external tables.

    Args:
        catalog: The catalog, as alias or StorageCatalog instance, whose tables
            shall be connected.
        db_alias: The database alias where the tables shall be connected to.
        or_replace: If True, the commands will call CREATE OR REPLACE EXTERNAL TABLE
            instead of just creating the table (`CREATE EXTERNAL TABLE`).

    Raises:
        ValueError: when a catalog table has no location defined.
        NotImplementedError: when the catalog's storage type is not supported.
    """
    if isinstance(catalog, str):
        print(f'catalog by alias: {catalog}')
        from . import config
        catalog = config.catalogs()[catalog]

    storage = mara_storage.storages.storage(catalog.storage_alias)
    engine = mara_db.sqlalchemy_engine.engine(db_alias)
    fs = pyarrow_filesystem(storage)

    # only Azure blob storage is supported so far; its paths are prefixed
    # with the container name
    if isinstance(storage, mara_storage.storages.AzureStorage):
        path_prefix = storage.container_name
    else:
        raise NotImplementedError(f'Not supported storage type: {storage.__class__}')

    for table in catalog.tables:
        table_name = table['name']
        schema_name = table.get('schema', catalog.default_schema)
        table_format = table.get('format', None)
        location = table.get('location', None)
        if location is None:
            # narrowed from bare Exception; ValueError is backward compatible
            raise ValueError(f'Location of table `{schema_name}`.`{table_name}` not defined')
        location = str(catalog.base_path / location)
        format_name, format_options = format_to_ddl_format(db_alias, table_format=table_format)
        print(f'path: {format_name}.`{location}`')

        sqlalchemy_table = None

        # a _sqlalchemy_metadata.py file stored next to the data can predefine
        # the table schema in a `metadata` variable
        metadata_file = str(pathlib.Path(path_prefix) / location / '_sqlalchemy_metadata.py')
        if fs.isfile(metadata_file):
            # close the file handle after reading (previously leaked)
            with fs.open(metadata_file) as metadata_stream:
                metadata_code = metadata_stream.read()
            _globals = {}
            _locals = {}
            # SECURITY NOTE: this executes arbitrary code from the storage
            # location — only use with trusted storages
            exec(metadata_code, _globals, _locals)
            if 'metadata' not in _locals or not _locals['metadata']:
                print('skip _sqlalchemy_metadata.py: Could not find variable metadata')
            else:
                # use the first table defined in the metadata, renamed to the
                # catalog's table/schema name (loop variable renamed so it no
                # longer shadows the outer `table`)
                for metadata_table_key in _locals['metadata'].tables:
                    print(f'take table {metadata_table_key} from _sqlalchemy_metadata')
                    sqlalchemy_table = _locals['metadata'].tables[metadata_table_key]
                    sqlalchemy_table.name = table_name
                    sqlalchemy_table.schema = schema_name
                    break

        if sqlalchemy_table is None:
            if engine.dialect.name == 'databricks' and (
                    isinstance(table_format, DeltaFormat)
                    or isinstance(table_format, formats.ParquetFormat)):
                # dialect databricks does not require a sqlalchemy table with
                # columns, so we create just a raw table here
                sqlalchemy_table = Table(
                    table_name,
                    MetaData(),
                    schema=schema_name)
            else:
                print('read metadata from storage ...')
                sqlalchemy_table = detect_table_schema(table_format,
                                                       path=location,
                                                       storage_alias=catalog.storage_alias,
                                                       table_name=table_name,
                                                       schema_name=schema_name)

        sql_statement = create_external_table(bind=engine,
            table=sqlalchemy_table, storage_name=catalog.storage_alias, path=location,
            format_name=format_name, or_replace=or_replace, options=format_options)

        yield ExecuteSQL(sql_statement, db_alias=db_alias)