Draft: Add primitives, identity manager w adapter#381
Draft: Add primitives, identity manager w adapter#381
Conversation
| defer(until = { it.configReady }) { | ||
| effect { IdentityEffect.ResolveSeed(sanitized) } | ||
| effect { IdentityEffect.FetchAssignments } | ||
| effect { IdentityEffect.ReevaluateTestMode(sanitized, base.aliasId) } | ||
| } |
There was a problem hiding this comment.
Deferred effects may never fire after a user switch
When a user is switched (Identify reducer detects a different appUserId), SdkState.Updates.FullResetOnIdentify is dispatched, which sets configReady = false. The deferred effects block then waits until = { it.configReady }.
However, configReady is only set back to true when ConfigManager explicitly dispatches SdkState.Updates.ConfigReady — which only happens after an initial config retrieval or a refreshConfiguration() call. Since FullResetOnIdentify does not trigger a new config fetch in ConfigManager, and completeReset() only resets other managers (not ConfigManager), configReady may never flip back to true in the current session.
This means IdentityEffect.ResolveSeed, IdentityEffect.FetchAssignments, and IdentityEffect.ReevaluateTestMode would only run after the next periodic config refresh — a silent behavioural regression compared to the old configManager.configState.awaitFirstValidConfig() path, which found the already-cached config immediately.
Consider dispatching ConfigReady in ConfigManager explicitly after a reset, or reusing the existing cached config state to release deferred effects when the cached config is already available.
Prompt To Fix With AI
This is a comment left during a code review.
Path: superwall/src/main/java/com/superwall/sdk/identity/IdentityManagerActor.kt
Line: 96-100
Comment:
**Deferred effects may never fire after a user switch**
When a user is switched (`Identify` reducer detects a different `appUserId`), `SdkState.Updates.FullResetOnIdentify` is dispatched, which sets `configReady = false`. The deferred effects block then waits `until = { it.configReady }`.
However, `configReady` is only set back to `true` when `ConfigManager` explicitly dispatches `SdkState.Updates.ConfigReady` — which only happens after an initial config retrieval or a `refreshConfiguration()` call. Since `FullResetOnIdentify` does **not** trigger a new config fetch in `ConfigManager`, and `completeReset()` only resets other managers (not ConfigManager), `configReady` may never flip back to `true` in the current session.
This means `IdentityEffect.ResolveSeed`, `IdentityEffect.FetchAssignments`, and `IdentityEffect.ReevaluateTestMode` would only run after the next periodic config refresh — a silent behavioural regression compared to the old `configManager.configState.awaitFirstValidConfig()` path, which found the already-cached config immediately.
Consider dispatching `ConfigReady` in `ConfigManager` explicitly after a reset, or reusing the existing cached config state to release deferred effects when the cached config is already available.
How can I resolve this? If you propose a fix, please make it concise.| withErrorTracking { | ||
| (event as Reducer<SdkState>).applyOn(fx, _state.value) | ||
| }.let { either -> | ||
| when (either) { | ||
| is Success -> either.value | ||
| is Failure -> _state.value // keep current state on error | ||
| } |
There was a problem hiding this comment.
Unsafe cast — IdentityState.Updates dispatched directly would ClassCastException
The engine unconditionally casts every SdkEvent to Reducer<SdkState>. IdentityState.Updates extends Reducer<IdentityState>, not Reducer<SdkState>, so dispatching an IdentityState.Updates directly to engine.dispatch() would throw a ClassCastException at runtime (caught by withErrorTracking, causing silent state loss).
Nothing in the type system prevents engine.dispatch(IdentityState.Updates.Reset). Since Engine is internal and engine is exposed as internal val engine: Engine on IdentityManager, this is an easy mistake to make — the tests already call manager.engine.dispatch(SdkState.Updates.ConfigReady) directly, showing the pattern is expected.
Consider restricting Engine.dispatch to accept only SdkState.Updates (or a dedicated sealed RootEvent type) to make the constraint compile-time safe, rather than relying on a runtime cast suppressed with @Suppress("UNCHECKED_CAST").
Prompt To Fix With AI
This is a comment left during a code review.
Path: superwall/src/main/java/com/superwall/sdk/misc/primitives/Engine.kt
Line: 51-57
Comment:
**Unsafe cast — `IdentityState.Updates` dispatched directly would `ClassCastException`**
The engine unconditionally casts every `SdkEvent` to `Reducer<SdkState>`. `IdentityState.Updates` extends `Reducer<IdentityState>`, not `Reducer<SdkState>`, so dispatching an `IdentityState.Updates` directly to `engine.dispatch()` would throw a `ClassCastException` at runtime (caught by `withErrorTracking`, causing silent state loss).
Nothing in the type system prevents `engine.dispatch(IdentityState.Updates.Reset)`. Since `Engine` is `internal` and `engine` is exposed as `internal val engine: Engine` on `IdentityManager`, this is an easy mistake to make — the tests already call `manager.engine.dispatch(SdkState.Updates.ConfigReady)` directly, showing the pattern is expected.
Consider restricting `Engine.dispatch` to accept only `SdkState.Updates` (or a dedicated sealed `RootEvent` type) to make the constraint compile-time safe, rather than relying on a runtime cast suppressed with `@Suppress("UNCHECKED_CAST")`.
How can I resolve this? If you propose a fix, please make it concise.| private val engineDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() | ||
| private val engineScope = CoroutineScope(engineDispatcher) |
There was a problem hiding this comment.
Thread executor is never closed — resource leak
Executors.newSingleThreadExecutor().asCoroutineDispatcher() creates a platform thread that is never shut down. The CoroutineDispatcher returned by asCoroutineDispatcher() implements Closeable, but no reference is kept to call .close() when the IdentityManager is no longer needed (e.g., after Superwall.reset()).
This leaks an OS thread for each IdentityManager instance created during the app's lifecycle.
Consider storing the executor reference and providing a teardown path, or using a lifecycle-scoped dispatcher:
private val engineExecutor = Executors.newSingleThreadExecutor()
private val engineDispatcher = engineExecutor.asCoroutineDispatcher()
// ... call engineExecutor.shutdown() / engineDispatcher.close() when tearing downPrompt To Fix With AI
This is a comment left during a code review.
Path: superwall/src/main/java/com/superwall/sdk/identity/IdentityManager.kt
Line: 52-53
Comment:
**Thread executor is never closed — resource leak**
`Executors.newSingleThreadExecutor().asCoroutineDispatcher()` creates a platform thread that is never shut down. The `CoroutineDispatcher` returned by `asCoroutineDispatcher()` implements `Closeable`, but no reference is kept to call `.close()` when the `IdentityManager` is no longer needed (e.g., after `Superwall.reset()`).
This leaks an OS thread for each `IdentityManager` instance created during the app's lifecycle.
Consider storing the executor reference and providing a teardown path, or using a lifecycle-scoped dispatcher:
```kotlin
private val engineExecutor = Executors.newSingleThreadExecutor()
private val engineDispatcher = engineExecutor.asCoroutineDispatcher()
// ... call engineExecutor.shutdown() / engineDispatcher.close() when tearing down
```
How can I resolve this? If you propose a fix, please make it concise.| fun log( | ||
| logLevel: LogLevel, | ||
| scope: LogScope, | ||
| message: String = "", | ||
| info: Map<String, Any>? = null, | ||
| error: Throwable? = null, | ||
| ) { | ||
| Logger.debug( | ||
| logLevel, | ||
| scope, | ||
| message, | ||
| info, | ||
| error, | ||
| ) | ||
| } |
There was a problem hiding this comment.
Fx.log() performs an immediate side effect inside the reducer
All other Fx methods add to pending for deferred execution. log() instead calls Logger.debug() directly, making reducers impure — they can perform I/O (logging) during the "pure reduce" phase that the Engine relies on being side-effect-free.
Additionally, Effect.Log is defined in Effects.kt but is never produced by Fx.log(), making it dead code.
If the intent is to keep reducers pure, Fx.log() should enqueue an Effect.Log rather than calling the logger eagerly:
fun log(...) {
pending += Effect.Log(logLevel, scope, message, info, error)
}And EffectRunner / Engine should handle Effect.Log in its when dispatch.
Prompt To Fix With AI
This is a comment left during a code review.
Path: superwall/src/main/java/com/superwall/sdk/misc/primitives/Fx.kt
Line: 34-48
Comment:
**`Fx.log()` performs an immediate side effect inside the reducer**
All other `Fx` methods add to `pending` for deferred execution. `log()` instead calls `Logger.debug()` directly, making reducers impure — they can perform I/O (logging) during the "pure reduce" phase that the `Engine` relies on being side-effect-free.
Additionally, `Effect.Log` is defined in `Effects.kt` but is never produced by `Fx.log()`, making it dead code.
If the intent is to keep reducers pure, `Fx.log()` should enqueue an `Effect.Log` rather than calling the logger eagerly:
```kotlin
fun log(...) {
pending += Effect.Log(logLevel, scope, message, info, error)
}
```
And `EffectRunner` / `Engine` should handle `Effect.Log` in its `when` dispatch.
How can I resolve this? If you propose a fix, please make it concise.| IdentityLogic.sanitize(userId).takeIf { !it.isNullOrEmpty() }?.let { sanitized -> | ||
| if (sanitized.isEmpty()) { | ||
| return@let state | ||
| } |
There was a problem hiding this comment.
Unreachable guard inside Identify reducer
The takeIf { !it.isNullOrEmpty() } predicate at the call site guarantees sanitized is non-null and non-empty before the .let lambda runs. The inner if (sanitized.isEmpty()) check is therefore dead code that can never be reached.
| IdentityLogic.sanitize(userId).takeIf { !it.isNullOrEmpty() }?.let { sanitized -> | |
| if (sanitized.isEmpty()) { | |
| return@let state | |
| } | |
| IdentityLogic.sanitize(userId).takeIf { !it.isNullOrEmpty() }?.let { sanitized -> | |
| if (sanitized == state.appUserId) return@let state |
Prompt To Fix With AI
This is a comment left during a code review.
Path: superwall/src/main/java/com/superwall/sdk/identity/IdentityManagerActor.kt
Line: 62-65
Comment:
**Unreachable guard inside `Identify` reducer**
The `takeIf { !it.isNullOrEmpty() }` predicate at the call site guarantees `sanitized` is non-null and non-empty before the `.let` lambda runs. The inner `if (sanitized.isEmpty())` check is therefore dead code that can never be reached.
```suggestion
IdentityLogic.sanitize(userId).takeIf { !it.isNullOrEmpty() }?.let { sanitized ->
if (sanitized == state.appUserId) return@let state
```
How can I resolve this? If you propose a fix, please make it concise.| When("reset is called not during identify") { | ||
| manager.reset(duringIdentify = false) | ||
| Thread.sleep(100) | ||
| } | ||
|
|
There was a problem hiding this comment.
Thread.sleep produces flaky tests
Many tests synchronize on async engine operations using Thread.sleep(100) / Thread.sleep(200). Because IdentityManager creates a real Executors.newSingleThreadExecutor() dispatcher that the TestCoroutineScheduler cannot control, advanceUntilIdle() won't drain it — hence the sleeps. However, this makes tests non-deterministic: they can fail spuriously on slow CI machines or pass incorrectly on fast ones.
This applies to tests throughout both IdentityManagerTest and IdentityManagerUserAttributesTest.
Consider making the engine dispatcher injectable (e.g., a constructor parameter defaulting to the single-thread executor) so tests can pass UnconfinedTestDispatcher() or StandardTestDispatcher(testScheduler) and use advanceUntilIdle() / runCurrent() for reliable synchronisation without wall-clock sleeps.
Prompt To Fix With AI
This is a comment left during a code review.
Path: superwall/src/test/java/com/superwall/sdk/identity/IdentityManagerTest.kt
Line: 363-367
Comment:
**`Thread.sleep` produces flaky tests**
Many tests synchronize on async engine operations using `Thread.sleep(100)` / `Thread.sleep(200)`. Because `IdentityManager` creates a real `Executors.newSingleThreadExecutor()` dispatcher that the `TestCoroutineScheduler` cannot control, `advanceUntilIdle()` won't drain it — hence the sleeps. However, this makes tests non-deterministic: they can fail spuriously on slow CI machines or pass incorrectly on fast ones.
This applies to tests throughout both `IdentityManagerTest` and `IdentityManagerUserAttributesTest`.
Consider making the engine dispatcher injectable (e.g., a constructor parameter defaulting to the single-thread executor) so tests can pass `UnconfinedTestDispatcher()` or `StandardTestDispatcher(testScheduler)` and use `advanceUntilIdle()` / `runCurrent()` for reliable synchronisation without wall-clock sleeps.
How can I resolve this? If you propose a fix, please make it concise.
Changes in this pull request
Checklist
CHANGELOG.mdfor any breaking changes, enhancements, or bug fixes.ktlintin the main directory and fixed any issues.Greptile Summary
This draft PR replaces the lock/
runBlocking-basedIdentityManagerwith a Redux-style unidirectional data-flow architecture: a centralEngineprocesses events from an unlimitedChannel, pureReducerlambdas produce the nextSdkState, and side effects are accumulated in anFxobject and dispatched asynchronously. A newIdentityManagerActor.kthouses all identity state, reducers, and effects, whileIdentityManagerbecomes a thin facade with an unchanged public API.ConfigManagergains twoConfigReadydispatch calls to signal the engine when configuration is available.Key concerns to resolve before merging:
FullResetOnIdentifysetsconfigReady = false, butConfigManageronly dispatchesConfigReadyon an actual config fetch or refresh. Because no config refresh is triggered by the user-switch flow,ResolveSeed,FetchAssignments, andReevaluateTestModemay stall indefinitely until the next periodic refresh — a silent regression from the oldawaitFirstValidConfig()path.Engine— everySdkEventis cast toReducer<SdkState>at runtime;IdentityState.Updates(which isReducer<IdentityState>) dispatched directly would throw aClassCastExceptionwith no compile-time protection.IdentityManagercreates anewSingleThreadExecutordispatcher that is never closed, leaking an OS thread per instance.Fx.log()—Fx.log()performs immediate I/O (callsLogger.debug()) instead of queuing anEffect.Log, undermining the pure-reducer contract;Effect.Logis declared but dead.Thread.sleep(100/200)is used throughout both test classes to synchronise with the engine's real executor thread, which theTestCoroutineSchedulercannot control.Confidence Score: 2/5
IdentityManagerActor.kt(deferred-effect deadlock),Engine.kt(unsafe cast),IdentityManager.kt(thread leak),Fx.kt(impure logging), and both test files (Thread.sleep synchronisation).Important Files Changed
SdkEventtoReducer<SdkState>that would silently fail if anIdentityState.Updatesevent is dispatched directly.IdentityState, all its reducers, andIdentityEffectsealed class; contains a deferred-effect deadlock risk after user-switch and unreachable code in theIdentifyreducer.newSingleThreadExecutordispatcher that is never closed, leaking a thread perIdentityManagerinstance.Fx.log()performs an immediate logging side effect rather than queuing anEffect.Log, breaking the pure-reducer model.Effectsealed interface hierarchy;Effect.Logis declared but never produced byFx.log(), making it effectively dead code.engine.dispatch(SdkState.Updates.ConfigReady)calls at config retrieval and refresh points; minimal change but critical for the deferred-effect flow — does not dispatch after a user-switch-triggered reset, which is the root of the deadlock risk.Thread.sleepthroughout for async synchronisation, making tests susceptible to flakiness on slow CI machines.userAttributesconsistency across init/identify/reset/merge scenarios; also relies onThread.sleepfor synchronisation.Sequence Diagram
sequenceDiagram participant Caller as Caller (Superwall.kt) participant IM as IdentityManager participant Engine as Engine (Channel) participant Reducer as SdkState.Updates / IdentityState.Updates participant Fx as Fx (effect accumulator) participant Runner as EffectRunner participant Config as ConfigManager participant Storage as Storage Note over Caller,Storage: Identify new user (user switch scenario) Caller->>IM: identify("user-B") IM->>Engine: dispatch(UpdateIdentity(Identify("user-B"))) Engine->>Reducer: applyOn(Fx, SdkState) Reducer->>Fx: dispatch(FullResetOnIdentify) → configReady=false Reducer->>Fx: effect(CompleteReset) Reducer->>Fx: persist(AppUserId, AliasId, Seed, UserAttributes) Reducer->>Fx: track(IdentityAlias) Reducer->>Fx: defer(until={configReady}) [ResolveSeed, FetchAssignments, ReevaluateTestMode] Reducer->>Fx: effect(CheckWebEntitlements) Engine->>Runner: launch CompleteReset Runner->>Caller: completeReset() Engine->>Runner: launch CheckWebEntitlements Engine->>Runner: launch persist/track effects Runner->>Storage: write(AppUserId, AliasId, Seed, UserAttributes) Note over Engine,Config: Deferred effects wait for configReady=true Config-->>Engine: dispatch(ConfigReady) [only on next config fetch/refresh] Engine->>Fx: configReady=true → release deferred batch Engine->>Runner: launch ResolveSeed Engine->>Runner: launch FetchAssignments Engine->>Runner: launch ReevaluateTestModeLast reviewed commit: 9d7b577