/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
import org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

/**
 * A base class for file tailing adaptors.
 * Intended to mandate as little policy as possible, and to use as
 * few system resources as possible.
 *
 * If the file does not exist, this class will continue to retry quietly
 * forever and will start tailing if it's eventually created.
 */
public class LWFTAdaptor extends AbstractAdaptor {

  /**
   * This is the maximum amount we'll read from any one file before moving on
   * to the next. This way, we get quick response time for other files if one
   * file is growing rapidly.
   */
  public static final int DEFAULT_MAX_READ_SIZE = 128 * 1024;
  public static final String MAX_READ_SIZE_OPT =
      "chukwaAgent.fileTailingAdaptor.maxReadSize";
  public static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE;

  static Logger log;
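  /** Single FileTailer shared by all instances of this adaptor; created lazily in start(). */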
  protected static FileTailer tailer;

  static {
    tailer = null;
    log = Logger.getLogger(FileTailingAdaptor.class);
  }

  /**
   * next PHYSICAL offset to read
   */
  protected long fileReadOffset;

  /**
   * The logical offset of the first byte of the file
   */
  protected long offsetOfFirstByte = 0;
  protected Configuration conf = null;
  /**
   * The timestamp of last slurping.
   */
  protected long lastSlurpTime = 0l;
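
  /** The file this adaptor tails. */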
  File toWatch;
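
  /**
   * Starts tailing: registers this adaptor with the shared FileTailer,
   * beginning at the given stream offset.
   */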
  @Override
  public void start(long offset) {
    synchronized(LWFTAdaptor.class) {
      if(tailer == null)
        tailer = new FileTailer(control.getConfiguration());
    }
    this.fileReadOffset = offset - offsetOfFirstByte;
    tailer.startWatchingFile(this);
  }

  /**
   * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#getCurrentStatus()
   */
  public String getCurrentStatus() {
    return type.trim() + " " + offsetOfFirstByte + " " + toWatch.getPath();
  }

  public String toString() {
    return "Lightweight Tailer on " + toWatch;
  }

  public String getStreamName() {
    return toWatch.getPath();
  }
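
  /**
   * Parses the adaptor parameters: an optional first-byte offset followed by
   * the path of the file to tail. If no offset is given, the whole string is
   * treated as the path and offsetOfFirstByte stays 0.
   */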
  @Override
  public String parseArgs(String params) {
    conf = control.getConfiguration();
    MAX_READ_SIZE = conf.getInt(MAX_READ_SIZE_OPT, DEFAULT_MAX_READ_SIZE);

    Pattern cmd = Pattern.compile("(\\d+)\\s+(.+)\\s?");
    Matcher m = cmd.matcher(params);
    if (m.matches()) { // check for first-byte offset. If absent, assume we just got a path.
      offsetOfFirstByte = Long.parseLong(m.group(1));
      toWatch = new File(m.group(2));
    } else {
      toWatch = new File(params.trim());
    }
    return toWatch.getAbsolutePath();
  }
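
  /**
   * Stops watching the file and returns the stream offset at which tailing
   * stopped (fileReadOffset plus offsetOfFirstByte).
   */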
  @Override
  public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
      throws AdaptorException {
    tailer.stopWatchingFile(this);
    return fileReadOffset + offsetOfFirstByte;
  }

  /**
   * Extract records from a byte sequence
   *
   * @param eq the queue to stick the new chunk[s] in
   * @param buffOffsetInFile the byte offset in the stream at which buf[] begins
   * @param buf the byte buffer to extract records from
   * @return the number of bytes processed
   * @throws InterruptedException
   */
  protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
      byte[] buf) throws InterruptedException {
    if(buf.length == 0)
      return 0;

    ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
        buffOffsetInFile + buf.length, buf, this);

    eq.add(chunk);
    return buf.length;
  }
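
  /**
   * Reads up to MAX_READ_SIZE bytes from the file, starting at fileReadOffset,
   * and hands them to extractRecords.
   *
   * @param len the current length of the file being tailed
   * @param reader an open read-only handle on the file being tailed
   * @return true if there is more data left to read, false otherwise
   */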
  protected boolean slurp(long len, RandomAccessFile reader) throws IOException,
      InterruptedException {
    boolean hasMoreData = false;

    log.debug("Adaptor|" + adaptorID + "|seeking|" + fileReadOffset);
    reader.seek(fileReadOffset);

    long bufSize = len - fileReadOffset;

    if (bufSize > MAX_READ_SIZE) {
      bufSize = MAX_READ_SIZE;
      hasMoreData = true;
    }
    byte[] buf = new byte[(int) bufSize];

    long curOffset = fileReadOffset;

    lastSlurpTime = System.currentTimeMillis();
    int bufferRead = reader.read(buf);
    assert reader.getFilePointer() == fileReadOffset + bufSize : " event size arithmetic is broken: "
        + " pointer is " + reader.getFilePointer() + " but offset is "
        + (fileReadOffset + bufSize);

    int bytesUsed = extractRecords(dest,
        fileReadOffset + offsetOfFirstByte, buf);

    // === WARNING ===
    // If we couldn't find a complete record AND we cannot read more,
    // i.e. bufferRead == MAX_READ_SIZE, it's because the record is too BIG.
    // So log.warn, and drop the current buffer so we can keep moving
    // instead of being stuck at that point forever.
    if (bytesUsed == 0 && bufferRead == MAX_READ_SIZE) {
      log.warn("bufferRead == MAX_READ_SIZE AND bytesUsed == 0, dropping current buffer: startOffset="
          + curOffset
          + ", MAX_READ_SIZE=" + MAX_READ_SIZE
          + ", for " + toWatch.getPath());
      bytesUsed = buf.length;
    }

    fileReadOffset = fileReadOffset + bytesUsed;

    log.debug("Adaptor|" + adaptorID + "|start|" + curOffset + "|end|"
        + fileReadOffset);
    return hasMoreData;
  }
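
  /**
   * Compares the file's current length with the offset read so far. If the
   * file has grown, the new data is slurped; if it has shrunk, the read
   * offsets are reset via handleShrunkenFile.
   *
   * @return true if there is more data available to read, false otherwise
   */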
  public synchronized boolean tailFile()
      throws InterruptedException {
    boolean hasMoreData = false;
    try {

      // if file doesn't exist, length = 0 and we just keep waiting for it.
      // if(!toWatch.exists())
      //   deregisterAndStop(false);

      long len = toWatch.length();
      if(len < fileReadOffset) {
        // file shrank; probably some data went missing.
        handleShrunkenFile(len);
      } else if(len > fileReadOffset) {
        RandomAccessFile reader = new RandomAccessFile(toWatch, "r");
        hasMoreData = slurp(len, reader);
        reader.close();
      }
    } catch(IOException e) {
      log.warn("IOException in tailer", e);
      deregisterAndStop();
    }

    return hasMoreData;
  }
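
  /**
   * Called when the file is shorter than the offset already read: resets
   * fileReadOffset to 0 and moves offsetOfFirstByte to the new length, so
   * tailing resumes from the start of the now-smaller file.
   */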
  private void handleShrunkenFile(long measuredLen) {
    log.info("file " + toWatch + " shrank from " + fileReadOffset + " to "
        + measuredLen);
    offsetOfFirstByte = measuredLen;
    fileReadOffset = 0;
  }

}