Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

$queue = new Queue(
storage: StorageType::FILE,
queueFile: "./",
debug: true
storagePath: "./",
storageName: "my_queue"
);

$queue->listen(function ($item) {
Expand Down
3 changes: 2 additions & 1 deletion examples/producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

$queue = new Queue(
storage: StorageType::FILE,
queueFile: "./"
storagePath: "./",
storageName: "my_queue"
);
for ($i = 0; $i < 150; $i++) {
$queue->enqueue("test $i");
Expand Down
16 changes: 7 additions & 9 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
class Queue
{
private StorageInterface $storage;
private bool $debug;

public function __construct(StorageType $storage, string $queueFile, bool $debug = false)
{
$this->debug = $debug;
$this->storage = StorageFactory::getStorage($storage, $queueFile);
public function __construct(
StorageType $storage,
string $storagePath = "",
string $storageName = 'queue',
) {
$this->storage = StorageFactory::getStorage($storage, $storagePath, $storageName);
}

public function enqueue(string $data): bool
Expand All @@ -27,7 +28,7 @@ public function dequeue(): ?string
return $this->storage->dequeue();
}

public function exist($value): ?string
public function exist($value): bool
{
return $this->storage->exist($value);
}
Expand All @@ -39,9 +40,6 @@ public function listen(callable $fn, int $delayWhenEmpty = 5): void
if (($item = $this->dequeue()) !== null) {
$fn($item);
} else {
if ($this->debug) {
echo "Queue is empty. Sleeping for $delaySeconds seconds..." . PHP_EOL;
}
sleep($delaySeconds);
}
}
Expand Down
21 changes: 11 additions & 10 deletions src/Storage/Adapters/BeanstalkdStorage.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,27 @@ class BeanstalkdStorage implements StorageInterface
{
const DEFAULT_STORAGE_PATH = '127.0.0.1';
const DEFAULT_STORAGE_PORT = 11300;
const DEFAULT_STORAGE_NAME = 'queue';

private Pheanstalk $beanstalkdClient;
private TubeName $tube;

public function __construct(string $storagePath)
{
public function __construct(
string $connectionString = self::DEFAULT_STORAGE_PATH . ':' . self::DEFAULT_STORAGE_PORT,
string $tubeName = 'queue'
) {
$host = self::DEFAULT_STORAGE_PATH;
$port = self::DEFAULT_STORAGE_PORT;

if ($storagePath && strpos($storagePath, ":") > -1) {
$connectionString = explode(':', $storagePath);
$host = $connectionString[0];
$port = $connectionString[1];
} else if ($storagePath) {
$host = $storagePath;
if ($connectionString && strpos($connectionString, ":") > -1) {
$connectionStringParts = explode(':', $connectionString);
$host = $connectionStringParts[0];
$port = $connectionStringParts[1];
} else if ($connectionString) {
Comment on lines +26 to +30
$host = $connectionString;
}

$this->beanstalkdClient = Pheanstalk::create($host, $port);
$this->tube = new TubeName(self::DEFAULT_STORAGE_NAME);
$this->tube = new TubeName($tubeName);
}

public function enqueue(string $data): bool
Expand Down
21 changes: 7 additions & 14 deletions src/Storage/Adapters/FileStorage.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@
class FileStorage implements StorageInterface
{
private string $queueFile;
private bool $debug = false;

public function __construct(string $storagePath, bool $debug = false)
{
public function __construct(
string $storagePath,
string $storageName = 'queue'
) {
if (empty($storagePath)) {
$storagePath = ".";
}
$this->queueFile = FileUtils::isFilePath($storagePath) ? $storagePath : rtrim($storagePath, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR . 'queue.txt';
$this->queueFile = FileUtils::isFilePath($storagePath)
? $storagePath
: rtrim($storagePath, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR . $storageName . '.txt';
FileUtils::createFile($this->queueFile);
$this->debug = $debug;
}

public function enqueue(string $data): bool
Expand All @@ -31,11 +33,6 @@ public function enqueue(string $data): bool

fwrite($fileHandle, $data . PHP_EOL);

if ($this->debug) {
echo "Enqueued item: $data" . PHP_EOL;
echo "===" . PHP_EOL;
}

flock($fileHandle, LOCK_UN);
fclose($fileHandle);

Expand Down Expand Up @@ -64,10 +61,6 @@ public function dequeue(): ?string
rewind($fileHandle);
fwrite($fileHandle, implode(PHP_EOL, $lines));
}
if ($this->debug && !empty($data)) {
echo "Dequeued item: $data" . PHP_EOL;
echo "===" . PHP_EOL;
}

flock($fileHandle, LOCK_UN);
fclose($fileHandle);
Expand Down
20 changes: 11 additions & 9 deletions src/Storage/Adapters/RedisStorage.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,40 +9,42 @@
class RedisStorage implements StorageInterface
{
const DEFAULT_STORAGE_PATH = 'tcp://127.0.0.1:6379';
const DEFAULT_STORAGE_NAME = 'queue';

private Client $redisClient;
private string $storageKey;

public function __construct(string $storagePath)
{
$connectionString = $storagePath ?: self::DEFAULT_STORAGE_PATH;
public function __construct(
string $connectionString = self::DEFAULT_STORAGE_PATH,
string $storageKey = 'queue'
) {
$this->redisClient = new Client($connectionString);
$this->storageKey = $storageKey;
}

public function enqueue(string $data): bool
{
$res = $this->redisClient->lpush(self::DEFAULT_STORAGE_NAME, $data);
$res = $this->redisClient->lpush($this->storageKey, $data);
return !!$res;
}

public function dequeue(): ?string
{
return $this->redisClient->rpop(self::DEFAULT_STORAGE_NAME);
return $this->redisClient->rpop($this->storageKey);
}

public function exist(string $value): bool
{
$exist = $this->redisClient->executeRaw(["LPOS", self::DEFAULT_STORAGE_NAME, $value]);
$exist = $this->redisClient->executeRaw(["LPOS", $this->storageKey, $value]);
return boolval($exist);
}

public function length(): int
{
return $this->redisClient->llen(self::DEFAULT_STORAGE_NAME);
return $this->redisClient->llen($this->storageKey);
}

public function content(): array
{
return $this->redisClient->lrange(self::DEFAULT_STORAGE_NAME, 0, -1);
return $this->redisClient->lrange($this->storageKey, 0, -1);
}
}
12 changes: 7 additions & 5 deletions src/Storage/Adapters/SqliteStorage.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@ class SqliteStorage implements StorageInterface
{
private string $queueFile;
private \SQLite3 $connection;
private bool $debug = false;

public function __construct(string $storagePath, bool $debug = false)
{
public function __construct(
string $storagePath,
string $storageName = 'queue',
) {
if (empty($storagePath)) {
$storagePath = ".";
}
$this->queueFile = FileUtils::isFilePath($storagePath) ? $storagePath : rtrim($storagePath, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR . 'queue.db';
$this->queueFile = FileUtils::isFilePath($storagePath)
? $storagePath
: rtrim($storagePath, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR . $storageName . '.db';
FileUtils::createFile($this->queueFile);
$this->debug = $debug;

$this->connection = new \SQLite3($this->queueFile);
$this->connection->query("CREATE TABLE IF NOT EXISTS queue (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT)");
Expand Down
10 changes: 5 additions & 5 deletions src/Storage/StorageFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@

class StorageFactory
{
public static function getStorage(StorageType $type, string $storagePath = ""): StorageInterface
public static function getStorage(StorageType $type, string $storagePath = "", string $storageName = 'queue'): StorageInterface
{
return match ($type) {
StorageType::FILE => new FileStorage($storagePath),
StorageType::SQLITE => new SqliteStorage($storagePath),
StorageType::REDIS => new RedisStorage($storagePath),
StorageType::BEANSTALKD => new BeanstalkdStorage($storagePath),
StorageType::FILE => new FileStorage($storagePath, $storageName),
StorageType::SQLITE => new SqliteStorage($storagePath, $storageName),
StorageType::REDIS => new RedisStorage($storagePath, $storageName),
StorageType::BEANSTALKD => new BeanstalkdStorage($storagePath, $storageName),
};
}
}
66 changes: 66 additions & 0 deletions tests/FileStorageTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?php

use Slowmove\SimplePhpQueue\Storage\Adapters\FileStorage;

beforeEach(function () {
$this->tempDir = sys_get_temp_dir() . DIRECTORY_SEPARATOR . 'simple-php-queue-' . bin2hex(random_bytes(8));
mkdir($this->tempDir, 0777, true);
});

afterEach(function () {
$cleanup = function (string $path) use (&$cleanup): void {
if (!is_dir($path)) {
if (is_file($path)) {
unlink($path);
}

return;
}

$entries = array_diff(scandir($path), ['.', '..']);

foreach ($entries as $entry) {
$childPath = $path . DIRECTORY_SEPARATOR . $entry;
if (is_dir($childPath)) {
$cleanup($childPath);
continue;
}

unlink($childPath);
}

rmdir($path);
};

if (isset($this->tempDir) && is_dir($this->tempDir)) {
$cleanup($this->tempDir);
}
});

it('creates a queue file when initialized with a directory path', function () {
new FileStorage($this->tempDir);

expect(is_file($this->tempDir . DIRECTORY_SEPARATOR . 'queue.txt'))->toBeTrue();
});
Comment on lines +40 to +44

it('stores values in fifo order and reports queue state', function () {
$storage = new FileStorage($this->tempDir);

expect($storage->length())->toBe(0);
expect($storage->exist('first'))->toBeFalse();
expect($storage->enqueue('first'))->toBeTrue();
expect($storage->enqueue('second'))->toBeTrue();

expect($storage->length())->toBe(2);
expect($storage->exist('first'))->toBeTrue();
expect($storage->exist('second'))->toBeTrue();
expect($storage->content())->toBe([
'first' . PHP_EOL,
'second' . PHP_EOL,
]);

expect($storage->dequeue())->toBe('first');
expect($storage->dequeue())->toBe('second');
expect($storage->dequeue())->toBeNull();
expect($storage->length())->toBe(0);
});
66 changes: 66 additions & 0 deletions tests/SqliteStorageTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?php

use Slowmove\SimplePhpQueue\Storage\Adapters\SqliteStorage;

beforeEach(function () {
$this->tempDir = sys_get_temp_dir() . DIRECTORY_SEPARATOR . 'simple-php-queue-' . bin2hex(random_bytes(8));
mkdir($this->tempDir, 0777, true);
});

afterEach(function () {
$cleanup = function (string $path) use (&$cleanup): void {
if (!is_dir($path)) {
if (is_file($path)) {
unlink($path);
}

return;
}

$entries = array_diff(scandir($path), ['.', '..']);

foreach ($entries as $entry) {
$childPath = $path . DIRECTORY_SEPARATOR . $entry;
if (is_dir($childPath)) {
$cleanup($childPath);
continue;
}

unlink($childPath);
}

rmdir($path);
};

if (isset($this->tempDir) && is_dir($this->tempDir)) {
$cleanup($this->tempDir);
}
});

it('creates a sqlite database file when initialized with a directory path', function () {
new SqliteStorage($this->tempDir);

expect(is_file($this->tempDir . DIRECTORY_SEPARATOR . 'queue.db'))->toBeTrue();
});
Comment on lines +40 to +44

it('stores values in fifo order and reports queue state', function () {
$storage = new SqliteStorage($this->tempDir);

expect($storage->length())->toBe(0);
expect($storage->exist('first'))->toBeFalse();
expect($storage->enqueue('first'))->toBeTrue();
expect($storage->enqueue('second'))->toBeTrue();

expect($storage->length())->toBe(2);
expect($storage->exist('first'))->toBeTrue();
expect($storage->exist('second'))->toBeTrue();
expect($storage->content())->toBe([
'first',
'second',
]);

expect($storage->dequeue())->toBe('first');
expect($storage->dequeue())->toBe('second');
expect($storage->dequeue())->toBeNull();
expect($storage->length())->toBe(0);
});
Loading