Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ydb/core/tx/columnshard/blob_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ class IBlobManager {
return DoSaveBlobBatch(std::move(blobBatch), db);
}

// Deletes the blob that was previously permanently saved
virtual void DeleteBlobOnExecute(const TUnifiedBlobId& blobId, IBlobManagerDb& db) = 0;
virtual void DeleteBlobOnComplete(const TUnifiedBlobId& blobId) = 0;
};
Expand Down
20 changes: 10 additions & 10 deletions ydb/core/tx/columnshard/blobs_action/abstract/action.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,21 @@ class TStorageAction {
return !!Writing;
}

void OnExecuteTxAfterAction(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool success) {
void OnExecuteTxAfterAction(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool blobsWroteSuccessfully) {
if (Removing) {
Removing->OnExecuteTxAfterRemoving(self, dbBlobs, success);
Removing->OnExecuteTxAfterRemoving(self, dbBlobs, blobsWroteSuccessfully);
}
if (Writing) {
Writing->OnExecuteTxAfterWrite(self, dbBlobs, success);
Writing->OnExecuteTxAfterWrite(self, dbBlobs, blobsWroteSuccessfully);
}
}

void OnCompleteTxAfterAction(NColumnShard::TColumnShard& self, const bool success) {
void OnCompleteTxAfterAction(NColumnShard::TColumnShard& self, const bool blobsWroteSuccessfully) {
if (Removing) {
Removing->OnCompleteTxAfterRemoving(self, success);
Removing->OnCompleteTxAfterRemoving(self, blobsWroteSuccessfully);
}
if (Writing) {
Writing->OnCompleteTxAfterWrite(self, success);
Writing->OnCompleteTxAfterWrite(self, blobsWroteSuccessfully);
}
}
};
Expand Down Expand Up @@ -144,15 +144,15 @@ class TBlobsAction {
return false;
}

void OnExecuteTxAfterAction(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool success) {
void OnExecuteTxAfterAction(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool blobsWroteSuccessfully) {
for (auto&& i : StorageActions) {
i.second.OnExecuteTxAfterAction(self, dbBlobs, success);
i.second.OnExecuteTxAfterAction(self, dbBlobs, blobsWroteSuccessfully);
}
}

void OnCompleteTxAfterAction(NColumnShard::TColumnShard& self, const bool success) {
void OnCompleteTxAfterAction(NColumnShard::TColumnShard& self, const bool blobsWroteSuccessfully) {
for (auto&& i : StorageActions) {
i.second.OnCompleteTxAfterAction(self, success);
i.second.OnCompleteTxAfterAction(self, blobsWroteSuccessfully);
}
}

Expand Down
12 changes: 6 additions & 6 deletions ydb/core/tx/columnshard/blobs_action/abstract/remove.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ class IBlobsDeclareRemovingAction: public ICommonBlobsAction {
YDB_READONLY_DEF(THashSet<TUnifiedBlobId>, DeclaredBlobs);
protected:
virtual void DoDeclareRemove(const TUnifiedBlobId& blobId) = 0;
virtual void DoOnExecuteTxAfterRemoving(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool success) = 0;
virtual void DoOnCompleteTxAfterRemoving(NColumnShard::TColumnShard& self, const bool success) = 0;
virtual void DoOnExecuteTxAfterRemoving(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool blobsWroteSuccessfully) = 0;
virtual void DoOnCompleteTxAfterRemoving(NColumnShard::TColumnShard& self, const bool blobsWroteSuccessfully) = 0;
public:
IBlobsDeclareRemovingAction(const TString& storageId)
: TBase(storageId)
Expand All @@ -33,11 +33,11 @@ class IBlobsDeclareRemovingAction: public ICommonBlobsAction {
}

void DeclareRemove(const TUnifiedBlobId& blobId);
void OnExecuteTxAfterRemoving(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool success) {
return DoOnExecuteTxAfterRemoving(self, dbBlobs, success);
void OnExecuteTxAfterRemoving(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool blobsWroteSuccessfully) {
return DoOnExecuteTxAfterRemoving(self, dbBlobs, blobsWroteSuccessfully);
}
void OnCompleteTxAfterRemoving(NColumnShard::TColumnShard& self, const bool success) {
return DoOnCompleteTxAfterRemoving(self, success);
void OnCompleteTxAfterRemoving(NColumnShard::TColumnShard& self, const bool blobsWroteSuccessfully) {
return DoOnCompleteTxAfterRemoving(self, blobsWroteSuccessfully);
}
};

Expand Down
12 changes: 6 additions & 6 deletions ydb/core/tx/columnshard/blobs_action/abstract/write.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class IBlobsWritingAction: public ICommonBlobsAction {
virtual void DoSendWriteBlobRequest(const TString& data, const TUnifiedBlobId& blobId) = 0;
virtual void DoOnBlobWriteResult(const TUnifiedBlobId& blobId, const NKikimrProto::EReplyStatus status) = 0;

virtual void DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool success) = 0;
virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& self, const bool success) = 0;
virtual void DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool blobsWroteSuccessfully) = 0;
virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& self, const bool blobsWroteSuccessfully) = 0;

virtual TUnifiedBlobId AllocateNextBlobId(const TString& data) = 0;
public:
Expand Down Expand Up @@ -75,12 +75,12 @@ class IBlobsWritingAction: public ICommonBlobsAction {
return DoOnCompleteTxBeforeWrite(self);
}

void OnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool success) {
return DoOnExecuteTxAfterWrite(self, dbBlobs, success);
void OnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool blobsWroteSuccessfully) {
return DoOnExecuteTxAfterWrite(self, dbBlobs, blobsWroteSuccessfully);
}

void OnCompleteTxAfterWrite(NColumnShard::TColumnShard& self, const bool success) {
return DoOnCompleteTxAfterWrite(self, success);
void OnCompleteTxAfterWrite(NColumnShard::TColumnShard& self, const bool blobsWroteSuccessfully) {
return DoOnCompleteTxAfterWrite(self, blobsWroteSuccessfully);
}

void SendWriteBlobRequest(const TString& data, const TUnifiedBlobId& blobId);
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/tx/columnshard/blobs_action/bs/remove.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ class TDeclareRemovingAction: public IBlobsDeclareRemovingAction {

}

virtual void DoOnExecuteTxAfterRemoving(NColumnShard::TColumnShard& /*self*/, NColumnShard::TBlobManagerDb& dbBlobs, const bool success) {
if (success) {
virtual void DoOnExecuteTxAfterRemoving(NColumnShard::TColumnShard& /*self*/, NColumnShard::TBlobManagerDb& dbBlobs, const bool blobsWroteSuccessfully) {
if (blobsWroteSuccessfully) {
for (auto&& i : GetDeclaredBlobs()) {
Manager->DeleteBlobOnExecute(i, dbBlobs);
}
}
}
virtual void DoOnCompleteTxAfterRemoving(NColumnShard::TColumnShard& /*self*/, const bool success) {
if (success) {
virtual void DoOnCompleteTxAfterRemoving(NColumnShard::TColumnShard& /*self*/, const bool blobsWroteSuccessfully) {
if (blobsWroteSuccessfully) {
for (auto&& i : GetDeclaredBlobs()) {
Manager->DeleteBlobOnComplete(i);
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/blobs_action/bs/write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

namespace NKikimr::NOlap::NBlobOperations::NBlobStorage {

void TWriteAction::DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool success) {
void TWriteAction::DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool blobsWroteSuccessfully) {
ui64 blobsWritten = BlobBatch.GetBlobCount();
ui64 bytesWritten = BlobBatch.GetTotalSize();
if (success) {
if (blobsWroteSuccessfully) {
self.IncCounter(NColumnShard::COUNTER_UPSERT_BLOBS_WRITTEN, blobsWritten);
self.IncCounter(NColumnShard::COUNTER_UPSERT_BYTES_WRITTEN, bytesWritten);
// self.IncCounter(NColumnShard::COUNTER_RAW_BYTES_UPSERTED, insertedBytes);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/blobs_action/bs/write.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ class TWriteAction: public IBlobsWritingAction {
return;
}

virtual void DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool success) override;
virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, const bool /*success*/) override {
virtual void DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool blobsWroteSuccessfully) override;
virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, const bool /*blobsWroteSuccessfully*/) override {

}
public:
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/tx/columnshard/blobs_action/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ class TMemoryWriteAction: public IBlobsWritingAction {
return;
}

virtual void DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, NColumnShard::TBlobManagerDb& /*dbBlobs*/, const bool /*success*/) override {
virtual void DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, NColumnShard::TBlobManagerDb& /*dbBlobs*/, const bool /*blobsWroteSuccessfully*/) override {

}
virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, const bool /*success*/) override {
virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, const bool /*blobsWroteSuccessfully*/) override {

}
public:
Expand Down Expand Up @@ -101,12 +101,12 @@ class TMemoryDeclareRemovingAction: public IBlobsDeclareRemovingAction {

}

virtual void DoOnExecuteTxAfterRemoving(NColumnShard::TColumnShard& /*self*/, NColumnShard::TBlobManagerDb& /*dbBlobs*/, const bool /*success*/) {
virtual void DoOnExecuteTxAfterRemoving(NColumnShard::TColumnShard& /*self*/, NColumnShard::TBlobManagerDb& /*dbBlobs*/, const bool /*blobsWroteSuccessfully*/) {
for (auto&& i : GetDeclaredBlobs()) {
Storage->DeclareDataForRemove(i);
}
}
virtual void DoOnCompleteTxAfterRemoving(NColumnShard::TColumnShard& /*self*/, const bool /*success*/) {
virtual void DoOnCompleteTxAfterRemoving(NColumnShard::TColumnShard& /*self*/, const bool /*blobsWroteSuccessfully*/) {

}
public:
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/tx/columnshard/blobs_action/tier/remove.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ class TDeclareRemovingAction: public IBlobsDeclareRemovingAction {

}

virtual void DoOnExecuteTxAfterRemoving(NColumnShard::TColumnShard& /*self*/, NColumnShard::TBlobManagerDb& dbBlobs, const bool success) {
if (success) {
virtual void DoOnExecuteTxAfterRemoving(NColumnShard::TColumnShard& /*self*/, NColumnShard::TBlobManagerDb& dbBlobs, const bool blobsWroteSuccessfully) {
if (blobsWroteSuccessfully) {
for (auto&& i : GetDeclaredBlobs()) {
dbBlobs.AddTierBlobToDelete(GetStorageId(), i);
}
}
}
virtual void DoOnCompleteTxAfterRemoving(NColumnShard::TColumnShard& /*self*/, const bool success) {
if (success) {
virtual void DoOnCompleteTxAfterRemoving(NColumnShard::TColumnShard& /*self*/, const bool blobsWroteSuccessfully) {
if (blobsWroteSuccessfully) {
for (auto&& i : GetDeclaredBlobs()) {
if (GCInfo->IsBlobInUsage(i)) {
Y_ABORT_UNLESS(GCInfo->MutableBlobsToDeleteInFuture().emplace(i).second);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/blobs_action/tier/write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ void TWriteAction::DoSendWriteBlobRequest(const TString& data, const TUnifiedBlo
ExternalStorageOperator->Execute(evPtr);
}

void TWriteAction::DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, NColumnShard::TBlobManagerDb& dbBlobs, const bool success) {
if (success) {
void TWriteAction::DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, NColumnShard::TBlobManagerDb& dbBlobs, const bool blobsWroteSuccessfully) {
if (blobsWroteSuccessfully) {
for (auto&& i : GetBlobsForWrite()) {
dbBlobs.RemoveTierDraftBlobId(GetStorageId(), i.first);
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/blobs_action/tier/write.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ class TWriteAction: public IBlobsWritingAction {
return;
}

virtual void DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool success) override;
virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, const bool /*success*/) override {
virtual void DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool blobsWroteSuccessfully) override;
virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, const bool /*blobsWroteSuccessfully*/) override {

}
public:
Expand Down