Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combined cloud blob crypto service #1154

Merged
merged 19 commits into from
Apr 23, 2019
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion ambry-api/src/main/java/com.github.ambry/config/CloudConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public class CloudConfig {
public static final String VCR_SSL_PORT = "vcr.ssl.port";
public static final String VCR_REPLICA_MOUNT_PATH_PREFIX = "vcr.replica.mount.path.prefix";
public static final String VCR_REQUIRE_ENCRYPTION = "vcr.require.encryption";
public static final String VCR_KMS_FACTORY = "vcr.key.management.service.factory";
public static final String VCR_CRYPTO_SERVICE_FACTORY = "vcr.crypto.service.factory";

public static final String VCR_MIN_TTL_DAYS = "vcr.min.ttl.days";
public static final String VCR_ASSIGNED_PARTITIONS = "vcr.assigned.partitions";
public static final String VCR_PROXY_HOST = "vcr.proxy.host";
Expand All @@ -34,6 +37,11 @@ public class CloudConfig {
"com.github.ambry.cloud.StaticVcrClusterFactory";
private static final String DEFAULT_CLOUD_DESTINATION_FACTORY_CLASS =
"com.github.ambry.cloud.azure.AzureCloudDestinationFactory";
public static final String KMS_SERVICE_KEY_CONTEXT = "kms.service.key.context";
public static final String DEFAULT_KMS_SERVICE_KEY_CONTEXT = "default-backup";
public static final String CLOUD_BLOB_CRYPTO_AGENT_FACTORY_CLASS = "cloud.blob.crypto.agent.factory.class";
public static final String DEFAULT_CLOUD_BLOB_CRYPTO_AGENT_FACTORY_CLASS =
"com.github.ambry.cloud.CloudBlobCryptoAgentFactoryImpl";
private static final String DEFAULT_VCR_CLUSTER_ZK_CONNECT_STRING = "localhost:2181";
private static final String DEFAULT_VCR_CLUSTER_NAME = "VCRCluster";
private static final String DEFAULT_VCR_REPLICA_MOUNT_PATH_PREFIX = "/tmp/vcr/";
Expand Down Expand Up @@ -89,6 +97,34 @@ public class CloudConfig {
@Default("false")
public final boolean vcrRequireEncryption;

/**
* The kms service factory class name.
*/
@Config(KMS_SERVICE_KEY_CONTEXT)
@Default(DEFAULT_KMS_SERVICE_KEY_CONTEXT)
public final String kmsServiceKeyContext;
lightningrob marked this conversation as resolved.
Show resolved Hide resolved

/**
* The cloud blob crypto agent factory class name.
*/
@Config(CLOUD_BLOB_CRYPTO_AGENT_FACTORY_CLASS)
@Default(DEFAULT_CLOUD_BLOB_CRYPTO_AGENT_FACTORY_CLASS)
public final String cloudBlobCryptoAgentFactoryClass;

/**
* The KeyManagementServiceFactory that will be used to fetch {@link com.github.ambry.router.KeyManagementService}
*/
@Config(VCR_KMS_FACTORY)
@Default(RouterConfig.DEFAULT_KMS_FACTORY)
public final String vcrKeyManagementServiceFactory;

/**
* The CryptoServiceFactory that will be used to fetch {@link com.github.ambry.router.CryptoService}
*/
@Config(VCR_CRYPTO_SERVICE_FACTORY)
@Default(RouterConfig.DEFAULT_CRYPTO_SERVICE_FACTORY)
public final String vcrCryptoServiceFactory;

/**
* Minimum TTL in days required for blobs to be uploaded to cloud.
*/
Expand Down Expand Up @@ -123,15 +159,21 @@ public CloudConfig(VerifiableProperties verifiableProperties) {
DEFAULT_VIRTUAL_REPLICATOR_CLUSTER_FACTORY_CLASS);
cloudDestinationFactoryClass =
verifiableProperties.getString(CLOUD_DESTINATION_FACTORY_CLASS, DEFAULT_CLOUD_DESTINATION_FACTORY_CLASS);
vcrAssignedPartitions = verifiableProperties.getString(VCR_ASSIGNED_PARTITIONS, null);
vcrClusterZkConnectString =
verifiableProperties.getString(VCR_CLUSTER_ZK_CONNECT_STRING, DEFAULT_VCR_CLUSTER_ZK_CONNECT_STRING);
vcrClusterName = verifiableProperties.getString(VCR_CLUSTER_NAME, DEFAULT_VCR_CLUSTER_NAME);
vcrSslPort = verifiableProperties.getInteger(VCR_SSL_PORT, null);
vcrReplicaMountPathPrefix =
verifiableProperties.getString(VCR_REPLICA_MOUNT_PATH_PREFIX, DEFAULT_VCR_REPLICA_MOUNT_PATH_PREFIX);
vcrRequireEncryption = verifiableProperties.getBoolean(VCR_REQUIRE_ENCRYPTION, false);
vcrKeyManagementServiceFactory = verifiableProperties.getString(VCR_KMS_FACTORY, RouterConfig.DEFAULT_KMS_FACTORY);
vcrCryptoServiceFactory =
verifiableProperties.getString(VCR_CRYPTO_SERVICE_FACTORY, RouterConfig.DEFAULT_CRYPTO_SERVICE_FACTORY);
kmsServiceKeyContext = verifiableProperties.getString(KMS_SERVICE_KEY_CONTEXT, DEFAULT_KMS_SERVICE_KEY_CONTEXT);
cloudBlobCryptoAgentFactoryClass = verifiableProperties.getString(CLOUD_BLOB_CRYPTO_AGENT_FACTORY_CLASS,
DEFAULT_CLOUD_BLOB_CRYPTO_AGENT_FACTORY_CLASS);
vcrMinTtlDays = verifiableProperties.getInt(VCR_MIN_TTL_DAYS, DEFAULT_MIN_TTL_DAYS);
vcrAssignedPartitions = verifiableProperties.getString(VCR_ASSIGNED_PARTITIONS, null);

// Proxy settings
vcrProxyHost = verifiableProperties.getString(VCR_PROXY_HOST, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
*/
public class RouterConfig {

public static final String DEFAULT_KMS_FACTORY = "com.github.ambry.router.SingleKeyManagementServiceFactory";
public static final String DEFAULT_CRYPTO_SERVICE_FACTORY = "com.github.ambry.router.GCMCryptoServiceFactory";

/**
* Number of independent scaling units for the router.
*/
Expand Down Expand Up @@ -188,14 +191,14 @@ public class RouterConfig {
* The KeyManagementServiceFactory that will be used to fetch {@link com.github.ambry.router.KeyManagementService}
*/
@Config("router.key.management.service.factory")
@Default("com.github.ambry.router.SingleKeyManagementServiceFactory")
@Default(DEFAULT_KMS_FACTORY)
public final String routerKeyManagementServiceFactory;

/**
* The CryptoServiceFactory that will be used to fetch {@link com.github.ambry.router.CryptoService}
*/
@Config("router.crypto.service.factory")
@Default("com.github.ambry.router.GCMCryptoServiceFactory")
@Default(DEFAULT_CRYPTO_SERVICE_FACTORY)
public final String routerCryptoServiceFactory;

/**
Expand Down Expand Up @@ -274,10 +277,10 @@ public RouterConfig(VerifiableProperties verifiableProperties) {
routerBlobidCurrentVersion =
verifiableProperties.getShortFromAllowedValues("router.blobid.current.version", (short) 6,
new Short[]{1, 2, 3, 4, 5, 6});
routerKeyManagementServiceFactory = verifiableProperties.getString("router.key.management.service.factory",
"com.github.ambry.router.SingleKeyManagementServiceFactory");
routerCryptoServiceFactory = verifiableProperties.getString("router.crypto.service.factory",
"com.github.ambry.router.GCMCryptoServiceFactory");
routerKeyManagementServiceFactory =
verifiableProperties.getString("router.key.management.service.factory", DEFAULT_KMS_FACTORY);
routerCryptoServiceFactory =
verifiableProperties.getString("router.crypto.service.factory", DEFAULT_CRYPTO_SERVICE_FACTORY);
routerCryptoJobsWorkerCount =
verifiableProperties.getIntInRange("router.crypto.jobs.worker.count", 1, 1, Integer.MAX_VALUE);
routerTtlUpdateRequestParallelism =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ public interface KeyManagementService<T> extends Closeable {
*/
void register(short accountId, short containerId) throws GeneralSecurityException;

/**
* Registers with KMS to create key for a unique context.
* @param context refers to the key context to register
* @throws {@link GeneralSecurityException} on KMS unavailability or duplicate registration
*/
void register(String context) throws GeneralSecurityException;

/**
* Fetches the key associated with the pair of AccountId and ContainerId. User is expected to have
* registered using {@link #register(short, short)} for this pair before fetching keys.
Expand All @@ -47,6 +54,15 @@ public interface KeyManagementService<T> extends Closeable {
*/
T getKey(short accountId, short containerId) throws GeneralSecurityException;

/**
* Fetches the key associated with the specified context. User is expected to have
* registered using {@link #register(String)} for this context before fetching keys.
* @param context refers to the context for which key is expected
* @return T the key associated with the context
* @throws {@link GeneralSecurityException} on KMS unavailability or if key is not registered
*/
T getKey(String context) throws GeneralSecurityException;

/**
* Generate and return a random key (of type T)
* @return a random key (of type T)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.github.ambry.config.ClusterMapConfig;
import com.github.ambry.config.ReplicationConfig;
import com.github.ambry.config.StoreConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.network.ConnectionPool;
import com.github.ambry.notification.NotificationSystem;
import com.github.ambry.replication.DiskTokenPersistor;
Expand All @@ -45,22 +46,25 @@
*/
public class CloudBackupManager extends ReplicationEngine {

public CloudBackupManager(CloudConfig cloudConfig, ReplicationConfig replicationConfig,
ClusterMapConfig clusterMapConfig, StoreConfig storeConfig, StoreKeyFactory storeKeyFactory,
ClusterMap clusterMap, VirtualReplicatorCluster virtualReplicatorCluster,
public CloudBackupManager(VerifiableProperties properties, CloudConfig cloudConfig,
ReplicationConfig replicationConfig, ClusterMapConfig clusterMapConfig, StoreConfig storeConfig,
StoreKeyFactory storeKeyFactory, ClusterMap clusterMap, VirtualReplicatorCluster virtualReplicatorCluster,
CloudDestinationFactory cloudDestinationFactory, ScheduledExecutorService scheduler,
ConnectionPool connectionPool, MetricRegistry metricRegistry, NotificationSystem requestNotification,
StoreKeyConverterFactory storeKeyConverterFactory, String transformerClassName) throws ReplicationException {

super(replicationConfig, clusterMapConfig, storeKeyFactory, clusterMap, scheduler,
virtualReplicatorCluster.getCurrentDataNodeId(), Collections.emptyList(), connectionPool, metricRegistry,
requestNotification, storeKeyConverterFactory, transformerClassName);

lightningrob marked this conversation as resolved.
Show resolved Hide resolved
CloudDestination cloudDestination = cloudDestinationFactory.getCloudDestination();
List<? extends PartitionId> partitionIds = virtualReplicatorCluster.getAssignedPartitionIds();
VcrMetrics vcrMetrics = new VcrMetrics(metricRegistry);

for (PartitionId partitionId : partitionIds) {
ReplicaId cloudReplica =
new CloudReplica(cloudConfig, partitionId, virtualReplicatorCluster.getCurrentDataNodeId());
Store cloudStore = new CloudBlobStore(partitionId, cloudConfig, cloudDestination);
Store cloudStore = new CloudBlobStore(properties, partitionId, cloudDestination, vcrMetrics);
try {
cloudStore.start();
} catch (StoreException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.cloud;

import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;


/**
* Crypto Service to assist in encrypting and decrypting blob chunks transferred between
* Ambry and the cloud backup.
*/
public interface CloudBlobCryptoAgent {

/**
* Encrypts the blob buffer being uploaded to cloud storage.
* @param buffer The {@link ByteBuffer} that needs to be encrypted.
* @return The encrypted buffer.
* @throws {@link GeneralSecurityException} on any exception with encryption.
*/
ByteBuffer encrypt(ByteBuffer buffer) throws GeneralSecurityException;

/**
* Decrypts the blob buffer being downloaded from cloud storage.
* @param buffer The {@link ByteBuffer} that needs to be decrypted.
* @return The decrypted buffer.
* @throws {@link GeneralSecurityException} on any exception with decryption
*/
ByteBuffer decrypt(ByteBuffer buffer) throws GeneralSecurityException;

/**
* Returns the encryption context (if any)
* @return
*/
String getEncryptionContext();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.cloud;

/**
* Factory for constructing {@link CloudBlobCryptoAgent} instances.
*/
public interface CloudBlobCryptoAgentFactory {

CloudBlobCryptoAgent getCloudBlobCryptoAgent();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.cloud;

import com.codahale.metrics.MetricRegistry;
import com.github.ambry.config.CloudConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.router.CryptoServiceFactory;
import com.github.ambry.router.KeyManagementServiceFactory;
import com.github.ambry.utils.Utils;
import java.security.GeneralSecurityException;


/**
* Factory for constructing {@link CloudBlobCryptoAgentImpl} instances.
*/
public class CloudBlobCryptoAgentFactoryImpl implements CloudBlobCryptoAgentFactory {

private final CryptoServiceFactory cryptoServiceFactory;
private final KeyManagementServiceFactory keyManagementServiceFactory;
private final String context;

public CloudBlobCryptoAgentFactoryImpl(VerifiableProperties verifiableProperties, String clusterName,
MetricRegistry metricRegistry) throws ReflectiveOperationException {
CloudConfig cloudConfig = new CloudConfig(verifiableProperties);
cryptoServiceFactory = Utils.getObj(cloudConfig.vcrCryptoServiceFactory, verifiableProperties, metricRegistry);
keyManagementServiceFactory =
Utils.getObj(cloudConfig.vcrKeyManagementServiceFactory, verifiableProperties, clusterName, metricRegistry);
context = cloudConfig.kmsServiceKeyContext;
}

@Override
public CloudBlobCryptoAgent getCloudBlobCryptoAgent() {
try {
return new CloudBlobCryptoAgentImpl(cryptoServiceFactory.getCryptoService(),
keyManagementServiceFactory.getKeyManagementService(), context);
} catch (GeneralSecurityException e) {
throw new IllegalStateException(e);
}
}
}
Loading