Cluster
Building multi-core network applications with PHP.
amphp/cluster
AMPHP is a collection of event-driven libraries for PHP designed with fibers and concurrency in mind.
amphp/cluster provides tools to transfer network sockets to independent PHP processes, as well as a lightweight framework to create a multiprocess server cluster.
Requirements
- PHP 8.1+
- ext-sockets
Installation
This package can be installed as a Composer dependency.
composer require amphp/cluster
Documentation
Transferring Sockets
Clusters are built by transferring sockets from a parent process to child processes, each of which listens for connections and/or handles client sockets. This library provides low-level components which may be used independently of the Cluster framework. These components allow you to write your own server logic which transfers server sockets or client sockets to child processes to distribute load or group related clients.
Transferring Client Sockets
The ClientSocketReceivePipe and ClientSocketSendPipe classes are used as a pair to send client sockets from one PHP process to another over an existing IPC connection between the two processes.
The example below demonstrates creating a new child process in a parent, then establishing a new IPC socket between the parent and child. That socket is used to create a ClientSocketSendPipe in the parent and a corresponding ClientSocketReceivePipe in the child. The parent then creates a socket server and listens for connections. When a connection is received, the client socket is transferred to the child process where it is handled.
// parent.php
use Amp\Cluster\ClientSocketSendPipe;
use Amp\Parallel\Context\ProcessContextFactory;
use Amp\Parallel\Ipc\LocalIpcHub;
use Amp\Socket;
use Revolt\EventLoop;
use function Amp\Socket\listen;
$ipcHub = new LocalIpcHub();
// Sharing the IpcHub instance with the context factory isn't required,
// but reduces the number of opened sockets.
$contextFactory = new ProcessContextFactory(ipcHub: $ipcHub);
$context = $contextFactory->start(__DIR__ . '/child.php');
$connectionKey = $ipcHub->generateKey();
$context->send(['uri' => $ipcHub->getUri(), 'key' => $connectionKey]);
// $socket will be a bidirectional socket to the child.
$socket = $ipcHub->accept($connectionKey);
$socketPipe = new ClientSocketSendPipe($socket);
$server = listen('127.0.0.1:1337');
// Close server when SIGTERM is received.
EventLoop::onSignal(SIGTERM, $server->close(...));
$clientId = 0;
while ($client = $server->accept()) {
    // $clientId is an example of arbitrary data which may be
    // associated with a transferred socket.
    $socketPipe->send($client, ++$clientId);
}
// child.php
use Amp\Cluster\ClientSocketReceivePipe;
use Amp\Socket\Socket;
use Amp\Sync\Channel;
use function Amp\async;
return function (Channel $channel): void {
    ['uri' => $uri, 'key' => $connectionKey] = $channel->receive();
    // $socket will be a bidirectional socket to the parent.
    $socket = Amp\Parallel\Ipc\connect($uri, $connectionKey);
    $socketPipe = new ClientSocketReceivePipe($socket);
    while ($transferredSocket = $socketPipe->receive()) {
        // Handle client socket in a separate coroutine (fiber).
        async(
            function (Socket $client, int $id) { /* ... */ },
            $transferredSocket->getSocket(), // Transferred socket
            $transferredSocket->getData(), // Associated data
        );
    }
};
While this example is somewhat contrived as there would be little reason to send all clients to a single process, it is easy to extrapolate such an example to a parent process which load balances a set of children or distributes clients based on some other factor. Reading and writing may take place on the client socket in the parent before being transferred. For example, an HTTP server may establish a WebSocket connection before transferring the socket to a child process. See amphp/http-server and amphp/websocket-server for additional components to build such a server.
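One way to picture the load-balancing parent mentioned above is a small rotation helper that hands out one pipe per accepted client. The sketch below is an illustration only: `RoundRobin` is a hypothetical class that is not part of amphp/cluster, and plain strings stand in for the `ClientSocketSendPipe` instances so that the snippet runs without child processes.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of amphp/cluster): cycles through a fixed
 * list of items, such as one ClientSocketSendPipe per child process.
 */
final class RoundRobin
{
    /** @var list<mixed> */
    private array $items;

    private int $next = 0;

    /** @param array<mixed> $items Must contain at least one item. */
    public function __construct(array $items)
    {
        if ($items === []) {
            throw new InvalidArgumentException('At least one item is required');
        }

        $this->items = array_values($items);
    }

    /** Returns the next item in the rotation. */
    public function pick(): mixed
    {
        $item = $this->items[$this->next];
        $this->next = ($this->next + 1) % count($this->items);

        return $item;
    }
}

// Stand-in strings are used here; in the parent script this array would
// hold one ClientSocketSendPipe per child process.
$pipes = new RoundRobin(['pipe-a', 'pipe-b', 'pipe-c']);

$order = [];
for ($clientId = 1; $clientId <= 5; ++$clientId) {
    // In parent.php this would be: $pipes->pick()->send($client, $clientId);
    $order[] = $pipes->pick();
}

echo implode(',', $order), PHP_EOL; // pipe-a,pipe-b,pipe-c,pipe-a,pipe-b
```

A rotation is the simplest policy. Replacing `pick()` with a lookup keyed on some property of the client (for example a room or tenant identifier) would group related clients in the same child instead.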
Transferring Server Sockets
The example below demonstrates creating a new child process in a parent, then establishing a new IPC socket between the parent and child. In the parent, the IPC socket is passed to ServerSocketPipeProvider::provideFor(), which listens for requests for server sockets on the IPC socket. In the child, the IPC socket is provided to an instance of ServerSocketPipeFactory. When the child creates a server socket using the ServerSocketPipeFactory, the server socket is created in the parent process, then sent to the child. If the parent created multiple children, any child that requested the same server socket would receive another reference to the same socket, allowing multiple children to listen on the same address and port. Incoming client connections are selected round-robin by the operating system. For greater control over client distribution, consider accepting clients in a single process and transferring client sockets to children instead.
ServerSocketPipeFactory implements ServerSocketFactory, allowing it to be used in place of factories which create the server socket within the same process.
// parent.php
use Amp\CancelledException;
use Amp\Cluster\ServerSocketPipeProvider;
use Amp\Parallel\Context\ProcessContextFactory;
use Amp\Parallel\Ipc\LocalIpcHub;
use Amp\SignalCancellation;
$ipcHub = new LocalIpcHub();
$serverProvider = new ServerSocketPipeProvider();
// Sharing the IpcHub instance with the context factory isn't required,
// but reduces the number of opened sockets.
$contextFactory = new ProcessContextFactory(ipcHub: $ipcHub);
$context = $contextFactory->start(__DIR__ . '/child.php');
$connectionKey = $ipcHub->generateKey();
$context->send(['uri' => $ipcHub->getUri(), 'key' => $connectionKey]);
// $socket will be a bidirectional socket to the child.
$socket = $ipcHub->accept($connectionKey);
// Listen for requests for server sockets on the given socket until cancelled by signal.
try {
    $serverProvider->provideFor($socket, new SignalCancellation(SIGTERM));
} catch (CancelledException) {
    // Signal cancellation expected.
}
// child.php
use Amp\Cluster\ServerSocketPipeFactory;
use Amp\Sync\Channel;
use function Amp\async;
return function (Channel $channel): void {
    ['uri' => $uri, 'key' => $connectionKey] = $channel->receive();
    // $socket will be a bidirectional socket to the parent.
    $socket = Amp\Parallel\Ipc\connect($uri, $connectionKey);
    $serverFactory = new ServerSocketPipeFactory($socket);
    // Requests the server socket from the parent process.
    $server = $serverFactory->listen('127.0.0.1:1337');
    while ($client = $server->accept()) {
        // Handle client socket in a separate coroutine (fiber).
        async(function () use ($client) { /* ... */ });
    }
};
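The multi-child case described above might look roughly like the following parent script. This is a sketch under assumptions rather than a reference implementation: the file name `parent-multi.php`, the `$workerCount` value, and the autoloader path are illustrative, and it assumes that a single `ServerSocketPipeProvider` instance may serve several IPC sockets concurrently when each `provideFor()` call runs in its own coroutine. It reuses the `child.php` script shown above.

```php
<?php
// parent-multi.php (hypothetical file name)
require __DIR__ . '/vendor/autoload.php'; // Assumed Composer autoloader path.

use Amp\CancelledException;
use Amp\Cluster\ServerSocketPipeProvider;
use Amp\Parallel\Context\ProcessContextFactory;
use Amp\Parallel\Ipc\LocalIpcHub;
use Amp\SignalCancellation;
use function Amp\async;
use function Amp\Future\await;

$ipcHub = new LocalIpcHub();
$serverProvider = new ServerSocketPipeProvider();
$contextFactory = new ProcessContextFactory(ipcHub: $ipcHub);

// One cancellation shared by all children, triggered by SIGTERM.
$cancellation = new SignalCancellation(SIGTERM);

$workerCount = 4; // Illustrative value.
$futures = [];

for ($i = 0; $i < $workerCount; ++$i) {
    $context = $contextFactory->start(__DIR__ . '/child.php');

    $connectionKey = $ipcHub->generateKey();
    $context->send(['uri' => $ipcHub->getUri(), 'key' => $connectionKey]);

    // Bidirectional IPC socket to this particular child.
    $socket = $ipcHub->accept($connectionKey);

    // provideFor() blocks until cancelled, so each child is served
    // from its own coroutine.
    $futures[] = async(function () use ($serverProvider, $socket, $cancellation): void {
        try {
            $serverProvider->provideFor($socket, $cancellation);
        } catch (CancelledException) {
            // Signal cancellation expected.
        }
    });
}

// Wait until every provider coroutine has finished.
await($futures);
```

Because every child asks for `127.0.0.1:1337`, each one is expected to receive a reference to the same listening socket rather than a separate bind.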
Clusters
Clusters are created from specialized scripts using static methods of Cluster to create components which communicate with the parent watcher process when running as part of a cluster. Some Cluster methods may also be called when a script is run directly, returning a standalone component which does not require a watcher process. For example, Cluster::getServerSocketFactory() returns an instance which creates and transfers the server socket from the watcher process when running within a cluster, or instead returns a ResourceServerSocketFactory when running a script directly.
Cluster scripts can be run using the included executable vendor/bin/cluster from the command line or programmatically from within an application using the ClusterWatcher class.
vendor/bin/cluster --workers=4 path/to/script.php
When this package is installed as a dependency of your project, the command above starts a cluster of 4 workers, each running the script at path/to/script.php.
Alternatively, your application can launch a cluster from code using ClusterWatcher.
use Amp\Cluster\ClusterWatcher;
use Revolt\EventLoop;
$watcher = new ClusterWatcher('path/to/script.php');
$watcher->start(4); // Start cluster with 4 workers.
// Using a signal to stop the cluster for this example.
EventLoop::onSignal(SIGTERM, fn () => $watcher->stop());
foreach ($watcher->getMessageIterator() as $message) {
    // Handle received message from worker.
}
Creating a Server
Components in AMPHP which must use socket servers use instances of Amp\Socket\ServerSocketFactory to create these socket servers. One such component is Amp\Http\Server\SocketHttpServer in amphp/http-server. Within a cluster script, Cluster::getServerSocketFactory() should be used to create a socket factory which will either create the socket locally or request the server socket from the cluster watcher.
The example HTTP server below demonstrates using Cluster::getServerSocketFactory() to create an instance of ServerSocketFactory and provide it when creating a SocketHttpServer.
Logging
Log entries may be sent to the cluster watcher to be logged to a single stream by using Cluster::createLogHandler(). This handler can be attached to a Monolog\Logger instance. The example HTTP server below creates a log handler depending on whether the script is running as a cluster worker or as a standalone script.
Cluster::createLogHandler() may only be called when running the cluster script as part of a cluster. Use Cluster::isWorker() to check if the script is running as a cluster worker.
Process Termination
A cluster script may await termination from a signal (one of SIGTERM, SIGINT, SIGQUIT, or SIGHUP) by using Cluster::awaitTermination().
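The logging and termination calls described above can be combined in a minimal worker skeleton. The sketch below is not a complete server. It assumes that amphp/log and monolog/monolog are installed alongside amphp/cluster, and the file name `worker.php`, the autoloader path, and the logger name are illustrative choices.

```php
<?php
// worker.php (hypothetical file name)
require __DIR__ . '/vendor/autoload.php'; // Assumed Composer autoloader path.

use Amp\ByteStream;
use Amp\Cluster\Cluster;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Monolog\Logger;

if (Cluster::isWorker()) {
    // Running inside a cluster: forward log records to the watcher.
    $handler = Cluster::createLogHandler();
} else {
    // Running standalone: createLogHandler() is unavailable, so log to
    // STDOUT instead (StreamHandler and ConsoleFormatter are from amphp/log).
    $handler = new StreamHandler(ByteStream\getStdout());
    $handler->setFormatter(new ConsoleFormatter());
}

$logger = new Logger('worker-' . getmypid());
$logger->pushHandler($handler);

$logger->info('Worker started');

// ... create servers with Cluster::getServerSocketFactory() here ...

// Suspend until the script is asked to terminate.
Cluster::awaitTermination();

$logger->info('Worker stopping');
```

Choosing the handler up front keeps the rest of the script identical in both modes, so the same file can be started with `vendor/bin/cluster` or run directly during development.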
