diff --git a/build.gradle b/build.gradle index ed0809c2f66c..b6026af67f40 100644 --- a/build.gradle +++ b/build.gradle @@ -718,6 +718,7 @@ project(':iceberg-gcp') { testImplementation libs.esotericsoftware.kryo testImplementation libs.mockserver.netty testImplementation libs.mockserver.client.java + testImplementation libs.mockito.junit.jupiter } } diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/auth/GoogleAuthManager.java b/gcp/src/main/java/org/apache/iceberg/gcp/auth/GoogleAuthManager.java new file mode 100644 index 000000000000..a1d2b539ab16 --- /dev/null +++ b/gcp/src/main/java/org/apache/iceberg/gcp/auth/GoogleAuthManager.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.gcp.auth; + +import com.google.auth.oauth2.GoogleCredentials; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.base.Splitter; +import org.apache.iceberg.relocated.com.google.common.base.Strings; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.rest.RESTClient; +import org.apache.iceberg.rest.auth.AuthManager; +import org.apache.iceberg.rest.auth.AuthSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An authentication manager that uses Google Credentials (typically Application Default + * Credentials) to create {@link GoogleAuthSession} instances. + * + *

This manager can be configured with properties such as: + * + *

+ */ +public class GoogleAuthManager implements AuthManager { + private static final Logger LOG = LoggerFactory.getLogger(GoogleAuthManager.class); + private static final Splitter SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings(); + public static final String DEFAULT_SCOPES = "https://www.googleapis.com/auth/cloud-platform"; + public static final String GCP_CREDENTIALS_PATH_PROPERTY = "gcp.auth.credentials-path"; + public static final String GCP_SCOPES_PROPERTY = "gcp.auth.scopes"; + private final String name; + + private GoogleCredentials credentials; + private boolean initialized = false; + + public GoogleAuthManager(String managerName) { + this.name = managerName; + } + + public String name() { + return name; + } + + private void initialize(Map properties) { + if (initialized) { + return; + } + + String credentialsPath = properties.get(GCP_CREDENTIALS_PATH_PROPERTY); + String scopesString = properties.getOrDefault(GCP_SCOPES_PROPERTY, DEFAULT_SCOPES); + List scopes = + Strings.isNullOrEmpty(scopesString) + ? ImmutableList.of() + : ImmutableList.copyOf(SPLITTER.splitToList(scopesString)); + + try { + if (credentialsPath != null && !credentialsPath.isEmpty()) { + LOG.info("Using Google credentials from path: {}", credentialsPath); + try (FileInputStream credentialsStream = new FileInputStream(credentialsPath)) { + this.credentials = GoogleCredentials.fromStream(credentialsStream).createScoped(scopes); + } + } else { + LOG.info("Using Application Default Credentials with scopes: {}", scopesString); + this.credentials = GoogleCredentials.getApplicationDefault().createScoped(scopes); + } + } catch (IOException e) { + throw new UncheckedIOException("Failed to load Google credentials", e); + } + + this.initialized = true; + } + + /** + * Initializes and returns a short-lived session, typically for fetching configuration. This + * implementation reuses the long-lived catalog session logic. + */ + @Override + public AuthSession initSession(RESTClient initClient, Map properties) { + return catalogSession(initClient, properties); + } + + /** + * Returns a long-lived session tied to the catalog's lifecycle. This session uses Google + * Application Default Credentials or a specified service account. + * + * @param sharedClient The long-lived RESTClient (not used by this implementation for credential + * fetching). + * @param properties Configuration properties for the auth manager. + * @return A {@link GoogleAuthSession}. + * @throws UncheckedIOException if credential loading fails. + */ + @Override + public AuthSession catalogSession(RESTClient sharedClient, Map properties) { + initialize(properties); + Preconditions.checkState( + credentials != null, "GoogleAuthManager not initialized or failed to load credentials"); + return new GoogleAuthSession(credentials); + } + + /** + * Returns a session for a specific context. Defaults to the catalog session. For GCP, tokens are + * typically not context-specific in this manner. + */ + @Override + public AuthSession contextualSession(SessionCatalog.SessionContext context, AuthSession parent) { + return parent; + } + + /** Returns a session for a specific table or view. Defaults to the catalog session. */ + @Override + public AuthSession tableSession( + TableIdentifier table, Map properties, AuthSession parent) { + return parent; + } + + /** Closes the manager. This is a no-op for GoogleAuthManager. */ + @Override + public void close() {} +} diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/auth/GoogleAuthSession.java b/gcp/src/main/java/org/apache/iceberg/gcp/auth/GoogleAuthSession.java new file mode 100644 index 000000000000..c81053613da4 --- /dev/null +++ b/gcp/src/main/java/org/apache/iceberg/gcp/auth/GoogleAuthSession.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.gcp.auth; + +import com.google.auth.oauth2.AccessToken; +import com.google.auth.oauth2.GoogleCredentials; +import java.io.IOException; +import java.io.UncheckedIOException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.HTTPHeaders; +import org.apache.iceberg.rest.HTTPRequest; +import org.apache.iceberg.rest.ImmutableHTTPRequest; +import org.apache.iceberg.rest.auth.AuthSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An authentication session that uses Google Credentials (typically Application Default + * Credentials) to obtain an OAuth2 access token and add it to HTTP requests. + */ +class GoogleAuthSession implements AuthSession { + private static final Logger LOG = LoggerFactory.getLogger(GoogleAuthSession.class); + private final GoogleCredentials credentials; + + /** + * Constructs a GoogleAuthSession with the provided GoogleCredentials. + * + * @param credentials The GoogleCredentials to use for authentication. + */ + GoogleAuthSession(GoogleCredentials credentials) { + Preconditions.checkArgument(credentials != null, "Invalid credentials: null"); + this.credentials = credentials; + } + + /** + * Authenticates the given HTTP request by adding an "Authorization: Bearer token" header. The + * access token is obtained from the GoogleCredentials. + * + * @param request The HTTPRequest to authenticate. + * @return A new HTTPRequest with the added Authorization header. + * @throws UncheckedIOException if an IOException occurs while refreshing the access token. + */ + @Override + public HTTPRequest authenticate(HTTPRequest request) { + try { + credentials.refreshIfExpired(); + AccessToken token = credentials.getAccessToken(); + + if (token != null && token.getTokenValue() != null) { + HTTPHeaders newHeaders = + request + .headers() + .putIfAbsent( + HTTPHeaders.of( + HTTPHeaders.HTTPHeader.of( + "Authorization", "Bearer " + token.getTokenValue()))); + return newHeaders.equals(request.headers()) + ? request + : ImmutableHTTPRequest.builder().from(request).headers(newHeaders).build(); + } else { + throw new IllegalStateException( + "Failed to obtain Google access token. Cannot authenticate request."); + } + } catch (IOException e) { + LOG.error("IOException while trying to refresh Google access token", e); + throw new UncheckedIOException("Failed to refresh Google access token", e); + } + } + + /** + * Closes the session. This is a no-op for GoogleAuthSession as the lifecycle of GoogleCredentials + * is not managed by this session. + */ + @Override + public void close() { + // No-op + } +} diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/auth/TestGoogleAuthManager.java b/gcp/src/test/java/org/apache/iceberg/gcp/auth/TestGoogleAuthManager.java new file mode 100644 index 000000000000..834cfbf92532 --- /dev/null +++ b/gcp/src/test/java/org/apache/iceberg/gcp/auth/TestGoogleAuthManager.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.gcp.auth; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.auth.oauth2.GoogleCredentials; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.util.Collections; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.rest.RESTClient; +import org.apache.iceberg.rest.auth.AuthSession; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class TestGoogleAuthManager { + + private static final String MANAGER_NAME = "testManager"; + @Mock private RESTClient restClient; + @Mock private GoogleCredentials credentials; + @Mock private GoogleCredentials credentialsFromFile; + + private GoogleAuthManager authManager; + private MockedStatic mockedStaticCredentials; + + @TempDir File tempDir; + private File credentialFile; + + @BeforeEach + public void beforeEach() throws IOException { + authManager = new GoogleAuthManager(MANAGER_NAME); + mockedStaticCredentials = Mockito.mockStatic(GoogleCredentials.class); + credentialFile = new File(tempDir, "fake-creds.json"); + Files.write(credentialFile.toPath(), "{\"type\": \"service_account\"}".getBytes()); + } + + @AfterEach + public void afterEach() { + mockedStaticCredentials.close(); + } + + @Test + public void providesCorrectManagerName() { + assertThat(authManager.name()).isEqualTo(MANAGER_NAME); + } + + @Test + public void buildsCatalogSessionFromCredentialsFile() { + String customScopes = "scope1,scope2"; + Map properties = + ImmutableMap.of( + GoogleAuthManager.GCP_CREDENTIALS_PATH_PROPERTY, + credentialFile.getAbsolutePath(), + GoogleAuthManager.GCP_SCOPES_PROPERTY, + customScopes); + + mockedStaticCredentials + .when(() -> GoogleCredentials.fromStream(any(FileInputStream.class))) + .thenReturn(credentialsFromFile); + when(credentialsFromFile.createScoped(anyList())).thenReturn(credentials); + + AuthSession session = authManager.catalogSession(restClient, properties); + + assertThat(session).isInstanceOf(GoogleAuthSession.class); + verify(credentialsFromFile).createScoped(ImmutableList.of("scope1", "scope2")); + } + + @Test + public void buildsCatalogSessionWithEmptyScopes() { + Map properties = + ImmutableMap.of( + GoogleAuthManager.GCP_CREDENTIALS_PATH_PROPERTY, + credentialFile.getAbsolutePath(), + GoogleAuthManager.GCP_SCOPES_PROPERTY, + ""); + + mockedStaticCredentials + .when(() -> GoogleCredentials.fromStream(any(FileInputStream.class))) + .thenReturn(credentialsFromFile); + when(credentialsFromFile.createScoped(anyList())).thenReturn(credentials); + + AuthSession session = authManager.catalogSession(restClient, properties); + + assertThat(session).isInstanceOf(GoogleAuthSession.class); + verify(credentialsFromFile).createScoped(ImmutableList.of()); + } + + @Test + public void throwsUncheckedIOExceptionOnCredentialsFileError() { + Map properties = + ImmutableMap.of( + GoogleAuthManager.GCP_CREDENTIALS_PATH_PROPERTY, credentialFile.getAbsolutePath()); + + mockedStaticCredentials + .when(() -> GoogleCredentials.fromStream(any(FileInputStream.class))) + .thenThrow(new IOException("Simulated stream loading failure")); + + assertThatThrownBy(() -> authManager.catalogSession(restClient, properties)) + .isInstanceOf(UncheckedIOException.class) + .hasMessageContaining("Failed to load Google credentials"); + } + + @Test + public void buildsCatalogSessionUsingADC() { + mockedStaticCredentials + .when(GoogleCredentials::getApplicationDefault) + .thenReturn(credentialsFromFile); + + when(credentialsFromFile.createScoped(anyList())).thenReturn(credentials); + + AuthSession session = authManager.catalogSession(restClient, Collections.emptyMap()); + + assertThat(session).isInstanceOf(GoogleAuthSession.class); + mockedStaticCredentials.verify(GoogleCredentials::getApplicationDefault, times(1)); + } + + @Test + public void throwsUncheckedIOExceptionOnADCError() { + mockedStaticCredentials + .when(GoogleCredentials::getApplicationDefault) + .thenThrow(new IOException("ADC unavailable")); + + assertThatThrownBy(() -> authManager.catalogSession(restClient, Collections.emptyMap())) + .isInstanceOf(UncheckedIOException.class) + .hasMessageContaining("Failed to load Google credentials"); + } + + @Test + public void initializationOccursOnlyOnce() { + mockedStaticCredentials + .when(GoogleCredentials::getApplicationDefault) + .thenReturn(credentialsFromFile); + + when(credentialsFromFile.createScoped(anyList())).thenReturn(credentials); + + authManager.catalogSession(restClient, Collections.emptyMap()); + authManager.catalogSession(restClient, Collections.emptyMap()); + + mockedStaticCredentials.verify(GoogleCredentials::getApplicationDefault, times(1)); + } + + @Test + public void initSessionDelegatesToCatalogSession() { + GoogleAuthManager spyManager = spy(authManager); + AuthSession mockSession = mock(GoogleAuthSession.class); + Map props = Collections.emptyMap(); + + doReturn(mockSession).when(spyManager).catalogSession(restClient, props); + + AuthSession resultSession = spyManager.initSession(restClient, props); + assertThat(resultSession).isSameAs(mockSession); + verify(spyManager).catalogSession(restClient, props); + } +} diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/auth/TestGoogleAuthSession.java b/gcp/src/test/java/org/apache/iceberg/gcp/auth/TestGoogleAuthSession.java new file mode 100644 index 000000000000..ff2163b421c7 --- /dev/null +++ b/gcp/src/test/java/org/apache/iceberg/gcp/auth/TestGoogleAuthSession.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.gcp.auth; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.auth.oauth2.AccessToken; +import com.google.auth.oauth2.GoogleCredentials; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.net.URISyntaxException; +import org.apache.iceberg.rest.HTTPHeaders; +import org.apache.iceberg.rest.HTTPRequest; +import org.apache.iceberg.rest.ImmutableHTTPRequest; +import org.apache.iceberg.rest.auth.AuthSession; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class TestGoogleAuthSession { + + private static final String TEST_TOKEN_VALUE = "test-token-12345"; + private static final String TEST_RELATIVE_PATH = "v1/some/resource"; + @Mock private GoogleCredentials credentials; + @Mock private AccessToken accessToken; + + private AuthSession session; + private URI testBaseUri; + + @BeforeEach + public void beforeEach() throws URISyntaxException { + this.session = new GoogleAuthSession(credentials); + this.testBaseUri = new URI("http://localhost:8080"); + } + + @Test + public void addsAuthHeaderOnSuccessfulTokenFetch() throws IOException { + when(credentials.getAccessToken()).thenReturn(accessToken); + when(accessToken.getTokenValue()).thenReturn(TEST_TOKEN_VALUE); + + HTTPRequest originalRequest = + ImmutableHTTPRequest.builder() + .baseUri(testBaseUri) + .path(TEST_RELATIVE_PATH) + .method(HTTPRequest.HTTPMethod.GET) + .build(); + HTTPRequest authenticatedRequest = session.authenticate(originalRequest); + + verify(credentials).refreshIfExpired(); + verify(credentials).getAccessToken(); + + assertThat(authenticatedRequest).isNotSameAs(originalRequest); + + HTTPHeaders headers = authenticatedRequest.headers(); + assertThat(headers.contains("Authorization")).isTrue(); + assertThat(authenticatedRequest.headers().entries()) + .containsExactly(HTTPHeaders.HTTPHeader.of("Authorization", "Bearer " + TEST_TOKEN_VALUE)); + } + + @Test + public void preservesExistingAuthHeader() throws IOException { + String existingAuthHeaderValue = "Bearer existing-bearer-token"; + HTTPHeaders initialHeaders = + HTTPHeaders.of(HTTPHeaders.HTTPHeader.of("Authorization", existingAuthHeaderValue)); + HTTPRequest originalRequest = + ImmutableHTTPRequest.builder() + .baseUri(testBaseUri) + .path(TEST_RELATIVE_PATH) + .method(HTTPRequest.HTTPMethod.GET) + .headers(initialHeaders) + .build(); + + when(credentials.getAccessToken()).thenReturn(accessToken); + when(accessToken.getTokenValue()).thenReturn(TEST_TOKEN_VALUE); + + HTTPRequest authenticatedRequest = session.authenticate(originalRequest); + + verify(credentials).refreshIfExpired(); + + assertThat(authenticatedRequest).isSameAs(originalRequest); + assertThat(authenticatedRequest.headers().entries()) + .containsExactly(HTTPHeaders.HTTPHeader.of("Authorization", existingAuthHeaderValue)); + } + + @Test + public void propagatesIOExceptionAsUncheckedOnTokenRefreshFailure() throws IOException { + doThrow(new IOException("Failed to refresh token")).when(credentials).refreshIfExpired(); + + HTTPRequest originalRequest = + ImmutableHTTPRequest.builder() + .baseUri(testBaseUri) + .path(TEST_RELATIVE_PATH) + .method(HTTPRequest.HTTPMethod.GET) + .build(); + + assertThatThrownBy(() -> session.authenticate(originalRequest)) + .isInstanceOf(UncheckedIOException.class) + .hasMessage("Failed to refresh Google access token") + .cause() + .isInstanceOf(IOException.class) + .hasMessage("Failed to refresh token"); + + verify(credentials).refreshIfExpired(); + } + + @Test + public void throwsExceptionWhenAccessTokenIsNull() { + when(credentials.getAccessToken()).thenReturn(null); + + HTTPRequest originalRequest = + ImmutableHTTPRequest.builder() + .baseUri(testBaseUri) + .path(TEST_RELATIVE_PATH) + .method(HTTPRequest.HTTPMethod.GET) + .build(); + + assertThatThrownBy(() -> session.authenticate(originalRequest)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Failed to obtain Google access token. Cannot authenticate request."); + } + + @Test + public void throwsExceptionWhenTokenValueIsNull() { + when(credentials.getAccessToken()).thenReturn(accessToken); + when(accessToken.getTokenValue()).thenReturn(null); + + HTTPRequest originalRequest = + ImmutableHTTPRequest.builder() + .baseUri(testBaseUri) + .path(TEST_RELATIVE_PATH) + .method(HTTPRequest.HTTPMethod.GET) + .build(); + + assertThatThrownBy(() -> session.authenticate(originalRequest)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Failed to obtain Google access token. Cannot authenticate request."); + } + + @Test + public void sessionCloseBehavesAsNoOp() { + assertThatCode(session::close).doesNotThrowAnyException(); + } +}