Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix perf-test input data and refactor two tests #4301

Merged
merged 6 commits into from
Feb 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion dbms/programs/client/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,11 @@ class Client : public Poco::Util::Application

try
{
if (!processSingleQuery(str, ast) && !ignore_error)
auto ast_to_process = ast;
if (insert && insert->data)
ast_to_process = nullptr;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it mean, that the client will send data in source format uncompressed?

Copy link
Member Author

@alesapin alesapin Feb 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code contained an error. If client were used in multiquery mode, than all queries after insert

insert into tbl values (1, 2, 3);
select * from system.numbers limit 10;
...etc...

were placed into insert->data field. But it didn't lead to problems, because server had ignored data filed of insert queries. Now server sometimes parse insert->data, so we have to process multiquery insert queries correctly on client side.
I don't know how this is connected with compression.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Queries were always sent to server without embedded data.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When data is sent in blocks (not embedded in query), it is compressed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Queries were always sent to server without embedded data.
This is true just for insert into ... values queries of clickhouse-client https://github.com/yandex/ClickHouse/blob/master/dbms/programs/client/Client.cpp#L883-L885. If there were insert-select query, more precisely buggy insert .. values (1, 2, 3); select * from ... query parsed with multiquery option, it was sent with embedded query->data, but data was always ignored on server side: https://github.com/yandex/ClickHouse/blob/master./dbms/src/Interpreters/executeQuery.cpp#L168-L169.

Since I add logic for embedded data processing on server side it became a problem, so I split parsing of insert queries (parse each query separately, not all of them together) if they are insert queries and contain embedded data.

Nothing have changed, native client (clickhouse-client) will send insert queries with not embedded data in compressed blocks. I still do not understand how specified line connected with compression of data. It just query parsing and there was bug for multiqueries.


if (!processSingleQuery(str, ast_to_process) && !ignore_error)
return false;
}
catch (...)
Expand Down
44 changes: 38 additions & 6 deletions dbms/programs/performance-test/PerformanceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,32 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}

namespace
{
void waitQuery(Connection & connection)
{
bool finished = false;
while (true)
{
if (!connection.poll(1000000))
continue;

Connection::Packet packet = connection.receivePacket();
switch (packet.type)
{
case Protocol::Server::EndOfStream:
finished = true;
break;
case Protocol::Server::Exception:
throw *packet.exception;
}

if (finished)
break;
}
}
}

namespace fs = boost::filesystem;

PerformanceTest::PerformanceTest(
Expand Down Expand Up @@ -135,14 +161,18 @@ void PerformanceTest::prepare() const
{
for (const auto & query : test_info.create_queries)
{
LOG_INFO(log, "Executing create query '" << query << "'");
connection.sendQuery(query);
LOG_INFO(log, "Executing create query \"" << query << '\"');
connection.sendQuery(query, "", QueryProcessingStage::Complete, &test_info.settings, nullptr, false);
waitQuery(connection);
LOG_INFO(log, "Query finished");
}

for (const auto & query : test_info.fill_queries)
{
LOG_INFO(log, "Executing fill query '" << query << "'");
connection.sendQuery(query);
LOG_INFO(log, "Executing fill query \"" << query << '\"');
connection.sendQuery(query, "", QueryProcessingStage::Complete, &test_info.settings, nullptr, false);
waitQuery(connection);
LOG_INFO(log, "Query finished");
}

}
Expand All @@ -151,8 +181,10 @@ void PerformanceTest::finish() const
{
for (const auto & query : test_info.drop_queries)
{
LOG_INFO(log, "Executing drop query '" << query << "'");
connection.sendQuery(query);
LOG_INFO(log, "Executing drop query \"" << query << '\"');
connection.sendQuery(query, "", QueryProcessingStage::Complete, &test_info.settings, nullptr, false);
waitQuery(connection);
LOG_INFO(log, "Query finished");
}
}

Expand Down
8 changes: 5 additions & 3 deletions dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace ErrorCodes


InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
const ASTPtr & ast, ReadBuffer & input_buffer_tail_part, const BlockIO & streams, Context & context)
const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, const Context & context)
{
const ASTInsertQuery * ast_insert_query = dynamic_cast<const ASTInsertQuery *>(ast.get());

Expand All @@ -36,15 +36,17 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
ConcatReadBuffer::ReadBuffers buffers;
if (ast_insert_query->data)
buffers.push_back(input_buffer_ast_part.get());
buffers.push_back(&input_buffer_tail_part);

if (input_buffer_tail_part)
buffers.push_back(input_buffer_tail_part);

/** NOTE Must not read from 'input_buffer_tail_part' before read all between 'ast_insert_query.data' and 'ast_insert_query.end'.
* - because 'query.data' could refer to memory piece, used as buffer for 'input_buffer_tail_part'.
*/

input_buffer_contacenated = std::make_unique<ConcatReadBuffer>(buffers);

res_stream = context.getInputFormat(format, *input_buffer_contacenated, streams.out->getHeader(), context.getSettings().max_insert_block_size);
res_stream = context.getInputFormat(format, *input_buffer_contacenated, header, context.getSettings().max_insert_block_size);

auto columns_description = ColumnsDescription::loadFromContext(context, ast_insert_query->database, ast_insert_query->table);
if (columns_description && !columns_description->defaults.empty())
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/InputStreamFromASTInsertQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class Context;
class InputStreamFromASTInsertQuery : public IBlockInputStream
{
public:
InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer & input_buffer_tail_part, const BlockIO & streams, Context & context);
InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, const Context & context);

Block readImpl() override { return res_stream->read(); }
void readPrefixImpl() override { return res_stream->readPrefix(); }
Expand Down
15 changes: 13 additions & 2 deletions dbms/src/Interpreters/InterpreterInsertQuery.cpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>

#include <Common/typeid_cast.h>

#include <DataStreams/AddingDefaultBlockOutputStream.h>
#include <DataStreams/CountingBlockOutputStream.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/CountingBlockOutputStream.h>
#include <DataStreams/NullAndDoCopyBlockInputStream.h>
#include <DataStreams/PushingToViewsBlockOutputStream.h>
#include <DataStreams/SquashingBlockOutputStream.h>
#include <DataStreams/InputStreamFromASTInsertQuery.h>
#include <DataStreams/copyData.h>

#include <Parsers/ASTInsertQuery.h>
Expand Down Expand Up @@ -106,11 +110,12 @@ BlockIO InterpreterInsertQuery::execute()
out = std::make_shared<SquashingBlockOutputStream>(
out, table->getSampleBlock(), context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes);
}
auto query_sample_block = getSampleBlock(query, table);

/// Actually we don't know structure of input blocks from query/table,
/// because some clients break insertion protocol (columns != header)
out = std::make_shared<AddingDefaultBlockOutputStream>(
out, getSampleBlock(query, table), table->getSampleBlock(), table->getColumns().defaults, context);
out, query_sample_block, table->getSampleBlock(), table->getColumns().defaults, context);

auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out);
out_wrapper->setProcessListElement(context.getProcessListElement());
Expand Down Expand Up @@ -140,6 +145,12 @@ BlockIO InterpreterInsertQuery::execute()
throw Exception("Cannot insert column " + name_type.name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN);
}
}
else if (query.data && !query.has_tail) /// can execute without additional data
{
res.in = std::make_shared<InputStreamFromASTInsertQuery>(query_ptr, nullptr, query_sample_block, context);
res.in = std::make_shared<NullAndDoCopyBlockInputStream>(res.in, res.out);
res.out = nullptr;
}

return res;
}
Expand Down
14 changes: 9 additions & 5 deletions dbms/src/Interpreters/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
const char * end,
Context & context,
bool internal,
QueryProcessingStage::Enum stage)
QueryProcessingStage::Enum stage,
bool has_query_tail)
{
time_t current_time = time(nullptr);

Expand All @@ -164,9 +165,12 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// TODO Parser should fail early when max_query_size limit is reached.
ast = parseQuery(parser, begin, end, "", max_query_size);

const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(ast.get());
auto * insert_query = dynamic_cast<ASTInsertQuery *>(ast.get());
if (insert_query && insert_query->data)
{
query_end = insert_query->data;
insert_query->has_tail = has_query_tail;
}
else
query_end = end;
}
Expand Down Expand Up @@ -434,7 +438,7 @@ BlockIO executeQuery(
QueryProcessingStage::Enum stage)
{
BlockIO streams;
std::tie(std::ignore, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context, internal, stage);
std::tie(std::ignore, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context, internal, stage, false);
return streams;
}

Expand Down Expand Up @@ -479,13 +483,13 @@ void executeQuery(
ASTPtr ast;
BlockIO streams;

std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete);
std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, !istr.eof());

try
{
if (streams.out)
{
InputStreamFromASTInsertQuery in(ast, istr, streams, context);
InputStreamFromASTInsertQuery in(ast, &istr, streams.out->getHeader(), context);
copyData(in, *streams.out);
}

Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Parsers/ASTInsertQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ class ASTInsertQuery : public IAST
const char * data = nullptr;
const char * end = nullptr;

/// Query has additional data, which will be sent later
bool has_tail = false;

/** Get the text that identifies this element. */
String getID(char delim) const override { return "InsertQuery" + (delim + database) + delim + table; }

Expand Down
Loading