/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.datacollection.adaptor;

import java.io.IOException;
import java.net.*;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.HashMap;

import org.apache.hadoop.chukwa.*;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.log4j.Logger;

2829/**30 * SyslogAdaptor reads UDP syslog message from a port and convert the message to Chukwa31 * Chunk for transport from Chukwa Agent to Chukwa Collector. Usage:32 * 33 * add SyslogAdaptor [DataType] [Port] [SequenceNumber]34 * 35 * Syslog protocol facility name is mapped to Chukwa Data Type 36 * by SyslogAdaptor, hence each UDP port can support up to 24 data streams.37 * 38 * Data Type mapping can be overwritten in Chukwa Agent Configuration file, i.e.:39 * 40 * <property>41 * <name>syslog.adaptor.port.9095.facility.LOCAL1</name>42 * <value>HADOOP</value>43 * </property>44 * 45 * When demux takes place, data received on port 9095 with facility name LOCAL0 will46 * be processed by demux parser for data type "HADOOP".47 */48publicclassSyslogAdaptorextendsUDPAdaptor {
4950privatefinalstatic Logger log = Logger.getLogger(SyslogAdaptor.class);
51public enum FacilityType { KERN, USER, MAIL, DAEMON, AUTH, SYSLOG, LPR, NEWS, UUCP, CRON, AUTHPRIV, FTP, NTP, AUDIT, ALERT, CLOCK, LOCAL0, LOCAL1, LOCAL2, LOCAL3, LOCAL4, LOCAL5, LOCAL6, LOCAL7 }
52public HashMap<Integer, String> facilityMap;
53 DatagramSocket ds;
54volatileboolean running = true;
55volatilelong bytesReceived = 0;
5657publicSyslogAdaptor() {
58 facilityMap = new HashMap<Integer, String>(FacilityType.values().length);
59 }
6061publicvoid send(byte[] buf, DatagramPacket dp) throws InterruptedException, IOException {
62 StringBuilder source = new StringBuilder();
63 source.append(dp.getAddress());
64 String dataType = type;
65 byte[] trimmedBuf = Arrays.copyOf(buf, dp.getLength());
66 String rawPRI = new String(trimmedBuf, 1, 4);
67int i = rawPRI.indexOf(">");
68if (i <= 3 && i > -1) {
69 String priorityStr = rawPRI.substring(0,i);
70int priority = 0;
71int facility = 0;
72try {
73 priority = Integer.parseInt(priorityStr);
74 facility = (priority >> 3) << 3;
75 facility = facility / 8;
76 dataType = facilityMap.get(facility);
77 } catch (NumberFormatException nfe) {
78 log.warn("Unsupported format detected by SyslogAdaptor:"+trimmedBuf);
79 }
80 }
8182 bytesReceived += trimmedBuf.length;
83Chunk c = newChunkImpl(dataType, source.toString(), bytesReceived, trimmedBuf, SyslogAdaptor.this);
84 dest.add(c);
85 }
8687 @Override
88public String parseArgs(String s) {
89 portno = Integer.parseInt(s);
90ChukwaConfiguration cc = newChukwaConfiguration();
91for(FacilityType e : FacilityType.values()) {
92 StringBuilder buffer = new StringBuilder();
93 buffer.append("syslog.adaptor.port.");
94 buffer.append(portno);
95 buffer.append(".facility.");
96 buffer.append(e.name());
97 String dataType = cc.get(buffer.toString(), e.name());
98 facilityMap.put(e.ordinal(), dataType);
99 }
100return s;
101 }
102103 }