Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -424,71 +424,74 @@ public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,

Connection connection = null;
RegionLocator regionLocator = null;
if (localityEnabled && useRegionLoc) {
Configuration newConf = new Configuration(conf);
newConf.setInt("hbase.hconnection.threads.max", 1);
try {
List<InputSplit> splits = new ArrayList<>();
try {
if (localityEnabled && useRegionLoc) {
Configuration newConf = new Configuration(conf);
newConf.setInt("hbase.hconnection.threads.max", 1);

connection = ConnectionFactory.createConnection(newConf);
regionLocator = connection.getRegionLocator(htd.getTableName());

/* Get all locations for the table and cache it */
regionLocator.getAllRegionLocations();
} finally {
if (connection != null) {
connection.close();
}
}
}

List<InputSplit> splits = new ArrayList<>();
for (HRegionInfo hri : regionManifests) {
// load region descriptor
List<String> hosts = null;
if (localityEnabled) {
if (regionLocator != null) {
/* Get Location from the local cache */
HRegionLocation location = regionLocator.getRegionLocation(hri.getStartKey(), false);

hosts = new ArrayList<>(1);
hosts.add(location.getHostname());
} else {
hosts = calculateLocationsForInputSplit(conf, htd, hri, tableDir);
for (HRegionInfo hri : regionManifests) {
// load region descriptor
List<String> hosts = null;
if (localityEnabled) {
if (regionLocator != null) {
/* Get Location from the local cache */
HRegionLocation location = regionLocator.getRegionLocation(hri.getStartKey(), false);

hosts = new ArrayList<>(1);
hosts.add(location.getHostname());
} else {
hosts = calculateLocationsForInputSplit(conf, htd, hri, tableDir);
}
}
}

if (numSplits > 1) {
byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true);
for (int i = 0; i < sp.length - 1; i++) {
if (numSplits > 1) {
byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true);
for (int i = 0; i < sp.length - 1; i++) {
if (
PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i],
sp[i + 1])
) {

Scan boundedScan = new Scan(scan);
if (scan.getStartRow().length == 0) {
boundedScan.withStartRow(sp[i]);
} else {
boundedScan.withStartRow(
Bytes.compareTo(scan.getStartRow(), sp[i]) > 0 ? scan.getStartRow() : sp[i]);
}

if (scan.getStopRow().length == 0) {
boundedScan.withStopRow(sp[i + 1]);
} else {
boundedScan.withStopRow(Bytes.compareTo(scan.getStopRow(), sp[i + 1]) < 0
? scan.getStopRow()
: sp[i + 1]);
}

splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
}
}
} else {
if (
PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i], sp[i + 1])
PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
hri.getStartKey(), hri.getEndKey())
) {

Scan boundedScan = new Scan(scan);
if (scan.getStartRow().length == 0) {
boundedScan.withStartRow(sp[i]);
} else {
boundedScan.withStartRow(
Bytes.compareTo(scan.getStartRow(), sp[i]) > 0 ? scan.getStartRow() : sp[i]);
}

if (scan.getStopRow().length == 0) {
boundedScan.withStopRow(sp[i + 1]);
} else {
boundedScan.withStopRow(
Bytes.compareTo(scan.getStopRow(), sp[i + 1]) < 0 ? scan.getStopRow() : sp[i + 1]);
}

splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
}
}
} else {
if (
PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
hri.getEndKey())
) {

splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
}
}
} finally {
if (connection != null) {
connection.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Result;
Expand All @@ -47,6 +48,7 @@
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
Expand Down Expand Up @@ -684,4 +686,49 @@ public void testReadFromRestoredSnapshotViaMR() throws Exception {
}
}
}

/**
* Tests that TableSnapshotInputFormatImpl correctly caches and uses region locations when
* locality-by-region-location is enabled. Creates a multi-region table plus snapshot, computes
* splits with SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION set, and checks that each split
* carries a non-null locations array.
* NOTE(review): the assertions only confirm locations are populated, not that they came from the
* RegionLocator cache rather than the HDFS-blocks fallback — confirm whether a stronger check
* (e.g. verifying hostnames match live region locations) is feasible here.
*/
@Test
public void testRegionLocatorUsesCache() throws Exception {
final TableName tableName = TableName.valueOf("testRegionLocatorCache");
Configuration conf = UTIL.getConfiguration();
try {
// Create table with multiple regions (3 region replicas/splits) and snapshot it, so that
// getSplits has several regions whose locations must be resolved.
createTableAndSnapshot(UTIL, tableName, "snapshot", getStartRow(), getEndRow(), 3);

// Enable the code path under test: locality derived from cached region locations
// instead of per-region HDFS block-distribution calculation.
conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION, true);

Path tmpTableDir = UTIL.getDataTestDirOnTestFS("snapshot");

// Get snapshot manifest and region info
SnapshotManifest manifest = TableSnapshotInputFormatImpl.getSnapshotManifest(conf, "snapshot",
CommonFSUtils.getRootDir(conf), UTIL.getTestFileSystem());
List<HRegionInfo> regionInfos =
TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest);

Scan scan = new Scan();

// numSplits = 1: one InputSplit per region, no further splitting by UniformSplit.
List<TableSnapshotInputFormatImpl.InputSplit> splits = TableSnapshotInputFormatImpl.getSplits(
scan, manifest, regionInfos, tmpTableDir, conf, new RegionSplitter.UniformSplit(), 1);

// Verify that splits contain proper locality information
Assert.assertNotNull(splits);
Assert.assertFalse(splits.isEmpty());

// Verify locations are populated from cache
for (TableSnapshotInputFormatImpl.InputSplit split : splits) {
String[] locations = split.getLocations();
// Locations should be populated from cache
Assert.assertNotNull(locations);
}

} finally {
// Clean up snapshot/table and restore the shared Configuration so later tests in this
// class are not affected by the locality flag set above.
UTIL.getAdmin().deleteSnapshot("snapshot");
UTIL.deleteTable(tableName);
conf.unset(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION);
}
}
}