Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/concurrent/atom.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class Atom < Synchronization::Object
include Concern::Observable

safe_initialization!
private(*attr_volatile_with_cas(:value))
private(*attr_atomic(:value))
public :value

# Create a new atom with the given initial value.
Expand Down
3 changes: 2 additions & 1 deletion lib/concurrent/atomics.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# @!macro [new] atomic_reference
#
# An object reference that may be updated atomically.
# An object reference that may be updated atomically. All read and write
# operations have java volatile semantic.
#
# @!macro thread_safe_variable_comparison
#
Expand Down
2 changes: 2 additions & 0 deletions lib/concurrent/channel.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require 'forwardable'

require 'concurrent/channel/buffer'
require 'concurrent/channel/selector'

Expand Down
2 changes: 1 addition & 1 deletion lib/concurrent/edge/atomic_markable_reference.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ module Edge
# @api Edge
class AtomicMarkableReference < ::Concurrent::Synchronization::Object

private(*attr_volatile_with_cas(:reference))
private(*attr_atomic(:reference))

# @!macro [attach] atomic_markable_reference_method_initialize
def initialize(value = nil, mark = false)
Expand Down
12 changes: 6 additions & 6 deletions lib/concurrent/edge/future.rb
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def post_on(executor, *args, &job)
# Represents an event which will happen in future (will be completed). It has to always happen.
class Event < Synchronization::LockableObject
safe_initialization!
private(*attr_volatile_with_cas(:internal_state))
private(*attr_atomic(:internal_state))
public :internal_state
include Concern::Deprecation
include Concern::Logging
Expand Down Expand Up @@ -176,13 +176,13 @@ def to_sym

def initialize(promise, default_executor)
super()
@Promise = promise
@DefaultExecutor = default_executor
@Touched = AtomicBoolean.new(false)
@Callbacks = LockFreeStack.new
@Promise = promise
@DefaultExecutor = default_executor
@Touched = AtomicBoolean.new(false)
@Callbacks = LockFreeStack.new
# TODO (pitr 12-Sep-2015): replace with AtomicFixnum, avoid aba problem
# TODO (pitr 12-Sep-2015): look at java.util.concurrent solution
@Waiters = LockFreeStack.new
@Waiters = LockFreeStack.new
self.internal_state = PENDING
end

Expand Down
31 changes: 21 additions & 10 deletions lib/concurrent/edge/lock_free_stack.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def next_node

EMPTY = Empty[nil, nil]

private(*attr_volatile_with_cas(:head))
private(*attr_atomic(:head))

def initialize
super()
Expand Down Expand Up @@ -64,6 +64,18 @@ def compare_and_clear(head)
compare_and_set_head head, EMPTY
end

include Enumerable

def each(head = nil)
return to_enum(:each, head) unless block_given?
it = head || peek
until it.equal?(EMPTY)
yield it.value
it = it.next_node
end
self
end

def clear
while true
current_head = head
Expand All @@ -72,16 +84,15 @@ def clear
end
end

include Enumerable

def each
return to_enum unless block_given?
it = peek
until it.equal?(EMPTY)
yield it.value
it = it.next_node
def clear_each(&block)
while true
current_head = head
return self if current_head == EMPTY
if compare_and_set_head current_head, EMPTY
each current_head, &block
return self
end
end
self
end

end
Expand Down
4 changes: 2 additions & 2 deletions lib/concurrent/exchanger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class RubyExchanger < AbstractExchanger
safe_initialization!

class Node < Concurrent::Synchronization::Object
attr_volatile_with_cas :value
attr_atomic :value
safe_initialization!

def initialize(item)
Expand All @@ -170,7 +170,7 @@ def initialize

private

attr_volatile_with_cas(:slot)
attr_atomic(:slot)

# @!macro exchanger_method_do_exchange
#
Expand Down
2 changes: 1 addition & 1 deletion lib/concurrent/lazy_register.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ module Concurrent
# @!macro edge_warning
class LazyRegister < Synchronization::Object

private(*attr_volatile_with_cas(:data))
private(*attr_atomic(:data))

def initialize
super
Expand Down
8 changes: 4 additions & 4 deletions lib/concurrent/synchronization/object.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ module Synchronization
# Abstract object providing final, volatile, ans CAS extensions to build other concurrent abstractions.
# - final instance variables see {Object.safe_initialization!}
# - volatile instance variables see {Object.attr_volatile}
# - volatile instance variables see {Object.attr_volatile_with_cas}
# - volatile instance variables see {Object.attr_atomic}
class Object < ObjectImplementation

# @!method self.attr_volatile(*names)
Expand Down Expand Up @@ -87,14 +87,14 @@ def self.new(*)
# `compare_and_set_value(expected, value) #=> true || false`, `update_value(&block)`.
# @param [Array<Symbol>] names of the instance variables to be volatile with CAS.
# @return [Array<Symbol>] names of defined method names.
def self.attr_volatile_with_cas(*names)
def self.attr_atomic(*names)
@volatile_cas_fields ||= []
@volatile_cas_fields += names
safe_initialization!
define_initialize_volatile_with_cas

names.each do |name|
ivar = :"@VolatileCas#{name.to_s.gsub(/(?:^|_)(.)/) { $1.upcase }}"
ivar = :"@Atomic#{name.to_s.gsub(/(?:^|_)(.)/) { $1.upcase }}"
class_eval <<-RUBY, __FILE__, __LINE__ + 1
def #{name}
#{ivar}.get
Expand Down Expand Up @@ -131,7 +131,7 @@ def self.volatile_cas_fields(inherited = true)
private

def self.define_initialize_volatile_with_cas
assignments = @volatile_cas_fields.map { |name| "@VolatileCas#{name.to_s.gsub(/(?:^|_)(.)/) { $1.upcase }} = AtomicReference.new(nil)" }.join("\n")
assignments = @volatile_cas_fields.map { |name| "@Atomic#{name.to_s.gsub(/(?:^|_)(.)/) { $1.upcase }} = AtomicReference.new(nil)" }.join("\n")
class_eval <<-RUBY
def initialize_volatile_with_cas
super
Expand Down
6 changes: 3 additions & 3 deletions spec/concurrent/synchronization_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,10 @@ class BareClass
it_should_behave_like :attr_volatile
end

describe 'attr_volatile_with_cas' do
describe 'attr_atomic' do
specify do
a = Class.new(Synchronization::Object) do
attr_volatile_with_cas :a
attr_atomic :a

def initialize(*rest)
super
Expand All @@ -208,7 +208,7 @@ def initialize(*rest)
end

b = Class.new(a) do
attr_volatile_with_cas :b
attr_atomic :b

def initialize
super
Expand Down