This project has retired. For details please refer to its Attic page.
UDPAdaptor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.adaptor;
19  
20  import java.io.IOException;
21  import java.net.*;
22  import java.util.Arrays;
23  import org.apache.hadoop.chukwa.*;
24  import org.apache.log4j.Logger;
25  
26  public class UDPAdaptor extends AbstractAdaptor {
27  
28    static Logger log = Logger.getLogger(UDPAdaptor.class);
29    
30    int portno;
31    DatagramSocket ds;
32    volatile boolean running = true;
33    volatile long bytesReceived = 0;
34    String source;
35    
36    class ListenThread extends Thread {
37      public void run() {
38        log.info("UDP adaptor " + adaptorID + " started on port " + portno + " offset =" + bytesReceived);
39        byte[] buf = new byte[65535];
40        DatagramPacket dp = new DatagramPacket(buf, buf.length);
41        try {
42          while(running) {
43            ds.receive(dp);
44            send(buf, dp);
45          }
46        } catch(Exception e) {
47          if(running)
48            log.error("can't read UDP messages in " + adaptorID, e);
49        }
50      }
51    }
52    ListenThread lt;
53  
54    public void send(byte[] buf, DatagramPacket dp) throws InterruptedException, IOException {
55      byte[] trimmedBuf =  Arrays.copyOf(buf, dp.getLength());
56      bytesReceived += trimmedBuf.length;
57      Chunk c = new ChunkImpl(type, source, bytesReceived, trimmedBuf, UDPAdaptor.this);
58      dest.add(c);
59    }
60    
61    @Override
62    public String parseArgs(String s) {
63      portno = Integer.parseInt(s);
64      source = "udp:"+portno;
65      return s;
66    }
67  
68    @Override
69    public void start(long offset) throws AdaptorException {
70      try {
71        bytesReceived = offset;
72        ds = new DatagramSocket(portno);
73        portno = ds.getLocalPort();
74        lt = new ListenThread();
75        lt.start();
76      } catch(Exception e) {
77        throw new AdaptorException(e);
78      }
79    }
80  
81    @Override
82    public String getCurrentStatus() {
83      return type + " " + portno;
84    }
85  
86    @Override
87    public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
88        throws AdaptorException {
89      try {
90        running = false;
91        ds.close();
92  //      if(shutdownPolicy == AdaptorShutdownPolicy.GRACEFULLY)
93          lt.join();
94      } catch(InterruptedException e) {}
95      return bytesReceived;
96    }
97  
98  }