Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazy Concurrency Per Evaluation Layer #1981

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 31 additions & 14 deletions lib/graphql/execution/execute.rb
Original file line number Diff line number Diff line change
Expand Up @@ -132,20 +132,29 @@ def resolve_field(object, field_ctx)
end

if field_ctx.schema.lazy?(raw_value)
field_ctx.value = Execution::Lazy.new {
inner_value = field_ctx.trace("execute_field_lazy", {context: field_ctx}) {
begin
field_ctx.value = Execution::Lazy.new(
value: -> {
inner_value = field_ctx.trace("execute_field_lazy", {context: field_ctx}) {
begin
field_ctx.field.lazy_resolve(raw_value, arguments, field_ctx)
rescue GraphQL::UnauthorizedError => err
field_ctx.schema.unauthorized_object(err)
begin
field_ctx.field.lazy_resolve(raw_value, arguments, field_ctx)
rescue GraphQL::UnauthorizedError => err
field_ctx.schema.unauthorized_object(err)
end
rescue GraphQL::ExecutionError => err
err
end
rescue GraphQL::ExecutionError => err
err
}
continue_or_wait(inner_value, field_ctx.type, field_ctx)
},
exec: -> {
if field_ctx.schema.concurrent?(raw_value)
field_ctx.trace("execute_field_concurrent", {context: field_ctx}) {
field_ctx.field.concurrent_exec(raw_value, arguments, field_ctx)
}
end
}
continue_or_wait(inner_value, field_ctx.type, field_ctx)
}
)
else
continue_or_wait(raw_value, field_ctx.type, field_ctx)
end
Expand All @@ -160,8 +169,10 @@ def resolve_field(object, field_ctx)
# and resolve child fields
def continue_or_wait(raw_value, field_type, field_ctx)
if field_ctx.schema.lazy?(raw_value)
field_ctx.value = Execution::Lazy.new {
inner_value = begin
field_ctx.value = Execution::Lazy.new(
value: -> {
inner_value =
begin
begin
field_ctx.schema.sync_lazy(raw_value)
rescue GraphQL::UnauthorizedError => err
Expand All @@ -171,8 +182,14 @@ def continue_or_wait(raw_value, field_type, field_ctx)
err
end

field_ctx.value = continue_or_wait(inner_value, field_type, field_ctx)
}
field_ctx.value = continue_or_wait(inner_value, field_type, field_ctx)
},
exec: -> {
if field_ctx.schema.concurrent?(raw_value)
field_ctx.schema.exec_concurrent(raw_value)
end
}
)
else
field_ctx.value = continue_resolve_field(raw_value, field_type, field_ctx)
end
Expand Down
51 changes: 40 additions & 11 deletions lib/graphql/execution/lazy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,49 @@ def self.resolve(val)
attr_reader :path, :field

# Create a {Lazy} which will get its inner value by calling the block
# @param value_proc [Proc] a block to get the inner value (later)
# @param path [Array<String, Integer>]
# @param field [GraphQL::Schema::Field]
# @param get_value_func [Proc] a block to get the inner value (later)
def initialize(path: nil, field: nil, &get_value_func)
@get_value_func = get_value_func
def initialize(original = nil, path: nil, field: nil, value: nil, exec: nil)
@original = original
@value_proc = if value
value
elsif block_given?
Proc.new
else
raise ArgumentError, "A block to call later is required as `value:` ora block"
end
@exec_proc = exec
@resolved = false
@path = path
@field = field
end

def execute
return if @resolved

exec =
begin
e = @exec_proc.call
if e.is_a?(Lazy)
e = e.execute
end
e
rescue GraphQL::ExecutionError => err
err
end

if exec.is_a?(StandardError)
raise exec
end
end

# @return [Object] The wrapped value, calling the lazy block if necessary
def value
if !@resolved
@resolved = true
@value = begin
v = @get_value_func.call
v = @value_proc.call
if v.is_a?(Lazy)
v = v.value
end
Expand All @@ -57,22 +84,24 @@ def value

# @return [Lazy] A {Lazy} whose value depends on another {Lazy}, plus any transformations in `block`
def then
self.class.new {
yield(value)
}
self.class.new(
value: -> { yield(value) },
exec: -> { execute }
)
end

# @param lazies [Array<Object>] Maybe-lazy objects
# @return [Lazy] A lazy which will sync all of `lazies`
def self.all(lazies)
self.new {
lazies.map { |l| l.is_a?(Lazy) ? l.value : l }
}
self.new(
value: -> { lazies.map { |l| l.is_a?(Lazy) ? l.value : l } },
exec: -> { lazies.each { |l| l.is_a?(Lazy) ? l.execute : l } }
)
end

# This can be used for fields which _had no_ lazy results
# @api private
NullResult = Lazy.new(){}
NullResult = Lazy.new(value: -> {}, exec: -> {})
NullResult.value
end
end
Expand Down
7 changes: 5 additions & 2 deletions lib/graphql/execution/lazy/lazy_method_map.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ def initialize_copy(other)
@storage = other.storage.dup
end

LazySpec = Struct.new(:value_method, :exec_method)
private_constant :LazySpec

# @param lazy_class [Class] A class which represents a lazy value (subclasses may also be used)
# @param lazy_value_method [Symbol] The method to call on this class to get its value
def set(lazy_class, lazy_value_method)
@storage[lazy_class] = lazy_value_method
def set(lazy_class, lazy_value_method, concurrent_exec_method)
@storage[lazy_class] = LazySpec.new(lazy_value_method, concurrent_exec_method)
end

# @param value [Object] an object which may have a `lazy_value_method` registered for its class or superclasses
Expand Down
17 changes: 11 additions & 6 deletions lib/graphql/execution/lazy/resolve.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,17 @@ def self.resolve_in_place(value)
if acc.empty?
Lazy::NullResult
else
Lazy.new {
acc.each_with_index { |ctx, idx|
acc[idx] = ctx.value.value
}
resolve_in_place(acc)
}
Lazy.new(
value: -> {
acc.each { |ctx| ctx.value.execute }
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the important part. In the resolution, execute is called on each lazy object before the values are resolved. This allows concurrent execution to begin in parallel for multiple objects in the same graph layer.


acc.each_with_index { |ctx, idx|
acc[idx] = ctx.value.value
}
resolve_in_place(acc)
},
exec: -> {}
)
end
end

Expand Down
27 changes: 24 additions & 3 deletions lib/graphql/field.rb
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class Field
include GraphQL::Define::InstanceDefinable
accepts_definitions :name, :description, :deprecation_reason,
:resolve, :lazy_resolve,
:concurrent_exec,
:type, :arguments,
:property, :hash_key, :complexity,
:mutation, :function,
Expand All @@ -138,6 +139,7 @@ class Field
:name, :deprecation_reason, :description, :description=, :property, :hash_key,
:mutation, :arguments, :complexity, :function,
:resolve, :resolve=, :lazy_resolve, :lazy_resolve=, :lazy_resolve_proc, :resolve_proc,
:concurrent_exec, :concurrent_exec=, :concurrent_exec_proc,
:type, :type=, :name=, :property=, :hash_key=,
:relay_node_field, :relay_nodes_field, :edges?, :edge_class, :subscription_scope,
:introspection?
Expand All @@ -155,6 +157,9 @@ class Field
# @return [<#call(obj, args, ctx)>] A proc-like object which can be called trigger a lazy resolution
attr_reader :lazy_resolve_proc

# @return [<#call(obj, args, ctx)>] A proc-like object which can be called trigger a concurrent execution
attr_reader :concurrent_exec_proc

# @return [String] The name of this field on its {GraphQL::ObjectType} (or {GraphQL::InterfaceType})
attr_reader :name
alias :graphql_name :name
Expand Down Expand Up @@ -218,6 +223,7 @@ def initialize
@arguments = {}
@resolve_proc = build_default_resolver
@lazy_resolve_proc = DefaultLazyResolve
@concurrent_exec_proc = DefaultConcurrentExec
@relay_node_field = false
@connection = false
@connection_max_page_size = nil
Expand Down Expand Up @@ -307,12 +313,21 @@ def lazy_resolve=(new_lazy_resolve_proc)
@lazy_resolve_proc = new_lazy_resolve_proc
end

def concurrent_exec(obj, args, ctx)
@concurrent_exec_proc.call(obj, args, ctx)
end

def concurrent_exec=(new_concurrent_exec_proc)
@concurrent_exec_proc = new_concurrent_exec_proc
end

# Prepare a lazy value for this field. It may be `then`-ed and resolved later.
# @return [GraphQL::Execution::Lazy] A lazy wrapper around `obj` and its registered method name
def prepare_lazy(obj, args, ctx)
GraphQL::Execution::Lazy.new {
lazy_resolve(obj, args, ctx)
}
GraphQL::Execution::Lazy.new(
value: -> { lazy_resolve(obj, args, ctx) },
exec: -> { concurrent_exec(obj, args, ctx) }
)
end

private
Expand All @@ -326,5 +341,11 @@ def self.call(obj, args, ctx)
ctx.schema.sync_lazy(obj)
end
end

module DefaultConcurrentExec
def self.call(obj, args, ctx)
ctx.schema.exec_concurrent(obj)
end
end
end
end
66 changes: 51 additions & 15 deletions lib/graphql/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ class Schema
},
multiplex_analyzer: ->(schema, analyzer) { schema.multiplex_analyzers << analyzer },
middleware: ->(schema, middleware) { schema.middleware << middleware },
lazy_resolve: ->(schema, lazy_class, lazy_value_method) { schema.lazy_methods.set(lazy_class, lazy_value_method) },
lazy_resolve: ->(schema, lazy_class, lazy_value_method, concurrent_exec_method = nil) {
schema.lazy_methods.set(lazy_class, lazy_value_method, concurrent_exec_method)
},
rescue_from: ->(schema, err_class, &block) { schema.rescue_from(err_class, &block) },
tracer: ->(schema, tracer) { schema.tracers.push(tracer) }

Expand Down Expand Up @@ -172,7 +174,7 @@ def initialize
@parse_error_proc = DefaultParseError
@instrumenters = Hash.new { |h, k| h[k] = [] }
@lazy_methods = GraphQL::Execution::Lazy::LazyMethodMap.new
@lazy_methods.set(GraphQL::Execution::Lazy, :value)
@lazy_methods.set(GraphQL::Execution::Lazy, :value, :execute)
@cursor_encoder = Base64Encoder
# Default to the built-in execution strategy:
@analysis_engine = GraphQL::Analysis
Expand Down Expand Up @@ -639,14 +641,26 @@ class InvalidDocumentError < Error; end;

# @return [Symbol, nil] The method name to lazily resolve `obj`, or nil if `obj`'s class wasn't registered wtih {#lazy_resolve}.
def lazy_method_name(obj)
@lazy_methods.get(obj)
spec = @lazy_methods.get(obj)
spec && spec.value_method
end

# @return [Symbol, nil] The method name to concurrently resolve `obj`, or nil if `obj`'s class wasn't registered wtih {#lazy_resolve} with a concurrent method.
def concurrent_method_name(obj)
spec = @lazy_methods.get(obj)
spec && spec.exec_method
end

# @return [Boolean] True if this object should be lazily resolved
def lazy?(obj)
!!lazy_method_name(obj)
end

# @return [Boolean] True if this object should be concurrently executed
def concurrent?(obj)
!!concurrent_method_name(obj)
end

# Return the GraphQL IDL for the schema
# @param context [Hash]
# @param only [<#call(member, ctx)>]
Expand Down Expand Up @@ -692,7 +706,7 @@ class << self
:static_validator, :introspection_system,
:query_analyzers, :tracers, :instrumenters,
:execution_strategy_for_operation,
:validate, :multiplex_analyzers, :lazy?, :lazy_method_name, :after_lazy, :sync_lazy,
:validate, :multiplex_analyzers, :lazy?, :lazy_method_name, :concurrent_method_name, :after_lazy, :sync_lazy,
# Configuration
:analysis_engine, :analysis_engine=, :using_ast_analysis?, :interpreter?,
:max_complexity=, :max_depth=,
Expand Down Expand Up @@ -758,8 +772,9 @@ def to_graphql
schema_defn.instrumenters[step] << inst
end
end
lazy_classes.each do |lazy_class, value_method|
schema_defn.lazy_methods.set(lazy_class, value_method)

lazy_classes.each do |lazy_class, (value_method, exec_method)|
schema_defn.lazy_methods.set(lazy_class, value_method, exec_method)
end
if @rescues
@rescues.each do |err_class, handler|
Expand Down Expand Up @@ -990,8 +1005,8 @@ def type_error(type_err, ctx)
DefaultTypeError.call(type_err, ctx)
end

def lazy_resolve(lazy_class, value_method)
lazy_classes[lazy_class] = value_method
def lazy_resolve(lazy_class, value_method, exec_method = nil)
lazy_classes[lazy_class] = [value_method, exec_method]
end

def instrument(instrument_step, instrumenter, options = {})
Expand Down Expand Up @@ -1118,13 +1133,16 @@ def resolve_type(type, obj, ctx = :__undefined__)
# @api private
def after_lazy(value)
if lazy?(value)
GraphQL::Execution::Lazy.new do
result = sync_lazy(value)
# The returned result might also be lazy, so check it, too
after_lazy(result) do |final_result|
yield(final_result) if block_given?
end
end
GraphQL::Execution::Lazy.new(
value: -> {
result = sync_lazy(value)
# The returned result might also be lazy, so check it, too
after_lazy(result) do |final_result|
yield(final_result) if block_given?
end
},
exec: -> { exec_concurrent(value) }
)
else
yield(value) if block_given?
end
Expand Down Expand Up @@ -1159,6 +1177,24 @@ def sync_lazy(value)
}
end

# Override this method to handle lazy concurrent objects in a custom way.
# @param value [Object] an instance of a class registered with {.lazy_resolve}
# @param ctx [GraphQL::Query::Context] the context for this query
# @return [Object] A GraphQL-ready (non-lazy) object
def self.exec_concurrent(value)
yield(value)
end

# @see Schema.exec_concurrent for a hook to override
# @api private
def exec_concurrent(value)
self.class.exec_concurrent(value) do |v|
if concurrent_method = concurrent_method_name(v)
value.public_send(concurrent_method)
end
end
end

protected

def rescues?
Expand Down
Loading