Fiber

Fibers bieten einen Mechanismus für kooperierende Nebenläufigkeit.

Kontextwechsel

Fibers führen einen vom Benutzer bereitgestellten Block aus. Während der Ausführung kann der Block Fiber.yield oder Fiber.transfer aufrufen, um zu einem anderen Fiber zu wechseln. Fiber#resume wird verwendet, um die Ausführung von der Stelle fortzusetzen, an der Fiber.yield aufgerufen wurde.

#!/usr/bin/env ruby

puts "1: Start program."

f = Fiber.new do
  puts "3: Entered fiber."
  Fiber.yield
  puts "5: Resumed fiber."
end

puts "2: Resume fiber first time."
f.resume

puts "4: Resume fiber second time."
f.resume

puts "6: Finished."

Dieses Programm demonstriert den Kontrollfluss von Fibers.

Scheduler

Die Scheduler-Schnittstelle wird verwendet, um blockierende Operationen abzufangen. Eine typische Implementierung wäre ein Wrapper für ein Gem wie EventMachine oder Async. Dieses Design bietet eine Trennung der Verantwortlichkeiten zwischen der Implementierung der Ereignisschleife und dem Anwendungscode. Es ermöglicht auch geschichtete Scheduler, die Instrumentierung durchführen können.

Um den Scheduler für den aktuellen Thread festzulegen

Fiber.set_scheduler(MyScheduler.new)

Wenn der Thread beendet wird, gibt es einen impliziten Aufruf von set_scheduler

Fiber.set_scheduler(nil)

Design

Die Scheduler-Schnittstelle ist als eine unaufdringliche, leichtgewichtige Schicht zwischen Benutzer-Code und blockierenden Operationen konzipiert. Die Scheduler-Hooks sollten es vermeiden, Argumente oder Rückgabewerte zu übersetzen oder zu konvertieren. Idealerweise werden dieselben Argumente aus dem Benutzer-Code direkt an den Scheduler-Hook übergeben, ohne Änderungen.

Schnittstelle

Dies ist die Schnittstelle, die Sie implementieren müssen.

class Scheduler
  # Wait for the specified process ID to exit.
  # This hook is optional.
  # @parameter pid [Integer] The process ID to wait for.
  # @parameter flags [Integer] A bit-mask of flags suitable for `Process::Status.wait`.
  # @returns [Process::Status] A process status instance.
  def process_wait(pid, flags)
    Thread.new do
      Process::Status.wait(pid, flags)
    end.value
  end

  # Wait for the given io readiness to match the specified events within
  # the specified timeout.
  # @parameter event [Integer] A bit mask of `IO::READABLE`,
  #   `IO::WRITABLE` and `IO::PRIORITY`.
  # @parameter timeout [Numeric] The amount of time to wait for the event in seconds.
  # @returns [Integer] The subset of events that are ready.
  def io_wait(io, events, timeout)
  end

  # Read from the given io into the specified buffer.
  # WARNING: Experimental hook! Do not use in production code!
  # @parameter io [IO] The io to read from.
  # @parameter buffer [IO::Buffer] The buffer to read into.
  # @parameter length [Integer] The minimum amount to read.
  def io_read(io, buffer, length)
  end

  # Write from the given buffer into the specified IO.
  # WARNING: Experimental hook! Do not use in production code!
  # @parameter io [IO] The io to write to.
  # @parameter buffer [IO::Buffer] The buffer to write from.
  # @parameter length [Integer] The minimum amount to write.
  def io_write(io, buffer, length)
  end

  # Sleep the current task for the specified duration, or forever if not
  # specified.
  # @parameter duration [Numeric] The amount of time to sleep in seconds.
  def kernel_sleep(duration = nil)
  end

  # Execute the given block. If the block execution exceeds the given timeout,
  # the specified exception `klass` will be raised. Typically, only non-blocking
  # methods which enter the scheduler will raise such exceptions.
  # @parameter duration [Integer] The amount of time to wait, after which an exception will be raised.
  # @parameter klass [Class] The exception class to raise.
  # @parameter *arguments [Array] The arguments to send to the constructor of the exception.
  # @yields {...} The user code to execute.
  def timeout_after(duration, klass, *arguments, &block)
  end

  # Resolve hostname to an array of IP addresses.
  # This hook is optional.
  # @parameter hostname [String] Example: "www.ruby-lang.org".
  # @returns [Array] An array of IPv4 and/or IPv6 address strings that the hostname resolves to.
  def address_resolve(hostname)
  end

  # Block the calling fiber.
  # @parameter blocker [Object] What we are waiting on, informational only.
  # @parameter timeout [Numeric | Nil] The amount of time to wait for in seconds.
  # @returns [Boolean] Whether the blocking operation was successful or not.
  def block(blocker, timeout = nil)
  end

  # Unblock the specified fiber.
  # @parameter blocker [Object] What we are waiting on, informational only.
  # @parameter fiber [Fiber] The fiber to unblock.
  # @reentrant Thread safe.
  def unblock(blocker, fiber)
  end

  # Intercept the creation of a non-blocking fiber.
  # @returns [Fiber]
  def fiber(&block)
    Fiber.new(blocking: false, &block)
  end

  # Invoked when the thread exits.
  def close
    self.run
  end

  def run
    # Implement event loop here.
  end
end

Zukünftig können zusätzliche Hooks eingeführt werden; wir werden Feature-Erkennung verwenden, um diese Hooks zu aktivieren.

Nicht-blockierende Ausführung

Die Scheduler-Hooks werden nur in speziellen nicht-blockierenden Ausführungskontexten verwendet. Nicht-blockierende Ausführungskontexte führen Nicht-Determinismus ein, da die Ausführung von Scheduler-Hooks Punkte für den Kontextwechsel in Ihr Programm einführen kann.

Fibers

Fibers können verwendet werden, um nicht-blockierende Ausführungskontexte zu erstellen.

Fiber.new do
  puts Fiber.current.blocking? # false

  # May invoke `Fiber.scheduler&.io_wait`.
  io.read(...)

  # May invoke `Fiber.scheduler&.io_wait`.
  io.write(...)

  # Will invoke `Fiber.scheduler&.kernel_sleep`.
  sleep(n)
end.resume

Wir führen auch eine neue Methode ein, die die Erstellung dieser nicht-blockierenden Fibers vereinfacht

Fiber.schedule do
  puts Fiber.current.blocking? # false
end

Der Zweck dieser Methode ist es, dem Scheduler zu ermöglichen, intern die Richtlinie zu bestimmen, wann der Fiber gestartet werden soll und ob symmetrische oder asymmetrische Fibers verwendet werden sollen.

Sie können auch blockierende Ausführungskontexte erstellen

Fiber.new(blocking: true) do
  # Won't use the scheduler:
  sleep(n)
end

Sie sollten dies jedoch generell vermeiden, es sei denn, Sie implementieren einen Scheduler.

IO

Standardmäßig ist die I/O nicht blockierend. Nicht alle Betriebssysteme unterstützen nicht-blockierende I/O. Windows ist ein bemerkenswertes Beispiel, bei dem Socket-I/O nicht blockierend sein kann, aber Pipe-I/O blockierend ist. Vorausgesetzt, es *gibt* einen Scheduler und der aktuelle Thread *ist nicht blockierend*, ruft die Operation den Scheduler auf.

IO#close

Das Schließen eines IO unterbricht alle blockierenden Operationen auf diesem IO. Wenn ein Thread IO#close aufruft, versucht er zunächst, alle Threads oder Fibers zu unterbrechen, die auf diesem IO blockiert sind. Der schließende Thread wartet, bis alle blockierten Threads und Fibers ordnungsgemäß unterbrochen und von der blockierenden Liste des IO entfernt wurden. Jeder unterbrochene Thread oder Fiber erhält einen IOError und wird sauber von der blockierenden Operation entfernt. Erst nachdem alle blockierenden Operationen unterbrochen und bereinigt wurden, wird der eigentliche Dateideskriptor geschlossen, was eine ordnungsgemäße Ressourcenbereinigung gewährleistet und potenzielle Race Conditions verhindert.

Für Fibers, die von einem Scheduler verwaltet werden, beinhaltet der Unterbrechungsprozess den Aufruf von rb_fiber_scheduler_fiber_interrupt an den Scheduler. Dies ermöglicht es dem Scheduler, die Unterbrechung auf eine Weise zu handhaben, die für seine Ereignisschleifenimplementierung geeignet ist. Der Scheduler kann dann den Fiber benachrichtigen, der einen IOError erhält und von der blockierenden Operation entfernt wird. Dieser Mechanismus stellt sicher, dass die Fiber-basierte Nebenläufigkeit korrekt mit IO-Operationen funktioniert, auch wenn diese Operationen durch IO#close unterbrochen werden.

sequenceDiagram
    participant ThreadB
    participant ThreadA
    participant Scheduler
    participant IO
    participant Fiber1
    participant Fiber2

    Note over ThreadA: Thread A has a fiber scheduler
    activate Scheduler
    ThreadA->>Fiber1: Schedule Fiber 1
    activate Fiber1
    Fiber1->>IO: IO.read
    IO->>Scheduler: rb_thread_io_blocking_region
    deactivate Fiber1

    ThreadA->>Fiber2: Schedule Fiber 2
    activate Fiber2
    Fiber2->>IO: IO.read
    IO->>Scheduler: rb_thread_io_blocking_region
    deactivate Fiber2

    Note over Fiber1,Fiber2: Both fibers blocked on same IO

    Note over ThreadB: IO.close
    activate ThreadB
    ThreadB->>IO: thread_io_close_notify_all
    Note over ThreadB: rb_mutex_sleep

    IO->>Scheduler: rb_fiber_scheduler_fiber_interrupt(Fiber1)
    Scheduler->>Fiber1: fiber_interrupt with IOError
    activate Fiber1
    Note over IO: fiber_interrupt causes removal from blocking list
    Fiber1->>IO: rb_io_blocking_operation_exit()
    IO-->>ThreadB: Wakeup thread
    deactivate Fiber1

    IO->>Scheduler: rb_fiber_scheduler_fiber_interrupt(Fiber2)
    Scheduler->>Fiber2: fiber_interrupt with IOError
    activate Fiber2
    Note over IO: fiber_interrupt causes removal from blocking list
    Fiber2->>IO: rb_io_blocking_operation_exit()
    IO-->>ThreadB: Wakeup thread
    deactivate Fiber2
    deactivate Scheduler

    Note over ThreadB: Blocking operations list empty
    ThreadB->>IO: close(fd)
    deactivate ThreadB

Mutex

Die Klasse Mutex kann in einem nicht-blockierenden Kontext verwendet werden und ist Fiber-spezifisch.

ConditionVariable

Die Klasse ConditionVariable kann in einem nicht-blockierenden Kontext verwendet werden und ist Fiber-spezifisch.

Queue / SizedQueue

Die Klassen Queue und SizedQueue können in einem nicht-blockierenden Kontext verwendet werden und sind Fiber-spezifisch.

Thread

Die Operation Thread#join kann in einem nicht-blockierenden Kontext verwendet werden und ist Fiber-spezifisch.