diff --git a/CHANGES.txt b/CHANGES.txt index cdcedbfc..848b716e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -18,6 +18,8 @@ Unreleased - SQLAlchemy: Added ``insert_bulk`` fast-path ``INSERT`` method for pandas, in order to support efficient batch inserts using CrateDB's "bulk operations" endpoint. +- SQLAlchemy: Add documentation and software tests for usage with Dask + 2023/04/18 0.31.1 ================= diff --git a/docs/by-example/sqlalchemy/dataframe.rst b/docs/by-example/sqlalchemy/dataframe.rst index ffcadf86..a2be1f88 100644 --- a/docs/by-example/sqlalchemy/dataframe.rst +++ b/docs/by-example/sqlalchemy/dataframe.rst @@ -5,6 +5,12 @@ SQLAlchemy: DataFrame operations ================================ +.. rubric:: Table of Contents + +.. contents:: + :local: + + About ===== @@ -12,7 +18,7 @@ This section of the documentation demonstrates support for efficient batch/bulk ``INSERT`` operations with `pandas`_ and `Dask`_, using the CrateDB SQLAlchemy dialect. Efficient bulk operations are needed for typical `ETL`_ batch processing and -data streaming workloads, for example to move data in- and out of OLAP data +data streaming workloads, for example to move data in and out of OLAP data warehouses, as contrasted to interactive online transaction processing (OLTP) applications. The strategies of `batching`_ together series of records for improving performance are also referred to as `chunking`_. @@ -21,6 +27,8 @@ improving performance are also referred to as `chunking`_. Introduction ============ +pandas +------ The :ref:`pandas DataFrame ` is a structure that contains two-dimensional data and its corresponding labels. DataFrames are widely used in data science, machine learning, scientific computing, and many other @@ -34,11 +42,29 @@ powerful than tables or spreadsheets because they are an integral part of the The :ref:`pandas I/O subsystem ` for `relational databases`_ using `SQL`_ is based on `SQLAlchemy`_. 
+Dask +---- +`Dask`_ is a flexible library for parallel computing in Python, which scales +Python code from multi-core local machines to large distributed clusters in +the cloud. Dask provides a familiar user interface by mirroring the APIs of +other libraries in the PyData ecosystem, including `pandas`_, `scikit-learn`_, +and `NumPy`_. -.. rubric:: Table of Contents +A :doc:`dask:dataframe` is a large parallel DataFrame composed of many smaller +pandas DataFrames, split along the index. These pandas DataFrames may live on +disk for larger-than-memory computing on a single machine, or on many different +machines in a cluster. One Dask DataFrame operation triggers many operations on +the constituent pandas DataFrames. -.. contents:: - :local: + +Compatibility notes +=================== + +.. NOTE:: + + Please note that DataFrame support for pandas and Dask is only validated + with Python 3.8 and higher, and SQLAlchemy 1.4 and higher. We recommend + using the most recent versions of those libraries. Efficient ``INSERT`` operations with pandas @@ -118,6 +144,91 @@ workload across multiple batches, using a defined chunk size. tutorial `_ about the same topic. +Efficient ``INSERT`` operations with Dask +========================================= + +The same ``insert_bulk`` function presented in the previous section will also +be used in the context of `Dask`_, in order to make the +:func:`dask:dask.dataframe.to_sql` method more efficient, based on the +`CrateDB bulk operations`_ endpoint. + +The example below will partition your insert workload into equal-sized parts, and +schedule it to be executed on Dask cluster resources, using a defined number of +compute partitions. Each worker instance will then insert its partition's records +in a batched/chunked manner, using a defined chunk size, effectively using the +pandas implementation introduced in the previous section.
+ + >>> import dask.dataframe as dd + >>> from pandas._testing import makeTimeDataFrame + >>> from crate.client.sqlalchemy.support import insert_bulk + ... + >>> # Define the number of records, the number of computing partitions, + >>> # and the chunk size of each database insert operation. + >>> INSERT_RECORDS = 100 + >>> NPARTITIONS = 4 + >>> CHUNK_SIZE = 25 + ... + >>> # Create a Dask DataFrame. + >>> df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S") + >>> ddf = dd.from_pandas(df, npartitions=NPARTITIONS) + ... + >>> # Insert content of DataFrame using multiple workers on a + >>> # compute cluster, transferred using batches of records. + >>> ddf.to_sql( + ... name="test-testdrive", + ... uri=f"crate://{crate_host}", + ... if_exists="replace", + ... index=False, + ... chunksize=CHUNK_SIZE, + ... method=insert_bulk, + ... parallel=True, + ... ) + + +.. TIP:: + + You will observe that optimizing your workload will now also involve determining a + good value for the ``NPARTITIONS`` argument, based on the capacity and topology of + the available compute resources, and based on workload characteristics or policies + like peak- vs. balanced- vs. shared-usage. For example, on a machine or cluster fully + dedicated to the problem at hand, you may want to use all available processor cores, + while on a shared system, this strategy may not be appropriate. + + If you want to dedicate all available compute resources on your machine, you may want + to use the number of CPU cores as a value to the ``NPARTITIONS`` argument. You can find + out about the available CPU cores on your machine, for example by running the ``nproc`` + command in your terminal. + + Depending on the implementation and runtime behavior of the compute task, the optimal + number of worker processes, determined by the ``NPARTITIONS`` argument, also needs to be + figured out by running a few test iterations. For that purpose, you can use the + `insert_dask.py`_ program as a blueprint. 
+ + Adjusting this value in both directions is perfectly fine: If you observe that you are + overloading the machine, maybe because there are workloads scheduled other than the one + you are running, try to reduce the value. If fragments/steps of your implementation + involve waiting for network or disk I/O, you may want to increase the number of workers + beyond the number of available CPU cores, to increase utilization. On the other hand, + you should be wary of over-committing resources too much, as it may slow your + system down. + + Before getting more serious with Dask, you are welcome to read and watch the excellent + :doc:`dask:best-practices` and :ref:`dask:dataframe.performance` resources, in order to + learn about things to avoid, and beyond. For finding out if your compute workload + scheduling is healthy, you can, for example, use Dask's :doc:`dask:dashboard`. + +.. WARNING:: + + Because the settings assigned in the example above fit together well, the ``to_sql()`` + instruction will effectively run four insert operations, executed in parallel, and + scheduled optimally on the available cluster resources. + + However, if you do not choose those settings sensibly, you can easily misconfigure the resource + scheduling system, and overload the underlying hardware or operating system, virtualized + or not. This is why experimenting with different parameters, and a real dataset, is crucial. + + + .. hidden: Disconnect from database >>> engine.dispose() @@ -126,7 +237,10 @@ workload across multiple batches, using a defined chunk size. .. _batching: https://en.wikipedia.org/wiki/Batch_processing#Common_batch_processing_usage .. _chunking: https://en.wikipedia.org/wiki/Chunking_(computing) .. _CrateDB bulk operations: https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations +.. _Dask: https://en.wikipedia.org/wiki/Dask_(software) .. _DataFrame computing: https://realpython.com/pandas-dataframe/ +..
_ETL: https://en.wikipedia.org/wiki/Extract,_transform,_load +.. _insert_dask.py: https://github.com/crate/cratedb-examples/blob/main/by-language/python-sqlalchemy/insert_dask.py .. _insert_pandas.py: https://github.com/crate/cratedb-examples/blob/main/by-language/python-sqlalchemy/insert_pandas.py .. _leveling up to 200_000: https://acepor.github.io/2017/08/03/using-chunksize/ .. _NumPy: https://en.wikipedia.org/wiki/NumPy @@ -134,6 +248,8 @@ workload across multiple batches, using a defined chunk size. .. _pandas DataFrame: https://pandas.pydata.org/pandas-docs/stable/reference/frame.html .. _Python: https://en.wikipedia.org/wiki/Python_(programming_language) .. _relational databases: https://en.wikipedia.org/wiki/Relational_database +.. _scikit-learn: https://en.wikipedia.org/wiki/Scikit-learn +.. _SNAT port exhaustion: https://learn.microsoft.com/en-us/azure/load-balancer/troubleshoot-outbound-connection .. _SQL: https://en.wikipedia.org/wiki/SQL .. _SQLAlchemy: https://aosabook.org/en/v2/sqlalchemy.html .. 
_the chunksize should not be too small: https://acepor.github.io/2017/08/03/using-chunksize/ diff --git a/docs/conf.py b/docs/conf.py index 6cea16ec..59cc622f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -13,6 +13,7 @@ 'py': ('https://docs.python.org/3/', None), 'sa': ('https://docs.sqlalchemy.org/en/14/', None), 'urllib3': ('https://urllib3.readthedocs.io/en/1.26.13/', None), + 'dask': ('https://docs.dask.org/en/stable/', None), 'pandas': ('https://pandas.pydata.org/docs/', None), }) diff --git a/setup.py b/setup.py index 3055075a..5d4ee00b 100644 --- a/setup.py +++ b/setup.py @@ -68,6 +68,7 @@ def read(path): 'zope.testrunner>=5,<7', 'zc.customdoctests>=1.0.1,<2', 'createcoverage>=1,<2', + 'dask', 'stopit>=1.1.2,<2', 'flake8>=4,<7', 'pandas', diff --git a/src/crate/client/sqlalchemy/tests/bulk_test.py b/src/crate/client/sqlalchemy/tests/bulk_test.py index 48b3797c..4546d1a4 100644 --- a/src/crate/client/sqlalchemy/tests/bulk_test.py +++ b/src/crate/client/sqlalchemy/tests/bulk_test.py @@ -207,3 +207,50 @@ def test_bulk_save_pandas(self, mock_cursor): # Verify number of batches. self.assertEqual(effective_op_count, OPCOUNT) + + @skipIf(sys.version_info < (3, 8), "SQLAlchemy/Dask is not supported on Python <3.8") + @skipIf(SA_VERSION < SA_1_4, "SQLAlchemy 1.3 is not supported by pandas") + @patch('crate.client.connection.Cursor', mock_cursor=FakeCursor) + def test_bulk_save_dask(self, mock_cursor): + """ + Verify bulk INSERT with Dask. + """ + import dask.dataframe as dd + from pandas._testing import makeTimeDataFrame + from crate.client.sqlalchemy.support import insert_bulk + + # 42 records / 4 partitions means each partition has a size of 10.5 elements. + # Because the chunk size 8 is slightly smaller than 10, the partition will not + # fit into it, so two batches will be emitted to the database for each data + # partition. 4 partitions * 2 batches = 8 insert operations will be emitted. 
+ # Those settings are a perfect example of non-optimal settings, and have been + # made so on purpose, in order to demonstrate that using optimal settings + # is crucial. + INSERT_RECORDS = 42 + NPARTITIONS = 4 + CHUNK_SIZE = 8 + OPCOUNT = math.ceil(INSERT_RECORDS / NPARTITIONS / CHUNK_SIZE) * NPARTITIONS + + # Create a DataFrame to feed into the database. + df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S") + ddf = dd.from_pandas(df, npartitions=NPARTITIONS) + + dburi = "crate://localhost:4200" + retval = ddf.to_sql( + name="test-testdrive", + uri=dburi, + if_exists="replace", + index=False, + chunksize=CHUNK_SIZE, + method=insert_bulk, + parallel=True, + ) + self.assertIsNone(retval) + + # Each of the insert operations incurs another call to the cursor object. This is probably + # the initial connection from the DB-API driver, to inquire the database version. + # This compensation formula has been determined empirically / by educated guessing. + effective_op_count = (mock_cursor.call_count - 2 * NPARTITIONS) - 2 + + # Verify number of batches. + self.assertEqual(effective_op_count, OPCOUNT)