Skip to content

[ntuple] Refactor RNTupleWriter into RNTupleFillContext #14391

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tree/ntuple/v7/inc/ROOT/REntry.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class REntry {
friend class RCollectionNTupleWriter;
friend class RNTupleModel;
friend class RNTupleReader;
friend class RNTupleWriter;
friend class RNTupleFillContext;

/// The entry must be linked to a specific model (or one if its clones), identified by a model ID
std::uint64_t fModelId = 0;
Expand Down
112 changes: 85 additions & 27 deletions tree/ntuple/v7/inc/ROOT/RNTuple.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -342,29 +342,29 @@ public:

// clang-format off
/**
\class ROOT::Experimental::RNTupleWriter
\class ROOT::Experimental::RNTupleFillContext
\ingroup NTuple
\brief An RNTuple that gets filled with entries (data) and writes them to storage
\brief A context for filling entries (data) into clusters of an RNTuple

An output ntuple can be filled with entries. The caller has to make sure that the data that gets filled into an ntuple
An output cluster can be filled with entries. The caller has to make sure that the data that gets filled into a cluster
is not modified for the time of the Fill() call. The fill call serializes the C++ object into the column format and
writes data into the corresponding column page buffers. Writing of the buffers to storage is deferred and can be
triggered by Flush() or by destructing the ntuple. On I/O errors, an exception is thrown.
triggered by CommitCluster() or by destructing the context. On I/O errors, an exception is thrown.

Instances of this class are not meant to be used in isolation. For sequential writing, please refer to RNTupleWriter.
*/
// clang-format on
class RNTupleWriter {
friend RNTupleModel::RUpdater;
class RNTupleFillContext {
friend RNTupleWriter;

private:
/// The page sink's parallel page compression scheduler if IMT is on.
/// Needs to be destructed after the page sink is destructed and so declared before.
std::unique_ptr<Detail::RPageStorage::RTaskScheduler> fZipTasks;
std::unique_ptr<Detail::RPageSink> fSink;
/// Needs to be destructed before fSink
std::unique_ptr<RNTupleModel> fModel;

Detail::RNTupleMetrics fMetrics;

NTupleSize_t fLastCommitted = 0;
NTupleSize_t fLastCommittedClusterGroup = 0;
NTupleSize_t fNEntries = 0;
/// Keeps track of the number of bytes written into the current cluster
std::size_t fUnzippedClusterSize = 0;
Expand All @@ -378,6 +378,69 @@ private:
/// Estimator of uncompressed cluster size, taking into account the estimated compression ratio
std::size_t fUnzippedClusterSizeEst;

RNTupleFillContext(std::unique_ptr<RNTupleModel> model, std::unique_ptr<Detail::RPageSink> sink);
RNTupleFillContext(const RNTupleFillContext &) = delete;
RNTupleFillContext &operator=(const RNTupleFillContext &) = delete;

public:
~RNTupleFillContext();

/// Fill an entry into this context. This method will perform a light check whether the entry comes from the
/// context's own model.
/// \return The number of uncompressed bytes written.
std::size_t Fill(REntry &entry)
{
if (R__unlikely(entry.GetModelId() != fModel->GetModelId()))
throw RException(R__FAIL("mismatch between entry and model"));

const std::size_t bytesWritten = entry.Append();
fUnzippedClusterSize += bytesWritten;
fNEntries++;
if ((fUnzippedClusterSize >= fMaxUnzippedClusterSize) || (fUnzippedClusterSize >= fUnzippedClusterSizeEst))
CommitCluster();
return bytesWritten;
}
/// Ensure that the data from the so far seen Fill calls has been written to storage
void CommitCluster();

std::unique_ptr<REntry> CreateEntry() { return fModel->CreateEntry(); }

/// Return the entry number that was last committed in a cluster.
NTupleSize_t GetLastCommitted() const { return fLastCommitted; }
/// Return the number of entries filled so far.
NTupleSize_t GetNEntries() const { return fNEntries; }

void EnableMetrics() { fMetrics.Enable(); }
const Detail::RNTupleMetrics &GetMetrics() const { return fMetrics; }
};

// clang-format off
/**
\class ROOT::Experimental::RNTupleWriter
\ingroup NTuple
\brief An RNTuple that gets filled with entries (data) and writes them to storage

An output ntuple can be filled with entries. The caller has to make sure that the data that gets filled into an ntuple
is not modified for the time of the Fill() call. The fill call serializes the C++ object into the column format and
writes data into the corresponding column page buffers. Writing of the buffers to storage is deferred and can be
triggered by CommitCluster() or by destructing the writer. On I/O errors, an exception is thrown.
*/
// clang-format on
class RNTupleWriter {
friend RNTupleModel::RUpdater;

private:
/// The page sink's parallel page compression scheduler if IMT is on.
/// Needs to be destructed after the page sink (in the fill context) is destructed and so declared before.
std::unique_ptr<Detail::RPageStorage::RTaskScheduler> fZipTasks;
RNTupleFillContext fFillContext;
Detail::RNTupleMetrics fMetrics;

NTupleSize_t fLastCommittedClusterGroup = 0;

RNTupleModel &GetUpdatableModel() { return *fFillContext.fModel; }
Detail::RPageSink &GetSink() { return *fFillContext.fSink; }

// Helper function that is called from CommitCluster() when necessary
void CommitClusterGroup();

Expand All @@ -400,37 +463,32 @@ public:

/// The simplest user interface if the default entry that comes with the ntuple model is used.
/// \return The number of uncompressed bytes written.
std::size_t Fill() { return Fill(*fModel->GetDefaultEntry()); }
std::size_t Fill() { return fFillContext.Fill(*fFillContext.fModel->GetDefaultEntry()); }
/// Multiple entries can have been instantiated from the ntuple model. This method will perform
/// a light check whether the entry comes from the ntuple's own model.
/// \return The number of uncompressed bytes written.
std::size_t Fill(REntry &entry) {
if (R__unlikely(entry.GetModelId() != fModel->GetModelId()))
throw RException(R__FAIL("mismatch between entry and model"));

const std::size_t bytesWritten = entry.Append();
fUnzippedClusterSize += bytesWritten;
fNEntries++;
if ((fUnzippedClusterSize >= fMaxUnzippedClusterSize) || (fUnzippedClusterSize >= fUnzippedClusterSizeEst))
CommitCluster();
return bytesWritten;
}
std::size_t Fill(REntry &entry) { return fFillContext.Fill(entry); }
/// Ensure that the data from the so far seen Fill calls has been written to storage
void CommitCluster(bool commitClusterGroup = false);
void CommitCluster(bool commitClusterGroup = false)
{
fFillContext.CommitCluster();
if (commitClusterGroup)
CommitClusterGroup();
}

std::unique_ptr<REntry> CreateEntry() { return fModel->CreateEntry(); }
std::unique_ptr<REntry> CreateEntry() { return fFillContext.CreateEntry(); }

/// Return the entry number that was last committed in a cluster.
NTupleSize_t GetLastCommitted() const { return fLastCommitted; }
NTupleSize_t GetLastCommitted() const { return fFillContext.GetLastCommitted(); }
/// Return the entry number that was last committed in a cluster group.
NTupleSize_t GetLastCommittedClusterGroup() const { return fLastCommittedClusterGroup; }
/// Return the number of entries filled so far.
NTupleSize_t GetNEntries() const { return fNEntries; }
NTupleSize_t GetNEntries() const { return fFillContext.GetNEntries(); }

void EnableMetrics() { fMetrics.Enable(); }
const Detail::RNTupleMetrics &GetMetrics() const { return fMetrics; }

const RNTupleModel *GetModel() const { return fModel.get(); }
const RNTupleModel &GetModel() const { return *fFillContext.fModel; }

/// Get a `RNTupleModel::RUpdater` that provides limited support for incremental updates to the underlying
/// model, e.g. addition of new fields.
Expand Down
102 changes: 58 additions & 44 deletions tree/ntuple/v7/src/RNTuple.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,9 @@ const ROOT::Experimental::RNTupleDescriptor *ROOT::Experimental::RNTupleReader::

//------------------------------------------------------------------------------

ROOT::Experimental::RNTupleWriter::RNTupleWriter(std::unique_ptr<ROOT::Experimental::RNTupleModel> model,
std::unique_ptr<ROOT::Experimental::Detail::RPageSink> sink)
: fSink(std::move(sink)), fModel(std::move(model)), fMetrics("RNTupleWriter")
ROOT::Experimental::RNTupleFillContext::RNTupleFillContext(std::unique_ptr<ROOT::Experimental::RNTupleModel> model,
std::unique_ptr<ROOT::Experimental::Detail::RPageSink> sink)
: fSink(std::move(sink)), fModel(std::move(model)), fMetrics("RNTupleFillContext")
{
if (!fModel) {
throw RException(R__FAIL("null model"));
Expand All @@ -275,12 +275,6 @@ ROOT::Experimental::RNTupleWriter::RNTupleWriter(std::unique_ptr<ROOT::Experimen
throw RException(R__FAIL("null sink"));
}
fModel->Freeze();
#ifdef R__USE_IMT
if (IsImplicitMTEnabled()) {
fZipTasks = std::make_unique<RNTupleImtTaskScheduler>();
fSink->SetTaskScheduler(fZipTasks.get());
}
#endif
fSink->Create(*fModel.get());
fMetrics.ObserveMetrics(fSink->GetMetrics());

Expand All @@ -291,48 +285,18 @@ ROOT::Experimental::RNTupleWriter::RNTupleWriter(std::unique_ptr<ROOT::Experimen
fUnzippedClusterSizeEst = scale * writeOpts.GetApproxZippedClusterSize();
}

ROOT::Experimental::RNTupleWriter::~RNTupleWriter()
ROOT::Experimental::RNTupleFillContext::~RNTupleFillContext()
{
try {
CommitCluster(true /* commitClusterGroup */);
fSink->CommitDataset();
CommitCluster();
} catch (const RException &err) {
R__LOG_ERROR(NTupleLog()) << "failure committing ntuple: " << err.GetError().GetReport();
}
}

std::unique_ptr<ROOT::Experimental::RNTupleWriter>
ROOT::Experimental::RNTupleWriter::Recreate(std::unique_ptr<RNTupleModel> model, std::string_view ntupleName,
std::string_view storage, const RNTupleWriteOptions &options)
{
return std::make_unique<RNTupleWriter>(std::move(model), Detail::RPageSink::Create(ntupleName, storage, options));
}

std::unique_ptr<ROOT::Experimental::RNTupleWriter>
ROOT::Experimental::RNTupleWriter::Append(std::unique_ptr<RNTupleModel> model, std::string_view ntupleName, TFile &file,
const RNTupleWriteOptions &options)
{
auto sink = std::make_unique<Detail::RPageSinkFile>(ntupleName, file, options);
if (options.GetUseBufferedWrite()) {
auto bufferedSink = std::make_unique<Detail::RPageSinkBuf>(std::move(sink));
return std::make_unique<RNTupleWriter>(std::move(model), std::move(bufferedSink));
}
return std::make_unique<RNTupleWriter>(std::move(model), std::move(sink));
}

void ROOT::Experimental::RNTupleWriter::CommitClusterGroup()
{
if (fNEntries == fLastCommittedClusterGroup)
return;
fSink->CommitClusterGroup();
fLastCommittedClusterGroup = fNEntries;
}

void ROOT::Experimental::RNTupleWriter::CommitCluster(bool commitClusterGroup)
void ROOT::Experimental::RNTupleFillContext::CommitCluster()
{
if (fNEntries == fLastCommitted) {
if (commitClusterGroup)
CommitClusterGroup();
return;
}
if (fSink->GetWriteOptions().GetHasSmallClusters() &&
Expand All @@ -355,9 +319,59 @@ void ROOT::Experimental::RNTupleWriter::CommitCluster(bool commitClusterGroup)

fLastCommitted = fNEntries;
fUnzippedClusterSize = 0;
}

if (commitClusterGroup)
CommitClusterGroup();
//------------------------------------------------------------------------------

ROOT::Experimental::RNTupleWriter::RNTupleWriter(std::unique_ptr<ROOT::Experimental::RNTupleModel> model,
std::unique_ptr<ROOT::Experimental::Detail::RPageSink> sink)
: fFillContext(std::move(model), std::move(sink)), fMetrics("RNTupleWriter")
{
#ifdef R__USE_IMT
if (IsImplicitMTEnabled()) {
fZipTasks = std::make_unique<RNTupleImtTaskScheduler>();
fFillContext.fSink->SetTaskScheduler(fZipTasks.get());
}
#endif
// Observe directly the sink's metrics to avoid an additional prefix from the fill context.
fMetrics.ObserveMetrics(fFillContext.fSink->GetMetrics());
}

ROOT::Experimental::RNTupleWriter::~RNTupleWriter()
{
try {
CommitCluster(true /* commitClusterGroup */);
fFillContext.fSink->CommitDataset();
} catch (const RException &err) {
R__LOG_ERROR(NTupleLog()) << "failure committing ntuple: " << err.GetError().GetReport();
}
}

std::unique_ptr<ROOT::Experimental::RNTupleWriter>
ROOT::Experimental::RNTupleWriter::Recreate(std::unique_ptr<RNTupleModel> model, std::string_view ntupleName,
std::string_view storage, const RNTupleWriteOptions &options)
{
return std::make_unique<RNTupleWriter>(std::move(model), Detail::RPageSink::Create(ntupleName, storage, options));
}

std::unique_ptr<ROOT::Experimental::RNTupleWriter>
ROOT::Experimental::RNTupleWriter::Append(std::unique_ptr<RNTupleModel> model, std::string_view ntupleName, TFile &file,
const RNTupleWriteOptions &options)
{
auto sink = std::make_unique<Detail::RPageSinkFile>(ntupleName, file, options);
if (options.GetUseBufferedWrite()) {
auto bufferedSink = std::make_unique<Detail::RPageSinkBuf>(std::move(sink));
return std::make_unique<RNTupleWriter>(std::move(model), std::move(bufferedSink));
}
return std::make_unique<RNTupleWriter>(std::move(model), std::move(sink));
}

void ROOT::Experimental::RNTupleWriter::CommitClusterGroup()
{
if (GetNEntries() == fLastCommittedClusterGroup)
return;
fFillContext.fSink->CommitClusterGroup();
fLastCommittedClusterGroup = GetNEntries();
}

//------------------------------------------------------------------------------
Expand Down
4 changes: 2 additions & 2 deletions tree/ntuple/v7/src/RNTupleModel.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ ROOT::Experimental::RNTupleModel::RProjectedFields::Clone(const RNTupleModel *ne
}

ROOT::Experimental::RNTupleModel::RUpdater::RUpdater(RNTupleWriter &writer)
: fWriter(writer), fOpenChangeset(*fWriter.fModel)
: fWriter(writer), fOpenChangeset(fWriter.GetUpdatableModel())
{
}

Expand All @@ -146,7 +146,7 @@ void ROOT::Experimental::RNTupleModel::RUpdater::CommitUpdate()
Detail::RNTupleModelChangeset toCommit{fOpenChangeset.fModel};
std::swap(fOpenChangeset.fAddedFields, toCommit.fAddedFields);
std::swap(fOpenChangeset.fAddedProjectedFields, toCommit.fAddedProjectedFields);
fWriter.fSink->UpdateSchema(toCommit, fWriter.fNEntries);
fWriter.GetSink().UpdateSchema(toCommit, fWriter.GetNEntries());
}

void ROOT::Experimental::RNTupleModel::RUpdater::AddField(std::unique_ptr<Detail::RFieldBase> field)
Expand Down
10 changes: 5 additions & 5 deletions tree/ntuple/v7/test/ntuple_basics.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -630,24 +630,24 @@ TEST(RNTuple, BareEntry)
FileRaii fileGuard("test_ntuple_bare_entry.root");
{
auto ntuple = RNTupleWriter::Recreate(std::move(m), "ntpl", fileGuard.GetPath());
const auto model = ntuple->GetModel();
const auto &model = ntuple->GetModel();
try {
model->GetDefaultEntry();
model.GetDefaultEntry();
FAIL() << "accessing default entry of bare model should throw";
} catch (const RException &err) {
EXPECT_THAT(err.what(), testing::HasSubstr("invalid attempt to use default entry of bare model"));
}
try {
model->Get<float>("pt");
model.Get<float>("pt");
FAIL() << "accessing default entry of bare model should throw";
} catch (const RException &err) {
EXPECT_THAT(err.what(), testing::HasSubstr("invalid attempt to use default entry of bare model"));
}

auto e1 = model->CreateEntry();
auto e1 = model.CreateEntry();
ASSERT_NE(nullptr, e1->Get<float>("pt"));
*(e1->Get<float>("pt")) = 1.0;
auto e2 = model->CreateBareEntry();
auto e2 = model.CreateBareEntry();
EXPECT_EQ(nullptr, e2->Get<float>("pt"));
float pt = 2.0;
e2->CaptureValueUnsafe("pt", &pt);
Expand Down
6 changes: 3 additions & 3 deletions tree/ntuple/v7/test/ntuple_modelext.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ TEST(RNTuple, ModelExtensionInvalidUse)
auto fieldPt = model->MakeField<float>("pt", 42.0);

auto ntuple = RNTupleWriter::Recreate(std::move(model), "myNTuple", fileGuard.GetPath());
auto entry = ntuple->GetModel()->CreateEntry();
auto entry = ntuple->GetModel().CreateEntry();

auto modelUpdater = ntuple->CreateModelUpdater();
modelUpdater->BeginUpdate();
Expand All @@ -96,14 +96,14 @@ TEST(RNTuple, ModelExtensionInvalidUse)
// Cannot fill if the model is not frozen
EXPECT_THROW(ntuple->Fill(), ROOT::Experimental::RException);
// Trying to create an entry should throw if model is not frozen
EXPECT_THROW((void)ntuple->GetModel()->CreateEntry(), ROOT::Experimental::RException);
EXPECT_THROW((void)ntuple->GetModel().CreateEntry(), ROOT::Experimental::RException);
modelUpdater->CommitUpdate();

// Using an entry that does not match the model should throw
EXPECT_THROW(ntuple->Fill(*entry), ROOT::Experimental::RException);

ntuple->Fill();
auto entry2 = ntuple->GetModel()->CreateEntry();
auto entry2 = ntuple->GetModel().CreateEntry();
ntuple->Fill(*entry2);

ntuple->Fill();
Expand Down
Loading