Skip to content

Commit

Permalink
feat(datastore): Add support for snapshot reads
Browse files Browse the repository at this point in the history
  • Loading branch information
diptanshumittal committed Dec 1, 2022
1 parent 1fd0a0e commit a57677a
Show file tree
Hide file tree
Showing 10 changed files with 325 additions and 28 deletions.
111 changes: 111 additions & 0 deletions google-cloud-datastore/acceptance/datastore/datastore_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,63 @@
end
end

it "should be able to run queries with and without read_time set" do
# Use a unique kind so this test cannot collide with other tests
# running against the same project.
kind_val = "Post_#{SecureRandom.hex(4)}"
post.key = Google::Cloud::Datastore::Key.new kind_val, "post_1_#{SecureRandom.hex(4)}"
post2.key = Google::Cloud::Datastore::Key.new kind_val, "post_2_#{SecureRandom.hex(4)}"
dataset.save post, post2

# Capture a snapshot timestamp strictly after the initial save and
# strictly before the update below; the sleeps guarantee at least a
# one-second gap on either side so whole-second comparisons are safe.
sleep(1)
read_time = Time.now
sleep(1)

# Mutate post2 AFTER read_time: a snapshot read pinned to read_time
# must not observe this update.
post2["isDraft"] = true
dataset.update post2

# NOTE(review): assumes both entities start with isDraft == false
# (presumably set by the `post`/`post2` fixtures) — confirm against
# the surrounding lets.
query = dataset.query(kind_val).where("isDraft", "=", false)

# Snapshot query: sees the pre-update state (both entities match) and
# reports a batch read time truncated to the requested second.
entities = dataset.run query, read_time: read_time
_(entities.count).must_equal 2
_(entities.batch_read_time.seconds).must_equal read_time.to_i
# Live query: sees the update, so only one entity still matches, and
# the batch was read strictly after read_time.
entities = dataset.run query
_(entities.count).must_equal 1
assert( entities.batch_read_time.seconds > read_time.to_i)

# Same pair of expectations, but inside read-only transactions: one
# pinned to read_time, one reading the current state.
dataset.read_only_transaction(read_time: read_time) do |tx|
entities = tx.run query
_(entities.count).must_equal 2
_(entities.batch_read_time.seconds).must_equal read_time.to_i
end
dataset.read_only_transaction do |tx|
entities = tx.run query
_(entities.count).must_equal 1
assert( entities.batch_read_time.seconds > read_time.to_i)
end

# Clean up the entities created by this test.
dataset.delete post, post2
end

it "should be able to lookup for entities with and without read_time set" do
# Save the first entity, then capture a snapshot timestamp, then save
# the second: a lookup pinned to read_time should see only the first.
post.key = Google::Cloud::Datastore::Key.new "Post", "post_1_#{SecureRandom.hex(4)}"
dataset.save post

# The sleeps put at least one second between each save and read_time,
# so the whole-second timestamp comparisons below are unambiguous.
sleep(1)
read_time = Time.now
sleep(1)

post2.key = Google::Cloud::Datastore::Key.new "Post", "post_2_#{SecureRandom.hex(4)}"
dataset.save post2

# Live lookup: both entities exist, and the server's read time is
# strictly after the snapshot timestamp.
entities = dataset.find_all post.key, post2.key
_(entities.count).must_equal 2
assert( entities.response_read_time.seconds > read_time.to_i)
# Snapshot lookup: only the entity saved before read_time is visible,
# and the reported read time matches the requested second.
entities = dataset.find_all post.key, post2.key, read_time: read_time
_(entities.count).must_equal 1
_(entities.response_read_time.seconds).must_equal read_time.to_i

dataset.delete post, post2
end

it "should save/find/delete with a key name" do
post.key = Google::Cloud::Datastore::Key.new "Post", "#{prefix}_post1"
post.exclude_from_indexes! "author", true
Expand Down Expand Up @@ -815,4 +872,58 @@
_(refresh).must_be :nil?
end
end

describe "querying with limit > 300" do
# NOTE(review): `let` values are memoized per test instance. If
# `before :all` / `after :all` run on a different instance than the
# tests (as with minitest-hooks), this randomized kind would be
# re-generated there and the seeded entities would not match the kind
# the tests query — confirm which hook semantics this suite uses.
# (Plain Minitest::Spec ignores the `:all` argument and runs the hook
# before every test, which is correct but re-seeds 1000 entities each
# time.)
let(:kind_val) { "Post_#{SecureRandom.hex(4)}" }
# Deliberately greater than 300 to force the server to split results
# across multiple query batches.
let(:limit) { 700 }
let(:post) do
Google::Cloud::Datastore::Entity.new.tap do |e|
e["title"] = "How to make the perfect pizza in your grill"
end
end

before :all do
# Add 1000 entities of the same kind to the datastore
# NOTE(review): one save RPC per entity; batching saves (Datastore
# commits allow up to 500 mutations) would speed up setup — verify.
1000.times.each do |id|
post_temp = post.dup
post_temp.key = Google::Cloud::Datastore::Key.new kind_val, "Post_#{id+1}"
dataset.save post_temp
end
end

after :all do
# Delete the entities added
1000.times.each do |id|
post_temp = post.dup
post_temp.key = Google::Cloud::Datastore::Key.new kind_val, "Post_#{id+1}"
dataset.delete post_temp
end
end

it "should limit results when limit > 300 in query" do
# Testing limit with query
query = dataset.query(kind_val).limit(limit)
entities_count = 0
results = dataset.run query
# Page through every batch; the total across all batches must honor
# the requested limit even though each batch is capped server-side.
loop do
entities_count += results.count
break unless results.next?
results = results.next
end
_(entities_count).must_equal limit
end

it "should limit results when limit > 300 in GQL query" do
# Testing limit with GQL query
query_gql = dataset.gql "SELECT * FROM #{kind_val} LIMIT @limit", {limit: limit}
entities_count = 0
results = dataset.run query_gql
# Same pagination check as above, driven through a GQL query with a
# bound @limit parameter.
loop do
entities_count += results.count
break unless results.next?
results = results.next
end
_(entities_count).must_equal limit
end
end
end
29 changes: 19 additions & 10 deletions google-cloud-datastore/lib/google/cloud/datastore/dataset.rb
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,8 @@ def commit
# [Eventual Consistency in Google Cloud
# Datastore](https://cloud.google.com/datastore/docs/articles/balancing-strong-and-eventual-consistency-with-google-cloud-datastore/#h.tf76fya5nqk8)
# for more information.
# @param [Time] read_time Reads entities as they were at the given time.
# This may not be older than 270 seconds. Optional
#
# @return [Google::Cloud::Datastore::Entity, nil]
#
Expand All @@ -334,12 +336,12 @@ def commit
#
# task = datastore.find "Task", "sampleTask"
#
def find key_or_kind, id_or_name = nil, consistency: nil
def find key_or_kind, id_or_name = nil, consistency: nil, read_time: nil
key = key_or_kind
unless key.is_a? Google::Cloud::Datastore::Key
key = Key.new key_or_kind, id_or_name
end
find_all(key, consistency: consistency).first
find_all(key, consistency: consistency, read_time: read_time).first
end
alias get find

Expand All @@ -356,6 +358,8 @@ def find key_or_kind, id_or_name = nil, consistency: nil
# [Eventual Consistency in Google Cloud
# Datastore](https://cloud.google.com/datastore/docs/articles/balancing-strong-and-eventual-consistency-with-google-cloud-datastore/#h.tf76fya5nqk8)
# for more information.
# @param [Time] read_time Reads entities as they were at the given time.
# This may not be older than 270 seconds. Optional
#
# @return [Google::Cloud::Datastore::Dataset::LookupResults]
#
Expand All @@ -368,12 +372,12 @@ def find key_or_kind, id_or_name = nil, consistency: nil
# task_key2 = datastore.key "Task", "sampleTask2"
# tasks = datastore.find_all task_key1, task_key2
#
def find_all *keys, consistency: nil
def find_all *keys, consistency: nil, read_time: nil
ensure_service!
check_consistency! consistency
lookup_res = service.lookup(*Array(keys).flatten.map(&:to_grpc),
consistency: consistency)
LookupResults.from_grpc lookup_res, service, consistency
consistency: consistency, read_time: read_time)
LookupResults.from_grpc lookup_res, service, consistency, nil, read_time
end
alias lookup find_all

Expand All @@ -390,6 +394,8 @@ def find_all *keys, consistency: nil
# [Eventual Consistency in Google Cloud
# Datastore](https://cloud.google.com/datastore/docs/articles/balancing-strong-and-eventual-consistency-with-google-cloud-datastore/#h.tf76fya5nqk8)
# for more information.
# @param [Time] read_time Reads entities as they were at the given time.
# This may not be older than 270 seconds. Optional
#
# @return [Google::Cloud::Datastore::Dataset::QueryResults]
#
Expand Down Expand Up @@ -440,16 +446,16 @@ def find_all *keys, consistency: nil
# done: false
# tasks = datastore.run gql_query, namespace: "example-ns"
#
def run query, namespace: nil, consistency: nil
def run query, namespace: nil, consistency: nil, read_time: nil
ensure_service!
unless query.is_a?(Query) || query.is_a?(GqlQuery)
raise ArgumentError, "Cannot run a #{query.class} object."
end
check_consistency! consistency
query_res = service.run_query query.to_grpc, namespace,
consistency: consistency
consistency: consistency, read_time: read_time
QueryResults.from_grpc query_res, service, namespace,
query.to_grpc.dup
query.to_grpc.dup, read_time
end
alias run_query run

Expand Down Expand Up @@ -574,6 +580,9 @@ def transaction deadline: nil, previous_transaction: nil
# @see https://cloud.google.com/datastore/docs/concepts/transactions
# Transactions
#
# @param [Time] read_time Reads entities at the given time.
# This may not be older than 60 seconds. Optional
#
# @yield [tx] a block yielding a new transaction
# @yieldparam [ReadOnlyTransaction] tx the transaction object
#
Expand All @@ -595,8 +604,8 @@ def transaction deadline: nil, previous_transaction: nil
# end
# end
#
def read_only_transaction
tx = ReadOnlyTransaction.new service
def read_only_transaction read_time: nil
tx = ReadOnlyTransaction.new service, read_time: read_time
return tx unless block_given?

begin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ class Dataset
# descriptions.missing #=> raise NoMethodError
#
class LookupResults < DelegateClass(::Array)
##
# The time at which these entities were read or found missing.
attr_reader :response_read_time

##
# The snapshot time at which these entities are read. This may not
# be older than 270 seconds.
attr_reader :read_time

##
# Keys that were not looked up due to resource constraints.
attr_accessor :deferred
Expand Down Expand Up @@ -110,9 +119,9 @@ def next
ensure_service!
lookup_res = @service.lookup(
*Array(@deferred).flatten.map(&:to_grpc),
consistency: @consistency, transaction: @transaction
consistency: @consistency, transaction: @transaction, read_time: @read_time
)
self.class.from_grpc lookup_res, @service, @consistency
self.class.from_grpc lookup_res, @service, @consistency, nil, @read_time
end

##
Expand Down Expand Up @@ -187,14 +196,16 @@ def all request_limit: nil, &block
##
# @private New Dataset::LookupResults from a
# Google::Dataset::V1::LookupResponse object.
def self.from_grpc lookup_res, service, consistency = nil, transaction = nil
def self.from_grpc lookup_res, service, consistency = nil, transaction = nil, read_time = nil
entities = to_gcloud_entities lookup_res.found
deferred = to_gcloud_keys lookup_res.deferred
missing = to_gcloud_entities lookup_res.missing
new(entities).tap do |lr|
lr.instance_variable_set :@service, service
lr.instance_variable_set :@consistency, consistency
lr.instance_variable_set :@transaction, transaction
lr.instance_variable_set :@read_time, read_time
lr.instance_variable_set :@response_read_time, lookup_res.read_time
lr.instance_variable_set :@deferred, deferred
lr.instance_variable_set :@missing, missing
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,24 @@ class QueryResults < DelegateClass(::Array)
# * `:NO_MORE_RESULTS`
attr_reader :more_results

##
# Read timestamp this batch was returned from.
# This applies to the range of results from the query's `start_cursor` (or
# the beginning of the query if no cursor was given) to this batch's
# `end_cursor` (not the query's `end_cursor`).
#
# In a single transaction, subsequent query result batches for the same query
# can have a greater timestamp. Each batch's read timestamp
# is valid for all preceding batches.
# This value will not be set for eventually consistent queries in Cloud
# Datastore.
attr_reader :batch_read_time

##
# The snapshot time at which the entities are read. This may not be
# older than 270 seconds.
attr_reader :read_time

##
# @private
attr_accessor :service, :namespace, :cursors, :query
Expand Down Expand Up @@ -158,8 +176,12 @@ def next
ensure_service!
query.start_cursor = cursor.to_grpc # should always be a Cursor...
query.offset = 0 # Never carry an offset across batches
query_res = service.run_query query, namespace
self.class.from_grpc query_res, service, namespace, query
unless query.limit.nil?
# Reduce the limit by the number of entities returned in the current batch
query.limit.value -= count
end
query_res = service.run_query query, namespace, read_time: read_time
self.class.from_grpc query_res, service, namespace, query, read_time
end

##
Expand Down Expand Up @@ -356,7 +378,7 @@ def all_with_cursor request_limit: nil, &block
##
# @private New Dataset::QueryResults from a
# Google::Dataset::V1::RunQueryResponse object.
def self.from_grpc query_res, service, namespace, query
def self.from_grpc query_res, service, namespace, query, read_time = nil
r, c = Array(query_res.batch.entity_results).map do |result|
[Entity.from_grpc(result.entity), Cursor.from_grpc(result.cursor)]
end.transpose
Expand All @@ -369,6 +391,8 @@ def self.from_grpc query_res, service, namespace, query
qr.service = service
qr.namespace = namespace
qr.query = query_res.query || query
qr.instance_variable_set :@read_time, read_time
qr.instance_variable_set :@batch_read_time, query_res.batch.read_time
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,22 @@ class ReadOnlyTransaction
# @private The Service object.
attr_accessor :service

##
# Reads entities at the given time.
# This may not be older than 60 seconds.
attr_reader :read_time

##
# @private Creates a new ReadOnlyTransaction instance.
# Takes a Service instead of project and Credentials.
#
def initialize service
# @param [Time] read_time Reads entities as they were at the given time.
# This may not be older than 60 seconds. Optional
#
def initialize service, read_time: nil
@service = service
reset! # presumably clears prior transaction state; definition not shown here
# @read_time must be assigned before #start, which forwards it to
# service.begin_transaction as the snapshot read time.
@read_time = read_time
start
end

Expand Down Expand Up @@ -163,9 +172,8 @@ def run query, namespace: nil
#
def start
raise TransactionError, "Transaction already opened." unless @id.nil?

ensure_service!
tx_res = service.begin_transaction read_only: true
tx_res = service.begin_transaction read_only: true, read_time: @read_time
@id = tx_res.transaction
end
alias begin_transaction start
Expand Down
Loading

0 comments on commit a57677a

Please sign in to comment.