diff --git a/.claude/skills/add-config-option/SKILL.md b/.claude/skills/add-config-option/SKILL.md new file mode 100644 index 00000000..4e1eb6a2 --- /dev/null +++ b/.claude/skills/add-config-option/SKILL.md @@ -0,0 +1,46 @@ +--- +name: add-config-option +description: When the user asks to add a new option/field/config to service, handler, endpoint, ServiceOptions, HandlerOpts, ObjectOptions, WorkflowOptions, or the discovery schema +user-invocable: false +--- + +# Adding a config option to the Restate TypeScript SDK + +There are two kinds of config options: + +1. **Discovery options** — sent to the Restate server during service discovery (e.g. `ingressPrivate`, `enableLazyState`, timeouts). These need to be in the discovery schema. +2. **Runtime-only options** — used only by the SDK at execution time, never sent to the server (e.g. `asTerminalError`, `serde`). These skip the discovery layer entirely. + +Ask the user which kind if unclear. + +## All options: Type definitions — `packages/libs/restate-sdk/src/types/rpc.ts` + +1. **`ServiceHandlerOpts`** — add the field with JSDoc. All handler types inherit this. + - If object/workflow-only (like `enableLazyState`): add to `ObjectHandlerOpts` / `WorkflowHandlerOpts` instead. +2. **`ServiceOptions`** — add the field with JSDoc. + - If object/workflow-only: add to `ObjectOptions` / `WorkflowOptions` instead. + - `DefaultServiceOptions` in `endpoint.ts` = `ServiceOptions & ObjectOptions & WorkflowOptions`, so endpoint-level gets it free. +3. **`HandlerWrapper.from()`** — add `opts?.fieldName` to the positional constructor call. + - Object/workflow-only fields: `opts !== undefined && "fieldName" in opts ? opts?.fieldName : undefined` +4. **`HandlerWrapper` constructor** — add `public readonly fieldName?: Type` parameter. + +## Discovery options only: Wire through discovery + +### `packages/libs/restate-sdk/src/endpoint/discovery.ts` + +Add the field to both **`Service`** and **`Handler`** interfaces. Use wire types (`number` for millis, `boolean` for flags). + +### `packages/libs/restate-sdk/src/endpoint/components.ts` + +- **`commonServiceOptions()`**: `fieldName: options?.fieldName,` +- **`commonHandlerOptions()`**: `fieldName: wrapper.fieldName,` +- Durations: wrap with `millisOrDurationToMillis()` + `!== undefined` guard +- Object/workflow-only in `commonServiceOptions`: `"fieldName" in options` guard + +## Runtime-only options: Wire through execution + +Options that affect handler execution but not discovery (like `asTerminalError`, `serde`) just need to be read where the handler is invoked. Check how existing runtime options are consumed in `components.ts` handler classes. + +## Verification + +Run `npx tsc --noEmit` from `packages/libs/restate-sdk/`. \ No newline at end of file diff --git a/.claude/skills/add-e2e-test/SKILL.md b/.claude/skills/add-e2e-test/SKILL.md new file mode 100644 index 00000000..85f9049b --- /dev/null +++ b/.claude/skills/add-e2e-test/SKILL.md @@ -0,0 +1,120 @@ +--- +name: add-e2e-test +description: When the user asks to add a new e2e test to the restate-e2e-services package +user-invocable: true +--- + +# Adding an e2e test to restate-e2e-services + +All e2e test infrastructure lives under `packages/tests/restate-e2e-services/`. These tests are NOT run directly — they are executed by the Java e2e test runner which spins up Restate, deploys services, and injects `RESTATE_INGRESS_URL` and `RESTATE_ADMIN_URL` environment variables. + +## Architecture + +- **Services** (`src/`): Restate service definitions (the SUT). Registered via `REGISTRY` and imported in `app.ts`. +- **Tests** (`test/`): Vitest test files that call services through the ingress client. +- **Test config** (`custom_tests.yaml`): YAML config read by the Java e2e test runner to know which commands to execute. +- **Shared utils** (`test/utils.ts`): Provides `ingressClient()`, `getIngressUrl()`, `getAdminUrl()` — reads from env vars injected by the test runner. + +## Steps to add a new e2e test + +### 1. Create the service under test in `src/` + +Create `src/.ts` following this pattern: + +```typescript +import * as restate from "@restatedev/restate-sdk"; +import { REGISTRY } from "./services.js"; + +const myService = restate.service({ + name: "MyService", + handlers: { + myHandler: async (ctx: restate.Context, input: string): Promise => { + // handler logic + return `result: ${input}`; + }, + }, +}); + +REGISTRY.addService(myService); +// Use REGISTRY.addObject() for virtual objects, REGISTRY.addWorkflow() for workflows + +export type MyService = typeof myService; +``` + +Key points: +- Always register with `REGISTRY` so the service is discoverable +- Always export the type (`export type MyService = typeof myService`) so tests can import it for typed client calls + +### 2. Import the service in `src/app.ts` + +Add an import line alongside the other service imports: + +```typescript +import "./my_service.js"; +``` + +### 3. Create the test file in `test/` + +Create `test/.test.ts`: + +```typescript +import { describe, it, expect } from "vitest"; +import { ingressClient } from "./utils.js"; +import type { MyService } from "../src/my_service.js"; + +const MyService: MyService = { name: "MyService" }; + +describe("MyService", () => { + it("should do something", async () => { + const ingress = ingressClient(); + const client = ingress.serviceClient(MyService); + + const result = await client.myHandler("input"); + + expect(result).toBe("result: input"); + }); +}); +``` + +Key points: +- Import the service TYPE from `../src/` for typed ingress client calls +- Create a const with `{ name: "ServiceName" }` matching the service's registered name +- Use `ingressClient()` from `./utils.js` — never hardcode URLs +- For virtual objects use `ingress.objectClient(MyObject, "key")` +- For workflows use `ingress.workflowClient(MyWorkflow, "workflowId")` + +### 4. Type-check + +Run from repo root: + +```bash +pnpm --filter @restatedev/restate-e2e-services run _check:types +``` + +## Available client patterns in tests + +From `@restatedev/restate-sdk-clients`: + +```typescript +const ingress = ingressClient(); + +// Service call +const svc = ingress.serviceClient(MyService); +await svc.handler(input); + +// Virtual object call +const obj = ingress.objectClient(MyObject, "key"); +await obj.handler(input); + +// Workflow +const wf = ingress.workflowClient(MyWorkflow, "wfId"); +await wf.workflowSubmit(input); +await wf.workflowAttach(); + +// Send (fire and forget) +const send = ingress.serviceSendClient(MyService); +await send.handler(input); + +// Idempotent call +await svc.handler(input, restate.rpc.opts({ idempotencyKey: "key" })); +``` \ No newline at end of file diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index d02a4d5c..4ffcbc83 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -61,12 +61,27 @@ jobs: checks: write pull-requests: write actions: read + strategy: + matrix: + node-version: [ 20.x ] steps: - uses: actions/checkout@v4 with: repository: restatedev/sdk-typescript + - uses: actions/checkout@v4 + - name: Use Node.js ${{ matrix.node-version }} + uses: actions/setup-node@v4 + with: + node-version: ${{ matrix.node-version }} + registry-url: "https://registry.npmjs.org" + - uses: pnpm/action-setup@v4 + with: + version: 10.13.1 + - run: pnpm install --frozen-lockfile + - run: pnpm build + # support importing oci-format restate.tar - name: Set up Docker containerd snapshotter uses: docker/setup-docker-action@v4 @@ -133,11 +148,12 @@ jobs: cache-to: type=gha,url=http://127.0.0.1:49160/,mode=max,version=1,scope=${{ github.workflow }} - name: Run test tool - uses: restatedev/sdk-test-suite@v4.0 + uses: restatedev/sdk-test-suite@v4.1 with: restateContainerImage: ${{ inputs.restateCommit != '' && 'localhost/restatedev/restate-commit-download:latest' || (inputs.restateImage != '' && inputs.restateImage || 'ghcr.io/restatedev/restate:main') }} serviceContainerImage: ${{ inputs.serviceImage != '' && inputs.serviceImage || 'restatedev/typescript-test-services' }} exclusionsFile: "packages/tests/restate-e2e-services/exclusions.yaml" + customTestsFile: "packages/tests/restate-e2e-services/custom_tests.yaml" envVars: ${{ inputs.envVars }} testArtifactOutput: ${{ inputs.testArtifactOutput != '' && inputs.testArtifactOutput || 'sdk-typescript-integration-test-report' }} serviceContainerEnvFile: "packages/tests/restate-e2e-services/.env" diff --git a/CLAUDE.md b/CLAUDE.md new file mode 120000 index 00000000..47dc3e3d --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1 @@ +AGENTS.md \ No newline at end of file diff --git a/packages/libs/restate-sdk/src/context.ts b/packages/libs/restate-sdk/src/context.ts index dd5f24aa..0a7d4728 100644 --- a/packages/libs/restate-sdk/src/context.ts +++ b/packages/libs/restate-sdk/src/context.ts @@ -25,8 +25,11 @@ import type { Serde, Duration, } from "@restatedev/restate-sdk-core"; -import { ContextImpl } from "./context_impl.js"; import type { TerminalError } from "./types/errors.js"; +import { + RestateCombinatorPromise, + RestateCompletedPromise, +} from "./promises.js"; /** * Represents the original request as sent to this handler. @@ -705,6 +708,14 @@ export type InvocationHandle = { export type InvocationPromise = RestatePromise & InvocationHandle; export const RestatePromise = { + resolve(value: T): RestatePromise> { + return new RestateCompletedPromise(Promise.resolve(value)); + }, + + reject(reason: TerminalError): RestatePromise { + return new RestateCompletedPromise(Promise.reject(reason)); + }, + /** * Creates a Promise that is resolved with an array of results when all of the provided Promises * resolve, or rejected when any Promise is rejected. @@ -717,12 +728,7 @@ export const RestatePromise = { all[]>( values: T ): RestatePromise<{ -readonly [P in keyof T]: Awaited }> { - if (values.length === 0) { - throw new Error( - "Expected combineable promise to have at least one promise" - ); - } - return ContextImpl.createCombinator( + return RestateCombinatorPromise.fromPromises( (p) => Promise.all(p), values ) as RestatePromise<{ @@ -742,12 +748,7 @@ export const RestatePromise = { race[]>( values: T ): RestatePromise> { - if (values.length === 0) { - throw new Error( - "Expected combineable promise to have at least one promise" - ); - } - return ContextImpl.createCombinator( + return RestateCombinatorPromise.fromPromises( (p) => Promise.race(p), values ) as RestatePromise>; @@ -766,12 +767,7 @@ export const RestatePromise = { any[]>( values: T ): RestatePromise> { - if (values.length === 0) { - throw new Error( - "Expected combineable promise to have at least one promise" - ); - } - return ContextImpl.createCombinator( + return RestateCombinatorPromise.fromPromises( (p) => Promise.any(p), values ) as RestatePromise>; @@ -791,12 +787,7 @@ export const RestatePromise = { ): RestatePromise<{ -readonly [P in keyof T]: PromiseSettledResult>; }> { - if (values.length === 0) { - throw new Error( - "Expected combineable promise to have at least one promise" - ); - } - return ContextImpl.createCombinator( + return RestateCombinatorPromise.fromPromises( (p) => Promise.allSettled(p), values ) as RestatePromise<{ diff --git a/packages/libs/restate-sdk/src/context_impl.ts b/packages/libs/restate-sdk/src/context_impl.ts index 8c946a19..6c57d67d 100644 --- a/packages/libs/restate-sdk/src/context_impl.ts +++ b/packages/libs/restate-sdk/src/context_impl.ts @@ -61,13 +61,11 @@ import type { import { millisOrDurationToMillis, serde } from "@restatedev/restate-sdk-core"; import { RandImpl } from "./utils/rand.js"; import { CompletablePromise } from "./utils/completable_promise.js"; -import type { AsyncResultValue, InternalRestatePromise } from "./promises.js"; +import type { AsyncResultValue } from "./promises.js"; import { - extractContext, InvocationPendingPromise, pendingPromise, PromisesExecutor, - RestateCombinatorPromise, RestateInvocationPromise, RestatePendingPromise, RestateSinglePromise, @@ -647,37 +645,6 @@ export class ContextImpl return new DurablePromiseImpl(this, name, serde); } - // Used by static methods of RestatePromise - public static createCombinator[]>( - combinatorConstructor: (promises: Promise[]) => Promise, - promises: T - ): RestatePromise { - // Extract context from first promise - const self = extractContext(promises[0]); - if (!self) { - throw new Error("Not a combinable promise"); - } - - // Collect first the promises downcasted to the internal promise type - const castedPromises: InternalRestatePromise[] = []; - for (const promise of promises) { - if (extractContext(promise) !== self) { - self.handleInvocationEndError( - new Error( - "You're mixing up RestatePromises from different RestateContext. This is not supported." - ) - ); - return new RestatePendingPromise(self); - } - castedPromises.push(promise as InternalRestatePromise); - } - return new RestateCombinatorPromise( - self, - combinatorConstructor, - castedPromises - ); - } - // -- Various private methods private processNonCompletableEntry( diff --git a/packages/libs/restate-sdk/src/promises.ts b/packages/libs/restate-sdk/src/promises.ts index 97e3b0c0..78225ce6 100644 --- a/packages/libs/restate-sdk/src/promises.ts +++ b/packages/libs/restate-sdk/src/promises.ts @@ -43,15 +43,36 @@ enum PromiseState { NOT_COMPLETED, } -export const RESTATE_CTX_SYMBOL = Symbol("restateContext"); +export abstract class InternalRestatePromise implements RestatePromise { + abstract then( + onfulfilled: + | ((value: T) => PromiseLike | TResult1) + | undefined + | null, + onrejected: + | ((reason: any) => PromiseLike | TResult2) + | undefined + | null + ): Promise; + abstract catch( + onrejected: + | ((reason: any) => PromiseLike | TResult) + | undefined + | null + ): Promise; + abstract finally(onfinally: (() => void) | undefined | null): Promise; + + abstract map( + mapper: (value?: T, failure?: TerminalError) => U + ): RestatePromise; + abstract orTimeout(millis: Duration | number): RestatePromise; -export interface InternalRestatePromise extends RestatePromise { - [RESTATE_CTX_SYMBOL]: ContextImpl; + abstract tryCancel(): void; + abstract tryComplete(): Promise; + abstract uncompletedLeaves(): Array; + abstract publicPromise(): Promise; - tryCancel(): void; - tryComplete(): Promise; - uncompletedLeaves(): Array; - publicPromise(): Promise; + abstract readonly [Symbol.toStringTag]: string; } export type AsyncResultValue = @@ -61,17 +82,20 @@ export type AsyncResultValue = | { StateKeys: string[] } | { InvocationId: string }; -export function extractContext(n: any): ContextImpl | undefined { +const RESTATE_CTX_SYMBOL = Symbol("restateContext"); + +function extractContext(n: any): ContextImpl | undefined { // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access return n[RESTATE_CTX_SYMBOL] as ContextImpl | undefined; } -abstract class AbstractRestatePromise implements InternalRestatePromise { +abstract class BaseRestatePromise extends InternalRestatePromise { [RESTATE_CTX_SYMBOL]: ContextImpl; private pollingPromise?: Promise; private cancelPromise: CompletablePromise = new CompletablePromise(); protected constructor(ctx: ContextImpl) { + super(); this[RESTATE_CTX_SYMBOL] = ctx; } @@ -144,16 +168,16 @@ abstract class AbstractRestatePromise implements InternalRestatePromise { this.cancelPromise.reject(new CancelledError()); } - abstract tryComplete(): Promise; + abstract override tryComplete(): Promise; - abstract uncompletedLeaves(): Array; + abstract override uncompletedLeaves(): Array; - abstract publicPromise(): Promise; + abstract override publicPromise(): Promise; - abstract [Symbol.toStringTag]: string; + abstract override [Symbol.toStringTag]: string; } -export class RestateSinglePromise extends AbstractRestatePromise { +export class RestateSinglePromise extends BaseRestatePromise { private state: PromiseState = PromiseState.NOT_COMPLETED; private completablePromise: CompletablePromise = new CompletablePromise(); @@ -214,7 +238,7 @@ export class RestateInvocationPromise } } -export class RestateCombinatorPromise extends AbstractRestatePromise { +export class RestateCombinatorPromise extends BaseRestatePromise { private state: PromiseState = PromiseState.NOT_COMPLETED; private readonly combinatorPromise: Promise; @@ -231,6 +255,45 @@ export class RestateCombinatorPromise extends AbstractRestatePromise { }); } + // Used by static methods of RestatePromise + public static fromPromises[]>( + combinatorConstructor: (promises: Promise[]) => Promise, + promises: T + ): RestatePromise { + const castedPromises: InternalRestatePromise[] = []; + let foundContext: ContextImpl | undefined = undefined; + + for (const [idx, promise] of promises.entries()) { + if (!(promise instanceof InternalRestatePromise)) { + throw new Error( + `Promise index ${idx} used inside the combinator is not an instance of RestatePromise. This is not supported.` + ); + } else if (foundContext === undefined) { + foundContext = extractContext(promise); + } else { + const thisContext = extractContext(promise); + if (thisContext !== undefined && thisContext !== foundContext) { + throw new Error( + "You're mixing up RestatePromises from different RestateContext. This is not supported." + ); + } + } + castedPromises.push(promise); + } + + if (foundContext === undefined) { + // The only situation where this can happen is when the combined promise contains only RestateCompletedPromise as children. + // In this case, just return back a nice and clean RestateCompletedPromise. + return new RestateCompletedPromise(combinatorConstructor(castedPromises)); + } + + return new RestateCombinatorPromise( + foundContext, + combinatorConstructor, + castedPromises + ); + } + uncompletedLeaves(): number[] { return this.state === PromiseState.COMPLETED ? [] @@ -248,10 +311,11 @@ export class RestateCombinatorPromise extends AbstractRestatePromise { readonly [Symbol.toStringTag] = "RestateCombinatorPromise"; } -export class RestatePendingPromise implements InternalRestatePromise { +export class RestatePendingPromise extends InternalRestatePromise { [RESTATE_CTX_SYMBOL]: ContextImpl; constructor(ctx: ContextImpl) { + super(); this[RESTATE_CTX_SYMBOL] = ctx; } @@ -309,7 +373,7 @@ export class InvocationPendingPromise } } -export class RestateMappedPromise extends AbstractRestatePromise { +export class RestateMappedPromise extends BaseRestatePromise { private publicPromiseMapper: ( value?: T, failure?: TerminalError @@ -361,6 +425,62 @@ export class RestateMappedPromise extends AbstractRestatePromise { readonly [Symbol.toStringTag] = "RestateMappedPromise"; } +export class RestateCompletedPromise extends InternalRestatePromise { + constructor(private readonly completedPromise: Promise) { + super(); + } + + // --- Promise methods + + then( + onfulfilled?: ((value: T) => TResult1 | PromiseLike) | null, + onrejected?: ((reason: any) => TResult2 | PromiseLike) | null + ): Promise { + return this.completedPromise.then(onfulfilled, onrejected); + } + + catch( + onrejected?: ((reason: any) => TResult | PromiseLike) | null + ): Promise { + return this.completedPromise.catch(onrejected); + } + + finally(onfinally?: (() => void) | null): Promise { + return this.completedPromise.finally(onfinally); + } + + // --- RestatePromise methods + + orTimeout(): RestatePromise { + return this; // Timeout never kicks in! + } + + map(mapper: (value?: T, failure?: TerminalError) => U): RestatePromise { + return new RestateCompletedPromise( + this.completedPromise.then( + (value) => mapper(value, undefined), + (reason) => mapper(undefined, reason as TerminalError) + ) + ); + } + + tryCancel() {} + + publicPromise(): Promise { + return this.completedPromise; + } + + tryComplete(): Promise { + return Promise.resolve(); + } + + uncompletedLeaves(): Array { + return []; + } + + readonly [Symbol.toStringTag] = "RestateCombinatorPromise"; +} + /** * Promises executor, gluing VM with I/O and Promises given to user space. */ @@ -388,7 +508,7 @@ export class PromisesExecutor { } catch (e) { // This can happen if either take_notification throws an exception or completer throws an exception. // This could either happen for a deserialization issue, or for an SDK bug, but we cover them here. - restatePromise[RESTATE_CTX_SYMBOL].handleInvocationEndError(e); + this.errorCallback(e); return Promise.resolve(); } diff --git a/packages/tests/restate-e2e-services/custom_tests.yaml b/packages/tests/restate-e2e-services/custom_tests.yaml new file mode 100644 index 00000000..06eec779 --- /dev/null +++ b/packages/tests/restate-e2e-services/custom_tests.yaml @@ -0,0 +1,3 @@ +tests: + - name: "all vitest" + command: "pnpm --filter @restatedev/restate-e2e-services run testrunner" diff --git a/packages/tests/restate-e2e-services/package.json b/packages/tests/restate-e2e-services/package.json index 235c7da4..df3bb8a2 100644 --- a/packages/tests/restate-e2e-services/package.json +++ b/packages/tests/restate-e2e-services/package.json @@ -18,9 +18,7 @@ "scripts": { "build": "turbo run _build --filter={.}...", "_build": "tsc -b tsconfig.test.json", - "test": "turbo run _test --filter={.}...", - "_test": "vitest run", - "test:watch": "vitest watch", + "testrunner": "vitest run", "clean": "rm -rf dist *.tsbuildinfo .turbo", "check:types": "turbo run _check:types --filter={.}...", "_check:types": "tsc --noEmit --project tsconfig.test.json", diff --git a/packages/tests/restate-e2e-services/src/app.ts b/packages/tests/restate-e2e-services/src/app.ts index 88e75d3e..86baa129 100644 --- a/packages/tests/restate-e2e-services/src/app.ts +++ b/packages/tests/restate-e2e-services/src/app.ts @@ -24,6 +24,7 @@ import "./proxy.js"; import "./test_utils.js"; import "./kill.js"; import "./virtual_object_command_interpreter.js"; +import "./promise_combinators.js"; import * as http2 from "http2"; import * as heapdump from "heapdump"; import path from "path"; @@ -41,12 +42,13 @@ process.on("SIGUSR2", () => { import { REGISTRY } from "./services.js"; -if (!process.env.SERVICES) { - throw new Error("Cannot find SERVICES env"); -} -const fqdns = new Set(process.env.SERVICES.split(",")); const endpoint = restate.endpoint(); -REGISTRY.register(fqdns, endpoint); +if (!process.env.SERVICES || process.env.SERVICES == "*") { + REGISTRY.registerAll(endpoint); +} else { + const fqdns = new Set(process.env.SERVICES.split(",")); + REGISTRY.register(fqdns, endpoint); +} const settings: http2.Settings = {}; if (process.env.MAX_CONCURRENT_STREAMS) { diff --git a/packages/tests/restate-e2e-services/src/promise_combinators.ts b/packages/tests/restate-e2e-services/src/promise_combinators.ts new file mode 100644 index 00000000..d5051600 --- /dev/null +++ b/packages/tests/restate-e2e-services/src/promise_combinators.ts @@ -0,0 +1,137 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate e2e tests, +// which are released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/e2e/blob/main/LICENSE + +import * as restate from "@restatedev/restate-sdk"; +import { REGISTRY } from "./services.js"; + +const promiseCombinators = restate.service({ + name: "PromiseCombinators", + handlers: { + // --- RestatePromise.resolve / reject --- + + resolveWithValue: async ( + _ctx: restate.Context, + value: string + ): Promise => { + return RestatePromise.resolve(value); + }, + + rejectWithTerminalError: async ( + _ctx: restate.Context, + message: string + ): Promise => { + return RestatePromise.reject(new restate.TerminalError(message)); + }, + + // --- Combinators with RestatePromise.resolve/reject --- + + allWithResolvedPromises: async ( + _ctx: restate.Context, + values: string[] + ): Promise => { + const promises = values.map((v) => RestatePromise.resolve(v)); + return RestatePromise.all(promises); + }, + + allWithOneRejected: async ( + _ctx: restate.Context, + input: { values: string[]; rejectIndex: number; errorMessage: string } + ): Promise => { + const promises = input.values.map((v, i) => + i === input.rejectIndex + ? RestatePromise.reject( + new restate.TerminalError(input.errorMessage) + ) + : RestatePromise.resolve(v) + ); + return RestatePromise.all(promises); + }, + + raceWithResolvedPromises: async ( + _ctx: restate.Context, + values: string[] + ): Promise => { + const promises = values.map((v) => RestatePromise.resolve(v)); + return RestatePromise.race(promises); + }, + + anyWithResolvedPromises: async ( + _ctx: restate.Context, + values: string[] + ): Promise => { + const promises = values.map((v) => RestatePromise.resolve(v)); + return RestatePromise.any(promises); + }, + + anyWithAllRejected: async ( + _ctx: restate.Context, + messages: string[] + ): Promise => { + const promises = messages.map((m) => + RestatePromise.reject(new restate.TerminalError(m)) + ); + return RestatePromise.any(promises); + }, + + allSettledMixed: async ( + _ctx: restate.Context, + input: { values: string[]; rejectIndices: number[] } + ): Promise[]> => { + const rejectSet = new Set(input.rejectIndices); + const promises = input.values.map((v, i) => + rejectSet.has(i) + ? RestatePromise.reject(new restate.TerminalError(v)) + : RestatePromise.resolve(v) + ); + return RestatePromise.allSettled(promises); + }, + + // --- Empty array combinators --- + + allEmpty: async (_ctx: restate.Context): Promise => { + return RestatePromise.all([]); + }, + + allSettledEmpty: async ( + _ctx: restate.Context + ): Promise[]> => { + return RestatePromise.allSettled([]); + }, + + // --- Mixed: context promises + resolved/rejected --- + + allMixedWithSleep: async ( + ctx: restate.Context, + input: { sleepMs: number; resolvedValue: string } + ): Promise<[string, string]> => { + const sleepPromise = ctx + .sleep(input.sleepMs) + .map(() => "slept" as string); + const resolvedPromise = RestatePromise.resolve(input.resolvedValue); + return RestatePromise.all([sleepPromise, resolvedPromise]); + }, + + raceMixedWithSleep: async ( + ctx: restate.Context, + input: { sleepMs: number; resolvedValue: string } + ): Promise => { + const sleepPromise = ctx + .sleep(input.sleepMs) + .map(() => "slept" as string); + const resolvedPromise = RestatePromise.resolve(input.resolvedValue); + return RestatePromise.race([sleepPromise, resolvedPromise]); + }, + }, +}); + +const { RestatePromise } = restate; + +REGISTRY.addService(promiseCombinators); + +export type PromiseCombinators = typeof promiseCombinators; diff --git a/packages/tests/restate-e2e-services/src/services.ts b/packages/tests/restate-e2e-services/src/services.ts index ce2a5690..778c4137 100644 --- a/packages/tests/restate-e2e-services/src/services.ts +++ b/packages/tests/restate-e2e-services/src/services.ts @@ -47,6 +47,12 @@ export class ComponentRegistry { }); } + registerAll(e: RestateEndpoint) { + this.components.forEach((svc) => { + svc.binder(e); + }); + } + register(fqdns: Set, e: RestateEndpoint) { fqdns.forEach((fqdn) => { const c = this.components.get(fqdn); diff --git a/packages/tests/restate-e2e-services/test/promise_combinators.test.ts b/packages/tests/restate-e2e-services/test/promise_combinators.test.ts new file mode 100644 index 00000000..a195f3f0 --- /dev/null +++ b/packages/tests/restate-e2e-services/test/promise_combinators.test.ts @@ -0,0 +1,117 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate e2e tests, +// which are released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/e2e/blob/main/LICENSE + +import { describe, it, expect } from "vitest"; +import { ingressClient } from "./utils.js"; +import type { PromiseCombinators } from "../src/promise_combinators.js"; + +const PromiseCombinators: PromiseCombinators = { + name: "PromiseCombinators", +}; + +describe("PromiseCombinators", () => { + const ingress = ingressClient(); + const client = ingress.serviceClient(PromiseCombinators); + + // --- RestatePromise.resolve --- + + it("resolve returns the value", async () => { + const result = await client.resolveWithValue("hello"); + expect(result).toBe("hello"); + }); + + // --- RestatePromise.reject --- + + it("reject throws TerminalError", async () => { + await expect(client.rejectWithTerminalError("boom")).rejects.toThrow( + "boom" + ); + }); + + // --- RestatePromise.all with resolved promises --- + + it("all with resolved promises returns all values", async () => { + const result = await client.allWithResolvedPromises(["a", "b", "c"]); + expect(result).toEqual(["a", "b", "c"]); + }); + + it("all with one rejected propagates the rejection", async () => { + await expect( + client.allWithOneRejected({ + values: ["a", "b", "c"], + rejectIndex: 1, + errorMessage: "fail at 1", + }) + ).rejects.toThrow("fail at 1"); + }); + + // --- RestatePromise.race with resolved promises --- + + it("race with resolved promises returns first value", async () => { + const result = await client.raceWithResolvedPromises(["first", "second"]); + expect(result).toBe("first"); + }); + + // --- RestatePromise.any --- + + it("any with resolved promises returns first fulfilled", async () => { + const result = await client.anyWithResolvedPromises(["x", "y"]); + expect(result).toBe("x"); + }); + + // TODO: Skipped - AggregateError from Promise.any is not converted to TerminalError by the SDK. + // See: https://github.com/restatedev/sdk-typescript/issues/672 + it.skip("any with all rejected throws", async () => { + await expect(client.anyWithAllRejected(["err1", "err2"])).rejects.toThrow(); + }); + + // --- RestatePromise.allSettled mixed --- + + it("allSettled with mixed resolved and rejected", async () => { + const result = await client.allSettledMixed({ + values: ["ok", "fail", "ok2"], + rejectIndices: [1], + }); + + expect(result).toHaveLength(3); + expect(result[0]).toEqual({ status: "fulfilled", value: "ok" }); + expect(result[1]).toMatchObject({ status: "rejected" }); + expect(result[2]).toEqual({ status: "fulfilled", value: "ok2" }); + }); + + // --- Empty array combinators --- + + it("all with empty array returns empty array", async () => { + const result = await client.allEmpty(); + expect(result).toEqual([]); + }); + + it("allSettled with empty array returns empty array", async () => { + const result = await client.allSettledEmpty(); + expect(result).toEqual([]); + }); + + // --- Mixed context promises + resolved/rejected --- + + it("all mixed with sleep and resolved promise", async () => { + const result = await client.allMixedWithSleep({ + sleepMs: 100, + resolvedValue: "instant", + }); + expect(result).toEqual(["slept", "instant"]); + }); + + it("race mixed: resolved promise wins over sleep", async () => { + const result = await client.raceMixedWithSleep({ + sleepMs: 60000, + resolvedValue: "instant", + }); + expect(result).toBe("instant"); + }); +}); diff --git a/packages/tests/restate-e2e-services/test/utils.ts b/packages/tests/restate-e2e-services/test/utils.ts new file mode 100644 index 00000000..46119b63 --- /dev/null +++ b/packages/tests/restate-e2e-services/test/utils.ts @@ -0,0 +1,30 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate e2e tests, +// which are released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/e2e/blob/main/LICENSE + +import * as restate from "@restatedev/restate-sdk-clients"; + +export function getIngressUrl(): string { + const url = process.env.RESTATE_INGRESS_URL; + if (!url) { + throw new Error("RESTATE_INGRESS_URL environment variable is not set"); + } + return url; +} + +export function getAdminUrl(): string { + const url = process.env.RESTATE_ADMIN_URL; + if (!url) { + throw new Error("RESTATE_ADMIN_URL environment variable is not set"); + } + return url; +} + +export function ingressClient() { + return restate.connect({ url: getIngressUrl() }); +}