SQLAlchemy: Add insert_bulk fast-path INSERT method for pandas
This method supports efficient batch inserts using CrateDB's bulk
operations endpoint.

https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
amotl committed May 11, 2023
1 parent 9a54d4a commit af9d8f5
Showing 9 changed files with 218 additions and 34 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
@@ -5,6 +5,9 @@ Changes for crate
Unreleased
==========

- SQLAlchemy: Added ``insert_bulk`` fast-path ``INSERT`` method for pandas, in
order to support efficient batch inserts using CrateDB's bulk operations endpoint.


2023/04/18 0.31.1
=================
1 change: 1 addition & 0 deletions docs/by-example/index.rst
@@ -48,6 +48,7 @@ its corresponding API interfaces, see also :ref:`sqlalchemy-support`.
sqlalchemy/working-with-types
sqlalchemy/advanced-querying
sqlalchemy/inspection-reflection
sqlalchemy/dataframe


.. _Python DB API: https://peps.python.org/pep-0249/
105 changes: 105 additions & 0 deletions docs/by-example/sqlalchemy/dataframe.rst
@@ -0,0 +1,105 @@
.. _sqlalchemy-pandas:
.. _sqlalchemy-dataframe:

================================
SQLAlchemy: DataFrame operations
================================

About
=====

This section of the documentation demonstrates support for efficient batch
``INSERT`` operations with `pandas`_, using the CrateDB SQLAlchemy dialect.


Introduction
============

The :ref:`pandas DataFrame <pandas:api.dataframe>` is a structure that contains
two-dimensional data and its corresponding labels. DataFrames are widely used
in data science, machine learning, scientific computing, and many other
data-intensive fields.

DataFrames are similar to SQL tables or the spreadsheets that you work with in
Excel or Calc. In many cases, DataFrames are faster, easier to use, and more
powerful than tables or spreadsheets because they are an integral part of the
`Python`_ and `NumPy`_ ecosystems.

The :ref:`pandas I/O subsystem <pandas:api.io>` for `relational databases`_
using `SQL`_ is based on `SQLAlchemy`_.


.. rubric:: Table of Contents

.. contents::
:local:


Efficient ``INSERT`` operations with pandas
===========================================

The package provides an ``insert_bulk`` function to use the
:meth:`pandas:pandas.DataFrame.to_sql` method most efficiently, based on the
`CrateDB bulk operations`_ endpoint. It effectively splits your insert
workload into multiple batches, using a defined chunk size.

>>> import sqlalchemy as sa
>>> from pandas._testing import makeTimeDataFrame
>>> from crate.client.sqlalchemy.support import insert_bulk
...
>>> # Define number of records, and chunk size.
>>> INSERT_RECORDS = 42
>>> CHUNK_SIZE = 8
...
>>> # Connect to CrateDB, and create a pandas DataFrame.
>>> df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
>>> engine = sa.create_engine(f"crate://{crate_host}")
...
>>> # Insert data in batches. 42 / 8 = 5.25, so six batches will be emitted.
>>> df.to_sql(
... name="test-testdrive",
... con=engine,
... if_exists="replace",
... index=False,
... chunksize=CHUNK_SIZE,
... method=insert_bulk,
... )
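
Under the hood, ``insert_bulk`` transmits each batch in a single request,
carrying the compiled ``INSERT`` statement once, together with a list of
parameter rows. A minimal sketch of the roughly equivalent raw DB API call,
assuming a CrateDB instance at ``localhost:4200``, and the column names
produced by ``makeTimeDataFrame``::

    from crate import client

    # Connect to CrateDB, and acquire a raw DB API cursor.
    connection = client.connect("http://localhost:4200")
    cursor = connection.cursor()

    # One bulk request per batch: the statement is transmitted once,
    # accompanied by one parameter row per record.
    cursor.execute(
        sql='INSERT INTO "test-testdrive" ("A", "B", "C", "D") VALUES (?, ?, ?, ?)',
        bulk_parameters=[
            [0.1, 0.2, 0.3, 0.4],
            [0.5, 0.6, 0.7, 0.8],
        ],
    )
    cursor.close()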

.. TIP::

    The optimal chunk size depends strongly on the shape of your data,
    specifically the width of each record, i.e. the number of columns and
    their individual sizes. You will need to determine a good chunk size by
    running corresponding experiments on your own; for that purpose, you can
    use the `insert_pandas.py`_ program as a blueprint, or start from the
    timing sketch below this tip.

    It is a good idea to start your explorations with a chunk size of 5000,
    and then see if performance improves when you increase or decrease that
    figure. Chunk sizes of 20000 may also be applicable, but make sure to
    take the limits of your HTTP infrastructure into consideration.

    In order to learn more about what wide- vs. long-form (tidy, stacked,
    narrow) data means in the context of `DataFrame computing`_, let us refer
    you to `a general introduction <wide-narrow-general_>`_, the corresponding
    section in the `Data Computing book <wide-narrow-data-computing_>`_, and a
    `pandas tutorial <wide-narrow-pandas-tutorial_>`_ about the same topic.


.. hidden: Disconnect from database

    >>> engine.dispose()

.. _CrateDB bulk operations: https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
.. _DataFrame computing: https://realpython.com/pandas-dataframe/
.. _insert_pandas.py: https://github.com/crate/crate-python/blob/master/examples/insert_pandas.py
.. _NumPy: https://en.wikipedia.org/wiki/NumPy
.. _pandas: https://en.wikipedia.org/wiki/Pandas_(software)
.. _pandas DataFrame: https://pandas.pydata.org/pandas-docs/stable/reference/frame.html
.. _Python: https://en.wikipedia.org/wiki/Python_(programming_language)
.. _relational databases: https://en.wikipedia.org/wiki/Relational_database
.. _SQL: https://en.wikipedia.org/wiki/SQL
.. _SQLAlchemy: https://aosabook.org/en/v2/sqlalchemy.html
.. _wide-narrow-general: https://en.wikipedia.org/wiki/Wide_and_narrow_data
.. _wide-narrow-data-computing: https://dtkaplan.github.io/DataComputingEbook/chap-wide-vs-narrow.html#chap:wide-vs-narrow
.. _wide-narrow-pandas-tutorial: https://anvil.works/blog/tidy-data
3 changes: 2 additions & 1 deletion docs/conf.py
@@ -12,7 +12,8 @@
intersphinx_mapping.update({
'py': ('https://docs.python.org/3/', None),
'sa': ('https://docs.sqlalchemy.org/en/14/', None),
'urllib3': ('https://urllib3.readthedocs.io/en/1.26.13/', None)
'urllib3': ('https://urllib3.readthedocs.io/en/1.26.13/', None),
'pandas': ('https://pandas.pydata.org/docs/', None),
})


36 changes: 4 additions & 32 deletions examples/insert_pandas.py
@@ -53,6 +53,8 @@
from colorlog.escape_codes import escape_codes
from pandas._testing import makeTimeDataFrame

from crate.client.sqlalchemy.support import insert_bulk

logger = logging.getLogger(__name__)

pkg_resources.require("sqlalchemy>=2.0")
@@ -84,7 +86,7 @@ def process(self, mode: str, num_records: int, bulk_size: int, insertmanyvalues_
logger.info(f"Connecting to {self.dburi}")
logger.info(f"Importing data with mode={mode}, bulk_size={bulk_size}, insertmanyvalues_page_size={insertmanyvalues_page_size}")

engine = self.get_engine(insertmanyvalues_page_size=insertmanyvalues_page_size)
engine = self.get_engine(echo=True, insertmanyvalues_page_size=insertmanyvalues_page_size)

# SQLAlchemy "Insert Many Values" mode. 40K records/s
# https://docs.sqlalchemy.org/en/20/core/connections.html#engine-insertmanyvalues
@@ -103,41 +105,11 @@
# CrateDB bulk transfer mode. 65K records/s
# https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
elif mode == "bulk":
df.to_sql(name=self.table_name, con=engine, if_exists="append", index=False, chunksize=bulk_size, method=self.insert_bulk)
df.to_sql(name=self.table_name, con=engine, if_exists="append", index=False, chunksize=bulk_size, method=insert_bulk)

else:
raise ValueError(f"Unknown mode: {mode}")

    @staticmethod
    def insert_bulk(pd_table, conn, keys, data_iter):
        """
        A fast insert method for pandas and Dask, using CrateDB's "bulk operations" endpoint.

        The idea is to break out of SQLAlchemy, compile the insert statement, and use the raw
        DBAPI connection client, in order to invoke a request using `bulk_parameters`::

            cursor.execute(sql=sql, bulk_parameters=data)

        - https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations

        The vanilla implementation, used by SQLAlchemy, is::

            data = [dict(zip(keys, row)) for row in data_iter]
            conn.execute(pd_table.table.insert(), data)
        """

        # Bulk
        sql = str(pd_table.table.insert().compile(bind=conn))
        data = list(data_iter)

        logger.info(f"Bulk SQL: {sql}")
        logger.info(f"Bulk records: {len(data)}")

        cursor = conn._dbapi_connection.cursor()
        cursor.execute(sql=sql, bulk_parameters=data)
        cursor.close()

    def show_table_stats(self):
        """
        Display number of records in table.
1 change: 1 addition & 0 deletions setup.py
@@ -70,6 +70,7 @@ def read(path):
'createcoverage>=1,<2',
'stopit>=1.1.2,<2',
'flake8>=4,<7',
'pandas>=2,<3',
'pytz',
# `test_http.py` needs `setuptools.ssl_support`
'setuptools<57',
62 changes: 62 additions & 0 deletions src/crate/client/sqlalchemy/support.py
@@ -0,0 +1,62 @@
# -*- coding: utf-8; -*-
#
# Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
# license agreements. See the NOTICE file distributed with this work for
# additional information regarding copyright ownership. Crate licenses
# this file to you under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. You may
# obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# However, if you have executed another commercial license agreement
# with Crate these terms will supersede the license and you may use the
# software solely pursuant to the terms of the relevant commercial agreement.
import logging


logger = logging.getLogger(__name__)


def insert_bulk(pd_table, conn, keys, data_iter):
    """
    Use CrateDB's "bulk operations" endpoint as a fast path for pandas' and Dask's `to_sql()` [1] method.

    The idea is to break out of SQLAlchemy, compile the insert statement, and use the raw
    DBAPI connection client, in order to invoke a request using `bulk_parameters` [2]::

        cursor.execute(sql=sql, bulk_parameters=data)

    The vanilla implementation, used by SQLAlchemy, is::

        data = [dict(zip(keys, row)) for row in data_iter]
        conn.execute(pd_table.table.insert(), data)

    Batch chunking will happen outside of this function, for example [3] demonstrates
    the relevant code in `pandas.io.sql`.

    [1] https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html
    [2] https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
    [3] https://github.com/pandas-dev/pandas/blob/v2.0.1/pandas/io/sql.py#L1011-L1027
    """

    # Compile SQL statement and materialize batch.
    sql = str(pd_table.table.insert().compile(bind=conn))
    data = list(data_iter)

    # For debugging and tracing the batches running through this method.
    # Because it's a performance-optimized code path, the log statements are not active by default.
    # logger.info(f"Bulk SQL: {sql}")
    # logger.info(f"Bulk records: {len(data)}")
    # logger.info(f"Bulk data: {data}")

    # Invoke bulk insert operation.
    cursor = conn._dbapi_connection.cursor()
    cursor.execute(sql=sql, bulk_parameters=data)
    cursor.close()
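
The docstring above also names Dask. Here is a usage sketch under stated
assumptions: the table name is illustrative, a CrateDB instance is reachable
at ``localhost:4200``, and Dask's ``to_sql()`` accepts a connection URI
instead of an engine object::

    import dask.dataframe as dd
    from pandas._testing import makeTimeDataFrame

    from crate.client.sqlalchemy.support import insert_bulk

    # Create a Dask DataFrame with a handful of partitions.
    df = makeTimeDataFrame(nper=100_000, freq="S")
    ddf = dd.from_pandas(df, npartitions=4)

    # Insert partitions through the bulk fast path;
    # `parallel=True` submits the partitions concurrently.
    ddf.to_sql(
        "test-testdrive",
        uri="crate://localhost:4200",
        index=False,
        if_exists="replace",
        chunksize=10_000,
        parallel=True,
        method=insert_bulk,
    )
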
40 changes: 39 additions & 1 deletion src/crate/client/sqlalchemy/tests/bulk_test.py
@@ -36,7 +36,7 @@


fake_cursor = MagicMock(name='fake_cursor')
FakeCursor = MagicMock(name='FakeCursor', spec=Cursor)
FakeCursor = MagicMock(name='FakeCursor', spec=Cursor, return_value=fake_cursor)
FakeCursor.return_value = fake_cursor


@@ -168,3 +168,41 @@ def test_bulk_save_modern(self):
            'Callisto', 37,
        )
        self.assertSequenceEqual(expected_bulk_args, bulk_args)

    @patch('crate.client.connection.Cursor', mock_cursor=FakeCursor)
    def test_bulk_save_pandas(self, mock_cursor):
        """
        Verify bulk INSERT with pandas.
        """
        import sqlalchemy as sa
        from pandas._testing import makeTimeDataFrame
        from crate.client.sqlalchemy.support import insert_bulk

        # 42 records / 8 chunksize = 5.25, which means 6 batches will be emitted.
        INSERT_RECORDS = 42
        CHUNK_SIZE = 8
        BATCH_COUNT = 6

        # Create a DataFrame to feed into the database.
        df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")

        dburi = "crate://localhost:4200"
        engine = sa.create_engine(dburi, echo=True)
        retval = df.to_sql(
            name="test-testdrive",
            con=engine,
            if_exists="replace",
            index=False,
            chunksize=CHUNK_SIZE,
            method=insert_bulk,
        )
        self.assertIsNone(retval)

        # Initializing the query incurs an overhead of two calls to the cursor object: one initial
        # connection from the DB-API driver, to inquire the database version, and another one from
        # SQLAlchemy, to inquire the table schema using `information_schema` and to eventually
        # issue the `CREATE TABLE ...` statement.
        cursor_call_count = mock_cursor.call_count - 2

        # Verify number of batches.
        self.assertEqual(cursor_call_count, BATCH_COUNT)
1 change: 1 addition & 0 deletions src/crate/client/tests.py
@@ -385,6 +385,7 @@ def test_suite():
'docs/by-example/sqlalchemy/working-with-types.rst',
'docs/by-example/sqlalchemy/advanced-querying.rst',
'docs/by-example/sqlalchemy/inspection-reflection.rst',
'docs/by-example/sqlalchemy/dataframe.rst',
module_relative=False,
setUp=setUpCrateLayerSqlAlchemy,
tearDown=tearDownDropEntitiesSqlAlchemy,
