Commit 25d97f6

Authored and committed by Harsh J
HADOOP-1381. The distance between sync blocks in SequenceFiles should be configurable rather than hard coded to 2000 bytes.
1 parent 7ffb994 commit 25d97f6
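
For context: after this change the interval is exposed as a SequenceFile.Writer.syncInterval(int) option (see the SequenceFile.java diff below), with negative values falling back to the 100 KB default. A minimal usage sketch, assuming the public SequenceFile.createWriter(Configuration, Writer.Option...) factory; the output path and record contents are illustrative, not taken from this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SyncIntervalUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Illustrative output path, not taken from the commit.
    Path path = new Path("/tmp/sync-interval-example.seq");

    // Request a sync marker roughly every 2000 bytes instead of the
    // new 100 KB default (SequenceFile.SYNC_INTERVAL).
    SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(path),
        SequenceFile.Writer.keyClass(IntWritable.class),
        SequenceFile.Writer.valueClass(Text.class),
        SequenceFile.Writer.syncInterval(2000));
    try {
      for (int i = 0; i < 10000; i++) {
        writer.append(new IntWritable(i), new Text("record-" + i));
      }
    } finally {
      writer.close();
    }
  }
}

A denser interval makes sync-based seeks cheaper at the cost of a little extra file size, since each sync marker occupies SYNC_SIZE (20) bytes.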

File tree: 2 files changed, +146 −41 lines


hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java

Lines changed: 52 additions & 22 deletions
@@ -24,6 +24,7 @@
 import java.rmi.server.UID;
 import java.security.MessageDigest;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.*;
 import org.apache.hadoop.util.Options;
 import org.apache.hadoop.fs.*;
@@ -146,7 +147,7 @@
 * </ul>
 * </li>
 * <li>
-* A sync-marker every few <code>100</code> bytes or so.
+* A sync-marker every few <code>100</code> kilobytes or so.
 * </li>
 * </ul>
 *
@@ -165,7 +166,7 @@
 * </ul>
 * </li>
 * <li>
-* A sync-marker every few <code>100</code> bytes or so.
+* A sync-marker every few <code>100</code> kilobytes or so.
 * </li>
 * </ul>
 *
@@ -217,8 +218,11 @@ private SequenceFile() {} // no public ctor
   private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
   private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash
 
-  /** The number of bytes between sync points.*/
-  public static final int SYNC_INTERVAL = 100*SYNC_SIZE;
+  /**
+   * The number of bytes between sync points. 100 KB, default.
+   * Computed as 5 KB * 20 = 100 KB
+   */
+  public static final int SYNC_INTERVAL = 5 * 1024 * SYNC_SIZE; // 5KB*(16+4)
 
   /**
    * The compression type used to compress key/value pairs in the
@@ -856,6 +860,9 @@ public static class Writer implements java.io.Closeable, Syncable {
     // starts and ends by scanning for this value.
     long lastSyncPos; // position of last sync
     byte[] sync; // 16 random bytes
+    @VisibleForTesting
+    int syncInterval;
+
     {
       try {
         MessageDigest digester = MessageDigest.getInstance("MD5");
@@ -987,7 +994,16 @@ public static Option file(Path value) {
     private static Option filesystem(FileSystem fs) {
       return new SequenceFile.Writer.FileSystemOption(fs);
     }
-
+
+    private static class SyncIntervalOption extends Options.IntegerOption
+        implements Option {
+      SyncIntervalOption(int val) {
+        // If a negative sync interval is provided,
+        // fall back to the default sync interval.
+        super(val < 0 ? SYNC_INTERVAL : val);
+      }
+    }
+
     public static Option bufferSize(int value) {
       return new BufferSizeOption(value);
     }
@@ -1032,11 +1048,15 @@ public static Option compression(CompressionType value,
                                      CompressionCodec codec) {
       return new CompressionOption(value, codec);
     }
-
+
+    public static Option syncInterval(int value) {
+      return new SyncIntervalOption(value);
+    }
+
     /**
      * Construct a uncompressed writer from a set of options.
      * @param conf the configuration to use
-     * @param options the options used when creating the writer
+     * @param opts the options used when creating the writer
      * @throws IOException if it fails
      */
     Writer(Configuration conf,
@@ -1062,6 +1082,8 @@ public static Option compression(CompressionType value,
         Options.getOption(MetadataOption.class, opts);
       CompressionOption compressionTypeOption =
         Options.getOption(CompressionOption.class, opts);
+      SyncIntervalOption syncIntervalOption =
+        Options.getOption(SyncIntervalOption.class, opts);
       // check consistency of options
       if ((fileOption == null) == (streamOption == null)) {
         throw new IllegalArgumentException("file or stream must be specified");
@@ -1163,7 +1185,12 @@ public static Option compression(CompressionType value,
                                     "GzipCodec without native-hadoop " +
                                     "code!");
       }
-      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
+      this.syncInterval = (syncIntervalOption == null) ?
+          SYNC_INTERVAL :
+          syncIntervalOption.getValue();
+      init(
+          conf, out, ownStream, keyClass, valueClass,
+          codec, metadata, syncInterval);
     }
 
     /** Create the named file.
@@ -1176,7 +1203,7 @@ public Writer(FileSystem fs, Configuration conf, Path name,
                   Class keyClass, Class valClass) throws IOException {
       this.compress = CompressionType.NONE;
       init(conf, fs.create(name), true, keyClass, valClass, null,
-           new Metadata());
+           new Metadata(), SYNC_INTERVAL);
     }
 
     /** Create the named file with write-progress reporter.
@@ -1190,7 +1217,7 @@ public Writer(FileSystem fs, Configuration conf, Path name,
                   Progressable progress, Metadata metadata) throws IOException {
       this.compress = CompressionType.NONE;
       init(conf, fs.create(name, progress), true, keyClass, valClass,
-           null, metadata);
+           null, metadata, SYNC_INTERVAL);
     }
 
     /** Create the named file with write-progress reporter.
@@ -1206,7 +1233,7 @@ public Writer(FileSystem fs, Configuration conf, Path name,
       this.compress = CompressionType.NONE;
       init(conf,
            fs.create(name, true, bufferSize, replication, blockSize, progress),
-           true, keyClass, valClass, null, metadata);
+           true, keyClass, valClass, null, metadata, SYNC_INTERVAL);
     }
 
     boolean isCompressed() { return compress != CompressionType.NONE; }
@@ -1234,18 +1261,21 @@ private void writeFileHeader()
 
     /** Initialize. */
     @SuppressWarnings("unchecked")
-    void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
-              Class keyClass, Class valClass,
-              CompressionCodec codec, Metadata metadata)
+    void init(Configuration config, FSDataOutputStream outStream,
+              boolean ownStream, Class key, Class val,
+              CompressionCodec compCodec, Metadata meta,
+              int syncIntervalVal)
       throws IOException {
-      this.conf = conf;
-      this.out = out;
+      this.conf = config;
+      this.out = outStream;
       this.ownOutputStream = ownStream;
-      this.keyClass = keyClass;
-      this.valClass = valClass;
-      this.codec = codec;
-      this.metadata = metadata;
-      SerializationFactory serializationFactory = new SerializationFactory(conf);
+      this.keyClass = key;
+      this.valClass = val;
+      this.codec = compCodec;
+      this.metadata = meta;
+      this.syncInterval = syncIntervalVal;
+      SerializationFactory serializationFactory =
+          new SerializationFactory(config);
      this.keySerializer = serializationFactory.getSerializer(keyClass);
      if (this.keySerializer == null) {
        throw new IOException(
@@ -1366,7 +1396,7 @@ public synchronized void close() throws IOException {
 
     synchronized void checkAndWriteSync() throws IOException {
       if (sync != null &&
-          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
+          out.getPos() >= lastSyncPos+this.syncInterval) { // time to emit sync
         sync();
       }
     }
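
A quick sanity check on the constants above, using only values from the diff: SYNC_SIZE is 4 + 16 = 20 bytes, so the old default of 100 * SYNC_SIZE emitted a sync marker roughly every 2,000 bytes, while the new default of 5 * 1024 * SYNC_SIZE emits one roughly every 102,400 bytes (100 KB). A trivial standalone snippet; the class and variable names are local to the snippet, not Hadoop APIs:

public class SyncIntervalArithmetic {
  public static void main(String[] args) {
    final int syncHashSize = 16;           // bytes in the MD5-derived hash
    final int syncSize = 4 + syncHashSize; // escape + hash = 20 bytes
    System.out.println("old default: " + (100 * syncSize) + " bytes");      // 2000
    System.out.println("new default: " + (5 * 1024 * syncSize) + " bytes"); // 102400
  }
}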

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileSync.java

Lines changed: 94 additions & 19 deletions
@@ -27,13 +27,15 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Test;
 
+/** Tests sync based seek reads/write intervals inside SequenceFiles. */
 public class TestSequenceFileSync {
   private static final int NUMRECORDS = 2000;
   private static final int RECORDSIZE = 80;
-  private static final Random rand = new Random();
+  private static final Random RAND = new Random();
 
   private final static String REC_FMT = "%d RECORDID %d : ";
 
@@ -46,37 +48,110 @@ private static void forOffset(SequenceFile.Reader reader,
     reader.next(key, val);
     assertEquals(key.get(), expectedRecord);
     final String test = String.format(REC_FMT, expectedRecord, expectedRecord);
-    assertEquals("Invalid value " + val, 0, val.find(test, 0));
+    assertEquals(
+        "Invalid value in iter " + iter + ": " + val,
+        0,
+        val.find(test, 0));
+  }
+
+  @Test
+  public void testDefaultSyncInterval() throws IOException {
+    // Uses the default sync interval of 100 KB
+    final Configuration conf = new Configuration();
+    final FileSystem fs = FileSystem.getLocal(conf);
+    final Path path = new Path(GenericTestUtils.getTempPath(
+        "sequencefile.sync.test"));
+    final IntWritable input = new IntWritable();
+    final Text val = new Text();
+    SequenceFile.Writer writer = new SequenceFile.Writer(
+        conf,
+        SequenceFile.Writer.file(path),
+        SequenceFile.Writer.compression(CompressionType.NONE),
+        SequenceFile.Writer.keyClass(IntWritable.class),
+        SequenceFile.Writer.valueClass(Text.class)
+    );
+    try {
+      writeSequenceFile(writer, NUMRECORDS*4);
+      for (int i = 0; i < 5; i++) {
+        final SequenceFile.Reader reader;
+
+        //try different SequenceFile.Reader constructors
+        if (i % 2 == 0) {
+          final int buffersize = conf.getInt("io.file.buffer.size", 4096);
+          reader = new SequenceFile.Reader(conf,
+              SequenceFile.Reader.file(path),
+              SequenceFile.Reader.bufferSize(buffersize));
+        } else {
+          final FSDataInputStream in = fs.open(path);
+          final long length = fs.getFileStatus(path).getLen();
+          reader = new SequenceFile.Reader(conf,
+              SequenceFile.Reader.stream(in),
+              SequenceFile.Reader.start(0L),
+              SequenceFile.Reader.length(length));
+        }
+
+        try {
+          forOffset(reader, input, val, i, 0, 0);
+          forOffset(reader, input, val, i, 65, 0);
+          // There would be over 1000 records within
+          // this sync interval
+          forOffset(reader, input, val, i, 2000, 1101);
+          forOffset(reader, input, val, i, 0, 0);
+        } finally {
+          reader.close();
+        }
+      }
+    } finally {
+      fs.delete(path, false);
+    }
   }
 
   @Test
   public void testLowSyncpoint() throws IOException {
+    // Uses a smaller sync interval of 2000 bytes
     final Configuration conf = new Configuration();
     final FileSystem fs = FileSystem.getLocal(conf);
     final Path path = new Path(GenericTestUtils.getTempPath(
         "sequencefile.sync.test"));
     final IntWritable input = new IntWritable();
     final Text val = new Text();
-    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
-        IntWritable.class, Text.class);
+    SequenceFile.Writer writer = new SequenceFile.Writer(
+        conf,
+        SequenceFile.Writer.file(path),
+        SequenceFile.Writer.compression(CompressionType.NONE),
+        SequenceFile.Writer.keyClass(IntWritable.class),
+        SequenceFile.Writer.valueClass(Text.class),
+        SequenceFile.Writer.syncInterval(20*100)
+    );
+    // Ensure the custom sync interval value is set
+    assertEquals(writer.syncInterval, 20*100);
     try {
       writeSequenceFile(writer, NUMRECORDS);
-      for (int i = 0; i < 5 ; i++) {
-        final SequenceFile.Reader reader;
+      for (int i = 0; i < 5; i++) {
+        final SequenceFile.Reader reader;
 
-        //try different SequenceFile.Reader constructors
-        if (i % 2 == 0) {
-          reader = new SequenceFile.Reader(fs, path, conf);
-        } else {
-          final FSDataInputStream in = fs.open(path);
-          final long length = fs.getFileStatus(path).getLen();
-          final int buffersize = conf.getInt("io.file.buffer.size", 4096);
-          reader = new SequenceFile.Reader(in, buffersize, 0L, length, conf);
-        }
+        //try different SequenceFile.Reader constructors
+        if (i % 2 == 0) {
+          final int bufferSize = conf.getInt("io.file.buffer.size", 4096);
+          reader = new SequenceFile.Reader(
+              conf,
+              SequenceFile.Reader.file(path),
+              SequenceFile.Reader.bufferSize(bufferSize));
+        } else {
+          final FSDataInputStream in = fs.open(path);
+          final long length = fs.getFileStatus(path).getLen();
+          reader = new SequenceFile.Reader(
+              conf,
+              SequenceFile.Reader.stream(in),
+              SequenceFile.Reader.start(0L),
+              SequenceFile.Reader.length(length));
+        }
 
-        try {
+        try {
           forOffset(reader, input, val, i, 0, 0);
           forOffset(reader, input, val, i, 65, 0);
+          // There would be only a few records within
+          // this sync interval
           forOffset(reader, input, val, i, 2000, 21);
           forOffset(reader, input, val, i, 0, 0);
         } finally {
@@ -88,7 +163,7 @@ public void testLowSyncpoint() throws IOException {
     }
   }
 
-  public static void writeSequenceFile(SequenceFile.Writer writer,
+  private static void writeSequenceFile(SequenceFile.Writer writer,
       int numRecords) throws IOException {
     final IntWritable key = new IntWritable();
     final Text val = new Text();
@@ -100,13 +175,13 @@ public static void writeSequenceFile(SequenceFile.Writer writer,
     writer.close();
   }
 
-  static void randomText(Text val, int id, int recordSize) {
+  private static void randomText(Text val, int id, int recordSize) {
     val.clear();
     final StringBuilder ret = new StringBuilder(recordSize);
     ret.append(String.format(REC_FMT, id, id));
     recordSize -= ret.length();
     for (int i = 0; i < recordSize; ++i) {
-      ret.append(rand.nextInt(9));
+      ret.append(RAND.nextInt(9));
    }
     val.set(ret.toString());
   }
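
The sync markers written at this interval are what let a reader align to a record boundary from an arbitrary byte offset, which is what the forOffset checks above exercise. A minimal reader-side sketch (not part of this commit) using SequenceFile.Reader.sync(long), which seeks to the next sync mark past the given position; the path is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SyncSeekSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical file written earlier with a SequenceFile.Writer.
    Path path = new Path("/tmp/sync-interval-example.seq");

    SequenceFile.Reader reader = new SequenceFile.Reader(conf,
        SequenceFile.Reader.file(path));
    try {
      // Jump to an arbitrary byte offset and align to the next sync marker;
      // a smaller sync interval means less data is skipped past here.
      reader.sync(4096);
      IntWritable key = new IntWritable();
      Text val = new Text();
      while (reader.next(key, val)) {
        // process records starting at the first boundary after offset 4096
      }
    } finally {
      reader.close();
    }
  }
}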
