Skip to content

Commit

Permalink
Subtract connection based offline refinement. which can be used later…
Browse files Browse the repository at this point in the history
… in refactor part (#76)
  • Loading branch information
GabrielWuNR authored Nov 24, 2021
1 parent 4f1d8fd commit a730991
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 27 deletions.
12 changes: 9 additions & 3 deletions include/Algorithm/DBStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <Algorithm/Algorithm.hpp>
#include <Algorithm/DataStructure/WeightedAdjacencyList.hpp>
#include <Utils/BenchmarkUtils.hpp>
#include <Algorithm/OfflineClustering/ConnectedRegions.hpp>

namespace SESAME {
typedef std::vector<std::vector<MicroClusterPtr>> Clusters;
Expand Down Expand Up @@ -37,8 +38,10 @@ class DBStream : public Algorithm
int microClusterIndex;
//Final output of clusters
Clusters finalClusters;
ConnectedRegions connectedRegions;
//Connectivity graph
unordered_map<int,std::vector<int>> connecvtivityGraphId;

// unordered_map<int,std::vector<int>> connecvtivityGraphId;

//TODO Need to implement weighted a weighted adjacency list S
DBStream(param_t &cmd_params);
Expand All @@ -48,14 +51,17 @@ class DBStream : public Algorithm
void runOfflineClustering(DataSinkPtr sinkPtr) override;
private:
bool isInitial = false;

void update(PointPtr dataPoint);
bool checkMove( std::vector<MicroClusterPtr> microClusters) const;
std::vector<MicroClusterPtr> findFixedRadiusNN(PointPtr dataPoint);
void cleanUp(clock_t nowTime);
void reCluster(double threshold);
//The underline has moved to offline refinement
/* void reCluster(double threshold);
void insertIntoGraph(int microClusterId,int OtherId);
void insertIntoGraph(int microClusterId);
void findConnectedComponents();
void findConnectedComponents();*/
};

}
#endif //SESAME_INCLUDE_ALGORITHM_DBSTREAM_HPP_
52 changes: 52 additions & 0 deletions include/Algorithm/OfflineClustering/ConnectedRegions.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//
// Created by 1124a on 2021/11/23.
//

#ifndef SESAME_INCLUDE_ALGORITHM_OFFLINECLUSTERING_CONNECTEDREGIONS_HPP_
#define SESAME_INCLUDE_ALGORITHM_OFFLINECLUSTERING_CONNECTEDREGIONS_HPP_

#include <Algorithm/DataStructure/WeightedAdjacencyList.hpp>
#include <Algorithm/DataStructure/MicroCluster.hpp>
#include <Algorithm/DataStructure/Point.hpp>
#include <Algorithm/OfflineClustering/OfflineClustering.hpp>
#include <Algorithm/DataStructure/DataStructureFactory.hpp>
#include <Utils/Logger.hpp>
namespace SESAME {
class ConnectedRegions : public SESAME::OfflineClustering {
public:
double alpha; //intersection factor, alpha
double weightMin; //minimum weight
std::vector<std::vector<MicroClusterPtr>> finalClusters;
unordered_map<int,std::vector<int>> connecvtivityGraphId;
ConnectedRegions();
ConnectedRegions(double alpha, double weightMin);
void connection( std::vector<MicroClusterPtr>& microClusters,
SESAME::WeightedAdjacencyList weightedAdjacencyList);
std::vector<PointPtr> ResultsToDataSink();

/**
* @Description: insert vertices and entries into connectivity graph when micro cluster pair
* connectivity value greater than the intersection threshold
* if the graph has testing micro cluster, add connected strong MC in the corresponding entries
* else, create new V,E into the graph
* @Param: connectivity graph, micro cluster 1 and 2
* @Return: void
*/
void insertIntoGraph( std::vector<MicroClusterPtr> microClusters,
int microClusterId,int OtherId);
void insertIntoGraph( std::vector<MicroClusterPtr> microClusters,
int microClusterId);
/**
* @Description: findConnectedComponents function visit the existing connectivity graph
* and find all connected strong MCs that will finally form arbitrary-shaped macro clusters
* each macro cluster will be stored as a vector of micro clusters, which will be transformed into
* point that stores in sink later
* @Param: connectivity graph
* @Return: void
*/
void findConnectedComponents(std::vector<MicroClusterPtr> microClusters);

};
}

#endif //SESAME_INCLUDE_ALGORITHM_OFFLINECLUSTERING_CONNECTEDREGIONS_HPP_
47 changes: 24 additions & 23 deletions src/Algorithm/DBStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
//
#include <Algorithm/DBStream.hpp>
#include <Algorithm/WindowModel/WindowFactory.hpp>
#include <Algorithm/DataStructure/DataStructureFactory.hpp>

/**
* @Description: initialize user defined parameters,
Expand Down Expand Up @@ -42,6 +41,7 @@ void SESAME::DBStream::Initilize() {
this->weakEntry= ceil(pow(dbStreamParams.base,(-1)*dbStreamParams.lambda*dbStreamParams.cleanUpInterval));
this->aWeakEntry=ceil(weakEntry*dbStreamParams.alpha);
this->microClusterIndex=-1;
connectedRegions = ConnectedRegions(dbStreamParams.alpha, dbStreamParams.weightMin);
}
/**
* @Description: online clustering stage, input data point incrementally and update the MC list and weight adjacency lists,
Expand Down Expand Up @@ -81,14 +81,20 @@ void SESAME::DBStream::update(PointPtr dataPoint){
this->microClusterNN=findFixedRadiusNN(dataPoint);
std::vector<MicroClusterPtr>::size_type sizeNN=microClusterNN.size();
// SESAME_INFO("find suitable MCs number : "<<sizeNN);
/**
* If this point fits in no micro clusters
* */
if (microClusterNN.empty()) {
microClusterIndex++;
MicroClusterPtr newMicroCluster=SESAME::DataStructureFactory::createMicroCluster(dbStreamParams.dimension,microClusterIndex,
MicroClusterPtr newMicroCluster = SESAME::DataStructureFactory::createMicroCluster(dbStreamParams.dimension,microClusterIndex,
dataPoint,dbStreamParams.radius);
microClusters.push_back(newMicroCluster);
microClusterNN.push_back(newMicroCluster);
// SESAME_INFO("Add new MC!"<<microClusterIndex);
//SESAME_INFO("Add new MC!"<<microClusterIndex);
}
/**
* If this point fits in at least one micro cluster
* */
else {
for (int i = 0; i < sizeNN; i++) {
microClusterNN[i]->insert(dataPoint,decayFactor); // just update weight
Expand Down Expand Up @@ -194,7 +200,20 @@ void SESAME::DBStream::cleanUp(clock_t nowTime){


void SESAME::DBStream::runOfflineClustering(DataSinkPtr sinkPtr) {
reCluster(dbStreamParams.alpha);
SESAME_INFO("micro clusters "<<microClusters.size());
SESAME_INFO("weightedAdjacencyList "<<weightedAdjacencyList.size());
connectedRegions.connection(microClusters,
weightedAdjacencyList);

std::vector<PointPtr> points = connectedRegions.ResultsToDataSink();
for(auto & point : points)
sinkPtr->put(point->copy());

}


/*
* reCluster(dbStreamParams.alpha);
for(auto iter=0; iter!=finalClusters.size();iter++)
{ //initialize pseudo point of macro clusters
PointPtr point = DataStructureFactory::createPoint(iter, 0, finalClusters.at(iter).front()->dimension, 0);
Expand Down Expand Up @@ -222,8 +241,6 @@ void SESAME::DBStream::runOfflineClustering(DataSinkPtr sinkPtr) {
SESAME_INFO("The NO."<<iter<<" Centroid is "<<re.str());
sinkPtr->put(point->copy()); // point index start from 0
}
}

void SESAME::DBStream::reCluster(double threshold){
WeightedAdjacencyList::iterator iterW;
Expand All @@ -243,14 +260,6 @@ void SESAME::DBStream::reCluster(double threshold){
}
findConnectedComponents();
}
/**
* @Description: insert vertices and entries into connectivity graph when micro cluster pair
* connectivity value greater than the intersection threshold
* if the graph has testing micro cluster, add connected strong MC in the corresponding entries
* else, create new V,E into the graph
* @Param: connectivity graph, micro cluster 1 and 2
* @Return: void
*/
void SESAME::DBStream::insertIntoGraph(int microClusterId,int OtherId){
if (connecvtivityGraphId.find(microClusterId)!=connecvtivityGraphId.end())
Expand All @@ -277,14 +286,6 @@ void SESAME::DBStream::insertIntoGraph(int microClusterId){
}
}
/**
* @Description: findConnectedComponents function visit the existing connectivity graph
* and find all connected strong MCs that will finally form arbitrary-shaped macro clusters
* each macro cluster will be stored as a vector of micro clusters, which will be transformed into
* point that stores in sink later
* @Param: connectivity graph
* @Return: void
*/
void SESAME::DBStream::findConnectedComponents(){
unordered_map<int,std::vector<int>>::iterator iter;
//This variable just for indicating the id of micro cluster which forming macro clusters
Expand Down Expand Up @@ -312,6 +313,6 @@ void SESAME::DBStream::findConnectedComponents(){
//SESAME_INFO("New formed macro cluster ... including micro cluster :");
//SESAME_INFO(" " << result.str() );
}

}
}
*/
4 changes: 3 additions & 1 deletion src/Algorithm/OfflineClustering/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
add_source_sesame(
OfflineClustering.cpp
KMeans.cpp
DBSCAN.cpp)
DBSCAN.cpp
ConnectedRegions.cpp
)
145 changes: 145 additions & 0 deletions src/Algorithm/OfflineClustering/ConnectedRegions.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
//
// Created by 1124a on 2021/11/23.
//

#include <Algorithm/OfflineClustering/ConnectedRegions.hpp>
SESAME::ConnectedRegions::ConnectedRegions(){

}
SESAME::ConnectedRegions::ConnectedRegions(double alpha, double weightMin){
this->alpha = alpha;
this->weightMin = weightMin;
}
void SESAME::ConnectedRegions::connection( std::vector<MicroClusterPtr>& microClusters,

SESAME::WeightedAdjacencyList weightedAdjacencyList) {
SESAME_INFO("alpha."<<alpha<<" weightMin is "<<weightMin);
WeightedAdjacencyList::iterator iterW;
for (iterW = weightedAdjacencyList.begin(); iterW != weightedAdjacencyList.end(); iterW++){
if (iterW->first.microCluster1->weight >= weightMin &&iterW->first.microCluster2->weight >= weightMin){
double val = 2*iterW->second->weight / (iterW->first.microCluster1->weight+iterW->first.microCluster2->weight);
if (val > alpha) {
insertIntoGraph( microClusters,
iterW->first.microCluster1->id.front(),
iterW->first.microCluster2->id.front());

insertIntoGraph(microClusters,
iterW->first.microCluster2->id.front(),
iterW->first.microCluster1->id.front());

}
else
{
insertIntoGraph(microClusters,
iterW->first.microCluster1->id.front());
insertIntoGraph(microClusters,
iterW->first.microCluster2->id.front());
}
}
}
findConnectedComponents(microClusters);
}
/**
* @Description: insert vertices and entries into connectivity graph when micro cluster pair
* connectivity value greater than the intersection threshold
* if the graph has testing micro cluster, add connected strong MC in the corresponding entries
* else, create new V,E into the graph
* @Param: connectivity graph, micro cluster 1 and 2
* @Return: void
*/


void SESAME::ConnectedRegions::insertIntoGraph( std::vector<MicroClusterPtr> microClusters,
int microClusterId,int OtherId){
if (connecvtivityGraphId.find(microClusterId)!=connecvtivityGraphId.end())
{
if(std::find(connecvtivityGraphId[microClusterId].begin(),connecvtivityGraphId[microClusterId].end(),OtherId)==connecvtivityGraphId[microClusterId].end())
connecvtivityGraphId[microClusterId].push_back(OtherId);
} else{
auto microCluster = std::find_if(microClusters.begin(), microClusters.end(),SESAME::finderMicroCluster(microClusterId));
(*microCluster)->visited=false;
std::vector<int> newMicroClusterIdSet;
newMicroClusterIdSet.push_back(OtherId);
connecvtivityGraphId.insert(make_pair(microClusterId,OtherId));
}
}


void SESAME::ConnectedRegions::insertIntoGraph( std::vector<MicroClusterPtr> microClusters,
int microClusterId){
if (connecvtivityGraphId.find(microClusterId)==connecvtivityGraphId.end())
{
auto microCluster = std::find_if(microClusters.begin(), microClusters.end(),
SESAME::finderMicroCluster(microClusterId));
(*microCluster)->visited=false;
std::vector<int> newMicroClusterIdSet;
connecvtivityGraphId.insert(make_pair(microClusterId,newMicroClusterIdSet));
}

}

void SESAME::ConnectedRegions::findConnectedComponents(std::vector<SESAME::MicroClusterPtr> microClusters){
// SESAME_INFO("micro clusters "<<microClusters.size());
// SESAME_INFO("connectivity Graph "<<connecvtivityGraphId.size());

unordered_map<int,std::vector<int>>::iterator iter;
//This variable just for indicating the id of micro cluster which forming macro clusters
for (iter = connecvtivityGraphId.begin(); iter != connecvtivityGraphId.end(); iter++){
std::vector<int> idList;
auto microClusterKey = std::find_if(microClusters.begin(), microClusters.end(),SESAME::finderMicroCluster(iter->first));
if (!(*microClusterKey)->visited) {
std::vector<SESAME::MicroClusterPtr> newCluster;
newCluster.push_back((*microClusterKey));
idList.push_back(iter->first);
for(int iterValue : iter->second)
{
auto microClusterElement = std::find_if(microClusters.begin(), microClusters.end(),SESAME::finderMicroCluster(iterValue));
if (!(*microClusterElement)->visited)
{
newCluster.push_back((*microClusterElement));
(*microClusterElement)->visited = true;
idList.push_back((*microClusterElement)->id.front());
}
}
this->finalClusters.push_back(newCluster);
//just used for examine reform ,need to delete later
// std::stringstream result;
// std::copy(idList.begin(),idList.end(),std::ostream_iterator<int>(result, " "));
// SESAME_INFO("New formed macro cluster ... including micro cluster :");
// SESAME_INFO(" " << result.str() );
}
}
}

std::vector<SESAME::PointPtr> SESAME::ConnectedRegions::ResultsToDataSink(){
// SESAME_INFO("Start resize "<<finalClusters.size());
std::vector<SESAME::PointPtr> points;
for(auto iter=0; iter!=finalClusters.size();iter++)
{ //initialize pseudo point of macro clusters
PointPtr point = DataStructureFactory::createPoint(iter, 0, finalClusters.at(iter).front()->dimension, 0);
//This is just for testing, need to delete
std::vector<double> centroid(finalClusters.at(iter).front()->dimension,0);
for(auto j=0; j!=finalClusters.at(iter).size();j++)
{
double currentWeight=point->getWeight()+finalClusters.at(iter).at(j)->weight;
point->setWeight(currentWeight);
for(auto a =0;a<finalClusters.at(iter).at(j)->dimension;a++)
{
if(j==0)
point->setFeatureItem(0,a);
point->setFeatureItem(point->getFeatureItem(a)+finalClusters.at(iter).at(j)->centroid.at(a),a);
centroid[a]=point->getFeatureItem(a);//testing
if(j==finalClusters.at(iter).size()-1)
{
point->setFeatureItem(point->getFeatureItem(a)/finalClusters.at(iter).at(j)->dimension,a);
centroid[a] =centroid[a]/finalClusters.at(iter).at(j)->dimension;//testing
}
}
}
points.push_back(point);
// std::stringstream results;
// std::copy(centroid.begin(),centroid.end(),std::ostream_iterator<double>(results, " "));
// SESAME_INFO("The NO."<<iter<<" Centroid is "<<results.str());
}
return points;
}

0 comments on commit a730991

Please sign in to comment.