Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 28 additions & 16 deletions cloud/src/meta-service/meta_service_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -648,10 +648,19 @@ static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tran
obj_info.has_provider()) {
code = MetaServiceCode::INVALID_ARGUMENT;
std::stringstream ss;
ss << "Only ak, sk can be altered";
ss << "Bucket, endpoint, prefix and provider can not be altered";
msg = ss.str();
return -1;
}

if (obj_info.has_ak() ^ obj_info.has_sk()) {
code = MetaServiceCode::INVALID_ARGUMENT;
std::stringstream ss;
ss << "Accesskey and secretkey must be alter together";
msg = ss.str();
return -1;
}

const auto& name = vault.name();
// Here we try to get mutable iter since we might need to alter the vault name
auto name_itr = std::find_if(instance.mutable_storage_vault_names()->begin(),
Expand Down Expand Up @@ -703,22 +712,25 @@ static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tran
*name_itr = vault.alter_name();
}
auto origin_vault_info = new_vault.DebugString();
AkSkPair pre {new_vault.obj_info().ak(), new_vault.obj_info().sk()};
const auto& plain_ak = obj_info.has_ak() ? obj_info.ak() : new_vault.obj_info().ak();
const auto& plain_sk = obj_info.has_ak() ? obj_info.sk() : new_vault.obj_info().sk();
AkSkPair plain_ak_sk_pair {plain_ak, plain_sk};
AkSkPair cipher_ak_sk_pair;
EncryptionInfoPB encryption_info;
auto ret = encrypt_ak_sk_helper(plain_ak, plain_sk, &encryption_info, &cipher_ak_sk_pair, code,
msg);
if (ret != 0) {
msg = "failed to encrypt";
code = MetaServiceCode::ERR_ENCRYPT;
LOG(WARNING) << msg;
return -1;

// For ak or sk is not altered.
EncryptionInfoPB encryption_info = new_vault.obj_info().encryption_info();
AkSkPair new_ak_sk_pair {new_vault.obj_info().ak(), new_vault.obj_info().sk()};

if (obj_info.has_ak()) {
// ak and sk must be altered together, there is check before.
auto ret = encrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(), &encryption_info,
&new_ak_sk_pair, code, msg);
if (ret != 0) {
msg = "failed to encrypt";
code = MetaServiceCode::ERR_ENCRYPT;
LOG(WARNING) << msg;
return -1;
}
}
new_vault.mutable_obj_info()->set_ak(cipher_ak_sk_pair.first);
new_vault.mutable_obj_info()->set_sk(cipher_ak_sk_pair.second);

new_vault.mutable_obj_info()->set_ak(new_ak_sk_pair.first);
new_vault.mutable_obj_info()->set_sk(new_ak_sk_pair.second);
new_vault.mutable_obj_info()->mutable_encryption_info()->CopyFrom(encryption_info);
if (obj_info.has_use_path_style()) {
new_vault.mutable_obj_info()->set_use_path_style(obj_info.use_path_style());
Expand Down
5 changes: 3 additions & 2 deletions cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ TEST(MetaServiceTest, AlterS3StorageVaultTest) {
AlterObjStoreInfoResponse res;
meta_service->alter_storage_vault(
reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
ASSERT_NE(res.status().code(), MetaServiceCode::OK) << res.status().msg();
InstanceInfoPB instance;
get_test_instance(instance);

Expand All @@ -526,7 +526,7 @@ TEST(MetaServiceTest, AlterS3StorageVaultTest) {
TxnErrorCode::TXN_OK);
StorageVaultPB get_obj;
get_obj.ParseFromString(val);
ASSERT_EQ(get_obj.obj_info().ak(), "new_ak") << get_obj.obj_info().ak();
ASSERT_EQ(get_obj.obj_info().ak(), "ak") << get_obj.obj_info().ak();
}

{
Expand Down Expand Up @@ -578,6 +578,7 @@ TEST(MetaServiceTest, AlterS3StorageVaultTest) {
vault.set_alter_name(new_vault_name);
ObjectStoreInfoPB obj;
obj_info.set_ak("new_ak");
obj_info.set_sk("new_sk");
vault.mutable_obj_info()->MergeFrom(obj);
vault.set_name(vault_name);
req.mutable_vault()->CopyFrom(vault);
Expand Down
4 changes: 2 additions & 2 deletions docker/runtime/doris-compose/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -942,8 +942,8 @@ def run(self, args):
metaServiceHttpAddress = "{ms_endpoint}"
metaServiceToken = "greedisgood9999"
recycleServiceHttpAddress = "{recycle_endpoint}"
instanceId = "default_instance_id"
multiClusterInstance = "default_instance_id"
instanceId = "12345678"
multiClusterInstance = "12345678"
multiClusterBes = "{multi_cluster_bes}"
cloudUniqueId= "{fe_cloud_unique_id}"
'''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ public void alterStorageVault(StorageVaultType type, Map<String, String> propert
}
LOG.info("Succeed to alter storage vault {}, id {}, origin default vault replaced {}",
name, response.getStorageVaultId(), response.getDefaultStorageVaultReplaced());

// Make BE eagerly fetch the storage vault info from Meta Service
ALTER_BE_SYNC_THREAD_POOL.execute(() -> alterSyncVaultTask());
} catch (RpcException e) {
LOG.warn("failed to alter storage vault due to RpcException: {}", e);
throw new DdlException(e.getMessage());
Expand Down
157 changes: 152 additions & 5 deletions regression-test/suites/vault_p0/alter/test_alter_s3_vault.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,43 @@ suite("test_alter_s3_vault", "nonConcurrent") {
);
"""

def dupVaultName = "${suiteName}" + "_dup"
sql """
CREATE STORAGE VAULT IF NOT EXISTS ${dupVaultName}
PROPERTIES (
"type"="S3",
"s3.endpoint"="${getS3Endpoint()}",
"s3.region" = "${getS3Region()}",
"s3.access_key" = "${getS3AK()}",
"s3.secret_key" = "${getS3SK()}",
"s3.root.path" = "${suiteName}",
"s3.bucket" = "${getS3BucketName()}",
"s3.external_endpoint" = "",
"provider" = "${getS3Provider()}"
);
"""

sql """
DROP TABLE IF EXISTS alter_s3_vault_tbl
"""

sql """
CREATE TABLE IF NOT EXISTS alter_s3_vault_tbl
(
`k1` INT NULL,
`v1` INT NULL
)
UNIQUE KEY (k1)
DISTRIBUTED BY HASH(`k1`) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
"disable_auto_compaction" = "true",
"storage_vault_name" = "${suiteName}"
);
"""

sql """insert into alter_s3_vault_tbl values(2, 2); """

expectExceptionLike({
sql """
ALTER STORAGE VAULT ${suiteName}
Expand All @@ -62,45 +99,155 @@ suite("test_alter_s3_vault", "nonConcurrent") {
"""
}, "Alter property")

expectExceptionLike({
sql """
ALTER STORAGE VAULT ${suiteName}
PROPERTIES (
"type"="S3",
"s3.access_key" = "new_ak"
);
"""
}, "Accesskey and secretkey must be alter together")

def vaultName = suiteName
String properties;
def String properties;

def vaultInfos = try_sql """show storage vault"""
def vaultInfos = try_sql """show storage vaults"""

for (int i = 0; i < vaultInfos.size(); i++) {
def name = vaultInfos[i][0]
logger.info("name is ${name}, info ${vaultInfos[i]}")
if (name.equals(vaultName)) {
properties = vaultInfos[i][2]
}
}

def newVaultName = suiteName + "_new";
// alter ak sk
sql """
ALTER STORAGE VAULT ${vaultName}
PROPERTIES (
"type"="S3",
"s3.access_key" = "${getS3AK()}",
"s3.secret_key" = "${getS3SK()}"
);
"""

vaultInfos = sql """SHOW STORAGE VAULT;"""

for (int i = 0; i < vaultInfos.size(); i++) {
def name = vaultInfos[i][0]
logger.info("name is ${name}, info ${vaultInfos[i]}")
if (name.equals(vaultName)) {
def newProperties = vaultInfos[i][2]
assert properties == newProperties, "Properties are not the same"
}
}

sql """insert into alter_s3_vault_tbl values("2", "2"); """


// rename
newVaultName = vaultName + "_new";

sql """
ALTER STORAGE VAULT ${vaultName}
PROPERTIES (
"type"="S3",
"VAULT_NAME" = "${newVaultName}"
);
"""

vaultInfos = sql """SHOW STORAGE VAULT;"""
for (int i = 0; i < vaultInfos.size(); i++) {
def name = vaultInfos[i][0]
logger.info("name is ${name}, info ${vaultInfos[i]}")
if (name.equals(newVaultName)) {
def newProperties = vaultInfos[i][2]
assert properties == newProperties, "Properties are not the same"
}
if (name.equals(vaultName)) {
assertTrue(false);
}
}

sql """insert into alter_s3_vault_tbl values("2", "2"); """

// rename + aksk
vaultName = newVaultName
newVaultName = vaultName + "_new";

sql """
ALTER STORAGE VAULT ${vaultName}
PROPERTIES (
"type"="S3",
"VAULT_NAME" = "${newVaultName}",
"s3.access_key" = "new_ak"
"s3.access_key" = "${getS3AK()}",
"s3.secret_key" = "${getS3SK()}"
);
"""

vaultInfos = sql """SHOW STORAGE VAULT;"""
for (int i = 0; i < vaultInfos.size(); i++) {
def name = vaultInfos[i][0]
logger.info("name is ${name}, info ${vaultInfos[i]}")
if (name.equals(newVaultName)) {
def newProperties = vaultInfos[i][2]
assert properties == newProperties, "Properties are not the same"
}
if (name.equals(vaultName)) {
assertTrue(false);
}
}
sql """insert into alter_s3_vault_tbl values("2", "2"); """


vaultName = newVaultName;

newVaultName = vaultName + "_new";

vaultInfos = sql """SHOW STORAGE VAULT;"""
boolean exist = false

sql """
ALTER STORAGE VAULT ${vaultName}
PROPERTIES (
"type"="S3",
"VAULT_NAME" = "${newVaultName}",
"s3.access_key" = "new_ak_ak",
"s3.secret_key" = "sk"
);
"""

for (int i = 0; i < vaultInfos.size(); i++) {
def name = vaultInfos[i][0]
logger.info("name is ${name}, info ${vaultInfos[i]}")
if (name.equals(vaultName)) {
assertTrue(false);
}
if (name.equals(newVaultName)) {
assertTrue(vaultInfos[i][2].contains("new_ak"))
assertTrue(vaultInfos[i][2].contains("new_ak_ak"))
exist = true
}
}
assertTrue(exist)

vaultName = newVaultName;

expectExceptionLike({
sql """
ALTER STORAGE VAULT ${vaultName}
PROPERTIES (
"type"="S3",
"VAULT_NAME" = "${dupVaultName}",
"s3.access_key" = "new_ak_ak",
"s3.secret_key" = "sk"
);
"""
}, "already exists")

def count = sql """ select count() from alter_s3_vault_tbl; """
assertTrue(res[0][0] == 4)

// failed to insert due to the wrong ak
expectExceptionLike({ sql """insert into alter_s3_vault_tbl values("2", "2");""" }, "")
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@ suite("test_alter_use_path_style", "nonConcurrent") {
);
"""

sql """
CREATE TABLE IF NOT EXISTS alter_use_path_style_tbl
(
`k1` INT NULL,
`v1` INT NULL
)
UNIQUE KEY (k1)
DISTRIBUTED BY HASH(`k1`) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
"disable_auto_compaction" = "true",
"storage_vault_name" = "${suiteName}"
);
"""

sql """ insert into alter_use_path_style_tbl values(2, 2); """

sql """
ALTER STORAGE VAULT ${suiteName}
PROPERTIES (
Expand All @@ -51,6 +68,8 @@ suite("test_alter_use_path_style", "nonConcurrent") {
);
"""

sql """ insert into alter_use_path_style_tbl values(2, 2); """

def vaultInfos = sql """ SHOW STORAGE VAULT; """
boolean exist = false

Expand All @@ -73,6 +92,8 @@ suite("test_alter_use_path_style", "nonConcurrent") {
);
"""

sql """ insert into alter_use_path_style_tbl values(2, 2); """

vaultInfos = sql """ SHOW STORAGE VAULT; """
exist = false

Expand Down Expand Up @@ -105,4 +126,7 @@ suite("test_alter_use_path_style", "nonConcurrent") {
);
"""
}, "Invalid use_path_style value")

def count = sql """ select count() from alter_use_path_style_tbl; """
assertTrue(res[0][0] == 3)
}
Loading