public class DemuxManager extends Object implements CHUKWA_CONSTANT
Modifier and Type | Field and Description |
---|---|
protected ChukwaConfiguration |
conf |
protected SimpleDateFormat |
dayTextFormat |
protected int |
DEFAULT_MAX_FILES_PER_DEMUX |
protected int |
DEFAULT_REDUCER_COUNT |
protected int |
demuxReducerCount |
protected int |
ERROR_SLEEP_TIME |
protected org.apache.hadoop.fs.FileSystem |
fs |
protected boolean |
isRunning |
protected int |
NO_DATASINK_SLEEP_TIME |
protected int |
reprocess |
protected boolean |
sendAlert |
ARCHIVES_IN_ERROR_DIR_NAME, ARCHIVES_MR_INPUT_DIR_NAME, ARCHIVES_MR_OUTPUT_DIR_NAME, ARCHIVES_PROCESSING_DIR_NAME, CHUKWA_ARCHIVE_DIR_FIELD, CHUKWA_DATA_SINK_DIR_FIELD, CHUKWA_DEMUX_REDUCER_COUNT_FIELD, CHUKWA_NAGIOS_HOST_FIELD, CHUKWA_NAGIOS_PORT_FIELD, CHUKWA_POST_PROCESS_DIR_FIELD, CHUKWA_POSTPROCESS_IN_ERROR_DIR_FIELD, CHUKWA_REPORTING_HOST_FIELD, CHUKWA_ROOT_DIR_FIELD, CHUKWA_ROOT_REPOS_DIR_FIELD, DEFAULT_CHUKWA_DATASINK_DIR_NAME, DEFAULT_CHUKWA_LOGS_DIR_NAME, DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME, DEFAULT_CHUKWA_ROOT_DIR_NAME, DEFAULT_DEMUX_IN_ERROR_DIR_NAME, DEFAULT_DEMUX_MR_INPUT_DIR_NAME, DEFAULT_DEMUX_MR_OUTPUT_DIR_NAME, DEFAULT_DEMUX_PROCESSING_DIR_NAME, DEFAULT_FINAL_ARCHIVES, DEFAULT_POSTPROCESS_IN_ERROR_DIR_NAME, DEFAULT_REPOS_DIR_NAME, POST_DEMUX_DATA_LOADER, WRITER_HDFS_FILESYSTEM_FIELD
Constructor and Description |
---|
DemuxManager() |
DemuxManager(ChukwaConfiguration conf) |
Modifier and Type | Method and Description |
---|---|
protected boolean |
checkDemuxInputDir(String demuxInputDir)
Test if demuxInputDir exists
|
protected boolean |
checkDemuxOutputDir(String demuxOutputDir)
Test if demuxOutputDir exists
|
protected boolean |
deleteDemuxOutputDir(String demuxOutputDir)
Delete DemuxOutput directory
|
protected boolean |
dirExists(String directory)
Check if source exists and if source is a directory
|
int |
getReprocess() |
protected void |
init() |
static void |
main(String[] args) |
protected boolean |
moveDataSinkFilesToArchiveDirectory(String demuxInputDir,
String archiveDirectory)
Move sourceFolder inside destFolder
|
protected boolean |
moveDataSinkFilesToDemuxErrorDirectory(String dataSinkDir,
String demuxErrorDir)
Move sourceFolder inside destFolder
|
protected boolean |
moveDataSinkFilesToDemuxInputDirectory(String dataSinkDir,
String demuxInputDir)
Move dataSink files to Demux input directory
|
protected boolean |
moveDemuxOutputDirToPostProcessDirectory(String demuxOutputDir,
String postProcessDirectory)
Move sourceFolder inside destFolder
|
protected boolean |
moveFolder(String srcDir,
String destDir,
String prefix)
Move sourceFolder inside destFolder
|
protected boolean |
processData(String dataSinkDir,
String demuxInputDir,
String demuxOutputDir,
String postProcessDir,
String archiveDir)
Process Data, i.e.
|
protected boolean |
runDemux(String demuxInputDir,
String demuxOutputDir)
Submit and Run demux Job
|
protected void |
sendDemuxStatusToNagios(String nagiosHost,
int nagiosPort,
String reportingHost,
String demuxInErrorDir,
boolean demuxStatus,
String demuxException)
Send NSCA status to Nagios
|
protected void |
setup(org.apache.hadoop.fs.Path directory)
Create directory if !exists
|
void |
shutdown() |
void |
start()
Start the Demux Manager daemon
|
protected int ERROR_SLEEP_TIME
protected int NO_DATASINK_SLEEP_TIME
protected int DEFAULT_MAX_FILES_PER_DEMUX
protected int DEFAULT_REDUCER_COUNT
protected int demuxReducerCount
protected ChukwaConfiguration conf
protected org.apache.hadoop.fs.FileSystem fs
protected int reprocess
protected boolean sendAlert
protected SimpleDateFormat dayTextFormat
protected volatile boolean isRunning
public DemuxManager(ChukwaConfiguration conf) throws Exception
Exception
protected void init() throws IOException, URISyntaxException
IOException
URISyntaxException
public void shutdown()
public int getReprocess()
protected void sendDemuxStatusToNagios(String nagiosHost, int nagiosPort, String reportingHost, String demuxInErrorDir, boolean demuxStatus, String demuxException)
nagiosHost
- nagiosPort
- reportingHost
- demuxInErrorDir
- demuxStatus
- exception
- protected boolean processData(String dataSinkDir, String demuxInputDir, String demuxOutputDir, String postProcessDir, String archiveDir) throws IOException
dataSinkDir
- demuxInputDir
- demuxOutputDir
- postProcessDir
- archiveDir
- IOException
protected boolean runDemux(String demuxInputDir, String demuxOutputDir)
demuxInputDir
- demuxOutputDir
- protected boolean moveDataSinkFilesToDemuxInputDirectory(String dataSinkDir, String demuxInputDir) throws IOException
dataSinkDir
- demuxInputDir
- IOException
protected boolean moveDataSinkFilesToDemuxErrorDirectory(String dataSinkDir, String demuxErrorDir) throws IOException
dataSinkDir
- : ex chukwa/demux/inputDirdemuxErrorDir
- : ex /chukwa/demux/inErrorIOException
protected boolean moveDataSinkFilesToArchiveDirectory(String demuxInputDir, String archiveDirectory) throws IOException
demuxInputDir:
- ex chukwa/demux/inputDirarchiveDirectory:
- ex /chukwa/archivesIOException
protected boolean moveDemuxOutputDirToPostProcessDirectory(String demuxOutputDir, String postProcessDirectory) throws IOException
demuxOutputDir:
- ex chukwa/demux/outputDirpostProcessDirectory:
- ex /chukwa/postProcessIOException
protected boolean checkDemuxInputDir(String demuxInputDir) throws IOException
demuxInputDir
- IOException
protected boolean checkDemuxOutputDir(String demuxOutputDir) throws IOException
demuxOutputDir
- IOException
protected boolean deleteDemuxOutputDir(String demuxOutputDir) throws IOException
demuxOutputDir
- IOException
protected void setup(org.apache.hadoop.fs.Path directory) throws IOException
directory
- IOException
protected boolean dirExists(String directory) throws IOException
f
- source fileIOException
protected boolean moveFolder(String srcDir, String destDir, String prefix) throws IOException
srcDir
- destDir
- IOException
Copyright © ${year} The Apache Software Foundation