diff --git a/gcp/src/integration/java/org/apache/iceberg/gcp/gcs/TestGcsFileIO.java b/gcp/src/integration/java/org/apache/iceberg/gcp/gcs/TestGcsFileIO.java index b377d24c6042..804b34507874 100644 --- a/gcp/src/integration/java/org/apache/iceberg/gcp/gcs/TestGcsFileIO.java +++ b/gcp/src/integration/java/org/apache/iceberg/gcp/gcs/TestGcsFileIO.java @@ -31,7 +31,6 @@ import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.BucketInfo; import com.google.cloud.storage.Storage; -import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageOptions; import java.io.IOException; import java.io.InputStream; @@ -39,6 +38,7 @@ import java.util.List; import java.util.Random; import java.util.stream.Collectors; +import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.gcp.GCPProperties; import org.apache.iceberg.io.FileInfo; import org.apache.iceberg.io.IOUtil; @@ -227,9 +227,28 @@ public void readMissingLocation() { InputFile in = fileIO.newInputFile(location); assertThatThrownBy(() -> in.newStream().read()) - .isInstanceOf(IOException.class) - .hasCauseInstanceOf(StorageException.class) - .hasMessageContaining("404 Not Found"); + .isInstanceOf(NotFoundException.class) + .hasCauseInstanceOf(IOException.class) + .hasMessage("Location does not exist: gs://test-bucket/path/to/data.parquet"); + } + + @Test + public void readMissingLocationGcsAnalyticsCoreEnabled() throws IOException { + String location = String.format("gs://%s/path/to/data.parquet", BUCKET); + fileIO.initialize( + ImmutableMap.of( + GCPProperties.GCS_ANALYTICS_CORE_ENABLED, + "true", + GCPProperties.GCS_NO_AUTH, + "true", + GCPProperties.GCS_SERVICE_HOST, + String.format("http://localhost:%d", GCS_EMULATOR_PORT))); + InputFile in = fileIO.newInputFile(location); + + assertThatThrownBy(() -> in.newStream().read()) + .isInstanceOf(NotFoundException.class) + .hasCauseInstanceOf(IOException.class) + .hasMessage("Location does not exist: gs://test-bucket/path/to/data.parquet"); } @Test diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSExceptionUtil.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSExceptionUtil.java new file mode 100644 index 000000000000..a0a5689069ef --- /dev/null +++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSExceptionUtil.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.gcp.gcs; + +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.StorageException; +import java.io.IOException; +import org.apache.iceberg.exceptions.NotFoundException; + +final class GCSExceptionUtil { + private GCSExceptionUtil() {} + + static void throwNotFoundIfPresent(IOException ioException, BlobId blobId) { + if (ioException.getCause() instanceof StorageException storageException + && storageException.getCode() == 404) { + throw new NotFoundException(ioException, "Location does not exist: %s", blobId.toGsUtilUri()); + } + } +} diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java index 497af03bcdaa..12dc71b5a181 100644 --- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java +++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java @@ -94,11 +94,11 @@ public SeekableInputStream newStream() { private SeekableInputStream newGoogleCloudStorageInputStream() throws IOException { if (null == blobSize) { return new GcsInputStreamWrapper( - GoogleCloudStorageInputStream.create(gcsFileSystem(), gcsItemId()), metrics()); + GoogleCloudStorageInputStream.create(gcsFileSystem(), gcsItemId()), blobId(), metrics()); } return new GcsInputStreamWrapper( - GoogleCloudStorageInputStream.create(gcsFileSystem(), gcsFileInfo()), metrics()); + GoogleCloudStorageInputStream.create(gcsFileSystem(), gcsFileInfo()), blobId(), metrics()); } private GcsItemId gcsItemId() { diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java index 3b41ae21d34e..9338a06754c0 100644 --- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java +++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java @@ -127,7 +127,12 @@ public int read() throws IOException { singleByteBuffer.position(0); pos += 1; - channel.read(singleByteBuffer); + try { + channel.read(singleByteBuffer); + } catch (IOException e) { + GCSExceptionUtil.throwNotFoundIfPresent(e, blobId); + throw e; + } readBytes.increment(); readOperations.increment(); @@ -174,7 +179,12 @@ private int read(ReadChannel readChannel, ByteBuffer buffer, int off, int len) throws IOException { buffer.position(off); buffer.limit(Math.min(off + len, buffer.capacity())); - return readChannel.read(buffer); + try { + return readChannel.read(buffer); + } catch (IOException e) { + GCSExceptionUtil.throwNotFoundIfPresent(e, blobId); + throw e; + } } @Override diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GcsInputStreamWrapper.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GcsInputStreamWrapper.java index 2e1dfdd73c08..ebb513578c39 100644 --- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GcsInputStreamWrapper.java +++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GcsInputStreamWrapper.java @@ -21,6 +21,7 @@ import com.google.api.client.util.Preconditions; import com.google.cloud.gcs.analyticscore.client.GcsObjectRange; import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream; +import com.google.cloud.storage.BlobId; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; @@ -37,10 +38,14 @@ class GcsInputStreamWrapper extends SeekableInputStream implements RangeReadable private final Counter readBytes; private final Counter readOperations; private final GoogleCloudStorageInputStream stream; + private final BlobId blobId; - GcsInputStreamWrapper(GoogleCloudStorageInputStream stream, MetricsContext metrics) { + GcsInputStreamWrapper( + GoogleCloudStorageInputStream stream, BlobId blobId, MetricsContext metrics) { Preconditions.checkArgument(null != stream, "Invalid input stream : null"); + Preconditions.checkArgument(null != blobId, "Invalid blobId : null"); this.stream = stream; + this.blobId = blobId; this.readBytes = metrics.counter(FileIOMetricsContext.READ_BYTES, MetricsContext.Unit.BYTES); this.readOperations = metrics.counter(FileIOMetricsContext.READ_OPERATIONS); } @@ -57,7 +62,13 @@ public void seek(long newPos) throws IOException { @Override public int read() throws IOException { - int readByte = stream.read(); + int readByte; + try { + readByte = stream.read(); + } catch (IOException e) { + GCSExceptionUtil.throwNotFoundIfPresent(e, blobId); + throw e; + } readBytes.increment(); readOperations.increment(); return readByte; @@ -70,7 +81,13 @@ public int read(byte[] b) throws IOException { @Override public int read(byte[] b, int off, int len) throws IOException { - int bytesRead = stream.read(b, off, len); + int bytesRead; + try { + bytesRead = stream.read(b, off, len); + } catch (IOException e) { + GCSExceptionUtil.throwNotFoundIfPresent(e, blobId); + throw e; + } if (bytesRead > 0) { readBytes.increment(bytesRead); } @@ -80,12 +97,22 @@ public int read(byte[] b, int off, int len) throws IOException { @Override public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { - stream.readFully(position, buffer, offset, length); + try { + stream.readFully(position, buffer, offset, length); + } catch (IOException e) { + GCSExceptionUtil.throwNotFoundIfPresent(e, blobId); + throw e; + } } @Override public int readTail(byte[] buffer, int offset, int length) throws IOException { - return stream.readTail(buffer, offset, length); + try { + return stream.readTail(buffer, offset, length); + } catch (IOException e) { + GCSExceptionUtil.throwNotFoundIfPresent(e, blobId); + throw e; + } } @Override @@ -101,8 +128,12 @@ public void readVectored(List ranges, IntFunction allocat .setByteBufferFuture(fileRange.byteBuffer()) .build()) .collect(Collectors.toList()); - - stream.readVectored(objectRanges, allocate); + try { + stream.readVectored(objectRanges, allocate); + } catch (IOException e) { + GCSExceptionUtil.throwNotFoundIfPresent(e, blobId); + throw e; + } } @Override diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSInputStream.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSInputStream.java index f367db94264a..7c2908770ab6 100644 --- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSInputStream.java +++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGCSInputStream.java @@ -163,6 +163,10 @@ private void readAndCheckRanges( @Test public void testClose() throws Exception { BlobId blobId = BlobId.fromGsUtilUri("gs://bucket/path/to/closed.dat"); + byte[] data = randomData(1024 * 1024); + + writeGCSData(blobId, data); + SeekableInputStream closed = new GCSInputStream(storage, blobId, null, gcpProperties, MetricsContext.nullMetrics()); closed.close(); diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGcsInputStreamWrapper.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGcsInputStreamWrapper.java index 2320037bd017..c6eae113d52d 100644 --- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGcsInputStreamWrapper.java +++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGcsInputStreamWrapper.java @@ -20,6 +20,7 @@ import com.google.cloud.gcs.analyticscore.client.GcsObjectRange; import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream; +import com.google.cloud.storage.BlobId; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; @@ -44,7 +45,10 @@ public class TestGcsInputStreamWrapper { @BeforeEach public void before() { inputStreamWrapper = - new GcsInputStreamWrapper(googleCloudStorageInputStream, MetricsContext.nullMetrics()); + new GcsInputStreamWrapper( + googleCloudStorageInputStream, + BlobId.of("mockbucket", "mockname"), + MetricsContext.nullMetrics()); } @Test