Class: Concurrent::Actor::Core

Inherits:
Synchronization::LockableObject
  • Object
show all
Includes:
Concern::Logging, TypeCheck
Defined in:
lib/concurrent-ruby-edge/concurrent/actor/core.rb

Overview

Note:

Whole class should be considered private. An user should use Contexts and References only.

Note:

devel: core should not block on anything, e.g. it cannot wait on children to terminate that would eat up all threads in task pool and deadlock

Core of the actor.

API:

  • Actor

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from TypeCheck

#Child!, #Child?, #Match!, #Match?, #Type!, #Type?

Constructor Details

#initialize(opts = {}, &block) ⇒ Core

Returns a new instance of Core.

Parameters:

  • for class instantiation

  • (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • name (String)
  • actor_class (Context)

    a class to be instantiated defining Actor’s behaviour

  • args (Array<Object>)

    arguments for actor_class instantiation

  • executor, (Executor)

    default is global_io_executor

  • link, (true, false)

    atomically link the actor to its parent (default: true)

  • reference (Class)

    a custom descendant of Reference to use

  • behaviour_definition, (Array<Array(Behavior::Abstract, Array<Object>)>)

    array of pairs where each pair is behaviour class and its args, see Behaviour.basic_behaviour_definition

  • initialized, (ResolvableFuture, nil)

    if present it’ll be set or failed after Concurrent::Actor::Context initialization

  • parent (Reference, nil)

    **private api** parent of the actor (the one spawning )

  • logger (Proc, nil)

    a proc accepting (level, progname, message = nil, &block) params, can be used to hook actor instance to any logging system, see Concern::Logging

API:

  • Actor



50
51
52
53
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 50

def initialize(opts = {}, &block)
  super(&nil)
  synchronize { ns_initialize(opts, &block) }
end

Instance Attribute Details

#actor_classContext (readonly)

A subclass of AbstractContext representing Actor’s behaviour.

Returns:



35
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition

#behaviour_definitionObject (readonly)

API:

  • Actor



35
36
37
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

def behaviour_definition
  @behaviour_definition
end

#contextObject (readonly)

API:

  • Actor



35
36
37
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

def context
  @context
end

#context_classObject (readonly)

API:

  • Actor



35
36
37
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

def context_class
  @context_class
end

#executorExecutor (readonly)

Executor which is used to process messages.

Returns:



35
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition

#nameString (readonly)

The name of actor instance, it should be uniq (not enforced). Allows easier orientation between actor instances.

Returns:



35
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition

#pathString (readonly)

Path of this actor. It is used for easier orientation and logging. Path is constructed recursively with: ‘parent.path + self.name` up to a Concurrent::Actor.root, e.g. /an_actor/its_child.

Returns:



35
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition

#referenceReference (readonly)

Reference to this actor which can be safely passed around.

Returns:



35
36
37
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 35

def reference
  @reference
end

Instance Method Details

#add_child(child) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

API:

  • private



74
75
76
77
78
79
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 74

def add_child(child)
  guard!
  Type! child, Reference
  @children.add child
  nil
end

#allocate_contextObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

API:

  • private



150
151
152
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 150

def allocate_context
  @context = @context_class.allocate
end

#behaviour(behaviour_class) ⇒ Behaviour::Abstract?

Returns based on behaviour_class.

Parameters:

Returns:

  • based on behaviour_class

API:

  • Actor



138
139
140
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 138

def behaviour(behaviour_class)
  @behaviours[behaviour_class]
end

#behaviour!(behaviour_class) ⇒ Behaviour::Abstract

Returns based on behaviour_class.

Parameters:

Returns:

  • based on behaviour_class

Raises:

  • when no behaviour

API:

  • Actor



145
146
147
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 145

def behaviour!(behaviour_class)
  @behaviours.fetch behaviour_class
end

#broadcast(public, event) ⇒ Object

API:

  • Actor



131
132
133
134
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 131

def broadcast(public, event)
  log(DEBUG) { "event: #{event.inspect} (#{public ? 'public' : 'private'})" }
  @first_behaviour.on_event(public, event)
end

#build_contextObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

API:

  • private



155
156
157
158
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 155

def build_context
  @context.send :initialize_core, self
  @context.send :initialize, *@args, &@block
end

#childrenArray<Reference>

Returns of children actors.

Returns:

  • of children actors

API:

  • Actor



68
69
70
71
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 68

def children
  guard!
  @children.to_a
end

#dead_letter_routingObject



63
64
65
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 63

def dead_letter_routing
  @context.dead_letter_routing
end

#guard!Object

ensures that we are inside of the executor

API:

  • Actor



102
103
104
105
106
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 102

def guard!
  unless Actor.current == reference
    raise "can be called only inside actor #{reference} but was #{Actor.current}"
  end
end

#log(level, message = nil, &block) ⇒ Object

API:

  • Actor



108
109
110
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 108

def log(level, message = nil, &block)
  super level, @path, message, &block
end

#on_envelope(envelope) ⇒ Object

is executed by Reference scheduling processing of new messages can be called from other alternative Reference implementations

Parameters:

API:

  • Actor



92
93
94
95
96
97
98
99
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 92

def on_envelope(envelope)
  log(DEBUG) { "is  #{envelope.future ? 'asked' : 'told'} #{envelope.message.inspect} by #{envelope.sender}" }
  schedule_execution do
    log(DEBUG) { "was #{envelope.future ? 'asked' : 'told'} #{envelope.message.inspect} by #{envelope.sender} - processing" }
    process_envelope envelope
  end
  nil
end

#parentReference?

A parent Actor. When actor is spawned the Concurrent::Actor.current becomes its parent. When actor is spawned from a thread outside of an actor (Concurrent::Actor.current is nil) Concurrent::Actor.root is assigned.

Returns:

API:

  • Actor



58
59
60
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 58

def parent
  @parent_core && @parent_core.reference
end

#process_envelope(envelope) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

API:

  • private



161
162
163
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 161

def process_envelope(envelope)
  @first_behaviour.on_envelope envelope
end

#remove_child(child) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

API:

  • private



82
83
84
85
86
87
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 82

def remove_child(child)
  guard!
  Type! child, Reference
  @children.delete child
  nil
end

#schedule_executionObject

Schedules blocks to be executed on executor sequentially, sets Actress.current

API:

  • Actor



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/concurrent-ruby-edge/concurrent/actor/core.rb', line 114

def schedule_execution
  @serialized_execution.post(@executor) do
    synchronize do
      begin
        Thread.current[:__current_actor__] = reference
        yield
      rescue => e
        log FATAL, e
      ensure
        Thread.current[:__current_actor__] = nil
      end
    end
  end

  nil
end