diff --git a/.openapi-generator/FILES b/.openapi-generator/FILES index ddc3f045..9290b751 100644 --- a/.openapi-generator/FILES +++ b/.openapi-generator/FILES @@ -67,6 +67,8 @@ docs/RelationshipCondition.md docs/SourceInfo.md docs/Status.md docs/Store.md +docs/StreamResultOfStreamedListObjectsResponse.md +docs/StreamedListObjectsResponse.md docs/Tuple.md docs/TupleChange.md docs/TupleKey.md @@ -155,6 +157,8 @@ src/main/java/dev/openfga/sdk/api/model/RelationshipCondition.java src/main/java/dev/openfga/sdk/api/model/SourceInfo.java src/main/java/dev/openfga/sdk/api/model/Status.java src/main/java/dev/openfga/sdk/api/model/Store.java +src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java +src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java src/main/java/dev/openfga/sdk/api/model/Tuple.java src/main/java/dev/openfga/sdk/api/model/TupleChange.java src/main/java/dev/openfga/sdk/api/model/TupleKey.java diff --git a/README.md b/README.md index 51a10cc9..9a829ee1 100644 --- a/README.md +++ b/README.md @@ -1185,6 +1185,7 @@ try { | [**readAuthorizationModel**](docs/OpenFgaApi.md#readauthorizationmodel) | **GET** /stores/{store_id}/authorization-models/{id} | Return a particular version of an authorization model | | [**readAuthorizationModels**](docs/OpenFgaApi.md#readauthorizationmodels) | **GET** /stores/{store_id}/authorization-models | Return all the authorization models for a particular store | | [**readChanges**](docs/OpenFgaApi.md#readchanges) | **GET** /stores/{store_id}/changes | Return a list of all the tuple changes | +| [**streamedListObjects**](docs/OpenFgaApi.md#streamedlistobjects) | **POST** /stores/{store_id}/streamed-list-objects | Stream all objects of the given type that the user has a relation with | | [**write**](docs/OpenFgaApi.md#write) | **POST** /stores/{store_id}/write | Add or delete tuples from the store | | [**writeAssertions**](docs/OpenFgaApi.md#writeassertions) | **PUT** /stores/{store_id}/assertions/{authorization_model_id} | Upsert assertions for an authorization model ID | | [**writeAuthorizationModel**](docs/OpenFgaApi.md#writeauthorizationmodel) | **POST** /stores/{store_id}/authorization-models | Create a new authorization model | @@ -1310,6 +1311,10 @@ try { - [Store](https://github.com/openfga/java-sdk/blob/main/docs/Store.md) +- [StreamResultOfStreamedListObjectsResponse](https://github.com/openfga/java-sdk/blob/main/docs/StreamResultOfStreamedListObjectsResponse.md) + +- [StreamedListObjectsResponse](https://github.com/openfga/java-sdk/blob/main/docs/StreamedListObjectsResponse.md) + - [Tuple](https://github.com/openfga/java-sdk/blob/main/docs/Tuple.md) - [TupleChange](https://github.com/openfga/java-sdk/blob/main/docs/TupleChange.md) diff --git a/docs/OpenFgaApi.md b/docs/OpenFgaApi.md index 80986b1c..09d0f1fb 100644 --- a/docs/OpenFgaApi.md +++ b/docs/OpenFgaApi.md @@ -32,6 +32,8 @@ All URIs are relative to *http://localhost* | [**readAuthorizationModelsWithHttpInfo**](OpenFgaApi.md#readAuthorizationModelsWithHttpInfo) | **GET** /stores/{store_id}/authorization-models | Return all the authorization models for a particular store | | [**readChanges**](OpenFgaApi.md#readChanges) | **GET** /stores/{store_id}/changes | Return a list of all the tuple changes | | [**readChangesWithHttpInfo**](OpenFgaApi.md#readChangesWithHttpInfo) | **GET** /stores/{store_id}/changes | Return a list of all the tuple changes | +| [**streamedListObjects**](OpenFgaApi.md#streamedListObjects) | **POST** /stores/{store_id}/streamed-list-objects | Stream all objects of the given type that the user has a relation with | +| [**streamedListObjectsWithHttpInfo**](OpenFgaApi.md#streamedListObjectsWithHttpInfo) | **POST** /stores/{store_id}/streamed-list-objects | Stream all objects of the given type that the user has a relation with | | [**write**](OpenFgaApi.md#write) | **POST** /stores/{store_id}/write | Add or delete tuples from the store | | [**writeWithHttpInfo**](OpenFgaApi.md#writeWithHttpInfo) | **POST** /stores/{store_id}/write | Add or delete tuples from the store | | [**writeAssertions**](OpenFgaApi.md#writeAssertions) | **PUT** /stores/{store_id}/assertions/{authorization_model_id} | Upsert assertions for an authorization model ID | @@ -2301,6 +2303,167 @@ No authorization required | **500** | Request failed due to internal server error. | - | +## streamedListObjects + +> CompletableFuture streamedListObjects(storeId, body) + +Stream all objects of the given type that the user has a relation with + +The Streamed ListObjects API is very similar to the the ListObjects API, with two differences: 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + +### Example + +```java +// Import classes: +import dev.openfga.sdk.api.client.ApiClient; +import dev.openfga.sdk.api.client.ApiException; +import dev.openfga.sdk.api.configuration.Configuration; +import dev.openfga.sdk.api.client.models.*; +import dev.openfga.sdk.api.OpenFgaApi; +import java.util.concurrent.CompletableFuture; + +public class Example { + public static void main(String[] args) { + ApiClient defaultClient = Configuration.getDefaultApiClient(); + defaultClient.setBasePath("http://localhost"); + + OpenFgaApi apiInstance = new OpenFgaApi(defaultClient); + String storeId = "storeId_example"; // String | + ListObjectsRequest body = new ListObjectsRequest(); // ListObjectsRequest | + try { + CompletableFuture result = apiInstance.streamedListObjects(storeId, body); + System.out.println(result.get()); + } catch (ApiException e) { + System.err.println("Exception when calling OpenFgaApi#streamedListObjects"); + System.err.println("Status code: " + e.getCode()); + System.err.println("Reason: " + e.getResponseBody()); + System.err.println("Response headers: " + e.getResponseHeaders()); + e.printStackTrace(); + } + } +} +``` + +### Parameters + + +| Name | Type | Description | Notes | +|------------- | ------------- | ------------- | -------------| +| **storeId** | **String**| | | +| **body** | [**ListObjectsRequest**](ListObjectsRequest.md)| | | + +### Return type + +CompletableFuture<[**StreamResultOfStreamedListObjectsResponse**](StreamResultOfStreamedListObjectsResponse.md)> + + +### Authorization + +No authorization required + +### HTTP request headers + +- **Content-Type**: application/json +- **Accept**: application/json + +### HTTP response details +| Status code | Description | Response headers | +|-------------|-------------|------------------| +| **200** | A successful response.(streaming responses) | - | +| **400** | Request failed due to invalid input. | - | +| **401** | Not authenticated. | - | +| **403** | Forbidden. | - | +| **404** | Request failed due to incorrect path. | - | +| **409** | Request was aborted due a transaction conflict. | - | +| **422** | Request timed out due to excessive request throttling. | - | +| **500** | Request failed due to internal server error. | - | + +## streamedListObjectsWithHttpInfo + +> CompletableFuture> streamedListObjects streamedListObjectsWithHttpInfo(storeId, body) + +Stream all objects of the given type that the user has a relation with + +The Streamed ListObjects API is very similar to the the ListObjects API, with two differences: 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + +### Example + +```java +// Import classes: +import dev.openfga.sdk.api.client.ApiClient; +import dev.openfga.sdk.api.client.ApiException; +import dev.openfga.sdk.api.client.ApiResponse; +import dev.openfga.sdk.api.configuration.Configuration; +import dev.openfga.sdk.api.client.models.*; +import dev.openfga.sdk.api.OpenFgaApi; +import java.util.concurrent.CompletableFuture; + +public class Example { + public static void main(String[] args) { + ApiClient defaultClient = Configuration.getDefaultApiClient(); + defaultClient.setBasePath("http://localhost"); + + OpenFgaApi apiInstance = new OpenFgaApi(defaultClient); + String storeId = "storeId_example"; // String | + ListObjectsRequest body = new ListObjectsRequest(); // ListObjectsRequest | + try { + CompletableFuture> response = apiInstance.streamedListObjectsWithHttpInfo(storeId, body); + System.out.println("Status code: " + response.get().getStatusCode()); + System.out.println("Response headers: " + response.get().getHeaders()); + System.out.println("Response body: " + response.get().getData()); + } catch (InterruptedException | ExecutionException e) { + ApiException apiException = (ApiException)e.getCause(); + System.err.println("Exception when calling OpenFgaApi#streamedListObjects"); + System.err.println("Status code: " + apiException.getCode()); + System.err.println("Response headers: " + apiException.getResponseHeaders()); + System.err.println("Reason: " + apiException.getResponseBody()); + e.printStackTrace(); + } catch (ApiException e) { + System.err.println("Exception when calling OpenFgaApi#streamedListObjects"); + System.err.println("Status code: " + e.getCode()); + System.err.println("Response headers: " + e.getResponseHeaders()); + System.err.println("Reason: " + e.getResponseBody()); + e.printStackTrace(); + } + } +} +``` + +### Parameters + + +| Name | Type | Description | Notes | +|------------- | ------------- | ------------- | -------------| +| **storeId** | **String**| | | +| **body** | [**ListObjectsRequest**](ListObjectsRequest.md)| | | + +### Return type + +CompletableFuture> + + +### Authorization + +No authorization required + +### HTTP request headers + +- **Content-Type**: application/json +- **Accept**: application/json + +### HTTP response details +| Status code | Description | Response headers | +|-------------|-------------|------------------| +| **200** | A successful response.(streaming responses) | - | +| **400** | Request failed due to invalid input. | - | +| **401** | Not authenticated. | - | +| **403** | Forbidden. | - | +| **404** | Request failed due to incorrect path. | - | +| **409** | Request was aborted due a transaction conflict. | - | +| **422** | Request timed out due to excessive request throttling. | - | +| **500** | Request failed due to internal server error. | - | + + ## write > CompletableFuture write(storeId, body) diff --git a/docs/StreamResultOfStreamedListObjectsResponse.md b/docs/StreamResultOfStreamedListObjectsResponse.md new file mode 100644 index 00000000..af23d053 --- /dev/null +++ b/docs/StreamResultOfStreamedListObjectsResponse.md @@ -0,0 +1,14 @@ + + +# StreamResultOfStreamedListObjectsResponse + + +## Properties + +| Name | Type | Description | Notes | +|------------ | ------------- | ------------- | -------------| +|**result** | [**StreamedListObjectsResponse**](StreamedListObjectsResponse.md) | | [optional] | +|**error** | [**Status**](Status.md) | | [optional] | + + + diff --git a/docs/StreamedListObjectsResponse.md b/docs/StreamedListObjectsResponse.md new file mode 100644 index 00000000..04b00157 --- /dev/null +++ b/docs/StreamedListObjectsResponse.md @@ -0,0 +1,14 @@ + + +# StreamedListObjectsResponse + +The response for a StreamedListObjects RPC. + +## Properties + +| Name | Type | Description | Notes | +|------------ | ------------- | ------------- | -------------| +|**_object** | **String** | | | + + + diff --git a/examples/streamed-list-objects/.env.example b/examples/streamed-list-objects/.env.example new file mode 100644 index 00000000..65c1e29d --- /dev/null +++ b/examples/streamed-list-objects/.env.example @@ -0,0 +1,11 @@ +# OpenFGA Configuration (REQUIRED) +FGA_API_URL=http://localhost:8080 +FGA_STORE_ID=store_id_here +FGA_MODEL_ID=model_id_here + +# Authentication (optional - for authenticated OpenFGA instances) +FGA_CLIENT_ID=client_id_here +FGA_CLIENT_SECRET=client_secret_here +FGA_API_AUDIENCE=api_audience_here +FGA_API_TOKEN_ISSUER=api_issuer_here + diff --git a/examples/streamed-list-objects/Makefile b/examples/streamed-list-objects/Makefile new file mode 100644 index 00000000..ca3179bc --- /dev/null +++ b/examples/streamed-list-objects/Makefile @@ -0,0 +1,15 @@ +.PHONY: all build run run-openfga + +all: build + +openfga_version=latest + +build: + ./gradlew build + +run: + ./gradlew run + +run-openfga: + docker pull docker.io/openfga/openfga:${openfga_version} && \ + docker run -p 8080:8080 docker.io/openfga/openfga:${openfga_version} run diff --git a/examples/streamed-list-objects/README.md b/examples/streamed-list-objects/README.md new file mode 100644 index 00000000..33ce170f --- /dev/null +++ b/examples/streamed-list-objects/README.md @@ -0,0 +1,117 @@ +# Streamed List Objects Example + +This example demonstrates working with [OpenFGA's `/streamed-list-objects` endpoint](https://openfga.dev/api/service#/Relationship%20Queries/StreamedListObjects) using the Java SDK's `streamedListObjects()` method. + +## What is Streamed ListObjects? + +The StreamedListObjects API is similar to the regular ListObjects API, but with key differences: + +1. **Streaming Response**: Instead of collecting all objects before returning a response, it streams them to the client as they are collected +2. **No Result Limit**: The number of results returned is only limited by the execution timeout specified in the server configuration (`OPENFGA_LIST_OBJECTS_DEADLINE`) +3. **Immediate Processing**: You can start processing results as they arrive, without waiting for the entire result set + +## When to Use Streamed ListObjects + +Use the Streamed ListObjects API when: + +- You expect a large number of results that would take a long time to collect +- You want to start processing results immediately rather than waiting for the complete set +- You might not need all results (e.g., you want to stop after finding a certain number) +- You want to avoid timeout issues with very large result sets + +## Prerequisites + +- Java 11+ +- OpenFGA running on `localhost:8080` + +You can start OpenFGA with Docker by running the following command: + +```bash +docker pull openfga/openfga && docker run -it --rm -p 8080:8080 openfga/openfga run +``` + +## Running the Example + +No additional setup is required to run this example. Simply run the following command: + +```bash +make run +``` + +### Environment Variables (Optional) + +For authenticated OpenFGA instances, set the following environment variables: + +```bash +export FGA_API_URL=http://localhost:8080 # Your OpenFGA server URL +export FGA_CLIENT_ID=your_client_id +export FGA_CLIENT_SECRET=your_client_secret +export FGA_API_TOKEN_ISSUER=your_token_issuer +export FGA_API_AUDIENCE=your_audience +``` + +## Code Examples + +### Basic Usage + +```java +// Create a request +var request = new ClientListObjectsRequest() + .type("document") + .relation("owner") + .user("user:anne"); + +// Call the streaming API and ensure proper resource cleanup +try (var objectStream = fgaClient.streamedListObjects(request).get()) { +// Collect all results +List objects = objectStream + .map(StreamedListObjectsResponse::getObject) + .collect(Collectors.toList()); +} +``` + +### Early Termination + +```java +// Get only the first 10 results, ensuring the stream is closed properly +try (var objectStream = fgaClient.streamedListObjects(request).get()) { + List firstTen = objectStream + .map(StreamedListObjectsResponse::getObject) + .limit(10) + .collect(Collectors.toList()); +} +``` + +### Process as You Go + +```java +// Process each object immediately as it arrives +try (var objectStream = fgaClient.streamedListObjects(request).get()) { + objectStream + .map(StreamedListObjectsResponse::getObject) + .forEach(obj -> { + // Do something with each object + System.out.println("Processing: " + obj); + }); +} +``` + +### With Options + +```java +// Use options to specify consistency preference +var options = new ClientListObjectsOptions() + .consistency(ConsistencyPreference.HIGHER_CONSISTENCY) + .authorizationModelId("01GXSXXXXXXXXXXXXXXXX"); + +var objectStream = fgaClient.streamedListObjects(request, options).get(); +``` + +## Comparison with Regular ListObjects + +| Feature | ListObjects | Streamed ListObjects | +|---------|-------------|---------------------| +| Result Collection | Waits for all results | Streams results as computed | +| Result Limit | Limited by server pagination | Limited only by execution timeout | +| Processing | Must wait for complete response | Can process immediately | +| Use Case | Small to medium result sets | Large result sets, immediate processing | \ No newline at end of file diff --git a/examples/streamed-list-objects/build.gradle b/examples/streamed-list-objects/build.gradle new file mode 100644 index 00000000..53ba4388 --- /dev/null +++ b/examples/streamed-list-objects/build.gradle @@ -0,0 +1,55 @@ +plugins { + id 'application' + id 'com.diffplug.spotless' version '8.0.0' +} + +application { + mainClass = 'dev.openfga.sdk.example.streamedlistobjects.StreamedListObjectsExample' +} + +repositories { + mavenCentral() +} + +ext { + jacksonVersion = "2.20.0" +} + +dependencies { + implementation("dev.openfga:openfga-sdk:0.9.2") + + // Serialization + implementation("com.fasterxml.jackson.core:jackson-core:$jacksonVersion") + implementation("com.fasterxml.jackson.core:jackson-annotations:$jacksonVersion") + implementation("com.fasterxml.jackson.core:jackson-databind:$jacksonVersion") + implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:$jacksonVersion") + implementation("org.openapitools:jackson-databind-nullable:0.2.7") +} + +// Use spotless plugin to automatically format code, remove unused import, etc +// To apply changes directly to the file, run `gradlew spotlessApply` +// Ref: https://github.com/diffplug/spotless/tree/main/plugin-gradle +spotless { + // comment out below to run spotless as part of the `check` task + enforceCheck false + format 'misc', { + // define the files (e.g. '*.gradle', '*.md') to apply `misc` to + target '.gitignore' + // define the steps to apply to those files + trimTrailingWhitespace() + indentWithSpaces() // Takes an integer argument if you don't like 4 + endWithNewline() + } + java { + palantirJavaFormat() + removeUnusedImports() + importOrder() + } +} + +// Use spotless plugin to automatically format code, remove unused import, etc +// To apply changes directly to the file, run `gradlew spotlessApply` +// Ref: https://github.com/diffplug/spotless/tree/main/plugin-gradle +tasks.register('fmt') { + dependsOn 'spotlessApply' +} \ No newline at end of file diff --git a/examples/streamed-list-objects/gradle.properties b/examples/streamed-list-objects/gradle.properties new file mode 100644 index 00000000..5f544a8e --- /dev/null +++ b/examples/streamed-list-objects/gradle.properties @@ -0,0 +1 @@ +language=java \ No newline at end of file diff --git a/examples/streamed-list-objects/gradle/wrapper/gradle-wrapper.properties b/examples/streamed-list-objects/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 00000000..a80b22ce --- /dev/null +++ b/examples/streamed-list-objects/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,7 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.6-bin.zip +networkTimeout=10000 +validateDistributionUrl=true +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/examples/streamed-list-objects/gradlew b/examples/streamed-list-objects/gradlew new file mode 100755 index 00000000..79a61d42 --- /dev/null +++ b/examples/streamed-list-objects/gradlew @@ -0,0 +1,244 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +# This is normally unused +# shellcheck disable=SC2034 +APP_BASE_NAME=${0##*/} +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC3045 + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC3045 + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + +# Collect all arguments for the java command; +# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of +# shell script including quotes and variable substitutions, so put them in +# double quotes to make sure that they get re-expanded; and +# * put everything else in single quotes, so that it's not re-expanded. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/examples/streamed-list-objects/model.json b/examples/streamed-list-objects/model.json new file mode 100644 index 00000000..9cfc967e --- /dev/null +++ b/examples/streamed-list-objects/model.json @@ -0,0 +1,121 @@ +{ + "schema_version": "1.1", + "type_definitions": [ + { "type": "user", "relations": {} }, + { + "type": "group", + "relations": { "member": { "this": {} } }, + "metadata": { + "relations": { + "member": { "directly_related_user_types": [{ "type": "user" }] } + } + } + }, + { + "type": "folder", + "relations": { + "can_create_file": { + "computedUserset": { "object": "", "relation": "owner" } + }, + "owner": { "this": {} }, + "parent": { "this": {} }, + "viewer": { + "union": { + "child": [ + { "this": {} }, + { "computedUserset": { "object": "", "relation": "owner" } }, + { + "tupleToUserset": { + "tupleset": { "object": "", "relation": "parent" }, + "computedUserset": { "object": "", "relation": "viewer" } + } + } + ] + } + } + }, + "metadata": { + "relations": { + "can_create_file": { "directly_related_user_types": [] }, + "owner": { "directly_related_user_types": [{ "type": "user" }] }, + "parent": { "directly_related_user_types": [{ "type": "folder" }] }, + "viewer": { + "directly_related_user_types": [ + { "type": "user" }, + { "type": "user", "wildcard": {} }, + { "type": "group", "relation": "member" } + ] + } + } + } + }, + { + "type": "document", + "relations": { + "can_change_owner": { + "computedUserset": { "object": "", "relation": "owner" } + }, + "owner": { "this": {} }, + "parent": { "this": {} }, + "can_read": { + "union": { + "child": [ + { "computedUserset": { "object": "", "relation": "viewer" } }, + { "computedUserset": { "object": "", "relation": "owner" } }, + { + "tupleToUserset": { + "tupleset": { "object": "", "relation": "parent" }, + "computedUserset": { "object": "", "relation": "viewer" } + } + } + ] + } + }, + "can_share": { + "union": { + "child": [ + { "computedUserset": { "object": "", "relation": "owner" } }, + { + "tupleToUserset": { + "tupleset": { "object": "", "relation": "parent" }, + "computedUserset": { "object": "", "relation": "owner" } + } + } + ] + } + }, + "viewer": { "this": {} }, + "can_write": { + "union": { + "child": [ + { "computedUserset": { "object": "", "relation": "owner" } }, + { + "tupleToUserset": { + "tupleset": { "object": "", "relation": "parent" }, + "computedUserset": { "object": "", "relation": "owner" } + } + } + ] + } + } + }, + "metadata": { + "relations": { + "can_change_owner": { "directly_related_user_types": [] }, + "owner": { "directly_related_user_types": [{ "type": "user" }] }, + "parent": { "directly_related_user_types": [{ "type": "folder" }] }, + "can_read": { "directly_related_user_types": [] }, + "can_share": { "directly_related_user_types": [] }, + "viewer": { + "directly_related_user_types": [ + { "type": "user" }, + { "type": "user", "wildcard": {} }, + { "type": "group", "relation": "member" } + ] + }, + "can_write": { "directly_related_user_types": [] } + } + } + } + ] +} \ No newline at end of file diff --git a/examples/streamed-list-objects/settings.gradle b/examples/streamed-list-objects/settings.gradle new file mode 100644 index 00000000..549c237b --- /dev/null +++ b/examples/streamed-list-objects/settings.gradle @@ -0,0 +1,7 @@ +rootProject.name = 'streamed-list-objects-example' + +includeBuild('../../') { + dependencySubstitution { + substitute(module('dev.openfga:openfga-sdk')).using(project(':')) + } +} \ No newline at end of file diff --git a/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/streamedlistobjects/StreamedListObjectsExample.java b/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/streamedlistobjects/StreamedListObjectsExample.java new file mode 100644 index 00000000..1e9634d8 --- /dev/null +++ b/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/streamedlistobjects/StreamedListObjectsExample.java @@ -0,0 +1,157 @@ +package dev.openfga.sdk.example.streamedlistobjects; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import dev.openfga.sdk.api.client.OpenFgaClient; +import dev.openfga.sdk.api.client.model.*; +import dev.openfga.sdk.api.configuration.*; +import dev.openfga.sdk.api.model.*; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Example demonstrating the usage of the Streamed ListObjects API. + * + *

This example demonstrates working with OpenFGA's `/streamed-list-objects` endpoint using the + * Java SDK's `streamedListObjects()` method. + * + *

The Streamed ListObjects API returns results as they are computed, rather than collecting all + * results before returning. This is particularly useful for: + * + *

    + *
  • Large result sets that would take a long time to collect + *
  • Scenarios where you want to start processing results immediately + *
  • Cases where you might not need all results (early termination) + *
+ */ +public class StreamedListObjectsExample { + private static final int TUPLE_COUNT = 2000; + + public static void main(String[] args) throws Exception { + // Configure the SDK + var credentials = new Credentials(); + if (System.getenv("FGA_CLIENT_ID") != null) { + credentials = new Credentials(new ClientCredentials() + .apiAudience(System.getenv("FGA_API_AUDIENCE")) + .apiTokenIssuer(System.getenv("FGA_API_TOKEN_ISSUER")) + .clientId(System.getenv("FGA_CLIENT_ID")) + .clientSecret(System.getenv("FGA_CLIENT_SECRET"))); + } + + var configuration = new ClientConfiguration() + .apiUrl(System.getenv("FGA_API_URL")) // e.g., http://localhost:8080 + .credentials(credentials); + + var fgaClient = new OpenFgaClient(configuration); + + // Create our temporary store + String storeId = createStore(fgaClient); + System.out.println("Created temporary store (" + storeId + ")"); + + // Configure the SDK to use the temporary store for the rest of the example + fgaClient.setStoreId(storeId); + + // Load the authorization model from a file and write it to the server + String modelId = writeModel(fgaClient); + System.out.println("Created temporary authorization model (" + modelId + ")\n"); + + // Configure the SDK to use this authorization model for the rest of the example + fgaClient.setAuthorizationModelId(modelId); + + // Write a bunch of example tuples to the temporary store + int wrote = writeTuples(fgaClient, TUPLE_COUNT); + System.out.println("Wrote " + wrote + " tuples to the store.\n"); + + //////////////////////////////// + // Demonstrate streaming vs standard list objects + + // Craft a request to list all `documents` owned by `user:anne` + var request = new ClientListObjectsRequest() + .type("document") + .relation("owner") + .user("user:anne"); + + // Send a single request to the server using both the streamed and standard endpoints + List streamedResults = streamedListObjects(fgaClient, request); + List standardResults = listObjects(fgaClient, request); + + System.out.println( + "/streamed-list-objects returned " + streamedResults.size() + " objects in a single request."); + + System.out.println("/list-objects returned " + standardResults.size() + " objects in a single request."); + + //////////////////////////////// + // Clean up - delete the test store + fgaClient.deleteStore().get(); + System.out.println("\nDeleted temporary store (" + storeId + ")"); + } + + /** + * Create a temporary store. The store will be deleted at the end of the example. + */ + private static String createStore(OpenFgaClient fgaClient) throws Exception { + var response = fgaClient + .createStore(new CreateStoreRequest().name("Demo Store")) + .get(); + return response.getId(); + } + + /** + * Load the authorization model from a file and write it to the server. + */ + private static String writeModel(OpenFgaClient fgaClient) throws Exception { + var mapper = new ObjectMapper(); + var modelFile = new File("model.json"); + var authModel = mapper.readValue(modelFile, new TypeReference() {}); + var response = fgaClient.writeAuthorizationModel(authModel).get(); + return response.getAuthorizationModelId(); + } + + /** + * Write a variable number of tuples to the temporary store. + */ + private static int writeTuples(OpenFgaClient fgaClient, int quantity) throws Exception { + int chunks = quantity / 100; + + for (int chunk = 0; chunk < chunks; chunk++) { + List writes = new ArrayList<>(); + for (int t = 0; t < 100; t++) { + writes.add(new ClientTupleKey() + .user("user:anne") + .relation("owner") + ._object("document:" + (chunk * 100 + t))); + } + fgaClient.write(new ClientWriteRequest().writes(writes)).get(); + } + + return quantity; + } + + /** + * Send our request to the streaming endpoint, and collect all results. + * + *

Note that streamedListObjects() returns a Stream, so we could process results as they + * come in. For the sake of this example, we'll just collect all the results into a list and + * return them all at once. + */ + private static List streamedListObjects(OpenFgaClient fgaClient, ClientListObjectsRequest request) + throws Exception { + try (var objectStream = fgaClient.streamedListObjects(request).get()) { + return objectStream.map(StreamedListObjectsResponse::getObject).collect(Collectors.toList()); + } + } + + /** + * For comparison sake, here is the non-streamed version of the same call, using + * listObjects(). + * + *

Note that in the non-streamed version, the server will return a maximum of 1000 results. + */ + private static List listObjects(OpenFgaClient fgaClient, ClientListObjectsRequest request) + throws Exception { + var response = fgaClient.listObjects(request).get(); + return response.getObjects(); + } +} \ No newline at end of file diff --git a/src/main/java/dev/openfga/sdk/api/OpenFgaApi.java b/src/main/java/dev/openfga/sdk/api/OpenFgaApi.java index d0589ca0..0ea565a2 100644 --- a/src/main/java/dev/openfga/sdk/api/OpenFgaApi.java +++ b/src/main/java/dev/openfga/sdk/api/OpenFgaApi.java @@ -38,6 +38,7 @@ import dev.openfga.sdk.api.model.ReadChangesResponse; import dev.openfga.sdk.api.model.ReadRequest; import dev.openfga.sdk.api.model.ReadResponse; +import dev.openfga.sdk.api.model.StreamResultOfStreamedListObjectsResponse; import dev.openfga.sdk.api.model.WriteAssertionsRequest; import dev.openfga.sdk.api.model.WriteAuthorizationModelRequest; import dev.openfga.sdk.api.model.WriteAuthorizationModelResponse; @@ -906,6 +907,68 @@ private CompletableFuture> readChanges( } } + /** + * Stream all objects of the given type that the user has a relation with + * The Streamed ListObjects API is very similar to the the ListObjects API, with two differences: 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + * @param storeId (required) + * @param body (required) + * @return CompletableFuture<ApiResponse<StreamResultOfStreamedListObjectsResponse>> + * @throws ApiException if fails to make API call + */ + public CompletableFuture> streamedListObjects( + String storeId, ListObjectsRequest body) throws ApiException, FgaInvalidParameterException { + return streamedListObjects(storeId, body, this.configuration); + } + + /** + * Stream all objects of the given type that the user has a relation with + * The Streamed ListObjects API is very similar to the the ListObjects API, with two differences: 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + * @param storeId (required) + * @param body (required) + * @param configurationOverride Override the {@link Configuration} this OpenFgaApi was constructed with + * @return CompletableFuture<ApiResponse<StreamResultOfStreamedListObjectsResponse>> + * @throws ApiException if fails to make API call + */ + public CompletableFuture> streamedListObjects( + String storeId, ListObjectsRequest body, ConfigurationOverride configurationOverride) + throws ApiException, FgaInvalidParameterException { + return streamedListObjects(storeId, body, this.configuration.override(configurationOverride)); + } + + private CompletableFuture> streamedListObjects( + String storeId, ListObjectsRequest body, Configuration configuration) + throws ApiException, FgaInvalidParameterException { + + assertParamExists(storeId, "storeId", "streamedListObjects"); + + assertParamExists(body, "body", "streamedListObjects"); + + String path = "/stores/{store_id}/streamed-list-objects" + .replace("{store_id}", StringUtil.urlEncode(storeId.toString())); + + Map methodParameters = new HashMap<>(); + methodParameters.put("storeId", storeId); + methodParameters.put("body", body); + + Map telemetryAttributes = buildTelemetryAttributes(methodParameters); + + telemetryAttributes.put(Attributes.FGA_CLIENT_REQUEST_METHOD, "StreamedListObjects"); + + try { + HttpRequest request = buildHttpRequest("POST", path, body, configuration); + return new HttpRequestAttempt<>( + request, + "streamedListObjects", + StreamResultOfStreamedListObjectsResponse.class, + apiClient, + configuration) + .addTelemetryAttributes(telemetryAttributes) + .attemptHttpRequest(); + } catch (ApiException e) { + return CompletableFuture.failedFuture(e); + } + } + /** * Add or delete tuples from the store * The Write API will transactionally update the tuples for a certain store. Tuples and type definitions allow OpenFGA to determine whether a relationship exists between an object and an user. In the body, `writes` adds new tuples and `deletes` removes existing tuples. When deleting a tuple, any `condition` specified with it is ignored. The API is not idempotent by default: if, later on, you try to add the same tuple key (even if the `condition` is different), or if you try to delete a non-existing tuple, it will throw an error. To allow writes when an identical tuple already exists in the database, set `\"on_duplicate\": \"ignore\"` on the `writes` object. To allow deletes when a tuple was already removed from the database, set `\"on_missing\": \"ignore\"` on the `deletes` object. If a Write request contains both idempotent (ignore) and non-idempotent (error) operations, the most restrictive action (error) will take precedence. If a condition fails for a sub-request with an error flag, the entire transaction will be rolled back. This gives developers explicit control over the atomicity of the requests. The API will not allow you to write tuples such as `document:2021-budget#viewer@document:2021-budget#viewer`, because they are implicit. An `authorization_model_id` may be specified in the body. If it is, it will be used to assert that each written tuple (not deleted) is valid for the model specified. If it is not specified, the latest authorization model ID will be used. ## Example ### Adding relationships To add `user:anne` as a `writer` for `document:2021-budget`, call write API with the following ```json { \"writes\": { \"tuple_keys\": [ { \"user\": \"user:anne\", \"relation\": \"writer\", \"object\": \"document:2021-budget\" } ], \"on_duplicate\": \"ignore\" }, \"authorization_model_id\": \"01G50QVV17PECNVAHX1GG4Y5NC\" } ``` ### Removing relationships To remove `user:bob` as a `reader` for `document:2021-budget`, call write API with the following ```json { \"deletes\": { \"tuple_keys\": [ { \"user\": \"user:bob\", \"relation\": \"reader\", \"object\": \"document:2021-budget\" } ], \"on_missing\": \"ignore\" } } ``` diff --git a/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java b/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java new file mode 100644 index 00000000..8ae1eb43 --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java @@ -0,0 +1,75 @@ +package dev.openfga.sdk.api; + +import static dev.openfga.sdk.util.Validation.assertParamExists; + +import dev.openfga.sdk.api.client.ApiClient; +import dev.openfga.sdk.api.client.ApiResponse; +import dev.openfga.sdk.api.client.HttpRequestAttempt; +import dev.openfga.sdk.api.client.StreamingResponseBody; +import dev.openfga.sdk.api.configuration.Configuration; +import dev.openfga.sdk.api.model.ListObjectsRequest; +import dev.openfga.sdk.errors.ApiException; +import dev.openfga.sdk.errors.FgaInvalidParameterException; +import dev.openfga.sdk.telemetry.Attribute; +import dev.openfga.sdk.telemetry.Attributes; +import java.net.http.HttpRequest; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * API handler for streamed list objects operations. + * This class is separate from the generated OpenFgaApi to avoid modifications to generated code. + */ +public class StreamedListObjectsApi { + private final Configuration configuration; + private final ApiClient apiClient; + + public StreamedListObjectsApi(Configuration configuration, ApiClient apiClient) { + this.configuration = configuration; + this.apiClient = apiClient; + } + + /** + * Stream all objects of the given type that the user has a relation with. + * Returns raw NDJSON response for parsing by the client layer. + * + * @param storeId The store ID (required) + * @param body The list objects request body (required) + * @param requestConfiguration The configuration to use for this request + * @return CompletableFuture with raw streaming response + * @throws ApiException if fails to make API call + * @throws FgaInvalidParameterException if required parameters are missing + */ + public CompletableFuture> streamedListObjects( + String storeId, ListObjectsRequest body, Configuration requestConfiguration) + throws ApiException, FgaInvalidParameterException { + + assertParamExists(storeId, "storeId", "streamedListObjects"); + assertParamExists(body, "body", "streamedListObjects"); + + String path = "/stores/" + storeId + "/streamed-list-objects"; + + try { + byte[] requestBody = apiClient.getObjectMapper().writeValueAsBytes(body); + HttpRequest.Builder requestBuilder = + ApiClient.requestBuilder("POST", path, requestBody, requestConfiguration); + + HttpRequest httpRequest = requestBuilder.build(); + + Map telemetryAttributes = new HashMap<>(); + telemetryAttributes.put(Attributes.FGA_CLIENT_REQUEST_METHOD, "StreamedListObjects"); + + return new HttpRequestAttempt<>( + httpRequest, + "streamedListObjects", + StreamingResponseBody.class, + apiClient, + requestConfiguration) + .addTelemetryAttributes(telemetryAttributes) + .attemptHttpRequest(); + } catch (Exception e) { + return CompletableFuture.failedFuture(new ApiException(e)); + } + } +} diff --git a/src/main/java/dev/openfga/sdk/api/client/HttpRequestAttempt.java b/src/main/java/dev/openfga/sdk/api/client/HttpRequestAttempt.java index 4cf6f6a2..6364a90a 100644 --- a/src/main/java/dev/openfga/sdk/api/client/HttpRequestAttempt.java +++ b/src/main/java/dev/openfga/sdk/api/client/HttpRequestAttempt.java @@ -12,6 +12,7 @@ import dev.openfga.sdk.util.RetryAfterHeaderParser; import dev.openfga.sdk.util.RetryStrategy; import java.io.IOException; +import java.io.InputStream; import java.io.PrintStream; import java.net.http.*; import java.nio.ByteBuffer; @@ -90,18 +91,27 @@ private HttpClient getHttpClient() { private CompletableFuture> attemptHttpRequest( HttpClient httpClient, int retryNumber, Throwable previousError) { - return httpClient - .sendAsync(request, HttpResponse.BodyHandlers.ofString()) - .handle((response, throwable) -> { - if (throwable != null) { - // Handle network errors (no HTTP response received) - return handleNetworkError(throwable, retryNumber); - } - - // Handle HTTP response (including error status codes) - return processHttpResponse(response, retryNumber, previousError); - }) - .thenCompose(Function.identity()); + if (clazz == StreamingResponseBody.class) { + return httpClient + .sendAsync(request, HttpResponse.BodyHandlers.ofInputStream()) + .handle((response, throwable) -> { + if (throwable != null) { + return handleNetworkError(throwable, retryNumber); + } + return processHttpResponseStreaming(response, retryNumber, previousError); + }) + .thenCompose(Function.identity()); + } else { + return httpClient + .sendAsync(request, HttpResponse.BodyHandlers.ofString()) + .handle((response, throwable) -> { + if (throwable != null) { + return handleNetworkError(throwable, retryNumber); + } + return processHttpResponse(response, retryNumber, previousError); + }) + .thenCompose(Function.identity()); + } } private CompletableFuture> handleNetworkError(Throwable throwable, int retryNumber) { @@ -211,11 +221,115 @@ private CompletableFuture> processHttpResponse( response.statusCode(), response.headers().map(), response.body(), modeledResponse)); } + private CompletableFuture> processHttpResponseStreaming( + HttpResponse response, int retryNumber, Throwable previousError) { + int statusCode = response.statusCode(); + + if (!HttpStatusCode.isSuccessful(statusCode)) { + try { + String bodyStr = new String(response.body().readAllBytes(), StandardCharsets.UTF_8); + HttpResponse stringResponse = new HttpResponse<>() { + @Override + public int statusCode() { + return response.statusCode(); + } + + @Override + public HttpRequest request() { + return response.request(); + } + + @Override + public java.util.Optional> previousResponse() { + return java.util.Optional.empty(); + } + + @Override + public HttpHeaders headers() { + return response.headers(); + } + + @Override + public String body() { + return bodyStr; + } + + @Override + public java.util.Optional sslSession() { + return response.sslSession(); + } + + @Override + public java.net.URI uri() { + return response.uri(); + } + + @Override + public HttpClient.Version version() { + return response.version(); + } + }; + + Optional fgaError = + FgaError.getError(name, request, configuration, stringResponse, previousError); + if (fgaError.isPresent()) { + FgaError error = fgaError.get(); + if (retryNumber < configuration.getMaxRetries()) { + Optional retryAfterDelay = response.headers() + .firstValue(FgaConstants.RETRY_AFTER_HEADER_NAME) + .flatMap(RetryAfterHeaderParser::parseRetryAfter); + if (RetryStrategy.shouldRetry(statusCode)) { + return handleHttpErrorRetry(retryAfterDelay, retryNumber, error); + } + } + return CompletableFuture.failedFuture(error); + } + } catch (IOException e) { + return CompletableFuture.failedFuture(new ApiException(e)); + } + } + + addTelemetryAttributes(Attributes.fromHttpResponse(response, this.configuration.getCredentials())); + + if (retryNumber > 0) { + addTelemetryAttribute(Attributes.HTTP_REQUEST_RESEND_COUNT, String.valueOf(retryNumber)); + } + + if (response.headers() + .firstValue(FgaConstants.QUERY_DURATION_HEADER_NAME) + .isPresent()) { + String queryDuration = response.headers() + .firstValue(FgaConstants.QUERY_DURATION_HEADER_NAME) + .orElse(null); + + if (!isNullOrWhitespace(queryDuration)) { + try { + double queryDurationDouble = Double.parseDouble(queryDuration); + telemetry.metrics().queryDuration(queryDurationDouble, this.getTelemetryAttributes()); + } catch (NumberFormatException e) { + } + } + } + + Double requestDuration = (double) (System.currentTimeMillis() - requestStarted); + telemetry.metrics().requestDuration(requestDuration, this.getTelemetryAttributes()); + + @SuppressWarnings("unchecked") + T result = (T) new StreamingResponseBody(response.body()); + return CompletableFuture.completedFuture( + new ApiResponse<>(response.statusCode(), response.headers().map(), null, result)); + } + private CompletableFuture deserializeResponse(HttpResponse response) { if (clazz == Void.class && isNullOrWhitespace(response.body())) { return CompletableFuture.completedFuture(null); } - + // Special handling for streaming responses - don't deserialize, just wrap the raw string + if (clazz == StreamingResponseString.class) { + @SuppressWarnings("unchecked") + T result = (T) new StreamingResponseString(response.body()); + return CompletableFuture.completedFuture(result); + } try { T deserialized = apiClient.getObjectMapper().readValue(response.body(), clazz); return CompletableFuture.completedFuture(deserialized); diff --git a/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java b/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java index c8334917..a6cc0ed7 100644 --- a/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java +++ b/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java @@ -24,6 +24,7 @@ public class OpenFgaClient { private final ApiClient apiClient; private ClientConfiguration configuration; private OpenFgaApi api; + private StreamedListObjectsApi streamedListObjectsApi; public OpenFgaClient(ClientConfiguration configuration) throws FgaInvalidParameterException { this(configuration, new ApiClient()); @@ -33,6 +34,7 @@ public OpenFgaClient(ClientConfiguration configuration, ApiClient apiClient) thr this.apiClient = apiClient; this.configuration = configuration; this.api = new OpenFgaApi(configuration, apiClient); + this.streamedListObjectsApi = new StreamedListObjectsApi(configuration, apiClient); } /* *********** @@ -57,6 +59,7 @@ public void setAuthorizationModelId(String authorizationModelId) { public void setConfiguration(ClientConfiguration configuration) throws FgaInvalidParameterException { this.configuration = configuration; this.api = new OpenFgaApi(configuration, apiClient); + this.streamedListObjectsApi = new StreamedListObjectsApi(configuration, apiClient); } /* ******** @@ -1104,6 +1107,86 @@ public CompletableFuture listObjects( return call(() -> api.listObjects(storeId, body, overrides)).thenApply(ClientListObjectsResponse::new); } + /** + * StreamedListObjects - Stream all objects of the given type that the user has a relation to (evaluates) + * + * Returns a Stream of objects that can be iterated. The streaming API returns results as they + * are computed, rather than collecting all results before returning. This is useful for + * large result sets. + * + * @throws FgaInvalidParameterException When the Store ID is null, empty, or whitespace + */ + public CompletableFuture> streamedListObjects(ClientListObjectsRequest request) + throws FgaInvalidParameterException { + return streamedListObjects(request, null); + } + + /** + * StreamedListObjects - Stream all objects of the given type that the user has a relation to (evaluates) + * + * Returns a Stream of objects that can be iterated. The streaming API returns results as they + * are computed, rather than collecting all results before returning. This is useful for + * large result sets. + * + * @throws FgaInvalidParameterException When the Store ID is null, empty, or whitespace + */ + public CompletableFuture> streamedListObjects( + ClientListObjectsRequest request, ClientListObjectsOptions options) throws FgaInvalidParameterException { + configuration.assertValid(); + String storeId = configuration.getStoreIdChecked(); + + ListObjectsRequest body = new ListObjectsRequest(); + + if (request != null) { + body.user(request.getUser()).relation(request.getRelation()).type(request.getType()); + if (request.getContextualTupleKeys() != null) { + var contextualTuples = request.getContextualTupleKeys(); + var bodyContextualTuples = ClientTupleKey.asContextualTupleKeys(contextualTuples); + body.contextualTuples(bodyContextualTuples); + } + if (request.getContext() != null) { + body.context(request.getContext()); + } + } + + if (options != null) { + if (options.getConsistency() != null) { + body.consistency(options.getConsistency()); + } + + // Set authorizationModelId from options if available; otherwise, use the default from configuration + String authorizationModelId = !isNullOrWhitespace(options.getAuthorizationModelId()) + ? options.getAuthorizationModelId() + : configuration.getAuthorizationModelId(); + body.authorizationModelId(authorizationModelId); + } else { + body.setAuthorizationModelId(configuration.getAuthorizationModelId()); + } + + Configuration config = configuration.override(new ConfigurationOverride().addHeaders(options)); + + return call(() -> streamedListObjectsApi.streamedListObjects(storeId, body, config)) + .thenApply(response -> { + StreamingResponseBody srb = response.getData(); + java.io.BufferedReader reader = new java.io.BufferedReader( + new java.io.InputStreamReader(srb.getBody(), java.nio.charset.StandardCharsets.UTF_8)); + StreamedResponseIterator iterator = + new StreamedResponseIterator(reader, apiClient.getObjectMapper()); + Stream stream = java.util.stream.StreamSupport.stream( + ((Iterable) () -> iterator).spliterator(), false); + return stream.onClose(() -> { + try { + iterator.close(); + } catch (java.io.IOException ignore) { + } + try { + srb.close(); + } catch (java.io.IOException ignore) { + } + }); + }); + } + /** * ListRelations - List allowed relations a user has with an object (evaluates) */ diff --git a/src/main/java/dev/openfga/sdk/api/client/StreamedResponseIterator.java b/src/main/java/dev/openfga/sdk/api/client/StreamedResponseIterator.java new file mode 100644 index 00000000..a6e83592 --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/client/StreamedResponseIterator.java @@ -0,0 +1,100 @@ +package dev.openfga.sdk.api.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import dev.openfga.sdk.api.model.StreamResultOfStreamedListObjectsResponse; +import dev.openfga.sdk.api.model.StreamedListObjectsResponse; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.Reader; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Iterator for parsing newline-delimited JSON streaming responses. + * Each line in the response is a StreamResultOfStreamedListObjectsResponse. + * + * If an error is encountered in the stream (either from parsing or from an error + * response), it will be thrown as a StreamingException when hasNext() or next() is called. + */ +public class StreamedResponseIterator implements Iterator, AutoCloseable { + private final BufferedReader reader; + private final ObjectMapper objectMapper; + private StreamedListObjectsResponse nextItem; + private boolean hasNext; + private StreamingException pendingException; + + public StreamedResponseIterator(Reader reader, ObjectMapper objectMapper) { + this.reader = reader instanceof BufferedReader ? (BufferedReader) reader : new BufferedReader(reader); + this.objectMapper = objectMapper; + this.hasNext = true; + this.pendingException = null; + advance(); + } + + private void advance() { + try { + String line; + while ((line = reader.readLine()) != null) { + line = line.trim(); + if (line.isEmpty()) { + continue; + } + + StreamResultOfStreamedListObjectsResponse streamResult = + objectMapper.readValue(line, StreamResultOfStreamedListObjectsResponse.class); + + if (streamResult.getResult() != null) { + nextItem = streamResult.getResult(); + return; + } + + if (streamResult.getError() != null) { + pendingException = new StreamingException(streamResult.getError()); + hasNext = false; + nextItem = null; + return; + } + } + // No more lines + hasNext = false; + nextItem = null; + } catch (IOException e) { + pendingException = new StreamingException("Failed to parse streaming response", e); + hasNext = false; + nextItem = null; + } + } + + @Override + public boolean hasNext() { + if (pendingException != null) { + throw pendingException; + } + return hasNext && nextItem != null; + } + + @Override + public StreamedListObjectsResponse next() { + if (pendingException != null) { + throw pendingException; + } + + if (!hasNext()) { + throw new NoSuchElementException(); + } + + StreamedListObjectsResponse current = nextItem; + advance(); + + if (pendingException != null) { + throw pendingException; + } + + return current; + } + + @Override + public void close() throws IOException { + reader.close(); + } +} diff --git a/src/main/java/dev/openfga/sdk/api/client/StreamingException.java b/src/main/java/dev/openfga/sdk/api/client/StreamingException.java new file mode 100644 index 00000000..a6283222 --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/client/StreamingException.java @@ -0,0 +1,31 @@ +package dev.openfga.sdk.api.client; + +import dev.openfga.sdk.api.model.Status; + +public class StreamingException extends RuntimeException { + private final Status error; + + public StreamingException(Status error) { + super(formatErrorMessage(error)); + this.error = error; + } + + public StreamingException(String message, Throwable cause) { + super(message, cause); + this.error = null; + } + + public Status getError() { + return error; + } + + public Integer getCode() { + return error != null ? error.getCode() : null; + } + + private static String formatErrorMessage(Status error) { + String codeStr = error.getCode() != null ? String.valueOf(error.getCode()) : "unknown"; + String messageStr = error.getMessage() != null ? error.getMessage() : "Unknown error"; + return String.format("Error in streaming response: code=%s, message=%s", codeStr, messageStr); + } +} diff --git a/src/main/java/dev/openfga/sdk/api/client/StreamingResponseBody.java b/src/main/java/dev/openfga/sdk/api/client/StreamingResponseBody.java new file mode 100644 index 00000000..72bb0af1 --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/client/StreamingResponseBody.java @@ -0,0 +1,24 @@ +package dev.openfga.sdk.api.client; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; + +public final class StreamingResponseBody implements Closeable { + private final InputStream body; + + public StreamingResponseBody(InputStream body) { + this.body = body; + } + + public InputStream getBody() { + return body; + } + + @Override + public void close() throws IOException { + if (body != null) { + body.close(); + } + } +} diff --git a/src/main/java/dev/openfga/sdk/api/client/StreamingResponseString.java b/src/main/java/dev/openfga/sdk/api/client/StreamingResponseString.java new file mode 100644 index 00000000..7cf25574 --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/client/StreamingResponseString.java @@ -0,0 +1,17 @@ +package dev.openfga.sdk.api.client; + +/** + * Marker class to indicate that the response should not be deserialized by HttpRequestAttempt. + * Instead, the raw response string should be returned for manual parsing (e.g., NDJSON). + */ +public class StreamingResponseString { + private final String rawResponse; + + public StreamingResponseString(String rawResponse) { + this.rawResponse = rawResponse; + } + + public String getRawResponse() { + return rawResponse; + } +} diff --git a/src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java b/src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java new file mode 100644 index 00000000..20331735 --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java @@ -0,0 +1,168 @@ +/* + * OpenFGA + * A high performance and flexible authorization/permission engine built for developers and inspired by Google Zanzibar. + * + * The version of the OpenAPI document: 1.x + * Contact: community@openfga.dev + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. + */ + +package dev.openfga.sdk.api.model; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import java.util.Objects; +import java.util.StringJoiner; + +/** + * StreamResultOfStreamedListObjectsResponse + */ +@JsonPropertyOrder({ + StreamResultOfStreamedListObjectsResponse.JSON_PROPERTY_RESULT, + StreamResultOfStreamedListObjectsResponse.JSON_PROPERTY_ERROR +}) +public class StreamResultOfStreamedListObjectsResponse { + public static final String JSON_PROPERTY_RESULT = "result"; + private StreamedListObjectsResponse result; + + public static final String JSON_PROPERTY_ERROR = "error"; + private Status error; + + public StreamResultOfStreamedListObjectsResponse() {} + + public StreamResultOfStreamedListObjectsResponse result(StreamedListObjectsResponse result) { + this.result = result; + return this; + } + + /** + * Get result + * @return result + **/ + @javax.annotation.Nullable + @JsonProperty(JSON_PROPERTY_RESULT) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public StreamedListObjectsResponse getResult() { + return result; + } + + @JsonProperty(JSON_PROPERTY_RESULT) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public void setResult(StreamedListObjectsResponse result) { + this.result = result; + } + + public StreamResultOfStreamedListObjectsResponse error(Status error) { + this.error = error; + return this; + } + + /** + * Get error + * @return error + **/ + @javax.annotation.Nullable + @JsonProperty(JSON_PROPERTY_ERROR) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public Status getError() { + return error; + } + + @JsonProperty(JSON_PROPERTY_ERROR) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public void setError(Status error) { + this.error = error; + } + + /** + * Return true if this Stream_result_of_StreamedListObjectsResponse object is equal to o. + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StreamResultOfStreamedListObjectsResponse streamResultOfStreamedListObjectsResponse = + (StreamResultOfStreamedListObjectsResponse) o; + return Objects.equals(this.result, streamResultOfStreamedListObjectsResponse.result) + && Objects.equals(this.error, streamResultOfStreamedListObjectsResponse.error); + } + + @Override + public int hashCode() { + return Objects.hash(result, error); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class StreamResultOfStreamedListObjectsResponse {\n"); + sb.append(" result: ").append(toIndentedString(result)).append("\n"); + sb.append(" error: ").append(toIndentedString(error)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } + + /** + * Convert the instance into URL query string. + * + * @return URL query string + */ + public String toUrlQueryString() { + return toUrlQueryString(null); + } + + /** + * Convert the instance into URL query string. + * + * @param prefix prefix of the query string + * @return URL query string + */ + public String toUrlQueryString(String prefix) { + String suffix = ""; + String containerSuffix = ""; + String containerPrefix = ""; + if (prefix == null) { + // style=form, explode=true, e.g. /pet?name=cat&type=manx + prefix = ""; + } else { + // deepObject style e.g. /pet?id[name]=cat&id[type]=manx + prefix = prefix + "["; + suffix = "]"; + containerSuffix = "]"; + containerPrefix = "["; + } + + StringJoiner joiner = new StringJoiner("&"); + + // add `result` to the URL query string + if (getResult() != null) { + joiner.add(getResult().toUrlQueryString(prefix + "result" + suffix)); + } + + // add `error` to the URL query string + if (getError() != null) { + joiner.add(getError().toUrlQueryString(prefix + "error" + suffix)); + } + + return joiner.toString(); + } +} diff --git a/src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java b/src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java new file mode 100644 index 00000000..43c02ab5 --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java @@ -0,0 +1,139 @@ +/* + * OpenFGA + * A high performance and flexible authorization/permission engine built for developers and inspired by Google Zanzibar. + * + * The version of the OpenAPI document: 1.x + * Contact: community@openfga.dev + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. + */ + +package dev.openfga.sdk.api.model; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.StringJoiner; + +/** + * The response for a StreamedListObjects RPC. + */ +@JsonPropertyOrder({StreamedListObjectsResponse.JSON_PROPERTY_OBJECT}) +public class StreamedListObjectsResponse { + public static final String JSON_PROPERTY_OBJECT = "object"; + private String _object; + + public StreamedListObjectsResponse() {} + + public StreamedListObjectsResponse _object(String _object) { + this._object = _object; + return this; + } + + /** + * Get _object + * @return _object + **/ + @javax.annotation.Nonnull + @JsonProperty(JSON_PROPERTY_OBJECT) + @JsonInclude(value = JsonInclude.Include.ALWAYS) + public String getObject() { + return _object; + } + + @JsonProperty(JSON_PROPERTY_OBJECT) + @JsonInclude(value = JsonInclude.Include.ALWAYS) + public void setObject(String _object) { + this._object = _object; + } + + /** + * Return true if this StreamedListObjectsResponse object is equal to o. + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StreamedListObjectsResponse streamedListObjectsResponse = (StreamedListObjectsResponse) o; + return Objects.equals(this._object, streamedListObjectsResponse._object); + } + + @Override + public int hashCode() { + return Objects.hash(_object); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class StreamedListObjectsResponse {\n"); + sb.append(" _object: ").append(toIndentedString(_object)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } + + /** + * Convert the instance into URL query string. + * + * @return URL query string + */ + public String toUrlQueryString() { + return toUrlQueryString(null); + } + + /** + * Convert the instance into URL query string. + * + * @param prefix prefix of the query string + * @return URL query string + */ + public String toUrlQueryString(String prefix) { + String suffix = ""; + String containerSuffix = ""; + String containerPrefix = ""; + if (prefix == null) { + // style=form, explode=true, e.g. /pet?name=cat&type=manx + prefix = ""; + } else { + // deepObject style e.g. /pet?id[name]=cat&id[type]=manx + prefix = prefix + "["; + suffix = "]"; + containerSuffix = "]"; + containerPrefix = "["; + } + + StringJoiner joiner = new StringJoiner("&"); + + // add `object` to the URL query string + if (getObject() != null) { + joiner.add(String.format( + "%sobject%s=%s", + prefix, + suffix, + URLEncoder.encode(String.valueOf(getObject()), StandardCharsets.UTF_8) + .replaceAll("\\+", "%20"))); + } + + return joiner.toString(); + } +} diff --git a/src/test-integration/java/dev/openfga/sdk/api/client/OpenFgaClientIntegrationTest.java b/src/test-integration/java/dev/openfga/sdk/api/client/OpenFgaClientIntegrationTest.java index 8ed6255d..5cfab345 100644 --- a/src/test-integration/java/dev/openfga/sdk/api/client/OpenFgaClientIntegrationTest.java +++ b/src/test-integration/java/dev/openfga/sdk/api/client/OpenFgaClientIntegrationTest.java @@ -328,6 +328,33 @@ public void write_and_listObjects() throws Exception { assertEquals(DEFAULT_DOC, response.getObjects().get(0)); } + @Test + public void write_and_streamedListObjects() throws Exception { + // Given + String storeName = thisTestName(); + String storeId = createStore(storeName); + fga.setStoreId(storeId); + String authModelId = writeAuthModel(storeId); + fga.setAuthorizationModelId(authModelId); + ClientWriteRequest writeRequest = new ClientWriteRequest().writes(List.of(DEFAULT_TUPLE_KEY)); + ClientListObjectsRequest listObjectsRequest = new ClientListObjectsRequest() + .user(DEFAULT_USER) + .relation("reader") + .type("document"); + + // When + fga.write(writeRequest).get(); + List objects; + try (var stream = fga.streamedListObjects(listObjectsRequest).get()) { + objects = stream.map(StreamedListObjectsResponse::getObject).collect(java.util.stream.Collectors.toList()); + } + + // Then + assertNotNull(objects); + assertEquals(1, objects.size()); + assertEquals(DEFAULT_DOC, objects.get(0)); + } + @Test public void write_readAssertions() throws Exception { // Given diff --git a/src/test/java/dev/openfga/sdk/api/client/OpenFgaClientTest.java b/src/test/java/dev/openfga/sdk/api/client/OpenFgaClientTest.java index 2850929d..1c2f1843 100644 --- a/src/test/java/dev/openfga/sdk/api/client/OpenFgaClientTest.java +++ b/src/test/java/dev/openfga/sdk/api/client/OpenFgaClientTest.java @@ -2586,6 +2586,134 @@ public void listObjectsWithContextTest() throws Exception { assertEquals(List.of(DEFAULT_OBJECT), response.getObjects()); } + /** + * Test streaming list objects API + */ + @Test + public void streamedListObjectsTest() throws Exception { + // Given + String postPath = + String.format("%s/stores/%s/streamed-list-objects", FgaConstants.TEST_API_URL, DEFAULT_STORE_ID); + String expectedBody = String.format( + "{\"authorization_model_id\":\"%s\",\"type\":\"%s\",\"relation\":\"%s\",\"user\":\"%s\",\"contextual_tuples\":null,\"context\":null,\"consistency\":\"%s\"}", + DEFAULT_AUTH_MODEL_ID, DEFAULT_TYPE, DEFAULT_RELATION, DEFAULT_USER, DEFAULT_CONSISTENCY); + + // Simulate NDJSON response - each line is a StreamResultOfStreamedListObjectsResponse + String ndjsonResponse = String.format( + "{\"result\":{\"object\":\"document:1\"}}\n{\"result\":{\"object\":\"document:2\"}}\n{\"result\":{\"object\":\"document:3\"}}\n"); + + mockHttpClient.onPost(postPath).withBody(is(expectedBody)).doReturn(200, ndjsonResponse); + + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // When + List objects; + try (Stream responseStream = + fga.streamedListObjects(request).get()) { + objects = responseStream.map(StreamedListObjectsResponse::getObject).collect(Collectors.toList()); + } + + // Then + mockHttpClient.verify().post(postPath).withBody(is(expectedBody)).called(1); + assertEquals(3, objects.size()); + assertEquals("document:1", objects.get(0)); + assertEquals("document:2", objects.get(1)); + assertEquals("document:3", objects.get(2)); + } + + @Test + public void streamedListObjects_storeIdRequired() { + // Given + clientConfiguration.storeId(null); + + // When + var exception = assertThrows( + FgaInvalidParameterException.class, + () -> fga.streamedListObjects(new ClientListObjectsRequest()).get()); + + // Then + assertEquals( + "Required parameter storeId was invalid when calling ClientConfiguration.", exception.getMessage()); + } + + @Test + public void streamedListObjects_400() throws Exception { + // Given + String postUrl = + String.format("%s/stores/%s/streamed-list-objects", FgaConstants.TEST_API_URL, DEFAULT_STORE_ID); + mockHttpClient + .onPost(postUrl) + .doReturn(400, "{\"code\":\"validation_error\",\"message\":\"Generic validation error\"}"); + + // When + ExecutionException execException = + assertThrows(ExecutionException.class, () -> fga.streamedListObjects(new ClientListObjectsRequest()) + .get()); + + // Then + mockHttpClient.verify().post(postUrl).called(1); + var exception = assertInstanceOf(FgaApiValidationError.class, execException.getCause()); + assertEquals(400, exception.getStatusCode()); + assertEquals( + "{\"code\":\"validation_error\",\"message\":\"Generic validation error\"}", + exception.getResponseData()); + } + + @Test + public void streamedListObjects_500() throws Exception { + // Given + String postUrl = + String.format("%s/stores/%s/streamed-list-objects", FgaConstants.TEST_API_URL, DEFAULT_STORE_ID); + mockHttpClient + .onPost(postUrl) + .doReturn(500, "{\"code\":\"internal_error\",\"message\":\"Internal Server Error\"}"); + + // When + ExecutionException execException = + assertThrows(ExecutionException.class, () -> fga.streamedListObjects(new ClientListObjectsRequest()) + .get()); + + // Then + // POST requests retry on 5xx errors (1 initial + 3 retries = 4 total) + mockHttpClient.verify().post(postUrl).called(4); + var exception = assertInstanceOf(FgaApiInternalError.class, execException.getCause()); + assertEquals(500, exception.getStatusCode()); + assertEquals( + "{\"code\":\"internal_error\",\"message\":\"Internal Server Error\"}", exception.getResponseData()); + } + + @Test + public void streamedListObjects_errorInStream() throws Exception { + // Given + String postPath = + String.format("%s/stores/%s/streamed-list-objects", FgaConstants.TEST_API_URL, DEFAULT_STORE_ID); + + // Simulate NDJSON response with an error in the stream + String ndjsonResponse = + "{\"result\":{\"object\":\"document:1\"}}\n{\"error\":{\"code\":500,\"message\":\"Internal error\"}}\n"; + + mockHttpClient.onPost(postPath).doReturn(200, ndjsonResponse); + + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // When + // Then - should throw when processing the stream + var exception = assertThrows(RuntimeException.class, () -> { + try (Stream responseStream = + fga.streamedListObjects(request).get()) { + responseStream.map(StreamedListObjectsResponse::getObject).collect(Collectors.toList()); + } + }); + + assertTrue(exception.getMessage().contains("Error in streaming response")); + } + /** * Check whether a user is authorized to access an object. */