diff --git a/include/Algorithm/Algorithm.hpp b/include/Algorithm/Algorithm.hpp index 7a2f7a8e..85f60152 100644 --- a/include/Algorithm/Algorithm.hpp +++ b/include/Algorithm/Algorithm.hpp @@ -14,7 +14,7 @@ using namespace std; namespace SESAME { -enum algoType { BirchType, StreamKMeansType, CluStreamType, DenStreamType }; +enum algoType { BirchType, StreamKMeansType, CluStreamType, DenStreamType, DBStreamType}; class Algorithm; typedef std::shared_ptr AlgorithmPtr; diff --git a/include/Algorithm/CluStream.hpp b/include/Algorithm/CluStream.hpp index 60e9e634..fd45606f 100644 --- a/include/Algorithm/CluStream.hpp +++ b/include/Algorithm/CluStream.hpp @@ -44,7 +44,7 @@ class CluStream : public Algorithm { int pointsForgot; int pointsMerged; clock_t startTime; - clock_t lastTime; + clock_t lastUpdateTime; CluStream(param_t &cmd_params); ~CluStream(); diff --git a/include/Algorithm/DBStream.hpp b/include/Algorithm/DBStream.hpp new file mode 100644 index 00000000..5c17756d --- /dev/null +++ b/include/Algorithm/DBStream.hpp @@ -0,0 +1,61 @@ +// +// Created by 1124a on 2021/8/30. +// + +#ifndef SESAME_INCLUDE_ALGORITHM_DBSTREAM_HPP_ +#define SESAME_INCLUDE_ALGORITHM_DBSTREAM_HPP_ +#include +#include +#include + +namespace SESAME { +typedef std::vector> Clusters; +class DBStreamParams : public AlgorithmParameters { + public: + double radius; + double lambda; + int cleanUpInterval;//Time gap + double weightMin;//minimum weight + double alpha;//α, intersection factor + double base;//base of decay function +}; + + +class DBStream : public Algorithm + { + public: + DBStreamParams dbStreamParams; + DampedWindowPtr dampedWindow; + std::vector microClusters; + SESAME::WeightedAdjacencyList weightedAdjacencyList; + std::vector microClusterNN;//micro clusters found in function findFixedRadiusNN + int weakEntry;//W_weak, weak entries + double aWeakEntry; + clock_t startTime; + clock_t pointArrivingTime; + clock_t lastCleanTime; + int microClusterIndex; + //Final output of clusters + Clusters finalClusters; + //Connectivity graph + unordered_map> connecvtivityGraphId; + + //TODO Need to implement weighted a weighted adjacency list S + DBStream(param_t &cmd_params); + ~DBStream(); + void Initilize() override; + void runOnlineClustering(PointPtr input) override; + void runOfflineClustering(DataSinkPtr sinkPtr) override; + private: + bool isInitial = false; + void update(PointPtr dataPoint); + bool checkMove( std::vector microClusters) const; + std::vector findFixedRadiusNN(PointPtr dataPoint); + void cleanUp(clock_t nowTime); + void reCluster(double threshold); + void insertIntoGraph(int microClusterId,int OtherId); + void insertIntoGraph(int microClusterId); + void findConnectedComponents(); + }; +} +#endif //SESAME_INCLUDE_ALGORITHM_DBSTREAM_HPP_ diff --git a/include/Algorithm/DataStructure/DataStructureFactory.hpp b/include/Algorithm/DataStructure/DataStructureFactory.hpp index 6ffea88c..21567023 100644 --- a/include/Algorithm/DataStructure/DataStructureFactory.hpp +++ b/include/Algorithm/DataStructure/DataStructureFactory.hpp @@ -13,7 +13,7 @@ #include #include #include - +#include namespace SESAME { class DataStructureFactory { @@ -28,12 +28,16 @@ class DataStructureFactory { static CoresetTreePtr createCoresetTree(); static void clearCoresetTree(CoresetTreePtr tree); static MicroClusterPtr createMicroCluster(int dimension, int id); + static MicroClusterPtr createMicroCluster(int dimension, int id,PointPtr dataPoint,double radius); static void clearMicroCluster(MicroClusterPtr microCluster); static SnapshotPtr createSnapshot(MicroClusters & otherMicroClusters,int elapsedTime); static void clearSnapshot(SnapshotPtr snapshot); static CFTreePtr createCFTree(); static NodePtr createNode(); - + static MicroClusterPairPtr createMicroClusterPair(MicroClusterPtr microCluster1,MicroClusterPtr microCluster2); + static void clearMicroClusterPair(MicroClusterPairPtr microClusterPair); + static AdjustedWeightPtr createAdjustedWeight(double weight, clock_t pointTime); + static void clearAdjustedWeight(AdjustedWeightPtr adjustedWeight); }; } #endif //SESAME_INCLUDE_ALGORITHM_DATASTRUCTURE_DATASTRUCTUREFACTORY_HPP_ diff --git a/include/Algorithm/DataStructure/MicroCluster.hpp b/include/Algorithm/DataStructure/MicroCluster.hpp index 330e3e67..2db4bb81 100644 --- a/include/Algorithm/DataStructure/MicroCluster.hpp +++ b/include/Algorithm/DataStructure/MicroCluster.hpp @@ -32,23 +32,26 @@ class MicroCluster { int SST;//the sum of the squares of the time stamps Til... Tin double weight; //number of data point in the clusters int dimension; - - //TODO Need to subtract Base class of CF vector when all cf-vector based-algorithms have been implemented + double radius;//Used in DBStream //the parameters below is unique for DenStream clock_t createTime; clock_t lastUpdateTime; bool visited; - //TODO this may need to modify in the future (All algorithms used this, e.g.DenStream,CluStream,DenStream,DBStream,SWEM =.=) + //TODO 1. Need to subtract Base class of CF vector when all cf-vector based-algorithms have been implemented + // 2.this may need to modify in the future (All algorithms used this, e.g.DenStream,CluStream,DenStream,DBStream,SWEM =.=) + + MicroCluster(int dimension, int id); + MicroCluster(int dimension, int id,PointPtr dataPoint,double radius);//DBStream + ~MicroCluster(); void init(PointPtr datapoint, int timestamp); - - void insert(PointPtr datapoint, int timestamp); - bool insert(PointPtr datapoint,double decayFactor,double epsilon);//Used in DenStream - + void insert(PointPtr datapoint, int timestamp);//Used in CluStream + bool insert(PointPtr datapoint,double decayFactor,double epsilon);// DenStream + void insert(PointPtr datapoint, double decayFactor);//DBStream void merge(MicroClusterPtr other); - void substractClusterVector(MicroClusterPtr other); + void subtractClusterVector(MicroClusterPtr other); void updateId(MicroClusterPtr other); void resetID(int index); //Used in DenStream @@ -65,10 +68,25 @@ class MicroCluster { dataPoint getVarianceVector(); double calCentroidDistance(PointPtr datapoint); bool judgeMerge(MicroClusterPtr other); + double getDistance(PointPtr datapoint);//DBStream + double getDistance(MicroClusterPtr other);//DBStream + void move();//DBStream + void decayWeight(double decayFactor); SESAME::MicroClusterPtr copy(); private: + double distance; static double inverseError(double x); }; +typedef struct finderMicroCluster +{ + finderMicroCluster(int n) : id(n) { } + bool operator()(const MicroClusterPtr MC) const + { + return (id == MC->id.front()); + } + int id; +}finderMicroCluster; + } #endif //SESAME_INCLUDE_ALGORITHM_DATASTRUCTURE_MICROCLUSTER_HPP_ diff --git a/include/Algorithm/DataStructure/WeightedAdjacencyList.hpp b/include/Algorithm/DataStructure/WeightedAdjacencyList.hpp new file mode 100644 index 00000000..029f3422 --- /dev/null +++ b/include/Algorithm/DataStructure/WeightedAdjacencyList.hpp @@ -0,0 +1,64 @@ +// +// Created by 1124a on 2021/8/30. +// + +#ifndef SESAME_INCLUDE_ALGORITHM_DATASTRUCTURE_WEIGHTEDADJACENCYLIST_HPP_ +#define SESAME_INCLUDE_ALGORITHM_DATASTRUCTURE_WEIGHTEDADJACENCYLIST_HPP_ +#include +#include +#include +#include +#include +#include + +namespace SESAME { +struct MicroClusterPair; +typedef std::shared_ptr MicroClusterPairPtr; +struct MicroClusterPair{ + MicroClusterPtr microCluster1; + MicroClusterPtr microCluster2; + MicroClusterPair( MicroClusterPtr microCluster1,MicroClusterPtr microCluster2){ + this->microCluster1=microCluster1->copy(); + this->microCluster2=microCluster2->copy(); + } + //bool operator==(const MicroClusterPair &other) const; + +}; + +struct KeyHasher{ + std::size_t operator()(const MicroClusterPair µClusterPair) const + { + return (std::hash()(microClusterPair.microCluster1->id.front())) ^ (std::hash()(microClusterPair.microCluster2->id.front())); + } +}; + +struct EqualKey + { + bool operator() (const MicroClusterPair &MCPair1, const MicroClusterPair &MCPair2) const + { + bool equal=false; + if( MCPair1.microCluster1->id.front()==MCPair2.microCluster1->id.front() &&MCPair1.microCluster2->id.front()==MCPair2.microCluster2->id.front() ) + equal=true; + if(MCPair1.microCluster1->id.front() ==MCPair2.microCluster2->id.front() &&MCPair1.microCluster2->id.front()==MCPair2.microCluster1->id.front() ) + equal=true; + + return equal; + } + }; + + +class AdjustedWeight; +typedef std::shared_ptr AdjustedWeightPtr; +class AdjustedWeight{ + public: + double weight; + clock_t updateTime; + AdjustedWeight(double weight, clock_t pointTime); + void add(clock_t startTime,double decayValue); + double getCurrentWeight(double decayFactor); +}; + +typedef std::unordered_map WeightedAdjacencyList; +typedef std::pair DensityGraph; +} +#endif //SESAME_INCLUDE_ALGORITHM_DATASTRUCTURE_WEIGHTEDADJACENCYLIST_HPP_ diff --git a/include/Utils/BenchmarkUtils.hpp b/include/Utils/BenchmarkUtils.hpp index 77ed62b3..28901d2b 100644 --- a/include/Utils/BenchmarkUtils.hpp +++ b/include/Utils/BenchmarkUtils.hpp @@ -44,6 +44,11 @@ struct param_t { double lambda; double mu; double beta; + //used in DBStream + double radius; + int cleanUpInterval; + double weightMin; + double alpha; std::string inputPath; std::string outputPath; SESAME::algoType algoType; diff --git a/src/Algorithm/AlgorithmFactory.cpp b/src/Algorithm/AlgorithmFactory.cpp index 95fe6541..89e11085 100644 --- a/src/Algorithm/AlgorithmFactory.cpp +++ b/src/Algorithm/AlgorithmFactory.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -27,5 +28,9 @@ SESAME::AlgorithmPtr SESAME::AlgorithmFactory::create(param_t &cmd_params) { shared_ptr denStream = std::make_shared(cmd_params); return (SESAME::AlgorithmPtr) denStream; } + if (cmd_params.algoType == DBStreamType) { + shared_ptr dbStream = std::make_shared(cmd_params); + return (SESAME::AlgorithmPtr) dbStream; + } throw std::invalid_argument("Unsupported"); } diff --git a/src/Algorithm/CMakeLists.txt b/src/Algorithm/CMakeLists.txt index a31acc24..a6f7bf78 100644 --- a/src/Algorithm/CMakeLists.txt +++ b/src/Algorithm/CMakeLists.txt @@ -3,6 +3,7 @@ add_source_sesame( CluStream.cpp Birch.cpp DenStream.cpp + DBStream.cpp Algorithm.cpp AlgorithmFactory.cpp ) diff --git a/src/Algorithm/CluStream.cpp b/src/Algorithm/CluStream.cpp index 9168baf9..53c16226 100644 --- a/src/Algorithm/CluStream.cpp +++ b/src/Algorithm/CluStream.cpp @@ -9,8 +9,8 @@ #include /** * @Description: "offline" init micro clusters using KMeans - * @param size: The size of initial data obects, - * initialData:input intial data + * @param size: The size of initial data objects, + * initialData:input initial data *@Return: void */ SESAME::CluStream::CluStream(param_t &cmd_params) { @@ -98,7 +98,7 @@ double SESAME::CluStream::calRadius(MicroClusterPtr closestCluster) { radius = doubleMax; dataPoint centroid = closestCluster->getCentroid(); for (int i = 0; i < this->CluStreamParam.clusterNumber; i++) { - // SESAME_INFO("i is for wegf"<id == closestCluster->id) { continue; } @@ -118,7 +118,7 @@ void SESAME::CluStream::insertIntoCluster(PointPtr data, MicroClusterPtr operate operateCluster->insert(data, timestamp); } -//Delete oldest cluster and create new one case +//Delete the oldest cluster and create new one case void SESAME::CluStream::deleteCreateCluster(PointPtr data) { // 3.1 Try to forget old micro clusters @@ -172,7 +172,7 @@ void SESAME::CluStream::microClusterToPoint(std::vector µC for (int i = 0; i < this->CluStreamParam.clusterNumber; i++) { PointPtr point = DataStructureFactory::createPoint(i, microClusters[i]->weight, microClusters[i]->centroid.size(), 0); - for (int j = 0; j < microClusters[i]->centroid.size(); j++) + for (SESAME::dataPoint::size_type j = 0; j < microClusters[i]->centroid.size(); j++) point->setFeatureItem(microClusters[i]->centroid[j], j); //points; points.push_back(point); @@ -200,7 +200,7 @@ void SESAME::CluStream::Initilize() { this->window = WindowFactory::createLandmarkWindow(); this->window->pyramidalWindow.timeInterval = this->CluStreamParam.timeInterval; this->startTime = clock(); - this->lastTime= this->startTime; + this->lastUpdateTime=this->startTime; window->initPyramidalWindow(this->window->pyramidalWindow.timeInterval); } @@ -235,11 +235,11 @@ void SESAME::CluStream::runOnlineClustering(SESAME::PointPtr input) { } else { int interval; clock_t now = clock(); - interval = (int) ((now - this->lastTime) / CLOCKS_PER_SEC); - if (interval >= 1)// + interval = (int) ((now - lastUpdateTime) / CLOCKS_PER_SEC); + if (interval >= 1) { window->pyramidalWindowProcess(startTime, microClusters); - lastTime = now; + lastUpdateTime = now; } incrementalCluster(input); @@ -255,17 +255,17 @@ void SESAME::CluStream::runOfflineClustering(SESAME::DataSinkPtr sinkPtr) { landmarkTime = 0; SESAME_INFO("Start offline..."); SESAME::SnapshotPtr landmarkSnapshot; - SESAME::SnapshotPtr substractMiroCluster; + SESAME::SnapshotPtr subtractMiroCluster; //If offlineTimeWindow ==0, Only Observe the end results of micro clusters - substractMiroCluster = + subtractMiroCluster = DataStructureFactory::createSnapshot(microClusters, (int) ((now - startTime) / CLOCKS_PER_SEC)); SESAME_INFO("Now Miro Cluster is..."); for (int i = 0; i < CluStreamParam.clusterNumber; i++) { std::stringstream result, re2; - std::copy(substractMiroCluster->microClusters[i]->id.begin(), - substractMiroCluster->microClusters[i]->id.end(), + std::copy(subtractMiroCluster->microClusters[i]->id.begin(), + subtractMiroCluster->microClusters[i]->id.end(), std::ostream_iterator(re2, " ")); - SESAME_INFO("The ID is " << re2.str() << "weight is " << substractMiroCluster->microClusters[i]->weight); + SESAME_INFO("The ID is " << re2.str() << "weight is " << subtractMiroCluster->microClusters[i]->weight); } //The offline is to observe a process of data stream clustering @@ -282,21 +282,21 @@ void SESAME::CluStream::runOfflineClustering(SESAME::DataSinkPtr sinkPtr) { SESAME_INFO("The ID is " << re2.str() << "weight is " << landmarkSnapshot->microClusters[i]->weight); } if (landmarkSnapshot->elapsedTime == -1) - landmarkSnapshot = substractMiroCluster; + landmarkSnapshot = subtractMiroCluster; - substractMiroCluster = SESAME::Snapshot::substractSnapshot(substractMiroCluster, landmarkSnapshot, + subtractMiroCluster = SESAME::Snapshot::substractSnapshot(subtractMiroCluster, landmarkSnapshot, this->CluStreamParam.clusterNumber); } - SESAME_INFO("substract Miro Cluster is..."); + SESAME_INFO("subtract Miro Cluster is..."); for (int i = 0; i < CluStreamParam.clusterNumber; i++) { - std::stringstream result, re2; - std::copy(substractMiroCluster->microClusters[i]->id.begin(), - substractMiroCluster->microClusters[i]->id.end(), + std::stringstream re2; + std::copy(subtractMiroCluster->microClusters[i]->id.begin(), + subtractMiroCluster->microClusters[i]->id.end(), std::ostream_iterator(re2, " ")); - SESAME_INFO("The ID is " << re2.str() << "weight is " << substractMiroCluster->microClusters[i]->weight); + SESAME_INFO("The ID is " << re2.str() << "weight is " << subtractMiroCluster->microClusters[i]->weight); } vector TransformedSnapshot; - microClusterToPoint(substractMiroCluster->microClusters, TransformedSnapshot); + microClusterToPoint(subtractMiroCluster->microClusters, TransformedSnapshot); SESAME_INFO("offline Cluster Number " << this->CluStreamParam.offlineClusterNumber << "Total number of p: " << TransformedSnapshot.size()); diff --git a/src/Algorithm/DBStream.cpp b/src/Algorithm/DBStream.cpp new file mode 100644 index 00000000..88ba8c96 --- /dev/null +++ b/src/Algorithm/DBStream.cpp @@ -0,0 +1,317 @@ +// +// Created by Zhenyu on 2021/8/30. +// +#include +#include +#include + +/** + * @Description: initialize user defined parameters, + * @Param: + * radius: radius of micro clusters + * lambda: lambda in decay function + * cleanUpInterval: time gap of clean up + * weightMin: the minimum weight of micro cluster to identify noise MCs + * alpha: intersection factor + * base: decay function base -- Normally 2 + * @Return: void + */ +SESAME::DBStream::DBStream(param_t &cmd_params){ + this->dbStreamParams.pointNumber = cmd_params.pointNumber; + this->dbStreamParams.dimension = cmd_params.dimension; + this->dbStreamParams.radius=cmd_params.radius; + this->dbStreamParams.lambda=cmd_params.lambda; + this->dbStreamParams.cleanUpInterval=cmd_params.cleanUpInterval; + this->dbStreamParams.weightMin=cmd_params.weightMin; + this->dbStreamParams.alpha=cmd_params.alpha; + this->dbStreamParams.base=cmd_params.base; +} +SESAME::DBStream:: ~DBStream() += default; + +/** + * @Description: initialization of the algorithm, + * @Param: void + * @Return: void + */ +void SESAME::DBStream::Initilize() { + this->dampedWindow = WindowFactory::createDampedWindow(dbStreamParams.base, dbStreamParams.lambda); + this->startTime = clock(); + this->pointArrivingTime= clock(); + this->lastCleanTime=clock(); + this->weakEntry= ceil(pow(dbStreamParams.base,(-1)*dbStreamParams.lambda*dbStreamParams.cleanUpInterval)); + this->aWeakEntry=ceil(weakEntry*dbStreamParams.alpha); + this->microClusterIndex=-1; + } + /** + * @Description: online clustering stage, input data point incrementally and update the MC list and weight adjacency lists, + * @Param: void + * @Return: void + */ + void SESAME::DBStream::runOnlineClustering(PointPtr input) { + if (!this->isInitial) { + SESAME_INFO("Start initialize..."); + Initilize(); + this->isInitial = true; + } + else + { + if(input->getIndex()) + this->pointArrivingTime=clock(); + update(input); + } +} + + +/** + * @Description: Insert data point into existing MCs, + * first find the MCs which data point locates in, if finding no MCs, + * create new MC with this data point, else if finding MCs can accept this data + * update these MCs and the corresponding Sij in Weighted adjacency list S; + * After inserting, we check whether moving center of MCs will collapse, + * if it will, we roll back moving center actions, + * finally, we clean up the MCs which is less than Wmin + * @Param: data point + * @Return: void + */ + +void SESAME::DBStream::update(PointPtr dataPoint){ + double decayFactor=dampedWindow->decayFunction(this->pointArrivingTime, clock()); + this->pointArrivingTime=clock(); + this->microClusterNN=findFixedRadiusNN(dataPoint); + std::vector::size_type sizeNN=microClusterNN.size(); + // SESAME_INFO("find suitable MCs number : "<insert(dataPoint,decayFactor); // just update weight + for (int j = i + 1; j < sizeNN; j++) { + MicroClusterPair microClusterPair(microClusterNN[i], microClusterNN.at(j)); + if (weightedAdjacencyList.find(microClusterPair) != weightedAdjacencyList.end()) + { + clock_t startT= weightedAdjacencyList[microClusterPair]->updateTime; + double decayValue = dampedWindow->decayFunction(startT,this->pointArrivingTime); + weightedAdjacencyList[microClusterPair]->add(this->pointArrivingTime,decayValue); + // SESAME_INFO(" weight is "<weight ); + } else{ + // SESAME_INFO("Create microClusterPair!" << microClusterNN.at(i)->id.front()<<", "<id.front()); + AdjustedWeightPtr adjustedWeight = SESAME::DataStructureFactory::createAdjustedWeight(1,this->pointArrivingTime); + DensityGraph densityGraph( microClusterPair ,adjustedWeight); + weightedAdjacencyList.insert(densityGraph); + } + } + } + if (checkMove(microClusterNN)) + for (const MicroClusterPtr& microCluster : microClusterNN) microCluster->move(); + } + if (((pointArrivingTime-this->lastCleanTime)/CLOCKS_PER_SEC)>= dbStreamParams.cleanUpInterval && dataPoint->getIndex()!=0) + { + cleanUp(this->pointArrivingTime); + this->lastCleanTime=this->pointArrivingTime; + } + +} + + +std::vector SESAME::DBStream::findFixedRadiusNN(PointPtr dataPoint) +{ + std::vector result; + std::vector::size_type iter; + for (iter= 0;iter< microClusters.size();iter++) { + //microClusters.at(iter)>decayWeight(decayFactor); //add this line into Micro Cluster insert data functions + double distance = microClusters.at(iter)->getDistance(dataPoint); + // SESAME_INFO("distance is "< microClustersList) const +{ + bool move=true; + if(!microClustersList.empty()) + { + std::vector::size_type i ,j ; + for ( i = 0; i < microClustersList.size(); i++){ + for (j = i + 1; j < microClustersList.size(); j++){ + double distance=microClustersList.at(i)->getDistance(microClustersList.at(j)); + if (distance < dbStreamParams.radius) + move= false; + } + } + } + else + move=false; + return move; +} + +void SESAME::DBStream::cleanUp(clock_t nowTime){ + std::vector removeMicroCluster; + std::vector::size_type iter; + //Check the current micro Clusters whether they have weak MCs + //This just test for remove id + std::vector idList; + for (iter=0;iterweight <= this->weakEntry) + { + removeMicroCluster.push_back(microClusters.at(iter)->copy()); + idList.push_back(microClusters.at(iter)->id.front()); + microClusters.erase(microClusters.begin()+int(iter));//Delete this MC from current MC list + } + } + SESAME_INFO("now rm MCs number is "<(re, " ")); + SESAME_INFO("RM list "<first.microCluster1->id.front())); + auto exist2 = std::find_if(removeMicroCluster.begin(), removeMicroCluster.end(),SESAME::finderMicroCluster(iterW->first.microCluster2->id.front())); + + if ( exist1!=removeMicroCluster.end()|| exist2!=removeMicroCluster.end()) + iterW=weightedAdjacencyList.erase(iterW); + else { + double decayFactor=dampedWindow->decayFunction(iterW->second->updateTime,nowTime); + if (iterW->second->getCurrentWeight(decayFactor) < aWeakEntry) + iterW=weightedAdjacencyList.erase(iterW); + else + iterW++; + } + } + SESAME_INFO("CLEAN! now weightedAdjacencyList size:"<dimension, 0); + //This is just for testing, need to delete + std::vector centroid(finalClusters.at(iter).front()->dimension,0); + for(auto j=0; j!=finalClusters.at(iter).size();j++) + { + double currentWeight=point->getWeight()+finalClusters.at(iter).at(j)->weight; + point->setWeight(currentWeight); + for(auto a =0;adimension;a++) + { + if(j==0) + point->setFeatureItem(0,a); + point->setFeatureItem(point->getFeatureItem(a)+finalClusters.at(iter).at(j)->centroid.at(a),a); + centroid[a]=point->getFeatureItem(a);//testing + if(j==finalClusters.at(iter).size()-1) + { + point->setFeatureItem(point->getFeatureItem(a)/finalClusters.at(iter).at(j)->dimension,a); + centroid[a] =centroid[a]/finalClusters.at(iter).at(j)->dimension;//testing + } + } + } + std::stringstream re; + std::copy(centroid.begin(),centroid.end(),std::ostream_iterator(re, " ")); + SESAME_INFO("The NO."<put(point->copy()); // point index start from 0 + } +} + + +void SESAME::DBStream::reCluster(double threshold){ + WeightedAdjacencyList::iterator iterW; + for (iterW = weightedAdjacencyList.begin(); iterW != weightedAdjacencyList.end(); iterW++){ + if (iterW->first.microCluster1->weight >= dbStreamParams.weightMin &&iterW->first.microCluster2->weight >= dbStreamParams.weightMin){ + double val = 2*iterW->second->weight / (iterW->first.microCluster1->weight+iterW->first.microCluster2->weight); + if (val > threshold) { + insertIntoGraph( iterW->first.microCluster1->id.front(),iterW->first.microCluster2->id.front()); + insertIntoGraph(iterW->first.microCluster2->id.front(), iterW->first.microCluster1->id.front()); + } + else + { + insertIntoGraph(iterW->first.microCluster1->id.front()); + insertIntoGraph(iterW->first.microCluster2->id.front()); + } + } + } + findConnectedComponents(); +} +/** + * @Description: insert vertices and entries into connectivity graph when micro cluster pair + * connectivity value greater than the intersection threshold + * if the graph has testing micro cluster, add connected strong MC in the corresponding entries + * else, create new V,E into the graph + * @Param: connectivity graph, micro cluster 1 and 2 + * @Return: void + */ + +void SESAME::DBStream::insertIntoGraph(int microClusterId,int OtherId){ + if (connecvtivityGraphId.find(microClusterId)!=connecvtivityGraphId.end()) + { + if(std::find(connecvtivityGraphId[microClusterId].begin(),connecvtivityGraphId[microClusterId].end(),OtherId)==connecvtivityGraphId[microClusterId].end()) + connecvtivityGraphId[microClusterId].push_back(OtherId); + } else{ + auto microCluster = std::find_if(microClusters.begin(), microClusters.end(),SESAME::finderMicroCluster(microClusterId)); + (*microCluster)->visited=false; + std::vector newMicroClusterIdSet; + newMicroClusterIdSet.push_back(OtherId); + connecvtivityGraphId.insert(make_pair(microClusterId,OtherId)); + //SESAME_INFO("Key cluster size: "<visited=false; + std::vector newMicroClusterIdSet; + connecvtivityGraphId.insert(make_pair(microClusterId,newMicroClusterIdSet)); + } +} + +/** + * @Description: findConnectedComponents function visit the existing connectivity graph + * and find all connected strong MCs that will finally form arbitrary-shaped macro clusters + * each macro cluster will be stored as a vector of micro clusters, which will be transformed into + * point that stores in sink later + * @Param: connectivity graph + * @Return: void + */ +void SESAME::DBStream::findConnectedComponents(){ + unordered_map>::iterator iter; + //This variable just for indicating the id of micro cluster which forming macro clusters + for (iter = connecvtivityGraphId.begin(); iter != connecvtivityGraphId.end(); iter++){ + std::vector idList; + auto microClusterKey = std::find_if(microClusters.begin(), microClusters.end(),SESAME::finderMicroCluster(iter->first)); + if (!(*microClusterKey)->visited) { + std::vector newCluster; + newCluster.push_back((*microClusterKey)); + idList.push_back(iter->first); + for(int iterValue : iter->second) + { + auto microClusterElement = std::find_if(microClusters.begin(), microClusters.end(),SESAME::finderMicroCluster(iterValue)); + if (!(*microClusterElement)->visited) + { + newCluster.push_back((*microClusterElement)); + (*microClusterElement)->visited = true; + idList.push_back((*microClusterElement)->id.front()); + } + } + this->finalClusters.push_back(newCluster); + //just used for examine reform ,need to delete later + std::stringstream result; + std::copy(idList.begin(),idList.end(),std::ostream_iterator(result, " ")); + //SESAME_INFO("New formed macro cluster ... including micro cluster :"); + //SESAME_INFO(" " << result.str() ); + } + + } +} diff --git a/src/Algorithm/DataStructure/CMakeLists.txt b/src/Algorithm/DataStructure/CMakeLists.txt index 120bafaa..3336d264 100644 --- a/src/Algorithm/DataStructure/CMakeLists.txt +++ b/src/Algorithm/DataStructure/CMakeLists.txt @@ -4,6 +4,7 @@ add_source_sesame( CoresetTree.cpp MicroCluster.cpp Snapshot.cpp + WeightedAdjacencyList.cpp DataStructureFactory.cpp CFTree.cpp FeatureVector.cpp diff --git a/src/Algorithm/DataStructure/DataStructureFactory.cpp b/src/Algorithm/DataStructure/DataStructureFactory.cpp index b0397a81..bf973964 100644 --- a/src/Algorithm/DataStructure/DataStructureFactory.cpp +++ b/src/Algorithm/DataStructure/DataStructureFactory.cpp @@ -33,7 +33,9 @@ void SESAME::DataStructureFactory::clearCoresetTree(SESAME::CoresetTreePtr tree) SESAME::MicroClusterPtr SESAME::DataStructureFactory::createMicroCluster(int id, int dimension){ return std::make_shared( id, dimension); } - +SESAME::MicroClusterPtr SESAME::DataStructureFactory::createMicroCluster(int dimension, int id,PointPtr dataPoint,double radius){ + return std::make_shared( dimension, id,dataPoint,radius); +} void SESAME::DataStructureFactory::clearMicroCluster(SESAME::MicroClusterPtr microCluster){ microCluster.reset(); } @@ -51,3 +53,18 @@ SESAME::CFTreePtr SESAME::DataStructureFactory::createCFTree() { SESAME::NodePtr SESAME::DataStructureFactory::createNode() { return std::make_shared(); } +SESAME::MicroClusterPairPtr SESAME::DataStructureFactory::createMicroClusterPair(MicroClusterPtr microCluster1, + MicroClusterPtr microCluster2){ + return std::make_shared(microCluster1,microCluster2); +} + +void SESAME::DataStructureFactory::clearMicroClusterPair(MicroClusterPairPtr microClusterPair){ + microClusterPair.reset(); +} + +SESAME::AdjustedWeightPtr SESAME::DataStructureFactory::createAdjustedWeight(double weight, clock_t pointTime){ + return std::make_shared(weight,pointTime); +} +void SESAME::DataStructureFactory::clearAdjustedWeight(SESAME::AdjustedWeightPtr adjustedWeight){ + adjustedWeight.reset(); +} \ No newline at end of file diff --git a/src/Algorithm/DataStructure/MicroCluster.cpp b/src/Algorithm/DataStructure/MicroCluster.cpp index 147fea31..1de19268 100644 --- a/src/Algorithm/DataStructure/MicroCluster.cpp +++ b/src/Algorithm/DataStructure/MicroCluster.cpp @@ -6,7 +6,7 @@ #include #include #include - +//Create MC, only initialization, used for DenStream, CluStream SESAME::MicroCluster::MicroCluster(int dimension, int id) { this->dimension=dimension; @@ -17,6 +17,29 @@ SESAME::MicroCluster::MicroCluster(int dimension, int id) this->visited=false; this->createTime=clock(); this->lastUpdateTime=this->createTime; + radius=0; + visited=false; +} +//Create MC, only initialization, only used for DBStream as it has user-defined fixed radius +SESAME::MicroCluster::MicroCluster(int dimension, int id,PointPtr dataPoint,double radius){ + this->dimension=dimension; + weight=1; + this->id.push_back(id); + LST=0; + SST=0; + this->visited=false; + this->createTime=clock(); + this->lastUpdateTime=this->createTime; + this->radius=radius; + + for (int i = 0; i < this->dimension; i++) + { + double data = dataPoint->getFeatureItem(i); + LS.push_back(data); + centroid.push_back(data); + } + + } //Release memory of the current micro cluster @@ -27,6 +50,8 @@ SESAME::MicroCluster::~MicroCluster() std::vector ().swap(LS); std::vector ().swap(SS); } + +//Used in DenStream, DBStream void SESAME::MicroCluster::init(PointPtr datapoint,int timestamp) { weight++; @@ -40,6 +65,7 @@ void SESAME::MicroCluster::init(PointPtr datapoint,int timestamp) SST+=timestamp*timestamp; } +//Used in DenStream, DBStream //insert a new data point from input data stream void SESAME::MicroCluster::insert(PointPtr datapoint,int timestamp) { @@ -53,8 +79,36 @@ void SESAME::MicroCluster::insert(PointPtr datapoint,int timestamp) LST+=timestamp; SST+=timestamp*timestamp; centroid=std::move(getCentroid()); +} +//Used only in DBStream +void SESAME::MicroCluster::insert(PointPtr datapoint,double decayFactor) +{ + decayWeight(decayFactor); + weight++; + double val = exp(-(pow(3 * this->distance / radius, 2) / 2)); + for(int i=0; igetFeatureItem(i); + LS[i] = centroid.at(i) + val * (data - centroid.at(i)); + } + lastUpdateTime=clock(); +} + +double SESAME::MicroCluster::getDistance(PointPtr datapoint){ + this->distance=calCentroidDistance(datapoint); + return this->distance; +} +//Often Used only in DBStream TODO this just a note, need to delete or detailed explain later +double SESAME::MicroCluster::getDistance(MicroClusterPtr other){ + double temp = 0, dist = 0; + for (int i = 0; i < this->dimension; i++) { + temp = this->centroid[i] - other->centroid[i]; + dist += temp * temp; + } + return sqrt(dist); } +//Used in DenStream bool SESAME::MicroCluster::insert(PointPtr datapoint,double decayFactor,double epsilon){ bool result; dataPoint LSPre; LSPre.assign(this->LS.begin(),this->LS.end()); @@ -82,6 +136,9 @@ bool SESAME::MicroCluster::insert(PointPtr datapoint,double decayFactor,double e result=false; return result; } + + + //merge two micro-clusters void SESAME::MicroCluster::merge(MicroClusterPtr other){ weight+=other->weight; @@ -97,7 +154,7 @@ void SESAME::MicroCluster::merge(MicroClusterPtr other){ } //Calculate the process of micro cluster N(Tc-h') -void SESAME::MicroCluster::substractClusterVector(MicroClusterPtr other) +void SESAME::MicroCluster::subtractClusterVector(MicroClusterPtr other) { this->weight-=other->weight; for(int i=0; icentroid=this->LS; +} + +void SESAME::MicroCluster::decayWeight(double decayFactor){ + this->weight *=decayFactor; +} double SESAME::MicroCluster::inverseError(double x){ double z = sqrt(M_PI) * x; double res = (z) / 2; @@ -283,4 +348,5 @@ double SESAME::MicroCluster::inverseError(double x){ SESAME::MicroClusterPtr SESAME::MicroCluster::copy() { return std::make_shared(*this); -} \ No newline at end of file +} + diff --git a/src/Algorithm/DataStructure/Snapshot.cpp b/src/Algorithm/DataStructure/Snapshot.cpp index 694b5e3e..d72658db 100644 --- a/src/Algorithm/DataStructure/Snapshot.cpp +++ b/src/Algorithm/DataStructure/Snapshot.cpp @@ -59,13 +59,13 @@ SESAME::SnapshotPtr SESAME::Snapshot::substractSnapshot(SnapshotPtr snapshotCurr for(unsigned int j=0; jmicroClusters[j]->id.size()>1) { if(snapshotCurrent->microClusters[i]->judgeMerge(snapshotLandmark->microClusters[j])) - snapshotCurrent->microClusters[i]->substractClusterVector(snapshotLandmark->microClusters[j]); + snapshotCurrent->microClusters[i]->subtractClusterVector(snapshotLandmark->microClusters[j]); } else { int clusterIdLandmark; clusterIdLandmark = snapshotLandmark->microClusters[j]->id[0]; if(std::find(snapshotCurrent->microClusters[i]->id.begin(), snapshotCurrent->microClusters[i]->id.end(), clusterIdLandmark)!=snapshotCurrent->microClusters[i]->id.end()) - snapshotCurrent->microClusters[i]->substractClusterVector(snapshotLandmark->microClusters[j]); + snapshotCurrent->microClusters[i]->subtractClusterVector(snapshotLandmark->microClusters[j]); } } } @@ -75,7 +75,7 @@ SESAME::SnapshotPtr SESAME::Snapshot::substractSnapshot(SnapshotPtr snapshotCurr if(snapshotLandmark->microClusters[j]->id.size()==1) { int clusterIdLandmark=snapshotLandmark->microClusters[j]->id[0]; if(snapshotCurrent->microClusters[i]->id[0]==clusterIdLandmark) - snapshotCurrent->microClusters[i]->substractClusterVector(snapshotLandmark->microClusters[j]); + snapshotCurrent->microClusters[i]->subtractClusterVector(snapshotLandmark->microClusters[j]); } } } diff --git a/src/Algorithm/DataStructure/WeightedAdjacencyList.cpp b/src/Algorithm/DataStructure/WeightedAdjacencyList.cpp new file mode 100644 index 00000000..1b93458f --- /dev/null +++ b/src/Algorithm/DataStructure/WeightedAdjacencyList.cpp @@ -0,0 +1,27 @@ +// +// Created by 1124a on 2021/8/30. +// +#include + + + +SESAME::AdjustedWeight::AdjustedWeight(double weight, clock_t pointTime){ + this->weight=weight; + this->updateTime=pointTime; +} +void SESAME::AdjustedWeight::add(clock_t startTime,double decayValue) { + if ( startTime == this->updateTime) { + weight++; + } + else { + weight *= decayValue + 1; + this->updateTime = clock(); + } +} + + +double SESAME::AdjustedWeight::getCurrentWeight(double decayFactor){ + return weight * decayFactor; +} + + diff --git a/src/Algorithm/WindowModel/DampedWindow.cpp b/src/Algorithm/WindowModel/DampedWindow.cpp index 623cf860..1c13dd02 100644 --- a/src/Algorithm/WindowModel/DampedWindow.cpp +++ b/src/Algorithm/WindowModel/DampedWindow.cpp @@ -9,6 +9,6 @@ SESAME::DampedWindow::DampedWindow(double base, double lambda){ double SESAME::DampedWindow::decayFunction(clock_t startTime, clock_t currentTimestamp) const { - double elapsedTime = (int) ((currentTimestamp - startTime) / CLOCKS_PER_SEC); + double elapsedTime = (double) (currentTimestamp - startTime) / CLOCKS_PER_SEC; return pow(this->base, -1 * this->lambda * elapsedTime); } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 348c166a..dcbb5001 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -1,9 +1,10 @@ # adding the Google_Tests_run target add_executable(Google_Tests_run - SystemTest/DenStreamTest.cpp SystemTest/CluStreamTest.cpp SystemTest/StreamKMTest.cpp SystemTest/BirchTest.cpp + SystemTest/DenStreamTest.cpp + SystemTest/DBStreamTest.cpp ) # linking Google_Tests_run with sesame_lib which will be tested diff --git a/test/SystemTest/DBStreamTest.cpp b/test/SystemTest/DBStreamTest.cpp new file mode 100644 index 00000000..9a4071f5 --- /dev/null +++ b/test/SystemTest/DBStreamTest.cpp @@ -0,0 +1,49 @@ +// Copyright (C) 2021 by the IntelliStream team (https://github.com/intellistream) + +// +// Created by Zhenyu on 2021/9/18. +// + + +#include +#include +#include +#include +#include +#include +#include + +TEST(SystemTest, DBStreamTest) { + //Setup Logs. + setupLogging("benchmark.log", LOG_DEBUG); + //Parse parameters. + param_t cmd_params; + cmd_params.pointNumber = 15120; + cmd_params.dimension = 54; + cmd_params.base=2; + cmd_params.lambda= 0.0001; + cmd_params.radius= 500; + cmd_params.cleanUpInterval=3; + cmd_params.weightMin=2; + cmd_params.alpha=0.25; + cmd_params.inputPath = std::filesystem::current_path().generic_string() + "/datasets/CoverType.txt"; + cmd_params.outputPath = "results.txt"; + cmd_params.algoType = SESAME::DBStreamType; + + std::vector input; + std::vector results; + + //Create Spout. + SESAME::DataSourcePtr sourcePtr = SESAME::DataSourceFactory::create(); + //Directly load data from file. TODO: configure it to load from external sensors, e.g., HTTP. + BenchmarkUtils::loadData(cmd_params, sourcePtr); + + //Create Sink. + SESAME::DataSinkPtr sinkPtr = SESAME::DataSinkFactory::create(); + + //Create Algorithm. + SESAME::AlgorithmPtr algoPtr = SESAME::AlgorithmFactory::create(cmd_params); + + //Run algorithm producing results. + BenchmarkUtils::runBenchmark(cmd_params, sourcePtr, sinkPtr, algoPtr); +} \ No newline at end of file