Skip to content
28 changes: 17 additions & 11 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ TString TPartition::LogPrefix() const {
} else {
state = "Unknown";
}
return TStringBuilder() << "[PQ: " << TabletID << ", Partition:" << Partition << ", State:" << state << "] ";
return TStringBuilder() << "[PQ: " << TabletID << ", Partition: " << Partition << ", State: " << state << "] ";
}

bool TPartition::IsActive() const {
Expand Down Expand Up @@ -2134,6 +2134,8 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event)

void TPartition::CommitWriteOperations(TTransaction& t)
{
PQ_LOG_D("TPartition::CommitWriteOperations TxId: " << t.GetTxId());

Y_ABORT_UNLESS(PersistRequest);
Y_ABORT_UNLESS(!PartitionedBlob.IsInited());

Expand All @@ -2151,6 +2153,10 @@ void TPartition::CommitWriteOperations(TTransaction& t)
HaveWriteMsg = true;
}

PQ_LOG_D("t.WriteInfo->BodyKeys.size=" << t.WriteInfo->BodyKeys.size() <<
", t.WriteInfo->BlobsFromHead.size=" << t.WriteInfo->BlobsFromHead.size());
PQ_LOG_D("Head=" << Head << ", NewHead=" << NewHead);

if (!t.WriteInfo->BodyKeys.empty()) {
PartitionedBlob = TPartitionedBlob(Partition,
NewHead.Offset,
Expand All @@ -2165,6 +2171,7 @@ void TPartition::CommitWriteOperations(TTransaction& t)
MaxBlobSize);

for (auto& k : t.WriteInfo->BodyKeys) {
PQ_LOG_D("add key " << k.Key.ToString());
auto write = PartitionedBlob.Add(k.Key, k.Size);
if (write && !write->Value.empty()) {
AddCmdWrite(write, PersistRequest.Get(), ctx);
Expand All @@ -2173,18 +2180,17 @@ void TPartition::CommitWriteOperations(TTransaction& t)
}
}

}

if (const auto& formedBlobs = PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) {
ui32 curWrites = RenameTmpCmdWrites(PersistRequest.Get());
RenameFormedBlobs(formedBlobs,
*Parameters,
curWrites,
PersistRequest.Get(),
ctx);
}
PQ_LOG_D("PartitionedBlob.GetFormedBlobs().size=" << PartitionedBlob.GetFormedBlobs().size());
if (const auto& formedBlobs = PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) {
ui32 curWrites = RenameTmpCmdWrites(PersistRequest.Get());
RenameFormedBlobs(formedBlobs,
*Parameters,
curWrites,
PersistRequest.Get(),
ctx);
}

if (!t.WriteInfo->BodyKeys.empty()) {
const auto& last = t.WriteInfo->BodyKeys.back();

NewHead.Offset += (last.Key.GetOffset() + last.Key.GetCount());
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/persqueue/partition_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <util/system/types.h>
#include <util/digest/multi.h>
#include <util/str_stl.h>
#include <util/string/builder.h>

#include <functional>

Expand Down Expand Up @@ -51,6 +52,13 @@ class TPartitionId {
}
}

TString ToString() const
{
TStringBuilder s;
s << *this;
return s;
}

bool IsSupportivePartition() const
{
return WriteId.Defined();
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1064,16 +1064,16 @@ void TPartition::RenameFormedBlobs(const std::deque<TPartitionedBlob::TRenameFor
}
if (!DataKeysBody.empty() && CompactedKeys.empty()) {
Y_ABORT_UNLESS(DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount() <= x.NewKey.GetOffset(),
"PQ: %" PRIu64 ", Partition: %s, "
"LAST KEY %s, HeadOffset %lu, NEWKEY %s",
TabletID, Partition.ToString().c_str(),
DataKeysBody.back().Key.ToString().c_str(),
Head.Offset,
x.NewKey.ToString().c_str());
}
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
"writing blob: topic '" << TopicName() << "' partition " << Partition
<< " " << x.OldKey.ToString() << " size " << x.Size << " WTime " << ctx.Now().MilliSeconds()
);
PQ_LOG_D("writing blob: topic '" << TopicName() << "' partition " << Partition <<
" old key " << x.OldKey.ToString() << " new key " << x.NewKey.ToString() <<
" size " << x.Size << " WTime " << ctx.Now().MilliSeconds());

CompactedKeys.emplace_back(x.NewKey, x.Size);
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4667,6 +4667,8 @@ void TPersQueue::TryStartTransaction(const TActorContext& ctx)
Y_ABORT_UNLESS(next);

CheckTxState(ctx, *next);

TryWriteTxs(ctx);
}

void TPersQueue::OnInitComplete(const TActorContext& ctx)
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,8 @@ Y_UNIT_TEST_SUITE(Cdc) {

// get records
{
WaitForDataRecords(client, shardIt);

auto res = client.GetRecords(shardIt).ExtractValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(res.GetResult().records().size(), records.size());
Expand All @@ -1267,6 +1269,19 @@ Y_UNIT_TEST_SUITE(Cdc) {
}
}

static void WaitForDataRecords(TDataStreamsClient& client, const TString& shardIt) {
int n = 0;
for (; n < 100; ++n) {
auto res = client.GetRecords(shardIt).ExtractValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
if (res.GetResult().records().size()) {
break;
}
Sleep(TDuration::MilliSeconds(100));
}
UNIT_ASSERT_VALUES_UNEQUAL(n, 100);
}

static void Write(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc) {
TTestYdsEnv env(tableDesc, streamDesc);

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/schemeshard/ut_base/ut_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6343,6 +6343,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
"PartitionPerTablet: 10 "
"PQTabletConfig: {PartitionConfig { LifetimeSeconds : 10}}"
);
env.TestWaitNotification(runtime, txId);

TestDescribeResult(DescribePath(runtime, "/MyRoot/DirA/PQGroup_1", true),
{NLs::CheckPartCount("PQGroup_1", 100, 10, 10, 100),
Expand Down Expand Up @@ -6865,7 +6866,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
AsyncForceDropUnsafe(runtime, ++txId, pVer.PathId.LocalPathId);

TestModificationResult(runtime, txId-2, NKikimrScheme::StatusAccepted);
TestModificationResult(runtime, txId-1, NKikimrScheme::StatusAccepted);
TestModificationResults(runtime, txId-1, {NKikimrScheme::StatusAccepted, NKikimrScheme::StatusMultipleModifications});
TestModificationResult(runtime, txId, NKikimrScheme::StatusAccepted);

TActorId sender = runtime.AllocateEdgeActor();
Expand Down
95 changes: 91 additions & 4 deletions ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ class TFixture : public NUnitTest::TBaseFixture {

void TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params);

void WriteMessagesInTx(size_t big, size_t small);

const TDriver& GetDriver() const;

void CheckTabletKeys(const TString& topicName);
Expand Down Expand Up @@ -1595,21 +1597,22 @@ void TFixture::TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params)

for (size_t i = 0; i < params.OldHeadCount; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x'));
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
++oldHeadMsgCount;
}

for (size_t i = 0; i < params.BigBlobsCount; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'900'000, 'x'), &tx);
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'000'000, 'x'), &tx);
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
++bigBlobMsgCount;
}

for (size_t i = 0; i < params.NewHeadCount; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x'), &tx);
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
++newHeadMsgCount;
}

WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);

if (params.RestartMode == ERestartBeforeCommit) {
RestartPQTablet("topic_A", 0);
}
Expand Down Expand Up @@ -1638,7 +1641,7 @@ void TFixture::TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params)
start += oldHeadMsgCount;

for (size_t i = 0; i < bigBlobMsgCount; ++i) {
UNIT_ASSERT_VALUES_EQUAL(messages[start + i].size(), 7'900'000);
UNIT_ASSERT_VALUES_EQUAL(messages[start + i].size(), 7'000'000);
}
start += bigBlobMsgCount;

Expand Down Expand Up @@ -1903,6 +1906,90 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_28, TFixture)
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2);
}

void TFixture::WriteMessagesInTx(size_t big, size_t small)
{
CreateTopic("topic_A", TEST_CONSUMER);

NTable::TSession tableSession = CreateTableSession();
NTable::TTransaction tx = BeginTx(tableSession);

for (size_t i = 0; i < big; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'000'000, 'x'), &tx, 0);
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
}

for (size_t i = 0; i < small; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(16'384, 'x'), &tx, 0);
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
}

CommitTx(tx, EStatus::SUCCESS);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_29, TFixture)
{
WriteMessagesInTx(1, 0);
WriteMessagesInTx(1, 0);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_30, TFixture)
{
WriteMessagesInTx(1, 0);
WriteMessagesInTx(0, 1);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_31, TFixture)
{
WriteMessagesInTx(1, 0);
WriteMessagesInTx(1, 1);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_32, TFixture)
{
WriteMessagesInTx(0, 1);
WriteMessagesInTx(1, 0);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_33, TFixture)
{
WriteMessagesInTx(0, 1);
WriteMessagesInTx(0, 1);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_34, TFixture)
{
WriteMessagesInTx(0, 1);
WriteMessagesInTx(1, 1);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_35, TFixture)
{
WriteMessagesInTx(1, 1);
WriteMessagesInTx(1, 0);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_36, TFixture)
{
WriteMessagesInTx(1, 1);
WriteMessagesInTx(0, 1);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_37, TFixture)
{
WriteMessagesInTx(1, 1);
WriteMessagesInTx(1, 1);
}


Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture)
{
WriteMessagesInTx(2, 202);
WriteMessagesInTx(2, 200);
WriteMessagesInTx(0, 1);
WriteMessagesInTx(4, 0);
WriteMessagesInTx(0, 1);
}

}

}