Basic Options
The Chukwa pipeline is responsible for accepting incoming data from agents and extracting, transforming, and loading it to the destination storage. Most commonly, the pipeline simply writes all received data to HBase or HDFS.
HBase
To stream data to HBase, configure the Chukwa pipeline in chukwa-agent-conf.xml:
<property>
  <name>chukwa.pipeline</name>
  <value>org.apache.hadoop.chukwa.datacollection.writer.hbase.HBaseWriter</value>
</property>
In this mode, the HBase configuration is set in chukwa-env.sh. HBASE_CONF_DIR should point to the HBase configuration directory so that the Chukwa agent can load hbase-site.xml from the classpath.
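For example, a minimal chukwa-env.sh entry might look like the following (the path shown is an example; substitute your installation's HBase configuration directory):

# chukwa-env.sh: make hbase-site.xml visible on the agent's classpath
# (example path; adjust for your installation)
export HBASE_CONF_DIR=/etc/hbase/conf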
HDFS
To stream data to HDFS, configure the Chukwa pipeline in chukwa-agent-conf.xml:
<property>
  <name>chukwa.pipeline</name>
  <value>org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter</value>
</property>
In this mode, the filesystem to write to is determined by the option writer.hdfs.filesystem in chukwa-agent-conf.xml.
<property>
  <name>writer.hdfs.filesystem</name>
  <value>hdfs://localhost:8020/</value>
  <description>HDFS to dump to</description>
</property>
This is the only option that you really need to specify to get a working pipeline.
Advanced Options
There are some advanced options, not necessarily documented in the agent conf file, that are helpful in using Chukwa in nonstandard ways. While normally Chukwa writes sequence files to HDFS, it is possible to specify an alternate writer class. The option chukwa.pipeline specifies a Java class to instantiate and use as a writer. See the ChukwaWriter javadoc for details.
One particularly useful pipeline class is PipelineStageWriter, which lets you string together a series of PipelineableWriters for pre-processing or post-processing incoming data. As an example, the SocketTeeWriter class allows other programs to get incoming chunks fed to them over a socket by the Chukwa agent.
Stages in the pipeline should be listed, comma-separated, in the option chukwa.pipeline:
<property>
  <name>chukwa.pipeline</name>
  <value>org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter,org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter</value>
</property>
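Custom stages subclass PipelineableWriter. The following is a minimal pass-through sketch, not a definitive implementation: the class CountingWriter is invented for this example, and the setNextStage/next members and method signatures follow the Chukwa source tree but may vary across releases, so check the ChukwaWriter javadoc for your version.

import java.util.List;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
import org.apache.hadoop.conf.Configuration;

// Hypothetical pass-through stage: counts chunks, then forwards them
// unchanged to the next writer in the pipeline (e.g., SeqFileWriter).
public class CountingWriter extends PipelineableWriter {
  private long chunkCount = 0;

  public void init(Configuration conf) throws WriterException {
    // no setup needed for this sketch
  }

  public CommitStatus add(List<Chunk> chunks) throws WriterException {
    chunkCount += chunks.size();
    return next.add(chunks); // hand off to the next stage
  }

  public void close() throws WriterException {
    next.close();
  }
}

To activate such a stage, list it before the terminal writer in chukwa.pipeline, just as SocketTeeWriter is listed before SeqFileWriter above.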
HBaseWriter
HBaseWriter is the default writer for storing data in HBase. It runs Demux parsers inline to convert unstructured data into semi-structured data, then loads the key-value pairs into HBase tables. HBaseWriter has the following configuration options:
- hbase.demux.package The package containing Demux parser classes. HBaseWriter scans this package for annotated demux parser classes and uses their annotations to validate the HBase schema (see the sketch after this list).
<property>
  <name>hbase.demux.package</name>
  <value>org.apache.hadoop.chukwa.extraction.demux.processor</value>
</property>
- hbase.writer.verify.schema Verify the HBase table schema against the demux parser schema; a warning is logged if they do not match.
<property>
  <name>hbase.writer.verify.schema</name>
  <value>false</value>
</property>
- hbase.writer.halt.on.schema.mismatch If this option is set to true and the HBase table schema does not match the demux parser schema, the agent shuts itself down.
<property>
  <name>hbase.writer.halt.on.schema.mismatch</name>
  <value>false</value>
</property>
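For illustration, a demux parser becomes visible to HBaseWriter by carrying Table annotations that map its output to an HBase table and column family. The sketch below is hypothetical: the class MyAppParser, the table name MyAppMetrics, and the column family summary are invented for this example, and the annotation and AbstractProcessor signatures follow the Chukwa source tree but may differ across releases.

import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.AbstractProcessor;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical parser: writes each record into the "summary" column family
// of the "MyAppMetrics" table. HBaseWriter finds this class by scanning the
// package named in hbase.demux.package for @Tables annotations.
@Tables(annotations = { @Table(name = "MyAppMetrics", columnFamily = "summary") })
public class MyAppParser extends AbstractProcessor {
  @Override
  protected void parse(String recordEntry,
      OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
      Reporter reporter) throws Throwable {
    ChukwaRecord record = new ChukwaRecord();
    record.add("raw", recordEntry); // each field added becomes a column
    // sketch only; real parsers derive a meaningful row key from the chunk
    key.setKey(String.valueOf(recordEntry.hashCode()));
    output.collect(key, record);
  }
}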
LocalWriter
LocalWriter writes chunks of data to local disk, then uploads each file to HDFS as a whole file. This writer is designed for high-throughput environments. It is selected through chukwa.pipeline like the other writers (see the example after the option list below).
- chukwaCollector.localOutputDir Location where data is buffered before being moved to HDFS.
<property>
  <name>chukwaCollector.localOutputDir</name>
  <value>/tmp/chukwa/logs</value>
</property>
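To select LocalWriter, point chukwa.pipeline at it like any other writer. The localfs package shown below follows the Chukwa source layout, but verify the class name against your release:

<property>
  <name>chukwa.pipeline</name>
  <value>org.apache.hadoop.chukwa.datacollection.writer.localfs.LocalWriter</value>
</property>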
SeqFileWriter
The SeqFileWriter streams chunks of data to HDFS, writing to a temporary file with a .chukwa suffix. When writing completes, the file is renamed with a .done suffix. SeqFileWriter has the following configuration options in chukwa-agent-conf.xml.
- writer.hdfs.filesystem The namenode address of the filesystem to write to.
<property>
  <name>writer.hdfs.filesystem</name>
  <value>hdfs://localhost:8020/</value>
  <description>HDFS to dump to</description>
</property>
- chukwaCollector.outputDir Location of the data sink directory.
<property>
  <name>chukwaCollector.outputDir</name>
  <value>/chukwa/logs/</value>
  <description>Chukwa data sink directory</description>
</property>
- chukwaCollector.rotateInterval File rotation interval (ms).
<property>
  <name>chukwaCollector.rotateInterval</name>
  <value>300000</value>
  <description>Chukwa rotate interval (ms)</description>
</property>
- chukwaCollector.isFixedTimeRotatorScheme A flag indicating that the agent should close its files at a fixed offset after every rotateInterval. The default value is false, which uses the default scheme in which agents close files after each regular rotateInterval. If set to true, also specify a chukwaCollector.fixedTimeIntervalOffset value. For example, if isFixedTimeRotatorScheme is true, fixedTimeIntervalOffset is set to 10000, and rotateInterval is set to 300000, the agent will close its files at 10 seconds past each 5-minute mark; if isFixedTimeRotatorScheme is false, agents rotate approximately once every 5 minutes (see the sketch after this list).
<property>
  <name>chukwaCollector.isFixedTimeRotatorScheme</name>
  <value>false</value>
</property>
- chukwaCollector.fixedTimeIntervalOffset Chukwa fixed time interval offset value (ms)
<property>
  <name>chukwaCollector.fixedTimeIntervalOffset</name>
  <value>30000</value>
  <description>Chukwa fixed time interval offset value (ms)</description>
</property>
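As a rough illustration of the fixed-time scheme described above, the next close time can be computed by aligning to the rotateInterval boundary and adding the offset. This is illustrative arithmetic only, not Chukwa's actual implementation, and it assumes windows are aligned to the epoch:

// Illustrative only: when the agent would next close its files under the
// fixed-time rotator scheme, assuming windows aligned to the epoch.
public class FixedTimeRotationExample {
  static long nextCloseTime(long nowMs, long rotateIntervalMs, long offsetMs) {
    // Offset point within the current rotateInterval window.
    long candidate = (nowMs / rotateIntervalMs) * rotateIntervalMs + offsetMs;
    if (candidate <= nowMs) {
      candidate += rotateIntervalMs; // already past it; use the next window
    }
    return candidate;
  }

  public static void main(String[] args) {
    // rotateInterval=300000 (5 min), offset=10000 (10 s):
    // files close at 10 seconds past each 5-minute mark.
    System.out.println(nextCloseTime(System.currentTimeMillis(), 300000L, 10000L));
  }
}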
SocketTeeWriter
The SocketTeeWriter allows external processes to watch the stream of chunks passing through the agent. This allows certain kinds of real-time monitoring to be done on top of Chukwa.
SocketTeeWriter listens on a port (specified by the conf option chukwaCollector.tee.port, defaulting to 9094). Applications that want Chunks should connect to that port and issue a command of the form RAW|WRITABLE <filter>\n. Filters use the same syntax as the Dump command. If the filter is accepted, the writer will respond OK\n.
Subsequently, Chunks matching the filter will be serialized and sent back over the socket. Specifying "WRITABLE" will cause the chunks to be written using Hadoop's Writable serialization framework. "RAW" will send the internal data of the Chunk, without any metadata, prefixed by its length encoded as a 32-bit int, big-endian. "HEADER" is similar to "RAW", but with a one-line header in front of the content. Header format is:
<hostname> <datatype> <stream name> <offset>
separated by spaces.
The filter will be de-activated when the socket is closed.
Socket s2 = new Socket("host", SocketTeeWriter.DEFAULT_PORT);
s2.getOutputStream().write("RAW datatype=XTrace\n".getBytes());
DataInputStream dis = new DataInputStream(s2.getInputStream());
dis.readFully(new byte[3]); // read "OK\n"
while (true) {
  int len = dis.readInt();       // 32-bit big-endian length prefix
  byte[] data = new byte[len];
  dis.readFully(data);
  DoSomethingUsing(data);        // application-specific processing
}
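In WRITABLE mode, each chunk arrives as a Hadoop Writable. Below is a minimal consumer sketch in the same fragment style as the example above; it assumes ChunkImpl.read(DataInput) is the deserialization entry point, as in the Chukwa source tree, so verify against the ChunkImpl javadoc for your release:

// Sketch: consume chunks in WRITABLE mode (ChunkImpl.read is assumed).
Socket sock = new Socket("host", 9094); // chukwaCollector.tee.port default
sock.getOutputStream().write("WRITABLE datatype=XTrace\n".getBytes());
DataInputStream in = new DataInputStream(sock.getInputStream());
in.readFully(new byte[3]); // consume the "OK\n" acknowledgement
while (true) {
  Chunk c = ChunkImpl.read(in); // Writable deserialization of one chunk
  System.out.println(c.getSource() + "/" + c.getDataType()
      + ": " + c.getData().length + " bytes");
}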