This project has retired. For details please refer to its Attic page.
InMemoryWriter xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.writer;
19  
20  
21  import java.io.*;
22  import java.util.List;
23  import org.apache.hadoop.chukwa.Chunk;
24  import org.apache.hadoop.chukwa.ChunkImpl;
25  import org.apache.hadoop.conf.Configuration;
26  
27  public class InMemoryWriter implements ChukwaWriter {
28  
29    ByteArrayOutputStream buf;
30  
31    public void close() {
32      buf.reset();
33    }
34  
35    public void init(Configuration conf) throws WriterException {
36      buf = new ByteArrayOutputStream();
37    }
38  
39    public void add(Chunk data) throws WriterException {
40      DataOutputStream dos = new DataOutputStream(buf);
41      try {
42        data.write(dos);
43      } catch (IOException e) {
44        e.printStackTrace();
45        throw new WriterException(e);
46      }
47      synchronized (this) {
48        notify();
49      }
50    }
51  
52    @Override
53    public CommitStatus add(List<Chunk> chunks) throws WriterException {
54      for (Chunk chunk : chunks) {
55        add(chunk);
56      }
57      return COMMIT_OK;
58    }
59  
60    DataInputStream dis = null;
61  
62    /**
63     * Try to read bytes, waiting up to ms
64     * 
65     * @param bytes amount to try to read
66     * @param ms time to wait
67     * @return a newly read-in chunk
68     * @throws IOException
69     */
70    public Chunk readOutChunk(int bytes, int ms) throws IOException {
71  
72      long readStartTime = System.currentTimeMillis();
73      try {
74        while (buf.size() < bytes) {
75          synchronized (this) {
76            long timeLeft = ms - System.currentTimeMillis() + readStartTime;
77            if (timeLeft > 0)
78              wait(timeLeft);
79          }
80        }
81        if (dis == null)
82          dis = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
83        return ChunkImpl.read(dis);
84      } catch (InterruptedException e) {
85        Thread.currentThread().interrupt();
86        return null;
87      }
88    }
89  
90  }