Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NUTCH-3072 Fetcher to stop QueueFeeder if aborting with "hung threads" #832

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions src/java/org/apache/nutch/fetcher/FetchItemQueues.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public class FetchItemQueues {
int maxExceptionsPerQueue = -1;
long exceptionsPerQueueDelay = -1;
long exceptionsPerQueueClearAfter = 1800 * 1000L;
boolean feederAlive = true;
volatile boolean feederAlive = true;
volatile boolean timoutReached = false;
Configuration conf;

public static final String QUEUE_MODE_HOST = "byHost";
Expand All @@ -71,7 +72,8 @@ enum QueuingStatus {
SUCCESSFULLY_QUEUED,
ERROR_CREATE_FETCH_ITEM,
ABOVE_EXCEPTION_THRESHOLD,
HIT_BY_TIMELIMIT;
HIT_BY_TIMELIMIT,
HIT_BY_TIMEOUT;
}

public FetchItemQueues(Configuration conf) {
Expand Down Expand Up @@ -105,7 +107,7 @@ public FetchItemQueues(Configuration conf) {

/**
* Check whether queue mode is valid, fall-back to default mode if not.
*
*
* @param queueMode
* queue mode to check
* @return valid queue mode or default
Expand Down Expand Up @@ -258,6 +260,18 @@ public synchronized int checkTimelimit() {
return count;
}

/**
* Signal that the hard timeout is reached because new fetches / requests
* where made during half of the MapReduce task timeout
* (<code>mapreduce.task.timeout</code>, default value: 10 minutes). In order
* to avoid that the task timeout is hit and the fetcher job is failed, we
* stop the fetching now. See also the property
* <code>fetcher.threads.timeout.divisor</code>.
*/
public void setTimeoutReached() {
this.timoutReached = true;
}

// empties the queues (used by fetcher timelimit and throughput threshold)
public synchronized int emptyQueues() {
int count = 0, queuesDropped = 0;
Expand All @@ -282,10 +296,10 @@ public synchronized int emptyQueues() {
/**
* Increment the exception counter of a queue in case of an exception e.g.
* timeout; when higher than a given threshold simply empty the queue.
*
*
* The next fetch is delayed if specified by the param {@code delay} or
* configured by the property {@code fetcher.exceptions.per.queue.delay}.
*
*
* @param queueid
* a queue identifier to locate and check
* @param maxExceptions
Expand All @@ -296,7 +310,7 @@ public synchronized int emptyQueues() {
* in addition to the delay defined for the given queue. If a
* negative value is passed the delay is chosen by
* {@code fetcher.exceptions.per.queue.delay}
*
*
* @return number of purged items
*/
public synchronized int checkExceptionThreshold(String queueid,
Expand Down Expand Up @@ -367,9 +381,9 @@ private int purgeAndBlockQueue(String queueid, FetchItemQueue fiq,
/**
* Increment the exception counter of a queue in case of an exception e.g.
* timeout; when higher than a given threshold simply empty the queue.
*
*
* @see #checkExceptionThreshold(String, int, long)
*
*
* @param queueid
* queue identifier to locate and check
* @return number of purged items
Expand Down
46 changes: 28 additions & 18 deletions src/java/org/apache/nutch/fetcher/Fetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,25 +58,25 @@

/**
* A queue-based fetcher.
*
*
* <p>
* This fetcher uses a well-known model of one producer (a QueueFeeder) and many
* consumers (FetcherThread-s).
*
*
* <p>
* QueueFeeder reads input fetchlists and populates a set of FetchItemQueue-s,
* which hold FetchItem-s that describe the items to be fetched. There are as
* many queues as there are unique hosts, but at any given time the total number
* of fetch items in all queues is less than a fixed number (currently set to a
* multiple of the number of threads).
*
*
* <p>
* As items are consumed from the queues, the QueueFeeder continues to add new
* input items, so that their total count stays fixed (FetcherThread-s may also
* add new items to the queues e.g. as a results of redirection) - until all
* input items are exhausted, at which point the number of items in the queues
* begins to decrease. When this number reaches 0 fetcher will finish.
*
*
* <p>
* This fetcher implementation handles per-host blocking itself, instead of
* delegating this work to protocol-specific plugins. Each per-host queue
Expand All @@ -85,13 +85,13 @@
* list of requests in progress, and the time the last request was finished. As
* FetcherThread-s ask for new items to be fetched, queues may return eligible
* items or null if for "politeness" reasons this host's queue is not yet ready.
*
*
* <p>
* If there are still unfetched items in the queues, but none of the items are
* ready, FetcherThread-s will spin-wait until either some items become
* available, or a timeout is reached (at which point the Fetcher will abort,
* assuming the task is hung).
*
*
* @author Andrzej Bialecki
*/
public class Fetcher extends NutchTool implements Tool {
Expand Down Expand Up @@ -147,7 +147,7 @@ public static class FetcherRun extends
private AtomicInteger activeThreads = new AtomicInteger(0);
private AtomicInteger spinWaiting = new AtomicInteger(0);
private long start = System.currentTimeMillis();
private AtomicLong lastRequestStart = new AtomicLong(start);
private AtomicLong lastRequestStart = new AtomicLong(start);
private AtomicLong bytes = new AtomicLong(0); // total bytes fetched
private AtomicInteger pages = new AtomicInteger(0); // total pages fetched
private AtomicInteger errors = new AtomicInteger(0); // total pages errored
Expand All @@ -157,7 +157,7 @@ public static class FetcherRun extends
private AtomicInteger getActiveThreads() {
return activeThreads;
}

private void reportStatus(Context context, FetchItemQueues fetchQueues, int pagesLastSec, int bytesLastSec)
throws IOException {
StringBuilder status = new StringBuilder();
Expand All @@ -184,13 +184,13 @@ private void reportStatus(Context context, FetchItemQueues fetchQueues, int page
context.setStatus(status.toString());
}

@Override
@Override
public void setup(Mapper<Text, CrawlDatum, Text, NutchWritable>.Context context) {
Configuration conf = context.getConfiguration();
segmentName = conf.get(Nutch.SEGMENT_NAME_KEY);
storingContent = isStoringContent(conf);
parsing = isParsing(conf);
}
}

@Override
public void run(Context innerContext)
Expand Down Expand Up @@ -218,11 +218,6 @@ public void run(Context innerContext)
feeder = new QueueFeeder(innerContext, fetchQueues,
threadCount * queueDepthMultiplier);

// the value of the time limit is either -1 or the time where it should
// finish
long timelimit = conf.getLong("fetcher.timelimit", -1);
if (timelimit != -1)
feeder.setTimeLimit(timelimit);
feeder.start();

int startDelay = conf.getInt("fetcher.threads.start.delay", 10);
Expand Down Expand Up @@ -427,9 +422,12 @@ else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) {
* fetches started during half of the MapReduce task timeout
* (mapreduce.task.timeout, default value: 10 minutes). In order to
* avoid that the task timeout is hit and the fetcher job is failed,
* we stop the fetching now.
* we stop the fetching now. See also the property
* fetcher.threads.timeout.divisor.
*/
if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
LOG.warn("Timeout reached with no new requests since {} seconds.",
timeout);
LOG.warn("Aborting with {} hung threads{}.", activeThreads,
feeder.isAlive() ? " (queue feeder still alive)" : "");
innerContext.getCounter("FetcherStatus", "hungThreads")
Expand All @@ -448,6 +446,18 @@ else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) {
LOG.warn(sb.toString());
}
}

/*
* signal the queue feeder that the timeout is reached and wait
* shortly for it to shut down
*/
fetchQueues.setTimeoutReached();
if (feeder.isAlive()) {
LOG.info(
"Signaled QueueFeeder to stop, waiting 1.5 seconds before exiting.");
Thread.sleep(1500);
}

/*
* log and count queued items dropped from the fetch queues because
* of the timeout
Expand All @@ -469,7 +479,7 @@ else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) {
}
}

public void fetch(Path segment, int threads) throws IOException,
public void fetch(Path segment, int threads) throws IOException,
InterruptedException, ClassNotFoundException {

checkConfiguration();
Expand Down Expand Up @@ -626,7 +636,7 @@ public Map<String, Object> run(Map<String, Object> args, String crawlId) throws
else {
String segmentDir = crawlId+"/segments";
File segmentsDir = new File(segmentDir);
File[] segmentsList = segmentsDir.listFiles();
File[] segmentsList = segmentsDir.listFiles();
Arrays.sort(segmentsList, (f1, f2) -> {
if(f1.lastModified()>f2.lastModified())
return -1;
Expand Down
27 changes: 18 additions & 9 deletions src/java/org/apache/nutch/fetcher/QueueFeeder.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,10 @@ public class QueueFeeder extends Thread {

private static final Logger LOG = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());

private FetcherRun.Context context;
private FetchItemQueues queues;
private int size;
private long timelimit = -1;
private URLFilters urlFilters = null;
private URLNormalizers urlNormalizers = null;
private String urlNormalizerScope = URLNormalizers.SCOPE_DEFAULT;
Expand All @@ -64,10 +63,6 @@ public QueueFeeder(FetcherRun.Context context,
}
}

public void setTimeLimit(long tl) {
timelimit = tl;
}

/** Filter and normalize the url */
private String filterNormalize(String url) {
if (url != null) {
Expand All @@ -90,12 +85,26 @@ public void run() {
int cnt = 0;
int[] queuingStatus = new int[QueuingStatus.values().length];
while (hasMore) {
if (timelimit != -1 && System.currentTimeMillis() >= timelimit) {
if (queues.timelimitExceeded() || queues.timoutReached) {
// enough ... lets' simply read all the entries from the input without
// processing them
if (queues.timoutReached) {
int qstatus = QueuingStatus.HIT_BY_TIMEOUT.ordinal();
if (queuingStatus[qstatus] == 0) {
LOG.info("QueueFeeder stopping, timeout reached.");
}
queuingStatus[qstatus]++;
context.getCounter("FetcherStatus", "hitByTimeout").increment(1);
} else {
int qstatus = QueuingStatus.HIT_BY_TIMELIMIT.ordinal();
if (queuingStatus[qstatus] == 0) {
LOG.info("QueueFeeder stopping, timelimit exceeded.");
}
queuingStatus[qstatus]++;
context.getCounter("FetcherStatus", "hitByTimelimit").increment(1);
}
try {
hasMore = context.nextKeyValue();
queuingStatus[QueuingStatus.HIT_BY_TIMELIMIT.ordinal()]++;
} catch (IOException e) {
LOG.error("QueueFeeder error reading input, record " + cnt, e);
return;
Expand Down Expand Up @@ -137,7 +146,7 @@ public void run() {
url = new Text(url);
}
CrawlDatum datum = new CrawlDatum();
datum.set((CrawlDatum) context.getCurrentValue());
datum.set(context.getCurrentValue());
QueuingStatus status = queues.addFetchItem(url, datum);
queuingStatus[status.ordinal()]++;
if (status == QueuingStatus.ABOVE_EXCEPTION_THRESHOLD) {
Expand Down