Fix websocket message - Multiple Changes (#7768)

* Fix add domain_uuid getter in subscriber and WebSocket message

* Better error reporting and trapping

* When a WebSocket connection is miscommunication, the WebSocket service will crash. This helps to prevent a crash by checking the status of the connection.
This commit is contained in:
frytimo
2026-03-04 10:50:54 -04:00
committed by GitHub
parent 34c6330f41
commit a67adad071
4 changed files with 126 additions and 21 deletions
@@ -323,9 +323,13 @@ abstract class base_websocket_system_service extends service implements websocke
// Read the JSON string // Read the JSON string
$json_string = $this->ws_client->read(); $json_string = $this->ws_client->read();
// Nothing to do // Nothing to do - connection may have been closed by server
if ($json_string === null) { if ($json_string === null) {
$this->warning('Message received from Websocket is empty'); if (!$this->ws_client->is_connected()) {
$this->notice('Websocket connection closed by server, will reconnect');
} else {
$this->warning('Message received from Websocket is empty');
}
return; return;
} }
@@ -448,6 +448,17 @@ class subscriber {
return $this->domain_name; return $this->domain_name;
} }
/**
* Returns the domain UUID used.
* <p>Note:<br>
* This value is not validated in the object and must be validated.</p>
*
* @return string
*/
public function get_domain_uuid(): string {
return $this->domain_uuid;
}
/** /**
* Returns the associated socket * Returns the associated socket
* *
@@ -108,7 +108,7 @@ class websocket_client {
// Put the blocking back to the previous state // Put the blocking back to the previous state
if (!$is_blocking) { if (!$is_blocking) {
$this->disable_block(); $this->unblock();
} }
} }
@@ -367,6 +367,20 @@ class websocket_client {
// Handle control frames // Handle control frames
switch ($opcode) { switch ($opcode) {
case 0x8: // CLOSE frame
// Extract close code if present
$close_code = 0;
if ($payload_len >= 2) {
$close_code = unpack('n', substr($payload, 0, 2))[1];
}
echo "[INFO] Received CLOSE frame (code: $close_code), sending close response\n";
// Send close frame back to complete the closing handshake
$this->send_control_frame(0x8, $payload_len >= 2 ? substr($payload, 0, 2) : '');
// Close the underlying stream
if (is_resource($this->resource)) {
@fclose($this->resource);
}
return null;
case 0x9: // PING case 0x9: // PING
// Respond with PONG using same payload // Respond with PONG using same payload
$this->send_control_frame(0xA, $payload); $this->send_control_frame(0xA, $payload);
@@ -376,6 +390,7 @@ class websocket_client {
echo "[INFO] Received PONG\n"; echo "[INFO] Received PONG\n";
break 2; break 2;
case 0x1: // TEXT frame case 0x1: // TEXT frame
case 0x2: // BINARY frame
case 0x0: // Continuation frame case 0x0: // Continuation frame
$payload_data .= $payload; $payload_data .= $payload;
break; break;
@@ -385,9 +400,11 @@ class websocket_client {
} }
} }
$meta = stream_get_meta_data($this->resource); if (is_resource($this->resource)) {
if ($meta['unread_bytes'] > 0) { $meta = stream_get_meta_data($this->resource);
echo "[WARNING] {$meta['unread_bytes']} bytes left in socket after read\n"; if ($meta['unread_bytes'] > 0) {
// Data remains in the PHP buffer and will be consumed on the next read
}
} }
return $payload_data; return $payload_data;
@@ -413,10 +430,12 @@ class websocket_client {
$read_size = min($max_chunk_size, $remaining); $read_size = min($max_chunk_size, $remaining);
// Read maximum chunk size or what is remaining // Read maximum chunk size or what is remaining
$chunk = fread($this->resource, $read_size); if (!is_resource($this->resource)) {
return null;
}
$chunk = @fread($this->resource, $read_size);
if ($chunk === false) { if ($chunk === false) {
echo "[ERROR] fread() failed to read stream\n";
return null; return null;
} }
@@ -511,6 +511,9 @@ class websocket_service extends service {
//attach the domain name //attach the domain name
$message->domain_name($subscriber->get_domain_name()); $message->domain_name($subscriber->get_domain_name());
//attach the domain uuid
$message->domain_uuid($subscriber->get_domain_uuid());
//attach the client id so we can track the request //attach the client id so we can track the request
$message->resource_id = $subscriber->id; $message->resource_id = $subscriber->id;
@@ -556,7 +559,13 @@ class websocket_service extends service {
// //
// Merge all sockets to a single array // Merge all sockets to a single array
// //
$read = array_merge([$this->server_socket], $this->clients); $this->update_connected_clients();
$read = [$this->server_socket];
foreach ($this->clients as $client) {
if (is_resource($client) && !feof($client)) {
$read[] = $client;
}
}
$write = $except = []; $write = $except = [];
//$this->debug("Waiting on event. Connected Clients: (".count($this->clients).")", LOG_DEBUG); //$this->debug("Waiting on event. Connected Clients: (".count($this->clients).")", LOG_DEBUG);
@@ -872,13 +881,27 @@ class websocket_service extends service {
* @return string * @return string
*/ */
private function read_bytes($socket, int $length): string { private function read_bytes($socket, int $length): string {
if ($length <= 0 || !is_resource($socket)) {
return '';
}
$data = ''; $data = '';
while (strlen($data) < $length && is_resource($socket)) { $retries = 0;
$chunk = fread($socket, $length - strlen($data)); $max_retries = 20;
if ($chunk === false || $chunk === '' || !is_resource($socket)) { while (strlen($data) < $length && is_resource($socket) && !feof($socket)) {
//$this->disconnect_client($socket); $remaining = $length - strlen($data);
$chunk = @fread($socket, min($remaining, 8192));
if ($chunk === false) {
return ''; return '';
} }
if ($chunk === '') {
$retries++;
if ($retries >= $max_retries) {
return '';
}
usleep(5000);
continue;
}
$retries = 0;
$data .= $chunk; $data .= $chunk;
} }
return $data; return $data;
@@ -891,14 +914,20 @@ class websocket_service extends service {
* *
* @return string * @return string
*/ */
// Maximum allowed payload size per frame (16 MB)
const MAX_FRAME_PAYLOAD = 16 * 1024 * 1024;
private function receive_frame($socket): string { private function receive_frame($socket): string {
// Read first two header bytes // Read first two header bytes
$hdr = $this->read_bytes($socket, 2); $hdr = $this->read_bytes($socket, 2);
// Ensure we have the correct number of bytes // Ensure we have the correct number of bytes
if (strlen($hdr) !== 2) { if (strlen($hdr) !== 2) {
$this->warning('Header is empty!'); if (!is_resource($socket) || feof($socket)) {
$this->debug('Header content: ' . bin2hex($hdr) . '(' . strlen($hdr) . ' bytes)'); $this->disconnect_client($socket);
$this->update_connected_clients(); } else {
$this->warning('Header is empty!');
$this->update_connected_clients();
}
return ''; return '';
} }
$bytes = unpack('Cfirst/Csecond', $hdr); $bytes = unpack('Cfirst/Csecond', $hdr);
@@ -924,14 +953,56 @@ class websocket_service extends service {
$length = $arr[1]; $length = $arr[1];
} }
// Read mask key if client→server frame // Handle control frames before reading payload
switch ($opcode) {
case 0x8: // CLOSE frame
// Read and discard close payload (status code + reason)
if ($masked) $this->read_bytes($socket, 4); // mask key
if ($length > 0) $this->read_bytes($socket, min($length, 125));
// Send close response and disconnect
@fwrite($socket, "\x88\x00");
$this->disconnect_client($socket);
return '';
case 0x9: // PING frame
$maskKey = $masked ? $this->read_bytes($socket, 4) : '';
$ping_data = $length > 0 ? $this->read_bytes($socket, min($length, 125)) : '';
if ($masked && strlen($maskKey) === 4 && $ping_data !== '') {
$unmasked = '';
for ($i = 0; $i < strlen($ping_data); $i++) {
$unmasked .= $ping_data[$i] ^ $maskKey[$i % 4];
}
$ping_data = $unmasked;
}
// Respond with PONG
$pong_len = strlen($ping_data);
@fwrite($socket, chr(0x8A) . chr($pong_len) . $ping_data);
return '';
case 0xA: // PONG frame
// Consume and discard pong payload
if ($masked) $this->read_bytes($socket, 4);
if ($length > 0) $this->read_bytes($socket, min($length, 125));
return '';
}
// Sanity check: reject frames with absurdly large payloads
if ($length > self::MAX_FRAME_PAYLOAD) {
$this->error("Frame payload too large ({$length} bytes), disconnecting client");
@fwrite($socket, "\x88\x00");
$this->disconnect_client($socket);
return '';
}
// Read mask key if client->server frame
$maskKey = $masked ? $this->read_bytes($socket, 4) : ''; $maskKey = $masked ? $this->read_bytes($socket, 4) : '';
// Read payload data // Read payload data
$data = $this->read_bytes($socket, $length); $data = $length > 0 ? $this->read_bytes($socket, $length) : '';
if (empty($data)) { if ($data === '' && $length > 0) {
$this->warning("Received empty frame (ID# $socket)"); // Client likely disconnected mid-frame
if (!is_resource($socket) || feof($socket)) {
$this->disconnect_client($socket);
}
return ''; return '';
} }
@@ -941,7 +1012,7 @@ class websocket_service extends service {
if (strlen($maskKey) < 4) if (strlen($maskKey) < 4)
return ''; return '';
$unmasked = ''; $unmasked = '';
for ($i = 0; $i < $length; $i++) { for ($i = 0; $i < strlen($data); $i++) {
$unmasked .= $data[$i] ^ $maskKey[$i % 4]; $unmasked .= $data[$i] ^ $maskKey[$i % 4];
} }
$data = $unmasked; $data = $unmasked;