Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

finish DBStream #66

Merged
merged 24 commits into from
Sep 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
7e335dd
Finish Coding half of DBStream
GabrielWuNR Aug 30, 2021
9d6eb5a
Finish Coding online part
GabrielWuNR Aug 31, 2021
5876359
Finish DBStream coding, still has problems in OfflineClustering, how …
GabrielWuNR Sep 1, 2021
5823f2e
#56
ShuhaoZhangTony Sep 1, 2021
a3c1392
Finish Coding half of DBStream
GabrielWuNR Aug 30, 2021
2b9ff72
Finish Coding half of DBStream
GabrielWuNR Aug 30, 2021
656e35c
Finish Coding online part
GabrielWuNR Aug 31, 2021
a97bc97
Finish DBStream coding, still has problems in OfflineClustering, how …
GabrielWuNR Sep 1, 2021
69283c8
Finish Coding half of DBStream
GabrielWuNR Aug 30, 2021
015e611
Finish Coding online part
GabrielWuNR Aug 31, 2021
9db285a
Finish DBStream coding, still has problems in OfflineClustering, how …
GabrielWuNR Sep 1, 2021
3c7c65d
Merge remote-tracking branch 'origin/DBStream-wzy' into DBStream-wzy
GabrielWuNR Sep 1, 2021
b46c6fc
Fix Time interval bugs in DenStream and CluStream
ShuhaoZhangTony Aug 30, 2021
d8637c7
Merge remote-tracking branch 'origin/main' into DBStream-wzy
GabrielWuNR Sep 3, 2021
b97fe7c
Merge remote-tracking branch 'origin/DBStream-wzy' into DBStream-wzy
GabrielWuNR Sep 3, 2021
759d610
Fix Time interval bugs in DenStream and CluStream
GabrielWuNR Sep 17, 2021
2c0b0cd
modify Micro cluster insert data function and debug online part
GabrielWuNR Sep 18, 2021
e5f11dd
modify Micro cluster insert data function and debug online part
GabrielWuNR Sep 18, 2021
681e950
all test passed but still have bugs in Unordered_map using customed d…
GabrielWuNR Sep 19, 2021
4e48534
all test passed but still have bugs in Unordered_map using customed d…
GabrielWuNR Sep 19, 2021
0370f59
Only need to fix bug in clean up functions when erase ...
GabrielWuNR Sep 19, 2021
9233b34
all bugs fixed, but still have some in unordered_map
GabrielWuNR Sep 20, 2021
722ec41
all fixed in DBStream
GabrielWuNR Sep 21, 2021
55bb552
reformat and recover other test
GabrielWuNR Sep 21, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/Algorithm/Algorithm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
using namespace std;
namespace SESAME {

enum algoType { BirchType, StreamKMeansType, CluStreamType, DenStreamType };
enum algoType { BirchType, StreamKMeansType, CluStreamType, DenStreamType, DBStreamType};

class Algorithm;
typedef std::shared_ptr<Algorithm> AlgorithmPtr;
Expand Down
2 changes: 1 addition & 1 deletion include/Algorithm/CluStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class CluStream : public Algorithm {
int pointsForgot;
int pointsMerged;
clock_t startTime;
clock_t lastTime;
clock_t lastUpdateTime;
CluStream(param_t &cmd_params);
~CluStream();

Expand Down
61 changes: 61 additions & 0 deletions include/Algorithm/DBStream.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
//
// Created by 1124a on 2021/8/30.
//

#ifndef SESAME_INCLUDE_ALGORITHM_DBSTREAM_HPP_
#define SESAME_INCLUDE_ALGORITHM_DBSTREAM_HPP_
#include <Algorithm/Algorithm.hpp>
#include <Algorithm/DataStructure/WeightedAdjacencyList.hpp>
#include <Utils/BenchmarkUtils.hpp>

namespace SESAME {
typedef std::vector<std::vector<MicroClusterPtr>> Clusters;
class DBStreamParams : public AlgorithmParameters {
public:
double radius;
double lambda;
int cleanUpInterval;//Time gap
double weightMin;//minimum weight
double alpha;//α, intersection factor
double base;//base of decay function
};


class DBStream : public Algorithm
{
public:
DBStreamParams dbStreamParams;
DampedWindowPtr dampedWindow;
std::vector<MicroClusterPtr> microClusters;
SESAME::WeightedAdjacencyList weightedAdjacencyList;
std::vector<MicroClusterPtr> microClusterNN;//micro clusters found in function findFixedRadiusNN
int weakEntry;//W_weak, weak entries
double aWeakEntry;
clock_t startTime;
clock_t pointArrivingTime;
clock_t lastCleanTime;
int microClusterIndex;
//Final output of clusters
Clusters finalClusters;
//Connectivity graph
unordered_map<int,std::vector<int>> connecvtivityGraphId;

//TODO Need to implement weighted a weighted adjacency list S
DBStream(param_t &cmd_params);
~DBStream();
void Initilize() override;
void runOnlineClustering(PointPtr input) override;
void runOfflineClustering(DataSinkPtr sinkPtr) override;
private:
bool isInitial = false;
void update(PointPtr dataPoint);
bool checkMove( std::vector<MicroClusterPtr> microClusters) const;
std::vector<MicroClusterPtr> findFixedRadiusNN(PointPtr dataPoint);
void cleanUp(clock_t nowTime);
void reCluster(double threshold);
void insertIntoGraph(int microClusterId,int OtherId);
void insertIntoGraph(int microClusterId);
void findConnectedComponents();
};
}
#endif //SESAME_INCLUDE_ALGORITHM_DBSTREAM_HPP_
8 changes: 6 additions & 2 deletions include/Algorithm/DataStructure/DataStructureFactory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#include <Algorithm/DataStructure/Snapshot.hpp>
#include <Algorithm/DataStructure/CFTree.hpp>
#include <Algorithm/DataStructure/FeatureVector.hpp>

#include <Algorithm/DataStructure/WeightedAdjacencyList.hpp>
namespace SESAME {
class DataStructureFactory {

Expand All @@ -28,12 +28,16 @@ class DataStructureFactory {
static CoresetTreePtr createCoresetTree();
static void clearCoresetTree(CoresetTreePtr tree);
static MicroClusterPtr createMicroCluster(int dimension, int id);
static MicroClusterPtr createMicroCluster(int dimension, int id,PointPtr dataPoint,double radius);
static void clearMicroCluster(MicroClusterPtr microCluster);
static SnapshotPtr createSnapshot(MicroClusters & otherMicroClusters,int elapsedTime);
static void clearSnapshot(SnapshotPtr snapshot);
static CFTreePtr createCFTree();
static NodePtr createNode();

static MicroClusterPairPtr createMicroClusterPair(MicroClusterPtr microCluster1,MicroClusterPtr microCluster2);
static void clearMicroClusterPair(MicroClusterPairPtr microClusterPair);
static AdjustedWeightPtr createAdjustedWeight(double weight, clock_t pointTime);
static void clearAdjustedWeight(AdjustedWeightPtr adjustedWeight);
};
}
#endif //SESAME_INCLUDE_ALGORITHM_DATASTRUCTURE_DATASTRUCTUREFACTORY_HPP_
34 changes: 26 additions & 8 deletions include/Algorithm/DataStructure/MicroCluster.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,26 @@ class MicroCluster {
int SST;//the sum of the squares of the time stamps Til... Tin
double weight; //number of data point in the clusters
int dimension;

//TODO Need to subtract Base class of CF vector when all cf-vector based-algorithms have been implemented
double radius;//Used in DBStream
//the parameters below is unique for DenStream
clock_t createTime;
clock_t lastUpdateTime;
bool visited;

//TODO this may need to modify in the future (All algorithms used this, e.g.DenStream,CluStream,DenStream,DBStream,SWEM =.=)
//TODO 1. Need to subtract Base class of CF vector when all cf-vector based-algorithms have been implemented
// 2.this may need to modify in the future (All algorithms used this, e.g.DenStream,CluStream,DenStream,DBStream,SWEM =.=)


MicroCluster(int dimension, int id);
MicroCluster(int dimension, int id,PointPtr dataPoint,double radius);//DBStream

~MicroCluster();
void init(PointPtr datapoint, int timestamp);

void insert(PointPtr datapoint, int timestamp);
bool insert(PointPtr datapoint,double decayFactor,double epsilon);//Used in DenStream

void insert(PointPtr datapoint, int timestamp);//Used in CluStream
bool insert(PointPtr datapoint,double decayFactor,double epsilon);// DenStream
void insert(PointPtr datapoint, double decayFactor);//DBStream
void merge(MicroClusterPtr other);
void substractClusterVector(MicroClusterPtr other);
void subtractClusterVector(MicroClusterPtr other);
void updateId(MicroClusterPtr other);

void resetID(int index); //Used in DenStream
Expand All @@ -65,10 +68,25 @@ class MicroCluster {
dataPoint getVarianceVector();
double calCentroidDistance(PointPtr datapoint);
bool judgeMerge(MicroClusterPtr other);
double getDistance(PointPtr datapoint);//DBStream
double getDistance(MicroClusterPtr other);//DBStream
void move();//DBStream
void decayWeight(double decayFactor);
SESAME::MicroClusterPtr copy();

private:
double distance;
static double inverseError(double x);
};
typedef struct finderMicroCluster
{
finderMicroCluster(int n) : id(n) { }
bool operator()(const MicroClusterPtr MC) const
{
return (id == MC->id.front());
}
int id;
}finderMicroCluster;

}
#endif //SESAME_INCLUDE_ALGORITHM_DATASTRUCTURE_MICROCLUSTER_HPP_
64 changes: 64 additions & 0 deletions include/Algorithm/DataStructure/WeightedAdjacencyList.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
//
// Created by 1124a on 2021/8/30.
//

#ifndef SESAME_INCLUDE_ALGORITHM_DATASTRUCTURE_WEIGHTEDADJACENCYLIST_HPP_
#define SESAME_INCLUDE_ALGORITHM_DATASTRUCTURE_WEIGHTEDADJACENCYLIST_HPP_
#include <algorithm>
#include <functional>
#include <unordered_map>
#include<Algorithm/DataStructure/Point.hpp>
#include<Algorithm/DataStructure/MicroCluster.hpp>
#include<Algorithm/WindowModel/DampedWindow.hpp>

namespace SESAME {
struct MicroClusterPair;
typedef std::shared_ptr<MicroClusterPair> MicroClusterPairPtr;
struct MicroClusterPair{
MicroClusterPtr microCluster1;
MicroClusterPtr microCluster2;
MicroClusterPair( MicroClusterPtr microCluster1,MicroClusterPtr microCluster2){
this->microCluster1=microCluster1->copy();
this->microCluster2=microCluster2->copy();
}
//bool operator==(const MicroClusterPair &other) const;

};

struct KeyHasher{
std::size_t operator()(const MicroClusterPair &microClusterPair) const
{
return (std::hash<int>()(microClusterPair.microCluster1->id.front())) ^ (std::hash<int>()(microClusterPair.microCluster2->id.front()));
}
};

struct EqualKey
{
bool operator() (const MicroClusterPair &MCPair1, const MicroClusterPair &MCPair2) const
{
bool equal=false;
if( MCPair1.microCluster1->id.front()==MCPair2.microCluster1->id.front() &&MCPair1.microCluster2->id.front()==MCPair2.microCluster2->id.front() )
equal=true;
if(MCPair1.microCluster1->id.front() ==MCPair2.microCluster2->id.front() &&MCPair1.microCluster2->id.front()==MCPair2.microCluster1->id.front() )
equal=true;

return equal;
}
};


class AdjustedWeight;
typedef std::shared_ptr<AdjustedWeight> AdjustedWeightPtr;
class AdjustedWeight{
public:
double weight;
clock_t updateTime;
AdjustedWeight(double weight, clock_t pointTime);
void add(clock_t startTime,double decayValue);
double getCurrentWeight(double decayFactor);
};

typedef std::unordered_map<MicroClusterPair,AdjustedWeightPtr,KeyHasher,EqualKey> WeightedAdjacencyList;
typedef std::pair<MicroClusterPair, AdjustedWeightPtr> DensityGraph;
}
#endif //SESAME_INCLUDE_ALGORITHM_DATASTRUCTURE_WEIGHTEDADJACENCYLIST_HPP_
5 changes: 5 additions & 0 deletions include/Utils/BenchmarkUtils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ struct param_t {
double lambda;
double mu;
double beta;
//used in DBStream
double radius;
int cleanUpInterval;
double weightMin;
double alpha;
std::string inputPath;
std::string outputPath;
SESAME::algoType algoType;
Expand Down
5 changes: 5 additions & 0 deletions src/Algorithm/AlgorithmFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <Algorithm/StreamKM.hpp>
#include <Algorithm/CluStream.hpp>
#include <Algorithm/DenStream.hpp>
#include <Algorithm/DBStream.hpp>
#include <Algorithm/Birch.hpp>
#include <Algorithm/AlgorithmFactory.hpp>

Expand All @@ -27,5 +28,9 @@ SESAME::AlgorithmPtr SESAME::AlgorithmFactory::create(param_t &cmd_params) {
shared_ptr<DenStream> denStream = std::make_shared<DenStream>(cmd_params);
return (SESAME::AlgorithmPtr) denStream;
}
if (cmd_params.algoType == DBStreamType) {
shared_ptr<DBStream> dbStream = std::make_shared<DBStream>(cmd_params);
return (SESAME::AlgorithmPtr) dbStream;
}
throw std::invalid_argument("Unsupported");
}
1 change: 1 addition & 0 deletions src/Algorithm/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ add_source_sesame(
CluStream.cpp
Birch.cpp
DenStream.cpp
DBStream.cpp
Algorithm.cpp
AlgorithmFactory.cpp
)
Expand Down
44 changes: 22 additions & 22 deletions src/Algorithm/CluStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
#include <iterator>
/**
* @Description: "offline" init micro clusters using KMeans
* @param size: The size of initial data obects,
* initialData:input intial data
* @param size: The size of initial data objects,
* initialData:input initial data
*@Return: void
*/
SESAME::CluStream::CluStream(param_t &cmd_params) {
Expand Down Expand Up @@ -98,7 +98,7 @@ double SESAME::CluStream::calRadius(MicroClusterPtr closestCluster) {
radius = doubleMax;
dataPoint centroid = closestCluster->getCentroid();
for (int i = 0; i < this->CluStreamParam.clusterNumber; i++) {
// SESAME_INFO("i is for wegf"<<i);

if (microClusters[i]->id == closestCluster->id) {
continue;
}
Expand All @@ -118,7 +118,7 @@ void SESAME::CluStream::insertIntoCluster(PointPtr data, MicroClusterPtr operate
operateCluster->insert(data, timestamp);
}

//Delete oldest cluster and create new one case
//Delete the oldest cluster and create new one case
void SESAME::CluStream::deleteCreateCluster(PointPtr data) {
// 3.1 Try to forget old micro clusters

Expand Down Expand Up @@ -172,7 +172,7 @@ void SESAME::CluStream::microClusterToPoint(std::vector<MicroClusterPtr> &microC
for (int i = 0; i < this->CluStreamParam.clusterNumber; i++) {
PointPtr
point = DataStructureFactory::createPoint(i, microClusters[i]->weight, microClusters[i]->centroid.size(), 0);
for (int j = 0; j < microClusters[i]->centroid.size(); j++)
for (SESAME::dataPoint::size_type j = 0; j < microClusters[i]->centroid.size(); j++)
point->setFeatureItem(microClusters[i]->centroid[j], j);
//points;
points.push_back(point);
Expand Down Expand Up @@ -200,7 +200,7 @@ void SESAME::CluStream::Initilize() {
this->window = WindowFactory::createLandmarkWindow();
this->window->pyramidalWindow.timeInterval = this->CluStreamParam.timeInterval;
this->startTime = clock();
this->lastTime= this->startTime;
this->lastUpdateTime=this->startTime;
window->initPyramidalWindow(this->window->pyramidalWindow.timeInterval);

}
Expand Down Expand Up @@ -235,11 +235,11 @@ void SESAME::CluStream::runOnlineClustering(SESAME::PointPtr input) {
} else {
int interval;
clock_t now = clock();
interval = (int) ((now - this->lastTime) / CLOCKS_PER_SEC);
if (interval >= 1)//
interval = (int) ((now - lastUpdateTime) / CLOCKS_PER_SEC);
if (interval >= 1)
{
window->pyramidalWindowProcess(startTime, microClusters);
lastTime = now;
lastUpdateTime = now;
}
incrementalCluster(input);

Expand All @@ -255,17 +255,17 @@ void SESAME::CluStream::runOfflineClustering(SESAME::DataSinkPtr sinkPtr) {
landmarkTime = 0;
SESAME_INFO("Start offline...");
SESAME::SnapshotPtr landmarkSnapshot;
SESAME::SnapshotPtr substractMiroCluster;
SESAME::SnapshotPtr subtractMiroCluster;
//If offlineTimeWindow ==0, Only Observe the end results of micro clusters
substractMiroCluster =
subtractMiroCluster =
DataStructureFactory::createSnapshot(microClusters, (int) ((now - startTime) / CLOCKS_PER_SEC));
SESAME_INFO("Now Miro Cluster is...");
for (int i = 0; i < CluStreamParam.clusterNumber; i++) {
std::stringstream result, re2;
std::copy(substractMiroCluster->microClusters[i]->id.begin(),
substractMiroCluster->microClusters[i]->id.end(),
std::copy(subtractMiroCluster->microClusters[i]->id.begin(),
subtractMiroCluster->microClusters[i]->id.end(),
std::ostream_iterator<int>(re2, " "));
SESAME_INFO("The ID is " << re2.str() << "weight is " << substractMiroCluster->microClusters[i]->weight);
SESAME_INFO("The ID is " << re2.str() << "weight is " << subtractMiroCluster->microClusters[i]->weight);
}

//The offline is to observe a process of data stream clustering
Expand All @@ -282,21 +282,21 @@ void SESAME::CluStream::runOfflineClustering(SESAME::DataSinkPtr sinkPtr) {
SESAME_INFO("The ID is " << re2.str() << "weight is " << landmarkSnapshot->microClusters[i]->weight);
}
if (landmarkSnapshot->elapsedTime == -1)
landmarkSnapshot = substractMiroCluster;
landmarkSnapshot = subtractMiroCluster;

substractMiroCluster = SESAME::Snapshot::substractSnapshot(substractMiroCluster, landmarkSnapshot,
subtractMiroCluster = SESAME::Snapshot::substractSnapshot(subtractMiroCluster, landmarkSnapshot,
this->CluStreamParam.clusterNumber);
}
SESAME_INFO("substract Miro Cluster is...");
SESAME_INFO("subtract Miro Cluster is...");
for (int i = 0; i < CluStreamParam.clusterNumber; i++) {
std::stringstream result, re2;
std::copy(substractMiroCluster->microClusters[i]->id.begin(),
substractMiroCluster->microClusters[i]->id.end(),
std::stringstream re2;
std::copy(subtractMiroCluster->microClusters[i]->id.begin(),
subtractMiroCluster->microClusters[i]->id.end(),
std::ostream_iterator<int>(re2, " "));
SESAME_INFO("The ID is " << re2.str() << "weight is " << substractMiroCluster->microClusters[i]->weight);
SESAME_INFO("The ID is " << re2.str() << "weight is " << subtractMiroCluster->microClusters[i]->weight);
}
vector <PointPtr> TransformedSnapshot;
microClusterToPoint(substractMiroCluster->microClusters, TransformedSnapshot);
microClusterToPoint(subtractMiroCluster->microClusters, TransformedSnapshot);

SESAME_INFO("offline Cluster Number " << this->CluStreamParam.offlineClusterNumber << "Total number of p: "
<< TransformedSnapshot.size());
Expand Down
Loading