c't 15/2023
S. 140
Wissen
Asynchroner I/O
Bild: KI Midjourney | Bearbeitung: c’t

Pausenlos glücklich

Asynchrone I/O-Operationen in Python

Damit Software keine Zeit beim Warten darauf verplempert, dass eine I/O-Operation ein Ergebnis liefert, muss man Ein- und Ausgabe parallelisieren. Wir zeigen an der eigenen Umsetzung eines Hacking-Werkzeugs, wie das mit Python geht.

Von Oliver Lau

CPUs und I/O-Schnittstellen können noch so schnell sein: Wenn die Software nicht mitspielt, liegen teure Ressourcen brach. Abhilfe schaffen asynchrone I/O-Operationen. Damit muss ein Rechner nicht mehr tatenlos zusehen, bis er beispielsweise von einem Webdienst endlich die Antwort auf eine Anfrage erhält. Währenddessen kann er andere Dinge erledigen, etwa weitere Anfragen verschicken.

Ein Tool, das von asynchroner Verarbeitung profitieren würde, ist der bei Hackern beliebte, in C geschriebene Web-Content-Scanner Dirb (Dir Buster). Man lässt ihn auf einen Webserver los, um herauszufinden, auf welche URLs er antwortet. Dazu übergibt man Dirb Wortlisten, aus denen er URLs zusammenbaut und diese nacheinander anfragt.

Im Wort „nacheinander“ liegt die Krux, denn Dirb schickt die nächste Anfrage erst dann los, wenn die Antwort zur vorherigen eingetroffen ist. In der Umlaufzeit von der Anfrage zur Antwort (Round-Trip Time, RTT) dreht das Programm Däumchen – und mit ihm der Hacker. In Python formuliert sähe das in etwa wie folgt aus:

import urllib.request
def dirbust(urls: list):
  for url in urls:
    with urllib.request.urlopen(url) as response:
      print(response.getcode())

Die Funktion dirbust() ruft in einer Schleife eine URL nach der anderen auf, wartet jeweils auf die Antwort und gibt deren HTTP-Status-Code aus, zum Beispiel 404 für „Not found“ oder 200 für „OK“.

Bei einer Handvoll URLs stört die serielle Verarbeitung nicht, aber wenn die Umlaufzeiten bei nicht ungewöhnlichen 100 Millisekunden und mehr liegen, dann dauert das Abfragen von 10.000 URLs schon fast 17 Minuten. Diese Zeit kann man deutlich verkürzen, wenn das Programm weitere Abfragen losschickt, bis die Antwort auf die erste eintrifft. Kaum dass sie da ist, schickt es die nächste Abfrage raus und so weiter. Es gilt also, die RTT sinnvoll zu nutzen.

Jetzt könnte man wie die Entwickler der längst nicht mehr gepflegten Java-Version „Dirbuster“ auf die Idee kommen, einen Pool von Threads anzulegen, aus denen die Anfragen verschickt werden. Damit hat man zwar den I/O parallelisiert, muss sich aber abmühen, die Resultate aus den Threads einzusammeln und gegebenenfalls Zugriffe auf gemeinsam genutzte Variablen zu serialisieren. Viel einfacher geht es mit asynchronen Operationen, auch Coroutinen genannt.

Python-Programmierer finden die Unterstützung dafür in der Bibliothek asyncio, die es seit Python 3.7 gibt. Echte Parallelität (siehe Grafik links) ist damit leider nicht möglich, weil Python-Skripte grundsätzlich in einem Thread ablaufen. Das ist aber nicht weiter schlimm, denn die möglichen Performancesteigerungen sind auch ohne Multithreading beträchtlich: mit geringem Aufwand um den Faktor 10.

Nebenher

Um den Umgang mit asynchronen Aufrufen zu verstehen, legen Sie obiges Code-Beispiel erst einmal gedanklich beiseite und widmen sich folgendem Skript (1.py im Repository zu diesem Artikel, siehe ct.de/yc1f). Es ist der erste Schritt auf einen Holzweg, den Sie aber einmal gegangen sein sollten, um später immer sofort den richtigen Weg einschlagen zu können:

import time

def a():
  print("a()")
  time.sleep(2)
  print(f"a() {time.time()-t0:.1f}s")
    
def b():
  print("b()")
  time.sleep(1)
  print(f"b() {time.time()-t0:.1f}s")

t0 = time.time()
def main():
  a()
  b()

main()

Die Funktionen a() und b() sollen darin Abrufe von URLs repräsentieren, wobei time.sleep() Umlaufzeiten von 2 Sekunden und 1 Sekunde simuliert. Erwartungsgemäß gibt das Skript beim Aufruf von main() Folgendes aus:

a()
a() 2.0s
b()
b() 3.0s

Da das Ziel lautet, die Abrufe zu parallelisieren, müssen a() und b() asynchron ablaufen können. Ein async vor der Definition macht sie zu einer Coroutine:

async def a():
  # ...

Ruft man diese Funktion mit a() auf, führt Python sie nicht aus, sondern gibt ein Coroutinen-Objekt zurück. Um sie tatsächlich auszuführen, muss man in main() vor dem Aufruf await schreiben:

await a()

Damit ist erst mal nichts gewonnen, denn das await führt dazu, dass Python die Verarbeitung des Codes erst dann in der nächsten Zeile fortsetzt, wenn a() zurückgekehrt ist. Schlimmer noch: Python beschwert sich mit der Fehlermeldung „SyntaxError: 'await' outside async function“ und bricht ab. Aha, await darf man nur innerhalb einer async-Funktion nutzen, also muss man main() noch als solche deklarieren.

Und man braucht einen Mechanismus, der eine Coroutine sofort startet, ohne dass ein await davor steht. Das geht mithilfe eines sogenannten Tasks, den die Funktion create_task() aus einer Coroutine erzeugt:

async def main():
  task1 = asyncio.create_task(a())
  task2 = asyncio.create_task(b())
  await task1
  await task2
  
main()

create_task() ist vor allem dann nützlich, wenn man asynchrone Operationen in einer Schleife abfeuern will, zum Beispiel URL-Abrufe mit URLs aus einer Liste wie im Eingangsbeispiel. Wie das geht, dazu kommen wir gleich.

Da das Programm erst dann enden darf, wenn alle asynchronen Operationen abgeschlossen sind, muss es am Ende von main() auf die Beendigung der Tasks warten. Wenn Sie nun das Skript (2.py im Repository) starten, meckert Python leider abermals, diesmal mit „RuntimeWarning: coroutine 'main' was never awaited“. Oha, ein Skript muss also auf jeden Fall irgendwo mit await auf eine Coroutine warten.

Beißt sich die Katze da nicht in den Schwanz, wenn Funktionen, die eine async-Funktion awaiten, ebenfalls async sein müssen, aber main() wiederum awaitet werden muss? Genau. Es gibt nur einen Weg, dem Teufelskreis zu entkommen: Sie müssen die Coroutine main() der Ereignisverarbeitungsschleife (Event-Loop) von asyncio übergeben, und zwar mit

asyncio.run(main())

Diese Schleife können Sie sich wie eine Endlosschleife vorstellen, die

  1. eine Coroutine startet, wann immer der Programmfluss auf eine stößt,
  2. die Kontrolle von ihr übernimmt, wenn die Coroutine endet oder ihrerseits eine Coroutine startet,
  3. eine inaktive Coroutine aufweckt, wenn das Ergebnis vorliegt, auf das sie seit dem Starten einer asynchronen Operation mit await wartet,
  4. erst dann endet, wenn die letzte Coroutine endet.

Wenn Sie die überarbeitete Fassung des Skripts (3.py) ausprobieren, erscheint:

a()
a() 2.0s
b()
b() 3.0s

Aber Moment mal, wieso ist b() erst nach insgesamt drei Sekunden fertig, wo die Coroutine doch unmittelbar nach a() startet? Ganz einfach: weil der bisherige Code gegen Regel 2 der Event-Loop verstößt. Denn weder a() noch b() rufen eine asynchrone Funktion auf, wonach die Event-Loop wieder die Kontrolle über den Programmablauf übernehmen könnte, um die nächste Coroutine zu starten.

Der Trick ist, die sleep()-Funktion aus asyncio zu verwenden, also jeweils await asyncio.sleep(…) zu schreiben (siehe Skript 4.py). Nun gibt das Skript endlich Folgendes aus:

a()
b()
b() 1.0s
a() 2.0s

Allgemein gilt: Jede Funktion, die asynchron ausgeführt werden soll, darf zwar synchrone Funktionen benutzen, aber besser ist es, eventuell vorhandene asynchrone Versionen davon zu verwenden, weil nur diese die Kontrolle bei einem await an den Haupt-Thread zurückgeben. Das gilt nicht nur für den Abruf von URLs, sondern für alle I/O-Funktionen, so auch etwa für das Lesen und Schreiben von Dateien oder für Datenbankzugriffe. Das Abrufen von URLs mit urllib.request.urlopen() wird niemals asynchron funktionieren, weil diese Funktion nicht async ist.

Dirb-Klon

Asynchrone Funktionen zum Abrufen von URLs bieten die Bibliotheken aiohttp und tornado. Weil uns tornado beim Blick auf den im Internet verfügbaren Beispielcode subjektiv einen Hauch eleganter vorkam, ist unsere Wahl darauf gefallen.

Im Listing am Ende des Artikels sehen Sie den Umgang damit im Kontext einer rudimentären Dirb-Implementierung; das GitHub-Repository zu diesem Artikel hält neben dieser eine weitaus mächtigere Fassung bereit (siehe ct.de/yc1f). Die begleitende README-Datei erklärt, wie Sie das Skript in Betrieb nehmen und mit welchen Parametern Sie es aufrufen können.

Los gehts ganz unten in der Funktion main(), die als Coroutine in der Event-Loop landet. Der Aufruf zeigt, dass man einer Coroutine wie von normalen Funktionen gewohnt Parameter übergeben kann. Hier steht num_workers für die Anzahl von sogenannten Workern. Als Worker bezeichnet man asynchrone Funktionen, die lang andauernde oder CPU-intensive Aufgaben erledigen, ohne den Haupt-Thread oder die Event-Loop zu blockieren. Man sagt auch, dass ein Worker im Hintergrund arbeitet.

Die Methode run() der Klasse NanoDirb ruft die Worker ins Leben, indem sie so viele Tasks aus der asynchronen Funktion worker() erzeugt, wie num_workers vorgibt.

Außerdem schiebt run() alle ihr übergebenen URLs mit self.queue.put() in eine sogenannte Queue, die man sich wie ein langes Fließband vorstellen kann, auf dem sämtliche Arbeitsaufträge Platz finden. Eine asyncio.Queue ist eine FIFO-Warteschlange (first in, first out), die die Aufträge in der Reihenfolge ihres Eingangs herausgibt. Außerdem gibt es eine asyncio.LifoQueue, die genau andersherum funktioniert (last in …), sowie eine asyncio.PriorityQueue, bei der man jeden Auftrag mit einem Etikett versehen kann, das Aufschluss über die Dringlichkeit des Auftrags gibt. Die Prioritätswarteschlange gibt die Aufträge typischerweise nach sinkender Dringlichkeit heraus.

Liegen alle Aufträge auf dem Fließband, wartet self.queue.join() so lange, bis sie abgearbeitet wurden. Die Arbeit verrichten die Worker am Ende des Fließbands, die über den Aufruf von self.queue.get() die URLs eine nach der anderen vom Band herunternehmen und sie über tornado.AsyncHTTPClient().fetch() abrufen. Da diese Funktion asynchron ist, greift Event-Loop-Regel Nummer 2 und die Coroutine gibt die Kontrolle sofort nach dem Absetzen der Anfrage an die Event-Loop zurück. Wenn die Antwort vorliegt, setzt die Event-Loop nach Regel 3 die Ausführung am await fort, womit die Antwort in response landet.

Mit response muss nun irgendwas Sinnvolles geschehen. Idealerweise sammelt das Skript die Ergebnisse an der Stelle, an der es den gesamten Prozess in Gang gesetzt hat: in main(). In den Zeilen nach fetch() sehen Sie, wie eine asynchrone Funktion mit dem Haupt-Thread kommunizieren kann: über eine sogenannte Callback-Funktion. main() hat sie als Keyword-Argument an den Konstruktor (__init__()) der Klasse NanoDirb übergeben, womit sie dort in self.result_callback landet. Handelt es sich bei der Membervariablen um etwas Ausführbares (was callable() prüft), dann ruft der Worker das Callback mit einem Dictionary bestehend aus HTTP-Status-Code und angefragter URL auf. Die Werte landen in der Funktion result_hook() (siehe unten in main()), die sie auf dem Bildschirm ausgibt. Anstelle eines Callbacks hätte man die Antworten zum Beispiel auch in einer Liste innerhalb von NanoDirb speichern können, um sie am Ende abzuholen, wüsste dann aber nicht ohne Weiteres, wie die Verarbeitung voranschreitet.

Sollte bei der Abfrage etwas schiefgegangen sein, wirft der AsyncHTTPClient() einen HTTPClientError, womit der except-Block das Callback mit der URL und dem Fehlercode aufruft. In jedem Fall stellt die finally-Klausel sicher, dass queue.task_done() einen internen Zähler dekrementiert, der angibt, wie viele Aufträge sich in der Warteschlange befinden. Geht der Zähler auf null, kehrt das in run() aufgerufene queue.join() zurück und cancel() läutet für jeden Worker die Feierabendglocke.

Dann greift die zweite Ausnahmebehandlung (except asyncio.CancelledError) und alle Worker kehren zurück. Weil damit in Gestalt von main() die letzte Coroutine endet, greift Regel 4 und die Event-Loop endet – das Skript hat seine Arbeit getan.

Performance

Um die Hypothese zu überprüfen, dass asynchrone Verarbeitung in mehreren Workern die Abrufrate verbessert, haben wir das Skript mit einer unterschiedlichen Anzahl von Workern in drei verschiedenen Szenarien auf einen mit 1 Gbit/s angebundenen Webserver losgelassen: via DSL mit rund 100 Mbit/s im Down- und 40 Mbit/s im Upstream, von einem dicht an einem Internet-Backbone stehenden Rechner sowie im lokalen 1-Gigabit-Ethernet. Die Ergebnisse der Messungen sehen Sie im folgenden Diagramm.

Gut zu erkennen ist, dass das Skript umso schneller fertig ist, je höher die Bandbreite und je kürzer die Latenz. Ebenso gut sieht man, dass bei 10 Workern eine Sättigung in unseren Testläufen eintrat, selbst dann noch, wenn die Latenz wie im lokalen Netzwerk so niedrig ist, dass die serielle Abfrage nicht einmal mehr 8 Sekunden für alle 4403 URLs benötigt. Ergo: Noch mehr Worker können die Laufzeit nicht senken, also die Abfragerate nicht erhöhen. Darum ist die Standardeinstellung für die einzusetzenden Worker im Skript dirb.py gleich 10.

Fazit

Das Parallelisieren von I/O-Zugriffen mithilfe asynchroner Verarbeitung lohnt sich, und zwar unabhängig von Latenz und Bandbreite. Es gilt allerdings nicht das Motto „Viel hilft viel“. Wenn man asynchrone Verarbeitung einsetzt, um den Systemdurchsatz zu erhöhen, sollte man in Feldversuchen empirisch ermitteln, welcher Grad an Parallelisierung der Umgebung angemessen ist.

Für einen kommenden Artikel erproben wir, was die Parallelisierung mit Threads bewirkt, dann aber eher nicht mit Python, sondern aus Performancegründen mit C++. (ola@ct.de)

Quellcode bei GitHub: ct.de/yc1f

Kommentieren