This project has retired. For details please refer to its Attic page.
FSMIntermedEntry xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.analysis.salsa.fsm;
20  
21  import java.io.IOException;
22  import java.io.DataInput;
23  import java.io.DataOutput;
24  import java.util.Iterator;
25  import java.util.TreeMap;
26  import java.util.Set;
27  import java.util.Map.Entry;
28  
29  import org.apache.hadoop.io.WritableComparable;
30  import org.apache.commons.lang3.builder.HashCodeBuilder;
31  
32  /*
33   * FSM Intermediate State Entry
34   * 
35   * Each state corresponds to two of these entries:
36   * One corresponding to the start of the state, one corresponding to the end of the state
37   *
38   * Intermediate data-structure passed from Maps to Reduces
39   *
40   */
41  public class FSMIntermedEntry 
42  	implements Cloneable, WritableComparable 
43  {
44  	private final char DELIM = 1;
45  	
46  	/* Begin fields */
47  	public StateType state_type;
48  	public MapRedState state_mapred;
49  	public HDFSState state_hdfs;
50  	public FSMType fsm_type;
51  	
52  	public String state_name;
53  	public String identifier;
54  	public String unique_id; // state name + unique identifier 
55  														// (state-dependent)
56  														// this id should also correspond 
57  														// to the k2 value between
58  														// mappers and reducers
59  														
60  	public String timestamp;
61  	public String time_start;
62  	public String time_end;
63  	
64  	public String host_exec;
65  	public String host_other; // for instance, source host for shuffle, 
66  														// src/dest host for dfs read/write
67  	
68  	// These values filled in by splitting the original 
69  	// ChukwaRecordKey from Demux
70  	public String time_orig_epoch;
71  	public String time_orig;
72  	public String job_id; // we get this for free from the CRK
73  	
74  	TreeMap<String,String> add_info; // additional information 
75  																	 // e.g. locality information
76  																	
77  	/* End of fields */
78  	
79  	public FSMIntermedEntry() {
80  		this.state_mapred = new MapRedState(MapRedState.NONE);
81  		this.state_hdfs = new HDFSState(HDFSState.NONE);
82  		this.state_type = new StateType(StateType.STATE_NOOP);			
83  		this.add_info = new TreeMap<String, String>();
84  		this.host_other = "";
85  		this.job_id = "";
86  		this.time_orig_epoch = "";
87  		this.time_orig = "";
88  	}
89  	
90  	public String getUniqueID()
91  	{
92  		return this.unique_id;
93  	}
94  	
95  	public String getFriendlyID()
96  	{
97  		return this.identifier;
98  	}
99  	
100 	/**
101 	 * Set state_type and identifier before calling
102 	 */
103 	public void generateUniqueID()
104 	{
105 		if (this.fsm_type.val == FSMType.MAPREDUCE_FSM || 
106 			  this.fsm_type.val == FSMType.MAPREDUCE_FSM_INCOMPLETE) 
107 		{
108 			this.state_name = this.state_mapred.toString();
109 		} else if (this.fsm_type.val == FSMType.FILESYSTEM_FSM || 
110 			 this.fsm_type.val == FSMType.FILESYSTEM_FSM_INCOMPLETE) 
111 		{
112 			this.state_name = this.state_hdfs.toString();
113 		}
114 		this.unique_id = new StringBuilder().append(this.state_name).append("@").append(this.identifier).toString();
115 	}	
116 	
117 	public void write(DataOutput out) throws IOException {
118 		Set<String> mapKeys;
119 		
120 		out.writeInt(this.state_type.val);
121 		out.writeInt(this.state_mapred.val);
122 		out.writeInt(this.state_hdfs.val);
123 		out.writeInt(this.fsm_type.val);
124 		out.writeChar(DELIM);
125 		out.writeInt(state_name.length());
126 		if (state_name.length() > 0) out.writeUTF(state_name);
127 		out.writeInt(unique_id.length());
128 		if (unique_id.length() > 0) out.writeUTF(unique_id);
129 		out.writeInt(timestamp.length());
130 		if (timestamp.length() > 0) out.writeUTF(timestamp);
131 		out.writeInt(time_start.length());
132 		if (time_start.length() > 0) out.writeUTF(time_start);
133 		out.writeInt(time_end.length());
134 		if (time_end.length() > 0) out.writeUTF(time_end);
135 		out.writeInt(host_exec.length());
136 		if (host_exec.length() > 0) out.writeUTF(host_exec);
137 		out.writeInt(host_other.length());
138 		if (host_other.length() > 0) out.writeUTF(host_other);
139 		out.writeInt(time_orig_epoch.length());
140 		if (time_orig_epoch.length() > 0) out.writeUTF(time_orig_epoch);
141 		out.writeInt(time_orig.length());
142 		if (time_orig.length() > 0) out.writeUTF(time_orig);
143 		out.writeInt(job_id.length());
144 		if (job_id.length() > 0) out.writeUTF(job_id);
145 		out.writeInt(identifier.length());
146 		if (identifier.length() > 0) out.writeUTF(identifier);
147 
148 		mapKeys = this.add_info.keySet();
149 		out.writeInt(mapKeys.size());
150 		
151 		for(Entry<String, String> entry : this.add_info.entrySet()) {
152 		  String value = entry.getValue();
153 		  if(value.length() > 0) {
154 	      out.writeUTF(entry.getKey());
155 	      out.writeInt(value.length());
156 		    out.writeUTF(value);
157 		  } else {
158 		    out.writeUTF("NULL");
159 		    out.writeInt(0);
160 		  }
161 		}
162 	}
163 
164 	public void readFields(DataInput in) throws IOException {
165 		int currlen, numkeys;
166 		
167 		this.state_type = new StateType(in.readInt());
168 		this.state_mapred = new MapRedState(in.readInt());
169 		this.state_hdfs = new HDFSState(in.readInt());
170 		this.fsm_type = new FSMType(in.readInt());
171 		in.readChar();
172 
173 		currlen = in.readInt();
174 		if (currlen > 0) this.state_name = in.readUTF();
175 		else this.state_name = "";
176 
177 		currlen = in.readInt();
178 		if (currlen > 0) this.unique_id = in.readUTF();
179 		else this.unique_id = "";
180 
181 		currlen = in.readInt();
182 		if (currlen > 0) this.timestamp = in.readUTF();
183 		else this.timestamp = "";
184 
185 		currlen = in.readInt();
186 		if (currlen > 0) this.time_start = in.readUTF();
187 		else this.time_start = "";
188 
189 		currlen = in.readInt();
190 		if (currlen > 0) this.time_end = in.readUTF();
191 		else this.time_end = "";
192 
193 		currlen = in.readInt();
194 		if (currlen > 0) this.host_exec = in.readUTF();
195 		else this.host_exec = "";
196 
197 		currlen = in.readInt();
198 		if (currlen > 0) this.host_other = in.readUTF();
199 		else this.host_other = "";
200 
201 		currlen = in.readInt();
202 		if (currlen > 0) this.time_orig_epoch = in.readUTF();
203 		else this.time_orig_epoch = "";
204 
205 		currlen = in.readInt();
206 		if (currlen > 0) this.time_orig = in.readUTF();
207 		else this.time_orig = "";
208 
209 		currlen = in.readInt();
210 		if (currlen > 0) this.job_id = in.readUTF();
211 		else this.job_id = "";
212 			
213 		currlen = in.readInt();
214 		if (currlen > 0) this.identifier = in.readUTF();
215 		else this.identifier = "";
216 					
217 		numkeys = in.readInt();
218 
219 		this.add_info = new TreeMap<String, String>();
220 		
221 		if (numkeys > 0) {
222 			for (int i = 0; i < numkeys; i++) {
223 				String currkey, currval;
224 				currkey = in.readUTF();
225 				currlen = in.readInt();
226 				if (currlen > 0) {
227 					currval = in.readUTF();
228 					this.add_info.put(currkey, currval);
229 				}
230 			}
231 		}
232 	}
233 	 
234 	@Override
235 	public int hashCode() {
236 		return new HashCodeBuilder(13, 71).
237 		append(this.unique_id).
238 		toHashCode();
239 	}
240 
241 	@Override	
242 	public boolean equals (Object o) {
243 	  if((o instanceof FSMIntermedEntry)) {
244 	    FSMIntermedEntry other = (FSMIntermedEntry) o;
245 	    return this.unique_id.equals(other.unique_id);
246 	  }
247 	  return false;
248 	}
249 	
250 	public int compareTo (Object o) {
251 	  final int BEFORE = -1;
252     final int EQUAL = 0;
253     //this optimization is usually worthwhile, and can
254     //always be added
255     if ( this == o ) return EQUAL;
256     
257 	  if((o instanceof FSMIntermedEntry)) {
258 	    FSMIntermedEntry other = (FSMIntermedEntry) o;
259 	    return this.unique_id.compareTo(other.unique_id);
260 	  }
261 	  return BEFORE;
262 	}
263 	
264 	/*
265 	 * This method is to support convenient creating of new copies
266 	 * of states for Reduce to create sub-states ReduceShuffle, ReduceSort, and ReduceReducer
267 	 */
268 	public FSMIntermedEntry clone() throws CloneNotSupportedException {
269 		FSMIntermedEntry newObj = (FSMIntermedEntry) super.clone();
270 		Set<String> mapKeys;
271 
272 		newObj.state_type = new StateType(this.state_type.val);
273 		newObj.state_mapred = new MapRedState(this.state_mapred.val);
274 		newObj.state_hdfs = new HDFSState(this.state_hdfs.val);
275 		newObj.fsm_type = new FSMType(this.fsm_type.val);
276 
277 		/* Deep copy all strings */
278 		newObj.state_name = this.state_name;
279 		newObj.unique_id = this.unique_id;
280 		newObj.timestamp = this.timestamp;
281 		newObj.time_start = this.time_start;
282 		newObj.time_end = this.time_end;
283 		
284 		newObj.time_orig_epoch = this.time_orig_epoch;
285 		newObj.time_orig = this.time_orig;
286 		newObj.job_id = this.job_id;
287 		
288 		
289 		/* Deep copy of TreeMap */
290 		newObj.add_info = new TreeMap<String,String>();
291 		for(Entry<String, String> entry : this.add_info.entrySet()) {
292 		  String currKey = entry.getKey();
293 		  String value = entry.getValue();
294 		  newObj.add_info.put(currKey, value);
295 		}		
296 		return newObj;
297 	}
298 	
299 	public String toString() {
300 		return new StringBuilder().append(this.state_name).append("@").append(this.unique_id).toString();
301 	}
302 	
303 }