Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: trace not working in 1.2 #16201

Merged
merged 6 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/cnservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,8 @@ func (s *service) bootstrap() error {
panic(err)
}

trace.GetService().EnableFlush()

if s.cfg.AutomaticUpgrade {
return s.stopper.RunTask(func(ctx context.Context) {
ctx, cancel := context.WithTimeout(ctx, time.Minute*120)
Expand Down
9 changes: 9 additions & 0 deletions pkg/lockservice/lock_table_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,15 @@ func (l *localLockTable) handleLockConflictLocked(
for _, txn := range conflictWith.holders.txns {
c.w.waitFor = append(c.w.waitFor, txn.TxnID)
}
c.result.ConflictKey = key
if len(c.w.waitFor) > 0 {
c.result.ConflictTxn = c.w.waitFor[0]
}
c.result.Waiters = uint32(conflictWith.waiters.size())
conflictWith.waiters.iter(func(w *waiter) bool {
c.result.PrevWaiter = w.txn.TxnID
return true
})

conflictWith.addWaiter(c.w)
l.events.add(c)
Expand Down
429 changes: 313 additions & 116 deletions pkg/pb/lock/lock.pb.go

Large diffs are not rendered by default.

27 changes: 24 additions & 3 deletions pkg/sql/colexec/lockop/lock_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,16 +475,37 @@ func doLock(
break
}
}
if err != nil {
return false, false, timestamp.Timestamp{}, err
}

if len(result.ConflictKey) > 0 {
trace.GetService().AddTxnActionInfo(
txnOp,
client.LockEvent,
seq,
tableID,
func(writer trace.Writer) {
writer.WriteHex(result.ConflictKey)
writer.WriteString(":")
writer.WriteHex(result.ConflictTxn)
writer.WriteString("/")
writer.WriteUint(uint64(result.Waiters))
if len(result.PrevWaiter) > 0 {
writer.WriteString("/")
writer.WriteHex(result.PrevWaiter)
}
},
)
}

trace.GetService().AddTxnDurationAction(
txnOp,
client.LockEvent,
seq,
tableID,
time.Since(startAt),
nil)
if err != nil {
return false, false, timestamp.Timestamp{}, err
}

// add bind locks
if err = txnOp.AddLockTable(result.LockedOn); err != nil {
Expand Down
22 changes: 16 additions & 6 deletions pkg/sql/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ import (
"time"

"github.com/google/uuid"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"

_ "go.uber.org/automaxprocs"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/cnservice/cnclient"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
Expand Down Expand Up @@ -87,6 +82,9 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/process"
"github.com/panjf2000/ants/v2"
_ "go.uber.org/automaxprocs"
"go.uber.org/zap"
)

// Note: Now the cost going from stat is actually the number of rows, so we can only estimate a number for the size of each row.
Expand Down Expand Up @@ -447,7 +445,18 @@ func (c *Compile) Run(_ uint64) (result *util2.RunResult, err error) {
stats.ExecutionEnd()

cost := time.Since(start)
txnTrace.GetService().TxnStatementCompleted(txnOp, sql, cost, seq, err)
row := 0
if result != nil {
row = int(result.AffectRows)
}
txnTrace.GetService().TxnStatementCompleted(
txnOp,
sql,
cost,
seq,
row,
err,
)
v2.TxnStatementExecuteDurationHistogram.Observe(cost.Seconds())
}()

Expand Down Expand Up @@ -511,6 +520,7 @@ func (c *Compile) Run(_ uint64) (result *util2.RunResult, err error) {
return nil, c.proc.Ctx.Err()
}
result.AffectRows = runC.getAffectedRows()

if c.proc.TxnOperator != nil {
return result, c.proc.TxnOperator.GetWorkspace().Adjust(writeOffset)
}
Expand Down
105 changes: 64 additions & 41 deletions pkg/txn/trace/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,15 @@ func WithFlushDuration(value time.Duration) Option {
}

type service struct {
cn string
client client.TxnClient
clock clock.Clock
executor executor.SQLExecutor
stopper *stopper.Stopper
txnC chan csvEvent
txnBufC chan *buffer
entryC chan csvEvent
entryBufC chan *buffer
txnActionC chan csvEvent
txnActionBufC chan *buffer
statementC chan csvEvent
statementBufC chan *buffer
cn string
client client.TxnClient
clock clock.Clock
executor executor.SQLExecutor
stopper *stopper.Stopper
txnC chan event
entryC chan event
txnActionC chan event
statementC chan event

loadC chan loadAction
seq atomic.Uint64
Expand Down Expand Up @@ -162,14 +158,11 @@ func NewService(
if s.options.bufferSize == 0 {
s.options.bufferSize = 1000000
}
s.txnBufC = make(chan *buffer, s.options.bufferSize)
s.entryBufC = make(chan *buffer, s.options.bufferSize)
s.entryC = make(chan csvEvent, s.options.bufferSize)
s.txnC = make(chan csvEvent, s.options.bufferSize)
s.txnActionC = make(chan csvEvent, s.options.bufferSize)
s.txnActionBufC = make(chan *buffer, s.options.bufferSize)
s.statementC = make(chan csvEvent, s.options.bufferSize)
s.statementBufC = make(chan *buffer, s.options.bufferSize)

s.entryC = make(chan event, s.options.bufferSize)
s.txnC = make(chan event, s.options.bufferSize)
s.txnActionC = make(chan event, s.options.bufferSize)
s.statementC = make(chan event, s.options.bufferSize)

if err := s.stopper.RunTask(s.handleTxnEvents); err != nil {
panic(err)
Expand Down Expand Up @@ -254,10 +247,8 @@ func (s *service) DecodeHexComplexPK(hexPK string) (string, error) {
func (s *service) Close() {
s.stopper.Stop()
s.atomic.closed.Store(true)
close(s.entryBufC)
close(s.entryC)
close(s.txnC)
close(s.txnBufC)
close(s.loadC)
}

Expand All @@ -266,8 +257,7 @@ func (s *service) handleEvent(
fileCreator func() string,
columns int,
tableName string,
csvC chan csvEvent,
bufferC chan *buffer) {
eventC chan event) {
ticker := time.NewTicker(s.options.flushDuration)
defer ticker.Stop()

Expand Down Expand Up @@ -340,23 +330,21 @@ func (s *service) handleEvent(
if s.atomic.flushEnabled.Load() {
flush()
}
case e := <-csvC:
e.toCSVRecord(s.cn, buf, records)
if err := w.Write(records); err != nil {
s.logger.Fatal("failed to write csv record",
zap.Error(err))
}

sum += bytes()
if sum > s.options.flushBytes &&
s.atomic.flushEnabled.Load() {
flush()
}
case e := <-eventC:
if e.buffer != nil {
e.buffer.close()
} else {
e.csv.toCSVRecord(s.cn, buf, records)
if err := w.Write(records); err != nil {
s.logger.Fatal("failed to write csv record",
zap.Error(err))
}

select {
case v := <-bufferC:
v.close()
default:
sum += bytes()
if sum > s.options.flushBytes &&
s.atomic.flushEnabled.Load() {
flush()
}
}
}
}
Expand Down Expand Up @@ -776,6 +764,9 @@ func (l *EntryData) writeToBuf(
rows := l.vecs[0].Length()
for row := 0; row < rows; row++ {
idx := buf.buf.GetWriteIndex()
buf.buf.WriteString("row-")
buf.buf.MustWrite(intToString(dst, int64(row)))
buf.buf.WriteString("{")
for col, name := range l.columns {
if _, ok := disableColumns[name]; ok {
continue
Expand All @@ -796,6 +787,7 @@ func (l *EntryData) writeToBuf(
buf.buf.WriteString(", ")
}
}
buf.buf.WriteString("}")
if buf.buf.GetWriteIndex() > idx {
data := buf.buf.RawSlice(idx, buf.buf.GetWriteIndex())
fn(factory(data, row))
Expand Down Expand Up @@ -884,3 +876,34 @@ type loadAction struct {
sql string
file string
}

// writer accumulates trace text into a shared byte buffer. It is passed to
// AddTxnActionInfo callbacks (e.g. in lockop.doLock) as a trace.Writer so
// callers can format conflict details without allocating per value.
type writer struct {
	buf *buf.ByteBuf // destination buffer; all Write* methods append at its write index
	dst []byte       // scratch slice reused for integer and hex formatting — presumably pre-sized by the creator; verify capacity at call sites
	idx int          // buffer write index captured at creation; data() returns everything written after it
}

// WriteUint appends the decimal representation of v to the buffer,
// formatting into the reusable scratch slice to avoid an allocation.
func (w writer) WriteUint(v uint64) {
	text := uintToString(w.dst, v)
	w.buf.MustWrite(text)
}

// WriteInt appends the decimal representation of v to the buffer,
// formatting into the reusable scratch slice to avoid an allocation.
func (w writer) WriteInt(v int64) {
	text := intToString(w.dst, v)
	w.buf.MustWrite(text)
}

// WriteString appends v verbatim to the underlying buffer.
func (w writer) WriteString(v string) {
	w.buf.WriteString(v)
}

// WriteHex appends the hexadecimal encoding of v to the buffer; empty
// input writes nothing. Encoding goes through the scratch slice to
// avoid allocating.
// NOTE(review): assumes w.dst has capacity for hex.EncodedLen(len(v))
// bytes — the reslice panics otherwise; confirm against callers.
func (w writer) WriteHex(v []byte) {
	if len(v) > 0 {
		encoded := w.dst[:hex.EncodedLen(len(v))]
		hex.Encode(encoded, v)
		w.buf.MustWrite(encoded)
	}
}

// data returns everything written since this writer was created, as a
// zero-copy string view over the buffer (valid only while the buffer's
// bytes stay untouched).
func (w writer) data() string {
	end := w.buf.GetWriteIndex()
	written := w.buf.RawSlice(w.idx, end)
	return util.UnsafeBytesToString(written)
}
59 changes: 39 additions & 20 deletions pkg/txn/trace/service_data_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,14 @@ func (s *service) ApplyLogtail(
entryData.createApply(
buf,
func(e dataEvent) {
s.entryC <- e
s.entryC <- event{
csv: e,
}
},
&s.atomic.complexPKTables)
s.entryBufC <- buf
s.entryC <- event{
buffer: buf,
}
}

func (s *service) ApplyFlush(
Expand Down Expand Up @@ -96,12 +100,17 @@ func (s *service) ApplyFlush(
buf.buf.WriteString(" ")
buf.buf.MustWrite(result)

s.entryC <- newFlushEvent(
time.Now().UnixNano(),
txnID,
tableID,
buf.buf.RawSlice(idx, buf.buf.GetWriteIndex()))
s.entryBufC <- buf
s.entryC <- event{
csv: newFlushEvent(
time.Now().UnixNano(),
txnID,
tableID,
buf.buf.RawSlice(idx, buf.buf.GetWriteIndex()),
),
}
s.entryC <- event{
buffer: buf,
}
}

func (s *service) ApplyTransferRowID(
Expand Down Expand Up @@ -156,12 +165,17 @@ func (s *service) ApplyTransferRowID(
buf.buf.WriteString(toBlockIDHex)
data := buf.buf.RawSlice(idx, buf.buf.GetWriteIndex())

s.entryC <- newTransferEvent(
time.Now().UnixNano(),
txnID,
tableID,
data)
s.entryBufC <- buf
s.entryC <- event{
csv: newTransferEvent(
time.Now().UnixNano(),
txnID,
tableID,
data,
),
}
s.entryC <- event{
buffer: buf,
}
}

func (s *service) ApplyDeleteObject(
Expand Down Expand Up @@ -198,11 +212,16 @@ func (s *service) ApplyDeleteObject(
buf.buf.WriteString(tag)
data := buf.buf.RawSlice(idx, buf.buf.GetWriteIndex())

s.entryC <- newDeleteObjectEvent(
time.Now().UnixNano(),
tableID,
data)
s.entryBufC <- buf
s.entryC <- event{
csv: newDeleteObjectEvent(
time.Now().UnixNano(),
tableID,
data,
),
}
s.entryC <- event{
buffer: buf,
}
}

func (s *service) AddTableFilter(name string, columns []string) error {
Expand Down Expand Up @@ -323,7 +342,7 @@ func (s *service) handleDataEvents(ctx context.Context) {
9,
EventDataTable,
s.entryC,
s.entryBufC)
)
}

func (s *service) dataCSVFile() string {
Expand Down
13 changes: 8 additions & 5 deletions pkg/txn/trace/service_statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,13 @@ func (s *service) AddStatement(
}

sql = truncateSQL(sql)
s.statementC <- newStatement(
op.Txn().ID,
sql,
cost)
s.statementC <- event{
csv: newStatement(
op.Txn().ID,
sql,
cost,
),
}
}

func (s *service) AddStatementFilter(
Expand Down Expand Up @@ -188,7 +191,7 @@ func (s *service) handleStatements(ctx context.Context) {
4,
TraceStatementTable,
s.statementC,
s.statementBufC)
)
}

func addStatementFilterSQL(
Expand Down
Loading
Loading