From dace58370ffc46f38ad082e9936d42159f93ce24 Mon Sep 17 00:00:00 2001 From: frytimo Date: Mon, 13 Apr 2026 14:20:43 -0300 Subject: [PATCH] Fix listeners array when socket disconnects (#7841) When the connection is registered as a listener but has not yet communicated, the class failed to remove the listener in the array. --- .../resources/classes/websocket_server.php | 151 ++++++++++++++++-- 1 file changed, 136 insertions(+), 15 deletions(-) diff --git a/core/websockets/resources/classes/websocket_server.php b/core/websockets/resources/classes/websocket_server.php index 8a79418c1..22333dccd 100644 --- a/core/websockets/resources/classes/websocket_server.php +++ b/core/websockets/resources/classes/websocket_server.php @@ -131,6 +131,26 @@ class websocket_server { self::log($message, LOG_DEBUG); } + /** + * Writes a normalized log entry for this class. + * + * @param string $message + * @param int $priority + * + * @return void + */ + private static function log(string $message, int $priority = LOG_INFO): void { + $level = match ($priority) { + LOG_DEBUG => 'DEBUG', + LOG_WARNING => 'WARNING', + LOG_ERR => 'ERROR', + LOG_INFO => 'INFO', + default => 'LOG', + }; + + error_log('[websocket_server][' . $level . '] ' . $message); + } + /** * Log a warning message to the log file. * @@ -181,14 +201,33 @@ class websocket_server { $this->running = true; while ($this->running) { - $listeners = array_column($this->listeners, 0); - $read = array_merge([$this->server_socket], $listeners, $this->clients); - $write = $except = []; - // Server connection issue - if (false === stream_select($read, $write, $except, null)) { + if (!is_resource($this->server_socket)) { + $this->warn('Server socket is not a valid stream resource; stopping server loop.'); $this->running = false; break; } + + // Prune stale/closed sockets before passing anything to stream_select. + $this->listeners = array_values(array_filter($this->listeners, static function ($listener): bool { + return is_array($listener) + && isset($listener[0], $listener[1]) + && is_resource($listener[0]) + && is_callable($listener[1]); + })); + $this->clients = array_values(array_filter($this->clients, static fn($client): bool => is_resource($client))); + + $listeners = array_column($this->listeners, 0); + $read = array_values(array_filter(array_merge([$this->server_socket], $listeners, $this->clients), 'is_resource')); + $write = $except = []; + if (empty($read)) { + $this->warn('No valid sockets available for stream_select; continuing loop.'); + continue; + } + // Server connection issue + if (false === @stream_select($read, $write, $except, null)) { + $this->warn('stream_select failed; continuing after pruning invalid sockets.'); + continue; + } // new connection if (in_array($this->server_socket, $read, true)) { $conn = @stream_socket_accept($this->server_socket, 0); @@ -212,15 +251,27 @@ class websocket_server { if (in_array($client_socket, $listeners, true)) { // Process external listeners $index = array_search($client_socket, $listeners, true); + if ($index === false || !isset($this->listeners[$index][1]) || !is_callable($this->listeners[$index][1])) { + $this->warn('Listener callback missing or invalid; disconnecting listener socket.'); + $this->disconnect_listener($client_socket); + continue; + } try { //send the switch event to the registered callback function call_user_func($this->listeners[$index][1], $client_socket); } catch (\socket_disconnected_exception $s) { $this->info("[INFO] Removed client $s->id from list"); $success = $this->disconnect_client($client_socket); - // By attaching the socket_disconnect error message to \socket_exception we can see where something went wrong - if (!$success) - throw new socket_exception('Socket does not exist in tracking array', 256, $s); + if (!$success) { + $success = $this->disconnect_listener($client_socket); + } + if (!$success) { + $this->warn('Socket does not exist in tracking arrays; closing resource and triggering disconnect callbacks'); + if (is_resource($client_socket)) { + @fclose($client_socket); + } + $this->trigger_disconnect($client_socket); + } } continue; } @@ -380,6 +431,31 @@ class websocket_server { return false; } + /** + * Disconnect a non-websocket listener and invoke disconnect callbacks. + * + * @param resource $resource + * + * @return bool True if listener was tracked and disconnected. + */ + protected function disconnect_listener($resource): bool { + foreach ($this->listeners as $index => $listener) { + if (($listener[0] ?? null) !== $resource) { + continue; + } + + if (is_resource($resource)) { + @fclose($resource); + } + + unset($this->listeners[$index]); + $this->trigger_disconnect($resource); + return true; + } + + return false; + } + /** * Sends a disconnect frame with no payload * @@ -428,21 +504,50 @@ class websocket_server { * @param resource $socket * @param int $length * - * @return string + * @return string|null */ - private function read_bytes($socket, int $length): string { + private function read_bytes($socket, int $length): ?string { $data = ''; while (strlen($data) < $length && is_resource($socket)) { $chunk = fread($socket, $length - strlen($data)); if ($chunk === false || $chunk === '' || !is_resource($socket)) { $this->disconnect_client($socket); - return ''; + return null; } $data .= $chunk; } return $data; } + /** + * Sends a FIN control frame on a specific client socket. + * + * @param resource $socket + * @param int $opcode + * @param string $payload + * + * @return void + */ + private function send_control_frame($socket, int $opcode, string $payload = ''): void { + if (!is_resource($socket)) { + return; + } + + $header = chr(0x80 | $opcode); + $payload_len = strlen($payload); + + if ($payload_len <= 125) { + $header .= chr($payload_len); + } elseif ($payload_len <= 65535) { + $header .= chr(126) . pack('n', $payload_len); + } else { + $payload = substr($payload, 0, 125); + $header .= chr(125); + } + + @fwrite($socket, $header . $payload); + } + /** * Reads a web socket data frame and converts it to a regular string * @@ -460,7 +565,7 @@ class websocket_server { while (!$final_frame) { $header = $this->read_bytes($socket, 2); - if ($header === null) + if ($header === null || strlen($header) !== 2) return null; $byte1 = ord($header[0]); @@ -515,9 +620,9 @@ class websocket_server { switch ($opcode) { case 0x9: // PING // Respond with PONG using same payload - $this->send_control_frame(0xA, $payload); + $this->send_control_frame($socket, 0xA, $payload); $this->info("Received PING, sent PONG"); - continue; // Skip returning PING + continue 2; // Skip returning PING case 0x8: // CLOSE frame $this->info("Received CLOSE frame, connection will be closed."); $this->disconnect_client($socket); @@ -526,7 +631,7 @@ class websocket_server { $this->info("Received PONG"); $reason = $this->read_bytes($socket, 2); $this->info("Reason: $reason"); - continue; // Skip returning PONG + continue 2; // Skip returning PONG case 0x1: // TEXT frame case 0x0: // Continuation frame $payload_data .= $payload; @@ -545,6 +650,22 @@ class websocket_server { return $payload_data; } + public function broadcast(string $payload, $exclude_socket = null): void { + foreach ($this->clients as $client) { + if (!is_resource($client)) { + continue; + } + if ($exclude_socket !== null && $client === $exclude_socket) { + continue; + } + try { + self::send($client, $payload); + } catch (\Throwable $e) { + $this->disconnect_client($client); + } + } + } + /** * Send text frame to client. If the socket connection is not a valid resource, the send * method will fail silently and return false.