Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(realtime): Add realtime support #146

Merged
merged 3 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,16 @@ func NewClient(apiKey string, options ...Option) *Client {
if !strings.HasPrefix(apiKey, "ser.") {
panic("In order to use local evaluation, please generate a server key in the environment settings page.")
}

go c.pollEnvironment(c.ctxLocalEval)
if c.config.useRealtime {
go c.startRealtimeUpdates(c.ctxLocalEval)
} else {
go c.pollEnvironment(c.ctxLocalEval)
}
}
// Initialize analytics processor
if c.config.enableAnalytics {
c.analyticsProcessor = NewAnalyticsProcessor(c.ctxAnalytics, c.client, c.config.baseURL, nil, c.log)
}

return c
}

Expand Down Expand Up @@ -331,7 +333,6 @@ func (c *Client) pollEnvironment(ctx context.Context) {
}
}
}

func (c *Client) UpdateEnvironment(ctx context.Context) error {
var env environments.EnvironmentModel
resp, err := c.client.NewRequest().
Expand Down
100 changes: 99 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"sync"
"testing"
"time"

flagsmith "github.com/Flagsmith/flagsmith-go-client/v4"
"github.com/Flagsmith/flagsmith-go-client/v4/fixtures"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -200,6 +199,7 @@ func TestGetFlags(t *testing.T) {
assert.Equal(t, fixtures.Feature1Name, allFlags[0].FeatureName)
assert.Equal(t, fixtures.Feature1ID, allFlags[0].FeatureID)
assert.Equal(t, fixtures.Feature1Value, allFlags[0].Value)

}

func TestGetFlagsTransientIdentity(t *testing.T) {
Expand Down Expand Up @@ -861,3 +861,101 @@ func TestPollErrorHandlerIsUsedWhenPollFails(t *testing.T) {
assert.Equal(t, statusCode, 500)
assert.Equal(t, status, "500 Internal Server Error")
}

func TestRealtime(t *testing.T) {
// Given
mux := http.NewServeMux()
requestCount := struct {
mu sync.Mutex
count int
}{}

mux.HandleFunc("/api/v1/environment-document/", func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, "GET", req.Method)
assert.Equal(t, fixtures.EnvironmentAPIKey, req.Header.Get("X-Environment-Key"))
requestCount.mu.Lock()
requestCount.count++
requestCount.mu.Unlock()

rw.Header().Set("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
_, err := io.WriteString(rw, fixtures.EnvironmentJson)
if err != nil {
panic(err)
}
assert.NoError(t, err)
})
mux.HandleFunc(fmt.Sprintf("/sse/environments/%s/stream", fixtures.ClientAPIKey), func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, "GET", req.Method)

// Set the necessary headers for SSE
rw.Header().Set("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Connection", "keep-alive")

// Flush headers to the client
flusher, _ := rw.(http.Flusher)
flusher.Flush()

// Use an `updated_at` value that is older than the `updated_at` set on the environment document
// to ensure an older timestamp does not trigger an update.
sendUpdatedAtSSEEvent(rw, flusher, 1640995200.079725)
time.Sleep(10 * time.Millisecond)

// Update the `updated_at`(to trigger the environment update)
sendUpdatedAtSSEEvent(rw, flusher, 1733480514.079725)
time.Sleep(10 * time.Millisecond)
})

ctx := context.Background()

server := httptest.NewServer(mux)
defer server.Close()

// When
client := flagsmith.NewClient(fixtures.EnvironmentAPIKey,
flagsmith.WithBaseURL(server.URL+"/api/v1/"),
flagsmith.WithLocalEvaluation(ctx),
flagsmith.WithRealtime(),
flagsmith.WithRealtimeBaseURL(server.URL+"/"),
)
// Sleep to ensure that the server has time to update the environment
time.Sleep(10 * time.Millisecond)

flags, err := client.GetFlags(ctx, nil)

// Then
assert.NoError(t, err)

allFlags := flags.AllFlags()

assert.Equal(t, 1, len(allFlags))

assert.Equal(t, fixtures.Feature1Name, allFlags[0].FeatureName)
assert.Equal(t, fixtures.Feature1ID, allFlags[0].FeatureID)
assert.Equal(t, fixtures.Feature1Value, allFlags[0].Value)

// Sleep to ensure that the server has time to update the environment
// (After the second sse event)
time.Sleep(10 * time.Millisecond)

requestCount.mu.Lock()
assert.Equal(t, 2, requestCount.count)
}
func sendUpdatedAtSSEEvent(rw http.ResponseWriter, flusher http.Flusher, updatedAt float64) {
// Format the SSE event with the provided updatedAt value
sseEvent := fmt.Sprintf(`event: environment_updated
data: {"updated_at": %f}

`, updatedAt)

// Write the SSE event to the response
_, err := io.WriteString(rw, sseEvent)
if err != nil {
http.Error(rw, "Failed to send SSE event", http.StatusInternalServerError)
return
}

// Flush the event to the client
flusher.Flush()
}
4 changes: 4 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
DefaultBaseURL = "https://edge.api.flagsmith.com/api/v1/"

bulkIdentifyMaxCount = 100
DefaultRealtimeBaseUrl = "https://realtime.flagsmith.com/"
)

// config contains all configurable Client settings.
Expand All @@ -23,6 +24,8 @@ type config struct {
envRefreshInterval time.Duration
enableAnalytics bool
offlineMode bool
realtimeBaseUrl string
useRealtime bool
}

// defaultConfig returns default configuration.
Expand All @@ -31,5 +34,6 @@ func defaultConfig() config {
baseURL: DefaultBaseURL,
timeout: DefaultTimeout,
envRefreshInterval: time.Second * 60,
realtimeBaseUrl: DefaultRealtimeBaseUrl,
}
}
2 changes: 2 additions & 0 deletions fixtures/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ const Feature1Name = "feature_1"
const Feature1ID = 1

const Feature1OverriddenValue = "some-overridden-value"
const ClientAPIKey = "B62qaMZNwfiqT76p38ggrQ"

const EnvironmentJson = `
{
"api_key": "B62qaMZNwfiqT76p38ggrQ",
"updated_at": "2023-12-06T10:21:54.079725Z",
"project": {
"name": "Test project",
"organisation": {
Expand Down
2 changes: 2 additions & 0 deletions flagengine/environments/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/Flagsmith/flagsmith-go-client/v4/flagengine/features"
"github.com/Flagsmith/flagsmith-go-client/v4/flagengine/identities"
"github.com/Flagsmith/flagsmith-go-client/v4/flagengine/projects"
"time"
)

type EnvironmentModel struct {
Expand All @@ -12,4 +13,5 @@ type EnvironmentModel struct {
Project *projects.ProjectModel `json:"project"`
FeatureStates []*features.FeatureStateModel `json:"feature_states"`
IdentityOverrides []*identities.IdentityModel `json:"identity_overrides"`
UpdatedAt time.Time `json:"updated_at"`
}
22 changes: 22 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package flagsmith

import (
"context"
"strings"
"time"
)

Expand All @@ -19,6 +20,8 @@ var _ = []Option{
WithCustomHeaders(nil),
WithDefaultHandler(nil),
WithProxy(""),
WithRealtime(),
WithRealtimeBaseURL(""),
}

func WithBaseURL(url string) Option {
Expand Down Expand Up @@ -124,3 +127,22 @@ func WithErrorHandler(handler func(handler *FlagsmithAPIError)) Option {
c.errorHandler = handler
}
}

// WithRealtime returns an Option function that enables real-time updates for the Client.
// NOTE: Before enabling real-time updates, ensure that local evaluation is enabled.
func WithRealtime() Option {
return func(c *Client) {
c.config.useRealtime = true
}
}

// WithRealtimeBaseURL returns an Option function for configuring the real-time base URL of the Client.
func WithRealtimeBaseURL(url string) Option {
return func(c *Client) {
// Ensure the URL ends with a trailing slash
if !strings.HasSuffix(url, "/") {
url += "/"
}
c.config.realtimeBaseUrl = url
}
}
83 changes: 83 additions & 0 deletions realtime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package flagsmith

import (
"bufio"
"context"
"encoding/json"
"errors"
"net/http"
"strings"
"time"

"github.com/Flagsmith/flagsmith-go-client/v4/flagengine/environments"
)

func (c *Client) startRealtimeUpdates(ctx context.Context) {
err := c.UpdateEnvironment(ctx)
if err != nil {
panic("Failed to fetch the environment while configuring real-time updates")
}
env, _ := c.environment.Load().(*environments.EnvironmentModel)
stream_url := c.config.realtimeBaseUrl + "sse/environments/" + env.APIKey + "/stream"
envUpdatedAt := env.UpdatedAt
for {
select {
case <-ctx.Done():
return
default:
resp, err := http.Get(stream_url)
if err != nil {
c.log.Errorf("Error connecting to realtime server: %v", err)
continue
}
defer resp.Body.Close()

scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "data: ") {
parsedTime, err := parseUpdatedAtFromSSE(line)
if err != nil {
c.log.Errorf("Error reading realtime stream: %v", err)
return
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure you want to return here? Shouldn't we go back to listening to the stream on the hopes of the next data line includes the updated_at field?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think you are right. I will update

if parsedTime.After(envUpdatedAt) {
err = c.UpdateEnvironment(ctx)
if err != nil {
c.log.Errorf("Failed to update the environment: %v", err)
continue
}
env, _ := c.environment.Load().(*environments.EnvironmentModel)

envUpdatedAt = env.UpdatedAt
}
}
}
if err := scanner.Err(); err != nil {
c.log.Errorf("Error reading realtime stream: %v", err)
}
}
}
}
func parseUpdatedAtFromSSE(line string) (time.Time, error) {
var eventData struct {
UpdatedAt float64 `json:"updated_at"`
}

data := strings.TrimPrefix(line, "data: ")
err := json.Unmarshal([]byte(data), &eventData)
if err != nil {
return time.Time{}, errors.New("failed to parse event data: " + err.Error())
}

if eventData.UpdatedAt <= 0 {
return time.Time{}, errors.New("invalid 'updated_at' value in event data")
}

// Convert the float timestamp into seconds and nanoseconds
seconds := int64(eventData.UpdatedAt)
nanoseconds := int64((eventData.UpdatedAt - float64(seconds)) * 1e9)

// Return the parsed time
return time.Unix(seconds, nanoseconds), nil
}
Loading