Skip to content

Commit

Permalink
Fix NPE of exploring ADLS Gen2 FS in Spark job conf
Browse files Browse the repository at this point in the history
  • Loading branch information
wezhang committed Mar 10, 2021
1 parent f376245 commit 44c16e2
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

import static com.microsoft.azure.hdinsight.sdk.storage.adlsgen2.ADLSGen2FSOperation.PERMISSIONS_HEADER;
import static com.microsoft.azure.hdinsight.sdk.storage.adlsgen2.ADLSGen2FSOperation.UMASK_HEADER;
import static java.util.Collections.emptyList;
import static java.util.Optional.ofNullable;

public class ADLSGen2OAuthHttpObservable extends OAuthTokenHttpObservable {
private static final String resource = "https://storage.azure.com/";
Expand Down Expand Up @@ -51,15 +53,12 @@ public Observable<CloseableHttpResponse> request(@NotNull final HttpRequestBase
final List<NameValuePair> parameters,
final List<Header> addOrReplaceHeaders) {
// Filter out set permission related headers since they are not supported in request with OAuth
List<Header> filteredHeaders = addOrReplaceHeaders;
if (filteredHeaders != null) {
filteredHeaders =
filteredHeaders.stream()
.filter(header -> !header.getName().equalsIgnoreCase(PERMISSIONS_HEADER)
&& !header.getName().equalsIgnoreCase(UMASK_HEADER))
.collect(Collectors.toList());
}
List<Header> filteredHeaders = ofNullable(addOrReplaceHeaders)
.orElse(emptyList())
.stream()
.filter(header -> !header.getName().equalsIgnoreCase(PERMISSIONS_HEADER) && !header.getName().equalsIgnoreCase(UMASK_HEADER))
.collect(Collectors.toList());

return super.request(httpRequest, entity, parameters, filteredHeaders);
return super.request(httpRequest, entity, ofNullable(parameters).orElse(emptyList()), filteredHeaders);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import rx.Observable;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

public class AzureSparkClusterManager extends AzureSparkCosmosClusterManager implements ILogger {
Expand All @@ -33,10 +34,9 @@ public static AzureSparkClusterManager getInstance() {
return AzureSparkClusterManager.LazyHolder.INSTANCE;
}

@Nullable
@Override
public List<NameValuePair> getAccountFilter() {
return null;
return Collections.emptyList();
}

public Observable<SubscriptionDetail> getSubscriptionDetailByStoreAccountName(String storeAccountName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ public HttpObservable setHttpClient(@NotNull CloseableHttpClient httpClient) {
return this;
}

@Nullable
public Header[] getDefaultHeaders() throws IOException {
return defaultHeaders.getAllHeaders();
}
Expand Down Expand Up @@ -351,8 +350,8 @@ public <T> T convertJsonResponseToObject(@NotNull final HttpResponse resp, @NotN
*/
public Observable<CloseableHttpResponse> request(@NotNull final HttpRequestBase httpRequest,
@Nullable final HttpEntity entity,
@Nullable final List<NameValuePair> parameters,
@Nullable final List<Header> addOrReplaceHeaders) {
final List<NameValuePair> parameters,
final List<Header> addOrReplaceHeaders) {
return Observable.fromCallable(() -> {
URIBuilder builder = new URIBuilder(httpRequest.getURI());

Expand Down Expand Up @@ -387,8 +386,8 @@ public Observable<CloseableHttpResponse> request(@NotNull final HttpRequestBase
*/
public Observable<HttpResponse> requestWithHttpResponse(@NotNull final HttpRequestBase httpRequest,
@Nullable final HttpEntity entity,
@Nullable final List<NameValuePair> parameters,
@Nullable final List<Header> addOrReplaceHeaders) {
final List<NameValuePair> parameters,
final List<Header> addOrReplaceHeaders) {
return request(httpRequest, entity, parameters, addOrReplaceHeaders)
.flatMap(HttpObservable::toStringOnlyOkResponse);
}
Expand All @@ -400,41 +399,41 @@ public Observable<HttpResponse> head(@NotNull final String uri,
}

public <T> Observable<T> get(@NotNull final String uri,
@Nullable final List<NameValuePair> parameters,
@Nullable final List<Header> addOrReplaceHeaders,
final List<NameValuePair> parameters,
final List<Header> addOrReplaceHeaders,
@NotNull final Class<T> clazz) {
return requestWithHttpResponse(new HttpGet(uri), null, parameters, addOrReplaceHeaders)
.map(resp -> this.convertJsonResponseToObject(resp, clazz));
}

public <T> Observable<T> put(@NotNull final String uri,
@Nullable final HttpEntity entity,
@Nullable final List<NameValuePair> parameters,
@Nullable final List<Header> addOrReplaceHeaders,
final List<NameValuePair> parameters,
final List<Header> addOrReplaceHeaders,
@NotNull final Class<T> clazz) {
return requestWithHttpResponse(new HttpPut(uri), entity, parameters, addOrReplaceHeaders)
.map(resp -> this.convertJsonResponseToObject(resp, clazz));
}

public <T> Observable<T> post(@NotNull final String uri,
@Nullable final HttpEntity entity,
@Nullable final List<NameValuePair> parameters,
@Nullable final List<Header> addOrReplaceHeaders,
final List<NameValuePair> parameters,
final List<Header> addOrReplaceHeaders,
@NotNull final Class<T> clazz) {
return requestWithHttpResponse(new HttpPost(uri), entity, parameters, addOrReplaceHeaders)
.map(resp -> this.convertJsonResponseToObject(resp, clazz));
}

public Observable<HttpResponse> delete(@NotNull final String uri,
@Nullable final List<NameValuePair> parameters,
@Nullable final List<Header> addOrReplaceHeaders) {
final List<NameValuePair> parameters,
final List<Header> addOrReplaceHeaders) {
return requestWithHttpResponse(new HttpDelete(uri), null, parameters, addOrReplaceHeaders);
}

public <T> Observable<T> patch(@NotNull final String uri,
@Nullable final HttpEntity entity,
@Nullable final List<NameValuePair> parameters,
@Nullable final List<Header> addOrReplaceHeaders,
final List<NameValuePair> parameters,
final List<Header> addOrReplaceHeaders,
@NotNull final Class<T> clazz) {
return requestWithHttpResponse(new HttpPatch(uri), entity, parameters, addOrReplaceHeaders)
.map(resp -> this.convertJsonResponseToObject(resp, clazz));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public String getAccessToken() throws IOException {
return accessToken;
}

@Nullable
@Override
public Header[] getDefaultHeaders() throws IOException {
Header[] defaultHeaders = super.getDefaultHeaders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import java.util.List;
import java.util.UUID;

import static java.util.Collections.emptyList;
import static java.util.Optional.ofNullable;

public class SharedKeyHttpObservable extends HttpObservable {
public static String ApiVersion = "2018-11-09";
private SharedKeyCredential cred;
Expand Down Expand Up @@ -63,16 +66,15 @@ public Observable<CloseableHttpResponse> request(final HttpRequestBase httpReque
// cannot be added to default header group in case of duplication.
headerGroup.addHeader(new BasicHeader("Content-Length", String.valueOf(entity.getContentLength())));
}
addOrReplaceHeaders.stream().forEach(header -> headerGroup.addHeader(header));
String key = cred.generateSharedKey(httpRequest, headerGroup, parameters);
ofNullable(addOrReplaceHeaders).orElse(emptyList()).forEach(headerGroup::addHeader);
String key = cred.generateSharedKey(httpRequest, headerGroup, ofNullable(parameters).orElse(emptyList()));

getDefaultHeaderGroup().updateHeader(new BasicHeader("Authorization", key));

return super.request(httpRequest, entity, parameters, addOrReplaceHeaders);
return super.request(httpRequest, entity, ofNullable(parameters).orElse(emptyList()), ofNullable(addOrReplaceHeaders).orElse(emptyList()));
}

@Override
@Nullable
public Header[] getDefaultHeaders() throws IOException {
return defaultHeaders.getAllHeaders();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ public AzureSparkCosmosClusterManager() {
// Getters / setters
//

@Nullable
public List<NameValuePair> getAccountFilter() {
return Collections.singletonList(ODataParam.filter(ACCOUNT_FILTER));
}
Expand Down

0 comments on commit 44c16e2

Please sign in to comment.