@@ -125,7 +125,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
125
125
db_lock_(NULL ),
126
126
shutting_down_(NULL ),
127
127
bg_cv_(&mutex_),
128
- mem_(new MemTable(internal_comparator_) ),
128
+ mem_(NULL ),
129
129
imm_(NULL ),
130
130
logfile_(NULL ),
131
131
logfile_number_(0 ),
@@ -134,7 +134,6 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
134
134
tmp_batch_(new WriteBatch),
135
135
bg_compaction_scheduled_(false ),
136
136
manual_compaction_(NULL ) {
137
- mem_->Ref ();
138
137
has_imm_.Release_Store (NULL );
139
138
140
139
// Reserve ten files or so for other uses and give the rest to TableCache.
@@ -271,7 +270,7 @@ void DBImpl::DeleteObsoleteFiles() {
271
270
}
272
271
}
273
272
274
- Status DBImpl::Recover (VersionEdit* edit) {
273
+ Status DBImpl::Recover (VersionEdit* edit, bool *save_manifest ) {
275
274
mutex_.AssertHeld ();
276
275
277
276
// Ignore error from CreateDir since the creation of the DB is
@@ -301,66 +300,69 @@ Status DBImpl::Recover(VersionEdit* edit) {
301
300
}
302
301
}
303
302
304
- s = versions_->Recover ();
305
- if (s.ok ()) {
306
- SequenceNumber max_sequence (0 );
307
-
308
- // Recover from all newer log files than the ones named in the
309
- // descriptor (new log files may have been added by the previous
310
- // incarnation without registering them in the descriptor).
311
- //
312
- // Note that PrevLogNumber() is no longer used, but we pay
313
- // attention to it in case we are recovering a database
314
- // produced by an older version of leveldb.
315
- const uint64_t min_log = versions_->LogNumber ();
316
- const uint64_t prev_log = versions_->PrevLogNumber ();
317
- std::vector<std::string> filenames;
318
- s = env_->GetChildren (dbname_, &filenames);
303
+ s = versions_->Recover (save_manifest);
304
+ if (!s.ok ()) {
305
+ return s;
306
+ }
307
+ SequenceNumber max_sequence (0 );
308
+
309
+ // Recover from all newer log files than the ones named in the
310
+ // descriptor (new log files may have been added by the previous
311
+ // incarnation without registering them in the descriptor).
312
+ //
313
+ // Note that PrevLogNumber() is no longer used, but we pay
314
+ // attention to it in case we are recovering a database
315
+ // produced by an older version of leveldb.
316
+ const uint64_t min_log = versions_->LogNumber ();
317
+ const uint64_t prev_log = versions_->PrevLogNumber ();
318
+ std::vector<std::string> filenames;
319
+ s = env_->GetChildren (dbname_, &filenames);
320
+ if (!s.ok ()) {
321
+ return s;
322
+ }
323
+ std::set<uint64_t > expected;
324
+ versions_->AddLiveFiles (&expected);
325
+ uint64_t number;
326
+ FileType type;
327
+ std::vector<uint64_t > logs;
328
+ for (size_t i = 0 ; i < filenames.size (); i++) {
329
+ if (ParseFileName (filenames[i], &number, &type)) {
330
+ expected.erase (number);
331
+ if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
332
+ logs.push_back (number);
333
+ }
334
+ }
335
+ if (!expected.empty ()) {
336
+ char buf[50 ];
337
+ snprintf (buf, sizeof (buf), " %d missing files; e.g." ,
338
+ static_cast <int >(expected.size ()));
339
+ return Status::Corruption (buf, TableFileName (dbname_, *(expected.begin ())));
340
+ }
341
+
342
+ // Recover in the order in which the logs were generated
343
+ std::sort (logs.begin (), logs.end ());
344
+ for (size_t i = 0 ; i < logs.size (); i++) {
345
+ s = RecoverLogFile (logs[i], (i == logs.size () - 1 ), save_manifest, edit,
346
+ &max_sequence);
319
347
if (!s.ok ()) {
320
348
return s;
321
349
}
322
- std::set<uint64_t > expected;
323
- versions_->AddLiveFiles (&expected);
324
- uint64_t number;
325
- FileType type;
326
- std::vector<uint64_t > logs;
327
- for (size_t i = 0 ; i < filenames.size (); i++) {
328
- if (ParseFileName (filenames[i], &number, &type)) {
329
- expected.erase (number);
330
- if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
331
- logs.push_back (number);
332
- }
333
- }
334
- if (!expected.empty ()) {
335
- char buf[50 ];
336
- snprintf (buf, sizeof (buf), " %d missing files; e.g." ,
337
- static_cast <int >(expected.size ()));
338
- return Status::Corruption (buf, TableFileName (dbname_, *(expected.begin ())));
339
- }
340
-
341
- // Recover in the order in which the logs were generated
342
- std::sort (logs.begin (), logs.end ());
343
- for (size_t i = 0 ; i < logs.size (); i++) {
344
- s = RecoverLogFile (logs[i], edit, &max_sequence);
345
350
346
- // The previous incarnation may not have written any MANIFEST
347
- // records after allocating this log number. So we manually
348
- // update the file number allocation counter in VersionSet.
349
- versions_->MarkFileNumberUsed (logs[i]);
350
- }
351
+ // The previous incarnation may not have written any MANIFEST
352
+ // records after allocating this log number. So we manually
353
+ // update the file number allocation counter in VersionSet.
354
+ versions_->MarkFileNumberUsed (logs[i]);
355
+ }
351
356
352
- if (s.ok ()) {
353
- if (versions_->LastSequence () < max_sequence) {
354
- versions_->SetLastSequence (max_sequence);
355
- }
356
- }
357
+ if (versions_->LastSequence () < max_sequence) {
358
+ versions_->SetLastSequence (max_sequence);
357
359
}
358
360
359
- return s ;
361
+ return Status::OK () ;
360
362
}
361
363
362
- Status DBImpl::RecoverLogFile (uint64_t log_number,
363
- VersionEdit* edit,
364
+ Status DBImpl::RecoverLogFile (uint64_t log_number, bool last_log,
365
+ bool * save_manifest, VersionEdit* edit,
364
366
SequenceNumber* max_sequence) {
365
367
struct LogReporter : public log ::Reader::Reporter {
366
368
Env* env;
@@ -405,6 +407,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
405
407
std::string scratch;
406
408
Slice record;
407
409
WriteBatch batch;
410
+ int compactions = 0 ;
408
411
MemTable* mem = NULL ;
409
412
while (reader.ReadRecord (&record, &scratch) &&
410
413
status.ok ()) {
@@ -432,25 +435,52 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
432
435
}
433
436
434
437
if (mem->ApproximateMemoryUsage () > options_.write_buffer_size ) {
438
+ compactions++;
439
+ *save_manifest = true ;
435
440
status = WriteLevel0Table (mem, edit, NULL );
441
+ mem->Unref ();
442
+ mem = NULL ;
436
443
if (!status.ok ()) {
437
444
// Reflect errors immediately so that conditions like full
438
445
// file-systems cause the DB::Open() to fail.
439
446
break ;
440
447
}
441
- mem->Unref ();
442
- mem = NULL ;
443
448
}
444
449
}
445
450
446
- if (status.ok () && mem != NULL ) {
447
- status = WriteLevel0Table (mem, edit, NULL );
448
- // Reflect errors immediately so that conditions like full
449
- // file-systems cause the DB::Open() to fail.
451
+ delete file;
452
+
453
+ // See if we should keep reusing the last log file.
454
+ if (status.ok () && options_.reuse_logs && last_log && compactions == 0 ) {
455
+ assert (logfile_ == NULL );
456
+ assert (log_ == NULL );
457
+ assert (mem_ == NULL );
458
+ uint64_t lfile_size;
459
+ if (env_->GetFileSize (fname, &lfile_size).ok () &&
460
+ env_->NewAppendableFile (fname, &logfile_).ok ()) {
461
+ Log (options_.info_log , " Reusing old log %s \n " , fname.c_str ());
462
+ log_ = new log ::Writer (logfile_, lfile_size);
463
+ logfile_number_ = log_number;
464
+ if (mem != NULL ) {
465
+ mem_ = mem;
466
+ mem = NULL ;
467
+ } else {
468
+ // mem can be NULL if lognum exists but was empty.
469
+ mem_ = new MemTable (internal_comparator_);
470
+ mem_->Ref ();
471
+ }
472
+ }
473
+ }
474
+
475
+ if (mem != NULL ) {
476
+ // mem did not get reused; compact it.
477
+ if (status.ok ()) {
478
+ *save_manifest = true ;
479
+ status = WriteLevel0Table (mem, edit, NULL );
480
+ }
481
+ mem->Unref ();
450
482
}
451
483
452
- if (mem != NULL ) mem->Unref ();
453
- delete file;
454
484
return status;
455
485
}
456
486
@@ -1449,8 +1479,11 @@ Status DB::Open(const Options& options, const std::string& dbname,
1449
1479
DBImpl* impl = new DBImpl (options, dbname);
1450
1480
impl->mutex_ .Lock ();
1451
1481
VersionEdit edit;
1452
- Status s = impl->Recover (&edit); // Handles create_if_missing, error_if_exists
1453
- if (s.ok ()) {
1482
+ // Recover handles create_if_missing, error_if_exists
1483
+ bool save_manifest = false ;
1484
+ Status s = impl->Recover (&edit, &save_manifest);
1485
+ if (s.ok () && impl->mem_ == NULL ) {
1486
+ // Create new log and a corresponding memtable.
1454
1487
uint64_t new_log_number = impl->versions_ ->NewFileNumber ();
1455
1488
WritableFile* lfile;
1456
1489
s = options.env ->NewWritableFile (LogFileName (dbname, new_log_number),
@@ -1460,15 +1493,22 @@ Status DB::Open(const Options& options, const std::string& dbname,
1460
1493
impl->logfile_ = lfile;
1461
1494
impl->logfile_number_ = new_log_number;
1462
1495
impl->log_ = new log ::Writer (lfile);
1463
- s = impl->versions_ ->LogAndApply (&edit, &impl->mutex_ );
1464
- }
1465
- if (s.ok ()) {
1466
- impl->DeleteObsoleteFiles ();
1467
- impl->MaybeScheduleCompaction ();
1496
+ impl->mem_ = new MemTable (impl->internal_comparator_ );
1497
+ impl->mem_ ->Ref ();
1468
1498
}
1469
1499
}
1500
+ if (s.ok () && save_manifest) {
1501
+ edit.SetPrevLogNumber (0 ); // No older logs needed after recovery.
1502
+ edit.SetLogNumber (impl->logfile_number_ );
1503
+ s = impl->versions_ ->LogAndApply (&edit, &impl->mutex_ );
1504
+ }
1505
+ if (s.ok ()) {
1506
+ impl->DeleteObsoleteFiles ();
1507
+ impl->MaybeScheduleCompaction ();
1508
+ }
1470
1509
impl->mutex_ .Unlock ();
1471
1510
if (s.ok ()) {
1511
+ assert (impl->mem_ != NULL );
1472
1512
*dbptr = impl;
1473
1513
} else {
1474
1514
delete impl;
0 commit comments