Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🔥 Feature (v3): Add buffered streaming support #3131

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package fiber

import (
"bufio"
"bytes"
"context"
"crypto/tls"
Expand Down Expand Up @@ -1671,6 +1672,13 @@ func (c *DefaultCtx) SendStream(stream io.Reader, size ...int) error {
return nil
}

// SendStreamWriter sets response body stream writer
func (c *DefaultCtx) SendStreamWriter(streamWriter func(*bufio.Writer)) error {
Comment on lines +1675 to +1676
Copy link
Contributor

@coderabbitai coderabbitai bot Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add comprehensive documentation for the new public API method.

The method lacks proper documentation. As this is a new public API method for buffered streaming support, it should include:

  • A description of its purpose and use cases (e.g., SSE, large file downloads)
  • Parameter explanation
  • Return value description
  • Usage examples
  • Any important notes about concurrency or memory management

Add documentation like this:

+// SendStreamWriter sets the response body stream writer for sending 
+// chunked data through a callback function.
+//
+// The callback function is called when the response is being written to the client.
+// This allows for efficient memory usage as data can be written in chunks.
+//
+// Perfect for:
+// - Server-Sent Events (SSE)
+// - Large file downloads
+// - Real-time data streaming
+//
+// Example:
+//
+//	app.Get("/", func(c fiber.Ctx) error {
+//		return c.SendStreamWriter(func(w *bufio.Writer) {
+//			fmt.Fprintf(w, "Hello, World!")
+//			w.Flush()
+//		})
+//	})
 // SendStreamWriter sets response body stream writer
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// SendStreamWriter sets response body stream writer
func (c *DefaultCtx) SendStreamWriter(streamWriter func(*bufio.Writer)) error {
// SendStreamWriter sets the response body stream writer for sending
// chunked data through a callback function.
//
// The callback function is called when the response is being written to the client.
// This allows for efficient memory usage as data can be written in chunks.
//
// Perfect for:
// - Server-Sent Events (SSE)
// - Large file downloads
// - Real-time data streaming
//
// Example:
//
// app.Get("/", func(c fiber.Ctx) error {
// return c.SendStreamWriter(func(w *bufio.Writer) {
// fmt.Fprintf(w, "Hello, World!")
// w.Flush()
// })
// })
func (c *DefaultCtx) SendStreamWriter(streamWriter func(*bufio.Writer)) error {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add an example/recipe about this feature after the PR get merged

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@efectn, that's a great idea. Adding an example or recipe about this feature after the PR is merged will be beneficial for users. Would you like me to open a new GitHub issue to track this follow-up action?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

Comment on lines +1675 to +1676
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add comprehensive documentation for the new public API method.

As this is a new public API method for buffered streaming support, it should include:

  • A description of its purpose and use cases (e.g., SSE, large file downloads)
  • Parameter explanation
  • Example usage with SSE
  • Notes about concurrency and memory management

Apply this diff to improve the documentation:

-// SendStreamWriter sets response body stream writer
+// SendStreamWriter sets the response body stream writer for sending chunked data through a callback function.
+//
+// The callback function is called when the response is being written to the client.
+// This allows for efficient memory usage as data can be written in chunks.
+//
+// Perfect for:
+// - Server-Sent Events (SSE)
+// - Large file downloads
+// - Real-time data streaming
+//
+// Example:
+//  app.Get("/sse", func(c fiber.Ctx) error {
+//    c.Set("Content-Type", "text/event-stream")
+//    c.Set("Cache-Control", "no-cache")
+//    c.Set("Connection", "keep-alive")
+//    c.Set("Transfer-Encoding", "chunked")
+//
+//    return c.SendStreamWriter(func(w *bufio.Writer) {
+//      fmt.Fprintf(w, "event: message\ndata: Hello SSE\n\n")
+//      w.Flush()
+//    })
+//  })
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// SendStreamWriter sets response body stream writer
func (c *DefaultCtx) SendStreamWriter(streamWriter func(*bufio.Writer)) error {
// SendStreamWriter sets the response body stream writer for sending chunked data through a callback function.
//
// The callback function is called when the response is being written to the client.
// This allows for efficient memory usage as data can be written in chunks.
//
// Perfect for:
// - Server-Sent Events (SSE)
// - Large file downloads
// - Real-time data streaming
//
// Example:
// app.Get("/sse", func(c fiber.Ctx) error {
// c.Set("Content-Type", "text/event-stream")
// c.Set("Cache-Control", "no-cache")
// c.Set("Connection", "keep-alive")
// c.Set("Transfer-Encoding", "chunked")
//
// return c.SendStreamWriter(func(w *bufio.Writer) {
// fmt.Fprintf(w, "event: message\ndata: Hello SSE\n\n")
// w.Flush()
// })
// })
func (c *DefaultCtx) SendStreamWriter(streamWriter func(*bufio.Writer)) error {

c.fasthttp.Response.SetBodyStreamWriter(fasthttp.StreamWriter(streamWriter))

return nil
}

// Set sets the response's HTTP header field to the specified key, value.
func (c *DefaultCtx) Set(key, val string) {
c.fasthttp.Response.Header.Set(key, val)
Expand Down
3 changes: 3 additions & 0 deletions ctx_interface_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 65 additions & 0 deletions ctx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4447,6 +4447,71 @@ func Test_Ctx_SendStream(t *testing.T) {
require.Equal(t, "Hello bufio", string(c.Response().Body()))
}

// go test -run Test_Ctx_SendStreamWriter
func Test_Ctx_SendStreamWriter(t *testing.T) {
t.Parallel()
app := New()
c := app.AcquireCtx(&fasthttp.RequestCtx{})

err := c.SendStreamWriter(func(w *bufio.Writer) {
w.WriteString("Don't crash please") //nolint:errcheck, revive // It is fine to ignore the error
ReneWerner87 marked this conversation as resolved.
Show resolved Hide resolved
})
require.NoError(t, err)
require.Equal(t, "Don't crash please", string(c.Response().Body()))

err = c.SendStreamWriter(func(w *bufio.Writer) {
for lineNum := 1; lineNum <= 5; lineNum++ {
fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error
ReneWerner87 marked this conversation as resolved.
Show resolved Hide resolved
if err := w.Flush(); err != nil {
t.Errorf("unexpected error: %s", err)
return
}
}
})
require.NoError(t, err)
require.Equal(t, "Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n", string(c.Response().Body()))

err = c.SendStreamWriter(func(_ *bufio.Writer) {})
require.NoError(t, err)
require.Empty(t, c.Response().Body())
}

// go test -run Test_Ctx_SendStreamWriter_Interrupted
func Test_Ctx_SendStreamWriter_Interrupted(t *testing.T) {
t.Parallel()
app := New()
app.Get("/", func(c Ctx) error {
return c.SendStreamWriter(func(w *bufio.Writer) {
for lineNum := 1; lineNum <= 5; lineNum++ {
fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck // It is fine to ignore the error

if err := w.Flush(); err != nil {
if lineNum < 3 {
t.Errorf("unexpected error: %s", err)
}
return
}

time.Sleep(400 * time.Millisecond)
}
})
})

req := httptest.NewRequest(MethodGet, "/", nil)
testConfig := TestConfig{
Timeout: 1 * time.Second,
FailOnTimeout: false,
}
resp, err := app.Test(req, testConfig)
require.NoError(t, err)

body, err := io.ReadAll(resp.Body)
t.Logf("%v", err)
require.EqualError(t, err, "unexpected EOF")

require.Equal(t, "Line 1\nLine 2\nLine 3\n", string(body))
}

// go test -run Test_Ctx_Set
func Test_Ctx_Set(t *testing.T) {
t.Parallel()
Expand Down
60 changes: 60 additions & 0 deletions docs/api/ctx.md
Original file line number Diff line number Diff line change
Expand Up @@ -1852,6 +1852,66 @@ app.Get("/", func(c fiber.Ctx) error {
})
```

## SendStreamWriter
grivera64 marked this conversation as resolved.
Show resolved Hide resolved

Sets the response body stream writer.

:::note
The argument `streamWriter` represents a function that populates
the response body using a buffered stream writer.
:::

```go title="Signature"
func (c Ctx) SendStreamWriter(streamWriter func(*bufio.Writer)) error
```

```go title="Example"
app.Get("/", func (c fiber.Ctx) error {
return c.SendStreamWriter(func(w *bufio.Writer) {
fmt.Fprintf(w, "Hello, World!\n")
})
// => "Hello, World!"
})
```
gaby marked this conversation as resolved.
Show resolved Hide resolved

:::info
To send data before `streamWriter` returns, you can call `w.Flush()`
on the provided writer. Otherwise, the buffered stream flushes after
`streamWriter` returns.
:::

:::note
`w.Flush()` will return an error if the client disconnects before `streamWriter` finishes writing a response.
:::

```go title="Example"
app.Get("/wait", func(c fiber.Ctx) error {
return c.SendStreamWriter(func(w *bufio.Writer) {
// Begin Work
fmt.Fprintf(w, "Please wait for 10 seconds\n")
if err := w.Flush(); err != nil {
log.Print("Client disconnected!")
return
}

// Send progress over time
time.Sleep(time.Second)
for i := 0; i < 9; i++ {
fmt.Fprintf(w, "Still waiting...\n")
if err := w.Flush(); err != nil {
// If client disconnected, cancel work and finish
log.Print("Client disconnected!")
return
}
time.Sleep(time.Second)
}

// Finish
fmt.Fprintf(w, "Done!\n")
})
})
```

## Set

Sets the response’s HTTP header field to the specified `key`, `value`.
Expand Down
38 changes: 38 additions & 0 deletions docs/whats_new.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ DRAFT section
- Reset
- Schema -> ExpressJs like
- SendStream -> ExpressJs like
- SendStreamWriter
- SendString -> ExpressJs like
- String -> ExpressJs like
- ViewBind -> instead of Bind
Expand Down Expand Up @@ -296,6 +297,43 @@ DRAFT section
- UserContext has been renamed to Context which returns a context.Context object.
- SetUserContext has been renamed to SetContext.

### SendStreamWriter

In v3, we added support for buffered streaming by providing the new method `SendStreamWriter()`.

```go
func (c Ctx) SendStreamWriter(streamWriter func(w *bufio.Writer))
```

With this new method, you can implement:

- Server-Side Events (SSE)
- Large file downloads
- Live data streaming

```go
app.Get("/sse", func(c fiber.Ctx) {
c.Set("Content-Type", "text/event-stream")
c.Set("Cache-Control", "no-cache")
c.Set("Connection", "keep-alive")
c.Set("Transfer-Encoding", "chunked")

return c.SendStreamWriter(func(w *bufio.Writer) {
for {
fmt.Fprintf(w, "event: my-event\n")
fmt.Fprintf(w, "data: Hello SSE\n\n")

if err := w.Flush(); err != nil {
log.Print("Client disconnected!")
return
}
}
})
})
```
Comment on lines +315 to +333
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance the SSE example with best practices and error handling.

The Server-Sent Events example could be improved by:

  1. Adding proper error handling for write operations
  2. Including a cancellation mechanism
  3. Demonstrating how to handle client disconnections gracefully
 app.Get("/sse", func(c fiber.Ctx) {
   c.Set("Content-Type", "text/event-stream")
   c.Set("Cache-Control", "no-cache")
   c.Set("Connection", "keep-alive")
   c.Set("Transfer-Encoding", "chunked")
 
+  // Create a channel to handle cancellation
+  done := make(chan bool)
+
+  // Handle client disconnection
+  c.Context().SetBodyStreamWriter(func(w *bufio.Writer) {
+    ticker := time.NewTicker(time.Second)
+    defer ticker.Stop()
+
+    for {
+      select {
+      case <-done:
+        return
+      case <-ticker.C:
+        // Write event with proper error handling
+        if _, err := fmt.Fprintf(w, "event: my-event\n"); err != nil {
+          log.Printf("Error writing event: %v", err)
+          return
+        }
+        if _, err := fmt.Fprintf(w, "data: Hello SSE\n\n"); err != nil {
+          log.Printf("Error writing data: %v", err)
+          return
+        }
+
+        if err := w.Flush(); err != nil {
+          log.Printf("Client disconnected: %v", err)
+          return
+        }
+      }
+    }
+  })
+
+  // Clean up when the handler returns
+  defer close(done)
+
   return c.SendStreamWriter(func(w *bufio.Writer) {
     for {
       fmt.Fprintf(w, "event: my-event\n")
       fmt.Fprintf(w, "data: Hello SSE\n\n")
 
       if err := w.Flush(); err != nil {
         log.Print("Client disconnected!")
         return
       }
     }
   })
 })

Committable suggestion skipped: line range outside the PR's diff.


You can find more details about this feature in [/docs/api/ctx.md](./api/ctx.md).

---

## 🌎 Client package
Expand Down
Loading