|  | 
|  | 1 | +/* | 
|  | 2 | + * Licensed to the Apache Software Foundation (ASF) under one | 
|  | 3 | + * or more contributor license agreements.  See the NOTICE file | 
|  | 4 | + * distributed with this work for additional information | 
|  | 5 | + * regarding copyright ownership.  The ASF licenses this file | 
|  | 6 | + * to you under the Apache License, Version 2.0 (the | 
|  | 7 | + * "License"); you may not use this file except in compliance | 
|  | 8 | + * with the License.  You may obtain a copy of the License at | 
|  | 9 | + * | 
|  | 10 | + *     http://www.apache.org/licenses/LICENSE-2.0 | 
|  | 11 | + * | 
|  | 12 | + * Unless required by applicable law or agreed to in writing, software | 
|  | 13 | + * distributed under the License is distributed on an "AS IS" BASIS, | 
|  | 14 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | 15 | + * See the License for the specific language governing permissions and | 
|  | 16 | + * limitations under the License. | 
|  | 17 | + */ | 
|  | 18 | + | 
|  | 19 | +package org.apache.hadoop.fs.gs; | 
|  | 20 | + | 
|  | 21 | +import org.slf4j.Logger; | 
|  | 22 | +import org.slf4j.LoggerFactory; | 
|  | 23 | + | 
|  | 24 | +/** | 
|  | 25 | + * Manages the access pattern of object being read from cloud storage. For adaptive fadvise | 
|  | 26 | + * configurations it computes the access pattern based on previous requests. | 
|  | 27 | + */ | 
|  | 28 | +class FileAccessPatternManager { | 
|  | 29 | +  private static final Logger LOG = LoggerFactory.getLogger(FileAccessPatternManager.class); | 
|  | 30 | +  private final StorageResourceId resourceId; | 
|  | 31 | +  private final GoogleHadoopFileSystemConfiguration config; | 
|  | 32 | +  private final Fadvise fadvise; | 
|  | 33 | +  private boolean isPatternOverriden; | 
|  | 34 | +  private boolean randomAccess; | 
|  | 35 | +  // keeps track of any backward seek requested in lifecycle of InputStream | 
|  | 36 | +  private boolean isBackwardSeekRequested = false; | 
|  | 37 | +  // keeps track of any backward seek requested in lifecycle of InputStream | 
|  | 38 | +  private boolean isForwardSeekRequested = false; | 
|  | 39 | +  private long lastServedIndex = -1; | 
|  | 40 | +  // Keeps track of distance between consecutive requests | 
|  | 41 | +  private int consecutiveSequentialCount = 0; | 
|  | 42 | + | 
|  | 43 | +  FileAccessPatternManager( | 
|  | 44 | +      StorageResourceId resourceId, GoogleHadoopFileSystemConfiguration configuration) { | 
|  | 45 | +    this.isPatternOverriden = false; | 
|  | 46 | +    this.resourceId = resourceId; | 
|  | 47 | +    this.config = configuration; | 
|  | 48 | +    this.fadvise = config.getFadvise(); | 
|  | 49 | +    this.randomAccess = fadvise == Fadvise.AUTO_RANDOM || fadvise == Fadvise.RANDOM; | 
|  | 50 | +  } | 
|  | 51 | + | 
|  | 52 | +  void updateLastServedIndex(long position) { | 
|  | 53 | +    this.lastServedIndex = position; | 
|  | 54 | +  } | 
|  | 55 | + | 
|  | 56 | +  boolean shouldAdaptToRandomAccess() { | 
|  | 57 | +    return randomAccess; | 
|  | 58 | +  } | 
|  | 59 | + | 
|  | 60 | +  void updateAccessPattern(long currentPosition) { | 
|  | 61 | +    if (isPatternOverriden) { | 
|  | 62 | +      LOG.trace("Will bypass computing access pattern as it's overriden for resource :{}", | 
|  | 63 | +          resourceId); | 
|  | 64 | +      return; | 
|  | 65 | +    } | 
|  | 66 | +    updateSeekFlags(currentPosition); | 
|  | 67 | +    if (fadvise == Fadvise.AUTO_RANDOM) { | 
|  | 68 | +      if (randomAccess) { | 
|  | 69 | +        if (shouldAdaptToSequential(currentPosition)) { | 
|  | 70 | +          unsetRandomAccess(); | 
|  | 71 | +        } | 
|  | 72 | +      } else { | 
|  | 73 | +        if (shouldAdaptToRandomAccess(currentPosition)) { | 
|  | 74 | +          setRandomAccess(); | 
|  | 75 | +        } | 
|  | 76 | +      } | 
|  | 77 | +    } else if (fadvise == Fadvise.AUTO) { | 
|  | 78 | +      if (shouldAdaptToRandomAccess(currentPosition)) { | 
|  | 79 | +        setRandomAccess(); | 
|  | 80 | +      } | 
|  | 81 | +    } | 
|  | 82 | +  } | 
|  | 83 | + | 
|  | 84 | +  /** | 
|  | 85 | +   * This provides a way to override the access isRandomPattern, once overridden it will not be | 
|  | 86 | +   * recomputed for adaptive fadvise types. | 
|  | 87 | +   * | 
|  | 88 | +   * @param isRandomPattern, true, to override with random access else false | 
|  | 89 | +   */ | 
|  | 90 | +  void overrideAccessPattern(boolean isRandomPattern) { | 
|  | 91 | +    this.isPatternOverriden = true; | 
|  | 92 | +    this.randomAccess = isRandomPattern; | 
|  | 93 | +    LOG.trace( | 
|  | 94 | +        "Overriding the random access pattern to %s for fadvise:%s for resource: %s ", | 
|  | 95 | +        isRandomPattern, fadvise, resourceId); | 
|  | 96 | +  } | 
|  | 97 | + | 
|  | 98 | +  private boolean shouldAdaptToSequential(long currentPosition) { | 
|  | 99 | +    if (lastServedIndex != -1) { | 
|  | 100 | +      long distance = currentPosition - lastServedIndex; | 
|  | 101 | +      if (distance < 0 || distance > config.getInplaceSeekLimit()) { | 
|  | 102 | +        consecutiveSequentialCount = 0; | 
|  | 103 | +      } else { | 
|  | 104 | +        consecutiveSequentialCount++; | 
|  | 105 | +      } | 
|  | 106 | +    } | 
|  | 107 | + | 
|  | 108 | +    if (!shouldDetectSequentialAccess()) { | 
|  | 109 | +      return false; | 
|  | 110 | +    } | 
|  | 111 | + | 
|  | 112 | +    if (consecutiveSequentialCount < config.getFadviseRequestTrackCount()) { | 
|  | 113 | +      return false; | 
|  | 114 | +    } | 
|  | 115 | +    LOG.trace( | 
|  | 116 | +        "Detected {} consecutive read request within distance threshold {} with fadvise: {} " | 
|  | 117 | +            + "switching to sequential IO for '{}'", | 
|  | 118 | +        consecutiveSequentialCount, | 
|  | 119 | +        config.getInplaceSeekLimit(), | 
|  | 120 | +        fadvise, | 
|  | 121 | +        resourceId); | 
|  | 122 | +    return true; | 
|  | 123 | +  } | 
|  | 124 | + | 
|  | 125 | +  private boolean shouldAdaptToRandomAccess(long currentPosition) { | 
|  | 126 | +    if (!shouldDetectRandomAccess()) { | 
|  | 127 | +      return false; | 
|  | 128 | +    } | 
|  | 129 | +    if (lastServedIndex == -1) { | 
|  | 130 | +      return false; | 
|  | 131 | +    } | 
|  | 132 | + | 
|  | 133 | +    if (isBackwardOrForwardSeekRequested()) { | 
|  | 134 | +      LOG.trace( | 
|  | 135 | +          "Backward or forward seek requested, isBackwardSeek: {}, isForwardSeek:{} for '{}'", | 
|  | 136 | +          isBackwardSeekRequested, isForwardSeekRequested, resourceId); | 
|  | 137 | +      return true; | 
|  | 138 | +    } | 
|  | 139 | +    return false; | 
|  | 140 | +  } | 
|  | 141 | + | 
|  | 142 | +  private boolean shouldDetectSequentialAccess() { | 
|  | 143 | +    return randomAccess | 
|  | 144 | +        && !isBackwardOrForwardSeekRequested() | 
|  | 145 | +        && consecutiveSequentialCount >= config.getFadviseRequestTrackCount() | 
|  | 146 | +        && fadvise == Fadvise.AUTO_RANDOM; | 
|  | 147 | +  } | 
|  | 148 | + | 
|  | 149 | +  private boolean shouldDetectRandomAccess() { | 
|  | 150 | +    return !randomAccess && (fadvise == Fadvise.AUTO || fadvise == Fadvise.AUTO_RANDOM); | 
|  | 151 | +  } | 
|  | 152 | + | 
|  | 153 | +  private void setRandomAccess() { | 
|  | 154 | +    randomAccess = true; | 
|  | 155 | +  } | 
|  | 156 | + | 
|  | 157 | +  private void unsetRandomAccess() { | 
|  | 158 | +    randomAccess = false; | 
|  | 159 | +  } | 
|  | 160 | + | 
|  | 161 | +  private boolean isBackwardOrForwardSeekRequested() { | 
|  | 162 | +    return isBackwardSeekRequested || isForwardSeekRequested; | 
|  | 163 | +  } | 
|  | 164 | + | 
|  | 165 | +  private void updateSeekFlags(long currentPosition) { | 
|  | 166 | +    if (lastServedIndex == -1) { | 
|  | 167 | +      return; | 
|  | 168 | +    } | 
|  | 169 | + | 
|  | 170 | +    if (currentPosition < lastServedIndex) { | 
|  | 171 | +      isBackwardSeekRequested = true; | 
|  | 172 | +      LOG.trace( | 
|  | 173 | +          "Detected backward read from {} to {} position, updating to backwardSeek for '{}'", | 
|  | 174 | +          lastServedIndex, currentPosition, resourceId); | 
|  | 175 | + | 
|  | 176 | +    } else if (lastServedIndex + config.getInplaceSeekLimit() < currentPosition) { | 
|  | 177 | +      isForwardSeekRequested = true; | 
|  | 178 | +      LOG.trace( | 
|  | 179 | +          "Detected forward read from {} to {} position over {} threshold," | 
|  | 180 | +              + " updated to forwardSeek for '{}'", | 
|  | 181 | +          lastServedIndex, currentPosition, config.getInplaceSeekLimit(), resourceId); | 
|  | 182 | +    } | 
|  | 183 | +  } | 
|  | 184 | +} | 
0 commit comments