5353import static org .apache .hadoop .fs .Options .OpenFileOptions .FS_OPTION_OPENFILE_READ_POLICY ;
5454import static org .apache .hadoop .fs .statistics .IOStatisticsContext .getCurrentIOStatisticsContext ;
5555import static org .apache .hadoop .fs .statistics .IOStatisticsSupport .retrieveIOStatistics ;
56+ import static org .apache .hadoop .util .Preconditions .checkArgument ;
5657
5758/**
5859 * Reflection-friendly access to APIs which are not available in
6465 * new classes etc.
6566 */
6667@ InterfaceAudience .Public
67- @ InterfaceStability .Evolving
68+ @ InterfaceStability .Unstable
6869public final class WrappedIO {
6970
7071 private WrappedIO () {
@@ -147,7 +148,7 @@ public static boolean pathCapabilities_hasPathCapability(Object fs,
147148 * @return true iff the object implements StreamCapabilities and the capability is
148149 * declared available.
149150 */
150- public static boolean streamCapabilities_hasStreamCapability (Object o , String capability ) {
151+ public static boolean streamCapabilities_hasCapability (Object o , String capability ) {
151152 if (o instanceof StreamCapabilities ) {
152153 return false ;
153154 }
@@ -205,6 +206,14 @@ public static Path fileSystem_getEnclosingRoot(FileSystem fs, Path path) throws
205206 return fs .getEnclosingRoot (path );
206207 }
207208
209+ /**
210+ * Create a new {@link IOStatisticsSnapshot} instance.
211+ * @return an empty IOStatisticsSnapshot.
212+ */
213+ public static Serializable iostatisticsSnapshot_create () {
214+ return new IOStatisticsSnapshot ();
215+ }
216+
208217 /**
209218 * Extract the IOStatistics from an object in a serializable form.
210219 * @param source source object, may be null/not a statistics source/instance
@@ -224,7 +233,7 @@ public static Serializable iostatisticsSnapshot_retrieve(@Nullable Object source
224233 * @return JSON string value or null if source is not an IOStatisticsSnapshot
225234 * @throws UncheckedIOException Any IO/jackson exception.
226235 */
227- public static String iostatisticsSnapshot_toJsonString (@ Nullable Object snapshot ) {
236+ public static String iostatisticsSnapshot_toJsonString (@ Nullable Serializable snapshot ) {
228237 return applyToIOStatisticsSnapshot (snapshot ,
229238 null ,
230239 IOStatisticsSnapshot .serializer ()::toJson );
@@ -241,7 +250,7 @@ public static String iostatisticsSnapshot_toJsonString(@Nullable Object snapshot
241250 * @throws UncheckedIOException Any IO exception.
242251 */
243252 public static boolean iostatisticsSnapshot_save (
244- @ Nullable Object snapshot ,
253+ @ Nullable Serializable snapshot ,
245254 FileSystem fs ,
246255 Path path ,
247256 boolean overwrite ) {
@@ -251,6 +260,73 @@ public static boolean iostatisticsSnapshot_save(
251260 });
252261 }
253262
263+ /**
264+ * Aggregate an existing {@link IOStatisticsSnapshot} with
265+ * the supplied statistics.
266+ * @param snapshot snapshot to update
267+ * @param statistics IOStatistics to add
268+ * @return true if the snapshot was updated.
269+ * @throws IllegalArgumentException if the {@code statistics} argument is not
270+ * null but not an instance of IOStatistics, or if {@code snapshot} is invalid.
271+ */
272+ public static boolean iostatisticsSnapshot_aggregate (
273+ Serializable snapshot , @ Nullable Object statistics ) {
274+
275+ requireIOStatisticsSnapshot (snapshot );
276+ if (statistics == null ) {
277+ return false ;
278+ }
279+ checkArgument (statistics instanceof IOStatistics ,
280+ "Not an IOStatistics instance: %s" , statistics );
281+
282+ final IOStatistics sourceStats = (IOStatistics ) statistics ;
283+ return applyToIOStatisticsSnapshot (snapshot , false , s ->
284+ s .aggregate (sourceStats ));
285+ }
286+
287+ /**
288+ * Require the parameter to be an instance of {@link IOStatisticsSnapshot}
289+ * @param snapshot class to validate
290+ * @throws IllegalArgumentException if the condition is not met
291+ */
292+ private static void requireIOStatisticsSnapshot (final Serializable snapshot ) {
293+ checkArgument (snapshot instanceof IOStatisticsSnapshot ,
294+ "Not an IOStatisticsSnapshot %s" , snapshot );
295+ }
296+
297+
298+ /**
299+ * Convert an IOStatisticsSnapshot to a map.
300+ * <p>
301+ * Counters, gauges, minimums and maximums are all added as key/value pairs.
302+ * Mean statistics are added as the sample count, sum of all values and
303+ * the mean, which, as it is converted to a long, is only approximate.
304+ * It is better to calculate it by retrieving the sum and dividing by the sample
305+ * count.
306+ * <p>
307+ * Mean values have three entries in the map, with the ".mean", ".samples" and ".sum"
308+ * suffixes on the base key string.
309+ * @param source source of statistics -may be null.
310+ * @return a map or null if the source is null/invalid
311+ */
312+ public static Map <String , Long > iostatisticsSnapshot_toMap (
313+ @ Nullable Serializable source ) {
314+
315+ return applyToIOStatisticsSnapshot (source , null , stats -> {
316+ Map <String , Long > map = new HashMap <>();
317+ stats .counters ().forEach (map ::put );
318+ stats .gauges ().forEach (map ::put );
319+ stats .minimums ().forEach (map ::put );
320+ stats .maximums ().forEach (map ::put );
321+ stats .meanStatistics ().forEach ((k , v ) -> {
322+ map .put (k + ".mean" , (long ) v .mean ());
323+ map .put (k + ".samples" , v .getSamples ());
324+ map .put (k + ".sum" , v .getSum ());
325+ });
326+ return map ;
327+ });
328+ }
329+
254330 /**
255331 * Apply a function to an object which may be an IOStatisticsSnapshot.
256332 * @param <T> return type
@@ -316,39 +392,10 @@ public static Object iostatisticsSource_demandStringify(
316392 }
317393
318394 /**
319- * Convert an IOStatistics snapshot to a map.
320- * <p>
321- * Counters, gauges, minimums and maximums are all added as key/value pairs.
322- * Mean statistics are added as the sample count, sum of all values and
323- * the mean, which, as it is converted to a long, is only approximate.
324- * It is better to calculate it by retrieving the sum and dividing by the sample
325- * count.
326- * <p>
327- * Mean values have three entries in the map, with the ".mean", ".samples" and ".sum"
328- * suffixes on the base key string.
329- * @param source source of statistics -may be null.
330- * @return a map or null if the source is null/invalid
331- */
332- public static Map <String , Long > iostatisticsSnapshot_toMap (@ Nullable Object source ) {
333- return applyToIOStatisticsSnapshot (source , null , stats -> {
334- Map <String , Long > map = new HashMap <>();
335- stats .counters ().forEach (map ::put );
336- stats .gauges ().forEach (map ::put );
337- stats .minimums ().forEach (map ::put );
338- stats .maximums ().forEach (map ::put );
339- stats .meanStatistics ().forEach ((k , v ) -> {
340- map .put (k + ".mean" , (long ) v .mean ());
341- map .put (k + ".samples" , v .getSamples ());
342- map .put (k + ".sum" , v .getSum ());
343- });
344- return map ;
345- });
346- }
347-
348- /**
349- * Get the context's IOStatisticsContext.
395+ * Get the context's {@link IOStatisticsContext} which
396+ * implements {@link IOStatisticsSource}.
350397 * This is either a thread-local value or a global empty context.
351- * @return instance of {@link IOStatisticsContext}
398+ * @return instance of {@link IOStatisticsContext}.
352399 */
353400 public static Object iostatisticsContext_getCurrent () {
354401 return getCurrentIOStatisticsContext ();
@@ -422,6 +469,6 @@ public static boolean byteBufferPositionedReadable_readFullyAvailable(
422469 }
423470 // now rely on the input stream implementing path capabilities, which
424471 // all the Hadoop FS implementations do.
425- return streamCapabilities_hasStreamCapability (in , StreamCapabilities .PREADBYTEBUFFER );
472+ return streamCapabilities_hasCapability (in , StreamCapabilities .PREADBYTEBUFFER );
426473 }
427474}
0 commit comments