diff --git a/lib/concurrent/atom.rb b/lib/concurrent/atom.rb index 91b13e77f..73c7a2862 100644 --- a/lib/concurrent/atom.rb +++ b/lib/concurrent/atom.rb @@ -95,7 +95,7 @@ class Atom < Synchronization::Object include Concern::Observable safe_initialization! - private(*attr_volatile_with_cas(:value)) + private(*attr_atomic(:value)) public :value # Create a new atom with the given initial value. diff --git a/lib/concurrent/atomics.rb b/lib/concurrent/atomics.rb index 04852c883..569078431 100644 --- a/lib/concurrent/atomics.rb +++ b/lib/concurrent/atomics.rb @@ -1,6 +1,7 @@ # @!macro [new] atomic_reference # -# An object reference that may be updated atomically. +# An object reference that may be updated atomically. All read and write +# operations have java volatile semantic. # # @!macro thread_safe_variable_comparison # diff --git a/lib/concurrent/channel.rb b/lib/concurrent/channel.rb index a1b6dc118..15f9e1b95 100644 --- a/lib/concurrent/channel.rb +++ b/lib/concurrent/channel.rb @@ -1,3 +1,5 @@ +require 'forwardable' + require 'concurrent/channel/buffer' require 'concurrent/channel/selector' diff --git a/lib/concurrent/edge/atomic_markable_reference.rb b/lib/concurrent/edge/atomic_markable_reference.rb index c22409ae5..a7411a467 100644 --- a/lib/concurrent/edge/atomic_markable_reference.rb +++ b/lib/concurrent/edge/atomic_markable_reference.rb @@ -11,7 +11,7 @@ module Edge # @api Edge class AtomicMarkableReference < ::Concurrent::Synchronization::Object - private(*attr_volatile_with_cas(:reference)) + private(*attr_atomic(:reference)) # @!macro [attach] atomic_markable_reference_method_initialize def initialize(value = nil, mark = false) diff --git a/lib/concurrent/edge/future.rb b/lib/concurrent/edge/future.rb index da97fce2c..98eff1750 100644 --- a/lib/concurrent/edge/future.rb +++ b/lib/concurrent/edge/future.rb @@ -131,7 +131,7 @@ def post_on(executor, *args, &job) # Represents an event which will happen in future (will be completed). It has to always happen. class Event < Synchronization::LockableObject safe_initialization! - private(*attr_volatile_with_cas(:internal_state)) + private(*attr_atomic(:internal_state)) public :internal_state include Concern::Deprecation include Concern::Logging @@ -176,13 +176,13 @@ def to_sym def initialize(promise, default_executor) super() - @Promise = promise - @DefaultExecutor = default_executor - @Touched = AtomicBoolean.new(false) - @Callbacks = LockFreeStack.new + @Promise = promise + @DefaultExecutor = default_executor + @Touched = AtomicBoolean.new(false) + @Callbacks = LockFreeStack.new # TODO (pitr 12-Sep-2015): replace with AtomicFixnum, avoid aba problem # TODO (pitr 12-Sep-2015): look at java.util.concurrent solution - @Waiters = LockFreeStack.new + @Waiters = LockFreeStack.new self.internal_state = PENDING end diff --git a/lib/concurrent/edge/lock_free_stack.rb b/lib/concurrent/edge/lock_free_stack.rb index f163a5600..1749a0d16 100644 --- a/lib/concurrent/edge/lock_free_stack.rb +++ b/lib/concurrent/edge/lock_free_stack.rb @@ -23,7 +23,7 @@ def next_node EMPTY = Empty[nil, nil] - private(*attr_volatile_with_cas(:head)) + private(*attr_atomic(:head)) def initialize super() @@ -64,6 +64,18 @@ def compare_and_clear(head) compare_and_set_head head, EMPTY end + include Enumerable + + def each(head = nil) + return to_enum(:each, head) unless block_given? + it = head || peek + until it.equal?(EMPTY) + yield it.value + it = it.next_node + end + self + end + def clear while true current_head = head @@ -72,16 +84,15 @@ def clear end end - include Enumerable - - def each - return to_enum unless block_given? - it = peek - until it.equal?(EMPTY) - yield it.value - it = it.next_node + def clear_each(&block) + while true + current_head = head + return self if current_head == EMPTY + if compare_and_set_head current_head, EMPTY + each current_head, &block + return self + end end - self end end diff --git a/lib/concurrent/exchanger.rb b/lib/concurrent/exchanger.rb index dd532270d..e5853810f 100644 --- a/lib/concurrent/exchanger.rb +++ b/lib/concurrent/exchanger.rb @@ -143,7 +143,7 @@ class RubyExchanger < AbstractExchanger safe_initialization! class Node < Concurrent::Synchronization::Object - attr_volatile_with_cas :value + attr_atomic :value safe_initialization! def initialize(item) @@ -170,7 +170,7 @@ def initialize private - attr_volatile_with_cas(:slot) + attr_atomic(:slot) # @!macro exchanger_method_do_exchange # diff --git a/lib/concurrent/lazy_register.rb b/lib/concurrent/lazy_register.rb index d4d656a28..43928f886 100644 --- a/lib/concurrent/lazy_register.rb +++ b/lib/concurrent/lazy_register.rb @@ -18,7 +18,7 @@ module Concurrent # @!macro edge_warning class LazyRegister < Synchronization::Object - private(*attr_volatile_with_cas(:data)) + private(*attr_atomic(:data)) def initialize super diff --git a/lib/concurrent/synchronization/object.rb b/lib/concurrent/synchronization/object.rb index db53d1fee..2b80c5fc5 100644 --- a/lib/concurrent/synchronization/object.rb +++ b/lib/concurrent/synchronization/object.rb @@ -18,7 +18,7 @@ module Synchronization # Abstract object providing final, volatile, ans CAS extensions to build other concurrent abstractions. # - final instance variables see {Object.safe_initialization!} # - volatile instance variables see {Object.attr_volatile} - # - volatile instance variables see {Object.attr_volatile_with_cas} + # - volatile instance variables see {Object.attr_atomic} class Object < ObjectImplementation # @!method self.attr_volatile(*names) @@ -87,14 +87,14 @@ def self.new(*) # `compare_and_set_value(expected, value) #=> true || false`, `update_value(&block)`. # @param [Array] names of the instance variables to be volatile with CAS. # @return [Array] names of defined method names. - def self.attr_volatile_with_cas(*names) + def self.attr_atomic(*names) @volatile_cas_fields ||= [] @volatile_cas_fields += names safe_initialization! define_initialize_volatile_with_cas names.each do |name| - ivar = :"@VolatileCas#{name.to_s.gsub(/(?:^|_)(.)/) { $1.upcase }}" + ivar = :"@Atomic#{name.to_s.gsub(/(?:^|_)(.)/) { $1.upcase }}" class_eval <<-RUBY, __FILE__, __LINE__ + 1 def #{name} #{ivar}.get @@ -131,7 +131,7 @@ def self.volatile_cas_fields(inherited = true) private def self.define_initialize_volatile_with_cas - assignments = @volatile_cas_fields.map { |name| "@VolatileCas#{name.to_s.gsub(/(?:^|_)(.)/) { $1.upcase }} = AtomicReference.new(nil)" }.join("\n") + assignments = @volatile_cas_fields.map { |name| "@Atomic#{name.to_s.gsub(/(?:^|_)(.)/) { $1.upcase }} = AtomicReference.new(nil)" }.join("\n") class_eval <<-RUBY def initialize_volatile_with_cas super diff --git a/spec/concurrent/synchronization_spec.rb b/spec/concurrent/synchronization_spec.rb index bc99ffa1b..054da88e0 100644 --- a/spec/concurrent/synchronization_spec.rb +++ b/spec/concurrent/synchronization_spec.rb @@ -196,10 +196,10 @@ class BareClass it_should_behave_like :attr_volatile end - describe 'attr_volatile_with_cas' do + describe 'attr_atomic' do specify do a = Class.new(Synchronization::Object) do - attr_volatile_with_cas :a + attr_atomic :a def initialize(*rest) super @@ -208,7 +208,7 @@ def initialize(*rest) end b = Class.new(a) do - attr_volatile_with_cas :b + attr_atomic :b def initialize super