Skip to content
7 changes: 4 additions & 3 deletions core/src/main/java/org/apache/iceberg/CatalogUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {

if (gcEnabled) {
// delete data files only if we are sure this won't corrupt other tables
deleteFiles(io, manifestsToDelete);
deleteFiles(io, manifestsToDelete, metadata.specsById());
}

deleteFiles(io, Iterables.transform(manifestsToDelete, ManifestFile::path), "manifest");
Expand Down Expand Up @@ -163,7 +163,8 @@ public static void dropViewMetadata(FileIO io, ViewMetadata metadata) {
}

@SuppressWarnings("DangerousStringInternUsage")
private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
private static void deleteFiles(
FileIO io, Set<ManifestFile> allManifests, Map<Integer, PartitionSpec> specsById) {
// keep track of deleted files in a map that can be cleaned up when memory runs low
Map<String, Boolean> deletedFiles =
new MapMaker().concurrencyLevel(ThreadPools.WORKER_THREAD_POOL_SIZE).weakKeys().makeMap();
Expand All @@ -177,7 +178,7 @@ private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc))
.run(
manifest -> {
try (ManifestReader<?> reader = ManifestFiles.open(manifest, io)) {
try (ManifestReader<?> reader = ManifestFiles.open(manifest, io, specsById)) {
List<String> pathsToDelete = Lists.newArrayList();
for (ManifestEntry<?> entry : reader.entries()) {
// intern the file path because the weak key map uses identity (==) instead of
Expand Down
58 changes: 45 additions & 13 deletions core/src/main/java/org/apache/iceberg/CherryPickOperation.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,19 @@
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.iceberg.exceptions.CherrypickAncestorCommitException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.PartitionSet;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.ThreadPools;
import org.apache.iceberg.util.WapUtil;

/**
Expand All @@ -40,14 +45,12 @@
*/
class CherryPickOperation extends MergingSnapshotProducer<CherryPickOperation> {

private final FileIO io;
private Snapshot cherrypickSnapshot = null;
private boolean requireFastForward = false;
private PartitionSet replacedPartitions = null;

// Package-private: cherry-pick operations are created via the table's ManageSnapshots API,
// not constructed directly by users.
CherryPickOperation(String tableName, TableOperations ops) {
super(tableName, ops);
// NOTE(review): this assignment targets a `private final FileIO io` field; later code in
// this class reads `ops().io()` instead — confirm whether the field (and this line) is
// still needed or is a leftover slated for removal in this change.
this.io = ops.io();
}

@Override
Expand Down Expand Up @@ -161,7 +164,8 @@ protected void validate(TableMetadata base, Snapshot snapshot) {
// case
if (!isFastForward(base)) {
validateNonAncestor(base, cherrypickSnapshot.snapshotId());
validateReplacedPartitions(base, cherrypickSnapshot.parentId(), replacedPartitions, io);
validateReplacedPartitions(
base, cherrypickSnapshot.parentId(), replacedPartitions, ops().io());
WapUtil.validateWapPublish(base, cherrypickSnapshot.snapshotId());
}
}
Expand Down Expand Up @@ -218,21 +222,49 @@ private static void validateReplacedPartitions(
parentId == null || isCurrentAncestor(meta, parentId),
"Cannot cherry-pick overwrite, based on non-ancestor of the current state: %s",
parentId);
try (CloseableIterable<DataFile> newFiles =
SnapshotUtil.newFilesBetween(
parentId, meta.currentSnapshot().snapshotId(), meta::snapshot, io)) {
for (DataFile newFile : newFiles) {
ValidationException.check(
!replacedPartitions.contains(newFile.specId(), newFile.partition()),
"Cannot cherry-pick replace partitions with changed partition: %s",
newFile.partition());
List<Snapshot> snapshots =
Lists.newArrayList(
SnapshotUtil.ancestorsBetween(
meta.currentSnapshot().snapshotId(), parentId, meta::snapshot));
if (!snapshots.isEmpty()) {
Iterable<CloseableIterable<DataFile>> addedFileTasks =
Iterables.concat(
Iterables.transform(
snapshots,
snap ->
Iterables.transform(
manifestsCreatedBy(snap, io),
manifest -> addedDataFiles(manifest, io, meta.specsById()))));

try (CloseableIterable<DataFile> newFiles =
new ParallelIterable<>(addedFileTasks, ThreadPools.getWorkerPool())) {
for (DataFile newFile : newFiles) {
ValidationException.check(
!replacedPartitions.contains(newFile.specId(), newFile.partition()),
"Cannot cherry-pick replace partitions with changed partition: %s",
newFile.partition());
}
} catch (IOException e) {
throw new UncheckedIOException("Failed to validate replaced partitions", e);
}
} catch (IOException ioe) {
throw new UncheckedIOException("Failed to validate replaced partitions", ioe);
}
}
}

/**
 * Returns the data manifests that were written by the given snapshot.
 *
 * <p>Filters the snapshot's data manifests down to those whose recorded snapshot id matches
 * the snapshot itself; {@link Objects#equals} tolerates manifests with a null snapshot id.
 * The result is a lazy view over {@code snapshot.dataManifests(io)}.
 */
private static Iterable<ManifestFile> manifestsCreatedBy(Snapshot snapshot, FileIO io) {
  Long expectedSnapshotId = snapshot.snapshotId();
  return Iterables.filter(
      snapshot.dataManifests(io),
      manifest -> Objects.equals(manifest.snapshotId(), expectedSnapshotId));
}

/**
 * Reads the data files recorded as ADDED in the given manifest.
 *
 * <p>Opens the manifest with the provided spec lookup, keeps only entries whose status is
 * {@link ManifestEntry.Status#ADDED}, and returns a defensive {@code copy()} of each file so
 * callers do not hold references into the reader. The returned iterable owns the underlying
 * manifest reader and must be closed by the caller.
 */
private static CloseableIterable<DataFile> addedDataFiles(
    ManifestFile manifest, FileIO io, Map<Integer, PartitionSpec> specsById) {
  return CloseableIterable.transform(
      CloseableIterable.filter(
          ManifestFiles.read(manifest, io, specsById).entries(),
          entry -> entry.status() == ManifestEntry.Status.ADDED),
      entry -> entry.file().copy());
}

private static Long lookupAncestorBySourceSnapshot(TableMetadata meta, long snapshotId) {
String snapshotIdStr = String.valueOf(snapshotId);
for (long ancestorId : currentAncestors(meta)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,8 @@ private static PartitionMap<PartitionStatistics> computeStats(
private static PartitionMap<PartitionStatistics> collectStatsForManifest(
Table table, ManifestFile manifest, StructType partitionType, boolean incremental) {
List<String> projection = BaseScan.scanColumns(manifest.content());
try (ManifestReader<?> reader = ManifestFiles.open(manifest, table.io()).select(projection)) {
try (ManifestReader<?> reader =
ManifestFiles.open(manifest, table.io(), table.specs()).select(projection)) {
PartitionMap<PartitionStatistics> statsMap = PartitionMap.create(table.specs());
int specId = manifest.partitionSpecId();
PartitionSpec spec = table.specs().get(specId);
Expand Down
15 changes: 10 additions & 5 deletions core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -81,7 +82,8 @@ public void cleanFiles(

if (!manifestsToDelete.isEmpty()) {
if (ExpireSnapshots.CleanupLevel.ALL == cleanupLevel) {
Set<String> dataFilesToDelete = findFilesToDelete(manifestsToDelete, currentManifests);
Set<String> dataFilesToDelete =
findFilesToDelete(manifestsToDelete, currentManifests, beforeExpiration.specsById());
LOG.debug("Deleting {} data files", dataFilesToDelete.size());
deleteFiles(dataFilesToDelete, "data");
}
Expand Down Expand Up @@ -165,9 +167,10 @@ private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
return manifestFiles;
}

// Helper to determine data files to delete
private Set<String> findFilesToDelete(
Set<ManifestFile> manifestFilesToDelete, Set<ManifestFile> currentManifestFiles) {
Set<ManifestFile> manifestFilesToDelete,
Set<ManifestFile> currentManifestFiles,
Map<Integer, PartitionSpec> specsById) {
Set<String> filesToDelete = ConcurrentHashMap.newKeySet();

Tasks.foreach(manifestFilesToDelete)
Expand All @@ -180,7 +183,8 @@ private Set<String> findFilesToDelete(
"Failed to determine live files in manifest {}. Retrying", item.path(), exc))
.run(
manifest -> {
try (CloseableIterable<String> paths = ManifestFiles.readPaths(manifest, fileIO)) {
try (CloseableIterable<String> paths =
ManifestFiles.readPaths(manifest, fileIO, specsById)) {
paths.forEach(filesToDelete::add);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to read manifest file: %s", manifest);
Expand Down Expand Up @@ -208,7 +212,8 @@ private Set<String> findFilesToDelete(
}

// Remove all the live files from the candidate deletion set
try (CloseableIterable<String> paths = ManifestFiles.readPaths(manifest, fileIO)) {
try (CloseableIterable<String> paths =
ManifestFiles.readPaths(manifest, fileIO, specsById)) {
paths.forEach(filesToDelete::remove);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to read manifest file: %s", manifest);
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotChanges;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
Expand Down Expand Up @@ -284,8 +285,8 @@ private static Iterable<Long> toIds(Iterable<Snapshot> snapshots) {
}

/**
* @deprecated will be removed in 1.12.0, use {@link #newFilesBetween(Long, long, Function,
* FileIO)} instead.
* @deprecated will be removed in 1.12.0, use {@link SnapshotChanges} with {@link
* #ancestorsBetween(long, Long, Function)} instead.
*/
@Deprecated
public static List<DataFile> newFiles(
Expand All @@ -310,6 +311,11 @@ public static List<DataFile> newFiles(
return newFiles;
}

/**
* @deprecated will be removed in 1.12.0, use {@link SnapshotChanges} with {@link
* #ancestorsBetween(long, Long, Function)} instead.
*/
@Deprecated
public static CloseableIterable<DataFile> newFilesBetween(
Long startSnapshotId, long endSnapshotId, Function<Long, Snapshot> lookup, FileIO io) {

Expand Down
23 changes: 18 additions & 5 deletions core/src/test/java/org/apache/iceberg/TestRowDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -1946,7 +1946,10 @@ public void testDuplicateDVsAreMerged() throws IOException {
commit(table, rowDelta1, branch);

Iterable<DeleteFile> addedDeleteFiles =
latestSnapshot(table, branch).addedDeleteFiles(table.io());
SnapshotChanges.builderFor(table)
.snapshot(latestSnapshot(table, branch))
.build()
.addedDeleteFiles();
assertThat(Iterables.size(addedDeleteFiles)).isEqualTo(1);
DeleteFile mergedDV = Iterables.getOnlyElement(addedDeleteFiles);

Expand Down Expand Up @@ -2004,7 +2007,8 @@ public void testDuplicateDVsMergedMultipleSpecs() throws IOException {

Snapshot snapshot = latestSnapshot(table, branch);
// Expect 3 merged DVs, one per data file
Iterable<DeleteFile> addedDeleteFiles = snapshot.addedDeleteFiles(table.io());
Iterable<DeleteFile> addedDeleteFiles =
SnapshotChanges.builderFor(table).snapshot(snapshot).build().addedDeleteFiles();
List<DeleteFile> mergedDVs = Lists.newArrayList(addedDeleteFiles);
assertThat(mergedDVs).hasSize(3);
// Should be a Puffin produced per merged DV spec
Expand Down Expand Up @@ -2067,7 +2071,10 @@ public void testDuplicateDVsAreMergedForMultipleReferenceFiles() throws IOExcept

// Expect two merged DVs, one per data file
Iterable<DeleteFile> addedDeleteFiles =
latestSnapshot(table, branch).addedDeleteFiles(table.io());
SnapshotChanges.builderFor(table)
.snapshot(latestSnapshot(table, branch))
.build()
.addedDeleteFiles();
List<DeleteFile> mergedDVs = Lists.newArrayList(addedDeleteFiles);

assertThat(mergedDVs).hasSize(2);
Expand Down Expand Up @@ -2119,7 +2126,10 @@ public void testDuplicateDVsAndValidDV() throws IOException {

// Expect two DVs: one merged for dataFile1 and deleteFile2
Iterable<DeleteFile> addedDeleteFiles =
latestSnapshot(table, branch).addedDeleteFiles(table.io());
SnapshotChanges.builderFor(table)
.snapshot(latestSnapshot(table, branch))
.build()
.addedDeleteFiles();
List<DeleteFile> committedDVs = Lists.newArrayList(addedDeleteFiles);

assertThat(committedDVs).hasSize(2);
Expand Down Expand Up @@ -2167,7 +2177,10 @@ public void testDuplicateDVsAreMergedAndEqDelete() throws IOException {
commit(table, rowDelta, branch);

Iterable<DeleteFile> addedDeleteFiles =
latestSnapshot(table, branch).addedDeleteFiles(table.io());
SnapshotChanges.builderFor(table)
.snapshot(latestSnapshot(table, branch))
.build()
.addedDeleteFiles();
List<DeleteFile> committedDeletes = Lists.newArrayList(addedDeleteFiles);

// 1 DV + 1 equality delete
Expand Down
21 changes: 13 additions & 8 deletions data/src/test/java/org/apache/iceberg/io/TestDVWriters.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotChanges;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestHelpers;
Expand Down Expand Up @@ -150,8 +151,9 @@ public void testRewriteDVs() throws IOException {

// commit the first DV
commit(result1);
assertThat(table.currentSnapshot().addedDeleteFiles(table.io())).hasSize(1);
assertThat(table.currentSnapshot().removedDeleteFiles(table.io())).isEmpty();
SnapshotChanges changes1 = SnapshotChanges.builderFor(table).build();
assertThat(changes1.addedDeleteFiles()).hasSize(1);
assertThat(changes1.removedDeleteFiles()).isEmpty();

// verify correctness after committing the first DV
assertRows(ImmutableList.of(toRow(1, "aaa"), toRow(3, "aaa")));
Expand All @@ -174,8 +176,9 @@ public void testRewriteDVs() throws IOException {

// replace DVs
commit(result2);
assertThat(table.currentSnapshot().addedDeleteFiles(table.io())).hasSize(1);
assertThat(table.currentSnapshot().removedDeleteFiles(table.io())).hasSize(1);
SnapshotChanges changes2 = SnapshotChanges.builderFor(table).build();
assertThat(changes2.addedDeleteFiles()).hasSize(1);
assertThat(changes2.removedDeleteFiles()).hasSize(1);

// verify correctness after replacing DVs
assertRows(ImmutableList.of(toRow(1, "aaa")));
Expand Down Expand Up @@ -220,8 +223,9 @@ public void testRewriteFileScopedPositionDeletes() throws IOException {

// replace the position delete file with the DV
commit(result);
assertThat(table.currentSnapshot().addedDeleteFiles(table.io())).hasSize(1);
assertThat(table.currentSnapshot().removedDeleteFiles(table.io())).hasSize(1);
SnapshotChanges changes = SnapshotChanges.builderFor(table).build();
assertThat(changes.addedDeleteFiles()).hasSize(1);
assertThat(changes.removedDeleteFiles()).hasSize(1);

// verify correctness
assertRows(ImmutableList.of(toRow(3, "aaa")));
Expand Down Expand Up @@ -299,8 +303,9 @@ public void testApplyPartitionScopedPositionDeletes() throws IOException {

// commit the DV, ensuring the position delete file remains
commit(result);
assertThat(table.currentSnapshot().addedDeleteFiles(table.io())).hasSize(1);
assertThat(table.currentSnapshot().removedDeleteFiles(table.io())).isEmpty();
SnapshotChanges changes = SnapshotChanges.builderFor(table).build();
assertThat(changes.addedDeleteFiles()).hasSize(1);
assertThat(changes.removedDeleteFiles()).isEmpty();

// verify correctness with DVs and position delete files
assertRows(ImmutableList.of(toRow(3, "aaa"), toRow(6, "aaa")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.net.URLCodec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotChanges;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
Expand Down Expand Up @@ -459,9 +460,10 @@ private void checkDataFilePathsIntegrity(
deltaLog.update().getAllFiles().stream()
.map(f -> getFullFilePath(f.getPath(), deltaLog.getPath().toString()))
.collect(Collectors.toList());
icebergTable
.currentSnapshot()
.addedDataFiles(icebergTable.io())
SnapshotChanges.builderFor(icebergTable)
.snapshot(icebergTable.currentSnapshot())
.build()
.addedDataFiles()
.forEach(
dataFile -> {
assertThat(URI.create(dataFile.location()).isAbsolute()).isTrue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.SnapshotChanges;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
Expand Down Expand Up @@ -88,6 +89,6 @@ static List<DataFileRewriteRunner.ExecutedGroup> executeRewrite(

/**
 * Refreshes the table and returns the data files reported as added for its current state.
 *
 * <p>Collects {@code SnapshotChanges.addedDataFiles()} into a mutable {@link java.util.HashSet}
 * so test assertions can compare against expected file sets.
 */
static Set<DataFile> newDataFiles(Table table) {
  table.refresh();
  SnapshotChanges currentChanges = SnapshotChanges.builderFor(table).build();
  return Sets.newHashSet(currentChanges.addedDataFiles());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.SnapshotChanges;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.maintenance.api.Trigger;
Expand Down Expand Up @@ -262,9 +263,9 @@ private static void assertDataFiles(

assertThat(table.currentSnapshot().summary().get(TOTAL_DATA_FILES))
.isEqualTo(String.valueOf(expectedCurrent));
Set<DataFile> actualAdded = Sets.newHashSet(table.currentSnapshot().addedDataFiles(table.io()));
Set<DataFile> actualRemoved =
Sets.newHashSet(table.currentSnapshot().removedDataFiles(table.io()));
SnapshotChanges changes = SnapshotChanges.builderFor(table).build();
Set<DataFile> actualAdded = Sets.newHashSet(changes.addedDataFiles());
Set<DataFile> actualRemoved = Sets.newHashSet(changes.removedDataFiles());
assertThat(actualAdded.stream().map(DataFile::location).collect(Collectors.toSet()))
.isEqualTo(expectedAdded.stream().map(DataFile::location).collect(Collectors.toSet()));
assertThat(actualRemoved.stream().map(DataFile::location).collect(Collectors.toSet()))
Expand Down
Loading
Loading