Go: Implement Transaction #3518
Signed-off-by: EdricCua <ecuartero@google.com>
shohamazon
left a comment
Glide 2.0 will introduce support for batch execution (pipelining) alongside transactions.
- Transactions allow executing a batch of commands atomically in a single request, ensuring that no other command interrupts the execution. See Valkey Transactions.
- Pipelining also executes a batch of commands in a single request but is not atomic. In cluster mode, pipelining allows commands with keys from different slots, unlike transactions. See Valkey Pipelines.
Impact on Glide
This change requires significant updates in both the core implementation and language wrappers.
Core Changes
- Support for handling pipeline requests is in progress and is expected to be merged soon.
- Due to the complexity, the work was split across multiple PRs and merged into a side branch. (Will be merged into main very soon!)
Wrapper Changes
- The existing Transaction API will be deprecated.
- A new Batch API will be introduced, featuring an `isAtomic` flag to indicate whether the batch is a transaction or a pipeline.
This means renaming:
- BaseTransaction → BaseBatch
- Transaction → Batch
- ClusterTransaction → ClusterBatch
These changes require significant updates across all wrappers.
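To make the proposed shape concrete, here is a minimal Go sketch of a batch type carrying an atomicity flag. The names `Command`, `Batch`, `NewBatch`, `Add`, and `Kind` are hypothetical and only illustrate the idea; this is not the Glide API.

```go
package main

import "fmt"

// Command is a hypothetical queued command: a name plus its arguments.
type Command struct {
	Name string
	Args []string
}

// Batch is a hypothetical stand-in for the proposed Batch type.
// IsAtomic distinguishes a transaction (true) from a pipeline (false).
type Batch struct {
	IsAtomic bool
	Commands []Command
}

// NewBatch creates an empty batch with the given atomicity.
func NewBatch(isAtomic bool) *Batch {
	return &Batch{IsAtomic: isAtomic}
}

// Add queues a command and returns the batch so calls can be chained.
func (b *Batch) Add(name string, args ...string) *Batch {
	b.Commands = append(b.Commands, Command{Name: name, Args: args})
	return b
}

// Kind reports how the batch would be executed.
func (b *Batch) Kind() string {
	if b.IsAtomic {
		return "transaction"
	}
	return "pipeline"
}

func main() {
	tx := NewBatch(true).Add("SET", "k", "v").Add("GET", "k")
	pipe := NewBatch(false).Add("GET", "a").Add("GET", "b")
	fmt.Println(tx.Kind(), len(tx.Commands))
	fmt.Println(pipe.Kind(), len(pipe.Commands))
}
```

With a single type and a flag, the same queueing code serves both modes, and only the execution path needs to branch on atomicity.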
Go Wrapper Considerations
Since the Go wrapper is not fully released and does not yet support transactions, adding transactions now only to replace them with batches before release doesn't make sense.
Instead, we should implement batch support first to minimize future breaking changes and avoid introducing deprecations in Go before release.
Next Steps
This is a major change across all wrappers, and we want to ensure Glide remains the best client available. I know it will take time, but your work is greatly appreciated!
Let’s start with batch support (instead of transactions) to ensure a smoother transition.
Thank you very much! 🙂
ffi/src/lib.rs
Outdated
    }

    #[no_mangle]
    pub unsafe extern "C" fn execute_transaction(
Please note #3521, as it adds changes in socket_listeners.rs :)
and also in protobuf files
Please note that with that change you will probably need to add support for the Value::ServerError return type :)
As discussed with @shohamazon, Retry_server_error and Retry_connection_error are not ready at the moment (see mod.rs). This should be part of subsequent releases.
Retry_server_error - not ready for transaction
Retry_connection_error - not ready for transaction
@shohamazon I have added Value::ServerError. It now returns the array with the error included and continues executing the remaining commands.
Please add raise on error, plus a test where I can see the error being returned.
If you added one, please tag me there, as I am only reviewing the ffi and not the Go part.
Can you explain the discussion, please?
Are we going to introduce new error handling in the following release?
ffi/src/lib.rs
Outdated
        route_bytes_len: usize,
    ) -> *mut CommandResult {
        if transaction.is_null() {
            panic!("Transaction pointer is NULL");
Is it safe to add all of these panics?
use failure callback to report failure
we used .expect() instead.
expect behaves like a panic
@shohamazon, we can see that expect is used widely in lib.rs, so we are following the same approach. In the future, if support for a failure callback is added, this can be updated at the same time.
Signed-off-by: Niharika Bhavaraju <nbhavaraju@google.com>
Hi @shohamazon,
shohamazon
left a comment
I don't really understand why we have so many duplicated tests for cluster mode and standalone. Is it a Go thing? Can't we use the same test for both standalone and cluster?
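One way to avoid the duplication is to write each check once against a small interface and run it for every client kind. The sketch below assumes a hypothetical `kv` interface and an in-memory `fakeClient`; the real suite would pass the standalone and cluster clients instead.

```go
package main

import "fmt"

// kv is a hypothetical minimal interface that both client kinds would satisfy.
type kv interface {
	Set(key, value string) error
	Get(key string) (string, error)
}

// fakeClient is an in-memory stand-in used only to keep the sketch runnable.
type fakeClient struct {
	data map[string]string
}

func newFake() *fakeClient {
	return &fakeClient{data: map[string]string{}}
}

func (c *fakeClient) Set(key, value string) error {
	c.data[key] = value
	return nil
}

func (c *fakeClient) Get(key string) (string, error) {
	v, ok := c.data[key]
	if !ok {
		return "", fmt.Errorf("no such key: %s", key)
	}
	return v, nil
}

// checkSetGet is the single test body, written once against the interface.
func checkSetGet(c kv) error {
	if err := c.Set("key", "value"); err != nil {
		return err
	}
	got, err := c.Get("key")
	if err != nil {
		return err
	}
	if got != "value" {
		return fmt.Errorf("got %q, want %q", got, "value")
	}
	return nil
}

// runOnAll runs the same check against every client and collects failures by name.
func runOnAll(clients map[string]kv, check func(kv) error) map[string]error {
	failures := map[string]error{}
	for name, c := range clients {
		if err := check(c); err != nil {
			failures[name] = err
		}
	}
	return failures
}

func main() {
	clients := map[string]kv{"standalone": newFake(), "cluster": newFake()}
	fmt.Println(len(runOnAll(clients, checkSetGet)))
}
```

In a testify suite the loop would typically wrap each client in a subtest, so a failure names the mode it occurred in.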
ffi/src/lib.rs
Outdated
    }

    Value::ServerError(server_error) => {
        let code = server_error.err_code();
Use the core's error_message func, like in the other lib.rs files.
Updated it to use error_message and adjusted the integration test and example file.
ffi/src/lib.rs
Outdated
    /// `raise_on_error` When set to false, errors will be included as part of the batch response, allowing the caller to process both successful and failed commands together. In this case, error details
    /// When set to true, the first encountered error in the batch will be raised as an exception.
In this case what? The line is cut off in the middle.
Already fixed, added the missing lines.
    let transaction = unsafe { transaction.as_ref() }.expect("Transaction pointer is NULL");

    let _commands =
        unsafe { transaction.commands.as_ref() }.expect("Transaction commands pointer is NULL");
@avifenesh @jonathanl-bq , what do you think?
    let mut cmd = cmder
        .request_type
        .get_command()
        .expect("Invalid command type");
see for example an expect without unsafe, so your claim is still not enough
Can you please point us to an example where expect() has already been replaced by another approach? That would be helpful, as discussed yesterday.
Otherwise we can keep expect() for now, and once it is updated across all of [lib.rs](https://github.com/EdricCua/valkey-glide/blob/Go-Implement-Transaction-BaseLine/ffi/src/lib.rs), it can be refactored everywhere. @shohamazon
    }

    _, err := tx.ExecWithOptions(options)
    assert.Error(suite.T(), err)
can you check the error type please?
yep we are checking the error type here
What I mean is checking the instance of the error, so that it is an instance of TimeoutError:
valkey-glide/go/api/errors/errors.go, line 31 in 80d17cd
    type TimeoutError struct {
        assert.Contains(suite.T(), resultString, "OK")
    }

    func (suite *GlideTestSuite) TestExecWithOptions_Cluster_RaiseOnErrorFalse() {
same comment I had for raise on error true
already adjusted, same with the raise on error true
    options = &api.TransactionOption{
        Timeout: 1000,
    }
add a sleep before the next execution
Added Go Sleep before execution
    options = &api.TransactionOption{
        Timeout: 1000,
    }
add a sleep before the next execution
Added Go Sleep before execution
        assert.Equal(suite.T(), &errors.RequestError{Msg: "ERR - no such key"}, results[3])
    }

    func (suite *GlideTestSuite) TestExecWithOptions_RaiseOnErrorTrue() {
Same comments for raise on error as below.
already adjusted, same with the raise on error
        assert.Contains(suite.T(), err.Error(), "WRONGTYPE")
    }

    func (suite *GlideTestSuite) TestExecWithOptions_RaiseOnErrorFalse() {
Same comments for raise on error as below.
already adjusted, same with the raise on error
shohamazon
left a comment
LGTM, small comments, waiting for the team's review 🙂
    /// CGO method which allows the Go client to executes all queued commands as a transaction
    ///
    /// `client_adapter_ptr` is a pointer to a valid `GlideClusterClient` returned in the `ConnectionResponse` from [`create_client`].
minor: GlideClient or GlideClusterClient
    }

    _, err := tx.ExecWithOptions(options)
    assert.Error(suite.T(), err)
What I mean is checking the instance of the error, so that it is an instance of TimeoutError:
valkey-glide/go/api/errors/errors.go, line 31 in 80d17cd
    type TimeoutError struct {
        simpleRoute,
    )

    options := &api.TransactionOption{
Yes, I am talking about the exec function receiving the options, so cluster mode options and standalone mode options should be different.
see here https://github.com/valkey-io/valkey-glide/tree/main/java/client/src/main/java/glide/api/models/commands/batch
    assert.Equal(suite.T(), int64(1), results[2])
    assert.Equal(
        suite.T(),
        &errors.RequestError{Msg: "An error was signalled by the server: - ResponseError: no such key"},
nice!!
    _, err := tx.ExecWithOptions(options)

    assert.Error(suite.T(), err)
    assert.Contains(suite.T(), err.Error(), "WRONGTYPE")
check error type
    }

    // TransactionOptions contains configurable options for transaction execution
    type TransactionOption struct {
So, like here: https://github.com/valkey-io/valkey-glide/tree/main/java/client/src/main/java/glide/api/models/commands/batch
I want us to have different option types for cluster mode and standalone mode, like this:
- BaseOptions for options like raise on error and timeout (both standalone and cluster)
- ClusterOptions for options that are only suitable for cluster, like route and retry strategy (in the future, of course)
- StandaloneOptions for options that are only for standalone. I know we currently don't have any, but we might in the future, so we had better have separate types now to avoid breaking changes later.
ClusterOptions and StandaloneOptions should inherit from BaseOptions.
jbrinkman
left a comment
We should hold this PR until we have final agreement on the use of context.Context in the Go client implementation. I hope to get agreement between the Improving/Amazon/Google teams in the next day or two.
    type Transaction struct {
        *baseClient // Embed baseClient to inherit all methods like Get
        *GlideClient
This struct doesn't make sense. Both GlideClient and GlideClusterClient already embed *baseClient. As @marcind points out below, transactions are tied to the client type so a given transaction should only have access to either a GlideClient or a GlideClusterClient, but not both. The baseClient is redundant.
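A sketch of the alternative this comment points toward: one transaction type per client kind, each embedding only its own client. The types below are simplified stand-ins that reuse the names from the comment, and `ClusterTransaction` and `Ping` are placeholders; none of this is the real client code.

```go
package main

import "fmt"

// baseClient, GlideClient and GlideClusterClient are simplified stand-ins
// named after the types mentioned in the comment above, not the real ones.
type baseClient struct {
	mode string
}

// Ping is a placeholder for a command shared by both client kinds.
func (b *baseClient) Ping() string { return "PONG (" + b.mode + ")" }

type GlideClient struct{ *baseClient }

type GlideClusterClient struct{ *baseClient }

// Transaction is bound to a standalone client only. Ping is still promoted
// through GlideClient, so embedding *baseClient a second time adds nothing.
type Transaction struct{ *GlideClient }

// ClusterTransaction is the cluster counterpart (hypothetical name).
type ClusterTransaction struct{ *GlideClusterClient }

func main() {
	tx := Transaction{&GlideClient{&baseClient{mode: "standalone"}}}
	clusterTx := ClusterTransaction{&GlideClusterClient{&baseClient{mode: "cluster"}}}
	fmt.Println(tx.Ping())
	fmt.Println(clusterTx.Ping())
}
```

Each transaction type can reach exactly one client kind, which matches the point that a transaction is tied to the client it was created from.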
    func getExampleTransactionGlideClient() *Transaction {
        standaloneTransactionOnce.Do(func() {
            initFlags()
            addresses := parseHosts(*standaloneNode)
Why are you reparsing the standaloneNodes again? This is already done in the initFlags. It looks like maybe this was based on the original implementation of the sync.Once pattern which has since been refactored.
        if errFlush != nil {
            fmt.Println("error flushing database: ", err)
        }
        clientTx := NewTransaction(client)
This looks like it is using the original pattern of wrapping the entire client creation logic in the sync.Once singleton pattern. What I found with PubSub is that this was overly aggressive since it meant that every example was using the exact same client configuration, and the client was not getting flushed between examples. You should remove the sync.Once implementation here, and just rely on the sync.Once in initFlags() to minimize reparsing flags (flag.Parse() can't be called multiple times). I'll be creating a separate PR to remove the other sync.Once object usage outside of initFlags
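The suggestion can be sketched as follows: keep `sync.Once` only around the one-time flag setup and build a fresh client on every call. Here `initFlags` is a stand-in that counts how often its body runs instead of calling `flag.Parse()`, and `exampleClient` and `newExampleClient` are hypothetical placeholders for the real client and helper.

```go
package main

import (
	"fmt"
	"sync"
)

var (
	flagsOnce  sync.Once
	parseCount int // counts how many times the one-time setup actually ran
)

// initFlags stands in for the real helper: the body passed to Do runs once,
// no matter how many examples call initFlags.
func initFlags() {
	flagsOnce.Do(func() { parseCount++ })
}

// exampleClient is a hypothetical placeholder for a real client.
type exampleClient struct {
	data map[string]string
}

// newExampleClient returns a new, empty client on every call, so state written
// by one example is never visible to the next.
func newExampleClient() *exampleClient {
	initFlags()
	return &exampleClient{data: map[string]string{}}
}

func main() {
	a := newExampleClient()
	a.data["key"] = "value"
	b := newExampleClient()
	fmt.Println(parseCount, len(b.data), a != b)
}
```

The one-time work stays guarded, while each example is free to use its own configuration and starts from a clean client.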
    }

    // Exec executes all queued commands as a transaction
    func (t *Transaction) Exec() ([]any, error) {
    cmd := clientTx.GlideClient
    clientTx.Discard()
    cmd.Set("key123", "Glide")
    cmd.Watch([]string{"key123", "key345"})
While this example shows the watch command being used, it is not at all clear how the behavior in this example would change if the cmd.Watch was not included. I would like to see a case where we are specifically showing a case where Watch was used and the value of key123 mutated during the transaction, or where we were specifically dealing with this error condition. For example, in this Go-Redis code from the Redis docs, you can see that the example handles the case where a key is mutated.
```go
// Set initial value of `shellpath`.
rdb.Set(ctx, "shellpath", "/usr/syscmds/", 0)
const maxRetries = 1000
// Retry if the key has been changed.
for i := 0; i < maxRetries; i++ {
	err := rdb.Watch(ctx,
		func(tx *redis.Tx) error {
			currentPath, err := rdb.Get(ctx, "shellpath").Result()
			newPath := currentPath + ":/usr/mycmds/"
			_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
				pipe.Set(ctx, "shellpath", newPath, 0)
				return nil
			})
			return err
		},
		"shellpath",
	)
	if err == nil {
		// Success.
		break
	} else if err == redis.TxFailedErr {
		// Optimistic lock lost. Retry the transaction.
		continue
	} else {
		// Panic for any other error.
		panic(err)
	}
}
fmt.Println(rdb.Get(ctx, "shellpath").Val())
// >>> /usr/syscmds/:/usr/mycmds/
```
Bonus points for an example that shows how to handle broken TCP pipes, CLUSTERDOWN, MASTERDOWN, and other transient errors.
avifenesh
left a comment
Removing my block
shohamazon
left a comment
Removing my block 🍒
Superseded by #3938
Issue link
This Pull Request is linked to issue (URL): #3484
Checklist
Before submitting the PR make sure the following are checked: