@@ -85,9 +85,16 @@ def _do_create_temp_stage(
     overwrite: bool,
     use_scoped_temp_object: bool,
 ) -> None:
-    create_stage_sql = f"CREATE {get_temp_type_for_object(use_scoped_temp_object)} STAGE /* Python:snowflake.connector.pandas_tools.write_pandas() */ {stage_location} FILE_FORMAT=(TYPE=PARQUET COMPRESSION={compression}{' BINARY_AS_TEXT=FALSE' if auto_create_table or overwrite else ''})"
-    logger.debug(f"creating stage with '{create_stage_sql}'")
-    cursor.execute(create_stage_sql, _is_internal=True).fetchall()
+    create_stage_sql = f"CREATE {get_temp_type_for_object(use_scoped_temp_object)} STAGE /* Python:snowflake.connector.pandas_tools.write_pandas() */ identifier(?) FILE_FORMAT=(TYPE=PARQUET COMPRESSION={compression}{' BINARY_AS_TEXT=FALSE' if auto_create_table or overwrite else ''})"
+    params = (stage_location,)
+    logger.debug(f"creating stage with '{create_stage_sql}'. params: %s", params)
+    cursor.execute(
+        create_stage_sql,
+        _is_internal=True,
+        _force_qmark_paramstyle=True,
+        params=params,
+        num_statements=1,
+    )


 def _create_temp_stage(
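
Aside (not part of the diff): the pattern repeated throughout this change is to move object names out of the SQL text and into qmark bind parameters, wrapping each name in Snowflake's identifier() function so the server still resolves it as an identifier rather than a string literal. A minimal sketch of the resulting call, assuming an open connector cursor cur and a hypothetical stage name:

    # Sketch only, hypothetical name; mirrors the pattern this diff introduces.
    # The stage name travels as a bound value instead of being spliced into SQL.
    cur.execute(
        "CREATE TEMPORARY STAGE identifier(?) FILE_FORMAT=(TYPE=PARQUET)",
        params=("my_temp_stage",),
        _is_internal=True,            # private kwargs, as used in the diff above
        _force_qmark_paramstyle=True,
        num_statements=1,
    )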
@@ -147,12 +154,19 @@ def _do_create_temp_file_format(
     use_scoped_temp_object: bool,
 ) -> None:
     file_format_sql = (
-        f"CREATE {get_temp_type_for_object(use_scoped_temp_object)} FILE FORMAT {file_format_location} "
+        f"CREATE {get_temp_type_for_object(use_scoped_temp_object)} FILE FORMAT identifier(?) "
         f"/* Python:snowflake.connector.pandas_tools.write_pandas() */ "
         f"TYPE=PARQUET COMPRESSION={compression}{sql_use_logical_type}"
     )
-    logger.debug(f"creating file format with '{file_format_sql}'")
-    cursor.execute(file_format_sql, _is_internal=True)
+    params = (file_format_location,)
+    logger.debug(f"creating file format with '{file_format_sql}'. params: %s", params)
+    cursor.execute(
+        file_format_sql,
+        _is_internal=True,
+        _force_qmark_paramstyle=True,
+        params=params,
+        num_statements=1,
+    )


 def _create_temp_file_format(
@@ -379,14 +393,20 @@ def write_pandas(
             # Upload parquet file
             upload_sql = (
                 "PUT /* Python:snowflake.connector.pandas_tools.write_pandas() */ "
-                "'file://{path}' @{stage_location} PARALLEL={parallel}"
+                "'file://{path}' ? PARALLEL={parallel}"
             ).format(
                 path=chunk_path.replace("\\", "\\\\").replace("'", "\\'"),
-                stage_location=stage_location,
                 parallel=parallel,
             )
-            logger.debug(f"uploading files with '{upload_sql}'")
-            cursor.execute(upload_sql, _is_internal=True)
+            params = ("@" + stage_location,)
+            logger.debug(f"uploading files with '{upload_sql}', params: %s", params)
+            cursor.execute(
+                upload_sql,
+                _is_internal=True,
+                _force_qmark_paramstyle=True,
+                params=params,
+                num_statements=1,
+            )
             # Remove chunk file
             os.remove(chunk_path)

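Aside (not part of the diff): PUT targets a stage location rather than a bare identifier, so identifier(?) does not apply here; the diff instead binds the whole stage reference, leading "@" included, as an ordinary positional parameter. Roughly, with hypothetical values:

    # Sketch only: the "@<stage>" reference is the single qmark bind value.
    cur.execute(
        "PUT 'file:///tmp/file0.txt' ? PARALLEL=4",
        params=("@mydb.myschema.tmpstage",),  # hypothetical stage location
        _is_internal=True,
        _force_qmark_paramstyle=True,
        num_statements=1,
    )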
@@ -403,9 +423,16 @@ def write_pandas(
     columns = quote + f"{quote},{quote}".join(snowflake_column_names) + quote

     def drop_object(name: str, object_type: str) -> None:
-        drop_sql = f"DROP {object_type.upper()} IF EXISTS {name} /* Python:snowflake.connector.pandas_tools.write_pandas() */"
-        logger.debug(f"dropping {object_type} with '{drop_sql}'")
-        cursor.execute(drop_sql, _is_internal=True)
+        drop_sql = f"DROP {object_type.upper()} IF EXISTS identifier(?) /* Python:snowflake.connector.pandas_tools.write_pandas() */"
+        params = (name,)
+        logger.debug(f"dropping {object_type} with '{drop_sql}'. params: %s", params)
+        cursor.execute(
+            drop_sql,
+            _is_internal=True,
+            _force_qmark_paramstyle=True,
+            params=params,
+            num_statements=1,
+        )

     if auto_create_table or overwrite:
         file_format_location = _create_temp_file_format(
@@ -417,10 +444,17 @@ def drop_object(name: str, object_type: str) -> None:
             sql_use_logical_type,
             _use_scoped_temp_object,
         )
-        infer_schema_sql = f"SELECT COLUMN_NAME, TYPE FROM table(infer_schema(location=>'@{stage_location}', file_format=>'{file_format_location}'))"
-        logger.debug(f"inferring schema with '{infer_schema_sql}'")
+        infer_schema_sql = "SELECT COLUMN_NAME, TYPE FROM table(infer_schema(location=>?, file_format=>?))"
+        params = (f"@{stage_location}", file_format_location)
+        logger.debug(f"inferring schema with '{infer_schema_sql}'. params: %s", params)
         column_type_mapping = dict(
-            cursor.execute(infer_schema_sql, _is_internal=True).fetchall()
+            cursor.execute(
+                infer_schema_sql,
+                _is_internal=True,
+                _force_qmark_paramstyle=True,
+                params=params,
+                num_statements=1,
+            ).fetchall()
         )
         # Infer schema can return the columns out of order depending on the chunking we do when uploading
         # so we have to iterate through the dataframe columns to make sure we create the table with its
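
Aside (not part of the diff): infer_schema() takes location and file_format as string arguments, so plain ? placeholders suffice with no identifier() wrapper, and the two values bind positionally. For instance, with hypothetical names:

    # Sketch only: both arguments bind as ordinary string values, in order.
    rows = cur.execute(
        "SELECT COLUMN_NAME, TYPE FROM table(infer_schema(location=>?, file_format=>?))",
        params=("@tmpstage", "tmpformat"),
        _is_internal=True,
        _force_qmark_paramstyle=True,
        num_statements=1,
    ).fetchall()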
@@ -440,12 +474,21 @@ def drop_object(name: str, object_type: str) -> None:
         )

         create_table_sql = (
-            f"CREATE {table_type.upper()} TABLE IF NOT EXISTS {target_table_location} "
+            f"CREATE {table_type.upper()} TABLE IF NOT EXISTS identifier(?) "
             f"({create_table_columns})"
             f" /* Python:snowflake.connector.pandas_tools.write_pandas() */ "
         )
-        logger.debug(f"auto creating table with '{create_table_sql}'")
-        cursor.execute(create_table_sql, _is_internal=True)
+        params = (target_table_location,)
+        logger.debug(
+            f"auto creating table with '{create_table_sql}'. params: %s", params
+        )
+        cursor.execute(
+            create_table_sql,
+            _is_internal=True,
+            _force_qmark_paramstyle=True,
+            params=params,
+            num_statements=1,
+        )
         # need explicit casting when the underlying table schema is inferred
         parquet_columns = "$1:" + ",$1:".join(
             f"{quote}{snowflake_col}{quote}::{column_type_mapping[col]}"
@@ -464,12 +507,19 @@ def drop_object(name: str, object_type: str) -> None:

     try:
         if overwrite and (not auto_create_table):
-            truncate_sql = f"TRUNCATE TABLE {target_table_location} /* Python:snowflake.connector.pandas_tools.write_pandas() */"
-            logger.debug(f"truncating table with '{truncate_sql}'")
-            cursor.execute(truncate_sql, _is_internal=True)
+            truncate_sql = "TRUNCATE TABLE identifier(?) /* Python:snowflake.connector.pandas_tools.write_pandas() */"
+            params = (target_table_location,)
+            logger.debug(f"truncating table with '{truncate_sql}'. params: %s", params)
+            cursor.execute(
+                truncate_sql,
+                _is_internal=True,
+                _force_qmark_paramstyle=True,
+                params=params,
+                num_statements=1,
+            )

         copy_into_sql = (
-            f"COPY INTO {target_table_location} /* Python:snowflake.connector.pandas_tools.write_pandas() */ "
+            f"COPY INTO identifier(?) /* Python:snowflake.connector.pandas_tools.write_pandas() */ "
             f"({columns}) "
             f"FROM (SELECT {parquet_columns} FROM @{stage_location}) "
             f"FILE_FORMAT=("
@@ -478,10 +528,17 @@ def drop_object(name: str, object_type: str) -> None:
             f"{' BINARY_AS_TEXT=FALSE' if auto_create_table or overwrite else ''}"
             f"{sql_use_logical_type}"
             f") "
-            f"PURGE=TRUE ON_ERROR={on_error}"
+            f"PURGE=TRUE ON_ERROR=?"
         )
-        logger.debug(f"copying into with '{copy_into_sql}'")
-        copy_results = cursor.execute(copy_into_sql, _is_internal=True).fetchall()
+        params = (target_table_location, on_error)
+        logger.debug(f"copying into with '{copy_into_sql}'. params: %s", params)
+        copy_results = cursor.execute(
+            copy_into_sql,
+            _is_internal=True,
+            _force_qmark_paramstyle=True,
+            params=params,
+            num_statements=1,
+        ).fetchall()

         if overwrite and auto_create_table:
             original_table_location = build_location_helper(
@@ -491,9 +548,16 @@ def drop_object(name: str, object_type: str) -> None:
                 quote_identifiers=quote_identifiers,
             )
             drop_object(original_table_location, "table")
-            rename_table_sql = f"ALTER TABLE {target_table_location} RENAME TO {original_table_location} /* Python:snowflake.connector.pandas_tools.write_pandas() */"
-            logger.debug(f"rename table with '{rename_table_sql}'")
-            cursor.execute(rename_table_sql, _is_internal=True)
+            rename_table_sql = "ALTER TABLE identifier(?) RENAME TO identifier(?) /* Python:snowflake.connector.pandas_tools.write_pandas() */"
+            params = (target_table_location, original_table_location)
+            logger.debug(f"rename table with '{rename_table_sql}'. params: %s", params)
+            cursor.execute(
+                rename_table_sql,
+                _is_internal=True,
+                _force_qmark_paramstyle=True,
+                params=params,
+                num_statements=1,
+            )
     except ProgrammingError:
         if overwrite and auto_create_table:
             # drop table only if we created a new one with a random name
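
Aside (not part of the diff): the public write_pandas() interface is unchanged by this refactor; a typical call still looks like the sketch below, with connection details and the table name hypothetical:

    import pandas as pd
    import snowflake.connector
    from snowflake.connector.pandas_tools import write_pandas

    df = pd.DataFrame({"ID": [1, 2], "NAME": ["a", "b"]})
    conn = snowflake.connector.connect(**connection_params)  # hypothetical credentials dict
    # write_pandas returns (success, num_chunks, num_rows, copy_output)
    success, nchunks, nrows, _ = write_pandas(
        conn, df, table_name="MY_TABLE", auto_create_table=True
    )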