Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make store dirty-mark methods public. Add InternalStats() #176

Merged
merged 2 commits into from
Sep 26, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 44 additions & 22 deletions crdt.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func New(
"crdt Datastore created. Number of heads: %d. Current max-height: %d. Dirty: %t",
len(headList),
maxHeight,
dstore.isDirty(),
dstore.IsDirty(),
)

// sendJobWorker + NumWorkers
Expand Down Expand Up @@ -372,7 +372,7 @@ func (store *Datastore) handleNext() {
// processed, thus it did not leave a branch
// half-processed and there's nothign to
// recover.
// disabled: store.markDirty()
// disabled: store.MarkDirty()
}
}

Expand Down Expand Up @@ -501,7 +501,7 @@ func (store *Datastore) repair() {
}
return
case <-timer.C:
if !store.isDirty() {
if !store.IsDirty() {
store.logger.Info("store is marked clean. No need to repair")
} else {
store.logger.Warn("store is marked dirty. Starting DAG repair operation")
Expand Down Expand Up @@ -564,7 +564,7 @@ func (store *Datastore) logStats() {
len(heads),
height,
len(store.jobQueue),
store.isDirty(),
store.IsDirty(),
)
case <-store.ctx.Done():
ticker.Stop()
Expand Down Expand Up @@ -631,15 +631,15 @@ func (store *Datastore) dagWorker() {

if err != nil {
store.logger.Error(err)
store.markDirty()
store.MarkDirty()
job.session.Done()
continue
}
go func(j *dagJob) {
err := store.sendNewJobs(j.session, j.nodeGetter, j.root, j.rootPrio, children)
if err != nil {
store.logger.Error(err)
store.markDirty()
store.MarkDirty()
}
j.session.Done()
}(job)
Expand Down Expand Up @@ -722,7 +722,7 @@ func (store *Datastore) sendJobWorker() {
case <-store.ctx.Done():
if len(store.sendJobs) > 0 {
// we left something in the queue
store.markDirty()
store.MarkDirty()
}
close(store.jobQueue)
return
Expand All @@ -748,23 +748,26 @@ func (store *Datastore) dirtyKey() ds.Key {
return store.namespace.ChildString(dirtyBitKey)
}

func (store *Datastore) markDirty() {
store.logger.Error("marking datastore as dirty")
// MarkDirty marks the Datastore as dirty.
func (store *Datastore) MarkDirty() {
store.logger.Warn("marking datastore as dirty")
err := store.store.Put(store.ctx, store.dirtyKey(), nil)
if err != nil {
store.logger.Errorf("error setting dirty bit: %s", err)
}
}

func (store *Datastore) isDirty() bool {
// IsDirty returns whether the datastore is marked dirty.
func (store *Datastore) IsDirty() bool {
ok, err := store.store.Has(store.ctx, store.dirtyKey())
if err != nil {
store.logger.Errorf("error checking dirty bit: %s", err)
}
return ok
}

func (store *Datastore) markClean() {
// MarkClean removes the dirty mark from the datastore.
func (store *Datastore) MarkClean() {
store.logger.Info("marking datastore as clean")
err := store.store.Delete(store.ctx, store.dirtyKey())
if err != nil {
Expand Down Expand Up @@ -880,7 +883,7 @@ func (store *Datastore) processNode(ng *crdtNodeGetter, root cid.Cid, rootPrio u
return children, nil
}

// repairDAG is used to walk down the chain until a non-processed node is
// RepairDAG is used to walk down the chain until a non-processed node is
// found and at that moment, queues it for processing.
func (store *Datastore) repairDAG() error {
start := time.Now()
Expand Down Expand Up @@ -986,7 +989,7 @@ func (store *Datastore) repairDAG() error {

// If we are here we have successfully reprocessed the chain until the
// bottom.
store.markClean()
store.MarkClean()
return nil
}

Expand Down Expand Up @@ -1023,15 +1026,14 @@ func (store *Datastore) GetSize(ctx context.Context, key ds.Key) (size int, err
// Query searches the datastore and returns a query result. This function
// may return before the query actually runs. To wait for the query:
//
// result, _ := ds.Query(q)
// result, _ := ds.Query(q)
//
// // use the channel interface; result may come in at different times
// for entry := range result.Next() { ... }
//
// // or wait for the query to be completely done
// entries, _ := result.Rest()
// for entry := range entries { ... }
// // use the channel interface; result may come in at different times
// for entry := range result.Next() { ... }
//
// // or wait for the query to be completely done
// entries, _ := result.Rest()
// for entry := range entries { ... }
func (store *Datastore) Query(ctx context.Context, q query.Query) (query.Results, error) {
qr, err := store.set.Elements(ctx, q)
if err != nil {
Expand Down Expand Up @@ -1108,7 +1110,7 @@ func (store *Datastore) Sync(ctx context.Context, prefix ds.Key) error {
func (store *Datastore) Close() error {
store.cancel()
store.wg.Wait()
if store.isDirty() {
if store.IsDirty() {
store.logger.Warn("datastore is being closed marked as dirty")
}
return nil
Expand Down Expand Up @@ -1257,7 +1259,7 @@ func (store *Datastore) addDAGNode(delta *pb.Delta) (cid.Cid, error) {
nd,
)
if err != nil {
store.markDirty() // not sure if this will fix much if this happens.
store.MarkDirty() // not sure if this will fix much if this happens.
return cid.Undef, errors.Wrap(err, "error processing new block")
}
if len(children) != 0 {
Expand Down Expand Up @@ -1464,6 +1466,26 @@ func (store *Datastore) dotDAGRec(w io.Writer, from cid.Cid, depth uint64, ng *c
return nil
}

// Stats wraps internal information about the datastore.
// Might be expanded in the future.
type Stats struct {
Heads []cid.Cid
MaxHeight uint64
QueuedJobs int
}

// InternalStats returns internal datastore information like the current heads
// and max height.
func (store *Datastore) InternalStats() Stats {
heads, height, _ := store.heads.List()

return Stats{
Heads: heads,
MaxHeight: height,
QueuedJobs: len(store.jobQueue),
}
}

type cidSafeSet struct {
set map[cid.Cid]struct{}
mux sync.RWMutex
Expand Down