Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"php": "~8.2.0 || ~8.3.0 || ~8.4.0 || ~8.5.0",
"doctrine/dbal": "^4.4.0",
"doctrine/migrations": "^3.3.2",
"patchlevel/hydrator": "^1.8.0",
"patchlevel/hydrator": "^1.19.0",
"patchlevel/worker": "^1.4.0",
"psr/cache": "^2.0.0 || ^3.0.0",
"psr/clock": "^1.0",
Expand Down
2 changes: 1 addition & 1 deletion composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions phpstan-baseline.neon
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,18 @@ parameters:
count: 1
path: src/Subscription/RetryStrategy/ClockBasedRetryStrategy.php

-
message: '#^Parameter \#1 \$json of function json_decode expects string, mixed given\.$#'
identifier: argument.type
count: 1
path: src/Subscription/StatefulSubscriber/DoctrineStatefulSubscriberStore.php

-
message: '#^Parameter \#2 \$data of method Patchlevel\\Hydrator\\HydratorWithContext\:\:hydrate\(\) expects array\<string, mixed\>, mixed given\.$#'
identifier: argument.type
count: 1
path: src/Subscription/StatefulSubscriber/DoctrineStatefulSubscriberStore.php

-
message: '#^Parameter \#3 \$errorContext of class Patchlevel\\EventSourcing\\Subscription\\SubscriptionError constructor expects list\<array\{namespace\: string, short_name\: string, class\: class\-string, message\: string, code\: int\|string, file\: string, line\: int, trace\: list\<array\{file\?\: string, line\?\: int, function\?\: string, class\?\: string, type\?\: string, args\?\: array\<mixed\>\}\>\}\>\|null, mixed given\.$#'
identifier: argument.type
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\StatefulSubscriber;

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Types\Types;
use Patchlevel\EventSourcing\Metadata\Subscriber\AttributeSubscriberMetadataFactory;
use Patchlevel\EventSourcing\Metadata\Subscriber\SubscriberMetadataFactory;
use Patchlevel\EventSourcing\Schema\DoctrineHelper;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaConfigurator;
use Patchlevel\Hydrator\HydratorWithContext;
use Patchlevel\Hydrator\MetadataHydrator;

use function json_decode;
use function json_encode;

final readonly class DoctrineStatefulSubscriberStore implements StatefulSubscriberStore, DoctrineSchemaConfigurator
{
public function __construct(
private Connection $connection,
private HydratorWithContext $hydrator = new MetadataHydrator(),
private SubscriberMetadataFactory $subscriberMetadataFactory = new AttributeSubscriberMetadataFactory(),
private string $tableName = 'stateful_subscriber_state',
) {
}

public function store(StatefulSubscriber $subscriber): void
{
$subscriberId = $this->subscriberId($subscriber);
$data = $this->hydrator->extract($subscriber);

$this->connection->insert(
$this->tableName,
[
'id' => $subscriberId,
'state' => json_encode($data),
],
);
}

public function load(StatefulSubscriber $subscriber): void
{
$subscriberId = $this->subscriberId($subscriber);

$data = $this->connection->fetchAssociative(
'SELECT * FROM ' . $this->tableName . ' WHERE id = ?',
[$subscriberId],
);

if (!$data) {
return;
}

$this->hydrator->hydrate(
$subscriber::class,
json_decode($data['state'], true),
[HydratorWithContext::OBJECT_TO_POPULATE => $subscriber],
);
}

public function configureSchema(Schema $schema, Connection $connection): void
{
if (!DoctrineHelper::sameDatabase($this->connection, $connection)) {
return;
}

$table = $schema->createTable($this->tableName);

$table->addColumn('id', Types::STRING)
->setLength(255)
->setNotnull(true);
$table->addColumn('state', Types::JSON)
->setNotnull(true);

$table->setPrimaryKey(['id']);
}

public function subscriberId(StatefulSubscriber $projection): string
{
return $this->subscriberMetadataFactory->metadata($projection::class)->id;
}
}
55 changes: 55 additions & 0 deletions src/Subscription/StatefulSubscriber/StatefulSubscriber.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\StatefulSubscriber;

use LogicException;
use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber;
use Patchlevel\Hydrator\Attribute\Ignore;
use ReflectionClass;

use const PHP_VERSION_ID;

abstract class StatefulSubscriber implements BatchableSubscriber
{
public function __construct(
#[Ignore]
private readonly StatefulSubscriberStore $store,
) {
$this->store->load($this);
}

public function beginBatch(): void
{
// do nothing
}

public function commitBatch(): void
{
$this->store->store($this);
}

public function rollbackBatch(): void
{
$this->store->load($this);
}

public function forceCommit(): bool
{
return false;
}

public static function createLazy(StatefulSubscriberStore $store): static
{
if (PHP_VERSION_ID < 80400) {
throw new LogicException('Lazy subscriber is only supported on PHP 8.4 or higher');
}

$reflection = new ReflectionClass(static::class);

return $reflection->newLazyGhost(static function ($object) use ($store): void {
$object->__construct($store);
});
}
}
12 changes: 12 additions & 0 deletions src/Subscription/StatefulSubscriber/StatefulSubscriberStore.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\StatefulSubscriber;

interface StatefulSubscriberStore
{
public function store(StatefulSubscriber $subscriber): void;

public function load(StatefulSubscriber $subscriber): void;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber;

use Patchlevel\EventSourcing\Attribute\Projector;
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\Subscription\StatefulSubscriber\StatefulSubscriber;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Events\ProfileCreated;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\ProfileId;

#[Projector('profile_inline')]
final class ProfileInlineStatefulSubscriber extends StatefulSubscriber
{
/** @var array<string, string> */
public array $profiles = [];

#[Subscribe(ProfileCreated::class)]
public function handleProfileCreated(ProfileCreated $profileCreated): void
{
$this->profiles[$profileCreated->profileId->toString()] = $profileCreated->name;
}

public function findById(ProfileId $id): string|null
{
return $this->profiles[$id->toString()] ?? null;
}
}
129 changes: 129 additions & 0 deletions tests/Integration/Subscription/SubscriptionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy;
use Patchlevel\EventSourcing\Subscription\RunMode;
use Patchlevel\EventSourcing\Subscription\StatefulSubscriber\DoctrineStatefulSubscriberStore;
use Patchlevel\EventSourcing\Subscription\Status;
use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\LookupResolver;
Expand All @@ -43,6 +44,7 @@
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ErrorProducerWithSelfRecoverySubscriber;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\LookupSubscriber;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\MigrateAggregateToStreamStoreSubscriber;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileInlineStatefulSubscriber;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileNewProjection;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileProcessor;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileProjection;
Expand All @@ -53,6 +55,7 @@

use function gc_collect_cycles;
use function iterator_to_array;
use function json_decode;
use function sprintf;

#[CoversNothing]
Expand Down Expand Up @@ -1632,6 +1635,132 @@ class {
self::assertEquals(RunMode::FromNow, $subscriptions[0]->runMode());
}

public function testStatefulSubscriber(): void
{
$store = new DoctrineDbalStore(
$this->connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']),
);

$clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00'));

$subscriptionStore = new DoctrineSubscriptionStore(
$this->connection,
$clock,
);

$manager = new DefaultRepositoryManager(
new AggregateRootRegistry(['profile' => Profile::class]),
$store,
);

$repository = $manager->get(Profile::class);

$stateStore = new DoctrineStatefulSubscriberStore($this->connection);

$schemaDirector = new DoctrineSchemaDirector(
$this->connection,
new ChainDoctrineSchemaConfigurator([
$store,
$subscriptionStore,
$stateStore,
]),
);

$schemaDirector->create();

$subscriberRepository = new MetadataSubscriberAccessorRepository([
ProfileInlineStatefulSubscriber::createLazy(
$stateStore,
),
]);

$engine = new DefaultSubscriptionEngine(
new EventFilteredStoreMessageLoader($store, new AttributeEventMetadataFactory(), $subscriberRepository),
$subscriptionStore,
$subscriberRepository,
);

self::assertEquals(
[
new Subscription(
'profile_inline',
'projector',
lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'),
),
],
$engine->subscriptions(),
);

$result = $engine->setup();

self::assertEquals([], $result->errors);

$result = $engine->boot();

self::assertEquals(0, $result->processedMessages);
self::assertEquals([], $result->errors);

self::assertEquals(
[
new Subscription(
'profile_inline',
'projector',
RunMode::FromBeginning,
Status::Active,
lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'),
),
],
$engine->subscriptions(),
);

$profileId = ProfileId::fromString('019d3991-e575-73b2-a18c-7da3fd7f5d70');
$profile = Profile::create($profileId, 'John');
$repository->save($profile);

$result = $engine->run();

self::assertEquals(1, $result->processedMessages);
self::assertEquals([], $result->errors);

self::assertEquals(
[
new Subscription(
'profile_inline',
'projector',
RunMode::FromBeginning,
Status::Active,
1,
lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'),
),
],
$engine->subscriptions(),
);

$result = $this->connection->fetchAssociative(
'SELECT * FROM stateful_subscriber_state WHERE id = ?',
['profile_inline'],
);

self::assertIsArray($result);
self::assertArrayHasKey('id', $result);
self::assertEquals('profile_inline', $result['id']);

self::assertArrayHasKey('state', $result);
self::assertIsString($result['state']);

self::assertEquals(
['profiles' => ['019d3991-e575-73b2-a18c-7da3fd7f5d70' => 'John']],
json_decode($result['state'], true),
);

$projection = new ProfileInlineStatefulSubscriber(
$stateStore,
);

self::assertEquals(['019d3991-e575-73b2-a18c-7da3fd7f5d70' => 'John'], $projection->profiles);
}

/** @param list<Subscription> $subscriptions */
private static function findSubscription(array $subscriptions, string $id): Subscription
{
Expand Down
Loading