Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix issue in cycle dumping for g.Dump #2367

Merged
merged 7 commits into from
Dec 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 43 additions & 42 deletions container/gqueue/gqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
// Queue is a concurrent-safe queue built on doubly linked list and channel.
type Queue struct {
limit int // Limit for queue size.
length *gtype.Int64 // Queue length.
list *glist.List // Underlying list structure for data maintaining.
closed *gtype.Bool // Whether queue is closed.
events chan struct{} // Events for data writing.
Expand All @@ -44,6 +45,7 @@ const (
func New(limit ...int) *Queue {
q := &Queue{
closed: gtype.NewBool(),
length: gtype.NewInt64(),
}
if len(limit) > 0 && limit[0] > 0 {
q.limit = limit[0]
Expand All @@ -57,43 +59,10 @@ func New(limit ...int) *Queue {
return q
}

// asyncLoopFromListToChannel starts an asynchronous goroutine,
// which handles the data synchronization from list `q.list` to channel `q.C`.
func (q *Queue) asyncLoopFromListToChannel() {
defer func() {
if q.closed.Val() {
_ = recover()
}
}()
for !q.closed.Val() {
<-q.events
for !q.closed.Val() {
if length := q.list.Len(); length > 0 {
if length > defaultBatchSize {
length = defaultBatchSize
}
for _, v := range q.list.PopFronts(length) {
// When q.C is closed, it will panic here, especially q.C is being blocked for writing.
// If any error occurs here, it will be caught by recover and be ignored.
q.C <- v
}
} else {
break
}
}
// Clear q.events to remain just one event to do the next synchronization check.
for i := 0; i < len(q.events)-1; i++ {
<-q.events
}
}
// It should be here to close `q.C` if `q` is unlimited size.
// It's the sender's responsibility to close channel when it should be closed.
close(q.C)
}

// Push pushes the data `v` into the queue.
// Note that it would panic if Push is called after the queue is closed.
func (q *Queue) Push(v interface{}) {
q.length.Add(1)
if q.limit > 0 {
q.C <- v
} else {
Expand All @@ -107,7 +76,9 @@ func (q *Queue) Push(v interface{}) {
// Pop pops an item from the queue in FIFO way.
// Note that it would return nil immediately if Pop is called after the queue is closed.
func (q *Queue) Pop() interface{} {
return <-q.C
item := <-q.C
q.length.Add(-1)
return item
}

// Close closes the queue.
Expand All @@ -130,15 +101,45 @@ func (q *Queue) Close() {
// Len returns the length of the queue.
// Note that the result might not be accurate as there's an
// asynchronous channel reading the list constantly.
func (q *Queue) Len() (length int) {
if q.list != nil {
length += q.list.Len()
}
length += len(q.C)
return
func (q *Queue) Len() (length int64) {
return q.length.Val()
}

// Size is alias of Len.
func (q *Queue) Size() int {
func (q *Queue) Size() int64 {
return q.Len()
}

// asyncLoopFromListToChannel starts an asynchronous goroutine,
// which handles the data synchronization from list `q.list` to channel `q.C`.
func (q *Queue) asyncLoopFromListToChannel() {
defer func() {
if q.closed.Val() {
_ = recover()
}
}()
for !q.closed.Val() {
<-q.events
for !q.closed.Val() {
if length := q.list.Len(); length > 0 {
if length > defaultBatchSize {
length = defaultBatchSize
}
for _, v := range q.list.PopFronts(length) {
// When q.C is closed, it will panic here, especially q.C is being blocked for writing.
// If any error occurs here, it will be caught by recover and be ignored.
q.C <- v
}
} else {
break
}
}
// Clear q.events to remain just one event to do the next synchronization check.
for i := 0; i < len(q.events)-1; i++ {
<-q.events
}
}
// It should be here to close `q.C` if `q` is unlimited size.
// It's the sender's responsibility to close channel when it should be closed.
close(q.C)
}
46 changes: 24 additions & 22 deletions internal/rwmutex/rwmutex_z_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,31 +39,33 @@ func TestRWMutexIsSafe(t *testing.T) {

func TestSafeRWMutex(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
safeLock := rwmutex.New(true)
array := garray.New(true)
var (
localSafeLock = rwmutex.New(true)
array = garray.New(true)
)

go func() {
safeLock.Lock()
localSafeLock.Lock()
array.Append(1)
time.Sleep(100 * time.Millisecond)
time.Sleep(1000 * time.Millisecond)
array.Append(1)
safeLock.Unlock()
localSafeLock.Unlock()
}()
go func() {
time.Sleep(10 * time.Millisecond)
safeLock.Lock()
time.Sleep(100 * time.Millisecond)
localSafeLock.Lock()
array.Append(1)
time.Sleep(200 * time.Millisecond)
time.Sleep(2000 * time.Millisecond)
array.Append(1)
safeLock.Unlock()
localSafeLock.Unlock()
}()
time.Sleep(50 * time.Millisecond)
time.Sleep(500 * time.Millisecond)
t.Assert(array.Len(), 1)
time.Sleep(80 * time.Millisecond)
time.Sleep(800 * time.Millisecond)
t.Assert(array.Len(), 3)
time.Sleep(100 * time.Millisecond)
time.Sleep(1000 * time.Millisecond)
t.Assert(array.Len(), 3)
time.Sleep(100 * time.Millisecond)
time.Sleep(1000 * time.Millisecond)
t.Assert(array.Len(), 4)
})
}
Expand All @@ -77,33 +79,33 @@ func TestSafeReaderRWMutex(t *testing.T) {
go func() {
localSafeLock.RLock()
array.Append(1)
time.Sleep(100 * time.Millisecond)
time.Sleep(1000 * time.Millisecond)
array.Append(1)
localSafeLock.RUnlock()
}()
go func() {
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
localSafeLock.RLock()
array.Append(1)
time.Sleep(200 * time.Millisecond)
time.Sleep(2000 * time.Millisecond)
array.Append(1)
time.Sleep(100 * time.Millisecond)
time.Sleep(1000 * time.Millisecond)
array.Append(1)
localSafeLock.RUnlock()
}()
go func() {
time.Sleep(50 * time.Millisecond)
time.Sleep(500 * time.Millisecond)
localSafeLock.Lock()
array.Append(1)
localSafeLock.Unlock()
}()
time.Sleep(50 * time.Millisecond)
time.Sleep(500 * time.Millisecond)
t.Assert(array.Len(), 2)
time.Sleep(100 * time.Millisecond)
time.Sleep(1000 * time.Millisecond)
t.Assert(array.Len(), 3)
time.Sleep(100 * time.Millisecond)
time.Sleep(1000 * time.Millisecond)
t.Assert(array.Len(), 4)
time.Sleep(100 * time.Millisecond)
time.Sleep(1000 * time.Millisecond)
t.Assert(array.Len(), 6)
})
}
Expand Down
60 changes: 39 additions & 21 deletions util/gutil/gutil_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,16 @@ func DumpTo(writer io.Writer, value interface{}, option DumpOption) {
}

type doDumpOption struct {
WithType bool
ExportedOnly bool
WithType bool
ExportedOnly bool
DumpedPointerSet map[string]struct{}
}

func doDump(value interface{}, indent string, buffer *bytes.Buffer, option doDumpOption) {
if option.DumpedPointerSet == nil {
option.DumpedPointerSet = map[string]struct{}{}
}

if value == nil {
buffer.WriteString(`<nil>`)
return
Expand All @@ -111,26 +116,29 @@ func doDump(value interface{}, indent string, buffer *bytes.Buffer, option doDum
var (
reflectKind = reflectValue.Kind()
reflectTypeName = reflectValue.Type().String()
ptrAddress string
newIndent = indent + dumpIndent
)
reflectTypeName = strings.ReplaceAll(reflectTypeName, `[]uint8`, `[]byte`)
if !option.WithType {
reflectTypeName = ""
}
for reflectKind == reflect.Ptr {
if ptrAddress == "" {
ptrAddress = fmt.Sprintf(`0x%x`, reflectValue.Pointer())
}
reflectValue = reflectValue.Elem()
reflectKind = reflectValue.Kind()
}
var (
exportInternalInput = doDumpInternalInput{
Value: value,
Indent: indent,
NewIndent: newIndent,
Buffer: buffer,
Option: option,
ReflectValue: reflectValue,
ReflectTypeName: reflectTypeName,
ExportedOnly: option.ExportedOnly,
Value: value,
Indent: indent,
NewIndent: newIndent,
Buffer: buffer,
Option: option,
PtrAddress: ptrAddress,
ReflectValue: reflectValue,
ReflectTypeName: reflectTypeName,
ExportedOnly: option.ExportedOnly,
DumpedPointerSet: option.DumpedPointerSet,
}
)
switch reflectKind {
Expand Down Expand Up @@ -185,14 +193,16 @@ func doDump(value interface{}, indent string, buffer *bytes.Buffer, option doDum
}

type doDumpInternalInput struct {
Value interface{}
Indent string
NewIndent string
Buffer *bytes.Buffer
Option doDumpOption
ReflectValue reflect.Value
ReflectTypeName string
ExportedOnly bool
Value interface{}
Indent string
NewIndent string
Buffer *bytes.Buffer
Option doDumpOption
ReflectValue reflect.Value
ReflectTypeName string
PtrAddress string
ExportedOnly bool
DumpedPointerSet map[string]struct{}
}

func doDumpSlice(in doDumpInternalInput) {
Expand Down Expand Up @@ -295,6 +305,14 @@ func doDumpMap(in doDumpInternalInput) {
}

func doDumpStruct(in doDumpInternalInput) {
if in.PtrAddress != "" {
if _, ok := in.DumpedPointerSet[in.PtrAddress]; ok {
in.Buffer.WriteString(fmt.Sprintf(`<cycle dump %s>`, in.PtrAddress))
return
}
}
in.DumpedPointerSet[in.PtrAddress] = struct{}{}

structFields, _ := gstructs.Fields(gstructs.FieldsInput{
Pointer: in.Value,
RecursiveOption: gstructs.RecursiveOptionEmbedded,
Expand Down
15 changes: 15 additions & 0 deletions util/gutil/gutil_z_unit_dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/gogf/gf/v2/net/ghttp"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/test/gtest"
"github.com/gogf/gf/v2/text/gstr"
"github.com/gogf/gf/v2/util/gmeta"
"github.com/gogf/gf/v2/util/gutil"
)
Expand Down Expand Up @@ -273,3 +274,17 @@ func Test_Dump_Issue1661(t *testing.T) {
]`)
})
}

func Test_Dump_Cycle_Attribute(t *testing.T) {
type Abc struct {
ab int
cd *Abc
}
abc := Abc{ab: 3}
abc.cd = &abc
gtest.C(t, func(t *gtest.T) {
buffer := bytes.NewBuffer(nil)
g.DumpTo(buffer, abc, gutil.DumpOption{})
t.Assert(gstr.Contains(buffer.String(), "cycle"), true)
})
}