Skip to content
This repository has been archived by the owner on Jan 8, 2020. It is now read-only.

Commit

Permalink
added counters for line
Browse files Browse the repository at this point in the history
  • Loading branch information
Luis Lázaro committed May 5, 2015
1 parent 6b59631 commit 5187cfd
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 34 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.flume.flume-ng-sources</groupId>
<artifactId>flume-ftp-source</artifactId>
<version>1.1.5_rev2</version>
<version>1.1.6</version>
<packaging>jar</packaging>
<name>flume-ftp-source</name>

Expand Down
36 changes: 20 additions & 16 deletions src/main/java/org/apache/flume/source/FTPSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public class FTPSource extends AbstractSource implements Configurable, PollableS
private Path hasmap = Paths.get("");
private Path absolutePath = Paths.get("");
private int counter = 0;


public void setListener(FTPSourceEventListener listener) {
this.listener = listener;
Expand Down Expand Up @@ -178,6 +177,7 @@ public void processMessage(byte[] lastInfo) {
} catch (ChannelException e) {
log.error("ChannelException", e);
}
ftpSourceCounter.incrementCountSizeProc(message.length);
ftpSourceCounter.incrementEventCount();
}

Expand Down Expand Up @@ -216,15 +216,16 @@ public void discoverElements(FTPClient ftpClient, String parentDir, String curre
InputStream inputStream = null;
try {
inputStream = ftpClient.retrieveFileStream(aFile.getName());
log.info("discovered: " + fileName + " ,size: " + aFile.getSize());
listener.fileStreamRetrieved();
readStream(inputStream, 0);
boolean success = inputStream != null && ftpClient.completePendingCommand(); //mandatory
if (success) {
sizeFileList.put(dirToList + "/" + aFile.getName(), aFile.getSize());
saveMap(sizeFileList);
ftpSourceCounter.incrementFilesProcCount();
log.info("discovered: " + fileName + " ,size: " + aFile.getSize()
+ " ,total files: " + sizeFileList.size());
log.info("processed: " + fileName + " ,total files: " + sizeFileList.size() + "\n");

} else {
handleProcessError(fileName);
}
Expand All @@ -245,6 +246,7 @@ public void discoverElements(FTPClient ftpClient, String parentDir, String curre
InputStream inputStream = null;
try {
inputStream = ftpClient.retrieveFileStream(aFile.getName());
log.info("modified: " + fileName + " ,dif: " + aFile.getSize());
listener.fileStreamRetrieved();
readStream(inputStream, prevSize);

Expand All @@ -253,11 +255,11 @@ public void discoverElements(FTPClient ftpClient, String parentDir, String curre
sizeFileList.put(dirToList + "/" + aFile.getName(), aFile.getSize());
saveMap(sizeFileList);
ftpSourceCounter.incrementCountModProc();
log.info("modified: " + fileName + " ," + sizeFileList.size());
log.info("processed: " + fileName + " ,total files: " + sizeFileList.size() + "\n");
} else {
handleProcessError(fileName);
}

} catch (FTPConnectionClosedException e) {
log.error("Ftp server closed connection ", e);
handleProcessError(fileName);
Expand Down Expand Up @@ -352,19 +354,20 @@ public boolean readStream(InputStream inputStream, long position) {
inputStream.skip(position);
BufferedReader in = new BufferedReader(new InputStreamReader(inputStream));
String line = null;

while ((line = in.readLine()) != null) {
processMessage(line.getBytes());
}

while ((line = in.readLine()) != null) {
processMessage(line.getBytes());
}


in.close();
inputStream.close();
} catch (IOException e) {
log.error(e.getMessage(), e);
successRead = false;
}
} else {

}
}else {

try {
inputStream.skip(position);
Expand All @@ -385,10 +388,12 @@ public boolean readStream(InputStream inputStream, long position) {
successRead = false;
}
}
//end condition for only byte
//end condition for only byte

return successRead;
}
return successRead;
}



public void setFtpSourceCounter(FtpSourceCounter ftpSourceCounter) {
this.ftpSourceCounter = ftpSourceCounter;
Expand Down Expand Up @@ -427,6 +432,5 @@ public FTPClient getFTPClient() {
return ftpSourceUtils.getFtpClient();
}


} //end of class

47 changes: 32 additions & 15 deletions src/main/java/org/apache/flume/source/FtpSourceCounter.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ public class FtpSourceCounter extends MonitoredCounterGroup implements FtpSource
, sendThroughput /* tasa de eventos por segundo */
, start_time /* milisegundos desde EPOC hasta creación de contador */
, last_sent /* milisegundos desde EPOC hasta generación de último evento */
, countModProc /* contador de modificaciones que se han procesado con éxito */
, mbProcessed /* megabytes de datos procesados*/
, kbProcessed; /* kbytes de datos procesados*/
, countModProc /* contador de modificaciones que se han procesado con éxito */
, bytesProcessed /* bytes de datos procesados*/
, KbProcessed
, MbProcessed;

private static final String[] ATTRIBUTES = { "files_count" , "filesProcCount", "filesProcCountError", "eventCount","start_time","last_sent",
"sendThroughput", "countModProc", "mbProcessed", "kbProcessed"};
private static final String[] ATTRIBUTES = { "files_count" , "filesProcCount", "filesProcCountError",
"eventCount","start_time","last_sent", "sendThroughput", "countModProc", "bytesProcessed", "KbProcessed", "MbProcessed"
};

public FtpSourceCounter(String name){
super(MonitoredCounterGroup.Type.SOURCE, name, ATTRIBUTES);
Expand All @@ -35,8 +37,9 @@ public FtpSourceCounter(String name){
start_time = System.currentTimeMillis();
sendThroughput = 0;
countModProc = 0;
mbProcessed = 0;
kbProcessed = 0;
bytesProcessed = 0;
KbProcessed = 0;
MbProcessed = 0;
}

/*
Expand Down Expand Up @@ -118,19 +121,33 @@ public long getCountModProc(){
}

@Override
public long getMbProcessed(){
mbProcessed = getEventCount() /(1024);
return mbProcessed;
public long getLastSent(){
return last_sent;
}

@Override
public long getKbProcessed(){
kbProcessed = getEventCount();
return kbProcessed;
public void incrementCountSizeProc(long size){
bytesProcessed+= size;
}

@Override
public long getCountSizeProc(){
return bytesProcessed;
}

@Override
public long getLastSent(){
return last_sent;
public long getCountSizeProcKb(){
KbProcessed = getCountSizeProc() / 1024;
return KbProcessed;
}

@Override
public long getCountSizeProcMb(){
MbProcessed = getCountSizeProc() / (1024 * 1024);
return MbProcessed;
}




}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ public interface FtpSourceCounterMBean {
public long getSendThroughput();
public void incrementCountModProc();
public long getCountModProc();
public long getMbProcessed();
public long getKbProcessed();
public long getLastSent();
public void incrementCountSizeProc(long size);
public long getCountSizeProc();
public long getCountSizeProcKb();
public long getCountSizeProcMb();
}

0 comments on commit 5187cfd

Please sign in to comment.