Skip to content

Commit

Permalink
[ENH] Batched GC (#3293)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
   - During log GC, we now get collections in batches of 1000 instead of 1. This would reduce the number of round trips between log service and sysdb significantly. Note that this PR does not reduce the number of calls that sysdb service makes to the database - it is still one per collection. We can utilize the SQL 'IN' operator to do so. Reason is: all of this is short term and eventually the GC service will do it (which means that we will rewrite this in the next 6 months anyways).
   - The PR also changes the GC interval for deleted collections from 10 seconds to 2 hours. Running GC every 10 seconds is unnecessarily frequent, since collection deletions are rare.

## Test plan
*How are these changes tested?*
- [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust

## Documentation Changes
None
  • Loading branch information
sanketkedia authored Dec 13, 2024
1 parent 5a61a95 commit 6b01bd5
Show file tree
Hide file tree
Showing 17 changed files with 627 additions and 391 deletions.
54 changes: 29 additions & 25 deletions chromadb/proto/coordinator_pb2.py

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions chromadb/proto/coordinator_pb2.pyi

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions chromadb/proto/coordinator_pb2_grpc.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 8 additions & 6 deletions go/pkg/log/purging/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,28 @@ import (
)

// PerformPurgingLoop runs the log-service maintenance loop until ctx is
// cancelled. Record purging runs every 10 seconds; garbage collection of
// logs for deleted collections runs every 2 hours, since deletions are
// infrequent. Errors from either task are logged and the loop continues.
func PerformPurgingLoop(ctx context.Context, lg *repository.LogRepository) {
	// Log purge runs every 10 seconds.
	purgeTicker := time.NewTicker(10 * time.Second)
	// GC runs every 2 hours.
	gcTicker := time.NewTicker(2 * time.Hour)
	defer purgeTicker.Stop()
	defer gcTicker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-purgeTicker.C:
			if err := lg.PurgeRecords(ctx); err != nil {
				log.Error("failed to purge records", zap.Error(err))
				continue
			}
			log.Info("purged records")
		case <-gcTicker.C:
			// TODO: Add an RPC to manually trigger garbage collection.
			if err := lg.GarbageCollection(ctx); err != nil {
				log.Error("failed to garbage collect", zap.Error(err))
				continue
			}
			log.Info("garbage collected")
		}
	}
}
20 changes: 10 additions & 10 deletions go/pkg/log/repository/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ func (r *LogRepository) UpdateCollectionCompactionOffsetPosition(ctx context.Con
}

// PurgeRecords removes purgeable entries from the record_log table by
// delegating to the generated query layer.
func (r *LogRepository) PurgeRecords(ctx context.Context) error {
	trace_log.Info("Purging records from record_log table")
	return r.queries.PurgeRecords(ctx)
}
Expand All @@ -175,21 +174,24 @@ func (r *LogRepository) GarbageCollection(ctx context.Context) error {
if err != nil {
trace_log.Error("Error in getting collections to compact", zap.Error(err))
return err
} else {
trace_log.Info("GC Got collections to compact", zap.Int("collectionCount", len(collectionToCompact)))
}
if collectionToCompact == nil {
return nil
}
collectionsToGC := make([]string, 0)
for _, collection := range collectionToCompact {
exist, err := r.sysDb.CheckCollection(ctx, collection)
// TODO(Sanket): Make batch size configurable.
batchSize := 5000
for i := 0; i < len(collectionToCompact); i += batchSize {
end := min(len(collectionToCompact), i+batchSize)
exists, err := r.sysDb.CheckCollections(ctx, collectionToCompact[i:end])
if err != nil {
trace_log.Error("Error in checking collection in sysdb", zap.Error(err), zap.String("collectionId", collection))
trace_log.Error("Error in checking collection in sysdb", zap.Error(err))
continue
}
if !exist {
collectionsToGC = append(collectionsToGC, collection)
for offset, exist := range exists {
if !exist {
collectionsToGC = append(collectionsToGC, collectionToCompact[i+offset])
}
}
}
if len(collectionsToGC) > 0 {
Expand All @@ -208,13 +210,11 @@ func (r *LogRepository) GarbageCollection(ctx context.Context) error {
err = tx.Commit(ctx)
}
}()
trace_log.Info("Starting garbage collection", zap.Strings("collections", collectionsToGC))
err = queriesWithTx.DeleteRecords(ctx, collectionsToGC)
if err != nil {
trace_log.Error("Error in garbage collection", zap.Error(err))
return err
}
trace_log.Info("Delete collections", zap.Strings("collections", collectionsToGC))
err = queriesWithTx.DeleteCollection(ctx, collectionsToGC)
if err != nil {
trace_log.Error("Error in deleting collection", zap.Error(err))
Expand Down
6 changes: 4 additions & 2 deletions go/pkg/log/server/property_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,11 @@ func (suite *LogServerTestSuite) modelPurgeLogs(ctx context.Context, t *rapid.T)

func (suite *LogServerTestSuite) modelGarbageCollection(ctx context.Context, t *rapid.T) {
for id := range suite.model.CollectionData {
exist, err := suite.sysDb.CheckCollection(ctx, id.String())
exists, err := suite.sysDb.CheckCollections(ctx, []string{id.String()})
if err != nil {
t.Fatal(err)
}
exist := exists[0]
if !exist {
// Collection does not exist, so we can delete it
delete(suite.model.CollectionData, id)
Expand Down Expand Up @@ -503,8 +504,9 @@ func (suite *LogServerTestSuite) TestRecordLogDb_PushLogs() {
err := suite.lr.GarbageCollection(ctx)
suite.NoError(err)
for id := range suite.model.CollectionData {
exist, err := suite.sysDb.CheckCollection(ctx, id.String())
exists, err := suite.sysDb.CheckCollections(ctx, []string{id.String()})
suite.NoError(err)
exist := exists[0]
if !exist {
t.Fatalf("collection id %s does not exist in sysdb", id)
}
Expand Down
10 changes: 7 additions & 3 deletions go/pkg/log/sysdb/mock_sysdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@ func NewMockSysDB() *MockSysDB {
}
}

func (s *MockSysDB) CheckCollection(ctx context.Context, collectionId string) (bool, error) {
_, ok := s.collections[collectionId]
return ok, nil
// CheckCollections reports, for each id in collectionIds, whether that
// collection is present in the mock's collection set. The returned slice
// is index-aligned with collectionIds and the error is always nil.
func (s *MockSysDB) CheckCollections(ctx context.Context, collectionIds []string) ([]bool, error) {
	exists := make([]bool, len(collectionIds))
	for idx := range collectionIds {
		_, found := s.collections[collectionIds[idx]]
		exists[idx] = found
	}
	return exists, nil
}

func (s *MockSysDB) AddCollection(ctx context.Context, collectionId string) error {
Expand Down
25 changes: 12 additions & 13 deletions go/pkg/log/sysdb/sysdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
)

// ISysDB is the minimal view of sysdb that the log service needs for
// garbage collection and collection bookkeeping.
type ISysDB interface {
	CheckCollection(ctx context.Context, collectionId string) (bool, error)
	// CheckCollections reports existence per id: the returned slice is
	// index-aligned with collectionIds, and each element is true if that
	// collection exists, false otherwise.
	CheckCollections(ctx context.Context, collectionIds []string) ([]bool, error)
	AddCollection(ctx context.Context, collectionId string) error
}

Expand Down Expand Up @@ -46,21 +47,19 @@ func NewSysDB(conn string) *SysDB {
}
}

func (s *SysDB) CheckCollection(ctx context.Context, collectionId string) (bool, error) {
// TODO: make this check a batch API
request := &coordinatorpb.GetCollectionsRequest{
Id: &collectionId,
}
response, err := s.client.GetCollections(ctx, request)
func (s *SysDB) CheckCollections(ctx context.Context, collectionIds []string) ([]bool, error) {
request := &coordinatorpb.CheckCollectionsRequest{}
request.CollectionIds = append(request.CollectionIds, collectionIds...)

response, err := s.client.CheckCollections(ctx, request)
if err != nil {
return false, err
return nil, err
}
for _, collection := range response.Collections {
if collection.Id == collectionId {
return true, nil
}
result := make([]bool, len(response.Deleted))
for i, deleted := range response.Deleted {
result[i] = !deleted
}
return false, nil
return result, nil
}

func (s *SysDB) AddCollection(ctx context.Context, collectionId string) error {
Expand Down
Loading

0 comments on commit 6b01bd5

Please sign in to comment.