Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2Builder;
import org.apache.pulsar.client.impl.auth.oauth2.ClientCredentialsConfiguration;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -92,12 +93,16 @@ protected final void clientSetup() throws Exception {
Path path = Paths.get(CREDENTIALS_FILE).toAbsolutePath();
log.info("Credentials File path: {}", path.toString());

ClientCredentialsConfiguration config = ClientCredentialsConfiguration.builder()
.issuerUrl(new URL("https://dev-kt-aa9ne.us.auth0.com"))
.keyFileUrl(path.toUri().toURL())
.scope("https://dev-kt-aa9ne.us.auth0.com/api/v2/")
.build();

// AuthenticationOAuth2
Authentication authentication = AuthenticationFactoryOAuth2.clientCredentials(
new URL("https://dev-kt-aa9ne.us.auth0.com"),
path.toUri().toURL(), // key file path
"https://dev-kt-aa9ne.us.auth0.com/api/v2/"
);
Authentication authentication = AuthenticationOAuth2Builder.builder()
.setClientCredentialsConfiguration(config)
.build();

admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
.authentication(authentication)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
package org.apache.pulsar.client.impl.auth.oauth2;

import java.net.URL;
import java.time.Clock;
import org.apache.pulsar.client.api.Authentication;

/**
* Factory class that allows to create {@link Authentication} instances
* for OAuth 2.0 authentication methods.
*
* Use {@link AuthenticationOAuth2Builder}.
*/
@Deprecated
public final class AuthenticationFactoryOAuth2 {

/**
Expand All @@ -35,7 +37,9 @@ public final class AuthenticationFactoryOAuth2 {
* @param credentialsUrl the credentials URL
* @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0.
* @return an Authentication object
* @deprecated use {@link AuthenticationOAuth2Builder}, instead.
*/
@Deprecated
public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience) {
return clientCredentials(issuerUrl, credentialsUrl, audience, null);
}
Expand All @@ -52,14 +56,16 @@ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl
* and each string adds an additional access range to the requested scope.
* From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2
* @return an Authentication object
* @deprecated use {@link AuthenticationOAuth2Builder}, instead.
*/
@Deprecated
public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience, String scope) {
ClientCredentialsFlow flow = ClientCredentialsFlow.builder()
ClientCredentialsConfiguration config = ClientCredentialsConfiguration.builder()
.issuerUrl(issuerUrl)
.privateKey(credentialsUrl.toExternalForm())
.keyFileUrl(credentialsUrl)
.audience(audience)
.scope(scope)
.build();
return new AuthenticationOAuth2(flow, Clock.systemDefaultZone());
return new AuthenticationOAuth2Builder().setClientCredentialsConfiguration(config).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import java.time.Clock;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.NotImplementedException;
Expand All @@ -31,31 +34,100 @@
import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.AuthenticationUtil;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;

/**
* Pulsar client authentication provider based on OAuth 2.0.
*
* The first call to {@link #getAuthData()} will result in a blocking network call to retrieve the OAuth2.0 token from
* the Identity Provider. After that, there are two behaviors, depending on {@link #earlyTokenRefreshPercent}:
*
* 1. If {@link #earlyTokenRefreshPercent} is less than 1, this authentication class will schedule a runnable to refresh
* the token in n seconds where n is the result of multiplying {@link #earlyTokenRefreshPercent} and the `expires_in`
* value returned by the Identity Provider. If the call to the Identity Provider fails, this class will retry attempting
* to refresh the token using an exponential backoff. If the token is not refreshed before it expires, the Pulsar client
* will make one final blocking call to the Identity Provider. If that call fails, this class will pass the failure to
* the Pulsar client. This proactive approach to token management is good for use cases that want to avoid latency
* spikes from calls to the Identity Provider and that want to be able to withstand short Identity Provider outages. The
* tradeoff is that this class consumes slightly more resources.
*
* 2. If {@link #earlyTokenRefreshPercent} is greater than or equal to 1, this class will not retrieve a new token until
* the {@link #getAuthData()} method is called while the cached token is expired. If the call to the Identity Provider
* fails, this class will pass the failure to the Pulsar client. This lazy approach is good for use cases that are not
* latency sensitive and that will not use the token frequently.
*
* {@link #earlyTokenRefreshPercent} must be greater than 0. It defaults to 1, which means that early token refresh is
* disabled by default.
*
* The current implementation of this class can block the calling thread.
*
* This class is intended to be called from multiple threads, and is therefore designed to be thread-safe.
*/
@Slf4j
public class AuthenticationOAuth2 implements Authentication, EncodedAuthenticationParameterSupport {

public static final String CONFIG_PARAM_TYPE = "type";
public static final String TYPE_CLIENT_CREDENTIALS = "client_credentials";
public static final int EARLY_TOKEN_REFRESH_PERCENT_DEFAULT = 1; // feature disabled by default
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll reply separately to the earlyTokenRefreshPercent question

public static final String AUTH_METHOD_NAME = "token";
public static final double EXPIRY_ADJUSTMENT = 0.9;
private static final long serialVersionUID = 1L;
private final transient ScheduledThreadPoolExecutor scheduler;
private final boolean createdScheduler;
private final double earlyTokenRefreshPercent;

final Clock clock;
Flow flow;
transient CachedToken cachedToken;
volatile Flow flow;
private transient volatile CachedToken cachedToken;

// Only ever updated in synchronized block on class.
private boolean isClosed = false;

// Only ever updated on the single scheduler thread. Do not need to be volatile.
private transient Backoff backoff;
private transient ScheduledFuture<?> nextRefreshAttempt;

// No args constructor used when creating class with reflection
public AuthenticationOAuth2() {
this.clock = Clock.systemDefaultZone();
this(Clock.systemDefaultZone(), EARLY_TOKEN_REFRESH_PERCENT_DEFAULT, null);
}

AuthenticationOAuth2(Flow flow, Clock clock) {
AuthenticationOAuth2(Flow flow,
double earlyTokenRefreshPercent,
ScheduledThreadPoolExecutor scheduler) {
this(flow, Clock.systemDefaultZone(), earlyTokenRefreshPercent, scheduler);
}

AuthenticationOAuth2(Flow flow,
Clock clock,
double earlyTokenRefreshPercent,
ScheduledThreadPoolExecutor scheduler) {
this(clock, earlyTokenRefreshPercent, scheduler);
this.flow = flow;
}

/**
* @param clock - clock to use when determining token expiration.
* @param earlyTokenRefreshPercent - see javadoc for {@link AuthenticationOAuth2}. Must be greater than 0.
* @param scheduler - The scheduler to use for background refreshes of the token. If null and the
* {@link #earlyTokenRefreshPercent} is less than 1, the client will create an internal scheduler.
* Otherwise, it will use the passed in scheduler. If the caller supplies a scheduler, the
* {@link AuthenticationOAuth2} will not close it.
*/
private AuthenticationOAuth2(Clock clock, double earlyTokenRefreshPercent, ScheduledThreadPoolExecutor scheduler) {
if (earlyTokenRefreshPercent <= 0) {
throw new IllegalArgumentException("EarlyTokenRefreshPercent must be greater than 0.");
}
this.earlyTokenRefreshPercent = earlyTokenRefreshPercent;
this.clock = clock;
if (scheduler == null && earlyTokenRefreshPercent < 1) {
this.scheduler = new ScheduledThreadPoolExecutor(1);
this.createdScheduler = true;
} else {
this.scheduler = scheduler;
this.createdScheduler = false;
}
}

@Override
Expand All @@ -76,12 +148,10 @@ public void configure(String encodedAuthParamString) {
}

String type = params.getOrDefault(CONFIG_PARAM_TYPE, TYPE_CLIENT_CREDENTIALS);
switch(type) {
case TYPE_CLIENT_CREDENTIALS:
this.flow = ClientCredentialsFlow.fromParameters(params);
break;
default:
throw new IllegalArgumentException("Unsupported authentication type: " + type);
if (TYPE_CLIENT_CREDENTIALS.equals(type)) {
this.flow = ClientCredentialsFlow.fromParameters(params);
} else {
throw new IllegalArgumentException("Unsupported authentication type: " + type);
}
}

Expand All @@ -96,21 +166,111 @@ public void start() throws PulsarClientException {
flow.initialize();
}

/**
* The first time that this method is called, it retrieves a token. All subsequent
* calls should get a cached value. However, if there is an issue with the Identity
* Provider, there is a chance that the background thread responsible for keeping
* the refresh token hot will
* @return The authentication data identifying this client that will be sent to the broker
* @throws PulsarClientException
*/
@Override
public synchronized AuthenticationDataProvider getAuthData() throws PulsarClientException {
if (isClosed) {
throw new PulsarClientException.AlreadyClosedException("Authentication already closed.");
}
if (this.cachedToken == null || this.cachedToken.isExpired()) {
TokenResult tr = this.flow.authenticate();
this.cachedToken = new CachedToken(tr);
this.authenticate();
}
return this.cachedToken.getAuthData();
}

/**
* Retrieve the token (synchronously), and then schedule refresh runnable.
*/
private void authenticate() throws PulsarClientException {
if (log.isDebugEnabled()) {
log.debug("Attempting to retrieve OAuth2 token now.");
}
TokenResult tr = this.flow.authenticate();
this.cachedToken = new CachedToken(tr);
handleSuccessfulTokenRefresh();
}

/**
* When we successfully get a token, we need to schedule the next attempt to refresh it.
* This is done completely based on the "expires_in" value returned by the identity provider.
* The code is run on the single scheduler thread in order to ensure that the backoff and the nextRefreshAttempt are
* updated safely.
*/
private void handleSuccessfulTokenRefresh() {
if (scheduler != null) {
scheduler.execute(() -> {
if (earlyTokenRefreshPercent < 1) {
backoff = buildBackoff(cachedToken.latest.getExpiresIn());
long expiresInMillis = TimeUnit.SECONDS.toMillis(cachedToken.latest.getExpiresIn());
scheduleRefresh((long) (expiresInMillis * earlyTokenRefreshPercent));
}
});
}
}

/**
* Attempt to refresh the token. If successful, schedule the next refresh task according to the
* {@link #earlyTokenRefreshPercent}. If failed, schedule another attempt to refresh the token according to the
* {@link #backoff} policy.
*/
private void refreshToken() {
try {
this.authenticate();
} catch (PulsarClientException | RuntimeException e) {
long delayMillis = backoff.next();
log.error("Error refreshing token. Will retry in {} millis.", delayMillis, e);
scheduleRefresh(delayMillis);
}
}

/**
* Schedule the task to refresh the token.
* NOTE: this method must be run on the {@link #scheduler} thread in order to ensure {@link #nextRefreshAttempt}
* is accessed and updated safely.
* @param delayMillis the time, in milliseconds, to wait before starting to attempt to refresh the token.
*/
private void scheduleRefresh(long delayMillis) {
nextRefreshAttempt = scheduler.schedule(this::refreshToken, delayMillis, TimeUnit.MILLISECONDS);
}

private Backoff buildBackoff(int expiresInSeconds) {
return new BackoffBuilder()
.setInitialTime(1, TimeUnit.SECONDS)
.setMax(10, TimeUnit.MINUTES)
// Attempt a final token refresh attempt 2 seconds before the token actually expires, if necessary.
.setMandatoryStop(expiresInSeconds - 2, TimeUnit.SECONDS)
.create();
}

@Override
public void close() throws IOException {
public synchronized void close() throws IOException {
try {
flow.close();
isClosed = true;
if (flow != null) {
flow.close();
}
} catch (Exception e) {
throw new IOException(e);
} finally {
if (createdScheduler) {
this.scheduler.shutdownNow();
} else if (scheduler != null) {
// Cancel the all subsequent refresh attempts by canceling the next token refresh attempt. By running
// this command on the single scheduler thread, we remove the chance for a race condition that could
// allow a currently executing refresh attempt to schedule another refresh attempt.
scheduler.execute(() -> {
if (nextRefreshAttempt != null) {
nextRefreshAttempt.cancel(false);
}
});
}
}
}

Expand All @@ -122,8 +282,7 @@ class CachedToken {

public CachedToken(TokenResult latest) {
this.latest = latest;
int adjustedExpiresIn = (int) (latest.getExpiresIn() * EXPIRY_ADJUSTMENT);
this.expiresAt = AuthenticationOAuth2.this.clock.instant().plusSeconds(adjustedExpiresIn);
this.expiresAt = AuthenticationOAuth2.this.clock.instant().plusSeconds(latest.getExpiresIn());
this.authData = new AuthenticationDataOAuth2(latest.getAccessToken());
}

Expand Down
Loading