class Fiber::Scheduler
Dies ist keine existierende Klasse, sondern die Dokumentation der Schnittstelle, die ein Scheduler-Objekt erfüllen muss, um als Argument für Fiber.scheduler verwendet zu werden und nicht-blockierende Faser zu verwalten. Siehe auch den Abschnitt „Nicht-blockierende Fasern“ in der Dokumentation der Fiber-Klasse für Erklärungen einiger Konzepte.
Das Verhalten und die Verwendung eines Schedulers werden wie folgt erwartet:
-
Wenn die Ausführung in der nicht-blockierenden
Fibereine blockierende Operation erreicht (wie Schlaf, Warten auf einen Prozess oder ein nicht verfügbares I/O), ruft sie einige der unten aufgeführten Hook-Methoden des Schedulers auf. -
Schedulerregistriert irgendwie, worauf die aktuelle Faser wartet, und gibt die Kontrolle mitFiber.yieldan andere Fasern ab (wodurch die Faser während des Wartens auf das Ende ihrer Wartezeit ausgesetzt wird und andere Fasern im selben Thread ausgeführt werden können). -
Am Ende der Ausführung des aktuellen Threads wird die Scheduler-Methode scheduler_close aufgerufen.
-
Der Scheduler läuft in einer Warte-Schleife, prüft alle blockierten Fasern (die er bei Hook-Aufrufen registriert hat) und setzt sie fort, wenn die benötigte Ressource bereit ist (z. B. I/O bereit oder Schlafzeit abgelaufen).
Auf diese Weise wird eine gleichzeitige Ausführung transparent für den Code jeder einzelnen Faser erreicht.
Scheduler-Implementierungen werden von Gems bereitgestellt, wie z. B. Async.
Hook-Methoden sind:
-
io_wait,io_read,io_write,io_pread,io_pwriteio_selectundio_close -
(Die Liste wird erweitert, da Ruby-Entwickler weitere Methoden mit nicht-blockierenden Aufrufen hinzufügen)
Sofern nicht anders angegeben, sind die Hook-Implementierungen obligatorisch: Wenn sie nicht implementiert sind, schlagen die Methoden, die versuchen, den Hook aufzurufen, fehl. Zur Gewährleistung der Abwärtskompatibilität werden Hooks in Zukunft optional sein (wenn sie nicht implementiert sind, da der Scheduler für eine ältere Ruby-Version erstellt wurde, wird der Code, der diesen Hook benötigt, nicht fehlschlagen und sich einfach blockierend verhalten).
Es wird außerdem dringend empfohlen, dass der Scheduler die Methode fiber implementiert, die von Fiber.schedule delegiert wird.
Ein beispielhaftes *Spielzeug*-Implementierung des Schedulers finden Sie in Rubys Code unter test/fiber/scheduler.rb.
Öffentliche Instanzmethoden
Source
VALUE
rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
{
VALUE arguments[] = {
hostname
};
return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
}
Wird von jeder Methode aufgerufen, die eine nicht-reverse DNS-Auflösung durchführt. Die bemerkenswerteste Methode ist Addrinfo.getaddrinfo, aber es gibt viele andere.
Die Methode soll ein Array von Strings zurückgeben, die den IP-Adressen entsprechen, zu denen hostname aufgelöst wird, oder nil, wenn es nicht aufgelöst werden kann.
Eine ziemlich erschöpfende Liste aller möglichen Aufrufstellen
-
Addrinfo.marshal_load
Source
VALUE
rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
{
return rb_funcall(scheduler, id_block, 2, blocker, timeout);
}
Wird von Methoden wie Thread.join und von Thread::Mutex aufgerufen, um anzuzeigen, dass die aktuelle Fiber bis auf weiteres (z. B. durch unblock) oder bis zum Ablauf von timeout blockiert ist.
blocker ist das, worauf gewartet wird, nur zur Information (für Debugging und Protokollierung). Es gibt keine Garantie für dessen Wert.
Es wird erwartet, dass ein boolescher Wert zurückgegeben wird, der angibt, ob die blockierende Operation erfolgreich war oder nicht.
Source
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
{
// Check if scheduler supports blocking_operation_wait before creating the object
if (!rb_respond_to(scheduler, id_blocking_operation_wait)) {
return Qundef;
}
// Create a new BlockingOperation with the blocking operation
VALUE blocking_operation = rb_fiber_scheduler_blocking_operation_new(function, data, unblock_function, data2, flags, state);
VALUE result = rb_funcall(scheduler, id_blocking_operation_wait, 1, blocking_operation);
// Get the operation data to check if it was executed
rb_fiber_scheduler_blocking_operation_t *operation = get_blocking_operation(blocking_operation);
rb_atomic_t current_status = RUBY_ATOMIC_LOAD(operation->status);
// Invalidate the operation now that we're done with it
operation->function = NULL;
operation->state = NULL;
operation->data = NULL;
operation->data2 = NULL;
operation->unblock_function = NULL;
// If the blocking operation was never executed, return Qundef to signal the caller to use rb_nogvl instead
if (current_status == RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) {
return Qundef;
}
return result;
}
Wird von den Kernmethoden von Ruby aufgerufen, um eine blockierende Operation auf nicht-blockierende Weise auszuführen. Die blocking_operation ist ein undurchsichtiges Objekt, das die blockierende Operation kapselt und auf einen Methodenaufruf ohne Argumente reagiert.
Wenn der Scheduler diese Methode nicht implementiert oder der Scheduler die blockierende Operation nicht ausführt, greift Ruby auf die nicht-Scheduler-Implementierung zurück.
Minimale vorgeschlagene Implementierung ist
def blocking_operation_wait(blocking_operation) Thread.new { blocking_operation.call }.join end
Source
VALUE
rb_fiber_scheduler_close(VALUE scheduler)
{
RUBY_ASSERT(ruby_thread_has_gvl_p());
VALUE result;
// The reason for calling `scheduler_close` before calling `close` is for
// legacy schedulers which implement `close` and expect the user to call
// it. Subsequently, that method would call `Fiber.set_scheduler(nil)`
// which should call `scheduler_close`. If it were to call `close`, it
// would create an infinite loop.
result = rb_check_funcall(scheduler, id_scheduler_close, 0, NULL);
if (!UNDEF_P(result)) return result;
result = rb_check_funcall(scheduler, id_close, 0, NULL);
if (!UNDEF_P(result)) return result;
return Qnil;
}
Wird aufgerufen, wenn der aktuelle Thread beendet wird. Es wird erwartet, dass der Scheduler diese Methode implementiert, um allen wartenden Fasern die Beendigung ihrer Ausführung zu ermöglichen.
Das vorgeschlagene Muster ist die Implementierung der Hauptereignisschleife in der Methode close.
Source
VALUE
rb_fiber_scheduler_fiber(VALUE scheduler, int argc, VALUE *argv, int kw_splat)
{
return rb_funcall_passing_block_kw(scheduler, id_fiber_schedule, argc, argv, kw_splat);
}
Implementierung von Fiber.schedule. Es wird *erwartet*, dass die Methode den gegebenen Codeblock sofort in einer separaten, nicht-blockierenden Faser ausführt und diese Fiber zurückgibt.
Minimale vorgeschlagene Implementierung ist
def fiber(&block) fiber = Fiber.new(blocking: false, &block) fiber.resume fiber end
Source
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception)
{
VALUE arguments[] = {
fiber, exception
};
VALUE result;
enum ruby_tag_type state;
// We must prevent interrupts while invoking the fiber_interrupt method, because otherwise fibers can be left permanently blocked if an interrupt occurs during the execution of user code. See also `rb_fiber_scheduler_unblock`.
rb_execution_context_t *ec = GET_EC();
int saved_interrupt_mask = ec->interrupt_mask;
ec->interrupt_mask |= PENDING_INTERRUPT_MASK;
EC_PUSH_TAG(ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
result = rb_check_funcall(scheduler, id_fiber_interrupt, 2, arguments);
}
EC_POP_TAG();
ec->interrupt_mask = saved_interrupt_mask;
if (state) {
EC_JUMP_TAG(ec, state);
}
RUBY_VM_CHECK_INTS(ec);
return result;
}
Source
VALUE
rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io)
{
VALUE arguments[] = {io};
return rb_check_funcall(scheduler, id_io_close, 1, arguments);
}
Wird von den Kernmethoden von Ruby aufgerufen, um den Scheduler darüber zu informieren, dass das IO-Objekt geschlossen wurde. Beachten Sie, dass die Methode einen ganzzahligen Dateideskriptor des geschlossenen Objekts erhält, nicht das Objekt selbst.
Source
VALUE
rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
{
if (!rb_respond_to(scheduler, id_io_pread)) {
return RUBY_Qundef;
}
VALUE arguments[] = {
scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
};
if (rb_respond_to(scheduler, id_fiber_interrupt)) {
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pread, (VALUE)&arguments);
} else {
return fiber_scheduler_io_pread((VALUE)&arguments);
}
}
Wird von IO#pread oder IO::Buffer#pread aufgerufen, um length Bytes von io an Offset from in einen angegebenen buffer (siehe IO::Buffer) am gegebenen offset zu lesen.
Diese Methode ist semantisch identisch mit io_read, ermöglicht aber die Angabe des Offsets, von dem gelesen werden soll, und ist oft besser für asynchrone IO-Vorgänge auf derselben Datei.
Die Methode sollte als *experimentell* betrachtet werden.
Source
VALUE
rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
{
if (!rb_respond_to(scheduler, id_io_pwrite)) {
return RUBY_Qundef;
}
VALUE arguments[] = {
scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
};
if (rb_respond_to(scheduler, id_fiber_interrupt)) {
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pwrite, (VALUE)&arguments);
} else {
return fiber_scheduler_io_pwrite((VALUE)&arguments);
}
}
Wird von IO#pwrite oder IO::Buffer#pwrite aufgerufen, um length Bytes von io an Offset from aus einem angegebenen buffer (siehe IO::Buffer) am gegebenen offset zu schreiben.
Diese Methode ist semantisch identisch mit io_write, ermöglicht aber die Angabe des Offsets, zu dem geschrieben werden soll, und ist oft besser für asynchrone IO-Vorgänge auf derselben Datei.
Die Methode sollte als *experimentell* betrachtet werden.
Source
VALUE
rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
{
if (!rb_respond_to(scheduler, id_io_read)) {
return RUBY_Qundef;
}
VALUE arguments[] = {
scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
};
if (rb_respond_to(scheduler, id_fiber_interrupt)) {
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_read, (VALUE)&arguments);
} else {
return fiber_scheduler_io_read((VALUE)&arguments);
}
}
Wird von IO#read oder IO#Buffer.read aufgerufen, um length Bytes von io in einen angegebenen buffer (siehe IO::Buffer) am gegebenen offset zu lesen.
Das Argument length ist die „minimale zu lesende Länge“. Wenn die Puffergröße von IO 8 KiB beträgt, aber length 1024 (1 KiB) ist, können bis zu 8 KiB gelesen werden, aber mindestens 1 KiB wird gelesen. Im Allgemeinen ist der einzige Fall, in dem weniger Daten als length gelesen werden, ein Fehler beim Lesen der Daten.
Die Angabe einer length von 0 ist gültig und bedeutet, dass versucht wird, mindestens einmal zu lesen und alle verfügbaren Daten zurückzugeben.
Die vorgeschlagene Implementierung sollte versuchen, aus io nicht-blockierend zu lesen und io_wait aufzurufen, wenn io nicht bereit ist (was die Kontrolle an andere Fasern übergibt).
Siehe IO::Buffer für eine Schnittstelle, die zur Rückgabe von Daten verfügbar ist.
Es wird erwartet, dass die Anzahl der gelesenen Bytes zurückgegeben wird oder im Fehlerfall -errno (negativer Wert entsprechend dem Systemfehlercode).
Die Methode sollte als *experimentell* betrachtet werden.
Source
VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout)
{
VALUE arguments[] = {
readables, writables, exceptables, timeout
};
return rb_fiber_scheduler_io_selectv(scheduler, 4, arguments);
}
Source
VALUE
rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
{
VALUE arguments[] = {
scheduler, io, events, timeout
};
if (rb_respond_to(scheduler, id_fiber_interrupt)) {
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (VALUE)&arguments);
} else {
return fiber_scheduler_io_wait((VALUE)&arguments);
}
}
Wird von IO#wait, IO#wait_readable, IO#wait_writable aufgerufen, um zu erfragen, ob der angegebene Deskriptor innerhalb des angegebenen timeout für die angegebenen Ereignisse bereit ist.
events ist eine Bitmaske von IO::READABLE, IO::WRITABLE und IO::PRIORITY.
Die vorgeschlagene Implementierung sollte registrieren, welche Fiber auf welche Ressourcen wartet, und sofort Fiber.yield aufrufen, um die Kontrolle an andere Fasern zu übergeben. Dann könnte der Scheduler in der Methode close alle I/O-Ressourcen an wartende Fasern verteilen.
Es wird erwartet, dass die Teilmenge der sofort verfügbaren Ereignisse zurückgegeben wird.
Source
VALUE
rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
{
if (!rb_respond_to(scheduler, id_io_write)) {
return RUBY_Qundef;
}
VALUE arguments[] = {
scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
};
if (rb_respond_to(scheduler, id_fiber_interrupt)) {
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_write, (VALUE)&arguments);
} else {
return fiber_scheduler_io_write((VALUE)&arguments);
}
}
Wird von IO#write oder IO::Buffer#write aufgerufen, um length Bytes aus einem angegebenen buffer (siehe IO::Buffer) am gegebenen offset in io zu schreiben.
Das Argument length ist die „minimale zu schreibende Länge“. Wenn die Puffergröße von IO 8 KiB beträgt, aber die angegebene length 1024 (1 KiB) ist, werden höchstens 8 KiB geschrieben, aber mindestens 1 KiB. Im Allgemeinen ist der einzige Fall, in dem weniger Daten als length geschrieben werden, ein Fehler beim Schreiben der Daten.
Die Angabe einer length von 0 ist gültig und bedeutet, dass versucht wird, mindestens einmal so viel Daten wie möglich zu schreiben.
Die vorgeschlagene Implementierung sollte versuchen, aus io nicht-blockierend zu schreiben und io_wait aufzurufen, wenn io nicht bereit ist (was die Kontrolle an andere Fasern übergibt).
Siehe IO::Buffer für eine Schnittstelle, die zur effizienten Abfrage von Daten aus dem Puffer verfügbar ist.
Es wird erwartet, dass die Anzahl der geschriebenen Bytes zurückgegeben wird oder im Fehlerfall -errno (negativer Wert entsprechend dem Systemfehlercode).
Die Methode sollte als *experimentell* betrachtet werden.
Source
VALUE
rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE timeout)
{
return rb_funcall(scheduler, id_kernel_sleep, 1, timeout);
}
Wird von Kernel#sleep und Thread::Mutex#sleep aufgerufen und soll eine nicht-blockierende Implementierung des Schlafs bereitstellen. Die Implementierung könnte die aktuelle Faser in einer Liste von „welche Faser wartet bis wann“ registrieren, Fiber.yield aufrufen, um die Kontrolle zu übergeben, und dann in close die Fasern fortsetzen, deren Wartezeit abgelaufen ist.
Source
VALUE
rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
{
VALUE arguments[] = {
PIDT2NUM(pid), RB_INT2NUM(flags)
};
return rb_check_funcall(scheduler, id_process_wait, 2, arguments);
}
Wird von Process::Status.wait aufgerufen, um auf einen bestimmten Prozess zu warten. Siehe die Beschreibung dieser Methode für die Argumentbeschreibungen.
Vorgeschlagene minimale Implementierung
Thread.new do Process::Status.wait(pid, flags) end.value
Dieser Hook ist optional: Wenn er im aktuellen Scheduler nicht vorhanden ist, verhält sich Process::Status.wait blockierend.
Es wird erwartet, dass eine Instanz von Process::Status zurückgegeben wird.
Source
VALUE
rb_fiber_scheduler_timeout_after(VALUE scheduler, VALUE timeout, VALUE exception, VALUE message)
{
VALUE arguments[] = {
timeout, exception, message
};
return rb_check_funcall(scheduler, id_timeout_after, 3, arguments);
}
Wird von Timeout.timeout aufgerufen, um den gegebenen block innerhalb der gegebenen duration auszuführen. Er kann auch direkt vom Scheduler oder vom Benutzercode aufgerufen werden.
Versucht, die Ausführungszeit eines gegebenen block auf die gegebene duration zu begrenzen, wenn möglich. Wenn eine nicht-blockierende Operation dazu führt, dass die Ausführungszeit des block die angegebene duration überschreitet, sollte diese nicht-blockierende Operation durch Auslösen der angegebenen exception_class, konstruiert mit den gegebenen exception_arguments, unterbrochen werden.
Allgemeine Ausführungs-Timeouts gelten oft als riskant. Diese Implementierung unterbricht nur nicht-blockierende Operationen. Dies geschieht absichtlich, da erwartet wird, dass nicht-blockierende Operationen aus einer Vielzahl unvorhersehbarer Gründe fehlschlagen können. Anwendungen sollten daher bereits robust im Umgang mit diesen Bedingungen und implizit mit Timeouts sein.
Als Ergebnis dieses Designs ist es jedoch unmöglich, den block zu unterbrechen, wenn er keine nicht-blockierenden Operationen aufruft. Wenn Sie vorhersagbare Punkte für Timeouts bereitstellen möchten, sollten Sie sleep(0) hinzufügen.
Wenn der Block erfolgreich ausgeführt wird, wird sein Ergebnis zurückgegeben.
Die Ausnahme wird typischerweise mit Fiber#raise ausgelöst.
Source
VALUE
rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
{
RUBY_ASSERT(rb_obj_is_fiber(fiber));
VALUE result;
enum ruby_tag_type state;
// `rb_fiber_scheduler_unblock` can be called from points where `errno` is expected to be preserved. Therefore, we should save and restore it. For example `io_binwrite` calls `rb_fiber_scheduler_unblock` and if `errno` is reset to 0 by user code, it will break the error handling in `io_write`.
//
// If we explicitly preserve `errno` in `io_binwrite` and other similar functions (e.g. by returning it), this code is no longer needed. I hope in the future we will be able to remove it.
int saved_errno = errno;
// We must prevent interrupts while invoking the unblock method, because otherwise fibers can be left permanently blocked if an interrupt occurs during the execution of user code. See also `rb_fiber_scheduler_fiber_interrupt`.
rb_execution_context_t *ec = GET_EC();
int saved_interrupt_mask = ec->interrupt_mask;
ec->interrupt_mask |= PENDING_INTERRUPT_MASK;
EC_PUSH_TAG(ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
result = rb_funcall(scheduler, id_unblock, 2, blocker, fiber);
}
EC_POP_TAG();
ec->interrupt_mask = saved_interrupt_mask;
if (state) {
EC_JUMP_TAG(ec, state);
}
RUBY_VM_CHECK_INTS(ec);
errno = saved_errno;
return result;
}
Wird aufgerufen, um eine Fiber zu wecken, die zuvor mit block blockiert wurde (z. B. ruft Thread::Mutex#lock block auf und Thread::Mutex#unlock ruft unblock auf). Der Scheduler sollte den Parameter fiber verwenden, um zu verstehen, welche Faser entblockiert wird.
blocker ist das, worauf gewartet wurde, aber es dient nur zur Information (für Debugging und Protokollierung) und es gibt keine Garantie, dass es mit dem blocker für block übereinstimmt.
Source
VALUE
rb_fiber_scheduler_yield(VALUE scheduler)
{
// First try to call the scheduler's yield method, if it exists:
VALUE result = rb_check_funcall(scheduler, id_yield, 0, NULL);
if (!UNDEF_P(result)) return result;
// Otherwise, we can emulate yield by sleeping for 0 seconds:
return rb_fiber_scheduler_kernel_sleep(scheduler, RB_INT2NUM(0));
}
Übergabe an den Scheduler, um im nächsten Planungszyklus fortgesetzt zu werden.