Add fast-path INSERT method insert_bulk for SQLAlchemy/pandas/Dask #553
Conversation
.. TIP::

    You will observe that the optimal chunk size highly depends on the shape of
    your data, specifically the width of each record, i.e. the number of columns
    and their individual sizes. You will need to determine a good chunk size by
    running corresponding experiments on your own behalf. For that purpose, you
    can use the `insert_pandas.py`_ program as a blueprint.

    It is a good idea to start your explorations with a chunk size of 5000, and
    then see if performance improves when you increase or decrease that figure.
    Chunk sizes of 20000 may also be applicable, but make sure to take the limits
    of your HTTP infrastructure into consideration.

    In order to learn more about what wide- vs. long-form (tidy, stacked, narrow)
    data means in the context of `DataFrame computing`_, let us refer you to `a
    general introduction <wide-narrow-general_>`_, the corresponding section in
    the `Data Computing book <wide-narrow-data-computing_>`_, and a `pandas
    tutorial <wide-narrow-pandas-tutorial_>`_ about the same topic.
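To make that experimentation concrete, here is a minimal sketch of the kind of invocation the tip refers to. The import path of insert_bulk, the connection string, and the synthetic data are assumptions for illustration; the chunk size of 5000 follows the recommendation above.

```python
import numpy as np
import pandas as pd
import sqlalchemy as sa

# Assumed import path of the fast-path helper introduced by this patch.
from crate.client.sqlalchemy.support import insert_bulk

# Placeholder connection string to a local CrateDB instance.
engine = sa.create_engine("crate://localhost:4200")

# Synthetic DataFrame: 75,000 records, five float columns wide.
df = pd.DataFrame(np.random.random((75_000, 5)), columns=list("abcde"))

# Start with a chunk size of 5000 and vary it to find the sweet spot
# for your data shape and HTTP infrastructure.
df.to_sql(
    name="testdrive",
    con=engine,
    if_exists="replace",
    index=False,
    chunksize=5_000,
    method=insert_bulk,
)
```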
On this section, about choosing the optimal batch size, @marijaselakovic had a good suggestion:
Looks good, I am just thinking maybe it makes sense to add that when choosing the “optimal” chunksize, one should consider, besides the shape and size of the data, also the memory of the machine. I find here that the chunk should be small enough to fit into memory, but large enough to minimize the overhead of data ingestion. Even though this is from the Dask documentation, it should apply to pandas too. The rest looks good!
Thank you!
@all: Please let me know if you can think of any other details I might have missed about educating users on this topic. Now is the right time to bring them in, ideally in the form of references to other valuable resources. Thank you already!
Hi @marijaselakovic. I've improved the documentation section with 480897a, according to your suggestions. I did not use the link to Dask yet, because this section currently only explains efficient batching for pandas, and I do not want to confuse users too much.
Another section about Dask should probably be added right away, in order to better outline the parallel computing aspects around it, and to support the upcoming tutorial appropriately.
Hi @marijaselakovic,
more details have been added to this patch with 3e7d1ea, and at [1] you can find the corresponding documentation section about inserting data efficiently with Dask. I still did not use the link you shared because, after getting into the details, I found it specifically educates users about chunking in the context of Dask arrays. While the very same principles apply, I discovered the Dask Best Practices resources [2], which also expand the scope to Dask DataFrames, which is what we are working with here, so I used that link instead.
With kind regards,
Andreas.
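To complement that, here is a rough sketch of what such a Dask-based ingest could look like. This is not the content of the documentation section itself; the insert_bulk import path, the connection URI, and the partition and chunk settings are illustrative assumptions.

```python
import dask.dataframe as dd
import numpy as np
import pandas as pd

# Assumed import path of the fast-path helper introduced by this patch.
from crate.client.sqlalchemy.support import insert_bulk

# Synthetic data, split into four Dask partitions.
pdf = pd.DataFrame(np.random.random((100_000, 5)), columns=list("abcde"))
ddf = dd.from_pandas(pdf, npartitions=4)

# Dask's `to_sql` accepts a connection URI string; each partition is
# written in chunks of `chunksize` records using the fast-path method,
# and `parallel=True` lets partitions be inserted concurrently.
ddf.to_sql(
    "testdrive_dask",
    uri="crate://localhost:4200",
    index=False,
    if_exists="replace",
    chunksize=10_000,
    parallel=True,
    method=insert_bulk,
)
```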
You might also consider adding the Dask DataFrames best practices [1].
Good idea. The link has been added.
Regarding the failing software tests: they should not stop you from reviewing this patch. I will exclude the relevant test matrix slots from being executed, and add a corresponding …
# 42 records / 4 partitions means each partition has a size of 10.5 elements.
# Because the chunk size 8 is slightly smaller than 10, the partition will not
# fit into it, so two batches will be emitted to the database for each data
# partition. 4 partitions * 2 batches = 8 insert operations will be emitted.
INSERT_RECORDS = 42
NPARTITIONS = 4
CHUNK_SIZE = 8
OPCOUNT = math.ceil(INSERT_RECORDS / NPARTITIONS / CHUNK_SIZE) * NPARTITIONS
This is a perfect example of non-optimal settings, and has been made so on purpose. We should probably add a corresponding inline comment here, in order not to confuse dear readers of our code.
Can you add the comment?
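One possible wording for such an inline comment, offered here only as a suggestion rather than the final text:

```python
# NOTE: These settings are intentionally non-optimal. The chunk size (8) is
# smaller than the partition size (42 / 4 = 10.5), so every partition is
# split into two batches, which is exactly what this test wants to exercise:
# 4 partitions * 2 batches = 8 insert operations.
```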
Nice! Added some suggestions.
# For debugging and tracing the batches running through this method.
# Because it's a performance-optimized code path, the log statements are not active by default.
# logger.info(f"Bulk SQL: {sql}")
# logger.info(f"Bulk records: {len(data)}")
# logger.info(f"Bulk data: {data}")
Can't this be simplified by changing the level to DEBUG, and surrounding it with something like if logger.level is DEBUG (don't know the exact syntax), to avoid any serialization?
I tried with 4d038ec105fc, but omitted logging the data payload.
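For reference, a minimal sketch of the guard suggested above, using the standard logging.Logger.isEnabledFor check; the helper name and its arguments are illustrative, not the actual code of the patch.

```python
import logging

logger = logging.getLogger(__name__)


def log_bulk_details(sql, data):
    # The guard skips the f-string formatting entirely unless DEBUG logging
    # is enabled, so the performance-optimized code path stays unaffected.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug(f"Bulk SQL:     {sql}")
        logger.debug(f"Bulk records: {len(data)}")
```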
Hi. Thanks for all the excellent review comments.
Initially, I rushed a bit to bring this in in time for the tutorial, but I missed the deadline. Now that we have a little more headroom, I think this patch should only be the foundation, and those aspects should be addressed by corresponding followup patches.
I hope you agree with that strategy. With kind regards. P.S.: Added as backlog items at crate/sqlalchemy-cratedb#74.
Great work @amotl
The documentation on Dask and Pandas fast insert looks good!
running corresponding experiments on your own behalf. For that purpose, you
can use the `insert_pandas.py`_ program as a blueprint.
and their individual sizes, which will in the end determine the total size of
each batch/chunk.
Can we also add that the chunk should be small enough to fit into memory, but large enough to minimize the overhead of data ingestion?
Thank you. It is there, a few more lines below.
...
>>> # Create a pandas DataFrame, and connect to CrateDB.
>>> df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
>>> engine = sa.create_engine(f"crate://{crate_host}")
Can we add here the full SQLAlchemy URI: crate://user:password@crate_host:port?
Thanks for your suggestion. It is technically not possible, because this reStructuredText file is not only documentation, but also part of the test suite, being exercised with Python's doctests in order to make sure the code examples don't break. Apologies.
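For readers who want the general shape of a fully qualified connection string anyway, here is a sketch with placeholder credentials, assuming CrateDB's default HTTP port 4200; it is not part of the doctested file.

```python
import sqlalchemy as sa

# Placeholder credentials and hostname; adjust to your environment.
engine = sa.create_engine("crate://user:password@crate_host:4200")
```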
@@ -68,6 +68,7 @@ def read(path):
    'zope.testrunner>=5,<7',
    'zc.customdoctests>=1.0.1,<2',
    'createcoverage>=1,<2',
    'dask',
I am not sure, but shouldn't we have dask[complete] here, to install all Dask dependencies (e.g., dask.dataframe)?
This list of dependencies is only used as requirements for running the test suite. We do not automatically install pandas or dask together with the library, as runtime dependencies. As such, I think it is fine to use a minimal set of dependencies, which is enough to satisfy the requirements of the test suite.
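Should the test requirements ever need to pull in the DataFrame-related pieces explicitly, Dask's documented extras offer one option. A sketch of how such an entry might look, mirroring the structure of the diff above; the surrounding entries are copied from it and the choice of extra is hypothetical.

```python
# Hypothetical variant of the test requirements, using the "dataframe"
# extra so that pandas and dask.dataframe are installed explicitly.
test_requires = [
    'zope.testrunner>=5,<7',
    'zc.customdoctests>=1.0.1,<2',
    'createcoverage>=1,<2',
    'dask[dataframe]',
]
```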
This method supports efficient batch inserts using CrateDB's bulk operations endpoint. https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations Co-authored-by: hlcianfagna <110453267+hlcianfagna@users.noreply.github.com>
Specifically, outline _two_ concrete considerations for determining the optimal chunk size, and improve wording.
Hi again,
in the same spirit as crate/crate-pdo#143, this patch unlocks CrateDB bulk operations for efficient batch inserts using pandas and Dask. It will accompany a followup to the Data Processing and Analytics with Dask and CrateDB: A Step-by-Step Tutorial.
The corresponding documentation section can be inspected at Preview: SQLAlchemy: DataFrame operations.
With kind regards,
Andreas.
/cc @marijaselakovic, @hammerhead, @hlcianfagna, @proddata, @WalBeh