Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 3e5c321

Browse files
committed
first update cassandra before updating memory index
1 parent a9788e2 commit 3e5c321

File tree

2 files changed

+39
-58
lines changed

2 files changed

+39
-58
lines changed

idx/cassandra/cassandra.go

+39-57
Original file line numberDiff line numberDiff line change
@@ -438,75 +438,70 @@ func (c *CasIdx) applyMetaRecords(orgId uint32, records []tagquery.MetaTagRecord
438438
}
439439
}
440440

441-
func (c *CasIdx) MetaTagRecordUpsert(orgId uint32, upsertRecord tagquery.MetaTagRecord, persist bool) (tagquery.MetaTagRecord, bool, error) {
442-
record, created, err := c.MemoryIndex.MetaTagRecordUpsert(orgId, upsertRecord, persist)
443-
if err != nil {
444-
return record, created, err
445-
}
446-
441+
func (c *CasIdx) MetaTagRecordUpsert(orgId uint32, record tagquery.MetaTagRecord, persist bool) (tagquery.MetaTagRecord, bool, error) {
447442
if c.Config.updateCassIdx && persist {
448443
var err error
449444

450445
// if a record has no meta tags associated with it, then we delete it
451446
if len(record.MetaTags) > 0 {
452-
err = c.persistMetaRecord(orgId, record, created)
447+
err = c.persistMetaRecord(orgId, record)
453448
} else {
454449
err = c.deleteMetaRecord(orgId, record)
455450
}
456451

457452
if err != nil {
458453
log.Errorf("Failed to update meta records in cassandra: %s", err)
459-
return record, created, fmt.Errorf("Failed to update cassandra: %s", err)
454+
return record, false, fmt.Errorf("Failed to update cassandra: %s", err)
460455
}
461456
}
462457

463-
return record, created, nil
458+
return c.MemoryIndex.MetaTagRecordUpsert(orgId, record, persist)
464459
}
465460

466461
func (c *CasIdx) MetaTagRecordSwap(orgId uint32, records []tagquery.MetaTagRecord, persist bool) (uint32, uint32, error) {
467-
added, deleted, err := c.MemoryIndex.MetaTagRecordSwap(orgId, records, persist)
468-
if !c.Config.updateCassIdx || err != nil || !persist {
469-
return added, deleted, err
470-
}
462+
if c.Config.updateCassIdx && persist {
463+
now := time.Now().UnixNano() / 1000000
464+
batch := c.Session.NewBatch(gocql.LoggedBatch).RetryPolicy(&metaRecordRetryPolicy)
465+
466+
// within a batch operation we need to specify timestamps using "USING TIMESTAMP" to ensure
467+
// that the statement execution happens in the order we require.
468+
// the leading DELETE statement gets the timestamp now - 1000 to ensure that it gets executed
469+
// before the sub-sequent inserts.
470+
batch.Query(fmt.Sprintf("DELETE FROM %s USING TIMESTAMP ? WHERE orgid=?", c.Config.MetaRecordTable), now-1000, orgId)
471+
var expressions, metaTags []byte
472+
var qry string
473+
var err error
471474

472-
now := time.Now().UnixNano() / 1000000
473-
batch := c.Session.NewBatch(gocql.LoggedBatch).RetryPolicy(&metaRecordRetryPolicy)
474-
475-
// within a batch operation we need to specify timestamps using "USING TIMESTAMP" to ensure
476-
// that the statement execution happens in the order we require.
477-
// the leading DELETE statement gets the timestamp now - 1000 to ensure that it gets executed
478-
// before the sub-sequent inserts.
479-
batch.Query(fmt.Sprintf("DELETE FROM %s USING TIMESTAMP ? WHERE orgid=?", c.Config.MetaRecordTable), now-1000, orgId)
480-
var expressions, metaTags []byte
481-
var qry string
482-
483-
for _, record := range records {
484-
record.Expressions.Sort()
485-
expressions, err = record.Expressions.MarshalJSON()
486-
if err != nil {
487-
return 0, 0, fmt.Errorf("Failed to marshal expressions: %s", err)
475+
for _, record := range records {
476+
record.Expressions.Sort()
477+
expressions, err = record.Expressions.MarshalJSON()
478+
if err != nil {
479+
return 0, 0, fmt.Errorf("Failed to marshal expressions: %s", err)
480+
}
481+
metaTags, err = record.MetaTags.MarshalJSON()
482+
if err != nil {
483+
return 0, 0, fmt.Errorf("Failed to marshal meta tags: %s", err)
484+
}
485+
qry = fmt.Sprintf("INSERT INTO %s (orgid, expressions, metatags, lastupdate) VALUES (?, ?, ?, ?) USING TIMESTAMP ?", c.Config.MetaRecordTable)
486+
batch.Query(
487+
qry,
488+
orgId,
489+
expressions,
490+
metaTags,
491+
now,
492+
now)
488493
}
489-
metaTags, err = record.MetaTags.MarshalJSON()
494+
495+
err = c.Session.ExecuteBatch(batch)
490496
if err != nil {
491-
return 0, 0, fmt.Errorf("Failed to marshal meta tags: %s", err)
497+
return 0, 0, err
492498
}
493-
qry = fmt.Sprintf("INSERT INTO %s (orgid, expressions, metatags, createdat, lastupdate) VALUES (?, ?, ?, ?, ?) USING TIMESTAMP ?", c.Config.MetaRecordTable)
494-
batch.Query(
495-
qry,
496-
orgId,
497-
expressions,
498-
metaTags,
499-
now,
500-
now,
501-
now)
502499
}
503500

504-
err = c.Session.ExecuteBatch(batch)
505-
506-
return added, deleted, err
501+
return c.MemoryIndex.MetaTagRecordSwap(orgId, records, persist)
507502
}
508503

509-
func (c *CasIdx) persistMetaRecord(orgId uint32, record tagquery.MetaTagRecord, created bool) error {
504+
func (c *CasIdx) persistMetaRecord(orgId uint32, record tagquery.MetaTagRecord) error {
510505
expressions, err := record.Expressions.MarshalJSON()
511506
if err != nil {
512507
return fmt.Errorf("Failed to marshal expressions: %s", err)
@@ -517,19 +512,6 @@ func (c *CasIdx) persistMetaRecord(orgId uint32, record tagquery.MetaTagRecord,
517512
}
518513

519514
now := time.Now().UnixNano() / 1000000
520-
521-
if created {
522-
qry := fmt.Sprintf("INSERT INTO %s (orgid, expressions, metatags, createdat, lastupdate) VALUES (?, ?, ?, ?, ?) USING TIMESTAMP ?", c.Config.MetaRecordTable)
523-
return c.Session.Query(
524-
qry,
525-
orgId,
526-
expressions,
527-
metaTags,
528-
now,
529-
now,
530-
now).RetryPolicy(&metaRecordRetryPolicy).Exec()
531-
}
532-
533515
qry := fmt.Sprintf("INSERT INTO %s (orgid, expressions, metatags, lastupdate) VALUES (?, ?, ?, ?) USING TIMESTAMP ?", c.Config.MetaRecordTable)
534516
return c.Session.Query(
535517
qry,

scripts/config/schema-idx-cassandra.toml

-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ CREATE TABLE IF NOT EXISTS %s.%s (
4040
orgid int,
4141
expressions text,
4242
metatags text,
43-
createdat bigint,
4443
lastupdate bigint,
4544
PRIMARY KEY (orgid, expressions)
4645
)

0 commit comments

Comments
 (0)