Skip to content

Commit e94cca3

Browse files
authored
Merge pull request #83 from nyaruka/stats_tweak
Record stats inside indexing batch loop
2 parents a88111a + 491961a commit e94cca3

File tree

2 files changed

+13
-14
lines changed

2 files changed

+13
-14
lines changed

indexers/base.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ const deleteCommand = `{ "delete" : { "_id": %d, "version": %d, "version_type":
2424
type Stats struct {
2525
Indexed int64 // total number of documents indexed
2626
Deleted int64 // total number of documents deleted
27-
Elapsed time.Duration // total time spent actually indexing
27+
Elapsed time.Duration // total time spent actually indexing (excludes poll delay)
2828
}
2929

3030
// Indexer is base interface for indexers
@@ -84,8 +84,8 @@ func (i *baseIndexer) log() *slog.Logger {
8484
return slog.With("indexer", i.name)
8585
}
8686

87-
// records a complete index and updates statistics
88-
func (i *baseIndexer) recordComplete(indexed, deleted int, elapsed time.Duration) {
87+
// records indexing activity and updates statistics
88+
func (i *baseIndexer) recordActivity(indexed, deleted int, elapsed time.Duration) {
8989
i.stats.Indexed += int64(indexed)
9090
i.stats.Deleted += int64(deleted)
9191
i.stats.Elapsed += elapsed

indexers/contacts.go

+10-11
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,11 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error
6363
i.log().Debug("indexing newer than last modified", "index", physicalIndex, "last_modified", lastModified)
6464

6565
// now index our docs
66-
start := time.Now()
67-
created, updated, deleted, err := i.indexModified(ctx, db, physicalIndex, lastModified.Add(-5*time.Second), rebuild)
66+
err = i.indexModified(ctx, db, physicalIndex, lastModified.Add(-5*time.Second), rebuild)
6867
if err != nil {
6968
return "", fmt.Errorf("error indexing documents: %w", err)
7069
}
7170

72-
i.recordComplete(created+updated, deleted, time.Since(start))
73-
7471
// if the index didn't previously exist or we are rebuilding, remap to our alias
7572
if remapAlias {
7673
err := i.updateAlias(physicalIndex)
@@ -153,7 +150,7 @@ SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM (
153150
`
154151

155152
// IndexModified queries and indexes all contacts with a lastModified greater than or equal to the passed in time
156-
func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index string, lastModified time.Time, rebuild bool) (int, int, int, error) {
153+
func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index string, lastModified time.Time, rebuild bool) error {
157154
totalFetched, totalCreated, totalUpdated, totalDeleted := 0, 0, 0, 0
158155

159156
var modifiedOn time.Time
@@ -193,17 +190,17 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
193190

194191
// no more rows? return
195192
if err == sql.ErrNoRows {
196-
return 0, 0, 0, nil
193+
return nil
197194
}
198195
if err != nil {
199-
return 0, 0, 0, err
196+
return err
200197
}
201198
defer rows.Close()
202199

203200
for rows.Next() {
204201
err = rows.Scan(&orgID, &id, &modifiedOn, &isActive, &contactJSON)
205202
if err != nil {
206-
return 0, 0, 0, err
203+
return err
207204
}
208205

209206
batchFetched++
@@ -226,14 +223,14 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
226223
// write to elastic search in batches
227224
if batchFetched%i.batchSize == 0 {
228225
if err := indexSubBatch(subBatch); err != nil {
229-
return 0, 0, 0, err
226+
return err
230227
}
231228
}
232229
}
233230

234231
if subBatch.Len() > 0 {
235232
if err := indexSubBatch(subBatch); err != nil {
236-
return 0, 0, 0, err
233+
return err
237234
}
238235
}
239236

@@ -268,13 +265,15 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
268265
log.Debug("indexed contact batch")
269266
}
270267

268+
i.recordActivity(batchCreated+batchUpdated, batchDeleted, time.Since(batchStart))
269+
271270
// last modified stayed the same and we didn't add anything, seen it all, break out
272271
if lastModified.Equal(queryModified) && batchCreated == 0 {
273272
break
274273
}
275274
}
276275

277-
return totalCreated, totalUpdated, totalDeleted, nil
276+
return nil
278277
}
279278

280279
func (i *ContactIndexer) GetDBLastModified(ctx context.Context, db *sql.DB) (time.Time, error) {

0 commit comments

Comments
 (0)