This project has retired. For details please refer to its Attic page.
SyslogAdaptor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.adaptor;
19  
20  import java.io.IOException;
21  import java.net.*;
22  import java.util.Arrays;
23  import java.util.HashMap;
24  
25  import org.apache.hadoop.chukwa.*;
26  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
27  import org.apache.log4j.Logger;
28  
29  /**
30   * SyslogAdaptor reads UDP syslog message from a port and convert the message to Chukwa
31   * Chunk for transport from Chukwa Agent to Chukwa Collector.  Usage:
32   * 
33   * add SyslogAdaptor [DataType] [Port] [SequenceNumber]
34   * 
35   * Syslog protocol facility name is mapped to Chukwa Data Type 
36   * by SyslogAdaptor, hence each UDP port can support up to 24 data streams.
37   * 
38   * Data Type mapping can be overwritten in Chukwa Agent Configuration file, i.e.:
39   * 
40   * <property>
41   *   <name>syslog.adaptor.port.9095.facility.LOCAL1</name>
42   *   <value>HADOOP</value>
43   * </property>
44   * 
45   * When demux takes place, data received on port 9095 with facility name LOCAL0 will
46   * be processed by demux parser for data type "HADOOP".
47   */
48  public class SyslogAdaptor extends UDPAdaptor {
49  
50    private final static Logger log = Logger.getLogger(SyslogAdaptor.class);
51    public enum FacilityType { KERN, USER, MAIL, DAEMON, AUTH, SYSLOG, LPR, NEWS, UUCP, CRON, AUTHPRIV, FTP, NTP, AUDIT, ALERT, CLOCK, LOCAL0, LOCAL1, LOCAL2, LOCAL3, LOCAL4, LOCAL5, LOCAL6, LOCAL7 }
52    public HashMap<Integer, String> facilityMap;
53    DatagramSocket ds;
54    volatile boolean running = true;
55    volatile long bytesReceived = 0;
56    
57    public SyslogAdaptor() {
58      facilityMap = new HashMap<Integer, String>(FacilityType.values().length);
59    }
60    
61    public void send(byte[] buf, DatagramPacket dp) throws InterruptedException, IOException {
62      StringBuilder source = new StringBuilder();
63      source.append(dp.getAddress());
64      String dataType = type;
65      byte[] trimmedBuf =  Arrays.copyOf(buf, dp.getLength());
66      String rawPRI = new String(trimmedBuf, 1, 4);
67      int i = rawPRI.indexOf(">");
68      if (i <= 3 && i > -1) {
69        String priorityStr = rawPRI.substring(0,i);
70        int priority = 0;
71        int facility = 0;
72        try {
73          priority = Integer.parseInt(priorityStr);
74          facility = (priority >> 3) << 3;
75          facility = facility / 8;
76          dataType = facilityMap.get(facility); 
77        } catch (NumberFormatException nfe) {
78          log.warn("Unsupported format detected by SyslogAdaptor:"+trimmedBuf);
79        }
80      }
81  
82      bytesReceived += trimmedBuf.length;
83      Chunk c = new ChunkImpl(dataType, source.toString(), bytesReceived, trimmedBuf, SyslogAdaptor.this);
84      dest.add(c);
85    }
86    
87    @Override
88    public String parseArgs(String s) {
89      portno = Integer.parseInt(s);
90      ChukwaConfiguration cc = new ChukwaConfiguration();
91      for(FacilityType e : FacilityType.values()) {
92        StringBuilder buffer = new StringBuilder();
93        buffer.append("syslog.adaptor.port.");
94        buffer.append(portno);
95        buffer.append(".facility.");
96        buffer.append(e.name());
97        String dataType = cc.get(buffer.toString(), e.name());
98        facilityMap.put(e.ordinal(), dataType);
99      }
100     return s;
101   }
102 
103 }