From a67adad071b6a652296814c71cb3eedd15238c98 Mon Sep 17 00:00:00 2001 From: frytimo Date: Wed, 4 Mar 2026 10:50:54 -0400 Subject: [PATCH] Fix websocket message - Multiple Changes (#7768) * Fix add domain_uuid getter in subscriber and WebSocket message * Better error reporting and trapping * When a WebSocket connection is miscommunication, the WebSocket service will crash. This helps to prevent a crash by checking the status of the connection. --- .../classes/base_websocket_system_service.php | 8 +- .../resources/classes/subscriber.php | 11 +++ .../resources/classes/websocket_client.php | 31 ++++-- .../resources/classes/websocket_service.php | 97 ++++++++++++++++--- 4 files changed, 126 insertions(+), 21 deletions(-) diff --git a/core/websockets/resources/classes/base_websocket_system_service.php b/core/websockets/resources/classes/base_websocket_system_service.php index c8d79e0ce..d86a05b80 100644 --- a/core/websockets/resources/classes/base_websocket_system_service.php +++ b/core/websockets/resources/classes/base_websocket_system_service.php @@ -323,9 +323,13 @@ abstract class base_websocket_system_service extends service implements websocke // Read the JSON string $json_string = $this->ws_client->read(); - // Nothing to do + // Nothing to do - connection may have been closed by server if ($json_string === null) { - $this->warning('Message received from Websocket is empty'); + if (!$this->ws_client->is_connected()) { + $this->notice('Websocket connection closed by server, will reconnect'); + } else { + $this->warning('Message received from Websocket is empty'); + } return; } diff --git a/core/websockets/resources/classes/subscriber.php b/core/websockets/resources/classes/subscriber.php index 56653162d..d76955ab4 100644 --- a/core/websockets/resources/classes/subscriber.php +++ b/core/websockets/resources/classes/subscriber.php @@ -448,6 +448,17 @@ class subscriber { return $this->domain_name; } + /** + * Returns the domain UUID used. + *

Note:
+ * This value is not validated in the object and must be validated.

+ * + * @return string + */ + public function get_domain_uuid(): string { + return $this->domain_uuid; + } + /** * Returns the associated socket * diff --git a/core/websockets/resources/classes/websocket_client.php b/core/websockets/resources/classes/websocket_client.php index 22f878795..7bde39995 100644 --- a/core/websockets/resources/classes/websocket_client.php +++ b/core/websockets/resources/classes/websocket_client.php @@ -108,7 +108,7 @@ class websocket_client { // Put the blocking back to the previous state if (!$is_blocking) { - $this->disable_block(); + $this->unblock(); } } @@ -367,6 +367,20 @@ class websocket_client { // Handle control frames switch ($opcode) { + case 0x8: // CLOSE frame + // Extract close code if present + $close_code = 0; + if ($payload_len >= 2) { + $close_code = unpack('n', substr($payload, 0, 2))[1]; + } + echo "[INFO] Received CLOSE frame (code: $close_code), sending close response\n"; + // Send close frame back to complete the closing handshake + $this->send_control_frame(0x8, $payload_len >= 2 ? substr($payload, 0, 2) : ''); + // Close the underlying stream + if (is_resource($this->resource)) { + @fclose($this->resource); + } + return null; case 0x9: // PING // Respond with PONG using same payload $this->send_control_frame(0xA, $payload); @@ -376,6 +390,7 @@ class websocket_client { echo "[INFO] Received PONG\n"; break 2; case 0x1: // TEXT frame + case 0x2: // BINARY frame case 0x0: // Continuation frame $payload_data .= $payload; break; @@ -385,9 +400,11 @@ class websocket_client { } } - $meta = stream_get_meta_data($this->resource); - if ($meta['unread_bytes'] > 0) { - echo "[WARNING] {$meta['unread_bytes']} bytes left in socket after read\n"; + if (is_resource($this->resource)) { + $meta = stream_get_meta_data($this->resource); + if ($meta['unread_bytes'] > 0) { + // Data remains in the PHP buffer and will be consumed on the next read + } } return $payload_data; @@ -413,10 +430,12 @@ class websocket_client { $read_size = min($max_chunk_size, $remaining); // Read maximum chunk size or what is remaining - $chunk = fread($this->resource, $read_size); + if (!is_resource($this->resource)) { + return null; + } + $chunk = @fread($this->resource, $read_size); if ($chunk === false) { - echo "[ERROR] fread() failed to read stream\n"; return null; } diff --git a/core/websockets/resources/classes/websocket_service.php b/core/websockets/resources/classes/websocket_service.php index 5931351c5..273ebdff1 100644 --- a/core/websockets/resources/classes/websocket_service.php +++ b/core/websockets/resources/classes/websocket_service.php @@ -511,6 +511,9 @@ class websocket_service extends service { //attach the domain name $message->domain_name($subscriber->get_domain_name()); + //attach the domain uuid + $message->domain_uuid($subscriber->get_domain_uuid()); + //attach the client id so we can track the request $message->resource_id = $subscriber->id; @@ -556,7 +559,13 @@ class websocket_service extends service { // // Merge all sockets to a single array // - $read = array_merge([$this->server_socket], $this->clients); + $this->update_connected_clients(); + $read = [$this->server_socket]; + foreach ($this->clients as $client) { + if (is_resource($client) && !feof($client)) { + $read[] = $client; + } + } $write = $except = []; //$this->debug("Waiting on event. Connected Clients: (".count($this->clients).")", LOG_DEBUG); @@ -872,13 +881,27 @@ class websocket_service extends service { * @return string */ private function read_bytes($socket, int $length): string { + if ($length <= 0 || !is_resource($socket)) { + return ''; + } $data = ''; - while (strlen($data) < $length && is_resource($socket)) { - $chunk = fread($socket, $length - strlen($data)); - if ($chunk === false || $chunk === '' || !is_resource($socket)) { - //$this->disconnect_client($socket); + $retries = 0; + $max_retries = 20; + while (strlen($data) < $length && is_resource($socket) && !feof($socket)) { + $remaining = $length - strlen($data); + $chunk = @fread($socket, min($remaining, 8192)); + if ($chunk === false) { return ''; } + if ($chunk === '') { + $retries++; + if ($retries >= $max_retries) { + return ''; + } + usleep(5000); + continue; + } + $retries = 0; $data .= $chunk; } return $data; @@ -891,14 +914,20 @@ class websocket_service extends service { * * @return string */ + // Maximum allowed payload size per frame (16 MB) + const MAX_FRAME_PAYLOAD = 16 * 1024 * 1024; + private function receive_frame($socket): string { // Read first two header bytes $hdr = $this->read_bytes($socket, 2); // Ensure we have the correct number of bytes if (strlen($hdr) !== 2) { - $this->warning('Header is empty!'); - $this->debug('Header content: ' . bin2hex($hdr) . '(' . strlen($hdr) . ' bytes)'); - $this->update_connected_clients(); + if (!is_resource($socket) || feof($socket)) { + $this->disconnect_client($socket); + } else { + $this->warning('Header is empty!'); + $this->update_connected_clients(); + } return ''; } $bytes = unpack('Cfirst/Csecond', $hdr); @@ -924,14 +953,56 @@ class websocket_service extends service { $length = $arr[1]; } - // Read mask key if client→server frame + // Handle control frames before reading payload + switch ($opcode) { + case 0x8: // CLOSE frame + // Read and discard close payload (status code + reason) + if ($masked) $this->read_bytes($socket, 4); // mask key + if ($length > 0) $this->read_bytes($socket, min($length, 125)); + // Send close response and disconnect + @fwrite($socket, "\x88\x00"); + $this->disconnect_client($socket); + return ''; + case 0x9: // PING frame + $maskKey = $masked ? $this->read_bytes($socket, 4) : ''; + $ping_data = $length > 0 ? $this->read_bytes($socket, min($length, 125)) : ''; + if ($masked && strlen($maskKey) === 4 && $ping_data !== '') { + $unmasked = ''; + for ($i = 0; $i < strlen($ping_data); $i++) { + $unmasked .= $ping_data[$i] ^ $maskKey[$i % 4]; + } + $ping_data = $unmasked; + } + // Respond with PONG + $pong_len = strlen($ping_data); + @fwrite($socket, chr(0x8A) . chr($pong_len) . $ping_data); + return ''; + case 0xA: // PONG frame + // Consume and discard pong payload + if ($masked) $this->read_bytes($socket, 4); + if ($length > 0) $this->read_bytes($socket, min($length, 125)); + return ''; + } + + // Sanity check: reject frames with absurdly large payloads + if ($length > self::MAX_FRAME_PAYLOAD) { + $this->error("Frame payload too large ({$length} bytes), disconnecting client"); + @fwrite($socket, "\x88\x00"); + $this->disconnect_client($socket); + return ''; + } + + // Read mask key if client->server frame $maskKey = $masked ? $this->read_bytes($socket, 4) : ''; // Read payload data - $data = $this->read_bytes($socket, $length); + $data = $length > 0 ? $this->read_bytes($socket, $length) : ''; - if (empty($data)) { - $this->warning("Received empty frame (ID# $socket)"); + if ($data === '' && $length > 0) { + // Client likely disconnected mid-frame + if (!is_resource($socket) || feof($socket)) { + $this->disconnect_client($socket); + } return ''; } @@ -941,7 +1012,7 @@ class websocket_service extends service { if (strlen($maskKey) < 4) return ''; $unmasked = ''; - for ($i = 0; $i < $length; $i++) { + for ($i = 0; $i < strlen($data); $i++) { $unmasked .= $data[$i] ^ $maskKey[$i % 4]; } $data = $unmasked;