Skip to content

Commit

Permalink
Introduce classification analysis runner. (elastic#701)
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek committed Oct 4, 2019
1 parent 6b8d3f6 commit 06007cd
Show file tree
Hide file tree
Showing 22 changed files with 610 additions and 109 deletions.
10 changes: 8 additions & 2 deletions include/api/CDataFrameAnalysisRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ class CMemoryUsageEstimationResultJsonWriter;
//! early to determine how to implement a good cooperative interrupt scheme.
class API_EXPORT CDataFrameAnalysisRunner {
public:
using TBoolVec = std::vector<bool>;
using TStrVec = std::vector<std::string>;
using TStrVecVec = std::vector<TStrVec>;
using TRowRef = core::data_frame_detail::CRowRef;
using TProgressRecorder = std::function<void(double)>;

Expand Down Expand Up @@ -98,6 +100,9 @@ class API_EXPORT CDataFrameAnalysisRunner {
//! \return The number of columns this analysis appends.
virtual std::size_t numberExtraColumns() const = 0;

//! \return Indicator of columns for which empty value should be treated as missing.
virtual TBoolVec columnsForWhichEmptyIsMissing(const TStrVec& fieldNames) const;

//! Write the extra columns of \p row added by the analysis to \p writer.
//!
//! This should create a new object of the form:
Expand All @@ -114,6 +119,7 @@ class API_EXPORT CDataFrameAnalysisRunner {
//! \param[in] row The row to write the columns added by this analysis.
//! \param[in,out] writer The stream to which to write the extra columns.
virtual void writeOneRow(const TStrVec& featureNames,
const TStrVecVec& categoricalFieldValues,
TRowRef row,
core::CRapidJsonConcurrentLineWriter& writer) const = 0;

Expand Down Expand Up @@ -189,12 +195,12 @@ class API_EXPORT CDataFrameAnalysisRunnerFactory {

TRunnerUPtr make(const CDataFrameAnalysisSpecification& spec) const;
TRunnerUPtr make(const CDataFrameAnalysisSpecification& spec,
const rapidjson::Value& params) const;
const rapidjson::Value& jsonParameters) const;

private:
virtual TRunnerUPtr makeImpl(const CDataFrameAnalysisSpecification& spec) const = 0;
virtual TRunnerUPtr makeImpl(const CDataFrameAnalysisSpecification& spec,
const rapidjson::Value& params) const = 0;
const rapidjson::Value& jsonParameters) const = 0;
};
}
}
Expand Down
4 changes: 4 additions & 0 deletions include/api/CDataFrameAnalysisSpecification.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ namespace api {
//! performance statistics.
class API_EXPORT CDataFrameAnalysisSpecification {
public:
using TBoolVec = std::vector<bool>;
using TStrVec = std::vector<std::string>;
using TDataFrameUPtr = std::unique_ptr<core::CDataFrame>;
using TTemporaryDirectoryPtr = std::shared_ptr<core::CTemporaryDirectory>;
Expand Down Expand Up @@ -173,6 +174,9 @@ class API_EXPORT CDataFrameAnalysisSpecification {
//! 2. disk is used (only one partition needs to be loaded to main memory)
void estimateMemoryUsage(CMemoryUsageEstimationResultJsonWriter& writer) const;

//! \return Indicator of columns for which empty value should be treated as missing.
TBoolVec columnsForWhichEmptyIsMissing(const TStrVec& fieldNames) const;

//! \return shared pointer to the persistence stream.
TDataAdderUPtr persister() const;

Expand Down
2 changes: 2 additions & 0 deletions include/api/CDataFrameAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class CDataFrameAnalysisSpecification;
//!
class API_EXPORT CDataFrameAnalyzer {
public:
using TBoolVec = std::vector<bool>;
using TStrVec = std::vector<std::string>;
using TJsonOutputStreamWrapperUPtr = std::unique_ptr<core::CJsonOutputStreamWrapper>;
using TJsonOutputStreamWrapperUPtrSupplier =
Expand Down Expand Up @@ -104,6 +105,7 @@ class API_EXPORT CDataFrameAnalyzer {
TDataFrameAnalysisSpecificationUPtr m_AnalysisSpecification;
TStrVec m_CategoricalFieldNames;
TStrSizeUMapVec m_CategoricalFieldValues;
TBoolVec m_EmptyAsMissing;
TDataFrameUPtr m_DataFrame;
TStrVec m_FieldNames;
TTemporaryDirectoryPtr m_DataFrameDirectory;
Expand Down
36 changes: 15 additions & 21 deletions include/api/CDataFrameBoostedTreeRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <core/CDataSearcher.h>

#include <api/CDataFrameAnalysisConfigReader.h>
#include <api/CDataFrameAnalysisRunner.h>
#include <api/CDataFrameAnalysisSpecification.h>
#include <api/ImportExport.h>
Expand All @@ -25,11 +26,11 @@ class CBoostedTreeFactory;
namespace api {

//! \brief Runs boosted tree regression on a core::CDataFrame.
class API_EXPORT CDataFrameBoostedTreeRunner final : public CDataFrameAnalysisRunner {
class API_EXPORT CDataFrameBoostedTreeRunner : public CDataFrameAnalysisRunner {
public:
//! This is not intended to be called directly: use CDataFrameBoostedTreeRunnerFactory.
CDataFrameBoostedTreeRunner(const CDataFrameAnalysisSpecification& spec,
const rapidjson::Value& jsonParameters);
const CDataFrameAnalysisConfigReader::CParameters& parameters);

//! This is not intended to be called directly: use CDataFrameBoostedTreeRunnerFactory.
CDataFrameBoostedTreeRunner(const CDataFrameAnalysisSpecification& spec);
Expand All @@ -39,13 +40,20 @@ class API_EXPORT CDataFrameBoostedTreeRunner final : public CDataFrameAnalysisRu
//! \return The number of columns this adds to the data frame.
std::size_t numberExtraColumns() const override;

//! Write the prediction for \p row to \p writer.
void writeOneRow(const TStrVec& featureNames,
TRowRef row,
core::CRapidJsonConcurrentLineWriter& writer) const override;
protected:
using TBoostedTreeUPtr = std::unique_ptr<maths::CBoostedTree>;

protected:
//! Parameter reader handling parameters that are shared by subclasses.
static CDataFrameAnalysisConfigReader getParameterReader();
//! Name of dependent variable field.
const std::string& dependentVariableFieldName() const;
//! Name of prediction field.
const std::string& predictionFieldName() const;
//! Underlying boosted tree.
const maths::CBoostedTree& boostedTree() const;

private:
using TBoostedTreeUPtr = std::unique_ptr<maths::CBoostedTree>;
using TBoostedTreeFactoryUPtr = std::unique_ptr<maths::CBoostedTreeFactory>;
using TDataSearcherUPtr = CDataFrameAnalysisSpecification::TDataSearcherUPtr;
using TMemoryEstimator = std::function<void(std::int64_t)>;
Expand All @@ -71,20 +79,6 @@ class API_EXPORT CDataFrameBoostedTreeRunner final : public CDataFrameAnalysisRu
TBoostedTreeUPtr m_BoostedTree;
std::atomic<std::int64_t> m_Memory;
};

//! \brief Makes a core::CDataFrame boosted tree regression runner.
class API_EXPORT CDataFrameBoostedTreeRunnerFactory final : public CDataFrameAnalysisRunnerFactory {
public:
const std::string& name() const override;

private:
static const std::string NAME;

private:
TRunnerUPtr makeImpl(const CDataFrameAnalysisSpecification& spec) const override;
TRunnerUPtr makeImpl(const CDataFrameAnalysisSpecification& spec,
const rapidjson::Value& params) const override;
};
}
}

Expand Down
67 changes: 67 additions & 0 deletions include/api/CDataFrameClassificationRunner.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

#ifndef INCLUDED_ml_api_CDataFrameClassificationRunner_h
#define INCLUDED_ml_api_CDataFrameClassificationRunner_h

#include <core/CDataSearcher.h>

#include <api/CDataFrameAnalysisConfigReader.h>
#include <api/CDataFrameAnalysisRunner.h>
#include <api/CDataFrameAnalysisSpecification.h>
#include <api/CDataFrameBoostedTreeRunner.h>
#include <api/ImportExport.h>

#include <rapidjson/fwd.h>

#include <atomic>

namespace ml {
namespace api {

//! \brief Runs boosted tree classification on a core::CDataFrame.
class API_EXPORT CDataFrameClassificationRunner final : public CDataFrameBoostedTreeRunner {
public:
static const CDataFrameAnalysisConfigReader getParameterReader();

//! This is not intended to be called directly: use CDataFrameClassificationRunnerFactory.
CDataFrameClassificationRunner(const CDataFrameAnalysisSpecification& spec,
const CDataFrameAnalysisConfigReader::CParameters& parameters);

//! This is not intended to be called directly: use CDataFrameClassificationRunnerFactory.
CDataFrameClassificationRunner(const CDataFrameAnalysisSpecification& spec);

//! \return Indicator of columns for which empty value should be treated as missing.
TBoolVec columnsForWhichEmptyIsMissing(const TStrVec& fieldNames) const override;

//! Write the prediction for \p row to \p writer.
void writeOneRow(const TStrVec& featureNames,
const TStrVecVec& categoricalFieldValues,
TRowRef row,
core::CRapidJsonConcurrentLineWriter& writer) const override;

private:
std::size_t m_NumTopClasses;
};

//! \brief Makes a core::CDataFrame boosted tree classification runner.
class API_EXPORT CDataFrameClassificationRunnerFactory final
: public CDataFrameAnalysisRunnerFactory {
public:
const std::string& name() const override;

private:
static const std::string NAME;

private:
TRunnerUPtr makeImpl(const CDataFrameAnalysisSpecification& spec) const override;
TRunnerUPtr makeImpl(const CDataFrameAnalysisSpecification& spec,
const rapidjson::Value& jsonParameters) const override;
};
}
}

#endif // INCLUDED_ml_api_CDataFrameClassificationRunner_h
6 changes: 4 additions & 2 deletions include/api/CDataFrameOutliersRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#ifndef INCLUDED_ml_api_CDataFrameOutliersRunner_h
#define INCLUDED_ml_api_CDataFrameOutliersRunner_h

#include <api/CDataFrameAnalysisConfigReader.h>
#include <api/CDataFrameAnalysisRunner.h>

#include <api/ImportExport.h>
Expand All @@ -21,7 +22,7 @@ class API_EXPORT CDataFrameOutliersRunner final : public CDataFrameAnalysisRunne
public:
//! This is not intended to be called directly: use CDataFrameOutliersRunnerFactory.
CDataFrameOutliersRunner(const CDataFrameAnalysisSpecification& spec,
const rapidjson::Value& jsonParameters);
const CDataFrameAnalysisConfigReader::CParameters& parameters);

//! This is not intended to be called directly: use CDataFrameOutliersRunnerFactory.
CDataFrameOutliersRunner(const CDataFrameAnalysisSpecification& spec);
Expand All @@ -31,6 +32,7 @@ class API_EXPORT CDataFrameOutliersRunner final : public CDataFrameAnalysisRunne

//! Write the extra columns of \p row added by outlier analysis to \p writer.
void writeOneRow(const TStrVec& featureNames,
const TStrVecVec& categoricalFieldValues,
TRowRef row,
core::CRapidJsonConcurrentLineWriter& writer) const override;

Expand Down Expand Up @@ -79,7 +81,7 @@ class API_EXPORT CDataFrameOutliersRunnerFactory final : public CDataFrameAnalys
private:
TRunnerUPtr makeImpl(const CDataFrameAnalysisSpecification& spec) const override;
TRunnerUPtr makeImpl(const CDataFrameAnalysisSpecification& spec,
const rapidjson::Value& params) const override;
const rapidjson::Value& jsonParameters) const override;
};
}
}
Expand Down
59 changes: 59 additions & 0 deletions include/api/CDataFrameRegressionRunner.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

#ifndef INCLUDED_ml_api_CDataFrameRegressionRunner_h
#define INCLUDED_ml_api_CDataFrameRegressionRunner_h

#include <core/CDataSearcher.h>

#include <api/CDataFrameAnalysisConfigReader.h>
#include <api/CDataFrameAnalysisSpecification.h>
#include <api/CDataFrameBoostedTreeRunner.h>
#include <api/ImportExport.h>

#include <rapidjson/fwd.h>

#include <atomic>

namespace ml {
namespace api {

//! \brief Runs boosted tree regression on a core::CDataFrame.
class API_EXPORT CDataFrameRegressionRunner final : public CDataFrameBoostedTreeRunner {
public:
static const CDataFrameAnalysisConfigReader getParameterReader();

//! This is not intended to be called directly: use CDataFrameRegressionRunnerFactory.
CDataFrameRegressionRunner(const CDataFrameAnalysisSpecification& spec,
const CDataFrameAnalysisConfigReader::CParameters& parameters);

//! This is not intended to be called directly: use CDataFrameRegressionRunnerFactory.
CDataFrameRegressionRunner(const CDataFrameAnalysisSpecification& spec);

//! Write the prediction for \p row to \p writer.
void writeOneRow(const TStrVec& featureNames,
const TStrVecVec& categoricalFieldValues,
TRowRef row,
core::CRapidJsonConcurrentLineWriter& writer) const override;
};

//! \brief Makes a core::CDataFrame boosted tree regression runner.
class API_EXPORT CDataFrameRegressionRunnerFactory final : public CDataFrameAnalysisRunnerFactory {
public:
const std::string& name() const override;

private:
static const std::string NAME;

private:
TRunnerUPtr makeImpl(const CDataFrameAnalysisSpecification& spec) const override;
TRunnerUPtr makeImpl(const CDataFrameAnalysisSpecification& spec,
const rapidjson::Value& jsonParameters) const override;
};
}
}

#endif // INCLUDED_ml_api_CDataFrameRegressionRunner_h
10 changes: 8 additions & 2 deletions lib/api/CDataFrameAnalysisRunner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
namespace ml {
namespace api {
namespace {
using TBoolVec = std::vector<bool>;

std::size_t maximumNumberPartitions(const CDataFrameAnalysisSpecification& spec) {
// We limit the maximum number of partitions to rows^(1/2) because very
// large numbers of partitions are going to be slow and it is better to tell
Expand All @@ -43,6 +45,10 @@ CDataFrameAnalysisRunner::~CDataFrameAnalysisRunner() {
this->waitToFinish();
}

TBoolVec CDataFrameAnalysisRunner::columnsForWhichEmptyIsMissing(const TStrVec& fieldNames) const {
return TBoolVec(fieldNames.size(), false);
}

void CDataFrameAnalysisRunner::estimateMemoryUsage(CMemoryUsageEstimationResultJsonWriter& writer) const {
std::size_t numberRows{m_Spec.numberRows()};
std::size_t numberColumns{m_Spec.numberColumns() + this->numberExtraColumns()};
Expand Down Expand Up @@ -223,8 +229,8 @@ CDataFrameAnalysisRunnerFactory::make(const CDataFrameAnalysisSpecification& spe

CDataFrameAnalysisRunnerFactory::TRunnerUPtr
CDataFrameAnalysisRunnerFactory::make(const CDataFrameAnalysisSpecification& spec,
const rapidjson::Value& params) const {
auto result = this->makeImpl(spec, params);
const rapidjson::Value& jsonParameters) const {
auto result = this->makeImpl(spec, jsonParameters);
result->computeAndSaveExecutionStrategy();
return result;
}
Expand Down
15 changes: 13 additions & 2 deletions lib/api/CDataFrameAnalysisSpecification.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
#include <core/CLogger.h>

#include <api/CDataFrameAnalysisConfigReader.h>
#include <api/CDataFrameBoostedTreeRunner.h>
#include <api/CDataFrameClassificationRunner.h>
#include <api/CDataFrameOutliersRunner.h>
#include <api/CDataFrameRegressionRunner.h>
#include <api/CMemoryUsageEstimationResultJsonWriter.h>

#include <rapidjson/document.h>
Expand Down Expand Up @@ -41,11 +42,13 @@ const std::string CDataFrameAnalysisSpecification::NAME("name");
const std::string CDataFrameAnalysisSpecification::PARAMETERS("parameters");

namespace {
using TBoolVec = std::vector<bool>;
using TRunnerFactoryUPtrVec = ml::api::CDataFrameAnalysisSpecification::TRunnerFactoryUPtrVec;

TRunnerFactoryUPtrVec analysisFactories() {
TRunnerFactoryUPtrVec factories;
factories.push_back(std::make_unique<ml::api::CDataFrameBoostedTreeRunnerFactory>());
factories.push_back(std::make_unique<ml::api::CDataFrameRegressionRunnerFactory>());
factories.push_back(std::make_unique<ml::api::CDataFrameClassificationRunnerFactory>());
factories.push_back(std::make_unique<ml::api::CDataFrameOutliersRunnerFactory>());
// Add new analysis types here.
return factories;
Expand Down Expand Up @@ -206,6 +209,14 @@ void CDataFrameAnalysisSpecification::estimateMemoryUsage(CMemoryUsageEstimation
m_Runner->estimateMemoryUsage(writer);
}

TBoolVec CDataFrameAnalysisSpecification::columnsForWhichEmptyIsMissing(const TStrVec& fieldNames) const {
if (m_Runner == nullptr) {
HANDLE_FATAL(<< "Internal error: no runner available. Please report this problem.");
return TBoolVec(fieldNames.size(), false);
}
return m_Runner->columnsForWhichEmptyIsMissing(fieldNames);
}

void CDataFrameAnalysisSpecification::initializeRunner(const rapidjson::Value& jsonAnalysis) {
// We pass of the interpretation of the parameters object to the appropriate
// analysis runner.
Expand Down
Loading

0 comments on commit 06007cd

Please sign in to comment.