Ractor - Rubys aktorähnliche Nebenläufigkeitsabstraktion

Raktoren sind darauf ausgelegt, Ruby-Code parallel auszuführen, ohne Bedenken hinsichtlich der Threadsicherheit.

Zusammenfassung

Mehrere Raktoren in einem Ruby-Prozess

Sie können mehrere Raktoren erstellen, die Ruby-Code parallel ausführen können.

Begrenzte gemeinsame Nutzung zwischen Raktoren

Raktoren teilen nicht alle Objekte, im Gegensatz zu Threads, die auf jedes Objekt zugreifen können, außer auf Objekte, die in den Thread-Locals eines anderen Threads gespeichert sind.

Kommunikation zwischen Raktoren mit Ractor::Port

Raktoren kommunizieren miteinander und synchronisieren ihre Ausführung durch Nachrichtenaustausch. Die Klasse Ractor::Port bietet diesen Kommunikationsmechanismus.

port = Ractor::Port.new

Ractor.new port do |port|
  # Other ractors can send to the port
  port << 42
end

port.receive # get a message from the port. Only the ractor that created the Port can receive from it.
#=> 42

Alle Raktoren haben einen Standardport, den Ractor#send, Ractor.receive (usw.) verwenden.

Kopier- & Verschiebungssemantik beim Senden von Objekten

Um nicht gemeinsam nutzbare Objekte an einen anderen Raktor zu senden, werden Objekte entweder kopiert oder verschoben.

Threadsicherheit

Raktoren helfen beim Schreiben von threadsicheren, nebenläufigen Programmen. Sie erlauben die gemeinsame Nutzung von Daten nur durch explizites Nachrichtenleiten für nicht gemeinsam nutzbare Objekte. Gemeinsam nutzbare Objekte funktionieren garantiert korrekt über Raktoren hinweg, auch wenn die Raktoren parallel ausgeführt werden. Diese Garantie gilt jedoch nur für Raktoren. Sie müssen immer noch Mutexe und andere Tools zur Threadsicherheit innerhalb eines Raktors verwenden, wenn Sie mehrere Ruby-Threads verwenden.

Erstellung und Beendigung

Ractor.new

# Ractor.new with a block creates a new Ractor
r = Ractor.new do
  # This block can run in parallel with other ractors
end

# You can name a Ractor with a `name:` argument.
r = Ractor.new name: 'my-first-ractor' do
end

r.name #=> 'my-first-ractor'

Blockisolierung

Der Ractor führt expr im angegebenen Block aus. Der angegebene Block wird von seinem äußeren Geltungsbereich isoliert. Um die gemeinsame Nutzung von Objekten zwischen Raktoren zu verhindern, sind äußere Variablen, self und andere Informationen vom Block isoliert.

Diese Isolierung erfolgt zum Zeitpunkt der Ractor-Erstellung (wenn Ractor.new aufgerufen wird). Wenn der angegebene Block aufgrund äußerer Variablen oder self nicht isoliert werden kann, wird ein Fehler ausgelöst.

begin
  a = true
  r = Ractor.new do
    a #=> ArgumentError because this block accesses outer variable `a`.
  end
  r.join # wait for ractor to finish
rescue ArgumentError
end
r = Ractor.new do
  p self.class #=> Ractor
  self.object_id
end
r.value == self.object_id #=> false

An Ractor.new() übergebene Argumente werden zu Blockparametern für den gegebenen Block. Ruby übergibt die Objekte jedoch nicht selbst, sondern sendet sie als Nachrichten (siehe unten für Details).

r = Ractor.new 'ok' do |msg|
  msg #=> 'ok'
end
r.value #=> 'ok'
# similar to the last example
r = Ractor.new do
  msg = Ractor.receive
  msg
end
r.send 'ok'
r.value #=> 'ok'

Das Ergebnis der Ausführung des gegebenen Blocks

Der Rückgabewert des gegebenen Blocks wird zu einer ausgehenden Nachricht (siehe unten für Details).

r = Ractor.new do
  'ok'
end
r.value #=> `ok`

Ein Fehler im gegebenen Block wird an den Konsumenten der ausgehenden Nachricht weitergegeben.

r = Ractor.new do
  raise 'ok' # exception will be transferred to the consumer
end

begin
  r.value
rescue Ractor::RemoteError => e
  e.cause.class   #=> RuntimeError
  e.cause.message #=> 'ok'
  e.ractor        #=> r
end

Kommunikation zwischen Raktoren

Die Kommunikation zwischen Raktoren erfolgt durch Senden und Empfangen von Nachrichten. Es gibt zwei Kommunikationsmöglichkeiten:

Benutzer können die Ausführungszeit von Programmen mit (1) steuern, aber nicht mit (2) (nur kritische Abschnitte ausführen).

Für das Senden und Empfangen von Nachrichten sind dies die grundlegenden APIs:

Es gibt 3 Möglichkeiten, ein Objekt als Nachricht zu senden:

1) Referenz senden: Das Senden eines gemeinsam nutzbaren Objekts sendet nur eine Referenz auf das Objekt (schnell).

2) Objekt kopieren: Senden eines nicht gemeinsam nutzbaren Objekts durch tiefe Kopie (kann langsam sein). Beachten Sie, dass Sie ein Objekt nicht auf diese Weise senden können, das keine tiefe Kopie unterstützt. Einige T_DATA-Objekte (Objekte, deren Klasse in einer C-Erweiterung definiert ist, z. B. StringIO) werden nicht unterstützt.

3) Objekt verschieben: Senden eines nicht gemeinsam nutzbaren Objekts über Raktoren mit einem Mitgliedschaftswechsel. Der sendende Ractor kann nach dem Verschieben nicht mehr auf das verschobene Objekt zugreifen, andernfalls wird eine Ausnahme ausgelöst. Implementierungshinweis: T_DATA-Objekte werden nicht unterstützt.

Sie können zwischen „Kopieren“ und „Verschieben“ über den Schlüsselwortparameter move:, Ractor#send(obj, move: true/false), wählen. Der Standardwert ist false („Kopieren“). Wenn das Objekt jedoch gemeinsam nutzbar ist, wird automatisch move verwendet.

Auf mehrere Raktoren warten mit Ractor.select

Sie können auf Nachrichten an mehreren Ports gleichzeitig warten. Der Rückgabewert von Ractor.select() ist [port, msg], wobei port ein bereiter Port und msg die empfangene Nachricht ist.

Um es praktisch zu gestalten, kann Ractor.select auch Raktoren akzeptieren. In diesem Fall wartet es auf deren Beendigung. Der Rückgabewert von Ractor.select() ist [r, msg], wobei r ein beendeter Ractor und msg der Wert des Raktor-Blocks ist.

Auf einen einzelnen Raktor warten (ähnlich wie Ractor#value)

r1 = Ractor.new{'r1'}

r, obj = Ractor.select(r1)
r == r1 and obj == 'r1' #=> true

Auf zwei Raktoren warten

r1 = Ractor.new{'r1'}
r2 = Ractor.new{'r2'}
rs = [r1, r2]
values = []

while rs.any?
  r, obj = Ractor.select(*rs)
  rs.delete(r)
  values << obj
end

values.sort == ['r1', 'r2'] #=> true

HINWEIS: Die Verwendung von Ractor.select() für eine sehr große Anzahl von Raktoren hat derzeit dasselbe Problem wie select(2).

Schließen von Ports

Beispiel (Versuch, ein Ergebnis von einem geschlossenen Raktor zu erhalten)

r = Ractor.new do
  'finish'
end
r.join # success (wait for the termination)
r.value # success (will return 'finish')

# The ractor's termination value has already been given to another ractor
Ractor.new r do |r|
  r.value #=> Ractor::Error
end.join

Beispiel (Versuch, an einen geschlossenen Port zu senden)

r = Ractor.new do
end

r.join # wait for termination, closes default port

begin
  r.send(1)
rescue Ractor::ClosedError
  'ok'
end

Nachricht durch Kopieren senden

Ractor::Port#send(obj) kopiert obj tiefgreifend, wenn obj ein nicht gemeinsam nutzbares Objekt ist.

obj = 'str'.dup
r = Ractor.new obj do |msg|
  # return received msg's object_id
  msg.object_id
end

obj.object_id == r.value #=> false

Einige Objekte unterstützen das Kopieren nicht und lösen eine Ausnahme aus.

obj = Thread.new{}
begin
  Ractor.new obj do |msg|
    msg
  end
rescue TypeError => e
  e.message #=> #<TypeError: allocator undefined for Thread>
end

Nachricht durch Verschieben senden

Ractor::Port#send(obj, move: true) verschiebt obj auf den Ziel- Ractor. Wenn der Quell-Raktor das verschobene Objekt verwendet (z. B. eine Methode wie obj.foo() aufruft), wird ein Fehler ausgelöst.

r = Ractor.new do
  obj = Ractor.receive
  obj << ' world'
end

str = 'hello'.dup
r.send str, move: true
# str is now moved, and accessing str from this ractor is prohibited
modified = r.value #=> 'hello world'


begin
  # Error because it uses moved str.
  str << ' exception' # raise Ractor::MovedError
rescue Ractor::MovedError
  modified #=> 'hello world'
end

Einige Objekte unterstützen das Verschieben nicht und es wird eine Ausnahme ausgelöst.

r = Ractor.new do
  Ractor.receive
end

r.send(Thread.new{}, move: true) #=> allocator undefined for Thread (TypeError)

Nachdem ein Objekt verschoben wurde, wird die Klasse des Quellobjekts zu Ractor::MovedObject geändert.

Gemeinsam nutzbare Objekte

Die folgende Liste von gemeinsam nutzbaren Objekten ist nicht erschöpfend:

Um Objekte gemeinsam nutzbar zu machen, wird Ractor.make_shareable(obj) bereitgestellt. Es versucht, das Objekt gemeinsam nutzbar zu machen, indem es obj einfriert und rekursiv seine Referenzen durchläuft, um sie alle einzufrieren. Diese Methode akzeptiert den Schlüsselwortparameter copy: (Standardwert ist false). Ractor.make_shareable(obj, copy: true) versucht, eine tiefe Kopie von obj zu erstellen und das kopierte Objekt gemeinsam nutzbar zu machen. Ractor.make_shareable(copy: false) hat keine Auswirkung auf ein bereits gemeinsam nutzbares Objekt. Wenn das Objekt nicht gemeinsam nutzbar gemacht werden kann, wird eine Ractor::Error-Ausnahme ausgelöst.

Sprachänderungen zur Einschränkung der gemeinsamen Nutzung zwischen Raktoren

Um nicht gemeinsam nutzbare Objekte über Raktoren hinweg zu isolieren, wurden zusätzliche Sprachsemantiken für Multi-Raktor-Ruby-Programme eingeführt.

Beachten Sie, dass diese zusätzlichen Semantiken nicht benötigt werden, wenn keine Raktoren verwendet werden (100 % kompatibel mit Ruby 2).

Globale Variablen

Nur der Haupt- Ractor kann auf globale Variablen zugreifen.

$gv = 1
r = Ractor.new do
  $gv
end

begin
  r.join
rescue Ractor::RemoteError => e
  e.cause.message #=> 'can not access global variables from non-main Ractors'
end

Beachten Sie, dass einige spezielle globale Variablen wie $stdin, $stdout und $stderr für jeden Raktor lokal sind. Weitere Informationen finden Sie unter [Bug #17268].

Instanzvariablen von gemeinsam nutzbaren Objekten

Instanzvariablen von Klassen/Modulen können von Nicht-Haupt-Raktoren nur dann zugegriffen werden, wenn ihre Werte gemeinsam nutzbare Objekte sind.

class C
  @iv = 1
end

p Ractor.new do
  class C
     @iv
  end
end.value #=> 1

Andernfalls kann nur der Haupt- Ractor auf Instanzvariablen von gemeinsam nutzbaren Objekten zugreifen.

class C
  @iv = [] # unshareable object
end

Ractor.new do
  class C
    begin
      p @iv
    rescue Ractor::IsolationError
      p $!.message
      #=> "can not get unshareable values from instance variables of classes/modules from non-main Ractors"
    end

    begin
      @iv = 42
    rescue Ractor::IsolationError
      p $!.message
      #=> "can not set instance variables of classes/modules by non-main Ractors"
    end
  end
end.join
shared = Ractor.new{}
shared.instance_variable_set(:@iv, 'str')

r = Ractor.new shared do |shared|
  p shared.instance_variable_get(:@iv)
end

begin
  r.join
rescue Ractor::RemoteError => e
  e.cause.message #=> can not access instance variables of shareable objects from non-main Ractors (Ractor::IsolationError)
end

Klassenvariablen

Nur der Haupt- Ractor kann auf Klassenvariablen zugreifen.

class C
  @@cv = 'str'
end

r = Ractor.new do
  class C
    p @@cv
  end
end


begin
  r.join
rescue => e
  e.class #=> Ractor::IsolationError
end

Constants

Nur der Haupt- Ractor kann Konstanten lesen, die auf ein nicht gemeinsam nutzbares Objekt verweisen.

class C
  CONST = 'str'.dup
end
r = Ractor.new do
  C::CONST
end
begin
  r.join
rescue => e
  e.class #=> Ractor::IsolationError
end

Nur der Haupt- Ractor kann Konstanten definieren, die auf ein nicht gemeinsam nutzbares Objekt verweisen.

class C
end
r = Ractor.new do
  C::CONST = 'str'.dup
end
begin
  r.join
rescue => e
  e.class #=> Ractor::IsolationError
end

Beim Erstellen/Aktualisieren einer Bibliothek zur Unterstützung von Raktoren sollten Konstanten nur auf gemeinsam nutzbare Objekte verweisen, wenn sie von Nicht-Haupt-Raktoren verwendet werden sollen.

TABLE = {a: 'ko1', b: 'ko2', c: 'ko3'}

In diesem Fall verweist TABLE auf ein nicht gemeinsam nutzbares Hash-Objekt. Damit andere Raktoren TABLE verwenden können, müssen wir es gemeinsam nutzbar machen. Wir können Ractor.make_shareable() wie folgt verwenden:

TABLE = Ractor.make_shareable( {a: 'ko1', b: 'ko2', c: 'ko3'} )

Um es zu vereinfachen, wurde in Ruby 3.0 die neue Dateidirektive shareable_constant_value eingeführt.

# shareable_constant_value: literal

TABLE = {a: 'ko1', b: 'ko2', c: 'ko3'}
#=> Same as: TABLE = Ractor.make_shareable( {a: 'ko1', b: 'ko2', c: 'ko3'} )

Die Direktive shareable_constant_value akzeptiert die folgenden Modi (Beschreibungen verwenden das Beispiel: CONST = expr):

Mit Ausnahme des none-Modus (Standard) wird garantiert, dass diese Konstanten nur auf gemeinsam nutzbare Objekte verweisen.

Weitere Informationen finden Sie unter syntax/comments.rdoc.

Gemeinsam nutzbare Procs

Procs und Lambdas sind nicht gemeinsam nutzbare Objekte, auch wenn sie gefroren sind. Um einen nicht gemeinsam nutzbaren Proc zu erstellen, müssen Sie Ractor.shareable_proc { expr } verwenden. Ähnlich wie bei der Ractor-Erstellung wird der Block des Procs von seiner äußeren Umgebung isoliert, sodass er nicht auf Variablen aus dem äußeren Geltungsbereich zugreifen kann. self wird auch innerhalb des Proc standardmäßig zu nil geändert, obwohl ein self:-Schlüsselwort bereitgestellt werden kann, wenn Sie den Wert auf ein anderes gemeinsam nutzbares Objekt ändern möchten.

p = Ractor.shareable_proc { p self }
p.call #=> nil
begin
  a = 1
  pr = Ractor.shareable_proc { p a }
  pr.call # never gets here
rescue Ractor::IsolationError
end

Um dynamisch eine Methode mit Module#define_method zu definieren, die aus verschiedenen Raktoren verwendet werden kann, müssen Sie sie mit einem gemeinsam nutzbaren Proc definieren. Alternativ können Sie Module#class_eval oder Module#module_eval mit einem String verwenden. Obwohl das self des gemeinsam nutzbaren Procs zunächst an nil gebunden ist, bindet define_method self in der Methode an den korrekten Wert.

class A
  define_method :testing, &Ractor.shareable_proc do
    p self
  end
end
Ractor.new do
  a = A.new
  a.testing #=> #<A:0x0000000101acfe10>
end.join

Diese Isolierung muss erfolgen, um zu verhindern, dass die Methode erfasste äußere Variablen über Raktoren hinweg zugreift und zuweist.

Raktor-lokaler Speicher

Sie können jedes Objekt (auch nicht gemeinsam nutzbare) im Raktor-lokalen Speicher speichern.

r = Ractor.new do
  values = []
  Ractor[:threads] = []
  3.times do |i|
    Ractor[:threads] << Thread.new do
      values << [Ractor.receive, i+1] # Ractor.receive blocks the current thread in the current ractor until it receives a message
    end
  end
  Ractor[:threads].each(&:join)
  values
end

r << 1
r << 2
r << 3
r.value #=> [[1,1],[2,2],[3,3]] (the order can change with each run)

Beispiele

Traditionelles Ring-Beispiel im Actor-Modell

RN = 1_000
CR = Ractor.current

r = Ractor.new do
  p Ractor.receive
  CR << :fin
end

RN.times{
  r = Ractor.new r do |next_r|
    next_r << Ractor.receive
  end
}

p :setup_ok
r << 1
p Ractor.receive

Fork-Join

def fib n
  if n < 2
    1
  else
    fib(n-2) + fib(n-1)
  end
end

RN = 10
rs = (1..RN).map do |i|
  Ractor.new i do |i|
    [i, fib(i)]
  end
end

until rs.empty?
  r, v = Ractor.select(*rs)
  rs.delete r
  p answer: v
end

Worker-Pool

(1) Ein Raktor hat einen Pool

require 'prime'

N = 1000
RN = 10

# make RN workers
workers = (1..RN).map do
  Ractor.new do |; result_port|
    loop do
      n, result_port = Ractor.receive
      result_port << [n, n.prime?, Ractor.current]
    end
  end
end

result_port = Ractor::Port.new
results = []

(1..N).each do |i|
  if workers.empty?
    # receive a result
    n, result, w = result_port.receive
    results << [n, result]
  else
    w = workers.pop
  end

  # send a task to the idle worker ractor
  w << [i, result_port]
end

# receive a result
while results.size != N
  n, result, _w = result_port.receive
  results << [n, result]
end

pp results.sort_by{|n, result| n}

Pipeline

# pipeline with send/receive

r3 = Ractor.new Ractor.current do |cr|
  cr.send Ractor.receive + 'r3'
end

r2 = Ractor.new r3 do |r3|
  r3.send Ractor.receive + 'r2'
end

r1 = Ractor.new r2 do |r2|
  r2.send Ractor.receive + 'r1'
end

r1 << 'r0'
p Ractor.receive #=> "r0r1r2r3"

Supervise

# ring example again

r = Ractor.current
(1..10).map{|i|
  r = Ractor.new r, i do |r, i|
    r.send Ractor.receive + "r#{i}"
  end
}

r.send "r0"
p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1"
# ring example with an error

r = Ractor.current
rs = (1..10).map{|i|
  r = Ractor.new r, i do |r, i|
    loop do
      msg = Ractor.receive
      raise if /e/ =~ msg
      r.send msg + "r#{i}"
    end
  end
}

r.send "r0"
p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1"
r.send "r0"
p Ractor.select(*rs, Ractor.current) #=> [:receive, "r0r10r9r8r7r6r5r4r3r2r1"]
r.send "e0"
p Ractor.select(*rs, Ractor.current)
#=>
# <Thread:0x000056262de28bd8 run> terminated with exception (report_on_exception is true):
# Traceback (most recent call last):
#         2: from /home/ko1/src/ruby/trunk/test.rb:7:in `block (2 levels) in <main>'
#         1: from /home/ko1/src/ruby/trunk/test.rb:7:in `loop'
# /home/ko1/src/ruby/trunk/test.rb:9:in `block (3 levels) in <main>': unhandled exception
# Traceback (most recent call last):
#         2: from /home/ko1/src/ruby/trunk/test.rb:7:in `block (2 levels) in <main>'
#         1: from /home/ko1/src/ruby/trunk/test.rb:7:in `loop'
# /home/ko1/src/ruby/trunk/test.rb:9:in `block (3 levels) in <main>': unhandled exception
#         1: from /home/ko1/src/ruby/trunk/test.rb:21:in `<main>'
# <internal:ractor>:69:in `select': thrown by remote Ractor. (Ractor::RemoteError)
# resend non-error message

r = Ractor.current
rs = (1..10).map{|i|
  r = Ractor.new r, i do |r, i|
    loop do
      msg = Ractor.receive
      raise if /e/ =~ msg
      r.send msg + "r#{i}"
    end
  end
}

r.send "r0"
p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1"
r.send "r0"
p Ractor.select(*rs, Ractor.current)
[:receive, "r0r10r9r8r7r6r5r4r3r2r1"]
msg = 'e0'
begin
  r.send msg
  p Ractor.select(*rs, Ractor.current)
rescue Ractor::RemoteError
  msg = 'r0'
  retry
end

#=> <internal:ractor>:100:in `send': The incoming-port is already closed (Ractor::ClosedError)
# because r == r[-1] is terminated.
# ring example with supervisor and re-start

def make_ractor r, i
  Ractor.new r, i do |r, i|
    loop do
      msg = Ractor.receive
      raise if /e/ =~ msg
      r.send msg + "r#{i}"
    end
  end
end

r = Ractor.current
rs = (1..10).map{|i|
  r = make_ractor(r, i)
}

msg = 'e0' # error causing message
begin
  r.send msg
  p Ractor.select(*rs, Ractor.current)
rescue Ractor::RemoteError
  r = rs[-1] = make_ractor(rs[-2], rs.size-1)
  msg = 'x0'
  retry
end

#=> [:receive, "x0r9r9r8r7r6r5r4r3r2r1"]