This project has retired. For details please refer to its Attic page.

Basic Operation

Chukwa Collectors are responsible for accepting incoming data from Agents and storing it. Most commonly, collectors simply write all received data to HBase or HDFS.

HBase

To stream data to HBase, set the collector writer class in chukwa-collector-conf.xml.

<property>
  <name>chukwaCollector.writerClass</name>
  <value>org.apache.hadoop.chukwa.datacollection.writer.hbase.HBaseWriter</value>
</property>

In this mode, the location of the HBase configuration is set in chukwa-env.sh. HBASE_CONF_DIR should point to the HBase configuration directory so that the Chukwa Collector can load hbase-site.xml from the class path.

HDFS

To stream data to HDFS, set the collector writer class in chukwa-collector-conf.xml.

<property>
  <name>chukwaCollector.writerClass</name>
  <value>org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter</value>
</property>

In this mode, the filesystem to write to is determined by the option writer.hdfs.filesystem in chukwa-collector-conf.xml.

<property>
    <name>writer.hdfs.filesystem</name>
    <value>hdfs://localhost:8020/</value>
    <description>HDFS to dump to</description>
</property>

This is the only option that you really need to specify to get a working collector.

By default, collectors listen on port 8080. This can be configured in chukwa-collector-conf.xml.

Configuration Knobs

There are several more "standard" knobs worth knowing about. These are mostly documented in chukwa-collector-conf.xml.

It's also possible to do limited configuration on the command line. This is primarily intended for debugging. You can pass writer=pretend to make the collector print incoming chunks to standard output, or portno=xyz to override the default port number.

bin/chukwa collector writer=pretend portno=8081

Advanced Options

There are some advanced options, not necessarily documented in the collector conf file, that are helpful in using Chukwa in nonstandard ways. While normally Chukwa writes sequence files to HDFS, it's possible to specify an alternate Writer class. The option chukwaCollector.writerClass specifies a Java class to instantiate and use as a writer. See the ChukwaWriter javadoc for details.

One particularly useful Writer class is PipelineStageWriter, which lets you string together a series of PipelineableWriters for pre-processing or post-processing incoming data. As an example, the SocketTeeWriter class allows other programs to get incoming chunks fed to them over a socket by the collector.

Stages in the pipeline should be listed, comma-separated, in the option chukwaCollector.pipeline.

<property>
  <name>chukwaCollector.writerClass</name>
  <value>org.apache.hadoop.chukwa.datacollection.writer.PipelineStageWriter</value>
</property>

<property>
  <name>chukwaCollector.pipeline</name>
  <value>org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter,org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter</value>
</property>
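
The chaining idea behind the pipeline can be illustrated with a small, self-contained sketch. The Stage and RecordingStage classes below are hypothetical stand-ins and are not Chukwa's PipelineableWriter API. The sketch only assumes that a comma-separated stage list is turned into a chain in which each stage handles a chunk and then passes it to the next stage, in the order listed.

```java
import java.util.List;

// Hypothetical sketch: these types are NOT part of Chukwa.
class PipelineSketch {
    /** A stage handles a chunk, then forwards it to the next stage, if any. */
    static abstract class Stage {
        private Stage next;

        void setNext(Stage next) { this.next = next; }

        void add(String chunk) {
            process(chunk);
            if (next != null) {
                next.add(chunk);
            }
        }

        abstract void process(String chunk);
    }

    /** Records which stage saw which chunk, so the ordering is visible. */
    static class RecordingStage extends Stage {
        private final String name;
        private final List<String> log;

        RecordingStage(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        void process(String chunk) { log.add(name + ":" + chunk); }
    }

    /** Builds a chain from a comma-separated list, in listed order; returns the head. */
    static Stage build(String pipeline, List<String> log) {
        Stage head = null;
        Stage tail = null;
        for (String name : pipeline.split(",")) {
            Stage s = new RecordingStage(name.trim(), log);
            if (head == null) {
                head = s;
            } else {
                tail.setNext(s);
            }
            tail = s;
        }
        return head;
    }
}
```

In the configuration above, the tee stage is listed first, so under this model it would see each chunk before the sequence file stage does.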

HBaseWriter

HBaseWriter is the default writer for storing data in HBase. It runs Demux parsers internally to convert unstructured data to semi-structured data, then loads the key-value pairs into an HBase table. HBaseWriter has the following configuration:

  • hbase.demux.package The package containing the Demux parser classes. HBaseWriter uses this package name to find annotated demux parser classes and validate HBase against them.
    <property>
      <name>hbase.demux.package</name>
      <value>org.apache.hadoop.chukwa.extraction.demux.processor</value>
    </property>
  • hbase.writer.verify.schema Verify the HBase table schema against the demux parser schema, and log a warning if there is a mismatch between the two.
    <property>
      <name>hbase.writer.verify.schema</name>
      <value>false</value>
    </property>
  • hbase.writer.halt.on.schema.mismatch If this option is set to true and the HBase table schema does not match the demux parser schema, the collector will shut itself down.
    <property>
      <name>hbase.writer.halt.on.schema.mismatch</name>
      <value>false</value>
    </property>

LocalWriter

LocalWriter writes chunks of data to local disk, then uploads each file to HDFS as a whole. This writer is designed for high-throughput environments.

  • chukwaCollector.localOutputDir Location to buffer data before moving data to HDFS.
    <property>
      <name>chukwaCollector.localOutputDir</name>
      <value>/tmp/chukwa/logs</value>
    </property>

SeqFileWriter

The SeqFileWriter streams chunks of data to HDFS, writing to a temporary file with a .chukwa suffix. When the file is complete, it is renamed with a .done suffix. SeqFileWriter has the following configuration in chukwa-collector-conf.xml.

  • writer.hdfs.filesystem Address of the name node of the HDFS filesystem to write to
    <property>
        <name>writer.hdfs.filesystem</name>
        <value>hdfs://localhost:8020/</value>
        <description>HDFS to dump to</description>
    </property>
  • chukwaCollector.outputDir Location of the data sink directory for collected data
    <property>
        <name>chukwaCollector.outputDir</name>
        <value>/chukwa/logs/</value>
        <description>Chukwa data sink directory</description>
    </property>
  • chukwaCollector.rotateInterval File rotation interval, in milliseconds
    <property>
        <name>chukwaCollector.rotateInterval</name>
        <value>300000</value>
        <description>Chukwa rotate interval (ms)</description>
    </property>
  • chukwaCollector.isFixedTimeRotatorScheme A flag indicating that the collector should close its files at a fixed offset after every rotateInterval. The default value is false, which selects the default scheme in which collectors close files at regular rotateIntervals. If set to true, also specify chukwaCollector.fixedTimeIntervalOffset. For example, if isFixedTimeRotatorScheme is true, fixedTimeIntervalOffset is 10000, and rotateInterval is 300000, the collector closes its files at 10 seconds past each 5-minute mark. If isFixedTimeRotatorScheme is false, collectors rotate approximately once every 5 minutes.
    <property>
        <name>chukwaCollector.isFixedTimeRotatorScheme</name>
        <value>false</value>
    </property>
  • chukwaCollector.fixedTimeIntervalOffset Chukwa fixed time interval offset value (ms)
    <property>
        <name>chukwaCollector.fixedTimeIntervalOffset</name>
        <value>30000</value>
        <description>Chukwa fixed time interval offset value (ms)</description>
    </property>
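
The arithmetic behind the two rotation schemes can be sketched as follows. This is an illustration of the behaviour described above, not SeqFileWriter's actual code. The class and method names are hypothetical, and the sketch assumes that the "mark" is a multiple of the rotate interval on the millisecond clock and that the offset is smaller than the interval.

```java
// Hypothetical sketch of the rotation timing; not taken from SeqFileWriter.
class RotationSketch {
    /**
     * Returns the time (ms) at which the current file would next be closed.
     *
     * @param nowMs            current time in milliseconds
     * @param rotateIntervalMs value of chukwaCollector.rotateInterval
     * @param fixedScheme      value of chukwaCollector.isFixedTimeRotatorScheme
     * @param offsetMs         value of chukwaCollector.fixedTimeIntervalOffset
     */
    static long nextClose(long nowMs, long rotateIntervalMs,
                          boolean fixedScheme, long offsetMs) {
        if (!fixedScheme) {
            // Default scheme: simply one interval from now.
            return nowMs + rotateIntervalMs;
        }
        // Fixed scheme: the start of the current interval plus the offset...
        long candidate = (nowMs / rotateIntervalMs) * rotateIntervalMs + offsetMs;
        // ...or the same point in the next interval if that moment has passed.
        return candidate > nowMs ? candidate : candidate + rotateIntervalMs;
    }
}
```

With rotateInterval set to 300000 and fixedTimeIntervalOffset set to 10000, a collector that is 2 minutes into an interval would, under these assumptions, next close its file 10 seconds after the following 5-minute mark.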

SocketTeeWriter

The SocketTeeWriter allows external processes to watch the stream of chunks passing through the collector. This allows certain kinds of real-time monitoring to be done on top of Chukwa.

SocketTeeWriter listens on a port (specified by the conf option chukwaCollector.tee.port, defaulting to 9094). Applications that want Chunks should connect to that port and issue a command of the form RAW|WRITABLE|HEADER <filter>\n. Filters use the same syntax as the Dump command. If the filter is accepted, the Writer will respond OK\n.

Subsequently, Chunks matching the filter will be serialized and sent back over the socket. Specifying "WRITABLE" will cause the chunks to be written using Hadoop's Writable serialization framework. "RAW" will send the internal data of the Chunk, without any metadata, prefixed by its length encoded as a 32-bit int, big-endian. "HEADER" is similar to "RAW", but with a one-line header in front of the content. Header format is:

<hostname> <datatype> <stream name> <offset>

separated by spaces.

The filter will be de-activated when the socket is closed.

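import java.io.DataInputStream;
import java.net.Socket;
import org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter;

// Connect to the collector's tee port and subscribe to chunks of one datatype.
Socket s2 = new Socket("host", SocketTeeWriter.DEFAULT_PORT);
s2.getOutputStream().write("RAW datatype=XTrace\n".getBytes());
DataInputStream dis = new DataInputStream(s2.getInputStream());
dis.readFully(new byte[3]); // read "OK\n"
while (true) {
   int len = dis.readInt();      // 32-bit big-endian length prefix
   byte[] data = new byte[len];
   dis.readFully(data);          // raw contents of one chunk
   DoSomethingUsing(data);       // placeholder for application logic
}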
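
The RAW framing described above (a 32-bit big-endian length followed by that many bytes) can be tried without a running collector. The sketch below is a minimal, hypothetical helper, not part of Chukwa: readFrames decodes frames from any InputStream and stops when the stream ends, and frame builds test input in the same layout.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for experimenting with the RAW framing; not part of Chukwa.
class RawFrameSketch {
    /** Reads length-prefixed frames until no further length prefix can be read. */
    static List<byte[]> readFrames(InputStream in) throws IOException {
        DataInputStream dis = new DataInputStream(in);
        List<byte[]> frames = new ArrayList<>();
        while (true) {
            int len;
            try {
                len = dis.readInt();   // 32-bit big-endian length prefix
            } catch (EOFException e) {
                return frames;         // stream ended before another full prefix
            }
            byte[] data = new byte[len];
            dis.readFully(data);       // throws EOFException if the frame is truncated
            frames.add(data);
        }
    }

    /** Encodes payloads in the same framing, for feeding readFrames in memory. */
    static byte[] frame(byte[]... payloads) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bos);
        for (byte[] p : payloads) {
            dos.writeInt(p.length);
            dos.write(p);
        }
        return bos.toByteArray();
    }
}
```

Against a live collector, the same readFrames method could be given the socket's input stream after the OK\n response has been consumed. Returning on end of stream, rather than looping forever, lets the caller notice that the socket was closed.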

Acknowledgement mode

Chukwa supports two different reliability strategies. The first, default strategy, is as follows: collectors write data to HDFS, and as soon as the HDFS write call returns success, report success to the agent, which advances its checkpoint state.

This is potentially a problem if HDFS (or some other storage tier) has non-durable or asynchronous writes. As a result, Chukwa offers a mechanism, asynchronous acknowledgement, for coping with this case.

This mechanism can be enabled by setting option httpConnector.asyncAcks. This option applies to both agents and collectors. On the collector side, it tells the collector to return asynchronous acknowledgements. On the agent side, it tells agents to look for and process them correctly. Agents with the option set to false should work OK with collectors where it's set to true. The reverse is not generally true: agents will expect a collector to be able to answer questions about the state of the filesystem.

Theory

In this approach, rather than try to build a fault tolerant collector, Chukwa agents look through the collectors to the underlying state of the filesystem. This filesystem state is what is used to detect and recover from failure. Recovery is handled entirely by the agent, without requiring anything at all from the failed collector.

When an agent sends data to a collector, the collector responds with the name of the HDFS file in which the data will be stored and the future location of the data within the file. This is very easy to compute -- since each file is only written by a single collector, the only requirement is to enqueue the data and add up lengths.
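
The "enqueue the data and add up lengths" step can be sketched as a running counter per output file. The class below is a hypothetical illustration, not the collector's actual code. It assumes the collector reports the offset at which each chunk will end, and it ignores any per-record overhead that the real file format adds.

```java
// Hypothetical sketch of per-file offset bookkeeping; not Chukwa's implementation.
class OffsetTrackerSketch {
    private final String fileName;
    private long nextOffset = 0;

    OffsetTrackerSketch(String fileName) {
        this.fileName = fileName;
    }

    /** Name of the file the enqueued data will be written to. */
    String fileName() {
        return fileName;
    }

    /**
     * Accounts for a chunk that has been queued for writing and returns the
     * offset at which that chunk will end once it reaches the file.
     */
    synchronized long enqueue(int chunkLength) {
        nextOffset += chunkLength;
        return nextOffset;
    }
}
```

Because only one collector writes to a given file, a single counter like this is enough; no coordination with other collectors is needed to predict where the data will land.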

Every few minutes, each agent process polls a collector to find the length of each file to which data is being written. The length of the file is then compared with the offset at which each chunk was to be written. If the file length exceeds this value, then the data has been committed and the agent process advances its checkpoint accordingly. (Note that the length returned by the filesystem is the amount of data that has been successfully replicated.) There is nothing essential about the role of collectors in monitoring the written files. Collectors store no per-agent state. The reason to poll collectors, rather than the filesystem directly, is to reduce the load on the filesystem master and to shield agents from the details of the storage system.
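
The agent-side check can be sketched as follows. This is a hypothetical illustration of the comparison described above, not the agent's actual code. It assumes each pending chunk records the file name, the offset at which the chunk ends, and the checkpoint position to adopt once the chunk is committed, and that a chunk counts as committed when the reported file length reaches its end offset.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

// Hypothetical sketch of the agent's commit check; not Chukwa's implementation.
class CommitCheckSketch {
    /** One chunk that was sent but is not yet known to be committed. */
    private static final class Pending {
        final String file;
        final long endOffset;      // where the chunk ends in the file
        final long agentPosition;  // checkpoint value once the chunk is committed

        Pending(String file, long endOffset, long agentPosition) {
            this.file = file;
            this.endOffset = endOffset;
            this.agentPosition = agentPosition;
        }
    }

    private final Deque<Pending> pending = new ArrayDeque<>();
    private long checkpoint = 0;

    /** Records the collector's answer for a chunk that was just sent. */
    void sent(String file, long endOffset, long agentPosition) {
        pending.addLast(new Pending(file, endOffset, agentPosition));
    }

    /**
     * Applies one poll result (file name to committed length) and advances the
     * checkpoint past every leading chunk that the file now covers.
     */
    long poll(Map<String, Long> reportedLengths) {
        while (!pending.isEmpty()) {
            Pending p = pending.peekFirst();
            Long len = reportedLengths.get(p.file);
            if (len == null || len < p.endOffset) {
                break; // oldest chunk not committed yet; keep waiting
            }
            checkpoint = p.agentPosition;
            pending.removeFirst();
        }
        return checkpoint;
    }
}
```

Chunks are checked in the order they were sent, so the checkpoint never moves past a chunk whose commit is still unknown.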

The collector component that handles these requests is datacollection.collector.servlet.CommitCheckServlet. This will be started if httpConnector.asyncAcks is true in the collector configuration.

On error, agents resume from their last checkpoint and pick a new collector. In the event of a failure, the total volume of data retransmitted is bounded by the period between collector file rotations.

The solution is end-to-end. Authoritative copies of data can only exist in two places: the nodes where data was originally produced, and the HDFS file system where it will ultimately be stored. Collectors only hold soft state; the only "hard" state stored by Chukwa is the agent checkpoints. Below is a diagram of the flow of messages in this protocol.

Configuration

In addition to httpConnector.asyncAcks (which enables asynchronous acknowledgement), a number of options affect this mode of operation.

  • chukwaCollector.asyncAcks.scanperiod affects how often collectors will check the filesystem for commits. It defaults to twice the rotation interval.
  • chukwaCollector.asyncAcks.scanpaths determines where in HDFS collectors will look. It defaults to the data sink dir plus the archive dir.

    In the future, ZooKeeper could be used instead to track rotations.