This project has been retired. For details, please refer to its
Attic page.
UDPAdaptor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.adaptor;
19
20 import java.io.IOException;
21 import java.net.*;
22 import java.util.Arrays;
23 import org.apache.hadoop.chukwa.*;
24 import org.apache.log4j.Logger;
25
26 public class UDPAdaptor extends AbstractAdaptor {
27
28 static Logger log = Logger.getLogger(UDPAdaptor.class);
29
30 int portno;
31 DatagramSocket ds;
32 volatile boolean running = true;
33 volatile long bytesReceived = 0;
34 String source;
35
36 class ListenThread extends Thread {
37 public void run() {
38 log.info("UDP adaptor " + adaptorID + " started on port " + portno + " offset =" + bytesReceived);
39 byte[] buf = new byte[65535];
40 DatagramPacket dp = new DatagramPacket(buf, buf.length);
41 try {
42 while(running) {
43 ds.receive(dp);
44 send(buf, dp);
45 }
46 } catch(Exception e) {
47 if(running)
48 log.error("can't read UDP messages in " + adaptorID, e);
49 }
50 }
51 }
52 ListenThread lt;
53
54 public void send(byte[] buf, DatagramPacket dp) throws InterruptedException, IOException {
55 byte[] trimmedBuf = Arrays.copyOf(buf, dp.getLength());
56 bytesReceived += trimmedBuf.length;
57 Chunk c = new ChunkImpl(type, source, bytesReceived, trimmedBuf, UDPAdaptor.this);
58 dest.add(c);
59 }
60
61 @Override
62 public String parseArgs(String s) {
63 portno = Integer.parseInt(s);
64 source = "udp:"+portno;
65 return s;
66 }
67
68 @Override
69 public void start(long offset) throws AdaptorException {
70 try {
71 bytesReceived = offset;
72 ds = new DatagramSocket(portno);
73 portno = ds.getLocalPort();
74 lt = new ListenThread();
75 lt.start();
76 } catch(Exception e) {
77 throw new AdaptorException(e);
78 }
79 }
80
81 @Override
82 public String getCurrentStatus() {
83 return type + " " + portno;
84 }
85
86 @Override
87 public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
88 throws AdaptorException {
89 try {
90 running = false;
91 ds.close();
92
93 lt.join();
94 } catch(InterruptedException e) {}
95 return bytesReceived;
96 }
97
98 }