1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.hadoop.chukwa.datacollection.writer;
192021import java.io.*;
22import java.util.List;
23import org.apache.hadoop.chukwa.Chunk;
24import org.apache.hadoop.chukwa.ChunkImpl;
25import org.apache.hadoop.conf.Configuration;
2627publicclassInMemoryWriterimplementsChukwaWriter {
2829 ByteArrayOutputStream buf;
3031publicvoid close() {
32 buf.reset();
33 }
3435publicvoid init(Configuration conf) throws WriterException {
36 buf = new ByteArrayOutputStream();
37 }
3839publicvoid add(Chunk data) throws WriterException {
40 DataOutputStream dos = new DataOutputStream(buf);
41try {
42 data.write(dos);
43 } catch (IOException e) {
44 e.printStackTrace();
45thrownewWriterException(e);
46 }
47synchronized (this) {
48 notify();
49 }
50 }
5152 @Override
53publicCommitStatus add(List<Chunk> chunks) throws WriterException {
54for (Chunk chunk : chunks) {
55 add(chunk);
56 }
57return COMMIT_OK;
58 }
5960 DataInputStream dis = null;
6162/**63 * Try to read bytes, waiting up to ms64 * 65 * @param bytes amount to try to read66 * @param ms time to wait67 * @return a newly read-in chunk68 * @throws IOException69 */70publicChunk readOutChunk(int bytes, int ms) throws IOException {
7172long readStartTime = System.currentTimeMillis();
73try {
74while (buf.size() < bytes) {
75synchronized (this) {
76long timeLeft = ms - System.currentTimeMillis() + readStartTime;
77if (timeLeft > 0)
78 wait(timeLeft);
79 }
80 }
81if (dis == null)
82 dis = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
83return ChunkImpl.read(dis);
84 } catch (InterruptedException e) {
85 Thread.currentThread().interrupt();
86returnnull;
87 }
88 }
8990 }