Index: hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (revision 1571469)
+++ hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (working copy)
@@ -21,6 +21,8 @@
HDFS-5698. Use protobuf to serialize / deserialize FSImage. (See breakdown
of tasks below for features and contributors)
+ HDFS-5776. Support 'hedged' reads in DFSClient (Liang Xie via stack)
+
IMPROVEMENTS
HDFS-5781. Use an array to record the mapping between FSEditLogOpCode and
Index: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (revision 1571469)
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (working copy)
@@ -82,6 +82,10 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.net.SocketFactory;
@@ -172,6 +176,7 @@
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
+import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DataChecksum.Type;
import org.apache.hadoop.util.Progressable;
@@ -221,6 +226,10 @@
private final CachingStrategy defaultReadCachingStrategy;
private final CachingStrategy defaultWriteCachingStrategy;
private final ClientContext clientContext;
+ private volatile long hedgedReadThresholdMillis;
+ private static DFSHedgedReadMetrics HEDGED_READ_METRIC =
+ new DFSHedgedReadMetrics();
+ private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
/**
* DFSClient configuration
@@ -573,6 +582,15 @@
this.clientContext = ClientContext.get(
conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT),
dfsClientConf);
+ this.hedgedReadThresholdMillis = conf.getLong(
+ DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
+ DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS);
+ int numThreads = conf.getInt(
+ DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
+ DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE);
+ if (numThreads > 0) {
+ this.initThreadsNumForHedgedReads(numThreads);
+ }
}
/**
@@ -2640,4 +2658,64 @@
}
}
}
+
+ /**
+ * Create hedged reads thread pool, HEDGED_READ_THREAD_POOL, if
+ * it does not already exist.
+ * @param num Number of threads for hedged reads thread pool.
+ * If zero, skip hedged reads thread pool creation.
+ */
+ private synchronized void initThreadsNumForHedgedReads(int num) {
+ if (num <= 0 || HEDGED_READ_THREAD_POOL != null) return;
+ HEDGED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
+ TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new Daemon.DaemonFactory() {
+ private final AtomicInteger threadIndex =
+ new AtomicInteger(0);
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = super.newThread(r);
+ t.setName("hedgedRead-" +
+ threadIndex.getAndIncrement());
+ return t;
+ }
+ },
+ new ThreadPoolExecutor.CallerRunsPolicy() {
+
+ @Override
+ public void rejectedExecution(Runnable runnable,
+ ThreadPoolExecutor e) {
+ LOG.info("Execution rejected, Executing in current thread");
+ HEDGED_READ_METRIC.incHedgedReadOpsInCurThread();
+ // will run in the current thread
+ super.rejectedExecution(runnable, e);
+ }
+ });
+ HEDGED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using hedged reads; pool threads=" + num);
+ }
+ }
+
+ long getHedgedReadTimeout() {
+ return this.hedgedReadThresholdMillis;
+ }
+
+ @VisibleForTesting
+ void setHedgedReadTimeout(long timeoutMillis) {
+ this.hedgedReadThresholdMillis = timeoutMillis;
+ }
+
+ ThreadPoolExecutor getHedgedReadsThreadPool() {
+ return HEDGED_READ_THREAD_POOL;
+ }
+
+ boolean isHedgedReadsEnabled() {
+ return (HEDGED_READ_THREAD_POOL != null) &&
+ HEDGED_READ_THREAD_POOL.getMaximumPoolSize() > 0;
+ }
+
+ DFSHedgedReadMetrics getHedgedReadMetrics() {
+ return HEDGED_READ_METRIC;
+ }
}
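
For reviewers who want to try the feature: a minimal, illustrative sketch (not part of this patch) of how a client could enable hedged reads through the two new configuration keys and then issue the positional reads that exercise this code path. The file path, pool size and threshold below are made-up values.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class HedgedReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // A thread pool size > 0 turns hedged reads on; the threshold is how
    // long the first read may run before a hedged read is spawned.
    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 10);
    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 150);

    FileSystem fs = FileSystem.get(conf);
    FSDataInputStream in = fs.open(new Path("/tmp/hedged-read-demo.dat"));
    try {
      byte[] buf = new byte[4096];
      // Only positional reads (pread) go through the hedged read path.
      in.read(0L, buf, 0, buf.length);
    } finally {
      in.close();
      fs.close();
    }
  }
}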
Index: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java (revision 1571469)
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java (working copy)
@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
-import java.io.IOException;
import com.google.common.annotations.VisibleForTesting;
@@ -46,4 +45,6 @@
public boolean failPacket() {
return false;
}
+
+ public void startFetchFromDatanode() {}
}
Index: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (revision 1571469)
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (working copy)
@@ -589,4 +589,14 @@
public static final int DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT = 500;
public static final String DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY = "dfs.http.client.failover.sleep.max.millis";
public static final int DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT = 15000;
+
+ // hedged read properties
+ public static final String DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS =
+ "dfs.client.hedged.read.threshold.millis";
+ public static final long DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS =
+ 500;
+
+ public static final String DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE =
+ "dfs.client.hedged.read.threadpool.size";
+ public static final int DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE = 0;
}
Index: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java (revision 0)
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java (working copy)
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Client-side metrics for the hedged read feature.
+ * The metrics variables are publicly accessible so that a client such as
+ * HBase can read them directly from the client side.
+ */
+public class DFSHedgedReadMetrics {
+ public AtomicLong hedgedReadOps = new AtomicLong();
+ public AtomicLong hedgedReadOpsWin = new AtomicLong();
+ public AtomicLong hedgedReadOpsInCurThread = new AtomicLong();
+
+ public void incHedgedReadOps() {
+ hedgedReadOps.incrementAndGet();
+ }
+
+ public void incHedgedReadOpsInCurThread() {
+ hedgedReadOpsInCurThread.incrementAndGet();
+ }
+
+ public void incHedgedReadWins() {
+ hedgedReadOpsWin.incrementAndGet();
+ }
+
+ public long getHedgedReadOps() {
+ return hedgedReadOps.longValue();
+ }
+
+ public long getHedgedReadOpsInCurThread() {
+ return hedgedReadOpsInCurThread.longValue();
+ }
+
+ public long getHedgedReadWins() {
+ return hedgedReadOpsWin.longValue();
+ }
+}
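
A small usage sketch (not part of this patch) showing how the counters above could be read. It assumes the calling code lives in the org.apache.hadoop.hdfs package and already holds a DFSClient, since the getHedgedReadMetrics() accessor added above is package-private; the reporter class name is hypothetical.

package org.apache.hadoop.hdfs;

public class HedgedReadMetricsReporter {
  // Log the current hedged read counters for a given client.
  public static void report(DFSClient client) {
    DFSHedgedReadMetrics m = client.getHedgedReadMetrics();
    System.out.println("hedged read ops = " + m.getHedgedReadOps()
        + ", hedged read wins = " + m.getHedgedReadWins()
        + ", ran in caller thread = " + m.getHedgedReadOpsInCurThread());
  }
}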
Index: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (revision 1571469)
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (working copy)
@@ -17,13 +17,12 @@
*/
package org.apache.hadoop.hdfs;
-import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -32,9 +31,14 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
-import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.ByteBufferUtil;
@@ -54,15 +58,12 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.unix.DomainSocket;
-import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.IdentityHashStore;
@@ -555,7 +556,7 @@
assert (target==pos) : "Wrong postion " + pos + " expect " + target;
long offsetIntoBlock = target - targetBlock.getStartOffset();
- DNAddrPair retval = chooseDataNode(targetBlock);
+ DNAddrPair retval = chooseDataNode(targetBlock, null);
chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
@@ -863,32 +864,30 @@
corruptedBlockMap.put(blk, dnSet);
}
}
-
- private DNAddrPair chooseDataNode(LocatedBlock block)
- throws IOException {
+
+ private DNAddrPair chooseDataNode(LocatedBlock block,
+ Collection<DatanodeInfo> ignoredNodes) throws IOException {
while (true) {
DatanodeInfo[] nodes = block.getLocations();
try {
- DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
- final String dnAddr =
- chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
- }
- InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
- return new DNAddrPair(chosenNode, targetAddr);
+ return getBestNodeDNAddrPair(nodes, ignoredNodes);
} catch (IOException ie) {
+ String errMsg =
+ getBestNodeDNAddrPairErrorString(nodes, deadNodes, ignoredNodes);
String blockInfo = block.getBlock() + " file=" + src;
if (failures >= dfsClient.getMaxBlockAcquireFailures()) {
- throw new BlockMissingException(src, "Could not obtain block: " + blockInfo,
- block.getStartOffset());
+ String description = "Could not obtain block: " + blockInfo;
+ DFSClient.LOG.warn(description + errMsg
+ + ". Throwing a BlockMissingException");
+ throw new BlockMissingException(src, description,
+ block.getStartOffset());
}
if (nodes == null || nodes.length == 0) {
DFSClient.LOG.info("No node available for " + blockInfo);
}
DFSClient.LOG.info("Could not obtain " + block.getBlock()
- + " from any node: " + ie
+ + " from any node: " + ie + errMsg
+ ". Will get new block locations from namenode and retry...");
try {
// Introducing a random factor to the wait time before another retry.
@@ -914,21 +913,99 @@
continue;
}
}
- }
-
+ }
+
+ /**
+ * Get the best node.
+ * @param nodes Nodes to choose from.
+ * @param ignoredNodes Do not choose nodes in this collection (may be null)
+ * @return The DNAddrPair of the best node.
+ * @throws IOException
+ */
+ private DNAddrPair getBestNodeDNAddrPair(final DatanodeInfo[] nodes,
+ Collection<DatanodeInfo> ignoredNodes) throws IOException {
+ DatanodeInfo chosenNode = bestNode(nodes, deadNodes, ignoredNodes);
+ final String dnAddr =
+ chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
+ }
+ InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
+ return new DNAddrPair(chosenNode, targetAddr);
+ }
+
+ private static String getBestNodeDNAddrPairErrorString(
+ DatanodeInfo nodes[], AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes,
+ Collection<DatanodeInfo> ignoredNodes) {
+ StringBuilder errMsgr = new StringBuilder(
+ " No live nodes contain current block ");
+ errMsgr.append("Block locations:");
+ for (DatanodeInfo datanode : nodes) {
+ errMsgr.append(" ");
+ errMsgr.append(datanode.toString());
+ }
+ errMsgr.append(" Dead nodes: ");
+ for (DatanodeInfo datanode : deadNodes.keySet()) {
+ errMsgr.append(" ");
+ errMsgr.append(datanode.toString());
+ }
+ if (ignoredNodes != null) {
+ errMsgr.append(" Ignored nodes: ");
+ for (DatanodeInfo datanode : ignoredNodes) {
+ errMsgr.append(" ");
+ errMsgr.append(datanode.toString());
+ }
+ }
+ return errMsgr.toString();
+ }
+
private void fetchBlockByteRange(LocatedBlock block, long start, long end,
byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
- //
- // Connect to best DataNode for desired Block, with potential offset
- //
+ block = getBlockAt(block.getStartOffset(), false);
+ while (true) {
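+ // Keep trying datanodes until one read succeeds; chooseDataNode() throws
+ // BlockMissingException once the max block acquire failures are hit,
+ // which is what finally ends this loop.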
+ DNAddrPair addressPair = chooseDataNode(block, null);
+ try {
+ actualGetFromOneDataNode(addressPair, block, start, end, buf, offset,
+ corruptedBlockMap);
+ return;
+ } catch (IOException e) {
+ // Ignore. Already processed inside the function.
+ // Loop through to try the next node.
+ }
+ }
+ }
+
+ private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
+ final LocatedBlock block, final long start, final long end,
+ final ByteBuffer bb,
+ final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
+ final CountDownLatch latch) {
+ return new Callable<ByteBuffer>() {
+ @Override
+ public ByteBuffer call() throws Exception {
+ byte[] buf = bb.array();
+ int offset = bb.position();
+ actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
+ corruptedBlockMap);
+ latch.countDown();
+ return bb;
+ }
+ };
+ }
+
+ private void actualGetFromOneDataNode(final DNAddrPair datanode,
+ LocatedBlock block, final long start, final long end, byte[] buf,
+ int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+ throws IOException {
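+ // Reads the byte range from the single datanode passed in. Unlike the
+ // old fetchBlockByteRange() loop, a checksum or connection failure is
+ // not retried against other replicas here: the node is added to
+ // deadNodes and the IOException is rethrown so the caller can pick
+ // another node.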
+ DFSClientFaultInjector.get().startFetchFromDatanode();
int refetchToken = 1; // only need to get a new access token once
int refetchEncryptionKey = 1; // only need to get a new encryption key once
-
+
while (true) {
// cached block locations may have been updated by chooseDataNode()
- // or fetchBlockAt(). Always get the latest list of locations at the
+ // or fetchBlockAt(). Always get the latest list of locations at the
// start of the loop.
CachingStrategy curCachingStrategy;
boolean allowShortCircuitLocalReads;
@@ -937,11 +1014,10 @@
curCachingStrategy = cachingStrategy;
allowShortCircuitLocalReads = !shortCircuitForbidden();
}
- DNAddrPair retval = chooseDataNode(block);
- DatanodeInfo chosenNode = retval.info;
- InetSocketAddress targetAddr = retval.addr;
+ DatanodeInfo chosenNode = datanode.info;
+ InetSocketAddress targetAddr = datanode.addr;
BlockReader reader = null;
-
+
try {
Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
int len = (int) (end - start + 1);
@@ -969,11 +1045,14 @@
}
return;
} catch (ChecksumException e) {
- DFSClient.LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
- src + " at " + block.getBlock() + ":" +
- e.getPos() + " from " + chosenNode);
+ String msg = "fetchBlockByteRange(). Got a checksum exception for "
+ + src + " at " + block.getBlock() + ":" + e.getPos() + " from "
+ + chosenNode;
+ DFSClient.LOG.warn(msg);
// we want to remember what we have tried
addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
+ addToDeadNodes(chosenNode);
+ throw new IOException(msg);
} catch (IOException e) {
if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
@@ -985,26 +1064,168 @@
continue;
} else if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
refetchToken--;
- fetchBlockAt(block.getStartOffset());
+ try {
+ fetchBlockAt(block.getStartOffset());
+ } catch (IOException fbae) {
+ // ignore IOE, since we can retry it later in a loop
+ }
continue;
} else {
- DFSClient.LOG.warn("Failed to connect to " + targetAddr +
- " for file " + src + " for block " + block.getBlock() + ":" + e);
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("Connection failure ", e);
- }
+ String msg = "Failed to connect to " + targetAddr + " for file "
+ + src + " for block " + block.getBlock() + ":" + e;
+ DFSClient.LOG.warn("Connection failure: " + msg, e);
+ addToDeadNodes(chosenNode);
+ throw new IOException(msg);
}
} finally {
if (reader != null) {
reader.close();
}
}
- // Put chosen node into dead list, continue
- addToDeadNodes(chosenNode);
}
}
/**
+ * Like {@link #fetchBlockByteRange(LocatedBlock, long, long, byte[],
+ * int, Map)} except we start up a second, parallel, 'hedged' read
+ * if the first read is taking longer than the configured amount of
+ * time. We then wait for whichever read returns first.
+ *
+ * @param block
+ * @param start
+ * @param end
+ * @param buf
+ * @param offset
+ * @param corruptedBlockMap
+ * @throws IOException
+ */
+ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
+ long end, byte[] buf, int offset,
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+ throws IOException {
+ ArrayList<Future<ByteBuffer>> futures = null;
+ ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>();
+ ByteBuffer bb = null;
+ int len = (int) (end - start + 1);
+ block = getBlockAt(block.getStartOffset(), false);
+ // Latch shared by all outstanding reads. First to finish closes
+ CountDownLatch hasReceivedResult = new CountDownLatch(1);
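+ // First iteration: issue a regular read and wait up to the hedged read
+ // threshold for it. On timeout, later iterations add one extra read per
+ // pass against a replica we have not tried yet, and whichever
+ // outstanding read finishes first supplies the result.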
+ while (true) {
+ DNAddrPair chosenNode = null;
+ Future<ByteBuffer> future = null;
+ // futures is null if there is no request already executing.
+ if (futures == null) {
+ // chooseDataNode is a commitment. If no node can be chosen, it goes
+ // to the NN to re-fetch block locations. Only go here on the first read.
+ chosenNode = chooseDataNode(block, ignored);
+ bb = ByteBuffer.wrap(buf, offset, len);
+ future = getHedgedReadFuture(chosenNode, block, start, end, bb,
+ corruptedBlockMap, hasReceivedResult);
+ try {
+ future.get(dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS);
+ return;
+ } catch (TimeoutException e) {
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout() +
+ "ms to read from " + chosenNode.info + "; spawning hedged read");
+ }
+ // Ignore this node on next go around.
+ ignored.add(chosenNode.info);
+ dfsClient.getHedgedReadMetrics().incHedgedReadOps();
+ futures = new ArrayList<Future<ByteBuffer>>();
+ futures.add(future);
+ continue; // no need to refresh block locations
+ } catch (InterruptedException e) {
+ // Ignore
+ } catch (ExecutionException e) {
+ // Ignore already logged in the call.
+ }
+ } else {
+ // We are starting up a 'hedged' read. We have a read already
+ // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
+ // If no nodes to do hedged reads against, pass.
+ try {
+ chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored);
+ bb = ByteBuffer.allocate(len);
+ future = getHedgedReadFuture(chosenNode, block, start, end, bb,
+ corruptedBlockMap, hasReceivedResult);
+ futures.add(future);
+ } catch (IOException ioe) {
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("Failed getting node for hedged read: " +
+ ioe.getMessage());
+ }
+ }
+ // Whether or not the hedged read was submitted, wait for the fastest
+ // of the outstanding reads and use its result.
+ try {
+ ByteBuffer result = getFirstToComplete(futures, hasReceivedResult);
+ // cancel the rest.
+ cancelAll(futures);
+ if (result.array() != buf) { // compare the array pointers
+ dfsClient.getHedgedReadMetrics().incHedgedReadWins();
+ System.arraycopy(result.array(), result.position(), buf, offset,
+ len);
+ } else {
+ dfsClient.getHedgedReadMetrics().incHedgedReadOps();
+ }
+ return;
+ } catch (InterruptedException ie) {
+ // Ignore
+ } catch (ExecutionException e) {
+ // exception already handled in the call method. getFirstToComplete
+ // will remove the failing future from the list. nothing more to do.
+ }
+ // We got here if exception. Ignore this node on next go around.
+ ignored.add(chosenNode.info);
+ }
+ // executed if we get an error from a data node
+ block = getBlockAt(block.getStartOffset(), false);
+ }
+ }
+
+ private Future<ByteBuffer> getHedgedReadFuture(final DNAddrPair chosenNode,
+ final LocatedBlock block, long start,
+ final long end, final ByteBuffer bb,
+ final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
+ final CountDownLatch hasReceivedResult) {
+ Callable<ByteBuffer> getFromDataNodeCallable =
+ getFromOneDataNode(chosenNode, block, start, end, bb,
+ corruptedBlockMap, hasReceivedResult);
+ return dfsClient.getHedgedReadsThreadPool().submit(getFromDataNodeCallable);
+ }
+
+ private ByteBuffer getFirstToComplete(ArrayList<Future<ByteBuffer>> futures,
+ CountDownLatch latch) throws ExecutionException, InterruptedException {
+ latch.await();
+ for (Future<ByteBuffer> future : futures) {
+ if (future.isDone()) {
+ try {
+ return future.get();
+ } catch (ExecutionException e) {
+ // already logged in the Callable
+ futures.remove(future);
+ throw e;
+ }
+ }
+ }
+ throw new InterruptedException("latch has counted down to zero but no"
+ + "result available yet, for safety try to request another one from"
+ + "outside loop, this should be rare");
+ }
+
+ private void cancelAll(List<Future<ByteBuffer>> futures) {
+ for (Future<ByteBuffer> future : futures) {
+ // Unfortunately, hdfs reads do not take kindly to interruption.
+ // Threads return a variety of interrupted-type exceptions but
+ // also complaints about invalid pbs -- likely because the read
+ // is interrupted before it gets the whole pb. Also verbose WARN
+ // logging. So, for now, do not interrupt running read.
+ future.cancel(false);
+ }
+ }
+
+ /**
* Should the block access token be refetched on an exception
*
* @param ex Exception received
@@ -1070,8 +1291,13 @@
long targetStart = position - blk.getStartOffset();
long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
try {
- fetchBlockByteRange(blk, targetStart,
- targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
+ if (dfsClient.isHedgedReadsEnabled()) {
+ hedgedFetchBlockByteRange(blk, targetStart, targetStart + bytesToRead
+ - 1, buffer, offset, corruptedBlockMap);
+ } else {
+ fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
+ buffer, offset, corruptedBlockMap);
+ }
} finally {
// Check and report if any block replicas are corrupted.
// BlockMissingException may be caught if all block replicas are
@@ -1265,12 +1491,13 @@
* Pick the best node from which to stream the data.
* Entries in nodes are already in the priority order
*/
- static DatanodeInfo bestNode(DatanodeInfo nodes[],
- AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes)
- throws IOException {
- if (nodes != null) {
+ static DatanodeInfo bestNode(DatanodeInfo nodes[],
+ AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes,
+ Collection<DatanodeInfo> ignoredNodes) throws IOException {
+ if (nodes != null) {
for (int i = 0; i < nodes.length; i++) {
- if (!deadNodes.containsKey(nodes[i])) {
+ if (!deadNodes.containsKey(nodes[i])
+ && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
return nodes[i];
}
}
}
throw new IOException("No live nodes contain current block");
}
Index: hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java (revision 1571469)
+++ hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java (working copy)
private void writeFile(FileSystem fileSys, Path name) throws IOException {
+ int replication = 3; // We need > 1 blocks to test out the hedged reads.
// create and write a file that contains three blocks of data
- DataOutputStream stm = fileSys.create(name, true, 4096, (short)1,
- blockSize);
+ DataOutputStream stm = fileSys.create(name, true, 4096,
+ (short)replication, blockSize);
// test empty file open and read
stm.close();
FSDataInputStream in = fileSys.open(name);
@@ -196,26 +206,128 @@
assertTrue(fileSys.delete(name, true));
assertTrue(!fileSys.exists(name));
}
-
+
+ private Callable<Void> getPReadFileCallable(final FileSystem fileSys,
+ final Path file) {
+ return new Callable<Void>() {
+ public Void call() throws IOException {
+ pReadFile(fileSys, file);
+ return null;
+ }
+ };
+ }
+
/**
* Tests positional read in DFS.
*/
@Test
public void testPreadDFS() throws IOException {
- dfsPreadTest(false, true); //normal pread
- dfsPreadTest(true, true); //trigger read code path without transferTo.
+ Configuration conf = new Configuration();
+ dfsPreadTest(conf, false, true); // normal pread
+ dfsPreadTest(conf, true, true); // trigger read code path without
+ // transferTo.
}
@Test
public void testPreadDFSNoChecksum() throws IOException {
+ Configuration conf = new Configuration();
((Log4JLogger)DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
- dfsPreadTest(false, false);
- dfsPreadTest(true, false);
+ dfsPreadTest(conf, false, false);
+ dfsPreadTest(conf, true, false);
}
- private void dfsPreadTest(boolean disableTransferTo, boolean verifyChecksum)
+ /**
+ * Tests positional read in DFS, with hedged reads enabled.
+ */
+ @Test
+ public void testHedgedPreadDFSBasic() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
+ conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 100);
+ dfsPreadTest(conf, false, true); // normal pread
+ dfsPreadTest(conf, true, true); // trigger read code path without
+ // transferTo.
+ }
+
+ @Test
+ public void testMaxOutHedgedReadPool() throws IOException,
+ InterruptedException, ExecutionException {
+ Configuration conf = new Configuration();
+ int numHedgedReadPoolThreads = 5;
+ final int initialHedgedReadTimeoutMillis = 500;
+ final int fixedSleepIntervalMillis = 50;
+ conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
+ numHedgedReadPoolThreads);
+ conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
+ initialHedgedReadTimeoutMillis);
+
+ // Set up the InjectionHandler
+ DFSClientFaultInjector.instance = Mockito
+ .mock(DFSClientFaultInjector.class);
+ DFSClientFaultInjector injector = DFSClientFaultInjector.instance;
+ // make preads sleep for 50ms
+ Mockito.doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ Thread.sleep(fixedSleepIntervalMillis);
+ return null;
+ }
+ }).when(injector).startFetchFromDatanode();
+
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+ .format(true).build();
+ DistributedFileSystem fileSys = cluster.getFileSystem();
+ DFSClient dfsClient = fileSys.getClient();
+ DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
+
+ try {
+ Path file1 = new Path("hedgedReadMaxOut.dat");
+ writeFile(fileSys, file1);
+ // Basic test. Reads complete within timeout. Assert that there were no
+ // hedged reads.
+ pReadFile(fileSys, file1);
+ // assert that there were no hedged reads. 50ms + delta < 500ms
+ assertTrue(metrics.getHedgedReadOps() == 0);
+ assertTrue(metrics.getHedgedReadOpsInCurThread() == 0);
+ /*
+ * Multiple threads reading. Reads take longer than timeout. Assert that
+ * there were hedged reads. And that reads had to run in the current
+ * thread.
+ */
+ int factor = 10;
+ int numHedgedReads = numHedgedReadPoolThreads * factor;
+ long initialReadOpsValue = metrics.getHedgedReadOps();
+ ExecutorService executor = Executors.newFixedThreadPool(numHedgedReads);
+ ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
+ for (int i = 0; i < numHedgedReads; i++) {
+ futures.add(executor.submit(getPReadFileCallable(fileSys, file1)));
+ }
+ for (int i = 0; i < numHedgedReads; i++) {
+ futures.get(i).get();
+ }
+ assertTrue(metrics.getHedgedReadOps() > initialReadOpsValue);
+ assertTrue(metrics.getHedgedReadOpsInCurThread() > 0);
+ cleanupFile(fileSys, file1);
+ executor.shutdown();
+ } finally {
+ fileSys.close();
+ cluster.shutdown();
+ Mockito.reset(injector);
+ }
+ }
+
+ private void dfsPreadTest(Configuration conf, boolean disableTransferTo, boolean verifyChecksum)
throws IOException {
- Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
if (simulatedStorage) {