Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"scripts": {
"lint": "./vendor/bin/pint --preset psr12 --test",
"format": "./vendor/bin/pint --preset psr12",
"check": "./vendor/bin/phpstan analyse --level max src tests",
"check": "./vendor/bin/phpstan analyse -c phpstan.neon",
"test": "./vendor/bin/phpunit --configuration phpunit.xml --debug"
},
"require": {
Expand All @@ -31,7 +31,7 @@
"phpunit/phpunit": "11.*",
"laravel/pint": "1.*",
"phpstan/phpstan": "1.*",
"swoole/ide-helper": "5.1.2"
"swoole/ide-helper": "6.*"
},
"suggests": {
"ext-mongodb": "Needed to support MongoDB database pools",
Expand Down
14 changes: 7 additions & 7 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions phpstan.neon
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
parameters:
level: 8

paths:
- src
- tests

scanDirectories:
- vendor/swoole/ide-helper

3 changes: 1 addition & 2 deletions src/Pools/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ abstract public function count(): int;
* Execute a callback with lock protection if the adapter supports it
*
* @param callable $callback
* @param int $timeout Timeout in seconds
* @return mixed
*/
abstract public function synchronized(callable $callback, int $timeout): mixed;
abstract public function synchronized(callable $callback): mixed;
}
4 changes: 1 addition & 3 deletions src/Pools/Adapter/Stack.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,11 @@ public function count(): int
* Executes the callback without acquiring a lock.
*
* This implementation does not provide mutual exclusion.
* The `$timeout` parameter is ignored.
*
* @param callable $callback Callback to execute.
* @param int $timeout Ignored.
* @return mixed The value returned by the callback.
*/
public function synchronized(callable $callback, int $timeout): mixed
public function synchronized(callable $callback): mixed
{
return $callback();
}
Expand Down
28 changes: 12 additions & 16 deletions src/Pools/Adapter/Swoole.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,21 @@

use Utopia\Pools\Adapter;
use Swoole\Coroutine\Channel;
use Swoole\Coroutine\Lock;

class Swoole extends Adapter
{
/**
* @var Channel<mixed>
*/
protected Channel $pool;

protected Channel $lock;
protected Lock $lock;
public function initialize(int $size): static
{
$this->pool = new Channel($size);

// With channels, the current coroutine suspends and yields control to the event loop,
// allowing other coroutines to continue executing.
// Using a blocking lock freezes the worker thread, causing all coroutines in that
// worker to stop making progress.
$this->lock = new Channel(1);
$this->lock->push(true);
$this->pool = new Channel($size);
$this->lock = new Lock();

return $this;
}
Expand All @@ -45,8 +44,7 @@ public function pop(int $timeout): mixed

public function count(): int
{
$length = $this->pool->length();
return is_int($length) ? $length : 0;
return (int) $this->pool->length();
}

/**
Expand All @@ -56,24 +54,22 @@ public function count(): int
* afterward, even if the callback throws an exception.
*
* @param callable $callback Callback to execute within the critical section.
* @param int $timeout Maximum time (in seconds) to wait for the lock.
* @return mixed The value returned by the callback.
*
* @throws \RuntimeException If the lock cannot be acquired within the timeout.
*/
public function synchronized(callable $callback, int $timeout): mixed
public function synchronized(callable $callback): mixed
{
$acquired = $this->lock->pop($timeout);
$acquired = $this->lock->lock();

if (!$acquired) {
throw new \RuntimeException("Failed to acquire lock within {$timeout} seconds");
throw new \RuntimeException("Failed to acquire lock");
}

try {
return $callback();
} finally {
// Guaranteed to have space here; avoid timeouts so the token isn't lost.
$this->lock->push(true);
$this->lock->unlock();
}
}
}
12 changes: 6 additions & 6 deletions src/Pools/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -266,19 +266,19 @@ public function pop(): Connection
return true;
}
return false;
}, timeout: $this->getSynchronizationTimeout());
});

if ($shouldCreateConnections) {
try {
$connection = $this->createConnection();
$this->pool->synchronized(function () use ($connection) {
$this->active[$connection->getID()] = $connection;
}, timeout: $this->getSynchronizationTimeout());
});
return $connection;
} catch (\Exception $e) {
$this->pool->synchronized(function () {
$this->connectionsCreated--;
}, timeout: $this->getSynchronizationTimeout());
});
throw $e;
}
}
Expand All @@ -296,7 +296,7 @@ public function pop(): Connection
if ($connection instanceof Connection) {
$this->pool->synchronized(function () use ($connection) {
$this->active[$connection->getID()] = $connection;
}, timeout: $this->getSynchronizationTimeout());
});
return $connection;
}
}
Expand Down Expand Up @@ -406,15 +406,15 @@ private function destroyConnection(?Connection $connection = null): static
return true;
};
return false;
}, timeout: $this->getSynchronizationTimeout());
});

if ($shouldCreate) {
try {
$this->pool->push($this->createConnection());
} catch (Exception $e) {
$this->pool->synchronized(function () {
$this->connectionsCreated--;
}, timeout: $this->getSynchronizationTimeout());
});
throw $e;
}
}
Expand Down
11 changes: 5 additions & 6 deletions tests/Pools/Adapter/SwooleTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ protected function execute(callable $callback): mixed
$result = null;
$exception = null;

/** @phpstan-ignore-next-line */
Coroutine\run(function () use ($callback, &$result, &$exception): void {
try {
$result = $callback();
Expand All @@ -41,7 +40,6 @@ public function testSwooleCoroutineRaceCondition(): void
$errors = [];
$successCount = 0;

/** @phpstan-ignore-next-line */
\Swoole\Coroutine\run(function () use (&$errors, &$successCount) {
// Create a pool with 5 connections inside coroutine context
$connectionCounter = 0;
Expand Down Expand Up @@ -123,7 +121,6 @@ public function testSwooleCoroutineHighConcurrency(): void
$successCount = 0;
$errorCount = 0;

/** @phpstan-ignore-next-line */
\Swoole\Coroutine\run(function () use ($totalRequests, &$successCount, &$errorCount) {
// Create a pool with 3 connections inside coroutine context
$connectionCounter = 0;
Expand Down Expand Up @@ -183,7 +180,6 @@ public function testSwooleCoroutineConnectionUniqueness(): void
$seenResources = [];
$duplicateResources = [];

/** @phpstan-ignore-next-line */
\Swoole\Coroutine\run(function () use (&$seenResources, &$duplicateResources) {
// Create a pool with 5 connections inside coroutine context
$connectionCounter = 0;
Expand Down Expand Up @@ -248,7 +244,6 @@ public function testSwooleCoroutineIdleConnectionReuse(): void
$connectionIds = [];
$connectionCounter = 0;

/** @phpstan-ignore-next-line */
\Swoole\Coroutine\run(function () use (&$connectionIds, &$connectionCounter) {
// Create a pool with 3 connections inside coroutine context
$pool = new Pool(new Swoole(), 'swoole-reuse', 3, function () use (&$connectionCounter) {
Expand Down Expand Up @@ -308,7 +303,6 @@ public function testSwooleCoroutineStressTest(): void
$errorCount = 0;
$connectionCounter = 0;

/** @phpstan-ignore-next-line */
\Swoole\Coroutine\run(function () use ($totalRequests, &$successCount, &$errorCount, &$connectionCounter) {
// Create a pool with 10 connections inside coroutine context
$pool = new Pool(new Swoole(), 'swoole-stress', 10, function () use (&$connectionCounter) {
Expand Down Expand Up @@ -353,4 +347,9 @@ public function testSwooleCoroutineStressTest(): void
$this->assertSame(10, $pool->count(), 'Pool should have all connections back');
});
}
public function testInitOutsideCoroutineNotThrowAnyError(): void
{
$pool = new Pool(new Swoole(), 'test', 1, fn () => 'x');
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar to the appwrite Realtime pubsub

$this->assertInstanceOf(Pool::class, $pool);
}
}