
Commit be6ccc9
Fix wrong return value of timestamp column if the timestamp value is `1970-01-01` and the timezone offset is negative (#1577)

* fix timestamp value error in default encode type

* add unit test

* fix CI error

windtalker authored Mar 20, 2021
1 parent ae4a40c commit be6ccc9
Showing 5 changed files with 67 additions and 17 deletions.
28 changes: 23 additions & 5 deletions dbms/src/Common/MyTime.cpp
@@ -783,7 +783,27 @@ String MyDateTime::toString(int fsp) const
return result;
}

- bool isZeroDate(UInt64 time) { return time == 0; }
+ inline bool isZeroDate(UInt64 time) { return time == 0; }
+
+ inline bool supportedByDateLUT(const MyDateTime & my_time) { return my_time.year >= 1970; }
+
+ /// DateLUT only support time from year 1970, in some corner cases, the input date may be
+ /// 1969-12-31, need extra logical to handle it
+ inline time_t getEpochSecond(const MyDateTime & my_time, const DateLUTImpl & time_zone)
+ {
+     if likely (supportedByDateLUT(my_time))
+         return time_zone.makeDateTime(my_time.year, my_time.month, my_time.day, my_time.hour, my_time.minute, my_time.second);
+     if likely (my_time.year == 1969 && my_time.month == 12 && my_time.day == 31)
+     {
+         /// - 3600 * 24 + my_time.hour * 3600 + my_time.minute * 60 + my_time.second is UTC based, need to adjust
+         /// the epoch according to the input time_zone
+         return -3600 * 24 + my_time.hour * 3600 + my_time.minute * 60 + my_time.second - time_zone.getOffsetAtStartEpoch();
+     }
+     else
+     {
+         throw Exception("Unsupported timestamp value , TiFlash only support timestamp after 1970-01-01 00:00:00 UTC)");
+     }
+ }

void convertTimeZone(UInt64 from_time, UInt64 & to_time, const DateLUTImpl & time_zone_from, const DateLUTImpl & time_zone_to)
{
@@ -793,8 +813,7 @@ void convertTimeZone(UInt64 from_time, UInt64 & to_time, const DateLUTImpl & tim
return;
}
MyDateTime from_my_time(from_time);
- time_t epoch = time_zone_from.makeDateTime(
-     from_my_time.year, from_my_time.month, from_my_time.day, from_my_time.hour, from_my_time.minute, from_my_time.second);
+ time_t epoch = getEpochSecond(from_my_time, time_zone_from);
if (unlikely(epoch + time_zone_to.getOffsetAtStartEpoch() + SECONDS_PER_DAY < 0))
throw Exception("Unsupported timestamp value , TiFlash only support timestamp after 1970-01-01 00:00:00 UTC)");
MyDateTime to_my_time(time_zone_to.toYear(epoch), time_zone_to.toMonth(epoch), time_zone_to.toDayOfMonth(epoch),
@@ -810,8 +829,7 @@ void convertTimeZoneByOffset(UInt64 from_time, UInt64 & to_time, Int64 offset, c
return;
}
MyDateTime from_my_time(from_time);
- time_t epoch = time_zone.makeDateTime(
-     from_my_time.year, from_my_time.month, from_my_time.day, from_my_time.hour, from_my_time.minute, from_my_time.second);
+ time_t epoch = getEpochSecond(from_my_time, time_zone);
epoch += offset;
if (unlikely(epoch + SECONDS_PER_DAY < 0))
throw Exception("Unsupported timestamp value , TiFlash only support timestamp after 1970-01-01 00:00:00 UTC)");
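The corner case that getEpochSecond handles is easiest to see with concrete numbers: 1970-01-01 00:00:01 UTC is 1969-12-31 16:00:01 in a UTC-8 zone, a date DateLUT cannot represent, so the pre-1970 branch computes the epoch second directly. The sketch below is not part of the commit; it re-derives only that branch, assuming a fixed zone offset instead of the real DateLUTImpl::getOffsetAtStartEpoch().

```cpp
#include <cassert>
#include <cstdint>

// Illustrative re-derivation of the 1969-12-31 branch of getEpochSecond,
// assuming a fixed-offset zone rather than a DateLUTImpl.
int64_t epochForDec31_1969(int hour, int minute, int second, int64_t zone_offset_seconds)
{
    // Wall-clock seconds measured from 1970-01-01 00:00:00: one day (86400 s) before
    // the epoch plus the time of day; this is the "UTC based" value in the comment above.
    int64_t utc_based = -3600 * 24 + hour * 3600 + minute * 60 + second;
    // Subtracting the zone offset turns the local wall-clock reading into a true epoch second.
    return utc_based - zone_offset_seconds;
}

int main()
{
    // 1969-12-31 16:00:01 at UTC-8 (offset -28800 s) is 1970-01-01 00:00:01 UTC, epoch second 1.
    assert(epochForDec31_1969(16, 0, 1, -28800) == 1);
    return 0;
}
```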
14 changes: 8 additions & 6 deletions dbms/src/Flash/FlashService.cpp
@@ -205,7 +205,9 @@ ::grpc::Status FlashService::EstablishMPPConnection(::grpc::ServerContext * grpc
return grpc::Status::OK;
}

- ::grpc::Status FlashService::CancelMPPTask(::grpc::ServerContext* grpc_context, const ::mpp::CancelTaskRequest* request, ::mpp::CancelTaskResponse* response) {
+ ::grpc::Status FlashService::CancelMPPTask(
+     ::grpc::ServerContext * grpc_context, const ::mpp::CancelTaskRequest * request, ::mpp::CancelTaskResponse * response)
+ {
// CancelMPPTask cancels the query of the task.
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": cancel mpp task request: " << request->DebugString());

@@ -217,10 +219,10 @@ ::grpc::Status FlashService::CancelMPPTask(::grpc::ServerContext* grpc_context,
GET_METRIC(metrics, tiflash_coprocessor_handling_request_count, type_cancel_mpp_task).Increment();
Stopwatch watch;
SCOPE_EXIT({
GET_METRIC(metrics, tiflash_coprocessor_handling_request_count, type_cancel_mpp_task).Decrement();
GET_METRIC(metrics, tiflash_coprocessor_request_duration_seconds, type_cancel_mpp_task).Observe(watch.elapsedSeconds());
GET_METRIC(metrics, tiflash_coprocessor_response_bytes).Increment(response->ByteSizeLong());
});

auto [context, status] = createDBContext(grpc_context);
auto err = new mpp::Error();
@@ -232,7 +234,7 @@ ::grpc::Status FlashService::CancelMPPTask(::grpc::ServerContext* grpc_context,
}
auto & tmt_context = context.getTMTContext();
auto task_manager = tmt_context.getMPPTaskManager();
- task_manager->cancelMPPQuery(request->meta().start_ts());
+ task_manager->cancelMPPQuery(request->meta().start_ts(), "Receive cancel request from TiDB");
return grpc::Status::OK;
}

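CancelMPPTask wraps its metric bookkeeping in SCOPE_EXIT so the in-flight gauge is decremented and the request duration is observed on every exit path, including early returns. Below is a minimal, self-contained stand-in for that pattern, using a hand-rolled guard and a plain counter rather than the real SCOPE_EXIT macro and GET_METRIC counters; it is an illustration, not TiFlash code.

```cpp
#include <chrono>
#include <functional>
#include <iostream>

// Minimal scope guard: runs a callback when the enclosing scope ends.
class ScopeGuard
{
    std::function<void()> fn;

public:
    explicit ScopeGuard(std::function<void()> f) : fn(std::move(f)) {}
    ~ScopeGuard() { fn(); }
};

int handling_requests = 0; // illustrative counter, not a real TiFlash metric

void handleCancelRequest()
{
    ++handling_requests;
    auto start = std::chrono::steady_clock::now();
    ScopeGuard guard([&] {
        --handling_requests;
        double elapsed = std::chrono::duration<double>(std::chrono::steady_clock::now() - start).count();
        std::cout << "cancel handled in " << elapsed << "s\n";
    });
    // ... the actual cancellation work would go here; the guard fires even on early return.
}
```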
6 changes: 3 additions & 3 deletions dbms/src/Flash/Mpp/MPPHandler.cpp
@@ -288,7 +288,7 @@ bool MPPTask::isTaskHanging()
return false;
}

- void MPPTask::cancel()
+ void MPPTask::cancel(const String & reason)
{
auto current_status = status.load();
if (current_status == FINISHED || current_status == CANCELLED)
@@ -314,7 +314,7 @@ void MPPTask::cancel()
/// Here we use `closeAllTunnel` because currently, `cancel` is a query level cancel, which
/// means if this mpp task is cancelled, all the mpp tasks belonging to the same query are
/// cancelled at the same time, so there is no guarantee that the tunnel can be connected.
- closeAllTunnel("MPP Task canceled because it seems hangs");
+ closeAllTunnel(reason);
LOG_WARNING(log, "Finish cancel task: " + id.toString());
}

@@ -404,7 +404,7 @@ MPPTaskManager::MPPTaskManager(BackgroundProcessingPool & background_pool_)
if (has_hanging_task)
{
has_hanging_query = true;
- this->cancelMPPQuery(query_id);
+ this->cancelMPPQuery(query_id, "MPP Task canceled because it seems hangs");
}
}
}
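The comment above spells out why a query-level cancel goes through closeAllTunnel: when a whole query is cancelled, some of its tunnels may never have been connected by a receiver, so there is no channel to write an error packet into. A rough sketch of a close that tolerates both states follows; TunnelSketch is an illustrative type, not the real MPPTunnel, and it also mirrors the `if (finished) return;` guard added to MPPHandler.h further down so that repeated closes are harmless.

```cpp
#include <condition_variable>
#include <mutex>
#include <string>

// Illustrative tunnel: close() works whether or not a receiver ever connected.
struct TunnelSketch
{
    std::mutex mu;
    std::condition_variable cv;
    bool connected = false;
    bool finished = false;
    std::string close_reason;

    void close(const std::string & reason)
    {
        std::unique_lock<std::mutex> lk(mu);
        if (finished)
            return; // idempotent: a second close (e.g. task cancel after query cancel) is a no-op
        if (connected)
        {
            // Only a connected tunnel has somewhere to send a final packet; in the real
            // code this is where an mpp::MPPDataPacket carrying the reason would be written.
        }
        close_reason = reason; // an unconnected tunnel just records why it was closed
        finished = true;
        cv.notify_all(); // wake anyone still waiting for the tunnel to be connected
    }
};
```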
14 changes: 11 additions & 3 deletions dbms/src/Flash/Mpp/MPPHandler.h
@@ -131,6 +131,8 @@ struct MPPTunnel
void close(const String & reason)
{
std::unique_lock<std::mutex> lk(mu);
+ if (finished)
+     return;
if (connected)
{
mpp::MPPDataPacket data;
@@ -294,7 +296,7 @@ struct MPPTask : std::enable_shared_from_this<MPPTask>, private boost::noncopyab

bool isTaskHanging();

- void cancel();
+ void cancel(const String & reason);

/// Similar to `writeErrToAllTunnel`, but it just try to write the error message to tunnel
/// without waiting the tunnel to be connected
@@ -501,7 +503,7 @@ class MPPTaskManager : private boost::noncopyable
return ret ? it->second : nullptr;
}

- void cancelMPPQuery(UInt64 query_id)
+ void cancelMPPQuery(UInt64 query_id, const String & reason)
{
MPPQueryMap::iterator it;
{
@@ -516,13 +518,19 @@ class MPPTaskManager : private boost::noncopyable
LOG_WARNING(log, "Begin cancel query: " + std::to_string(query_id));
}
for (auto & task_id : it->second.task_map)
- task_id.second->cancel();
+ task_id.second->cancel(reason);
+ MPPQueryTaskSet canceled_task_set;
{
std::lock_guard<std::mutex> lock(mu);
/// just to double check the query still exists
it = mpp_query_map.find(query_id);
if (it != mpp_query_map.end())
+ {
+ /// hold the canceled task set, so the mpp task will not be deconstruct when holding the
+ /// `mu` of MPPTaskManager, otherwise it might cause deadlock
+ canceled_task_set = it->second;
mpp_query_map.erase(it);
+ }
}
LOG_WARNING(log, "Finish cancel query: " + std::to_string(query_id));
}
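The new canceled_task_set copy exists so that the last references to the cancelled tasks are not dropped while `mu` is still held; destroying an MPP task inside that critical section is exactly what the comment warns could deadlock. Reduced to a self-contained sketch with made-up types (TaskRegistry and FakeTask are illustrative, not TiFlash classes), the pattern looks like this:

```cpp
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>

struct FakeTask
{
    ~FakeTask()
    {
        // In the real code, tearing down an MPP task may need to re-enter the task
        // manager; running this while its mutex is held is the deadlock being avoided.
    }
};

class TaskRegistry
{
    std::mutex mu;
    std::map<uint64_t, std::shared_ptr<FakeTask>> tasks;

public:
    void cancelQuery(uint64_t query_id)
    {
        std::shared_ptr<FakeTask> victim; // keeps the task alive past the critical section
        {
            std::lock_guard<std::mutex> lock(mu);
            auto it = tasks.find(query_id);
            if (it == tasks.end())
                return;
            victim = it->second; // hold a reference ...
            tasks.erase(it);     // ... so erase() does not run the destructor here
        }
        // `victim` goes out of scope here, after `mu` has been released.
    }
};
```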
22 changes: 22 additions & 0 deletions tests/delta-merge-test/tidb_query/time_zone.test
@@ -60,6 +60,17 @@
│ 0000-00-00 │ 0000-00-00 00:00:00.00000 │ 0000-00-00 00:00:00 │
└────────────┴───────────────────────────┴─────────────────────┘

+ # test default encode with negative timezone offset
+ => DBGInvoke dag('select * from default.test',4,'encode_type:default,tz_offset:-28800')
+ ┌──────col_1─┬─────────────────────col_2─┬───────────────col_3─┐
+ │ 2019-06-10 │ 2019-06-10 09:00:00.00000 │ 2019-06-10 09:00:00 │
+ │ 2019-06-11 │ 2019-06-11 07:00:00.00000 │ 2019-06-11 09:00:00 │
+ │ 2019-06-11 │ 2019-06-11 08:00:00.00000 │ 2019-06-11 09:00:00 │
+ │ 2019-06-12 │ 2019-06-11 08:00:00.00000 │ 2019-06-11 09:00:00 │
+ │ 1970-01-01 │ 1970-01-01 00:00:01.00000 │ 1970-01-01 00:00:01 │
+ │ 0000-00-00 │ 0000-00-00 00:00:00.00000 │ 0000-00-00 00:00:00 │
+ └────────────┴───────────────────────────┴─────────────────────┘

# test chunk encode
=> DBGInvoke dag('select * from default.test',4,'encode_type:chunk,tz_name:America/Chicago')
┌──────col_1─┬─────────────────────col_2─┬───────────────col_3─┐
@@ -71,6 +82,17 @@
│ 0000-00-00 │ 0000-00-00 00:00:00.00000 │ 0000-00-00 00:00:00 │
└────────────┴───────────────────────────┴─────────────────────┘

+ # test default encode with negative timezone offset
+ => DBGInvoke dag('select * from default.test',4,'encode_type:default,tz_name:America/Chicago')
+ ┌──────col_1─┬─────────────────────col_2─┬───────────────col_3─┐
+ │ 2019-06-10 │ 2019-06-10 09:00:00.00000 │ 2019-06-10 09:00:00 │
+ │ 2019-06-11 │ 2019-06-11 07:00:00.00000 │ 2019-06-11 09:00:00 │
+ │ 2019-06-11 │ 2019-06-11 08:00:00.00000 │ 2019-06-11 09:00:00 │
+ │ 2019-06-12 │ 2019-06-11 08:00:00.00000 │ 2019-06-11 09:00:00 │
+ │ 1970-01-01 │ 1970-01-01 00:00:01.00000 │ 1970-01-01 00:00:01 │
+ │ 0000-00-00 │ 0000-00-00 00:00:00.00000 │ 0000-00-00 00:00:00 │
+ └────────────┴───────────────────────────┴─────────────────────┘

=> DBGInvoke dag('select * from default.test where col_2 > col_3')

=> DBGInvoke dag('select * from default.test where col_2 > col_3',4,'encode_type:default,tz_offset:28800')
