How to Use Databricks and PySpark to Write Delta Table Data to Snowflake: A Step-by-Step Guide

Dhruv Singhal
3 min read · May 6, 2023


To create and write data from a Delta table to Snowflake using Databricks and PySpark, you can follow these steps:

Step 1: Set up Databricks and Snowflake

  • Make sure that you have a Databricks account and the necessary credentials to access it.
  • Set up a Snowflake account and the necessary credentials to connect to it.
  • Connect your Databricks workspace to Snowflake using the Snowflake connector for Databricks.
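The Snowflake connector is bundled with the Databricks Runtime, so you typically do not need to install anything extra. As a minimal sketch, assuming you have stored your Snowflake credentials in a Databricks secret scope (the scope and key names below are placeholders), you can retrieve them without hard-coding them in a notebook:

# Hypothetical secret scope and key names -- replace with your own
sf_user = dbutils.secrets.get(scope="snowflake-creds", key="sf-user")
sf_password = dbutils.secrets.get(scope="snowflake-creds", key="sf-password")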

Step 2: Load Delta table data into PySpark DataFrame

  • Load the data from your Delta table into a PySpark DataFrame using the DeltaTable API.
from delta.tables import DeltaTable

# Load the Delta table at the given path and expose it as a DataFrame
delta_table = DeltaTable.forPath(spark, "<path_to_delta_table>")
df = delta_table.toDF()
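If you only need the data and not the DeltaTable handle, an equivalent alternative is to read the Delta table directly as a DataFrame:

# Read the Delta table directly into a DataFrame
df = spark.read.format("delta").load("<path_to_delta_table>")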

Step 3: Write data to Snowflake

  • Write the data from your PySpark DataFrame to Snowflake using the write method of the DataFrame API.
snowflake_options = {
    "sfURL": "<snowflake_url>",
    "sfUser": "<snowflake_user>",
    "sfPassword": "<snowflake_password>",
    "sfDatabase": "<snowflake_database>",
    "sfSchema": "<snowflake_schema>",
    "sfWarehouse": "<snowflake_warehouse>",
    "sfRole": "<snowflake_role>"
}

df.write \
    .format("snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "<snowflake_table>") \
    .mode("overwrite") \
    .save()
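As an optional sanity check, you can read the table back from Snowflake with the same connector options to confirm the write succeeded (this sketch reuses the snowflake_options dictionary defined above):

# Read the table back from Snowflake to verify the write
verify_df = spark.read \
    .format("snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "<snowflake_table>") \
    .load()
verify_df.count()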

Note that to use the code below, you will need to replace <source_format>, <source_option>, <option_value>, <source_path>, <path_to_delta_table>, and <primary_key_column> with your actual source and Delta table details, just as the Snowflake placeholders above need to be replaced with your connection details.

To insert new records into a Delta table using PySpark and Databricks, follow these steps:

Step 1: Load data from the source

  • Load the data from the source into a PySpark DataFrame. This can involve reading data from a file, database, or any other source supported by PySpark.
# Load data from source into a PySpark DataFrame
source_df = spark.read.format("<source_format>").option("<source_option>", "<option_value>").load("<source_path>")
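For example, if the source were a CSV file with a header row (the format, option, and path here are purely illustrative), the generic read above would look like this:

# Illustrative example: reading a CSV source with a header row
source_df = spark.read.format("csv").option("header", "true").load("<source_path>")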

Step 2: Load existing data from the Delta table

  • Load the existing data from the Delta table into a PySpark DataFrame using the DeltaTable API.
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "<path_to_delta_table>")
existing_data_df = delta_table.toDF()

Step 3: Identify new records

  • Identify the new records that need to be inserted into the Delta table by comparing the source data with the existing data in the Delta table. You can use DataFrame operations, such as exceptAll or a left anti join, to find the records that exist in the source data but not in the Delta table.
# Find new records to insert
new_records_df = source_df.exceptAll(existing_data_df)
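If you only care about whether the primary key already exists (rather than whether every column matches), the left anti join mentioned above is a reasonable alternative; a sketch, with the key column as a placeholder:

# Alternative: keep only the source rows whose key is absent from the Delta table
new_records_df = source_df.join(
    existing_data_df,
    on="<primary_key_column>",
    how="left_anti"
)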

Step 4: Insert new records into the Delta table

  • Insert the new records from the source data into the Delta table using the merge method of the DeltaTable API with a whenNotMatchedInsertAll clause, which appends only the records that have no match in the table.
delta_table.alias("existing_data") \
    .merge(new_records_df.alias("new_records"), "existing_data.<primary_key_column> = new_records.<primary_key_column>") \
    .whenNotMatchedInsertAll() \
    .execute()

Note that <path_to_delta_table> should be replaced with the actual path to your Delta table, and <primary_key_column> with the name of the primary key column used to match records during the merge. The aliases used above ("existing_data" and "new_records") are set with the alias method and can be renamed to suit your specific use case.

This approach assumes that you want to perform an “upsert” operation, where new records are inserted into the Delta table if they do not already exist, based on a primary key column. If you want to perform a different type of operation, such as “delete” or “update”, you can modify the merge operation accordingly. Refer to the Delta Lake documentation for more information on the various merge operation options.
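As a sketch of what such a variant could look like, the same merge builder also accepts whenMatchedUpdateAll and whenMatchedDelete clauses; an upsert against the source data might look like this (aliases and key column as above):

# Upsert: update rows whose key already exists, insert the rest
delta_table.alias("existing_data") \
    .merge(source_df.alias("new_records"), "existing_data.<primary_key_column> = new_records.<primary_key_column>") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()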

Any suggestions and comments would be greatly appreciated. If you found this article helpful, please like and share it with others. Don’t forget to follow me to stay up-to-date on my latest articles. Thank you for taking the time to read this!
