Skip to content

Commit

Permalink
Copy improvements in the SQL Database verified source (#749)
Browse files Browse the repository at this point in the history
* Copy improvements in the SQL Database verified source
  • Loading branch information
anuunchin authored Nov 16, 2023
1 parent 63c18fe commit f72c331
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 66 deletions.
139 changes: 76 additions & 63 deletions docs/website/docs/dlt-ecosystem/verified-sources/sql_database.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# SQL Database
# 30+ SQL Databases

:::info Need help deploying these sources, or figuring out how to run them in your data stack?

Expand All @@ -9,9 +9,11 @@ or [book a call](https://calendar.app.google/kiLhuMsWKpZUpfho6) with our support
SQL databases are management systems (DBMS) that store data in a structured format, commonly used
for efficient and reliable data retrieval.

This SQL database `dlt` verified source and
[pipeline example](https://github.com/dlt-hub/verified-sources/blob/master/sources/sql_database_pipeline.py)
loads data using SqlAlchemy to the destination of your choice.
Our SQL Database verified source loads data to your specified destination using SQLAlchemy.

:::tip
View the pipeline example [here](https://github.com/dlt-hub/verified-sources/blob/master/sources/sql_database_pipeline.py).
:::

Sources and resources that can be loaded using this verified source are:

Expand All @@ -20,18 +22,41 @@ Sources and resources that can be loaded using this verified source are:
| sql_database | Retrieves data from an SQL database |
| sql_table | Retrieves data from an SQL database table |

### Supported databases

We support all [SQLAlchemy dialects](https://docs.sqlalchemy.org/en/20/dialects/), which include, but are not limited to, the following database engines:

* PostgreSQL
* MySQL
* SQLite
* Oracle
* Microsoft SQL Server
* MariaDB
* IBM DB2 and Informix
* Google BigQuery
* Snowflake
* Redshift
* Apache Hive and Presto
* SAP Hana
* CockroachDB
* Firebird
* Teradata Vantage

:::note
Note that there many unofficial dialects, such as [DuckDB](https://duckdb.org/).
:::

## Setup Guide

### Grab credentials

This verified source utilizes SQLAlchemy for database connectivity. Let us consider this public
database example:
This verified source utilizes SQLAlchemy for database connectivity. Let's take a look at the following public database example:

`connection_url = "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"`

> This public database doesn't require a password.
The database above doesn't require a password.

Connection URL can be broken down into:
The connection URL can be broken down into:

```python
connection_url = "connection_string = f"{drivername}://{username}:{password}@{host}:{port}/{database}"
Expand All @@ -52,34 +77,34 @@ connection_url = "connection_string = f"{drivername}://{username}:{password}@{ho

- E.g., A public database at "mysql-rfam-public.ebi.ac.uk" hosted by EBI.

`port`: The port for the database connection. E.g., "4497", in the above connection URL.
`port`: The port for the database connection.

- E.g., "4497", in the above connection URL.
`port`: The port for the database connection.

- E.g., "4497", in the above connection URL.

`database`: The specific database on the server.

- E.g., Connecting to the "Rfam" database.

### Provide special options in connection string
### Configure connection

Here we use `mysql` and `pymysql` dialect to set up SSL connection to a server. All information
taken from the
[SQLAlchemy docs](https://docs.sqlalchemy.org/en/14/dialects/mysql.html#ssl-connections).
Here, we use the `mysql` and `pymysql` dialects to set up an SSL connection to a server, with all information taken from the [SQLAlchemy docs](https://docs.sqlalchemy.org/en/14/dialects/mysql.html#ssl-connections).

1. To force SSL on the client without a client certificate you may pass the following DSN:
1. To enforce SSL on the client without a client certificate you may pass the following DSN:

```toml
sources.sql_database.credentials="mysql+pymysql://root:<pass>@<host>:3306/mysql?ssl_ca="
```

1. You can also pass server public certificate as a file. For servers with a public certificate
(potentially bundled with your pipeline) and disabling host name checks:
1. You can also pass the server's public certificate (potentially bundled with your pipeline) and disable host name checks:

```toml
sources.sql_database.credentials="mysql+pymysql://root:<pass>@<host>:3306/mysql?ssl_ca=server-ca.pem&ssl_check_hostname=false"
```

1. For servers requiring a client certificate, provide the client's private key (a secret value). In
Airflow, this is usually saved as a variable and exported to a file before use. Server cert is
omitted in the example below:
1. For servers requiring a client certificate, provide the client's private key (a secret value). In Airflow, this is usually saved as a variable and exported to a file before use. The server certificate is omitted in the example below:

```toml
sources.sql_database.credentials="mysql+pymysql://root:<pass>@35.203.96.191:3306/mysql?ssl_ca=&ssl_cert=client-cert.pem&ssl_key=client-key.pem"
Expand All @@ -95,13 +120,14 @@ To get started with your data pipeline, follow these steps:
dlt init sql_database duckdb
```

[This command](../../reference/command-line-interface) will initialize
It will initialize
[the pipeline example](https://github.com/dlt-hub/verified-sources/blob/master/sources/sql_database_pipeline.py)
with SQL database as the [source](../../general-usage/source) and
[duckdb](../destinations/duckdb.md) as the [destination](../destinations).
with an SQL database as the [source](../../general-usage/source) and
[DuckDB](../destinations/duckdb.md) as the [destination](../destinations).

1. If you'd like to use a different destination, simply replace `duckdb` with the name of your
preferred [destination](../destinations).
:::tip
If you'd like to use a different destination, simply replace `duckdb` with the name of your preferred [destination](../destinations).
:::

1. After running this command, a new directory will be created with the necessary files and
configuration settings to get started.
Expand Down Expand Up @@ -131,7 +157,7 @@ For more information, read the
sources.sql_database.credentials="mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"
```

1. Alternatively, you can also pass credentials in the pipeline script like this:
1. You can also pass credentials in the pipeline script the following way:

```python
credentials = ConnectionStringCredentials(
Expand All @@ -143,8 +169,7 @@ For more information, read the
> [pipeline example](https://github.com/dlt-hub/verified-sources/blob/master/sources/sql_database_pipeline.py)
> for details.
1. Finally, follow the instructions in [Destinations](../destinations/) to add credentials for your
chosen destination. This will ensure that your data is properly routed to its final destination.
1. Finally, follow the instructions in [Destinations](../destinations/) to add credentials for your chosen destination. This will ensure that your data is properly routed.

For more information, read the [General Usage: Credentials.](../../general-usage/credentials)

Expand All @@ -156,20 +181,23 @@ For more information, read the [General Usage: Credentials.](../../general-usage
pip install -r requirements.txt
```

1. Now the verified source can be run by using the command:
1. Run the verified source by entering:

```bash
python sql_database_pipeline.py
```

1. To make sure that everything is loaded as expected, use the command:
1. Make sure that everything is loaded as expected with:

```bash
dlt pipeline <pipeline_name> show
```

For example, the pipeline_name for the above pipeline example is `rfam`, you may also use any
:::note
The pipeline_name for the above example is `rfam`, you may also use any
custom name instead.
:::


## Sources and resources

Expand All @@ -179,7 +207,7 @@ For more information, read the [General Usage: Credentials.](../../general-usage
### Source `sql_database`:

This function loads data from an SQL database via SQLAlchemy and auto-creates resources for each
table or from a specified list.
table or from a specified list of tables.

```python
@dlt.source
Expand All @@ -191,13 +219,13 @@ def sql_database(
) -> Iterable[DltResource]:
```

`credentials`: Database details or a 'sqlalchemy.Engine' instance.
`credentials`: Database details or an 'sqlalchemy.Engine' instance.

`schema`: Database schema name (default if unspecified).

`metadata`: Optional, sqlalchemy.MetaData takes precedence over schema.
`metadata`: Optional SQLAlchemy.MetaData; takes precedence over schema.

`table_names`: List of tables to load. Defaults to all if not provided.
`table_names`: List of tables to load; defaults to all if not provided.

### Resource `sql_table`

Expand All @@ -220,9 +248,9 @@ def sql_table(

`table`: Table to load, set in code or default from "config.toml".

`schema`: Optional, name of table schema.
`schema`: Optional name of the table schema.

`metadata`: Optional, sqlalchemy.MetaData takes precedence over schema.
`metadata`: Optional SQLAlchemy.MetaData; takes precedence over schema.

`incremental`: Optional, enables incremental loading.

Expand All @@ -231,8 +259,7 @@ def sql_table(
## Customization
### Create your own pipeline

If you wish to create your own pipelines, you can leverage source and resource methods from this
verified source.
To create your own pipeline, use source and resource methods from this verified source.

1. Configure the pipeline by specifying the pipeline name, destination, and dataset as follows:

Expand All @@ -244,7 +271,7 @@ verified source.
)
```

1. You can pass credentials using any of the methods discussed above.
1. Pass your credentials using any of the methods [described above](#add-credentials).

1. To load the entire database, use the `sql_database` source as:

Expand All @@ -254,9 +281,7 @@ verified source.
print(info)
```

> Use one method from the methods [described above](#add-credentials) to pass credentials.
1. To load just the "family" table using the `sql_database` source:
1. If you just need the "family" table, use:

```python
source = sql_database().with_resources("family")
Expand All @@ -267,7 +292,7 @@ verified source.

1. To pseudonymize columns and hide personally identifiable information (PII), refer to the
[documentation](https://dlthub.com/docs/general-usage/customising-pipelines/pseudonymizing_columns).
For example, to pseudonymize the "rfam_acc" column in the "family" table:
As an example, here's how to pseudonymize the "rfam_acc" column in the "family" table:

```python
import hashlib
Expand Down Expand Up @@ -299,7 +324,7 @@ verified source.
print(info)
```

1. To exclude the columns, for e.g. "rfam_id" column from the "family" table before loading:
1. To exclude columns, such as the "rfam_id" column from the "family" table before loading:

```python
def remove_columns(doc):
Expand Down Expand Up @@ -328,10 +353,7 @@ verified source.
info = pipeline.run(source, write_disposition="merge")
print(info)
```

> In this example, we load the "family" table and set the "updated" column for incremental
> loading. In the first run, it loads all the data from January 1, 2022, at midnight (00:00:00) and
> then loads incrementally in subsequent runs using "updated" field.
In this example, we load data from the `family` table, using the `updated` column for incremental loading. In the first run, the process loads all data starting from midnight (00:00:00) on January 1, 2022. Subsequent runs perform incremental loading, guided by the values in the `updated` field.

1. To incrementally load the "family" table using the 'sql_table' resource.

Expand All @@ -347,20 +369,11 @@ verified source.
print(info)
```

> Loads all data from "family" table from January 1, 2022, at midnight (00:00:00) and then loads
> incrementally in subsequent runs using "updated" field.
> 💡 Please note that to use merge write disposition a primary key must exist in the source table.
> `dlt` finds and sets up primary keys automatically.
This process initially loads all data from the `family` table starting at midnight on January 1, 2022. For later runs, it uses the `updated` field for incremental loading as well.

> 💡 `apply_hints` is a powerful method that allows to modify the schema of the resource after it
> was created: including the write disposition and primary keys. You are free to select many
> different tables and use `apply_hints` several times to have pipelines where some resources are
> merged, appended or replaced.
:::info
* For merge write disposition, the source table needs a primary key, which `dlt` automatically sets up.
* `apply_hints` is a powerful method that enables schema modifications after resource creation, like adjusting write disposition and primary keys. You can choose from various tables and use `apply_hints` multiple times to create pipelines with merged, appendend, or replaced resources.
:::

1. Remember, to maintain the same pipeline name and destination dataset name. The pipeline name
retrieves the [state](https://dlthub.com/docs/general-usage/state) from the last run, essential
for incremental data loading. Changing these names might trigger a
[“full_refresh”](https://dlthub.com/docs/general-usage/pipeline#do-experiments-with-full-refresh),
disrupting metadata tracking for
[incremental loads](https://dlthub.com/docs/general-usage/incremental-loading).
1. Remember to keep the pipeline name and destination dataset name consistent. The pipeline name is crucial for retrieving the [state](https://dlthub.com/docs/general-usage/state) from the last run, which is essential for incremental loading. Altering these names could initiate a "[full_refresh](https://dlthub.com/docs/general-usage/pipeline#do-experiments-with-full-refresh)", interfering with the metadata tracking necessary for [incremental loads](https://dlthub.com/docs/general-usage/incremental-loading).
6 changes: 3 additions & 3 deletions docs/website/docs/examples/nested_data/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ def mongodb_collection(
write_disposition: Optional[str] = dlt.config.value,
) -> Any:
# set up mongo client
client = MongoClient(connection_url, uuidRepresentation="standard", tz_aware=True)
client: Any = MongoClient(connection_url, uuidRepresentation="standard", tz_aware=True)
mongo_database = client.get_default_database() if not database else client[database]
collection_obj = mongo_database[collection]

def collection_documents(
client,
collection,
client: Any,
collection: Any,
incremental: Optional[dlt.sources.incremental[Any]] = None,
) -> Iterator[TDataItem]:
LoaderClass = CollectionLoader
Expand Down

0 comments on commit f72c331

Please sign in to comment.