public class DemuxManager extends Object implements CHUKWA_CONSTANT
| Modifier and Type | Field and Description |
|---|---|
protected ChukwaConfiguration |
conf |
protected SimpleDateFormat |
dayTextFormat |
protected int |
DEFAULT_MAX_FILES_PER_DEMUX |
protected int |
DEFAULT_REDUCER_COUNT |
protected int |
demuxReducerCount |
protected int |
ERROR_SLEEP_TIME |
protected org.apache.hadoop.fs.FileSystem |
fs |
protected boolean |
isRunning |
protected int |
NO_DATASINK_SLEEP_TIME |
protected int |
reprocess |
protected boolean |
sendAlert |
ARCHIVES_IN_ERROR_DIR_NAME, ARCHIVES_MR_INPUT_DIR_NAME, ARCHIVES_MR_OUTPUT_DIR_NAME, ARCHIVES_PROCESSING_DIR_NAME, CHUKWA_ARCHIVE_DIR_FIELD, CHUKWA_DATA_SINK_DIR_FIELD, CHUKWA_DEMUX_REDUCER_COUNT_FIELD, CHUKWA_NAGIOS_HOST_FIELD, CHUKWA_NAGIOS_PORT_FIELD, CHUKWA_POST_PROCESS_DIR_FIELD, CHUKWA_POSTPROCESS_IN_ERROR_DIR_FIELD, CHUKWA_REPORTING_HOST_FIELD, CHUKWA_ROOT_DIR_FIELD, CHUKWA_ROOT_REPOS_DIR_FIELD, DEFAULT_CHUKWA_DATASINK_DIR_NAME, DEFAULT_CHUKWA_LOGS_DIR_NAME, DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME, DEFAULT_CHUKWA_ROOT_DIR_NAME, DEFAULT_DEMUX_IN_ERROR_DIR_NAME, DEFAULT_DEMUX_MR_INPUT_DIR_NAME, DEFAULT_DEMUX_MR_OUTPUT_DIR_NAME, DEFAULT_DEMUX_PROCESSING_DIR_NAME, DEFAULT_FINAL_ARCHIVES, DEFAULT_POSTPROCESS_IN_ERROR_DIR_NAME, DEFAULT_REPOS_DIR_NAME, HDFS_DEFAULT_NAME_FIELD, POST_DEMUX_DATA_LOADER, WRITER_HDFS_FILESYSTEM_FIELD| Constructor and Description |
|---|
DemuxManager() |
DemuxManager(ChukwaConfiguration conf) |
| Modifier and Type | Method and Description |
|---|---|
protected boolean |
checkDemuxInputDir(String demuxInputDir)
Test if demuxInputDir exists
|
protected boolean |
checkDemuxOutputDir(String demuxOutputDir)
Test if demuxOutputDir exists
|
protected boolean |
deleteDemuxOutputDir(String demuxOutputDir)
Delete DemuxOutput directory
|
protected boolean |
dirExists(String directory)
Check if source exists and if source is a directory
|
int |
getReprocess() |
protected void |
init() |
static void |
main(String[] args) |
protected boolean |
moveDataSinkFilesToArchiveDirectory(String demuxInputDir,
String archiveDirectory)
Move sourceFolder inside destFolder
|
protected boolean |
moveDataSinkFilesToDemuxErrorDirectory(String dataSinkDir,
String demuxErrorDir)
Move sourceFolder inside destFolder
|
protected boolean |
moveDataSinkFilesToDemuxInputDirectory(String dataSinkDir,
String demuxInputDir)
Move dataSink files to Demux input directory
|
protected boolean |
moveDemuxOutputDirToPostProcessDirectory(String demuxOutputDir,
String postProcessDirectory)
Move sourceFolder inside destFolder
|
protected boolean |
moveFolder(String srcDir,
String destDir,
String prefix)
Move sourceFolder inside destFolder
|
protected boolean |
processData(String dataSinkDir,
String demuxInputDir,
String demuxOutputDir,
String postProcessDir,
String archiveDir)
Process Data, i.e.
|
protected boolean |
runDemux(String demuxInputDir,
String demuxOutputDir)
Submit and Run demux Job
|
protected void |
sendDemuxStatusToNagios(String nagiosHost,
int nagiosPort,
String reportingHost,
String demuxInErrorDir,
boolean demuxStatus,
String demuxException)
Send NSCA status to Nagios
|
protected void |
setup(org.apache.hadoop.fs.Path directory)
Create directory if !exists
|
void |
shutdown() |
void |
start()
Start the Demux Manager daemon
|
protected int ERROR_SLEEP_TIME
protected int NO_DATASINK_SLEEP_TIME
protected int DEFAULT_MAX_FILES_PER_DEMUX
protected int DEFAULT_REDUCER_COUNT
protected int demuxReducerCount
protected ChukwaConfiguration conf
protected org.apache.hadoop.fs.FileSystem fs
protected int reprocess
protected boolean sendAlert
protected SimpleDateFormat dayTextFormat
protected volatile boolean isRunning
public DemuxManager(ChukwaConfiguration conf) throws Exception
Exceptionprotected void init()
throws IOException,
URISyntaxException
IOExceptionURISyntaxExceptionpublic void shutdown()
public int getReprocess()
protected void sendDemuxStatusToNagios(String nagiosHost, int nagiosPort, String reportingHost, String demuxInErrorDir, boolean demuxStatus, String demuxException)
nagiosHost - nagiosPort - reportingHost - demuxInErrorDir - demuxStatus - exception - protected boolean processData(String dataSinkDir, String demuxInputDir, String demuxOutputDir, String postProcessDir, String archiveDir) throws IOException
dataSinkDir - demuxInputDir - demuxOutputDir - postProcessDir - archiveDir - IOExceptionprotected boolean runDemux(String demuxInputDir, String demuxOutputDir)
demuxInputDir - demuxOutputDir - protected boolean moveDataSinkFilesToDemuxInputDirectory(String dataSinkDir, String demuxInputDir) throws IOException
dataSinkDir - demuxInputDir - IOExceptionprotected boolean moveDataSinkFilesToDemuxErrorDirectory(String dataSinkDir, String demuxErrorDir) throws IOException
dataSinkDir - : ex chukwa/demux/inputDirdemuxErrorDir - : ex /chukwa/demux/inErrorIOExceptionprotected boolean moveDataSinkFilesToArchiveDirectory(String demuxInputDir, String archiveDirectory) throws IOException
demuxInputDir: - ex chukwa/demux/inputDirarchiveDirectory: - ex /chukwa/archivesIOExceptionprotected boolean moveDemuxOutputDirToPostProcessDirectory(String demuxOutputDir, String postProcessDirectory) throws IOException
demuxOutputDir: - ex chukwa/demux/outputDirpostProcessDirectory: - ex /chukwa/postProcessIOExceptionprotected boolean checkDemuxInputDir(String demuxInputDir) throws IOException
demuxInputDir - IOExceptionprotected boolean checkDemuxOutputDir(String demuxOutputDir) throws IOException
demuxOutputDir - IOExceptionprotected boolean deleteDemuxOutputDir(String demuxOutputDir) throws IOException
demuxOutputDir - IOExceptionprotected void setup(org.apache.hadoop.fs.Path directory)
throws IOException
directory - IOExceptionprotected boolean dirExists(String directory) throws IOException
f - source fileIOExceptionprotected boolean moveFolder(String srcDir, String destDir, String prefix) throws IOException
srcDir - destDir - IOExceptionCopyright © ${year} The Apache Software Foundation