Page MenuHomePhabricator

No OneTemporary

This file is larger than 256 KB, so syntax highlighting was skipped.
diff --git a/src/leveldb/AUTHORS b/src/leveldb/AUTHORS
index 27a9407e52..fc40194ab9 100644
--- a/src/leveldb/AUTHORS
+++ b/src/leveldb/AUTHORS
@@ -1,8 +1,11 @@
# Names should be added to this file like so:
# Name or Organization <email address>
Google Inc.
# Initial version authors:
Jeffrey Dean <jeff@google.com>
Sanjay Ghemawat <sanjay@google.com>
+
+# Partial list of contributors:
+Kevin Regan <kevin.d.regan@gmail.com>
diff --git a/src/leveldb/Makefile b/src/leveldb/Makefile
index 42c4952fec..38b9bf7729 100644
--- a/src/leveldb/Makefile
+++ b/src/leveldb/Makefile
@@ -1,202 +1,206 @@
# Copyright (c) 2011 The LevelDB Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file. See the AUTHORS file for names of contributors.
#-----------------------------------------------
# Uncomment exactly one of the lines labelled (A), (B), and (C) below
# to switch between compilation modes.
OPT ?= -O2 -DNDEBUG # (A) Production use (optimized mode)
# OPT ?= -g2 # (B) Debug mode, w/ full line-level debugging symbols
# OPT ?= -O2 -g2 -DNDEBUG # (C) Profiling mode: opt, but w/debugging symbols
#-----------------------------------------------
# detect what platform we're building on
$(shell CC=$(CC) CXX=$(CXX) TARGET_OS=$(TARGET_OS) \
./build_detect_platform build_config.mk ./)
# this file is generated by the previous line to set build flags and sources
include build_config.mk
CFLAGS += -I. -I./include $(PLATFORM_CCFLAGS) $(OPT)
CXXFLAGS += -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT)
LDFLAGS += $(PLATFORM_LDFLAGS)
LIBS += $(PLATFORM_LIBS)
LIBOBJECTS = $(SOURCES:.cc=.o)
MEMENVOBJECTS = $(MEMENV_SOURCES:.cc=.o)
TESTUTIL = ./util/testutil.o
TESTHARNESS = ./util/testharness.o $(TESTUTIL)
TESTS = \
arena_test \
bloom_test \
c_test \
cache_test \
coding_test \
corruption_test \
crc32c_test \
db_test \
dbformat_test \
env_test \
filename_test \
filter_block_test \
+ issue178_test \
log_test \
memenv_test \
skiplist_test \
table_test \
version_edit_test \
version_set_test \
write_batch_test
PROGRAMS = db_bench leveldbutil $(TESTS)
BENCHMARKS = db_bench_sqlite3 db_bench_tree_db
LIBRARY = libleveldb.a
MEMENVLIBRARY = libmemenv.a
default: all
# Should we build shared libraries?
ifneq ($(PLATFORM_SHARED_EXT),)
ifneq ($(PLATFORM_SHARED_VERSIONED),true)
SHARED1 = libleveldb.$(PLATFORM_SHARED_EXT)
SHARED2 = $(SHARED1)
SHARED3 = $(SHARED1)
SHARED = $(SHARED1)
else
# Update db.h if you change these.
SHARED_MAJOR = 1
-SHARED_MINOR = 9
+SHARED_MINOR = 12
SHARED1 = libleveldb.$(PLATFORM_SHARED_EXT)
SHARED2 = $(SHARED1).$(SHARED_MAJOR)
SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR)
SHARED = $(SHARED1) $(SHARED2) $(SHARED3)
$(SHARED1): $(SHARED3)
ln -fs $(SHARED3) $(SHARED1)
$(SHARED2): $(SHARED3)
ln -fs $(SHARED3) $(SHARED2)
endif
$(SHARED3):
$(CXX) $(LDFLAGS) $(PLATFORM_SHARED_LDFLAGS)$(SHARED2) $(CXXFLAGS) $(PLATFORM_SHARED_CFLAGS) $(SOURCES) -o $(SHARED3) $(LIBS)
endif # PLATFORM_SHARED_EXT
all: $(SHARED) $(LIBRARY)
check: all $(PROGRAMS) $(TESTS)
for t in $(TESTS); do echo "***** Running $$t"; ./$$t || exit 1; done
clean:
-rm -f $(PROGRAMS) $(BENCHMARKS) $(LIBRARY) $(SHARED) $(MEMENVLIBRARY) */*.o */*/*.o ios-x86/*/*.o ios-arm/*/*.o build_config.mk
-rm -rf ios-x86/* ios-arm/*
$(LIBRARY): $(LIBOBJECTS)
rm -f $@
$(AR) -rs $@ $(LIBOBJECTS)
db_bench: db/db_bench.o $(LIBOBJECTS) $(TESTUTIL)
$(CXX) $(LDFLAGS) db/db_bench.o $(LIBOBJECTS) $(TESTUTIL) -o $@ $(LIBS)
db_bench_sqlite3: doc/bench/db_bench_sqlite3.o $(LIBOBJECTS) $(TESTUTIL)
$(CXX) $(LDFLAGS) doc/bench/db_bench_sqlite3.o $(LIBOBJECTS) $(TESTUTIL) -o $@ -lsqlite3 $(LIBS)
db_bench_tree_db: doc/bench/db_bench_tree_db.o $(LIBOBJECTS) $(TESTUTIL)
$(CXX) $(LDFLAGS) doc/bench/db_bench_tree_db.o $(LIBOBJECTS) $(TESTUTIL) -o $@ -lkyotocabinet $(LIBS)
leveldbutil: db/leveldb_main.o $(LIBOBJECTS)
$(CXX) $(LDFLAGS) db/leveldb_main.o $(LIBOBJECTS) -o $@ $(LIBS)
arena_test: util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
bloom_test: util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
c_test: db/c_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/c_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
cache_test: util/cache_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/cache_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
coding_test: util/coding_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/coding_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
corruption_test: db/corruption_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/corruption_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
crc32c_test: util/crc32c_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/crc32c_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
db_test: db/db_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/db_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
dbformat_test: db/dbformat_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/dbformat_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
env_test: util/env_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/env_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
filename_test: db/filename_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/filename_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
filter_block_test: table/filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) table/filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
+issue178_test: issues/issue178_test.o $(LIBOBJECTS) $(TESTHARNESS)
+ $(CXX) $(LDFLAGS) issues/issue178_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
+
log_test: db/log_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/log_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
table_test: table/table_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) table/table_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
skiplist_test: db/skiplist_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/skiplist_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
version_edit_test: db/version_edit_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/version_edit_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
version_set_test: db/version_set_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/version_set_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
write_batch_test: db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
$(MEMENVLIBRARY) : $(MEMENVOBJECTS)
rm -f $@
$(AR) -rs $@ $(MEMENVOBJECTS)
memenv_test : helpers/memenv/memenv_test.o $(MEMENVLIBRARY) $(LIBRARY) $(TESTHARNESS)
$(CXX) $(LDFLAGS) helpers/memenv/memenv_test.o $(MEMENVLIBRARY) $(LIBRARY) $(TESTHARNESS) -o $@ $(LIBS)
ifeq ($(PLATFORM), IOS)
# For iOS, create universal object files to be used on both the simulator and
# a device.
PLATFORMSROOT=/Applications/Xcode.app/Contents/Developer/Platforms
SIMULATORROOT=$(PLATFORMSROOT)/iPhoneSimulator.platform/Developer
DEVICEROOT=$(PLATFORMSROOT)/iPhoneOS.platform/Developer
IOSVERSION=$(shell defaults read $(PLATFORMSROOT)/iPhoneOS.platform/version CFBundleShortVersionString)
.cc.o:
mkdir -p ios-x86/$(dir $@)
$(CXX) $(CXXFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -c $< -o ios-x86/$@
mkdir -p ios-arm/$(dir $@)
$(DEVICEROOT)/usr/bin/$(CXX) $(CXXFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 -c $< -o ios-arm/$@
lipo ios-x86/$@ ios-arm/$@ -create -output $@
.c.o:
mkdir -p ios-x86/$(dir $@)
$(CC) $(CFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -c $< -o ios-x86/$@
mkdir -p ios-arm/$(dir $@)
$(DEVICEROOT)/usr/bin/$(CC) $(CFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 -c $< -o ios-arm/$@
lipo ios-x86/$@ ios-arm/$@ -create -output $@
else
.cc.o:
$(CXX) $(CXXFLAGS) -c $< -o $@
.c.o:
$(CC) $(CFLAGS) -c $< -o $@
endif
diff --git a/src/leveldb/db/db_impl.cc b/src/leveldb/db/db_impl.cc
index c9de169f29..af02467b33 100644
--- a/src/leveldb/db/db_impl.cc
+++ b/src/leveldb/db/db_impl.cc
@@ -1,1467 +1,1488 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl.h"
#include <algorithm>
#include <set>
#include <string>
#include <stdint.h>
#include <stdio.h>
#include <vector>
#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/table_cache.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/status.h"
#include "leveldb/table.h"
#include "leveldb/table_builder.h"
#include "port/port.h"
#include "table/block.h"
#include "table/merger.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/logging.h"
#include "util/mutexlock.h"
namespace leveldb {
+const int kNumNonTableCacheFiles = 10;
+
// Information kept for every waiting writer
struct DBImpl::Writer {
Status status;
WriteBatch* batch;
bool sync;
bool done;
port::CondVar cv;
explicit Writer(port::Mutex* mu) : cv(mu) { }
};
struct DBImpl::CompactionState {
Compaction* const compaction;
// Sequence numbers < smallest_snapshot are not significant since we
// will never have to service a snapshot below smallest_snapshot.
// Therefore if we have seen a sequence number S <= smallest_snapshot,
// we can drop all entries for the same key with sequence numbers < S.
SequenceNumber smallest_snapshot;
// Files produced by compaction
struct Output {
uint64_t number;
uint64_t file_size;
InternalKey smallest, largest;
};
std::vector<Output> outputs;
// State kept for output being generated
WritableFile* outfile;
TableBuilder* builder;
uint64_t total_bytes;
Output* current_output() { return &outputs[outputs.size()-1]; }
explicit CompactionState(Compaction* c)
: compaction(c),
outfile(NULL),
builder(NULL),
total_bytes(0) {
}
};
// Fix user-supplied options to be reasonable
template <class T,class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
}
Options SanitizeOptions(const std::string& dbname,
const InternalKeyComparator* icmp,
const InternalFilterPolicy* ipolicy,
const Options& src) {
Options result = src;
result.comparator = icmp;
result.filter_policy = (src.filter_policy != NULL) ? ipolicy : NULL;
- ClipToRange(&result.max_open_files, 20, 50000);
- ClipToRange(&result.write_buffer_size, 64<<10, 1<<30);
- ClipToRange(&result.block_size, 1<<10, 4<<20);
+ ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000);
+ ClipToRange(&result.write_buffer_size, 64<<10, 1<<30);
+ ClipToRange(&result.block_size, 1<<10, 4<<20);
if (result.info_log == NULL) {
// Open a log file in the same directory as the db
src.env->CreateDir(dbname); // In case it does not exist
src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
if (!s.ok()) {
// No place suitable for logging
result.info_log = NULL;
}
}
if (result.block_cache == NULL) {
result.block_cache = NewLRUCache(8 << 20);
}
return result;
}
DBImpl::DBImpl(const Options& options, const std::string& dbname)
: env_(options.env),
internal_comparator_(options.comparator),
internal_filter_policy_(options.filter_policy),
options_(SanitizeOptions(
dbname, &internal_comparator_, &internal_filter_policy_, options)),
owns_info_log_(options_.info_log != options.info_log),
owns_cache_(options_.block_cache != options.block_cache),
dbname_(dbname),
db_lock_(NULL),
shutting_down_(NULL),
bg_cv_(&mutex_),
mem_(new MemTable(internal_comparator_)),
imm_(NULL),
logfile_(NULL),
logfile_number_(0),
log_(NULL),
tmp_batch_(new WriteBatch),
bg_compaction_scheduled_(false),
- manual_compaction_(NULL) {
+ manual_compaction_(NULL),
+ consecutive_compaction_errors_(0) {
mem_->Ref();
has_imm_.Release_Store(NULL);
// Reserve ten files or so for other uses and give the rest to TableCache.
- const int table_cache_size = options.max_open_files - 10;
+ const int table_cache_size = options.max_open_files - kNumNonTableCacheFiles;
table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
versions_ = new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_);
}
DBImpl::~DBImpl() {
// Wait for background work to finish
mutex_.Lock();
shutting_down_.Release_Store(this); // Any non-NULL value is ok
while (bg_compaction_scheduled_) {
bg_cv_.Wait();
}
mutex_.Unlock();
if (db_lock_ != NULL) {
env_->UnlockFile(db_lock_);
}
delete versions_;
if (mem_ != NULL) mem_->Unref();
if (imm_ != NULL) imm_->Unref();
delete tmp_batch_;
delete log_;
delete logfile_;
delete table_cache_;
if (owns_info_log_) {
delete options_.info_log;
}
if (owns_cache_) {
delete options_.block_cache;
}
}
Status DBImpl::NewDB() {
VersionEdit new_db;
new_db.SetComparatorName(user_comparator()->Name());
new_db.SetLogNumber(0);
new_db.SetNextFile(2);
new_db.SetLastSequence(0);
const std::string manifest = DescriptorFileName(dbname_, 1);
WritableFile* file;
Status s = env_->NewWritableFile(manifest, &file);
if (!s.ok()) {
return s;
}
{
log::Writer log(file);
std::string record;
new_db.EncodeTo(&record);
s = log.AddRecord(record);
if (s.ok()) {
s = file->Close();
}
}
delete file;
if (s.ok()) {
// Make "CURRENT" file that points to the new manifest file.
s = SetCurrentFile(env_, dbname_, 1);
} else {
env_->DeleteFile(manifest);
}
return s;
}
void DBImpl::MaybeIgnoreError(Status* s) const {
if (s->ok() || options_.paranoid_checks) {
// No change needed
} else {
Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
*s = Status::OK();
}
}
void DBImpl::DeleteObsoleteFiles() {
// Make a set of all of the live files
std::set<uint64_t> live = pending_outputs_;
versions_->AddLiveFiles(&live);
std::vector<std::string> filenames;
env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
uint64_t number;
FileType type;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) {
bool keep = true;
switch (type) {
case kLogFile:
keep = ((number >= versions_->LogNumber()) ||
(number == versions_->PrevLogNumber()));
break;
case kDescriptorFile:
// Keep my manifest file, and any newer incarnations'
// (in case there is a race that allows other incarnations)
keep = (number >= versions_->ManifestFileNumber());
break;
case kTableFile:
keep = (live.find(number) != live.end());
break;
case kTempFile:
// Any temp files that are currently being written to must
// be recorded in pending_outputs_, which is inserted into "live"
keep = (live.find(number) != live.end());
break;
case kCurrentFile:
case kDBLockFile:
case kInfoLogFile:
keep = true;
break;
}
if (!keep) {
if (type == kTableFile) {
table_cache_->Evict(number);
}
Log(options_.info_log, "Delete type=%d #%lld\n",
int(type),
static_cast<unsigned long long>(number));
env_->DeleteFile(dbname_ + "/" + filenames[i]);
}
}
}
}
Status DBImpl::Recover(VersionEdit* edit) {
mutex_.AssertHeld();
// Ignore error from CreateDir since the creation of the DB is
// committed only when the descriptor is created, and this directory
// may already exist from a previous failed creation attempt.
env_->CreateDir(dbname_);
assert(db_lock_ == NULL);
Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
if (!s.ok()) {
return s;
}
if (!env_->FileExists(CurrentFileName(dbname_))) {
if (options_.create_if_missing) {
s = NewDB();
if (!s.ok()) {
return s;
}
} else {
return Status::InvalidArgument(
dbname_, "does not exist (create_if_missing is false)");
}
} else {
if (options_.error_if_exists) {
return Status::InvalidArgument(
dbname_, "exists (error_if_exists is true)");
}
}
s = versions_->Recover();
if (s.ok()) {
SequenceNumber max_sequence(0);
// Recover from all newer log files than the ones named in the
// descriptor (new log files may have been added by the previous
// incarnation without registering them in the descriptor).
//
// Note that PrevLogNumber() is no longer used, but we pay
// attention to it in case we are recovering a database
// produced by an older version of leveldb.
const uint64_t min_log = versions_->LogNumber();
const uint64_t prev_log = versions_->PrevLogNumber();
std::vector<std::string> filenames;
s = env_->GetChildren(dbname_, &filenames);
if (!s.ok()) {
return s;
}
+ std::set<uint64_t> expected;
+ versions_->AddLiveFiles(&expected);
uint64_t number;
FileType type;
std::vector<uint64_t> logs;
for (size_t i = 0; i < filenames.size(); i++) {
- if (ParseFileName(filenames[i], &number, &type)
- && type == kLogFile
- && ((number >= min_log) || (number == prev_log))) {
+ if (ParseFileName(filenames[i], &number, &type)) {
+ expected.erase(number);
+ if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
logs.push_back(number);
}
}
+ if (!expected.empty()) {
+ char buf[50];
+ snprintf(buf, sizeof(buf), "%d missing files; e.g.",
+ static_cast<int>(expected.size()));
+ return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
+ }
// Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) {
s = RecoverLogFile(logs[i], edit, &max_sequence);
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(logs[i]);
}
if (s.ok()) {
if (versions_->LastSequence() < max_sequence) {
versions_->SetLastSequence(max_sequence);
}
}
}
return s;
}
Status DBImpl::RecoverLogFile(uint64_t log_number,
VersionEdit* edit,
SequenceNumber* max_sequence) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
const char* fname;
Status* status; // NULL if options_.paranoid_checks==false
virtual void Corruption(size_t bytes, const Status& s) {
Log(info_log, "%s%s: dropping %d bytes; %s",
(this->status == NULL ? "(ignoring error) " : ""),
fname, static_cast<int>(bytes), s.ToString().c_str());
if (this->status != NULL && this->status->ok()) *this->status = s;
}
};
mutex_.AssertHeld();
// Open the log file
std::string fname = LogFileName(dbname_, log_number);
SequentialFile* file;
Status status = env_->NewSequentialFile(fname, &file);
if (!status.ok()) {
MaybeIgnoreError(&status);
return status;
}
// Create the log reader.
LogReporter reporter;
reporter.env = env_;
reporter.info_log = options_.info_log;
reporter.fname = fname.c_str();
reporter.status = (options_.paranoid_checks ? &status : NULL);
// We intentially make log::Reader do checksumming even if
// paranoid_checks==false so that corruptions cause entire commits
// to be skipped instead of propagating bad information (like overly
// large sequence numbers).
log::Reader reader(file, &reporter, true/*checksum*/,
0/*initial_offset*/);
Log(options_.info_log, "Recovering log #%llu",
(unsigned long long) log_number);
// Read all the records and add to a memtable
std::string scratch;
Slice record;
WriteBatch batch;
MemTable* mem = NULL;
while (reader.ReadRecord(&record, &scratch) &&
status.ok()) {
if (record.size() < 12) {
reporter.Corruption(
record.size(), Status::Corruption("log record too small"));
continue;
}
WriteBatchInternal::SetContents(&batch, record);
if (mem == NULL) {
mem = new MemTable(internal_comparator_);
mem->Ref();
}
status = WriteBatchInternal::InsertInto(&batch, mem);
MaybeIgnoreError(&status);
if (!status.ok()) {
break;
}
const SequenceNumber last_seq =
WriteBatchInternal::Sequence(&batch) +
WriteBatchInternal::Count(&batch) - 1;
if (last_seq > *max_sequence) {
*max_sequence = last_seq;
}
if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
status = WriteLevel0Table(mem, edit, NULL);
if (!status.ok()) {
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
break;
}
mem->Unref();
mem = NULL;
}
}
if (status.ok() && mem != NULL) {
status = WriteLevel0Table(mem, edit, NULL);
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
}
if (mem != NULL) mem->Unref();
delete file;
return status;
}
Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
Version* base) {
mutex_.AssertHeld();
const uint64_t start_micros = env_->NowMicros();
FileMetaData meta;
meta.number = versions_->NewFileNumber();
pending_outputs_.insert(meta.number);
Iterator* iter = mem->NewIterator();
Log(options_.info_log, "Level-0 table #%llu: started",
(unsigned long long) meta.number);
Status s;
{
mutex_.Unlock();
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
mutex_.Lock();
}
Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
(unsigned long long) meta.number,
(unsigned long long) meta.file_size,
s.ToString().c_str());
delete iter;
pending_outputs_.erase(meta.number);
// Note that if file_size is zero, the file has been deleted and
// should not be added to the manifest.
int level = 0;
if (s.ok() && meta.file_size > 0) {
const Slice min_user_key = meta.smallest.user_key();
const Slice max_user_key = meta.largest.user_key();
if (base != NULL) {
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
}
edit->AddFile(level, meta.number, meta.file_size,
meta.smallest, meta.largest);
}
CompactionStats stats;
stats.micros = env_->NowMicros() - start_micros;
stats.bytes_written = meta.file_size;
stats_[level].Add(stats);
return s;
}
Status DBImpl::CompactMemTable() {
mutex_.AssertHeld();
assert(imm_ != NULL);
// Save the contents of the memtable as a new Table
VersionEdit edit;
Version* base = versions_->current();
base->Ref();
Status s = WriteLevel0Table(imm_, &edit, base);
base->Unref();
if (s.ok() && shutting_down_.Acquire_Load()) {
s = Status::IOError("Deleting DB during memtable compaction");
}
// Replace immutable memtable with the generated Table
if (s.ok()) {
edit.SetPrevLogNumber(0);
edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
s = versions_->LogAndApply(&edit, &mutex_);
}
if (s.ok()) {
// Commit to the new state
imm_->Unref();
imm_ = NULL;
has_imm_.Release_Store(NULL);
DeleteObsoleteFiles();
}
return s;
}
void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
int max_level_with_files = 1;
{
MutexLock l(&mutex_);
Version* base = versions_->current();
for (int level = 1; level < config::kNumLevels; level++) {
if (base->OverlapInLevel(level, begin, end)) {
max_level_with_files = level;
}
}
}
TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
for (int level = 0; level < max_level_with_files; level++) {
TEST_CompactRange(level, begin, end);
}
}
void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
assert(level >= 0);
assert(level + 1 < config::kNumLevels);
InternalKey begin_storage, end_storage;
ManualCompaction manual;
manual.level = level;
manual.done = false;
if (begin == NULL) {
manual.begin = NULL;
} else {
begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
manual.begin = &begin_storage;
}
if (end == NULL) {
manual.end = NULL;
} else {
end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
manual.end = &end_storage;
}
MutexLock l(&mutex_);
while (!manual.done) {
while (manual_compaction_ != NULL) {
bg_cv_.Wait();
}
manual_compaction_ = &manual;
MaybeScheduleCompaction();
while (manual_compaction_ == &manual) {
bg_cv_.Wait();
}
}
}
Status DBImpl::TEST_CompactMemTable() {
// NULL batch means just wait for earlier writes to be done
Status s = Write(WriteOptions(), NULL);
if (s.ok()) {
// Wait until the compaction completes
MutexLock l(&mutex_);
while (imm_ != NULL && bg_error_.ok()) {
bg_cv_.Wait();
}
if (imm_ != NULL) {
s = bg_error_;
}
}
return s;
}
void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (bg_compaction_scheduled_) {
// Already scheduled
} else if (shutting_down_.Acquire_Load()) {
// DB is being deleted; no more background compactions
} else if (imm_ == NULL &&
manual_compaction_ == NULL &&
!versions_->NeedsCompaction()) {
// No work to be done
} else {
bg_compaction_scheduled_ = true;
env_->Schedule(&DBImpl::BGWork, this);
}
}
void DBImpl::BGWork(void* db) {
reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}
void DBImpl::BackgroundCall() {
MutexLock l(&mutex_);
assert(bg_compaction_scheduled_);
if (!shutting_down_.Acquire_Load()) {
Status s = BackgroundCompaction();
if (s.ok()) {
// Success
+ consecutive_compaction_errors_ = 0;
} else if (shutting_down_.Acquire_Load()) {
// Error most likely due to shutdown; do not wait
} else {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
// chew up resources for failed compactions for the duration of
// the problem.
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
Log(options_.info_log, "Waiting after background compaction error: %s",
s.ToString().c_str());
mutex_.Unlock();
- env_->SleepForMicroseconds(1000000);
+ ++consecutive_compaction_errors_;
+ int seconds_to_sleep = 1;
+ for (int i = 0; i < 3 && i < consecutive_compaction_errors_ - 1; ++i) {
+ seconds_to_sleep *= 2;
+ }
+ env_->SleepForMicroseconds(seconds_to_sleep * 1000000);
mutex_.Lock();
}
}
bg_compaction_scheduled_ = false;
// Previous compaction may have produced too many files in a level,
// so reschedule another compaction if needed.
MaybeScheduleCompaction();
bg_cv_.SignalAll();
}
Status DBImpl::BackgroundCompaction() {
mutex_.AssertHeld();
if (imm_ != NULL) {
return CompactMemTable();
}
Compaction* c;
bool is_manual = (manual_compaction_ != NULL);
InternalKey manual_end;
if (is_manual) {
ManualCompaction* m = manual_compaction_;
c = versions_->CompactRange(m->level, m->begin, m->end);
m->done = (c == NULL);
if (c != NULL) {
manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
}
Log(options_.info_log,
"Manual compaction at level-%d from %s .. %s; will stop at %s\n",
m->level,
(m->begin ? m->begin->DebugString().c_str() : "(begin)"),
(m->end ? m->end->DebugString().c_str() : "(end)"),
(m->done ? "(end)" : manual_end.DebugString().c_str()));
} else {
c = versions_->PickCompaction();
}
Status status;
if (c == NULL) {
// Nothing to do
} else if (!is_manual && c->IsTrivialMove()) {
// Move file to next level
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
c->edit()->DeleteFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number),
c->level() + 1,
static_cast<unsigned long long>(f->file_size),
status.ToString().c_str(),
versions_->LevelSummary(&tmp));
} else {
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
CleanupCompaction(compact);
c->ReleaseInputs();
DeleteObsoleteFiles();
}
delete c;
if (status.ok()) {
// Done
} else if (shutting_down_.Acquire_Load()) {
// Ignore compaction errors found during shutting down
} else {
Log(options_.info_log,
"Compaction error: %s", status.ToString().c_str());
if (options_.paranoid_checks && bg_error_.ok()) {
bg_error_ = status;
}
}
if (is_manual) {
ManualCompaction* m = manual_compaction_;
if (!status.ok()) {
m->done = true;
}
if (!m->done) {
// We only compacted part of the requested range. Update *m
// to the range that is left to be compacted.
m->tmp_storage = manual_end;
m->begin = &m->tmp_storage;
}
manual_compaction_ = NULL;
}
return status;
}
void DBImpl::CleanupCompaction(CompactionState* compact) {
mutex_.AssertHeld();
if (compact->builder != NULL) {
// May happen if we get a shutdown call in the middle of compaction
compact->builder->Abandon();
delete compact->builder;
} else {
assert(compact->outfile == NULL);
}
delete compact->outfile;
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
pending_outputs_.erase(out.number);
}
delete compact;
}
Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
assert(compact != NULL);
assert(compact->builder == NULL);
uint64_t file_number;
{
mutex_.Lock();
file_number = versions_->NewFileNumber();
pending_outputs_.insert(file_number);
CompactionState::Output out;
out.number = file_number;
out.smallest.Clear();
out.largest.Clear();
compact->outputs.push_back(out);
mutex_.Unlock();
}
// Make the output file
std::string fname = TableFileName(dbname_, file_number);
Status s = env_->NewWritableFile(fname, &compact->outfile);
if (s.ok()) {
compact->builder = new TableBuilder(options_, compact->outfile);
}
return s;
}
Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
Iterator* input) {
assert(compact != NULL);
assert(compact->outfile != NULL);
assert(compact->builder != NULL);
const uint64_t output_number = compact->current_output()->number;
assert(output_number != 0);
// Check for iterator errors
Status s = input->status();
const uint64_t current_entries = compact->builder->NumEntries();
if (s.ok()) {
s = compact->builder->Finish();
} else {
compact->builder->Abandon();
}
const uint64_t current_bytes = compact->builder->FileSize();
compact->current_output()->file_size = current_bytes;
compact->total_bytes += current_bytes;
delete compact->builder;
compact->builder = NULL;
// Finish and check for file errors
if (s.ok()) {
s = compact->outfile->Sync();
}
if (s.ok()) {
s = compact->outfile->Close();
}
delete compact->outfile;
compact->outfile = NULL;
if (s.ok() && current_entries > 0) {
// Verify that the table is usable
Iterator* iter = table_cache_->NewIterator(ReadOptions(),
output_number,
current_bytes);
s = iter->status();
delete iter;
if (s.ok()) {
Log(options_.info_log,
"Generated table #%llu: %lld keys, %lld bytes",
(unsigned long long) output_number,
(unsigned long long) current_entries,
(unsigned long long) current_bytes);
+
+ // rate-limit compaction file creation with a 100ms pause
+ env_->SleepForMicroseconds(100000);
}
}
return s;
}
Status DBImpl::InstallCompactionResults(CompactionState* compact) {
mutex_.AssertHeld();
Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
compact->compaction->num_input_files(0),
compact->compaction->level(),
compact->compaction->num_input_files(1),
compact->compaction->level() + 1,
static_cast<long long>(compact->total_bytes));
// Add compaction outputs
compact->compaction->AddInputDeletions(compact->compaction->edit());
const int level = compact->compaction->level();
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
compact->compaction->edit()->AddFile(
level + 1,
out.number, out.file_size, out.smallest, out.largest);
}
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}
Status DBImpl::DoCompactionWork(CompactionState* compact) {
const uint64_t start_micros = env_->NowMicros();
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
Log(options_.info_log, "Compacting %d@%d + %d@%d files",
compact->compaction->num_input_files(0),
compact->compaction->level(),
compact->compaction->num_input_files(1),
compact->compaction->level() + 1);
assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
assert(compact->builder == NULL);
assert(compact->outfile == NULL);
if (snapshots_.empty()) {
compact->smallest_snapshot = versions_->LastSequence();
} else {
compact->smallest_snapshot = snapshots_.oldest()->number_;
}
// Release mutex while we're actually doing the compaction work
mutex_.Unlock();
Iterator* input = versions_->MakeInputIterator(compact->compaction);
input->SeekToFirst();
Status status;
ParsedInternalKey ikey;
std::string current_user_key;
bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
// Prioritize immutable compaction work
if (has_imm_.NoBarrier_Load() != NULL) {
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();
if (imm_ != NULL) {
CompactMemTable();
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
}
mutex_.Unlock();
imm_micros += (env_->NowMicros() - imm_start);
}
Slice key = input->key();
if (compact->compaction->ShouldStopBefore(key) &&
compact->builder != NULL) {
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
}
// Handle key/value, add to state, etc.
bool drop = false;
if (!ParseInternalKey(key, &ikey)) {
// Do not hide error keys
current_user_key.clear();
has_current_user_key = false;
last_sequence_for_key = kMaxSequenceNumber;
} else {
if (!has_current_user_key ||
user_comparator()->Compare(ikey.user_key,
Slice(current_user_key)) != 0) {
// First occurrence of this user key
current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber;
}
if (last_sequence_for_key <= compact->smallest_snapshot) {
// Hidden by an newer entry for same user key
drop = true; // (A)
} else if (ikey.type == kTypeDeletion &&
ikey.sequence <= compact->smallest_snapshot &&
compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
// For this user key:
// (1) there is no data in higher levels
// (2) data in lower levels will have larger sequence numbers
// (3) data in layers that are being compacted here and have
// smaller sequence numbers will be dropped in the next
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
}
last_sequence_for_key = ikey.sequence;
}
#if 0
Log(options_.info_log,
" Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
"%d smallest_snapshot: %d",
ikey.user_key.ToString().c_str(),
(int)ikey.sequence, ikey.type, kTypeValue, drop,
compact->compaction->IsBaseLevelForKey(ikey.user_key),
(int)last_sequence_for_key, (int)compact->smallest_snapshot);
#endif
if (!drop) {
// Open output file if necessary
if (compact->builder == NULL) {
status = OpenCompactionOutputFile(compact);
if (!status.ok()) {
break;
}
}
if (compact->builder->NumEntries() == 0) {
compact->current_output()->smallest.DecodeFrom(key);
}
compact->current_output()->largest.DecodeFrom(key);
compact->builder->Add(key, input->value());
// Close output file if it is big enough
if (compact->builder->FileSize() >=
compact->compaction->MaxOutputFileSize()) {
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
}
}
input->Next();
}
if (status.ok() && shutting_down_.Acquire_Load()) {
status = Status::IOError("Deleting DB during compaction");
}
if (status.ok() && compact->builder != NULL) {
status = FinishCompactionOutputFile(compact, input);
}
if (status.ok()) {
status = input->status();
}
delete input;
input = NULL;
CompactionStats stats;
stats.micros = env_->NowMicros() - start_micros - imm_micros;
for (int which = 0; which < 2; which++) {
for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
stats.bytes_read += compact->compaction->input(which, i)->file_size;
}
}
for (size_t i = 0; i < compact->outputs.size(); i++) {
stats.bytes_written += compact->outputs[i].file_size;
}
mutex_.Lock();
stats_[compact->compaction->level() + 1].Add(stats);
if (status.ok()) {
status = InstallCompactionResults(compact);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log,
"compacted to: %s", versions_->LevelSummary(&tmp));
return status;
}
namespace {
struct IterState {
port::Mutex* mu;
Version* version;
MemTable* mem;
MemTable* imm;
};
static void CleanupIteratorState(void* arg1, void* arg2) {
IterState* state = reinterpret_cast<IterState*>(arg1);
state->mu->Lock();
state->mem->Unref();
if (state->imm != NULL) state->imm->Unref();
state->version->Unref();
state->mu->Unlock();
delete state;
}
} // namespace
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
SequenceNumber* latest_snapshot) {
IterState* cleanup = new IterState;
mutex_.Lock();
*latest_snapshot = versions_->LastSequence();
// Collect together all needed child iterators
std::vector<Iterator*> list;
list.push_back(mem_->NewIterator());
mem_->Ref();
if (imm_ != NULL) {
list.push_back(imm_->NewIterator());
imm_->Ref();
}
versions_->current()->AddIterators(options, &list);
Iterator* internal_iter =
NewMergingIterator(&internal_comparator_, &list[0], list.size());
versions_->current()->Ref();
cleanup->mu = &mutex_;
cleanup->mem = mem_;
cleanup->imm = imm_;
cleanup->version = versions_->current();
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);
mutex_.Unlock();
return internal_iter;
}
Iterator* DBImpl::TEST_NewInternalIterator() {
SequenceNumber ignored;
return NewInternalIterator(ReadOptions(), &ignored);
}
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
MutexLock l(&mutex_);
return versions_->MaxNextLevelOverlappingBytes();
}
Status DBImpl::Get(const ReadOptions& options,
const Slice& key,
std::string* value) {
Status s;
MutexLock l(&mutex_);
SequenceNumber snapshot;
if (options.snapshot != NULL) {
snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
} else {
snapshot = versions_->LastSequence();
}
MemTable* mem = mem_;
MemTable* imm = imm_;
Version* current = versions_->current();
mem->Ref();
if (imm != NULL) imm->Ref();
current->Ref();
bool have_stat_update = false;
Version::GetStats stats;
// Unlock while reading from files and memtables
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) {
// Done
} else if (imm != NULL && imm->Get(lkey, value, &s)) {
// Done
} else {
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
mutex_.Lock();
}
if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != NULL) imm->Unref();
current->Unref();
return s;
}
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot);
return NewDBIterator(
&dbname_, env_, user_comparator(), internal_iter,
(options.snapshot != NULL
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot));
}
const Snapshot* DBImpl::GetSnapshot() {
MutexLock l(&mutex_);
return snapshots_.New(versions_->LastSequence());
}
void DBImpl::ReleaseSnapshot(const Snapshot* s) {
MutexLock l(&mutex_);
snapshots_.Delete(reinterpret_cast<const SnapshotImpl*>(s));
}
// Convenience methods
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
return DB::Put(o, key, val);
}
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
return DB::Delete(options, key);
}
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
Writer w(&mutex_);
w.batch = my_batch;
w.sync = options.sync;
w.done = false;
MutexLock l(&mutex_);
writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
if (w.done) {
return w.status;
}
// May temporarily unlock and wait.
Status status = MakeRoomForWrite(my_batch == NULL);
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && my_batch != NULL) { // NULL batch is for compactions
WriteBatch* updates = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(updates, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(updates);
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
{
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(updates));
if (status.ok() && options.sync) {
status = logfile_->Sync();
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(updates, mem_);
}
mutex_.Lock();
}
if (updates == tmp_batch_) tmp_batch_->Clear();
versions_->SetLastSequence(last_sequence);
}
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}
// Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
return status;
}
// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-NULL batch
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
assert(!writers_.empty());
Writer* first = writers_.front();
WriteBatch* result = first->batch;
assert(result != NULL);
size_t size = WriteBatchInternal::ByteSize(first->batch);
// Allow the group to grow up to a maximum size, but if the
// original write is small, limit the growth so we do not slow
// down the small write too much.
size_t max_size = 1 << 20;
if (size <= (128<<10)) {
max_size = size + (128<<10);
}
*last_writer = first;
std::deque<Writer*>::iterator iter = writers_.begin();
++iter; // Advance past "first"
for (; iter != writers_.end(); ++iter) {
Writer* w = *iter;
if (w->sync && !first->sync) {
// Do not include a sync write into a batch handled by a non-sync write.
break;
}
if (w->batch != NULL) {
size += WriteBatchInternal::ByteSize(w->batch);
if (size > max_size) {
// Do not make batch too big
break;
}
// Append to *reuslt
if (result == first->batch) {
// Switch to temporary batch instead of disturbing caller's batch
result = tmp_batch_;
assert(WriteBatchInternal::Count(result) == 0);
WriteBatchInternal::Append(result, first->batch);
}
WriteBatchInternal::Append(result, w->batch);
}
*last_writer = w;
}
return result;
}
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld();
assert(!writers_.empty());
bool allow_delay = !force;
Status s;
while (true) {
if (!bg_error_.ok()) {
// Yield previous error
s = bg_error_;
break;
} else if (
allow_delay &&
versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
// seconds when we hit the hard limit, start delaying each
// individual write by 1ms to reduce latency variance. Also,
// this delay hands over some CPU to the compaction thread in
// case it is sharing the same core as the writer.
mutex_.Unlock();
env_->SleepForMicroseconds(1000);
allow_delay = false; // Do not delay a single write more than once
mutex_.Lock();
} else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
// There is room in current memtable
break;
} else if (imm_ != NULL) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
+ Log(options_.info_log, "Current memtable full; waiting...\n");
bg_cv_.Wait();
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
// There are too many level-0 files.
- Log(options_.info_log, "waiting...\n");
+ Log(options_.info_log, "Too many L0 files; waiting...\n");
bg_cv_.Wait();
} else {
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* lfile = NULL;
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
if (!s.ok()) {
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
break;
}
delete log_;
delete logfile_;
logfile_ = lfile;
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
imm_ = mem_;
has_imm_.Release_Store(imm_);
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room
MaybeScheduleCompaction();
}
}
return s;
}
bool DBImpl::GetProperty(const Slice& property, std::string* value) {
value->clear();
MutexLock l(&mutex_);
Slice in = property;
Slice prefix("leveldb.");
if (!in.starts_with(prefix)) return false;
in.remove_prefix(prefix.size());
if (in.starts_with("num-files-at-level")) {
in.remove_prefix(strlen("num-files-at-level"));
uint64_t level;
bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
if (!ok || level >= config::kNumLevels) {
return false;
} else {
char buf[100];
snprintf(buf, sizeof(buf), "%d",
versions_->NumLevelFiles(static_cast<int>(level)));
*value = buf;
return true;
}
} else if (in == "stats") {
char buf[200];
snprintf(buf, sizeof(buf),
" Compactions\n"
"Level Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
"--------------------------------------------------\n"
);
value->append(buf);
for (int level = 0; level < config::kNumLevels; level++) {
int files = versions_->NumLevelFiles(level);
if (stats_[level].micros > 0 || files > 0) {
snprintf(
buf, sizeof(buf),
"%3d %8d %8.0f %9.0f %8.0f %9.0f\n",
level,
files,
versions_->NumLevelBytes(level) / 1048576.0,
stats_[level].micros / 1e6,
stats_[level].bytes_read / 1048576.0,
stats_[level].bytes_written / 1048576.0);
value->append(buf);
}
}
return true;
} else if (in == "sstables") {
*value = versions_->current()->DebugString();
return true;
}
return false;
}
void DBImpl::GetApproximateSizes(
const Range* range, int n,
uint64_t* sizes) {
// TODO(opt): better implementation
Version* v;
{
MutexLock l(&mutex_);
versions_->current()->Ref();
v = versions_->current();
}
for (int i = 0; i < n; i++) {
// Convert user_key into a corresponding internal key.
InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
uint64_t start = versions_->ApproximateOffsetOf(v, k1);
uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
sizes[i] = (limit >= start ? limit - start : 0);
}
{
MutexLock l(&mutex_);
v->Unref();
}
}
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
}
Status DB::Delete(const WriteOptions& opt, const Slice& key) {
WriteBatch batch;
batch.Delete(key);
return Write(opt, &batch);
}
DB::~DB() { }
Status DB::Open(const Options& options, const std::string& dbname,
DB** dbptr) {
*dbptr = NULL;
DBImpl* impl = new DBImpl(options, dbname);
impl->mutex_.Lock();
VersionEdit edit;
Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
if (s.ok()) {
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile* lfile;
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
&lfile);
if (s.ok()) {
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
if (s.ok()) {
impl->DeleteObsoleteFiles();
impl->MaybeScheduleCompaction();
}
}
impl->mutex_.Unlock();
if (s.ok()) {
*dbptr = impl;
} else {
delete impl;
}
return s;
}
Snapshot::~Snapshot() {
}
Status DestroyDB(const std::string& dbname, const Options& options) {
Env* env = options.env;
std::vector<std::string> filenames;
// Ignore error in case directory does not exist
env->GetChildren(dbname, &filenames);
if (filenames.empty()) {
return Status::OK();
}
FileLock* lock;
const std::string lockname = LockFileName(dbname);
Status result = env->LockFile(lockname, &lock);
if (result.ok()) {
uint64_t number;
FileType type;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type) &&
type != kDBLockFile) { // Lock file will be deleted at end
Status del = env->DeleteFile(dbname + "/" + filenames[i]);
if (result.ok() && !del.ok()) {
result = del;
}
}
}
env->UnlockFile(lock); // Ignore error since state is already gone
env->DeleteFile(lockname);
env->DeleteDir(dbname); // Ignore error in case dir contains other files
}
return result;
}
} // namespace leveldb
diff --git a/src/leveldb/db/db_impl.h b/src/leveldb/db/db_impl.h
index bd29dd8055..3c8d711ae0 100644
--- a/src/leveldb/db/db_impl.h
+++ b/src/leveldb/db/db_impl.h
@@ -1,202 +1,203 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef STORAGE_LEVELDB_DB_DB_IMPL_H_
#define STORAGE_LEVELDB_DB_DB_IMPL_H_
#include <deque>
#include <set>
#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/snapshot.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "port/port.h"
#include "port/thread_annotations.h"
namespace leveldb {
class MemTable;
class TableCache;
class Version;
class VersionEdit;
class VersionSet;
class DBImpl : public DB {
public:
DBImpl(const Options& options, const std::string& dbname);
virtual ~DBImpl();
// Implementations of the DB interface
virtual Status Put(const WriteOptions&, const Slice& key, const Slice& value);
virtual Status Delete(const WriteOptions&, const Slice& key);
virtual Status Write(const WriteOptions& options, WriteBatch* updates);
virtual Status Get(const ReadOptions& options,
const Slice& key,
std::string* value);
virtual Iterator* NewIterator(const ReadOptions&);
virtual const Snapshot* GetSnapshot();
virtual void ReleaseSnapshot(const Snapshot* snapshot);
virtual bool GetProperty(const Slice& property, std::string* value);
virtual void GetApproximateSizes(const Range* range, int n, uint64_t* sizes);
virtual void CompactRange(const Slice* begin, const Slice* end);
// Extra methods (for testing) that are not in the public DB interface
// Compact any files in the named level that overlap [*begin,*end]
void TEST_CompactRange(int level, const Slice* begin, const Slice* end);
// Force current memtable contents to be compacted.
Status TEST_CompactMemTable();
// Return an internal iterator over the current state of the database.
// The keys of this iterator are internal keys (see format.h).
// The returned iterator should be deleted when no longer needed.
Iterator* TEST_NewInternalIterator();
// Return the maximum overlapping data (in bytes) at next level for any
// file at a level >= 1.
int64_t TEST_MaxNextLevelOverlappingBytes();
private:
friend class DB;
struct CompactionState;
struct Writer;
Iterator* NewInternalIterator(const ReadOptions&,
SequenceNumber* latest_snapshot);
Status NewDB();
// Recover the descriptor from persistent storage. May do a significant
// amount of work to recover recently logged updates. Any changes to
// be made to the descriptor are added to *edit.
Status Recover(VersionEdit* edit) EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void MaybeIgnoreError(Status* s) const;
// Delete any unneeded files and stale in-memory entries.
void DeleteObsoleteFiles();
// Compact the in-memory write buffer to disk. Switches to a new
// log-file/memtable and writes a new descriptor iff successful.
Status CompactMemTable()
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status RecoverLogFile(uint64_t log_number,
VersionEdit* edit,
SequenceNumber* max_sequence)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status MakeRoomForWrite(bool force /* compact even if there is room? */)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
WriteBatch* BuildBatchGroup(Writer** last_writer);
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
static void BGWork(void* db);
void BackgroundCall();
Status BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void CleanupCompaction(CompactionState* compact)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status DoCompactionWork(CompactionState* compact)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status OpenCompactionOutputFile(CompactionState* compact);
Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input);
Status InstallCompactionResults(CompactionState* compact)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
// Constant after construction
Env* const env_;
const InternalKeyComparator internal_comparator_;
const InternalFilterPolicy internal_filter_policy_;
const Options options_; // options_.comparator == &internal_comparator_
bool owns_info_log_;
bool owns_cache_;
const std::string dbname_;
// table_cache_ provides its own synchronization
TableCache* table_cache_;
// Lock over the persistent DB state. Non-NULL iff successfully acquired.
FileLock* db_lock_;
// State below is protected by mutex_
port::Mutex mutex_;
port::AtomicPointer shutting_down_;
port::CondVar bg_cv_; // Signalled when background work finishes
MemTable* mem_;
MemTable* imm_; // Memtable being compacted
port::AtomicPointer has_imm_; // So bg thread can detect non-NULL imm_
WritableFile* logfile_;
uint64_t logfile_number_;
log::Writer* log_;
// Queue of writers.
std::deque<Writer*> writers_;
WriteBatch* tmp_batch_;
SnapshotList snapshots_;
// Set of table files to protect from deletion because they are
// part of ongoing compactions.
std::set<uint64_t> pending_outputs_;
// Has a background compaction been scheduled or is running?
bool bg_compaction_scheduled_;
// Information for a manual compaction
struct ManualCompaction {
int level;
bool done;
const InternalKey* begin; // NULL means beginning of key range
const InternalKey* end; // NULL means end of key range
InternalKey tmp_storage; // Used to keep track of compaction progress
};
ManualCompaction* manual_compaction_;
VersionSet* versions_;
// Have we encountered a background error in paranoid mode?
Status bg_error_;
+ int consecutive_compaction_errors_;
// Per level compaction stats. stats_[level] stores the stats for
// compactions that produced data for the specified "level".
struct CompactionStats {
int64_t micros;
int64_t bytes_read;
int64_t bytes_written;
CompactionStats() : micros(0), bytes_read(0), bytes_written(0) { }
void Add(const CompactionStats& c) {
this->micros += c.micros;
this->bytes_read += c.bytes_read;
this->bytes_written += c.bytes_written;
}
};
CompactionStats stats_[config::kNumLevels];
// No copying allowed
DBImpl(const DBImpl&);
void operator=(const DBImpl&);
const Comparator* user_comparator() const {
return internal_comparator_.user_comparator();
}
};
// Sanitize db options. The caller should delete result.info_log if
// it is not equal to src.info_log.
extern Options SanitizeOptions(const std::string& db,
const InternalKeyComparator* icmp,
const InternalFilterPolicy* ipolicy,
const Options& src);
} // namespace leveldb
#endif // STORAGE_LEVELDB_DB_DB_IMPL_H_
diff --git a/src/leveldb/db/db_test.cc b/src/leveldb/db/db_test.cc
index 684ea3bdbc..49aae04dbd 100644
--- a/src/leveldb/db/db_test.cc
+++ b/src/leveldb/db/db_test.cc
@@ -1,2027 +1,2092 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "leveldb/db.h"
#include "leveldb/filter_policy.h"
#include "db/db_impl.h"
#include "db/filename.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "leveldb/cache.h"
#include "leveldb/env.h"
#include "leveldb/table.h"
#include "util/hash.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/testharness.h"
#include "util/testutil.h"
namespace leveldb {
static std::string RandomString(Random* rnd, int len) {
std::string r;
test::RandomString(rnd, len, &r);
return r;
}
namespace {
class AtomicCounter {
private:
port::Mutex mu_;
int count_;
public:
AtomicCounter() : count_(0) { }
void Increment() {
+ IncrementBy(1);
+ }
+ void IncrementBy(int count) {
MutexLock l(&mu_);
- count_++;
+ count_ += count;
}
int Read() {
MutexLock l(&mu_);
return count_;
}
void Reset() {
MutexLock l(&mu_);
count_ = 0;
}
};
+
+void DelayMilliseconds(int millis) {
+ Env::Default()->SleepForMicroseconds(millis * 1000);
+}
}
// Special Env used to delay background operations
class SpecialEnv : public EnvWrapper {
public:
// sstable Sync() calls are blocked while this pointer is non-NULL.
port::AtomicPointer delay_sstable_sync_;
// Simulate no-space errors while this pointer is non-NULL.
port::AtomicPointer no_space_;
// Simulate non-writable file system while this pointer is non-NULL
port::AtomicPointer non_writable_;
// Force sync of manifest files to fail while this pointer is non-NULL
port::AtomicPointer manifest_sync_error_;
// Force write to manifest files to fail while this pointer is non-NULL
port::AtomicPointer manifest_write_error_;
bool count_random_reads_;
AtomicCounter random_read_counter_;
AtomicCounter sleep_counter_;
+ AtomicCounter sleep_time_counter_;
explicit SpecialEnv(Env* base) : EnvWrapper(base) {
delay_sstable_sync_.Release_Store(NULL);
no_space_.Release_Store(NULL);
non_writable_.Release_Store(NULL);
count_random_reads_ = false;
manifest_sync_error_.Release_Store(NULL);
manifest_write_error_.Release_Store(NULL);
}
Status NewWritableFile(const std::string& f, WritableFile** r) {
class SSTableFile : public WritableFile {
private:
SpecialEnv* env_;
WritableFile* base_;
public:
SSTableFile(SpecialEnv* env, WritableFile* base)
: env_(env),
base_(base) {
}
~SSTableFile() { delete base_; }
Status Append(const Slice& data) {
if (env_->no_space_.Acquire_Load() != NULL) {
// Drop writes on the floor
return Status::OK();
} else {
return base_->Append(data);
}
}
Status Close() { return base_->Close(); }
Status Flush() { return base_->Flush(); }
Status Sync() {
while (env_->delay_sstable_sync_.Acquire_Load() != NULL) {
- env_->SleepForMicroseconds(100000);
+ DelayMilliseconds(100);
}
return base_->Sync();
}
};
class ManifestFile : public WritableFile {
private:
SpecialEnv* env_;
WritableFile* base_;
public:
ManifestFile(SpecialEnv* env, WritableFile* b) : env_(env), base_(b) { }
~ManifestFile() { delete base_; }
Status Append(const Slice& data) {
if (env_->manifest_write_error_.Acquire_Load() != NULL) {
return Status::IOError("simulated writer error");
} else {
return base_->Append(data);
}
}
Status Close() { return base_->Close(); }
Status Flush() { return base_->Flush(); }
Status Sync() {
if (env_->manifest_sync_error_.Acquire_Load() != NULL) {
return Status::IOError("simulated sync error");
} else {
return base_->Sync();
}
}
};
if (non_writable_.Acquire_Load() != NULL) {
return Status::IOError("simulated write error");
}
Status s = target()->NewWritableFile(f, r);
if (s.ok()) {
if (strstr(f.c_str(), ".sst") != NULL) {
*r = new SSTableFile(this, *r);
} else if (strstr(f.c_str(), "MANIFEST") != NULL) {
*r = new ManifestFile(this, *r);
}
}
return s;
}
Status NewRandomAccessFile(const std::string& f, RandomAccessFile** r) {
class CountingFile : public RandomAccessFile {
private:
RandomAccessFile* target_;
AtomicCounter* counter_;
public:
CountingFile(RandomAccessFile* target, AtomicCounter* counter)
: target_(target), counter_(counter) {
}
virtual ~CountingFile() { delete target_; }
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const {
counter_->Increment();
return target_->Read(offset, n, result, scratch);
}
};
Status s = target()->NewRandomAccessFile(f, r);
if (s.ok() && count_random_reads_) {
*r = new CountingFile(*r, &random_read_counter_);
}
return s;
}
virtual void SleepForMicroseconds(int micros) {
sleep_counter_.Increment();
- target()->SleepForMicroseconds(micros);
+ sleep_time_counter_.IncrementBy(micros);
}
+
};
class DBTest {
private:
const FilterPolicy* filter_policy_;
// Sequence of option configurations to try
enum OptionConfig {
kDefault,
kFilter,
kUncompressed,
kEnd
};
int option_config_;
public:
std::string dbname_;
SpecialEnv* env_;
DB* db_;
Options last_options_;
DBTest() : option_config_(kDefault),
env_(new SpecialEnv(Env::Default())) {
filter_policy_ = NewBloomFilterPolicy(10);
dbname_ = test::TmpDir() + "/db_test";
DestroyDB(dbname_, Options());
db_ = NULL;
Reopen();
}
~DBTest() {
delete db_;
DestroyDB(dbname_, Options());
delete env_;
delete filter_policy_;
}
// Switch to a fresh database with the next option configuration to
// test. Return false if there are no more configurations to test.
bool ChangeOptions() {
option_config_++;
if (option_config_ >= kEnd) {
return false;
} else {
DestroyAndReopen();
return true;
}
}
// Return the current option configuration.
Options CurrentOptions() {
Options options;
switch (option_config_) {
case kFilter:
options.filter_policy = filter_policy_;
break;
case kUncompressed:
options.compression = kNoCompression;
break;
default:
break;
}
return options;
}
DBImpl* dbfull() {
return reinterpret_cast<DBImpl*>(db_);
}
void Reopen(Options* options = NULL) {
ASSERT_OK(TryReopen(options));
}
void Close() {
delete db_;
db_ = NULL;
}
void DestroyAndReopen(Options* options = NULL) {
delete db_;
db_ = NULL;
DestroyDB(dbname_, Options());
ASSERT_OK(TryReopen(options));
}
Status TryReopen(Options* options) {
delete db_;
db_ = NULL;
Options opts;
if (options != NULL) {
opts = *options;
} else {
opts = CurrentOptions();
opts.create_if_missing = true;
}
last_options_ = opts;
return DB::Open(opts, dbname_, &db_);
}
Status Put(const std::string& k, const std::string& v) {
return db_->Put(WriteOptions(), k, v);
}
Status Delete(const std::string& k) {
return db_->Delete(WriteOptions(), k);
}
std::string Get(const std::string& k, const Snapshot* snapshot = NULL) {
ReadOptions options;
options.snapshot = snapshot;
std::string result;
Status s = db_->Get(options, k, &result);
if (s.IsNotFound()) {
result = "NOT_FOUND";
} else if (!s.ok()) {
result = s.ToString();
}
return result;
}
// Return a string that contains all key,value pairs in order,
// formatted like "(k1->v1)(k2->v2)".
std::string Contents() {
std::vector<std::string> forward;
std::string result;
Iterator* iter = db_->NewIterator(ReadOptions());
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
std::string s = IterStatus(iter);
result.push_back('(');
result.append(s);
result.push_back(')');
forward.push_back(s);
}
// Check reverse iteration results are the reverse of forward results
int matched = 0;
for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
ASSERT_LT(matched, forward.size());
ASSERT_EQ(IterStatus(iter), forward[forward.size() - matched - 1]);
matched++;
}
ASSERT_EQ(matched, forward.size());
delete iter;
return result;
}
std::string AllEntriesFor(const Slice& user_key) {
Iterator* iter = dbfull()->TEST_NewInternalIterator();
InternalKey target(user_key, kMaxSequenceNumber, kTypeValue);
iter->Seek(target.Encode());
std::string result;
if (!iter->status().ok()) {
result = iter->status().ToString();
} else {
result = "[ ";
bool first = true;
while (iter->Valid()) {
ParsedInternalKey ikey;
if (!ParseInternalKey(iter->key(), &ikey)) {
result += "CORRUPTED";
} else {
if (last_options_.comparator->Compare(ikey.user_key, user_key) != 0) {
break;
}
if (!first) {
result += ", ";
}
first = false;
switch (ikey.type) {
case kTypeValue:
result += iter->value().ToString();
break;
case kTypeDeletion:
result += "DEL";
break;
}
}
iter->Next();
}
if (!first) {
result += " ";
}
result += "]";
}
delete iter;
return result;
}
int NumTableFilesAtLevel(int level) {
std::string property;
ASSERT_TRUE(
db_->GetProperty("leveldb.num-files-at-level" + NumberToString(level),
&property));
return atoi(property.c_str());
}
int TotalTableFiles() {
int result = 0;
for (int level = 0; level < config::kNumLevels; level++) {
result += NumTableFilesAtLevel(level);
}
return result;
}
// Return spread of files per level
std::string FilesPerLevel() {
std::string result;
int last_non_zero_offset = 0;
for (int level = 0; level < config::kNumLevels; level++) {
int f = NumTableFilesAtLevel(level);
char buf[100];
snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
result += buf;
if (f > 0) {
last_non_zero_offset = result.size();
}
}
result.resize(last_non_zero_offset);
return result;
}
int CountFiles() {
std::vector<std::string> files;
env_->GetChildren(dbname_, &files);
return static_cast<int>(files.size());
}
uint64_t Size(const Slice& start, const Slice& limit) {
Range r(start, limit);
uint64_t size;
db_->GetApproximateSizes(&r, 1, &size);
return size;
}
void Compact(const Slice& start, const Slice& limit) {
db_->CompactRange(&start, &limit);
}
// Do n memtable compactions, each of which produces an sstable
// covering the range [small,large].
void MakeTables(int n, const std::string& small, const std::string& large) {
for (int i = 0; i < n; i++) {
Put(small, "begin");
Put(large, "end");
dbfull()->TEST_CompactMemTable();
}
}
// Prevent pushing of new sstables into deeper levels by adding
// tables that cover a specified range to all levels.
void FillLevels(const std::string& smallest, const std::string& largest) {
MakeTables(config::kNumLevels, smallest, largest);
}
void DumpFileCounts(const char* label) {
fprintf(stderr, "---\n%s:\n", label);
fprintf(stderr, "maxoverlap: %lld\n",
static_cast<long long>(
dbfull()->TEST_MaxNextLevelOverlappingBytes()));
for (int level = 0; level < config::kNumLevels; level++) {
int num = NumTableFilesAtLevel(level);
if (num > 0) {
fprintf(stderr, " level %3d : %d files\n", level, num);
}
}
}
std::string DumpSSTableList() {
std::string property;
db_->GetProperty("leveldb.sstables", &property);
return property;
}
std::string IterStatus(Iterator* iter) {
std::string result;
if (iter->Valid()) {
result = iter->key().ToString() + "->" + iter->value().ToString();
} else {
result = "(invalid)";
}
return result;
}
+
+ bool DeleteAnSSTFile() {
+ std::vector<std::string> filenames;
+ ASSERT_OK(env_->GetChildren(dbname_, &filenames));
+ uint64_t number;
+ FileType type;
+ for (size_t i = 0; i < filenames.size(); i++) {
+ if (ParseFileName(filenames[i], &number, &type) && type == kTableFile) {
+ ASSERT_OK(env_->DeleteFile(TableFileName(dbname_, number)));
+ return true;
+ }
+ }
+ return false;
+ }
};
TEST(DBTest, Empty) {
do {
ASSERT_TRUE(db_ != NULL);
ASSERT_EQ("NOT_FOUND", Get("foo"));
} while (ChangeOptions());
}
TEST(DBTest, ReadWrite) {
do {
ASSERT_OK(Put("foo", "v1"));
ASSERT_EQ("v1", Get("foo"));
ASSERT_OK(Put("bar", "v2"));
ASSERT_OK(Put("foo", "v3"));
ASSERT_EQ("v3", Get("foo"));
ASSERT_EQ("v2", Get("bar"));
} while (ChangeOptions());
}
TEST(DBTest, PutDeleteGet) {
do {
ASSERT_OK(db_->Put(WriteOptions(), "foo", "v1"));
ASSERT_EQ("v1", Get("foo"));
ASSERT_OK(db_->Put(WriteOptions(), "foo", "v2"));
ASSERT_EQ("v2", Get("foo"));
ASSERT_OK(db_->Delete(WriteOptions(), "foo"));
ASSERT_EQ("NOT_FOUND", Get("foo"));
} while (ChangeOptions());
}
TEST(DBTest, GetFromImmutableLayer) {
do {
Options options = CurrentOptions();
options.env = env_;
options.write_buffer_size = 100000; // Small write buffer
Reopen(&options);
ASSERT_OK(Put("foo", "v1"));
ASSERT_EQ("v1", Get("foo"));
env_->delay_sstable_sync_.Release_Store(env_); // Block sync calls
Put("k1", std::string(100000, 'x')); // Fill memtable
Put("k2", std::string(100000, 'y')); // Trigger compaction
ASSERT_EQ("v1", Get("foo"));
env_->delay_sstable_sync_.Release_Store(NULL); // Release sync calls
} while (ChangeOptions());
}
TEST(DBTest, GetFromVersions) {
do {
ASSERT_OK(Put("foo", "v1"));
dbfull()->TEST_CompactMemTable();
ASSERT_EQ("v1", Get("foo"));
} while (ChangeOptions());
}
TEST(DBTest, GetSnapshot) {
do {
// Try with both a short key and a long key
for (int i = 0; i < 2; i++) {
std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x');
ASSERT_OK(Put(key, "v1"));
const Snapshot* s1 = db_->GetSnapshot();
ASSERT_OK(Put(key, "v2"));
ASSERT_EQ("v2", Get(key));
ASSERT_EQ("v1", Get(key, s1));
dbfull()->TEST_CompactMemTable();
ASSERT_EQ("v2", Get(key));
ASSERT_EQ("v1", Get(key, s1));
db_->ReleaseSnapshot(s1);
}
} while (ChangeOptions());
}
TEST(DBTest, GetLevel0Ordering) {
do {
// Check that we process level-0 files in correct order. The code
// below generates two level-0 files where the earlier one comes
// before the later one in the level-0 file list since the earlier
// one has a smaller "smallest" key.
ASSERT_OK(Put("bar", "b"));
ASSERT_OK(Put("foo", "v1"));
dbfull()->TEST_CompactMemTable();
ASSERT_OK(Put("foo", "v2"));
dbfull()->TEST_CompactMemTable();
ASSERT_EQ("v2", Get("foo"));
} while (ChangeOptions());
}
TEST(DBTest, GetOrderedByLevels) {
do {
ASSERT_OK(Put("foo", "v1"));
Compact("a", "z");
ASSERT_EQ("v1", Get("foo"));
ASSERT_OK(Put("foo", "v2"));
ASSERT_EQ("v2", Get("foo"));
dbfull()->TEST_CompactMemTable();
ASSERT_EQ("v2", Get("foo"));
} while (ChangeOptions());
}
TEST(DBTest, GetPicksCorrectFile) {
do {
// Arrange to have multiple files in a non-level-0 level.
ASSERT_OK(Put("a", "va"));
Compact("a", "b");
ASSERT_OK(Put("x", "vx"));
Compact("x", "y");
ASSERT_OK(Put("f", "vf"));
Compact("f", "g");
ASSERT_EQ("va", Get("a"));
ASSERT_EQ("vf", Get("f"));
ASSERT_EQ("vx", Get("x"));
} while (ChangeOptions());
}
TEST(DBTest, GetEncountersEmptyLevel) {
do {
// Arrange for the following to happen:
// * sstable A in level 0
// * nothing in level 1
// * sstable B in level 2
// Then do enough Get() calls to arrange for an automatic compaction
// of sstable A. A bug would cause the compaction to be marked as
// occuring at level 1 (instead of the correct level 0).
// Step 1: First place sstables in levels 0 and 2
int compaction_count = 0;
while (NumTableFilesAtLevel(0) == 0 ||
NumTableFilesAtLevel(2) == 0) {
ASSERT_LE(compaction_count, 100) << "could not fill levels 0 and 2";
compaction_count++;
Put("a", "begin");
Put("z", "end");
dbfull()->TEST_CompactMemTable();
}
// Step 2: clear level 1 if necessary.
dbfull()->TEST_CompactRange(1, NULL, NULL);
ASSERT_EQ(NumTableFilesAtLevel(0), 1);
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_EQ(NumTableFilesAtLevel(2), 1);
// Step 3: read a bunch of times
for (int i = 0; i < 1000; i++) {
ASSERT_EQ("NOT_FOUND", Get("missing"));
}
// Step 4: Wait for compaction to finish
- env_->SleepForMicroseconds(1000000);
+ DelayMilliseconds(1000);
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
} while (ChangeOptions());
}
TEST(DBTest, IterEmpty) {
Iterator* iter = db_->NewIterator(ReadOptions());
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->SeekToLast();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->Seek("foo");
ASSERT_EQ(IterStatus(iter), "(invalid)");
delete iter;
}
TEST(DBTest, IterSingle) {
ASSERT_OK(Put("a", "va"));
Iterator* iter = db_->NewIterator(ReadOptions());
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Next();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->SeekToLast();
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Next();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->SeekToLast();
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->Seek("");
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Next();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->Seek("a");
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Next();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->Seek("b");
ASSERT_EQ(IterStatus(iter), "(invalid)");
delete iter;
}
TEST(DBTest, IterMulti) {
ASSERT_OK(Put("a", "va"));
ASSERT_OK(Put("b", "vb"));
ASSERT_OK(Put("c", "vc"));
Iterator* iter = db_->NewIterator(ReadOptions());
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Next();
ASSERT_EQ(IterStatus(iter), "b->vb");
iter->Next();
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Next();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->SeekToLast();
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "b->vb");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->SeekToLast();
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Next();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->Seek("");
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Seek("a");
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Seek("ax");
ASSERT_EQ(IterStatus(iter), "b->vb");
iter->Seek("b");
ASSERT_EQ(IterStatus(iter), "b->vb");
iter->Seek("z");
ASSERT_EQ(IterStatus(iter), "(invalid)");
// Switch from reverse to forward
iter->SeekToLast();
iter->Prev();
iter->Prev();
iter->Next();
ASSERT_EQ(IterStatus(iter), "b->vb");
// Switch from forward to reverse
iter->SeekToFirst();
iter->Next();
iter->Next();
iter->Prev();
ASSERT_EQ(IterStatus(iter), "b->vb");
// Make sure iter stays at snapshot
ASSERT_OK(Put("a", "va2"));
ASSERT_OK(Put("a2", "va3"));
ASSERT_OK(Put("b", "vb2"));
ASSERT_OK(Put("c", "vc2"));
ASSERT_OK(Delete("b"));
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Next();
ASSERT_EQ(IterStatus(iter), "b->vb");
iter->Next();
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Next();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->SeekToLast();
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "b->vb");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "(invalid)");
delete iter;
}
TEST(DBTest, IterSmallAndLargeMix) {
ASSERT_OK(Put("a", "va"));
ASSERT_OK(Put("b", std::string(100000, 'b')));
ASSERT_OK(Put("c", "vc"));
ASSERT_OK(Put("d", std::string(100000, 'd')));
ASSERT_OK(Put("e", std::string(100000, 'e')));
Iterator* iter = db_->NewIterator(ReadOptions());
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Next();
ASSERT_EQ(IterStatus(iter), "b->" + std::string(100000, 'b'));
iter->Next();
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Next();
ASSERT_EQ(IterStatus(iter), "d->" + std::string(100000, 'd'));
iter->Next();
ASSERT_EQ(IterStatus(iter), "e->" + std::string(100000, 'e'));
iter->Next();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->SeekToLast();
ASSERT_EQ(IterStatus(iter), "e->" + std::string(100000, 'e'));
iter->Prev();
ASSERT_EQ(IterStatus(iter), "d->" + std::string(100000, 'd'));
iter->Prev();
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "b->" + std::string(100000, 'b'));
iter->Prev();
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "(invalid)");
delete iter;
}
TEST(DBTest, IterMultiWithDelete) {
do {
ASSERT_OK(Put("a", "va"));
ASSERT_OK(Put("b", "vb"));
ASSERT_OK(Put("c", "vc"));
ASSERT_OK(Delete("b"));
ASSERT_EQ("NOT_FOUND", Get("b"));
Iterator* iter = db_->NewIterator(ReadOptions());
iter->Seek("c");
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "a->va");
delete iter;
} while (ChangeOptions());
}
TEST(DBTest, Recover) {
do {
ASSERT_OK(Put("foo", "v1"));
ASSERT_OK(Put("baz", "v5"));
Reopen();
ASSERT_EQ("v1", Get("foo"));
ASSERT_EQ("v1", Get("foo"));
ASSERT_EQ("v5", Get("baz"));
ASSERT_OK(Put("bar", "v2"));
ASSERT_OK(Put("foo", "v3"));
Reopen();
ASSERT_EQ("v3", Get("foo"));
ASSERT_OK(Put("foo", "v4"));
ASSERT_EQ("v4", Get("foo"));
ASSERT_EQ("v2", Get("bar"));
ASSERT_EQ("v5", Get("baz"));
} while (ChangeOptions());
}
TEST(DBTest, RecoveryWithEmptyLog) {
do {
ASSERT_OK(Put("foo", "v1"));
ASSERT_OK(Put("foo", "v2"));
Reopen();
Reopen();
ASSERT_OK(Put("foo", "v3"));
Reopen();
ASSERT_EQ("v3", Get("foo"));
} while (ChangeOptions());
}
// Check that writes done during a memtable compaction are recovered
// if the database is shutdown during the memtable compaction.
TEST(DBTest, RecoverDuringMemtableCompaction) {
do {
Options options = CurrentOptions();
options.env = env_;
options.write_buffer_size = 1000000;
Reopen(&options);
// Trigger a long memtable compaction and reopen the database during it
ASSERT_OK(Put("foo", "v1")); // Goes to 1st log file
ASSERT_OK(Put("big1", std::string(10000000, 'x'))); // Fills memtable
ASSERT_OK(Put("big2", std::string(1000, 'y'))); // Triggers compaction
ASSERT_OK(Put("bar", "v2")); // Goes to new log file
Reopen(&options);
ASSERT_EQ("v1", Get("foo"));
ASSERT_EQ("v2", Get("bar"));
ASSERT_EQ(std::string(10000000, 'x'), Get("big1"));
ASSERT_EQ(std::string(1000, 'y'), Get("big2"));
} while (ChangeOptions());
}
static std::string Key(int i) {
char buf[100];
snprintf(buf, sizeof(buf), "key%06d", i);
return std::string(buf);
}
TEST(DBTest, MinorCompactionsHappen) {
Options options = CurrentOptions();
options.write_buffer_size = 10000;
Reopen(&options);
const int N = 500;
int starting_num_tables = TotalTableFiles();
for (int i = 0; i < N; i++) {
ASSERT_OK(Put(Key(i), Key(i) + std::string(1000, 'v')));
}
int ending_num_tables = TotalTableFiles();
ASSERT_GT(ending_num_tables, starting_num_tables);
for (int i = 0; i < N; i++) {
ASSERT_EQ(Key(i) + std::string(1000, 'v'), Get(Key(i)));
}
Reopen();
for (int i = 0; i < N; i++) {
ASSERT_EQ(Key(i) + std::string(1000, 'v'), Get(Key(i)));
}
}
TEST(DBTest, RecoverWithLargeLog) {
{
Options options = CurrentOptions();
Reopen(&options);
ASSERT_OK(Put("big1", std::string(200000, '1')));
ASSERT_OK(Put("big2", std::string(200000, '2')));
ASSERT_OK(Put("small3", std::string(10, '3')));
ASSERT_OK(Put("small4", std::string(10, '4')));
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
}
// Make sure that if we re-open with a small write buffer size that
// we flush table files in the middle of a large log file.
Options options = CurrentOptions();
options.write_buffer_size = 100000;
Reopen(&options);
ASSERT_EQ(NumTableFilesAtLevel(0), 3);
ASSERT_EQ(std::string(200000, '1'), Get("big1"));
ASSERT_EQ(std::string(200000, '2'), Get("big2"));
ASSERT_EQ(std::string(10, '3'), Get("small3"));
ASSERT_EQ(std::string(10, '4'), Get("small4"));
ASSERT_GT(NumTableFilesAtLevel(0), 1);
}
TEST(DBTest, CompactionsGenerateMultipleFiles) {
Options options = CurrentOptions();
options.write_buffer_size = 100000000; // Large write buffer
Reopen(&options);
Random rnd(301);
// Write 8MB (80 values, each 100K)
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
std::vector<std::string> values;
for (int i = 0; i < 80; i++) {
values.push_back(RandomString(&rnd, 100000));
ASSERT_OK(Put(Key(i), values[i]));
}
// Reopening moves updates to level-0
Reopen(&options);
dbfull()->TEST_CompactRange(0, NULL, NULL);
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_GT(NumTableFilesAtLevel(1), 1);
for (int i = 0; i < 80; i++) {
ASSERT_EQ(Get(Key(i)), values[i]);
}
}
TEST(DBTest, RepeatedWritesToSameKey) {
Options options = CurrentOptions();
options.env = env_;
options.write_buffer_size = 100000; // Small write buffer
Reopen(&options);
// We must have at most one file per level except for level-0,
// which may have up to kL0_StopWritesTrigger files.
const int kMaxFiles = config::kNumLevels + config::kL0_StopWritesTrigger;
Random rnd(301);
std::string value = RandomString(&rnd, 2 * options.write_buffer_size);
for (int i = 0; i < 5 * kMaxFiles; i++) {
Put("key", value);
ASSERT_LE(TotalTableFiles(), kMaxFiles);
fprintf(stderr, "after %d: %d files\n", int(i+1), TotalTableFiles());
}
}
TEST(DBTest, SparseMerge) {
Options options = CurrentOptions();
options.compression = kNoCompression;
Reopen(&options);
FillLevels("A", "Z");
// Suppose there is:
// small amount of data with prefix A
// large amount of data with prefix B
// small amount of data with prefix C
// and that recent updates have made small changes to all three prefixes.
// Check that we do not do a compaction that merges all of B in one shot.
const std::string value(1000, 'x');
Put("A", "va");
// Write approximately 100MB of "B" values
for (int i = 0; i < 100000; i++) {
char key[100];
snprintf(key, sizeof(key), "B%010d", i);
Put(key, value);
}
Put("C", "vc");
dbfull()->TEST_CompactMemTable();
dbfull()->TEST_CompactRange(0, NULL, NULL);
// Make sparse update
Put("A", "va2");
Put("B100", "bvalue2");
Put("C", "vc2");
dbfull()->TEST_CompactMemTable();
// Compactions should not cause us to create a situation where
// a file overlaps too much data at the next level.
ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20*1048576);
dbfull()->TEST_CompactRange(0, NULL, NULL);
ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20*1048576);
dbfull()->TEST_CompactRange(1, NULL, NULL);
ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20*1048576);
}
static bool Between(uint64_t val, uint64_t low, uint64_t high) {
bool result = (val >= low) && (val <= high);
if (!result) {
fprintf(stderr, "Value %llu is not in range [%llu, %llu]\n",
(unsigned long long)(val),
(unsigned long long)(low),
(unsigned long long)(high));
}
return result;
}
TEST(DBTest, ApproximateSizes) {
do {
Options options = CurrentOptions();
options.write_buffer_size = 100000000; // Large write buffer
options.compression = kNoCompression;
DestroyAndReopen();
ASSERT_TRUE(Between(Size("", "xyz"), 0, 0));
Reopen(&options);
ASSERT_TRUE(Between(Size("", "xyz"), 0, 0));
// Write 8MB (80 values, each 100K)
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
const int N = 80;
static const int S1 = 100000;
static const int S2 = 105000; // Allow some expansion from metadata
Random rnd(301);
for (int i = 0; i < N; i++) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, S1)));
}
// 0 because GetApproximateSizes() does not account for memtable space
ASSERT_TRUE(Between(Size("", Key(50)), 0, 0));
// Check sizes across recovery by reopening a few times
for (int run = 0; run < 3; run++) {
Reopen(&options);
for (int compact_start = 0; compact_start < N; compact_start += 10) {
for (int i = 0; i < N; i += 10) {
ASSERT_TRUE(Between(Size("", Key(i)), S1*i, S2*i));
ASSERT_TRUE(Between(Size("", Key(i)+".suffix"), S1*(i+1), S2*(i+1)));
ASSERT_TRUE(Between(Size(Key(i), Key(i+10)), S1*10, S2*10));
}
ASSERT_TRUE(Between(Size("", Key(50)), S1*50, S2*50));
ASSERT_TRUE(Between(Size("", Key(50)+".suffix"), S1*50, S2*50));
std::string cstart_str = Key(compact_start);
std::string cend_str = Key(compact_start + 9);
Slice cstart = cstart_str;
Slice cend = cend_str;
dbfull()->TEST_CompactRange(0, &cstart, &cend);
}
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_GT(NumTableFilesAtLevel(1), 0);
}
} while (ChangeOptions());
}
TEST(DBTest, ApproximateSizes_MixOfSmallAndLarge) {
do {
Options options = CurrentOptions();
options.compression = kNoCompression;
Reopen();
Random rnd(301);
std::string big1 = RandomString(&rnd, 100000);
ASSERT_OK(Put(Key(0), RandomString(&rnd, 10000)));
ASSERT_OK(Put(Key(1), RandomString(&rnd, 10000)));
ASSERT_OK(Put(Key(2), big1));
ASSERT_OK(Put(Key(3), RandomString(&rnd, 10000)));
ASSERT_OK(Put(Key(4), big1));
ASSERT_OK(Put(Key(5), RandomString(&rnd, 10000)));
ASSERT_OK(Put(Key(6), RandomString(&rnd, 300000)));
ASSERT_OK(Put(Key(7), RandomString(&rnd, 10000)));
// Check sizes across recovery by reopening a few times
for (int run = 0; run < 3; run++) {
Reopen(&options);
ASSERT_TRUE(Between(Size("", Key(0)), 0, 0));
ASSERT_TRUE(Between(Size("", Key(1)), 10000, 11000));
ASSERT_TRUE(Between(Size("", Key(2)), 20000, 21000));
ASSERT_TRUE(Between(Size("", Key(3)), 120000, 121000));
ASSERT_TRUE(Between(Size("", Key(4)), 130000, 131000));
ASSERT_TRUE(Between(Size("", Key(5)), 230000, 231000));
ASSERT_TRUE(Between(Size("", Key(6)), 240000, 241000));
ASSERT_TRUE(Between(Size("", Key(7)), 540000, 541000));
ASSERT_TRUE(Between(Size("", Key(8)), 550000, 560000));
ASSERT_TRUE(Between(Size(Key(3), Key(5)), 110000, 111000));
dbfull()->TEST_CompactRange(0, NULL, NULL);
}
} while (ChangeOptions());
}
TEST(DBTest, IteratorPinsRef) {
Put("foo", "hello");
// Get iterator that will yield the current contents of the DB.
Iterator* iter = db_->NewIterator(ReadOptions());
// Write to force compactions
Put("foo", "newvalue1");
for (int i = 0; i < 100; i++) {
ASSERT_OK(Put(Key(i), Key(i) + std::string(100000, 'v'))); // 100K values
}
Put("foo", "newvalue2");
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("foo", iter->key().ToString());
ASSERT_EQ("hello", iter->value().ToString());
iter->Next();
ASSERT_TRUE(!iter->Valid());
delete iter;
}
TEST(DBTest, Snapshot) {
do {
Put("foo", "v1");
const Snapshot* s1 = db_->GetSnapshot();
Put("foo", "v2");
const Snapshot* s2 = db_->GetSnapshot();
Put("foo", "v3");
const Snapshot* s3 = db_->GetSnapshot();
Put("foo", "v4");
ASSERT_EQ("v1", Get("foo", s1));
ASSERT_EQ("v2", Get("foo", s2));
ASSERT_EQ("v3", Get("foo", s3));
ASSERT_EQ("v4", Get("foo"));
db_->ReleaseSnapshot(s3);
ASSERT_EQ("v1", Get("foo", s1));
ASSERT_EQ("v2", Get("foo", s2));
ASSERT_EQ("v4", Get("foo"));
db_->ReleaseSnapshot(s1);
ASSERT_EQ("v2", Get("foo", s2));
ASSERT_EQ("v4", Get("foo"));
db_->ReleaseSnapshot(s2);
ASSERT_EQ("v4", Get("foo"));
} while (ChangeOptions());
}
TEST(DBTest, HiddenValuesAreRemoved) {
do {
Random rnd(301);
FillLevels("a", "z");
std::string big = RandomString(&rnd, 50000);
Put("foo", big);
Put("pastfoo", "v");
const Snapshot* snapshot = db_->GetSnapshot();
Put("foo", "tiny");
Put("pastfoo2", "v2"); // Advance sequence number one more
ASSERT_OK(dbfull()->TEST_CompactMemTable());
ASSERT_GT(NumTableFilesAtLevel(0), 0);
ASSERT_EQ(big, Get("foo", snapshot));
ASSERT_TRUE(Between(Size("", "pastfoo"), 50000, 60000));
db_->ReleaseSnapshot(snapshot);
ASSERT_EQ(AllEntriesFor("foo"), "[ tiny, " + big + " ]");
Slice x("x");
dbfull()->TEST_CompactRange(0, NULL, &x);
ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]");
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_GE(NumTableFilesAtLevel(1), 1);
dbfull()->TEST_CompactRange(1, NULL, &x);
ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]");
ASSERT_TRUE(Between(Size("", "pastfoo"), 0, 1000));
} while (ChangeOptions());
}
TEST(DBTest, DeletionMarkers1) {
Put("foo", "v1");
ASSERT_OK(dbfull()->TEST_CompactMemTable());
const int last = config::kMaxMemCompactLevel;
ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo => v1 is now in last level
// Place a table at level last-1 to prevent merging with preceding mutation
Put("a", "begin");
Put("z", "end");
dbfull()->TEST_CompactMemTable();
ASSERT_EQ(NumTableFilesAtLevel(last), 1);
ASSERT_EQ(NumTableFilesAtLevel(last-1), 1);
Delete("foo");
Put("foo", "v2");
ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
ASSERT_OK(dbfull()->TEST_CompactMemTable()); // Moves to level last-2
ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
Slice z("z");
dbfull()->TEST_CompactRange(last-2, NULL, &z);
// DEL eliminated, but v1 remains because we aren't compacting that level
// (DEL can be eliminated because v2 hides v1).
ASSERT_EQ(AllEntriesFor("foo"), "[ v2, v1 ]");
dbfull()->TEST_CompactRange(last-1, NULL, NULL);
// Merging last-1 w/ last, so we are the base level for "foo", so
// DEL is removed. (as is v1).
ASSERT_EQ(AllEntriesFor("foo"), "[ v2 ]");
}
TEST(DBTest, DeletionMarkers2) {
Put("foo", "v1");
ASSERT_OK(dbfull()->TEST_CompactMemTable());
const int last = config::kMaxMemCompactLevel;
ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo => v1 is now in last level
// Place a table at level last-1 to prevent merging with preceding mutation
Put("a", "begin");
Put("z", "end");
dbfull()->TEST_CompactMemTable();
ASSERT_EQ(NumTableFilesAtLevel(last), 1);
ASSERT_EQ(NumTableFilesAtLevel(last-1), 1);
Delete("foo");
ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
ASSERT_OK(dbfull()->TEST_CompactMemTable()); // Moves to level last-2
ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
dbfull()->TEST_CompactRange(last-2, NULL, NULL);
// DEL kept: "last" file overlaps
ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
dbfull()->TEST_CompactRange(last-1, NULL, NULL);
// Merging last-1 w/ last, so we are the base level for "foo", so
// DEL is removed. (as is v1).
ASSERT_EQ(AllEntriesFor("foo"), "[ ]");
}
TEST(DBTest, OverlapInLevel0) {
do {
ASSERT_EQ(config::kMaxMemCompactLevel, 2) << "Fix test to match config";
// Fill levels 1 and 2 to disable the pushing of new memtables to levels > 0.
ASSERT_OK(Put("100", "v100"));
ASSERT_OK(Put("999", "v999"));
dbfull()->TEST_CompactMemTable();
ASSERT_OK(Delete("100"));
ASSERT_OK(Delete("999"));
dbfull()->TEST_CompactMemTable();
ASSERT_EQ("0,1,1", FilesPerLevel());
// Make files spanning the following ranges in level-0:
// files[0] 200 .. 900
// files[1] 300 .. 500
// Note that files are sorted by smallest key.
ASSERT_OK(Put("300", "v300"));
ASSERT_OK(Put("500", "v500"));
dbfull()->TEST_CompactMemTable();
ASSERT_OK(Put("200", "v200"));
ASSERT_OK(Put("600", "v600"));
ASSERT_OK(Put("900", "v900"));
dbfull()->TEST_CompactMemTable();
ASSERT_EQ("2,1,1", FilesPerLevel());
// Compact away the placeholder files we created initially
dbfull()->TEST_CompactRange(1, NULL, NULL);
dbfull()->TEST_CompactRange(2, NULL, NULL);
ASSERT_EQ("2", FilesPerLevel());
// Do a memtable compaction. Before bug-fix, the compaction would
// not detect the overlap with level-0 files and would incorrectly place
// the deletion in a deeper level.
ASSERT_OK(Delete("600"));
dbfull()->TEST_CompactMemTable();
ASSERT_EQ("3", FilesPerLevel());
ASSERT_EQ("NOT_FOUND", Get("600"));
} while (ChangeOptions());
}
TEST(DBTest, L0_CompactionBug_Issue44_a) {
Reopen();
ASSERT_OK(Put("b", "v"));
Reopen();
ASSERT_OK(Delete("b"));
ASSERT_OK(Delete("a"));
Reopen();
ASSERT_OK(Delete("a"));
Reopen();
ASSERT_OK(Put("a", "v"));
Reopen();
Reopen();
ASSERT_EQ("(a->v)", Contents());
- env_->SleepForMicroseconds(1000000); // Wait for compaction to finish
+ DelayMilliseconds(1000); // Wait for compaction to finish
ASSERT_EQ("(a->v)", Contents());
}
TEST(DBTest, L0_CompactionBug_Issue44_b) {
Reopen();
Put("","");
Reopen();
Delete("e");
Put("","");
Reopen();
Put("c", "cv");
Reopen();
Put("","");
Reopen();
Put("","");
- env_->SleepForMicroseconds(1000000); // Wait for compaction to finish
+ DelayMilliseconds(1000); // Wait for compaction to finish
Reopen();
Put("d","dv");
Reopen();
Put("","");
Reopen();
Delete("d");
Delete("b");
Reopen();
ASSERT_EQ("(->)(c->cv)", Contents());
- env_->SleepForMicroseconds(1000000); // Wait for compaction to finish
+ DelayMilliseconds(1000); // Wait for compaction to finish
ASSERT_EQ("(->)(c->cv)", Contents());
}
TEST(DBTest, ComparatorCheck) {
class NewComparator : public Comparator {
public:
virtual const char* Name() const { return "leveldb.NewComparator"; }
virtual int Compare(const Slice& a, const Slice& b) const {
return BytewiseComparator()->Compare(a, b);
}
virtual void FindShortestSeparator(std::string* s, const Slice& l) const {
BytewiseComparator()->FindShortestSeparator(s, l);
}
virtual void FindShortSuccessor(std::string* key) const {
BytewiseComparator()->FindShortSuccessor(key);
}
};
NewComparator cmp;
Options new_options = CurrentOptions();
new_options.comparator = &cmp;
Status s = TryReopen(&new_options);
ASSERT_TRUE(!s.ok());
ASSERT_TRUE(s.ToString().find("comparator") != std::string::npos)
<< s.ToString();
}
TEST(DBTest, CustomComparator) {
class NumberComparator : public Comparator {
public:
virtual const char* Name() const { return "test.NumberComparator"; }
virtual int Compare(const Slice& a, const Slice& b) const {
return ToNumber(a) - ToNumber(b);
}
virtual void FindShortestSeparator(std::string* s, const Slice& l) const {
ToNumber(*s); // Check format
ToNumber(l); // Check format
}
virtual void FindShortSuccessor(std::string* key) const {
ToNumber(*key); // Check format
}
private:
static int ToNumber(const Slice& x) {
// Check that there are no extra characters.
ASSERT_TRUE(x.size() >= 2 && x[0] == '[' && x[x.size()-1] == ']')
<< EscapeString(x);
int val;
char ignored;
ASSERT_TRUE(sscanf(x.ToString().c_str(), "[%i]%c", &val, &ignored) == 1)
<< EscapeString(x);
return val;
}
};
NumberComparator cmp;
Options new_options = CurrentOptions();
new_options.create_if_missing = true;
new_options.comparator = &cmp;
new_options.filter_policy = NULL; // Cannot use bloom filters
new_options.write_buffer_size = 1000; // Compact more often
DestroyAndReopen(&new_options);
ASSERT_OK(Put("[10]", "ten"));
ASSERT_OK(Put("[0x14]", "twenty"));
for (int i = 0; i < 2; i++) {
ASSERT_EQ("ten", Get("[10]"));
ASSERT_EQ("ten", Get("[0xa]"));
ASSERT_EQ("twenty", Get("[20]"));
ASSERT_EQ("twenty", Get("[0x14]"));
ASSERT_EQ("NOT_FOUND", Get("[15]"));
ASSERT_EQ("NOT_FOUND", Get("[0xf]"));
Compact("[0]", "[9999]");
}
for (int run = 0; run < 2; run++) {
for (int i = 0; i < 1000; i++) {
char buf[100];
snprintf(buf, sizeof(buf), "[%d]", i*10);
ASSERT_OK(Put(buf, buf));
}
Compact("[0]", "[1000000]");
}
}
TEST(DBTest, ManualCompaction) {
ASSERT_EQ(config::kMaxMemCompactLevel, 2)
<< "Need to update this test to match kMaxMemCompactLevel";
MakeTables(3, "p", "q");
ASSERT_EQ("1,1,1", FilesPerLevel());
// Compaction range falls before files
Compact("", "c");
ASSERT_EQ("1,1,1", FilesPerLevel());
// Compaction range falls after files
Compact("r", "z");
ASSERT_EQ("1,1,1", FilesPerLevel());
// Compaction range overlaps files
Compact("p1", "p9");
ASSERT_EQ("0,0,1", FilesPerLevel());
// Populate a different range
MakeTables(3, "c", "e");
ASSERT_EQ("1,1,2", FilesPerLevel());
// Compact just the new range
Compact("b", "f");
ASSERT_EQ("0,0,2", FilesPerLevel());
// Compact all
MakeTables(1, "a", "z");
ASSERT_EQ("0,1,2", FilesPerLevel());
db_->CompactRange(NULL, NULL);
ASSERT_EQ("0,0,1", FilesPerLevel());
}
TEST(DBTest, DBOpen_Options) {
std::string dbname = test::TmpDir() + "/db_options_test";
DestroyDB(dbname, Options());
// Does not exist, and create_if_missing == false: error
DB* db = NULL;
Options opts;
opts.create_if_missing = false;
Status s = DB::Open(opts, dbname, &db);
ASSERT_TRUE(strstr(s.ToString().c_str(), "does not exist") != NULL);
ASSERT_TRUE(db == NULL);
// Does not exist, and create_if_missing == true: OK
opts.create_if_missing = true;
s = DB::Open(opts, dbname, &db);
ASSERT_OK(s);
ASSERT_TRUE(db != NULL);
delete db;
db = NULL;
// Does exist, and error_if_exists == true: error
opts.create_if_missing = false;
opts.error_if_exists = true;
s = DB::Open(opts, dbname, &db);
ASSERT_TRUE(strstr(s.ToString().c_str(), "exists") != NULL);
ASSERT_TRUE(db == NULL);
// Does exist, and error_if_exists == false: OK
opts.create_if_missing = true;
opts.error_if_exists = false;
s = DB::Open(opts, dbname, &db);
ASSERT_OK(s);
ASSERT_TRUE(db != NULL);
delete db;
db = NULL;
}
TEST(DBTest, Locking) {
DB* db2 = NULL;
Status s = DB::Open(CurrentOptions(), dbname_, &db2);
ASSERT_TRUE(!s.ok()) << "Locking did not prevent re-opening db";
}
// Check that number of files does not grow when we are out of space
TEST(DBTest, NoSpace) {
Options options = CurrentOptions();
options.env = env_;
Reopen(&options);
ASSERT_OK(Put("foo", "v1"));
ASSERT_EQ("v1", Get("foo"));
Compact("a", "z");
const int num_files = CountFiles();
env_->no_space_.Release_Store(env_); // Force out-of-space errors
env_->sleep_counter_.Reset();
for (int i = 0; i < 5; i++) {
for (int level = 0; level < config::kNumLevels-1; level++) {
dbfull()->TEST_CompactRange(level, NULL, NULL);
}
}
env_->no_space_.Release_Store(NULL);
ASSERT_LT(CountFiles(), num_files + 3);
// Check that compaction attempts slept after errors
ASSERT_GE(env_->sleep_counter_.Read(), 5);
}
+TEST(DBTest, ExponentialBackoff) {
+ Options options = CurrentOptions();
+ options.env = env_;
+ Reopen(&options);
+
+ ASSERT_OK(Put("foo", "v1"));
+ ASSERT_EQ("v1", Get("foo"));
+ Compact("a", "z");
+ env_->non_writable_.Release_Store(env_); // Force errors for new files
+ env_->sleep_counter_.Reset();
+ env_->sleep_time_counter_.Reset();
+ for (int i = 0; i < 5; i++) {
+ dbfull()->TEST_CompactRange(2, NULL, NULL);
+ }
+ env_->non_writable_.Release_Store(NULL);
+
+ // Wait for compaction to finish
+ DelayMilliseconds(1000);
+
+ ASSERT_GE(env_->sleep_counter_.Read(), 5);
+ ASSERT_LT(env_->sleep_counter_.Read(), 10);
+ ASSERT_GE(env_->sleep_time_counter_.Read(), 10e6);
+}
+
TEST(DBTest, NonWritableFileSystem) {
Options options = CurrentOptions();
options.write_buffer_size = 1000;
options.env = env_;
Reopen(&options);
ASSERT_OK(Put("foo", "v1"));
env_->non_writable_.Release_Store(env_); // Force errors for new files
std::string big(100000, 'x');
int errors = 0;
for (int i = 0; i < 20; i++) {
fprintf(stderr, "iter %d; errors %d\n", i, errors);
if (!Put("foo", big).ok()) {
errors++;
- env_->SleepForMicroseconds(100000);
+ DelayMilliseconds(100);
}
}
ASSERT_GT(errors, 0);
env_->non_writable_.Release_Store(NULL);
}
TEST(DBTest, ManifestWriteError) {
// Test for the following problem:
// (a) Compaction produces file F
// (b) Log record containing F is written to MANIFEST file, but Sync() fails
// (c) GC deletes F
// (d) After reopening DB, reads fail since deleted F is named in log record
// We iterate twice. In the second iteration, everything is the
// same except the log record never makes it to the MANIFEST file.
for (int iter = 0; iter < 2; iter++) {
port::AtomicPointer* error_type = (iter == 0)
? &env_->manifest_sync_error_
: &env_->manifest_write_error_;
// Insert foo=>bar mapping
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
options.error_if_exists = false;
DestroyAndReopen(&options);
ASSERT_OK(Put("foo", "bar"));
ASSERT_EQ("bar", Get("foo"));
// Memtable compaction (will succeed)
dbfull()->TEST_CompactMemTable();
ASSERT_EQ("bar", Get("foo"));
const int last = config::kMaxMemCompactLevel;
ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo=>bar is now in last level
// Merging compaction (will fail)
error_type->Release_Store(env_);
dbfull()->TEST_CompactRange(last, NULL, NULL); // Should fail
ASSERT_EQ("bar", Get("foo"));
// Recovery: should not lose data
error_type->Release_Store(NULL);
Reopen(&options);
ASSERT_EQ("bar", Get("foo"));
}
}
+TEST(DBTest, MissingSSTFile) {
+ ASSERT_OK(Put("foo", "bar"));
+ ASSERT_EQ("bar", Get("foo"));
+
+ // Dump the memtable to disk.
+ dbfull()->TEST_CompactMemTable();
+ ASSERT_EQ("bar", Get("foo"));
+
+ Close();
+ ASSERT_TRUE(DeleteAnSSTFile());
+ Options options = CurrentOptions();
+ options.paranoid_checks = true;
+ Status s = TryReopen(&options);
+ ASSERT_TRUE(!s.ok());
+ ASSERT_TRUE(s.ToString().find("issing") != std::string::npos)
+ << s.ToString();
+}
+
TEST(DBTest, FilesDeletedAfterCompaction) {
ASSERT_OK(Put("foo", "v2"));
Compact("a", "z");
const int num_files = CountFiles();
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put("foo", "v2"));
Compact("a", "z");
}
ASSERT_EQ(CountFiles(), num_files);
}
TEST(DBTest, BloomFilter) {
env_->count_random_reads_ = true;
Options options = CurrentOptions();
options.env = env_;
options.block_cache = NewLRUCache(0); // Prevent cache hits
options.filter_policy = NewBloomFilterPolicy(10);
Reopen(&options);
// Populate multiple layers
const int N = 10000;
for (int i = 0; i < N; i++) {
ASSERT_OK(Put(Key(i), Key(i)));
}
Compact("a", "z");
for (int i = 0; i < N; i += 100) {
ASSERT_OK(Put(Key(i), Key(i)));
}
dbfull()->TEST_CompactMemTable();
// Prevent auto compactions triggered by seeks
env_->delay_sstable_sync_.Release_Store(env_);
// Lookup present keys. Should rarely read from small sstable.
env_->random_read_counter_.Reset();
for (int i = 0; i < N; i++) {
ASSERT_EQ(Key(i), Get(Key(i)));
}
int reads = env_->random_read_counter_.Read();
fprintf(stderr, "%d present => %d reads\n", N, reads);
ASSERT_GE(reads, N);
ASSERT_LE(reads, N + 2*N/100);
// Lookup present keys. Should rarely read from either sstable.
env_->random_read_counter_.Reset();
for (int i = 0; i < N; i++) {
ASSERT_EQ("NOT_FOUND", Get(Key(i) + ".missing"));
}
reads = env_->random_read_counter_.Read();
fprintf(stderr, "%d missing => %d reads\n", N, reads);
ASSERT_LE(reads, 3*N/100);
env_->delay_sstable_sync_.Release_Store(NULL);
Close();
delete options.block_cache;
delete options.filter_policy;
}
// Multi-threaded test:
namespace {
static const int kNumThreads = 4;
static const int kTestSeconds = 10;
static const int kNumKeys = 1000;
struct MTState {
DBTest* test;
port::AtomicPointer stop;
port::AtomicPointer counter[kNumThreads];
port::AtomicPointer thread_done[kNumThreads];
};
struct MTThread {
MTState* state;
int id;
};
static void MTThreadBody(void* arg) {
MTThread* t = reinterpret_cast<MTThread*>(arg);
int id = t->id;
DB* db = t->state->test->db_;
uintptr_t counter = 0;
fprintf(stderr, "... starting thread %d\n", id);
Random rnd(1000 + id);
std::string value;
char valbuf[1500];
while (t->state->stop.Acquire_Load() == NULL) {
t->state->counter[id].Release_Store(reinterpret_cast<void*>(counter));
int key = rnd.Uniform(kNumKeys);
char keybuf[20];
snprintf(keybuf, sizeof(keybuf), "%016d", key);
if (rnd.OneIn(2)) {
// Write values of the form <key, my id, counter>.
// We add some padding for force compactions.
snprintf(valbuf, sizeof(valbuf), "%d.%d.%-1000d",
key, id, static_cast<int>(counter));
ASSERT_OK(db->Put(WriteOptions(), Slice(keybuf), Slice(valbuf)));
} else {
// Read a value and verify that it matches the pattern written above.
Status s = db->Get(ReadOptions(), Slice(keybuf), &value);
if (s.IsNotFound()) {
// Key has not yet been written
} else {
// Check that the writer thread counter is >= the counter in the value
ASSERT_OK(s);
int k, w, c;
ASSERT_EQ(3, sscanf(value.c_str(), "%d.%d.%d", &k, &w, &c)) << value;
ASSERT_EQ(k, key);
ASSERT_GE(w, 0);
ASSERT_LT(w, kNumThreads);
ASSERT_LE(c, reinterpret_cast<uintptr_t>(
t->state->counter[w].Acquire_Load()));
}
}
counter++;
}
t->state->thread_done[id].Release_Store(t);
fprintf(stderr, "... stopping thread %d after %d ops\n", id, int(counter));
}
} // namespace
TEST(DBTest, MultiThreaded) {
do {
// Initialize state
MTState mt;
mt.test = this;
mt.stop.Release_Store(0);
for (int id = 0; id < kNumThreads; id++) {
mt.counter[id].Release_Store(0);
mt.thread_done[id].Release_Store(0);
}
// Start threads
MTThread thread[kNumThreads];
for (int id = 0; id < kNumThreads; id++) {
thread[id].state = &mt;
thread[id].id = id;
env_->StartThread(MTThreadBody, &thread[id]);
}
// Let them run for a while
- env_->SleepForMicroseconds(kTestSeconds * 1000000);
+ DelayMilliseconds(kTestSeconds * 1000);
// Stop the threads and wait for them to finish
mt.stop.Release_Store(&mt);
for (int id = 0; id < kNumThreads; id++) {
while (mt.thread_done[id].Acquire_Load() == NULL) {
- env_->SleepForMicroseconds(100000);
+ DelayMilliseconds(100);
}
}
} while (ChangeOptions());
}
namespace {
typedef std::map<std::string, std::string> KVMap;
}
class ModelDB: public DB {
public:
class ModelSnapshot : public Snapshot {
public:
KVMap map_;
};
explicit ModelDB(const Options& options): options_(options) { }
~ModelDB() { }
virtual Status Put(const WriteOptions& o, const Slice& k, const Slice& v) {
return DB::Put(o, k, v);
}
virtual Status Delete(const WriteOptions& o, const Slice& key) {
return DB::Delete(o, key);
}
virtual Status Get(const ReadOptions& options,
const Slice& key, std::string* value) {
assert(false); // Not implemented
return Status::NotFound(key);
}
virtual Iterator* NewIterator(const ReadOptions& options) {
if (options.snapshot == NULL) {
KVMap* saved = new KVMap;
*saved = map_;
return new ModelIter(saved, true);
} else {
const KVMap* snapshot_state =
&(reinterpret_cast<const ModelSnapshot*>(options.snapshot)->map_);
return new ModelIter(snapshot_state, false);
}
}
virtual const Snapshot* GetSnapshot() {
ModelSnapshot* snapshot = new ModelSnapshot;
snapshot->map_ = map_;
return snapshot;
}
virtual void ReleaseSnapshot(const Snapshot* snapshot) {
delete reinterpret_cast<const ModelSnapshot*>(snapshot);
}
virtual Status Write(const WriteOptions& options, WriteBatch* batch) {
class Handler : public WriteBatch::Handler {
public:
KVMap* map_;
virtual void Put(const Slice& key, const Slice& value) {
(*map_)[key.ToString()] = value.ToString();
}
virtual void Delete(const Slice& key) {
map_->erase(key.ToString());
}
};
Handler handler;
handler.map_ = &map_;
return batch->Iterate(&handler);
}
virtual bool GetProperty(const Slice& property, std::string* value) {
return false;
}
virtual void GetApproximateSizes(const Range* r, int n, uint64_t* sizes) {
for (int i = 0; i < n; i++) {
sizes[i] = 0;
}
}
virtual void CompactRange(const Slice* start, const Slice* end) {
}
private:
class ModelIter: public Iterator {
public:
ModelIter(const KVMap* map, bool owned)
: map_(map), owned_(owned), iter_(map_->end()) {
}
~ModelIter() {
if (owned_) delete map_;
}
virtual bool Valid() const { return iter_ != map_->end(); }
virtual void SeekToFirst() { iter_ = map_->begin(); }
virtual void SeekToLast() {
if (map_->empty()) {
iter_ = map_->end();
} else {
iter_ = map_->find(map_->rbegin()->first);
}
}
virtual void Seek(const Slice& k) {
iter_ = map_->lower_bound(k.ToString());
}
virtual void Next() { ++iter_; }
virtual void Prev() { --iter_; }
virtual Slice key() const { return iter_->first; }
virtual Slice value() const { return iter_->second; }
virtual Status status() const { return Status::OK(); }
private:
const KVMap* const map_;
const bool owned_; // Do we own map_
KVMap::const_iterator iter_;
};
const Options options_;
KVMap map_;
};
static std::string RandomKey(Random* rnd) {
int len = (rnd->OneIn(3)
? 1 // Short sometimes to encourage collisions
: (rnd->OneIn(100) ? rnd->Skewed(10) : rnd->Uniform(10)));
return test::RandomKey(rnd, len);
}
static bool CompareIterators(int step,
DB* model,
DB* db,
const Snapshot* model_snap,
const Snapshot* db_snap) {
ReadOptions options;
options.snapshot = model_snap;
Iterator* miter = model->NewIterator(options);
options.snapshot = db_snap;
Iterator* dbiter = db->NewIterator(options);
bool ok = true;
int count = 0;
for (miter->SeekToFirst(), dbiter->SeekToFirst();
ok && miter->Valid() && dbiter->Valid();
miter->Next(), dbiter->Next()) {
count++;
if (miter->key().compare(dbiter->key()) != 0) {
fprintf(stderr, "step %d: Key mismatch: '%s' vs. '%s'\n",
step,
EscapeString(miter->key()).c_str(),
EscapeString(dbiter->key()).c_str());
ok = false;
break;
}
if (miter->value().compare(dbiter->value()) != 0) {
fprintf(stderr, "step %d: Value mismatch for key '%s': '%s' vs. '%s'\n",
step,
EscapeString(miter->key()).c_str(),
EscapeString(miter->value()).c_str(),
EscapeString(miter->value()).c_str());
ok = false;
}
}
if (ok) {
if (miter->Valid() != dbiter->Valid()) {
fprintf(stderr, "step %d: Mismatch at end of iterators: %d vs. %d\n",
step, miter->Valid(), dbiter->Valid());
ok = false;
}
}
fprintf(stderr, "%d entries compared: ok=%d\n", count, ok);
delete miter;
delete dbiter;
return ok;
}
TEST(DBTest, Randomized) {
Random rnd(test::RandomSeed());
do {
ModelDB model(CurrentOptions());
const int N = 10000;
const Snapshot* model_snap = NULL;
const Snapshot* db_snap = NULL;
std::string k, v;
for (int step = 0; step < N; step++) {
if (step % 100 == 0) {
fprintf(stderr, "Step %d of %d\n", step, N);
}
// TODO(sanjay): Test Get() works
int p = rnd.Uniform(100);
if (p < 45) { // Put
k = RandomKey(&rnd);
v = RandomString(&rnd,
rnd.OneIn(20)
? 100 + rnd.Uniform(100)
: rnd.Uniform(8));
ASSERT_OK(model.Put(WriteOptions(), k, v));
ASSERT_OK(db_->Put(WriteOptions(), k, v));
} else if (p < 90) { // Delete
k = RandomKey(&rnd);
ASSERT_OK(model.Delete(WriteOptions(), k));
ASSERT_OK(db_->Delete(WriteOptions(), k));
} else { // Multi-element batch
WriteBatch b;
const int num = rnd.Uniform(8);
for (int i = 0; i < num; i++) {
if (i == 0 || !rnd.OneIn(10)) {
k = RandomKey(&rnd);
} else {
// Periodically re-use the same key from the previous iter, so
// we have multiple entries in the write batch for the same key
}
if (rnd.OneIn(2)) {
v = RandomString(&rnd, rnd.Uniform(10));
b.Put(k, v);
} else {
b.Delete(k);
}
}
ASSERT_OK(model.Write(WriteOptions(), &b));
ASSERT_OK(db_->Write(WriteOptions(), &b));
}
if ((step % 100) == 0) {
ASSERT_TRUE(CompareIterators(step, &model, db_, NULL, NULL));
ASSERT_TRUE(CompareIterators(step, &model, db_, model_snap, db_snap));
// Save a snapshot from each DB this time that we'll use next
// time we compare things, to make sure the current state is
// preserved with the snapshot
if (model_snap != NULL) model.ReleaseSnapshot(model_snap);
if (db_snap != NULL) db_->ReleaseSnapshot(db_snap);
Reopen();
ASSERT_TRUE(CompareIterators(step, &model, db_, NULL, NULL));
model_snap = model.GetSnapshot();
db_snap = db_->GetSnapshot();
}
}
if (model_snap != NULL) model.ReleaseSnapshot(model_snap);
if (db_snap != NULL) db_->ReleaseSnapshot(db_snap);
} while (ChangeOptions());
}
std::string MakeKey(unsigned int num) {
char buf[30];
snprintf(buf, sizeof(buf), "%016u", num);
return std::string(buf);
}
void BM_LogAndApply(int iters, int num_base_files) {
std::string dbname = test::TmpDir() + "/leveldb_test_benchmark";
DestroyDB(dbname, Options());
DB* db = NULL;
Options opts;
opts.create_if_missing = true;
Status s = DB::Open(opts, dbname, &db);
ASSERT_OK(s);
ASSERT_TRUE(db != NULL);
delete db;
db = NULL;
Env* env = Env::Default();
port::Mutex mu;
MutexLock l(&mu);
InternalKeyComparator cmp(BytewiseComparator());
Options options;
VersionSet vset(dbname, &options, NULL, &cmp);
ASSERT_OK(vset.Recover());
VersionEdit vbase;
uint64_t fnum = 1;
for (int i = 0; i < num_base_files; i++) {
InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);
vbase.AddFile(2, fnum++, 1 /* file size */, start, limit);
}
ASSERT_OK(vset.LogAndApply(&vbase, &mu));
uint64_t start_micros = env->NowMicros();
for (int i = 0; i < iters; i++) {
VersionEdit vedit;
vedit.DeleteFile(2, fnum);
InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);
vedit.AddFile(2, fnum++, 1 /* file size */, start, limit);
vset.LogAndApply(&vedit, &mu);
}
uint64_t stop_micros = env->NowMicros();
unsigned int us = stop_micros - start_micros;
char buf[16];
snprintf(buf, sizeof(buf), "%d", num_base_files);
fprintf(stderr,
"BM_LogAndApply/%-6s %8d iters : %9u us (%7.0f us / iter)\n",
buf, iters, us, ((float)us) / iters);
}
} // namespace leveldb
int main(int argc, char** argv) {
if (argc > 1 && std::string(argv[1]) == "--benchmark") {
leveldb::BM_LogAndApply(1000, 1);
leveldb::BM_LogAndApply(1000, 100);
leveldb::BM_LogAndApply(1000, 10000);
leveldb::BM_LogAndApply(100, 100000);
return 0;
}
return leveldb::test::RunAllTests();
}
diff --git a/src/leveldb/db/dbformat.cc b/src/leveldb/db/dbformat.cc
index 28e11b398d..20a7ca4462 100644
--- a/src/leveldb/db/dbformat.cc
+++ b/src/leveldb/db/dbformat.cc
@@ -1,140 +1,140 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <stdio.h>
#include "db/dbformat.h"
#include "port/port.h"
#include "util/coding.h"
namespace leveldb {
static uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
assert(seq <= kMaxSequenceNumber);
assert(t <= kValueTypeForSeek);
return (seq << 8) | t;
}
void AppendInternalKey(std::string* result, const ParsedInternalKey& key) {
result->append(key.user_key.data(), key.user_key.size());
PutFixed64(result, PackSequenceAndType(key.sequence, key.type));
}
std::string ParsedInternalKey::DebugString() const {
char buf[50];
snprintf(buf, sizeof(buf), "' @ %llu : %d",
(unsigned long long) sequence,
int(type));
std::string result = "'";
- result += user_key.ToString();
+ result += EscapeString(user_key.ToString());
result += buf;
return result;
}
std::string InternalKey::DebugString() const {
std::string result;
ParsedInternalKey parsed;
if (ParseInternalKey(rep_, &parsed)) {
result = parsed.DebugString();
} else {
result = "(bad)";
result.append(EscapeString(rep_));
}
return result;
}
const char* InternalKeyComparator::Name() const {
return "leveldb.InternalKeyComparator";
}
int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
// Order by:
// increasing user key (according to user-supplied comparator)
// decreasing sequence number
// decreasing type (though sequence# should be enough to disambiguate)
int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
if (r == 0) {
const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
if (anum > bnum) {
r = -1;
} else if (anum < bnum) {
r = +1;
}
}
return r;
}
void InternalKeyComparator::FindShortestSeparator(
std::string* start,
const Slice& limit) const {
// Attempt to shorten the user portion of the key
Slice user_start = ExtractUserKey(*start);
Slice user_limit = ExtractUserKey(limit);
std::string tmp(user_start.data(), user_start.size());
user_comparator_->FindShortestSeparator(&tmp, user_limit);
if (tmp.size() < user_start.size() &&
user_comparator_->Compare(user_start, tmp) < 0) {
// User key has become shorter physically, but larger logically.
// Tack on the earliest possible number to the shortened user key.
PutFixed64(&tmp, PackSequenceAndType(kMaxSequenceNumber,kValueTypeForSeek));
assert(this->Compare(*start, tmp) < 0);
assert(this->Compare(tmp, limit) < 0);
start->swap(tmp);
}
}
void InternalKeyComparator::FindShortSuccessor(std::string* key) const {
Slice user_key = ExtractUserKey(*key);
std::string tmp(user_key.data(), user_key.size());
user_comparator_->FindShortSuccessor(&tmp);
if (tmp.size() < user_key.size() &&
user_comparator_->Compare(user_key, tmp) < 0) {
// User key has become shorter physically, but larger logically.
// Tack on the earliest possible number to the shortened user key.
PutFixed64(&tmp, PackSequenceAndType(kMaxSequenceNumber,kValueTypeForSeek));
assert(this->Compare(*key, tmp) < 0);
key->swap(tmp);
}
}
const char* InternalFilterPolicy::Name() const {
return user_policy_->Name();
}
void InternalFilterPolicy::CreateFilter(const Slice* keys, int n,
std::string* dst) const {
// We rely on the fact that the code in table.cc does not mind us
// adjusting keys[].
Slice* mkey = const_cast<Slice*>(keys);
for (int i = 0; i < n; i++) {
mkey[i] = ExtractUserKey(keys[i]);
// TODO(sanjay): Suppress dups?
}
user_policy_->CreateFilter(keys, n, dst);
}
bool InternalFilterPolicy::KeyMayMatch(const Slice& key, const Slice& f) const {
return user_policy_->KeyMayMatch(ExtractUserKey(key), f);
}
LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) {
size_t usize = user_key.size();
size_t needed = usize + 13; // A conservative estimate
char* dst;
if (needed <= sizeof(space_)) {
dst = space_;
} else {
dst = new char[needed];
}
start_ = dst;
dst = EncodeVarint32(dst, usize + 8);
kstart_ = dst;
memcpy(dst, user_key.data(), usize);
dst += usize;
EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
dst += 8;
end_ = dst;
}
} // namespace leveldb
diff --git a/src/leveldb/db/filename_test.cc b/src/leveldb/db/filename_test.cc
index 47353d6c9a..5a26da4728 100644
--- a/src/leveldb/db/filename_test.cc
+++ b/src/leveldb/db/filename_test.cc
@@ -1,122 +1,122 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/filename.h"
#include "db/dbformat.h"
#include "port/port.h"
#include "util/logging.h"
#include "util/testharness.h"
namespace leveldb {
class FileNameTest { };
TEST(FileNameTest, Parse) {
Slice db;
FileType type;
uint64_t number;
// Successful parses
static struct {
const char* fname;
uint64_t number;
FileType type;
} cases[] = {
{ "100.log", 100, kLogFile },
{ "0.log", 0, kLogFile },
{ "0.sst", 0, kTableFile },
{ "CURRENT", 0, kCurrentFile },
{ "LOCK", 0, kDBLockFile },
{ "MANIFEST-2", 2, kDescriptorFile },
{ "MANIFEST-7", 7, kDescriptorFile },
{ "LOG", 0, kInfoLogFile },
{ "LOG.old", 0, kInfoLogFile },
{ "18446744073709551615.log", 18446744073709551615ull, kLogFile },
};
for (int i = 0; i < sizeof(cases) / sizeof(cases[0]); i++) {
std::string f = cases[i].fname;
ASSERT_TRUE(ParseFileName(f, &number, &type)) << f;
ASSERT_EQ(cases[i].type, type) << f;
ASSERT_EQ(cases[i].number, number) << f;
}
// Errors
static const char* errors[] = {
"",
"foo",
"foo-dx-100.log",
".log",
"",
"manifest",
"CURREN",
"CURRENTX",
"MANIFES",
"MANIFEST",
"MANIFEST-",
"XMANIFEST-3",
"MANIFEST-3x",
"LOC",
"LOCKx",
"LO",
"LOGx",
"18446744073709551616.log",
"184467440737095516150.log",
"100",
"100.",
"100.lop"
};
for (int i = 0; i < sizeof(errors) / sizeof(errors[0]); i++) {
std::string f = errors[i];
ASSERT_TRUE(!ParseFileName(f, &number, &type)) << f;
- };
+ }
}
TEST(FileNameTest, Construction) {
uint64_t number;
FileType type;
std::string fname;
fname = CurrentFileName("foo");
ASSERT_EQ("foo/", std::string(fname.data(), 4));
ASSERT_TRUE(ParseFileName(fname.c_str() + 4, &number, &type));
ASSERT_EQ(0, number);
ASSERT_EQ(kCurrentFile, type);
fname = LockFileName("foo");
ASSERT_EQ("foo/", std::string(fname.data(), 4));
ASSERT_TRUE(ParseFileName(fname.c_str() + 4, &number, &type));
ASSERT_EQ(0, number);
ASSERT_EQ(kDBLockFile, type);
fname = LogFileName("foo", 192);
ASSERT_EQ("foo/", std::string(fname.data(), 4));
ASSERT_TRUE(ParseFileName(fname.c_str() + 4, &number, &type));
ASSERT_EQ(192, number);
ASSERT_EQ(kLogFile, type);
fname = TableFileName("bar", 200);
ASSERT_EQ("bar/", std::string(fname.data(), 4));
ASSERT_TRUE(ParseFileName(fname.c_str() + 4, &number, &type));
ASSERT_EQ(200, number);
ASSERT_EQ(kTableFile, type);
fname = DescriptorFileName("bar", 100);
ASSERT_EQ("bar/", std::string(fname.data(), 4));
ASSERT_TRUE(ParseFileName(fname.c_str() + 4, &number, &type));
ASSERT_EQ(100, number);
ASSERT_EQ(kDescriptorFile, type);
fname = TempFileName("tmp", 999);
ASSERT_EQ("tmp/", std::string(fname.data(), 4));
ASSERT_TRUE(ParseFileName(fname.c_str() + 4, &number, &type));
ASSERT_EQ(999, number);
ASSERT_EQ(kTempFile, type);
}
} // namespace leveldb
int main(int argc, char** argv) {
return leveldb::test::RunAllTests();
}
diff --git a/src/leveldb/db/version_set.cc b/src/leveldb/db/version_set.cc
index 7d0a5de2b9..4fd1ddef21 100644
--- a/src/leveldb/db/version_set.cc
+++ b/src/leveldb/db/version_set.cc
@@ -1,1438 +1,1443 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/version_set.h"
#include <algorithm>
#include <stdio.h>
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/table_cache.h"
#include "leveldb/env.h"
#include "leveldb/table_builder.h"
#include "table/merger.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/logging.h"
namespace leveldb {
static const int kTargetFileSize = 2 * 1048576;
// Maximum bytes of overlaps in grandparent (i.e., level+2) before we
// stop building a single file in a level->level+1 compaction.
static const int64_t kMaxGrandParentOverlapBytes = 10 * kTargetFileSize;
// Maximum number of bytes in all compacted files. We avoid expanding
// the lower level file set of a compaction if it would make the
// total compaction cover more than this many bytes.
static const int64_t kExpandedCompactionByteSizeLimit = 25 * kTargetFileSize;
static double MaxBytesForLevel(int level) {
// Note: the result for level zero is not really used since we set
// the level-0 compaction threshold based on number of files.
double result = 10 * 1048576.0; // Result for both level-0 and level-1
while (level > 1) {
result *= 10;
level--;
}
return result;
}
static uint64_t MaxFileSizeForLevel(int level) {
return kTargetFileSize; // We could vary per level to reduce number of files?
}
static int64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
int64_t sum = 0;
for (size_t i = 0; i < files.size(); i++) {
sum += files[i]->file_size;
}
return sum;
}
namespace {
std::string IntSetToString(const std::set<uint64_t>& s) {
std::string result = "{";
for (std::set<uint64_t>::const_iterator it = s.begin();
it != s.end();
++it) {
result += (result.size() > 1) ? "," : "";
result += NumberToString(*it);
}
result += "}";
return result;
}
} // namespace
Version::~Version() {
assert(refs_ == 0);
// Remove from linked list
prev_->next_ = next_;
next_->prev_ = prev_;
// Drop references to files
for (int level = 0; level < config::kNumLevels; level++) {
for (size_t i = 0; i < files_[level].size(); i++) {
FileMetaData* f = files_[level][i];
assert(f->refs > 0);
f->refs--;
if (f->refs <= 0) {
delete f;
}
}
}
}
int FindFile(const InternalKeyComparator& icmp,
const std::vector<FileMetaData*>& files,
const Slice& key) {
uint32_t left = 0;
uint32_t right = files.size();
while (left < right) {
uint32_t mid = (left + right) / 2;
const FileMetaData* f = files[mid];
if (icmp.InternalKeyComparator::Compare(f->largest.Encode(), key) < 0) {
// Key at "mid.largest" is < "target". Therefore all
// files at or before "mid" are uninteresting.
left = mid + 1;
} else {
// Key at "mid.largest" is >= "target". Therefore all files
// after "mid" are uninteresting.
right = mid;
}
}
return right;
}
static bool AfterFile(const Comparator* ucmp,
const Slice* user_key, const FileMetaData* f) {
// NULL user_key occurs before all keys and is therefore never after *f
return (user_key != NULL &&
ucmp->Compare(*user_key, f->largest.user_key()) > 0);
}
static bool BeforeFile(const Comparator* ucmp,
const Slice* user_key, const FileMetaData* f) {
// NULL user_key occurs after all keys and is therefore never before *f
return (user_key != NULL &&
ucmp->Compare(*user_key, f->smallest.user_key()) < 0);
}
bool SomeFileOverlapsRange(
const InternalKeyComparator& icmp,
bool disjoint_sorted_files,
const std::vector<FileMetaData*>& files,
const Slice* smallest_user_key,
const Slice* largest_user_key) {
const Comparator* ucmp = icmp.user_comparator();
if (!disjoint_sorted_files) {
// Need to check against all files
for (size_t i = 0; i < files.size(); i++) {
const FileMetaData* f = files[i];
if (AfterFile(ucmp, smallest_user_key, f) ||
BeforeFile(ucmp, largest_user_key, f)) {
// No overlap
} else {
return true; // Overlap
}
}
return false;
}
// Binary search over file list
uint32_t index = 0;
if (smallest_user_key != NULL) {
// Find the earliest possible internal key for smallest_user_key
InternalKey small(*smallest_user_key, kMaxSequenceNumber,kValueTypeForSeek);
index = FindFile(icmp, files, small.Encode());
}
if (index >= files.size()) {
// beginning of range is after all files, so no overlap.
return false;
}
return !BeforeFile(ucmp, largest_user_key, files[index]);
}
// An internal iterator. For a given version/level pair, yields
// information about the files in the level. For a given entry, key()
// is the largest key that occurs in the file, and value() is an
// 16-byte value containing the file number and file size, both
// encoded using EncodeFixed64.
class Version::LevelFileNumIterator : public Iterator {
public:
LevelFileNumIterator(const InternalKeyComparator& icmp,
const std::vector<FileMetaData*>* flist)
: icmp_(icmp),
flist_(flist),
index_(flist->size()) { // Marks as invalid
}
virtual bool Valid() const {
return index_ < flist_->size();
}
virtual void Seek(const Slice& target) {
index_ = FindFile(icmp_, *flist_, target);
}
virtual void SeekToFirst() { index_ = 0; }
virtual void SeekToLast() {
index_ = flist_->empty() ? 0 : flist_->size() - 1;
}
virtual void Next() {
assert(Valid());
index_++;
}
virtual void Prev() {
assert(Valid());
if (index_ == 0) {
index_ = flist_->size(); // Marks as invalid
} else {
index_--;
}
}
Slice key() const {
assert(Valid());
return (*flist_)[index_]->largest.Encode();
}
Slice value() const {
assert(Valid());
EncodeFixed64(value_buf_, (*flist_)[index_]->number);
EncodeFixed64(value_buf_+8, (*flist_)[index_]->file_size);
return Slice(value_buf_, sizeof(value_buf_));
}
virtual Status status() const { return Status::OK(); }
private:
const InternalKeyComparator icmp_;
const std::vector<FileMetaData*>* const flist_;
uint32_t index_;
// Backing store for value(). Holds the file number and size.
mutable char value_buf_[16];
};
static Iterator* GetFileIterator(void* arg,
const ReadOptions& options,
const Slice& file_value) {
TableCache* cache = reinterpret_cast<TableCache*>(arg);
if (file_value.size() != 16) {
return NewErrorIterator(
Status::Corruption("FileReader invoked with unexpected value"));
} else {
return cache->NewIterator(options,
DecodeFixed64(file_value.data()),
DecodeFixed64(file_value.data() + 8));
}
}
Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
int level) const {
return NewTwoLevelIterator(
new LevelFileNumIterator(vset_->icmp_, &files_[level]),
&GetFileIterator, vset_->table_cache_, options);
}
void Version::AddIterators(const ReadOptions& options,
std::vector<Iterator*>* iters) {
// Merge all level zero files together since they may overlap
for (size_t i = 0; i < files_[0].size(); i++) {
iters->push_back(
vset_->table_cache_->NewIterator(
options, files_[0][i]->number, files_[0][i]->file_size));
}
// For levels > 0, we can use a concatenating iterator that sequentially
// walks through the non-overlapping files in the level, opening them
// lazily.
for (int level = 1; level < config::kNumLevels; level++) {
if (!files_[level].empty()) {
iters->push_back(NewConcatenatingIterator(options, level));
}
}
}
// Callback from TableCache::Get()
namespace {
enum SaverState {
kNotFound,
kFound,
kDeleted,
kCorrupt,
};
struct Saver {
SaverState state;
const Comparator* ucmp;
Slice user_key;
std::string* value;
};
}
static void SaveValue(void* arg, const Slice& ikey, const Slice& v) {
Saver* s = reinterpret_cast<Saver*>(arg);
ParsedInternalKey parsed_key;
if (!ParseInternalKey(ikey, &parsed_key)) {
s->state = kCorrupt;
} else {
if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) {
s->state = (parsed_key.type == kTypeValue) ? kFound : kDeleted;
if (s->state == kFound) {
s->value->assign(v.data(), v.size());
}
}
}
}
static bool NewestFirst(FileMetaData* a, FileMetaData* b) {
return a->number > b->number;
}
Status Version::Get(const ReadOptions& options,
const LookupKey& k,
std::string* value,
GetStats* stats) {
Slice ikey = k.internal_key();
Slice user_key = k.user_key();
const Comparator* ucmp = vset_->icmp_.user_comparator();
Status s;
stats->seek_file = NULL;
stats->seek_file_level = -1;
FileMetaData* last_file_read = NULL;
int last_file_read_level = -1;
// We can search level-by-level since entries never hop across
// levels. Therefore we are guaranteed that if we find data
// in an smaller level, later levels are irrelevant.
std::vector<FileMetaData*> tmp;
FileMetaData* tmp2;
for (int level = 0; level < config::kNumLevels; level++) {
size_t num_files = files_[level].size();
if (num_files == 0) continue;
// Get the list of files to search in this level
FileMetaData* const* files = &files_[level][0];
if (level == 0) {
// Level-0 files may overlap each other. Find all files that
// overlap user_key and process them in order from newest to oldest.
tmp.reserve(num_files);
for (uint32_t i = 0; i < num_files; i++) {
FileMetaData* f = files[i];
if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
tmp.push_back(f);
}
}
if (tmp.empty()) continue;
std::sort(tmp.begin(), tmp.end(), NewestFirst);
files = &tmp[0];
num_files = tmp.size();
} else {
// Binary search to find earliest index whose largest key >= ikey.
uint32_t index = FindFile(vset_->icmp_, files_[level], ikey);
if (index >= num_files) {
files = NULL;
num_files = 0;
} else {
tmp2 = files[index];
if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) {
// All of "tmp2" is past any data for user_key
files = NULL;
num_files = 0;
} else {
files = &tmp2;
num_files = 1;
}
}
}
for (uint32_t i = 0; i < num_files; ++i) {
if (last_file_read != NULL && stats->seek_file == NULL) {
// We have had more than one seek for this read. Charge the 1st file.
stats->seek_file = last_file_read;
stats->seek_file_level = last_file_read_level;
}
FileMetaData* f = files[i];
last_file_read = f;
last_file_read_level = level;
Saver saver;
saver.state = kNotFound;
saver.ucmp = ucmp;
saver.user_key = user_key;
saver.value = value;
s = vset_->table_cache_->Get(options, f->number, f->file_size,
ikey, &saver, SaveValue);
if (!s.ok()) {
return s;
}
switch (saver.state) {
case kNotFound:
break; // Keep searching in other files
case kFound:
return s;
case kDeleted:
s = Status::NotFound(Slice()); // Use empty error message for speed
return s;
case kCorrupt:
s = Status::Corruption("corrupted key for ", user_key);
return s;
}
}
}
return Status::NotFound(Slice()); // Use an empty error message for speed
}
bool Version::UpdateStats(const GetStats& stats) {
FileMetaData* f = stats.seek_file;
if (f != NULL) {
f->allowed_seeks--;
if (f->allowed_seeks <= 0 && file_to_compact_ == NULL) {
file_to_compact_ = f;
file_to_compact_level_ = stats.seek_file_level;
return true;
}
}
return false;
}
void Version::Ref() {
++refs_;
}
void Version::Unref() {
assert(this != &vset_->dummy_versions_);
assert(refs_ >= 1);
--refs_;
if (refs_ == 0) {
delete this;
}
}
bool Version::OverlapInLevel(int level,
const Slice* smallest_user_key,
const Slice* largest_user_key) {
return SomeFileOverlapsRange(vset_->icmp_, (level > 0), files_[level],
smallest_user_key, largest_user_key);
}
int Version::PickLevelForMemTableOutput(
const Slice& smallest_user_key,
const Slice& largest_user_key) {
int level = 0;
if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) {
// Push to next level if there is no overlap in next level,
// and the #bytes overlapping in the level after that are limited.
InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
InternalKey limit(largest_user_key, 0, static_cast<ValueType>(0));
std::vector<FileMetaData*> overlaps;
while (level < config::kMaxMemCompactLevel) {
if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
break;
}
GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
const int64_t sum = TotalFileSize(overlaps);
if (sum > kMaxGrandParentOverlapBytes) {
break;
}
level++;
}
}
return level;
}
// Store in "*inputs" all files in "level" that overlap [begin,end]
void Version::GetOverlappingInputs(
int level,
const InternalKey* begin,
const InternalKey* end,
std::vector<FileMetaData*>* inputs) {
inputs->clear();
Slice user_begin, user_end;
if (begin != NULL) {
user_begin = begin->user_key();
}
if (end != NULL) {
user_end = end->user_key();
}
const Comparator* user_cmp = vset_->icmp_.user_comparator();
for (size_t i = 0; i < files_[level].size(); ) {
FileMetaData* f = files_[level][i++];
const Slice file_start = f->smallest.user_key();
const Slice file_limit = f->largest.user_key();
if (begin != NULL && user_cmp->Compare(file_limit, user_begin) < 0) {
// "f" is completely before specified range; skip it
} else if (end != NULL && user_cmp->Compare(file_start, user_end) > 0) {
// "f" is completely after specified range; skip it
} else {
inputs->push_back(f);
if (level == 0) {
// Level-0 files may overlap each other. So check if the newly
// added file has expanded the range. If so, restart search.
if (begin != NULL && user_cmp->Compare(file_start, user_begin) < 0) {
user_begin = file_start;
inputs->clear();
i = 0;
} else if (end != NULL && user_cmp->Compare(file_limit, user_end) > 0) {
user_end = file_limit;
inputs->clear();
i = 0;
}
}
}
}
}
std::string Version::DebugString() const {
std::string r;
for (int level = 0; level < config::kNumLevels; level++) {
// E.g.,
// --- level 1 ---
// 17:123['a' .. 'd']
// 20:43['e' .. 'g']
r.append("--- level ");
AppendNumberTo(&r, level);
r.append(" ---\n");
const std::vector<FileMetaData*>& files = files_[level];
for (size_t i = 0; i < files.size(); i++) {
r.push_back(' ');
AppendNumberTo(&r, files[i]->number);
r.push_back(':');
AppendNumberTo(&r, files[i]->file_size);
r.append("[");
r.append(files[i]->smallest.DebugString());
r.append(" .. ");
r.append(files[i]->largest.DebugString());
r.append("]\n");
}
}
return r;
}
// A helper class so we can efficiently apply a whole sequence
// of edits to a particular state without creating intermediate
// Versions that contain full copies of the intermediate state.
class VersionSet::Builder {
private:
// Helper to sort by v->files_[file_number].smallest
struct BySmallestKey {
const InternalKeyComparator* internal_comparator;
bool operator()(FileMetaData* f1, FileMetaData* f2) const {
int r = internal_comparator->Compare(f1->smallest, f2->smallest);
if (r != 0) {
return (r < 0);
} else {
// Break ties by file number
return (f1->number < f2->number);
}
}
};
typedef std::set<FileMetaData*, BySmallestKey> FileSet;
struct LevelState {
std::set<uint64_t> deleted_files;
FileSet* added_files;
};
VersionSet* vset_;
Version* base_;
LevelState levels_[config::kNumLevels];
public:
// Initialize a builder with the files from *base and other info from *vset
Builder(VersionSet* vset, Version* base)
: vset_(vset),
base_(base) {
base_->Ref();
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < config::kNumLevels; level++) {
levels_[level].added_files = new FileSet(cmp);
}
}
~Builder() {
for (int level = 0; level < config::kNumLevels; level++) {
const FileSet* added = levels_[level].added_files;
std::vector<FileMetaData*> to_unref;
to_unref.reserve(added->size());
for (FileSet::const_iterator it = added->begin();
it != added->end(); ++it) {
to_unref.push_back(*it);
}
delete added;
for (uint32_t i = 0; i < to_unref.size(); i++) {
FileMetaData* f = to_unref[i];
f->refs--;
if (f->refs <= 0) {
delete f;
}
}
}
base_->Unref();
}
// Apply all of the edits in *edit to the current state.
void Apply(VersionEdit* edit) {
// Update compaction pointers
for (size_t i = 0; i < edit->compact_pointers_.size(); i++) {
const int level = edit->compact_pointers_[i].first;
vset_->compact_pointer_[level] =
edit->compact_pointers_[i].second.Encode().ToString();
}
// Delete files
const VersionEdit::DeletedFileSet& del = edit->deleted_files_;
for (VersionEdit::DeletedFileSet::const_iterator iter = del.begin();
iter != del.end();
++iter) {
const int level = iter->first;
const uint64_t number = iter->second;
levels_[level].deleted_files.insert(number);
}
// Add new files
for (size_t i = 0; i < edit->new_files_.size(); i++) {
const int level = edit->new_files_[i].first;
FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
f->refs = 1;
// We arrange to automatically compact this file after
// a certain number of seeks. Let's assume:
// (1) One seek costs 10ms
// (2) Writing or reading 1MB costs 10ms (100MB/s)
// (3) A compaction of 1MB does 25MB of IO:
// 1MB read from this level
// 10-12MB read from next level (boundaries may be misaligned)
// 10-12MB written to next level
// This implies that 25 seeks cost the same as the compaction
// of 1MB of data. I.e., one seek costs approximately the
// same as the compaction of 40KB of data. We are a little
// conservative and allow approximately one seek for every 16KB
// of data before triggering a compaction.
f->allowed_seeks = (f->file_size / 16384);
if (f->allowed_seeks < 100) f->allowed_seeks = 100;
levels_[level].deleted_files.erase(f->number);
levels_[level].added_files->insert(f);
}
}
// Save the current state in *v.
void SaveTo(Version* v) {
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < config::kNumLevels; level++) {
// Merge the set of added files with the set of pre-existing files.
// Drop any deleted files. Store the result in *v.
const std::vector<FileMetaData*>& base_files = base_->files_[level];
std::vector<FileMetaData*>::const_iterator base_iter = base_files.begin();
std::vector<FileMetaData*>::const_iterator base_end = base_files.end();
const FileSet* added = levels_[level].added_files;
v->files_[level].reserve(base_files.size() + added->size());
for (FileSet::const_iterator added_iter = added->begin();
added_iter != added->end();
++added_iter) {
// Add all smaller files listed in base_
for (std::vector<FileMetaData*>::const_iterator bpos
= std::upper_bound(base_iter, base_end, *added_iter, cmp);
base_iter != bpos;
++base_iter) {
MaybeAddFile(v, level, *base_iter);
}
MaybeAddFile(v, level, *added_iter);
}
// Add remaining base files
for (; base_iter != base_end; ++base_iter) {
MaybeAddFile(v, level, *base_iter);
}
#ifndef NDEBUG
// Make sure there is no overlap in levels > 0
if (level > 0) {
for (uint32_t i = 1; i < v->files_[level].size(); i++) {
const InternalKey& prev_end = v->files_[level][i-1]->largest;
const InternalKey& this_begin = v->files_[level][i]->smallest;
if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) {
fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
prev_end.DebugString().c_str(),
this_begin.DebugString().c_str());
abort();
}
}
}
#endif
}
}
void MaybeAddFile(Version* v, int level, FileMetaData* f) {
if (levels_[level].deleted_files.count(f->number) > 0) {
// File is deleted: do nothing
} else {
std::vector<FileMetaData*>* files = &v->files_[level];
if (level > 0 && !files->empty()) {
// Must not overlap
assert(vset_->icmp_.Compare((*files)[files->size()-1]->largest,
f->smallest) < 0);
}
f->refs++;
files->push_back(f);
}
}
};
VersionSet::VersionSet(const std::string& dbname,
const Options* options,
TableCache* table_cache,
const InternalKeyComparator* cmp)
: env_(options->env),
dbname_(dbname),
options_(options),
table_cache_(table_cache),
icmp_(*cmp),
next_file_number_(2),
manifest_file_number_(0), // Filled by Recover()
last_sequence_(0),
log_number_(0),
prev_log_number_(0),
descriptor_file_(NULL),
descriptor_log_(NULL),
dummy_versions_(this),
current_(NULL) {
AppendVersion(new Version(this));
}
VersionSet::~VersionSet() {
current_->Unref();
assert(dummy_versions_.next_ == &dummy_versions_); // List must be empty
delete descriptor_log_;
delete descriptor_file_;
}
void VersionSet::AppendVersion(Version* v) {
// Make "v" current
assert(v->refs_ == 0);
assert(v != current_);
if (current_ != NULL) {
current_->Unref();
}
current_ = v;
v->Ref();
// Append to linked list
v->prev_ = dummy_versions_.prev_;
v->next_ = &dummy_versions_;
v->prev_->next_ = v;
v->next_->prev_ = v;
}
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
if (edit->has_log_number_) {
assert(edit->log_number_ >= log_number_);
assert(edit->log_number_ < next_file_number_);
} else {
edit->SetLogNumber(log_number_);
}
if (!edit->has_prev_log_number_) {
edit->SetPrevLogNumber(prev_log_number_);
}
edit->SetNextFile(next_file_number_);
edit->SetLastSequence(last_sequence_);
Version* v = new Version(this);
{
Builder builder(this, current_);
builder.Apply(edit);
builder.SaveTo(v);
}
Finalize(v);
// Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version.
std::string new_manifest_file;
Status s;
if (descriptor_log_ == NULL) {
// No reason to unlock *mu here since we only hit this path in the
// first call to LogAndApply (when opening the database).
assert(descriptor_file_ == NULL);
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
edit->SetNextFile(next_file_number_);
s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
if (s.ok()) {
descriptor_log_ = new log::Writer(descriptor_file_);
s = WriteSnapshot(descriptor_log_);
}
}
// Unlock during expensive MANIFEST log write
{
mu->Unlock();
// Write new record to MANIFEST log
if (s.ok()) {
std::string record;
edit->EncodeTo(&record);
s = descriptor_log_->AddRecord(record);
if (s.ok()) {
s = descriptor_file_->Sync();
}
if (!s.ok()) {
Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
if (ManifestContains(record)) {
Log(options_->info_log,
"MANIFEST contains log record despite error; advancing to new "
"version to prevent mismatch between in-memory and logged state");
s = Status::OK();
}
}
}
// If we just created a new descriptor file, install it by writing a
// new CURRENT file that points to it.
if (s.ok() && !new_manifest_file.empty()) {
s = SetCurrentFile(env_, dbname_, manifest_file_number_);
// No need to double-check MANIFEST in case of error since it
// will be discarded below.
}
mu->Lock();
}
// Install the new version
if (s.ok()) {
AppendVersion(v);
log_number_ = edit->log_number_;
prev_log_number_ = edit->prev_log_number_;
} else {
delete v;
if (!new_manifest_file.empty()) {
delete descriptor_log_;
delete descriptor_file_;
descriptor_log_ = NULL;
descriptor_file_ = NULL;
env_->DeleteFile(new_manifest_file);
}
}
return s;
}
Status VersionSet::Recover() {
struct LogReporter : public log::Reader::Reporter {
Status* status;
virtual void Corruption(size_t bytes, const Status& s) {
if (this->status->ok()) *this->status = s;
}
};
// Read "CURRENT" file, which contains a pointer to the current manifest file
std::string current;
Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
if (!s.ok()) {
return s;
}
if (current.empty() || current[current.size()-1] != '\n') {
return Status::Corruption("CURRENT file does not end with newline");
}
current.resize(current.size() - 1);
std::string dscname = dbname_ + "/" + current;
SequentialFile* file;
s = env_->NewSequentialFile(dscname, &file);
if (!s.ok()) {
return s;
}
bool have_log_number = false;
bool have_prev_log_number = false;
bool have_next_file = false;
bool have_last_sequence = false;
uint64_t next_file = 0;
uint64_t last_sequence = 0;
uint64_t log_number = 0;
uint64_t prev_log_number = 0;
Builder builder(this, current_);
{
LogReporter reporter;
reporter.status = &s;
log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/);
Slice record;
std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (s.ok()) {
if (edit.has_comparator_ &&
edit.comparator_ != icmp_.user_comparator()->Name()) {
s = Status::InvalidArgument(
edit.comparator_ + " does not match existing comparator ",
icmp_.user_comparator()->Name());
}
}
if (s.ok()) {
builder.Apply(&edit);
}
if (edit.has_log_number_) {
log_number = edit.log_number_;
have_log_number = true;
}
if (edit.has_prev_log_number_) {
prev_log_number = edit.prev_log_number_;
have_prev_log_number = true;
}
if (edit.has_next_file_number_) {
next_file = edit.next_file_number_;
have_next_file = true;
}
if (edit.has_last_sequence_) {
last_sequence = edit.last_sequence_;
have_last_sequence = true;
}
}
}
delete file;
file = NULL;
if (s.ok()) {
if (!have_next_file) {
s = Status::Corruption("no meta-nextfile entry in descriptor");
} else if (!have_log_number) {
s = Status::Corruption("no meta-lognumber entry in descriptor");
} else if (!have_last_sequence) {
s = Status::Corruption("no last-sequence-number entry in descriptor");
}
if (!have_prev_log_number) {
prev_log_number = 0;
}
MarkFileNumberUsed(prev_log_number);
MarkFileNumberUsed(log_number);
}
if (s.ok()) {
Version* v = new Version(this);
builder.SaveTo(v);
// Install recovered version
Finalize(v);
AppendVersion(v);
manifest_file_number_ = next_file;
next_file_number_ = next_file + 1;
last_sequence_ = last_sequence;
log_number_ = log_number;
prev_log_number_ = prev_log_number;
}
return s;
}
void VersionSet::MarkFileNumberUsed(uint64_t number) {
if (next_file_number_ <= number) {
next_file_number_ = number + 1;
}
}
void VersionSet::Finalize(Version* v) {
// Precomputed best level for next compaction
int best_level = -1;
double best_score = -1;
for (int level = 0; level < config::kNumLevels-1; level++) {
double score;
if (level == 0) {
// We treat level-0 specially by bounding the number of files
// instead of number of bytes for two reasons:
//
// (1) With larger write-buffer sizes, it is nice not to do too
// many level-0 compactions.
//
// (2) The files in level-0 are merged on every read and
// therefore we wish to avoid too many files when the individual
// file size is small (perhaps because of a small write-buffer
// setting, or very high compression ratios, or lots of
// overwrites/deletions).
score = v->files_[level].size() /
static_cast<double>(config::kL0_CompactionTrigger);
} else {
// Compute the ratio of current size to size limit.
const uint64_t level_bytes = TotalFileSize(v->files_[level]);
score = static_cast<double>(level_bytes) / MaxBytesForLevel(level);
}
if (score > best_score) {
best_level = level;
best_score = score;
}
}
v->compaction_level_ = best_level;
v->compaction_score_ = best_score;
}
Status VersionSet::WriteSnapshot(log::Writer* log) {
// TODO: Break up into multiple records to reduce memory usage on recovery?
// Save metadata
VersionEdit edit;
edit.SetComparatorName(icmp_.user_comparator()->Name());
// Save compaction pointers
for (int level = 0; level < config::kNumLevels; level++) {
if (!compact_pointer_[level].empty()) {
InternalKey key;
key.DecodeFrom(compact_pointer_[level]);
edit.SetCompactPointer(level, key);
}
}
// Save files
for (int level = 0; level < config::kNumLevels; level++) {
const std::vector<FileMetaData*>& files = current_->files_[level];
for (size_t i = 0; i < files.size(); i++) {
const FileMetaData* f = files[i];
edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest);
}
}
std::string record;
edit.EncodeTo(&record);
return log->AddRecord(record);
}
int VersionSet::NumLevelFiles(int level) const {
assert(level >= 0);
assert(level < config::kNumLevels);
return current_->files_[level].size();
}
const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const {
// Update code if kNumLevels changes
assert(config::kNumLevels == 7);
snprintf(scratch->buffer, sizeof(scratch->buffer),
"files[ %d %d %d %d %d %d %d ]",
int(current_->files_[0].size()),
int(current_->files_[1].size()),
int(current_->files_[2].size()),
int(current_->files_[3].size()),
int(current_->files_[4].size()),
int(current_->files_[5].size()),
int(current_->files_[6].size()));
return scratch->buffer;
}
// Return true iff the manifest contains the specified record.
bool VersionSet::ManifestContains(const std::string& record) const {
std::string fname = DescriptorFileName(dbname_, manifest_file_number_);
Log(options_->info_log, "ManifestContains: checking %s\n", fname.c_str());
SequentialFile* file = NULL;
Status s = env_->NewSequentialFile(fname, &file);
if (!s.ok()) {
Log(options_->info_log, "ManifestContains: %s\n", s.ToString().c_str());
return false;
}
log::Reader reader(file, NULL, true/*checksum*/, 0);
Slice r;
std::string scratch;
bool result = false;
while (reader.ReadRecord(&r, &scratch)) {
if (r == Slice(record)) {
result = true;
break;
}
}
delete file;
Log(options_->info_log, "ManifestContains: result = %d\n", result ? 1 : 0);
return result;
}
uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
uint64_t result = 0;
for (int level = 0; level < config::kNumLevels; level++) {
const std::vector<FileMetaData*>& files = v->files_[level];
for (size_t i = 0; i < files.size(); i++) {
if (icmp_.Compare(files[i]->largest, ikey) <= 0) {
// Entire file is before "ikey", so just add the file size
result += files[i]->file_size;
} else if (icmp_.Compare(files[i]->smallest, ikey) > 0) {
// Entire file is after "ikey", so ignore
if (level > 0) {
// Files other than level 0 are sorted by meta->smallest, so
// no further files in this level will contain data for
// "ikey".
break;
}
} else {
// "ikey" falls in the range for this table. Add the
// approximate offset of "ikey" within the table.
Table* tableptr;
Iterator* iter = table_cache_->NewIterator(
ReadOptions(), files[i]->number, files[i]->file_size, &tableptr);
if (tableptr != NULL) {
result += tableptr->ApproximateOffsetOf(ikey.Encode());
}
delete iter;
}
}
}
return result;
}
void VersionSet::AddLiveFiles(std::set<uint64_t>* live) {
for (Version* v = dummy_versions_.next_;
v != &dummy_versions_;
v = v->next_) {
for (int level = 0; level < config::kNumLevels; level++) {
const std::vector<FileMetaData*>& files = v->files_[level];
for (size_t i = 0; i < files.size(); i++) {
live->insert(files[i]->number);
}
}
}
}
int64_t VersionSet::NumLevelBytes(int level) const {
assert(level >= 0);
assert(level < config::kNumLevels);
return TotalFileSize(current_->files_[level]);
}
int64_t VersionSet::MaxNextLevelOverlappingBytes() {
int64_t result = 0;
std::vector<FileMetaData*> overlaps;
for (int level = 1; level < config::kNumLevels - 1; level++) {
for (size_t i = 0; i < current_->files_[level].size(); i++) {
const FileMetaData* f = current_->files_[level][i];
current_->GetOverlappingInputs(level+1, &f->smallest, &f->largest,
&overlaps);
const int64_t sum = TotalFileSize(overlaps);
if (sum > result) {
result = sum;
}
}
}
return result;
}
// Stores the minimal range that covers all entries in inputs in
// *smallest, *largest.
// REQUIRES: inputs is not empty
void VersionSet::GetRange(const std::vector<FileMetaData*>& inputs,
InternalKey* smallest,
InternalKey* largest) {
assert(!inputs.empty());
smallest->Clear();
largest->Clear();
for (size_t i = 0; i < inputs.size(); i++) {
FileMetaData* f = inputs[i];
if (i == 0) {
*smallest = f->smallest;
*largest = f->largest;
} else {
if (icmp_.Compare(f->smallest, *smallest) < 0) {
*smallest = f->smallest;
}
if (icmp_.Compare(f->largest, *largest) > 0) {
*largest = f->largest;
}
}
}
}
// Stores the minimal range that covers all entries in inputs1 and inputs2
// in *smallest, *largest.
// REQUIRES: inputs is not empty
void VersionSet::GetRange2(const std::vector<FileMetaData*>& inputs1,
const std::vector<FileMetaData*>& inputs2,
InternalKey* smallest,
InternalKey* largest) {
std::vector<FileMetaData*> all = inputs1;
all.insert(all.end(), inputs2.begin(), inputs2.end());
GetRange(all, smallest, largest);
}
Iterator* VersionSet::MakeInputIterator(Compaction* c) {
ReadOptions options;
options.verify_checksums = options_->paranoid_checks;
options.fill_cache = false;
// Level-0 files have to be merged together. For other levels,
// we will make a concatenating iterator per level.
// TODO(opt): use concatenating iterator for level-0 if there is no overlap
const int space = (c->level() == 0 ? c->inputs_[0].size() + 1 : 2);
Iterator** list = new Iterator*[space];
int num = 0;
for (int which = 0; which < 2; which++) {
if (!c->inputs_[which].empty()) {
if (c->level() + which == 0) {
const std::vector<FileMetaData*>& files = c->inputs_[which];
for (size_t i = 0; i < files.size(); i++) {
list[num++] = table_cache_->NewIterator(
options, files[i]->number, files[i]->file_size);
}
} else {
// Create concatenating iterator for the files from this level
list[num++] = NewTwoLevelIterator(
new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]),
&GetFileIterator, table_cache_, options);
}
}
}
assert(num <= space);
Iterator* result = NewMergingIterator(&icmp_, list, num);
delete[] list;
return result;
}
Compaction* VersionSet::PickCompaction() {
Compaction* c;
int level;
// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
const bool size_compaction = (current_->compaction_score_ >= 1);
const bool seek_compaction = (current_->file_to_compact_ != NULL);
if (size_compaction) {
level = current_->compaction_level_;
assert(level >= 0);
assert(level+1 < config::kNumLevels);
c = new Compaction(level);
// Pick the first file that comes after compact_pointer_[level]
for (size_t i = 0; i < current_->files_[level].size(); i++) {
FileMetaData* f = current_->files_[level][i];
if (compact_pointer_[level].empty() ||
icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
c->inputs_[0].push_back(f);
break;
}
}
if (c->inputs_[0].empty()) {
// Wrap-around to the beginning of the key space
c->inputs_[0].push_back(current_->files_[level][0]);
}
} else if (seek_compaction) {
level = current_->file_to_compact_level_;
c = new Compaction(level);
c->inputs_[0].push_back(current_->file_to_compact_);
} else {
return NULL;
}
c->input_version_ = current_;
c->input_version_->Ref();
// Files in level 0 may overlap each other, so pick up all overlapping ones
if (level == 0) {
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest);
// Note that the next call will discard the file we placed in
// c->inputs_[0] earlier and replace it with an overlapping set
// which will include the picked file.
current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
assert(!c->inputs_[0].empty());
}
SetupOtherInputs(c);
return c;
}
void VersionSet::SetupOtherInputs(Compaction* c) {
const int level = c->level();
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest);
current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1]);
// Get entire range covered by compaction
InternalKey all_start, all_limit;
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
// See if we can grow the number of inputs in "level" without
// changing the number of "level+1" files we pick up.
if (!c->inputs_[1].empty()) {
std::vector<FileMetaData*> expanded0;
current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
const int64_t expanded0_size = TotalFileSize(expanded0);
if (expanded0.size() > c->inputs_[0].size() &&
inputs1_size + expanded0_size < kExpandedCompactionByteSizeLimit) {
InternalKey new_start, new_limit;
GetRange(expanded0, &new_start, &new_limit);
std::vector<FileMetaData*> expanded1;
current_->GetOverlappingInputs(level+1, &new_start, &new_limit,
&expanded1);
if (expanded1.size() == c->inputs_[1].size()) {
Log(options_->info_log,
"Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n",
level,
int(c->inputs_[0].size()),
int(c->inputs_[1].size()),
long(inputs0_size), long(inputs1_size),
int(expanded0.size()),
int(expanded1.size()),
long(expanded0_size), long(inputs1_size));
smallest = new_start;
largest = new_limit;
c->inputs_[0] = expanded0;
c->inputs_[1] = expanded1;
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
}
}
}
// Compute the set of grandparent files that overlap this compaction
// (parent == level+1; grandparent == level+2)
if (level + 2 < config::kNumLevels) {
current_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
&c->grandparents_);
}
if (false) {
Log(options_->info_log, "Compacting %d '%s' .. '%s'",
level,
smallest.DebugString().c_str(),
largest.DebugString().c_str());
}
// Update the place where we will do the next compaction for this level.
// We update this immediately instead of waiting for the VersionEdit
// to be applied so that if the compaction fails, we will try a different
// key range next time.
compact_pointer_[level] = largest.Encode().ToString();
c->edit_.SetCompactPointer(level, largest);
}
Compaction* VersionSet::CompactRange(
int level,
const InternalKey* begin,
const InternalKey* end) {
std::vector<FileMetaData*> inputs;
current_->GetOverlappingInputs(level, begin, end, &inputs);
if (inputs.empty()) {
return NULL;
}
// Avoid compacting too much in one shot in case the range is large.
- const uint64_t limit = MaxFileSizeForLevel(level);
- uint64_t total = 0;
- for (size_t i = 0; i < inputs.size(); i++) {
- uint64_t s = inputs[i]->file_size;
- total += s;
- if (total >= limit) {
- inputs.resize(i + 1);
- break;
+ // But we cannot do this for level-0 since level-0 files can overlap
+ // and we must not pick one file and drop another older file if the
+ // two files overlap.
+ if (level > 0) {
+ const uint64_t limit = MaxFileSizeForLevel(level);
+ uint64_t total = 0;
+ for (size_t i = 0; i < inputs.size(); i++) {
+ uint64_t s = inputs[i]->file_size;
+ total += s;
+ if (total >= limit) {
+ inputs.resize(i + 1);
+ break;
+ }
}
}
Compaction* c = new Compaction(level);
c->input_version_ = current_;
c->input_version_->Ref();
c->inputs_[0] = inputs;
SetupOtherInputs(c);
return c;
}
Compaction::Compaction(int level)
: level_(level),
max_output_file_size_(MaxFileSizeForLevel(level)),
input_version_(NULL),
grandparent_index_(0),
seen_key_(false),
overlapped_bytes_(0) {
for (int i = 0; i < config::kNumLevels; i++) {
level_ptrs_[i] = 0;
}
}
Compaction::~Compaction() {
if (input_version_ != NULL) {
input_version_->Unref();
}
}
bool Compaction::IsTrivialMove() const {
// Avoid a move if there is lots of overlapping grandparent data.
// Otherwise, the move could create a parent file that will require
// a very expensive merge later on.
return (num_input_files(0) == 1 &&
num_input_files(1) == 0 &&
TotalFileSize(grandparents_) <= kMaxGrandParentOverlapBytes);
}
void Compaction::AddInputDeletions(VersionEdit* edit) {
for (int which = 0; which < 2; which++) {
for (size_t i = 0; i < inputs_[which].size(); i++) {
edit->DeleteFile(level_ + which, inputs_[which][i]->number);
}
}
}
bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
// Maybe use binary search to find right entry instead of linear search?
const Comparator* user_cmp = input_version_->vset_->icmp_.user_comparator();
for (int lvl = level_ + 2; lvl < config::kNumLevels; lvl++) {
const std::vector<FileMetaData*>& files = input_version_->files_[lvl];
for (; level_ptrs_[lvl] < files.size(); ) {
FileMetaData* f = files[level_ptrs_[lvl]];
if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) {
// We've advanced far enough
if (user_cmp->Compare(user_key, f->smallest.user_key()) >= 0) {
// Key falls in this file's range, so definitely not base level
return false;
}
break;
}
level_ptrs_[lvl]++;
}
}
return true;
}
bool Compaction::ShouldStopBefore(const Slice& internal_key) {
// Scan to find earliest grandparent file that contains key.
const InternalKeyComparator* icmp = &input_version_->vset_->icmp_;
while (grandparent_index_ < grandparents_.size() &&
icmp->Compare(internal_key,
grandparents_[grandparent_index_]->largest.Encode()) > 0) {
if (seen_key_) {
overlapped_bytes_ += grandparents_[grandparent_index_]->file_size;
}
grandparent_index_++;
}
seen_key_ = true;
if (overlapped_bytes_ > kMaxGrandParentOverlapBytes) {
// Too much overlap for current output; start new output
overlapped_bytes_ = 0;
return true;
} else {
return false;
}
}
void Compaction::ReleaseInputs() {
if (input_version_ != NULL) {
input_version_->Unref();
input_version_ = NULL;
}
}
} // namespace leveldb
diff --git a/src/leveldb/include/leveldb/db.h b/src/leveldb/include/leveldb/db.h
index 29d3674479..da8b11a8c0 100644
--- a/src/leveldb/include/leveldb/db.h
+++ b/src/leveldb/include/leveldb/db.h
@@ -1,161 +1,161 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef STORAGE_LEVELDB_INCLUDE_DB_H_
#define STORAGE_LEVELDB_INCLUDE_DB_H_
#include <stdint.h>
#include <stdio.h>
#include "leveldb/iterator.h"
#include "leveldb/options.h"
namespace leveldb {
// Update Makefile if you change these
static const int kMajorVersion = 1;
-static const int kMinorVersion = 9;
+static const int kMinorVersion = 12;
struct Options;
struct ReadOptions;
struct WriteOptions;
class WriteBatch;
// Abstract handle to particular state of a DB.
// A Snapshot is an immutable object and can therefore be safely
// accessed from multiple threads without any external synchronization.
class Snapshot {
protected:
virtual ~Snapshot();
};
// A range of keys
struct Range {
Slice start; // Included in the range
Slice limit; // Not included in the range
Range() { }
Range(const Slice& s, const Slice& l) : start(s), limit(l) { }
};
// A DB is a persistent ordered map from keys to values.
// A DB is safe for concurrent access from multiple threads without
// any external synchronization.
class DB {
public:
// Open the database with the specified "name".
// Stores a pointer to a heap-allocated database in *dbptr and returns
// OK on success.
// Stores NULL in *dbptr and returns a non-OK status on error.
// Caller should delete *dbptr when it is no longer needed.
static Status Open(const Options& options,
const std::string& name,
DB** dbptr);
DB() { }
virtual ~DB();
// Set the database entry for "key" to "value". Returns OK on success,
// and a non-OK status on error.
// Note: consider setting options.sync = true.
virtual Status Put(const WriteOptions& options,
const Slice& key,
const Slice& value) = 0;
// Remove the database entry (if any) for "key". Returns OK on
// success, and a non-OK status on error. It is not an error if "key"
// did not exist in the database.
// Note: consider setting options.sync = true.
virtual Status Delete(const WriteOptions& options, const Slice& key) = 0;
// Apply the specified updates to the database.
// Returns OK on success, non-OK on failure.
// Note: consider setting options.sync = true.
virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0;
// If the database contains an entry for "key" store the
// corresponding value in *value and return OK.
//
// If there is no entry for "key" leave *value unchanged and return
// a status for which Status::IsNotFound() returns true.
//
// May return some other Status on an error.
virtual Status Get(const ReadOptions& options,
const Slice& key, std::string* value) = 0;
// Return a heap-allocated iterator over the contents of the database.
// The result of NewIterator() is initially invalid (caller must
// call one of the Seek methods on the iterator before using it).
//
// Caller should delete the iterator when it is no longer needed.
// The returned iterator should be deleted before this db is deleted.
virtual Iterator* NewIterator(const ReadOptions& options) = 0;
// Return a handle to the current DB state. Iterators created with
// this handle will all observe a stable snapshot of the current DB
// state. The caller must call ReleaseSnapshot(result) when the
// snapshot is no longer needed.
virtual const Snapshot* GetSnapshot() = 0;
// Release a previously acquired snapshot. The caller must not
// use "snapshot" after this call.
virtual void ReleaseSnapshot(const Snapshot* snapshot) = 0;
// DB implementations can export properties about their state
// via this method. If "property" is a valid property understood by this
// DB implementation, fills "*value" with its current value and returns
// true. Otherwise returns false.
//
//
// Valid property names include:
//
// "leveldb.num-files-at-level<N>" - return the number of files at level <N>,
// where <N> is an ASCII representation of a level number (e.g. "0").
// "leveldb.stats" - returns a multi-line string that describes statistics
// about the internal operation of the DB.
// "leveldb.sstables" - returns a multi-line string that describes all
// of the sstables that make up the db contents.
virtual bool GetProperty(const Slice& property, std::string* value) = 0;
// For each i in [0,n-1], store in "sizes[i]", the approximate
// file system space used by keys in "[range[i].start .. range[i].limit)".
//
// Note that the returned sizes measure file system space usage, so
// if the user data compresses by a factor of ten, the returned
// sizes will be one-tenth the size of the corresponding user data size.
//
// The results may not include the sizes of recently written data.
virtual void GetApproximateSizes(const Range* range, int n,
uint64_t* sizes) = 0;
// Compact the underlying storage for the key range [*begin,*end].
// In particular, deleted and overwritten versions are discarded,
// and the data is rearranged to reduce the cost of operations
// needed to access the data. This operation should typically only
// be invoked by users who understand the underlying implementation.
//
// begin==NULL is treated as a key before all keys in the database.
// end==NULL is treated as a key after all keys in the database.
// Therefore the following call will compact the entire database:
// db->CompactRange(NULL, NULL);
virtual void CompactRange(const Slice* begin, const Slice* end) = 0;
private:
// No copying allowed
DB(const DB&);
void operator=(const DB&);
};
// Destroy the contents of the specified database.
// Be very careful using this method.
Status DestroyDB(const std::string& name, const Options& options);
// If a DB cannot be opened, you may attempt to call this method to
// resurrect as much of the contents of the database as possible.
// Some data may be lost, so be careful when calling this function
// on a database that contains important information.
Status RepairDB(const std::string& dbname, const Options& options);
} // namespace leveldb
#endif // STORAGE_LEVELDB_INCLUDE_DB_H_
diff --git a/src/leveldb/port/port_win.cc b/src/leveldb/port/port_win.cc
index 99c1d8e346..1b0f060a19 100644
--- a/src/leveldb/port/port_win.cc
+++ b/src/leveldb/port/port_win.cc
@@ -1,149 +1,147 @@
// LevelDB Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// See port_example.h for documentation for the following types/functions.
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of the University of California, Berkeley nor the
// names of its contributors may be used to endorse or promote products
// derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
#include "port/port_win.h"
#include <windows.h>
#include <cassert>
namespace leveldb {
namespace port {
Mutex::Mutex() :
cs_(NULL) {
assert(!cs_);
cs_ = static_cast<void *>(new CRITICAL_SECTION());
::InitializeCriticalSection(static_cast<CRITICAL_SECTION *>(cs_));
assert(cs_);
}
Mutex::~Mutex() {
assert(cs_);
::DeleteCriticalSection(static_cast<CRITICAL_SECTION *>(cs_));
delete static_cast<CRITICAL_SECTION *>(cs_);
cs_ = NULL;
assert(!cs_);
}
void Mutex::Lock() {
assert(cs_);
::EnterCriticalSection(static_cast<CRITICAL_SECTION *>(cs_));
}
void Mutex::Unlock() {
assert(cs_);
::LeaveCriticalSection(static_cast<CRITICAL_SECTION *>(cs_));
}
void Mutex::AssertHeld() {
assert(cs_);
assert(1);
}
CondVar::CondVar(Mutex* mu) :
waiting_(0),
mu_(mu),
sem1_(::CreateSemaphore(NULL, 0, 10000, NULL)),
sem2_(::CreateSemaphore(NULL, 0, 10000, NULL)) {
assert(mu_);
}
CondVar::~CondVar() {
::CloseHandle(sem1_);
::CloseHandle(sem2_);
}
void CondVar::Wait() {
mu_->AssertHeld();
wait_mtx_.Lock();
++waiting_;
wait_mtx_.Unlock();
mu_->Unlock();
// initiate handshake
::WaitForSingleObject(sem1_, INFINITE);
::ReleaseSemaphore(sem2_, 1, NULL);
mu_->Lock();
}
void CondVar::Signal() {
wait_mtx_.Lock();
if (waiting_ > 0) {
--waiting_;
// finalize handshake
::ReleaseSemaphore(sem1_, 1, NULL);
::WaitForSingleObject(sem2_, INFINITE);
}
wait_mtx_.Unlock();
}
void CondVar::SignalAll() {
wait_mtx_.Lock();
- for(long i = 0; i < waiting_; ++i) {
- ::ReleaseSemaphore(sem1_, 1, NULL);
- while(waiting_ > 0) {
- --waiting_;
- ::WaitForSingleObject(sem2_, INFINITE);
- }
+ ::ReleaseSemaphore(sem1_, waiting_, NULL);
+ while(waiting_ > 0) {
+ --waiting_;
+ ::WaitForSingleObject(sem2_, INFINITE);
}
wait_mtx_.Unlock();
}
AtomicPointer::AtomicPointer(void* v) {
Release_Store(v);
}
void InitOnce(OnceType* once, void (*initializer)()) {
once->InitOnce(initializer);
}
void* AtomicPointer::Acquire_Load() const {
void * p = NULL;
InterlockedExchangePointer(&p, rep_);
return p;
}
void AtomicPointer::Release_Store(void* v) {
InterlockedExchangePointer(&rep_, v);
}
void* AtomicPointer::NoBarrier_Load() const {
return rep_;
}
void AtomicPointer::NoBarrier_Store(void* v) {
rep_ = v;
}
}
}
diff --git a/src/leveldb/table/block.cc b/src/leveldb/table/block.cc
index ab83c1112c..79ea9d9ee5 100644
--- a/src/leveldb/table/block.cc
+++ b/src/leveldb/table/block.cc
@@ -1,267 +1,268 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// Decodes the blocks generated by block_builder.cc.
#include "table/block.h"
#include <vector>
#include <algorithm>
#include "leveldb/comparator.h"
#include "table/format.h"
#include "util/coding.h"
#include "util/logging.h"
namespace leveldb {
inline uint32_t Block::NumRestarts() const {
- assert(size_ >= 2*sizeof(uint32_t));
+ assert(size_ >= sizeof(uint32_t));
return DecodeFixed32(data_ + size_ - sizeof(uint32_t));
}
Block::Block(const BlockContents& contents)
: data_(contents.data.data()),
size_(contents.data.size()),
owned_(contents.heap_allocated) {
if (size_ < sizeof(uint32_t)) {
size_ = 0; // Error marker
} else {
- restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32_t);
- if (restart_offset_ > size_ - sizeof(uint32_t)) {
- // The size is too small for NumRestarts() and therefore
- // restart_offset_ wrapped around.
+ size_t max_restarts_allowed = (size_-sizeof(uint32_t)) / sizeof(uint32_t);
+ if (NumRestarts() > max_restarts_allowed) {
+ // The size is too small for NumRestarts()
size_ = 0;
+ } else {
+ restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32_t);
}
}
}
Block::~Block() {
if (owned_) {
delete[] data_;
}
}
// Helper routine: decode the next block entry starting at "p",
// storing the number of shared key bytes, non_shared key bytes,
// and the length of the value in "*shared", "*non_shared", and
// "*value_length", respectively. Will not derefence past "limit".
//
// If any errors are detected, returns NULL. Otherwise, returns a
// pointer to the key delta (just past the three decoded values).
static inline const char* DecodeEntry(const char* p, const char* limit,
uint32_t* shared,
uint32_t* non_shared,
uint32_t* value_length) {
if (limit - p < 3) return NULL;
*shared = reinterpret_cast<const unsigned char*>(p)[0];
*non_shared = reinterpret_cast<const unsigned char*>(p)[1];
*value_length = reinterpret_cast<const unsigned char*>(p)[2];
if ((*shared | *non_shared | *value_length) < 128) {
// Fast path: all three values are encoded in one byte each
p += 3;
} else {
if ((p = GetVarint32Ptr(p, limit, shared)) == NULL) return NULL;
if ((p = GetVarint32Ptr(p, limit, non_shared)) == NULL) return NULL;
if ((p = GetVarint32Ptr(p, limit, value_length)) == NULL) return NULL;
}
if (static_cast<uint32_t>(limit - p) < (*non_shared + *value_length)) {
return NULL;
}
return p;
}
class Block::Iter : public Iterator {
private:
const Comparator* const comparator_;
const char* const data_; // underlying block contents
uint32_t const restarts_; // Offset of restart array (list of fixed32)
uint32_t const num_restarts_; // Number of uint32_t entries in restart array
// current_ is offset in data_ of current entry. >= restarts_ if !Valid
uint32_t current_;
uint32_t restart_index_; // Index of restart block in which current_ falls
std::string key_;
Slice value_;
Status status_;
inline int Compare(const Slice& a, const Slice& b) const {
return comparator_->Compare(a, b);
}
// Return the offset in data_ just past the end of the current entry.
inline uint32_t NextEntryOffset() const {
return (value_.data() + value_.size()) - data_;
}
uint32_t GetRestartPoint(uint32_t index) {
assert(index < num_restarts_);
return DecodeFixed32(data_ + restarts_ + index * sizeof(uint32_t));
}
void SeekToRestartPoint(uint32_t index) {
key_.clear();
restart_index_ = index;
// current_ will be fixed by ParseNextKey();
// ParseNextKey() starts at the end of value_, so set value_ accordingly
uint32_t offset = GetRestartPoint(index);
value_ = Slice(data_ + offset, 0);
}
public:
Iter(const Comparator* comparator,
const char* data,
uint32_t restarts,
uint32_t num_restarts)
: comparator_(comparator),
data_(data),
restarts_(restarts),
num_restarts_(num_restarts),
current_(restarts_),
restart_index_(num_restarts_) {
assert(num_restarts_ > 0);
}
virtual bool Valid() const { return current_ < restarts_; }
virtual Status status() const { return status_; }
virtual Slice key() const {
assert(Valid());
return key_;
}
virtual Slice value() const {
assert(Valid());
return value_;
}
virtual void Next() {
assert(Valid());
ParseNextKey();
}
virtual void Prev() {
assert(Valid());
// Scan backwards to a restart point before current_
const uint32_t original = current_;
while (GetRestartPoint(restart_index_) >= original) {
if (restart_index_ == 0) {
// No more entries
current_ = restarts_;
restart_index_ = num_restarts_;
return;
}
restart_index_--;
}
SeekToRestartPoint(restart_index_);
do {
// Loop until end of current entry hits the start of original entry
} while (ParseNextKey() && NextEntryOffset() < original);
}
virtual void Seek(const Slice& target) {
// Binary search in restart array to find the last restart point
// with a key < target
uint32_t left = 0;
uint32_t right = num_restarts_ - 1;
while (left < right) {
uint32_t mid = (left + right + 1) / 2;
uint32_t region_offset = GetRestartPoint(mid);
uint32_t shared, non_shared, value_length;
const char* key_ptr = DecodeEntry(data_ + region_offset,
data_ + restarts_,
&shared, &non_shared, &value_length);
if (key_ptr == NULL || (shared != 0)) {
CorruptionError();
return;
}
Slice mid_key(key_ptr, non_shared);
if (Compare(mid_key, target) < 0) {
// Key at "mid" is smaller than "target". Therefore all
// blocks before "mid" are uninteresting.
left = mid;
} else {
// Key at "mid" is >= "target". Therefore all blocks at or
// after "mid" are uninteresting.
right = mid - 1;
}
}
// Linear search (within restart block) for first key >= target
SeekToRestartPoint(left);
while (true) {
if (!ParseNextKey()) {
return;
}
if (Compare(key_, target) >= 0) {
return;
}
}
}
virtual void SeekToFirst() {
SeekToRestartPoint(0);
ParseNextKey();
}
virtual void SeekToLast() {
SeekToRestartPoint(num_restarts_ - 1);
while (ParseNextKey() && NextEntryOffset() < restarts_) {
// Keep skipping
}
}
private:
void CorruptionError() {
current_ = restarts_;
restart_index_ = num_restarts_;
status_ = Status::Corruption("bad entry in block");
key_.clear();
value_.clear();
}
bool ParseNextKey() {
current_ = NextEntryOffset();
const char* p = data_ + current_;
const char* limit = data_ + restarts_; // Restarts come right after data
if (p >= limit) {
// No more entries to return. Mark as invalid.
current_ = restarts_;
restart_index_ = num_restarts_;
return false;
}
// Decode next entry
uint32_t shared, non_shared, value_length;
p = DecodeEntry(p, limit, &shared, &non_shared, &value_length);
if (p == NULL || key_.size() < shared) {
CorruptionError();
return false;
} else {
key_.resize(shared);
key_.append(p, non_shared);
value_ = Slice(p + non_shared, value_length);
while (restart_index_ + 1 < num_restarts_ &&
GetRestartPoint(restart_index_ + 1) < current_) {
++restart_index_;
}
return true;
}
}
};
Iterator* Block::NewIterator(const Comparator* cmp) {
- if (size_ < 2*sizeof(uint32_t)) {
+ if (size_ < sizeof(uint32_t)) {
return NewErrorIterator(Status::Corruption("bad block contents"));
}
const uint32_t num_restarts = NumRestarts();
if (num_restarts == 0) {
return NewEmptyIterator();
} else {
return new Iter(cmp, data_, restart_offset_, num_restarts);
}
}
} // namespace leveldb
diff --git a/src/leveldb/table/table.cc b/src/leveldb/table/table.cc
index dbd6d3a1bf..71c1756e5f 100644
--- a/src/leveldb/table/table.cc
+++ b/src/leveldb/table/table.cc
@@ -1,276 +1,275 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "leveldb/table.h"
#include "leveldb/cache.h"
#include "leveldb/comparator.h"
#include "leveldb/env.h"
#include "leveldb/filter_policy.h"
#include "leveldb/options.h"
#include "table/block.h"
#include "table/filter_block.h"
#include "table/format.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
namespace leveldb {
struct Table::Rep {
~Rep() {
delete filter;
delete [] filter_data;
delete index_block;
}
Options options;
Status status;
RandomAccessFile* file;
uint64_t cache_id;
FilterBlockReader* filter;
const char* filter_data;
BlockHandle metaindex_handle; // Handle to metaindex_block: saved from footer
Block* index_block;
};
Status Table::Open(const Options& options,
RandomAccessFile* file,
uint64_t size,
Table** table) {
*table = NULL;
if (size < Footer::kEncodedLength) {
return Status::InvalidArgument("file is too short to be an sstable");
}
char footer_space[Footer::kEncodedLength];
Slice footer_input;
Status s = file->Read(size - Footer::kEncodedLength, Footer::kEncodedLength,
&footer_input, footer_space);
if (!s.ok()) return s;
Footer footer;
s = footer.DecodeFrom(&footer_input);
if (!s.ok()) return s;
// Read the index block
BlockContents contents;
Block* index_block = NULL;
if (s.ok()) {
s = ReadBlock(file, ReadOptions(), footer.index_handle(), &contents);
if (s.ok()) {
index_block = new Block(contents);
}
}
if (s.ok()) {
// We've successfully read the footer and the index block: we're
// ready to serve requests.
Rep* rep = new Table::Rep;
rep->options = options;
rep->file = file;
rep->metaindex_handle = footer.metaindex_handle();
rep->index_block = index_block;
rep->cache_id = (options.block_cache ? options.block_cache->NewId() : 0);
rep->filter_data = NULL;
rep->filter = NULL;
*table = new Table(rep);
(*table)->ReadMeta(footer);
} else {
if (index_block) delete index_block;
}
return s;
}
void Table::ReadMeta(const Footer& footer) {
if (rep_->options.filter_policy == NULL) {
return; // Do not need any metadata
}
// TODO(sanjay): Skip this if footer.metaindex_handle() size indicates
// it is an empty block.
ReadOptions opt;
BlockContents contents;
if (!ReadBlock(rep_->file, opt, footer.metaindex_handle(), &contents).ok()) {
// Do not propagate errors since meta info is not needed for operation
return;
}
Block* meta = new Block(contents);
Iterator* iter = meta->NewIterator(BytewiseComparator());
std::string key = "filter.";
key.append(rep_->options.filter_policy->Name());
iter->Seek(key);
if (iter->Valid() && iter->key() == Slice(key)) {
ReadFilter(iter->value());
}
delete iter;
delete meta;
}
void Table::ReadFilter(const Slice& filter_handle_value) {
Slice v = filter_handle_value;
BlockHandle filter_handle;
if (!filter_handle.DecodeFrom(&v).ok()) {
return;
}
// We might want to unify with ReadBlock() if we start
// requiring checksum verification in Table::Open.
ReadOptions opt;
BlockContents block;
if (!ReadBlock(rep_->file, opt, filter_handle, &block).ok()) {
return;
}
if (block.heap_allocated) {
rep_->filter_data = block.data.data(); // Will need to delete later
}
rep_->filter = new FilterBlockReader(rep_->options.filter_policy, block.data);
}
Table::~Table() {
delete rep_;
}
static void DeleteBlock(void* arg, void* ignored) {
delete reinterpret_cast<Block*>(arg);
}
static void DeleteCachedBlock(const Slice& key, void* value) {
Block* block = reinterpret_cast<Block*>(value);
delete block;
}
static void ReleaseBlock(void* arg, void* h) {
Cache* cache = reinterpret_cast<Cache*>(arg);
Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(h);
cache->Release(handle);
}
// Convert an index iterator value (i.e., an encoded BlockHandle)
// into an iterator over the contents of the corresponding block.
Iterator* Table::BlockReader(void* arg,
const ReadOptions& options,
const Slice& index_value) {
Table* table = reinterpret_cast<Table*>(arg);
Cache* block_cache = table->rep_->options.block_cache;
Block* block = NULL;
Cache::Handle* cache_handle = NULL;
BlockHandle handle;
Slice input = index_value;
Status s = handle.DecodeFrom(&input);
// We intentionally allow extra stuff in index_value so that we
// can add more features in the future.
if (s.ok()) {
BlockContents contents;
if (block_cache != NULL) {
char cache_key_buffer[16];
EncodeFixed64(cache_key_buffer, table->rep_->cache_id);
EncodeFixed64(cache_key_buffer+8, handle.offset());
Slice key(cache_key_buffer, sizeof(cache_key_buffer));
cache_handle = block_cache->Lookup(key);
if (cache_handle != NULL) {
block = reinterpret_cast<Block*>(block_cache->Value(cache_handle));
} else {
s = ReadBlock(table->rep_->file, options, handle, &contents);
if (s.ok()) {
block = new Block(contents);
if (contents.cachable && options.fill_cache) {
cache_handle = block_cache->Insert(
key, block, block->size(), &DeleteCachedBlock);
}
}
}
} else {
s = ReadBlock(table->rep_->file, options, handle, &contents);
if (s.ok()) {
block = new Block(contents);
}
}
}
Iterator* iter;
if (block != NULL) {
iter = block->NewIterator(table->rep_->options.comparator);
if (cache_handle == NULL) {
iter->RegisterCleanup(&DeleteBlock, block, NULL);
} else {
iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle);
}
} else {
iter = NewErrorIterator(s);
}
return iter;
}
Iterator* Table::NewIterator(const ReadOptions& options) const {
return NewTwoLevelIterator(
rep_->index_block->NewIterator(rep_->options.comparator),
&Table::BlockReader, const_cast<Table*>(this), options);
}
Status Table::InternalGet(const ReadOptions& options, const Slice& k,
void* arg,
void (*saver)(void*, const Slice&, const Slice&)) {
Status s;
Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator);
iiter->Seek(k);
if (iiter->Valid()) {
Slice handle_value = iiter->value();
FilterBlockReader* filter = rep_->filter;
BlockHandle handle;
if (filter != NULL &&
handle.DecodeFrom(&handle_value).ok() &&
!filter->KeyMayMatch(handle.offset(), k)) {
// Not found
} else {
- Slice handle = iiter->value();
Iterator* block_iter = BlockReader(this, options, iiter->value());
block_iter->Seek(k);
if (block_iter->Valid()) {
(*saver)(arg, block_iter->key(), block_iter->value());
}
s = block_iter->status();
delete block_iter;
}
}
if (s.ok()) {
s = iiter->status();
}
delete iiter;
return s;
}
uint64_t Table::ApproximateOffsetOf(const Slice& key) const {
Iterator* index_iter =
rep_->index_block->NewIterator(rep_->options.comparator);
index_iter->Seek(key);
uint64_t result;
if (index_iter->Valid()) {
BlockHandle handle;
Slice input = index_iter->value();
Status s = handle.DecodeFrom(&input);
if (s.ok()) {
result = handle.offset();
} else {
// Strange: we can't decode the block handle in the index block.
// We'll just return the offset of the metaindex block, which is
// close to the whole file size for this case.
result = rep_->metaindex_handle.offset();
}
} else {
// key is past the last key in the file. Approximate the offset
// by returning the offset of the metaindex block (which is
// right near the end of the file).
result = rep_->metaindex_handle.offset();
}
delete index_iter;
return result;
}
} // namespace leveldb
diff --git a/src/leveldb/table/table_test.cc b/src/leveldb/table/table_test.cc
index 57cea25334..c723bf84cf 100644
--- a/src/leveldb/table/table_test.cc
+++ b/src/leveldb/table/table_test.cc
@@ -1,838 +1,868 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "leveldb/table.h"
#include <map>
#include <string>
#include "db/dbformat.h"
#include "db/memtable.h"
#include "db/write_batch_internal.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "leveldb/table_builder.h"
#include "table/block.h"
#include "table/block_builder.h"
#include "table/format.h"
#include "util/random.h"
#include "util/testharness.h"
#include "util/testutil.h"
namespace leveldb {
// Return reverse of "key".
// Used to test non-lexicographic comparators.
static std::string Reverse(const Slice& key) {
std::string str(key.ToString());
std::string rev("");
for (std::string::reverse_iterator rit = str.rbegin();
rit != str.rend(); ++rit) {
rev.push_back(*rit);
}
return rev;
}
namespace {
class ReverseKeyComparator : public Comparator {
public:
virtual const char* Name() const {
return "leveldb.ReverseBytewiseComparator";
}
virtual int Compare(const Slice& a, const Slice& b) const {
return BytewiseComparator()->Compare(Reverse(a), Reverse(b));
}
virtual void FindShortestSeparator(
std::string* start,
const Slice& limit) const {
std::string s = Reverse(*start);
std::string l = Reverse(limit);
BytewiseComparator()->FindShortestSeparator(&s, l);
*start = Reverse(s);
}
virtual void FindShortSuccessor(std::string* key) const {
std::string s = Reverse(*key);
BytewiseComparator()->FindShortSuccessor(&s);
*key = Reverse(s);
}
};
} // namespace
static ReverseKeyComparator reverse_key_comparator;
static void Increment(const Comparator* cmp, std::string* key) {
if (cmp == BytewiseComparator()) {
key->push_back('\0');
} else {
assert(cmp == &reverse_key_comparator);
std::string rev = Reverse(*key);
rev.push_back('\0');
*key = Reverse(rev);
}
}
// An STL comparator that uses a Comparator
namespace {
struct STLLessThan {
const Comparator* cmp;
STLLessThan() : cmp(BytewiseComparator()) { }
STLLessThan(const Comparator* c) : cmp(c) { }
bool operator()(const std::string& a, const std::string& b) const {
return cmp->Compare(Slice(a), Slice(b)) < 0;
}
};
} // namespace
class StringSink: public WritableFile {
public:
~StringSink() { }
const std::string& contents() const { return contents_; }
virtual Status Close() { return Status::OK(); }
virtual Status Flush() { return Status::OK(); }
virtual Status Sync() { return Status::OK(); }
virtual Status Append(const Slice& data) {
contents_.append(data.data(), data.size());
return Status::OK();
}
private:
std::string contents_;
};
class StringSource: public RandomAccessFile {
public:
StringSource(const Slice& contents)
: contents_(contents.data(), contents.size()) {
}
virtual ~StringSource() { }
uint64_t Size() const { return contents_.size(); }
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const {
if (offset > contents_.size()) {
return Status::InvalidArgument("invalid Read offset");
}
if (offset + n > contents_.size()) {
n = contents_.size() - offset;
}
memcpy(scratch, &contents_[offset], n);
*result = Slice(scratch, n);
return Status::OK();
}
private:
std::string contents_;
};
typedef std::map<std::string, std::string, STLLessThan> KVMap;
// Helper class for tests to unify the interface between
// BlockBuilder/TableBuilder and Block/Table.
class Constructor {
public:
explicit Constructor(const Comparator* cmp) : data_(STLLessThan(cmp)) { }
virtual ~Constructor() { }
void Add(const std::string& key, const Slice& value) {
data_[key] = value.ToString();
}
// Finish constructing the data structure with all the keys that have
// been added so far. Returns the keys in sorted order in "*keys"
// and stores the key/value pairs in "*kvmap"
void Finish(const Options& options,
std::vector<std::string>* keys,
KVMap* kvmap) {
*kvmap = data_;
keys->clear();
for (KVMap::const_iterator it = data_.begin();
it != data_.end();
++it) {
keys->push_back(it->first);
}
data_.clear();
Status s = FinishImpl(options, *kvmap);
ASSERT_TRUE(s.ok()) << s.ToString();
}
// Construct the data structure from the data in "data"
virtual Status FinishImpl(const Options& options, const KVMap& data) = 0;
virtual Iterator* NewIterator() const = 0;
virtual const KVMap& data() { return data_; }
virtual DB* db() const { return NULL; } // Overridden in DBConstructor
private:
KVMap data_;
};
class BlockConstructor: public Constructor {
public:
explicit BlockConstructor(const Comparator* cmp)
: Constructor(cmp),
comparator_(cmp),
block_(NULL) { }
~BlockConstructor() {
delete block_;
}
virtual Status FinishImpl(const Options& options, const KVMap& data) {
delete block_;
block_ = NULL;
BlockBuilder builder(&options);
for (KVMap::const_iterator it = data.begin();
it != data.end();
++it) {
builder.Add(it->first, it->second);
}
// Open the block
data_ = builder.Finish().ToString();
BlockContents contents;
contents.data = data_;
contents.cachable = false;
contents.heap_allocated = false;
block_ = new Block(contents);
return Status::OK();
}
virtual Iterator* NewIterator() const {
return block_->NewIterator(comparator_);
}
private:
const Comparator* comparator_;
std::string data_;
Block* block_;
BlockConstructor();
};
class TableConstructor: public Constructor {
public:
TableConstructor(const Comparator* cmp)
: Constructor(cmp),
source_(NULL), table_(NULL) {
}
~TableConstructor() {
Reset();
}
virtual Status FinishImpl(const Options& options, const KVMap& data) {
Reset();
StringSink sink;
TableBuilder builder(options, &sink);
for (KVMap::const_iterator it = data.begin();
it != data.end();
++it) {
builder.Add(it->first, it->second);
ASSERT_TRUE(builder.status().ok());
}
Status s = builder.Finish();
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(sink.contents().size(), builder.FileSize());
// Open the table
source_ = new StringSource(sink.contents());
Options table_options;
table_options.comparator = options.comparator;
return Table::Open(table_options, source_, sink.contents().size(), &table_);
}
virtual Iterator* NewIterator() const {
return table_->NewIterator(ReadOptions());
}
uint64_t ApproximateOffsetOf(const Slice& key) const {
return table_->ApproximateOffsetOf(key);
}
private:
void Reset() {
delete table_;
delete source_;
table_ = NULL;
source_ = NULL;
}
StringSource* source_;
Table* table_;
TableConstructor();
};
// A helper class that converts internal format keys into user keys
class KeyConvertingIterator: public Iterator {
public:
explicit KeyConvertingIterator(Iterator* iter) : iter_(iter) { }
virtual ~KeyConvertingIterator() { delete iter_; }
virtual bool Valid() const { return iter_->Valid(); }
virtual void Seek(const Slice& target) {
ParsedInternalKey ikey(target, kMaxSequenceNumber, kTypeValue);
std::string encoded;
AppendInternalKey(&encoded, ikey);
iter_->Seek(encoded);
}
virtual void SeekToFirst() { iter_->SeekToFirst(); }
virtual void SeekToLast() { iter_->SeekToLast(); }
virtual void Next() { iter_->Next(); }
virtual void Prev() { iter_->Prev(); }
virtual Slice key() const {
assert(Valid());
ParsedInternalKey key;
if (!ParseInternalKey(iter_->key(), &key)) {
status_ = Status::Corruption("malformed internal key");
return Slice("corrupted key");
}
return key.user_key;
}
virtual Slice value() const { return iter_->value(); }
virtual Status status() const {
return status_.ok() ? iter_->status() : status_;
}
private:
mutable Status status_;
Iterator* iter_;
// No copying allowed
KeyConvertingIterator(const KeyConvertingIterator&);
void operator=(const KeyConvertingIterator&);
};
class MemTableConstructor: public Constructor {
public:
explicit MemTableConstructor(const Comparator* cmp)
: Constructor(cmp),
internal_comparator_(cmp) {
memtable_ = new MemTable(internal_comparator_);
memtable_->Ref();
}
~MemTableConstructor() {
memtable_->Unref();
}
virtual Status FinishImpl(const Options& options, const KVMap& data) {
memtable_->Unref();
memtable_ = new MemTable(internal_comparator_);
memtable_->Ref();
int seq = 1;
for (KVMap::const_iterator it = data.begin();
it != data.end();
++it) {
memtable_->Add(seq, kTypeValue, it->first, it->second);
seq++;
}
return Status::OK();
}
virtual Iterator* NewIterator() const {
return new KeyConvertingIterator(memtable_->NewIterator());
}
private:
InternalKeyComparator internal_comparator_;
MemTable* memtable_;
};
class DBConstructor: public Constructor {
public:
explicit DBConstructor(const Comparator* cmp)
: Constructor(cmp),
comparator_(cmp) {
db_ = NULL;
NewDB();
}
~DBConstructor() {
delete db_;
}
virtual Status FinishImpl(const Options& options, const KVMap& data) {
delete db_;
db_ = NULL;
NewDB();
for (KVMap::const_iterator it = data.begin();
it != data.end();
++it) {
WriteBatch batch;
batch.Put(it->first, it->second);
ASSERT_TRUE(db_->Write(WriteOptions(), &batch).ok());
}
return Status::OK();
}
virtual Iterator* NewIterator() const {
return db_->NewIterator(ReadOptions());
}
virtual DB* db() const { return db_; }
private:
void NewDB() {
std::string name = test::TmpDir() + "/table_testdb";
Options options;
options.comparator = comparator_;
Status status = DestroyDB(name, options);
ASSERT_TRUE(status.ok()) << status.ToString();
options.create_if_missing = true;
options.error_if_exists = true;
options.write_buffer_size = 10000; // Something small to force merging
status = DB::Open(options, name, &db_);
ASSERT_TRUE(status.ok()) << status.ToString();
}
const Comparator* comparator_;
DB* db_;
};
enum TestType {
TABLE_TEST,
BLOCK_TEST,
MEMTABLE_TEST,
DB_TEST
};
struct TestArgs {
TestType type;
bool reverse_compare;
int restart_interval;
};
static const TestArgs kTestArgList[] = {
{ TABLE_TEST, false, 16 },
{ TABLE_TEST, false, 1 },
{ TABLE_TEST, false, 1024 },
{ TABLE_TEST, true, 16 },
{ TABLE_TEST, true, 1 },
{ TABLE_TEST, true, 1024 },
{ BLOCK_TEST, false, 16 },
{ BLOCK_TEST, false, 1 },
{ BLOCK_TEST, false, 1024 },
{ BLOCK_TEST, true, 16 },
{ BLOCK_TEST, true, 1 },
{ BLOCK_TEST, true, 1024 },
// Restart interval does not matter for memtables
{ MEMTABLE_TEST, false, 16 },
{ MEMTABLE_TEST, true, 16 },
// Do not bother with restart interval variations for DB
{ DB_TEST, false, 16 },
{ DB_TEST, true, 16 },
};
static const int kNumTestArgs = sizeof(kTestArgList) / sizeof(kTestArgList[0]);
class Harness {
public:
Harness() : constructor_(NULL) { }
void Init(const TestArgs& args) {
delete constructor_;
constructor_ = NULL;
options_ = Options();
options_.block_restart_interval = args.restart_interval;
// Use shorter block size for tests to exercise block boundary
// conditions more.
options_.block_size = 256;
if (args.reverse_compare) {
options_.comparator = &reverse_key_comparator;
}
switch (args.type) {
case TABLE_TEST:
constructor_ = new TableConstructor(options_.comparator);
break;
case BLOCK_TEST:
constructor_ = new BlockConstructor(options_.comparator);
break;
case MEMTABLE_TEST:
constructor_ = new MemTableConstructor(options_.comparator);
break;
case DB_TEST:
constructor_ = new DBConstructor(options_.comparator);
break;
}
}
~Harness() {
delete constructor_;
}
void Add(const std::string& key, const std::string& value) {
constructor_->Add(key, value);
}
void Test(Random* rnd) {
std::vector<std::string> keys;
KVMap data;
constructor_->Finish(options_, &keys, &data);
TestForwardScan(keys, data);
TestBackwardScan(keys, data);
TestRandomAccess(rnd, keys, data);
}
void TestForwardScan(const std::vector<std::string>& keys,
const KVMap& data) {
Iterator* iter = constructor_->NewIterator();
ASSERT_TRUE(!iter->Valid());
iter->SeekToFirst();
for (KVMap::const_iterator model_iter = data.begin();
model_iter != data.end();
++model_iter) {
ASSERT_EQ(ToString(data, model_iter), ToString(iter));
iter->Next();
}
ASSERT_TRUE(!iter->Valid());
delete iter;
}
void TestBackwardScan(const std::vector<std::string>& keys,
const KVMap& data) {
Iterator* iter = constructor_->NewIterator();
ASSERT_TRUE(!iter->Valid());
iter->SeekToLast();
for (KVMap::const_reverse_iterator model_iter = data.rbegin();
model_iter != data.rend();
++model_iter) {
ASSERT_EQ(ToString(data, model_iter), ToString(iter));
iter->Prev();
}
ASSERT_TRUE(!iter->Valid());
delete iter;
}
void TestRandomAccess(Random* rnd,
const std::vector<std::string>& keys,
const KVMap& data) {
static const bool kVerbose = false;
Iterator* iter = constructor_->NewIterator();
ASSERT_TRUE(!iter->Valid());
KVMap::const_iterator model_iter = data.begin();
if (kVerbose) fprintf(stderr, "---\n");
for (int i = 0; i < 200; i++) {
const int toss = rnd->Uniform(5);
switch (toss) {
case 0: {
if (iter->Valid()) {
if (kVerbose) fprintf(stderr, "Next\n");
iter->Next();
++model_iter;
ASSERT_EQ(ToString(data, model_iter), ToString(iter));
}
break;
}
case 1: {
if (kVerbose) fprintf(stderr, "SeekToFirst\n");
iter->SeekToFirst();
model_iter = data.begin();
ASSERT_EQ(ToString(data, model_iter), ToString(iter));
break;
}
case 2: {
std::string key = PickRandomKey(rnd, keys);
model_iter = data.lower_bound(key);
if (kVerbose) fprintf(stderr, "Seek '%s'\n",
EscapeString(key).c_str());
iter->Seek(Slice(key));
ASSERT_EQ(ToString(data, model_iter), ToString(iter));
break;
}
case 3: {
if (iter->Valid()) {
if (kVerbose) fprintf(stderr, "Prev\n");
iter->Prev();
if (model_iter == data.begin()) {
model_iter = data.end(); // Wrap around to invalid value
} else {
--model_iter;
}
ASSERT_EQ(ToString(data, model_iter), ToString(iter));
}
break;
}
case 4: {
if (kVerbose) fprintf(stderr, "SeekToLast\n");
iter->SeekToLast();
if (keys.empty()) {
model_iter = data.end();
} else {
std::string last = data.rbegin()->first;
model_iter = data.lower_bound(last);
}
ASSERT_EQ(ToString(data, model_iter), ToString(iter));
break;
}
}
}
delete iter;
}
std::string ToString(const KVMap& data, const KVMap::const_iterator& it) {
if (it == data.end()) {
return "END";
} else {
return "'" + it->first + "->" + it->second + "'";
}
}
std::string ToString(const KVMap& data,
const KVMap::const_reverse_iterator& it) {
if (it == data.rend()) {
return "END";
} else {
return "'" + it->first + "->" + it->second + "'";
}
}
std::string ToString(const Iterator* it) {
if (!it->Valid()) {
return "END";
} else {
return "'" + it->key().ToString() + "->" + it->value().ToString() + "'";
}
}
std::string PickRandomKey(Random* rnd, const std::vector<std::string>& keys) {
if (keys.empty()) {
return "foo";
} else {
const int index = rnd->Uniform(keys.size());
std::string result = keys[index];
switch (rnd->Uniform(3)) {
case 0:
// Return an existing key
break;
case 1: {
// Attempt to return something smaller than an existing key
if (result.size() > 0 && result[result.size()-1] > '\0') {
result[result.size()-1]--;
}
break;
}
case 2: {
// Return something larger than an existing key
Increment(options_.comparator, &result);
break;
}
}
return result;
}
}
// Returns NULL if not running against a DB
DB* db() const { return constructor_->db(); }
private:
Options options_;
Constructor* constructor_;
};
+// Test empty table/block.
+TEST(Harness, Empty) {
+ for (int i = 0; i < kNumTestArgs; i++) {
+ Init(kTestArgList[i]);
+ Random rnd(test::RandomSeed() + 1);
+ Test(&rnd);
+ }
+}
+
+// Special test for a block with no restart entries. The C++ leveldb
+// code never generates such blocks, but the Java version of leveldb
+// seems to.
+TEST(Harness, ZeroRestartPointsInBlock) {
+ char data[sizeof(uint32_t)];
+ memset(data, 0, sizeof(data));
+ BlockContents contents;
+ contents.data = Slice(data, sizeof(data));
+ contents.cachable = false;
+ contents.heap_allocated = false;
+ Block block(contents);
+ Iterator* iter = block.NewIterator(BytewiseComparator());
+ iter->SeekToFirst();
+ ASSERT_TRUE(!iter->Valid());
+ iter->SeekToLast();
+ ASSERT_TRUE(!iter->Valid());
+ iter->Seek("foo");
+ ASSERT_TRUE(!iter->Valid());
+ delete iter;
+}
+
// Test the empty key
TEST(Harness, SimpleEmptyKey) {
for (int i = 0; i < kNumTestArgs; i++) {
Init(kTestArgList[i]);
Random rnd(test::RandomSeed() + 1);
Add("", "v");
Test(&rnd);
}
}
TEST(Harness, SimpleSingle) {
for (int i = 0; i < kNumTestArgs; i++) {
Init(kTestArgList[i]);
Random rnd(test::RandomSeed() + 2);
Add("abc", "v");
Test(&rnd);
}
}
TEST(Harness, SimpleMulti) {
for (int i = 0; i < kNumTestArgs; i++) {
Init(kTestArgList[i]);
Random rnd(test::RandomSeed() + 3);
Add("abc", "v");
Add("abcd", "v");
Add("ac", "v2");
Test(&rnd);
}
}
TEST(Harness, SimpleSpecialKey) {
for (int i = 0; i < kNumTestArgs; i++) {
Init(kTestArgList[i]);
Random rnd(test::RandomSeed() + 4);
Add("\xff\xff", "v3");
Test(&rnd);
}
}
TEST(Harness, Randomized) {
for (int i = 0; i < kNumTestArgs; i++) {
Init(kTestArgList[i]);
Random rnd(test::RandomSeed() + 5);
for (int num_entries = 0; num_entries < 2000;
num_entries += (num_entries < 50 ? 1 : 200)) {
if ((num_entries % 10) == 0) {
fprintf(stderr, "case %d of %d: num_entries = %d\n",
(i + 1), int(kNumTestArgs), num_entries);
}
for (int e = 0; e < num_entries; e++) {
std::string v;
Add(test::RandomKey(&rnd, rnd.Skewed(4)),
test::RandomString(&rnd, rnd.Skewed(5), &v).ToString());
}
Test(&rnd);
}
}
}
TEST(Harness, RandomizedLongDB) {
Random rnd(test::RandomSeed());
TestArgs args = { DB_TEST, false, 16 };
Init(args);
int num_entries = 100000;
for (int e = 0; e < num_entries; e++) {
std::string v;
Add(test::RandomKey(&rnd, rnd.Skewed(4)),
test::RandomString(&rnd, rnd.Skewed(5), &v).ToString());
}
Test(&rnd);
// We must have created enough data to force merging
int files = 0;
for (int level = 0; level < config::kNumLevels; level++) {
std::string value;
char name[100];
snprintf(name, sizeof(name), "leveldb.num-files-at-level%d", level);
ASSERT_TRUE(db()->GetProperty(name, &value));
files += atoi(value.c_str());
}
ASSERT_GT(files, 0);
}
class MemTableTest { };
TEST(MemTableTest, Simple) {
InternalKeyComparator cmp(BytewiseComparator());
MemTable* memtable = new MemTable(cmp);
memtable->Ref();
WriteBatch batch;
WriteBatchInternal::SetSequence(&batch, 100);
batch.Put(std::string("k1"), std::string("v1"));
batch.Put(std::string("k2"), std::string("v2"));
batch.Put(std::string("k3"), std::string("v3"));
batch.Put(std::string("largekey"), std::string("vlarge"));
ASSERT_TRUE(WriteBatchInternal::InsertInto(&batch, memtable).ok());
Iterator* iter = memtable->NewIterator();
iter->SeekToFirst();
while (iter->Valid()) {
fprintf(stderr, "key: '%s' -> '%s'\n",
iter->key().ToString().c_str(),
iter->value().ToString().c_str());
iter->Next();
}
delete iter;
memtable->Unref();
}
static bool Between(uint64_t val, uint64_t low, uint64_t high) {
bool result = (val >= low) && (val <= high);
if (!result) {
fprintf(stderr, "Value %llu is not in range [%llu, %llu]\n",
(unsigned long long)(val),
(unsigned long long)(low),
(unsigned long long)(high));
}
return result;
}
class TableTest { };
TEST(TableTest, ApproximateOffsetOfPlain) {
TableConstructor c(BytewiseComparator());
c.Add("k01", "hello");
c.Add("k02", "hello2");
c.Add("k03", std::string(10000, 'x'));
c.Add("k04", std::string(200000, 'x'));
c.Add("k05", std::string(300000, 'x'));
c.Add("k06", "hello3");
c.Add("k07", std::string(100000, 'x'));
std::vector<std::string> keys;
KVMap kvmap;
Options options;
options.block_size = 1024;
options.compression = kNoCompression;
c.Finish(options, &keys, &kvmap);
ASSERT_TRUE(Between(c.ApproximateOffsetOf("abc"), 0, 0));
ASSERT_TRUE(Between(c.ApproximateOffsetOf("k01"), 0, 0));
ASSERT_TRUE(Between(c.ApproximateOffsetOf("k01a"), 0, 0));
ASSERT_TRUE(Between(c.ApproximateOffsetOf("k02"), 0, 0));
ASSERT_TRUE(Between(c.ApproximateOffsetOf("k03"), 0, 0));
ASSERT_TRUE(Between(c.ApproximateOffsetOf("k04"), 10000, 11000));
ASSERT_TRUE(Between(c.ApproximateOffsetOf("k04a"), 210000, 211000));
ASSERT_TRUE(Between(c.ApproximateOffsetOf("k05"), 210000, 211000));
ASSERT_TRUE(Between(c.ApproximateOffsetOf("k06"), 510000, 511000));
ASSERT_TRUE(Between(c.ApproximateOffsetOf("k07"), 510000, 511000));
ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 610000, 612000));
}
static bool SnappyCompressionSupported() {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Snappy_Compress(in.data(), in.size(), &out);
}
TEST(TableTest, ApproximateOffsetOfCompressed) {
if (!SnappyCompressionSupported()) {
fprintf(stderr, "skipping compression tests\n");
return;
}
Random rnd(301);
TableConstructor c(BytewiseComparator());
std::string tmp;
c.Add("k01", "hello");
c.Add("k02", test::CompressibleString(&rnd, 0.25, 10000, &tmp));
c.Add("k03", "hello3");
c.Add("k04", test::CompressibleString(&rnd, 0.25, 10000, &tmp));
std::vector<std::string> keys;
KVMap kvmap;
Options options;
options.block_size = 1024;
options.compression = kSnappyCompression;
c.Finish(options, &keys, &kvmap);
ASSERT_TRUE(Between(c.ApproximateOffsetOf("abc"), 0, 0));
ASSERT_TRUE(Between(c.ApproximateOffsetOf("k01"), 0, 0));
ASSERT_TRUE(Between(c.ApproximateOffsetOf("k02"), 0, 0));
ASSERT_TRUE(Between(c.ApproximateOffsetOf("k03"), 2000, 3000));
ASSERT_TRUE(Between(c.ApproximateOffsetOf("k04"), 2000, 3000));
ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 4000, 6000));
}
} // namespace leveldb
int main(int argc, char** argv) {
return leveldb::test::RunAllTests();
}
diff --git a/src/leveldb/util/cache.cc b/src/leveldb/util/cache.cc
index 24f1f63f4f..8b197bc02a 100644
--- a/src/leveldb/util/cache.cc
+++ b/src/leveldb/util/cache.cc
@@ -1,328 +1,325 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include "leveldb/cache.h"
#include "port/port.h"
#include "util/hash.h"
#include "util/mutexlock.h"
namespace leveldb {
Cache::~Cache() {
}
namespace {
// LRU cache implementation
// An entry is a variable length heap-allocated structure. Entries
// are kept in a circular doubly linked list ordered by access time.
struct LRUHandle {
void* value;
void (*deleter)(const Slice&, void* value);
LRUHandle* next_hash;
LRUHandle* next;
LRUHandle* prev;
size_t charge; // TODO(opt): Only allow uint32_t?
size_t key_length;
uint32_t refs;
uint32_t hash; // Hash of key(); used for fast sharding and comparisons
char key_data[1]; // Beginning of key
Slice key() const {
// For cheaper lookups, we allow a temporary Handle object
// to store a pointer to a key in "value".
if (next == this) {
return *(reinterpret_cast<Slice*>(value));
} else {
return Slice(key_data, key_length);
}
}
};
// We provide our own simple hash table since it removes a whole bunch
// of porting hacks and is also faster than some of the built-in hash
// table implementations in some of the compiler/runtime combinations
// we have tested. E.g., readrandom speeds up by ~5% over the g++
// 4.4.3's builtin hashtable.
class HandleTable {
public:
HandleTable() : length_(0), elems_(0), list_(NULL) { Resize(); }
~HandleTable() { delete[] list_; }
LRUHandle* Lookup(const Slice& key, uint32_t hash) {
return *FindPointer(key, hash);
}
LRUHandle* Insert(LRUHandle* h) {
LRUHandle** ptr = FindPointer(h->key(), h->hash);
LRUHandle* old = *ptr;
h->next_hash = (old == NULL ? NULL : old->next_hash);
*ptr = h;
if (old == NULL) {
++elems_;
if (elems_ > length_) {
// Since each cache entry is fairly large, we aim for a small
// average linked list length (<= 1).
Resize();
}
}
return old;
}
LRUHandle* Remove(const Slice& key, uint32_t hash) {
LRUHandle** ptr = FindPointer(key, hash);
LRUHandle* result = *ptr;
if (result != NULL) {
*ptr = result->next_hash;
--elems_;
}
return result;
}
private:
// The table consists of an array of buckets where each bucket is
// a linked list of cache entries that hash into the bucket.
uint32_t length_;
uint32_t elems_;
LRUHandle** list_;
// Return a pointer to slot that points to a cache entry that
// matches key/hash. If there is no such cache entry, return a
// pointer to the trailing slot in the corresponding linked list.
LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
LRUHandle** ptr = &list_[hash & (length_ - 1)];
while (*ptr != NULL &&
((*ptr)->hash != hash || key != (*ptr)->key())) {
ptr = &(*ptr)->next_hash;
}
return ptr;
}
void Resize() {
uint32_t new_length = 4;
while (new_length < elems_) {
new_length *= 2;
}
LRUHandle** new_list = new LRUHandle*[new_length];
memset(new_list, 0, sizeof(new_list[0]) * new_length);
uint32_t count = 0;
for (uint32_t i = 0; i < length_; i++) {
LRUHandle* h = list_[i];
while (h != NULL) {
LRUHandle* next = h->next_hash;
- Slice key = h->key();
uint32_t hash = h->hash;
LRUHandle** ptr = &new_list[hash & (new_length - 1)];
h->next_hash = *ptr;
*ptr = h;
h = next;
count++;
}
}
assert(elems_ == count);
delete[] list_;
list_ = new_list;
length_ = new_length;
}
};
// A single shard of sharded cache.
class LRUCache {
public:
LRUCache();
~LRUCache();
// Separate from constructor so caller can easily make an array of LRUCache
void SetCapacity(size_t capacity) { capacity_ = capacity; }
// Like Cache methods, but with an extra "hash" parameter.
Cache::Handle* Insert(const Slice& key, uint32_t hash,
void* value, size_t charge,
void (*deleter)(const Slice& key, void* value));
Cache::Handle* Lookup(const Slice& key, uint32_t hash);
void Release(Cache::Handle* handle);
void Erase(const Slice& key, uint32_t hash);
private:
void LRU_Remove(LRUHandle* e);
void LRU_Append(LRUHandle* e);
void Unref(LRUHandle* e);
// Initialized before use.
size_t capacity_;
// mutex_ protects the following state.
port::Mutex mutex_;
size_t usage_;
- uint64_t last_id_;
// Dummy head of LRU list.
// lru.prev is newest entry, lru.next is oldest entry.
LRUHandle lru_;
HandleTable table_;
};
LRUCache::LRUCache()
- : usage_(0),
- last_id_(0) {
+ : usage_(0) {
// Make empty circular linked list
lru_.next = &lru_;
lru_.prev = &lru_;
}
LRUCache::~LRUCache() {
for (LRUHandle* e = lru_.next; e != &lru_; ) {
LRUHandle* next = e->next;
assert(e->refs == 1); // Error if caller has an unreleased handle
Unref(e);
e = next;
}
}
void LRUCache::Unref(LRUHandle* e) {
assert(e->refs > 0);
e->refs--;
if (e->refs <= 0) {
usage_ -= e->charge;
(*e->deleter)(e->key(), e->value);
free(e);
}
}
void LRUCache::LRU_Remove(LRUHandle* e) {
e->next->prev = e->prev;
e->prev->next = e->next;
}
void LRUCache::LRU_Append(LRUHandle* e) {
// Make "e" newest entry by inserting just before lru_
e->next = &lru_;
e->prev = lru_.prev;
e->prev->next = e;
e->next->prev = e;
}
Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash) {
MutexLock l(&mutex_);
LRUHandle* e = table_.Lookup(key, hash);
if (e != NULL) {
e->refs++;
LRU_Remove(e);
LRU_Append(e);
}
return reinterpret_cast<Cache::Handle*>(e);
}
void LRUCache::Release(Cache::Handle* handle) {
MutexLock l(&mutex_);
Unref(reinterpret_cast<LRUHandle*>(handle));
}
Cache::Handle* LRUCache::Insert(
const Slice& key, uint32_t hash, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value)) {
MutexLock l(&mutex_);
LRUHandle* e = reinterpret_cast<LRUHandle*>(
malloc(sizeof(LRUHandle)-1 + key.size()));
e->value = value;
e->deleter = deleter;
e->charge = charge;
e->key_length = key.size();
e->hash = hash;
e->refs = 2; // One from LRUCache, one for the returned handle
memcpy(e->key_data, key.data(), key.size());
LRU_Append(e);
usage_ += charge;
LRUHandle* old = table_.Insert(e);
if (old != NULL) {
LRU_Remove(old);
Unref(old);
}
while (usage_ > capacity_ && lru_.next != &lru_) {
LRUHandle* old = lru_.next;
LRU_Remove(old);
table_.Remove(old->key(), old->hash);
Unref(old);
}
return reinterpret_cast<Cache::Handle*>(e);
}
void LRUCache::Erase(const Slice& key, uint32_t hash) {
MutexLock l(&mutex_);
LRUHandle* e = table_.Remove(key, hash);
if (e != NULL) {
LRU_Remove(e);
Unref(e);
}
}
static const int kNumShardBits = 4;
static const int kNumShards = 1 << kNumShardBits;
class ShardedLRUCache : public Cache {
private:
LRUCache shard_[kNumShards];
port::Mutex id_mutex_;
uint64_t last_id_;
static inline uint32_t HashSlice(const Slice& s) {
return Hash(s.data(), s.size(), 0);
}
static uint32_t Shard(uint32_t hash) {
return hash >> (32 - kNumShardBits);
}
public:
explicit ShardedLRUCache(size_t capacity)
: last_id_(0) {
const size_t per_shard = (capacity + (kNumShards - 1)) / kNumShards;
for (int s = 0; s < kNumShards; s++) {
shard_[s].SetCapacity(per_shard);
}
}
virtual ~ShardedLRUCache() { }
virtual Handle* Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value)) {
const uint32_t hash = HashSlice(key);
return shard_[Shard(hash)].Insert(key, hash, value, charge, deleter);
}
virtual Handle* Lookup(const Slice& key) {
const uint32_t hash = HashSlice(key);
return shard_[Shard(hash)].Lookup(key, hash);
}
virtual void Release(Handle* handle) {
LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
shard_[Shard(h->hash)].Release(handle);
}
virtual void Erase(const Slice& key) {
const uint32_t hash = HashSlice(key);
shard_[Shard(hash)].Erase(key, hash);
}
virtual void* Value(Handle* handle) {
return reinterpret_cast<LRUHandle*>(handle)->value;
}
virtual uint64_t NewId() {
MutexLock l(&id_mutex_);
return ++(last_id_);
}
};
} // end anonymous namespace
Cache* NewLRUCache(size_t capacity) {
return new ShardedLRUCache(capacity);
}
} // namespace leveldb
diff --git a/src/leveldb/util/coding_test.cc b/src/leveldb/util/coding_test.cc
index 2c52b17b60..fb5726e335 100644
--- a/src/leveldb/util/coding_test.cc
+++ b/src/leveldb/util/coding_test.cc
@@ -1,196 +1,196 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/coding.h"
#include "util/testharness.h"
namespace leveldb {
class Coding { };
TEST(Coding, Fixed32) {
std::string s;
for (uint32_t v = 0; v < 100000; v++) {
PutFixed32(&s, v);
}
const char* p = s.data();
for (uint32_t v = 0; v < 100000; v++) {
uint32_t actual = DecodeFixed32(p);
ASSERT_EQ(v, actual);
p += sizeof(uint32_t);
}
}
TEST(Coding, Fixed64) {
std::string s;
for (int power = 0; power <= 63; power++) {
uint64_t v = static_cast<uint64_t>(1) << power;
PutFixed64(&s, v - 1);
PutFixed64(&s, v + 0);
PutFixed64(&s, v + 1);
}
const char* p = s.data();
for (int power = 0; power <= 63; power++) {
uint64_t v = static_cast<uint64_t>(1) << power;
uint64_t actual;
actual = DecodeFixed64(p);
ASSERT_EQ(v-1, actual);
p += sizeof(uint64_t);
actual = DecodeFixed64(p);
ASSERT_EQ(v+0, actual);
p += sizeof(uint64_t);
actual = DecodeFixed64(p);
ASSERT_EQ(v+1, actual);
p += sizeof(uint64_t);
}
}
// Test that encoding routines generate little-endian encodings
TEST(Coding, EncodingOutput) {
std::string dst;
PutFixed32(&dst, 0x04030201);
ASSERT_EQ(4, dst.size());
ASSERT_EQ(0x01, static_cast<int>(dst[0]));
ASSERT_EQ(0x02, static_cast<int>(dst[1]));
ASSERT_EQ(0x03, static_cast<int>(dst[2]));
ASSERT_EQ(0x04, static_cast<int>(dst[3]));
dst.clear();
PutFixed64(&dst, 0x0807060504030201ull);
ASSERT_EQ(8, dst.size());
ASSERT_EQ(0x01, static_cast<int>(dst[0]));
ASSERT_EQ(0x02, static_cast<int>(dst[1]));
ASSERT_EQ(0x03, static_cast<int>(dst[2]));
ASSERT_EQ(0x04, static_cast<int>(dst[3]));
ASSERT_EQ(0x05, static_cast<int>(dst[4]));
ASSERT_EQ(0x06, static_cast<int>(dst[5]));
ASSERT_EQ(0x07, static_cast<int>(dst[6]));
ASSERT_EQ(0x08, static_cast<int>(dst[7]));
}
TEST(Coding, Varint32) {
std::string s;
for (uint32_t i = 0; i < (32 * 32); i++) {
uint32_t v = (i / 32) << (i % 32);
PutVarint32(&s, v);
}
const char* p = s.data();
const char* limit = p + s.size();
for (uint32_t i = 0; i < (32 * 32); i++) {
uint32_t expected = (i / 32) << (i % 32);
uint32_t actual;
const char* start = p;
p = GetVarint32Ptr(p, limit, &actual);
ASSERT_TRUE(p != NULL);
ASSERT_EQ(expected, actual);
ASSERT_EQ(VarintLength(actual), p - start);
}
ASSERT_EQ(p, s.data() + s.size());
}
TEST(Coding, Varint64) {
// Construct the list of values to check
std::vector<uint64_t> values;
// Some special values
values.push_back(0);
values.push_back(100);
values.push_back(~static_cast<uint64_t>(0));
values.push_back(~static_cast<uint64_t>(0) - 1);
for (uint32_t k = 0; k < 64; k++) {
// Test values near powers of two
const uint64_t power = 1ull << k;
values.push_back(power);
values.push_back(power-1);
values.push_back(power+1);
- };
+ }
std::string s;
for (int i = 0; i < values.size(); i++) {
PutVarint64(&s, values[i]);
}
const char* p = s.data();
const char* limit = p + s.size();
for (int i = 0; i < values.size(); i++) {
ASSERT_TRUE(p < limit);
uint64_t actual;
const char* start = p;
p = GetVarint64Ptr(p, limit, &actual);
ASSERT_TRUE(p != NULL);
ASSERT_EQ(values[i], actual);
ASSERT_EQ(VarintLength(actual), p - start);
}
ASSERT_EQ(p, limit);
}
TEST(Coding, Varint32Overflow) {
uint32_t result;
std::string input("\x81\x82\x83\x84\x85\x11");
ASSERT_TRUE(GetVarint32Ptr(input.data(), input.data() + input.size(), &result)
== NULL);
}
TEST(Coding, Varint32Truncation) {
uint32_t large_value = (1u << 31) + 100;
std::string s;
PutVarint32(&s, large_value);
uint32_t result;
for (int len = 0; len < s.size() - 1; len++) {
ASSERT_TRUE(GetVarint32Ptr(s.data(), s.data() + len, &result) == NULL);
}
ASSERT_TRUE(GetVarint32Ptr(s.data(), s.data() + s.size(), &result) != NULL);
ASSERT_EQ(large_value, result);
}
TEST(Coding, Varint64Overflow) {
uint64_t result;
std::string input("\x81\x82\x83\x84\x85\x81\x82\x83\x84\x85\x11");
ASSERT_TRUE(GetVarint64Ptr(input.data(), input.data() + input.size(), &result)
== NULL);
}
TEST(Coding, Varint64Truncation) {
uint64_t large_value = (1ull << 63) + 100ull;
std::string s;
PutVarint64(&s, large_value);
uint64_t result;
for (int len = 0; len < s.size() - 1; len++) {
ASSERT_TRUE(GetVarint64Ptr(s.data(), s.data() + len, &result) == NULL);
}
ASSERT_TRUE(GetVarint64Ptr(s.data(), s.data() + s.size(), &result) != NULL);
ASSERT_EQ(large_value, result);
}
TEST(Coding, Strings) {
std::string s;
PutLengthPrefixedSlice(&s, Slice(""));
PutLengthPrefixedSlice(&s, Slice("foo"));
PutLengthPrefixedSlice(&s, Slice("bar"));
PutLengthPrefixedSlice(&s, Slice(std::string(200, 'x')));
Slice input(s);
Slice v;
ASSERT_TRUE(GetLengthPrefixedSlice(&input, &v));
ASSERT_EQ("", v.ToString());
ASSERT_TRUE(GetLengthPrefixedSlice(&input, &v));
ASSERT_EQ("foo", v.ToString());
ASSERT_TRUE(GetLengthPrefixedSlice(&input, &v));
ASSERT_EQ("bar", v.ToString());
ASSERT_TRUE(GetLengthPrefixedSlice(&input, &v));
ASSERT_EQ(std::string(200, 'x'), v.ToString());
ASSERT_EQ("", input.ToString());
}
} // namespace leveldb
int main(int argc, char** argv) {
return leveldb::test::RunAllTests();
}
diff --git a/src/leveldb/util/comparator.cc b/src/leveldb/util/comparator.cc
index 4b7b5724ef..6cc319242e 100644
--- a/src/leveldb/util/comparator.cc
+++ b/src/leveldb/util/comparator.cc
@@ -1,81 +1,81 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <algorithm>
#include <stdint.h>
#include "leveldb/comparator.h"
#include "leveldb/slice.h"
#include "port/port.h"
#include "util/logging.h"
namespace leveldb {
Comparator::~Comparator() { }
namespace {
class BytewiseComparatorImpl : public Comparator {
public:
BytewiseComparatorImpl() { }
virtual const char* Name() const {
return "leveldb.BytewiseComparator";
}
virtual int Compare(const Slice& a, const Slice& b) const {
return a.compare(b);
}
virtual void FindShortestSeparator(
std::string* start,
const Slice& limit) const {
// Find length of common prefix
size_t min_length = std::min(start->size(), limit.size());
size_t diff_index = 0;
while ((diff_index < min_length) &&
((*start)[diff_index] == limit[diff_index])) {
diff_index++;
}
if (diff_index >= min_length) {
// Do not shorten if one string is a prefix of the other
} else {
uint8_t diff_byte = static_cast<uint8_t>((*start)[diff_index]);
if (diff_byte < static_cast<uint8_t>(0xff) &&
diff_byte + 1 < static_cast<uint8_t>(limit[diff_index])) {
(*start)[diff_index]++;
start->resize(diff_index + 1);
assert(Compare(*start, limit) < 0);
}
}
}
virtual void FindShortSuccessor(std::string* key) const {
// Find first character that can be incremented
size_t n = key->size();
for (size_t i = 0; i < n; i++) {
const uint8_t byte = (*key)[i];
if (byte != static_cast<uint8_t>(0xff)) {
(*key)[i] = byte + 1;
key->resize(i+1);
return;
}
}
// *key is a run of 0xffs. Leave it alone.
}
};
} // namespace
-static port::OnceType once = LEVELDB_ONCE_INIT;
+static port::OnceType once_comparator = LEVELDB_ONCE_INIT;
static const Comparator* bytewise;
static void InitModule() {
bytewise = new BytewiseComparatorImpl;
}
const Comparator* BytewiseComparator() {
- port::InitOnce(&once, InitModule);
+ port::InitOnce(&once_comparator, InitModule);
return bytewise;
}
} // namespace leveldb
diff --git a/src/leveldb/util/env_posix.cc b/src/leveldb/util/env_posix.cc
index db81f56d11..6badfdc230 100644
--- a/src/leveldb/util/env_posix.cc
+++ b/src/leveldb/util/env_posix.cc
@@ -1,701 +1,701 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#if !defined(LEVELDB_PLATFORM_WINDOWS)
#include <deque>
#include <set>
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#if defined(LEVELDB_PLATFORM_ANDROID)
#include <sys/stat.h>
#endif
#include "leveldb/env.h"
#include "leveldb/slice.h"
#include "port/port.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/posix_logger.h"
namespace leveldb {
namespace {
static Status IOError(const std::string& context, int err_number) {
return Status::IOError(context, strerror(err_number));
}
class PosixSequentialFile: public SequentialFile {
private:
std::string filename_;
FILE* file_;
public:
PosixSequentialFile(const std::string& fname, FILE* f)
: filename_(fname), file_(f) { }
virtual ~PosixSequentialFile() { fclose(file_); }
virtual Status Read(size_t n, Slice* result, char* scratch) {
Status s;
size_t r = fread_unlocked(scratch, 1, n, file_);
*result = Slice(scratch, r);
if (r < n) {
if (feof(file_)) {
// We leave status as ok if we hit the end of the file
} else {
// A partial read with an error: return a non-ok status
s = IOError(filename_, errno);
}
}
return s;
}
virtual Status Skip(uint64_t n) {
if (fseek(file_, n, SEEK_CUR)) {
return IOError(filename_, errno);
}
return Status::OK();
}
};
// pread() based random-access
class PosixRandomAccessFile: public RandomAccessFile {
private:
std::string filename_;
int fd_;
public:
PosixRandomAccessFile(const std::string& fname, int fd)
: filename_(fname), fd_(fd) { }
virtual ~PosixRandomAccessFile() { close(fd_); }
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const {
Status s;
ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
*result = Slice(scratch, (r < 0) ? 0 : r);
if (r < 0) {
// An error: return a non-ok status
s = IOError(filename_, errno);
}
return s;
}
};
// Helper class to limit mmap file usage so that we do not end up
// running out virtual memory or running into kernel performance
// problems for very large databases.
class MmapLimiter {
public:
// Up to 1000 mmaps for 64-bit binaries; none for smaller pointer sizes.
MmapLimiter() {
SetAllowed(sizeof(void*) >= 8 ? 1000 : 0);
}
// If another mmap slot is available, acquire it and return true.
// Else return false.
bool Acquire() {
if (GetAllowed() <= 0) {
return false;
}
MutexLock l(&mu_);
intptr_t x = GetAllowed();
if (x <= 0) {
return false;
} else {
SetAllowed(x - 1);
return true;
}
}
// Release a slot acquired by a previous call to Acquire() that returned true.
void Release() {
MutexLock l(&mu_);
SetAllowed(GetAllowed() + 1);
}
private:
port::Mutex mu_;
port::AtomicPointer allowed_;
intptr_t GetAllowed() const {
return reinterpret_cast<intptr_t>(allowed_.Acquire_Load());
}
// REQUIRES: mu_ must be held
void SetAllowed(intptr_t v) {
allowed_.Release_Store(reinterpret_cast<void*>(v));
}
MmapLimiter(const MmapLimiter&);
void operator=(const MmapLimiter&);
};
// mmap() based random-access
class PosixMmapReadableFile: public RandomAccessFile {
private:
std::string filename_;
void* mmapped_region_;
size_t length_;
MmapLimiter* limiter_;
public:
// base[0,length-1] contains the mmapped contents of the file.
PosixMmapReadableFile(const std::string& fname, void* base, size_t length,
MmapLimiter* limiter)
: filename_(fname), mmapped_region_(base), length_(length),
limiter_(limiter) {
}
virtual ~PosixMmapReadableFile() {
munmap(mmapped_region_, length_);
limiter_->Release();
}
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const {
Status s;
if (offset + n > length_) {
*result = Slice();
s = IOError(filename_, EINVAL);
} else {
*result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n);
}
return s;
}
};
// We preallocate up to an extra megabyte and use memcpy to append new
// data to the file. This is safe since we either properly close the
// file before reading from it, or for log files, the reading code
// knows enough to skip zero suffixes.
class PosixMmapFile : public WritableFile {
private:
std::string filename_;
int fd_;
size_t page_size_;
size_t map_size_; // How much extra memory to map at a time
char* base_; // The mapped region
char* limit_; // Limit of the mapped region
char* dst_; // Where to write next (in range [base_,limit_])
char* last_sync_; // Where have we synced up to
uint64_t file_offset_; // Offset of base_ in file
// Have we done an munmap of unsynced data?
bool pending_sync_;
// Roundup x to a multiple of y
static size_t Roundup(size_t x, size_t y) {
return ((x + y - 1) / y) * y;
}
size_t TruncateToPageBoundary(size_t s) {
s -= (s & (page_size_ - 1));
assert((s % page_size_) == 0);
return s;
}
bool UnmapCurrentRegion() {
bool result = true;
if (base_ != NULL) {
if (last_sync_ < limit_) {
// Defer syncing this data until next Sync() call, if any
pending_sync_ = true;
}
if (munmap(base_, limit_ - base_) != 0) {
result = false;
}
file_offset_ += limit_ - base_;
base_ = NULL;
limit_ = NULL;
last_sync_ = NULL;
dst_ = NULL;
// Increase the amount we map the next time, but capped at 1MB
if (map_size_ < (1<<20)) {
map_size_ *= 2;
}
}
return result;
}
bool MapNewRegion() {
assert(base_ == NULL);
if (ftruncate(fd_, file_offset_ + map_size_) < 0) {
return false;
}
void* ptr = mmap(NULL, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED,
fd_, file_offset_);
if (ptr == MAP_FAILED) {
return false;
}
base_ = reinterpret_cast<char*>(ptr);
limit_ = base_ + map_size_;
dst_ = base_;
last_sync_ = base_;
return true;
}
public:
PosixMmapFile(const std::string& fname, int fd, size_t page_size)
: filename_(fname),
fd_(fd),
page_size_(page_size),
map_size_(Roundup(65536, page_size)),
base_(NULL),
limit_(NULL),
dst_(NULL),
last_sync_(NULL),
file_offset_(0),
pending_sync_(false) {
assert((page_size & (page_size - 1)) == 0);
}
~PosixMmapFile() {
if (fd_ >= 0) {
PosixMmapFile::Close();
}
}
virtual Status Append(const Slice& data) {
const char* src = data.data();
size_t left = data.size();
while (left > 0) {
assert(base_ <= dst_);
assert(dst_ <= limit_);
size_t avail = limit_ - dst_;
if (avail == 0) {
if (!UnmapCurrentRegion() ||
!MapNewRegion()) {
return IOError(filename_, errno);
}
}
size_t n = (left <= avail) ? left : avail;
memcpy(dst_, src, n);
dst_ += n;
src += n;
left -= n;
}
return Status::OK();
}
virtual Status Close() {
Status s;
size_t unused = limit_ - dst_;
if (!UnmapCurrentRegion()) {
s = IOError(filename_, errno);
} else if (unused > 0) {
// Trim the extra space at the end of the file
if (ftruncate(fd_, file_offset_ - unused) < 0) {
s = IOError(filename_, errno);
}
}
if (close(fd_) < 0) {
if (s.ok()) {
s = IOError(filename_, errno);
}
}
fd_ = -1;
base_ = NULL;
limit_ = NULL;
return s;
}
virtual Status Flush() {
return Status::OK();
}
virtual Status Sync() {
Status s;
if (pending_sync_) {
// Some unmapped data was not synced
pending_sync_ = false;
if (fdatasync(fd_) < 0) {
s = IOError(filename_, errno);
}
}
if (dst_ > last_sync_) {
// Find the beginnings of the pages that contain the first and last
// bytes to be synced.
size_t p1 = TruncateToPageBoundary(last_sync_ - base_);
size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1);
last_sync_ = dst_;
if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) {
s = IOError(filename_, errno);
}
}
return s;
}
};
static int LockOrUnlock(int fd, bool lock) {
errno = 0;
struct flock f;
memset(&f, 0, sizeof(f));
f.l_type = (lock ? F_WRLCK : F_UNLCK);
f.l_whence = SEEK_SET;
f.l_start = 0;
f.l_len = 0; // Lock/unlock entire file
return fcntl(fd, F_SETLK, &f);
}
class PosixFileLock : public FileLock {
public:
int fd_;
std::string name_;
};
// Set of locked files. We keep a separate set instead of just
// relying on fcntrl(F_SETLK) since fcntl(F_SETLK) does not provide
// any protection against multiple uses from the same process.
class PosixLockTable {
private:
port::Mutex mu_;
std::set<std::string> locked_files_;
public:
bool Insert(const std::string& fname) {
MutexLock l(&mu_);
return locked_files_.insert(fname).second;
}
void Remove(const std::string& fname) {
MutexLock l(&mu_);
locked_files_.erase(fname);
}
};
class PosixEnv : public Env {
public:
PosixEnv();
virtual ~PosixEnv() {
fprintf(stderr, "Destroying Env::Default()\n");
- exit(1);
+ abort();
}
virtual Status NewSequentialFile(const std::string& fname,
SequentialFile** result) {
FILE* f = fopen(fname.c_str(), "r");
if (f == NULL) {
*result = NULL;
return IOError(fname, errno);
} else {
*result = new PosixSequentialFile(fname, f);
return Status::OK();
}
}
virtual Status NewRandomAccessFile(const std::string& fname,
RandomAccessFile** result) {
*result = NULL;
Status s;
int fd = open(fname.c_str(), O_RDONLY);
if (fd < 0) {
s = IOError(fname, errno);
} else if (mmap_limit_.Acquire()) {
uint64_t size;
s = GetFileSize(fname, &size);
if (s.ok()) {
void* base = mmap(NULL, size, PROT_READ, MAP_SHARED, fd, 0);
if (base != MAP_FAILED) {
*result = new PosixMmapReadableFile(fname, base, size, &mmap_limit_);
} else {
s = IOError(fname, errno);
}
}
close(fd);
if (!s.ok()) {
mmap_limit_.Release();
}
} else {
*result = new PosixRandomAccessFile(fname, fd);
}
return s;
}
virtual Status NewWritableFile(const std::string& fname,
WritableFile** result) {
Status s;
const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
if (fd < 0) {
*result = NULL;
s = IOError(fname, errno);
} else {
*result = new PosixMmapFile(fname, fd, page_size_);
}
return s;
}
virtual bool FileExists(const std::string& fname) {
return access(fname.c_str(), F_OK) == 0;
}
virtual Status GetChildren(const std::string& dir,
std::vector<std::string>* result) {
result->clear();
DIR* d = opendir(dir.c_str());
if (d == NULL) {
return IOError(dir, errno);
}
struct dirent* entry;
while ((entry = readdir(d)) != NULL) {
result->push_back(entry->d_name);
}
closedir(d);
return Status::OK();
}
virtual Status DeleteFile(const std::string& fname) {
Status result;
if (unlink(fname.c_str()) != 0) {
result = IOError(fname, errno);
}
return result;
- };
+ }
virtual Status CreateDir(const std::string& name) {
Status result;
if (mkdir(name.c_str(), 0755) != 0) {
result = IOError(name, errno);
}
return result;
- };
+ }
virtual Status DeleteDir(const std::string& name) {
Status result;
if (rmdir(name.c_str()) != 0) {
result = IOError(name, errno);
}
return result;
- };
+ }
virtual Status GetFileSize(const std::string& fname, uint64_t* size) {
Status s;
struct stat sbuf;
if (stat(fname.c_str(), &sbuf) != 0) {
*size = 0;
s = IOError(fname, errno);
} else {
*size = sbuf.st_size;
}
return s;
}
virtual Status RenameFile(const std::string& src, const std::string& target) {
Status result;
if (rename(src.c_str(), target.c_str()) != 0) {
result = IOError(src, errno);
}
return result;
}
virtual Status LockFile(const std::string& fname, FileLock** lock) {
*lock = NULL;
Status result;
int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
if (fd < 0) {
result = IOError(fname, errno);
} else if (!locks_.Insert(fname)) {
close(fd);
result = Status::IOError("lock " + fname, "already held by process");
} else if (LockOrUnlock(fd, true) == -1) {
result = IOError("lock " + fname, errno);
close(fd);
locks_.Remove(fname);
} else {
PosixFileLock* my_lock = new PosixFileLock;
my_lock->fd_ = fd;
my_lock->name_ = fname;
*lock = my_lock;
}
return result;
}
virtual Status UnlockFile(FileLock* lock) {
PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
Status result;
if (LockOrUnlock(my_lock->fd_, false) == -1) {
result = IOError("unlock", errno);
}
locks_.Remove(my_lock->name_);
close(my_lock->fd_);
delete my_lock;
return result;
}
virtual void Schedule(void (*function)(void*), void* arg);
virtual void StartThread(void (*function)(void* arg), void* arg);
virtual Status GetTestDirectory(std::string* result) {
const char* env = getenv("TEST_TMPDIR");
if (env && env[0] != '\0') {
*result = env;
} else {
char buf[100];
snprintf(buf, sizeof(buf), "/tmp/leveldbtest-%d", int(geteuid()));
*result = buf;
}
// Directory may already exist
CreateDir(*result);
return Status::OK();
}
static uint64_t gettid() {
pthread_t tid = pthread_self();
uint64_t thread_id = 0;
memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
return thread_id;
}
virtual Status NewLogger(const std::string& fname, Logger** result) {
FILE* f = fopen(fname.c_str(), "w");
if (f == NULL) {
*result = NULL;
return IOError(fname, errno);
} else {
*result = new PosixLogger(f, &PosixEnv::gettid);
return Status::OK();
}
}
virtual uint64_t NowMicros() {
struct timeval tv;
gettimeofday(&tv, NULL);
return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
}
virtual void SleepForMicroseconds(int micros) {
usleep(micros);
}
private:
void PthreadCall(const char* label, int result) {
if (result != 0) {
fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
- exit(1);
+ abort();
}
}
// BGThread() is the body of the background thread
void BGThread();
static void* BGThreadWrapper(void* arg) {
reinterpret_cast<PosixEnv*>(arg)->BGThread();
return NULL;
}
size_t page_size_;
pthread_mutex_t mu_;
pthread_cond_t bgsignal_;
pthread_t bgthread_;
bool started_bgthread_;
// Entry per Schedule() call
struct BGItem { void* arg; void (*function)(void*); };
typedef std::deque<BGItem> BGQueue;
BGQueue queue_;
PosixLockTable locks_;
MmapLimiter mmap_limit_;
};
PosixEnv::PosixEnv() : page_size_(getpagesize()),
started_bgthread_(false) {
PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL));
PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL));
}
void PosixEnv::Schedule(void (*function)(void*), void* arg) {
PthreadCall("lock", pthread_mutex_lock(&mu_));
// Start background thread if necessary
if (!started_bgthread_) {
started_bgthread_ = true;
PthreadCall(
"create thread",
pthread_create(&bgthread_, NULL, &PosixEnv::BGThreadWrapper, this));
}
// If the queue is currently empty, the background thread may currently be
// waiting.
if (queue_.empty()) {
PthreadCall("signal", pthread_cond_signal(&bgsignal_));
}
// Add to priority queue
queue_.push_back(BGItem());
queue_.back().function = function;
queue_.back().arg = arg;
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}
void PosixEnv::BGThread() {
while (true) {
// Wait until there is an item that is ready to run
PthreadCall("lock", pthread_mutex_lock(&mu_));
while (queue_.empty()) {
PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
}
void (*function)(void*) = queue_.front().function;
void* arg = queue_.front().arg;
queue_.pop_front();
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
(*function)(arg);
}
}
namespace {
struct StartThreadState {
void (*user_function)(void*);
void* arg;
};
}
static void* StartThreadWrapper(void* arg) {
StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
state->user_function(state->arg);
delete state;
return NULL;
}
void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
pthread_t t;
StartThreadState* state = new StartThreadState;
state->user_function = function;
state->arg = arg;
PthreadCall("start thread",
pthread_create(&t, NULL, &StartThreadWrapper, state));
}
} // namespace
static pthread_once_t once = PTHREAD_ONCE_INIT;
static Env* default_env;
static void InitDefaultEnv() { default_env = new PosixEnv; }
Env* Env::Default() {
pthread_once(&once, InitDefaultEnv);
return default_env;
}
} // namespace leveldb
#endif
diff --git a/src/leveldb/util/hash.cc b/src/leveldb/util/hash.cc
index ba1818082d..07cf022060 100644
--- a/src/leveldb/util/hash.cc
+++ b/src/leveldb/util/hash.cc
@@ -1,45 +1,52 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <string.h>
#include "util/coding.h"
#include "util/hash.h"
+// The FALLTHROUGH_INTENDED macro can be used to annotate implicit fall-through
+// between switch labels. The real definition should be provided externally.
+// This one is a fallback version for unsupported compilers.
+#ifndef FALLTHROUGH_INTENDED
+#define FALLTHROUGH_INTENDED do { } while (0)
+#endif
+
namespace leveldb {
uint32_t Hash(const char* data, size_t n, uint32_t seed) {
// Similar to murmur hash
const uint32_t m = 0xc6a4a793;
const uint32_t r = 24;
const char* limit = data + n;
uint32_t h = seed ^ (n * m);
// Pick up four bytes at a time
while (data + 4 <= limit) {
uint32_t w = DecodeFixed32(data);
data += 4;
h += w;
h *= m;
h ^= (h >> 16);
}
// Pick up remaining bytes
switch (limit - data) {
case 3:
h += data[2] << 16;
- // fall through
+ FALLTHROUGH_INTENDED;
case 2:
h += data[1] << 8;
- // fall through
+ FALLTHROUGH_INTENDED;
case 1:
h += data[0];
h *= m;
h ^= (h >> r);
break;
}
return h;
}
} // namespace leveldb

File Metadata

Mime Type
text/x-diff
Expires
Wed, May 21, 21:31 (1 d, 18 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5863256
Default Alt Text
(269 KB)

Event Timeline