diff --git a/build.gradle.kts b/build.gradle.kts index 5cddda22..44c8b06b 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -5,8 +5,8 @@ plugins { application kotlin("jvm") version "2.1.10" kotlin("plugin.serialization") version "2.1.10" + kotlin("plugin.allopen") version "2.1.10" - alias(libs.plugins.ksp) id("org.jsonschema2pojo") version "1.2.2" alias(libs.plugins.openapi.generator) @@ -38,8 +38,7 @@ dependencies { implementation(libs.clikt) implementation(libs.mordant) - ksp(libs.restate.sdk.api.kotlin.gen) { isChanging = true } - implementation(libs.restate.sdk.kotlin.http) { isChanging = true } + implementation(libs.restate.sdk.kotlin.http) implementation(libs.vertx) implementation(libs.junit.all) @@ -123,6 +122,7 @@ openApiGenerate { tasks { withType().configureEach { + dependsOn(openApiGenerate) dependsOn(generateJsonSchema2Pojo) dependsOn(withType()) } @@ -130,8 +130,6 @@ tasks { test { useJUnitPlatform() } } -afterEvaluate { tasks.named("kspKotlin").configure { mustRunAfter("openApiGenerate") } } - spotless { kotlin { ktfmt() @@ -141,6 +139,12 @@ spotless { kotlinGradle { ktfmt() } } +allOpen { + annotation("dev.restate.sdk.annotation.Service") + annotation("dev.restate.sdk.annotation.VirtualObject") + annotation("dev.restate.sdk.annotation.Workflow") +} + licenseReport { renderers = arrayOf( diff --git a/gradle.properties b/gradle.properties index 4ad3a26a..c42726b1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,3 @@ kotlin.code.style=official -ksp.useKSP2 = true javaLanguageVersion = 21 \ No newline at end of file diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index edacbf32..b4ccfcbf 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,5 +1,5 @@ [versions] -restate = "2.4.2" +restate = "2.6.0" ktor = "2.3.13" [libraries] diff --git a/settings.gradle.kts b/settings.gradle.kts index fcd4fc1b..6a48009e 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -5,12 +5,8 @@ rootProject.name = "restate-e2e" dependencyResolutionManagement { versionCatalogs { create("libs") { - library("restate-sdk-client", "dev.restate", "client").versionRef("restate") library("restate-sdk-client-kotlin", "dev.restate", "client-kotlin").versionRef("restate") - library("restate-sdk-api-kotlin", "dev.restate", "sdk-api-kotlin").versionRef("restate") library("restate-sdk-kotlin-http", "dev.restate", "sdk-kotlin-http").versionRef("restate") - library("restate-sdk-api-kotlin-gen", "dev.restate", "sdk-api-kotlin-gen") - .versionRef("restate") version("log4j", "2.24.3") library("log4j-api", "org.apache.logging.log4j", "log4j-api").versionRef("log4j") @@ -70,11 +66,6 @@ dependencyResolutionManagement { .version("1.10.1") library("kotlinx-coroutines-test", "org.jetbrains.kotlinx", "kotlinx-coroutines-test") .version("1.10.1") - - version("ksp", "2.1.10-1.0.31") - library("symbol-processing-api", "com.google.devtools.ksp", "symbol-processing-api") - .versionRef("ksp") - plugin("ksp", "com.google.devtools.ksp").versionRef("ksp") } } } diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/AwakeableIngressEndpointTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/AwakeableIngressEndpointTest.kt index a20931f8..8cf94ed7 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/AwakeableIngressEndpointTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/AwakeableIngressEndpointTest.kt @@ -11,11 +11,16 @@ package dev.restate.sdktesting.tests import dev.restate.client.Client import dev.restate.client.kotlin.rejectSuspend import dev.restate.client.kotlin.resolveSuspend +import dev.restate.client.kotlin.response +import dev.restate.client.kotlin.toVirtualObject import dev.restate.sdk.annotation.Handler import dev.restate.sdk.annotation.Shared import dev.restate.sdk.annotation.VirtualObject import dev.restate.sdk.endpoint.Endpoint -import dev.restate.sdk.kotlin.* +import dev.restate.sdk.kotlin.awakeable +import dev.restate.sdk.kotlin.get +import dev.restate.sdk.kotlin.set +import dev.restate.sdk.kotlin.state import dev.restate.sdktesting.infra.InjectClient import dev.restate.sdktesting.infra.RestateDeployerExtension import dev.restate.serde.kotlinx.jsonSerde @@ -33,14 +38,13 @@ class AwakeableIngressEndpointTest { class MyService { @Handler - suspend fun run(ctx: ObjectContext): String { - val awk = ctx.awakeable() - ctx.set("awk", awk.id) + suspend fun run(): String { + val awk = awakeable() + state().set("awk", awk.id) return awk.await() } - @Shared - suspend fun getAwakeable(ctx: SharedObjectContext): String = ctx.get("awk") ?: "" + @Shared suspend fun getAwakeable(): String = state().get("awk") ?: "" } companion object { @@ -55,18 +59,21 @@ class AwakeableIngressEndpointTest { @InjectClient ingressClient: Client, ) = runTest { val key = UUID.randomUUID().toString() - val client = AwakeableIngressEndpointTestMyServiceClient.fromClient(ingressClient, key) + val client = ingressClient.toVirtualObject(key) - val runResult = async { client.run(idempotentCallOptions) } + val runResult = async { + client.request { run() }.options(idempotentCallOptions).call().response + } // Wait for awakeable to be registered await withAlias "awakeable is registered" untilAsserted { - assertThat(client.getAwakeable()).isNotBlank() + assertThat(client.request { getAwakeable() }.call().response).isNotBlank() } - val awakeableId = client.getAwakeable(idempotentCallOptions) + val awakeableId = + client.request { getAwakeable() }.options(idempotentCallOptions).call().response val expectedResult = "solved!" @@ -82,18 +89,22 @@ class AwakeableIngressEndpointTest { @Test fun completeWithFailure(@InjectClient ingressClient: Client) = runTest { val key = UUID.randomUUID().toString() - val client = AwakeableIngressEndpointTestMyServiceClient.fromClient(ingressClient, key) + val client = ingressClient.toVirtualObject(key) - val runResult = async { runCatching { client.run(idempotentCallOptions) }.exceptionOrNull() } + val runResult = async { + runCatching { client.request { run() }.options(idempotentCallOptions).call().response } + .exceptionOrNull() + } // Wait for awakeable to be registered await withAlias "awakeable is registered" untilAsserted { - assertThat(client.getAwakeable()).isNotBlank() + assertThat(client.request { getAwakeable() }.call().response).isNotBlank() } - val awakeableId = client.getAwakeable(idempotentCallOptions) + val awakeableId = + client.request { getAwakeable() }.options(idempotentCallOptions).call().response val expectedReason = "my bad!" diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/BackwardCompatibilityTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/BackwardCompatibilityTest.kt index 406ef5e5..7a64526d 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/BackwardCompatibilityTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/BackwardCompatibilityTest.kt @@ -15,15 +15,22 @@ import dev.restate.admin.model.UpdateHttpDeploymentRequest import dev.restate.client.Client import dev.restate.client.SendResponse import dev.restate.client.kotlin.* +import dev.restate.client.kotlin.response +import dev.restate.client.kotlin.toService +import dev.restate.client.kotlin.toVirtualObject import dev.restate.sdk.annotation.Handler import dev.restate.sdk.annotation.Name import dev.restate.sdk.annotation.Service import dev.restate.sdk.annotation.Shared import dev.restate.sdk.annotation.VirtualObject import dev.restate.sdk.endpoint.Endpoint -import dev.restate.sdk.kotlin.* +import dev.restate.sdk.kotlin.awakeable import dev.restate.sdk.kotlin.endpoint.inactivityTimeout import dev.restate.sdk.kotlin.endpoint.journalRetention +import dev.restate.sdk.kotlin.get +import dev.restate.sdk.kotlin.runBlock +import dev.restate.sdk.kotlin.set +import dev.restate.sdk.kotlin.state import dev.restate.sdktesting.infra.Deployer import dev.restate.sdktesting.infra.InjectAdminURI import dev.restate.sdktesting.infra.InjectClient @@ -66,25 +73,24 @@ class BackwardCompatibilityTest { @VirtualObject class MyService { @Handler - suspend fun run(ctx: ObjectContext): String { - val awk = ctx.awakeable() - ctx.set("awk", awk.id) + suspend fun run(): String { + val awk = awakeable() + state().set("awk", awk.id) return awk.await() } - @Shared - suspend fun getAwakeable(ctx: SharedObjectContext): String = ctx.get("awk") ?: "" + @Shared suspend fun getAwakeable(): String = state().get("awk") ?: "" } @Service @Name("RetryableService") interface RetryableService { - @Handler suspend fun runRetryableOperation(ctx: Context): String + @Handler suspend fun runRetryableOperation(): String } class FailingRetryableService : RetryableService { - override suspend fun runRetryableOperation(ctx: Context): String { - return ctx.runBlock { + override suspend fun runRetryableOperation(): String { + return runBlock { runBlockRetryCounter.incrementAndGet() throw RuntimeException("This should fail in old version") } @@ -92,8 +98,8 @@ class BackwardCompatibilityTest { } class FixedRetryableService : RetryableService { - override suspend fun runRetryableOperation(ctx: Context): String { - return ctx.runBlock { "Success in new version!" } + override suspend fun runRetryableOperation(): String { + return runBlock { "Success in new version!" } } } @@ -101,15 +107,15 @@ class BackwardCompatibilityTest { @Name("ProxyService") class ProxyService { @Handler - suspend fun proxy(ctx: Context): String { + suspend fun proxy(): String { callRetryCounter.incrementAndGet() - return BackwardCompatibilityTestCalleeServiceClient.fromContext(ctx).call().await() + return dev.restate.sdk.kotlin.toService().request { call() }.call().await() } @Handler - suspend fun proxyOneWay(ctx: Context): String { + suspend fun proxyOneWay(): String { oneWayCallRetryCounter.incrementAndGet() - BackwardCompatibilityTestCalleeServiceClient.fromContext(ctx).send().call() + dev.restate.sdk.kotlin.toService().request { call() }.send() return "Done" } } @@ -117,7 +123,7 @@ class BackwardCompatibilityTest { @Service @Name("CalleeService") class CalleeService { - @Handler fun call(ctx: Context) = "Hello from callee!" + @Handler suspend fun call() = "Hello from callee!" } companion object { @@ -167,25 +173,27 @@ class BackwardCompatibilityTest { @Test fun createAwakeable(@InjectClient ingressClient: Client) = runTest { - val client = BackwardCompatibilityTestMyServiceClient.fromClient(ingressClient, awakeableKey) + val client = ingressClient.toVirtualObject(awakeableKey) - client.send().run(init = idempotentCallOptions) + client.request { run() }.options(idempotentCallOptions).send() // Wait for awakeable to be registered await withAlias "awakeable is registered" untilAsserted { - assertThat(client.getAwakeable()).isNotBlank() + assertThat(client.request { getAwakeable() }.call().response).isNotBlank() } } @Test fun startRetryableOperation(@InjectClient ingressClient: Client) = runTest { - val retryableClient = - BackwardCompatibilityTestRetryableServiceClient.fromClient(ingressClient) + val retryableClient = ingressClient.toService() // Send the request and expect it to fail - retryableClient.send().runRetryableOperation { idempotencyKey = idempotencyKeyRunBlockTest } + retryableClient + .request { runRetryableOperation() } + .options { idempotencyKey = idempotencyKeyRunBlockTest } + .send() // Wait for at least one retry await withAlias @@ -199,9 +207,9 @@ class BackwardCompatibilityTest { @Test fun startProxyCall(@InjectClient ingressClient: Client) = runTest { - val retryableClient = BackwardCompatibilityTestProxyServiceClient.fromClient(ingressClient) + val retryableClient = ingressClient.toService() - retryableClient.send().proxy { idempotencyKey = idempotencyKeyCallTest } + retryableClient.request { proxy() }.options { idempotencyKey = idempotencyKeyCallTest }.send() // Wait for at least one retry await withAlias @@ -213,9 +221,12 @@ class BackwardCompatibilityTest { @Test fun startOneWayProxyCall(@InjectClient ingressClient: Client) = runTest { - val retryableClient = BackwardCompatibilityTestProxyServiceClient.fromClient(ingressClient) + val retryableClient = ingressClient.toService() - retryableClient.send().proxyOneWay { idempotencyKey = idempotencyKeyOneWayCallTest } + retryableClient + .request { proxyOneWay() } + .options { idempotencyKey = idempotencyKeyOneWayCallTest } + .send() // Wait for at least one retry await withAlias @@ -299,10 +310,12 @@ class BackwardCompatibilityTest { @Test fun completeAwakeable(@InjectClient ingressClient: Client) = runTest { - val client = BackwardCompatibilityTestMyServiceClient.fromClient(ingressClient, awakeableKey) + val client = ingressClient.toVirtualObject(awakeableKey) - val awakeableId = client.getAwakeable(idempotentCallOptions) - assertThat(client.getAwakeable(idempotentCallOptions)).isNotBlank() + val awakeableId = + client.request { getAwakeable() }.options(idempotentCallOptions).call().response + assertThat(client.request { getAwakeable() }.options(idempotentCallOptions).call().response) + .isNotBlank() val expectedResult = "solved!" await withAlias @@ -314,13 +327,13 @@ class BackwardCompatibilityTest { @Test fun completeRetryableOperation(@InjectClient ingressClient: Client) = runTest { - val retryableClient = - BackwardCompatibilityTestRetryableServiceClient.fromClient(ingressClient) + val retryableClient = ingressClient.toService() val result = - retryableClient.send().runRetryableOperation { - idempotencyKey = idempotencyKeyRunBlockTest - } + retryableClient + .request { runRetryableOperation() } + .options { idempotencyKey = idempotencyKeyRunBlockTest } + .send() assertThat(result.sendStatus).isEqualTo(SendResponse.SendStatus.PREVIOUSLY_ACCEPTED) @@ -330,9 +343,13 @@ class BackwardCompatibilityTest { @Test fun proxyCallShouldBeDone(@InjectClient ingressClient: Client) = runTest { - val retryableClient = BackwardCompatibilityTestProxyServiceClient.fromClient(ingressClient) + val retryableClient = ingressClient.toService() - val result = retryableClient.send().proxy { idempotencyKey = idempotencyKeyCallTest } + val result = + retryableClient + .request { proxy() } + .options { idempotencyKey = idempotencyKeyCallTest } + .send() assertThat(result.sendStatus).isEqualTo(SendResponse.SendStatus.PREVIOUSLY_ACCEPTED) @@ -342,10 +359,13 @@ class BackwardCompatibilityTest { @Test fun proxyOneWayCallShouldBeDone(@InjectClient ingressClient: Client) = runTest { - val retryableClient = BackwardCompatibilityTestProxyServiceClient.fromClient(ingressClient) + val retryableClient = ingressClient.toService() val result = - retryableClient.send().proxyOneWay { idempotencyKey = idempotencyKeyOneWayCallTest } + retryableClient + .request { proxyOneWay() } + .options { idempotencyKey = idempotencyKeyOneWayCallTest } + .send() assertThat(result.sendStatus).isEqualTo(SendResponse.SendStatus.PREVIOUSLY_ACCEPTED) diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/ForwardCompatibilityTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/ForwardCompatibilityTest.kt index 25353518..679d03f3 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/ForwardCompatibilityTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/ForwardCompatibilityTest.kt @@ -13,15 +13,22 @@ import dev.restate.admin.client.ApiClient import dev.restate.client.Client import dev.restate.client.SendResponse import dev.restate.client.kotlin.* +import dev.restate.client.kotlin.response +import dev.restate.client.kotlin.toService +import dev.restate.client.kotlin.toVirtualObject import dev.restate.sdk.annotation.Handler import dev.restate.sdk.annotation.Name import dev.restate.sdk.annotation.Service import dev.restate.sdk.annotation.Shared import dev.restate.sdk.annotation.VirtualObject import dev.restate.sdk.endpoint.Endpoint -import dev.restate.sdk.kotlin.* +import dev.restate.sdk.kotlin.awakeable import dev.restate.sdk.kotlin.endpoint.inactivityTimeout import dev.restate.sdk.kotlin.endpoint.journalRetention +import dev.restate.sdk.kotlin.get +import dev.restate.sdk.kotlin.runBlock +import dev.restate.sdk.kotlin.set +import dev.restate.sdk.kotlin.state import dev.restate.sdktesting.infra.Deployer import dev.restate.sdktesting.infra.InjectAdminURI import dev.restate.sdktesting.infra.InjectClient @@ -54,25 +61,24 @@ class ForwardCompatibilityTest { @VirtualObject class MyService { @Handler - suspend fun run(ctx: ObjectContext): String { - val awk = ctx.awakeable() - ctx.set("awk", awk.id) + suspend fun run(): String { + val awk = awakeable() + state().set("awk", awk.id) return awk.await() } - @Shared - suspend fun getAwakeable(ctx: SharedObjectContext): String = ctx.get("awk") ?: "" + @Shared suspend fun getAwakeable(): String = state().get("awk") ?: "" } @Service @Name("RetryableService") interface RetryableService { - @Handler suspend fun runRetryableOperation(ctx: Context): String + @Handler suspend fun runRetryableOperation(): String } class FailingRetryableService : RetryableService { - override suspend fun runRetryableOperation(ctx: Context): String { - return ctx.runBlock { + override suspend fun runRetryableOperation(): String { + return runBlock { runBlockRetryCounter.incrementAndGet() throw RuntimeException("This should fail in old version") } @@ -80,8 +86,8 @@ class ForwardCompatibilityTest { } class FixedRetryableService : RetryableService { - override suspend fun runRetryableOperation(ctx: Context): String { - return ctx.runBlock { "Success in new version!" } + override suspend fun runRetryableOperation(): String { + return runBlock { "Success in new version!" } } } @@ -89,15 +95,15 @@ class ForwardCompatibilityTest { @Name("ProxyService") class ProxyService { @Handler - suspend fun proxy(ctx: Context): String { + suspend fun proxy(): String { callRetryCounter.incrementAndGet() - return ForwardCompatibilityTestCalleeServiceClient.fromContext(ctx).call().await() + return dev.restate.sdk.kotlin.toService().request { call() }.call().await() } @Handler - suspend fun proxyOneWay(ctx: Context): String { + suspend fun proxyOneWay(): String { oneWayCallRetryCounter.incrementAndGet() - ForwardCompatibilityTestCalleeServiceClient.fromContext(ctx).send().call() + dev.restate.sdk.kotlin.toService().request { call() }.send() return "Done" } } @@ -105,7 +111,7 @@ class ForwardCompatibilityTest { @Service @Name("CalleeService") class CalleeService { - @Handler fun call(ctx: Context) = "Hello from callee!" + @Handler suspend fun call() = "Hello from callee!" } companion object { @@ -153,24 +159,27 @@ class ForwardCompatibilityTest { @Test fun createAwakeable(@InjectClient ingressClient: Client) = runTest { - val client = ForwardCompatibilityTestMyServiceClient.fromClient(ingressClient, awakeableKey) + val client = ingressClient.toVirtualObject(awakeableKey) - client.send().run(init = idempotentCallOptions) + client.request { run() }.options(idempotentCallOptions).send() // Wait for awakeable to be registered await withAlias "awakeable is registered" untilAsserted { - assertThat(client.getAwakeable()).isNotBlank() + assertThat(client.request { getAwakeable() }.call().response).isNotBlank() } } @Test fun startRetryableOperation(@InjectClient ingressClient: Client) = runTest { - val retryableClient = ForwardCompatibilityTestRetryableServiceClient.fromClient(ingressClient) + val retryableClient = ingressClient.toService() // Send the request and expect it to fail - retryableClient.send().runRetryableOperation { idempotencyKey = idempotencyKeyRunBlockTest } + retryableClient + .request { runRetryableOperation() } + .options { idempotencyKey = idempotencyKeyRunBlockTest } + .send() // Wait for at least one retry await withAlias @@ -184,9 +193,9 @@ class ForwardCompatibilityTest { @Test fun startProxyCall(@InjectClient ingressClient: Client) = runTest { - val retryableClient = ForwardCompatibilityTestProxyServiceClient.fromClient(ingressClient) + val retryableClient = ingressClient.toService() - retryableClient.send().proxy { idempotencyKey = idempotencyKeyCallTest } + retryableClient.request { proxy() }.options { idempotencyKey = idempotencyKeyCallTest }.send() // Wait for at least one retry await withAlias @@ -198,9 +207,12 @@ class ForwardCompatibilityTest { @Test fun startOneWayProxyCall(@InjectClient ingressClient: Client) = runTest { - val retryableClient = ForwardCompatibilityTestProxyServiceClient.fromClient(ingressClient) + val retryableClient = ingressClient.toService() - retryableClient.send().proxyOneWay { idempotencyKey = idempotencyKeyOneWayCallTest } + retryableClient + .request { proxyOneWay() } + .options { idempotencyKey = idempotencyKeyOneWayCallTest } + .send() // Wait for at least one retry await withAlias @@ -300,10 +312,12 @@ class ForwardCompatibilityTest { @Test fun completeAwakeable(@InjectClient ingressClient: Client) = runTest { - val client = ForwardCompatibilityTestMyServiceClient.fromClient(ingressClient, awakeableKey) + val client = ingressClient.toVirtualObject(awakeableKey) - val awakeableId = client.getAwakeable(idempotentCallOptions) - assertThat(client.getAwakeable(idempotentCallOptions)).isNotBlank() + val awakeableId = + client.request { getAwakeable() }.options(idempotentCallOptions).call().response + assertThat(client.request { getAwakeable() }.options(idempotentCallOptions).call().response) + .isNotBlank() val expectedResult = "solved!" await withAlias @@ -315,12 +329,13 @@ class ForwardCompatibilityTest { @Test fun completeRetryableOperation(@InjectClient ingressClient: Client) = runTest { - val retryableClient = ForwardCompatibilityTestRetryableServiceClient.fromClient(ingressClient) + val retryableClient = ingressClient.toService() val result = - retryableClient.send().runRetryableOperation { - idempotencyKey = idempotencyKeyRunBlockTest - } + retryableClient + .request { runRetryableOperation() } + .options { idempotencyKey = idempotencyKeyRunBlockTest } + .send() assertThat(result.sendStatus).isEqualTo(SendResponse.SendStatus.PREVIOUSLY_ACCEPTED) @@ -330,9 +345,13 @@ class ForwardCompatibilityTest { @Test fun proxyCallShouldBeDone(@InjectClient ingressClient: Client) = runTest { - val retryableClient = ForwardCompatibilityTestProxyServiceClient.fromClient(ingressClient) + val retryableClient = ingressClient.toService() - val result = retryableClient.send().proxy { idempotencyKey = idempotencyKeyCallTest } + val result = + retryableClient + .request { proxy() } + .options { idempotencyKey = idempotencyKeyCallTest } + .send() assertThat(result.sendStatus).isEqualTo(SendResponse.SendStatus.PREVIOUSLY_ACCEPTED) @@ -342,10 +361,13 @@ class ForwardCompatibilityTest { @Test fun proxyOneWayCallShouldBeDone(@InjectClient ingressClient: Client) = runTest { - val retryableClient = ForwardCompatibilityTestProxyServiceClient.fromClient(ingressClient) + val retryableClient = ingressClient.toService() val result = - retryableClient.send().proxyOneWay { idempotencyKey = idempotencyKeyOneWayCallTest } + retryableClient + .request { proxyOneWay() } + .options { idempotencyKey = idempotencyKeyOneWayCallTest } + .send() assertThat(result.sendStatus).isEqualTo(SendResponse.SendStatus.PREVIOUSLY_ACCEPTED) diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/IngressTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/IngressTest.kt index 916992a7..83a8f76b 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/IngressTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/IngressTest.kt @@ -16,7 +16,10 @@ import dev.restate.client.IngressException import dev.restate.client.SendResponse.SendStatus import dev.restate.client.kotlin.getOutputSuspend import dev.restate.client.kotlin.response +import dev.restate.client.kotlin.toService +import dev.restate.client.kotlin.toVirtualObject import dev.restate.common.Target +import dev.restate.common.reflections.ReflectionUtils.extractServiceName import dev.restate.sdk.annotation.Handler import dev.restate.sdk.annotation.Name import dev.restate.sdk.annotation.Service @@ -25,10 +28,8 @@ import dev.restate.sdk.annotation.VirtualObject import dev.restate.sdk.common.StateKey import dev.restate.sdk.common.TerminalException import dev.restate.sdk.endpoint.Endpoint -import dev.restate.sdk.kotlin.Context -import dev.restate.sdk.kotlin.ObjectContext -import dev.restate.sdk.kotlin.SharedObjectContext import dev.restate.sdk.kotlin.awakeable +import dev.restate.sdk.kotlin.awakeableHandle import dev.restate.sdk.kotlin.call import dev.restate.sdk.kotlin.endpoint.idempotencyRetention import dev.restate.sdk.kotlin.endpoint.ingressPrivate @@ -37,11 +38,13 @@ import dev.restate.sdk.kotlin.get import dev.restate.sdk.kotlin.resolve import dev.restate.sdk.kotlin.send import dev.restate.sdk.kotlin.set +import dev.restate.sdk.kotlin.state import dev.restate.sdk.kotlin.stateKey import dev.restate.sdktesting.infra.InjectAdminURI import dev.restate.sdktesting.infra.InjectClient import dev.restate.sdktesting.infra.RestateDeployerExtension import dev.restate.sdktesting.tests.IngressTest.Counter.CounterUpdateResponse +import dev.restate.serde.TypeTag import java.net.URI import java.util.* import kotlin.text.get @@ -72,17 +75,15 @@ class IngressTest { @Serializable data class CounterUpdateResponse(val oldValue: Long, val newValue: Long) @Handler - suspend fun add(context: ObjectContext, value: Long): CounterUpdateResponse { - val oldCount: Long = context.get(COUNTER_KEY) ?: 0L + suspend fun add(value: Long): CounterUpdateResponse { + val oldCount: Long = state().get(COUNTER_KEY) ?: 0L val newCount = oldCount + value - context.set(COUNTER_KEY, newCount) + state().set(COUNTER_KEY, newCount) return CounterUpdateResponse(oldCount, newCount) } - @Handler - @Shared - suspend fun get(context: SharedObjectContext): Long = context.get(COUNTER_KEY) ?: 0L + @Handler @Shared suspend fun get(): Long = state().get(COUNTER_KEY) ?: 0L } @Service @@ -90,8 +91,11 @@ class IngressTest { @Serializable data class ProxyRequest(val key: String, val value: Long) @Handler - suspend fun proxyThrough(context: Context, request: ProxyRequest) { - IngressTestCounterHandlers.add(request.key, request.value).send(context) + suspend fun proxyThrough(request: ProxyRequest) { + dev.restate.sdk.kotlin + .toVirtualObject(request.key) + .request { add(request.value) } + .send() } } @@ -99,35 +103,34 @@ class IngressTest { class AwakeableHolder { @Handler - suspend fun run(ctx: ObjectContext): String { - val awk = ctx.awakeable() - ctx.set("awk", awk.id) + suspend fun run(): String { + val awk = awakeable() + state().set("awk", awk.id) return awk.await() } - @Shared - suspend fun getAwakeable(ctx: SharedObjectContext): String = ctx.get("awk") ?: "" + @Shared suspend fun getAwakeable(): String = state().get("awk") ?: "" @Shared - suspend fun resolveAwakeable(ctx: SharedObjectContext, response: String) { - val awkKey = ctx.get("awk") + suspend fun resolveAwakeable(response: String) { + val awkKey = state().get("awk") if (awkKey.isNullOrEmpty()) { throw TerminalException("Expected awakeable to be non null") } - ctx.awakeableHandle(awkKey).resolve(response) + awakeableHandle(awkKey).resolve(response) } } @Service class PrivateGreeter { - @Handler fun greet(ctx: Context, name: String) = "Hello $name" + @Handler suspend fun greet(name: String) = "Hello $name" } @Service class ProxyGreeter { @Handler - suspend fun greet(ctx: Context, name: String): String = - IngressTestPrivateGreeterHandlers.greet(name).call(ctx).await() + suspend fun greet(name: String): String = + dev.restate.sdk.kotlin.toService().request { greet(name) }.call().await() } companion object { @@ -159,16 +162,26 @@ class IngressTest { val counterRandomName = UUID.randomUUID().toString() val myIdempotencyId = UUID.randomUUID().toString() - val counterClient = IngressTestCounterClient.fromClient(ingressClient, counterRandomName) + val counterClient = ingressClient.toVirtualObject(counterRandomName) // First call updates the value - val firstResponse = counterClient.add(2) { idempotencyKey = myIdempotencyId } + val firstResponse = + counterClient + .request { add(2) } + .options { idempotencyKey = myIdempotencyId } + .call() + .response assertThat(firstResponse) .returns(0, CounterUpdateResponse::oldValue) .returns(2, CounterUpdateResponse::newValue) // Next call returns the same value - val secondResponse = counterClient.add(2) { idempotencyKey = myIdempotencyId } + val secondResponse = + counterClient + .request { add(2) } + .options { idempotencyKey = myIdempotencyId } + .call() + .response assertThat(secondResponse) .returns(0L, CounterUpdateResponse::oldValue) .returns(2L, CounterUpdateResponse::newValue) @@ -179,7 +192,12 @@ class IngressTest { "cleanup of the previous idempotent request" withTimeout 20.seconds untilAsserted { - assertThat(counterClient.add(2) { idempotencyKey = myIdempotencyId }) + assertThat( + counterClient + .request { add(2) } + .options { idempotencyKey = myIdempotencyId } + .call() + .response) .returns(2, CounterUpdateResponse::oldValue) .returns(4, CounterUpdateResponse::newValue) } @@ -188,7 +206,7 @@ class IngressTest { await withAlias "Get returns 4 now" untilAsserted { - assertThat(counterClient.get()).isEqualTo(4L) + assertThat(counterClient.request { get() }.call().response).isEqualTo(4L) } } @@ -198,22 +216,26 @@ class IngressTest { val counterRandomName = UUID.randomUUID().toString() val myIdempotencyId = UUID.randomUUID().toString() - val counterClient = IngressTestCounterClient.fromClient(ingressClient, counterRandomName) - val proxyCounterClient = IngressTestCounterProxyClient.fromClient(ingressClient) + val counterClient = ingressClient.toVirtualObject(counterRandomName) + val proxyCounterClient = ingressClient.toService() // Send request twice with same idempotency key. Should proxy the request only once! - proxyCounterClient.proxyThrough(CounterProxy.ProxyRequest(counterRandomName, 2)) { - idempotencyKey = myIdempotencyId - } - proxyCounterClient.proxyThrough(CounterProxy.ProxyRequest(counterRandomName, 2)) { - idempotencyKey = myIdempotencyId - } + proxyCounterClient + .request { proxyThrough(CounterProxy.ProxyRequest(counterRandomName, 2)) } + .options { idempotencyKey = myIdempotencyId } + .call() + .response + proxyCounterClient + .request { proxyThrough(CounterProxy.ProxyRequest(counterRandomName, 2)) } + .options { idempotencyKey = myIdempotencyId } + .call() + .response // Wait for get - await untilAsserted { assertThat(counterClient.get()).isEqualTo(2) } + await untilAsserted { assertThat(counterClient.request { get() }.call().response).isEqualTo(2) } // Hitting directly the counter client should be executed immediately and return 4 - assertThat(counterClient.add(2, idempotentCallOptions)) + assertThat(counterClient.request { add(2) }.options(idempotentCallOptions).call().response) .returns(2, CounterUpdateResponse::oldValue) .returns(4, CounterUpdateResponse::newValue) } @@ -224,13 +246,14 @@ class IngressTest { val counterRandomName = UUID.randomUUID().toString() val myIdempotencyId = UUID.randomUUID().toString() - val counterClient = IngressTestCounterClient.fromClient(ingressClient, counterRandomName) + val counterClient = ingressClient.toVirtualObject(counterRandomName) // Send request twice with same idempotency key - val firstInvocationSendStatus = counterClient.send().add(2) { idempotencyKey = myIdempotencyId } + val firstInvocationSendStatus = + counterClient.request { add(2) }.options { idempotencyKey = myIdempotencyId }.send() assertThat(firstInvocationSendStatus.sendStatus()).isEqualTo(SendStatus.ACCEPTED) val secondInvocationSendStatus = - counterClient.send().add(2) { idempotencyKey = myIdempotencyId } + counterClient.request { add(2) }.options { idempotencyKey = myIdempotencyId }.send() assertThat(secondInvocationSendStatus.sendStatus()).isEqualTo(SendStatus.PREVIOUSLY_ACCEPTED) // IDs should be the same @@ -239,10 +262,10 @@ class IngressTest { .isEqualTo(secondInvocationSendStatus.invocationId()) // Wait for get - await untilAsserted { assertThat(counterClient.get()).isEqualTo(2) } + await untilAsserted { assertThat(counterClient.request { get() }.call().response).isEqualTo(2) } // Changing idempotency key should be executed immediately and return 4 - assertThat(counterClient.add(2, idempotentCallOptions)) + assertThat(counterClient.request { add(2) }.options(idempotentCallOptions).call().response) .returns(2, CounterUpdateResponse::oldValue) .returns(4, CounterUpdateResponse::newValue) } @@ -255,16 +278,21 @@ class IngressTest { val interpreterId = UUID.randomUUID().toString() // Send request - val awakeableHolder = IngressTestAwakeableHolderClient.fromClient(ingressClient, interpreterId) - assertThat(awakeableHolder.send().run { idempotencyKey = myIdempotencyId }.sendStatus()) + val awakeableHolder = ingressClient.toVirtualObject(interpreterId) + assertThat( + awakeableHolder + .request { run() } + .options { idempotencyKey = myIdempotencyId } + .send() + .sendStatus()) .isEqualTo(SendStatus.ACCEPTED) val invocationHandle = ingressClient.idempotentInvocationHandle( Target.virtualObject( - IngressTestAwakeableHolderHandlers.Metadata.SERVICE_NAME, interpreterId, "run"), + extractServiceName(AwakeableHolder::class.java), interpreterId, "run"), myIdempotencyId, - IngressTestAwakeableHolderHandlers.Metadata.Serde.RUN_OUTPUT) + TypeTag.of(String::class.java)) // Attach to request val blockedFut = invocationHandle.attachAsync() @@ -279,9 +307,13 @@ class IngressTest { await withAlias "sync point" untilAsserted { - assertThat(awakeableHolder.getAwakeable()).isNotBlank + assertThat(awakeableHolder.request { getAwakeable() }.call().response).isNotBlank } - awakeableHolder.resolveAwakeable(response, idempotentCallOptions) + awakeableHolder + .request { resolveAwakeable(response) } + .options(idempotentCallOptions) + .call() + .response // Attach should be completed assertThat(blockedFut.await().response).isEqualTo(response) @@ -298,34 +330,40 @@ class IngressTest { @InjectClient ingressClient: Client, ) = runTest { val adminServiceClient = ServiceApi(ApiClient().setHost(adminURI.host).setPort(adminURI.port)) - val greeterClient = IngressTestPrivateGreeterClient.fromClient(ingressClient) + val greeterClient = ingressClient.toService() // Wait for the service to be private await withAlias "the service is private" untilAsserted { val ctx = currentCoroutineContext() - assertThatThrownBy { runBlocking(ctx) { greeterClient.greet("Francesco") } } + assertThatThrownBy { + runBlocking(ctx) { greeterClient.request { greet("Francesco") }.call().response } + } .asInstanceOf(InstanceOfAssertFactories.type(IngressException::class.java)) .returns(400, IngressException::getStatusCode) } // Send a request through the proxy client assertThat( - IngressTestProxyGreeterClient.fromClient(ingressClient) - .greet("Francesco", init = idempotentCallOptions)) + ingressClient + .toService() + .request { greet("Francesco") } + .options(idempotentCallOptions) + .call() + .response) .isEqualTo("Hello Francesco") // Make the service public again adminServiceClient.modifyService( - IngressTestPrivateGreeterHandlers.Metadata.SERVICE_NAME, - ModifyServiceRequest()._public(true)) + extractServiceName(PrivateGreeter::class.java), ModifyServiceRequest()._public(true)) // Wait to get the correct count await withAlias "the service becomes public again" untilAsserted { - assertThat(greeterClient.greet("Francesco")).isEqualTo("Hello Francesco") + assertThat(greeterClient.request { greet("Francesco") }.call().response) + .isEqualTo("Hello Francesco") } } } diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/JournalRetentionTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/JournalRetentionTest.kt index 4cfd460b..dd2ea18e 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/JournalRetentionTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/JournalRetentionTest.kt @@ -9,11 +9,12 @@ package dev.restate.sdktesting.tests import dev.restate.client.Client +import dev.restate.client.kotlin.toService import dev.restate.sdk.annotation.Handler import dev.restate.sdk.annotation.Service import dev.restate.sdk.endpoint.Endpoint -import dev.restate.sdk.kotlin.* import dev.restate.sdk.kotlin.endpoint.* +import dev.restate.sdk.kotlin.sleep import dev.restate.sdktesting.infra.InjectAdminURI import dev.restate.sdktesting.infra.InjectClient import dev.restate.sdktesting.infra.RestateDeployerExtension @@ -32,8 +33,8 @@ class JournalRetentionTest { class MyService { @Handler - suspend fun greet(ctx: Context, input: String): String { - ctx.sleep(100.milliseconds) + suspend fun greet(input: String): String { + sleep(100.milliseconds) return input } } @@ -50,8 +51,9 @@ class JournalRetentionTest { @InjectClient ingressClient: Client, @InjectAdminURI adminURI: URI, ) = runTest { - val client = JournalRetentionTestMyServiceClient.fromClient(ingressClient) - val invocationId = client.send().greet("Francesco", init = idempotentCallOptions).invocationId() + val client = ingressClient.toService() + val invocationId = + client.request { greet("Francesco") }.options(idempotentCallOptions).send().invocationId() await withAlias "got the invocation completed, with the journal retained" untilAsserted diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/KafkaAndWorkflowAPITest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/KafkaAndWorkflowAPITest.kt index a59d39c8..988d85f8 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/KafkaAndWorkflowAPITest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/KafkaAndWorkflowAPITest.kt @@ -11,12 +11,16 @@ package dev.restate.sdktesting.tests import dev.restate.client.Client import dev.restate.client.kotlin.attachSuspend import dev.restate.client.kotlin.getOutputSuspend +import dev.restate.client.kotlin.response +import dev.restate.client.kotlin.toWorkflow +import dev.restate.client.kotlin.workflowHandle +import dev.restate.common.reflections.ReflectionUtils.extractServiceName import dev.restate.sdk.annotation.Shared import dev.restate.sdk.annotation.Workflow import dev.restate.sdk.endpoint.Endpoint -import dev.restate.sdk.kotlin.SharedWorkflowContext -import dev.restate.sdk.kotlin.WorkflowContext import dev.restate.sdk.kotlin.durablePromiseKey +import dev.restate.sdk.kotlin.promise +import dev.restate.sdk.kotlin.promiseHandle import dev.restate.sdktesting.infra.* import dev.restate.sdktesting.infra.runtimeconfig.RestateConfigSchema import java.net.URI @@ -38,15 +42,14 @@ class KafkaAndWorkflowAPITest { val PROMISE = durablePromiseKey("promise") } - @Workflow suspend fun run(ctx: WorkflowContext, myTask: String) = "Run $myTask" + @Workflow suspend fun run(myTask: String) = "Run $myTask" @Shared - suspend fun setPromise(ctx: SharedWorkflowContext, myValue: String) { - ctx.promiseHandle(PROMISE).resolve(myValue) + suspend fun setPromise(myValue: String) { + promiseHandle(PROMISE).resolve(myValue) } - @Shared - suspend fun getPromise(ctx: SharedWorkflowContext) = ctx.promise(PROMISE).future().await() + @Shared suspend fun getPromise() = promise(PROMISE).future().await() } companion object { @@ -71,10 +74,7 @@ class KafkaAndWorkflowAPITest { ) = runTest { // Create subscription Kafka.createKafkaSubscription( - adminURI, - WORKFLOW_TOPIC, - KafkaAndWorkflowAPITestMyWorkflowHandlers.Metadata.SERVICE_NAME, - "run") + adminURI, WORKFLOW_TOPIC, extractServiceName(MyWorkflow::class.java), "run") val keyMessages = linkedMapOf("a" to "1", "b" to "2", "c" to "3") @@ -88,11 +88,11 @@ class KafkaAndWorkflowAPITest { "Workflow invocations from Kafka" untilAsserted { assertThat( - KafkaAndWorkflowAPITestMyWorkflowClient.fromClient( - ingressClient, keyMessage.key) - .workflowHandle() + ingressClient + .workflowHandle( + extractServiceName(MyWorkflow::class.java), keyMessage.key) .attachSuspend() - .response()) + .response) .isEqualTo("Run ${keyMessage.value}") } } @@ -102,11 +102,11 @@ class KafkaAndWorkflowAPITest { "Workflow invocations from Kafka" untilAsserted { assertThat( - KafkaAndWorkflowAPITestMyWorkflowClient.fromClient( - ingressClient, keyMessage.key) - .workflowHandle() + ingressClient + .workflowHandle( + extractServiceName(MyWorkflow::class.java), keyMessage.key) .getOutputSuspend() - .response() + .response .value) .isEqualTo("Run ${keyMessage.value}") } @@ -123,10 +123,7 @@ class KafkaAndWorkflowAPITest { ) = runTest { // Create subscription Kafka.createKafkaSubscription( - adminURI, - SHARED_HANDLER_TOPIC, - KafkaAndWorkflowAPITestMyWorkflowHandlers.Metadata.SERVICE_NAME, - "setPromise") + adminURI, SHARED_HANDLER_TOPIC, extractServiceName(MyWorkflow::class.java), "setPromise") val keyMessages = linkedMapOf("a" to "a", "b" to "b", "c" to "c") @@ -142,9 +139,11 @@ class KafkaAndWorkflowAPITest { "Workflow invocations from Kafka" untilAsserted { assertThat( - KafkaAndWorkflowAPITestMyWorkflowClient.fromClient( - ingressClient, keyMessage.key) - .getPromise()) + ingressClient + .toWorkflow(keyMessage.key) + .request { getPromise() } + .call() + .response) .isEqualTo(keyMessage.value) } } diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/KafkaDynamicSetupTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/KafkaDynamicSetupTest.kt index 18e932ae..086fbda3 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/KafkaDynamicSetupTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/KafkaDynamicSetupTest.kt @@ -9,14 +9,15 @@ package dev.restate.sdktesting.tests import dev.restate.client.Client +import dev.restate.client.kotlin.response +import dev.restate.client.kotlin.toVirtualObject import dev.restate.sdk.annotation.Handler import dev.restate.sdk.annotation.Name import dev.restate.sdk.annotation.Service import dev.restate.sdk.annotation.VirtualObject import dev.restate.sdk.common.StateKey import dev.restate.sdk.endpoint.Endpoint -import dev.restate.sdk.kotlin.Context -import dev.restate.sdk.kotlin.ObjectContext +import dev.restate.sdk.kotlin.state import dev.restate.sdk.kotlin.stateKey import dev.restate.sdktesting.infra.* import dev.restate.sdktesting.tests.Kafka.createKafkaSubscription @@ -49,16 +50,16 @@ class KafkaDynamicSetupTest { } @Handler - suspend fun add(ctx: ObjectContext, value: Long): Long { - val current = ctx.get(COUNTER_KEY) ?: 0L + suspend fun add(value: Long): Long { + val current = state().get(COUNTER_KEY) ?: 0L val newValue = current + value - ctx.set(COUNTER_KEY, newValue) + state().set(COUNTER_KEY, newValue) return newValue } @Handler - suspend fun get(ctx: ObjectContext): Long { - return ctx.get(COUNTER_KEY) ?: 0L + suspend fun get(): Long { + return state().get(COUNTER_KEY) ?: 0L } } @@ -68,8 +69,11 @@ class KafkaDynamicSetupTest { @Serializable data class ProxyRequest(val key: String, val value: Long) @Handler - suspend fun oneWayCall(ctx: Context, request: ProxyRequest) { - KafkaDynamicSetupTestCounterClient.fromContext(ctx, request.key).send().add(request.value) + suspend fun oneWayCall(request: ProxyRequest) { + dev.restate.sdk.kotlin + .toVirtualObject(request.key) + .request { add(request.value) } + .send() } } @@ -112,7 +116,8 @@ class KafkaDynamicSetupTest { await withAlias "Updates from Kafka are visible in the counter" untilAsserted { - assertThat(KafkaDynamicSetupTestCounterClient.fromClient(ingressClient, counter).get()) + assertThat( + ingressClient.toVirtualObject(counter).request { get() }.call().response) .isEqualTo(6L) } } @@ -142,7 +147,8 @@ class KafkaDynamicSetupTest { await withAlias "Updates from Kafka are visible in the counter" untilAsserted { - assertThat(KafkaDynamicSetupTestCounterClient.fromClient(ingressClient, counter).get()) + assertThat( + ingressClient.toVirtualObject(counter).request { get() }.call().response) .isEqualTo(6L) } } diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/KafkaTracingTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/KafkaTracingTest.kt index 85649921..02d51f0d 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/KafkaTracingTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/KafkaTracingTest.kt @@ -9,15 +9,17 @@ package dev.restate.sdktesting.tests import dev.restate.client.Client +import dev.restate.client.kotlin.response +import dev.restate.client.kotlin.toVirtualObject +import dev.restate.common.reflections.ReflectionUtils.extractServiceName import dev.restate.sdk.annotation.Handler import dev.restate.sdk.annotation.Name import dev.restate.sdk.annotation.Shared import dev.restate.sdk.annotation.VirtualObject import dev.restate.sdk.endpoint.Endpoint -import dev.restate.sdk.kotlin.ObjectContext -import dev.restate.sdk.kotlin.SharedObjectContext import dev.restate.sdk.kotlin.get import dev.restate.sdk.kotlin.set +import dev.restate.sdk.kotlin.state import dev.restate.sdktesting.infra.InjectAdminURI import dev.restate.sdktesting.infra.InjectClient import dev.restate.sdktesting.infra.InjectContainerPort @@ -40,12 +42,12 @@ class KafkaTracingTest { @Name("Counter") class Counter { @Handler - suspend fun set(ctx: ObjectContext, value: String) { - check(ctx.get("state") == null) - ctx.set("state", value) + suspend fun set(value: String) { + check(state().get("state") == null) + state().set("state", value) } - @Shared suspend fun get(ctx: SharedObjectContext) = ctx.get("state") + @Shared suspend fun get() = state().get("state") } companion object { @@ -72,22 +74,24 @@ class KafkaTracingTest { @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) kafkaPort: Int, ) = runTest { - Kafka.createKafkaSubscription( - adminURI, TOPIC, KafkaTracingTestCounterHandlers.Metadata.SERVICE_NAME, "set") + Kafka.createKafkaSubscription(adminURI, TOPIC, extractServiceName(Counter::class.java), "set") // Produce message to kafka Kafka.produceMessagesToKafka(kafkaPort, TOPIC, listOf("a" to Json.encodeToString("a"))) // Await that state is updated - val client = KafkaTracingTestCounterClient.fromClient(ingressClient, "a") - await withAlias "state is updated" untilAsserted { assertThat(client.get()).isEqualTo("a") } + val client = ingressClient.toVirtualObject("a") + await withAlias + "state is updated" untilAsserted + { + assertThat(client.request { get() }.call().response).isEqualTo("a") + } // Check the traces await withAlias "traces are available" untilAsserted { - val traces = - Tracing.getTraces(jaegerPort, KafkaTracingTestCounterHandlers.Metadata.SERVICE_NAME) + val traces = Tracing.getTraces(jaegerPort, extractServiceName(Counter::class.java)) assertThat(traces.result.resourceSpans).isNotEmpty() diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/OpenAPITest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/OpenAPITest.kt index 0228a914..f0f2bdcb 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/OpenAPITest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/OpenAPITest.kt @@ -12,7 +12,6 @@ import dev.restate.sdk.annotation.Handler import dev.restate.sdk.annotation.Name import dev.restate.sdk.annotation.Service import dev.restate.sdk.endpoint.Endpoint -import dev.restate.sdk.kotlin.Context import dev.restate.sdktesting.infra.InjectAdminURI import dev.restate.sdktesting.infra.RestateDeployerExtension import java.net.URI @@ -32,7 +31,7 @@ class OpenAPITest { @Name("GreeterService") class GreeterService { @Handler - suspend fun greet(ctx: Context, name: String): String { + suspend fun greet(name: String): String { return "Hello, $name!" } } diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeChangingDeploymentTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeChangingDeploymentTest.kt index efbf1151..38529e6b 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeChangingDeploymentTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeChangingDeploymentTest.kt @@ -12,11 +12,11 @@ import dev.restate.admin.api.InvocationApi import dev.restate.admin.client.ApiClient import dev.restate.client.Client import dev.restate.client.kotlin.attachSuspend +import dev.restate.client.kotlin.toService import dev.restate.sdk.annotation.Handler import dev.restate.sdk.annotation.Name import dev.restate.sdk.annotation.Service import dev.restate.sdk.endpoint.Endpoint -import dev.restate.sdk.kotlin.Context import dev.restate.sdk.kotlin.runBlock import dev.restate.sdktesting.infra.InjectAdminURI import dev.restate.sdktesting.infra.InjectClient @@ -35,18 +35,18 @@ class PauseResumeChangingDeploymentTest { @Service @Name("RetryableService") interface RetryableService { - @Handler suspend fun runRetryableOperation(ctx: Context): String + @Handler suspend fun runRetryableOperation(): String } class FailingRetryableService : RetryableService { - override suspend fun runRetryableOperation(ctx: Context): String { - return ctx.runBlock { throw RuntimeException("This should fail in old version") } + override suspend fun runRetryableOperation(): String { + return runBlock { throw RuntimeException("This should fail in old version") } } } class FixedRetryableService : RetryableService { - override suspend fun runRetryableOperation(ctx: Context): String { - return ctx.runBlock { "Success in new version!" } + override suspend fun runRetryableOperation(): String { + return runBlock { "Success in new version!" } } } @@ -71,11 +71,11 @@ class PauseResumeChangingDeploymentTest { @InjectAdminURI adminURI: URI, ) = runTest { // Create client for RetryableService - val retryClient = - PauseResumeChangingDeploymentTestRetryableServiceClient.fromClient(ingressClient) + val retryClient = ingressClient.toService() // Send idempotent request to trigger retries and pause - val sendResult = retryClient.send().runRetryableOperation(init = idempotentCallOptions) + val sendResult = + retryClient.request { runRetryableOperation() }.options(idempotentCallOptions).send() val invocationId = sendResult.invocationId() // Wait until the invocation is paused (or suspended) by the runtime diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeTest.kt index 9de12b21..72395cc8 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/PauseResumeTest.kt @@ -12,10 +12,10 @@ import dev.restate.admin.api.InvocationApi import dev.restate.admin.client.ApiClient import dev.restate.client.Client import dev.restate.client.kotlin.attachSuspend +import dev.restate.client.kotlin.toService import dev.restate.sdk.annotation.Handler import dev.restate.sdk.annotation.Service import dev.restate.sdk.endpoint.Endpoint -import dev.restate.sdk.kotlin.Context import dev.restate.sdktesting.infra.InjectAdminURI import dev.restate.sdktesting.infra.InjectClient import dev.restate.sdktesting.infra.RestateDeployerExtension @@ -36,7 +36,7 @@ class PauseResumeTest { } @Handler - suspend fun echo(ctx: Context, input: String): String { + suspend fun echo(input: String): String { // Load if we should fail val shouldFail = shouldFail.get() if (shouldFail) { @@ -71,10 +71,10 @@ class PauseResumeTest { FailingService.shouldFail.set(true) // Create client for RetryableService - val retryClient = PauseResumeTestFailingServiceClient.fromClient(ingressClient) + val retryClient = ingressClient.toService() // Send idempotent request to trigger retries and pause - val sendResult = retryClient.send().echo("input", init = idempotentCallOptions) + val sendResult = retryClient.request { echo("input") }.options(idempotentCallOptions).send() val invocationId = sendResult.invocationId() // Wait until the invocation is paused (or suspended) by the runtime diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/RestartAsNewInvocationTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/RestartAsNewInvocationTest.kt index 50eafebf..3ef16efd 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/RestartAsNewInvocationTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/RestartAsNewInvocationTest.kt @@ -14,12 +14,13 @@ import dev.restate.client.Client import dev.restate.client.IngressException import dev.restate.client.kotlin.* import dev.restate.client.kotlin.attachSuspend +import dev.restate.client.kotlin.toService import dev.restate.sdk.annotation.Handler import dev.restate.sdk.annotation.Service import dev.restate.sdk.common.TerminalException import dev.restate.sdk.endpoint.Endpoint -import dev.restate.sdk.kotlin.* import dev.restate.sdk.kotlin.endpoint.journalRetention +import dev.restate.sdk.kotlin.runBlock import dev.restate.sdktesting.infra.InjectAdminURI import dev.restate.sdktesting.infra.InjectClient import dev.restate.sdktesting.infra.RestateDeployerExtension @@ -45,8 +46,8 @@ class RestartAsNewInvocationTest { } @Handler - suspend fun echo(ctx: Context, input: String): String { - val from = ctx.runBlock { ctxRunResult.get() } + suspend fun echo(input: String): String { + val from = runBlock { ctxRunResult.get() } // Load if we should fail val shouldFail = shouldFail.get() @@ -78,10 +79,10 @@ class RestartAsNewInvocationTest { val input = "my-input" // Create clients for the services - val restartClient = RestartAsNewInvocationTestRestartInvocationClient.fromClient(ingressClient) + val restartClient = ingressClient.toService() // Send request first time - val sendResult = restartClient.send().echo(input, init = idempotentCallOptions) + val sendResult = restartClient.request { echo(input) }.options(idempotentCallOptions).send() val initialInvocationId = sendResult.invocationId() // Should fail with terminal exception @@ -136,10 +137,10 @@ class RestartAsNewInvocationTest { val input = "my-input" // Create clients for the services - val restartClient = RestartAsNewInvocationTestRestartInvocationClient.fromClient(ingressClient) + val restartClient = ingressClient.toService() // Send request first time - val sendResult = restartClient.send().echo(input, init = idempotentCallOptions) + val sendResult = restartClient.request { echo(input) }.options(idempotentCallOptions).send() val initialInvocationId = sendResult.invocationId() // Should fail with terminal exception diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/StatePatchingTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/StatePatchingTest.kt index 4bf18b2a..eb55248d 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/StatePatchingTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/StatePatchingTest.kt @@ -12,13 +12,15 @@ import dev.restate.admin.api.ServiceApi import dev.restate.admin.client.ApiClient import dev.restate.admin.model.ModifyServiceStateRequest import dev.restate.client.Client +import dev.restate.client.kotlin.response +import dev.restate.client.kotlin.toVirtualObject import dev.restate.sdk.annotation.Handler import dev.restate.sdk.annotation.Name import dev.restate.sdk.annotation.VirtualObject import dev.restate.sdk.endpoint.Endpoint -import dev.restate.sdk.kotlin.ObjectContext import dev.restate.sdk.kotlin.get import dev.restate.sdk.kotlin.set +import dev.restate.sdk.kotlin.state import dev.restate.sdktesting.infra.InjectAdminURI import dev.restate.sdktesting.infra.InjectClient import dev.restate.sdktesting.infra.RestateDeployerExtension @@ -36,13 +38,13 @@ class StatePatchingTest { @Name("StateObject") class StateObject { @Handler - suspend fun setState(ctx: ObjectContext, value: String) { - ctx.set("state", value) + suspend fun setState(value: String) { + state().set("state", value) } @Handler - suspend fun getState(ctx: ObjectContext): String? { - return ctx.get("state") + suspend fun getState(): String? { + return state().get("state") } } @@ -59,14 +61,15 @@ class StatePatchingTest { @InjectAdminURI adminURI: URI, ) = runTest { // Create a client for the StateObject - val client = StatePatchingTestStateObjectClient.fromClient(ingressClient, "test-key") + val client = ingressClient.toVirtualObject("test-key") // Set initial state val initialState = "initial-state" - client.setState(initialState, idempotentCallOptions) + client.request { setState(initialState) }.options(idempotentCallOptions).call() // Verify initial state - assertThat(client.getState(idempotentCallOptions)).isEqualTo(initialState) + assertThat(client.request { getState() }.options(idempotentCallOptions).call().response) + .isEqualTo(initialState) // Create admin client for state patching val serviceApi = ServiceApi(ApiClient().setHost(adminURI.host).setPort(adminURI.port)) @@ -89,7 +92,7 @@ class StatePatchingTest { await withAlias "state is patched" untilAsserted { - assertThat(client.getState()).isEqualTo(newState) + assertThat(client.request { getState() }.call().response).isEqualTo(newState) } } } diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/TracingTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/TracingTest.kt index 82c0d2bc..7afef2fe 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/TracingTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/TracingTest.kt @@ -9,11 +9,14 @@ package dev.restate.sdktesting.tests import dev.restate.client.Client +import dev.restate.client.kotlin.response +import dev.restate.client.kotlin.toService import dev.restate.sdk.annotation.Handler import dev.restate.sdk.annotation.Name import dev.restate.sdk.annotation.Service import dev.restate.sdk.endpoint.Endpoint -import dev.restate.sdk.kotlin.* +import dev.restate.sdk.kotlin.openTelemetryContext +import dev.restate.sdk.kotlin.request import dev.restate.sdktesting.infra.InjectClient import dev.restate.sdktesting.infra.InjectContainerPort import dev.restate.sdktesting.infra.RestateDeployerExtension @@ -35,9 +38,9 @@ class TracingTest { @Name("GreeterService") class GreeterService { @Handler - suspend fun greet(ctx: Context, name: String): String { + suspend fun greet(name: String): String { // Get the current span from the OpenTelemetry context - val span = io.opentelemetry.api.trace.Span.fromContext(ctx.request().openTelemetryContext) + val span = io.opentelemetry.api.trace.Span.fromContext(request().openTelemetryContext) // Verify that this is a server span (meaning it was created from a parent) assertThat(span.spanContext.isRemote).isTrue() @@ -74,8 +77,8 @@ class TracingTest { @InjectContainerPort(hostName = JAEGER_HOSTNAME, port = JAEGER_QUERY_PORT) jaegerPort: Int, ) = runTest { // Call the greeter service - val greeter = TracingTestGreeterServiceClient.fromClient(client) - val response = greeter.greet("Alice", idempotentCallOptions) + val greeter = client.toService() + val response = greeter.request { greet("Alice") }.options(idempotentCallOptions).call().response assertThat(response).isEqualTo("Hello, Alice!") await withAlias diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt b/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt index c2a08a86..6956f04e 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/utils.kt @@ -13,7 +13,7 @@ import dev.restate.admin.client.ApiClient import dev.restate.admin.client.ApiException import dev.restate.admin.model.RegisterDeploymentRequest import dev.restate.admin.model.RegisterHttpDeploymentRequest -import dev.restate.common.RequestBuilder +import dev.restate.common.InvocationOptions import dev.restate.sdk.endpoint.Endpoint import dev.restate.sdk.http.vertx.RestateHttpServer import io.vertx.core.http.HttpServer @@ -61,7 +61,7 @@ suspend infix fun ConditionFactory.untilAsserted(fn: suspend () -> Unit) { fun runTest(timeout: Duration = 60.seconds, testBody: suspend TestScope.() -> Unit) = runTest(context = additionalLoggingContext(), timeout = timeout, testBody = testBody) -val idempotentCallOptions: RequestBuilder<*, *>.() -> Unit = { +val idempotentCallOptions: InvocationOptions.Builder.() -> Unit = { idempotencyKey = UUID.randomUUID().toString() }