Skip to content

Commit

Permalink
[#11749] YSQL: Improve/refactor tablegroups and colocation
Browse files Browse the repository at this point in the history
Summary:
The main objective of this diff is to make the colocation/tablegroups code cleaner and clearer.

- Introduced `YsqlTablegroupManager` responsible for tracking tablegroups on master side
- Did a bunch of renames for clarity - tried to remove as much ambiguity from the term "colocation" as possible
- Fixed `TablegroupExists`, which previously returned `false` erroneously
- Reworked how colocation is treated in `CreateTable`
- `CreateTablegroup` and `DeleteTablegroup` implementation slightly reworked and moved from `client` to `client-internal`
- Added `IsIdentifierLikeUuid` and a bunch of sanity DCHECKs using it
- Reworked `GetTablegroupSchema` and 2DC using it

Smaller changes:
- Added `IsColocationParentTable`
- `IsTablegroupParentTableId`, etc. moved to `master_util`, encapsulated colocation suffixes there too
- Removed unused `CatalogManager::HasTablegroups`
- Removed unnecessary database name from `DeleteTablegroup` request
- Improved `TestTablespaceProperties`

---

Resolves #11749

Test Plan:
Existing tests, specifically:
- PgLibPqTest.ColocatedTablegroups
- MasterTest.TestTablegroups
- TwoDCYsqlTest.TablegroupReplication
- org.yb.pgsql.TestPgRegressTablegroup

Reviewers: mihnea, nicolas, rahuldesirazu, jason

Reviewed By: jason

Subscribers: rahuldesirazu, nicolas, yql, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D16158
  • Loading branch information
frozenspider committed Apr 25, 2022
1 parent 11d2f23 commit fffd1ae
Show file tree
Hide file tree
Showing 64 changed files with 1,093 additions and 838 deletions.
5 changes: 3 additions & 2 deletions ent/src/yb/integration-tests/twodc_ysql-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include "yb/master/master.h"
#include "yb/master/master_cluster.proxy.h"
#include "yb/master/master_ddl.proxy.h"
#include "yb/master/master_util.h"
#include "yb/master/master_replication.proxy.h"
#include "yb/master/master-test-util.h"
#include "yb/master/sys_catalog_initialization.h"
Expand Down Expand Up @@ -403,7 +404,7 @@ class TwoDCYsqlTest : public TwoDCTestBase, public testing::WithParamInterface<T
Format("Unable to find tablegroup in namespace $0", namespace_name));
}

return resp.tablegroups()[0].id() + master::kTablegroupParentTableIdSuffix;
return master::GetTablegroupParentTableId(resp.tablegroups()[0].id());
}

Status CreateTablegroup(Cluster* cluster,
Expand Down Expand Up @@ -846,7 +847,7 @@ TEST_P(TwoDCYsqlTest, ColocatedDatabaseReplication) {
// Only need to add the colocated parent table id.
setup_universe_req.mutable_producer_table_ids()->Reserve(1);
setup_universe_req.add_producer_table_ids(
ns_resp.namespace_().id() + master::kColocatedParentTableIdSuffix);
master::GetColocatedDbParentTableId(ns_resp.namespace_().id()));
auto* consumer_leader_mini_master = ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster());
auto master_proxy = std::make_shared<master::MasterReplicationProxy>(
&consumer_client()->proxy_cache(),
Expand Down
46 changes: 25 additions & 21 deletions ent/src/yb/master/catalog_manager_ent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "yb/master/master.h"
#include "yb/master/master_backup.pb.h"
#include "yb/master/master_error.h"
#include "yb/master/ysql_tablegroup_manager.h"

#include "yb/cdc/cdc_consumer.pb.h"
#include "yb/cdc/cdc_service.h"
Expand Down Expand Up @@ -670,9 +671,8 @@ Status CatalogManager::ListSnapshots(const ListSnapshotsRequestPB* req,

TRACE("Locking table");
auto l = table_info->LockForRead();
// PG schema name is available for YSQL table only.
// Except '<uuid>.colocated.parent.uuid' table ID.
if (l->table_type() == PGSQL_TABLE_TYPE && !IsColocatedParentTableId(entry.id())) {
// PG schema name is available for YSQL table only, except for colocation parent tables.
if (l->table_type() == PGSQL_TABLE_TYPE && !IsColocationParentTableId(entry.id())) {
const string pg_schema_name = VERIFY_RESULT(GetPgSchemaName(table_info));
VLOG(1) << "PG Schema: " << pg_schema_name << " for table " << table_info->ToString();
backup_entry->set_pg_schema_name(pg_schema_name);
Expand Down Expand Up @@ -1547,7 +1547,7 @@ Status CatalogManager::ImportTableEntry(const NamespaceMap& namespace_map,
SharedLock lock(mutex_);
if (VERIFY_RESULT(CheckTableForImport(table, table_data))) {
LOG_WITH_FUNC(INFO) << "Found existing table: '" << table->ToString() << "'";
if (meta.colocated() && IsColocatedParentTableId(table_data->old_table_id)) {
if (meta.colocated() && IsColocationParentTableId(table_data->old_table_id)) {
// Parent colocated tables don't have partition info, so make sure to mark them.
is_parent_colocated_table = true;
}
Expand Down Expand Up @@ -1576,12 +1576,13 @@ Status CatalogManager::ImportTableEntry(const NamespaceMap& namespace_map,
// For YSQL, the table must be created via external call. Therefore, continue the search for
// the table, this time checking for name matches rather than id matches.

if (meta.colocated() && IsColocatedParentTableId(table_data->old_table_id)) {
// TODO(alex): Handle tablegroups in #11632
if (meta.colocated() && IsColocatedDbParentTableId(table_data->old_table_id)) {
// For the parent colocated table we need to generate the new_table_id ourselves
// since the names will not match.
// For normal colocated tables, we are still able to follow the normal table flow, so no
// need to generate the new_table_id ourselves.
table_data->new_table_id = new_namespace_id + kColocatedParentTableIdSuffix;
table_data->new_table_id = GetColocatedDbParentTableId(new_namespace_id);
is_parent_colocated_table = true;
} else {
if (!table_data->new_table_id.empty()) {
Expand Down Expand Up @@ -3306,18 +3307,19 @@ Status CatalogManager::SetupUniverseReplication(const SetupUniverseReplicationRe

// SETUP CONTINUES after this async call.
Status s;
if (IsColocatedParentTableId(req->producer_table_ids(i))) {
if (IsColocatedDbParentTableId(req->producer_table_ids(i))) {
auto tables_info = std::make_shared<std::vector<client::YBTableInfo>>();
s = cdc_rpc->client()->GetColocatedTabletSchemaById(
s = cdc_rpc->client()->GetColocatedTabletSchemaByParentTableId(
req->producer_table_ids(i), tables_info,
Bind(&enterprise::CatalogManager::GetColocatedTabletSchemaCallback, Unretained(this),
ri->id(), tables_info, table_id_to_bootstrap_id));
} else if (IsTablegroupParentTableId(req->producer_table_ids(i))) {
auto tablegroup_info = std::make_shared<std::vector<client::YBTableInfo>>();
auto tablegroup_id = GetTablegroupIdFromParentTableId(req->producer_table_ids(i));
auto tables_info = std::make_shared<std::vector<client::YBTableInfo>>();
s = cdc_rpc->client()->GetTablegroupSchemaById(
req->producer_table_ids(i), tablegroup_info,
tablegroup_id, tables_info,
Bind(&enterprise::CatalogManager::GetTablegroupSchemaCallback, Unretained(this),
ri->id(), tablegroup_info, req->producer_table_ids(i), table_id_to_bootstrap_id));
ri->id(), tables_info, tablegroup_id, table_id_to_bootstrap_id));
} else {
auto table_info = std::make_shared<client::YBTableInfo>();
s = cdc_rpc->client()->GetTableSchemaById(
Expand Down Expand Up @@ -3685,16 +3687,17 @@ void CatalogManager::GetTablegroupSchemaCallback(
// the tablegroup ID from table ID), we only do this call once and do validation afterward.
TablegroupId consumer_tablegroup_id;
{
const auto& result = FindTablegroupByTableId(*validated_consumer_tables.begin());
if (!result.has_value()) {
SharedLock lock(mutex_);
const auto* tablegroup = tablegroup_manager_->FindByTable(*validated_consumer_tables.begin());
if (!tablegroup) {
std::string message =
Format("No consumer tablegroup found for producer tablegroup: $0",
producer_tablegroup_id);
MarkUniverseReplicationFailed(universe, STATUS(IllegalState, message));
LOG(ERROR) << message;
return;
}
consumer_tablegroup_id = result.value();
consumer_tablegroup_id = tablegroup->id();
}

// tables_in_consumer_tablegroup are the tables listed within the consumer_tablegroup_id.
Expand All @@ -3703,7 +3706,7 @@ void CatalogManager::GetTablegroupSchemaCallback(
{
GetTablegroupSchemaRequestPB req;
GetTablegroupSchemaResponsePB resp;
req.mutable_parent_tablegroup()->set_id(consumer_tablegroup_id);
req.mutable_tablegroup()->set_id(consumer_tablegroup_id);
Status status = GetTablegroupSchema(&req, &resp);
if (!status.ok() || resp.has_error()) {
std::string message = Format("Error when getting consumer tablegroup schema: $0",
Expand Down Expand Up @@ -3747,10 +3750,11 @@ void CatalogManager::GetTablegroupSchemaCallback(
<< producer_tablegroup_id << ": " << status;
}

status = AddValidatedTableAndCreateCdcStreams(universe,
table_bootstrap_ids,
producer_tablegroup_id,
consumer_tablegroup_id);
status = AddValidatedTableAndCreateCdcStreams(
universe,
table_bootstrap_ids,
GetTablegroupParentTableId(producer_tablegroup_id),
GetTablegroupParentTableId(consumer_tablegroup_id));
if (!status.ok()) {
LOG(ERROR) << "Found error while adding validated table to system catalog: "
<< producer_tablegroup_id << ": " << status;
Expand Down Expand Up @@ -3813,9 +3817,9 @@ void CatalogManager::GetColocatedTabletSchemaCallback(
}
// Store the parent table ids.
producer_parent_table_ids.insert(
info.table_name.namespace_id() + kColocatedParentTableIdSuffix);
GetColocatedDbParentTableId(info.table_name.namespace_id()));
consumer_parent_table_ids.insert(
resp.identifier().namespace_().id() + kColocatedParentTableIdSuffix);
GetColocatedDbParentTableId(resp.identifier().namespace_().id()));
}

// Verify that we only found one producer and one consumer colocated parent table id.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,16 @@ private void runThread() {
// Sleep for a short time and give the child process a chance to generate more output.
Thread.sleep(10);
}
} catch (IOException ex) {
if (ex.getMessage().toLowerCase().contains("stream closed")) {
// This probably means we're stopping, OK to ignore.
LOG.info(withPrefix(ex.getMessage()));
} else {
throw ex;
}
} catch (InterruptedException iex) {
// This probably means we're stopping, OK to ignore.
LOG.info(withPrefix(iex.getMessage()), iex);
LOG.info(withPrefix(iex.getMessage()));
} catch (Throwable t) {
LOG.warn(withPrefix(t.getMessage()), t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.List;
Expand All @@ -34,6 +36,8 @@
import org.slf4j.LoggerFactory;

import com.yugabyte.util.PSQLException;

import org.yb.CommonNet.CloudInfoPB;
import org.yb.client.LeaderStepDownResponse;
import org.yb.client.LocatedTablet;
import org.yb.client.ModifyClusterConfigLiveReplicas;
Expand Down Expand Up @@ -552,6 +556,8 @@ void verifyCustomPlacement(final String table) throws Exception {
client.waitForReplicaCount(getTableFromName(table), 2, 30_000);

List<LocatedTablet> tabletLocations = getTableFromName(table).getTabletsLocations(30_000);
String errorMsg = "Invalid custom placement for table '" + table + "': "
+ getPlacementInfoString(tabletLocations);

// Get tablets for table.
for (LocatedTablet tablet : tabletLocations) {
Expand All @@ -561,15 +567,15 @@ void verifyCustomPlacement(final String table) throws Exception {
for (LocatedTablet.Replica replica : replicas) {
final String role = replica.getRole();
assertFalse(role, role.contains("READ_REPLICA"));
org.yb.CommonNet.CloudInfoPB cloudInfo = replica.getCloudInfo();
if (cloudInfo.getPlacementCloud().equals("cloud1")) {
assertTrue(cloudInfo.getPlacementRegion().equals("region1"));
assertTrue(cloudInfo.getPlacementZone().equals("zone1"));
CloudInfoPB cloudInfo = replica.getCloudInfo();
if ("cloud1".equals(cloudInfo.getPlacementCloud())) {
assertEquals(errorMsg, "region1", cloudInfo.getPlacementRegion());
assertEquals(errorMsg, "zone1", cloudInfo.getPlacementZone());
continue;
}
assertTrue(cloudInfo.getPlacementCloud().equals("cloud2"));
assertTrue(cloudInfo.getPlacementRegion().equals("region2"));
assertTrue(cloudInfo.getPlacementZone().equals("zone2"));
assertEquals(errorMsg, "cloud2", cloudInfo.getPlacementCloud());
assertEquals(errorMsg, "region2", cloudInfo.getPlacementRegion());
assertEquals(errorMsg, "zone2", cloudInfo.getPlacementZone());
}
}
}
Expand All @@ -579,26 +585,53 @@ void verifyDefaultPlacement(final String table) throws Exception {
client.waitForReplicaCount(getTableFromName(table), 3, 30_000);

List<LocatedTablet> tabletLocations = getTableFromName(table).getTabletsLocations(30_000);
String errorMsg = "Invalid default placement for table '" + table + "': "
+ getPlacementInfoString(tabletLocations);

// Get tablets for table.
for (LocatedTablet tablet : tabletLocations) {
List<LocatedTablet.Replica> replicas = tablet.getReplicas();
// Verify that tablets can be present in any zone.
for (LocatedTablet.Replica replica : replicas) {
org.yb.CommonNet.CloudInfoPB cloudInfo = replica.getCloudInfo();
if (cloudInfo.getPlacementCloud().equals("cloud1")) {
assertTrue(cloudInfo.getPlacementRegion().equals("region1"));
assertTrue(cloudInfo.getPlacementZone().equals("zone1"));
CloudInfoPB cloudInfo = replica.getCloudInfo();
if ("cloud1".equals(cloudInfo.getPlacementCloud())) {
assertEquals(errorMsg, "region1", cloudInfo.getPlacementRegion());
assertEquals(errorMsg, "zone1", cloudInfo.getPlacementZone());
continue;
} else if (cloudInfo.getPlacementCloud().equals("cloud2")) {
assertTrue(cloudInfo.getPlacementRegion().equals("region2"));
assertTrue(cloudInfo.getPlacementZone().equals("zone2"));
} else if ("cloud2".equals(cloudInfo.getPlacementCloud())) {
assertEquals(errorMsg, "region2", cloudInfo.getPlacementRegion());
assertEquals(errorMsg, "zone2", cloudInfo.getPlacementZone());
continue;
}
assertTrue(cloudInfo.getPlacementCloud().equals("cloud3"));
assertTrue(cloudInfo.getPlacementRegion().equals("region3"));
assertTrue(cloudInfo.getPlacementZone().equals("zone3"));
assertEquals(errorMsg, "cloud3", cloudInfo.getPlacementCloud());
assertEquals(errorMsg, "region3", cloudInfo.getPlacementRegion());
assertEquals(errorMsg, "zone3", cloudInfo.getPlacementZone());
}
}
}

/**
 * Builds a human-readable description of where the replicas of the given tablets are placed.
 *
 * <p>The result is wrapped in an outer pair of square brackets. Each tablet contributes its own
 * bracketed group, and the groups are appended back to back with no separator. Inside a group,
 * replicas are ordered by the textual form of their cloud info and separated by {@code ", "}.
 * Each replica is rendered as a brace-delimited block holding the lines of its trimmed cloud
 * info text, each prefixed with a space and joined by {@code ",\n"}.
 *
 * @param locatedTablets tablets whose replica placement should be described
 * @return the placement description
 */
public String getPlacementInfoString(List<LocatedTablet> locatedTablets) {
  StringBuilder result = new StringBuilder("[");
  for (LocatedTablet locatedTablet : locatedTablets) {
    String renderedReplicas = locatedTablet.getReplicas().stream()
        .map(replica -> replica.getCloudInfo().toString())
        // Crude but sufficient: order replicas by the text of their cloud info so that the
        // output does not depend on the order in which replicas were reported.
        .sorted()
        .map(cloudInfoText -> {
          String[] fields = cloudInfoText.trim().split("\n");
          String indentedFields = Stream.<String>of(fields)
              .map(field -> " " + field)
              .collect(Collectors.joining(",\n"));
          return "{\n" + indentedFields + "\n}";
        })
        .collect(Collectors.joining(", ", "[", "]"));
    result.append(renderedReplicas);
  }
  return result.append("]").toString();
}

public String getTablespaceTransactionTableName() throws Exception {
Expand Down Expand Up @@ -642,7 +675,7 @@ public void readReplicaWithTablespaces() throws Exception {
miniCluster.startTServer(readReplicaPlacement);
miniCluster.waitForTabletServers(expectedTServers);

org.yb.CommonNet.CloudInfoPB cloudInfo0 = org.yb.CommonNet.CloudInfoPB.newBuilder()
CloudInfoPB cloudInfo0 = CloudInfoPB.newBuilder()
.setPlacementCloud("cloud1")
.setPlacementRegion("region1")
.setPlacementZone("zone1")
Expand Down Expand Up @@ -715,7 +748,7 @@ void verifyPlacementForReadReplica(final String table) throws Exception {
List<LocatedTablet.Replica> replicas = tablet.getReplicas();
// Verify that tablets can be present in any zone.
for (LocatedTablet.Replica replica : replicas) {
org.yb.CommonNet.CloudInfoPB cloudInfo = replica.getCloudInfo();
CloudInfoPB cloudInfo = replica.getCloudInfo();
final String errorMsg = "Unexpected cloud.region.zone: " + cloudInfo.toString();

if (replica.getRole().contains("READ_REPLICA")) {
Expand All @@ -725,9 +758,9 @@ void verifyPlacementForReadReplica(final String table) throws Exception {
assertFalse(foundLiveReplica);
foundLiveReplica = true;
}
assertTrue(errorMsg, cloudInfo.getPlacementCloud().equals("cloud1"));
assertTrue(errorMsg, cloudInfo.getPlacementRegion().equals("region1"));
assertTrue(errorMsg, cloudInfo.getPlacementZone().equals("zone1"));
assertEquals(errorMsg, "cloud1", cloudInfo.getPlacementCloud());
assertEquals(errorMsg, "region1", cloudInfo.getPlacementRegion());
assertEquals(errorMsg, "zone1", cloudInfo.getPlacementZone());
}
// A live replica must be found.
assertTrue(foundLiveReplica);
Expand Down
23 changes: 6 additions & 17 deletions src/postgres/src/backend/access/yb_access/yb_scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -433,8 +433,8 @@ ybcSetupScanPlan(bool xs_want_itup, YbScanDesc ybScan, YbScanPlan scan_plan)
/*
* Setup control-parameters for Yugabyte preparing statements for different
* types of scan.
* - "querying_colocated_table": Support optimizations for (system and
* user) colocated tables
* - "querying_colocated_table": Support optimizations for (system,
* user database and tablegroup) colocated tables
* - "index_oid, index_only_scan, use_secondary_index": Different index
* scans.
* NOTE: Primary index is a special case as there isn't a primary index
Expand All @@ -443,24 +443,13 @@ ybcSetupScanPlan(bool xs_want_itup, YbScanDesc ybScan, YbScanPlan scan_plan)

ybScan->prepare_params.querying_colocated_table =
IsSystemRelation(relation);
if (!ybScan->prepare_params.querying_colocated_table &&
MyDatabaseColocated)

if (!ybScan->prepare_params.querying_colocated_table)
{
bool colocated = false;
bool notfound;
HandleYBStatusIgnoreNotFound(YBCPgIsTableColocated(MyDatabaseId,
YbGetStorageRelid(relation),
&colocated),
&notfound);
bool colocated = YbIsUserTableColocated(MyDatabaseId,
YbGetStorageRelid(relation));
ybScan->prepare_params.querying_colocated_table |= colocated;
}
else if (!ybScan->prepare_params.querying_colocated_table)
{
Oid tablegroupId = InvalidOid;
if (YbTablegroupCatalogExists)
tablegroupId = get_tablegroup_oid_by_table_oid(RelationGetRelid(relation));
ybScan->prepare_params.querying_colocated_table |= (tablegroupId != InvalidOid);
}

if (index)
{
Expand Down
Loading

0 comments on commit fffd1ae

Please sign in to comment.