diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java index 5d2e473d58267..73b593f6f20f1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java @@ -34,7 +34,8 @@ import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.impl.ProducerImpl; -import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2; +import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2Builder; +import org.apache.pulsar.client.impl.auth.oauth2.ClientCredentialsConfiguration; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.awaitility.Awaitility; @@ -92,12 +93,16 @@ protected final void clientSetup() throws Exception { Path path = Paths.get(CREDENTIALS_FILE).toAbsolutePath(); log.info("Credentials File path: {}", path.toString()); + ClientCredentialsConfiguration config = ClientCredentialsConfiguration.builder() + .issuerUrl(new URL("https://dev-kt-aa9ne.us.auth0.com")) + .keyFileUrl(path.toUri().toURL()) + .scope("https://dev-kt-aa9ne.us.auth0.com/api/v2/") + .build(); + // AuthenticationOAuth2 - Authentication authentication = AuthenticationFactoryOAuth2.clientCredentials( - new URL("https://dev-kt-aa9ne.us.auth0.com"), - path.toUri().toURL(), // key file path - "https://dev-kt-aa9ne.us.auth0.com/api/v2/" - ); + Authentication authentication = AuthenticationOAuth2Builder.builder() + .setClientCredentialsConfiguration(config) + .build(); admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) .authentication(authentication) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java index cf567747567e4..534f1ec98f14c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java @@ -19,13 +19,15 @@ package org.apache.pulsar.client.impl.auth.oauth2; import java.net.URL; -import java.time.Clock; import org.apache.pulsar.client.api.Authentication; /** * Factory class that allows to create {@link Authentication} instances * for OAuth 2.0 authentication methods. + * + * Use {@link AuthenticationOAuth2Builder}. */ +@Deprecated public final class AuthenticationFactoryOAuth2 { /** @@ -35,7 +37,9 @@ public final class AuthenticationFactoryOAuth2 { * @param credentialsUrl the credentials URL * @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0. * @return an Authentication object + * @deprecated use {@link AuthenticationOAuth2Builder}, instead. */ + @Deprecated public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience) { return clientCredentials(issuerUrl, credentialsUrl, audience, null); } @@ -52,14 +56,16 @@ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl * and each string adds an additional access range to the requested scope. * From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2 * @return an Authentication object + * @deprecated use {@link AuthenticationOAuth2Builder}, instead. */ + @Deprecated public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience, String scope) { - ClientCredentialsFlow flow = ClientCredentialsFlow.builder() + ClientCredentialsConfiguration config = ClientCredentialsConfiguration.builder() .issuerUrl(issuerUrl) - .privateKey(credentialsUrl.toExternalForm()) + .keyFileUrl(credentialsUrl) .audience(audience) .scope(scope) .build(); - return new AuthenticationOAuth2(flow, Clock.systemDefaultZone()); + return new AuthenticationOAuth2Builder().setClientCredentialsConfiguration(config).build(); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java index 022918d807d19..a16b31ac9a7c9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java @@ -22,6 +22,9 @@ import java.time.Clock; import java.time.Instant; import java.util.Map; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.NotImplementedException; @@ -31,31 +34,100 @@ import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.AuthenticationUtil; +import org.apache.pulsar.client.impl.Backoff; +import org.apache.pulsar.client.impl.BackoffBuilder; import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult; /** * Pulsar client authentication provider based on OAuth 2.0. + * + * The first call to {@link #getAuthData()} will result in a blocking network call to retrieve the OAuth2.0 token from + * the Identity Provider. After that, there are two behaviors, depending on {@link #earlyTokenRefreshPercent}: + * + * 1. If {@link #earlyTokenRefreshPercent} is less than 1, this authentication class will schedule a runnable to refresh + * the token in n seconds where n is the result of multiplying {@link #earlyTokenRefreshPercent} and the `expires_in` + * value returned by the Identity Provider. If the call to the Identity Provider fails, this class will retry attempting + * to refresh the token using an exponential backoff. If the token is not refreshed before it expires, the Pulsar client + * will make one final blocking call to the Identity Provider. If that call fails, this class will pass the failure to + * the Pulsar client. This proactive approach to token management is good for use cases that want to avoid latency + * spikes from calls to the Identity Provider and that want to be able to withstand short Identity Provider outages. The + * tradeoff is that this class consumes slightly more resources. + * + * 2. If {@link #earlyTokenRefreshPercent} is greater than or equal to 1, this class will not retrieve a new token until + * the {@link #getAuthData()} method is called while the cached token is expired. If the call to the Identity Provider + * fails, this class will pass the failure to the Pulsar client. This lazy approach is good for use cases that are not + * latency sensitive and that will not use the token frequently. + * + * {@link #earlyTokenRefreshPercent} must be greater than 0. It defaults to 1, which means that early token refresh is + * disabled by default. + * + * The current implementation of this class can block the calling thread. + * + * This class is intended to be called from multiple threads, and is therefore designed to be thread-safe. */ @Slf4j public class AuthenticationOAuth2 implements Authentication, EncodedAuthenticationParameterSupport { public static final String CONFIG_PARAM_TYPE = "type"; public static final String TYPE_CLIENT_CREDENTIALS = "client_credentials"; + public static final int EARLY_TOKEN_REFRESH_PERCENT_DEFAULT = 1; // feature disabled by default public static final String AUTH_METHOD_NAME = "token"; - public static final double EXPIRY_ADJUSTMENT = 0.9; private static final long serialVersionUID = 1L; + private final transient ScheduledThreadPoolExecutor scheduler; + private final boolean createdScheduler; + private final double earlyTokenRefreshPercent; final Clock clock; - Flow flow; - transient CachedToken cachedToken; + volatile Flow flow; + private transient volatile CachedToken cachedToken; + + // Only ever updated in synchronized block on class. + private boolean isClosed = false; + + // Only ever updated on the single scheduler thread. Do not need to be volatile. + private transient Backoff backoff; + private transient ScheduledFuture nextRefreshAttempt; + // No args constructor used when creating class with reflection public AuthenticationOAuth2() { - this.clock = Clock.systemDefaultZone(); + this(Clock.systemDefaultZone(), EARLY_TOKEN_REFRESH_PERCENT_DEFAULT, null); } - AuthenticationOAuth2(Flow flow, Clock clock) { + AuthenticationOAuth2(Flow flow, + double earlyTokenRefreshPercent, + ScheduledThreadPoolExecutor scheduler) { + this(flow, Clock.systemDefaultZone(), earlyTokenRefreshPercent, scheduler); + } + + AuthenticationOAuth2(Flow flow, + Clock clock, + double earlyTokenRefreshPercent, + ScheduledThreadPoolExecutor scheduler) { + this(clock, earlyTokenRefreshPercent, scheduler); this.flow = flow; + } + + /** + * @param clock - clock to use when determining token expiration. + * @param earlyTokenRefreshPercent - see javadoc for {@link AuthenticationOAuth2}. Must be greater than 0. + * @param scheduler - The scheduler to use for background refreshes of the token. If null and the + * {@link #earlyTokenRefreshPercent} is less than 1, the client will create an internal scheduler. + * Otherwise, it will use the passed in scheduler. If the caller supplies a scheduler, the + * {@link AuthenticationOAuth2} will not close it. + */ + private AuthenticationOAuth2(Clock clock, double earlyTokenRefreshPercent, ScheduledThreadPoolExecutor scheduler) { + if (earlyTokenRefreshPercent <= 0) { + throw new IllegalArgumentException("EarlyTokenRefreshPercent must be greater than 0."); + } + this.earlyTokenRefreshPercent = earlyTokenRefreshPercent; this.clock = clock; + if (scheduler == null && earlyTokenRefreshPercent < 1) { + this.scheduler = new ScheduledThreadPoolExecutor(1); + this.createdScheduler = true; + } else { + this.scheduler = scheduler; + this.createdScheduler = false; + } } @Override @@ -76,12 +148,10 @@ public void configure(String encodedAuthParamString) { } String type = params.getOrDefault(CONFIG_PARAM_TYPE, TYPE_CLIENT_CREDENTIALS); - switch(type) { - case TYPE_CLIENT_CREDENTIALS: - this.flow = ClientCredentialsFlow.fromParameters(params); - break; - default: - throw new IllegalArgumentException("Unsupported authentication type: " + type); + if (TYPE_CLIENT_CREDENTIALS.equals(type)) { + this.flow = ClientCredentialsFlow.fromParameters(params); + } else { + throw new IllegalArgumentException("Unsupported authentication type: " + type); } } @@ -96,21 +166,111 @@ public void start() throws PulsarClientException { flow.initialize(); } + /** + * The first time that this method is called, it retrieves a token. All subsequent + * calls should get a cached value. However, if there is an issue with the Identity + * Provider, there is a chance that the background thread responsible for keeping + * the refresh token hot will + * @return The authentication data identifying this client that will be sent to the broker + * @throws PulsarClientException + */ @Override public synchronized AuthenticationDataProvider getAuthData() throws PulsarClientException { + if (isClosed) { + throw new PulsarClientException.AlreadyClosedException("Authentication already closed."); + } if (this.cachedToken == null || this.cachedToken.isExpired()) { - TokenResult tr = this.flow.authenticate(); - this.cachedToken = new CachedToken(tr); + this.authenticate(); } return this.cachedToken.getAuthData(); } + /** + * Retrieve the token (synchronously), and then schedule refresh runnable. + */ + private void authenticate() throws PulsarClientException { + if (log.isDebugEnabled()) { + log.debug("Attempting to retrieve OAuth2 token now."); + } + TokenResult tr = this.flow.authenticate(); + this.cachedToken = new CachedToken(tr); + handleSuccessfulTokenRefresh(); + } + + /** + * When we successfully get a token, we need to schedule the next attempt to refresh it. + * This is done completely based on the "expires_in" value returned by the identity provider. + * The code is run on the single scheduler thread in order to ensure that the backoff and the nextRefreshAttempt are + * updated safely. + */ + private void handleSuccessfulTokenRefresh() { + if (scheduler != null) { + scheduler.execute(() -> { + if (earlyTokenRefreshPercent < 1) { + backoff = buildBackoff(cachedToken.latest.getExpiresIn()); + long expiresInMillis = TimeUnit.SECONDS.toMillis(cachedToken.latest.getExpiresIn()); + scheduleRefresh((long) (expiresInMillis * earlyTokenRefreshPercent)); + } + }); + } + } + + /** + * Attempt to refresh the token. If successful, schedule the next refresh task according to the + * {@link #earlyTokenRefreshPercent}. If failed, schedule another attempt to refresh the token according to the + * {@link #backoff} policy. + */ + private void refreshToken() { + try { + this.authenticate(); + } catch (PulsarClientException | RuntimeException e) { + long delayMillis = backoff.next(); + log.error("Error refreshing token. Will retry in {} millis.", delayMillis, e); + scheduleRefresh(delayMillis); + } + } + + /** + * Schedule the task to refresh the token. + * NOTE: this method must be run on the {@link #scheduler} thread in order to ensure {@link #nextRefreshAttempt} + * is accessed and updated safely. + * @param delayMillis the time, in milliseconds, to wait before starting to attempt to refresh the token. + */ + private void scheduleRefresh(long delayMillis) { + nextRefreshAttempt = scheduler.schedule(this::refreshToken, delayMillis, TimeUnit.MILLISECONDS); + } + + private Backoff buildBackoff(int expiresInSeconds) { + return new BackoffBuilder() + .setInitialTime(1, TimeUnit.SECONDS) + .setMax(10, TimeUnit.MINUTES) + // Attempt a final token refresh attempt 2 seconds before the token actually expires, if necessary. + .setMandatoryStop(expiresInSeconds - 2, TimeUnit.SECONDS) + .create(); + } + @Override - public void close() throws IOException { + public synchronized void close() throws IOException { try { - flow.close(); + isClosed = true; + if (flow != null) { + flow.close(); + } } catch (Exception e) { throw new IOException(e); + } finally { + if (createdScheduler) { + this.scheduler.shutdownNow(); + } else if (scheduler != null) { + // Cancel the all subsequent refresh attempts by canceling the next token refresh attempt. By running + // this command on the single scheduler thread, we remove the chance for a race condition that could + // allow a currently executing refresh attempt to schedule another refresh attempt. + scheduler.execute(() -> { + if (nextRefreshAttempt != null) { + nextRefreshAttempt.cancel(false); + } + }); + } } } @@ -122,8 +282,7 @@ class CachedToken { public CachedToken(TokenResult latest) { this.latest = latest; - int adjustedExpiresIn = (int) (latest.getExpiresIn() * EXPIRY_ADJUSTMENT); - this.expiresAt = AuthenticationOAuth2.this.clock.instant().plusSeconds(adjustedExpiresIn); + this.expiresAt = AuthenticationOAuth2.this.clock.instant().plusSeconds(latest.getExpiresIn()); this.authData = new AuthenticationDataOAuth2(latest.getAccessToken()); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Builder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Builder.java new file mode 100644 index 0000000000000..92c47b86eca91 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Builder.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.oauth2; + +import java.util.concurrent.ScheduledThreadPoolExecutor; +import org.apache.pulsar.client.api.Authentication; + +/** + * Builder for {@link AuthenticationOAuth2} class. + */ +public final class AuthenticationOAuth2Builder { + + /** + * Create a builder instance. + * + * @return builder + */ + public static AuthenticationOAuth2Builder builder() { + return new AuthenticationOAuth2Builder(); + } + + private double earlyTokenRefreshPercent = AuthenticationOAuth2.EARLY_TOKEN_REFRESH_PERCENT_DEFAULT; + private ScheduledThreadPoolExecutor scheduler; + private ClientCredentialsConfiguration clientCredentialsConfiguration; + + /** + * Set the {@link ClientCredentialsConfiguration} when using the OAuth2 client credentials flow. + * @return builder + */ + public AuthenticationOAuth2Builder setClientCredentialsConfiguration( + ClientCredentialsConfiguration clientCredentialsConfiguration) { + if (clientCredentialsConfiguration == null) { + throw new IllegalArgumentException("ClientCredentialsConfiguration cannot be null."); + } + this.clientCredentialsConfiguration = clientCredentialsConfiguration; + return this; + } + + /** + * @param earlyTokenRefreshPercent - The percent of the expires_in time when the client should start attempting + * to refresh the token. If greater than or equal to 1, it is disabled. See + * {@link AuthenticationOAuth2} for details. + * @return builder + */ + public AuthenticationOAuth2Builder setEarlyTokenRefreshPercent(double earlyTokenRefreshPercent) { + if (earlyTokenRefreshPercent <= 0) { + throw new IllegalArgumentException("EarlyTokenRefreshPercent must be greater than 0."); + } + this.earlyTokenRefreshPercent = earlyTokenRefreshPercent; + return this; + } + + /** + * @param scheduler - The scheduler to use for background refreshes of the token. If null and the + * {@link #earlyTokenRefreshPercent} is less than 1, the client will create an internal scheduler. + * Otherwise, it will use the passed in scheduler. If the caller supplies a scheduler, the + * {@link AuthenticationOAuth2} will not close it. + * @return builder + */ + public AuthenticationOAuth2Builder setEarlyTokenRefreshExecutor(ScheduledThreadPoolExecutor scheduler) { + this.scheduler = scheduler; + return this; + } + + public Authentication build() { + if (clientCredentialsConfiguration == null) { + throw new IllegalArgumentException("ClientCredentialsConfiguration must be set."); + } + Flow flow = new ClientCredentialsFlow(clientCredentialsConfiguration); + return new AuthenticationOAuth2(flow, earlyTokenRefreshPercent, scheduler); + } +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsConfiguration.java new file mode 100644 index 0000000000000..4da00fcb9beea --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsConfiguration.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.oauth2; + +import java.net.URL; +import lombok.Builder; +import lombok.Getter; + +/** + * OAuth 2.0 Client Credentials flow data used to retrieve access token. + */ +@Builder +@Getter +public final class ClientCredentialsConfiguration { + /** + * Identity Provider's token issuer URL. + */ + private final URL issuerUrl; + + /** + * Optional. The audience identifier used by some Identity Providers, like Auth0. + */ + private final String audience; + + /** + * The {@link KeyFile} URL used to retrieve the client credentials. See {@link KeyFile} for file + * format. This field was previously called the credentialsUrl. + */ + private final URL keyFileUrl; + + /** + * Optional. The value of the scope parameter is expressed as a list of space-delimited, + * case-sensitive strings. The strings are defined by the authorization server. + * If the value contains multiple space-delimited strings, their order does not matter, + * and each string adds an additional access range to the requested scope. + * Documented here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2 + */ + private final String scope; +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java index 987d7ec53f811..e35256a7462c0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java @@ -46,27 +46,39 @@ class ClientCredentialsFlow extends FlowBase { public static final String CONFIG_PARAM_ISSUER_URL = "issuerUrl"; public static final String CONFIG_PARAM_AUDIENCE = "audience"; + // Maps to the keyFileUrl public static final String CONFIG_PARAM_KEY_FILE = "privateKey"; public static final String CONFIG_PARAM_SCOPE = "scope"; private static final long serialVersionUID = 1L; private final String audience; - private final String privateKey; + private final String keyFileUrl; private final String scope; private transient ClientCredentialsExchanger exchanger; private boolean initialized = false; + /** + * See {@link ClientCredentialsConfiguration} for field documentation. + */ @Builder - public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey, String scope) { + public ClientCredentialsFlow(URL issuerUrl, String audience, String keyFileUrl, String scope) { super(issuerUrl); this.audience = audience; - this.privateKey = privateKey; + this.keyFileUrl = keyFileUrl; this.scope = scope; } + /** + * Constructor. + * @param config - see {@link ClientCredentialsConfiguration}. + */ + public ClientCredentialsFlow(ClientCredentialsConfiguration config) { + this(config.getIssuerUrl(), config.getAudience(), config.getKeyFileUrl().toExternalForm(), config.getScope()); + } + @Override public void initialize() throws PulsarClientException { super.initialize(); @@ -81,7 +93,7 @@ public TokenResult authenticate() throws PulsarClientException { // read the private key from storage KeyFile keyFile; try { - keyFile = loadPrivateKey(this.privateKey); + keyFile = loadPrivateKey(this.keyFileUrl); } catch (IOException e) { throw new PulsarClientException.AuthenticationException("Unable to read private key: " + e.getMessage()); } @@ -126,7 +138,7 @@ public static ClientCredentialsFlow fromParameters(Map params) { return ClientCredentialsFlow.builder() .issuerUrl(issuerUrl) .audience(audience) - .privateKey(privateKeyUrl) + .keyFileUrl(privateKeyUrl) .scope(scope) .build(); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java index 3ae578c34845c..e03febc69bd99 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java @@ -18,12 +18,15 @@ */ package org.apache.pulsar.client.impl.auth.oauth2; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertThrows; import com.fasterxml.jackson.databind.ObjectMapper; @@ -35,7 +38,10 @@ import java.time.ZoneOffset; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import lombok.Cleanup; import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver; import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult; import org.testng.annotations.BeforeMethod; @@ -56,7 +62,7 @@ public class AuthenticationOAuth2Test { public void before() { this.clock = new MockClock(Instant.EPOCH, ZoneOffset.UTC); this.flow = mock(Flow.class); - this.auth = new AuthenticationOAuth2(flow, this.clock); + this.auth = new AuthenticationOAuth2(flow, this.clock, 1, null); } @Test @@ -112,7 +118,7 @@ public void testStart() throws Exception { } @Test - public void testGetAuthData() throws Exception { + public void testGetAuthDataNoEarlyRefresh() throws Exception { AuthenticationDataProvider data; TokenResult tr = TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(TEST_EXPIRES_IN).build(); doReturn(tr).when(this.flow).authenticate(); @@ -125,13 +131,70 @@ public void testGetAuthData() throws Exception { verify(this.flow, times(1)).authenticate(); assertEquals(data.getCommandData(), tr.getAccessToken()); - // cache miss - clock.advance(Duration.ofSeconds(TEST_EXPIRES_IN)); + // cache miss (have to move passed expiration b/c we refresh when token is expired now) + // NOTE: this works because the token uses the mocked clock. + clock.advance(Duration.ofSeconds(TEST_EXPIRES_IN + 1)); data = this.auth.getAuthData(); verify(this.flow, times(2)).authenticate(); assertEquals(data.getCommandData(), tr.getAccessToken()); } + // This test skips the early refresh logic and just ensures that if the class were to somehow fail + // to refresh the token before expiration, the caller will get one final attempt at calling authenticate + @Test + public void testGetAuthDataWithEarlyRefresh() throws Exception { + @Cleanup AuthenticationOAuth2 auth = new AuthenticationOAuth2(flow, this.clock, 0.8, null); + AuthenticationDataProvider data; + TokenResult tr = TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(TEST_EXPIRES_IN).build(); + doReturn(tr).when(this.flow).authenticate(); + data = auth.getAuthData(); + verify(this.flow, times(1)).authenticate(); + assertEquals(data.getCommandData(), tr.getAccessToken()); + + // cache hit + data = auth.getAuthData(); + verify(this.flow, times(1)).authenticate(); + assertEquals(data.getCommandData(), tr.getAccessToken()); + + // cache miss (have to move passed expiration b/c we refresh when token is expired now) + clock.advance(Duration.ofSeconds(TEST_EXPIRES_IN + 1)); + data = auth.getAuthData(); + verify(this.flow, times(2)).authenticate(); + assertEquals(data.getCommandData(), tr.getAccessToken()); + } + + // This test ensures that the early token refresh actually calls the authenticate method in the background. + @Test + public void testEarlyTokenRefreshCallsAuthenticate() throws Exception { + @Cleanup AuthenticationOAuth2 auth = new AuthenticationOAuth2(flow, this.clock, 0.1, null); + TokenResult tr = TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(1).build(); + doReturn(tr).when(this.flow).authenticate(); + // Initialize the flow + auth.getAuthData(); + // Give the auth token refresh a chance to run multiple times + Thread.sleep(1000); + auth.close(); + verify(this.flow, atLeast(2)).authenticate(); + verify(this.flow).close(); + } + + // This test ensures scheduler is used when passed in + @Test + public void testEarlyTokenRefreshCallsAuthenticateWithParameterizedScheduler() throws Exception { + ScheduledThreadPoolExecutor scheduler = mock(ScheduledThreadPoolExecutor.class); + @Cleanup AuthenticationOAuth2 auth = new AuthenticationOAuth2(flow, this.clock, 0.1, scheduler); + TokenResult tr = TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(1).build(); + doReturn(tr).when(this.flow).authenticate(); + // Initialize the flow and trigger scheduling + auth.getAuthData(); + verify(scheduler, times(1)).execute(any(Runnable.class)); + // Close and verify that the passed in scheduler isn't shutdown + auth.close(); + verify(this.flow).close(); + verify(scheduler, times(0)).shutdownNow(); + verify(scheduler, times(2)).execute(any(Runnable.class)); + } + @Test public void testMetadataResolver() throws MalformedURLException { URL url = DefaultMetadataResolver.getWellKnownMetadataUrl(URI.create("http://localhost/path/oauth").toURL()); @@ -142,5 +205,6 @@ public void testMetadataResolver() throws MalformedURLException { public void testClose() throws Exception { this.auth.close(); verify(this.flow).close(); + assertThrows(PulsarClientException.AlreadyClosedException.class, () -> this.auth.getAuthData()); } } diff --git a/site2/docs/security-oauth2.md b/site2/docs/security-oauth2.md index cbb4f9e1eaa52..ec2fa7164924d 100644 --- a/site2/docs/security-oauth2.md +++ b/site2/docs/security-oauth2.md @@ -97,6 +97,11 @@ PulsarClient client = PulsarClient.builder() .build(); ``` +In the Java Client, you can also specify a configuration named `earlyTokenRefreshPercent` via the `AuthenticationFactoryOAuth2` +and the encoded parameters. The `earlyTokenRefreshPercent` must be greater than 0. If it is less than 1, the Java Client +will start attempting to retrieve a new token when the `earlyTokenRefreshPercent` percent of the `expires_in` value has +passed. If the value is greater than or equal to 1, the Java Client will not refresh the token until it has expired. + ### C++ client The C++ client is similar to the Java client. You need to provide parameters of `issuerUrl`, `private_key` (the credentials file path), and `audience`.