Skip to content

Commit

Permalink
Merge 5f95c3d into 666705c
Browse files Browse the repository at this point in the history
  • Loading branch information
EgorkaZ authored May 16, 2024
2 parents 666705c + 5f95c3d commit b4a0351
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 2 deletions.
4 changes: 4 additions & 0 deletions ydb/core/grpc_services/ydb_over_fq/list_directory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ class ListDirectoryRPC
self.set_type(Ydb::Scheme::Entry::DIRECTORY);

for (const auto& binding : Bindings_) {
if (binding.type() == FederatedQuery::BindingSetting::DATA_STREAMS) {
continue;
}

auto& destEntry = *result.add_children();
destEntry.set_name(binding.name());
destEntry.set_owner(binding.meta().created_by());
Expand Down
35 changes: 33 additions & 2 deletions ydb/tests/fq/s3/test_ydb_over_fq.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import boto3
import json
import logging
import os
import pytest
import typing
import ydb
Expand Down Expand Up @@ -44,9 +45,13 @@ def read_scan_rows(it) -> typing.List[ydb_pb.Value]:
return scanned_rows


def make_columns(columns: typing.List[typing.Tuple[str, str]]) -> typing.List[ydb_pb.Column]:
return [ydb_pb.Column(name=name, type=ydb_pb.Type(type_id=ydb_pb.Type.PrimitiveTypeId.Value(type))) for name, type in columns]


class TestYdbOverFq(TestYdsBase):
def make_binding(self, client: FederatedQueryClient, name: str, path: str, connection_id: str, columns: typing.List[typing.Tuple[str, str]]):
columns = [ydb_pb.Column(name=name, type=ydb_pb.Type(type_id=ydb_pb.Type.PrimitiveTypeId.Value(type))) for name, type in columns]
columns = make_columns(columns)
client.create_object_storage_binding(name, path, "csv_with_names", connection_id, columns=columns)

def make_yq_driver(self, endpoint: str, folder_id: str, token: str) -> ydb.Driver:
Expand Down Expand Up @@ -91,7 +96,7 @@ def list_directory_test_body(self, kikimr, s3, client):

driver = self.make_yq_driver(kikimr.endpoint(), client.folder_id, "root@builtin")

# empty result
# empty resultм
ls_res = driver.scheme_client.list_directory("/")
assert ls_res.is_directory()
assert len(ls_res.children) == 0
Expand Down Expand Up @@ -130,6 +135,32 @@ def test_list_directory_v2(self, kikimr, s3, client):
def test_list_directory_v1(self, kikimr, s3, client):
self.list_directory_test_body(kikimr, s3, client)

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "list_without_streams"}], indirect=True)
def test_list_without_streams(self, kikimr, s3, client, yq_version):
self.init_topics(f"topic_to_not_list_{yq_version}")

connection_response = client.create_yds_connection("yds_conn", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))

logging.debug("connection_response: " + str(connection_response.result))
assert not connection_response.issues, str(connection_response.issues)

binding_response = client.create_yds_binding(name="yds_bind",
stream=self.input_topic,
format="json_each_row",
connection_id=connection_response.result.connection_id,
columns=make_columns([("Data", "STRING")]))

logging.debug("binding_response: " + str(binding_response.result))
assert not binding_response.issues, str(binding_response.issues)

driver = self.make_yq_driver(kikimr.endpoint(), client.folder_id, "root@builtin")
ls_res = driver.scheme_client.list_directory("/")
assert ls_res.is_directory()
# as long as ANALYTICS requests can't process streams, don't list them in ydb_over_fq
# can't check len(children), because other tests' interference
assert list(map(lambda ch: ch.name, ls_res.children)).count("yds_bind") == 0

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_execute_data_query(self, kikimr, s3, client, unique_prefix, yq_version):
Expand Down

0 comments on commit b4a0351

Please sign in to comment.