Fix operator panel disconnect (#7913)
Unfiltered agent lists cause a delay resulting in disconnection from switch
This commit is contained in:
@@ -212,6 +212,18 @@ class operator_panel_service extends base_websocket_system_service implements we
|
|||||||
/** @var int Seconds between agent-stats broadcasts */
|
/** @var int Seconds between agent-stats broadcasts */
|
||||||
protected $agent_stats_interval;
|
protected $agent_stats_interval;
|
||||||
|
|
||||||
|
/** @var int Unix timestamp of the last agent-stats broadcast attempt */
|
||||||
|
protected $last_agent_stats_broadcast_at = 0;
|
||||||
|
|
||||||
|
/** @var int Minimum seconds between immediate maintenance-triggered broadcasts */
|
||||||
|
protected $agent_stats_min_broadcast_spacing = 2;
|
||||||
|
|
||||||
|
/** @var int Seconds before a per-queue callcenter command is aborted */
|
||||||
|
protected $agent_stats_command_timeout = 2;
|
||||||
|
|
||||||
|
/** @var string Cached domain scope for agent-stats broadcast */
|
||||||
|
protected $agent_stats_domain_name = '';
|
||||||
|
|
||||||
/** @var string Debug permissions mode: 'off', 'bytes', or 'full' */
|
/** @var string Debug permissions mode: 'off', 'bytes', or 'full' */
|
||||||
protected $debug_show_permissions_mode;
|
protected $debug_show_permissions_mode;
|
||||||
|
|
||||||
@@ -877,15 +889,26 @@ class operator_panel_service extends base_websocket_system_service implements we
|
|||||||
|
|
||||||
$payload = $message->payload();
|
$payload = $message->payload();
|
||||||
$domain_name = $payload['domain_name'] ?? '';
|
$domain_name = $payload['domain_name'] ?? '';
|
||||||
|
|
||||||
|
// Sanitize domain name to help prevent injection in permission checks and ensure consistent filtering
|
||||||
|
$domain_name = self::sanitize_domain_name("$domain_name");
|
||||||
|
if ($domain_name !== '') {
|
||||||
|
$this->agent_stats_domain_name = $domain_name;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the permissions from websocket_message
|
||||||
$permissions = $message->get_permissions();
|
$permissions = $message->get_permissions();
|
||||||
|
|
||||||
|
// Get the agent stats for this domain only (busy servers can take too long to respond without the filter)
|
||||||
$agents = $this->get_all_agent_stats($domain_name);
|
$agents = $this->get_all_agent_stats($domain_name);
|
||||||
|
|
||||||
|
// Filter message according to subscriber permissions
|
||||||
$is_supervisor = !empty($permissions['operator_panel_manage']);
|
$is_supervisor = !empty($permissions['operator_panel_manage']);
|
||||||
$agent_name = $this->get_agent_name_for_permission($permissions, $domain_name);
|
$agent_name = $this->get_agent_name_for_permission($permissions, $domain_name);
|
||||||
$filter = new operator_panel_agent_filter($is_supervisor, $agent_name);
|
$filter = new operator_panel_agent_filter($is_supervisor, $agent_name);
|
||||||
$filtered = $filter->filter($agents);
|
$filtered = $filter->filter($agents);
|
||||||
|
|
||||||
|
// Create the response message
|
||||||
$response = new websocket_message();
|
$response = new websocket_message();
|
||||||
$response
|
$response
|
||||||
->payload($filtered)
|
->payload($filtered)
|
||||||
@@ -897,6 +920,7 @@ class operator_panel_service extends base_websocket_system_service implements we
|
|||||||
->resource_id($message->resource_id())
|
->resource_id($message->resource_id())
|
||||||
;
|
;
|
||||||
|
|
||||||
|
// Send the response to the connected client
|
||||||
websocket_client::send($this->ws_client->socket(), $response);
|
websocket_client::send($this->ws_client->socket(), $response);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1467,21 +1491,65 @@ class operator_panel_service extends base_websocket_system_service implements we
|
|||||||
public function broadcast_agent_stats(): int {
|
public function broadcast_agent_stats(): int {
|
||||||
$this->debug('Broadcasting agent stats');
|
$this->debug('Broadcasting agent stats');
|
||||||
|
|
||||||
// Retrieve all queues from the database (we need the domain for context)
|
if ($this->ws_client === null || !$this->ws_client->is_connected()) {
|
||||||
$agents = $this->get_all_agent_stats();
|
$this->warning('Skipping agent stats broadcast: websocket client not connected');
|
||||||
|
return $this->agent_stats_interval;
|
||||||
|
}
|
||||||
|
|
||||||
|
$domain_name = $this->agent_stats_domain_name;
|
||||||
|
if ($domain_name === '') {
|
||||||
|
$this->debug('Skipping agent stats broadcast: no operator panel domain scope yet');
|
||||||
|
return $this->agent_stats_interval;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieve only queues for the active operator-panel domain.
|
||||||
|
$agents = $this->get_all_agent_stats($domain_name);
|
||||||
|
|
||||||
if (empty($agents)) {
|
if (empty($agents)) {
|
||||||
|
$this->last_agent_stats_broadcast_at = time();
|
||||||
return $this->agent_stats_interval;
|
return $this->agent_stats_interval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Keep the push payload compact; UI only needs these fields for rendering.
|
||||||
|
$broadcast_agents = [];
|
||||||
|
foreach ($agents as $agent) {
|
||||||
|
$broadcast_agents[] = [
|
||||||
|
'agent_name' => $agent['agent_name'] ?? '',
|
||||||
|
'status' => $agent['status'] ?? '',
|
||||||
|
'state' => $agent['state'] ?? '',
|
||||||
|
'calls_answered' => $agent['calls_answered'] ?? '0',
|
||||||
|
'talk_time' => $agent['talk_time'] ?? '0',
|
||||||
|
'last_bridge_start' => $agent['last_bridge_start'] ?? '0',
|
||||||
|
'queue_name' => $agent['queue_name'] ?? '',
|
||||||
|
'queue_extension' => $agent['queue_extension'] ?? '',
|
||||||
|
'domain_name' => $agent['domain_name'] ?? '',
|
||||||
|
];
|
||||||
|
}
|
||||||
|
|
||||||
|
$payload_json = json_encode($broadcast_agents);
|
||||||
|
$payload_bytes = ($payload_json === false) ? 0 : strlen($payload_json);
|
||||||
|
if ($payload_bytes > 262144) {
|
||||||
|
$this->warning('Large agent stats payload: ' . $payload_bytes . ' bytes for ' . count($broadcast_agents) . ' agents');
|
||||||
|
}
|
||||||
|
|
||||||
$message = new websocket_message();
|
$message = new websocket_message();
|
||||||
$message
|
$message
|
||||||
->service_name(self::get_service_name())
|
->service_name(self::get_service_name())
|
||||||
->topic('agent_stats')
|
->topic('agent_stats')
|
||||||
->payload($agents)
|
->payload($broadcast_agents)
|
||||||
;
|
;
|
||||||
|
|
||||||
websocket_client::send($this->ws_client->socket(), $message);
|
try {
|
||||||
|
$sent = websocket_client::send($this->ws_client->socket(), $message);
|
||||||
|
if (!$sent) {
|
||||||
|
$this->warning('Agent stats broadcast was partially sent; message dropped');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (\Throwable $e) {
|
||||||
|
$this->warning('Agent stats broadcast failed: ' . $e->getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->last_agent_stats_broadcast_at = time();
|
||||||
|
|
||||||
// Return the interval so the timer reschedules itself
|
// Return the interval so the timer reschedules itself
|
||||||
return $this->agent_stats_interval;
|
return $this->agent_stats_interval;
|
||||||
@@ -1528,8 +1596,16 @@ class operator_panel_service extends base_websocket_system_service implements we
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
$raw = event_socket::api("callcenter_config queue list agents $ext@$domain");
|
$queue_name = $ext . '@' . $domain;
|
||||||
if (empty($raw) || strpos($raw, '-ERR') === 0) {
|
$started_at = microtime(true);
|
||||||
|
$raw = $this->run_queue_agent_list_command($queue_name);
|
||||||
|
$elapsed = microtime(true) - $started_at;
|
||||||
|
|
||||||
|
if ($elapsed >= 1.0) {
|
||||||
|
$this->warning("Slow agent-stats query for {$queue_name}: " . round($elapsed, 3) . 's');
|
||||||
|
}
|
||||||
|
|
||||||
|
if (empty($raw) || strpos($raw, '-ERR') === 0 || trim($raw) === '+OK') {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1570,6 +1646,163 @@ class operator_panel_service extends base_websocket_system_service implements we
|
|||||||
return $all_agents;
|
return $all_agents;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sanitize a domain name for consistent domain-scoped lookups.
|
||||||
|
*
|
||||||
|
* @param string $domain_name
|
||||||
|
*
|
||||||
|
* @return string
|
||||||
|
*/
|
||||||
|
public static function sanitize_domain_name(string $domain_name): string {
|
||||||
|
$domain_name = trim($domain_name);
|
||||||
|
if ($domain_name === '') {
|
||||||
|
return '';
|
||||||
|
}
|
||||||
|
return (string)preg_replace('/:\\d+$/', '', $domain_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a bounded-time callcenter agent list command for one queue.
|
||||||
|
*
|
||||||
|
* @param string $queue_name Queue identifier in format extension@domain.
|
||||||
|
*
|
||||||
|
* @return string|false Raw command output or false on failure/timeout.
|
||||||
|
*/
|
||||||
|
private function run_queue_agent_list_command(string $queue_name) {
|
||||||
|
$timeout = max(1, (int)$this->agent_stats_command_timeout);
|
||||||
|
$api_cmd = 'callcenter_config queue list agents ' . $queue_name;
|
||||||
|
$result = $this->query_switch_api_with_timeout($api_cmd, $timeout);
|
||||||
|
|
||||||
|
if ($result === false) {
|
||||||
|
$this->warning('Agent-stats query failed for ' . $queue_name . ' using event socket');
|
||||||
|
}
|
||||||
|
|
||||||
|
return $result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a FreeSWITCH API command over a dedicated ESL socket.
|
||||||
|
*
|
||||||
|
* Creates a new socket connection for each command to ensure that timeouts and errors
|
||||||
|
* do not affect other commands. This is needed because the event socket can be filled
|
||||||
|
* with events for other queues, so we connect and then filter them and then disconnect
|
||||||
|
* to ensure we get the correct response for the command we sent without interference.
|
||||||
|
*
|
||||||
|
* @param string $api_cmd
|
||||||
|
* @param int $timeout_seconds
|
||||||
|
*
|
||||||
|
* @return string|false
|
||||||
|
*/
|
||||||
|
private function query_switch_api_with_timeout(string $api_cmd, int $timeout_seconds) {
|
||||||
|
$host = parent::$config->get('switch.event_socket.host', '127.0.0.1');
|
||||||
|
$port = (int)parent::$config->get('switch.event_socket.port', 8021);
|
||||||
|
$password = parent::$config->get('switch.event_socket.password', 'ClueCon');
|
||||||
|
|
||||||
|
$socket = @stream_socket_client("tcp://{$host}:{$port}", $errno, $errstr, $timeout_seconds);
|
||||||
|
if (!is_resource($socket)) {
|
||||||
|
$this->warning("Unable to connect to event socket {$host}:{$port} ({$errno}) {$errstr}");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
stream_set_timeout($socket, $timeout_seconds);
|
||||||
|
stream_set_blocking($socket, true);
|
||||||
|
|
||||||
|
try {
|
||||||
|
$auth_request = $this->read_event_socket_message($socket);
|
||||||
|
if ($auth_request === false || ($auth_request['headers']['Content-Type'] ?? '') !== 'auth/request') {
|
||||||
|
$this->warning('Event socket auth request was not received in time');
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (@fwrite($socket, "auth {$password}\n\n") === false) {
|
||||||
|
$this->warning('Failed to write event socket auth request');
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
$auth_reply = $this->read_event_socket_message($socket);
|
||||||
|
if ($auth_reply === false || ($auth_reply['headers']['Reply-Text'] ?? '') !== '+OK accepted') {
|
||||||
|
$this->warning('Event socket authentication failed');
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (@fwrite($socket, "api {$api_cmd}\n\n") === false) {
|
||||||
|
$this->warning('Failed to write event socket API command');
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
$reply = $this->read_event_socket_message($socket);
|
||||||
|
if ($reply === false) {
|
||||||
|
$this->warning('Timed out waiting for event socket API response');
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return $reply['body'] ?? '';
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
if (is_resource($socket)) {
|
||||||
|
@fclose($socket);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read one ESL message (headers + optional body) with stream timeout enforcement.
|
||||||
|
*
|
||||||
|
* This is similar to the event_socket read function but also returns the headers
|
||||||
|
* as an associative array for easier processing of auth requests and API responses
|
||||||
|
* for agent stats.
|
||||||
|
*
|
||||||
|
* @param resource $socket
|
||||||
|
*
|
||||||
|
* @return array|false
|
||||||
|
*/
|
||||||
|
private function read_event_socket_message($socket) {
|
||||||
|
$headers = [];
|
||||||
|
while (true) {
|
||||||
|
$line = fgets($socket);
|
||||||
|
if ($line === false) {
|
||||||
|
$meta = stream_get_meta_data($socket);
|
||||||
|
if (!empty($meta['timed_out'])) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (feof($socket)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
$line = rtrim($line, "\r\n");
|
||||||
|
if ($line === '') {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
$parts = explode(':', $line, 2);
|
||||||
|
if (count($parts) === 2) {
|
||||||
|
$headers[trim($parts[0])] = trim($parts[1]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$body = '';
|
||||||
|
$content_length = (int)($headers['Content-Length'] ?? 0);
|
||||||
|
if ($content_length > 0) {
|
||||||
|
$remaining = $content_length;
|
||||||
|
while ($remaining > 0) {
|
||||||
|
$chunk = fread($socket, $remaining);
|
||||||
|
if ($chunk === false || $chunk === '') {
|
||||||
|
$meta = stream_get_meta_data($socket);
|
||||||
|
if (!empty($meta['timed_out']) || feof($socket)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
$body .= $chunk;
|
||||||
|
$remaining -= strlen($chunk);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ['headers' => $headers, 'body' => $body];
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process a conference::maintenance event and broadcast to subscribers.
|
* Process a conference::maintenance event and broadcast to subscribers.
|
||||||
*
|
*
|
||||||
@@ -1629,6 +1862,12 @@ class operator_panel_service extends base_websocket_system_service implements we
|
|||||||
* @return void
|
* @return void
|
||||||
*/
|
*/
|
||||||
private function on_callcenter_maintenance(event_message $event_message): void {
|
private function on_callcenter_maintenance(event_message $event_message): void {
|
||||||
|
$now = time();
|
||||||
|
if (($now - $this->last_agent_stats_broadcast_at) < $this->agent_stats_min_broadcast_spacing) {
|
||||||
|
$this->debug('callcenter::maintenance skipped - too soon since last broadcast');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
$this->debug('callcenter::maintenance — triggering immediate agent stats broadcast');
|
$this->debug('callcenter::maintenance — triggering immediate agent stats broadcast');
|
||||||
$this->broadcast_agent_stats();
|
$this->broadcast_agent_stats();
|
||||||
// Note: the periodic timer continues independently; no reschedule needed here.
|
// Note: the periodic timer continues independently; no reschedule needed here.
|
||||||
|
|||||||
@@ -355,15 +355,21 @@ abstract class base_websocket_system_service extends service implements websocke
|
|||||||
|
|
||||||
// Get the web socket message as an object
|
// Get the web socket message as an object
|
||||||
$message = websocket_message::create_from_json_message($json_string);
|
$message = websocket_message::create_from_json_message($json_string);
|
||||||
|
if (!($message instanceof websocket_message)) {
|
||||||
|
$this->warning('Invalid websocket message received; ignoring frame');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
$topic = $message->topic();
|
||||||
|
|
||||||
// Nothing to do
|
// Nothing to do
|
||||||
if (empty($message->topic())) {
|
if (empty($topic)) {
|
||||||
$this->error("Message received does not have topic");
|
$this->warning("Message received does not have topic. Message dropped.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call the registered topic event
|
// Call the registered topic event
|
||||||
$this->trigger_topic($message->topic, $message, $this->ws_client);
|
$this->trigger_topic($topic, $message, $this->ws_client);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
Reference in New Issue
Block a user