Skip to content

Commit

Permalink
Added support for lakehouses with a schema (#209)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bertverbeek4PS authored Dec 18, 2024
2 parents 8dab466 + 27b3754 commit bc80472
Showing 1 changed file with 15 additions and 3 deletions.
18 changes: 15 additions & 3 deletions fabric/CopyBusinessCentral.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"spark.conf.set(\"spark.sql.parquet.mergeSchema\", \"false\")\n",
"spark.conf.set(\"spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version\", \"2\")\n",
"spark.conf.set(\"spark.sql.delta.commitProtocol.enabled\", \"true\")\n",
"spark.conf.set(\"spark.sql.analyzer.maxIterations\", \"999\")\n",
"\n",
"# file paths\n",
"folder_path_spark = 'Files/deltas/' # this is mostly the default\n",
Expand All @@ -71,7 +72,8 @@
"Drop_table_if_mismatch = False; #option to drop the table if the json file has different columns than the table\n",
"no_Partition = 258 #how many partitions are used in the dataframe; a good starting point might be 2-4 partitions per CPU core in your Spark cluster\n",
"DecimalFormat = 'float' #how to format the decimal numbers, can be 'float' or 'decimal(10,3)'. If you change this it will be a breaking change for the table\n",
"DateTimeFormat = 'date' #how to format the datetime, can be 'timestamp' or 'date'. If you change this it will be a breaking change for the table"
"DateTimeFormat = 'date' #how to format the datetime, can be 'timestamp' or 'date'. If you change this it will be a breaking change for the table\n",
"schema_name = \"\" #set this if you are using a lakehouse with schemas enabled"
]
},
{
Expand Down Expand Up @@ -179,6 +181,9 @@
"from pyspark.sql.functions import desc\n",
"file_list = []\n",
"\n",
"def is_string_empty(s):\n",
" return not bool(s.strip())\n",
"\n",
"for entry in os.scandir(folder_path):\n",
" if entry.is_dir():\n",
" # Get the list of all file paths in the directory \n",
Expand Down Expand Up @@ -256,10 +261,17 @@
" df_new = df_new.dropDuplicates(['systemId-2000000000'])\n",
" \n",
" #overwrite the dataframe in the new table\n",
" df_new.write.mode(\"overwrite\").format(\"delta\").save(\"Tables/\" + table_name) \n",
" if is_string_empty(schema_name):\n",
" df_new.write.mode(\"overwrite\").format(\"delta\").saveAsTable(f\"{table_name}\")\n",
" else:\n",
" df_new.write.mode(\"overwrite\").format(\"delta\").saveAsTable(f\"{schema_name}.{table_name}\")\n",
" else: \n",
" #table isn't there so just insert it\n",
" df_new.write.mode(\"overwrite\").format(\"delta\").save(\"Tables/\" + table_name)\n",
" if is_string_empty(schema_name):\n",
" df_new.write.mode(\"overwrite\").format(\"delta\").saveAsTable(f\"{table_name}\")\n",
" else:\n",
" df_new.write.mode(\"overwrite\").format(\"delta\").saveAsTable(f\"{schema_name}.{table_name}\")\n",
" \n",
"\n",
" #delete the files\n",
" if Remove_delta:\n",
Expand Down

0 comments on commit bc80472

Please sign in to comment.