From 012d641e06ca81c94385ede1b24a766d7adbe311 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Tue, 25 Jan 2022 15:11:56 -0600 Subject: [PATCH 1/7] [OAuth2] Enable configurable preemptive token refresh in Java Client --- .../oauth2/AuthenticationFactoryOAuth2.java | 31 +++++- .../auth/oauth2/AuthenticationOAuth2.java | 100 ++++++++++++++++-- .../auth/oauth2/AuthenticationOAuth2Test.java | 2 +- 3 files changed, 121 insertions(+), 12 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java index cf567747567e4..55177b655d095 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java @@ -60,6 +60,35 @@ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl .audience(audience) .scope(scope) .build(); - return new AuthenticationOAuth2(flow, Clock.systemDefaultZone()); + return new AuthenticationOAuth2(flow, Clock.systemDefaultZone(), 0.9); + } + + /** + * Authenticate with client credentials. + * + * @param issuerUrl the issuer URL + * @param credentialsUrl the credentials URL + * @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0. + * @param scope An optional field. The value of the scope parameter is expressed as a list of space-delimited, + * case-sensitive strings. The strings are defined by the authorization server. + * If the value contains multiple space-delimited strings, their order does not matter, + * and each string adds an additional access range to the requested scope. + * From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2 + * @param expiryAdjustment A field that represents how early to start attempting to refresh the access token. + * The value must be greater than 0. A value greater than or equal to 1 will turn off + * preemptive token retrieval. When less than 1, the value represents the percentage + * of the `expires_in` seconds field from the Access Token Response, and the resulting value + * will be used to schedule a refresh token task in the client. It defaults to 0.9. + * @return an Authentication object + */ + public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience, String scope, + double expiryAdjustment) { + ClientCredentialsFlow flow = ClientCredentialsFlow.builder() + .issuerUrl(issuerUrl) + .privateKey(credentialsUrl.toExternalForm()) + .audience(audience) + .scope(scope) + .build(); + return new AuthenticationOAuth2(flow, Clock.systemDefaultZone(), expiryAdjustment); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java index 022918d807d19..402dfd579a572 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java @@ -22,6 +22,8 @@ import java.time.Clock; import java.time.Instant; import java.util.Map; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.NotImplementedException; @@ -31,10 +33,22 @@ import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.AuthenticationUtil; +import org.apache.pulsar.client.impl.Backoff; +import org.apache.pulsar.client.impl.BackoffBuilder; import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult; /** * Pulsar client authentication provider based on OAuth 2.0. + * + * The class has an option to preemptively refresh the token by configuring the {@link #expiryAdjustment}. This value + * must be greater than 0. When it is less than 1, it is treated as a percentage and is multiplied by the most recent + * token's `expires_in` value to determine how early this class should start attempting to retrieve another token. When + * it is greater than or equal to 1, this feature is turned off, and the client will only refresh the token when the + * client attempts to use an already expired token. + * + * The current implementation of this class can block the calling thread. + * + * This class is intended to be called from multiple threads, and is therefore designed to be threadsafe. */ @Slf4j public class AuthenticationOAuth2 implements Authentication, EncodedAuthenticationParameterSupport { @@ -42,20 +56,37 @@ public class AuthenticationOAuth2 implements Authentication, EncodedAuthenticati public static final String CONFIG_PARAM_TYPE = "type"; public static final String TYPE_CLIENT_CREDENTIALS = "client_credentials"; public static final String AUTH_METHOD_NAME = "token"; - public static final double EXPIRY_ADJUSTMENT = 0.9; private static final long serialVersionUID = 1L; + private final transient ScheduledThreadPoolExecutor scheduler; + private final transient Backoff backoff; + private final double expiryAdjustment; final Clock clock; - Flow flow; - transient CachedToken cachedToken; + volatile Flow flow; + transient volatile CachedToken cachedToken; + + private AuthenticationOAuth2(Clock clock, double expiryAdjustment) { + if (expiryAdjustment <= 0) { + throw new IllegalArgumentException("ExpiryAdjustment must be greater than 0."); + } + this.clock = clock; + this.expiryAdjustment = expiryAdjustment; + boolean isPreemptiveTokenRefresh = expiryAdjustment < 1; + this.backoff = isPreemptiveTokenRefresh ? new BackoffBuilder() + .setInitialTime(100, TimeUnit.MILLISECONDS) + .setMax(5, TimeUnit.MINUTES) + .setMandatoryStop(0, TimeUnit.MILLISECONDS) + .create() : null; + this.scheduler = isPreemptiveTokenRefresh ? new ScheduledThreadPoolExecutor(1) : null; + } public AuthenticationOAuth2() { - this.clock = Clock.systemDefaultZone(); + this(Clock.systemDefaultZone(), 0.9); } - AuthenticationOAuth2(Flow flow, Clock clock) { + AuthenticationOAuth2(Flow flow, Clock clock, double expiryAdjustment) { + this(clock, expiryAdjustment); this.flow = flow; - this.clock = clock; } @Override @@ -96,21 +127,71 @@ public void start() throws PulsarClientException { flow.initialize(); } + /** + * The first time that this method is called, it retrieves a token. All subsequent + * calls should get a cached value. However, if there is an issue with the Identity + * Provider, there is a chance that the background thread responsible for keeping + * the refresh token hot will + * @return The authentication data identifying this client that will be sent to the broker + * @throws PulsarClientException + */ @Override public synchronized AuthenticationDataProvider getAuthData() throws PulsarClientException { if (this.cachedToken == null || this.cachedToken.isExpired()) { - TokenResult tr = this.flow.authenticate(); - this.cachedToken = new CachedToken(tr); + this.authenticate(); } return this.cachedToken.getAuthData(); } + /** + * Retrieve the token (synchronously), and then schedule refresh runnable. + */ + private void authenticate() throws PulsarClientException { + if (log.isDebugEnabled()) { + log.debug("Attempting to retrieve OAuth2 token now."); + } + TokenResult tr = this.flow.authenticate(); + this.cachedToken = new CachedToken(tr); + handleSuccessfulTokenRefresh(); + } + + private void handleSuccessfulTokenRefresh() { + if (scheduler != null) { + backoff.reset(); + long expiresInMillis = TimeUnit.SECONDS.toMillis(cachedToken.latest.getExpiresIn()); + scheduleRefresh((long) (expiresInMillis * expiryAdjustment)); + } + } + + /** + * Attempt to refresh the token. If successful, schedule the next refresh task according to the + * {@link #expiryAdjustment}. If failed, schedule another attempt to refresh the token according to the + * {@link #backoff} policy. + */ + private void refreshToken() { + try { + this.authenticate(); + } catch (Throwable e) { + long delayMillis = backoff.next(); + log.error("Error refreshing token. Will retry in {} millis.", delayMillis, e); + scheduleRefresh(delayMillis); + } + } + + private void scheduleRefresh(long delayMillis) { + scheduler.schedule(this::refreshToken, delayMillis, TimeUnit.MILLISECONDS); + } + @Override public void close() throws IOException { try { flow.close(); } catch (Exception e) { throw new IOException(e); + } finally { + if (scheduler != null) { + scheduler.shutdownNow(); + } } } @@ -122,8 +203,7 @@ class CachedToken { public CachedToken(TokenResult latest) { this.latest = latest; - int adjustedExpiresIn = (int) (latest.getExpiresIn() * EXPIRY_ADJUSTMENT); - this.expiresAt = AuthenticationOAuth2.this.clock.instant().plusSeconds(adjustedExpiresIn); + this.expiresAt = AuthenticationOAuth2.this.clock.instant().plusSeconds(latest.getExpiresIn()); this.authData = new AuthenticationDataOAuth2(latest.getAccessToken()); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java index 3ae578c34845c..080cee6a6df72 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java @@ -56,7 +56,7 @@ public class AuthenticationOAuth2Test { public void before() { this.clock = new MockClock(Instant.EPOCH, ZoneOffset.UTC); this.flow = mock(Flow.class); - this.auth = new AuthenticationOAuth2(flow, this.clock); + this.auth = new AuthenticationOAuth2(flow, this.clock, 0.9); } @Test From af7100ba6965371550844535a067fe9ce37788b7 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Fri, 28 Jan 2022 15:27:41 -0600 Subject: [PATCH 2/7] Use single thread; improve naming; improve thread safety; add docs --- .../oauth2/AuthenticationFactoryOAuth2.java | 25 ++-- .../auth/oauth2/AuthenticationOAuth2.java | 132 +++++++++++++----- 2 files changed, 106 insertions(+), 51 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java index 55177b655d095..53c6fc8449879 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java @@ -54,13 +54,8 @@ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl * @return an Authentication object */ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience, String scope) { - ClientCredentialsFlow flow = ClientCredentialsFlow.builder() - .issuerUrl(issuerUrl) - .privateKey(credentialsUrl.toExternalForm()) - .audience(audience) - .scope(scope) - .build(); - return new AuthenticationOAuth2(flow, Clock.systemDefaultZone(), 0.9); + ClientCredentialsFlow flow = buildFlow(issuerUrl, credentialsUrl, audience, scope); + return new AuthenticationOAuth2(flow, Clock.systemDefaultZone()); } /** @@ -75,20 +70,24 @@ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl * and each string adds an additional access range to the requested scope. * From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2 * @param expiryAdjustment A field that represents how early to start attempting to refresh the access token. - * The value must be greater than 0. A value greater than or equal to 1 will turn off - * preemptive token retrieval. When less than 1, the value represents the percentage - * of the `expires_in` seconds field from the Access Token Response, and the resulting value - * will be used to schedule a refresh token task in the client. It defaults to 0.9. + * See {@link AuthenticationOAuth2} for details. * @return an Authentication object */ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience, String scope, double expiryAdjustment) { - ClientCredentialsFlow flow = ClientCredentialsFlow.builder() + ClientCredentialsFlow flow = buildFlow(issuerUrl, credentialsUrl, audience, scope); + return new AuthenticationOAuth2(flow, Clock.systemDefaultZone(), expiryAdjustment); + } + + /** + * Internal method to build a {@link ClientCredentialsFlow}. + */ + private static ClientCredentialsFlow buildFlow(URL issuerUrl, URL credentialsUrl, String audience, String scope) { + return ClientCredentialsFlow.builder() .issuerUrl(issuerUrl) .privateKey(credentialsUrl.toExternalForm()) .audience(audience) .scope(scope) .build(); - return new AuthenticationOAuth2(flow, Clock.systemDefaultZone(), expiryAdjustment); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java index 402dfd579a572..8a4ea9d8b8e6c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java @@ -22,6 +22,7 @@ import java.time.Clock; import java.time.Instant; import java.util.Map; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import lombok.Data; @@ -40,55 +41,69 @@ /** * Pulsar client authentication provider based on OAuth 2.0. * - * The class has an option to preemptively refresh the token by configuring the {@link #expiryAdjustment}. This value - * must be greater than 0. When it is less than 1, it is treated as a percentage and is multiplied by the most recent - * token's `expires_in` value to determine how early this class should start attempting to retrieve another token. When - * it is greater than or equal to 1, this feature is turned off, and the client will only refresh the token when the - * client attempts to use an already expired token. + * The first call to {@link #getAuthData()} will result in a blocking network call to retrieve the OAuth2.0 token from + * the Identity Provider. After that, there are two behaviors, depending on {@link #earlyTokenRefreshPercent}: + * + * 1. If {@link #earlyTokenRefreshPercent} is less than 1, this authentication class will schedule a runnable to refresh + * the token in n seconds where n is the result of multiplying {@link #earlyTokenRefreshPercent} and the `expires_in` + * value returned by the Identity Provider. If the call to the Identity Provider fails, this class will retry attempting + * to refresh the token using an exponential backoff. If the token is not refreshed before it expires, the Pulsar client + * will make one final blocking call to the Identity Provider. If that call fails, this class will pass the failure to + * the Pulsar client. This proactive approach to token management is good for use cases that want to avoid latency + * spikes from calls to the Identity Provider and that want to be able to withstand short Identity Provider outages. The + * tradeoff is that this class consumes slightly more resources. + * + * 2. If {@link #earlyTokenRefreshPercent} is greater than or equal to 1, this class will not retrieve a new token until + * the {@link #getAuthData()} method is called while the cached token is expired. If the call to the Identity Provider + * fails, this class will pass the failure to the Pulsar client. This lazy approach is good for use cases that are not + * latency sensitive and that will not use the token frequently. + * + * {@link #earlyTokenRefreshPercent} must be greater than 0. It defaults to 1, which means that early token refresh is + * disabled by default. * * The current implementation of this class can block the calling thread. * - * This class is intended to be called from multiple threads, and is therefore designed to be threadsafe. + * This class is intended to be called from multiple threads, and is therefore designed to be thread-safe. */ @Slf4j public class AuthenticationOAuth2 implements Authentication, EncodedAuthenticationParameterSupport { public static final String CONFIG_PARAM_TYPE = "type"; public static final String TYPE_CLIENT_CREDENTIALS = "client_credentials"; + public static final String EARLY_TOKEN_REFRESH_PERCENT = "early_token_refresh_percent"; + public static final int EARLY_TOKEN_REFRESH_PERCENT_DEFAULT = 1; // feature disabled by default public static final String AUTH_METHOD_NAME = "token"; private static final long serialVersionUID = 1L; + private static final transient ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1); - private final transient ScheduledThreadPoolExecutor scheduler; - private final transient Backoff backoff; - private final double expiryAdjustment; + private volatile double earlyTokenRefreshPercent; final Clock clock; volatile Flow flow; - transient volatile CachedToken cachedToken; + private transient volatile CachedToken cachedToken; - private AuthenticationOAuth2(Clock clock, double expiryAdjustment) { - if (expiryAdjustment <= 0) { - throw new IllegalArgumentException("ExpiryAdjustment must be greater than 0."); - } - this.clock = clock; - this.expiryAdjustment = expiryAdjustment; - boolean isPreemptiveTokenRefresh = expiryAdjustment < 1; - this.backoff = isPreemptiveTokenRefresh ? new BackoffBuilder() - .setInitialTime(100, TimeUnit.MILLISECONDS) - .setMax(5, TimeUnit.MINUTES) - .setMandatoryStop(0, TimeUnit.MILLISECONDS) - .create() : null; - this.scheduler = isPreemptiveTokenRefresh ? new ScheduledThreadPoolExecutor(1) : null; - } + // Only ever updated on the single scheduler thread. Does not need to be volatile. + private transient Backoff backoff; + private transient ScheduledFuture nextRefreshAttempt; + // No args constructor used when creating class with reflection public AuthenticationOAuth2() { - this(Clock.systemDefaultZone(), 0.9); + this(null, Clock.systemDefaultZone()); } - AuthenticationOAuth2(Flow flow, Clock clock, double expiryAdjustment) { - this(clock, expiryAdjustment); + AuthenticationOAuth2(Flow flow, Clock clock) { + this(flow, clock, EARLY_TOKEN_REFRESH_PERCENT_DEFAULT); + } + + AuthenticationOAuth2(Flow flow, Clock clock, double earlyTokenRefreshPercent) { + this(clock, earlyTokenRefreshPercent); this.flow = flow; } + private AuthenticationOAuth2(Clock clock, double earlyRefreshPercent) { + this.clock = clock; + setEarlyTokenRefreshPercent(earlyRefreshPercent); + } + @Override public String getAuthMethodName() { return AUTH_METHOD_NAME; @@ -106,6 +121,9 @@ public void configure(String encodedAuthParamString) { throw new IllegalArgumentException("Malformed authentication parameters", e); } + setEarlyTokenRefreshPercent(Double.parseDouble(params.getOrDefault(EARLY_TOKEN_REFRESH_PERCENT, + Integer.toString(EARLY_TOKEN_REFRESH_PERCENT_DEFAULT)))); + String type = params.getOrDefault(CONFIG_PARAM_TYPE, TYPE_CLIENT_CREDENTIALS); switch(type) { case TYPE_CLIENT_CREDENTIALS: @@ -155,31 +173,71 @@ private void authenticate() throws PulsarClientException { handleSuccessfulTokenRefresh(); } + /** + * When we successfully get a token, we need to schedule the next attempt to refresh it. + * This is done completely based on the "expires_in" value returned by the identity provider. + * The code is run on the single scheduler thread in order to ensure that the backoff is updated correctly. + */ private void handleSuccessfulTokenRefresh() { - if (scheduler != null) { - backoff.reset(); - long expiresInMillis = TimeUnit.SECONDS.toMillis(cachedToken.latest.getExpiresIn()); - scheduleRefresh((long) (expiresInMillis * expiryAdjustment)); - } + scheduler.execute(() -> { + if (earlyTokenRefreshPercent < 1) { + backoff = buildBackoff(cachedToken.latest.getExpiresIn()); + long expiresInMillis = TimeUnit.SECONDS.toMillis(cachedToken.latest.getExpiresIn()); + scheduleRefresh((long) (expiresInMillis * earlyTokenRefreshPercent)); + } + }); } /** * Attempt to refresh the token. If successful, schedule the next refresh task according to the - * {@link #expiryAdjustment}. If failed, schedule another attempt to refresh the token according to the + * {@link #earlyTokenRefreshPercent}. If failed, schedule another attempt to refresh the token according to the * {@link #backoff} policy. */ private void refreshToken() { try { this.authenticate(); - } catch (Throwable e) { + } catch (PulsarClientException | RuntimeException e) { long delayMillis = backoff.next(); log.error("Error refreshing token. Will retry in {} millis.", delayMillis, e); scheduleRefresh(delayMillis); } } + /** + * Schedule the task to refresh the token. + * NOTE: this method must be run on the {@link #scheduler} thread in order to ensure {@link #nextRefreshAttempt} + * is accessed and updated safely. + * @param delayMillis the time, in milliseconds, to wait before starting to attempt to refresh the token. + */ private void scheduleRefresh(long delayMillis) { - scheduler.schedule(this::refreshToken, delayMillis, TimeUnit.MILLISECONDS); + nextRefreshAttempt = scheduler.schedule(this::refreshToken, delayMillis, TimeUnit.MILLISECONDS); + } + + /** + * Cancel the all subsequent refresh attempts by canceling the next token refresh attempt. By running this command + * on the single {@link #scheduler} thread, we remove the chance for a race condition that could allow a currently + * executing refresh attempt to schedule another refresh attempt. + */ + private void cancelTokenRefresh() { + scheduler.execute(() -> { + nextRefreshAttempt.cancel(false); + }); + } + + private void setEarlyTokenRefreshPercent(double earlyRefreshPercent) { + if (earlyRefreshPercent <= 0) { + throw new IllegalArgumentException("ExpiryAdjustment must be greater than 0."); + } + this.earlyTokenRefreshPercent = earlyRefreshPercent; + } + + private Backoff buildBackoff(int expiresInSeconds) { + return new BackoffBuilder() + .setInitialTime(1, TimeUnit.SECONDS) + .setMax(10, TimeUnit.MINUTES) + // Attempt a final token refresh attempt 2 seconds before the token actually expires, if necessary. + .setMandatoryStop(expiresInSeconds - 2, TimeUnit.SECONDS) + .create(); } @Override @@ -189,9 +247,7 @@ public void close() throws IOException { } catch (Exception e) { throw new IOException(e); } finally { - if (scheduler != null) { - scheduler.shutdownNow(); - } + cancelTokenRefresh(); } } From c63c37ce4b0bdb47ee735b8602411c3c0cc1cdc9 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Fri, 28 Jan 2022 15:47:13 -0600 Subject: [PATCH 3/7] Add isClosed logic to authentication class --- .../client/impl/auth/oauth2/AuthenticationOAuth2.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java index 8a4ea9d8b8e6c..769ce243108e3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java @@ -81,6 +81,9 @@ public class AuthenticationOAuth2 implements Authentication, EncodedAuthenticati volatile Flow flow; private transient volatile CachedToken cachedToken; + // Only ever updated in synchronized block on class. + private boolean isClosed = false; + // Only ever updated on the single scheduler thread. Does not need to be volatile. private transient Backoff backoff; private transient ScheduledFuture nextRefreshAttempt; @@ -155,6 +158,9 @@ public void start() throws PulsarClientException { */ @Override public synchronized AuthenticationDataProvider getAuthData() throws PulsarClientException { + if (isClosed) { + throw new PulsarClientException.AlreadyClosedException("Authentication already closed."); + } if (this.cachedToken == null || this.cachedToken.isExpired()) { this.authenticate(); } @@ -241,8 +247,9 @@ private Backoff buildBackoff(int expiresInSeconds) { } @Override - public void close() throws IOException { + public synchronized void close() throws IOException { try { + isClosed = true; flow.close(); } catch (Exception e) { throw new IOException(e); From cbd1a4f21477a566631a999a549d77fb11d6d33f Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Fri, 28 Jan 2022 15:49:34 -0600 Subject: [PATCH 4/7] Avoid NPE when early refresh not enabled --- .../pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java index 769ce243108e3..f0108dfea68a1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java @@ -226,7 +226,9 @@ private void scheduleRefresh(long delayMillis) { */ private void cancelTokenRefresh() { scheduler.execute(() -> { - nextRefreshAttempt.cancel(false); + if (nextRefreshAttempt != null) { + nextRefreshAttempt.cancel(false); + } }); } From bc57b7ac263274708aa5e9d229528b8c9577995f Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Fri, 4 Feb 2022 00:16:14 -0600 Subject: [PATCH 5/7] Minor improvements to javadocs and threadsafety; add tests --- .../oauth2/AuthenticationFactoryOAuth2.java | 8 +-- .../auth/oauth2/AuthenticationOAuth2.java | 26 ++++++--- .../auth/oauth2/AuthenticationOAuth2Test.java | 54 +++++++++++++++++-- 3 files changed, 72 insertions(+), 16 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java index 53c6fc8449879..4e5a73580fd80 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java @@ -69,14 +69,14 @@ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl * If the value contains multiple space-delimited strings, their order does not matter, * and each string adds an additional access range to the requested scope. * From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2 - * @param expiryAdjustment A field that represents how early to start attempting to refresh the access token. - * See {@link AuthenticationOAuth2} for details. + * @param earlyTokenRefreshPercent A field that represents how early to start attempting to refresh the access + * token. See {@link AuthenticationOAuth2} for details. * @return an Authentication object */ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience, String scope, - double expiryAdjustment) { + double earlyTokenRefreshPercent) { ClientCredentialsFlow flow = buildFlow(issuerUrl, credentialsUrl, audience, scope); - return new AuthenticationOAuth2(flow, Clock.systemDefaultZone(), expiryAdjustment); + return new AuthenticationOAuth2(flow, Clock.systemDefaultZone(), earlyTokenRefreshPercent); } /** diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java index f0108dfea68a1..c487bea161350 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java @@ -76,7 +76,6 @@ public class AuthenticationOAuth2 implements Authentication, EncodedAuthenticati private static final long serialVersionUID = 1L; private static final transient ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1); - private volatile double earlyTokenRefreshPercent; final Clock clock; volatile Flow flow; private transient volatile CachedToken cachedToken; @@ -84,7 +83,8 @@ public class AuthenticationOAuth2 implements Authentication, EncodedAuthenticati // Only ever updated in synchronized block on class. private boolean isClosed = false; - // Only ever updated on the single scheduler thread. Does not need to be volatile. + // Only ever updated on the single scheduler thread. Do not need to be volatile. + private double earlyTokenRefreshPercent; private transient Backoff backoff; private transient ScheduledFuture nextRefreshAttempt; @@ -182,7 +182,8 @@ private void authenticate() throws PulsarClientException { /** * When we successfully get a token, we need to schedule the next attempt to refresh it. * This is done completely based on the "expires_in" value returned by the identity provider. - * The code is run on the single scheduler thread in order to ensure that the backoff is updated correctly. + * The code is run on the single scheduler thread in order to ensure that the backoff and the nextRefreshAttempt are + * updated safely. */ private void handleSuccessfulTokenRefresh() { scheduler.execute(() -> { @@ -232,11 +233,18 @@ private void cancelTokenRefresh() { }); } - private void setEarlyTokenRefreshPercent(double earlyRefreshPercent) { - if (earlyRefreshPercent <= 0) { - throw new IllegalArgumentException("ExpiryAdjustment must be greater than 0."); + /** + * Update the {@link #earlyTokenRefreshPercent}. By running this command on the single {@link #scheduler} thread, + * we remove a potential data race for updating {@link #earlyTokenRefreshPercent}. + * @param earlyTokenRefreshPercent - see javadoc for {@link AuthenticationOAuth2}. Must be greater than 0. + */ + private void setEarlyTokenRefreshPercent(double earlyTokenRefreshPercent) { + if (earlyTokenRefreshPercent <= 0) { + throw new IllegalArgumentException("EarlyTokenRefreshPercent must be greater than 0."); } - this.earlyTokenRefreshPercent = earlyRefreshPercent; + scheduler.execute(() -> { + this.earlyTokenRefreshPercent = earlyTokenRefreshPercent; + }); } private Backoff buildBackoff(int expiresInSeconds) { @@ -252,7 +260,9 @@ private Backoff buildBackoff(int expiresInSeconds) { public synchronized void close() throws IOException { try { isClosed = true; - flow.close(); + if (flow != null) { + flow.close(); + } } catch (Exception e) { throw new IOException(e); } finally { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java index 080cee6a6df72..d13837b541f40 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java @@ -18,12 +18,14 @@ */ package org.apache.pulsar.client.impl.auth.oauth2; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertThrows; import com.fasterxml.jackson.databind.ObjectMapper; @@ -35,7 +37,10 @@ import java.time.ZoneOffset; import java.util.HashMap; import java.util.Map; + +import lombok.Cleanup; import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver; import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult; import org.testng.annotations.BeforeMethod; @@ -56,7 +61,7 @@ public class AuthenticationOAuth2Test { public void before() { this.clock = new MockClock(Instant.EPOCH, ZoneOffset.UTC); this.flow = mock(Flow.class); - this.auth = new AuthenticationOAuth2(flow, this.clock, 0.9); + this.auth = new AuthenticationOAuth2(flow, this.clock, 1); } @Test @@ -112,7 +117,7 @@ public void testStart() throws Exception { } @Test - public void testGetAuthData() throws Exception { + public void testGetAuthDataNoEarlyRefresh() throws Exception { AuthenticationDataProvider data; TokenResult tr = TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(TEST_EXPIRES_IN).build(); doReturn(tr).when(this.flow).authenticate(); @@ -125,13 +130,53 @@ public void testGetAuthData() throws Exception { verify(this.flow, times(1)).authenticate(); assertEquals(data.getCommandData(), tr.getAccessToken()); - // cache miss - clock.advance(Duration.ofSeconds(TEST_EXPIRES_IN)); + // cache miss (have to move passed expiration b/c we refresh when token is expired now) + // NOTE: this works because the token uses the mocked clock. + clock.advance(Duration.ofSeconds(TEST_EXPIRES_IN + 1)); data = this.auth.getAuthData(); verify(this.flow, times(2)).authenticate(); assertEquals(data.getCommandData(), tr.getAccessToken()); } + // This test skips the early refresh logic and just ensures that if the class were to somehow fail + // to refresh the token before expiration, the caller will get one final attempt at calling authenticate + @Test + public void testGetAuthDataWithEarlyRefresh() throws Exception { + @Cleanup AuthenticationOAuth2 auth = new AuthenticationOAuth2(flow, this.clock, 0.8); + AuthenticationDataProvider data; + TokenResult tr = TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(TEST_EXPIRES_IN).build(); + doReturn(tr).when(this.flow).authenticate(); + data = auth.getAuthData(); + verify(this.flow, times(1)).authenticate(); + assertEquals(data.getCommandData(), tr.getAccessToken()); + + // cache hit + data = auth.getAuthData(); + verify(this.flow, times(1)).authenticate(); + assertEquals(data.getCommandData(), tr.getAccessToken()); + + // cache miss (have to move passed expiration b/c we refresh when token is expired now) + clock.advance(Duration.ofSeconds(TEST_EXPIRES_IN + 1)); + data = auth.getAuthData(); + verify(this.flow, times(2)).authenticate(); + assertEquals(data.getCommandData(), tr.getAccessToken()); + } + + // This test ensures that the early token refresh actually calls the authenticate method in the background. + @Test + public void testEarlyTokenRefreshCallsAuthenticate() throws Exception { + @Cleanup AuthenticationOAuth2 auth = new AuthenticationOAuth2(flow, this.clock, 0.1); + TokenResult tr = TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(1).build(); + doReturn(tr).when(this.flow).authenticate(); + // Initialize the flow + auth.getAuthData(); + // Give the auth token refresh a chance to run multiple times + Thread.sleep(1000); + auth.close(); + verify(this.flow, atLeast(2)).authenticate(); + verify(this.flow).close(); + } + @Test public void testMetadataResolver() throws MalformedURLException { URL url = DefaultMetadataResolver.getWellKnownMetadataUrl(URI.create("http://localhost/path/oauth").toURL()); @@ -142,5 +187,6 @@ public void testMetadataResolver() throws MalformedURLException { public void testClose() throws Exception { this.auth.close(); verify(this.flow).close(); + assertThrows(PulsarClientException.AlreadyClosedException.class, () -> this.auth.getAuthData()); } } From 087a8fd42f02327c3f5f154378ccf6265a605651 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Fri, 4 Feb 2022 00:24:59 -0600 Subject: [PATCH 6/7] Update client documentation --- site2/docs/security-oauth2.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/site2/docs/security-oauth2.md b/site2/docs/security-oauth2.md index cbb4f9e1eaa52..ec2fa7164924d 100644 --- a/site2/docs/security-oauth2.md +++ b/site2/docs/security-oauth2.md @@ -97,6 +97,11 @@ PulsarClient client = PulsarClient.builder() .build(); ``` +In the Java Client, you can also specify a configuration named `earlyTokenRefreshPercent` via the `AuthenticationFactoryOAuth2` +and the encoded parameters. The `earlyTokenRefreshPercent` must be greater than 0. If it is less than 1, the Java Client +will start attempting to retrieve a new token when the `earlyTokenRefreshPercent` percent of the `expires_in` value has +passed. If the value is greater than or equal to 1, the Java Client will not refresh the token until it has expired. + ### C++ client The C++ client is similar to the Java client. You need to provide parameters of `issuerUrl`, `private_key` (the credentials file path), and `audience`. From 2d15f2159497bfc2ad4bb1fe25ee1efad96e9f4f Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Sat, 5 Feb 2022 00:11:16 -0600 Subject: [PATCH 7/7] User builders; prevent thread leak --- ...uth2AuthenticatedProducerConsumerTest.java | 17 ++- .../oauth2/AuthenticationFactoryOAuth2.java | 42 ++----- .../auth/oauth2/AuthenticationOAuth2.java | 112 +++++++++--------- .../oauth2/AuthenticationOAuth2Builder.java | 88 ++++++++++++++ .../ClientCredentialsConfiguration.java | 55 +++++++++ .../auth/oauth2/ClientCredentialsFlow.java | 22 +++- .../auth/oauth2/AuthenticationOAuth2Test.java | 26 +++- 7 files changed, 261 insertions(+), 101 deletions(-) create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Builder.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsConfiguration.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java index 5d2e473d58267..73b593f6f20f1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java @@ -34,7 +34,8 @@ import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.impl.ProducerImpl; -import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2; +import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2Builder; +import org.apache.pulsar.client.impl.auth.oauth2.ClientCredentialsConfiguration; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.awaitility.Awaitility; @@ -92,12 +93,16 @@ protected final void clientSetup() throws Exception { Path path = Paths.get(CREDENTIALS_FILE).toAbsolutePath(); log.info("Credentials File path: {}", path.toString()); + ClientCredentialsConfiguration config = ClientCredentialsConfiguration.builder() + .issuerUrl(new URL("https://dev-kt-aa9ne.us.auth0.com")) + .keyFileUrl(path.toUri().toURL()) + .scope("https://dev-kt-aa9ne.us.auth0.com/api/v2/") + .build(); + // AuthenticationOAuth2 - Authentication authentication = AuthenticationFactoryOAuth2.clientCredentials( - new URL("https://dev-kt-aa9ne.us.auth0.com"), - path.toUri().toURL(), // key file path - "https://dev-kt-aa9ne.us.auth0.com/api/v2/" - ); + Authentication authentication = AuthenticationOAuth2Builder.builder() + .setClientCredentialsConfiguration(config) + .build(); admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) .authentication(authentication) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java index 4e5a73580fd80..534f1ec98f14c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java @@ -19,13 +19,15 @@ package org.apache.pulsar.client.impl.auth.oauth2; import java.net.URL; -import java.time.Clock; import org.apache.pulsar.client.api.Authentication; /** * Factory class that allows to create {@link Authentication} instances * for OAuth 2.0 authentication methods. + * + * Use {@link AuthenticationOAuth2Builder}. */ +@Deprecated public final class AuthenticationFactoryOAuth2 { /** @@ -35,7 +37,9 @@ public final class AuthenticationFactoryOAuth2 { * @param credentialsUrl the credentials URL * @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0. * @return an Authentication object + * @deprecated use {@link AuthenticationOAuth2Builder}, instead. */ + @Deprecated public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience) { return clientCredentials(issuerUrl, credentialsUrl, audience, null); } @@ -52,42 +56,16 @@ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl * and each string adds an additional access range to the requested scope. * From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2 * @return an Authentication object + * @deprecated use {@link AuthenticationOAuth2Builder}, instead. */ + @Deprecated public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience, String scope) { - ClientCredentialsFlow flow = buildFlow(issuerUrl, credentialsUrl, audience, scope); - return new AuthenticationOAuth2(flow, Clock.systemDefaultZone()); - } - - /** - * Authenticate with client credentials. - * - * @param issuerUrl the issuer URL - * @param credentialsUrl the credentials URL - * @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0. - * @param scope An optional field. The value of the scope parameter is expressed as a list of space-delimited, - * case-sensitive strings. The strings are defined by the authorization server. - * If the value contains multiple space-delimited strings, their order does not matter, - * and each string adds an additional access range to the requested scope. - * From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2 - * @param earlyTokenRefreshPercent A field that represents how early to start attempting to refresh the access - * token. See {@link AuthenticationOAuth2} for details. - * @return an Authentication object - */ - public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience, String scope, - double earlyTokenRefreshPercent) { - ClientCredentialsFlow flow = buildFlow(issuerUrl, credentialsUrl, audience, scope); - return new AuthenticationOAuth2(flow, Clock.systemDefaultZone(), earlyTokenRefreshPercent); - } - - /** - * Internal method to build a {@link ClientCredentialsFlow}. - */ - private static ClientCredentialsFlow buildFlow(URL issuerUrl, URL credentialsUrl, String audience, String scope) { - return ClientCredentialsFlow.builder() + ClientCredentialsConfiguration config = ClientCredentialsConfiguration.builder() .issuerUrl(issuerUrl) - .privateKey(credentialsUrl.toExternalForm()) + .keyFileUrl(credentialsUrl) .audience(audience) .scope(scope) .build(); + return new AuthenticationOAuth2Builder().setClientCredentialsConfiguration(config).build(); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java index c487bea161350..a16b31ac9a7c9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java @@ -70,11 +70,12 @@ public class AuthenticationOAuth2 implements Authentication, EncodedAuthenticati public static final String CONFIG_PARAM_TYPE = "type"; public static final String TYPE_CLIENT_CREDENTIALS = "client_credentials"; - public static final String EARLY_TOKEN_REFRESH_PERCENT = "early_token_refresh_percent"; public static final int EARLY_TOKEN_REFRESH_PERCENT_DEFAULT = 1; // feature disabled by default public static final String AUTH_METHOD_NAME = "token"; private static final long serialVersionUID = 1L; - private static final transient ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1); + private final transient ScheduledThreadPoolExecutor scheduler; + private final boolean createdScheduler; + private final double earlyTokenRefreshPercent; final Clock clock; volatile Flow flow; @@ -84,27 +85,49 @@ public class AuthenticationOAuth2 implements Authentication, EncodedAuthenticati private boolean isClosed = false; // Only ever updated on the single scheduler thread. Do not need to be volatile. - private double earlyTokenRefreshPercent; private transient Backoff backoff; private transient ScheduledFuture nextRefreshAttempt; // No args constructor used when creating class with reflection public AuthenticationOAuth2() { - this(null, Clock.systemDefaultZone()); + this(Clock.systemDefaultZone(), EARLY_TOKEN_REFRESH_PERCENT_DEFAULT, null); } - AuthenticationOAuth2(Flow flow, Clock clock) { - this(flow, clock, EARLY_TOKEN_REFRESH_PERCENT_DEFAULT); + AuthenticationOAuth2(Flow flow, + double earlyTokenRefreshPercent, + ScheduledThreadPoolExecutor scheduler) { + this(flow, Clock.systemDefaultZone(), earlyTokenRefreshPercent, scheduler); } - AuthenticationOAuth2(Flow flow, Clock clock, double earlyTokenRefreshPercent) { - this(clock, earlyTokenRefreshPercent); + AuthenticationOAuth2(Flow flow, + Clock clock, + double earlyTokenRefreshPercent, + ScheduledThreadPoolExecutor scheduler) { + this(clock, earlyTokenRefreshPercent, scheduler); this.flow = flow; } - private AuthenticationOAuth2(Clock clock, double earlyRefreshPercent) { + /** + * @param clock - clock to use when determining token expiration. + * @param earlyTokenRefreshPercent - see javadoc for {@link AuthenticationOAuth2}. Must be greater than 0. + * @param scheduler - The scheduler to use for background refreshes of the token. If null and the + * {@link #earlyTokenRefreshPercent} is less than 1, the client will create an internal scheduler. + * Otherwise, it will use the passed in scheduler. If the caller supplies a scheduler, the + * {@link AuthenticationOAuth2} will not close it. + */ + private AuthenticationOAuth2(Clock clock, double earlyTokenRefreshPercent, ScheduledThreadPoolExecutor scheduler) { + if (earlyTokenRefreshPercent <= 0) { + throw new IllegalArgumentException("EarlyTokenRefreshPercent must be greater than 0."); + } + this.earlyTokenRefreshPercent = earlyTokenRefreshPercent; this.clock = clock; - setEarlyTokenRefreshPercent(earlyRefreshPercent); + if (scheduler == null && earlyTokenRefreshPercent < 1) { + this.scheduler = new ScheduledThreadPoolExecutor(1); + this.createdScheduler = true; + } else { + this.scheduler = scheduler; + this.createdScheduler = false; + } } @Override @@ -124,16 +147,11 @@ public void configure(String encodedAuthParamString) { throw new IllegalArgumentException("Malformed authentication parameters", e); } - setEarlyTokenRefreshPercent(Double.parseDouble(params.getOrDefault(EARLY_TOKEN_REFRESH_PERCENT, - Integer.toString(EARLY_TOKEN_REFRESH_PERCENT_DEFAULT)))); - String type = params.getOrDefault(CONFIG_PARAM_TYPE, TYPE_CLIENT_CREDENTIALS); - switch(type) { - case TYPE_CLIENT_CREDENTIALS: - this.flow = ClientCredentialsFlow.fromParameters(params); - break; - default: - throw new IllegalArgumentException("Unsupported authentication type: " + type); + if (TYPE_CLIENT_CREDENTIALS.equals(type)) { + this.flow = ClientCredentialsFlow.fromParameters(params); + } else { + throw new IllegalArgumentException("Unsupported authentication type: " + type); } } @@ -186,13 +204,15 @@ private void authenticate() throws PulsarClientException { * updated safely. */ private void handleSuccessfulTokenRefresh() { - scheduler.execute(() -> { - if (earlyTokenRefreshPercent < 1) { - backoff = buildBackoff(cachedToken.latest.getExpiresIn()); - long expiresInMillis = TimeUnit.SECONDS.toMillis(cachedToken.latest.getExpiresIn()); - scheduleRefresh((long) (expiresInMillis * earlyTokenRefreshPercent)); - } - }); + if (scheduler != null) { + scheduler.execute(() -> { + if (earlyTokenRefreshPercent < 1) { + backoff = buildBackoff(cachedToken.latest.getExpiresIn()); + long expiresInMillis = TimeUnit.SECONDS.toMillis(cachedToken.latest.getExpiresIn()); + scheduleRefresh((long) (expiresInMillis * earlyTokenRefreshPercent)); + } + }); + } } /** @@ -220,33 +240,6 @@ private void scheduleRefresh(long delayMillis) { nextRefreshAttempt = scheduler.schedule(this::refreshToken, delayMillis, TimeUnit.MILLISECONDS); } - /** - * Cancel the all subsequent refresh attempts by canceling the next token refresh attempt. By running this command - * on the single {@link #scheduler} thread, we remove the chance for a race condition that could allow a currently - * executing refresh attempt to schedule another refresh attempt. - */ - private void cancelTokenRefresh() { - scheduler.execute(() -> { - if (nextRefreshAttempt != null) { - nextRefreshAttempt.cancel(false); - } - }); - } - - /** - * Update the {@link #earlyTokenRefreshPercent}. By running this command on the single {@link #scheduler} thread, - * we remove a potential data race for updating {@link #earlyTokenRefreshPercent}. - * @param earlyTokenRefreshPercent - see javadoc for {@link AuthenticationOAuth2}. Must be greater than 0. - */ - private void setEarlyTokenRefreshPercent(double earlyTokenRefreshPercent) { - if (earlyTokenRefreshPercent <= 0) { - throw new IllegalArgumentException("EarlyTokenRefreshPercent must be greater than 0."); - } - scheduler.execute(() -> { - this.earlyTokenRefreshPercent = earlyTokenRefreshPercent; - }); - } - private Backoff buildBackoff(int expiresInSeconds) { return new BackoffBuilder() .setInitialTime(1, TimeUnit.SECONDS) @@ -266,7 +259,18 @@ public synchronized void close() throws IOException { } catch (Exception e) { throw new IOException(e); } finally { - cancelTokenRefresh(); + if (createdScheduler) { + this.scheduler.shutdownNow(); + } else if (scheduler != null) { + // Cancel the all subsequent refresh attempts by canceling the next token refresh attempt. By running + // this command on the single scheduler thread, we remove the chance for a race condition that could + // allow a currently executing refresh attempt to schedule another refresh attempt. + scheduler.execute(() -> { + if (nextRefreshAttempt != null) { + nextRefreshAttempt.cancel(false); + } + }); + } } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Builder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Builder.java new file mode 100644 index 0000000000000..92c47b86eca91 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Builder.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.oauth2; + +import java.util.concurrent.ScheduledThreadPoolExecutor; +import org.apache.pulsar.client.api.Authentication; + +/** + * Builder for {@link AuthenticationOAuth2} class. + */ +public final class AuthenticationOAuth2Builder { + + /** + * Create a builder instance. + * + * @return builder + */ + public static AuthenticationOAuth2Builder builder() { + return new AuthenticationOAuth2Builder(); + } + + private double earlyTokenRefreshPercent = AuthenticationOAuth2.EARLY_TOKEN_REFRESH_PERCENT_DEFAULT; + private ScheduledThreadPoolExecutor scheduler; + private ClientCredentialsConfiguration clientCredentialsConfiguration; + + /** + * Set the {@link ClientCredentialsConfiguration} when using the OAuth2 client credentials flow. + * @return builder + */ + public AuthenticationOAuth2Builder setClientCredentialsConfiguration( + ClientCredentialsConfiguration clientCredentialsConfiguration) { + if (clientCredentialsConfiguration == null) { + throw new IllegalArgumentException("ClientCredentialsConfiguration cannot be null."); + } + this.clientCredentialsConfiguration = clientCredentialsConfiguration; + return this; + } + + /** + * @param earlyTokenRefreshPercent - The percent of the expires_in time when the client should start attempting + * to refresh the token. If greater than or equal to 1, it is disabled. See + * {@link AuthenticationOAuth2} for details. + * @return builder + */ + public AuthenticationOAuth2Builder setEarlyTokenRefreshPercent(double earlyTokenRefreshPercent) { + if (earlyTokenRefreshPercent <= 0) { + throw new IllegalArgumentException("EarlyTokenRefreshPercent must be greater than 0."); + } + this.earlyTokenRefreshPercent = earlyTokenRefreshPercent; + return this; + } + + /** + * @param scheduler - The scheduler to use for background refreshes of the token. If null and the + * {@link #earlyTokenRefreshPercent} is less than 1, the client will create an internal scheduler. + * Otherwise, it will use the passed in scheduler. If the caller supplies a scheduler, the + * {@link AuthenticationOAuth2} will not close it. + * @return builder + */ + public AuthenticationOAuth2Builder setEarlyTokenRefreshExecutor(ScheduledThreadPoolExecutor scheduler) { + this.scheduler = scheduler; + return this; + } + + public Authentication build() { + if (clientCredentialsConfiguration == null) { + throw new IllegalArgumentException("ClientCredentialsConfiguration must be set."); + } + Flow flow = new ClientCredentialsFlow(clientCredentialsConfiguration); + return new AuthenticationOAuth2(flow, earlyTokenRefreshPercent, scheduler); + } +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsConfiguration.java new file mode 100644 index 0000000000000..4da00fcb9beea --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsConfiguration.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.oauth2; + +import java.net.URL; +import lombok.Builder; +import lombok.Getter; + +/** + * OAuth 2.0 Client Credentials flow data used to retrieve access token. + */ +@Builder +@Getter +public final class ClientCredentialsConfiguration { + /** + * Identity Provider's token issuer URL. + */ + private final URL issuerUrl; + + /** + * Optional. The audience identifier used by some Identity Providers, like Auth0. + */ + private final String audience; + + /** + * The {@link KeyFile} URL used to retrieve the client credentials. See {@link KeyFile} for file + * format. This field was previously called the credentialsUrl. + */ + private final URL keyFileUrl; + + /** + * Optional. The value of the scope parameter is expressed as a list of space-delimited, + * case-sensitive strings. The strings are defined by the authorization server. + * If the value contains multiple space-delimited strings, their order does not matter, + * and each string adds an additional access range to the requested scope. + * Documented here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2 + */ + private final String scope; +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java index 987d7ec53f811..e35256a7462c0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java @@ -46,27 +46,39 @@ class ClientCredentialsFlow extends FlowBase { public static final String CONFIG_PARAM_ISSUER_URL = "issuerUrl"; public static final String CONFIG_PARAM_AUDIENCE = "audience"; + // Maps to the keyFileUrl public static final String CONFIG_PARAM_KEY_FILE = "privateKey"; public static final String CONFIG_PARAM_SCOPE = "scope"; private static final long serialVersionUID = 1L; private final String audience; - private final String privateKey; + private final String keyFileUrl; private final String scope; private transient ClientCredentialsExchanger exchanger; private boolean initialized = false; + /** + * See {@link ClientCredentialsConfiguration} for field documentation. + */ @Builder - public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey, String scope) { + public ClientCredentialsFlow(URL issuerUrl, String audience, String keyFileUrl, String scope) { super(issuerUrl); this.audience = audience; - this.privateKey = privateKey; + this.keyFileUrl = keyFileUrl; this.scope = scope; } + /** + * Constructor. + * @param config - see {@link ClientCredentialsConfiguration}. + */ + public ClientCredentialsFlow(ClientCredentialsConfiguration config) { + this(config.getIssuerUrl(), config.getAudience(), config.getKeyFileUrl().toExternalForm(), config.getScope()); + } + @Override public void initialize() throws PulsarClientException { super.initialize(); @@ -81,7 +93,7 @@ public TokenResult authenticate() throws PulsarClientException { // read the private key from storage KeyFile keyFile; try { - keyFile = loadPrivateKey(this.privateKey); + keyFile = loadPrivateKey(this.keyFileUrl); } catch (IOException e) { throw new PulsarClientException.AuthenticationException("Unable to read private key: " + e.getMessage()); } @@ -126,7 +138,7 @@ public static ClientCredentialsFlow fromParameters(Map params) { return ClientCredentialsFlow.builder() .issuerUrl(issuerUrl) .audience(audience) - .privateKey(privateKeyUrl) + .keyFileUrl(privateKeyUrl) .scope(scope) .build(); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java index d13837b541f40..e03febc69bd99 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl.auth.oauth2; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -37,7 +38,7 @@ import java.time.ZoneOffset; import java.util.HashMap; import java.util.Map; - +import java.util.concurrent.ScheduledThreadPoolExecutor; import lombok.Cleanup; import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.api.PulsarClientException; @@ -61,7 +62,7 @@ public class AuthenticationOAuth2Test { public void before() { this.clock = new MockClock(Instant.EPOCH, ZoneOffset.UTC); this.flow = mock(Flow.class); - this.auth = new AuthenticationOAuth2(flow, this.clock, 1); + this.auth = new AuthenticationOAuth2(flow, this.clock, 1, null); } @Test @@ -142,7 +143,7 @@ public void testGetAuthDataNoEarlyRefresh() throws Exception { // to refresh the token before expiration, the caller will get one final attempt at calling authenticate @Test public void testGetAuthDataWithEarlyRefresh() throws Exception { - @Cleanup AuthenticationOAuth2 auth = new AuthenticationOAuth2(flow, this.clock, 0.8); + @Cleanup AuthenticationOAuth2 auth = new AuthenticationOAuth2(flow, this.clock, 0.8, null); AuthenticationDataProvider data; TokenResult tr = TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(TEST_EXPIRES_IN).build(); doReturn(tr).when(this.flow).authenticate(); @@ -165,7 +166,7 @@ public void testGetAuthDataWithEarlyRefresh() throws Exception { // This test ensures that the early token refresh actually calls the authenticate method in the background. @Test public void testEarlyTokenRefreshCallsAuthenticate() throws Exception { - @Cleanup AuthenticationOAuth2 auth = new AuthenticationOAuth2(flow, this.clock, 0.1); + @Cleanup AuthenticationOAuth2 auth = new AuthenticationOAuth2(flow, this.clock, 0.1, null); TokenResult tr = TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(1).build(); doReturn(tr).when(this.flow).authenticate(); // Initialize the flow @@ -177,6 +178,23 @@ public void testEarlyTokenRefreshCallsAuthenticate() throws Exception { verify(this.flow).close(); } + // This test ensures scheduler is used when passed in + @Test + public void testEarlyTokenRefreshCallsAuthenticateWithParameterizedScheduler() throws Exception { + ScheduledThreadPoolExecutor scheduler = mock(ScheduledThreadPoolExecutor.class); + @Cleanup AuthenticationOAuth2 auth = new AuthenticationOAuth2(flow, this.clock, 0.1, scheduler); + TokenResult tr = TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(1).build(); + doReturn(tr).when(this.flow).authenticate(); + // Initialize the flow and trigger scheduling + auth.getAuthData(); + verify(scheduler, times(1)).execute(any(Runnable.class)); + // Close and verify that the passed in scheduler isn't shutdown + auth.close(); + verify(this.flow).close(); + verify(scheduler, times(0)).shutdownNow(); + verify(scheduler, times(2)).execute(any(Runnable.class)); + } + @Test public void testMetadataResolver() throws MalformedURLException { URL url = DefaultMetadataResolver.getWellKnownMetadataUrl(URI.create("http://localhost/path/oauth").toURL());