Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix wrong return value of timestamp column if the timestamp value is 1970-01-01 and the timezone offset is negative #1577

Merged
merged 5 commits into from
Mar 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions dbms/src/Common/MyTime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,27 @@ String MyDateTime::toString(int fsp) const
return result;
}

bool isZeroDate(UInt64 time) { return time == 0; }
inline bool isZeroDate(UInt64 time) { return time == 0; }

inline bool supportedByDateLUT(const MyDateTime & my_time) { return my_time.year >= 1970; }

/// DateLUT only support time from year 1970, in some corner cases, the input date may be
/// 1969-12-31, need extra logical to handle it
inline time_t getEpochSecond(const MyDateTime & my_time, const DateLUTImpl & time_zone)
{
if likely (supportedByDateLUT(my_time))
return time_zone.makeDateTime(my_time.year, my_time.month, my_time.day, my_time.hour, my_time.minute, my_time.second);
if likely (my_time.year == 1969 && my_time.month == 12 && my_time.day == 31)
{
/// - 3600 * 24 + my_time.hour * 3600 + my_time.minute * 60 + my_time.second is UTC based, need to adjust
/// the epoch according to the input time_zone
return -3600 * 24 + my_time.hour * 3600 + my_time.minute * 60 + my_time.second - time_zone.getOffsetAtStartEpoch();
}
else
{
throw Exception("Unsupported timestamp value , TiFlash only support timestamp after 1970-01-01 00:00:00 UTC)");
}
}

void convertTimeZone(UInt64 from_time, UInt64 & to_time, const DateLUTImpl & time_zone_from, const DateLUTImpl & time_zone_to)
{
Expand All @@ -793,8 +813,7 @@ void convertTimeZone(UInt64 from_time, UInt64 & to_time, const DateLUTImpl & tim
return;
}
MyDateTime from_my_time(from_time);
time_t epoch = time_zone_from.makeDateTime(
from_my_time.year, from_my_time.month, from_my_time.day, from_my_time.hour, from_my_time.minute, from_my_time.second);
time_t epoch = getEpochSecond(from_my_time, time_zone_from);
if (unlikely(epoch + time_zone_to.getOffsetAtStartEpoch() + SECONDS_PER_DAY < 0))
throw Exception("Unsupported timestamp value , TiFlash only support timestamp after 1970-01-01 00:00:00 UTC)");
MyDateTime to_my_time(time_zone_to.toYear(epoch), time_zone_to.toMonth(epoch), time_zone_to.toDayOfMonth(epoch),
Expand All @@ -810,8 +829,7 @@ void convertTimeZoneByOffset(UInt64 from_time, UInt64 & to_time, Int64 offset, c
return;
}
MyDateTime from_my_time(from_time);
time_t epoch = time_zone.makeDateTime(
from_my_time.year, from_my_time.month, from_my_time.day, from_my_time.hour, from_my_time.minute, from_my_time.second);
time_t epoch = getEpochSecond(from_my_time, time_zone);
epoch += offset;
if (unlikely(epoch + SECONDS_PER_DAY < 0))
throw Exception("Unsupported timestamp value , TiFlash only support timestamp after 1970-01-01 00:00:00 UTC)");
Expand Down
14 changes: 8 additions & 6 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,9 @@ ::grpc::Status FlashService::EstablishMPPConnection(::grpc::ServerContext * grpc
return grpc::Status::OK;
}

::grpc::Status FlashService::CancelMPPTask(::grpc::ServerContext* grpc_context, const ::mpp::CancelTaskRequest* request, ::mpp::CancelTaskResponse* response) {
::grpc::Status FlashService::CancelMPPTask(
::grpc::ServerContext * grpc_context, const ::mpp::CancelTaskRequest * request, ::mpp::CancelTaskResponse * response)
{
// CancelMPPTask cancels the query of the task.
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": cancel mpp task request: " << request->DebugString());

Expand All @@ -217,10 +219,10 @@ ::grpc::Status FlashService::CancelMPPTask(::grpc::ServerContext* grpc_context,
GET_METRIC(metrics, tiflash_coprocessor_handling_request_count, type_cancel_mpp_task).Increment();
Stopwatch watch;
SCOPE_EXIT({
GET_METRIC(metrics, tiflash_coprocessor_handling_request_count, type_cancel_mpp_task).Decrement();
GET_METRIC(metrics, tiflash_coprocessor_request_duration_seconds, type_cancel_mpp_task).Observe(watch.elapsedSeconds());
GET_METRIC(metrics, tiflash_coprocessor_response_bytes).Increment(response->ByteSizeLong());
});
GET_METRIC(metrics, tiflash_coprocessor_handling_request_count, type_cancel_mpp_task).Decrement();
GET_METRIC(metrics, tiflash_coprocessor_request_duration_seconds, type_cancel_mpp_task).Observe(watch.elapsedSeconds());
GET_METRIC(metrics, tiflash_coprocessor_response_bytes).Increment(response->ByteSizeLong());
});

auto [context, status] = createDBContext(grpc_context);
auto err = new mpp::Error();
Expand All @@ -232,7 +234,7 @@ ::grpc::Status FlashService::CancelMPPTask(::grpc::ServerContext* grpc_context,
}
auto & tmt_context = context.getTMTContext();
auto task_manager = tmt_context.getMPPTaskManager();
task_manager->cancelMPPQuery(request->meta().start_ts());
task_manager->cancelMPPQuery(request->meta().start_ts(), "Receive cancel request from TiDB");
return grpc::Status::OK;
}

Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Mpp/MPPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ bool MPPTask::isTaskHanging()
return false;
}

void MPPTask::cancel()
void MPPTask::cancel(const String & reason)
{
auto current_status = status.load();
if (current_status == FINISHED || current_status == CANCELLED)
Expand All @@ -314,7 +314,7 @@ void MPPTask::cancel()
/// Here we use `closeAllTunnel` because currently, `cancel` is a query level cancel, which
/// means if this mpp task is cancelled, all the mpp tasks belonging to the same query are
/// cancelled at the same time, so there is no guarantee that the tunnel can be connected.
closeAllTunnel("MPP Task canceled because it seems hangs");
closeAllTunnel(reason);
LOG_WARNING(log, "Finish cancel task: " + id.toString());
}

Expand Down Expand Up @@ -404,7 +404,7 @@ MPPTaskManager::MPPTaskManager(BackgroundProcessingPool & background_pool_)
if (has_hanging_task)
{
has_hanging_query = true;
this->cancelMPPQuery(query_id);
this->cancelMPPQuery(query_id, "MPP Task canceled because it seems hangs");
}
}
}
Expand Down
14 changes: 11 additions & 3 deletions dbms/src/Flash/Mpp/MPPHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ struct MPPTunnel
void close(const String & reason)
{
std::unique_lock<std::mutex> lk(mu);
if (finished)
return;
if (connected)
{
mpp::MPPDataPacket data;
Expand Down Expand Up @@ -294,7 +296,7 @@ struct MPPTask : std::enable_shared_from_this<MPPTask>, private boost::noncopyab

bool isTaskHanging();

void cancel();
void cancel(const String & reason);

/// Similar to `writeErrToAllTunnel`, but it just try to write the error message to tunnel
/// without waiting the tunnel to be connected
Expand Down Expand Up @@ -501,7 +503,7 @@ class MPPTaskManager : private boost::noncopyable
return ret ? it->second : nullptr;
}

void cancelMPPQuery(UInt64 query_id)
void cancelMPPQuery(UInt64 query_id, const String & reason)
{
MPPQueryMap::iterator it;
{
Expand All @@ -516,13 +518,19 @@ class MPPTaskManager : private boost::noncopyable
LOG_WARNING(log, "Begin cancel query: " + std::to_string(query_id));
}
for (auto & task_id : it->second.task_map)
task_id.second->cancel();
task_id.second->cancel(reason);
MPPQueryTaskSet canceled_task_set;
{
std::lock_guard<std::mutex> lock(mu);
/// just to double check the query still exists
it = mpp_query_map.find(query_id);
if (it != mpp_query_map.end())
{
/// hold the canceled task set, so the mpp task will not be deconstruct when holding the
/// `mu` of MPPTaskManager, otherwise it might cause deadlock
canceled_task_set = it->second;
mpp_query_map.erase(it);
}
}
LOG_WARNING(log, "Finish cancel query: " + std::to_string(query_id));
}
Expand Down
22 changes: 22 additions & 0 deletions tests/delta-merge-test/tidb_query/time_zone.test
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@
│ 0000-00-00 │ 0000-00-00 00:00:00.00000 │ 0000-00-00 00:00:00 │
└────────────┴───────────────────────────┴─────────────────────┘

# test default encode with negative timezone offset
=> DBGInvoke dag('select * from default.test',4,'encode_type:default,tz_offset:-28800')
┌──────col_1─┬─────────────────────col_2─┬───────────────col_3─┐
│ 2019-06-10 │ 2019-06-10 09:00:00.00000 │ 2019-06-10 09:00:00 │
│ 2019-06-11 │ 2019-06-11 07:00:00.00000 │ 2019-06-11 09:00:00 │
│ 2019-06-11 │ 2019-06-11 08:00:00.00000 │ 2019-06-11 09:00:00 │
│ 2019-06-12 │ 2019-06-11 08:00:00.00000 │ 2019-06-11 09:00:00 │
│ 1970-01-01 │ 1970-01-01 00:00:01.00000 │ 1970-01-01 00:00:01 │
│ 0000-00-00 │ 0000-00-00 00:00:00.00000 │ 0000-00-00 00:00:00 │
└────────────┴───────────────────────────┴─────────────────────┘

# test chunk encode
=> DBGInvoke dag('select * from default.test',4,'encode_type:chunk,tz_name:America/Chicago')
┌──────col_1─┬─────────────────────col_2─┬───────────────col_3─┐
Expand All @@ -71,6 +82,17 @@
│ 0000-00-00 │ 0000-00-00 00:00:00.00000 │ 0000-00-00 00:00:00 │
└────────────┴───────────────────────────┴─────────────────────┘

# test default encode with negative timezone offset
=> DBGInvoke dag('select * from default.test',4,'encode_type:default,tz_name:America/Chicago')
┌──────col_1─┬─────────────────────col_2─┬───────────────col_3─┐
│ 2019-06-10 │ 2019-06-10 09:00:00.00000 │ 2019-06-10 09:00:00 │
│ 2019-06-11 │ 2019-06-11 07:00:00.00000 │ 2019-06-11 09:00:00 │
│ 2019-06-11 │ 2019-06-11 08:00:00.00000 │ 2019-06-11 09:00:00 │
│ 2019-06-12 │ 2019-06-11 08:00:00.00000 │ 2019-06-11 09:00:00 │
│ 1970-01-01 │ 1970-01-01 00:00:01.00000 │ 1970-01-01 00:00:01 │
│ 0000-00-00 │ 0000-00-00 00:00:00.00000 │ 0000-00-00 00:00:00 │
└────────────┴───────────────────────────┴─────────────────────┘

=> DBGInvoke dag('select * from default.test where col_2 > col_3')

=> DBGInvoke dag('select * from default.test where col_2 > col_3',4,'encode_type:default,tz_offset:28800')
Expand Down