Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support calls to write_subscription within resolve #5142

Merged
merged 7 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion lib/graphql/execution/interpreter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ def run_all(schema, query_options, context: {}, max_complexity: schema.max_compl
results = []
queries.each_with_index do |query, idx|
if query.subscription? && !query.subscription_update?
query.context.namespace(:subscriptions)[:events] = []
subs_namespace = query.context.namespace(:subscriptions)
subs_namespace[:events] = []
subs_namespace[:subscriptions] = {}
end
multiplex.dataloader.append_job {
operation = query.selected_operation
Expand Down
54 changes: 50 additions & 4 deletions lib/graphql/schema/subscription.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,22 @@ class Subscription < GraphQL::Schema::Resolver
# propagate null.
null false

# @api private
def initialize(object:, context:, field:)
super
# Figure out whether this is an update or an initial subscription
@mode = context.query.subscription_update? ? :update : :subscribe
@subscription_written = false
@original_arguments = nil
if (subs_ns = context.namespace(:subscriptions)) &&
(sub_insts = subs_ns[:subscriptions])
sub_insts[context.current_path] = self
end
end

# @api private
def resolve_with_support(**args)
@original_arguments = args # before `loads:` have been run
result = nil
unsubscribed = true
unsubscribed_result = catch :graphql_subscription_unsubscribed do
Expand All @@ -46,14 +55,17 @@ def resolve_with_support(**args)
end
end

# Implement the {Resolve} API
# Implement the {Resolve} API.
# You can implement this if you want code to run for _both_ the initial subscription
# and for later updates. Or, implement {#subscribe} and {#update}
def resolve(**args)
# Dispatch based on `@mode`, which will raise a `NoMethodError` if we ever
# have an unexpected `@mode`
public_send("resolve_#{@mode}", **args)
end

# Wrap the user-defined `#subscribe` hook
# @api private
def resolve_subscribe(**args)
ret_val = !args.empty? ? subscribe(**args) : subscribe
if ret_val == :no_response
Expand All @@ -71,6 +83,7 @@ def subscribe(args = {})
end

# Wrap the user-provided `#update` hook
# @api private
def resolve_update(**args)
ret_val = !args.empty? ? update(**args) : update
if ret_val == NO_UPDATE
Expand Down Expand Up @@ -106,14 +119,13 @@ def unsubscribe(update_value = nil)
throw :graphql_subscription_unsubscribed, update_value
end

READING_SCOPE = ::Object.new
# Call this method to provide a new subscription_scope; OR
# call it without an argument to get the subscription_scope
# @param new_scope [Symbol]
# @param optional [Boolean] If true, then don't require `scope:` to be provided to updates to this subscription.
# @return [Symbol]
def self.subscription_scope(new_scope = READING_SCOPE, optional: false)
if new_scope != READING_SCOPE
def self.subscription_scope(new_scope = NOT_CONFIGURED, optional: false)
if new_scope != NOT_CONFIGURED
@subscription_scope = new_scope
@subscription_scope_optional = optional
elsif defined?(@subscription_scope)
Expand Down Expand Up @@ -150,6 +162,40 @@ def self.subscription_scope_optional?
def self.topic_for(arguments:, field:, scope:)
Subscriptions::Serialize.dump_recursive([scope, field.graphql_name, arguments])
end

# Calls through to `schema.subscriptions` to register this subscription with the backend.
# This is automatically called by GraphQL-Ruby after a query finishes successfully,
# but if you need to commit the subscription during `#subscribe`, you can call it there.
# (This method also sets a flag showing that this subscription was already written.)
#
# If you call this method yourself, you may also need to {#unsubscribe}
# or call `subscriptions.delete_subscription` to clean up the database if the query crashes with an error
# later in execution.
# @return [void]
def write_subscription
if subscription_written?
raise GraphQL::Error, "`write_subscription` was called but `#{self.class}#subscription_written?` is already true. Remove a call to `write subscription`."
else
@subscription_written = true
context.schema.subscriptions.write_subscription(context.query, [event])
end
nil
end

# @return [Boolean] `true` if {#write_subscription} was called already
def subscription_written?
@subscription_written
end

# @return [Subscriptions::Event] This object is used as a representation of this subscription for the backend
def event
@event ||= Subscriptions::Event.new(
name: field.name,
arguments: @original_arguments,
context: context,
field: field,
)
end
end
end
end
22 changes: 12 additions & 10 deletions lib/graphql/subscriptions/default_subscription_resolve_extension.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,30 @@ def resolve(context:, object:, arguments:)
def after_resolve(value:, context:, object:, arguments:, **rest)
if value.is_a?(GraphQL::ExecutionError)
value
elsif @field.resolver&.method_defined?(:subscription_written?) &&
(subscription_namespace = context.namespace(:subscriptions)) &&
(subscriptions_by_path = subscription_namespace[:subscriptions])
(subscription_instance = subscriptions_by_path[context.current_path])
# If it was already written, don't append this event to be written later
if !subscription_instance.subscription_written?
events = context.namespace(:subscriptions)[:events]
events << subscription_instance.event
end
value
elsif (events = context.namespace(:subscriptions)[:events])
# This is the first execution, so gather an Event
# for the backend to register:
event = Subscriptions::Event.new(
name: field.name,
arguments: arguments_without_field_extras(arguments: arguments),
arguments: arguments,
context: context,
field: field,
)
events << event
value
elsif context.query.subscription_topic == Subscriptions::Event.serialize(
field.name,
arguments_without_field_extras(arguments: arguments),
arguments,
field,
scope: (field.subscription_scope ? context[field.subscription_scope] : nil),
)
Expand All @@ -45,14 +55,6 @@ def after_resolve(value:, context:, object:, arguments:, **rest)
context.skip
end
end

private

def arguments_without_field_extras(arguments:)
arguments.dup.tap do |event_args|
field.extras.each { |k| event_args.delete(k) }
end
end
end
end
end
13 changes: 12 additions & 1 deletion lib/graphql/subscriptions/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class Event

def initialize(name:, arguments:, field: nil, context: nil, scope: nil)
@name = name
@arguments = arguments
@arguments = self.class.arguments_without_field_extras(arguments: arguments, field: field)
@context = context
field ||= context.field
scope_key = field.subscription_scope
Expand All @@ -39,6 +39,7 @@ def initialize(name:, arguments:, field: nil, context: nil, scope: nil)
# @return [String] an identifier for this unit of subscription
def self.serialize(_name, arguments, field, scope:, context: GraphQL::Query::NullContext.instance)
subscription = field.resolver || GraphQL::Schema::Subscription
arguments = arguments_without_field_extras(field: field, arguments: arguments)
normalized_args = stringify_args(field, arguments.to_h, context)
subscription.topic_for(arguments: normalized_args, field: field, scope: scope)
end
Expand All @@ -60,6 +61,16 @@ def fingerprint
end

class << self
def arguments_without_field_extras(arguments:, field:)
if !field.extras.empty?
arguments = arguments.dup
field.extras.each do |extra_key|
arguments.delete(extra_key)
end
end
arguments
end

private

# This method does not support cyclic references in the Hash,
Expand Down
71 changes: 71 additions & 0 deletions spec/graphql/schema/subscription_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -696,4 +696,75 @@ class PrivateSubscription < SubscriptionFieldSchema::BaseSubscription
assert_nil PrivateSubscription.subscription_scope
end
end

describe "writing during resolution" do
class DirectWriteSchema < GraphQL::Schema
class WriteCheckSubscriptions
def use(schema)
schema.subscriptions = self
end

def write_subscription(query, events)
query.context[:write_subscription_count] ||= 0
query.context[:write_subscription_count] += 1
evs = query.context[:written_events] ||= []
evs << events
nil
end
end
class ImplicitWrite < GraphQL::Schema::Subscription
type String

def subscribe
"#{context[:written_events]&.size.inspect} / #{context[:write_subscription_count].inspect} / #{subscription_written?}"
end
end

class DirectWrite < ImplicitWrite
type String
def subscribe
write_subscription
super
end
end

class DirectWriteTwice < DirectWrite
type String
def subscribe
write_subscription
super
end
end

class Subscription < GraphQL::Schema::Object
field :direct, subscription: DirectWrite
field :implicit, subscription: ImplicitWrite
field :direct_twice, subscription: DirectWriteTwice
end

use WriteCheckSubscriptions.new
subscription(Subscription)
end

it "only calls write_subscription once" do
res = DirectWriteSchema.execute("subscription { direct }")
assert_equal "1 / 1 / true", res["data"]["direct"]
assert_equal 1, res.context[:write_subscription_count]
assert_equal [1], res.context[:written_events].map(&:size)
assert_equal true, res.context.namespace(:subscriptions)[:subscriptions].values.first.subscription_written?

res = DirectWriteSchema.execute("subscription { implicit }")
assert_equal "nil / nil / false", res["data"]["implicit"]
assert_equal 1, res.context[:write_subscription_count]
assert_equal [1], res.context[:written_events].map(&:size)
assert_equal false, res.context.namespace(:subscriptions)[:subscriptions].values.first.subscription_written?
end

it "raises if write_subscription is called twice" do
err = assert_raises GraphQL::Error do
DirectWriteSchema.execute("subscription { directTwice }")
end
assert_equal "`write_subscription` was called but `DirectWriteSchema::DirectWriteTwice#subscription_written?` is already true. Remove a call to `write subscription`.", err.message
end
end
end
Loading