Fix listeners array when socket disconnects (#7841)
When the connection is registered as a listener but has not yet communicated, the class failed to remove the listener in the array.
This commit is contained in:
@@ -131,6 +131,26 @@ class websocket_server {
|
||||
self::log($message, LOG_DEBUG);
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a normalized log entry for this class.
|
||||
*
|
||||
* @param string $message
|
||||
* @param int $priority
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
private static function log(string $message, int $priority = LOG_INFO): void {
|
||||
$level = match ($priority) {
|
||||
LOG_DEBUG => 'DEBUG',
|
||||
LOG_WARNING => 'WARNING',
|
||||
LOG_ERR => 'ERROR',
|
||||
LOG_INFO => 'INFO',
|
||||
default => 'LOG',
|
||||
};
|
||||
|
||||
error_log('[websocket_server][' . $level . '] ' . $message);
|
||||
}
|
||||
|
||||
/**
|
||||
* Log a warning message to the log file.
|
||||
*
|
||||
@@ -181,14 +201,33 @@ class websocket_server {
|
||||
$this->running = true;
|
||||
|
||||
while ($this->running) {
|
||||
$listeners = array_column($this->listeners, 0);
|
||||
$read = array_merge([$this->server_socket], $listeners, $this->clients);
|
||||
$write = $except = [];
|
||||
// Server connection issue
|
||||
if (false === stream_select($read, $write, $except, null)) {
|
||||
if (!is_resource($this->server_socket)) {
|
||||
$this->warn('Server socket is not a valid stream resource; stopping server loop.');
|
||||
$this->running = false;
|
||||
break;
|
||||
}
|
||||
|
||||
// Prune stale/closed sockets before passing anything to stream_select.
|
||||
$this->listeners = array_values(array_filter($this->listeners, static function ($listener): bool {
|
||||
return is_array($listener)
|
||||
&& isset($listener[0], $listener[1])
|
||||
&& is_resource($listener[0])
|
||||
&& is_callable($listener[1]);
|
||||
}));
|
||||
$this->clients = array_values(array_filter($this->clients, static fn($client): bool => is_resource($client)));
|
||||
|
||||
$listeners = array_column($this->listeners, 0);
|
||||
$read = array_values(array_filter(array_merge([$this->server_socket], $listeners, $this->clients), 'is_resource'));
|
||||
$write = $except = [];
|
||||
if (empty($read)) {
|
||||
$this->warn('No valid sockets available for stream_select; continuing loop.');
|
||||
continue;
|
||||
}
|
||||
// Server connection issue
|
||||
if (false === @stream_select($read, $write, $except, null)) {
|
||||
$this->warn('stream_select failed; continuing after pruning invalid sockets.');
|
||||
continue;
|
||||
}
|
||||
// new connection
|
||||
if (in_array($this->server_socket, $read, true)) {
|
||||
$conn = @stream_socket_accept($this->server_socket, 0);
|
||||
@@ -212,15 +251,27 @@ class websocket_server {
|
||||
if (in_array($client_socket, $listeners, true)) {
|
||||
// Process external listeners
|
||||
$index = array_search($client_socket, $listeners, true);
|
||||
if ($index === false || !isset($this->listeners[$index][1]) || !is_callable($this->listeners[$index][1])) {
|
||||
$this->warn('Listener callback missing or invalid; disconnecting listener socket.');
|
||||
$this->disconnect_listener($client_socket);
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
//send the switch event to the registered callback function
|
||||
call_user_func($this->listeners[$index][1], $client_socket);
|
||||
} catch (\socket_disconnected_exception $s) {
|
||||
$this->info("[INFO] Removed client $s->id from list");
|
||||
$success = $this->disconnect_client($client_socket);
|
||||
// By attaching the socket_disconnect error message to \socket_exception we can see where something went wrong
|
||||
if (!$success)
|
||||
throw new socket_exception('Socket does not exist in tracking array', 256, $s);
|
||||
if (!$success) {
|
||||
$success = $this->disconnect_listener($client_socket);
|
||||
}
|
||||
if (!$success) {
|
||||
$this->warn('Socket does not exist in tracking arrays; closing resource and triggering disconnect callbacks');
|
||||
if (is_resource($client_socket)) {
|
||||
@fclose($client_socket);
|
||||
}
|
||||
$this->trigger_disconnect($client_socket);
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@@ -380,6 +431,31 @@ class websocket_server {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect a non-websocket listener and invoke disconnect callbacks.
|
||||
*
|
||||
* @param resource $resource
|
||||
*
|
||||
* @return bool True if listener was tracked and disconnected.
|
||||
*/
|
||||
protected function disconnect_listener($resource): bool {
|
||||
foreach ($this->listeners as $index => $listener) {
|
||||
if (($listener[0] ?? null) !== $resource) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (is_resource($resource)) {
|
||||
@fclose($resource);
|
||||
}
|
||||
|
||||
unset($this->listeners[$index]);
|
||||
$this->trigger_disconnect($resource);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a disconnect frame with no payload
|
||||
*
|
||||
@@ -428,21 +504,50 @@ class websocket_server {
|
||||
* @param resource $socket
|
||||
* @param int $length
|
||||
*
|
||||
* @return string
|
||||
* @return string|null
|
||||
*/
|
||||
private function read_bytes($socket, int $length): string {
|
||||
private function read_bytes($socket, int $length): ?string {
|
||||
$data = '';
|
||||
while (strlen($data) < $length && is_resource($socket)) {
|
||||
$chunk = fread($socket, $length - strlen($data));
|
||||
if ($chunk === false || $chunk === '' || !is_resource($socket)) {
|
||||
$this->disconnect_client($socket);
|
||||
return '';
|
||||
return null;
|
||||
}
|
||||
$data .= $chunk;
|
||||
}
|
||||
return $data;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a FIN control frame on a specific client socket.
|
||||
*
|
||||
* @param resource $socket
|
||||
* @param int $opcode
|
||||
* @param string $payload
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
private function send_control_frame($socket, int $opcode, string $payload = ''): void {
|
||||
if (!is_resource($socket)) {
|
||||
return;
|
||||
}
|
||||
|
||||
$header = chr(0x80 | $opcode);
|
||||
$payload_len = strlen($payload);
|
||||
|
||||
if ($payload_len <= 125) {
|
||||
$header .= chr($payload_len);
|
||||
} elseif ($payload_len <= 65535) {
|
||||
$header .= chr(126) . pack('n', $payload_len);
|
||||
} else {
|
||||
$payload = substr($payload, 0, 125);
|
||||
$header .= chr(125);
|
||||
}
|
||||
|
||||
@fwrite($socket, $header . $payload);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads a web socket data frame and converts it to a regular string
|
||||
*
|
||||
@@ -460,7 +565,7 @@ class websocket_server {
|
||||
|
||||
while (!$final_frame) {
|
||||
$header = $this->read_bytes($socket, 2);
|
||||
if ($header === null)
|
||||
if ($header === null || strlen($header) !== 2)
|
||||
return null;
|
||||
|
||||
$byte1 = ord($header[0]);
|
||||
@@ -515,9 +620,9 @@ class websocket_server {
|
||||
switch ($opcode) {
|
||||
case 0x9: // PING
|
||||
// Respond with PONG using same payload
|
||||
$this->send_control_frame(0xA, $payload);
|
||||
$this->send_control_frame($socket, 0xA, $payload);
|
||||
$this->info("Received PING, sent PONG");
|
||||
continue; // Skip returning PING
|
||||
continue 2; // Skip returning PING
|
||||
case 0x8: // CLOSE frame
|
||||
$this->info("Received CLOSE frame, connection will be closed.");
|
||||
$this->disconnect_client($socket);
|
||||
@@ -526,7 +631,7 @@ class websocket_server {
|
||||
$this->info("Received PONG");
|
||||
$reason = $this->read_bytes($socket, 2);
|
||||
$this->info("Reason: $reason");
|
||||
continue; // Skip returning PONG
|
||||
continue 2; // Skip returning PONG
|
||||
case 0x1: // TEXT frame
|
||||
case 0x0: // Continuation frame
|
||||
$payload_data .= $payload;
|
||||
@@ -545,6 +650,22 @@ class websocket_server {
|
||||
return $payload_data;
|
||||
}
|
||||
|
||||
public function broadcast(string $payload, $exclude_socket = null): void {
|
||||
foreach ($this->clients as $client) {
|
||||
if (!is_resource($client)) {
|
||||
continue;
|
||||
}
|
||||
if ($exclude_socket !== null && $client === $exclude_socket) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
self::send($client, $payload);
|
||||
} catch (\Throwable $e) {
|
||||
$this->disconnect_client($client);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send text frame to client. If the socket connection is not a valid resource, the send
|
||||
* method will fail silently and return false.
|
||||
|
||||
Reference in New Issue
Block a user