diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index c1a1a73..efd1d6f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -78,6 +78,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import javax.net.SocketFactory;
 
@@ -167,6 +170,7 @@
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenRenewer;
+import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DataChecksum.Type;
 import org.apache.hadoop.util.Progressable;
@@ -218,6 +222,10 @@
   private final CachingStrategy defaultReadCachingStrategy;
   private final CachingStrategy defaultWriteCachingStrategy;
   private ClientMmapManager mmapManager;
+  private volatile long hedgedReadThresholdMillis;
+  private static DFSHedgedReadMetrics HEDGED_READ_METRIC =
+      new DFSHedgedReadMetrics();
+  private volatile static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
 
   private static final ClientMmapManagerFactory MMAP_MANAGER_FACTORY =
       new ClientMmapManagerFactory();
@@ -594,6 +602,16 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
     this.defaultWriteCachingStrategy =
         new CachingStrategy(writeDropBehind, readahead);
     this.mmapManager = MMAP_MANAGER_FACTORY.get(conf);
+
+    this.hedgedReadThresholdMillis = conf.getLong(
+        DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
+        DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS);
+    int numThreads = conf.getInt(
+        DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
+        DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE);
+    if (numThreads > 0) {
+      this.initThreadsNumForHedgedReads(numThreads);
+    }
   }
 
   /**
@@ -2644,4 +2662,43 @@ public CachingStrategy getDefaultWriteCachingStrategy() {
   public ClientMmapManager getMmapManager() {
     return mmapManager;
   }
-}
+
+  private synchronized void initThreadsNumForHedgedReads(int num) {
+    if (num <= 0 || HEDGED_READ_THREAD_POOL != null) return;
+    HEDGED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
+        TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+        new Daemon.DaemonFactory(),
+        new ThreadPoolExecutor.CallerRunsPolicy() {
+      public void rejectedExecution(Runnable runnable,
+          ThreadPoolExecutor e) {
+        LOG.info("Execution rejected, Executing in current thread");
+        HEDGED_READ_METRIC.incHedgedReadOpsInCurThread();
+        // will run in the current thread
+        super.rejectedExecution(runnable, e);
+      }
+    });
+    HEDGED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
+  }
+
+  long getHedgedReadTimeout() {
+    return this.hedgedReadThresholdMillis;
+  }
+
+  @VisibleForTesting
+  void setHedgedReadTimeout(long timeoutMillis) {
+    this.hedgedReadThresholdMillis = timeoutMillis;
+  }
+
+  ThreadPoolExecutor getHedgedReadsThreadPool() {
+    return HEDGED_READ_THREAD_POOL;
+  }
+
+  boolean isHedgedReadsEnabled() {
+    return (HEDGED_READ_THREAD_POOL != null) &&
+        HEDGED_READ_THREAD_POOL.getMaximumPoolSize() > 0;
+  }
+
+  DFSHedgedReadMetrics getHedgedReadMetrics() {
+    return HEDGED_READ_METRIC;
+  }
+}
\ No newline at end of file
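For illustration only: a self-contained sketch of the pool pattern introduced above, i.e. a bounded ThreadPoolExecutor over a SynchronousQueue whose CallerRunsPolicy subclass counts saturation and falls back to running the task in the submitting thread. Executors.defaultThreadFactory() and a plain AtomicLong stand in for Daemon.DaemonFactory and DFSHedgedReadMetrics; the pool size, sleep time, and class name are invented for the demo.

import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class CallerRunsPoolDemo {
  // Stand-in for DFSHedgedReadMetrics.incHedgedReadOpsInCurThread().
  static final AtomicLong fallbackCount = new AtomicLong();

  public static void main(String[] args) throws Exception {
    // Bounded pool: one core thread, at most 4 workers, no queueing
    // (SynchronousQueue), idle threads time out after 60 seconds.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 4, 60,
        TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.CallerRunsPolicy() {
          @Override
          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // Pool saturated: count it and run in the submitting thread,
            // so the operation degrades to a plain, un-hedged call.
            fallbackCount.incrementAndGet();
            super.rejectedExecution(r, e);
          }
        });
    pool.allowCoreThreadTimeOut(true);

    for (int i = 0; i < 20; i++) {
      pool.execute(new Runnable() {
        public void run() {
          try {
            Thread.sleep(100); // simulate a slow read
          } catch (InterruptedException ignored) {
          }
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
    System.out.println("Tasks run in caller thread: " + fallbackCount.get());
  }
}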
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
index c5c6d5c..dba0c36 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hdfs;
-import java.io.IOException;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -46,4 +45,6 @@ public boolean uncorruptPacket() {
   public boolean failPacket() {
     return false;
   }
+
+  public void startFetchFromDatanode() {}
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index fe1d3d1..8443629 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -582,4 +582,14 @@
   public static final int DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT = 500;
   public static final String DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY = "dfs.http.client.failover.sleep.max.millis";
   public static final int DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT = 15000;
+
+  // hedged read properties
+  public static final String DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS =
+      "dfs.client.hedged.read.threshold.millis";
+  public static final long DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS =
+      500;
+
+  public static final String DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE =
+      "dfs.client.hedged.read.threadpool.size";
+  public static final int DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE = 0;
 }
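The two properties above are all a client needs to touch to turn the feature on. A hedged usage sketch, assuming a reachable HDFS and a made-up file path; the values 10 and 250 are illustrative, and the string keys mirror the new DFSConfigKeys constants:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HedgedPreadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // A pool size > 0 enables hedged reads; the default of 0 leaves them off.
    conf.setInt("dfs.client.hedged.read.threadpool.size", 10);
    // How long to wait for the first datanode before starting a second read.
    conf.setLong("dfs.client.hedged.read.threshold.millis", 250);

    FileSystem fs = FileSystem.get(conf);
    FSDataInputStream in = fs.open(new Path("/tmp/hedged-demo.dat")); // made-up path
    byte[] buf = new byte[4096];
    try {
      // Only the positional-read path is hedged; a replica slower than the
      // threshold triggers a second request and the fastest reply wins.
      int n = in.read(0L, buf, 0, buf.length);
      System.out.println("pread returned " + n + " bytes");
    } finally {
      in.close();
      fs.close();
    }
  }
}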
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 73861bc..c6de599 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -24,6 +24,7 @@
 import java.nio.ByteBuffer;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -32,7 +33,13 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -565,7 +572,7 @@ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
     assert (target==pos) : "Wrong postion " + pos + " expect " + target;
     long offsetIntoBlock = target - targetBlock.getStartOffset();
 
-    DNAddrPair retval = chooseDataNode(targetBlock);
+    DNAddrPair retval = chooseDataNode(targetBlock, null);
     chosenNode = retval.info;
     InetSocketAddress targetAddr = retval.addr;
 
@@ -864,32 +871,29 @@ private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
       corruptedBlockMap.put(blk, dnSet);
     }
   }
-      
-  private DNAddrPair chooseDataNode(LocatedBlock block)
-    throws IOException {
+
+  private DNAddrPair chooseDataNode(LocatedBlock block,
+      Collection<DatanodeInfo> ignoredNodes) throws IOException {
     while (true) {
       DatanodeInfo[] nodes = block.getLocations();
       try {
-        DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
-        final String dnAddr =
-          chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
-        if (DFSClient.LOG.isDebugEnabled()) {
-          DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
-        }
-        InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
-        return new DNAddrPair(chosenNode, targetAddr);
+        return getBestNodeDNAddrPair(nodes, ignoredNodes);
       } catch (IOException ie) {
+        String errMsg = getBestNodeErrorString(nodes, deadNodes, ignoredNodes);
         String blockInfo = block.getBlock() + " file=" + src;
         if (failures >= dfsClient.getMaxBlockAcquireFailures()) {
-          throw new BlockMissingException(src, "Could not obtain block: " + blockInfo,
-                                          block.getStartOffset());
+          String description = "Could not obtain block: " + blockInfo;
+          DFSClient.LOG.warn(description + errMsg
+              + ". Throwing a BlockMissingException");
+          throw new BlockMissingException(src, description,
+              block.getStartOffset());
         }
 
         if (nodes == null || nodes.length == 0) {
           DFSClient.LOG.info("No node available for " + blockInfo);
         }
         DFSClient.LOG.info("Could not obtain " + block.getBlock()
-            + " from any node: " + ie
+            + " from any node: " + ie + errMsg
            + ". Will get new block locations from namenode and retry...");
         try {
           // Introducing a random factor to the wait time before another retry.
@@ -915,35 +919,104 @@ private DNAddrPair chooseDataNode(LocatedBlock block)
           continue;
         }
       }
-    }
-  } 
+    }
+  }
+
+  /**
+   * Get the best node.
+   * @param nodes Nodes to choose from.
+   * @param ignoredNodes Do not chose nodes in this array (may be null)
+   * @return The DNAddrPair of the best node.
+   * @throws IOException
+   */
+  private DNAddrPair getBestNodeDNAddrPair(final DatanodeInfo[] nodes,
+      Collection<DatanodeInfo> ignoredNodes) throws IOException {
+    DatanodeInfo chosenNode = bestNode(nodes, deadNodes, ignoredNodes);
+    final String dnAddr =
+        chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
+    if (DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
+    }
+    InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
+    return new DNAddrPair(chosenNode, targetAddr);
+  }
+
+  // we need two normal nodes at least for hedged read
+  private boolean enoughNodesForHedgedRead(LocatedBlock block) {
+    DatanodeInfo[] nodes = block.getLocations();
+    if (nodes != null) {
+      int sum = 0;
+      for (int i = 0; i < nodes.length; i++) {
+        if (!deadNodes.containsKey(nodes[i])) {
+          sum++;
+          if (sum >= 2) {
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+
   private void fetchBlockByteRange(LocatedBlock block, long start, long end,
       byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
-    //
-    // Connect to best DataNode for desired Block, with potential offset
-    //
+    block = getBlockAt(block.getStartOffset(), false);
+    while (true) {
+      DNAddrPair addressPair = chooseDataNode(block, null);
+      try {
+        actualGetFromOneDataNode(addressPair, block, start, end, buf, offset,
+            corruptedBlockMap);
+        return;
+      } catch (IOException e) {
+        // Ignore. Already processed inside the function.
+        // Loop through to try the next node.
+      }
+    }
+  }
+
+  private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
+      final LocatedBlock block, final long start, final long end,
+      final ByteBuffer bb,
+      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
+      final CountDownLatch latch) {
+    return new Callable<ByteBuffer>() {
+      @Override
+      public ByteBuffer call() throws Exception {
+        byte[] buf = bb.array();
+        int offset = bb.position();
+        actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
+            corruptedBlockMap);
+        latch.countDown();
+        return bb;
+      }
+    };
+  }
+
+  private void actualGetFromOneDataNode(final DNAddrPair datanode,
+      LocatedBlock block, final long start, final long end, byte[] buf,
+      int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
+    DFSClientFaultInjector.get().startFetchFromDatanode();
     int refetchToken = 1; // only need to get a new access token once
     int refetchEncryptionKey = 1; // only need to get a new encryption key once
-    
+
     while (true) {
       // cached block locations may have been updated by chooseDataNode()
-      // or fetchBlockAt(). Always get the latest list of locations at the 
+      // or fetchBlockAt(). Always get the latest list of locations at the
       // start of the loop.
       CachingStrategy curCachingStrategy;
       synchronized (this) {
         block = getBlockAt(block.getStartOffset(), false);
         curCachingStrategy = cachingStrategy;
       }
-      DNAddrPair retval = chooseDataNode(block);
-      DatanodeInfo chosenNode = retval.info;
-      InetSocketAddress targetAddr = retval.addr;
+      DatanodeInfo chosenNode = datanode.info;
+      InetSocketAddress targetAddr = datanode.addr;
       BlockReader reader = null;
-          
+
       try {
         Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
-            
+
         int len = (int) (end - start + 1);
         reader = getBlockReader(targetAddr, chosenNode, src, block.getBlock(),
             blockToken, start, len, buffersize, verifyChecksum,
@@ -955,11 +1028,14 @@ private void fetchBlockByteRange(LocatedBlock block, long start, long end,
         }
         return;
       } catch (ChecksumException e) {
-        DFSClient.LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
-                 src + " at " + block.getBlock() + ":" + 
-                 e.getPos() + " from " + chosenNode);
+        String msg = "fetchBlockByteRange(). Got a checksum exception for "
+            + src + " at " + block.getBlock() + ":" + e.getPos() + " from "
+            + chosenNode;
+        DFSClient.LOG.warn(msg);
         // we want to remember what we have tried
         addIntoCorruptedBlockMap(block.getBlock(), chosenNode,
             corruptedBlockMap);
+        addToDeadNodes(chosenNode);
+        throw new IOException(msg);
       } catch (AccessControlException ex) {
         DFSClient.LOG.warn("Short circuit access failed " + ex);
         dfsClient.disableLegacyBlockReaderLocal();
@@ -975,22 +1051,146 @@
           continue;
         } else if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
           refetchToken--;
-          fetchBlockAt(block.getStartOffset());
+          try {
+            fetchBlockAt(block.getStartOffset());
+          } catch (IOException fbae) {
+            // ignore IOE, since we can retry it later in a loop
+          }
           continue;
         } else {
-          DFSClient.LOG.warn("Failed to connect to " + targetAddr +
-              " for file " + src + " for block " + block.getBlock() + ":" + e);
-          if (DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Connection failure ", e);
-          }
+          String msg = "Failed to connect to " + targetAddr + " for file "
+              + src + " for block " + block.getBlock() + ":" + e;
+          DFSClient.LOG.warn("Connection failure: " + msg, e);
+          addToDeadNodes(chosenNode);
+          throw new IOException(msg);
         }
       } finally {
         if (reader != null) {
           reader.close();
         }
       }
-      // Put chosen node into dead list, continue
-      addToDeadNodes(chosenNode);
+    }
+  }
+
+  private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
+      long end, byte[] buf, int offset,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
+    ArrayList<Future<ByteBuffer>> futures = null;
+    ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>();
+    ByteBuffer bb = null;
+    int len = (int) (end - start + 1);
+    block = getBlockAt(block.getStartOffset(), false);
+    // Latch shared by all outstanding reads. First to finish closes
+    CountDownLatch hasReceivedResult = new CountDownLatch(1);
+    while (true) {
+      DNAddrPair chosenNode = null;
+      Future<ByteBuffer> future = null;
+      // futures is null if there is no request already executing.
+      if (futures == null) {
+        // chooseDataNode is a commitment. If no node, we go to
+        // the NN to reget block locations. Only go here on first read.
+        chosenNode = chooseDataNode(block, ignored);
+        bb = ByteBuffer.wrap(buf, offset, len);
+        future = getHedgedReadFuture(chosenNode, block, start, end, bb,
+            corruptedBlockMap, hasReceivedResult);
+        try {
+          future.get(dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS);
+          return;
+        } catch (TimeoutException e) {
+          if (DFSClient.LOG.isDebugEnabled()) {
+            DFSClient.LOG.debug("Waited for "
+                + dfsClient.getHedgedReadTimeout()
+                + "ms . Still not complete. Spawning new task");
+          }
+          // Ignore this node on next go around.
+          ignored.add(chosenNode.info);
+          dfsClient.getHedgedReadMetrics().incHedgedReadOps();
+          futures = new ArrayList<Future<ByteBuffer>>();
+          futures.add(future);
+          continue; // no need to refresh block locations
+        } catch (InterruptedException e) {
+          // Ignore
+        } catch (ExecutionException e) {
+          // Ignore already logged in the call.
+        }
+      } else {
+        // We are starting up a 'hedged' read. We have a read already
+        // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
+        // If no nodes to do hedged reads against, pass.
+        try {
+          chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored);
+          bb = ByteBuffer.allocate(len);
+          future = getHedgedReadFuture(chosenNode, block, start, end, bb,
+              corruptedBlockMap, hasReceivedResult);
+          futures.add(future);
+        } catch (IOException ioe) {
+          if (DFSClient.LOG.isDebugEnabled()) {
+            DFSClient.LOG.debug("Failed getting node for hedged read: "
+                + ioe.getMessage());
+          }
+        }
+        // if not succeeded. Submit callables for each datanode in a loop, wait
+        // for a fixed interval and get the result from the fastest one.
+        try {
+          ByteBuffer result = getFirstToComplete(futures, hasReceivedResult);
+          // cancel the rest.
+          cancelAll(futures);
+          if (result.array() != buf) { // compare the array pointers
+            dfsClient.getHedgedReadMetrics().incHedgedReadWins();
+            System.arraycopy(result.array(), result.position(), buf, offset,
+                len);
+          } else {
+            dfsClient.getHedgedReadMetrics().incHedgedReadOps();
+          }
+          return;
+        } catch (InterruptedException ie) {
+          // Ignore
+        } catch (ExecutionException e) {
+          // exception already handled in the call method. getFirstToComplete
+          // will remove the failing future from the list. nothing more to do.
+        }
+        // We got here if exception. Ignore this node on next go around.
+        ignored.add(chosenNode.info);
+      }
+      // executed if we get an error from a data node
+      block = getBlockAt(block.getStartOffset(), false);
+    }
+  }
+
+  private Future<ByteBuffer> getHedgedReadFuture(final DNAddrPair chosenNode,
+      final LocatedBlock block, long start,
+      final long end, final ByteBuffer bb,
+      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
+      final CountDownLatch hasReceivedResult) {
+    Callable<ByteBuffer> getFromDataNodeCallable =
+        getFromOneDataNode(chosenNode, block, start, end, bb,
+            corruptedBlockMap, hasReceivedResult);
+    return dfsClient.getHedgedReadsThreadPool().submit(getFromDataNodeCallable);
+  }
+
+  private ByteBuffer getFirstToComplete(ArrayList<Future<ByteBuffer>> futures,
+      CountDownLatch latch) throws ExecutionException, InterruptedException {
+    latch.await();
+    for (Future<ByteBuffer> future : futures) {
+      if (future.isDone()) {
+        try {
+          return future.get();
+        } catch (ExecutionException e) {
+          // already logged in the Callable
+          futures.remove(future);
+          throw e;
+        }
+      }
+    }
+    throw new InterruptedException("latch has counted down to zero but no"
+        + "result available yet, for safety try to request another one from"
+        + "outside loop, this should be rare");
+  }
+
+  private void cancelAll(List<Future<ByteBuffer>> futures) {
+    for (Future<ByteBuffer> future : futures) {
+      future.cancel(true);
+    }
+  }
 
@@ -1248,8 +1448,13 @@ public int read(long position, byte[] buffer, int offset, int length)
       long targetStart = position - blk.getStartOffset();
       long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
       try {
-        fetchBlockByteRange(blk, targetStart,
-            targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
+        if (dfsClient.isHedgedReadsEnabled() && enoughNodesForHedgedRead(blk)) {
+          hedgedFetchBlockByteRange(blk, targetStart, targetStart + bytesToRead
+              - 1, buffer, offset, corruptedBlockMap);
+        } else {
+          fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
+              buffer, offset, corruptedBlockMap);
+        }
       } finally {
         // Check and report if any block replicas are corrupted.
         // BlockMissingException may be caught if all block replicas are
@@ -1435,12 +1640,13 @@ public void reset() throws IOException {
    * Pick the best node from which to stream the data.
    * Entries in nodes are already in the priority order
    */
-  static DatanodeInfo bestNode(DatanodeInfo nodes[], 
-                               AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes)
-                               throws IOException {
-    if (nodes != null) { 
+  static DatanodeInfo bestNode(DatanodeInfo nodes[],
+      AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes,
+      Collection<DatanodeInfo> ignoredNodes) throws IOException {
+    if (nodes != null) {
       for (int i = 0; i < nodes.length; i++) {
-        if (!deadNodes.containsKey(nodes[i])) {
+        if (!deadNodes.containsKey(nodes[i])
+            && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
           return nodes[i];
         }
       }
     }
     throw new IOException("No live nodes contain current block");
   }
 
+  private static String getBestNodeErrorString(final DatanodeInfo[] nodes,
+      final AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes,
+      final Collection<DatanodeInfo> ignoredNodes) {
+    StringBuilder errMsgr = new StringBuilder(
+        " No live nodes contain current block ");
+    errMsgr.append("Block locations:");
+    for (DatanodeInfo datanode : nodes) {
+      errMsgr.append(" ");
+      errMsgr.append(datanode.toString());
+    }
+    errMsgr.append(" Dead nodes: ");
+    for (DatanodeInfo datanode : deadNodes.keySet()) {
+      errMsgr.append(" ");
+      errMsgr.append(datanode.toString());
+    }
+    if (ignoredNodes != null) {
+      errMsgr.append(" Ignored nodes: ");
+      for (DatanodeInfo datanode : ignoredNodes) {
+        errMsgr.append(" ");
+        errMsgr.append(datanode.toString());
+      }
+    }
+    return errMsgr.toString();
+  }
+
   /** Utility class to encapsulate data node info and its address. */
   static class DNAddrPair {
     DatanodeInfo info;
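The control flow of hedgedFetchBlockByteRange can be sketched outside HDFS with plain java.util.concurrent types: submit a primary read, wait up to the threshold, hedge against a second source on timeout, take whichever finishes first, then cancel the rest. This is a simplified stand-in, not the patch's code — the replica names and latencies are invented, and an ExecutorCompletionService replaces the CountDownLatch bookkeeping used above.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Generic "hedge after a threshold" sketch, independent of HDFS classes. */
public class HedgedCallDemo {
  static String readFrom(String replica, long latencyMillis) throws Exception {
    Thread.sleep(latencyMillis);            // pretend this is a datanode read
    return "data from " + replica;
  }

  public static void main(String[] args) throws Exception {
    final long thresholdMillis = 100;       // like dfs.client.hedged.read.threshold.millis
    ExecutorService pool = Executors.newFixedThreadPool(2);
    ExecutorCompletionService<String> ecs =
        new ExecutorCompletionService<String>(pool);
    List<Future<String>> outstanding = new ArrayList<Future<String>>();

    // Primary read against the first replica (deliberately slow here).
    outstanding.add(ecs.submit(new Callable<String>() {
      public String call() throws Exception { return readFrom("replica-1", 500); }
    }));

    // Wait up to the threshold for the primary to finish.
    Future<String> done = ecs.poll(thresholdMillis, TimeUnit.MILLISECONDS);
    if (done == null) {
      // Too slow: hedge with a second replica and take whichever wins.
      outstanding.add(ecs.submit(new Callable<String>() {
        public String call() throws Exception { return readFrom("replica-2", 50); }
      }));
      done = ecs.take();
    }
    System.out.println(done.get());

    for (Future<String> f : outstanding) {
      f.cancel(true);                       // cancel the loser, as the patch does
    }
    pool.shutdownNow();
  }
}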
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
index 9afa493..1241441 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
@@ -22,7 +22,13 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
@@ -33,6 +39,9 @@
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.log4j.Level;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 /**
  * This class tests the DFS positional read functionality in a single node
@@ -44,9 +53,12 @@
   boolean simulatedStorage = false;
 
   private void writeFile(FileSystem fileSys, Path name) throws IOException {
+    int replication = 3;// We need > 1 blocks to test out the hedged reads.
     // test empty file open and read
     DFSTestUtil.createFile(fileSys, name, 12 * blockSize, 0,
         blockSize, (short) 1, seed);
+    DataOutputStream stm = fileSys.create(name, true, 4096,
+        (short)replication, blockSize);
     FSDataInputStream in = fileSys.open(name);
     byte[] buffer = new byte[12 * blockSize];
     in.readFully(0, buffer, 0, 0);
@@ -191,26 +203,128 @@ private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
     assertTrue(fileSys.delete(name, true));
     assertTrue(!fileSys.exists(name));
   }
-  
+
+  private Callable<Void> getPReadFileCallable(final FileSystem fileSys,
+      final Path file) {
+    return new Callable<Void>() {
+      public Void call() throws IOException {
+        pReadFile(fileSys, file);
+        return null;
+      }
+    };
+  }
+
   /**
    * Tests positional read in DFS.
    */
   @Test
   public void testPreadDFS() throws IOException {
-    dfsPreadTest(false, true); //normal pread
-    dfsPreadTest(true, true);  //trigger read code path without transferTo.
+    Configuration conf = new Configuration();
+    dfsPreadTest(conf, false, true); // normal pread
+    dfsPreadTest(conf, true, true); // trigger read code path without
+                                    // transferTo.
   }
 
   @Test
   public void testPreadDFSNoChecksum() throws IOException {
+    Configuration conf = new Configuration();
     ((Log4JLogger)DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
-    dfsPreadTest(false, false);
-    dfsPreadTest(true, false);
+    dfsPreadTest(conf, false, false);
+    dfsPreadTest(conf, true, false);
   }
 
-  private void dfsPreadTest(boolean disableTransferTo, boolean verifyChecksum)
+  /**
+   * Tests positional read in DFS, with hedged reads enabled.
+   */
+  @Test
+  public void testHedgedPreadDFSBasic() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
+    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 100);
+    dfsPreadTest(conf, false, true); // normal pread
+    dfsPreadTest(conf, true, true); // trigger read code path without
+                                    // transferTo.
+  }
+
+  @Test
+  public void testMaxOutHedgedReadPool() throws IOException,
+      InterruptedException, ExecutionException {
+    Configuration conf = new Configuration();
+    int numHedgedReadPoolThreads = 5;
+    final int initialHedgedReadTimeoutMillis = 500;
+    final int fixedSleepIntervalMillis = 50;
+    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
+        numHedgedReadPoolThreads);
+    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
+        initialHedgedReadTimeoutMillis);
+
+    // Set up the InjectionHandler
+    DFSClientFaultInjector.instance = Mockito
+        .mock(DFSClientFaultInjector.class);
+    DFSClientFaultInjector injector = DFSClientFaultInjector.instance;
+    // make preads sleep for 50ms
+    Mockito.doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        Thread.sleep(fixedSleepIntervalMillis);
+        return null;
+      }
+    }).when(injector).startFetchFromDatanode();
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .format(true).build();
+    DistributedFileSystem fileSys = cluster.getFileSystem();
+    DFSClient dfsClient = fileSys.getClient();
+    DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
+
+    try {
+      Path file1 = new Path("hedgedReadMaxOut.dat");
+      writeFile(fileSys, file1);
+      // Basic test. Reads complete within timeout. Assert that there were no
+      // hedged reads.
+      pReadFile(fileSys, file1);
+      // assert that there were no hedged reads. 50ms + delta < 500ms
+      assertTrue(metrics.getHedgedReadOps() == 0);
+      assertTrue(metrics.getHedgedReadOpsInCurThread() == 0);
+      /*
+       * Multiple threads reading. Reads take longer than timeout. Assert that
+       * there were hedged reads. And that reads had to run in the current
+       * thread.
+       */
+      int factor = 10;
+      int numHedgedReads = numHedgedReadPoolThreads * factor;
+      long initialReadOpsValue = metrics.getHedgedReadOps();
+      ExecutorService executor = Executors.newFixedThreadPool(numHedgedReads);
+      ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
+      for (int i = 0; i < numHedgedReads; i++) {
+        futures.add(executor.submit(getPReadFileCallable(fileSys, file1)));
+      }
+      for (int i = 0; i < numHedgedReads; i++) {
+        futures.get(i).get();
+      }
+      assertTrue(metrics.getHedgedReadOps() > initialReadOpsValue);
+      assertTrue(metrics.getHedgedReadOpsInCurThread() > 0);
+      cleanupFile(fileSys, file1);
+      executor.shutdown();
+    } finally {
+      fileSys.close();
+      cluster.shutdown();
+      Mockito.reset(injector);
+    }
+  }
+
+  private void dfsPreadTest(Configuration conf, boolean disableTransferTo,
+      boolean verifyChecksum)
       throws IOException {
-    Configuration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
     conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
     if (simulatedStorage) {