Skip to content

Commit

Permalink
[YSQL] Support alter table for colocated tables: #4293
Browse files Browse the repository at this point in the history
Summary:
Add alter table support for colocated tables.
Changes include:
1. AlterSchema RPC takes table ID as input. If there is no table ID, we use tablet's primary table ID.
2. SchemaPB now contains information about colocated table ID.

Future diff:
Tablet report sends a schema version for each tablet. Since a colocated tablet can have multiple tables, we need to send multiple schema versions (1 per table) and process that on the master.

Test Plan: Added tests to yb_feature_colocation

Reviewers: bogdan, hector

Reviewed By: hector

Subscribers: jason, yql

Differential Revision: https://phabricator.dev.yugabyte.com/D8603
  • Loading branch information
ndeodhar committed Jun 25, 2020
1 parent f92bfdf commit 94e2c3a
Show file tree
Hide file tree
Showing 20 changed files with 338 additions and 85 deletions.
57 changes: 53 additions & 4 deletions src/postgres/src/test/regress/expected/yb_feature_colocation.out
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ SELECT * FROM tab_range;
2
(1 row)

TRUNCATE TABLE tab_range;
-- truncate non-colocated table without index
TRUNCATE TABLE tab_nonkey_noco;
SELECT * FROM tab_nonkey_noco;
Expand Down Expand Up @@ -374,6 +375,54 @@ SELECT * FROM tab_range_nonkey_noco2;
public | tab_range_range_pkey | index | yugabyte | tab_range_range
(17 rows)

-- ALTER TABLE
INSERT INTO tab_range (a) VALUES (0), (1), (2);
INSERT INTO tab_range_nonkey2 (a, b) VALUES (0, 0), (1, 1);
SELECT * FROM tab_range;
a
---
0
1
2
(3 rows)

SELECT * FROM tab_range_nonkey2;
a | b
---+---
0 | 0
1 | 1
(2 rows)

-- Alter colocated tables
ALTER TABLE tab_range ADD COLUMN x INT;
ALTER TABLE tab_range_nonkey2 DROP COLUMN b;
SELECT * FROM tab_range;
a | x
---+---
0 |
1 |
2 |
(3 rows)

SELECT * FROM tab_range_nonkey2;
a
---
0
1
(2 rows)

ALTER TABLE tab_range_nonkey2 RENAME TO tab_range_nonkey2_renamed;
SELECT * FROM tab_range_nonkey2_renamed;
a
---
0
1
(2 rows)

SELECT * FROM tab_range_nonkey2;
ERROR: relation "tab_range_nonkey2" does not exist
LINE 1: SELECT * FROM tab_range_nonkey2;
^
-- DROP TABLE
-- drop colocated table with default index
DROP TABLE tab_range;
Expand All @@ -388,10 +437,10 @@ ERROR: relation "tab_nonkey_noco" does not exist
LINE 1: SELECT * FROM tab_nonkey_noco;
^
--- drop colocated table with explicit index
DROP TABLE tab_range_nonkey2;
SELECT * FROM tab_range_nonkey2;
ERROR: relation "tab_range_nonkey2" does not exist
LINE 1: SELECT * FROM tab_range_nonkey2;
DROP TABLE tab_range_nonkey2_renamed;
SELECT * FROM tab_range_nonkey2_renamed;
ERROR: relation "tab_range_nonkey2_renamed" does not exist
LINE 1: SELECT * FROM tab_range_nonkey2_renamed;
^
-- drop non-colocated table with explicit index
DROP TABLE tab_range_nonkey_noco2;
Expand Down
24 changes: 22 additions & 2 deletions src/postgres/src/test/regress/sql/yb_feature_colocation.sql
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ SELECT * FROM tab_range;
INSERT INTO tab_range VALUES (2);
SELECT * FROM tab_range;

TRUNCATE TABLE tab_range;

-- truncate non-colocated table without index
TRUNCATE TABLE tab_nonkey_noco;
SELECT * FROM tab_nonkey_noco;
Expand All @@ -129,6 +131,24 @@ SELECT * FROM tab_range_nonkey_noco2;
\dt
\di

-- ALTER TABLE
INSERT INTO tab_range (a) VALUES (0), (1), (2);
INSERT INTO tab_range_nonkey2 (a, b) VALUES (0, 0), (1, 1);

SELECT * FROM tab_range;
SELECT * FROM tab_range_nonkey2;

-- Alter colocated tables
ALTER TABLE tab_range ADD COLUMN x INT;
ALTER TABLE tab_range_nonkey2 DROP COLUMN b;

SELECT * FROM tab_range;
SELECT * FROM tab_range_nonkey2;

ALTER TABLE tab_range_nonkey2 RENAME TO tab_range_nonkey2_renamed;
SELECT * FROM tab_range_nonkey2_renamed;
SELECT * FROM tab_range_nonkey2;

-- DROP TABLE

-- drop colocated table with default index
Expand All @@ -140,8 +160,8 @@ DROP TABLE tab_nonkey_noco;
SELECT * FROM tab_nonkey_noco;

--- drop colocated table with explicit index
DROP TABLE tab_range_nonkey2;
SELECT * FROM tab_range_nonkey2;
DROP TABLE tab_range_nonkey2_renamed;
SELECT * FROM tab_range_nonkey2_renamed;

-- drop non-colocated table with explicit index
DROP TABLE tab_range_nonkey_noco2;
Expand Down
11 changes: 11 additions & 0 deletions src/yb/common/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,16 @@ message ColumnSchemaPB {
repeated QLJsonOperationPB OBSOLETE_json_operations = 13;
}

message ColocatedTableIdentifierPB {
// Note: an absent value means NULL.
oneof value {
// Colocated YSQL user tables use 4-byte PG table ID.
uint32 pgtable_id = 1;
// Colocated YSQL system tables use 16-byte UUID.
bytes cotable_id = 2;
}
}

message TablePropertiesPB {
optional uint64 default_time_to_live = 1;
optional bool contain_counters = 2;
Expand Down Expand Up @@ -257,6 +267,7 @@ message TablePropertiesPB {
message SchemaPB {
repeated ColumnSchemaPB columns = 1;
optional TablePropertiesPB table_properties = 2;
optional ColocatedTableIdentifierPB colocated_table_id = 3;
}

// This message contains the metadata of a secondary index of a table.
Expand Down
4 changes: 4 additions & 0 deletions src/yb/common/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,8 @@ void SchemaBuilder::Reset() {
num_key_columns_ = 0;
next_id_ = kFirstColumnId;
table_properties_.Reset();
pgtable_id_ = 0;
cotable_id_ = Uuid(boost::uuids::nil_uuid());
}

void SchemaBuilder::Reset(const Schema& schema) {
Expand All @@ -594,6 +596,8 @@ void SchemaBuilder::Reset(const Schema& schema) {
next_id_ = *std::max_element(col_ids_.begin(), col_ids_.end()) + 1;
}
table_properties_ = schema.table_properties_;
pgtable_id_ = schema.pgtable_id_;
cotable_id_ = schema.cotable_id_;
}

Status SchemaBuilder::AddKeyColumn(const string& name, const shared_ptr<QLType>& type) {
Expand Down
26 changes: 24 additions & 2 deletions src/yb/common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -1169,8 +1169,28 @@ class SchemaBuilder {
return next_id_;
}

Schema Build() const { return Schema(cols_, col_ids_, num_key_columns_, table_properties_); }
Schema BuildWithoutIds() const { return Schema(cols_, num_key_columns_, table_properties_); }
void set_pgtable_id(PgTableOid pgtable_id) {
pgtable_id_ = pgtable_id;
}

PgTableOid pgtable_id() const {
return pgtable_id_;
}

void set_cotable_id(Uuid cotable_id) {
cotable_id_ = cotable_id;
}

Uuid cotable_id() const {
return cotable_id_;
}

Schema Build() const {
return Schema(cols_, col_ids_, num_key_columns_, table_properties_, cotable_id_, pgtable_id_);
}
Schema BuildWithoutIds() const {
return Schema(cols_, num_key_columns_, table_properties_, cotable_id_, pgtable_id_);
}

// assumes type is allowed in primary key -- this should be checked before getting here
// using DataType (not QLType) since primary key columns only support elementary types
Expand Down Expand Up @@ -1238,6 +1258,8 @@ class SchemaBuilder {
unordered_set<string> col_names_;
size_t num_key_columns_;
TableProperties table_properties_;
PgTableOid pgtable_id_ = 0;
Uuid cotable_id_ = Uuid(boost::uuids::nil_uuid());

DISALLOW_COPY_AND_ASSIGN(SchemaBuilder);
};
Expand Down
31 changes: 30 additions & 1 deletion src/yb/common/wire_protocol.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <string>
#include <vector>

#include "yb/common/entity_ids.h"
#include "yb/common/row.h"
#include "yb/gutil/port.h"
#include "yb/gutil/stl_util.h"
Expand Down Expand Up @@ -311,8 +312,19 @@ Status AddHostPortPBs(const std::vector<Endpoint>& addrs,
return Status::OK();
}

void SchemaToColocatedTableIdentifierPB(
const Schema& schema, ColocatedTableIdentifierPB* colocated_pb) {
if (schema.has_pgtable_id()) {
colocated_pb->set_pgtable_id(schema.pgtable_id());
} else if (schema.has_cotable_id()) {
colocated_pb->set_cotable_id(schema.cotable_id().ToString());
}

}

void SchemaToPB(const Schema& schema, SchemaPB *pb, int flags) {
pb->Clear();
SchemaToColocatedTableIdentifierPB(schema, pb->mutable_colocated_table_id());
SchemaToColumnPBs(schema, pb->mutable_columns(), flags);
schema.table_properties().ToTablePropertiesPB(pb->mutable_table_properties());
}
Expand All @@ -331,7 +343,24 @@ Status SchemaFromPB(const SchemaPB& pb, Schema *schema) {

// Convert the table properties.
TableProperties table_properties = TableProperties::FromTablePropertiesPB(pb.table_properties());
return schema->Reset(columns, column_ids, num_key_columns, table_properties);
RETURN_NOT_OK(schema->Reset(columns, column_ids, num_key_columns, table_properties));

if (pb.has_colocated_table_id()) {
switch (pb.colocated_table_id().value_case()) {
case ColocatedTableIdentifierPB::kCotableId: {
Uuid cotable_id;
RETURN_NOT_OK(cotable_id.FromString(pb.colocated_table_id().cotable_id()));
schema->set_cotable_id(cotable_id);
break;
}
case ColocatedTableIdentifierPB::kPgtableId:
schema->set_pgtable_id(pb.colocated_table_id().pgtable_id());
break;
case ColocatedTableIdentifierPB::VALUE_NOT_SET:
break;
}
}
return Status::OK();
}

void ColumnSchemaToPB(const ColumnSchema& col_schema, ColumnSchemaPB *pb, int flags) {
Expand Down
4 changes: 4 additions & 0 deletions src/yb/common/wire_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ void SchemaToColumnPBs(
google::protobuf::RepeatedPtrField<ColumnSchemaPB>* cols,
int flags = 0);

// Extract the colocated table information of the given schema into a protobuf object.
void SchemaToColocatedTableIdentifierPB(
const Schema& schema, ColocatedTableIdentifierPB* colocated_pb);

YB_DEFINE_ENUM(UsePrivateIpMode, (cloud)(region)(zone)(never));

// Returns mode for selecting between private and public IP.
Expand Down
11 changes: 10 additions & 1 deletion src/yb/master/async_rpc_tasks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,14 @@ AsyncTabletLeaderTask::AsyncTabletLeaderTask(
tablet_(tablet) {
}

AsyncTabletLeaderTask::AsyncTabletLeaderTask(
Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet,
const scoped_refptr<TableInfo>& table)
: RetryingTSRpcTask(
master, callback_pool, gscoped_ptr<TSPicker>(new PickLeaderReplica(tablet)), table),
tablet_(tablet) {
}

std::string AsyncTabletLeaderTask::description() const {
return type_name() + " RPC for " + tablet_->ToString();
}
Expand Down Expand Up @@ -626,7 +634,7 @@ void AsyncAlterTable::HandleResponse(int attempt) {
if (state() == MonitoredTaskState::kComplete) {
// TODO: proper error handling here. Not critical, since TSHeartbeat will retry on failure.
WARN_NOT_OK(master_->catalog_manager()->HandleTabletSchemaVersionReport(
tablet_.get(), schema_version_),
tablet_.get(), schema_version_, table()),
yb::Format(
"$0 for $1 failed while running AsyncAlterTable::HandleResponse. response $2",
description(), tablet_->ToString(), resp_.DebugString()));
Expand All @@ -647,6 +655,7 @@ bool AsyncAlterTable::SendRequest(int attempt) {
req.set_schema_version(l->data().pb.version());
req.set_dest_uuid(permanent_uuid());
req.set_tablet_id(tablet_->tablet_id());
req.set_alter_table_id(table_->id());

if (l->data().pb.has_wal_retention_secs()) {
req.set_wal_retention_secs(l->data().pb.wal_retention_secs());
Expand Down
13 changes: 11 additions & 2 deletions src/yb/master/async_rpc_tasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "yb/gutil/gscoped_ptr.h"
#include "yb/gutil/strings/substitute.h"

#include "yb/master/catalog_entity_info.h"
#include "yb/rpc/rpc_controller.h"

#include "yb/server/monitored_task.h"
Expand Down Expand Up @@ -271,6 +272,10 @@ class AsyncTabletLeaderTask : public RetryingTSRpcTask {
AsyncTabletLeaderTask(
Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet);

AsyncTabletLeaderTask(
Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet,
const scoped_refptr<TableInfo>& table);

std::string description() const override;

protected:
Expand Down Expand Up @@ -358,10 +363,14 @@ class AsyncDeleteReplica : public RetrySpecificTSRpcTask {
class AsyncAlterTable : public AsyncTabletLeaderTask {
public:
AsyncAlterTable(
Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet,
bool has_wal_retention_secs = false)
Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet)
: AsyncTabletLeaderTask(master, callback_pool, tablet) {}

AsyncAlterTable(
Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet,
const scoped_refptr<TableInfo>& table)
: AsyncTabletLeaderTask(master, callback_pool, tablet, table) {}

Type type() const override { return ASYNC_ALTER_TABLE; }

std::string type_name() const override { return "Alter Table"; }
Expand Down
20 changes: 12 additions & 8 deletions src/yb/master/catalog_entity_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ TabletInfo::TabletInfo(const scoped_refptr<TableInfo>& table, TabletId tablet_id
: tablet_id_(std::move(tablet_id)),
table_(table),
last_update_time_(MonoTime::Now()),
reported_schema_version_(0) {}
reported_schema_version_({}) {}

TabletInfo::~TabletInfo() {
}
Expand Down Expand Up @@ -178,18 +178,22 @@ MonoTime TabletInfo::last_update_time() const {
return last_update_time_;
}

bool TabletInfo::set_reported_schema_version(uint32_t version) {
bool TabletInfo::set_reported_schema_version(const TableId& table_id, uint32_t version) {
std::lock_guard<simple_spinlock> l(lock_);
if (version > reported_schema_version_) {
reported_schema_version_ = version;
if (reported_schema_version_.count(table_id) == 0 ||
version > reported_schema_version_[table_id]) {
reported_schema_version_[table_id] = version;
return true;
}
return false;
}

uint32_t TabletInfo::reported_schema_version() const {
uint32_t TabletInfo::reported_schema_version(const TableId& table_id) {
std::lock_guard<simple_spinlock> l(lock_);
return reported_schema_version_;
if (reported_schema_version_.count(table_id) == 0) {
return 0;
}
return reported_schema_version_[table_id];
}

bool TabletInfo::colocated() const {
Expand Down Expand Up @@ -367,10 +371,10 @@ void TableInfo::GetTabletsInRange(const GetTableLocationsRequestPB* req, TabletI
bool TableInfo::IsAlterInProgress(uint32_t version) const {
shared_lock<decltype(lock_)> l(lock_);
for (const TableInfo::TabletInfoMap::value_type& e : tablet_map_) {
if (e.second->reported_schema_version() < version) {
if (e.second->reported_schema_version(table_id_) < version) {
VLOG(3) << "Table " << table_id_ << " ALTER in progress due to tablet "
<< e.second->ToString() << " because reported schema "
<< e.second->reported_schema_version() << " < expected " << version;
<< e.second->reported_schema_version(table_id_) << " < expected " << version;
return true;
}
}
Expand Down
Loading

0 comments on commit 94e2c3a

Please sign in to comment.