-
Notifications
You must be signed in to change notification settings - Fork 358
zstd: Decoder implements io.ReadCloser interface #1094
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
📝 WalkthroughWalkthroughAdds an error-returning, idempotent Close on Decoder, updates interface conformance to io.ReadCloser, revises close wrapper to ignore Close errors, and integrates errors.Is checks with updated shutdown flow (drain output, cancel, wait, close inner decoders, mark closed). Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant C as Client
participant D as Decoder
participant O as Output Drain
participant Ctx as Cancelation
participant G as Goroutines
participant ID as Inner Decoder
C->>D: Close()
alt Already closed
D->>D: errors.Is(current.err, ErrDecoderClosed)
D-->>C: return nil
else Active
D->>O: drainOutput()
D->>Ctx: cancel()
D->>G: wait()
D->>ID: Close()
D->>D: set current.err = ErrDecoderClosed
D-->>C: return nil
end
sequenceDiagram
autonumber
participant W as closeWrapper
participant D as Decoder
W->>D: Close() error
D-->>W: error (ignored)
W-->>W: return nil
Estimated code review effort🎯 2 (Simple) | ⏱️ ~10 minutes Possibly related PRs
Tip 🔌 Remote MCP (Model Context Protocol) integration is now available!Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats. ✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
|
This is a breaking change. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
zstd/decoder.go (1)
575-599: Fix send-on-closed-channel panic and make Close concurrency-safe.Two high-impact issues in Close:
Closing d.decoders while a concurrent DecodeAll is in-flight can panic. DecodeAll always returns the borrowed block back to the pool in a defer via “d.decoders <- block” (see Line 329). If Close closes the channel first, that deferred send will panic (“send on closed channel”). This is a realistic race: callers can (and do) run stateless decodes concurrently with streaming, and may call Close from another goroutine.
Close is not concurrency-safe. Two goroutines calling Close concurrently can race on closing d.decoders (panic: close of closed channel) and on writing d.current.err without synchronization.
Recommended fix (minimal behavioral change):
- Track in-flight stateless decodes with a WaitGroup and wait for them to finish before closing the channel.
- Use sync.Once to make Close concurrency-safe and idempotent.
- Wait for stream goroutines unconditionally after drainOutput, then wait for stateless decodes, then close and drain the pool.
Apply this diff within Close:
-func (d *Decoder) Close() error { - if errors.Is(d.current.err, ErrDecoderClosed) { - // Calling Close() multiple times will not throw an error. - return nil - } - d.drainOutput() - if d.current.cancel != nil { - d.current.cancel() - d.streamWg.Wait() - d.current.cancel = nil - } - if d.decoders != nil { - close(d.decoders) - for dec := range d.decoders { - dec.Close() - } - d.decoders = nil - } - if d.current.d != nil { - d.current.d.Close() - d.current.d = nil - } - d.current.err = ErrDecoderClosed - return nil -} +func (d *Decoder) Close() error { + // Concurrency-safe, idempotent shutdown. + d.closeOnce.Do(func() { + d.drainOutput() + // Ensure stream goroutines have fully terminated. + d.streamWg.Wait() + // Ensure no in-flight stateless decodes hold a blockDec. + d.statelessWg.Wait() + if d.decoders != nil { + close(d.decoders) + for dec := range d.decoders { + dec.Close() + } + d.decoders = nil + } + if d.current.d != nil { + d.current.d.Close() + d.current.d = nil + } + d.current.err = ErrDecoderClosed + }) + return nil +}And add the following outside this hunk (new fields + DecodeAll bookkeeping):
- In type Decoder (near streamWg):
// track in-flight stateless DecodeAll users that hold a blockDec. statelessWg sync.WaitGroup // ensure Close runs exactly once even under concurrent calls. closeOnce sync.Once
- In DecodeAll, after taking a block from the pool, account for the in-flight user and ensure Done occurs after the block is returned:
block := <-d.decoders d.statelessWg.Add(1) frame := block.localFrame initialSize := len(dst) defer func() { if debugDecoder { printf("re-adding decoder: %p", block) } frame.rawInput = nil frame.bBuf = nil if frame.history.decoders.br != nil { frame.history.decoders.br.in = nil frame.history.decoders.br.cursor = 0 } d.decoders <- block d.statelessWg.Done() }()This removes the panic window and guarantees Close is safe under concurrency and truly idempotent.
🧹 Nitpick comments (2)
zstd/decoder.go (2)
575-599: Order-of-operations nit: rely on WaitGroup, not cancel presence.drainOutput() already clears d.current.cancel, so the subsequent “if d.current.cancel != nil { d.streamWg.Wait() }” never runs. Even with the essential refactor above, prefer an unconditional d.streamWg.Wait() after drainOutput() for clarity and robustness.
625-628: Propagate Close errors from the wrapper (or confirm intentional suppression).closeWrapper.Close currently swallows decoder.Close() errors. If underlying Close ever returns a non-nil error in the future, callers will never see it. If the intent is to preserve old “no-error” behavior for IOReadCloser users, keep as-is; otherwise, propagate:
func (c closeWrapper) Close() error { - c.d.Close() - return nil + return c.d.Close() }Please confirm which behavior you want long-term; I can follow up with tests either way.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
zstd/decoder.go(4 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
zstd/decoder.go (1)
zstd/zstd.go (1)
ErrDecoderClosed(90-90)
🔇 Additional comments (2)
zstd/decoder.go (2)
7-13: Importing errors aligns with idempotent Close semantics.The new errors import is used by Close via errors.Is and is appropriate for the idempotency check.
68-72: Assert Decoder implements io.ReadCloser (in addition to io.WriterTo).Good compile-time check. This makes the new Close() error part of the public surface and enables seamless use anywhere io.ReadCloser is expected.
Summary by CodeRabbit
New Features
Refactor
Chores