Skip to content

Commit 48b3198

Browse files
HADOOP-19343. Add support for hflush()
Closes #7761 Co-authored-by: Chris Nauroth <cnauroth@apache.org> Signed-off-by: Chris Nauroth <cnauroth@apache.org>
1 parent 4e18572 commit 48b3198

13 files changed

+679
-94
lines changed

hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateOptions.java renamed to hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateFileOptions.java

Lines changed: 24 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -28,24 +28,38 @@
2828
/**
2929
* Options that can be specified when creating a file in the {@link GoogleCloudStorageFileSystem}.
3030
*/
31-
final class CreateOptions {
31+
final class CreateFileOptions {
3232
private final ImmutableMap<String, byte[]> attributes;
3333
private final String contentType;
3434
private final long overwriteGenerationId;
3535
private final WriteMode mode;
36+
private final boolean ensureNoDirectoryConflict;
3637

37-
private CreateOptions(CreateOperationOptionsBuilder builder) {
38+
/**
 * Builds an options instance from the values currently held by {@code builder}.
 *
 * <p>The attribute map is snapshotted with {@link ImmutableMap#copyOf}; all other
 * settings are copied as-is.
 *
 * @param builder source of every setting stored in this object
 */
private CreateFileOptions(CreateOperationOptionsBuilder builder) {
  this.mode = builder.writeMode;
  this.ensureNoDirectoryConflict = builder.ensureNoDirectoryConflict;
  this.overwriteGenerationId = builder.overwriteGenerationId;
  this.contentType = builder.contentType;
  this.attributes = ImmutableMap.copyOf(builder.attributes);
}
4345

4446
boolean isOverwriteExisting() {
4547
return this.mode == WriteMode.OVERWRITE;
4648
}
4749

50+
boolean isEnsureNoDirectoryConflict() {
51+
return ensureNoDirectoryConflict;
52+
}
53+
54+
CreateOperationOptionsBuilder toBuilder() {
55+
return builder().setWriteMode(this.mode)
56+
.setEnsureNoDirectoryConflict(ensureNoDirectoryConflict);
57+
}
58+
4859
enum WriteMode {
60+
/** Write new bytes to the end of the existing file rather than the beginning. */
61+
APPEND,
62+
4963
/**
5064
* Creates a new file for write and fails if file already exists.
5165
*/
@@ -98,14 +112,20 @@ static class CreateOperationOptionsBuilder {
98112
private String contentType = "application/octet-stream";
99113
private long overwriteGenerationId = StorageResourceId.UNKNOWN_GENERATION_ID;
100114
private WriteMode writeMode = WriteMode.CREATE_NEW;
115+
private boolean ensureNoDirectoryConflict = true;
101116

102117
CreateOperationOptionsBuilder setWriteMode(WriteMode mode) {
103118
this.writeMode = mode;
104119
return this;
105120
}
106121

107-
CreateOptions build() {
108-
CreateOptions options = new CreateOptions(this);
122+
CreateOperationOptionsBuilder setEnsureNoDirectoryConflict(boolean ensure) {
123+
this.ensureNoDirectoryConflict = ensure;
124+
return this;
125+
}
126+
127+
CreateFileOptions build() {
128+
CreateFileOptions options = new CreateFileOptions(this);
109129

110130
checkArgument(!options.getAttributes().containsKey("Content-Type"),
111131
"The Content-Type attribute must be set via the contentType option");

hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ErrorTypeExtractor.java

Lines changed: 24 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,8 @@
2020

2121
import javax.annotation.Nullable;
2222

23+
import com.google.api.client.http.HttpStatusCodes;
24+
import com.google.cloud.storage.StorageException;
2325
import io.grpc.Status;
2426
import io.grpc.StatusRuntimeException;
2527

@@ -63,8 +65,6 @@ enum ErrorType {
6365
UNAVAILABLE, UNKNOWN
6466
}
6567

66-
// public static final ErrorTypeExtractor INSTANCE = new ErrorTypeExtractor();
67-
6868
private static final String BUCKET_ALREADY_EXISTS_MESSAGE =
6969
"FAILED_PRECONDITION: Your previous request to create the named bucket succeeded and you "
7070
+ "already own it.";
@@ -89,7 +89,28 @@ static ErrorType getErrorType(Exception error) {
8989
case UNAVAILABLE:
9090
return ErrorType.UNAVAILABLE;
9191
default:
92-
return ErrorType.UNKNOWN;
92+
return getErrorTypeFromStorageException(error);
9393
}
9494
}
95+
96+
private static ErrorType getErrorTypeFromStorageException(Exception error) {
97+
if (error instanceof StorageException) {
98+
StorageException se = (StorageException) error;
99+
int httpCode = se.getCode();
100+
101+
if (httpCode == HttpStatusCodes.STATUS_CODE_PRECONDITION_FAILED) {
102+
return ErrorType.FAILED_PRECONDITION;
103+
}
104+
105+
if (httpCode == HttpStatusCodes.STATUS_CODE_NOT_FOUND) {
106+
return ErrorType.NOT_FOUND;
107+
}
108+
109+
if (httpCode == HttpStatusCodes.STATUS_CODE_SERVICE_UNAVAILABLE) {
110+
return ErrorType.UNAVAILABLE;
111+
}
112+
}
113+
114+
return ErrorType.UNKNOWN;
115+
}
95116
}

hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java

Lines changed: 69 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -48,6 +48,7 @@
4848
import java.util.List;
4949
import java.util.Map;
5050
import java.util.Set;
51+
import java.util.stream.Collectors;
5152

5253
/**
5354
* A wrapper around <a href="https://github.com/googleapis/java-storage">Google cloud storage
@@ -89,7 +90,7 @@ private static Storage createStorage(String projectId) {
8990
return StorageOptions.newBuilder().build().getService();
9091
}
9192

92-
WritableByteChannel create(final StorageResourceId resourceId, final CreateOptions options)
93+
WritableByteChannel create(final StorageResourceId resourceId, final CreateFileOptions options)
9394
throws IOException {
9495
LOG.trace("create({})", resourceId);
9596

@@ -402,6 +403,47 @@ void createEmptyObject(StorageResourceId resourceId, CreateObjectOptions options
402403
}
403404
}
404405

406+
407+
public GoogleCloudStorageItemInfo composeObjects(
408+
List<StorageResourceId> sources, StorageResourceId destination, CreateObjectOptions options)
409+
throws IOException {
410+
LOG.trace("composeObjects({}, {}, {})", sources, destination, options);
411+
for (StorageResourceId inputId : sources) {
412+
if (!destination.getBucketName().equals(inputId.getBucketName())) {
413+
throw new IOException(
414+
String.format(
415+
"Bucket doesn't match for source '%s' and destination '%s'!",
416+
inputId, destination));
417+
}
418+
}
419+
Storage.ComposeRequest request =
420+
Storage.ComposeRequest.newBuilder()
421+
.addSource(
422+
sources.stream().map(StorageResourceId::getObjectName).collect(Collectors.toList()))
423+
.setTarget(
424+
BlobInfo.newBuilder(destination.getBucketName(), destination.getObjectName())
425+
.setContentType(options.getContentType())
426+
.setContentEncoding(options.getContentEncoding())
427+
.setMetadata(encodeMetadata(options.getMetadata()))
428+
.build())
429+
.setTargetOptions(
430+
Storage.BlobTargetOption.generationMatch(
431+
destination.hasGenerationId()
432+
? destination.getGenerationId()
433+
: getWriteGeneration(destination, true)))
434+
.build();
435+
436+
Blob composedBlob;
437+
try {
438+
composedBlob = storage.compose(request);
439+
} catch (StorageException e) {
440+
throw new IOException(e);
441+
}
442+
GoogleCloudStorageItemInfo compositeInfo = createItemInfoForBlob(destination, composedBlob);
443+
LOG.trace("composeObjects() done, returning: {}", compositeInfo);
444+
return compositeInfo;
445+
}
446+
405447
/**
406448
* Helper to check whether an empty object already exists with the expected metadata specified in
407449
* {@code options}, to be used to determine whether it's safe to ignore an exception that was
@@ -450,6 +492,7 @@ private boolean canIgnoreExceptionForEmptyObject(
450492
return true;
451493
}
452494
}
495+
453496
return false;
454497
}
455498

@@ -472,18 +515,14 @@ private void createEmptyObjectInternal(
472515
blobTargetOptions.add(Storage.BlobTargetOption.doesNotExist());
473516
}
474517

475-
try {
476-
// TODO: Set encryption key and related properties
477-
storage.create(
478-
BlobInfo.newBuilder(BlobId.of(resourceId.getBucketName(), resourceId.getObjectName()))
479-
.setMetadata(rewrittenMetadata)
480-
.setContentEncoding(createObjectOptions.getContentEncoding())
481-
.setContentType(createObjectOptions.getContentType())
482-
.build(),
483-
blobTargetOptions.toArray(new Storage.BlobTargetOption[0]));
484-
} catch (StorageException e) {
485-
throw new IOException(String.format("Creating empty object %s failed.", resourceId), e);
486-
}
518+
// TODO: Set encryption key and related properties
519+
storage.create(
520+
BlobInfo.newBuilder(BlobId.of(resourceId.getBucketName(), resourceId.getObjectName()))
521+
.setMetadata(rewrittenMetadata)
522+
.setContentEncoding(createObjectOptions.getContentEncoding())
523+
.setContentType(createObjectOptions.getContentType())
524+
.build(),
525+
blobTargetOptions.toArray(new Storage.BlobTargetOption[0]));
487526
}
488527

489528
private static Map<String, String> encodeMetadata(Map<String, byte[]> metadata) {
@@ -871,6 +910,23 @@ private static GoogleCloudStorageItemInfo getGoogleCloudStorageItemInfo(
871910
return storageItemInfo;
872911
}
873912

913+
List<GoogleCloudStorageItemInfo> getItemInfos(List<StorageResourceId> resourceIds)
914+
throws IOException {
915+
LOG.trace("getItemInfos({})", resourceIds);
916+
917+
if (resourceIds.isEmpty()) {
918+
return new ArrayList<>();
919+
}
920+
921+
List<GoogleCloudStorageItemInfo> result = new ArrayList<>(resourceIds.size());
922+
for (StorageResourceId resourceId : resourceIds) {
923+
// TODO: Do this concurrently
924+
result.add(getItemInfo(resourceId));
925+
}
926+
927+
return result;
928+
}
929+
874930
// Helper class to capture the results of list operation.
875931
private class ListOperationResult {
876932
private final Map<String, Blob> prefixes = new HashMap<>();

hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageClientWriteChannel.java

Lines changed: 9 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -44,8 +44,12 @@ class GoogleCloudStorageClientWriteChannel implements WritableByteChannel {
4444
private final StorageResourceId resourceId;
4545
private WritableByteChannel writableByteChannel;
4646

47-
GoogleCloudStorageClientWriteChannel(final Storage storage,
48-
final StorageResourceId resourceId, final CreateOptions createOptions) throws IOException {
47+
private GoogleCloudStorageItemInfo completedItemInfo = null;
48+
49+
GoogleCloudStorageClientWriteChannel(
50+
final Storage storage,
51+
final StorageResourceId resourceId,
52+
final CreateFileOptions createOptions) throws IOException {
4953
this.resourceId = resourceId;
5054
BlobWriteSession blobWriteSession = getBlobWriteSession(storage, resourceId, createOptions);
5155
try {
@@ -56,7 +60,7 @@ class GoogleCloudStorageClientWriteChannel implements WritableByteChannel {
5660
}
5761

5862
private static BlobInfo getBlobInfo(final StorageResourceId resourceId,
59-
final CreateOptions createOptions) {
63+
final CreateFileOptions createOptions) {
6064
BlobInfo blobInfo = BlobInfo.newBuilder(
6165
BlobId.of(resourceId.getBucketName(), resourceId.getObjectName(),
6266
resourceId.getGenerationId())).setContentType(createOptions.getContentType())
@@ -66,12 +70,12 @@ private static BlobInfo getBlobInfo(final StorageResourceId resourceId,
6670
}
6771

6872
private static BlobWriteSession getBlobWriteSession(final Storage storage,
69-
final StorageResourceId resourceId, final CreateOptions createOptions) {
73+
final StorageResourceId resourceId, final CreateFileOptions createOptions) {
7074
return storage.blobWriteSession(getBlobInfo(resourceId, createOptions),
7175
generateWriteOptions(createOptions));
7276
}
7377

74-
private static BlobWriteOption[] generateWriteOptions(final CreateOptions createOptions) {
78+
private static BlobWriteOption[] generateWriteOptions(final CreateFileOptions createOptions) {
7579
List<BlobWriteOption> blobWriteOptions = new ArrayList<>();
7680

7781
blobWriteOptions.add(BlobWriteOption.disableGzipContent());

0 commit comments

Comments
 (0)