Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Block blob client stage block from url #2442

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,7 @@ Detailed support matrix:
- Abort Copy Blob (Only supports copy within same Azurite instance)
- Copy Blob From URL (Only supports copy within same Azurite instance, only on Loki)
- Access control based on conditional headers
- Stage Block From URL (Only supports copy within same Azurite instance, only on Loki)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also update the changelog.

- Following features or REST APIs are NOT supported or limited supported in this release (will support more features per customers feedback in future releases)

- SharedKey Lite
Expand Down
287 changes: 24 additions & 263 deletions src/blob/handlers/BlobHandler.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,23 @@
import { URLBuilder } from "@azure/ms-rest-js";
import axios, { AxiosResponse } from "axios";
import { URL } from "url";

import IExtentStore from "../../common/persistence/IExtentStore";
import {
convertRawHeadersToMetadata,
getMD5FromStream
} from "../../common/utils/utils";
import BlobStorageContext from "../context/BlobStorageContext";
import NotImplementedError from "../errors/NotImplementedError";
import StorageErrorFactory from "../errors/StorageErrorFactory";
import * as Models from "../generated/artifacts/models";
import Context from "../generated/Context";
import IBlobHandler from "../generated/handlers/IBlobHandler";
import ILogger from "../generated/utils/ILogger";
import { parseXML } from "../generated/utils/xml";
import { extractStoragePartsFromPath } from "../middlewares/blobStorageContext.middleware";
import IBlobMetadataStore, {
BlobModel
} from "../persistence/IBlobMetadataStore";
import IExtentStore from '../../common/persistence/IExtentStore';
import { convertRawHeadersToMetadata, getMD5FromStream } from '../../common/utils/utils';
import BlobStorageContext from '../context/BlobStorageContext';
import NotImplementedError from '../errors/NotImplementedError';
import StorageErrorFactory from '../errors/StorageErrorFactory';
import * as Models from '../generated/artifacts/models';
import Context from '../generated/Context';
import IBlobHandler from '../generated/handlers/IBlobHandler';
import ILogger from '../generated/utils/ILogger';
import { extractStoragePartsFromPath } from '../middlewares/blobStorageContext.middleware';
import IBlobMetadataStore, { BlobModel } from '../persistence/IBlobMetadataStore';
import {
BLOB_API_VERSION,
EMULATOR_ACCOUNT_KIND,
EMULATOR_ACCOUNT_SKUNAME,
HeaderConstants
} from "../utils/constants";
BLOB_API_VERSION, EMULATOR_ACCOUNT_KIND, EMULATOR_ACCOUNT_SKUNAME, HeaderConstants
} from '../utils/constants';
import {
deserializePageBlobRangeHeader,
deserializeRangeHeader,
getBlobTagsCount,
validateBlobTag
} from "../utils/utils";
import BaseHandler from "./BaseHandler";
import IPageBlobRangesManager from "./IPageBlobRangesManager";
deserializePageBlobRangeHeader, downloadBlockBlobOrAppendBlob, getBlobTagsCount,
NewUriFromCopySource, validateBlobTag, validateCopySource
} from '../utils/utils';
import BaseHandler from './BaseHandler';
import IPageBlobRangesManager from './IPageBlobRangesManager';

/**
* BlobHandler handles Azure Storage Blob related requests.
Expand Down Expand Up @@ -81,11 +66,11 @@ export default class BlobHandler extends BaseHandler implements IBlobHandler {
);

if (blob.properties.blobType === Models.BlobType.BlockBlob) {
return this.downloadBlockBlobOrAppendBlob(options, context, blob);
return downloadBlockBlobOrAppendBlob(this.logger, "BlobHandler", this.extentStore, options, context, blob);
} else if (blob.properties.blobType === Models.BlobType.PageBlob) {
return this.downloadPageBlob(options, context, blob);
} else if (blob.properties.blobType === Models.BlobType.AppendBlob) {
return this.downloadBlockBlobOrAppendBlob(options, context, blob);
return downloadBlockBlobOrAppendBlob(this.logger, "BlobHandler", this.extentStore, options, context, blob);
} else {
throw StorageErrorFactory.getInvalidOperation(context.contextId!);
}
Expand Down Expand Up @@ -646,7 +631,7 @@ export default class BlobHandler extends BaseHandler implements IBlobHandler {
const blob = blobCtx.blob!;

// TODO: Check dest Lease status, and set to available if it's expired, see sample in BlobHandler.setMetadata()
const url = this.NewUriFromCopySource(copySource, context);
const url = NewUriFromCopySource(copySource, context);
const [
sourceAccount,
sourceContainer,
Expand All @@ -664,7 +649,7 @@ export default class BlobHandler extends BaseHandler implements IBlobHandler {

const sig = url.searchParams.get("sig");
if ((sourceAccount !== blobCtx.account) || (sig !== null)) {
await this.validateCopySource(copySource, sourceAccount, context);
await validateCopySource(this.logger, "BlobHandler", copySource, sourceAccount, context);
}

// Preserve metadata key case
Expand Down Expand Up @@ -702,82 +687,6 @@ export default class BlobHandler extends BaseHandler implements IBlobHandler {
return response;
}

/**
 * Validates that the copy source is reachable and that the caller is
 * authorized to read it. Only sources hosted on this same Azurite
 * instance are supported; access is verified by probing the source
 * directly with the caller-supplied URL (including any SAS token).
 *
 * @private
 * @param {string} copySource Raw copy source URL from x-ms-copy-source.
 * @param {string} sourceAccount Account name parsed from the copy source.
 * @param {Context} context Request context, used for error reporting.
 * @returns {Promise<void>} Resolves when access is verified.
 * @memberof BlobHandler
 */
private async validateCopySource(copySource: string, sourceAccount: string, context: Context): Promise<void> {
  // Currently the only cross-account copy support is from/to the same
  // Azurite instance; authentication is checked by issuing a request to
  // the copy source itself.
  const storageContext = new BlobStorageContext(context);
  const targetHost = storageContext.request!.getHeader("Host") || "";
  const sourceUrl = this.NewUriFromCopySource(copySource, context);

  if (targetHost !== sourceUrl.host) {
    this.logger.error(
      `BlobHandler:startCopyFromURL() Source account ${sourceUrl} is not on the same Azurite instance as target account ${storageContext.account}`,
      context.contextId
    );
    throw StorageErrorFactory.getCannotVerifyCopySource(
      context.contextId!,
      404,
      "The specified resource does not exist"
    );
  }

  this.logger.debug(
    `BlobHandler:startCopyFromURL() Validating access to the source account ${sourceAccount}`,
    context.contextId
  );

  // A metadata GET (rather than HEAD) is used so that error details from
  // the source are returned to the caller instead of being discarded.
  const probeUrl = URLBuilder.parse(copySource);
  probeUrl.setQueryParameter("comp", "metadata");
  const probeResponse: AxiosResponse = await axios.get(probeUrl.toString(), {
    // Instructs axios to not throw an error for non-2xx responses
    validateStatus: () => true
  });

  if (probeResponse.status === 200) {
    this.logger.debug(
      `BlobHandler:startCopyFromURL() Successfully validated access to source account ${sourceAccount}`,
      context.contextId
    );
    return;
  }

  this.logger.debug(
    `BlobHandler:startCopyFromURL() Access denied to source account ${sourceAccount} StatusCode=${probeResponse.status}, AuthenticationErrorDetail=${probeResponse.data}`,
    context.contextId
  );

  if (probeResponse.status === 404) {
    throw StorageErrorFactory.getCannotVerifyCopySource(
      context.contextId!,
      probeResponse.status,
      "The specified resource does not exist"
    );
  }

  // For other failures, attempt to unwrap the error message from the
  // metadata probe so the caller sees the real authentication detail.
  let message: string =
    "Could not verify the copy source within the specified time.";
  if (
    probeResponse.headers[HeaderConstants.CONTENT_TYPE] === "application/xml"
  ) {
    const authenticationError = await parseXML(probeResponse.data);
    if (authenticationError.Message !== undefined) {
      message = authenticationError.Message.replace(/\n+/gm, "");
    }
  }

  throw StorageErrorFactory.getCannotVerifyCopySource(
    context.contextId!,
    probeResponse.status,
    message
  );
}

/**
* Abort copy from Url.
Expand Down Expand Up @@ -845,7 +754,7 @@ export default class BlobHandler extends BaseHandler implements IBlobHandler {
const blob = blobCtx.blob!;

// TODO: Check dest Lease status, and set to available if it's expired, see sample in BlobHandler.setMetadata()
const url = this.NewUriFromCopySource(copySource, context);
const url = NewUriFromCopySource(copySource, context);
const [
sourceAccount,
sourceContainer,
Expand All @@ -862,7 +771,7 @@ export default class BlobHandler extends BaseHandler implements IBlobHandler {
}

if (sourceAccount !== blobCtx.account) {
await this.validateCopySource(copySource, sourceAccount, context);
await validateCopySource(this.logger, "BlobHandler", copySource, sourceAccount, context);
}

// Specifying x-ms-copy-source-tag-option as COPY and x-ms-tags will result in error
Expand Down Expand Up @@ -989,140 +898,6 @@ export default class BlobHandler extends BaseHandler implements IBlobHandler {
return this.getAccountInfo(context);
}

/**
 * Download block blob or append blob.
 *
 * Parses any range headers, reads the requested byte span from the extent
 * store, and builds the download response (206 for ranged reads, 200
 * otherwise). For small full reads an MD5 is computed on the fly.
 *
 * @private
 * @param {Models.BlobDownloadOptionalParams} options
 * @param {Context} context
 * @param {BlobModel} blob
 * @returns {Promise<Models.BlobDownloadResponse>}
 * @memberof BlobHandler
 */
private async downloadBlockBlobOrAppendBlob(
  options: Models.BlobDownloadOptionalParams,
  context: Context,
  blob: BlobModel
): Promise<Models.BlobDownloadResponse> {
  // Uncommitted blobs are not visible to download requests.
  if (blob.isCommitted === false) {
    throw StorageErrorFactory.getBlobNotFound(context.contextId!);
  }

  // Deserializer doesn't handle range header currently, manually parse range headers here
  const rangesParts = deserializeRangeHeader(
    context.request!.getHeader("range"),
    context.request!.getHeader("x-ms-range")
  );
  const rangeStart = rangesParts[0];
  let rangeEnd = rangesParts[1];

  // Start Range is bigger than blob length
  if (rangeStart > blob.properties.contentLength!) {
    throw StorageErrorFactory.getInvalidPageRange(context.contextId!);
  }

  // Will automatically shift request with longer data end than blob size to blob size
  if (rangeEnd + 1 >= blob.properties.contentLength!) {
    // Report an error if blob size is 0 and rangeEnd is specified but not 0.
    // (=== instead of == for strict comparison, consistent with the rest of the file.)
    if (blob.properties.contentLength === 0 && rangeEnd !== 0 && rangeEnd !== Infinity) {
      throw StorageErrorFactory.getInvalidPageRange2(context.contextId!);
    }
    else {
      rangeEnd = blob.properties.contentLength! - 1;
    }
  }

  const contentLength = rangeEnd - rangeStart + 1;
  const partialRead = contentLength !== blob.properties.contentLength!;

  this.logger.info(
    // tslint:disable-next-line:max-line-length
    `BlobHandler:downloadBlockBlobOrAppendBlob() NormalizedDownloadRange=bytes=${rangeStart}-${rangeEnd} RequiredContentLength=${contentLength}`,
    context.contextId
  );

  // bodyGetter is a factory rather than a stream because the stream may
  // need to be re-created after an MD5 pass consumes it (see below).
  let bodyGetter: () => Promise<NodeJS.ReadableStream | undefined>;
  const blocks = blob.committedBlocksInOrder;
  if (blocks === undefined || blocks.length === 0) {
    // Blob persisted as a single extent (e.g. created via Put Blob).
    bodyGetter = async () => {
      if (blob.persistency === undefined) {
        return this.extentStore.readExtent(undefined, context.contextId);
      }
      return this.extentStore.readExtent(
        {
          id: blob.persistency.id,
          offset: blob.persistency.offset + rangeStart,
          count: Math.min(blob.persistency.count, contentLength)
        },
        context.contextId
      );
    };
  } else {
    // Blob composed of committed blocks; read across their extents.
    bodyGetter = async () => {
      return this.extentStore.readExtents(
        blocks.map((block) => block.persistency),
        rangeStart,
        rangeEnd + 1 - rangeStart,
        context.contextId
      );
    };
  }

  // Content-Range is only emitted when the caller actually sent a range header.
  let contentRange: string | undefined;
  if (
    context.request!.getHeader("range") ||
    context.request!.getHeader("x-ms-range")
  ) {
    contentRange = `bytes ${rangeStart}-${rangeEnd}/${blob.properties
      .contentLength!}`;
  }

  let body: NodeJS.ReadableStream | undefined = await bodyGetter();
  let contentMD5: Uint8Array | undefined;
  if (!partialRead) {
    contentMD5 = blob.properties.contentMD5;
  }
  // For small payloads without a stored MD5, compute one; this consumes
  // the stream, so re-create the body afterwards.
  if (
    contentLength <= 4 * 1024 * 1024 &&
    contentMD5 === undefined &&
    body !== undefined
  ) {
    contentMD5 = await getMD5FromStream(body);
    body = await bodyGetter();
  }

  const response: Models.BlobDownloadResponse = {
    statusCode: contentRange ? 206 : 200,
    body,
    metadata: blob.metadata,
    eTag: blob.properties.etag,
    requestId: context.contextId,
    date: context.startTime!,
    version: BLOB_API_VERSION,
    ...blob.properties,
    // SAS response-header overrides (rscc/rscd/rsce/rscl/rsct) win over
    // the stored blob properties.
    cacheControl: context.request!.getQuery("rscc") ?? blob.properties.cacheControl,
    contentDisposition: context.request!.getQuery("rscd") ?? blob.properties.contentDisposition,
    contentEncoding: context.request!.getQuery("rsce") ?? blob.properties.contentEncoding,
    contentLanguage: context.request!.getQuery("rscl") ?? blob.properties.contentLanguage,
    contentType: context.request!.getQuery("rsct") ?? blob.properties.contentType,
    blobContentMD5: blob.properties.contentMD5,
    acceptRanges: "bytes",
    contentLength,
    contentRange,
    // For ranged reads, only return the MD5 when explicitly requested.
    contentMD5: contentRange ? (context.request!.getHeader("x-ms-range-get-content-md5") ? contentMD5: undefined) : contentMD5,
    tagCount: getBlobTagsCount(blob.blobTags),
    isServerEncrypted: true,
    clientRequestId: options.requestId,
    creationTime: blob.properties.creationTime,
    blobCommittedBlockCount:
      blob.properties.blobType === Models.BlobType.AppendBlob
        ? (blob.committedBlocksInOrder || []).length
        : undefined,
  };

  return response;
}

/**
* Download page blob.
*
Expand Down Expand Up @@ -1331,18 +1106,4 @@ export default class BlobHandler extends BaseHandler implements IBlobHandler {
return response;
}

/**
 * Parses the x-ms-copy-source header value into a URL.
 *
 * @private
 * @param {string} copySource Raw copy source header value.
 * @param {Context} context Request context, used for error reporting.
 * @returns {URL} The parsed copy source URL.
 * @throws InvalidHeaderValue storage error when copySource is not a valid URL.
 * @memberof BlobHandler
 */
private NewUriFromCopySource(copySource: string, context: Context): URL {
  try {
    return new URL(copySource);
  } catch {
    // Surface the malformed header back to the caller as a 400 rather
    // than letting the URL constructor's TypeError bubble up as a 500.
    throw StorageErrorFactory.getInvalidHeaderValue(context.contextId, {
      HeaderName: "x-ms-copy-source",
      HeaderValue: copySource
    });
  }
}
}
Loading
Loading