Skip to content

Commit

Permalink
feat(datastore): Added support for snapshot read (#19422)
Browse files Browse the repository at this point in the history
  • Loading branch information
diptanshumittal authored Feb 10, 2023
1 parent 46689fa commit 5374b69
Show file tree
Hide file tree
Showing 13 changed files with 317 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,28 @@
_(res.get).must_equal 1
end

it "returns count on filter with and without read time" do
  # Capture a snapshot timestamp, padded with sleeps so server-side commit
  # timestamps fall strictly on either side of it.
  sleep(1)
  snapshot_time = Time.now
  sleep(1)

  # Kill arya *after* the snapshot; only the latest view should see it.
  arya["alive"] = false
  dataset.transaction { |tx| tx.save arya }

  dead_query = Google::Cloud::Datastore.new
                                       .query("Character")
                                       .ancestor(book)
                                       .where("alive", "=", false)
  count_query = dead_query.aggregate_query
                          .add_count

  # At the snapshot only one character matched; the latest view sees two.
  stale_res = dataset.run_aggregation count_query, read_time: snapshot_time
  _(stale_res.get).must_equal 1
  fresh_res = dataset.run_aggregation count_query
  _(fresh_res.get).must_equal 2
end

it "returns count on limit" do
query = Google::Cloud::Datastore.new
.query("Character")
Expand Down
57 changes: 57 additions & 0 deletions google-cloud-datastore/acceptance/datastore/datastore_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,63 @@
end
end

it "should be able to run queries with and without read_time set" do
  # Unique kind per run so concurrent test runs cannot interfere.
  kind_name = "Post_#{SecureRandom.hex(4)}"
  post.key = Google::Cloud::Datastore::Key.new kind_name, "post_1_#{SecureRandom.hex(4)}"
  post2.key = Google::Cloud::Datastore::Key.new kind_name, "post_2_#{SecureRandom.hex(4)}"
  dataset.save post, post2

  # Snapshot timestamp padded by sleeps on both sides.
  sleep(1)
  snapshot_time = Time.now
  sleep(1)

  # Mutate post2 after the snapshot; the filter below excludes drafts.
  post2["isDraft"] = true
  dataset.update post2

  published_query = dataset.query(kind_name).where("isDraft", "=", false)

  # Snapshot read: both entities still match, and the batch read time is
  # pinned to the requested second.
  results = dataset.run published_query, read_time: snapshot_time
  _(results.count).must_equal 2
  _(results.batch_read_time.seconds).must_equal snapshot_time.to_i
  # Latest read: only one matches, and the batch read time is newer.
  results = dataset.run published_query
  _(results.count).must_equal 1
  assert( results.batch_read_time.seconds > snapshot_time.to_i)

  # Same pair of checks, but through read-only transactions.
  dataset.read_only_transaction(read_time: snapshot_time) do |txn|
    results = txn.run published_query
    _(results.count).must_equal 2
    _(results.batch_read_time.seconds).must_equal snapshot_time.to_i
  end
  dataset.read_only_transaction do |txn|
    results = txn.run published_query
    _(results.count).must_equal 1
    assert( results.batch_read_time.seconds > snapshot_time.to_i)
  end

  dataset.delete post, post2
end

it "should be able to lookup for entities with and without read_time set" do
  post.key = Google::Cloud::Datastore::Key.new "Post", "post_1_#{SecureRandom.hex(4)}"
  dataset.save post

  # Snapshot timestamp padded by sleeps; post2 is saved only afterwards.
  sleep(1)
  snapshot_time = Time.now
  sleep(1)

  post2.key = Google::Cloud::Datastore::Key.new "Post", "post_2_#{SecureRandom.hex(4)}"
  dataset.save post2

  # Latest lookup finds both entities with a read time newer than the snapshot.
  found = dataset.find_all post.key, post2.key
  _(found.count).must_equal 2
  assert( found.response_read_time.seconds > snapshot_time.to_i)
  # Snapshot lookup sees only the first entity, pinned to the snapshot second.
  found = dataset.find_all post.key, post2.key, read_time: snapshot_time
  _(found.count).must_equal 1
  _(found.response_read_time.seconds).must_equal snapshot_time.to_i

  dataset.delete post, post2
end

it "should save/find/delete with a key name" do
post.key = Google::Cloud::Datastore::Key.new "Post", "#{prefix}_post1"
post.exclude_from_indexes! "author", true
Expand Down
9 changes: 9 additions & 0 deletions google-cloud-datastore/acceptance/datastore_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ def try_with_backoff msg = nil, limit: 10
end
end

# Warn after the suite when the multi-database tests were skipped because no
# secondary database is configured ($dataset_2 unset).
# NOTE(review): an identical after_run hook appears to be registered again
# immediately below this one in the file, so the warning would print twice —
# confirm and drop one copy.
Minitest.after_run do
  unless $dataset_2
    puts "The multiple database tests were not run. These tests require a secondary " \
         "database which is not configured. To enable, ensure that the following " \
         "is present in the environment: \n" \
         "DATASTORE_MULTI_DB_DATABASE"
  end
end

Minitest.after_run do
unless $dataset_2
puts "The multiple database tests were not run. These tests require a secondary " \
Expand Down
35 changes: 23 additions & 12 deletions google-cloud-datastore/lib/google/cloud/datastore/dataset.rb
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ def commit
# [Eventual Consistency in Google Cloud
# Datastore](https://cloud.google.com/datastore/docs/articles/balancing-strong-and-eventual-consistency-with-google-cloud-datastore/#h.tf76fya5nqk8)
# for more information.
# @param [Time] read_time Reads entities as they were at the given time.
# This may not be older than 270 seconds. Optional
#
# @return [Google::Cloud::Datastore::Entity, nil]
#
Expand All @@ -355,12 +357,12 @@ def commit
#
# task = datastore.find "Task", "sampleTask"
#
##
# Retrieve a single entity by key, or by kind plus id/name.
#
# @param [Google::Cloud::Datastore::Key, String] key_or_kind a Key object,
#   or the kind of the entity when an id/name is given separately.
# @param [Integer, String, nil] id_or_name the id or name of the entity;
#   ignored when the first argument is already a Key.
# @param [Symbol] consistency The non-transactional read consistency to
#   use. Accepted values are `:eventual` and `:strong`.
# @param [Time] read_time Reads entities as they were at the given time.
#   This may not be older than 270 seconds. Optional
#
# @return [Google::Cloud::Datastore::Entity, nil] the entity, or `nil`
#   when no entity exists for the key.
#
def find key_or_kind, id_or_name = nil, consistency: nil, read_time: nil
  key = key_or_kind
  unless key.is_a? Google::Cloud::Datastore::Key
    key = Key.new key_or_kind, id_or_name
  end
  # Delegate to find_all so read options are validated in one place.
  find_all(key, consistency: consistency, read_time: read_time).first
end
alias get find

Expand All @@ -377,6 +379,8 @@ def find key_or_kind, id_or_name = nil, consistency: nil
# [Eventual Consistency in Google Cloud
# Datastore](https://cloud.google.com/datastore/docs/articles/balancing-strong-and-eventual-consistency-with-google-cloud-datastore/#h.tf76fya5nqk8)
# for more information.
# @param [Time] read_time Reads entities as they were at the given time.
# This may not be older than 270 seconds. Optional
#
# @return [Google::Cloud::Datastore::Dataset::LookupResults]
#
Expand All @@ -389,12 +393,12 @@ def find key_or_kind, id_or_name = nil, consistency: nil
# task_key2 = datastore.key "Task", "sampleTask2"
# tasks = datastore.find_all task_key1, task_key2
#
##
# Retrieve the entities for the provided keys in a single lookup.
#
# @param [Google::Cloud::Datastore::Key] keys one or more Key objects
#   (arrays are flattened).
# @param [Symbol] consistency The non-transactional read consistency to
#   use. Accepted values are `:eventual` and `:strong`.
# @param [Time] read_time Reads entities as they were at the given time.
#   This may not be older than 270 seconds. Optional
#
# @return [Google::Cloud::Datastore::Dataset::LookupResults]
#
def find_all *keys, consistency: nil, read_time: nil
  ensure_service!
  check_consistency! consistency
  lookup_res = service.lookup(*Array(keys).flatten.map(&:to_grpc),
                              consistency: consistency, read_time: read_time)
  # Thread read_time through so LookupResults#next re-reads deferred keys
  # from the same snapshot.
  LookupResults.from_grpc lookup_res, service, consistency, nil, read_time
end
alias lookup find_all

Expand All @@ -411,6 +415,8 @@ def find_all *keys, consistency: nil
# [Eventual Consistency in Google Cloud
# Datastore](https://cloud.google.com/datastore/docs/articles/balancing-strong-and-eventual-consistency-with-google-cloud-datastore/#h.tf76fya5nqk8)
# for more information.
# @param [Time] read_time Reads entities as they were at the given time.
# This may not be older than 270 seconds. Optional
#
# @return [Google::Cloud::Datastore::Dataset::QueryResults]
#
Expand Down Expand Up @@ -461,16 +467,16 @@ def find_all *keys, consistency: nil
# done: false
# tasks = datastore.run gql_query, namespace: "example-ns"
#
##
# Run a Query or GqlQuery and return its results.
#
# @param [Query, GqlQuery] query the query to run.
# @param [String] namespace the namespace to run the query in. Optional.
# @param [Symbol] consistency The non-transactional read consistency to
#   use. Accepted values are `:eventual` and `:strong`.
# @param [Time] read_time Reads entities as they were at the given time.
#   This may not be older than 270 seconds. Optional
#
# @return [Google::Cloud::Datastore::Dataset::QueryResults]
# @raise [ArgumentError] when given an object that is not a Query/GqlQuery.
#
def run query, namespace: nil, consistency: nil, read_time: nil
  ensure_service!
  unless query.is_a?(Query) || query.is_a?(GqlQuery)
    raise ArgumentError, "Cannot run a #{query.class} object."
  end
  check_consistency! consistency
  query_res = service.run_query query.to_grpc, namespace,
                                consistency: consistency, read_time: read_time
  # Pass read_time along so QueryResults#next fetches later batches from
  # the same snapshot.
  QueryResults.from_grpc query_res, service, namespace,
                         query.to_grpc.dup, read_time
end
alias run_query run

Expand All @@ -482,6 +488,8 @@ def run query, namespace: nil, consistency: nil
# @param [Symbol] consistency The non-transactional read consistency to
# use. Cannot be set to `:strong` for global queries. Accepted values
# are `:eventual` and `:strong`.
# @param [Time] read_time Reads entities as they were at the given time.
# This may not be older than 270 seconds. Optional
#
# The default consistency depends on the type of query used. See
# [Eventual Consistency in Google Cloud
Expand Down Expand Up @@ -545,14 +553,14 @@ def run query, namespace: nil, consistency: nil
# done: false
# res = datastore.run_aggregation gql_query, namespace: "example-ns"
#
##
# Run an AggregateQuery or aggregation GqlQuery and return its results.
#
# @param [AggregateQuery, GqlQuery] aggregate_query the aggregation to run.
# @param [String] namespace the namespace to run the query in. Optional.
# @param [Symbol] consistency The non-transactional read consistency to
#   use. Accepted values are `:eventual` and `:strong`.
# @param [Time] read_time Reads entities as they were at the given time.
#   This may not be older than 270 seconds. Optional
#
# @return [Google::Cloud::Datastore::Dataset::AggregateQueryResults]
# @raise [ArgumentError] when given an unsupported query object.
#
def run_aggregation aggregate_query, namespace: nil, consistency: nil, read_time: nil
  ensure_service!
  unless aggregate_query.is_a?(AggregateQuery) || aggregate_query.is_a?(GqlQuery)
    raise ArgumentError, "Cannot run a #{aggregate_query.class} object."
  end
  check_consistency! consistency
  aggregate_query_res = service.run_aggregation_query aggregate_query.to_grpc, namespace,
                                                      consistency: consistency, read_time: read_time
  AggregateQueryResults.from_grpc aggregate_query_res
end

Expand Down Expand Up @@ -677,6 +685,9 @@ def transaction deadline: nil, previous_transaction: nil
# @see https://cloud.google.com/datastore/docs/concepts/transactions
# Transactions
#
# @param [Time] read_time Reads entities at the given time.
# This may not be older than 60 seconds. Optional
#
# @yield [tx] a block yielding a new transaction
# @yieldparam [ReadOnlyTransaction] tx the transaction object
#
Expand All @@ -698,8 +709,8 @@ def transaction deadline: nil, previous_transaction: nil
# end
# end
#
def read_only_transaction
tx = ReadOnlyTransaction.new service
def read_only_transaction read_time: nil
tx = ReadOnlyTransaction.new service, read_time: read_time
return tx unless block_given?

begin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ class Dataset
# descriptions.missing #=> raise NoMethodError
#
class LookupResults < DelegateClass(::Array)
##
# The time at which these entities were read or found missing.
attr_reader :response_read_time

##
# Time at which the entities are being read. This would not be
# older than 270 seconds.
attr_reader :read_time

##
# Keys that were not looked up due to resource constraints.
attr_accessor :deferred
Expand Down Expand Up @@ -110,9 +119,9 @@ def next
ensure_service!
lookup_res = @service.lookup(
*Array(@deferred).flatten.map(&:to_grpc),
consistency: @consistency, transaction: @transaction
consistency: @consistency, transaction: @transaction, read_time: @read_time
)
self.class.from_grpc lookup_res, @service, @consistency
self.class.from_grpc lookup_res, @service, @consistency, nil, @read_time
end

##
Expand Down Expand Up @@ -187,14 +196,16 @@ def all request_limit: nil, &block
##
# @private New Dataset::LookupResults from a
# Google::Dataset::V1::LookupResponse object.
def self.from_grpc lookup_res, service, consistency = nil, transaction = nil
def self.from_grpc lookup_res, service, consistency = nil, transaction = nil, read_time = nil
entities = to_gcloud_entities lookup_res.found
deferred = to_gcloud_keys lookup_res.deferred
missing = to_gcloud_entities lookup_res.missing
new(entities).tap do |lr|
lr.instance_variable_set :@service, service
lr.instance_variable_set :@consistency, consistency
lr.instance_variable_set :@transaction, transaction
lr.instance_variable_set :@read_time, read_time
lr.instance_variable_set :@response_read_time, lookup_res.read_time
lr.instance_variable_set :@deferred, deferred
lr.instance_variable_set :@missing, missing
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,24 @@ class QueryResults < DelegateClass(::Array)
# * `:NO_MORE_RESULTS`
attr_reader :more_results

##
# Read timestamp this batch was returned from.
# This applies to the range of results from the query's `start_cursor` (or
# the beginning of the query if no cursor was given) to this batch's
# `end_cursor` (not the query's `end_cursor`).
#
# In a single transaction, subsequent query result batches for the same query
# can have a greater timestamp. Each batch's read timestamp
# is valid for all preceding batches.
# This value will not be set for eventually consistent queries in Cloud
# Datastore.
attr_reader :batch_read_time

##
# Time at which the entities are being read. This would not be
# older than 270 seconds.
attr_reader :read_time

##
# @private
attr_accessor :service, :namespace, :cursors, :query
Expand Down Expand Up @@ -162,8 +180,8 @@ def next
# Reduce the limit by the number of entities returned in the current batch
query.limit.value -= count
end
query_res = service.run_query query, namespace
self.class.from_grpc query_res, service, namespace, query
query_res = service.run_query query, namespace, read_time: read_time
self.class.from_grpc query_res, service, namespace, query, read_time
end

##
Expand Down Expand Up @@ -360,7 +378,7 @@ def all_with_cursor request_limit: nil, &block
##
# @private New Dataset::QueryResults from a
# Google::Dataset::V1::RunQueryResponse object.
def self.from_grpc query_res, service, namespace, query
def self.from_grpc query_res, service, namespace, query, read_time = nil
r, c = Array(query_res.batch.entity_results).map do |result|
[Entity.from_grpc(result.entity), Cursor.from_grpc(result.cursor)]
end.transpose
Expand All @@ -373,6 +391,8 @@ def self.from_grpc query_res, service, namespace, query
qr.service = service
qr.namespace = namespace
qr.query = query_res.query || query
qr.instance_variable_set :@read_time, read_time
qr.instance_variable_set :@batch_read_time, query_res.batch.read_time
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,22 @@ class ReadOnlyTransaction
# @private The Service object.
attr_accessor :service

##
# Reads entities at the given time.
# This may not be older than 60 seconds.
attr_reader :read_time

##
# @private Creates a new ReadOnlyTransaction instance.
# Takes a Service instead of project and Credentials.
#
# @param [Time] read_time Reads documents as they were at the given time.
#   This may not be older than 270 seconds. Optional
#
def initialize service, read_time: nil
  @service = service
  reset!
  # Record the snapshot time *before* starting, so start can pin the
  # read-only transaction to it.
  @read_time = read_time
  start
end

Expand Down Expand Up @@ -194,9 +203,8 @@ def run_aggregation aggregate_query, namespace: nil
#
##
# Begins the read-only transaction, optionally pinned to the snapshot
# time captured at construction (@read_time).
#
# @raise [TransactionError] if the transaction has already been started.
#
def start
  raise TransactionError, "Transaction already opened." unless @id.nil?
  ensure_service!
  # read_time, when present, pins all reads in this transaction to a
  # consistent snapshot at that time.
  tx_res = service.begin_transaction read_only: true, read_time: @read_time
  @id = tx_res.transaction
end
alias begin_transaction start
Expand Down
Loading

0 comments on commit 5374b69

Please sign in to comment.