diff --git a/examples/consumer.php b/examples/consumer.php index a2e8a72..f958ab4 100644 --- a/examples/consumer.php +++ b/examples/consumer.php @@ -7,8 +7,8 @@ $queue = new Queue( storage: StorageType::FILE, - queueFile: "./", - debug: true + storagePath: "./", + storageName: "my_queue" ); $queue->listen(function ($item) { diff --git a/examples/producer.php b/examples/producer.php index e66fe31..f5e1a7a 100644 --- a/examples/producer.php +++ b/examples/producer.php @@ -7,7 +7,8 @@ $queue = new Queue( storage: StorageType::FILE, - queueFile: "./" + storagePath: "./", + storageName: "my_queue" ); for ($i = 0; $i < 150; $i++) { $queue->enqueue("test $i"); diff --git a/src/Queue.php b/src/Queue.php index 6deb018..108c72a 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -9,12 +9,13 @@ class Queue { private StorageInterface $storage; - private bool $debug; - public function __construct(StorageType $storage, string $queueFile, bool $debug = false) - { - $this->debug = $debug; - $this->storage = StorageFactory::getStorage($storage, $queueFile); + public function __construct( + StorageType $storage, + string $storagePath = "", + string $storageName = 'queue', + ) { + $this->storage = StorageFactory::getStorage($storage, $storagePath, $storageName); } public function enqueue(string $data): bool @@ -27,7 +28,7 @@ public function dequeue(): ?string return $this->storage->dequeue(); } - public function exist($value): ?string + public function exist($value): bool { return $this->storage->exist($value); } @@ -39,9 +40,6 @@ public function listen(callable $fn, int $delayWhenEmpty = 5): void if (($item = $this->dequeue()) !== null) { $fn($item); } else { - if ($this->debug) { - echo "Queue is empty. Sleeping for $delaySeconds seconds..." . PHP_EOL; - } sleep($delaySeconds); } } diff --git a/src/Storage/Adapters/BeanstalkdStorage.php b/src/Storage/Adapters/BeanstalkdStorage.php index 309b96f..57c54fa 100644 --- a/src/Storage/Adapters/BeanstalkdStorage.php +++ b/src/Storage/Adapters/BeanstalkdStorage.php @@ -12,26 +12,27 @@ class BeanstalkdStorage implements StorageInterface { const DEFAULT_STORAGE_PATH = '127.0.0.1'; const DEFAULT_STORAGE_PORT = 11300; - const DEFAULT_STORAGE_NAME = 'queue'; private Pheanstalk $beanstalkdClient; private TubeName $tube; - public function __construct(string $storagePath) - { + public function __construct( + string $connectionString = self::DEFAULT_STORAGE_PATH . ':' . self::DEFAULT_STORAGE_PORT, + string $tubeName = 'queue' + ) { $host = self::DEFAULT_STORAGE_PATH; $port = self::DEFAULT_STORAGE_PORT; - if ($storagePath && strpos($storagePath, ":") > -1) { - $connectionString = explode(':', $storagePath); - $host = $connectionString[0]; - $port = $connectionString[1]; - } else if ($storagePath) { - $host = $storagePath; + if ($connectionString && strpos($connectionString, ":") > -1) { + $connectionStringParts = explode(':', $connectionString); + $host = $connectionStringParts[0]; + $port = $connectionStringParts[1]; + } else if ($connectionString) { + $host = $connectionString; } $this->beanstalkdClient = Pheanstalk::create($host, $port); - $this->tube = new TubeName(self::DEFAULT_STORAGE_NAME); + $this->tube = new TubeName($tubeName); } public function enqueue(string $data): bool diff --git a/src/Storage/Adapters/FileStorage.php b/src/Storage/Adapters/FileStorage.php index b231422..63d3017 100644 --- a/src/Storage/Adapters/FileStorage.php +++ b/src/Storage/Adapters/FileStorage.php @@ -8,16 +8,18 @@ class FileStorage implements StorageInterface { private string $queueFile; - private bool $debug = false; - public function __construct(string $storagePath, bool $debug = false) - { + public function __construct( + string $storagePath, + string $storageName = 'queue' + ) { if (empty($storagePath)) { $storagePath = "."; } - $this->queueFile = FileUtils::isFilePath($storagePath) ? $storagePath : rtrim($storagePath, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR . 'queue.txt'; + $this->queueFile = FileUtils::isFilePath($storagePath) + ? $storagePath + : rtrim($storagePath, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR . $storageName . '.txt'; FileUtils::createFile($this->queueFile); - $this->debug = $debug; } public function enqueue(string $data): bool @@ -31,11 +33,6 @@ public function enqueue(string $data): bool fwrite($fileHandle, $data . PHP_EOL); - if ($this->debug) { - echo "Enqueued item: $data" . PHP_EOL; - echo "===" . PHP_EOL; - } - flock($fileHandle, LOCK_UN); fclose($fileHandle); @@ -64,10 +61,6 @@ public function dequeue(): ?string rewind($fileHandle); fwrite($fileHandle, implode(PHP_EOL, $lines)); } - if ($this->debug && !empty($data)) { - echo "Dequeued item: $data" . PHP_EOL; - echo "===" . PHP_EOL; - } flock($fileHandle, LOCK_UN); fclose($fileHandle); diff --git a/src/Storage/Adapters/RedisStorage.php b/src/Storage/Adapters/RedisStorage.php index 201af59..b101610 100644 --- a/src/Storage/Adapters/RedisStorage.php +++ b/src/Storage/Adapters/RedisStorage.php @@ -9,40 +9,42 @@ class RedisStorage implements StorageInterface { const DEFAULT_STORAGE_PATH = 'tcp://127.0.0.1:6379'; - const DEFAULT_STORAGE_NAME = 'queue'; private Client $redisClient; + private string $storageKey; - public function __construct(string $storagePath) - { - $connectionString = $storagePath ?: self::DEFAULT_STORAGE_PATH; + public function __construct( + string $connectionString = self::DEFAULT_STORAGE_PATH, + string $storageKey = 'queue' + ) { $this->redisClient = new Client($connectionString); + $this->storageKey = $storageKey; } public function enqueue(string $data): bool { - $res = $this->redisClient->lpush(self::DEFAULT_STORAGE_NAME, $data); + $res = $this->redisClient->lpush($this->storageKey, $data); return !!$res; } public function dequeue(): ?string { - return $this->redisClient->rpop(self::DEFAULT_STORAGE_NAME); + return $this->redisClient->rpop($this->storageKey); } public function exist(string $value): bool { - $exist = $this->redisClient->executeRaw(["LPOS", self::DEFAULT_STORAGE_NAME, $value]); + $exist = $this->redisClient->executeRaw(["LPOS", $this->storageKey, $value]); return boolval($exist); } public function length(): int { - return $this->redisClient->llen(self::DEFAULT_STORAGE_NAME); + return $this->redisClient->llen($this->storageKey); } public function content(): array { - return $this->redisClient->lrange(self::DEFAULT_STORAGE_NAME, 0, -1); + return $this->redisClient->lrange($this->storageKey, 0, -1); } } diff --git a/src/Storage/Adapters/SqliteStorage.php b/src/Storage/Adapters/SqliteStorage.php index a85e086..dac90ae 100644 --- a/src/Storage/Adapters/SqliteStorage.php +++ b/src/Storage/Adapters/SqliteStorage.php @@ -9,16 +9,18 @@ class SqliteStorage implements StorageInterface { private string $queueFile; private \SQLite3 $connection; - private bool $debug = false; - public function __construct(string $storagePath, bool $debug = false) - { + public function __construct( + string $storagePath, + string $storageName = 'queue', + ) { if (empty($storagePath)) { $storagePath = "."; } - $this->queueFile = FileUtils::isFilePath($storagePath) ? $storagePath : rtrim($storagePath, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR . 'queue.db'; + $this->queueFile = FileUtils::isFilePath($storagePath) + ? $storagePath + : rtrim($storagePath, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR . $storageName . '.db'; FileUtils::createFile($this->queueFile); - $this->debug = $debug; $this->connection = new \SQLite3($this->queueFile); $this->connection->query("CREATE TABLE IF NOT EXISTS queue (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT)"); diff --git a/src/Storage/StorageFactory.php b/src/Storage/StorageFactory.php index e37ad5c..5679d27 100644 --- a/src/Storage/StorageFactory.php +++ b/src/Storage/StorageFactory.php @@ -9,13 +9,13 @@ class StorageFactory { - public static function getStorage(StorageType $type, string $storagePath = ""): StorageInterface + public static function getStorage(StorageType $type, string $storagePath = "", string $storageName = 'queue'): StorageInterface { return match ($type) { - StorageType::FILE => new FileStorage($storagePath), - StorageType::SQLITE => new SqliteStorage($storagePath), - StorageType::REDIS => new RedisStorage($storagePath), - StorageType::BEANSTALKD => new BeanstalkdStorage($storagePath), + StorageType::FILE => new FileStorage($storagePath, $storageName), + StorageType::SQLITE => new SqliteStorage($storagePath, $storageName), + StorageType::REDIS => new RedisStorage($storagePath, $storageName), + StorageType::BEANSTALKD => new BeanstalkdStorage($storagePath, $storageName), }; } } diff --git a/tests/FileStorageTest.php b/tests/FileStorageTest.php new file mode 100644 index 0000000..0f5a256 --- /dev/null +++ b/tests/FileStorageTest.php @@ -0,0 +1,66 @@ +tempDir = sys_get_temp_dir() . DIRECTORY_SEPARATOR . 'simple-php-queue-' . bin2hex(random_bytes(8)); + mkdir($this->tempDir, 0777, true); +}); + +afterEach(function () { + $cleanup = function (string $path) use (&$cleanup): void { + if (!is_dir($path)) { + if (is_file($path)) { + unlink($path); + } + + return; + } + + $entries = array_diff(scandir($path), ['.', '..']); + + foreach ($entries as $entry) { + $childPath = $path . DIRECTORY_SEPARATOR . $entry; + if (is_dir($childPath)) { + $cleanup($childPath); + continue; + } + + unlink($childPath); + } + + rmdir($path); + }; + + if (isset($this->tempDir) && is_dir($this->tempDir)) { + $cleanup($this->tempDir); + } +}); + +it('creates a queue file when initialized with a directory path', function () { + new FileStorage($this->tempDir); + + expect(is_file($this->tempDir . DIRECTORY_SEPARATOR . 'queue.txt'))->toBeTrue(); +}); + +it('stores values in fifo order and reports queue state', function () { + $storage = new FileStorage($this->tempDir); + + expect($storage->length())->toBe(0); + expect($storage->exist('first'))->toBeFalse(); + expect($storage->enqueue('first'))->toBeTrue(); + expect($storage->enqueue('second'))->toBeTrue(); + + expect($storage->length())->toBe(2); + expect($storage->exist('first'))->toBeTrue(); + expect($storage->exist('second'))->toBeTrue(); + expect($storage->content())->toBe([ + 'first' . PHP_EOL, + 'second' . PHP_EOL, + ]); + + expect($storage->dequeue())->toBe('first'); + expect($storage->dequeue())->toBe('second'); + expect($storage->dequeue())->toBeNull(); + expect($storage->length())->toBe(0); +}); diff --git a/tests/SqliteStorageTest.php b/tests/SqliteStorageTest.php new file mode 100644 index 0000000..81b2c00 --- /dev/null +++ b/tests/SqliteStorageTest.php @@ -0,0 +1,66 @@ +tempDir = sys_get_temp_dir() . DIRECTORY_SEPARATOR . 'simple-php-queue-' . bin2hex(random_bytes(8)); + mkdir($this->tempDir, 0777, true); +}); + +afterEach(function () { + $cleanup = function (string $path) use (&$cleanup): void { + if (!is_dir($path)) { + if (is_file($path)) { + unlink($path); + } + + return; + } + + $entries = array_diff(scandir($path), ['.', '..']); + + foreach ($entries as $entry) { + $childPath = $path . DIRECTORY_SEPARATOR . $entry; + if (is_dir($childPath)) { + $cleanup($childPath); + continue; + } + + unlink($childPath); + } + + rmdir($path); + }; + + if (isset($this->tempDir) && is_dir($this->tempDir)) { + $cleanup($this->tempDir); + } +}); + +it('creates a sqlite database file when initialized with a directory path', function () { + new SqliteStorage($this->tempDir); + + expect(is_file($this->tempDir . DIRECTORY_SEPARATOR . 'queue.db'))->toBeTrue(); +}); + +it('stores values in fifo order and reports queue state', function () { + $storage = new SqliteStorage($this->tempDir); + + expect($storage->length())->toBe(0); + expect($storage->exist('first'))->toBeFalse(); + expect($storage->enqueue('first'))->toBeTrue(); + expect($storage->enqueue('second'))->toBeTrue(); + + expect($storage->length())->toBe(2); + expect($storage->exist('first'))->toBeTrue(); + expect($storage->exist('second'))->toBeTrue(); + expect($storage->content())->toBe([ + 'first', + 'second', + ]); + + expect($storage->dequeue())->toBe('first'); + expect($storage->dequeue())->toBe('second'); + expect($storage->dequeue())->toBeNull(); + expect($storage->length())->toBe(0); +});