
Commit

Merge 84df398 into b2e3ab3
evanevanevanevannnn authored Aug 5, 2024
2 parents b2e3ab3 + 84df398 commit 106b0bb
Showing 3 changed files with 66 additions and 3 deletions.
9 changes: 7 additions & 2 deletions ydb/core/external_sources/object_storage.cpp
@@ -307,15 +307,20 @@ struct TObjectStorageExternalSource : public IExternalSource {
             structuredTokenBuilder.SetNoAuth();
         }

+        auto effectiveFilePattern = meta->TableLocation;
+        if (meta->TableLocation.EndsWith('/')) {
+            effectiveFilePattern += '*';
+        }
+
         const NYql::TS3Credentials credentials(CredentialsFactory, structuredTokenBuilder.ToJson());
         auto httpGateway = NYql::IHTTPGateway::Make();
         auto httpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
         auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, httpRetryPolicy, NYql::NS3Lister::TListingRequest{
             .Url = meta->DataSourceLocation,
             .Credentials = credentials,
-            .Pattern = meta->TableLocation,
+            .Pattern = effectiveFilePattern,
         }, Nothing(), false);
-        auto afterListing = s3Lister->Next().Apply([path = meta->TableLocation](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
+        auto afterListing = s3Lister->Next().Apply([path = effectiveFilePattern](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
             auto& listRes = listResFut.GetValue();
             if (std::holds_alternative<NYql::NS3Lister::TListError>(listRes)) {
                 auto& error = std::get<NYql::NS3Lister::TListError>(listRes);
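
The net effect of this hunk: a table location ending in '/' is treated as a directory prefix, so a '*' wildcard is appended before the path is handed to the S3 lister, and the same effective pattern is reused in the listing callback. Below is a minimal standalone sketch of that normalization, using std::string in place of the YDB string helpers; MakeEffectiveFilePattern is an illustrative name, not part of the codebase.

#include <cassert>
#include <string>

// Illustrative sketch only: mirrors the trailing-slash handling added above.
std::string MakeEffectiveFilePattern(const std::string& tableLocation) {
    std::string pattern = tableLocation;
    if (!pattern.empty() && pattern.back() == '/') {
        pattern += '*';  // directory-style location becomes a wildcard pattern
    }
    return pattern;
}

int main() {
    assert(MakeEffectiveFilePattern("/test/") == "/test/*");          // directory: wildcard appended
    assert(MakeEffectiveFilePattern("/test/1.csv") == "/test/1.csv"); // explicit object key: unchanged
    return 0;
}
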
2 changes: 1 addition & 1 deletion ydb/core/kqp/gateway/kqp_metadata_loader.cpp
@@ -281,7 +281,6 @@ TTableMetadataResult GetExternalDataSourceMetadataResult(const NSchemeCache::TSc
     tableMeta->ExternalSource.DataSourceAuth = description.GetAuth();
     tableMeta->ExternalSource.Properties = description.GetProperties();
     tableMeta->ExternalSource.DataSourcePath = tableName;
-    tableMeta->ExternalSource.TableLocation = JoinPath(entry.Path);
     return result;
 }

@@ -839,6 +838,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
             promise.SetValue(externalDataSourceMetadata);
             return;
         }
+        externalDataSourceMetadata.Metadata->ExternalSource.TableLocation = *externalPath;
         LoadExternalDataSourceSecretValues(entry, userToken, ActorSystem)
             .Subscribe([promise, externalDataSourceMetadata, settings](const TFuture<TEvDescribeSecretsResponse::TDescription>& result) mutable
             {
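
This change pairs with the hunk above: TableLocation is no longer rebuilt from the scheme-cache entry path inside GetExternalDataSourceMetadataResult, but is filled in later from the externally supplied path, and only once that path is known to be present. A tiny illustrative sketch of that behavior follows; the struct and function names below are made up for illustration and are not YDB APIs.

#include <optional>
#include <string>

struct TExternalSourceInfo {
    std::string TableLocation;  // illustrative stand-in for the real metadata field
};

// Illustrative sketch only: set the location from the external path when one exists,
// instead of deriving it from the scheme-cache entry path.
void FillTableLocation(TExternalSourceInfo& source, const std::optional<std::string>& externalPath) {
    if (externalPath) {
        source.TableLocation = *externalPath;
    }
}
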
58 changes: 58 additions & 0 deletions ydb/tests/fq/s3/test_s3_0.py
@@ -252,6 +252,64 @@ def test_inference_optional_types(self, kikimr, s3, client, unique_prefix):
         assert result_set.rows[2].items[2].int64_value == 15
         assert result_set.rows[2].items[3].int64_value == 33

+    @yq_v2
+    @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
+    def test_inference_multiple_files(self, kikimr, s3, client, unique_prefix):
+        resource = boto3.resource(
+            "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
+        )
+
+        bucket = resource.Bucket("fbucket")
+        bucket.create(ACL='public-read')
+        bucket.objects.all().delete()
+
+        s3_client = boto3.client(
+            "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
+        )
+
+        read_data_1 = '''a,b,c
+1,2,3
+1,2,3'''
+        read_data_2 = '''a,b,c
+1,2,3'''
+
+        s3_client.put_object(Body=read_data_1, Bucket='fbucket', Key='/test/1.csv', ContentType='text/plain')
+        s3_client.put_object(Body=read_data_2, Bucket='fbucket', Key='/test/2.csv', ContentType='text/plain')
+        kikimr.control_plane.wait_bootstrap(1)
+        storage_connection_name = unique_prefix + "multiple_files_bucket"
+        client.create_storage_connection(storage_connection_name, "fbucket")
+
+        sql = f'''
+            SELECT *
+            FROM `{storage_connection_name}`.`/test/`
+            WITH (format=csv_with_names, with_infer='true');
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+
+        data = client.get_result_data(query_id)
+        result_set = data.result.result_set
+        logging.debug(str(result_set))
+        assert len(result_set.columns) == 3
+        assert result_set.columns[0].name == "a"
+        assert result_set.columns[0].type.optional_type.item.type_id == ydb.Type.INT64
+        assert result_set.columns[1].name == "b"
+        assert result_set.columns[1].type.optional_type.item.type_id == ydb.Type.INT64
+        assert result_set.columns[2].name == "c"
+        assert result_set.columns[2].type.optional_type.item.type_id == ydb.Type.INT64
+        assert len(result_set.rows) == 3
+        assert result_set.rows[0].items[0].int64_value == 1
+        assert result_set.rows[0].items[1].int64_value == 2
+        assert result_set.rows[0].items[2].int64_value == 3
+        assert result_set.rows[1].items[0].int64_value == 1
+        assert result_set.rows[1].items[1].int64_value == 2
+        assert result_set.rows[1].items[2].int64_value == 3
+        assert result_set.rows[2].items[0].int64_value == 1
+        assert result_set.rows[2].items[1].int64_value == 2
+        assert result_set.rows[2].items[2].int64_value == 3
+        assert sum(kikimr.control_plane.get_metering(1)) == 10
+
     @yq_all
     @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
     def test_csv_with_hopping(self, kikimr, s3, client, unique_prefix):
