From 3a3641a1136086a01ab9f7e9bc2edce3588df831 Mon Sep 17 00:00:00 2001 From: Anurag Bandyopadhyay Date: Wed, 29 Oct 2025 21:26:42 +0530 Subject: [PATCH 1/9] feat: generate streamedlistobjects API layer from generator --- .openapi-generator/FILES | 4 + README.md | 5 + docs/OpenFgaApi.md | 163 +++++++++++++++++ ...reamResultOfStreamedListObjectsResponse.md | 14 ++ docs/StreamedListObjectsResponse.md | 14 ++ .../java/dev/openfga/sdk/api/OpenFgaApi.java | 63 +++++++ ...amResultOfStreamedListObjectsResponse.java | 168 ++++++++++++++++++ .../model/StreamedListObjectsResponse.java | 139 +++++++++++++++ 8 files changed, 570 insertions(+) create mode 100644 docs/StreamResultOfStreamedListObjectsResponse.md create mode 100644 docs/StreamedListObjectsResponse.md create mode 100644 src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java create mode 100644 src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java diff --git a/.openapi-generator/FILES b/.openapi-generator/FILES index ddc3f045..9290b751 100644 --- a/.openapi-generator/FILES +++ b/.openapi-generator/FILES @@ -67,6 +67,8 @@ docs/RelationshipCondition.md docs/SourceInfo.md docs/Status.md docs/Store.md +docs/StreamResultOfStreamedListObjectsResponse.md +docs/StreamedListObjectsResponse.md docs/Tuple.md docs/TupleChange.md docs/TupleKey.md @@ -155,6 +157,8 @@ src/main/java/dev/openfga/sdk/api/model/RelationshipCondition.java src/main/java/dev/openfga/sdk/api/model/SourceInfo.java src/main/java/dev/openfga/sdk/api/model/Status.java src/main/java/dev/openfga/sdk/api/model/Store.java +src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java +src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java src/main/java/dev/openfga/sdk/api/model/Tuple.java src/main/java/dev/openfga/sdk/api/model/TupleChange.java src/main/java/dev/openfga/sdk/api/model/TupleKey.java diff --git a/README.md b/README.md index 51a10cc9..9a829ee1 100644 --- a/README.md +++ b/README.md @@ -1185,6 +1185,7 @@ try { | [**readAuthorizationModel**](docs/OpenFgaApi.md#readauthorizationmodel) | **GET** /stores/{store_id}/authorization-models/{id} | Return a particular version of an authorization model | | [**readAuthorizationModels**](docs/OpenFgaApi.md#readauthorizationmodels) | **GET** /stores/{store_id}/authorization-models | Return all the authorization models for a particular store | | [**readChanges**](docs/OpenFgaApi.md#readchanges) | **GET** /stores/{store_id}/changes | Return a list of all the tuple changes | +| [**streamedListObjects**](docs/OpenFgaApi.md#streamedlistobjects) | **POST** /stores/{store_id}/streamed-list-objects | Stream all objects of the given type that the user has a relation with | | [**write**](docs/OpenFgaApi.md#write) | **POST** /stores/{store_id}/write | Add or delete tuples from the store | | [**writeAssertions**](docs/OpenFgaApi.md#writeassertions) | **PUT** /stores/{store_id}/assertions/{authorization_model_id} | Upsert assertions for an authorization model ID | | [**writeAuthorizationModel**](docs/OpenFgaApi.md#writeauthorizationmodel) | **POST** /stores/{store_id}/authorization-models | Create a new authorization model | @@ -1310,6 +1311,10 @@ try { - [Store](https://github.com/openfga/java-sdk/blob/main/docs/Store.md) +- [StreamResultOfStreamedListObjectsResponse](https://github.com/openfga/java-sdk/blob/main/docs/StreamResultOfStreamedListObjectsResponse.md) + +- [StreamedListObjectsResponse](https://github.com/openfga/java-sdk/blob/main/docs/StreamedListObjectsResponse.md) + - [Tuple](https://github.com/openfga/java-sdk/blob/main/docs/Tuple.md) - [TupleChange](https://github.com/openfga/java-sdk/blob/main/docs/TupleChange.md) diff --git a/docs/OpenFgaApi.md b/docs/OpenFgaApi.md index 80986b1c..09d0f1fb 100644 --- a/docs/OpenFgaApi.md +++ b/docs/OpenFgaApi.md @@ -32,6 +32,8 @@ All URIs are relative to *http://localhost* | [**readAuthorizationModelsWithHttpInfo**](OpenFgaApi.md#readAuthorizationModelsWithHttpInfo) | **GET** /stores/{store_id}/authorization-models | Return all the authorization models for a particular store | | [**readChanges**](OpenFgaApi.md#readChanges) | **GET** /stores/{store_id}/changes | Return a list of all the tuple changes | | [**readChangesWithHttpInfo**](OpenFgaApi.md#readChangesWithHttpInfo) | **GET** /stores/{store_id}/changes | Return a list of all the tuple changes | +| [**streamedListObjects**](OpenFgaApi.md#streamedListObjects) | **POST** /stores/{store_id}/streamed-list-objects | Stream all objects of the given type that the user has a relation with | +| [**streamedListObjectsWithHttpInfo**](OpenFgaApi.md#streamedListObjectsWithHttpInfo) | **POST** /stores/{store_id}/streamed-list-objects | Stream all objects of the given type that the user has a relation with | | [**write**](OpenFgaApi.md#write) | **POST** /stores/{store_id}/write | Add or delete tuples from the store | | [**writeWithHttpInfo**](OpenFgaApi.md#writeWithHttpInfo) | **POST** /stores/{store_id}/write | Add or delete tuples from the store | | [**writeAssertions**](OpenFgaApi.md#writeAssertions) | **PUT** /stores/{store_id}/assertions/{authorization_model_id} | Upsert assertions for an authorization model ID | @@ -2301,6 +2303,167 @@ No authorization required | **500** | Request failed due to internal server error. | - | +## streamedListObjects + +> CompletableFuture streamedListObjects(storeId, body) + +Stream all objects of the given type that the user has a relation with + +The Streamed ListObjects API is very similar to the the ListObjects API, with two differences: 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + +### Example + +```java +// Import classes: +import dev.openfga.sdk.api.client.ApiClient; +import dev.openfga.sdk.api.client.ApiException; +import dev.openfga.sdk.api.configuration.Configuration; +import dev.openfga.sdk.api.client.models.*; +import dev.openfga.sdk.api.OpenFgaApi; +import java.util.concurrent.CompletableFuture; + +public class Example { + public static void main(String[] args) { + ApiClient defaultClient = Configuration.getDefaultApiClient(); + defaultClient.setBasePath("http://localhost"); + + OpenFgaApi apiInstance = new OpenFgaApi(defaultClient); + String storeId = "storeId_example"; // String | + ListObjectsRequest body = new ListObjectsRequest(); // ListObjectsRequest | + try { + CompletableFuture result = apiInstance.streamedListObjects(storeId, body); + System.out.println(result.get()); + } catch (ApiException e) { + System.err.println("Exception when calling OpenFgaApi#streamedListObjects"); + System.err.println("Status code: " + e.getCode()); + System.err.println("Reason: " + e.getResponseBody()); + System.err.println("Response headers: " + e.getResponseHeaders()); + e.printStackTrace(); + } + } +} +``` + +### Parameters + + +| Name | Type | Description | Notes | +|------------- | ------------- | ------------- | -------------| +| **storeId** | **String**| | | +| **body** | [**ListObjectsRequest**](ListObjectsRequest.md)| | | + +### Return type + +CompletableFuture<[**StreamResultOfStreamedListObjectsResponse**](StreamResultOfStreamedListObjectsResponse.md)> + + +### Authorization + +No authorization required + +### HTTP request headers + +- **Content-Type**: application/json +- **Accept**: application/json + +### HTTP response details +| Status code | Description | Response headers | +|-------------|-------------|------------------| +| **200** | A successful response.(streaming responses) | - | +| **400** | Request failed due to invalid input. | - | +| **401** | Not authenticated. | - | +| **403** | Forbidden. | - | +| **404** | Request failed due to incorrect path. | - | +| **409** | Request was aborted due a transaction conflict. | - | +| **422** | Request timed out due to excessive request throttling. | - | +| **500** | Request failed due to internal server error. | - | + +## streamedListObjectsWithHttpInfo + +> CompletableFuture> streamedListObjects streamedListObjectsWithHttpInfo(storeId, body) + +Stream all objects of the given type that the user has a relation with + +The Streamed ListObjects API is very similar to the the ListObjects API, with two differences: 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + +### Example + +```java +// Import classes: +import dev.openfga.sdk.api.client.ApiClient; +import dev.openfga.sdk.api.client.ApiException; +import dev.openfga.sdk.api.client.ApiResponse; +import dev.openfga.sdk.api.configuration.Configuration; +import dev.openfga.sdk.api.client.models.*; +import dev.openfga.sdk.api.OpenFgaApi; +import java.util.concurrent.CompletableFuture; + +public class Example { + public static void main(String[] args) { + ApiClient defaultClient = Configuration.getDefaultApiClient(); + defaultClient.setBasePath("http://localhost"); + + OpenFgaApi apiInstance = new OpenFgaApi(defaultClient); + String storeId = "storeId_example"; // String | + ListObjectsRequest body = new ListObjectsRequest(); // ListObjectsRequest | + try { + CompletableFuture> response = apiInstance.streamedListObjectsWithHttpInfo(storeId, body); + System.out.println("Status code: " + response.get().getStatusCode()); + System.out.println("Response headers: " + response.get().getHeaders()); + System.out.println("Response body: " + response.get().getData()); + } catch (InterruptedException | ExecutionException e) { + ApiException apiException = (ApiException)e.getCause(); + System.err.println("Exception when calling OpenFgaApi#streamedListObjects"); + System.err.println("Status code: " + apiException.getCode()); + System.err.println("Response headers: " + apiException.getResponseHeaders()); + System.err.println("Reason: " + apiException.getResponseBody()); + e.printStackTrace(); + } catch (ApiException e) { + System.err.println("Exception when calling OpenFgaApi#streamedListObjects"); + System.err.println("Status code: " + e.getCode()); + System.err.println("Response headers: " + e.getResponseHeaders()); + System.err.println("Reason: " + e.getResponseBody()); + e.printStackTrace(); + } + } +} +``` + +### Parameters + + +| Name | Type | Description | Notes | +|------------- | ------------- | ------------- | -------------| +| **storeId** | **String**| | | +| **body** | [**ListObjectsRequest**](ListObjectsRequest.md)| | | + +### Return type + +CompletableFuture> + + +### Authorization + +No authorization required + +### HTTP request headers + +- **Content-Type**: application/json +- **Accept**: application/json + +### HTTP response details +| Status code | Description | Response headers | +|-------------|-------------|------------------| +| **200** | A successful response.(streaming responses) | - | +| **400** | Request failed due to invalid input. | - | +| **401** | Not authenticated. | - | +| **403** | Forbidden. | - | +| **404** | Request failed due to incorrect path. | - | +| **409** | Request was aborted due a transaction conflict. | - | +| **422** | Request timed out due to excessive request throttling. | - | +| **500** | Request failed due to internal server error. | - | + + ## write > CompletableFuture write(storeId, body) diff --git a/docs/StreamResultOfStreamedListObjectsResponse.md b/docs/StreamResultOfStreamedListObjectsResponse.md new file mode 100644 index 00000000..af23d053 --- /dev/null +++ b/docs/StreamResultOfStreamedListObjectsResponse.md @@ -0,0 +1,14 @@ + + +# StreamResultOfStreamedListObjectsResponse + + +## Properties + +| Name | Type | Description | Notes | +|------------ | ------------- | ------------- | -------------| +|**result** | [**StreamedListObjectsResponse**](StreamedListObjectsResponse.md) | | [optional] | +|**error** | [**Status**](Status.md) | | [optional] | + + + diff --git a/docs/StreamedListObjectsResponse.md b/docs/StreamedListObjectsResponse.md new file mode 100644 index 00000000..04b00157 --- /dev/null +++ b/docs/StreamedListObjectsResponse.md @@ -0,0 +1,14 @@ + + +# StreamedListObjectsResponse + +The response for a StreamedListObjects RPC. + +## Properties + +| Name | Type | Description | Notes | +|------------ | ------------- | ------------- | -------------| +|**_object** | **String** | | | + + + diff --git a/src/main/java/dev/openfga/sdk/api/OpenFgaApi.java b/src/main/java/dev/openfga/sdk/api/OpenFgaApi.java index d0589ca0..0ea565a2 100644 --- a/src/main/java/dev/openfga/sdk/api/OpenFgaApi.java +++ b/src/main/java/dev/openfga/sdk/api/OpenFgaApi.java @@ -38,6 +38,7 @@ import dev.openfga.sdk.api.model.ReadChangesResponse; import dev.openfga.sdk.api.model.ReadRequest; import dev.openfga.sdk.api.model.ReadResponse; +import dev.openfga.sdk.api.model.StreamResultOfStreamedListObjectsResponse; import dev.openfga.sdk.api.model.WriteAssertionsRequest; import dev.openfga.sdk.api.model.WriteAuthorizationModelRequest; import dev.openfga.sdk.api.model.WriteAuthorizationModelResponse; @@ -906,6 +907,68 @@ private CompletableFuture> readChanges( } } + /** + * Stream all objects of the given type that the user has a relation with + * The Streamed ListObjects API is very similar to the the ListObjects API, with two differences: 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + * @param storeId (required) + * @param body (required) + * @return CompletableFuture<ApiResponse<StreamResultOfStreamedListObjectsResponse>> + * @throws ApiException if fails to make API call + */ + public CompletableFuture> streamedListObjects( + String storeId, ListObjectsRequest body) throws ApiException, FgaInvalidParameterException { + return streamedListObjects(storeId, body, this.configuration); + } + + /** + * Stream all objects of the given type that the user has a relation with + * The Streamed ListObjects API is very similar to the the ListObjects API, with two differences: 1. Instead of collecting all objects before returning a response, it streams them to the client as they are collected. 2. The number of results returned is only limited by the execution timeout specified in the flag OPENFGA_LIST_OBJECTS_DEADLINE. + * @param storeId (required) + * @param body (required) + * @param configurationOverride Override the {@link Configuration} this OpenFgaApi was constructed with + * @return CompletableFuture<ApiResponse<StreamResultOfStreamedListObjectsResponse>> + * @throws ApiException if fails to make API call + */ + public CompletableFuture> streamedListObjects( + String storeId, ListObjectsRequest body, ConfigurationOverride configurationOverride) + throws ApiException, FgaInvalidParameterException { + return streamedListObjects(storeId, body, this.configuration.override(configurationOverride)); + } + + private CompletableFuture> streamedListObjects( + String storeId, ListObjectsRequest body, Configuration configuration) + throws ApiException, FgaInvalidParameterException { + + assertParamExists(storeId, "storeId", "streamedListObjects"); + + assertParamExists(body, "body", "streamedListObjects"); + + String path = "/stores/{store_id}/streamed-list-objects" + .replace("{store_id}", StringUtil.urlEncode(storeId.toString())); + + Map methodParameters = new HashMap<>(); + methodParameters.put("storeId", storeId); + methodParameters.put("body", body); + + Map telemetryAttributes = buildTelemetryAttributes(methodParameters); + + telemetryAttributes.put(Attributes.FGA_CLIENT_REQUEST_METHOD, "StreamedListObjects"); + + try { + HttpRequest request = buildHttpRequest("POST", path, body, configuration); + return new HttpRequestAttempt<>( + request, + "streamedListObjects", + StreamResultOfStreamedListObjectsResponse.class, + apiClient, + configuration) + .addTelemetryAttributes(telemetryAttributes) + .attemptHttpRequest(); + } catch (ApiException e) { + return CompletableFuture.failedFuture(e); + } + } + /** * Add or delete tuples from the store * The Write API will transactionally update the tuples for a certain store. Tuples and type definitions allow OpenFGA to determine whether a relationship exists between an object and an user. In the body, `writes` adds new tuples and `deletes` removes existing tuples. When deleting a tuple, any `condition` specified with it is ignored. The API is not idempotent by default: if, later on, you try to add the same tuple key (even if the `condition` is different), or if you try to delete a non-existing tuple, it will throw an error. To allow writes when an identical tuple already exists in the database, set `\"on_duplicate\": \"ignore\"` on the `writes` object. To allow deletes when a tuple was already removed from the database, set `\"on_missing\": \"ignore\"` on the `deletes` object. If a Write request contains both idempotent (ignore) and non-idempotent (error) operations, the most restrictive action (error) will take precedence. If a condition fails for a sub-request with an error flag, the entire transaction will be rolled back. This gives developers explicit control over the atomicity of the requests. The API will not allow you to write tuples such as `document:2021-budget#viewer@document:2021-budget#viewer`, because they are implicit. An `authorization_model_id` may be specified in the body. If it is, it will be used to assert that each written tuple (not deleted) is valid for the model specified. If it is not specified, the latest authorization model ID will be used. ## Example ### Adding relationships To add `user:anne` as a `writer` for `document:2021-budget`, call write API with the following ```json { \"writes\": { \"tuple_keys\": [ { \"user\": \"user:anne\", \"relation\": \"writer\", \"object\": \"document:2021-budget\" } ], \"on_duplicate\": \"ignore\" }, \"authorization_model_id\": \"01G50QVV17PECNVAHX1GG4Y5NC\" } ``` ### Removing relationships To remove `user:bob` as a `reader` for `document:2021-budget`, call write API with the following ```json { \"deletes\": { \"tuple_keys\": [ { \"user\": \"user:bob\", \"relation\": \"reader\", \"object\": \"document:2021-budget\" } ], \"on_missing\": \"ignore\" } } ``` diff --git a/src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java b/src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java new file mode 100644 index 00000000..20331735 --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java @@ -0,0 +1,168 @@ +/* + * OpenFGA + * A high performance and flexible authorization/permission engine built for developers and inspired by Google Zanzibar. + * + * The version of the OpenAPI document: 1.x + * Contact: community@openfga.dev + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. + */ + +package dev.openfga.sdk.api.model; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import java.util.Objects; +import java.util.StringJoiner; + +/** + * StreamResultOfStreamedListObjectsResponse + */ +@JsonPropertyOrder({ + StreamResultOfStreamedListObjectsResponse.JSON_PROPERTY_RESULT, + StreamResultOfStreamedListObjectsResponse.JSON_PROPERTY_ERROR +}) +public class StreamResultOfStreamedListObjectsResponse { + public static final String JSON_PROPERTY_RESULT = "result"; + private StreamedListObjectsResponse result; + + public static final String JSON_PROPERTY_ERROR = "error"; + private Status error; + + public StreamResultOfStreamedListObjectsResponse() {} + + public StreamResultOfStreamedListObjectsResponse result(StreamedListObjectsResponse result) { + this.result = result; + return this; + } + + /** + * Get result + * @return result + **/ + @javax.annotation.Nullable + @JsonProperty(JSON_PROPERTY_RESULT) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public StreamedListObjectsResponse getResult() { + return result; + } + + @JsonProperty(JSON_PROPERTY_RESULT) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public void setResult(StreamedListObjectsResponse result) { + this.result = result; + } + + public StreamResultOfStreamedListObjectsResponse error(Status error) { + this.error = error; + return this; + } + + /** + * Get error + * @return error + **/ + @javax.annotation.Nullable + @JsonProperty(JSON_PROPERTY_ERROR) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public Status getError() { + return error; + } + + @JsonProperty(JSON_PROPERTY_ERROR) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public void setError(Status error) { + this.error = error; + } + + /** + * Return true if this Stream_result_of_StreamedListObjectsResponse object is equal to o. + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StreamResultOfStreamedListObjectsResponse streamResultOfStreamedListObjectsResponse = + (StreamResultOfStreamedListObjectsResponse) o; + return Objects.equals(this.result, streamResultOfStreamedListObjectsResponse.result) + && Objects.equals(this.error, streamResultOfStreamedListObjectsResponse.error); + } + + @Override + public int hashCode() { + return Objects.hash(result, error); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class StreamResultOfStreamedListObjectsResponse {\n"); + sb.append(" result: ").append(toIndentedString(result)).append("\n"); + sb.append(" error: ").append(toIndentedString(error)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } + + /** + * Convert the instance into URL query string. + * + * @return URL query string + */ + public String toUrlQueryString() { + return toUrlQueryString(null); + } + + /** + * Convert the instance into URL query string. + * + * @param prefix prefix of the query string + * @return URL query string + */ + public String toUrlQueryString(String prefix) { + String suffix = ""; + String containerSuffix = ""; + String containerPrefix = ""; + if (prefix == null) { + // style=form, explode=true, e.g. /pet?name=cat&type=manx + prefix = ""; + } else { + // deepObject style e.g. /pet?id[name]=cat&id[type]=manx + prefix = prefix + "["; + suffix = "]"; + containerSuffix = "]"; + containerPrefix = "["; + } + + StringJoiner joiner = new StringJoiner("&"); + + // add `result` to the URL query string + if (getResult() != null) { + joiner.add(getResult().toUrlQueryString(prefix + "result" + suffix)); + } + + // add `error` to the URL query string + if (getError() != null) { + joiner.add(getError().toUrlQueryString(prefix + "error" + suffix)); + } + + return joiner.toString(); + } +} diff --git a/src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java b/src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java new file mode 100644 index 00000000..43c02ab5 --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java @@ -0,0 +1,139 @@ +/* + * OpenFGA + * A high performance and flexible authorization/permission engine built for developers and inspired by Google Zanzibar. + * + * The version of the OpenAPI document: 1.x + * Contact: community@openfga.dev + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. + */ + +package dev.openfga.sdk.api.model; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.StringJoiner; + +/** + * The response for a StreamedListObjects RPC. + */ +@JsonPropertyOrder({StreamedListObjectsResponse.JSON_PROPERTY_OBJECT}) +public class StreamedListObjectsResponse { + public static final String JSON_PROPERTY_OBJECT = "object"; + private String _object; + + public StreamedListObjectsResponse() {} + + public StreamedListObjectsResponse _object(String _object) { + this._object = _object; + return this; + } + + /** + * Get _object + * @return _object + **/ + @javax.annotation.Nonnull + @JsonProperty(JSON_PROPERTY_OBJECT) + @JsonInclude(value = JsonInclude.Include.ALWAYS) + public String getObject() { + return _object; + } + + @JsonProperty(JSON_PROPERTY_OBJECT) + @JsonInclude(value = JsonInclude.Include.ALWAYS) + public void setObject(String _object) { + this._object = _object; + } + + /** + * Return true if this StreamedListObjectsResponse object is equal to o. + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StreamedListObjectsResponse streamedListObjectsResponse = (StreamedListObjectsResponse) o; + return Objects.equals(this._object, streamedListObjectsResponse._object); + } + + @Override + public int hashCode() { + return Objects.hash(_object); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class StreamedListObjectsResponse {\n"); + sb.append(" _object: ").append(toIndentedString(_object)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } + + /** + * Convert the instance into URL query string. + * + * @return URL query string + */ + public String toUrlQueryString() { + return toUrlQueryString(null); + } + + /** + * Convert the instance into URL query string. + * + * @param prefix prefix of the query string + * @return URL query string + */ + public String toUrlQueryString(String prefix) { + String suffix = ""; + String containerSuffix = ""; + String containerPrefix = ""; + if (prefix == null) { + // style=form, explode=true, e.g. /pet?name=cat&type=manx + prefix = ""; + } else { + // deepObject style e.g. /pet?id[name]=cat&id[type]=manx + prefix = prefix + "["; + suffix = "]"; + containerSuffix = "]"; + containerPrefix = "["; + } + + StringJoiner joiner = new StringJoiner("&"); + + // add `object` to the URL query string + if (getObject() != null) { + joiner.add(String.format( + "%sobject%s=%s", + prefix, + suffix, + URLEncoder.encode(String.valueOf(getObject()), StandardCharsets.UTF_8) + .replaceAll("\\+", "%20"))); + } + + return joiner.toString(); + } +} From aff7424b6d6e8dbbc210bbd1b14ad9ab29877eef Mon Sep 17 00:00:00 2001 From: Anurag Bandyopadhyay Date: Fri, 31 Oct 2025 16:50:58 +0530 Subject: [PATCH 2/9] feat: streamed list objects support --- .../sdk/api/StreamedListObjectsApi.java | 93 +++++++++++++ .../sdk/api/client/HttpRequestAttempt.java | 7 +- .../openfga/sdk/api/client/OpenFgaClient.java | 71 ++++++++++ .../api/client/StreamedResponseIterator.java | 110 +++++++++++++++ .../api/client/StreamingResponseString.java | 17 +++ .../sdk/api/client/OpenFgaClientTest.java | 127 ++++++++++++++++++ 6 files changed, 424 insertions(+), 1 deletion(-) create mode 100644 src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java create mode 100644 src/main/java/dev/openfga/sdk/api/client/StreamedResponseIterator.java create mode 100644 src/main/java/dev/openfga/sdk/api/client/StreamingResponseString.java diff --git a/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java b/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java new file mode 100644 index 00000000..52af0a02 --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java @@ -0,0 +1,93 @@ +package dev.openfga.sdk.api; + +import static dev.openfga.sdk.util.Validation.assertParamExists; + +import dev.openfga.sdk.api.client.ApiClient; +import dev.openfga.sdk.api.client.ApiResponse; +import dev.openfga.sdk.api.client.HttpRequestAttempt; +import dev.openfga.sdk.api.client.StreamingResponseString; +import dev.openfga.sdk.api.configuration.Configuration; +import dev.openfga.sdk.api.model.ListObjectsRequest; +import dev.openfga.sdk.errors.ApiException; +import dev.openfga.sdk.errors.FgaInvalidParameterException; +import dev.openfga.sdk.telemetry.Attribute; +import dev.openfga.sdk.telemetry.Attributes; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * API handler for streamed list objects operations. + * This class is separate from the generated OpenFgaApi to avoid modifications to generated code. + */ +public class StreamedListObjectsApi { + private final ApiClient apiClient; + + public StreamedListObjectsApi(ApiClient apiClient) { + this.apiClient = apiClient; + } + + /** + * Stream all objects of the given type that the user has a relation with. + * Returns raw NDJSON response for parsing by the client layer. + * + * @param storeId The store ID (required) + * @param body The list objects request body (required) + * @param configuration The configuration to use for this request + * @return CompletableFuture with raw streaming response + * @throws ApiException if fails to make API call + * @throws FgaInvalidParameterException if required parameters are missing + */ + public CompletableFuture> streamedListObjects( + String storeId, ListObjectsRequest body, Configuration configuration) + throws ApiException, FgaInvalidParameterException { + + assertParamExists(storeId, "storeId", "streamedListObjects"); + assertParamExists(body, "body", "streamedListObjects"); + + String path = "/stores/" + storeId + "/streamed-list-objects"; + + try { + // Build the HTTP request + byte[] requestBody = apiClient.getObjectMapper().writeValueAsBytes(body); + var bodyPublisher = java.net.http.HttpRequest.BodyPublishers.ofByteArray(requestBody); + + var requestBuilder = java.net.http.HttpRequest.newBuilder() + .uri(java.net.URI.create(configuration.getApiUrl() + path)) + .header("Content-Type", "application/json") + .header("User-Agent", configuration.getUserAgent()) + .POST(bodyPublisher); + + // Add authorization header if needed + if (configuration.getCredentials() != null + && configuration.getCredentials().getApiToken() != null) { + requestBuilder.header( + "Authorization", + "Bearer " + configuration.getCredentials().getApiToken()); + } + + // Add default headers + if (configuration.getDefaultHeaders() != null) { + configuration.getDefaultHeaders().forEach(requestBuilder::header); + } + + var httpRequest = requestBuilder.build(); + + // Build telemetry attributes + Map methodParameters = new HashMap<>(); + methodParameters.put("storeId", storeId); + methodParameters.put("body", body); + + Map telemetryAttributes = new HashMap<>(); + telemetryAttributes.put(Attributes.FGA_CLIENT_REQUEST_METHOD, "StreamedListObjects"); + + // Use HttpRequestAttempt with StreamingResponseString to get raw response + return new HttpRequestAttempt<>( + httpRequest, "streamedListObjects", StreamingResponseString.class, apiClient, configuration) + .addTelemetryAttributes(telemetryAttributes) + .attemptHttpRequest(); + } catch (Exception e) { + return CompletableFuture.failedFuture(new ApiException(e)); + } + } +} diff --git a/src/main/java/dev/openfga/sdk/api/client/HttpRequestAttempt.java b/src/main/java/dev/openfga/sdk/api/client/HttpRequestAttempt.java index 4cf6f6a2..4dafaa6a 100644 --- a/src/main/java/dev/openfga/sdk/api/client/HttpRequestAttempt.java +++ b/src/main/java/dev/openfga/sdk/api/client/HttpRequestAttempt.java @@ -215,7 +215,12 @@ private CompletableFuture deserializeResponse(HttpResponse response) if (clazz == Void.class && isNullOrWhitespace(response.body())) { return CompletableFuture.completedFuture(null); } - + // Special handling for streaming responses - don't deserialize, just wrap the raw string + if (clazz == StreamingResponseString.class) { + @SuppressWarnings("unchecked") + T result = (T) new StreamingResponseString(response.body()); + return CompletableFuture.completedFuture(result); + } try { T deserialized = apiClient.getObjectMapper().readValue(response.body(), clazz); return CompletableFuture.completedFuture(deserialized); diff --git a/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java b/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java index c8334917..c0f61c0a 100644 --- a/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java +++ b/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java @@ -24,6 +24,7 @@ public class OpenFgaClient { private final ApiClient apiClient; private ClientConfiguration configuration; private OpenFgaApi api; + private StreamedListObjectsApi streamedListObjectsApi; public OpenFgaClient(ClientConfiguration configuration) throws FgaInvalidParameterException { this(configuration, new ApiClient()); @@ -33,6 +34,7 @@ public OpenFgaClient(ClientConfiguration configuration, ApiClient apiClient) thr this.apiClient = apiClient; this.configuration = configuration; this.api = new OpenFgaApi(configuration, apiClient); + this.streamedListObjectsApi = new StreamedListObjectsApi(apiClient); } /* *********** @@ -1104,6 +1106,75 @@ public CompletableFuture listObjects( return call(() -> api.listObjects(storeId, body, overrides)).thenApply(ClientListObjectsResponse::new); } + /** + * StreamedListObjects - Stream all objects of the given type that the user has a relation to (evaluates) + * + * Returns a Stream of objects that can be iterated. The streaming API returns results as they + * are computed, rather than collecting all results before returning. This is useful for + * large result sets. + * + * @throws FgaInvalidParameterException When the Store ID is null, empty, or whitespace + */ + public CompletableFuture> streamedListObjects(ClientListObjectsRequest request) + throws FgaInvalidParameterException { + return streamedListObjects(request, null); + } + + /** + * StreamedListObjects - Stream all objects of the given type that the user has a relation to (evaluates) + * + * Returns a Stream of objects that can be iterated. The streaming API returns results as they + * are computed, rather than collecting all results before returning. This is useful for + * large result sets. + * + * @throws FgaInvalidParameterException When the Store ID is null, empty, or whitespace + */ + public CompletableFuture> streamedListObjects( + ClientListObjectsRequest request, ClientListObjectsOptions options) throws FgaInvalidParameterException { + configuration.assertValid(); + String storeId = configuration.getStoreIdChecked(); + + ListObjectsRequest body = new ListObjectsRequest(); + + if (request != null) { + body.user(request.getUser()).relation(request.getRelation()).type(request.getType()); + if (request.getContextualTupleKeys() != null) { + var contextualTuples = request.getContextualTupleKeys(); + var bodyContextualTuples = ClientTupleKey.asContextualTupleKeys(contextualTuples); + body.contextualTuples(bodyContextualTuples); + } + if (request.getContext() != null) { + body.context(request.getContext()); + } + } + + if (options != null) { + if (options.getConsistency() != null) { + body.consistency(options.getConsistency()); + } + + // Set authorizationModelId from options if available; otherwise, use the default from configuration + String authorizationModelId = !isNullOrWhitespace(options.getAuthorizationModelId()) + ? options.getAuthorizationModelId() + : configuration.getAuthorizationModelId(); + body.authorizationModelId(authorizationModelId); + } else { + body.setAuthorizationModelId(configuration.getAuthorizationModelId()); + } + + Configuration config = configuration.override(new ConfigurationOverride().addHeaders(options)); + + return call(() -> streamedListObjectsApi.streamedListObjects(storeId, body, config)) + .thenApply(response -> { + String ndjsonResponse = response.getData().getRawResponse(); + StreamedResponseIterator iterator = + new StreamedResponseIterator(ndjsonResponse, apiClient.getObjectMapper()); + // Convert Iterator to Stream + return java.util.stream.StreamSupport.stream( + ((Iterable) () -> iterator).spliterator(), false); + }); + } + /** * ListRelations - List allowed relations a user has with an object (evaluates) */ diff --git a/src/main/java/dev/openfga/sdk/api/client/StreamedResponseIterator.java b/src/main/java/dev/openfga/sdk/api/client/StreamedResponseIterator.java new file mode 100644 index 00000000..75db6edd --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/client/StreamedResponseIterator.java @@ -0,0 +1,110 @@ +package dev.openfga.sdk.api.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import dev.openfga.sdk.api.model.Status; +import dev.openfga.sdk.api.model.StreamResultOfStreamedListObjectsResponse; +import dev.openfga.sdk.api.model.StreamedListObjectsResponse; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.StringReader; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Iterator for parsing newline-delimited JSON streaming responses. + * Each line in the response is a StreamResultOfStreamedListObjectsResponse. + * + * If an error is encountered in the stream (either from parsing or from an error + * response), it will be thrown as a RuntimeException when hasNext() or next() is called. + */ +public class StreamedResponseIterator implements Iterator { + private final BufferedReader reader; + private final ObjectMapper objectMapper; + private StreamedListObjectsResponse nextItem; + private boolean hasNext; + private RuntimeException pendingException; + + public StreamedResponseIterator(String ndjsonResponse, ObjectMapper objectMapper) { + this.reader = new BufferedReader(new StringReader(ndjsonResponse)); + this.objectMapper = objectMapper; + this.hasNext = true; + this.pendingException = null; + advance(); + } + + private void advance() { + try { + String line; + while ((line = reader.readLine()) != null) { + line = line.trim(); + if (line.isEmpty()) { + continue; + } + + StreamResultOfStreamedListObjectsResponse streamResult = + objectMapper.readValue(line, StreamResultOfStreamedListObjectsResponse.class); + + if (streamResult.getResult() != null) { + nextItem = streamResult.getResult(); + return; + } + + if (streamResult.getError() != null) { + // Handle error in stream - convert to exception + Status error = streamResult.getError(); + String errorMessage = String.format( + "Error in streaming response: code=%d, message=%s", + error.getCode(), error.getMessage() != null ? error.getMessage() : "Unknown error"); + pendingException = new RuntimeException(errorMessage); + hasNext = false; + nextItem = null; + return; + } + } + // No more lines + hasNext = false; + nextItem = null; + } catch (IOException e) { + pendingException = new RuntimeException("Failed to parse streaming response", e); + hasNext = false; + nextItem = null; + } + } + + @Override + public boolean hasNext() { + // If there's a pending exception, throw it before returning false + if (pendingException != null) { + RuntimeException ex = pendingException; + pendingException = null; // Clear it so we don't throw multiple times + throw ex; + } + return hasNext && nextItem != null; + } + + @Override + public StreamedListObjectsResponse next() { + // Check for pending exception first + if (pendingException != null) { + RuntimeException ex = pendingException; + pendingException = null; + throw ex; + } + + if (!hasNext()) { + throw new NoSuchElementException(); + } + + StreamedListObjectsResponse current = nextItem; + advance(); + + // Check again after advance in case an error occurred + if (pendingException != null) { + RuntimeException ex = pendingException; + pendingException = null; + throw ex; + } + + return current; + } +} diff --git a/src/main/java/dev/openfga/sdk/api/client/StreamingResponseString.java b/src/main/java/dev/openfga/sdk/api/client/StreamingResponseString.java new file mode 100644 index 00000000..7cf25574 --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/client/StreamingResponseString.java @@ -0,0 +1,17 @@ +package dev.openfga.sdk.api.client; + +/** + * Marker class to indicate that the response should not be deserialized by HttpRequestAttempt. + * Instead, the raw response string should be returned for manual parsing (e.g., NDJSON). + */ +public class StreamingResponseString { + private final String rawResponse; + + public StreamingResponseString(String rawResponse) { + this.rawResponse = rawResponse; + } + + public String getRawResponse() { + return rawResponse; + } +} diff --git a/src/test/java/dev/openfga/sdk/api/client/OpenFgaClientTest.java b/src/test/java/dev/openfga/sdk/api/client/OpenFgaClientTest.java index 2850929d..966fc904 100644 --- a/src/test/java/dev/openfga/sdk/api/client/OpenFgaClientTest.java +++ b/src/test/java/dev/openfga/sdk/api/client/OpenFgaClientTest.java @@ -2586,6 +2586,133 @@ public void listObjectsWithContextTest() throws Exception { assertEquals(List.of(DEFAULT_OBJECT), response.getObjects()); } + /** + * Test streaming list objects API + */ + @Test + public void streamedListObjectsTest() throws Exception { + // Given + String postPath = + String.format("%s/stores/%s/streamed-list-objects", FgaConstants.TEST_API_URL, DEFAULT_STORE_ID); + String expectedBody = String.format( + "{\"authorization_model_id\":\"%s\",\"type\":\"%s\",\"relation\":\"%s\",\"user\":\"%s\",\"contextual_tuples\":null,\"context\":null,\"consistency\":\"%s\"}", + DEFAULT_AUTH_MODEL_ID, DEFAULT_TYPE, DEFAULT_RELATION, DEFAULT_USER, DEFAULT_CONSISTENCY); + + // Simulate NDJSON response - each line is a StreamResultOfStreamedListObjectsResponse + String ndjsonResponse = String.format( + "{\"result\":{\"object\":\"document:1\"}}\n{\"result\":{\"object\":\"document:2\"}}\n{\"result\":{\"object\":\"document:3\"}}\n"); + + mockHttpClient.onPost(postPath).withBody(is(expectedBody)).doReturn(200, ndjsonResponse); + + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // When + Stream responseStream = + fga.streamedListObjects(request).get(); + List objects = + responseStream.map(StreamedListObjectsResponse::getObject).collect(Collectors.toList()); + + // Then + mockHttpClient.verify().post(postPath).withBody(is(expectedBody)).called(1); + assertEquals(3, objects.size()); + assertEquals("document:1", objects.get(0)); + assertEquals("document:2", objects.get(1)); + assertEquals("document:3", objects.get(2)); + } + + @Test + public void streamedListObjects_storeIdRequired() { + // Given + clientConfiguration.storeId(null); + + // When + var exception = assertThrows( + FgaInvalidParameterException.class, + () -> fga.streamedListObjects(new ClientListObjectsRequest()).get()); + + // Then + assertEquals( + "Required parameter storeId was invalid when calling ClientConfiguration.", exception.getMessage()); + } + + @Test + public void streamedListObjects_400() throws Exception { + // Given + String postUrl = + String.format("%s/stores/%s/streamed-list-objects", FgaConstants.TEST_API_URL, DEFAULT_STORE_ID); + mockHttpClient + .onPost(postUrl) + .doReturn(400, "{\"code\":\"validation_error\",\"message\":\"Generic validation error\"}"); + + // When + ExecutionException execException = + assertThrows(ExecutionException.class, () -> fga.streamedListObjects(new ClientListObjectsRequest()) + .get()); + + // Then + mockHttpClient.verify().post(postUrl).called(1); + var exception = assertInstanceOf(FgaApiValidationError.class, execException.getCause()); + assertEquals(400, exception.getStatusCode()); + assertEquals( + "{\"code\":\"validation_error\",\"message\":\"Generic validation error\"}", + exception.getResponseData()); + } + + @Test + public void streamedListObjects_500() throws Exception { + // Given + String postUrl = + String.format("%s/stores/%s/streamed-list-objects", FgaConstants.TEST_API_URL, DEFAULT_STORE_ID); + mockHttpClient + .onPost(postUrl) + .doReturn(500, "{\"code\":\"internal_error\",\"message\":\"Internal Server Error\"}"); + + // When + ExecutionException execException = + assertThrows(ExecutionException.class, () -> fga.streamedListObjects(new ClientListObjectsRequest()) + .get()); + + // Then + // POST requests retry on 5xx errors (1 initial + 3 retries = 4 total) + mockHttpClient.verify().post(postUrl).called(4); + var exception = assertInstanceOf(FgaApiInternalError.class, execException.getCause()); + assertEquals(500, exception.getStatusCode()); + assertEquals( + "{\"code\":\"internal_error\",\"message\":\"Internal Server Error\"}", exception.getResponseData()); + } + + @Test + public void streamedListObjects_errorInStream() throws Exception { + // Given + String postPath = + String.format("%s/stores/%s/streamed-list-objects", FgaConstants.TEST_API_URL, DEFAULT_STORE_ID); + + // Simulate NDJSON response with an error in the stream + String ndjsonResponse = + "{\"result\":{\"object\":\"document:1\"}}\n{\"error\":{\"code\":500,\"message\":\"Internal error\"}}\n"; + + mockHttpClient.onPost(postPath).doReturn(200, ndjsonResponse); + + ClientListObjectsRequest request = new ClientListObjectsRequest() + .type(DEFAULT_TYPE) + .relation(DEFAULT_RELATION) + .user(DEFAULT_USER); + + // When + Stream responseStream = + fga.streamedListObjects(request).get(); + + // Then - should throw when processing the stream + var exception = assertThrows(RuntimeException.class, () -> { + responseStream.map(StreamedListObjectsResponse::getObject).collect(Collectors.toList()); + }); + + assertTrue(exception.getMessage().contains("Error in streaming response")); + } + /** * Check whether a user is authorized to access an object. */ From cbc928ee52d92072669990824f2e7f951319f6a6 Mon Sep 17 00:00:00 2001 From: Anurag Bandyopadhyay Date: Fri, 31 Oct 2025 18:01:09 +0530 Subject: [PATCH 3/9] feat: add example --- examples/streamed-list-objects/.env.example | 19 ++ examples/streamed-list-objects/Makefile | 13 + examples/streamed-list-objects/README.md | 111 ++++++++ examples/streamed-list-objects/build.gradle | 55 ++++ .../streamed-list-objects/gradle.properties | 1 + .../gradle/wrapper/gradle-wrapper.properties | 7 + examples/streamed-list-objects/gradlew | 244 ++++++++++++++++++ .../streamed-list-objects/settings.gradle | 1 + .../StreamedListObjectsExample.java | 173 +++++++++++++ 9 files changed, 624 insertions(+) create mode 100644 examples/streamed-list-objects/.env.example create mode 100644 examples/streamed-list-objects/Makefile create mode 100644 examples/streamed-list-objects/README.md create mode 100644 examples/streamed-list-objects/build.gradle create mode 100644 examples/streamed-list-objects/gradle.properties create mode 100644 examples/streamed-list-objects/gradle/wrapper/gradle-wrapper.properties create mode 100755 examples/streamed-list-objects/gradlew create mode 100644 examples/streamed-list-objects/settings.gradle create mode 100644 examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/streamedlistobjects/StreamedListObjectsExample.java diff --git a/examples/streamed-list-objects/.env.example b/examples/streamed-list-objects/.env.example new file mode 100644 index 00000000..10798888 --- /dev/null +++ b/examples/streamed-list-objects/.env.example @@ -0,0 +1,19 @@ +# OpenFGA Configuration (REQUIRED) +FGA_API_URL=http://localhost:8000 +FGA_STORE_ID=store_id_here +FGA_MODEL_ID=model_id_here + +# Authentication (optional - for authenticated OpenFGA instances) +FGA_CLIENT_ID=client_id_here +FGA_CLIENT_SECRET=client_secret_here +FGA_API_AUDIENCE=api_audience_here +FGA_API_TOKEN_ISSUER=api_issuer_here + +# OpenTelemetry Configuration (for manual configuration mode - ./gradlew run) +# These are used when running with manual OpenTelemetry setup +# Note: When using the Java agent (./gradlew runWithAgent), +# these values are overridden by the JVM arguments in build.gradle +OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 +OTEL_SERVICE_NAME=openfga-java-sdk-example +OTEL_SERVICE_VERSION=1.0.0 + diff --git a/examples/streamed-list-objects/Makefile b/examples/streamed-list-objects/Makefile new file mode 100644 index 00000000..c671023c --- /dev/null +++ b/examples/streamed-list-objects/Makefile @@ -0,0 +1,13 @@ +all: build + +openfga_version=latest + +build: + ./gradlew build + +run: + ./gradlew run + +run-openfga: + docker pull docker.io/openfga/openfga:${openfga_version} && \ + docker run -p 8080:8080 docker.io/openfga/openfga:${openfga_version} run diff --git a/examples/streamed-list-objects/README.md b/examples/streamed-list-objects/README.md new file mode 100644 index 00000000..9b5a939e --- /dev/null +++ b/examples/streamed-list-objects/README.md @@ -0,0 +1,111 @@ +# Streamed List Objects Example + +This example demonstrates how to use the Streamed ListObjects API in the OpenFGA Java SDK. + +## What is Streamed ListObjects? + +The StreamedListObjects API is similar to the regular ListObjects API, but with key differences: + +1. **Streaming Response**: Instead of collecting all objects before returning a response, it streams them to the client as they are collected +2. **No Result Limit**: The number of results returned is only limited by the execution timeout specified in the server configuration (`OPENFGA_LIST_OBJECTS_DEADLINE`) +3. **Immediate Processing**: You can start processing results as they arrive, without waiting for the entire result set + +## When to Use Streamed ListObjects + +Use the Streamed ListObjects API when: + +- You expect a large number of results that would take a long time to collect +- You want to start processing results immediately rather than waiting for the complete set +- You might not need all results (e.g., you want to stop after finding a certain number) +- You want to avoid timeout issues with very large result sets + +## Running the Example + +### Prerequisites + +- A running OpenFGA server (or use the [OpenFGA Playground](https://play.fga.dev/)) + +### Environment Variables + +Set the following environment variables: + +```bash +# Required +export FGA_API_URL=http://localhost:8080 # Your OpenFGA server URL + +# Optional - for authenticated servers +export FGA_CLIENT_ID=your_client_id +export FGA_CLIENT_SECRET=your_client_secret +export FGA_API_TOKEN_ISSUER=your_token_issuer +export FGA_API_AUDIENCE=your_audience +``` + +### Running + +```bash +make run +``` + +## Code Examples + +### Basic Usage + +```java +// Create a request +var request = new ClientListObjectsRequest() + .type("document") + .relation("owner") + .user("user:anne"); + +// Call the streaming API +var objectStream = fgaClient.streamedListObjects(request).get(); + +// Collect all results +List objects = objectStream + .map(StreamedListObjectsResponse::getObject) + .collect(Collectors.toList()); +``` + +### Early Termination + +```java +// Get only the first 10 results +var objectStream = fgaClient.streamedListObjects(request).get(); +List firstTen = objectStream + .map(StreamedListObjectsResponse::getObject) + .limit(10) + .collect(Collectors.toList()); +``` + +### Process as You Go + +```java +// Process each object immediately as it arrives +var objectStream = fgaClient.streamedListObjects(request).get(); +objectStream + .map(StreamedListObjectsResponse::getObject) + .forEach(obj -> { + // Do something with each object + System.out.println("Processing: " + obj); + }); +``` + +### With Options + +```java +// Use options to specify consistency preference +var options = new ClientListObjectsOptions() + .consistency(ConsistencyPreference.HIGHER_CONSISTENCY) + .authorizationModelId("01GXSXXXXXXXXXXXXXXXX"); + +var objectStream = fgaClient.streamedListObjects(request, options).get(); +``` + +## Comparison with Regular ListObjects + +| Feature | ListObjects | Streamed ListObjects | +|---------|-------------|---------------------| +| Result Collection | Waits for all results | Streams results as computed | +| Result Limit | Limited by server pagination | Limited only by execution timeout | +| Processing | Must wait for complete response | Can process immediately | +| Use Case | Small to medium result sets | Large result sets, immediate processing | diff --git a/examples/streamed-list-objects/build.gradle b/examples/streamed-list-objects/build.gradle new file mode 100644 index 00000000..53ba4388 --- /dev/null +++ b/examples/streamed-list-objects/build.gradle @@ -0,0 +1,55 @@ +plugins { + id 'application' + id 'com.diffplug.spotless' version '8.0.0' +} + +application { + mainClass = 'dev.openfga.sdk.example.streamedlistobjects.StreamedListObjectsExample' +} + +repositories { + mavenCentral() +} + +ext { + jacksonVersion = "2.20.0" +} + +dependencies { + implementation("dev.openfga:openfga-sdk:0.9.2") + + // Serialization + implementation("com.fasterxml.jackson.core:jackson-core:$jacksonVersion") + implementation("com.fasterxml.jackson.core:jackson-annotations:$jacksonVersion") + implementation("com.fasterxml.jackson.core:jackson-databind:$jacksonVersion") + implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:$jacksonVersion") + implementation("org.openapitools:jackson-databind-nullable:0.2.7") +} + +// Use spotless plugin to automatically format code, remove unused import, etc +// To apply changes directly to the file, run `gradlew spotlessApply` +// Ref: https://github.com/diffplug/spotless/tree/main/plugin-gradle +spotless { + // comment out below to run spotless as part of the `check` task + enforceCheck false + format 'misc', { + // define the files (e.g. '*.gradle', '*.md') to apply `misc` to + target '.gitignore' + // define the steps to apply to those files + trimTrailingWhitespace() + indentWithSpaces() // Takes an integer argument if you don't like 4 + endWithNewline() + } + java { + palantirJavaFormat() + removeUnusedImports() + importOrder() + } +} + +// Use spotless plugin to automatically format code, remove unused import, etc +// To apply changes directly to the file, run `gradlew spotlessApply` +// Ref: https://github.com/diffplug/spotless/tree/main/plugin-gradle +tasks.register('fmt') { + dependsOn 'spotlessApply' +} \ No newline at end of file diff --git a/examples/streamed-list-objects/gradle.properties b/examples/streamed-list-objects/gradle.properties new file mode 100644 index 00000000..5f544a8e --- /dev/null +++ b/examples/streamed-list-objects/gradle.properties @@ -0,0 +1 @@ +language=java \ No newline at end of file diff --git a/examples/streamed-list-objects/gradle/wrapper/gradle-wrapper.properties b/examples/streamed-list-objects/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 00000000..a80b22ce --- /dev/null +++ b/examples/streamed-list-objects/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,7 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.6-bin.zip +networkTimeout=10000 +validateDistributionUrl=true +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/examples/streamed-list-objects/gradlew b/examples/streamed-list-objects/gradlew new file mode 100755 index 00000000..79a61d42 --- /dev/null +++ b/examples/streamed-list-objects/gradlew @@ -0,0 +1,244 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +# This is normally unused +# shellcheck disable=SC2034 +APP_BASE_NAME=${0##*/} +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC3045 + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC3045 + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + +# Collect all arguments for the java command; +# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of +# shell script including quotes and variable substitutions, so put them in +# double quotes to make sure that they get re-expanded; and +# * put everything else in single quotes, so that it's not re-expanded. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/examples/streamed-list-objects/settings.gradle b/examples/streamed-list-objects/settings.gradle new file mode 100644 index 00000000..ff41744a --- /dev/null +++ b/examples/streamed-list-objects/settings.gradle @@ -0,0 +1 @@ +rootProject.name = 'example1' \ No newline at end of file diff --git a/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/streamedlistobjects/StreamedListObjectsExample.java b/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/streamedlistobjects/StreamedListObjectsExample.java new file mode 100644 index 00000000..e26ad845 --- /dev/null +++ b/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/streamedlistobjects/StreamedListObjectsExample.java @@ -0,0 +1,173 @@ +package dev.openfga.sdk.example.streamedlistobjects; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import dev.openfga.sdk.api.client.OpenFgaClient; +import dev.openfga.sdk.api.client.model.*; +import dev.openfga.sdk.api.configuration.*; +import dev.openfga.sdk.api.model.*; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Example demonstrating the usage of the Streamed ListObjects API. + * + * The Streamed ListObjects API returns results as they are computed, rather than + * collecting all results before returning. This is particularly useful for: + * - Large result sets that would take a long time to collect + * - Scenarios where you want to start processing results immediately + * - Cases where you might not need all results (early termination) + */ +public class StreamedListObjectsExample { + public static void main(String[] args) throws Exception { + // Configure the SDK + var credentials = new Credentials(); + if (System.getenv("FGA_CLIENT_ID") != null) { + credentials = new Credentials(new ClientCredentials() + .apiAudience(System.getenv("FGA_API_AUDIENCE")) + .apiTokenIssuer(System.getenv("FGA_API_TOKEN_ISSUER")) + .clientId(System.getenv("FGA_CLIENT_ID")) + .clientSecret(System.getenv("FGA_CLIENT_SECRET"))); + } + + var configuration = new ClientConfiguration() + .apiUrl(System.getenv("FGA_API_URL")) // e.g., http://localhost:8080 + .credentials(credentials); + + var fgaClient = new OpenFgaClient(configuration); + + // Create a test store + System.out.println("Creating test store..."); + var store = + fgaClient.createStore(new CreateStoreRequest().name("StreamedListObjects Test Store")) + .get(); + fgaClient.setStoreId(store.getId()); + System.out.println("Created store: " + store.getId()); + + // Create an authorization model + System.out.println("Creating authorization model..."); + String authModelJson = """ + { + "schema_version": "1.1", + "type_definitions": [ + { + "type": "user", + "relations": {} + }, + { + "type": "document", + "relations": { + "owner": { + "this": {} + }, + "viewer": { + "this": {} + } + }, + "metadata": { + "relations": { + "owner": { + "directly_related_user_types": [ + {"type": "user"} + ] + }, + "viewer": { + "directly_related_user_types": [ + {"type": "user"} + ] + } + } + } + } + ] + } + """; + + var mapper = new ObjectMapper(); + var authModel = mapper.readValue(authModelJson, new TypeReference() {}); + var modelResponse = fgaClient.writeAuthorizationModel(authModel).get(); + fgaClient.setAuthorizationModelId(modelResponse.getAuthorizationModelId()); + System.out.println("Created model: " + modelResponse.getAuthorizationModelId()); + + // Write some test tuples + System.out.println("\nWriting 100 test tuples..."); + List writes = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + writes.add(new ClientTuple() + .user("user:anne") + .relation("owner") + ._object("document:" + i)); + } + + fgaClient + .write(new ClientWriteRequest().writes(writes)) + .get(); + System.out.println("Successfully wrote 100 tuples"); + + // Example 1: Use streamedListObjects to get all objects + System.out.println("\n=== Example 1: List all objects using streaming ==="); + var request1 = new ClientListObjectsRequest() + .type("document") + .relation("owner") + .user("user:anne"); + + var objectStream1 = fgaClient.streamedListObjects(request1).get(); + List allObjects = + objectStream1.map(StreamedListObjectsResponse::getObject).collect(Collectors.toList()); + + System.out.println("Total objects found: " + allObjects.size()); + System.out.println("First 5 objects: " + allObjects.subList(0, Math.min(5, allObjects.size()))); + + // Example 2: Early termination - stop after finding first N objects + System.out.println("\n=== Example 2: Early termination (first 10 objects) ==="); + var request2 = new ClientListObjectsRequest() + .type("document") + .relation("owner") + .user("user:anne"); + + var objectStream2 = fgaClient.streamedListObjects(request2).get(); + List firstTen = objectStream2 + .map(StreamedListObjectsResponse::getObject) + .limit(10) + .collect(Collectors.toList()); + + System.out.println("First 10 objects: " + firstTen); + + // Example 3: Process objects as they arrive (immediate processing) + System.out.println("\n=== Example 3: Process objects immediately ==="); + var request3 = new ClientListObjectsRequest() + .type("document") + .relation("owner") + .user("user:anne"); + + var objectStream3 = fgaClient.streamedListObjects(request3).get(); + objectStream3 + .map(StreamedListObjectsResponse::getObject) + .limit(10) + .forEach(obj -> { + // Process each object as it arrives + System.out.println(" Processing: " + obj); + }); + + // Example 4: With options (custom authorization model ID and consistency preference) + System.out.println("\n=== Example 4: With options ==="); + var request4 = new ClientListObjectsRequest() + .type("document") + .relation("owner") + .user("user:anne"); + + var options4 = new ClientListObjectsOptions().consistency(ConsistencyPreference.HIGHER_CONSISTENCY); + + var objectStream4 = fgaClient.streamedListObjects(request4, options4).get(); + long count = objectStream4.count(); + System.out.println("Total objects with higher consistency: " + count); + + // Clean up - delete the test store + System.out.println("\n=== Cleaning up ==="); + fgaClient.deleteStore().get(); + System.out.println("Deleted test store: " + store.getId()); + + System.out.println("\nExample completed successfully!"); + } +} \ No newline at end of file From 6a79981274e414d13aa58899cf9313f10cc549ed Mon Sep 17 00:00:00 2001 From: Anurag Bandyopadhyay Date: Fri, 31 Oct 2025 21:21:45 +0530 Subject: [PATCH 4/9] fix: example code, deps --- examples/streamed-list-objects/build.gradle | 2 +- examples/streamed-list-objects/settings.gradle | 8 +++++++- .../streamedlistobjects/StreamedListObjectsExample.java | 4 ++-- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/examples/streamed-list-objects/build.gradle b/examples/streamed-list-objects/build.gradle index 53ba4388..f428ec02 100644 --- a/examples/streamed-list-objects/build.gradle +++ b/examples/streamed-list-objects/build.gradle @@ -12,7 +12,7 @@ repositories { } ext { - jacksonVersion = "2.20.0" + jacksonVersion = "2.19.0" } dependencies { diff --git a/examples/streamed-list-objects/settings.gradle b/examples/streamed-list-objects/settings.gradle index ff41744a..549c237b 100644 --- a/examples/streamed-list-objects/settings.gradle +++ b/examples/streamed-list-objects/settings.gradle @@ -1 +1,7 @@ -rootProject.name = 'example1' \ No newline at end of file +rootProject.name = 'streamed-list-objects-example' + +includeBuild('../../') { + dependencySubstitution { + substitute(module('dev.openfga:openfga-sdk')).using(project(':')) + } +} \ No newline at end of file diff --git a/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/streamedlistobjects/StreamedListObjectsExample.java b/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/streamedlistobjects/StreamedListObjectsExample.java index e26ad845..c3ae0129 100644 --- a/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/streamedlistobjects/StreamedListObjectsExample.java +++ b/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/streamedlistobjects/StreamedListObjectsExample.java @@ -92,9 +92,9 @@ public static void main(String[] args) throws Exception { // Write some test tuples System.out.println("\nWriting 100 test tuples..."); - List writes = new ArrayList<>(); + List writes = new ArrayList<>(); for (int i = 0; i < 100; i++) { - writes.add(new ClientTuple() + writes.add(new ClientTupleKey() .user("user:anne") .relation("owner") ._object("document:" + i)); From 02153f9f15a1754368ef2e91a48e0901a03d083b Mon Sep 17 00:00:00 2001 From: Anurag Bandyopadhyay Date: Mon, 3 Nov 2025 10:05:35 +0530 Subject: [PATCH 5/9] feat: standardise example code --- examples/streamed-list-objects/.env.example | 10 +- examples/streamed-list-objects/README.md | 34 +-- examples/streamed-list-objects/build.gradle | 2 +- examples/streamed-list-objects/model.json | 121 ++++++++++ .../StreamedListObjectsExample.java | 223 ++++++++---------- 5 files changed, 245 insertions(+), 145 deletions(-) create mode 100644 examples/streamed-list-objects/model.json diff --git a/examples/streamed-list-objects/.env.example b/examples/streamed-list-objects/.env.example index 10798888..65c1e29d 100644 --- a/examples/streamed-list-objects/.env.example +++ b/examples/streamed-list-objects/.env.example @@ -1,5 +1,5 @@ # OpenFGA Configuration (REQUIRED) -FGA_API_URL=http://localhost:8000 +FGA_API_URL=http://localhost:8080 FGA_STORE_ID=store_id_here FGA_MODEL_ID=model_id_here @@ -9,11 +9,3 @@ FGA_CLIENT_SECRET=client_secret_here FGA_API_AUDIENCE=api_audience_here FGA_API_TOKEN_ISSUER=api_issuer_here -# OpenTelemetry Configuration (for manual configuration mode - ./gradlew run) -# These are used when running with manual OpenTelemetry setup -# Note: When using the Java agent (./gradlew runWithAgent), -# these values are overridden by the JVM arguments in build.gradle -OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 -OTEL_SERVICE_NAME=openfga-java-sdk-example -OTEL_SERVICE_VERSION=1.0.0 - diff --git a/examples/streamed-list-objects/README.md b/examples/streamed-list-objects/README.md index 9b5a939e..3e0538b1 100644 --- a/examples/streamed-list-objects/README.md +++ b/examples/streamed-list-objects/README.md @@ -1,6 +1,6 @@ # Streamed List Objects Example -This example demonstrates how to use the Streamed ListObjects API in the OpenFGA Java SDK. +This example demonstrates working with [OpenFGA's `/streamed-list-objects` endpoint](https://openfga.dev/api/service#/Relationship%20Queries/StreamedListObjects) using the Java SDK's `streamedListObjects()` method. ## What is Streamed ListObjects? @@ -19,33 +19,37 @@ Use the Streamed ListObjects API when: - You might not need all results (e.g., you want to stop after finding a certain number) - You want to avoid timeout issues with very large result sets +## Prerequisites + +- Java 11+ +- OpenFGA running on `localhost:8080` + +You can start OpenFGA with Docker by running the following command: + +```bash +docker pull openfga/openfga && docker run -it --rm -p 8080:8080 openfga/openfga run +``` + ## Running the Example -### Prerequisites +No additional setup is required to run this example. Simply run the following command: -- A running OpenFGA server (or use the [OpenFGA Playground](https://play.fga.dev/)) +```bash +make run +``` -### Environment Variables +### Environment Variables (Optional) -Set the following environment variables: +For authenticated OpenFGA instances, set the following environment variables: ```bash -# Required export FGA_API_URL=http://localhost:8080 # Your OpenFGA server URL - -# Optional - for authenticated servers export FGA_CLIENT_ID=your_client_id export FGA_CLIENT_SECRET=your_client_secret export FGA_API_TOKEN_ISSUER=your_token_issuer export FGA_API_AUDIENCE=your_audience ``` -### Running - -```bash -make run -``` - ## Code Examples ### Basic Usage @@ -108,4 +112,4 @@ var objectStream = fgaClient.streamedListObjects(request, options).get(); | Result Collection | Waits for all results | Streams results as computed | | Result Limit | Limited by server pagination | Limited only by execution timeout | | Processing | Must wait for complete response | Can process immediately | -| Use Case | Small to medium result sets | Large result sets, immediate processing | +| Use Case | Small to medium result sets | Large result sets, immediate processing | \ No newline at end of file diff --git a/examples/streamed-list-objects/build.gradle b/examples/streamed-list-objects/build.gradle index f428ec02..53ba4388 100644 --- a/examples/streamed-list-objects/build.gradle +++ b/examples/streamed-list-objects/build.gradle @@ -12,7 +12,7 @@ repositories { } ext { - jacksonVersion = "2.19.0" + jacksonVersion = "2.20.0" } dependencies { diff --git a/examples/streamed-list-objects/model.json b/examples/streamed-list-objects/model.json new file mode 100644 index 00000000..9cfc967e --- /dev/null +++ b/examples/streamed-list-objects/model.json @@ -0,0 +1,121 @@ +{ + "schema_version": "1.1", + "type_definitions": [ + { "type": "user", "relations": {} }, + { + "type": "group", + "relations": { "member": { "this": {} } }, + "metadata": { + "relations": { + "member": { "directly_related_user_types": [{ "type": "user" }] } + } + } + }, + { + "type": "folder", + "relations": { + "can_create_file": { + "computedUserset": { "object": "", "relation": "owner" } + }, + "owner": { "this": {} }, + "parent": { "this": {} }, + "viewer": { + "union": { + "child": [ + { "this": {} }, + { "computedUserset": { "object": "", "relation": "owner" } }, + { + "tupleToUserset": { + "tupleset": { "object": "", "relation": "parent" }, + "computedUserset": { "object": "", "relation": "viewer" } + } + } + ] + } + } + }, + "metadata": { + "relations": { + "can_create_file": { "directly_related_user_types": [] }, + "owner": { "directly_related_user_types": [{ "type": "user" }] }, + "parent": { "directly_related_user_types": [{ "type": "folder" }] }, + "viewer": { + "directly_related_user_types": [ + { "type": "user" }, + { "type": "user", "wildcard": {} }, + { "type": "group", "relation": "member" } + ] + } + } + } + }, + { + "type": "document", + "relations": { + "can_change_owner": { + "computedUserset": { "object": "", "relation": "owner" } + }, + "owner": { "this": {} }, + "parent": { "this": {} }, + "can_read": { + "union": { + "child": [ + { "computedUserset": { "object": "", "relation": "viewer" } }, + { "computedUserset": { "object": "", "relation": "owner" } }, + { + "tupleToUserset": { + "tupleset": { "object": "", "relation": "parent" }, + "computedUserset": { "object": "", "relation": "viewer" } + } + } + ] + } + }, + "can_share": { + "union": { + "child": [ + { "computedUserset": { "object": "", "relation": "owner" } }, + { + "tupleToUserset": { + "tupleset": { "object": "", "relation": "parent" }, + "computedUserset": { "object": "", "relation": "owner" } + } + } + ] + } + }, + "viewer": { "this": {} }, + "can_write": { + "union": { + "child": [ + { "computedUserset": { "object": "", "relation": "owner" } }, + { + "tupleToUserset": { + "tupleset": { "object": "", "relation": "parent" }, + "computedUserset": { "object": "", "relation": "owner" } + } + } + ] + } + } + }, + "metadata": { + "relations": { + "can_change_owner": { "directly_related_user_types": [] }, + "owner": { "directly_related_user_types": [{ "type": "user" }] }, + "parent": { "directly_related_user_types": [{ "type": "folder" }] }, + "can_read": { "directly_related_user_types": [] }, + "can_share": { "directly_related_user_types": [] }, + "viewer": { + "directly_related_user_types": [ + { "type": "user" }, + { "type": "user", "wildcard": {} }, + { "type": "group", "relation": "member" } + ] + }, + "can_write": { "directly_related_user_types": [] } + } + } + } + ] +} \ No newline at end of file diff --git a/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/streamedlistobjects/StreamedListObjectsExample.java b/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/streamedlistobjects/StreamedListObjectsExample.java index c3ae0129..7a73bb68 100644 --- a/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/streamedlistobjects/StreamedListObjectsExample.java +++ b/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/streamedlistobjects/StreamedListObjectsExample.java @@ -6,6 +6,7 @@ import dev.openfga.sdk.api.client.model.*; import dev.openfga.sdk.api.configuration.*; import dev.openfga.sdk.api.model.*; +import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -13,13 +14,21 @@ /** * Example demonstrating the usage of the Streamed ListObjects API. * - * The Streamed ListObjects API returns results as they are computed, rather than - * collecting all results before returning. This is particularly useful for: - * - Large result sets that would take a long time to collect - * - Scenarios where you want to start processing results immediately - * - Cases where you might not need all results (early termination) + *

This example demonstrates working with OpenFGA's `/streamed-list-objects` endpoint using the + * Java SDK's `streamedListObjects()` method. + * + *

The Streamed ListObjects API returns results as they are computed, rather than collecting all + * results before returning. This is particularly useful for: + * + *

    + *
  • Large result sets that would take a long time to collect + *
  • Scenarios where you want to start processing results immediately + *
  • Cases where you might not need all results (early termination) + *
*/ public class StreamedListObjectsExample { + private static final int TUPLE_COUNT = 2000; + public static void main(String[] args) throws Exception { // Configure the SDK var credentials = new Credentials(); @@ -37,137 +46,111 @@ public static void main(String[] args) throws Exception { var fgaClient = new OpenFgaClient(configuration); - // Create a test store - System.out.println("Creating test store..."); - var store = - fgaClient.createStore(new CreateStoreRequest().name("StreamedListObjects Test Store")) - .get(); - fgaClient.setStoreId(store.getId()); - System.out.println("Created store: " + store.getId()); - - // Create an authorization model - System.out.println("Creating authorization model..."); - String authModelJson = """ - { - "schema_version": "1.1", - "type_definitions": [ - { - "type": "user", - "relations": {} - }, - { - "type": "document", - "relations": { - "owner": { - "this": {} - }, - "viewer": { - "this": {} - } - }, - "metadata": { - "relations": { - "owner": { - "directly_related_user_types": [ - {"type": "user"} - ] - }, - "viewer": { - "directly_related_user_types": [ - {"type": "user"} - ] - } - } - } - } - ] - } - """; + // Create our temporary store + String storeId = createStore(fgaClient); + System.out.println("Created temporary store (" + storeId + ")"); - var mapper = new ObjectMapper(); - var authModel = mapper.readValue(authModelJson, new TypeReference() {}); - var modelResponse = fgaClient.writeAuthorizationModel(authModel).get(); - fgaClient.setAuthorizationModelId(modelResponse.getAuthorizationModelId()); - System.out.println("Created model: " + modelResponse.getAuthorizationModelId()); - - // Write some test tuples - System.out.println("\nWriting 100 test tuples..."); - List writes = new ArrayList<>(); - for (int i = 0; i < 100; i++) { - writes.add(new ClientTupleKey() - .user("user:anne") - .relation("owner") - ._object("document:" + i)); - } + // Configure the SDK to use the temporary store for the rest of the example + fgaClient.setStoreId(storeId); - fgaClient - .write(new ClientWriteRequest().writes(writes)) - .get(); - System.out.println("Successfully wrote 100 tuples"); + // Load the authorization model from a file and write it to the server + String modelId = writeModel(fgaClient); + System.out.println("Created temporary authorization model (" + modelId + ")\n"); - // Example 1: Use streamedListObjects to get all objects - System.out.println("\n=== Example 1: List all objects using streaming ==="); - var request1 = new ClientListObjectsRequest() - .type("document") - .relation("owner") - .user("user:anne"); + // Configure the SDK to use this authorization model for the rest of the example + fgaClient.setAuthorizationModelId(modelId); - var objectStream1 = fgaClient.streamedListObjects(request1).get(); - List allObjects = - objectStream1.map(StreamedListObjectsResponse::getObject).collect(Collectors.toList()); + // Write a bunch of example tuples to the temporary store + int wrote = writeTuples(fgaClient, TUPLE_COUNT); + System.out.println("Wrote " + wrote + " tuples to the store.\n"); - System.out.println("Total objects found: " + allObjects.size()); - System.out.println("First 5 objects: " + allObjects.subList(0, Math.min(5, allObjects.size()))); + //////////////////////////////// + // Demonstrate streaming vs standard list objects - // Example 2: Early termination - stop after finding first N objects - System.out.println("\n=== Example 2: Early termination (first 10 objects) ==="); - var request2 = new ClientListObjectsRequest() + // Craft a request to list all `documents` owned by `user:anne` + var request = new ClientListObjectsRequest() .type("document") .relation("owner") .user("user:anne"); - var objectStream2 = fgaClient.streamedListObjects(request2).get(); - List firstTen = objectStream2 - .map(StreamedListObjectsResponse::getObject) - .limit(10) - .collect(Collectors.toList()); + // Send a single request to the server using both the streamed and standard endpoints + List streamedResults = streamedListObjects(fgaClient, request); + List standardResults = listObjects(fgaClient, request); - System.out.println("First 10 objects: " + firstTen); + System.out.println( + "/streamed-list-objects returned " + streamedResults.size() + " objects in a single request."); - // Example 3: Process objects as they arrive (immediate processing) - System.out.println("\n=== Example 3: Process objects immediately ==="); - var request3 = new ClientListObjectsRequest() - .type("document") - .relation("owner") - .user("user:anne"); + System.out.println("/list-objects returned " + standardResults.size() + " objects in a single request."); - var objectStream3 = fgaClient.streamedListObjects(request3).get(); - objectStream3 - .map(StreamedListObjectsResponse::getObject) - .limit(10) - .forEach(obj -> { - // Process each object as it arrives - System.out.println(" Processing: " + obj); - }); - - // Example 4: With options (custom authorization model ID and consistency preference) - System.out.println("\n=== Example 4: With options ==="); - var request4 = new ClientListObjectsRequest() - .type("document") - .relation("owner") - .user("user:anne"); + //////////////////////////////// + // Clean up - delete the test store + fgaClient.deleteStore().get(); + System.out.println("\nDeleted temporary store (" + storeId + ")"); + } + + /** + * Create a temporary store. The store will be deleted at the end of the example. + */ + private static String createStore(OpenFgaClient fgaClient) throws Exception { + var response = fgaClient + .createStore(new CreateStoreRequest().name("Demo Store")) + .get(); + return response.getId(); + } + + /** + * Load the authorization model from a file and write it to the server. + */ + private static String writeModel(OpenFgaClient fgaClient) throws Exception { + var mapper = new ObjectMapper(); + var modelFile = new File("model.json"); + var authModel = mapper.readValue(modelFile, new TypeReference() {}); + var response = fgaClient.writeAuthorizationModel(authModel).get(); + return response.getAuthorizationModelId(); + } - var options4 = new ClientListObjectsOptions().consistency(ConsistencyPreference.HIGHER_CONSISTENCY); + /** + * Write a variable number of tuples to the temporary store. + */ + private static int writeTuples(OpenFgaClient fgaClient, int quantity) throws Exception { + int chunks = quantity / 100; + + for (int chunk = 0; chunk < chunks; chunk++) { + List writes = new ArrayList<>(); + for (int t = 0; t < 100; t++) { + writes.add(new ClientTupleKey() + .user("user:anne") + .relation("owner") + ._object("document:" + (chunk * 100 + t))); + } + fgaClient.write(new ClientWriteRequest().writes(writes)).get(); + } - var objectStream4 = fgaClient.streamedListObjects(request4, options4).get(); - long count = objectStream4.count(); - System.out.println("Total objects with higher consistency: " + count); + return quantity; + } - // Clean up - delete the test store - System.out.println("\n=== Cleaning up ==="); - fgaClient.deleteStore().get(); - System.out.println("Deleted test store: " + store.getId()); + /** + * Send our request to the streaming endpoint, and collect all results. + * + *

Note that streamedListObjects() returns a Stream, so we could process results as they + * come in. For the sake of this example, we'll just collect all the results into a list and + * return them all at once. + */ + private static List streamedListObjects(OpenFgaClient fgaClient, ClientListObjectsRequest request) + throws Exception { + var objectStream = fgaClient.streamedListObjects(request).get(); + return objectStream.map(StreamedListObjectsResponse::getObject).collect(Collectors.toList()); + } - System.out.println("\nExample completed successfully!"); + /** + * For comparison sake, here is the non-streamed version of the same call, using + * listObjects(). + * + *

Note that in the non-streamed version, the server will return a maximum of 1000 results. + */ + private static List listObjects(OpenFgaClient fgaClient, ClientListObjectsRequest request) + throws Exception { + var response = fgaClient.listObjects(request).get(); + return response.getObjects(); } } \ No newline at end of file From be65362a9dd0004e22b79b06f791a13a94c9d609 Mon Sep 17 00:00:00 2001 From: Anurag Bandyopadhyay Date: Mon, 3 Nov 2025 19:43:49 +0530 Subject: [PATCH 6/9] feat: address coderabbit comment re init streamedlistobjectsapi --- examples/streamed-list-objects/Makefile | 2 ++ src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java | 1 + 2 files changed, 3 insertions(+) diff --git a/examples/streamed-list-objects/Makefile b/examples/streamed-list-objects/Makefile index c671023c..ca3179bc 100644 --- a/examples/streamed-list-objects/Makefile +++ b/examples/streamed-list-objects/Makefile @@ -1,3 +1,5 @@ +.PHONY: all build run run-openfga + all: build openfga_version=latest diff --git a/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java b/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java index c0f61c0a..90fd9774 100644 --- a/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java +++ b/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java @@ -59,6 +59,7 @@ public void setAuthorizationModelId(String authorizationModelId) { public void setConfiguration(ClientConfiguration configuration) throws FgaInvalidParameterException { this.configuration = configuration; this.api = new OpenFgaApi(configuration, apiClient); + this.streamedListObjectsApi = new StreamedListObjectsApi(apiClient); } /* ******** From 86fcbc79ac942d575c656f9bc7e0df8a30255d13 Mon Sep 17 00:00:00 2001 From: Anurag Bandyopadhyay Date: Mon, 3 Nov 2025 20:39:00 +0530 Subject: [PATCH 7/9] feat: address copilot comments --- .../sdk/api/StreamedListObjectsApi.java | 52 +++---- .../sdk/api/client/HttpRequestAttempt.java | 135 ++++++++++++++++-- .../openfga/sdk/api/client/OpenFgaClient.java | 103 ++++++------- .../api/client/StreamedResponseIterator.java | 37 ++--- .../sdk/api/client/StreamingException.java | 31 ++++ .../sdk/api/client/StreamingResponseBody.java | 24 ++++ 6 files changed, 260 insertions(+), 122 deletions(-) create mode 100644 src/main/java/dev/openfga/sdk/api/client/StreamingException.java create mode 100644 src/main/java/dev/openfga/sdk/api/client/StreamingResponseBody.java diff --git a/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java b/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java index 52af0a02..a3539ce6 100644 --- a/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java +++ b/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java @@ -5,13 +5,14 @@ import dev.openfga.sdk.api.client.ApiClient; import dev.openfga.sdk.api.client.ApiResponse; import dev.openfga.sdk.api.client.HttpRequestAttempt; -import dev.openfga.sdk.api.client.StreamingResponseString; +import dev.openfga.sdk.api.client.StreamingResponseBody; import dev.openfga.sdk.api.configuration.Configuration; import dev.openfga.sdk.api.model.ListObjectsRequest; import dev.openfga.sdk.errors.ApiException; import dev.openfga.sdk.errors.FgaInvalidParameterException; import dev.openfga.sdk.telemetry.Attribute; import dev.openfga.sdk.telemetry.Attributes; +import java.net.http.HttpRequest; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -21,9 +22,11 @@ * This class is separate from the generated OpenFgaApi to avoid modifications to generated code. */ public class StreamedListObjectsApi { + private final Configuration configuration; private final ApiClient apiClient; - public StreamedListObjectsApi(ApiClient apiClient) { + public StreamedListObjectsApi(Configuration configuration, ApiClient apiClient) { + this.configuration = configuration; this.apiClient = apiClient; } @@ -33,13 +36,13 @@ public StreamedListObjectsApi(ApiClient apiClient) { * * @param storeId The store ID (required) * @param body The list objects request body (required) - * @param configuration The configuration to use for this request + * @param requestConfiguration The configuration to use for this request * @return CompletableFuture with raw streaming response * @throws ApiException if fails to make API call * @throws FgaInvalidParameterException if required parameters are missing */ - public CompletableFuture> streamedListObjects( - String storeId, ListObjectsRequest body, Configuration configuration) + public CompletableFuture> streamedListObjects( + String storeId, ListObjectsRequest body, Configuration requestConfiguration) throws ApiException, FgaInvalidParameterException { assertParamExists(storeId, "storeId", "streamedListObjects"); @@ -48,46 +51,25 @@ public CompletableFuture> streamedListObjec String path = "/stores/" + storeId + "/streamed-list-objects"; try { - // Build the HTTP request byte[] requestBody = apiClient.getObjectMapper().writeValueAsBytes(body); - var bodyPublisher = java.net.http.HttpRequest.BodyPublishers.ofByteArray(requestBody); + HttpRequest.Builder requestBuilder = + ApiClient.requestBuilder("POST", path, requestBody, requestConfiguration); - var requestBuilder = java.net.http.HttpRequest.newBuilder() - .uri(java.net.URI.create(configuration.getApiUrl() + path)) - .header("Content-Type", "application/json") - .header("User-Agent", configuration.getUserAgent()) - .POST(bodyPublisher); - - // Add authorization header if needed - if (configuration.getCredentials() != null - && configuration.getCredentials().getApiToken() != null) { - requestBuilder.header( - "Authorization", - "Bearer " + configuration.getCredentials().getApiToken()); - } - - // Add default headers - if (configuration.getDefaultHeaders() != null) { - configuration.getDefaultHeaders().forEach(requestBuilder::header); - } - - var httpRequest = requestBuilder.build(); - - // Build telemetry attributes - Map methodParameters = new HashMap<>(); - methodParameters.put("storeId", storeId); - methodParameters.put("body", body); + HttpRequest httpRequest = requestBuilder.build(); Map telemetryAttributes = new HashMap<>(); telemetryAttributes.put(Attributes.FGA_CLIENT_REQUEST_METHOD, "StreamedListObjects"); - // Use HttpRequestAttempt with StreamingResponseString to get raw response return new HttpRequestAttempt<>( - httpRequest, "streamedListObjects", StreamingResponseString.class, apiClient, configuration) + httpRequest, + "streamedListObjects", + StreamingResponseBody.class, + apiClient, + requestConfiguration) .addTelemetryAttributes(telemetryAttributes) .attemptHttpRequest(); } catch (Exception e) { return CompletableFuture.failedFuture(new ApiException(e)); } } -} +} \ No newline at end of file diff --git a/src/main/java/dev/openfga/sdk/api/client/HttpRequestAttempt.java b/src/main/java/dev/openfga/sdk/api/client/HttpRequestAttempt.java index 4dafaa6a..46d6945d 100644 --- a/src/main/java/dev/openfga/sdk/api/client/HttpRequestAttempt.java +++ b/src/main/java/dev/openfga/sdk/api/client/HttpRequestAttempt.java @@ -12,6 +12,7 @@ import dev.openfga.sdk.util.RetryAfterHeaderParser; import dev.openfga.sdk.util.RetryStrategy; import java.io.IOException; +import java.io.InputStream; import java.io.PrintStream; import java.net.http.*; import java.nio.ByteBuffer; @@ -90,18 +91,27 @@ private HttpClient getHttpClient() { private CompletableFuture> attemptHttpRequest( HttpClient httpClient, int retryNumber, Throwable previousError) { - return httpClient - .sendAsync(request, HttpResponse.BodyHandlers.ofString()) - .handle((response, throwable) -> { - if (throwable != null) { - // Handle network errors (no HTTP response received) - return handleNetworkError(throwable, retryNumber); - } - - // Handle HTTP response (including error status codes) - return processHttpResponse(response, retryNumber, previousError); - }) - .thenCompose(Function.identity()); + if (clazz == StreamingResponseBody.class) { + return httpClient + .sendAsync(request, HttpResponse.BodyHandlers.ofInputStream()) + .handle((response, throwable) -> { + if (throwable != null) { + return handleNetworkError(throwable, retryNumber); + } + return processHttpResponseStreaming(response, retryNumber, previousError); + }) + .thenCompose(Function.identity()); + } else { + return httpClient + .sendAsync(request, HttpResponse.BodyHandlers.ofString()) + .handle((response, throwable) -> { + if (throwable != null) { + return handleNetworkError(throwable, retryNumber); + } + return processHttpResponse(response, retryNumber, previousError); + }) + .thenCompose(Function.identity()); + } } private CompletableFuture> handleNetworkError(Throwable throwable, int retryNumber) { @@ -211,6 +221,105 @@ private CompletableFuture> processHttpResponse( response.statusCode(), response.headers().map(), response.body(), modeledResponse)); } + private CompletableFuture> processHttpResponseStreaming( + HttpResponse response, int retryNumber, Throwable previousError) { + int statusCode = response.statusCode(); + + if (!HttpStatusCode.isSuccessful(statusCode)) { + try { + String bodyStr = new String(response.body().readAllBytes(), StandardCharsets.UTF_8); + HttpResponse stringResponse = new HttpResponse<>() { + @Override + public int statusCode() { + return response.statusCode(); + } + + @Override + public HttpRequest request() { + return response.request(); + } + + @Override + public java.util.Optional> previousResponse() { + return java.util.Optional.empty(); + } + + @Override + public HttpHeaders headers() { + return response.headers(); + } + + @Override + public String body() { + return bodyStr; + } + + @Override + public java.util.Optional sslSession() { + return response.sslSession(); + } + + @Override + public java.net.URI uri() { + return response.uri(); + } + + @Override + public HttpClient.Version version() { + return response.version(); + } + }; + + Optional fgaError = + FgaError.getError(name, request, configuration, stringResponse, previousError); + if (fgaError.isPresent()) { + FgaError error = fgaError.get(); + if (retryNumber < configuration.getMaxRetries()) { + Optional retryAfterDelay = response.headers() + .firstValue(FgaConstants.RETRY_AFTER_HEADER_NAME) + .flatMap(RetryAfterHeaderParser::parseRetryAfter); + if (RetryStrategy.shouldRetry(statusCode)) { + return handleHttpErrorRetry(retryAfterDelay, retryNumber, error); + } + } + return CompletableFuture.failedFuture(error); + } + } catch (IOException e) { + return CompletableFuture.failedFuture(new ApiException(e)); + } + } + + addTelemetryAttributes(Attributes.fromHttpResponse(response, this.configuration.getCredentials())); + + if (retryNumber > 0) { + addTelemetryAttribute(Attributes.HTTP_REQUEST_RESEND_COUNT, String.valueOf(retryNumber)); + } + + if (response.headers() + .firstValue(FgaConstants.QUERY_DURATION_HEADER_NAME) + .isPresent()) { + String queryDuration = response.headers() + .firstValue(FgaConstants.QUERY_DURATION_HEADER_NAME) + .orElse(null); + + if (!isNullOrWhitespace(queryDuration)) { + try { + double queryDurationDouble = Double.parseDouble(queryDuration); + telemetry.metrics().queryDuration(queryDurationDouble, this.getTelemetryAttributes()); + } catch (NumberFormatException e) { + } + } + } + + Double requestDuration = (double) (System.currentTimeMillis() - requestStarted); + telemetry.metrics().requestDuration(requestDuration, this.getTelemetryAttributes()); + + @SuppressWarnings("unchecked") + T result = (T) new StreamingResponseBody(response.body()); + return CompletableFuture.completedFuture( + new ApiResponse<>(response.statusCode(), response.headers().map(), null, result)); + } + private CompletableFuture deserializeResponse(HttpResponse response) { if (clazz == Void.class && isNullOrWhitespace(response.body())) { return CompletableFuture.completedFuture(null); @@ -262,4 +371,4 @@ public void onComplete() { out.flush(); } } -} +} \ No newline at end of file diff --git a/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java b/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java index 90fd9774..8bf826a8 100644 --- a/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java +++ b/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java @@ -34,7 +34,7 @@ public OpenFgaClient(ClientConfiguration configuration, ApiClient apiClient) thr this.apiClient = apiClient; this.configuration = configuration; this.api = new OpenFgaApi(configuration, apiClient); - this.streamedListObjectsApi = new StreamedListObjectsApi(apiClient); + this.streamedListObjectsApi = new StreamedListObjectsApi(configuration, apiClient); } /* *********** @@ -59,7 +59,7 @@ public void setAuthorizationModelId(String authorizationModelId) { public void setConfiguration(ClientConfiguration configuration) throws FgaInvalidParameterException { this.configuration = configuration; this.api = new OpenFgaApi(configuration, apiClient); - this.streamedListObjectsApi = new StreamedListObjectsApi(apiClient); + this.streamedListObjectsApi = new StreamedListObjectsApi(configuration, apiClient); } /* ******** @@ -82,7 +82,7 @@ public CompletableFuture listStores(ClientListStoresOp configuration.assertValid(); var overrides = new ConfigurationOverride().addHeaders(options); return call(() -> api.listStores( - options.getPageSize(), options.getContinuationToken(), options.getName(), overrides)) + options.getPageSize(), options.getContinuationToken(), options.getName(), overrides)) .thenApply(ClientListStoresResponse::new); } @@ -299,12 +299,12 @@ public CompletableFuture readChanges( var options = readChangesOptions != null ? readChangesOptions : new ClientReadChangesOptions(); var overrides = new ConfigurationOverride().addHeaders(options); return call(() -> api.readChanges( - storeId, - request.getType(), - options.getPageSize(), - options.getContinuationToken(), - request.getStartTime(), - overrides)) + storeId, + request.getType(), + options.getPageSize(), + options.getContinuationToken(), + request.getStartTime(), + overrides)) .thenApply(ClientReadChangesResponse::new); } @@ -495,19 +495,19 @@ private CompletableFuture writeTransactions( // For transaction-based writes, all tuples are successful if the call succeeds List writeResponses = writeTuples != null ? writeTuples.stream() - .map(tuple -> new ClientWriteSingleResponse(tuple.asTupleKey(), ClientWriteStatus.SUCCESS)) - .collect(Collectors.toList()) + .map(tuple -> new ClientWriteSingleResponse(tuple.asTupleKey(), ClientWriteStatus.SUCCESS)) + .collect(Collectors.toList()) : new ArrayList<>(); List deleteResponses = deleteTuples != null ? deleteTuples.stream() - .map(tuple -> new ClientWriteSingleResponse( - new TupleKey() - .user(tuple.getUser()) - .relation(tuple.getRelation()) - ._object(tuple.getObject()), - ClientWriteStatus.SUCCESS)) - .collect(Collectors.toList()) + .map(tuple -> new ClientWriteSingleResponse( + new TupleKey() + .user(tuple.getUser()) + .relation(tuple.getRelation()) + ._object(tuple.getObject()), + ClientWriteStatus.SUCCESS)) + .collect(Collectors.toList()) : new ArrayList<>(); return new ClientWriteResponse(writeResponses, deleteResponses); @@ -642,18 +642,18 @@ private CompletableFuture writeNonTransaction( CompletableFuture> allWritesFuture = writeFutures.isEmpty() ? CompletableFuture.completedFuture(new ArrayList<>()) : CompletableFuture.allOf(writeFutures.toArray(new CompletableFuture[0])) - .thenApply(v -> writeFutures.stream() - .map(CompletableFuture::join) - .flatMap(List::stream) - .collect(Collectors.toList())); + .thenApply(v -> writeFutures.stream() + .map(CompletableFuture::join) + .flatMap(List::stream) + .collect(Collectors.toList())); CompletableFuture> allDeletesFuture = deleteFutures.isEmpty() ? CompletableFuture.completedFuture(new ArrayList<>()) : CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])) - .thenApply(v -> deleteFutures.stream() - .map(CompletableFuture::join) - .flatMap(List::stream) - .collect(Collectors.toList())); + .thenApply(v -> deleteFutures.stream() + .map(CompletableFuture::join) + .flatMap(List::stream) + .collect(Collectors.toList())); return CompletableFuture.allOf(allWritesFuture, allDeletesFuture) .thenApply(v -> new ClientWriteResponse(allWritesFuture.join(), allDeletesFuture.join())); @@ -831,7 +831,7 @@ public CompletableFuture> clientBatchCheck( var options = batchCheckOptions != null ? batchCheckOptions : new ClientBatchCheckClientOptions() - .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS); + .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS); HashMap headers = options.getAdditionalHeaders() != null ? new HashMap<>(options.getAdditionalHeaders()) @@ -892,8 +892,8 @@ public CompletableFuture batchCheck( var options = batchCheckOptions != null ? batchCheckOptions : new ClientBatchCheckOptions() - .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS) - .maxBatchSize(FgaConstants.CLIENT_MAX_BATCH_SIZE); + .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS) + .maxBatchSize(FgaConstants.CLIENT_MAX_BATCH_SIZE); HashMap headers = options.getAdditionalHeaders() != null ? new HashMap<>(options.getAdditionalHeaders()) @@ -954,22 +954,22 @@ public CompletableFuture batchCheck( var override = new ConfigurationOverride().addHeaders(options); Consumer> singleBatchCheckRequest = request -> call(() -> { - BatchCheckRequest body = new BatchCheckRequest().checks(request); - if (options.getConsistency() != null) { - body.consistency(options.getConsistency()); - } + BatchCheckRequest body = new BatchCheckRequest().checks(request); + if (options.getConsistency() != null) { + body.consistency(options.getConsistency()); + } - // Set authorizationModelId from options if available; otherwise, use the default from configuration - String authorizationModelId = !isNullOrWhitespace(options.getAuthorizationModelId()) - ? options.getAuthorizationModelId() - : configuration.getAuthorizationModelId(); + // Set authorizationModelId from options if available; otherwise, use the default from configuration + String authorizationModelId = !isNullOrWhitespace(options.getAuthorizationModelId()) + ? options.getAuthorizationModelId() + : configuration.getAuthorizationModelId(); - if (!isNullOrWhitespace(authorizationModelId)) { - body.authorizationModelId(authorizationModelId); - } + if (!isNullOrWhitespace(authorizationModelId)) { + body.authorizationModelId(authorizationModelId); + } - return api.batchCheck(configuration.getStoreId(), body, override); - }) + return api.batchCheck(configuration.getStoreId(), body, override); + }) .whenComplete((batchCheckResponseApiResponse, throwable) -> { try { if (throwable != null) { @@ -1167,12 +1167,19 @@ public CompletableFuture> streamedListObject return call(() -> streamedListObjectsApi.streamedListObjects(storeId, body, config)) .thenApply(response -> { - String ndjsonResponse = response.getData().getRawResponse(); + StreamingResponseBody srb = response.getData(); + java.io.BufferedReader reader = new java.io.BufferedReader( + new java.io.InputStreamReader(srb.getBody(), java.nio.charset.StandardCharsets.UTF_8)); StreamedResponseIterator iterator = - new StreamedResponseIterator(ndjsonResponse, apiClient.getObjectMapper()); - // Convert Iterator to Stream - return java.util.stream.StreamSupport.stream( + new StreamedResponseIterator(reader, apiClient.getObjectMapper()); + Stream stream = java.util.stream.StreamSupport.stream( ((Iterable) () -> iterator).spliterator(), false); + return stream.onClose(() -> { + try { + srb.close(); + } catch (java.io.IOException ignore) { + } + }); }); } @@ -1198,7 +1205,7 @@ public CompletableFuture listRelations( var options = listRelationsOptions != null ? listRelationsOptions : new ClientListRelationsOptions() - .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS); + .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS); HashMap headers = options.getAdditionalHeaders() != null ? new HashMap<>(options.getAdditionalHeaders()) @@ -1383,4 +1390,4 @@ private CompletableFuture call(CheckedInvocation action) { return CompletableFuture.failedFuture(throwable); } } -} +} \ No newline at end of file diff --git a/src/main/java/dev/openfga/sdk/api/client/StreamedResponseIterator.java b/src/main/java/dev/openfga/sdk/api/client/StreamedResponseIterator.java index 75db6edd..26dd3892 100644 --- a/src/main/java/dev/openfga/sdk/api/client/StreamedResponseIterator.java +++ b/src/main/java/dev/openfga/sdk/api/client/StreamedResponseIterator.java @@ -1,12 +1,11 @@ package dev.openfga.sdk.api.client; import com.fasterxml.jackson.databind.ObjectMapper; -import dev.openfga.sdk.api.model.Status; import dev.openfga.sdk.api.model.StreamResultOfStreamedListObjectsResponse; import dev.openfga.sdk.api.model.StreamedListObjectsResponse; import java.io.BufferedReader; import java.io.IOException; -import java.io.StringReader; +import java.io.Reader; import java.util.Iterator; import java.util.NoSuchElementException; @@ -15,17 +14,17 @@ * Each line in the response is a StreamResultOfStreamedListObjectsResponse. * * If an error is encountered in the stream (either from parsing or from an error - * response), it will be thrown as a RuntimeException when hasNext() or next() is called. + * response), it will be thrown as a StreamingException when hasNext() or next() is called. */ public class StreamedResponseIterator implements Iterator { private final BufferedReader reader; private final ObjectMapper objectMapper; private StreamedListObjectsResponse nextItem; private boolean hasNext; - private RuntimeException pendingException; + private StreamingException pendingException; - public StreamedResponseIterator(String ndjsonResponse, ObjectMapper objectMapper) { - this.reader = new BufferedReader(new StringReader(ndjsonResponse)); + public StreamedResponseIterator(Reader reader, ObjectMapper objectMapper) { + this.reader = reader instanceof BufferedReader ? (BufferedReader) reader : new BufferedReader(reader); this.objectMapper = objectMapper; this.hasNext = true; this.pendingException = null; @@ -50,12 +49,7 @@ private void advance() { } if (streamResult.getError() != null) { - // Handle error in stream - convert to exception - Status error = streamResult.getError(); - String errorMessage = String.format( - "Error in streaming response: code=%d, message=%s", - error.getCode(), error.getMessage() != null ? error.getMessage() : "Unknown error"); - pendingException = new RuntimeException(errorMessage); + pendingException = new StreamingException(streamResult.getError()); hasNext = false; nextItem = null; return; @@ -65,7 +59,7 @@ private void advance() { hasNext = false; nextItem = null; } catch (IOException e) { - pendingException = new RuntimeException("Failed to parse streaming response", e); + pendingException = new StreamingException("Failed to parse streaming response", e); hasNext = false; nextItem = null; } @@ -73,22 +67,16 @@ private void advance() { @Override public boolean hasNext() { - // If there's a pending exception, throw it before returning false if (pendingException != null) { - RuntimeException ex = pendingException; - pendingException = null; // Clear it so we don't throw multiple times - throw ex; + throw pendingException; } return hasNext && nextItem != null; } @Override public StreamedListObjectsResponse next() { - // Check for pending exception first if (pendingException != null) { - RuntimeException ex = pendingException; - pendingException = null; - throw ex; + throw pendingException; } if (!hasNext()) { @@ -98,13 +86,10 @@ public StreamedListObjectsResponse next() { StreamedListObjectsResponse current = nextItem; advance(); - // Check again after advance in case an error occurred if (pendingException != null) { - RuntimeException ex = pendingException; - pendingException = null; - throw ex; + throw pendingException; } return current; } -} +} \ No newline at end of file diff --git a/src/main/java/dev/openfga/sdk/api/client/StreamingException.java b/src/main/java/dev/openfga/sdk/api/client/StreamingException.java new file mode 100644 index 00000000..b77725b1 --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/client/StreamingException.java @@ -0,0 +1,31 @@ +package dev.openfga.sdk.api.client; + +import dev.openfga.sdk.api.model.Status; + +public class StreamingException extends RuntimeException { + private final Status error; + + public StreamingException(Status error) { + super(formatErrorMessage(error)); + this.error = error; + } + + public StreamingException(String message, Throwable cause) { + super(message, cause); + this.error = null; + } + + public Status getError() { + return error; + } + + public Integer getCode() { + return error != null ? error.getCode() : null; + } + + private static String formatErrorMessage(Status error) { + String codeStr = error.getCode() != null ? String.valueOf(error.getCode()) : "unknown"; + String messageStr = error.getMessage() != null ? error.getMessage() : "Unknown error"; + return String.format("Error in streaming response: code=%s, message=%s", codeStr, messageStr); + } +} \ No newline at end of file diff --git a/src/main/java/dev/openfga/sdk/api/client/StreamingResponseBody.java b/src/main/java/dev/openfga/sdk/api/client/StreamingResponseBody.java new file mode 100644 index 00000000..c42318e5 --- /dev/null +++ b/src/main/java/dev/openfga/sdk/api/client/StreamingResponseBody.java @@ -0,0 +1,24 @@ +package dev.openfga.sdk.api.client; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; + +public final class StreamingResponseBody implements Closeable { + private final InputStream body; + + public StreamingResponseBody(InputStream body) { + this.body = body; + } + + public InputStream getBody() { + return body; + } + + @Override + public void close() throws IOException { + if (body != null) { + body.close(); + } + } +} \ No newline at end of file From fde2193abf47be67c12d3343dbf6dadf87dc015a Mon Sep 17 00:00:00 2001 From: Anurag Bandyopadhyay Date: Mon, 3 Nov 2025 21:04:15 +0530 Subject: [PATCH 8/9] fmt: apply spotless --- .../sdk/api/StreamedListObjectsApi.java | 12 +-- .../sdk/api/client/HttpRequestAttempt.java | 2 +- .../openfga/sdk/api/client/OpenFgaClient.java | 84 +++++++++---------- .../api/client/StreamedResponseIterator.java | 2 +- .../sdk/api/client/StreamingException.java | 2 +- .../sdk/api/client/StreamingResponseBody.java | 2 +- 6 files changed, 52 insertions(+), 52 deletions(-) diff --git a/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java b/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java index a3539ce6..8ae1eb43 100644 --- a/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java +++ b/src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java @@ -61,15 +61,15 @@ public CompletableFuture> streamedListObjects telemetryAttributes.put(Attributes.FGA_CLIENT_REQUEST_METHOD, "StreamedListObjects"); return new HttpRequestAttempt<>( - httpRequest, - "streamedListObjects", - StreamingResponseBody.class, - apiClient, - requestConfiguration) + httpRequest, + "streamedListObjects", + StreamingResponseBody.class, + apiClient, + requestConfiguration) .addTelemetryAttributes(telemetryAttributes) .attemptHttpRequest(); } catch (Exception e) { return CompletableFuture.failedFuture(new ApiException(e)); } } -} \ No newline at end of file +} diff --git a/src/main/java/dev/openfga/sdk/api/client/HttpRequestAttempt.java b/src/main/java/dev/openfga/sdk/api/client/HttpRequestAttempt.java index 46d6945d..6364a90a 100644 --- a/src/main/java/dev/openfga/sdk/api/client/HttpRequestAttempt.java +++ b/src/main/java/dev/openfga/sdk/api/client/HttpRequestAttempt.java @@ -371,4 +371,4 @@ public void onComplete() { out.flush(); } } -} \ No newline at end of file +} diff --git a/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java b/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java index 8bf826a8..37a8e0a1 100644 --- a/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java +++ b/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java @@ -82,7 +82,7 @@ public CompletableFuture listStores(ClientListStoresOp configuration.assertValid(); var overrides = new ConfigurationOverride().addHeaders(options); return call(() -> api.listStores( - options.getPageSize(), options.getContinuationToken(), options.getName(), overrides)) + options.getPageSize(), options.getContinuationToken(), options.getName(), overrides)) .thenApply(ClientListStoresResponse::new); } @@ -299,12 +299,12 @@ public CompletableFuture readChanges( var options = readChangesOptions != null ? readChangesOptions : new ClientReadChangesOptions(); var overrides = new ConfigurationOverride().addHeaders(options); return call(() -> api.readChanges( - storeId, - request.getType(), - options.getPageSize(), - options.getContinuationToken(), - request.getStartTime(), - overrides)) + storeId, + request.getType(), + options.getPageSize(), + options.getContinuationToken(), + request.getStartTime(), + overrides)) .thenApply(ClientReadChangesResponse::new); } @@ -495,19 +495,19 @@ private CompletableFuture writeTransactions( // For transaction-based writes, all tuples are successful if the call succeeds List writeResponses = writeTuples != null ? writeTuples.stream() - .map(tuple -> new ClientWriteSingleResponse(tuple.asTupleKey(), ClientWriteStatus.SUCCESS)) - .collect(Collectors.toList()) + .map(tuple -> new ClientWriteSingleResponse(tuple.asTupleKey(), ClientWriteStatus.SUCCESS)) + .collect(Collectors.toList()) : new ArrayList<>(); List deleteResponses = deleteTuples != null ? deleteTuples.stream() - .map(tuple -> new ClientWriteSingleResponse( - new TupleKey() - .user(tuple.getUser()) - .relation(tuple.getRelation()) - ._object(tuple.getObject()), - ClientWriteStatus.SUCCESS)) - .collect(Collectors.toList()) + .map(tuple -> new ClientWriteSingleResponse( + new TupleKey() + .user(tuple.getUser()) + .relation(tuple.getRelation()) + ._object(tuple.getObject()), + ClientWriteStatus.SUCCESS)) + .collect(Collectors.toList()) : new ArrayList<>(); return new ClientWriteResponse(writeResponses, deleteResponses); @@ -642,18 +642,18 @@ private CompletableFuture writeNonTransaction( CompletableFuture> allWritesFuture = writeFutures.isEmpty() ? CompletableFuture.completedFuture(new ArrayList<>()) : CompletableFuture.allOf(writeFutures.toArray(new CompletableFuture[0])) - .thenApply(v -> writeFutures.stream() - .map(CompletableFuture::join) - .flatMap(List::stream) - .collect(Collectors.toList())); + .thenApply(v -> writeFutures.stream() + .map(CompletableFuture::join) + .flatMap(List::stream) + .collect(Collectors.toList())); CompletableFuture> allDeletesFuture = deleteFutures.isEmpty() ? CompletableFuture.completedFuture(new ArrayList<>()) : CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])) - .thenApply(v -> deleteFutures.stream() - .map(CompletableFuture::join) - .flatMap(List::stream) - .collect(Collectors.toList())); + .thenApply(v -> deleteFutures.stream() + .map(CompletableFuture::join) + .flatMap(List::stream) + .collect(Collectors.toList())); return CompletableFuture.allOf(allWritesFuture, allDeletesFuture) .thenApply(v -> new ClientWriteResponse(allWritesFuture.join(), allDeletesFuture.join())); @@ -831,7 +831,7 @@ public CompletableFuture> clientBatchCheck( var options = batchCheckOptions != null ? batchCheckOptions : new ClientBatchCheckClientOptions() - .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS); + .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS); HashMap headers = options.getAdditionalHeaders() != null ? new HashMap<>(options.getAdditionalHeaders()) @@ -892,8 +892,8 @@ public CompletableFuture batchCheck( var options = batchCheckOptions != null ? batchCheckOptions : new ClientBatchCheckOptions() - .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS) - .maxBatchSize(FgaConstants.CLIENT_MAX_BATCH_SIZE); + .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS) + .maxBatchSize(FgaConstants.CLIENT_MAX_BATCH_SIZE); HashMap headers = options.getAdditionalHeaders() != null ? new HashMap<>(options.getAdditionalHeaders()) @@ -954,22 +954,22 @@ public CompletableFuture batchCheck( var override = new ConfigurationOverride().addHeaders(options); Consumer> singleBatchCheckRequest = request -> call(() -> { - BatchCheckRequest body = new BatchCheckRequest().checks(request); - if (options.getConsistency() != null) { - body.consistency(options.getConsistency()); - } + BatchCheckRequest body = new BatchCheckRequest().checks(request); + if (options.getConsistency() != null) { + body.consistency(options.getConsistency()); + } - // Set authorizationModelId from options if available; otherwise, use the default from configuration - String authorizationModelId = !isNullOrWhitespace(options.getAuthorizationModelId()) - ? options.getAuthorizationModelId() - : configuration.getAuthorizationModelId(); + // Set authorizationModelId from options if available; otherwise, use the default from configuration + String authorizationModelId = !isNullOrWhitespace(options.getAuthorizationModelId()) + ? options.getAuthorizationModelId() + : configuration.getAuthorizationModelId(); - if (!isNullOrWhitespace(authorizationModelId)) { - body.authorizationModelId(authorizationModelId); - } + if (!isNullOrWhitespace(authorizationModelId)) { + body.authorizationModelId(authorizationModelId); + } - return api.batchCheck(configuration.getStoreId(), body, override); - }) + return api.batchCheck(configuration.getStoreId(), body, override); + }) .whenComplete((batchCheckResponseApiResponse, throwable) -> { try { if (throwable != null) { @@ -1205,7 +1205,7 @@ public CompletableFuture listRelations( var options = listRelationsOptions != null ? listRelationsOptions : new ClientListRelationsOptions() - .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS); + .maxParallelRequests(FgaConstants.CLIENT_MAX_METHOD_PARALLEL_REQUESTS); HashMap headers = options.getAdditionalHeaders() != null ? new HashMap<>(options.getAdditionalHeaders()) @@ -1390,4 +1390,4 @@ private CompletableFuture call(CheckedInvocation action) { return CompletableFuture.failedFuture(throwable); } } -} \ No newline at end of file +} diff --git a/src/main/java/dev/openfga/sdk/api/client/StreamedResponseIterator.java b/src/main/java/dev/openfga/sdk/api/client/StreamedResponseIterator.java index 26dd3892..e7aac178 100644 --- a/src/main/java/dev/openfga/sdk/api/client/StreamedResponseIterator.java +++ b/src/main/java/dev/openfga/sdk/api/client/StreamedResponseIterator.java @@ -92,4 +92,4 @@ public StreamedListObjectsResponse next() { return current; } -} \ No newline at end of file +} diff --git a/src/main/java/dev/openfga/sdk/api/client/StreamingException.java b/src/main/java/dev/openfga/sdk/api/client/StreamingException.java index b77725b1..a6283222 100644 --- a/src/main/java/dev/openfga/sdk/api/client/StreamingException.java +++ b/src/main/java/dev/openfga/sdk/api/client/StreamingException.java @@ -28,4 +28,4 @@ private static String formatErrorMessage(Status error) { String messageStr = error.getMessage() != null ? error.getMessage() : "Unknown error"; return String.format("Error in streaming response: code=%s, message=%s", codeStr, messageStr); } -} \ No newline at end of file +} diff --git a/src/main/java/dev/openfga/sdk/api/client/StreamingResponseBody.java b/src/main/java/dev/openfga/sdk/api/client/StreamingResponseBody.java index c42318e5..72bb0af1 100644 --- a/src/main/java/dev/openfga/sdk/api/client/StreamingResponseBody.java +++ b/src/main/java/dev/openfga/sdk/api/client/StreamingResponseBody.java @@ -21,4 +21,4 @@ public void close() throws IOException { body.close(); } } -} \ No newline at end of file +} From c13c9b8d15062f8722ed340796291fc73b821bb3 Mon Sep 17 00:00:00 2001 From: Anurag Bandyopadhyay Date: Mon, 3 Nov 2025 23:14:58 +0530 Subject: [PATCH 9/9] feat: add resource cleanup and try-catch --- examples/streamed-list-objects/README.md | 48 ++++++++++--------- .../StreamedListObjectsExample.java | 5 +- .../openfga/sdk/api/client/OpenFgaClient.java | 4 ++ .../api/client/StreamedResponseIterator.java | 7 ++- .../client/OpenFgaClientIntegrationTest.java | 27 +++++++++++ .../sdk/api/client/OpenFgaClientTest.java | 17 +++---- 6 files changed, 74 insertions(+), 34 deletions(-) diff --git a/examples/streamed-list-objects/README.md b/examples/streamed-list-objects/README.md index 3e0538b1..33ce170f 100644 --- a/examples/streamed-list-objects/README.md +++ b/examples/streamed-list-objects/README.md @@ -57,41 +57,43 @@ export FGA_API_AUDIENCE=your_audience ```java // Create a request var request = new ClientListObjectsRequest() - .type("document") - .relation("owner") - .user("user:anne"); - -// Call the streaming API -var objectStream = fgaClient.streamedListObjects(request).get(); + .type("document") + .relation("owner") + .user("user:anne"); +// Call the streaming API and ensure proper resource cleanup +try (var objectStream = fgaClient.streamedListObjects(request).get()) { // Collect all results List objects = objectStream - .map(StreamedListObjectsResponse::getObject) - .collect(Collectors.toList()); + .map(StreamedListObjectsResponse::getObject) + .collect(Collectors.toList()); +} ``` ### Early Termination ```java -// Get only the first 10 results -var objectStream = fgaClient.streamedListObjects(request).get(); -List firstTen = objectStream - .map(StreamedListObjectsResponse::getObject) - .limit(10) - .collect(Collectors.toList()); +// Get only the first 10 results, ensuring the stream is closed properly +try (var objectStream = fgaClient.streamedListObjects(request).get()) { + List firstTen = objectStream + .map(StreamedListObjectsResponse::getObject) + .limit(10) + .collect(Collectors.toList()); +} ``` ### Process as You Go ```java // Process each object immediately as it arrives -var objectStream = fgaClient.streamedListObjects(request).get(); -objectStream - .map(StreamedListObjectsResponse::getObject) - .forEach(obj -> { - // Do something with each object - System.out.println("Processing: " + obj); - }); +try (var objectStream = fgaClient.streamedListObjects(request).get()) { + objectStream + .map(StreamedListObjectsResponse::getObject) + .forEach(obj -> { + // Do something with each object + System.out.println("Processing: " + obj); + }); +} ``` ### With Options @@ -99,8 +101,8 @@ objectStream ```java // Use options to specify consistency preference var options = new ClientListObjectsOptions() - .consistency(ConsistencyPreference.HIGHER_CONSISTENCY) - .authorizationModelId("01GXSXXXXXXXXXXXXXXXX"); + .consistency(ConsistencyPreference.HIGHER_CONSISTENCY) + .authorizationModelId("01GXSXXXXXXXXXXXXXXXX"); var objectStream = fgaClient.streamedListObjects(request, options).get(); ``` diff --git a/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/streamedlistobjects/StreamedListObjectsExample.java b/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/streamedlistobjects/StreamedListObjectsExample.java index 7a73bb68..1e9634d8 100644 --- a/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/streamedlistobjects/StreamedListObjectsExample.java +++ b/examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/streamedlistobjects/StreamedListObjectsExample.java @@ -138,8 +138,9 @@ private static int writeTuples(OpenFgaClient fgaClient, int quantity) throws Exc */ private static List streamedListObjects(OpenFgaClient fgaClient, ClientListObjectsRequest request) throws Exception { - var objectStream = fgaClient.streamedListObjects(request).get(); - return objectStream.map(StreamedListObjectsResponse::getObject).collect(Collectors.toList()); + try (var objectStream = fgaClient.streamedListObjects(request).get()) { + return objectStream.map(StreamedListObjectsResponse::getObject).collect(Collectors.toList()); + } } /** diff --git a/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java b/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java index 37a8e0a1..a6cc0ed7 100644 --- a/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java +++ b/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java @@ -1175,6 +1175,10 @@ public CompletableFuture> streamedListObject Stream stream = java.util.stream.StreamSupport.stream( ((Iterable) () -> iterator).spliterator(), false); return stream.onClose(() -> { + try { + iterator.close(); + } catch (java.io.IOException ignore) { + } try { srb.close(); } catch (java.io.IOException ignore) { diff --git a/src/main/java/dev/openfga/sdk/api/client/StreamedResponseIterator.java b/src/main/java/dev/openfga/sdk/api/client/StreamedResponseIterator.java index e7aac178..a6e83592 100644 --- a/src/main/java/dev/openfga/sdk/api/client/StreamedResponseIterator.java +++ b/src/main/java/dev/openfga/sdk/api/client/StreamedResponseIterator.java @@ -16,7 +16,7 @@ * If an error is encountered in the stream (either from parsing or from an error * response), it will be thrown as a StreamingException when hasNext() or next() is called. */ -public class StreamedResponseIterator implements Iterator { +public class StreamedResponseIterator implements Iterator, AutoCloseable { private final BufferedReader reader; private final ObjectMapper objectMapper; private StreamedListObjectsResponse nextItem; @@ -92,4 +92,9 @@ public StreamedListObjectsResponse next() { return current; } + + @Override + public void close() throws IOException { + reader.close(); + } } diff --git a/src/test-integration/java/dev/openfga/sdk/api/client/OpenFgaClientIntegrationTest.java b/src/test-integration/java/dev/openfga/sdk/api/client/OpenFgaClientIntegrationTest.java index 8ed6255d..5cfab345 100644 --- a/src/test-integration/java/dev/openfga/sdk/api/client/OpenFgaClientIntegrationTest.java +++ b/src/test-integration/java/dev/openfga/sdk/api/client/OpenFgaClientIntegrationTest.java @@ -328,6 +328,33 @@ public void write_and_listObjects() throws Exception { assertEquals(DEFAULT_DOC, response.getObjects().get(0)); } + @Test + public void write_and_streamedListObjects() throws Exception { + // Given + String storeName = thisTestName(); + String storeId = createStore(storeName); + fga.setStoreId(storeId); + String authModelId = writeAuthModel(storeId); + fga.setAuthorizationModelId(authModelId); + ClientWriteRequest writeRequest = new ClientWriteRequest().writes(List.of(DEFAULT_TUPLE_KEY)); + ClientListObjectsRequest listObjectsRequest = new ClientListObjectsRequest() + .user(DEFAULT_USER) + .relation("reader") + .type("document"); + + // When + fga.write(writeRequest).get(); + List objects; + try (var stream = fga.streamedListObjects(listObjectsRequest).get()) { + objects = stream.map(StreamedListObjectsResponse::getObject).collect(java.util.stream.Collectors.toList()); + } + + // Then + assertNotNull(objects); + assertEquals(1, objects.size()); + assertEquals(DEFAULT_DOC, objects.get(0)); + } + @Test public void write_readAssertions() throws Exception { // Given diff --git a/src/test/java/dev/openfga/sdk/api/client/OpenFgaClientTest.java b/src/test/java/dev/openfga/sdk/api/client/OpenFgaClientTest.java index 966fc904..1c2f1843 100644 --- a/src/test/java/dev/openfga/sdk/api/client/OpenFgaClientTest.java +++ b/src/test/java/dev/openfga/sdk/api/client/OpenFgaClientTest.java @@ -2610,10 +2610,11 @@ public void streamedListObjectsTest() throws Exception { .user(DEFAULT_USER); // When - Stream responseStream = - fga.streamedListObjects(request).get(); - List objects = - responseStream.map(StreamedListObjectsResponse::getObject).collect(Collectors.toList()); + List objects; + try (Stream responseStream = + fga.streamedListObjects(request).get()) { + objects = responseStream.map(StreamedListObjectsResponse::getObject).collect(Collectors.toList()); + } // Then mockHttpClient.verify().post(postPath).withBody(is(expectedBody)).called(1); @@ -2702,12 +2703,12 @@ public void streamedListObjects_errorInStream() throws Exception { .user(DEFAULT_USER); // When - Stream responseStream = - fga.streamedListObjects(request).get(); - // Then - should throw when processing the stream var exception = assertThrows(RuntimeException.class, () -> { - responseStream.map(StreamedListObjectsResponse::getObject).collect(Collectors.toList()); + try (Stream responseStream = + fga.streamedListObjects(request).get()) { + responseStream.map(StreamedListObjectsResponse::getObject).collect(Collectors.toList()); + } }); assertTrue(exception.getMessage().contains("Error in streaming response"));