lars
webmobiledatenbankendevopsarchitektur
hello (at) larskoelpin.de

Skalierbare und Fehlertolerante Presence-Systeme

February 26, 2023
architecturedatenbankenweb

Fast bei allen kollaborativen Echtzeit-Applikationen kann der Nutzer sehen, wer grade aktiv mitarbeitet — auch bekannt als Presence-Systeme. Obwohl die meisten Bibliotheken wie socket.io vermeindlich simple Lösungen anbieten, sind diese in skalierbaren, verteilten Systemen häufig nicht ausreichend. Doch welche Schwierigkeiten sind zu bewältigen, und wie sind diese zu lösen?

Presence Systeme verwalten den Online-Status und das Verteilen von Nachrichten der Nutzer für bestimmte Anwendungsfälle. Diese Anwendungsfälle sind sicher bekannt aus Applikationen wie dem Facebook Messenger oder dem kollaborativen Arbeiten mit Google Docs — Eben dort wo verschiedene Nutzer in Echtzeit miteinander kommunizieren. Verschiedene Nutzer arbeiten hier beispielsweise gemeinsam an einem Dokument. Dazu wird angezeigt, welche Nutzer grade aktiv mitarbeiten (Presence) und dessen Änderungen am Text synchronisiert (das Austauschen von Nachrichten).

Reden wir über das Skalieren dieser Systeme ist das Erhöhen der maximalen Anzahl möglicher technischer Verbindungen, das effiziente Versenden von Nachrichten dieser untereinander und das Erhöhen der Fehlertoleranz dieser Systeme (Ausfallsicherheit) gemeint. Was relativ simpel klingt, ist verteilt — horizontal skalierbar — gedacht nicht ganz trivial.

Doch von vorne: Angenommen ein simples Presence-System besteht aus vielen Web-Clients und einem einzigen Server (dieser wird ggf. Vertikal, d.h. starker Hardware, skaliert).

Bild 1: Simple Presence System
Bild 1: Simple Presence System

Bei diesem System gibt es viele (Web)-Clients und — wichtiger — einen einzigen Server. Dieser Server hat 3 Aufgaben:

  • Verwalten des fachlichen Zustandes (Wer ist wo angemeldet) — abgebildet durch “angemeldete User[]”
  • Halten der technischen Verbindungen (Wem kann ich eine Nachricht senden) — abgebildet durch die TCP-Verbindungen zwischen Server und Client.
  • Austauschen von Nachrichten zwischen den Clients

Der Zustand ist im Falle eines kollaborativen Dokumentes beispielsweise die Menge aller aktiven Nutzer an diesem Dokument. Das Konzept, das Nutzer zusammenkommen lässt (Dokumente, Gruppen usw.) nenne ich im Folgenden Räume. Das Halten der technischen Verbindung passiert in der Regel über TCP, z.B. über WebSockets. Durch diese Verbindung können Nachrichten an den Client gesendet, und zwischen den Clients ausgetauscht, werden.

Wenn wir über skalierbare, fehlertolerante Systeme reden, ist ein einzelner Server nicht ausreichend. Skalierbare Systeme zeichnen sich dadurch aus, dass beliebig Server hinzugefügt- und weggenommen werden können. Das System funktioniert trotz dieser einzelnen Ereignisse.

Werden bei der simplen Implementierung Server hinzugefügt ergibt sich dann ein solches System:

Bild 2: Scaling up
Bild 2: Scaling up

Das Problem ist jetzt also ein Zustandsproblem. Während einige Web-Clients mit dem Server A kommunizieren, ist es möglich, dass ein anderer Web-Client zum Server C sendet. Obwohl sich diese Web-Clients im selben Raum befinden (d.h. Zustand vom Raum abfragen wollen) ist es nicht möglich — Server A kennt Server C und dessen Raum-Zustand nicht.

Eine Möglichkeit wäre dafür zu sorgen, dass Web-Clients die zu einem bestimmten Raum gehören, alle mit demselben Server sprechen. Angenommen alle Räume hätten eine bestimmte alpha-numerische ID. Dann würden alle Clients aus Raum A-H immer mit Server A kommunizieren, alle Clients aus Raum H-P kommunizieren mit Server B usw. Doch was passiert, wenn der Server A nicht mehr erreichbar ist? Alle Nutzer die diesem Server zugewiesen wurden von der Applikation ausgeschlossen: Verbinden sich diese nach einem Neuverbinden mit einem anderen Server, so ist der Raum verloren — oder ein Neuverbinden ist nicht möglich (die Daten sind ja nicht erreichbar). Wir müssten also in diesem Falle selbst Redundanzen schaffen, sodass ein Server-Ausfall verkraftet wird.

Was passiert, wenn ein Raum unendlich groß ist und ein anderer Raum aus nur 2 Nutzern besteht? In diesem Falle ist der Server A überlastet und Server B erhält beinahe keine Anfragen. Wir müssen Server A wieder vertikal skalieren — und vertikales Skalieren ist begrenzt.

Glücklicherweise gibt es dazu bereits eine Lösung — zustandsvolle Systeme: Datenbanken. Datenbanken sind genau auf diese Anwendungsfälle, wie das Replizieren, Aufteilen und das Verwalten von Daten, spezialisiert. Ein System, das besonders für den Fall eines “Presence”-Systems eignet ist Redis. Redis ist eine In-Memory Datenbank, die schnelle Zugriffe für flüchtige Daten anbietet. Sie bietet Untersützung für sharding und asynchrone Replikation — Aufteilen und Kopieren von Daten auf verschiedene Knoten. Außerdem bietet Redis ein PubSub-System an, um Daten zwischen Applikationen auszutauschen.

Angewandt auf den Anwendungsfall, dass viele verschiedene Server auf die Daten zugreifen wollen heißt das, dass Räume nicht auf den einzelnen Servern, sondern im Redis Cluster vorgehalten werden. Der Redis-Cluster bietet die Anfrage GET Raum X / SET Raum X — wie dieser intern verwaltet wird, ist aus architektonischer Sicht irrelevant. Im Folgenden nehme ich also an, dass das Redis-Cluster entsprechend skaliert werden kann.

Bild 3: Redis
Bild 3: Redis

Egal mit welchem Server ein Web-Client kommuniziert, er leitet alle Anfragen auf die gemeinsame Wahrheit — das Redis Cluster — weiter. Somit sind die Server zustandslos und lassen sich skalieren. Die Skalierungstechnik von zuvor, das Aufteilen der Daten auf verschiedene Rechner — genannt Sharding — übernimmt die Implementierung vom Redis Cluster.

Durch das Einsetzen einer speziellen Datenbank kann die Datenhaltung also aus der Skalierung-gleichung der Anwendungsserver genommen werden. Gewonnen haben wir dadurch, dass die Anwendungsserver nun keine Zustandshaltung mehr machen müssen. Es muss also kein Replikations- Sharding- oder Clustering-Mechanismus entworfen werden. Aus Sicht des Entwicklers sind die Anwendungsserver also “Wegwerfware” (im Kubernetes-Slang: “Cattle”) — Naja noch nicht ganz.

Lautzeitbetrachtung und Implementierung

Zwar können jetzt beliebig Anwendungsserver zu- und abgeschaltet werden — der geschriebene und gelesen Zustand lässt sich auf dem Datenbank-Cluster wiederfinden. Es muss denoch auch die Laufzeit betrachtet werden. Also: Wie werden die Nachrichten an alle Clients gesendet? Welcher Zustand wird wann geschrieben?

Das inkludiert die hauptsächlichen Anwendungsfälle:

  • Ein Server (bzw initial der Client) sendet eine Nachricht an alle Nutzer in einem Raum
  • Ein Server (bzw initial der Client) baut eine Verbindung auf (Nutzer zum Raum hinzufügen)
  • Ein Server (bzw initial der Client) baut eine Verbindung ab (Nutzer aus dem Raum entfernen)

Senden von Nachrichten

Zur Laufzeit ist das Austauschen von Nachrichten die wichtigste Funktion. Statisch existieren dazu viele Server. Diese sind in der Abbildung abgebildet Server A, Server B und Server C (D, E… sind nicht abgebildet). Dazu existieren verschiedene Web-Clients, die sich zu beliebigen Servern verbinden können sollen. Zentral in der Architektur ist die Datenbank, die als Datespeicher (State) und als PubSub-System (Kommunikations-Channel) zwischen den Servern (Redis/PubSub) agiert. In der Applikation sind beispielhaft die Web-Applikation von bob und alice abgebildet.

Bild 4: Laufzeitbetrachtung
Bild 4: Laufzeitbetrachtung

Beim System-Start registrieren sich zunächst alle Server mit dem PubSub-System.

Im Schritt (1) und (2) Verbinden sich jeweils bob und alice zu Server A und Server B. Wichtig: Beide möchten über den Raum 4 kommunizieren.

Es soll nun eine Nachricht von bob zu alice im Raum 4 ausgetauscht werden (z.B: das Hinzufügen eines Satzes “hello” in einem Dokument). In (3) wird diese Nachricht deshalb an den Server B übermittelt.

Da der Server A keinen Kommunikationsweg zum Server B kennt, delegiert dieser in (4) die Nachricht an das eingeführte PubSub-System (im Beispiel also Redis).

Dadurch, dass sich alle Server beim System-Start mit dem PubSub-System registriert haben, erhalten jetzt also alle Server durch einen Broadcast Mechanismus die entsprechende Nachricht ((5) und (6)).

In (5) erhält ein Server die Nachricht, dass eine Kommunikation im Raum 4 stattgefunden hat. Da der Server keine verbundenen Clients vorhält, wird die Behandlung der Nachricht verworfen.

Anders sieht es in (6) aus. Der Server B enthält verbundene Clients im Raum 4 vor. Dieser sendet die Nachricht also an alle Clients in diesem Raum, also auch an alice.

Dieses neue System hat natürlich ein neues Problem: statt dass die einzelnen Server Räume verwalten, gehen jetzt alle Nachrichten aller Räume durch das Pub/Sub-System — Der Throughput muss dort also entsprechend Skaliert werden. Auch werden, wie im Fall (5) leere Nachrichten gesendet. Um das Problem zu minimieren ist es beispielsweise denkbar, dass das Redis PubSub-System die Nachricht nur an jene Server sendet, die aktive Clients für einen bestimmten Raum vorhalten — also das Einführen eines “Sharding”-Keys.

Die Kommunikation zwischen den Servern findet über ein Pub/Sub-System statt. Dieses System übernimmt die Kommunikation aller Server und damit aller Clients. Das sorgt dafür, dass das Kommunikationssystem (PubSub) von der Business-Logik (Applikation Server) und dem State (Datenbank) getrennt wird und diese Systeme damit individuell (z.B. Redis) skaliert werden können.

Verhalten bei Verbindungsabbruch

Zu einem Presence-System gehört neben der Kommunikation allerdings auch das Verwalten des Zustandes. Welcher Nutzer ist in welchem Raum? Kann ein Nutzer über mehrere Verbindungen online sein (z.B. auf verschiedenen Geräten)? — Eben Daten, die an den Anwendungsfall der bestimmten Applikation gekoppelt sind.

Diese Daten müssen regelmäßig mit dem Verbindungsmanagement synchronisiert werden. Das passiert in der Regel durch die Ereignisse “onConnect” und “onDisconnect”.

  • onConnect: Nutzer einem Raum hinzufügen
  • onDisconnect: Nutzer einem Raum löschen

Diese werden ausgeführt, wenn ein Client eine neue Verbindung mit einem Server hergestellt hat, bzw. wenn dieser die Verbindung verliert. Diese Art der Synchronisation ist leider problematisch, wenn man Failure-Szenarien annimmt. Ein Beispiel:

  • User Bob meldet sich am Server A mit seinem mobilen Endgerät an. Der Server sendet der Datenbank “Bob Online”.
  • User Bob meldet sich außerdem am Server B mit einem Web-Client an — er gilt also als Online im Web und Mobil.

In der Anwendung-Datenbank wird also festgehalten: Bob ist auf zwei Geräten Online. Jetzt passiert etwas: Server A wird während des Betriebs vom Netz getrennt. Server A hatte also keine Chance eine “disconnect”-Aktion für alle seine Clients zu feuern. Diese Aktionen wäre allerdings dafür verantwortlich gewesen das mobile Endgerät abzumelden (bzw. den Online-Zustand der Nutzer aus der Datenbank zu aktualisieren) — Alle Clients die vom Server A verwaltet werden bleiben werden deshalb auf ewig als online angezeigt.

Die Lösung für ein solches Problem wäre das Ersetzen (oder Ergänzen) der “onConnect” und “onDisconnect” Implementierung durch eine Heartbeat-Timeout-Kombination.

Bei einer Heartbeat Kombination sind die Verbindung-Ereignisse irrelevant. Vielmehr muss jeder Client regelmäßig beweisen, dass er noch im Online-Status ist. Dazu sendet der Client in regelmäßigen Abständen (z.B. alle 5 Sekunden) einen PING.

Bild 5: Heartbeat/Timeout-Konbination
Bild 5: Heartbeat/Timeout-Konbination

Dieser Ping instruiert den Server den Last-Seen-Online-Status des Clients in der Datenbank zu aktualisieren. In der Datenbank selbst wird der aktualisierte Zeitstempel vorgehalten. Um ein Online-Offline flickern zu vermeiden, ergibt es Sinn den Timeout-Parameter größer zu halten, als das “Repeat”-Interval. Dadurch hat der Client die Möglichkeit bei einem Netzwerk-Schluckauf eine Ping-Nachricht zu verzögern.

Ist der Abstand zwischen den Ping-Nachrichten zu groß, muss der Client als Offline markiert werden — Der Timeout Mechanismus muss aktiviert werden. Nutzt man eine Datenbank wie beispielsweise Redis, kann der EXPIRE (https://redis.io/commands/expire/) Command helfen. Dieser ist ein eingebauter Timeout für Datensätze in der Datenbank selbst.

Eine Alternative kann es sein die Aktualisierungs-Routine an eine andere Ping Nachricht zu koppeln. Also immer, wenn eine Ping Nachricht für einen Client in einem Raum erscheint, alle weiteren Zeitstempel zu invalidieren.

Fazit

Presence-Systeme sind auf den ersten Blick einfach zu implementieren, schließlich bieten Bibliotheken wie socket.io bereits Konzepte für Räume und entsprechende Listener für Verbindungs-Ereignisse an. Sobald es ums Skalieren geht (Skalieren von Verbindungen und Einführen von Fehler-Toleranzen), ergeben sich einige Herausforderungen.

Viele Herausforderungen in der Datenhaltung können bereits vom Datenbanksystem gelöst werden. Diese kann als Pub/Sub-System genutzt werden, um ein horizontales Skalieren von Servern für den Nachrichten-Austausch zu ermöglichen. Diese lösen allerdings nicht die Dateninkonsistenzen (Mismatch zwischen Verbindung und Online-Zustand) die von der Applikation aus auftreten können, wenn Server unerwartet ausfallen.

Eine Lösung dafür sind Konzepte, die bei verteilten Systemen häufig zum Einsatz kommen: Heartbeats und die Zeit. Mit Heartbeats melden sich Nutzer in regelmäßigen Abständen am Server an, um zu signalisieren, dass sie noch aktiv sind. Das impliziert zwar eine minimale Verzögerung, sorgt im Gegenzug aber dafür, dass Nutzer bei Server-Ausfällen korrekt abgemeldet werden.