State Stores provide a common way to interact with different data store implementations, and allow users to opt-in to advanced capabilities using defined metadata.
Currently supported state stores are:
- Aerospike
- Alibaba Cloud Tablestore
- AWS DynamoDB
- Azure Blob Storage
- Azure CosmosDB
- Azure Table Storage
- Cassandra
- Cloud Firestore (Datastore mode)
- Couchbase
- Etcd
- HashiCorp Consul
- Hazelcast
- Memcached
- MongoDB
- MySQL
- PostgreSQL
- Redis
- RethinkDB
- SQL Server
- Zookeeper
A compliant state store needs to implement one or more interfaces: Store
and TransactionalStore
.
The interface for Store:
type Store interface {
Init(metadata Metadata) error
Delete(req *DeleteRequest) error
BulkDelete(req []DeleteRequest) error
Get(req *GetRequest) (*GetResponse, error)
Set(req *SetRequest) error
BulkSet(req []SetRequest) error
}
The interface for TransactionalStore:
type TransactionalStore interface {
Init(metadata Metadata) error
Multi(reqs []TransactionalRequest) error
}
See the documentation site for examples.
State Store has an optional API for querying the state.
Please refer to the documentation site for API description and definition.
// Querier is an interface to execute queries.
type Querier interface {
Query(req *QueryRequest) (*QueryResponse, error)
}
Below are the definitions of structures (including nested) for QueryRequest
and QueryResponse
.
// QueryResponse is the request object for querying the state.
type QueryRequest struct {
Query query.Query `json:"query"`
Metadata map[string]string `json:"metadata,omitempty"`
}
type Query struct {
Filters map[string]interface{} `json:"filter"`
Sort []Sorting `json:"sort"`
Page Pagination `json:"page"`
// derived from Filters
Filter Filter
}
type Sorting struct {
Key string `json:"key"`
Order string `json:"order,omitempty"`
}
type Pagination struct {
Limit int `json:"limit"`
Token string `json:"token,omitempty"`
}
// QueryResponse is the response object on querying state.
type QueryResponse struct {
Results []QueryItem `json:"results"`
Token string `json:"token,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
}
// QueryItem is an object representing a single entry in query results.
type QueryItem struct {
Key string `json:"key"`
Data []byte `json:"data"`
ETag *string `json:"etag,omitempty"`
Error string `json:"error,omitempty"`
}
Upon receiving the query request, Dapr validates it and transforms into object Query
,
which, in turn, is passed on to the state store component.
The Query
object has a member Filter
that implements parsing interface per component as described below.
type Filter interface {
Parse(interface{}) error
}
type FilterEQ struct {
Key string
Val interface{}
}
type FilterIN struct {
Key string
Vals []interface{}
}
type FilterAND struct {
Filters []Filter
}
type FilterOR struct {
Filters []Filter
}
To simplify the process of query translation, we leveraged visitor design pattern. A state store component developer would need to implement the visit
method, and the runtime will use it to construct the native query statement.
type Visitor interface {
// returns "equal" expression
VisitEQ(*FilterEQ) (string, error)
// returns "in" expression
VisitIN(*FilterIN) (string, error)
// returns "and" expression
VisitAND(*FilterAND) (string, error)
// returns "or" expression
VisitOR(*FilterOR) (string, error)
// receives concatenated filters and finalizes the native query
Finalize(string, *MidQuery) error
}
The Dapr runtime implements QueryBuilder
object that takes in Visitor
interface and constructs the native query.
type QueryBuilder struct {
visitor Visitor
}
func (h *QueryBuilder) BuildQuery(mq *MidQuery) error {...}
The last part is to implement Querier
interface in the component:
type Querier interface {
Query(req *QueryRequest) (*QueryResponse, error)
}
A sample implementation might look like that:
func (m *MyComponent) Query(req *state.QueryRequest) (*state.QueryResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
query := &Query{} // Query implements Visitor interface
qbuilder := state.NewQueryBuilder(query)
if err := qbuilder.BuildQuery(&req.Query); err != nil {
return &state.QueryResponse{}, err
}
data, token, err := query.execute(ctx)
if err != nil {
return &state.QueryResponse{}, err
}
return &state.QueryResponse{
Results: data,
Token: token,
}, nil
}
Some of the examples of State Query API implementation are MongoDB and CosmosDB state store components.