
Ingesting data from REST API into Database

dakodakov edited this page Sep 13, 2021 · 20 revisions

Overview

In this example we will use the Versatile Data Kit to ingest data from a REST API to a SQLite database.

Before you continue, make sure you are familiar with the Getting Started section of the wiki.

Code

The relevant Data Job code is available here.

You can follow along and run this example Data Job on your machine to get first-hand experience working with Data Jobs; alternatively, you can use the available code as a template and extend it to make a Data Job that fits your use case more closely.

Database

We will be using a temporary SQLite database, which will be created for us automatically.

Configuration

You can install Versatile Data Kit and the plugins required for this example by running the following command from a terminal:

pip install quickstart-vdk

Ingestion requires us to set two environment variables: the default database type, and the ingestion method, i.e. the type of database we will be ingesting into:

export VDK_DB_DEFAULT_TYPE=SQLITE
export VDK_INGEST_METHOD_DEFAULT=SQLITE

Data Job

The structure of our Data Job is as follows:

ingest-from-rest-api-example-job/
├── 10_delete_table.sql
├── 20_create_table.sql
├── 30_rest_ingest.py

Note that the name of a data job is defined by the name of the directory its steps are contained in.
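To make this concrete, here is a small, self-contained sketch of deriving a job name from its directory, as described above. This uses only the standard library pathlib module; it is illustrative, not part of the VDK API:

```python
from pathlib import Path

# The Data Job name is simply the name of the directory that contains its steps.
job_dir = Path("ingest-from-rest-api-example-job")
job_name = job_dir.name
print(job_name)  # ingest-from-rest-api-example-job
```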

The purpose of our Data Job ingest-from-rest-api-example-job is to make a GET request to a REST API and then ingest the returned JSON into the target database.

Our Data Job consists of two SQL steps and one Python step. Note that VDK allows us to mix Python and SQL steps in any order we prefer. Steps are executed in alphabetical order, so it is good practice to prefix the step names with numbers; this makes their execution order clear both to Versatile Data Kit and to anyone reading through the Data Job.
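The alphabetical ordering can be illustrated with a short sketch. The file names below are the ones from this job; the sorting itself is plain Python, not the VDK API:

```python
# VDK discovers the step files in the job directory and runs them in
# lexicographic (alphabetical) order, which is why the numeric prefixes matter.
steps = ["30_rest_ingest.py", "10_delete_table.sql", "20_create_table.sql"]
execution_order = sorted(steps)
print(execution_order)
# ['10_delete_table.sql', '20_create_table.sql', '30_rest_ingest.py']
```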

10_delete_table.sql
DROP TABLE IF EXISTS rest_target_table;
20_create_table.sql
CREATE TABLE rest_target_table (userId, id, title, completed);
30_rest_ingest.py
import requests

def run(job_input):
    response = requests.get("https://jsonplaceholder.typicode.com/todos/1")
    response.raise_for_status()
    payload = response.json()
    job_input.send_object_for_ingestion(
        payload=payload,
        destination_table="rest_target_table"
    )

  • The first step deletes the target table if it already exists; this only serves to make the Data Job repeatable;
  • The second step creates the table we will be inserting data into;
  • The third step requests the payload from the specified REST API and then ingests it into the destination_table in the target database.
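To see how the ingested object maps onto the table, here is a sketch of the payload returned by that endpoint and how its keys line up with the rest_target_table columns. The dict below reproduces the documented response of https://jsonplaceholder.typicode.com/todos/1 for illustration; the mapping is plain Python, not the VDK API:

```python
# The JSON object returned by the endpoint used in 30_rest_ingest.py.
payload = {"userId": 1, "id": 1, "title": "delectus aut autem", "completed": False}

# Each top-level key of the payload corresponds to a column of the target table.
columns = ("userId", "id", "title", "completed")
row = tuple(payload[c] for c in columns)
print(row)  # (1, 1, 'delectus aut autem', False)
```

Note that SQLite stores the boolean as an integer, which is why the query output later in this page shows 0 in the completed column.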

To run the Data Job, we navigate to the parent directory of the Job, and run the following command from a terminal:

vdk run ingest-from-rest-api-example-job/

Upon successful completion of the Data Job, we should see a log similar to this:

Result logs
2021-08-27 15:04:35,381=1630065875[VDK] ingest-from-rest-api-example-job [INFO ] taurus.vdk.builtin_plugins.run           cli_run.py:66   run_job         [OpId:1630065872-e39532-3f42e6]- Data Job execution summary: {
  "data_job_name": "ingest-from-rest-api-example-job",
  "execution_id": "1630065872-e39532",
  "start_time": "2021-08-27T12:04:33.186862",
  "end_time": "2021-08-27T12:04:33.354630",
  "status": "success",
  "steps_list": [
    {
      "name": "10_delete_table.sql",
      "type": "sql",
      "start_time": "2021-08-27T12:04:33.186885",
      "end_time": "2021-08-27T12:04:33.194096",
      "status": "success",
      "details": null,
      "exception": null
    },
    {
      "name": "20_create_table.sql",
      "type": "sql",
      "start_time": "2021-08-27T12:04:33.194160",
      "end_time": "2021-08-27T12:04:33.196529",
      "status": "success",
      "details": null,
      "exception": null
    },
    {
      "name": "30_rest_ingest.py",
      "type": "python",
      "start_time": "2021-08-27T12:04:33.196575",
      "end_time": "2021-08-27T12:04:33.354595",
      "status": "success",
      "details": null,
      "exception": null
    }
  ],
  "exception": null
}

After running the Data Job, we can check whether the new table was populated correctly by using the sqlite-query command afforded to us by the vdk-sqlite plugin, which we can use to execute queries against the configured SQLite database without having to set up a Data Job:

vdk sqlite-query -q 'SELECT * FROM rest_target_table'

We should see the following output:

-  -  ------------------  -
1  1  delectus aut autem  0
-  -  ------------------  -
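For readers who want to see the end state without running VDK, the job's SQL flow can be re-created with Python's built-in sqlite3 module. This is a self-contained, in-memory sketch that mirrors the example's schema and the single ingested row; it does not touch the database VDK actually writes to:

```python
import sqlite3

# In-memory database standing in for the temporary SQLite database VDK creates.
conn = sqlite3.connect(":memory:")

# Mirror of 10_delete_table.sql and 20_create_table.sql.
conn.execute("DROP TABLE IF EXISTS rest_target_table")
conn.execute("CREATE TABLE rest_target_table (userId, id, title, completed)")

# Mirror of the row ingested by 30_rest_ingest.py (completed stored as 0/1).
conn.execute(
    "INSERT INTO rest_target_table VALUES (?, ?, ?, ?)",
    (1, 1, "delectus aut autem", 0),
)

rows = conn.execute("SELECT * FROM rest_target_table").fetchall()
print(rows)  # [(1, 1, 'delectus aut autem', 0)]
```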

What's next

You can find a list of all Versatile Data Kit examples here.
