diff --git a/src/Subscription/Engine/DefaultSubscriptionEngine.php b/src/Subscription/Engine/DefaultSubscriptionEngine.php index 98281c62..90cec5b5 100644 --- a/src/Subscription/Engine/DefaultSubscriptionEngine.php +++ b/src/Subscription/Engine/DefaultSubscriptionEngine.php @@ -644,7 +644,7 @@ function (SubscriptionCollection $subscriptions): Result { } if ($subscription->hasCleanupTasks()) { - $error = $this->cleanup($subscription); + $error = $this->cleanup($subscription, true); if ($error) { $errors[] = $error; @@ -1319,7 +1319,7 @@ private function retryStrategy(Subscription $subscription): RetryStrategy return $this->retryStrategyRepository->get($retryStrategy); } - private function cleanup(Subscription $subscription): Error|null + private function cleanup(Subscription $subscription, bool $force = false): Error|null { if (!$this->cleaner) { throw new CleanerNotConfigured(); @@ -1342,6 +1342,15 @@ private function cleanup(Subscription $subscription): Error|null ), ); + if ($force) { + $this->subscriptionManager->remove($subscription); + + $this->logger?->info(sprintf( + 'Subscription Engine: Subscription "%s" removed.', + $subscription->id(), + )); + } + return new Error( $subscription->id(), $e->getMessage(), diff --git a/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php b/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php index 27417d65..a448433e 100644 --- a/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php +++ b/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php @@ -3785,15 +3785,15 @@ public function testRemoveWithCleanupHandlerError(): void $task = new DropTableTask('test'); - $subscriptionStore = new DummySubscriptionStore([ - new Subscription( - $subscriptionId, - Subscription::DEFAULT_GROUP, - RunMode::FromBeginning, - Status::Detached, - cleanupTasks: [$task], - ), - ]); + $subscription = new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Detached, + cleanupTasks: [$task], + ); + + $subscriptionStore = new DummySubscriptionStore([$subscription]); $streamableStore = $this->createMock(Store::class); @@ -3818,7 +3818,7 @@ public function testRemoveWithCleanupHandlerError(): void self::assertEquals($subscriptionId, $error->subscriptionId); self::assertInstanceOf(CleanupFailed::class, $error->throwable); - $subscriptionStore->assertNoChanges(); + $subscriptionStore->assertRemoved($subscription); } public function testReactiveDiscoverNewSubscribers(): void