Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.gcp.GCPProperties;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.IOUtil;
Expand Down Expand Up @@ -227,9 +227,28 @@ public void readMissingLocation() {
InputFile in = fileIO.newInputFile(location);

assertThatThrownBy(() -> in.newStream().read())
.isInstanceOf(IOException.class)
.hasCauseInstanceOf(StorageException.class)
.hasMessageContaining("404 Not Found");
.isInstanceOf(NotFoundException.class)
.hasCauseInstanceOf(IOException.class)
.hasMessage("Location does not exist: gs://test-bucket/path/to/data.parquet");
}

@Test
public void readMissingLocationGcsAnalyticsCoreEnabled() throws IOException {
String location = String.format("gs://%s/path/to/data.parquet", BUCKET);
fileIO.initialize(
ImmutableMap.of(
GCPProperties.GCS_ANALYTICS_CORE_ENABLED,
"true",
GCPProperties.GCS_NO_AUTH,
"true",
GCPProperties.GCS_SERVICE_HOST,
String.format("http://localhost:%d", GCS_EMULATOR_PORT)));
InputFile in = fileIO.newInputFile(location);

assertThatThrownBy(() -> in.newStream().read())
.isInstanceOf(NotFoundException.class)
.hasCauseInstanceOf(IOException.class)
.hasMessage("Location does not exist: gs://test-bucket/path/to/data.parquet");
}

@Test
Expand Down
35 changes: 35 additions & 0 deletions gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSExceptionUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.gcp.gcs;

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.StorageException;
import java.io.IOException;
import org.apache.iceberg.exceptions.NotFoundException;

final class GCSExceptionUtil {
private GCSExceptionUtil() {}

static void throwNotFoundIfPresent(IOException ioException, BlobId blobId) {
if (ioException.getCause() instanceof StorageException storageException
&& storageException.getCode() == 404) {
throw new NotFoundException(ioException, "Location does not exist: %s", blobId.toGsUtilUri());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ public SeekableInputStream newStream() {
private SeekableInputStream newGoogleCloudStorageInputStream() throws IOException {
if (null == blobSize) {
return new GcsInputStreamWrapper(
GoogleCloudStorageInputStream.create(gcsFileSystem(), gcsItemId()), metrics());
GoogleCloudStorageInputStream.create(gcsFileSystem(), gcsItemId()), blobId(), metrics());
}

return new GcsInputStreamWrapper(
GoogleCloudStorageInputStream.create(gcsFileSystem(), gcsFileInfo()), metrics());
GoogleCloudStorageInputStream.create(gcsFileSystem(), gcsFileInfo()), blobId(), metrics());
}

private GcsItemId gcsItemId() {
Expand Down
14 changes: 12 additions & 2 deletions gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,12 @@ public int read() throws IOException {
singleByteBuffer.position(0);

pos += 1;
channel.read(singleByteBuffer);
try {
channel.read(singleByteBuffer);
} catch (IOException e) {
GCSExceptionUtil.throwNotFoundIfPresent(e, blobId);
throw e;
}
readBytes.increment();
readOperations.increment();

Expand Down Expand Up @@ -174,7 +179,12 @@ private int read(ReadChannel readChannel, ByteBuffer buffer, int off, int len)
throws IOException {
buffer.position(off);
buffer.limit(Math.min(off + len, buffer.capacity()));
return readChannel.read(buffer);
try {
return readChannel.read(buffer);
} catch (IOException e) {
GCSExceptionUtil.throwNotFoundIfPresent(e, blobId);
throw e;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.api.client.util.Preconditions;
import com.google.cloud.gcs.analyticscore.client.GcsObjectRange;
import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream;
import com.google.cloud.storage.BlobId;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
Expand All @@ -37,10 +38,14 @@ class GcsInputStreamWrapper extends SeekableInputStream implements RangeReadable
private final Counter readBytes;
private final Counter readOperations;
private final GoogleCloudStorageInputStream stream;
private final BlobId blobId;

GcsInputStreamWrapper(GoogleCloudStorageInputStream stream, MetricsContext metrics) {
GcsInputStreamWrapper(
GoogleCloudStorageInputStream stream, BlobId blobId, MetricsContext metrics) {
Preconditions.checkArgument(null != stream, "Invalid input stream : null");
Preconditions.checkArgument(null != blobId, "Invalid blobId : null");
this.stream = stream;
this.blobId = blobId;
this.readBytes = metrics.counter(FileIOMetricsContext.READ_BYTES, MetricsContext.Unit.BYTES);
this.readOperations = metrics.counter(FileIOMetricsContext.READ_OPERATIONS);
}
Expand All @@ -57,7 +62,13 @@ public void seek(long newPos) throws IOException {

@Override
public int read() throws IOException {
int readByte = stream.read();
int readByte;
try {
readByte = stream.read();
} catch (IOException e) {
GCSExceptionUtil.throwNotFoundIfPresent(e, blobId);
throw e;
}
readBytes.increment();
readOperations.increment();
return readByte;
Expand All @@ -70,7 +81,13 @@ public int read(byte[] b) throws IOException {

@Override
public int read(byte[] b, int off, int len) throws IOException {
int bytesRead = stream.read(b, off, len);
int bytesRead;
try {
bytesRead = stream.read(b, off, len);
} catch (IOException e) {
GCSExceptionUtil.throwNotFoundIfPresent(e, blobId);
throw e;
}
if (bytesRead > 0) {
readBytes.increment(bytesRead);
}
Expand All @@ -80,12 +97,22 @@ public int read(byte[] b, int off, int len) throws IOException {

@Override
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
stream.readFully(position, buffer, offset, length);
try {
stream.readFully(position, buffer, offset, length);
} catch (IOException e) {
GCSExceptionUtil.throwNotFoundIfPresent(e, blobId);
throw e;
}
}

@Override
public int readTail(byte[] buffer, int offset, int length) throws IOException {
return stream.readTail(buffer, offset, length);
try {
return stream.readTail(buffer, offset, length);
} catch (IOException e) {
GCSExceptionUtil.throwNotFoundIfPresent(e, blobId);
throw e;
}
}

@Override
Expand All @@ -101,8 +128,12 @@ public void readVectored(List<FileRange> ranges, IntFunction<ByteBuffer> allocat
.setByteBufferFuture(fileRange.byteBuffer())
.build())
.collect(Collectors.toList());

stream.readVectored(objectRanges, allocate);
try {
stream.readVectored(objectRanges, allocate);
} catch (IOException e) {
GCSExceptionUtil.throwNotFoundIfPresent(e, blobId);
throw e;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ private void readAndCheckRanges(
@Test
public void testClose() throws Exception {
BlobId blobId = BlobId.fromGsUtilUri("gs://bucket/path/to/closed.dat");
byte[] data = randomData(1024 * 1024);

writeGCSData(blobId, data);

SeekableInputStream closed =
new GCSInputStream(storage, blobId, null, gcpProperties, MetricsContext.nullMetrics());
closed.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.cloud.gcs.analyticscore.client.GcsObjectRange;
import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream;
import com.google.cloud.storage.BlobId;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
Expand All @@ -44,7 +45,10 @@ public class TestGcsInputStreamWrapper {
@BeforeEach
public void before() {
inputStreamWrapper =
new GcsInputStreamWrapper(googleCloudStorageInputStream, MetricsContext.nullMetrics());
new GcsInputStreamWrapper(
googleCloudStorageInputStream,
BlobId.of("mockbucket", "mockname"),
MetricsContext.nullMetrics());
}

@Test
Expand Down
Loading