Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F13115199
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
134 KB
Subscribers
None
View Options
diff --git a/src/leveldb/Makefile b/src/leveldb/Makefile
index fef085b9c..42c4952fe 100644
--- a/src/leveldb/Makefile
+++ b/src/leveldb/Makefile
@@ -1,202 +1,202 @@
# Copyright (c) 2011 The LevelDB Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file. See the AUTHORS file for names of contributors.
#-----------------------------------------------
# Uncomment exactly one of the lines labelled (A), (B), and (C) below
# to switch between compilation modes.
OPT ?= -O2 -DNDEBUG # (A) Production use (optimized mode)
# OPT ?= -g2 # (B) Debug mode, w/ full line-level debugging symbols
# OPT ?= -O2 -g2 -DNDEBUG # (C) Profiling mode: opt, but w/debugging symbols
#-----------------------------------------------
# detect what platform we're building on
$(shell CC=$(CC) CXX=$(CXX) TARGET_OS=$(TARGET_OS) \
./build_detect_platform build_config.mk ./)
# this file is generated by the previous line to set build flags and sources
include build_config.mk
CFLAGS += -I. -I./include $(PLATFORM_CCFLAGS) $(OPT)
CXXFLAGS += -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT)
LDFLAGS += $(PLATFORM_LDFLAGS)
LIBS += $(PLATFORM_LIBS)
LIBOBJECTS = $(SOURCES:.cc=.o)
MEMENVOBJECTS = $(MEMENV_SOURCES:.cc=.o)
TESTUTIL = ./util/testutil.o
TESTHARNESS = ./util/testharness.o $(TESTUTIL)
TESTS = \
arena_test \
bloom_test \
c_test \
cache_test \
coding_test \
corruption_test \
crc32c_test \
db_test \
dbformat_test \
env_test \
filename_test \
filter_block_test \
log_test \
memenv_test \
skiplist_test \
table_test \
version_edit_test \
version_set_test \
write_batch_test
PROGRAMS = db_bench leveldbutil $(TESTS)
BENCHMARKS = db_bench_sqlite3 db_bench_tree_db
LIBRARY = libleveldb.a
MEMENVLIBRARY = libmemenv.a
default: all
# Should we build shared libraries?
ifneq ($(PLATFORM_SHARED_EXT),)
ifneq ($(PLATFORM_SHARED_VERSIONED),true)
SHARED1 = libleveldb.$(PLATFORM_SHARED_EXT)
SHARED2 = $(SHARED1)
SHARED3 = $(SHARED1)
SHARED = $(SHARED1)
else
# Update db.h if you change these.
SHARED_MAJOR = 1
-SHARED_MINOR = 8
+SHARED_MINOR = 9
SHARED1 = libleveldb.$(PLATFORM_SHARED_EXT)
SHARED2 = $(SHARED1).$(SHARED_MAJOR)
SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR)
SHARED = $(SHARED1) $(SHARED2) $(SHARED3)
$(SHARED1): $(SHARED3)
ln -fs $(SHARED3) $(SHARED1)
$(SHARED2): $(SHARED3)
ln -fs $(SHARED3) $(SHARED2)
endif
$(SHARED3):
$(CXX) $(LDFLAGS) $(PLATFORM_SHARED_LDFLAGS)$(SHARED2) $(CXXFLAGS) $(PLATFORM_SHARED_CFLAGS) $(SOURCES) -o $(SHARED3) $(LIBS)
endif # PLATFORM_SHARED_EXT
all: $(SHARED) $(LIBRARY)
check: all $(PROGRAMS) $(TESTS)
for t in $(TESTS); do echo "***** Running $$t"; ./$$t || exit 1; done
clean:
-rm -f $(PROGRAMS) $(BENCHMARKS) $(LIBRARY) $(SHARED) $(MEMENVLIBRARY) */*.o */*/*.o ios-x86/*/*.o ios-arm/*/*.o build_config.mk
-rm -rf ios-x86/* ios-arm/*
$(LIBRARY): $(LIBOBJECTS)
rm -f $@
$(AR) -rs $@ $(LIBOBJECTS)
db_bench: db/db_bench.o $(LIBOBJECTS) $(TESTUTIL)
$(CXX) $(LDFLAGS) db/db_bench.o $(LIBOBJECTS) $(TESTUTIL) -o $@ $(LIBS)
db_bench_sqlite3: doc/bench/db_bench_sqlite3.o $(LIBOBJECTS) $(TESTUTIL)
$(CXX) $(LDFLAGS) doc/bench/db_bench_sqlite3.o $(LIBOBJECTS) $(TESTUTIL) -o $@ -lsqlite3 $(LIBS)
db_bench_tree_db: doc/bench/db_bench_tree_db.o $(LIBOBJECTS) $(TESTUTIL)
$(CXX) $(LDFLAGS) doc/bench/db_bench_tree_db.o $(LIBOBJECTS) $(TESTUTIL) -o $@ -lkyotocabinet $(LIBS)
leveldbutil: db/leveldb_main.o $(LIBOBJECTS)
$(CXX) $(LDFLAGS) db/leveldb_main.o $(LIBOBJECTS) -o $@ $(LIBS)
arena_test: util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
bloom_test: util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
c_test: db/c_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/c_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
cache_test: util/cache_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/cache_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
coding_test: util/coding_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/coding_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
corruption_test: db/corruption_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/corruption_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
crc32c_test: util/crc32c_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/crc32c_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
db_test: db/db_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/db_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
dbformat_test: db/dbformat_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/dbformat_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
env_test: util/env_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/env_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
filename_test: db/filename_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/filename_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
filter_block_test: table/filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) table/filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
log_test: db/log_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/log_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
table_test: table/table_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) table/table_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
skiplist_test: db/skiplist_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/skiplist_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
version_edit_test: db/version_edit_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/version_edit_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
version_set_test: db/version_set_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/version_set_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
write_batch_test: db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
$(MEMENVLIBRARY) : $(MEMENVOBJECTS)
rm -f $@
$(AR) -rs $@ $(MEMENVOBJECTS)
memenv_test : helpers/memenv/memenv_test.o $(MEMENVLIBRARY) $(LIBRARY) $(TESTHARNESS)
$(CXX) $(LDFLAGS) helpers/memenv/memenv_test.o $(MEMENVLIBRARY) $(LIBRARY) $(TESTHARNESS) -o $@ $(LIBS)
ifeq ($(PLATFORM), IOS)
# For iOS, create universal object files to be used on both the simulator and
# a device.
PLATFORMSROOT=/Applications/Xcode.app/Contents/Developer/Platforms
SIMULATORROOT=$(PLATFORMSROOT)/iPhoneSimulator.platform/Developer
DEVICEROOT=$(PLATFORMSROOT)/iPhoneOS.platform/Developer
IOSVERSION=$(shell defaults read $(PLATFORMSROOT)/iPhoneOS.platform/version CFBundleShortVersionString)
.cc.o:
mkdir -p ios-x86/$(dir $@)
$(CXX) $(CXXFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -c $< -o ios-x86/$@
mkdir -p ios-arm/$(dir $@)
$(DEVICEROOT)/usr/bin/$(CXX) $(CXXFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 -c $< -o ios-arm/$@
lipo ios-x86/$@ ios-arm/$@ -create -output $@
.c.o:
mkdir -p ios-x86/$(dir $@)
$(CC) $(CFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -c $< -o ios-x86/$@
mkdir -p ios-arm/$(dir $@)
$(DEVICEROOT)/usr/bin/$(CC) $(CFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 -c $< -o ios-arm/$@
lipo ios-x86/$@ ios-arm/$@ -create -output $@
else
.cc.o:
$(CXX) $(CXXFLAGS) -c $< -o $@
.c.o:
$(CC) $(CFLAGS) -c $< -o $@
endif
diff --git a/src/leveldb/db/db_test.cc b/src/leveldb/db/db_test.cc
index 74abd13e5..684ea3bdb 100644
--- a/src/leveldb/db/db_test.cc
+++ b/src/leveldb/db/db_test.cc
@@ -1,1952 +1,2027 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "leveldb/db.h"
#include "leveldb/filter_policy.h"
#include "db/db_impl.h"
#include "db/filename.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "leveldb/cache.h"
#include "leveldb/env.h"
#include "leveldb/table.h"
#include "util/hash.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/testharness.h"
#include "util/testutil.h"
namespace leveldb {
static std::string RandomString(Random* rnd, int len) {
std::string r;
test::RandomString(rnd, len, &r);
return r;
}
namespace {
class AtomicCounter {
private:
port::Mutex mu_;
int count_;
public:
AtomicCounter() : count_(0) { }
void Increment() {
MutexLock l(&mu_);
count_++;
}
int Read() {
MutexLock l(&mu_);
return count_;
}
void Reset() {
MutexLock l(&mu_);
count_ = 0;
}
};
}
// Special Env used to delay background operations
class SpecialEnv : public EnvWrapper {
public:
// sstable Sync() calls are blocked while this pointer is non-NULL.
port::AtomicPointer delay_sstable_sync_;
// Simulate no-space errors while this pointer is non-NULL.
port::AtomicPointer no_space_;
// Simulate non-writable file system while this pointer is non-NULL
port::AtomicPointer non_writable_;
+ // Force sync of manifest files to fail while this pointer is non-NULL
+ port::AtomicPointer manifest_sync_error_;
+
+ // Force write to manifest files to fail while this pointer is non-NULL
+ port::AtomicPointer manifest_write_error_;
+
bool count_random_reads_;
AtomicCounter random_read_counter_;
AtomicCounter sleep_counter_;
explicit SpecialEnv(Env* base) : EnvWrapper(base) {
delay_sstable_sync_.Release_Store(NULL);
no_space_.Release_Store(NULL);
non_writable_.Release_Store(NULL);
count_random_reads_ = false;
+ manifest_sync_error_.Release_Store(NULL);
+ manifest_write_error_.Release_Store(NULL);
}
Status NewWritableFile(const std::string& f, WritableFile** r) {
class SSTableFile : public WritableFile {
private:
SpecialEnv* env_;
WritableFile* base_;
public:
SSTableFile(SpecialEnv* env, WritableFile* base)
: env_(env),
base_(base) {
}
~SSTableFile() { delete base_; }
Status Append(const Slice& data) {
if (env_->no_space_.Acquire_Load() != NULL) {
// Drop writes on the floor
return Status::OK();
} else {
return base_->Append(data);
}
}
Status Close() { return base_->Close(); }
Status Flush() { return base_->Flush(); }
Status Sync() {
while (env_->delay_sstable_sync_.Acquire_Load() != NULL) {
env_->SleepForMicroseconds(100000);
}
return base_->Sync();
}
};
+ class ManifestFile : public WritableFile {
+ private:
+ SpecialEnv* env_;
+ WritableFile* base_;
+ public:
+ ManifestFile(SpecialEnv* env, WritableFile* b) : env_(env), base_(b) { }
+ ~ManifestFile() { delete base_; }
+ Status Append(const Slice& data) {
+ if (env_->manifest_write_error_.Acquire_Load() != NULL) {
+ return Status::IOError("simulated writer error");
+ } else {
+ return base_->Append(data);
+ }
+ }
+ Status Close() { return base_->Close(); }
+ Status Flush() { return base_->Flush(); }
+ Status Sync() {
+ if (env_->manifest_sync_error_.Acquire_Load() != NULL) {
+ return Status::IOError("simulated sync error");
+ } else {
+ return base_->Sync();
+ }
+ }
+ };
if (non_writable_.Acquire_Load() != NULL) {
return Status::IOError("simulated write error");
}
Status s = target()->NewWritableFile(f, r);
if (s.ok()) {
if (strstr(f.c_str(), ".sst") != NULL) {
*r = new SSTableFile(this, *r);
+ } else if (strstr(f.c_str(), "MANIFEST") != NULL) {
+ *r = new ManifestFile(this, *r);
}
}
return s;
}
Status NewRandomAccessFile(const std::string& f, RandomAccessFile** r) {
class CountingFile : public RandomAccessFile {
private:
RandomAccessFile* target_;
AtomicCounter* counter_;
public:
CountingFile(RandomAccessFile* target, AtomicCounter* counter)
: target_(target), counter_(counter) {
}
virtual ~CountingFile() { delete target_; }
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const {
counter_->Increment();
return target_->Read(offset, n, result, scratch);
}
};
Status s = target()->NewRandomAccessFile(f, r);
if (s.ok() && count_random_reads_) {
*r = new CountingFile(*r, &random_read_counter_);
}
return s;
}
virtual void SleepForMicroseconds(int micros) {
sleep_counter_.Increment();
target()->SleepForMicroseconds(micros);
}
};
class DBTest {
private:
const FilterPolicy* filter_policy_;
// Sequence of option configurations to try
enum OptionConfig {
kDefault,
kFilter,
kUncompressed,
kEnd
};
int option_config_;
public:
std::string dbname_;
SpecialEnv* env_;
DB* db_;
Options last_options_;
DBTest() : option_config_(kDefault),
env_(new SpecialEnv(Env::Default())) {
filter_policy_ = NewBloomFilterPolicy(10);
dbname_ = test::TmpDir() + "/db_test";
DestroyDB(dbname_, Options());
db_ = NULL;
Reopen();
}
~DBTest() {
delete db_;
DestroyDB(dbname_, Options());
delete env_;
delete filter_policy_;
}
// Switch to a fresh database with the next option configuration to
// test. Return false if there are no more configurations to test.
bool ChangeOptions() {
option_config_++;
if (option_config_ >= kEnd) {
return false;
} else {
DestroyAndReopen();
return true;
}
}
// Return the current option configuration.
Options CurrentOptions() {
Options options;
switch (option_config_) {
case kFilter:
options.filter_policy = filter_policy_;
break;
case kUncompressed:
options.compression = kNoCompression;
break;
default:
break;
}
return options;
}
DBImpl* dbfull() {
return reinterpret_cast<DBImpl*>(db_);
}
void Reopen(Options* options = NULL) {
ASSERT_OK(TryReopen(options));
}
void Close() {
delete db_;
db_ = NULL;
}
void DestroyAndReopen(Options* options = NULL) {
delete db_;
db_ = NULL;
DestroyDB(dbname_, Options());
ASSERT_OK(TryReopen(options));
}
Status TryReopen(Options* options) {
delete db_;
db_ = NULL;
Options opts;
if (options != NULL) {
opts = *options;
} else {
opts = CurrentOptions();
opts.create_if_missing = true;
}
last_options_ = opts;
return DB::Open(opts, dbname_, &db_);
}
Status Put(const std::string& k, const std::string& v) {
return db_->Put(WriteOptions(), k, v);
}
Status Delete(const std::string& k) {
return db_->Delete(WriteOptions(), k);
}
std::string Get(const std::string& k, const Snapshot* snapshot = NULL) {
ReadOptions options;
options.snapshot = snapshot;
std::string result;
Status s = db_->Get(options, k, &result);
if (s.IsNotFound()) {
result = "NOT_FOUND";
} else if (!s.ok()) {
result = s.ToString();
}
return result;
}
// Return a string that contains all key,value pairs in order,
// formatted like "(k1->v1)(k2->v2)".
std::string Contents() {
std::vector<std::string> forward;
std::string result;
Iterator* iter = db_->NewIterator(ReadOptions());
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
std::string s = IterStatus(iter);
result.push_back('(');
result.append(s);
result.push_back(')');
forward.push_back(s);
}
// Check reverse iteration results are the reverse of forward results
int matched = 0;
for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
ASSERT_LT(matched, forward.size());
ASSERT_EQ(IterStatus(iter), forward[forward.size() - matched - 1]);
matched++;
}
ASSERT_EQ(matched, forward.size());
delete iter;
return result;
}
std::string AllEntriesFor(const Slice& user_key) {
Iterator* iter = dbfull()->TEST_NewInternalIterator();
InternalKey target(user_key, kMaxSequenceNumber, kTypeValue);
iter->Seek(target.Encode());
std::string result;
if (!iter->status().ok()) {
result = iter->status().ToString();
} else {
result = "[ ";
bool first = true;
while (iter->Valid()) {
ParsedInternalKey ikey;
if (!ParseInternalKey(iter->key(), &ikey)) {
result += "CORRUPTED";
} else {
if (last_options_.comparator->Compare(ikey.user_key, user_key) != 0) {
break;
}
if (!first) {
result += ", ";
}
first = false;
switch (ikey.type) {
case kTypeValue:
result += iter->value().ToString();
break;
case kTypeDeletion:
result += "DEL";
break;
}
}
iter->Next();
}
if (!first) {
result += " ";
}
result += "]";
}
delete iter;
return result;
}
int NumTableFilesAtLevel(int level) {
std::string property;
ASSERT_TRUE(
db_->GetProperty("leveldb.num-files-at-level" + NumberToString(level),
&property));
return atoi(property.c_str());
}
int TotalTableFiles() {
int result = 0;
for (int level = 0; level < config::kNumLevels; level++) {
result += NumTableFilesAtLevel(level);
}
return result;
}
// Return spread of files per level
std::string FilesPerLevel() {
std::string result;
int last_non_zero_offset = 0;
for (int level = 0; level < config::kNumLevels; level++) {
int f = NumTableFilesAtLevel(level);
char buf[100];
snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
result += buf;
if (f > 0) {
last_non_zero_offset = result.size();
}
}
result.resize(last_non_zero_offset);
return result;
}
int CountFiles() {
std::vector<std::string> files;
env_->GetChildren(dbname_, &files);
return static_cast<int>(files.size());
}
uint64_t Size(const Slice& start, const Slice& limit) {
Range r(start, limit);
uint64_t size;
db_->GetApproximateSizes(&r, 1, &size);
return size;
}
void Compact(const Slice& start, const Slice& limit) {
db_->CompactRange(&start, &limit);
}
// Do n memtable compactions, each of which produces an sstable
// covering the range [small,large].
void MakeTables(int n, const std::string& small, const std::string& large) {
for (int i = 0; i < n; i++) {
Put(small, "begin");
Put(large, "end");
dbfull()->TEST_CompactMemTable();
}
}
// Prevent pushing of new sstables into deeper levels by adding
// tables that cover a specified range to all levels.
void FillLevels(const std::string& smallest, const std::string& largest) {
MakeTables(config::kNumLevels, smallest, largest);
}
void DumpFileCounts(const char* label) {
fprintf(stderr, "---\n%s:\n", label);
fprintf(stderr, "maxoverlap: %lld\n",
static_cast<long long>(
dbfull()->TEST_MaxNextLevelOverlappingBytes()));
for (int level = 0; level < config::kNumLevels; level++) {
int num = NumTableFilesAtLevel(level);
if (num > 0) {
fprintf(stderr, " level %3d : %d files\n", level, num);
}
}
}
std::string DumpSSTableList() {
std::string property;
db_->GetProperty("leveldb.sstables", &property);
return property;
}
std::string IterStatus(Iterator* iter) {
std::string result;
if (iter->Valid()) {
result = iter->key().ToString() + "->" + iter->value().ToString();
} else {
result = "(invalid)";
}
return result;
}
};
TEST(DBTest, Empty) {
do {
ASSERT_TRUE(db_ != NULL);
ASSERT_EQ("NOT_FOUND", Get("foo"));
} while (ChangeOptions());
}
TEST(DBTest, ReadWrite) {
do {
ASSERT_OK(Put("foo", "v1"));
ASSERT_EQ("v1", Get("foo"));
ASSERT_OK(Put("bar", "v2"));
ASSERT_OK(Put("foo", "v3"));
ASSERT_EQ("v3", Get("foo"));
ASSERT_EQ("v2", Get("bar"));
} while (ChangeOptions());
}
TEST(DBTest, PutDeleteGet) {
do {
ASSERT_OK(db_->Put(WriteOptions(), "foo", "v1"));
ASSERT_EQ("v1", Get("foo"));
ASSERT_OK(db_->Put(WriteOptions(), "foo", "v2"));
ASSERT_EQ("v2", Get("foo"));
ASSERT_OK(db_->Delete(WriteOptions(), "foo"));
ASSERT_EQ("NOT_FOUND", Get("foo"));
} while (ChangeOptions());
}
TEST(DBTest, GetFromImmutableLayer) {
do {
Options options = CurrentOptions();
options.env = env_;
options.write_buffer_size = 100000; // Small write buffer
Reopen(&options);
ASSERT_OK(Put("foo", "v1"));
ASSERT_EQ("v1", Get("foo"));
env_->delay_sstable_sync_.Release_Store(env_); // Block sync calls
Put("k1", std::string(100000, 'x')); // Fill memtable
Put("k2", std::string(100000, 'y')); // Trigger compaction
ASSERT_EQ("v1", Get("foo"));
env_->delay_sstable_sync_.Release_Store(NULL); // Release sync calls
} while (ChangeOptions());
}
TEST(DBTest, GetFromVersions) {
do {
ASSERT_OK(Put("foo", "v1"));
dbfull()->TEST_CompactMemTable();
ASSERT_EQ("v1", Get("foo"));
} while (ChangeOptions());
}
TEST(DBTest, GetSnapshot) {
do {
// Try with both a short key and a long key
for (int i = 0; i < 2; i++) {
std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x');
ASSERT_OK(Put(key, "v1"));
const Snapshot* s1 = db_->GetSnapshot();
ASSERT_OK(Put(key, "v2"));
ASSERT_EQ("v2", Get(key));
ASSERT_EQ("v1", Get(key, s1));
dbfull()->TEST_CompactMemTable();
ASSERT_EQ("v2", Get(key));
ASSERT_EQ("v1", Get(key, s1));
db_->ReleaseSnapshot(s1);
}
} while (ChangeOptions());
}
TEST(DBTest, GetLevel0Ordering) {
do {
// Check that we process level-0 files in correct order. The code
// below generates two level-0 files where the earlier one comes
// before the later one in the level-0 file list since the earlier
// one has a smaller "smallest" key.
ASSERT_OK(Put("bar", "b"));
ASSERT_OK(Put("foo", "v1"));
dbfull()->TEST_CompactMemTable();
ASSERT_OK(Put("foo", "v2"));
dbfull()->TEST_CompactMemTable();
ASSERT_EQ("v2", Get("foo"));
} while (ChangeOptions());
}
TEST(DBTest, GetOrderedByLevels) {
do {
ASSERT_OK(Put("foo", "v1"));
Compact("a", "z");
ASSERT_EQ("v1", Get("foo"));
ASSERT_OK(Put("foo", "v2"));
ASSERT_EQ("v2", Get("foo"));
dbfull()->TEST_CompactMemTable();
ASSERT_EQ("v2", Get("foo"));
} while (ChangeOptions());
}
TEST(DBTest, GetPicksCorrectFile) {
do {
// Arrange to have multiple files in a non-level-0 level.
ASSERT_OK(Put("a", "va"));
Compact("a", "b");
ASSERT_OK(Put("x", "vx"));
Compact("x", "y");
ASSERT_OK(Put("f", "vf"));
Compact("f", "g");
ASSERT_EQ("va", Get("a"));
ASSERT_EQ("vf", Get("f"));
ASSERT_EQ("vx", Get("x"));
} while (ChangeOptions());
}
TEST(DBTest, GetEncountersEmptyLevel) {
do {
// Arrange for the following to happen:
// * sstable A in level 0
// * nothing in level 1
// * sstable B in level 2
// Then do enough Get() calls to arrange for an automatic compaction
// of sstable A. A bug would cause the compaction to be marked as
// occuring at level 1 (instead of the correct level 0).
// Step 1: First place sstables in levels 0 and 2
int compaction_count = 0;
while (NumTableFilesAtLevel(0) == 0 ||
NumTableFilesAtLevel(2) == 0) {
ASSERT_LE(compaction_count, 100) << "could not fill levels 0 and 2";
compaction_count++;
Put("a", "begin");
Put("z", "end");
dbfull()->TEST_CompactMemTable();
}
// Step 2: clear level 1 if necessary.
dbfull()->TEST_CompactRange(1, NULL, NULL);
ASSERT_EQ(NumTableFilesAtLevel(0), 1);
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_EQ(NumTableFilesAtLevel(2), 1);
// Step 3: read a bunch of times
for (int i = 0; i < 1000; i++) {
ASSERT_EQ("NOT_FOUND", Get("missing"));
}
// Step 4: Wait for compaction to finish
env_->SleepForMicroseconds(1000000);
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
} while (ChangeOptions());
}
TEST(DBTest, IterEmpty) {
Iterator* iter = db_->NewIterator(ReadOptions());
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->SeekToLast();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->Seek("foo");
ASSERT_EQ(IterStatus(iter), "(invalid)");
delete iter;
}
TEST(DBTest, IterSingle) {
ASSERT_OK(Put("a", "va"));
Iterator* iter = db_->NewIterator(ReadOptions());
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Next();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->SeekToLast();
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Next();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->SeekToLast();
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->Seek("");
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Next();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->Seek("a");
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Next();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->Seek("b");
ASSERT_EQ(IterStatus(iter), "(invalid)");
delete iter;
}
TEST(DBTest, IterMulti) {
ASSERT_OK(Put("a", "va"));
ASSERT_OK(Put("b", "vb"));
ASSERT_OK(Put("c", "vc"));
Iterator* iter = db_->NewIterator(ReadOptions());
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Next();
ASSERT_EQ(IterStatus(iter), "b->vb");
iter->Next();
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Next();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->SeekToLast();
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "b->vb");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->SeekToLast();
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Next();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->Seek("");
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Seek("a");
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Seek("ax");
ASSERT_EQ(IterStatus(iter), "b->vb");
iter->Seek("b");
ASSERT_EQ(IterStatus(iter), "b->vb");
iter->Seek("z");
ASSERT_EQ(IterStatus(iter), "(invalid)");
// Switch from reverse to forward
iter->SeekToLast();
iter->Prev();
iter->Prev();
iter->Next();
ASSERT_EQ(IterStatus(iter), "b->vb");
// Switch from forward to reverse
iter->SeekToFirst();
iter->Next();
iter->Next();
iter->Prev();
ASSERT_EQ(IterStatus(iter), "b->vb");
// Make sure iter stays at snapshot
ASSERT_OK(Put("a", "va2"));
ASSERT_OK(Put("a2", "va3"));
ASSERT_OK(Put("b", "vb2"));
ASSERT_OK(Put("c", "vc2"));
ASSERT_OK(Delete("b"));
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Next();
ASSERT_EQ(IterStatus(iter), "b->vb");
iter->Next();
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Next();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->SeekToLast();
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "b->vb");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "(invalid)");
delete iter;
}
TEST(DBTest, IterSmallAndLargeMix) {
ASSERT_OK(Put("a", "va"));
ASSERT_OK(Put("b", std::string(100000, 'b')));
ASSERT_OK(Put("c", "vc"));
ASSERT_OK(Put("d", std::string(100000, 'd')));
ASSERT_OK(Put("e", std::string(100000, 'e')));
Iterator* iter = db_->NewIterator(ReadOptions());
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Next();
ASSERT_EQ(IterStatus(iter), "b->" + std::string(100000, 'b'));
iter->Next();
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Next();
ASSERT_EQ(IterStatus(iter), "d->" + std::string(100000, 'd'));
iter->Next();
ASSERT_EQ(IterStatus(iter), "e->" + std::string(100000, 'e'));
iter->Next();
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->SeekToLast();
ASSERT_EQ(IterStatus(iter), "e->" + std::string(100000, 'e'));
iter->Prev();
ASSERT_EQ(IterStatus(iter), "d->" + std::string(100000, 'd'));
iter->Prev();
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "b->" + std::string(100000, 'b'));
iter->Prev();
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "(invalid)");
delete iter;
}
TEST(DBTest, IterMultiWithDelete) {
do {
ASSERT_OK(Put("a", "va"));
ASSERT_OK(Put("b", "vb"));
ASSERT_OK(Put("c", "vc"));
ASSERT_OK(Delete("b"));
ASSERT_EQ("NOT_FOUND", Get("b"));
Iterator* iter = db_->NewIterator(ReadOptions());
iter->Seek("c");
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "a->va");
delete iter;
} while (ChangeOptions());
}
TEST(DBTest, Recover) {
do {
ASSERT_OK(Put("foo", "v1"));
ASSERT_OK(Put("baz", "v5"));
Reopen();
ASSERT_EQ("v1", Get("foo"));
ASSERT_EQ("v1", Get("foo"));
ASSERT_EQ("v5", Get("baz"));
ASSERT_OK(Put("bar", "v2"));
ASSERT_OK(Put("foo", "v3"));
Reopen();
ASSERT_EQ("v3", Get("foo"));
ASSERT_OK(Put("foo", "v4"));
ASSERT_EQ("v4", Get("foo"));
ASSERT_EQ("v2", Get("bar"));
ASSERT_EQ("v5", Get("baz"));
} while (ChangeOptions());
}
TEST(DBTest, RecoveryWithEmptyLog) {
do {
ASSERT_OK(Put("foo", "v1"));
ASSERT_OK(Put("foo", "v2"));
Reopen();
Reopen();
ASSERT_OK(Put("foo", "v3"));
Reopen();
ASSERT_EQ("v3", Get("foo"));
} while (ChangeOptions());
}
// Check that writes done during a memtable compaction are recovered
// if the database is shutdown during the memtable compaction.
TEST(DBTest, RecoverDuringMemtableCompaction) {
do {
Options options = CurrentOptions();
options.env = env_;
options.write_buffer_size = 1000000;
Reopen(&options);
// Trigger a long memtable compaction and reopen the database during it
ASSERT_OK(Put("foo", "v1")); // Goes to 1st log file
ASSERT_OK(Put("big1", std::string(10000000, 'x'))); // Fills memtable
ASSERT_OK(Put("big2", std::string(1000, 'y'))); // Triggers compaction
ASSERT_OK(Put("bar", "v2")); // Goes to new log file
Reopen(&options);
ASSERT_EQ("v1", Get("foo"));
ASSERT_EQ("v2", Get("bar"));
ASSERT_EQ(std::string(10000000, 'x'), Get("big1"));
ASSERT_EQ(std::string(1000, 'y'), Get("big2"));
} while (ChangeOptions());
}
static std::string Key(int i) {
char buf[100];
snprintf(buf, sizeof(buf), "key%06d", i);
return std::string(buf);
}
TEST(DBTest, MinorCompactionsHappen) {
Options options = CurrentOptions();
options.write_buffer_size = 10000;
Reopen(&options);
const int N = 500;
int starting_num_tables = TotalTableFiles();
for (int i = 0; i < N; i++) {
ASSERT_OK(Put(Key(i), Key(i) + std::string(1000, 'v')));
}
int ending_num_tables = TotalTableFiles();
ASSERT_GT(ending_num_tables, starting_num_tables);
for (int i = 0; i < N; i++) {
ASSERT_EQ(Key(i) + std::string(1000, 'v'), Get(Key(i)));
}
Reopen();
for (int i = 0; i < N; i++) {
ASSERT_EQ(Key(i) + std::string(1000, 'v'), Get(Key(i)));
}
}
TEST(DBTest, RecoverWithLargeLog) {
{
Options options = CurrentOptions();
Reopen(&options);
ASSERT_OK(Put("big1", std::string(200000, '1')));
ASSERT_OK(Put("big2", std::string(200000, '2')));
ASSERT_OK(Put("small3", std::string(10, '3')));
ASSERT_OK(Put("small4", std::string(10, '4')));
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
}
// Make sure that if we re-open with a small write buffer size that
// we flush table files in the middle of a large log file.
Options options = CurrentOptions();
options.write_buffer_size = 100000;
Reopen(&options);
ASSERT_EQ(NumTableFilesAtLevel(0), 3);
ASSERT_EQ(std::string(200000, '1'), Get("big1"));
ASSERT_EQ(std::string(200000, '2'), Get("big2"));
ASSERT_EQ(std::string(10, '3'), Get("small3"));
ASSERT_EQ(std::string(10, '4'), Get("small4"));
ASSERT_GT(NumTableFilesAtLevel(0), 1);
}
TEST(DBTest, CompactionsGenerateMultipleFiles) {
Options options = CurrentOptions();
options.write_buffer_size = 100000000; // Large write buffer
Reopen(&options);
Random rnd(301);
// Write 8MB (80 values, each 100K)
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
std::vector<std::string> values;
for (int i = 0; i < 80; i++) {
values.push_back(RandomString(&rnd, 100000));
ASSERT_OK(Put(Key(i), values[i]));
}
// Reopening moves updates to level-0
Reopen(&options);
dbfull()->TEST_CompactRange(0, NULL, NULL);
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_GT(NumTableFilesAtLevel(1), 1);
for (int i = 0; i < 80; i++) {
ASSERT_EQ(Get(Key(i)), values[i]);
}
}
TEST(DBTest, RepeatedWritesToSameKey) {
Options options = CurrentOptions();
options.env = env_;
options.write_buffer_size = 100000; // Small write buffer
Reopen(&options);
// We must have at most one file per level except for level-0,
// which may have up to kL0_StopWritesTrigger files.
const int kMaxFiles = config::kNumLevels + config::kL0_StopWritesTrigger;
Random rnd(301);
std::string value = RandomString(&rnd, 2 * options.write_buffer_size);
for (int i = 0; i < 5 * kMaxFiles; i++) {
Put("key", value);
ASSERT_LE(TotalTableFiles(), kMaxFiles);
fprintf(stderr, "after %d: %d files\n", int(i+1), TotalTableFiles());
}
}
// Checks that after a sparse update spanning a large key range, compactions
// keep the maximum overlap between any file and the next level
// (TEST_MaxNextLevelOverlappingBytes) at or below 20MB.
TEST(DBTest, SparseMerge) {
Options options = CurrentOptions();
options.compression = kNoCompression;
Reopen(&options);
FillLevels("A", "Z");
// Suppose there is:
// small amount of data with prefix A
// large amount of data with prefix B
// small amount of data with prefix C
// and that recent updates have made small changes to all three prefixes.
// Check that we do not do a compaction that merges all of B in one shot.
const std::string value(1000, 'x');
Put("A", "va");
// Write approximately 100MB of "B" values
for (int i = 0; i < 100000; i++) {
char key[100];
snprintf(key, sizeof(key), "B%010d", i);
Put(key, value);
}
Put("C", "vc");
dbfull()->TEST_CompactMemTable();
dbfull()->TEST_CompactRange(0, NULL, NULL);
// Make sparse update
Put("A", "va2");
Put("B100", "bvalue2");
Put("C", "vc2");
dbfull()->TEST_CompactMemTable();
// Compactions should not cause us to create a situation where
// a file overlaps too much data at the next level.
ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20*1048576);
dbfull()->TEST_CompactRange(0, NULL, NULL);
ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20*1048576);
dbfull()->TEST_CompactRange(1, NULL, NULL);
ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20*1048576);
}
// Returns true iff low <= val <= high (inclusive on both ends).
// When the check fails, the value and the expected range are written to
// stderr so that the enclosing ASSERT_TRUE is easier to diagnose.
static bool Between(uint64_t val, uint64_t low, uint64_t high) {
  if (val < low || val > high) {
    fprintf(stderr, "Value %llu is not in range [%llu, %llu]\n",
            static_cast<unsigned long long>(val),
            static_cast<unsigned long long>(low),
            static_cast<unsigned long long>(high));
    return false;
  }
  return true;
}
// Checks GetApproximateSizes() (through the Size() helper) on a DB holding
// 80 values of 100K each. Sizes must be 0 while the data is only in the
// memtable. After recovery, and while level 0 is compacted piecewise, they
// must track key position.
TEST(DBTest, ApproximateSizes) {
do {
Options options = CurrentOptions();
options.write_buffer_size = 100000000; // Large write buffer
options.compression = kNoCompression;
DestroyAndReopen();
ASSERT_TRUE(Between(Size("", "xyz"), 0, 0));
Reopen(&options);
ASSERT_TRUE(Between(Size("", "xyz"), 0, 0));
// Write 8MB (80 values, each 100K)
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
const int N = 80;
static const int S1 = 100000;
static const int S2 = 105000; // Allow some expansion from metadata
Random rnd(301);
for (int i = 0; i < N; i++) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, S1)));
}
// 0 because GetApproximateSizes() does not account for memtable space
ASSERT_TRUE(Between(Size("", Key(50)), 0, 0));
// Check sizes across recovery by reopening a few times
for (int run = 0; run < 3; run++) {
Reopen(&options);
for (int compact_start = 0; compact_start < N; compact_start += 10) {
// The size up to a key should be roughly S1..S2 bytes per preceding value.
for (int i = 0; i < N; i += 10) {
ASSERT_TRUE(Between(Size("", Key(i)), S1*i, S2*i));
ASSERT_TRUE(Between(Size("", Key(i)+".suffix"), S1*(i+1), S2*(i+1)));
ASSERT_TRUE(Between(Size(Key(i), Key(i+10)), S1*10, S2*10));
}
ASSERT_TRUE(Between(Size("", Key(50)), S1*50, S2*50));
ASSERT_TRUE(Between(Size("", Key(50)+".suffix"), S1*50, S2*50));
// Compact the next block of 10 keys out of level 0; the sizes above
// must hold again on the next pass.
std::string cstart_str = Key(compact_start);
std::string cend_str = Key(compact_start + 9);
Slice cstart = cstart_str;
Slice cend = cend_str;
dbfull()->TEST_CompactRange(0, &cstart, &cend);
}
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_GT(NumTableFilesAtLevel(1), 0);
}
} while (ChangeOptions());
}
// Checks approximate sizes when small (10K) and large (100K, 300K) values are
// interleaved. Each expected bound is the cumulative size of the preceding
// values, checked across reopens and after compacting level 0.
TEST(DBTest, ApproximateSizes_MixOfSmallAndLarge) {
do {
Options options = CurrentOptions();
options.compression = kNoCompression;
Reopen();
Random rnd(301);
std::string big1 = RandomString(&rnd, 100000);
ASSERT_OK(Put(Key(0), RandomString(&rnd, 10000)));
ASSERT_OK(Put(Key(1), RandomString(&rnd, 10000)));
ASSERT_OK(Put(Key(2), big1));
ASSERT_OK(Put(Key(3), RandomString(&rnd, 10000)));
ASSERT_OK(Put(Key(4), big1));
ASSERT_OK(Put(Key(5), RandomString(&rnd, 10000)));
ASSERT_OK(Put(Key(6), RandomString(&rnd, 300000)));
ASSERT_OK(Put(Key(7), RandomString(&rnd, 10000)));
// Check sizes across recovery by reopening a few times
for (int run = 0; run < 3; run++) {
Reopen(&options);
ASSERT_TRUE(Between(Size("", Key(0)), 0, 0));
ASSERT_TRUE(Between(Size("", Key(1)), 10000, 11000));
ASSERT_TRUE(Between(Size("", Key(2)), 20000, 21000));
ASSERT_TRUE(Between(Size("", Key(3)), 120000, 121000));
ASSERT_TRUE(Between(Size("", Key(4)), 130000, 131000));
ASSERT_TRUE(Between(Size("", Key(5)), 230000, 231000));
ASSERT_TRUE(Between(Size("", Key(6)), 240000, 241000));
ASSERT_TRUE(Between(Size("", Key(7)), 540000, 541000));
// Key(8) was never written, so this covers all stored data.
ASSERT_TRUE(Between(Size("", Key(8)), 550000, 560000));
ASSERT_TRUE(Between(Size(Key(3), Key(5)), 110000, 111000));
dbfull()->TEST_CompactRange(0, NULL, NULL);
}
} while (ChangeOptions());
}
// An iterator must keep returning the DB contents as of its creation
// ("foo" => "hello" only), even after later writes trigger compactions.
TEST(DBTest, IteratorPinsRef) {
Put("foo", "hello");
// Get iterator that will yield the current contents of the DB.
Iterator* iter = db_->NewIterator(ReadOptions());
// Write to force compactions
Put("foo", "newvalue1");
for (int i = 0; i < 100; i++) {
ASSERT_OK(Put(Key(i), Key(i) + std::string(100000, 'v'))); // 100K values
}
Put("foo", "newvalue2");
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("foo", iter->key().ToString());
ASSERT_EQ("hello", iter->value().ToString());
iter->Next();
ASSERT_TRUE(!iter->Valid());
delete iter;
}
// Each snapshot must keep returning the value visible when it was taken,
// regardless of later writes or the release of other snapshots.
TEST(DBTest, Snapshot) {
do {
Put("foo", "v1");
const Snapshot* s1 = db_->GetSnapshot();
Put("foo", "v2");
const Snapshot* s2 = db_->GetSnapshot();
Put("foo", "v3");
const Snapshot* s3 = db_->GetSnapshot();
Put("foo", "v4");
ASSERT_EQ("v1", Get("foo", s1));
ASSERT_EQ("v2", Get("foo", s2));
ASSERT_EQ("v3", Get("foo", s3));
ASSERT_EQ("v4", Get("foo"));
// Release out of creation order; the remaining snapshots are unaffected.
db_->ReleaseSnapshot(s3);
ASSERT_EQ("v1", Get("foo", s1));
ASSERT_EQ("v2", Get("foo", s2));
ASSERT_EQ("v4", Get("foo"));
db_->ReleaseSnapshot(s1);
ASSERT_EQ("v2", Get("foo", s2));
ASSERT_EQ("v4", Get("foo"));
db_->ReleaseSnapshot(s2);
ASSERT_EQ("v4", Get("foo"));
} while (ChangeOptions());
}
// An overwritten value kept alive only by a snapshot must be dropped by
// compaction once that snapshot is released, which shrinks the reported size.
TEST(DBTest, HiddenValuesAreRemoved) {
do {
Random rnd(301);
FillLevels("a", "z");
std::string big = RandomString(&rnd, 50000);
Put("foo", big);
Put("pastfoo", "v");
const Snapshot* snapshot = db_->GetSnapshot();
Put("foo", "tiny");
Put("pastfoo2", "v2"); // Advance sequence number one more
ASSERT_OK(dbfull()->TEST_CompactMemTable());
ASSERT_GT(NumTableFilesAtLevel(0), 0);
// The snapshot still sees the big value, so it occupies space.
ASSERT_EQ(big, Get("foo", snapshot));
ASSERT_TRUE(Between(Size("", "pastfoo"), 50000, 60000));
db_->ReleaseSnapshot(snapshot);
ASSERT_EQ(AllEntriesFor("foo"), "[ tiny, " + big + " ]");
Slice x("x");
dbfull()->TEST_CompactRange(0, NULL, &x);
ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]");
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_GE(NumTableFilesAtLevel(1), 1);
dbfull()->TEST_CompactRange(1, NULL, &x);
ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]");
ASSERT_TRUE(Between(Size("", "pastfoo"), 0, 1000));
} while (ChangeOptions());
}
// A deletion marker hidden by a newer value can be dropped early. The older
// value it shadows is dropped once compaction reaches the base level for the key.
TEST(DBTest, DeletionMarkers1) {
Put("foo", "v1");
ASSERT_OK(dbfull()->TEST_CompactMemTable());
const int last = config::kMaxMemCompactLevel;
ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo => v1 is now in last level
// Place a table at level last-1 to prevent merging with preceding mutation
Put("a", "begin");
Put("z", "end");
dbfull()->TEST_CompactMemTable();
ASSERT_EQ(NumTableFilesAtLevel(last), 1);
ASSERT_EQ(NumTableFilesAtLevel(last-1), 1);
Delete("foo");
Put("foo", "v2");
ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
ASSERT_OK(dbfull()->TEST_CompactMemTable()); // Moves to level last-2
ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
Slice z("z");
dbfull()->TEST_CompactRange(last-2, NULL, &z);
// DEL eliminated, but v1 remains because we aren't compacting that level
// (DEL can be eliminated because v2 hides v1).
ASSERT_EQ(AllEntriesFor("foo"), "[ v2, v1 ]");
dbfull()->TEST_CompactRange(last-1, NULL, NULL);
// Merging last-1 w/ last, so we are the base level for "foo", so
// DEL is removed. (as is v1).
ASSERT_EQ(AllEntriesFor("foo"), "[ v2 ]");
}
// A deletion marker must be kept while a deeper level still holds an older
// value for the key. It is removed, together with that value, once compaction
// merges into that level.
TEST(DBTest, DeletionMarkers2) {
Put("foo", "v1");
ASSERT_OK(dbfull()->TEST_CompactMemTable());
const int last = config::kMaxMemCompactLevel;
ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo => v1 is now in last level
// Place a table at level last-1 to prevent merging with preceding mutation
Put("a", "begin");
Put("z", "end");
dbfull()->TEST_CompactMemTable();
ASSERT_EQ(NumTableFilesAtLevel(last), 1);
ASSERT_EQ(NumTableFilesAtLevel(last-1), 1);
Delete("foo");
ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
ASSERT_OK(dbfull()->TEST_CompactMemTable()); // Moves to level last-2
ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
dbfull()->TEST_CompactRange(last-2, NULL, NULL);
// DEL kept: "last" file overlaps
ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
dbfull()->TEST_CompactRange(last-1, NULL, NULL);
// Merging last-1 w/ last, so we are the base level for "foo", so
// DEL is removed. (as is v1).
ASSERT_EQ(AllEntriesFor("foo"), "[ ]");
}
// Regression test: a memtable compaction must detect overlap with existing
// level-0 files and keep its output (here the deletion of "600") in level 0.
TEST(DBTest, OverlapInLevel0) {
do {
ASSERT_EQ(config::kMaxMemCompactLevel, 2) << "Fix test to match config";
// Fill levels 1 and 2 to disable the pushing of new memtables to levels > 0.
ASSERT_OK(Put("100", "v100"));
ASSERT_OK(Put("999", "v999"));
dbfull()->TEST_CompactMemTable();
ASSERT_OK(Delete("100"));
ASSERT_OK(Delete("999"));
dbfull()->TEST_CompactMemTable();
ASSERT_EQ("0,1,1", FilesPerLevel());
// Make files spanning the following ranges in level-0:
// files[0] 200 .. 900
// files[1] 300 .. 500
// Note that files are sorted by smallest key.
ASSERT_OK(Put("300", "v300"));
ASSERT_OK(Put("500", "v500"));
dbfull()->TEST_CompactMemTable();
ASSERT_OK(Put("200", "v200"));
ASSERT_OK(Put("600", "v600"));
ASSERT_OK(Put("900", "v900"));
dbfull()->TEST_CompactMemTable();
ASSERT_EQ("2,1,1", FilesPerLevel());
// Compact away the placeholder files we created initially
dbfull()->TEST_CompactRange(1, NULL, NULL);
dbfull()->TEST_CompactRange(2, NULL, NULL);
ASSERT_EQ("2", FilesPerLevel());
// Do a memtable compaction. Before bug-fix, the compaction would
// not detect the overlap with level-0 files and would incorrectly place
// the deletion in a deeper level.
ASSERT_OK(Delete("600"));
dbfull()->TEST_CompactMemTable();
ASSERT_EQ("3", FilesPerLevel());
ASSERT_EQ("NOT_FOUND", Get("600"));
} while (ChangeOptions());
}
// Regression test (issue 44): a sequence of puts and deletes separated by
// reopens must leave exactly a->v visible. This must hold both right after
// reopening and after background compaction has had time to run.
TEST(DBTest, L0_CompactionBug_Issue44_a) {
Reopen();
ASSERT_OK(Put("b", "v"));
Reopen();
ASSERT_OK(Delete("b"));
ASSERT_OK(Delete("a"));
Reopen();
ASSERT_OK(Delete("a"));
Reopen();
ASSERT_OK(Put("a", "v"));
Reopen();
Reopen();
ASSERT_EQ("(a->v)", Contents());
env_->SleepForMicroseconds(1000000); // Wait for compaction to finish
ASSERT_EQ("(a->v)", Contents());
}
// Second regression sequence for issue 44. It mixes empty-key puts, deletes
// of absent keys and reopens; only "" and "c" must remain visible, before and
// after background compaction.
TEST(DBTest, L0_CompactionBug_Issue44_b) {
Reopen();
Put("","");
Reopen();
Delete("e");
Put("","");
Reopen();
Put("c", "cv");
Reopen();
Put("","");
Reopen();
Put("","");
env_->SleepForMicroseconds(1000000); // Wait for compaction to finish
Reopen();
Put("d","dv");
Reopen();
Put("","");
Reopen();
Delete("d");
Delete("b");
Reopen();
ASSERT_EQ("(->)(c->cv)", Contents());
env_->SleepForMicroseconds(1000000); // Wait for compaction to finish
ASSERT_EQ("(->)(c->cv)", Contents());
}
// Reopening an existing DB with a comparator of a different Name() must fail
// with an error message that mentions "comparator", even though this
// comparator orders keys exactly like the bytewise one.
TEST(DBTest, ComparatorCheck) {
class NewComparator : public Comparator {
public:
virtual const char* Name() const { return "leveldb.NewComparator"; }
// All ordering operations delegate to the bytewise comparator.
virtual int Compare(const Slice& a, const Slice& b) const {
return BytewiseComparator()->Compare(a, b);
}
virtual void FindShortestSeparator(std::string* s, const Slice& l) const {
BytewiseComparator()->FindShortestSeparator(s, l);
}
virtual void FindShortSuccessor(std::string* key) const {
BytewiseComparator()->FindShortSuccessor(key);
}
};
NewComparator cmp;
Options new_options = CurrentOptions();
new_options.comparator = &cmp;
Status s = TryReopen(&new_options);
ASSERT_TRUE(!s.ok());
ASSERT_TRUE(s.ToString().find("comparator") != std::string::npos)
<< s.ToString();
}
// Exercises a user-supplied comparator through writes, reads and compactions.
// Keys have the form "[N]" and are ordered numerically, with N parsed by
// sscanf's %i, so "[10]" and "[0xa]" name the same key.
TEST(DBTest, CustomComparator) {
class NumberComparator : public Comparator {
public:
virtual const char* Name() const { return "test.NumberComparator"; }
virtual int Compare(const Slice& a, const Slice& b) const {
return ToNumber(a) - ToNumber(b);
}
// The separator/successor hooks leave the key unchanged; they only
// validate that the inputs are well-formed "[N]" keys.
virtual void FindShortestSeparator(std::string* s, const Slice& l) const {
ToNumber(*s); // Check format
ToNumber(l); // Check format
}
virtual void FindShortSuccessor(std::string* key) const {
ToNumber(*key); // Check format
}
private:
// Parses "[N]" (decimal, hex or octal per %i) and asserts that nothing
// else is present.
static int ToNumber(const Slice& x) {
// Check that there are no extra characters.
ASSERT_TRUE(x.size() >= 2 && x[0] == '[' && x[x.size()-1] == ']')
<< EscapeString(x);
int val;
char ignored;
ASSERT_TRUE(sscanf(x.ToString().c_str(), "[%i]%c", &val, &ignored) == 1)
<< EscapeString(x);
return val;
}
};
NumberComparator cmp;
Options new_options = CurrentOptions();
new_options.create_if_missing = true;
new_options.comparator = &cmp;
new_options.filter_policy = NULL; // Cannot use bloom filters
new_options.write_buffer_size = 1000; // Compact more often
DestroyAndReopen(&new_options);
ASSERT_OK(Put("[10]", "ten"));
ASSERT_OK(Put("[0x14]", "twenty"));
// Check lookups before and after a compaction.
for (int i = 0; i < 2; i++) {
ASSERT_EQ("ten", Get("[10]"));
ASSERT_EQ("ten", Get("[0xa]"));
ASSERT_EQ("twenty", Get("[20]"));
ASSERT_EQ("twenty", Get("[0x14]"));
ASSERT_EQ("NOT_FOUND", Get("[15]"));
ASSERT_EQ("NOT_FOUND", Get("[0xf]"));
Compact("[0]", "[9999]");
}
for (int run = 0; run < 2; run++) {
for (int i = 0; i < 1000; i++) {
char buf[100];
snprintf(buf, sizeof(buf), "[%d]", i*10);
ASSERT_OK(Put(buf, buf));
}
Compact("[0]", "[1000000]");
}
}
// Exercises manual compaction (the Compact() helper and DB::CompactRange())
// over ranges that fall before, after, or overlap the existing files. The
// file count per level is checked after each step.
TEST(DBTest, ManualCompaction) {
ASSERT_EQ(config::kMaxMemCompactLevel, 2)
<< "Need to update this test to match kMaxMemCompactLevel";
MakeTables(3, "p", "q");
ASSERT_EQ("1,1,1", FilesPerLevel());
// Compaction range falls before files
Compact("", "c");
ASSERT_EQ("1,1,1", FilesPerLevel());
// Compaction range falls after files
Compact("r", "z");
ASSERT_EQ("1,1,1", FilesPerLevel());
// Compaction range overlaps files
Compact("p1", "p9");
ASSERT_EQ("0,0,1", FilesPerLevel());
// Populate a different range
MakeTables(3, "c", "e");
ASSERT_EQ("1,1,2", FilesPerLevel());
// Compact just the new range
Compact("b", "f");
ASSERT_EQ("0,0,2", FilesPerLevel());
// Compact all
MakeTables(1, "a", "z");
ASSERT_EQ("0,1,2", FilesPerLevel());
db_->CompactRange(NULL, NULL);
ASSERT_EQ("0,0,1", FilesPerLevel());
}
// Checks DB::Open() for each combination of create_if_missing and
// error_if_exists, against a missing DB and against an existing one.
TEST(DBTest, DBOpen_Options) {
std::string dbname = test::TmpDir() + "/db_options_test";
DestroyDB(dbname, Options());
// Does not exist, and create_if_missing == false: error
DB* db = NULL;
Options opts;
opts.create_if_missing = false;
Status s = DB::Open(opts, dbname, &db);
ASSERT_TRUE(strstr(s.ToString().c_str(), "does not exist") != NULL);
ASSERT_TRUE(db == NULL);
// Does not exist, and create_if_missing == true: OK
opts.create_if_missing = true;
s = DB::Open(opts, dbname, &db);
ASSERT_OK(s);
ASSERT_TRUE(db != NULL);
delete db;
db = NULL;
// Does exist, and error_if_exists == true: error
opts.create_if_missing = false;
opts.error_if_exists = true;
s = DB::Open(opts, dbname, &db);
ASSERT_TRUE(strstr(s.ToString().c_str(), "exists") != NULL);
ASSERT_TRUE(db == NULL);
// Does exist, and error_if_exists == false: OK
opts.create_if_missing = true;
opts.error_if_exists = false;
s = DB::Open(opts, dbname, &db);
ASSERT_OK(s);
ASSERT_TRUE(db != NULL);
delete db;
db = NULL;
}
// A second DB::Open() on the directory the fixture already has open must fail.
TEST(DBTest, Locking) {
DB* db2 = NULL;
Status s = DB::Open(CurrentOptions(), dbname_, &db2);
ASSERT_TRUE(!s.ok()) << "Locking did not prevent re-opening db";
}
// Check that number of files does not grow when we are out of space
// (at most 2 extra files are tolerated), and that failed compactions back off
// by sleeping.
TEST(DBTest, NoSpace) {
Options options = CurrentOptions();
options.env = env_;
Reopen(&options);
ASSERT_OK(Put("foo", "v1"));
ASSERT_EQ("v1", Get("foo"));
Compact("a", "z");
const int num_files = CountFiles();
env_->no_space_.Release_Store(env_); // Force out-of-space errors
env_->sleep_counter_.Reset();
for (int i = 0; i < 5; i++) {
for (int level = 0; level < config::kNumLevels-1; level++) {
dbfull()->TEST_CompactRange(level, NULL, NULL);
}
}
// Clear the fault before checking results.
env_->no_space_.Release_Store(NULL);
ASSERT_LT(CountFiles(), num_files + 3);
// Check that compaction attempts slept after errors
ASSERT_GE(env_->sleep_counter_.Read(), 5);
}
// When creating new files is forced to fail, writing large values must
// eventually surface an error to the caller (at least one failed Put()).
TEST(DBTest, NonWritableFileSystem) {
Options options = CurrentOptions();
options.write_buffer_size = 1000;
options.env = env_;
Reopen(&options);
ASSERT_OK(Put("foo", "v1"));
env_->non_writable_.Release_Store(env_); // Force errors for new files
std::string big(100000, 'x');
int errors = 0;
for (int i = 0; i < 20; i++) {
fprintf(stderr, "iter %d; errors %d\n", i, errors);
if (!Put("foo", big).ok()) {
errors++;
// Back off briefly after each failure before retrying.
env_->SleepForMicroseconds(100000);
}
}
ASSERT_GT(errors, 0);
env_->non_writable_.Release_Store(NULL);
}
+TEST(DBTest, ManifestWriteError) {
+ // Test for the following problem:
+ // (a) Compaction produces file F
+ // (b) Log record containing F is written to MANIFEST file, but Sync() fails
+ // (c) GC deletes F
+ // (d) After reopening DB, reads fail since deleted F is named in log record
+
+ // We iterate twice. In the second iteration, everything is the
+ // same except the log record never makes it to the MANIFEST file.
+ for (int iter = 0; iter < 2; iter++) {
+ // Iteration 0 uses manifest_sync_error_, iteration 1 uses
+ // manifest_write_error_. The fault is presumably active while the pointer
+ // is non-NULL (it is set to env_ below and cleared with NULL).
+ port::AtomicPointer* error_type = (iter == 0)
+ ? &env_->manifest_sync_error_
+ : &env_->manifest_write_error_;
+
+ // Insert foo=>bar mapping
+ Options options = CurrentOptions();
+ options.env = env_;
+ options.create_if_missing = true;
+ options.error_if_exists = false;
+ DestroyAndReopen(&options);
+ ASSERT_OK(Put("foo", "bar"));
+ ASSERT_EQ("bar", Get("foo"));
+
+ // Memtable compaction (will succeed)
+ dbfull()->TEST_CompactMemTable();
+ ASSERT_EQ("bar", Get("foo"));
+ const int last = config::kMaxMemCompactLevel;
+ ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo=>bar is now in last level
+
+ // Merging compaction (will fail)
+ error_type->Release_Store(env_);
+ dbfull()->TEST_CompactRange(last, NULL, NULL); // Should fail
+ ASSERT_EQ("bar", Get("foo"));
+
+ // Recovery: should not lose data
+ error_type->Release_Store(NULL);
+ Reopen(&options);
+ ASSERT_EQ("bar", Get("foo"));
+ }
+}
+
// Repeated overwrite-then-compact cycles must not leave obsolete files
// behind: the file count stays exactly constant.
TEST(DBTest, FilesDeletedAfterCompaction) {
ASSERT_OK(Put("foo", "v2"));
Compact("a", "z");
const int num_files = CountFiles();
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put("foo", "v2"));
Compact("a", "z");
}
ASSERT_EQ(CountFiles(), num_files);
}
// Verifies the bloom filter policy through counted random reads. Lookups of
// present keys should cost about one read per key. Lookups of missing keys
// should almost never touch an sstable.
TEST(DBTest, BloomFilter) {
env_->count_random_reads_ = true;
Options options = CurrentOptions();
options.env = env_;
options.block_cache = NewLRUCache(0); // Prevent cache hits
options.filter_policy = NewBloomFilterPolicy(10);
Reopen(&options);
// Populate multiple layers
const int N = 10000;
for (int i = 0; i < N; i++) {
ASSERT_OK(Put(Key(i), Key(i)));
}
Compact("a", "z");
for (int i = 0; i < N; i += 100) {
ASSERT_OK(Put(Key(i), Key(i)));
}
dbfull()->TEST_CompactMemTable();
// Prevent auto compactions triggered by seeks
env_->delay_sstable_sync_.Release_Store(env_);
// Lookup present keys. Should rarely read from small sstable.
env_->random_read_counter_.Reset();
for (int i = 0; i < N; i++) {
ASSERT_EQ(Key(i), Get(Key(i)));
}
int reads = env_->random_read_counter_.Read();
fprintf(stderr, "%d present => %d reads\n", N, reads);
ASSERT_GE(reads, N);
ASSERT_LE(reads, N + 2*N/100);
// Lookup missing keys. Should rarely read from either sstable.
env_->random_read_counter_.Reset();
for (int i = 0; i < N; i++) {
ASSERT_EQ("NOT_FOUND", Get(Key(i) + ".missing"));
}
reads = env_->random_read_counter_.Read();
fprintf(stderr, "%d missing => %d reads\n", N, reads);
ASSERT_LE(reads, 3*N/100);
env_->delay_sstable_sync_.Release_Store(NULL);
// Close the DB before deleting the cache and policy it was configured with.
Close();
delete options.block_cache;
delete options.filter_policy;
}
// Multi-threaded test:
// Several threads read and write the same DB concurrently. Every value
// records its writer and that writer's op counter, so readers can check that
// no value comes from the "future".
namespace {
static const int kNumThreads = 4;
static const int kTestSeconds = 10;
static const int kNumKeys = 1000;
// State shared by all worker threads.
struct MTState {
DBTest* test;
port::AtomicPointer stop; // Set non-NULL to ask workers to exit.
port::AtomicPointer counter[kNumThreads]; // Per-thread op count, published before each op.
port::AtomicPointer thread_done[kNumThreads]; // Set non-NULL by a worker when it exits.
};
// Per-thread argument passed to MTThreadBody.
struct MTThread {
MTState* state;
int id;
};
// Worker loop: until stop is set, publishes its counter and then randomly
// either writes "<key>.<id>.<counter>" or reads a key and validates it.
static void MTThreadBody(void* arg) {
MTThread* t = reinterpret_cast<MTThread*>(arg);
int id = t->id;
DB* db = t->state->test->db_;
uintptr_t counter = 0;
fprintf(stderr, "... starting thread %d\n", id);
Random rnd(1000 + id);
std::string value;
char valbuf[1500];
while (t->state->stop.Acquire_Load() == NULL) {
t->state->counter[id].Release_Store(reinterpret_cast<void*>(counter));
int key = rnd.Uniform(kNumKeys);
char keybuf[20];
snprintf(keybuf, sizeof(keybuf), "%016d", key);
if (rnd.OneIn(2)) {
// Write values of the form <key, my id, counter>.
// We add some padding to force compactions.
snprintf(valbuf, sizeof(valbuf), "%d.%d.%-1000d",
key, id, static_cast<int>(counter));
ASSERT_OK(db->Put(WriteOptions(), Slice(keybuf), Slice(valbuf)));
} else {
// Read a value and verify that it matches the pattern written above.
Status s = db->Get(ReadOptions(), Slice(keybuf), &value);
if (s.IsNotFound()) {
// Key has not yet been written
} else {
// Check that the writer thread counter is >= the counter in the value
ASSERT_OK(s);
int k, w, c;
ASSERT_EQ(3, sscanf(value.c_str(), "%d.%d.%d", &k, &w, &c)) << value;
ASSERT_EQ(k, key);
ASSERT_GE(w, 0);
ASSERT_LT(w, kNumThreads);
ASSERT_LE(c, reinterpret_cast<uintptr_t>(
t->state->counter[w].Acquire_Load()));
}
}
counter++;
}
t->state->thread_done[id].Release_Store(t);
fprintf(stderr, "... stopping thread %d after %d ops\n", id, int(counter));
}
} // namespace
// Runs kNumThreads MTThreadBody workers against the same DB for kTestSeconds,
// then signals them to stop and polls until each reports completion. This is
// repeated for every configuration produced by ChangeOptions().
TEST(DBTest, MultiThreaded) {
do {
// Initialize state
MTState mt;
mt.test = this;
mt.stop.Release_Store(0);
for (int id = 0; id < kNumThreads; id++) {
mt.counter[id].Release_Store(0);
mt.thread_done[id].Release_Store(0);
}
// Start threads
MTThread thread[kNumThreads];
for (int id = 0; id < kNumThreads; id++) {
thread[id].state = &mt;
thread[id].id = id;
env_->StartThread(MTThreadBody, &thread[id]);
}
// Let them run for a while
env_->SleepForMicroseconds(kTestSeconds * 1000000);
// Stop the threads and wait for them to finish
mt.stop.Release_Store(&mt);
for (int id = 0; id < kNumThreads; id++) {
while (mt.thread_done[id].Acquire_Load() == NULL) {
env_->SleepForMicroseconds(100000);
}
}
} while (ChangeOptions());
}
namespace {
// Ordered key -> value map holding ModelDB's reference contents.
typedef std::map<std::string, std::string> KVMap;
}
// In-memory reference implementation of DB, used by the randomized test.
// Contents live in a KVMap, and a snapshot is a full copy of that map.
// Get() is intentionally unimplemented. Sizes, properties and compaction
// are no-ops.
class ModelDB: public DB {
 public:
  // A snapshot is simply a frozen copy of the model's map.
  class ModelSnapshot : public Snapshot {
   public:
    KVMap map_;
  };

  explicit ModelDB(const Options& options): options_(options) { }
  ~ModelDB() { }

  // Delegate to the DB base-class convenience methods; these are expected to
  // wrap the operation in a WriteBatch handled by Write() below.
  virtual Status Put(const WriteOptions& o, const Slice& k, const Slice& v) {
    return DB::Put(o, k, v);
  }
  virtual Status Delete(const WriteOptions& o, const Slice& key) {
    return DB::Delete(o, key);
  }

  virtual Status Get(const ReadOptions& options,
                     const Slice& key, std::string* value) {
    assert(false);  // Not implemented
    return Status::NotFound(key);
  }

  virtual Iterator* NewIterator(const ReadOptions& options) {
    if (options.snapshot != NULL) {
      // Iterate the snapshot's frozen map; the snapshot retains ownership.
      const ModelSnapshot* snap =
          reinterpret_cast<const ModelSnapshot*>(options.snapshot);
      return new ModelIter(&snap->map_, false);
    }
    // No snapshot: give the iterator its own copy so later writes are
    // invisible to it. The iterator deletes the copy when destroyed.
    return new ModelIter(new KVMap(map_), true);
  }

  virtual const Snapshot* GetSnapshot() {
    ModelSnapshot* snap = new ModelSnapshot;
    snap->map_ = map_;
    return snap;
  }

  virtual void ReleaseSnapshot(const Snapshot* snapshot) {
    delete reinterpret_cast<const ModelSnapshot*>(snapshot);
  }

  virtual Status Write(const WriteOptions& options, WriteBatch* batch) {
    // Replays each batch entry directly against the model map.
    class MapApplier : public WriteBatch::Handler {
     public:
      KVMap* map_;
      virtual void Put(const Slice& key, const Slice& value) {
        (*map_)[key.ToString()] = value.ToString();
      }
      virtual void Delete(const Slice& key) {
        map_->erase(key.ToString());
      }
    };
    MapApplier applier;
    applier.map_ = &map_;
    return batch->Iterate(&applier);
  }

  virtual bool GetProperty(const Slice& property, std::string* value) {
    return false;
  }

  virtual void GetApproximateSizes(const Range* r, int n, uint64_t* sizes) {
    // The model has no on-disk footprint: every range reports zero bytes.
    int idx = 0;
    while (idx < n) {
      sizes[idx] = 0;
      ++idx;
    }
  }

  virtual void CompactRange(const Slice* start, const Slice* end) {
  }

 private:
  // Iterator over a KVMap. When |owned| is true it deletes the map on
  // destruction.
  class ModelIter: public Iterator {
   public:
    ModelIter(const KVMap* map, bool owned)
        : map_(map), owned_(owned), iter_(map_->end()) {
    }
    ~ModelIter() {
      if (owned_) {
        delete map_;
      }
    }
    virtual bool Valid() const { return iter_ != map_->end(); }
    virtual void SeekToFirst() { iter_ = map_->begin(); }
    virtual void SeekToLast() {
      // The last element is just before end(); an empty map stays at end().
      iter_ = map_->end();
      if (!map_->empty()) {
        --iter_;
      }
    }
    virtual void Seek(const Slice& k) {
      iter_ = map_->lower_bound(k.ToString());
    }
    virtual void Next() { ++iter_; }
    virtual void Prev() { --iter_; }
    virtual Slice key() const { return iter_->first; }
    virtual Slice value() const { return iter_->second; }
    virtual Status status() const { return Status::OK(); }
   private:
    const KVMap* const map_;
    const bool owned_;  // Do we own map_
    KVMap::const_iterator iter_;
  };

  const Options options_;
  KVMap map_;
};
// Produces a random key for the randomized test. One time in three the key
// has length 1, to encourage collisions. Otherwise the length is usually
// uniform in [0, 10), and occasionally (1 in 100) drawn from Skewed(10).
static std::string RandomKey(Random* rnd) {
  int len;
  if (rnd->OneIn(3)) {
    len = 1;  // Short sometimes to encourage collisions
  } else if (rnd->OneIn(100)) {
    len = rnd->Skewed(10);
  } else {
    len = rnd->Uniform(10);
  }
  return test::RandomKey(rnd, len);
}
// Walks an iterator over |model| (at model_snap) and one over |db| (at
// db_snap) in lockstep. Returns true iff both yield the same key/value
// sequence; a NULL snapshot means "current state". Mismatches are logged to
// stderr, tagged with |step|. The result is false on the first key mismatch,
// on any value mismatch, or when one iterator ends before the other.
static bool CompareIterators(int step,
                             DB* model,
                             DB* db,
                             const Snapshot* model_snap,
                             const Snapshot* db_snap) {
  ReadOptions options;
  options.snapshot = model_snap;
  Iterator* miter = model->NewIterator(options);
  options.snapshot = db_snap;
  Iterator* dbiter = db->NewIterator(options);
  bool ok = true;
  int count = 0;
  for (miter->SeekToFirst(), dbiter->SeekToFirst();
       ok && miter->Valid() && dbiter->Valid();
       miter->Next(), dbiter->Next()) {
    count++;
    if (miter->key().compare(dbiter->key()) != 0) {
      fprintf(stderr, "step %d: Key mismatch: '%s' vs. '%s'\n",
              step,
              EscapeString(miter->key()).c_str(),
              EscapeString(dbiter->key()).c_str());
      ok = false;
      break;
    }
    if (miter->value().compare(dbiter->value()) != 0) {
      // Print the model's value, then the DB's value, so the actual
      // discrepancy is visible in the log.
      fprintf(stderr, "step %d: Value mismatch for key '%s': '%s' vs. '%s'\n",
              step,
              EscapeString(miter->key()).c_str(),
              EscapeString(miter->value()).c_str(),
              EscapeString(dbiter->value()).c_str());
      ok = false;
    }
  }
  if (ok) {
    // Both loops must have ended together; otherwise one side has extra keys.
    if (miter->Valid() != dbiter->Valid()) {
      fprintf(stderr, "step %d: Mismatch at end of iterators: %d vs. %d\n",
              step, miter->Valid(), dbiter->Valid());
      ok = false;
    }
  }
  fprintf(stderr, "%d entries compared: ok=%d\n", count, ok);
  delete miter;
  delete dbiter;
  return ok;
}
// Applies the same random mix of single puts (45%), single deletes (45%) and
// multi-entry write batches (10%) to both ModelDB and the real DB. Every 100
// steps it compares their contents: current state, state at the previously
// saved snapshots, and current state again after a reopen.
TEST(DBTest, Randomized) {
Random rnd(test::RandomSeed());
do {
ModelDB model(CurrentOptions());
const int N = 10000;
const Snapshot* model_snap = NULL;
const Snapshot* db_snap = NULL;
std::string k, v;
for (int step = 0; step < N; step++) {
if (step % 100 == 0) {
fprintf(stderr, "Step %d of %d\n", step, N);
}
// TODO(sanjay): Test Get() works
int p = rnd.Uniform(100);
if (p < 45) { // Put
k = RandomKey(&rnd);
v = RandomString(&rnd,
rnd.OneIn(20)
? 100 + rnd.Uniform(100)
: rnd.Uniform(8));
ASSERT_OK(model.Put(WriteOptions(), k, v));
ASSERT_OK(db_->Put(WriteOptions(), k, v));
} else if (p < 90) { // Delete
k = RandomKey(&rnd);
ASSERT_OK(model.Delete(WriteOptions(), k));
ASSERT_OK(db_->Delete(WriteOptions(), k));
} else { // Multi-element batch
WriteBatch b;
const int num = rnd.Uniform(8);
for (int i = 0; i < num; i++) {
if (i == 0 || !rnd.OneIn(10)) {
k = RandomKey(&rnd);
} else {
// Periodically re-use the same key from the previous iter, so
// we have multiple entries in the write batch for the same key
}
if (rnd.OneIn(2)) {
v = RandomString(&rnd, rnd.Uniform(10));
b.Put(k, v);
} else {
b.Delete(k);
}
}
ASSERT_OK(model.Write(WriteOptions(), &b));
ASSERT_OK(db_->Write(WriteOptions(), &b));
}
if ((step % 100) == 0) {
ASSERT_TRUE(CompareIterators(step, &model, db_, NULL, NULL));
ASSERT_TRUE(CompareIterators(step, &model, db_, model_snap, db_snap));
// Save a snapshot from each DB this time that we'll use next
// time we compare things, to make sure the current state is
// preserved with the snapshot
if (model_snap != NULL) model.ReleaseSnapshot(model_snap);
if (db_snap != NULL) db_->ReleaseSnapshot(db_snap);
Reopen();
ASSERT_TRUE(CompareIterators(step, &model, db_, NULL, NULL));
model_snap = model.GetSnapshot();
db_snap = db_->GetSnapshot();
}
}
if (model_snap != NULL) model.ReleaseSnapshot(model_snap);
if (db_snap != NULL) db_->ReleaseSnapshot(db_snap);
} while (ChangeOptions());
}
// Formats |num| as a zero-padded, 16-digit decimal string, so that the
// resulting keys sort bytewise in numeric order.
std::string MakeKey(unsigned int num) {
  char digits[30];
  const int written = snprintf(digits, sizeof(digits), "%016u", num);
  return std::string(digits, written);
}
// Micro-benchmark for VersionSet::LogAndApply(). It seeds a fresh VersionSet
// with |num_base_files| files at level 2, then times |iters| edits. Each edit
// records a DeleteFile and an AddFile for the next file number at level 2.
// Total and per-iteration wall time are printed to stderr.
void BM_LogAndApply(int iters, int num_base_files) {
  std::string dbname = test::TmpDir() + "/leveldb_test_benchmark";
  DestroyDB(dbname, Options());
  // Create an empty DB on disk so that vset.Recover() below has state to load.
  DB* db = NULL;
  Options opts;
  opts.create_if_missing = true;
  Status s = DB::Open(opts, dbname, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != NULL);
  delete db;
  db = NULL;
  Env* env = Env::Default();
  // |mu| is handed to every LogAndApply() call; hold it for the whole run.
  port::Mutex mu;
  MutexLock l(&mu);
  InternalKeyComparator cmp(BytewiseComparator());
  Options options;
  VersionSet vset(dbname, &options, NULL, &cmp);
  ASSERT_OK(vset.Recover());
  VersionEdit vbase;
  uint64_t fnum = 1;
  for (int i = 0; i < num_base_files; i++) {
    InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
    InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);
    vbase.AddFile(2, fnum++, 1 /* file size */, start, limit);
  }
  ASSERT_OK(vset.LogAndApply(&vbase, &mu));
  uint64_t start_micros = env->NowMicros();
  for (int i = 0; i < iters; i++) {
    VersionEdit vedit;
    vedit.DeleteFile(2, fnum);
    InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
    InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);
    vedit.AddFile(2, fnum++, 1 /* file size */, start, limit);
    // A failed edit would make the timing meaningless, so don't ignore it.
    ASSERT_OK(vset.LogAndApply(&vedit, &mu));
  }
  uint64_t stop_micros = env->NowMicros();
  // Keep the full 64-bit width: narrowing to unsigned int would wrap for
  // runs longer than about 71 minutes.
  const uint64_t us = stop_micros - start_micros;
  char buf[16];
  snprintf(buf, sizeof(buf), "%d", num_base_files);
  fprintf(stderr,
          "BM_LogAndApply/%-6s %8d iters : %9llu us (%7.0f us / iter)\n",
          buf, iters, static_cast<unsigned long long>(us),
          static_cast<double>(us) / iters);
}
} // namespace leveldb
// Test entry point. With "--benchmark" as the first argument, runs the
// LogAndApply micro-benchmarks at several base-file counts and exits with 0.
// Otherwise runs all registered tests and returns their result.
int main(int argc, char** argv) {
  const bool run_benchmarks =
      argc > 1 && std::string(argv[1]) == "--benchmark";
  if (!run_benchmarks) {
    return leveldb::test::RunAllTests();
  }
  leveldb::BM_LogAndApply(1000, 1);
  leveldb::BM_LogAndApply(1000, 100);
  leveldb::BM_LogAndApply(1000, 10000);
  leveldb::BM_LogAndApply(100, 100000);
  return 0;
}
diff --git a/src/leveldb/db/version_set.cc b/src/leveldb/db/version_set.cc
index bdd4a579b..7d0a5de2b 100644
--- a/src/leveldb/db/version_set.cc
+++ b/src/leveldb/db/version_set.cc
@@ -1,1402 +1,1438 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/version_set.h"
#include <algorithm>
#include <stdio.h>
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/table_cache.h"
#include "leveldb/env.h"
#include "leveldb/table_builder.h"
#include "table/merger.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/logging.h"
namespace leveldb {
// Target size of a table file (2MB). MaxFileSizeForLevel() returns this value
// for every level.
static const int kTargetFileSize = 2 * 1048576;
// Maximum bytes of overlaps in grandparent (i.e., level+2) before we
// stop building a single file in a level->level+1 compaction.
static const int64_t kMaxGrandParentOverlapBytes = 10 * kTargetFileSize;
// Maximum number of bytes in all compacted files. We avoid expanding
// the lower level file set of a compaction if it would make the
// total compaction cover more than this many bytes.
static const int64_t kExpandedCompactionByteSizeLimit = 25 * kTargetFileSize;
// Returns the byte budget for |level|. Levels 0 and 1 get 10MB, and each
// deeper level gets ten times the budget of the level above it.
// Note: the result for level zero is not really used since we set
// the level-0 compaction threshold based on number of files.
static double MaxBytesForLevel(int level) {
  double budget = 10 * 1048576.0;  // Result for both level-0 and level-1
  for (int lvl = level; lvl > 1; --lvl) {
    budget *= 10;
  }
  return budget;
}
// Returns the maximum table-file size for |level|. Every level currently
// shares kTargetFileSize; the parameter is kept so per-level sizing could be
// introduced without touching callers.
static uint64_t MaxFileSizeForLevel(int level) {
  (void)level;  // Not used yet: all levels share one target size.
  const uint64_t limit = kTargetFileSize;
  return limit;
}
// Returns the sum of file_size over all entries in |files| (0 if empty).
static int64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
  int64_t total = 0;
  for (std::vector<FileMetaData*>::const_iterator it = files.begin();
       it != files.end(); ++it) {
    total += (*it)->file_size;
  }
  return total;
}
namespace {
// Renders |s| as "{a,b,c}" in ascending order; an empty set yields "{}".
std::string IntSetToString(const std::set<uint64_t>& s) {
  std::string out("{");
  bool first = true;
  for (std::set<uint64_t>::const_iterator it = s.begin(); it != s.end(); ++it) {
    if (!first) {
      out += ",";
    }
    first = false;
    out += NumberToString(*it);
  }
  out += "}";
  return out;
}
}  // namespace
// Destroys a Version that no longer has references: unlinks it from the
// doubly-linked list of versions and drops its reference to every file it
// lists, deleting any FileMetaData whose count reaches zero.
Version::~Version() {
assert(refs_ == 0);
// Remove from linked list
prev_->next_ = next_;
next_->prev_ = prev_;
// Drop references to files
for (int level = 0; level < config::kNumLevels; level++) {
for (size_t i = 0; i < files_[level].size(); i++) {
FileMetaData* f = files_[level][i];
assert(f->refs > 0);
f->refs--;
if (f->refs <= 0) {
delete f;
}
}
}
}
// Binary search over |files|, which are assumed sorted by largest key.
// Returns the index of the first file whose largest internal key is >= |key|,
// or files.size() if there is no such file.
int FindFile(const InternalKeyComparator& icmp,
             const std::vector<FileMetaData*>& files,
             const Slice& key) {
  // Invariant: files[0, lo) end before |key|; files[hi, size) end at or after it.
  uint32_t lo = 0;
  uint32_t hi = files.size();
  while (lo < hi) {
    const uint32_t probe = (lo + hi) / 2;
    const bool ends_before_key =
        icmp.InternalKeyComparator::Compare(files[probe]->largest.Encode(),
                                            key) < 0;
    if (ends_before_key) {
      lo = probe + 1;  // "probe" and everything before it are uninteresting.
    } else {
      hi = probe;      // Everything after "probe" is uninteresting.
    }
  }
  return hi;
}
// Returns true iff *user_key sorts strictly after the largest user key in |f|.
static bool AfterFile(const Comparator* ucmp,
                      const Slice* user_key, const FileMetaData* f) {
  // NULL user_key occurs before all keys and is therefore never after *f
  if (user_key == NULL) {
    return false;
  }
  return ucmp->Compare(*user_key, f->largest.user_key()) > 0;
}
// True iff *user_key sorts strictly before the smallest user key of *f.
// A NULL user_key stands for "after all keys", so it is never before a file.
static bool BeforeFile(const Comparator* ucmp,
                       const Slice* user_key, const FileMetaData* f) {
  if (user_key == NULL) {
    return false;
  }
  return ucmp->Compare(*user_key, f->smallest.user_key()) < 0;
}
// Returns true iff some file in "files" overlaps the user-key range
// [*smallest_user_key, *largest_user_key]. A NULL bound leaves the range
// open on that side. When disjoint_sorted_files is true the files are
// treated as sorted and non-overlapping, which allows a binary search;
// otherwise every file is examined.
bool SomeFileOverlapsRange(
    const InternalKeyComparator& icmp,
    bool disjoint_sorted_files,
    const std::vector<FileMetaData*>& files,
    const Slice* smallest_user_key,
    const Slice* largest_user_key) {
  const Comparator* ucmp = icmp.user_comparator();

  if (disjoint_sorted_files) {
    // Locate the first file that could still contain the start of the range.
    uint32_t index = 0;
    if (smallest_user_key != NULL) {
      // Earliest possible internal key for smallest_user_key.
      InternalKey seek_key(*smallest_user_key, kMaxSequenceNumber,
                           kValueTypeForSeek);
      index = FindFile(icmp, files, seek_key.Encode());
    }
    if (index >= files.size()) {
      // The range begins after every file.
      return false;
    }
    return !BeforeFile(ucmp, largest_user_key, files[index]);
  }

  // Files may overlap each other, so any one of them could match.
  for (size_t i = 0; i < files.size(); i++) {
    const FileMetaData* f = files[i];
    const bool disjoint = AfterFile(ucmp, smallest_user_key, f) ||
                          BeforeFile(ucmp, largest_user_key, f);
    if (!disjoint) {
      return true;
    }
  }
  return false;
}
// Internal iterator over the files of one level of a Version. For the
// current entry, key() is the file's largest internal key and value() is a
// 16-byte buffer holding the file number followed by the file size, each
// encoded with EncodeFixed64.
class Version::LevelFileNumIterator : public Iterator {
 public:
  LevelFileNumIterator(const InternalKeyComparator& icmp,
                       const std::vector<FileMetaData*>* flist)
      : icmp_(icmp),
        flist_(flist),
        index_(flist->size()) {  // Start out invalid (one past the end).
  }

  virtual bool Valid() const { return index_ < flist_->size(); }

  virtual void Seek(const Slice& target) {
    index_ = FindFile(icmp_, *flist_, target);
  }

  virtual void SeekToFirst() { index_ = 0; }

  virtual void SeekToLast() {
    if (flist_->empty()) {
      index_ = 0;
    } else {
      index_ = flist_->size() - 1;
    }
  }

  virtual void Next() {
    assert(Valid());
    ++index_;
  }

  virtual void Prev() {
    assert(Valid());
    // Stepping back from the first file makes the iterator invalid, which is
    // encoded as index_ == flist_->size().
    index_ = (index_ == 0) ? flist_->size() : index_ - 1;
  }

  Slice key() const {
    assert(Valid());
    return CurrentFile()->largest.Encode();
  }

  Slice value() const {
    assert(Valid());
    const FileMetaData* f = CurrentFile();
    EncodeFixed64(value_buf_, f->number);
    EncodeFixed64(value_buf_ + 8, f->file_size);
    return Slice(value_buf_, sizeof(value_buf_));
  }

  virtual Status status() const { return Status::OK(); }

 private:
  // File the iterator currently points at. REQUIRES: Valid().
  const FileMetaData* CurrentFile() const { return (*flist_)[index_]; }

  const InternalKeyComparator icmp_;
  const std::vector<FileMetaData*>* const flist_;
  uint32_t index_;

  // Scratch storage backing value(): file number, then file size.
  mutable char value_buf_[16];
};
// Callback handed to NewTwoLevelIterator: "arg" is the TableCache and
// "file_value" is the 16-byte (file number, file size) value produced by
// LevelFileNumIterator. Opens that table; any other value size yields an
// error iterator.
static Iterator* GetFileIterator(void* arg,
                                 const ReadOptions& options,
                                 const Slice& file_value) {
  if (file_value.size() != 16) {
    return NewErrorIterator(
        Status::Corruption("FileReader invoked with unexpected value"));
  }
  TableCache* cache = reinterpret_cast<TableCache*>(arg);
  const uint64_t file_number = DecodeFixed64(file_value.data());
  const uint64_t file_size = DecodeFixed64(file_value.data() + 8);
  return cache->NewIterator(options, file_number, file_size);
}
// Returns an iterator that walks all files of "level" in order, opening each
// table lazily through GetFileIterator. Used for levels > 0.
Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
                                            int level) const {
  Iterator* file_index =
      new LevelFileNumIterator(vset_->icmp_, &files_[level]);
  return NewTwoLevelIterator(file_index, &GetFileIterator,
                             vset_->table_cache_, options);
}
void Version::AddIterators(const ReadOptions& options,
std::vector<Iterator*>* iters) {
// Merge all level zero files together since they may overlap
for (size_t i = 0; i < files_[0].size(); i++) {
iters->push_back(
vset_->table_cache_->NewIterator(
options, files_[0][i]->number, files_[0][i]->file_size));
}
// For levels > 0, we can use a concatenating iterator that sequentially
// walks through the non-overlapping files in the level, opening them
// lazily.
for (int level = 1; level < config::kNumLevels; level++) {
if (!files_[level].empty()) {
iters->push_back(NewConcatenatingIterator(options, level));
}
}
}
// Lookup state threaded through TableCache::Get() into SaveValue().
namespace {
// Outcome of probing one table for a user key.
enum SaverState {
  kNotFound,  // No entry for the user key was seen.
  kFound,     // A kTypeValue entry was seen; *value holds its value.
  kDeleted,   // An entry of any other type (a deletion) was seen.
  kCorrupt    // The internal key could not be parsed.
};
struct Saver {
  SaverState state;
  const Comparator* ucmp;  // Compares user keys.
  Slice user_key;          // The user key being looked up.
  std::string* value;      // Receives the value on kFound.
};
}  // namespace
static void SaveValue(void* arg, const Slice& ikey, const Slice& v) {
Saver* s = reinterpret_cast<Saver*>(arg);
ParsedInternalKey parsed_key;
if (!ParseInternalKey(ikey, &parsed_key)) {
s->state = kCorrupt;
} else {
if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) {
s->state = (parsed_key.type == kTypeValue) ? kFound : kDeleted;
if (s->state == kFound) {
s->value->assign(v.data(), v.size());
}
}
}
}
// Sort predicate ordering files by descending file number (higher numbers
// are treated as newer), so the newest file comes first.
static bool NewestFirst(FileMetaData* a, FileMetaData* b) {
  return b->number < a->number;
}
// Looks up the user key of "k" in this Version, level by level and, within
// level 0, newest file first.
// Returns OK and stores the value in *value when a live value is found.
// Returns NotFound (empty message) for a deletion marker or when no file has
// an entry, and Corruption for an unparsable entry. Any error from the table
// cache is returned as-is.
// *stats: seek_file/seek_file_level are set to the first file read when the
// lookup reads more than one file (otherwise seek_file stays NULL), so
// UpdateStats() can charge that file a seek.
Status Version::Get(const ReadOptions& options,
const LookupKey& k,
std::string* value,
GetStats* stats) {
Slice ikey = k.internal_key();
Slice user_key = k.user_key();
const Comparator* ucmp = vset_->icmp_.user_comparator();
Status s;
stats->seek_file = NULL;
stats->seek_file_level = -1;
// Most recently read file; used to charge the first file once a second
// file has to be read.
FileMetaData* last_file_read = NULL;
int last_file_read_level = -1;
// We can search level-by-level since entries never hop across
// levels. Therefore we are guaranteed that if we find data
// in a smaller level, later levels are irrelevant.
// tmp: level-0 candidates; tmp2: the single candidate for levels > 0
// ("files" points into one of them).
std::vector<FileMetaData*> tmp;
FileMetaData* tmp2;
for (int level = 0; level < config::kNumLevels; level++) {
size_t num_files = files_[level].size();
if (num_files == 0) continue;
// Get the list of files to search in this level
FileMetaData* const* files = &files_[level][0];
if (level == 0) {
// Level-0 files may overlap each other. Find all files that
// overlap user_key and process them in order from newest to oldest.
tmp.reserve(num_files);
for (uint32_t i = 0; i < num_files; i++) {
FileMetaData* f = files[i];
if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
tmp.push_back(f);
}
}
if (tmp.empty()) continue;
std::sort(tmp.begin(), tmp.end(), NewestFirst);
files = &tmp[0];
num_files = tmp.size();
} else {
// Binary search to find earliest index whose largest key >= ikey.
uint32_t index = FindFile(vset_->icmp_, files_[level], ikey);
if (index >= num_files) {
files = NULL;
num_files = 0;
} else {
tmp2 = files[index];
if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) {
// All of "tmp2" is past any data for user_key
files = NULL;
num_files = 0;
} else {
files = &tmp2;
num_files = 1;
}
}
}
for (uint32_t i = 0; i < num_files; ++i) {
if (last_file_read != NULL && stats->seek_file == NULL) {
// We have had more than one seek for this read. Charge the 1st file.
stats->seek_file = last_file_read;
stats->seek_file_level = last_file_read_level;
}
FileMetaData* f = files[i];
last_file_read = f;
last_file_read_level = level;
Saver saver;
saver.state = kNotFound;
saver.ucmp = ucmp;
saver.user_key = user_key;
saver.value = value;
s = vset_->table_cache_->Get(options, f->number, f->file_size,
ikey, &saver, SaveValue);
if (!s.ok()) {
return s;
}
switch (saver.state) {
case kNotFound:
break; // Keep searching in other files
case kFound:
return s;
case kDeleted:
s = Status::NotFound(Slice()); // Use empty error message for speed
return s;
case kCorrupt:
s = Status::Corruption("corrupted key for ", user_key);
return s;
}
}
}
return Status::NotFound(Slice()); // Use an empty error message for speed
}
bool Version::UpdateStats(const GetStats& stats) {
FileMetaData* f = stats.seek_file;
if (f != NULL) {
f->allowed_seeks--;
if (f->allowed_seeks <= 0 && file_to_compact_ == NULL) {
file_to_compact_ = f;
file_to_compact_level_ = stats.seek_file_level;
return true;
}
}
return false;
}
// Takes a reference; the Version lives until the matching Unref().
void Version::Ref() {
  refs_++;
}
// Drops one reference and deletes this Version when none remain. Never
// valid on the VersionSet's dummy list head.
void Version::Unref() {
  assert(this != &vset_->dummy_versions_);
  assert(refs_ >= 1);
  if (--refs_ == 0) {
    delete this;
  }
}
// Returns true iff some file in "level" overlaps the user-key range
// [*smallest_user_key, *largest_user_key]; NULL leaves a side unbounded.
// Levels above 0 are searched as disjoint, sorted file lists.
bool Version::OverlapInLevel(int level,
                             const Slice* smallest_user_key,
                             const Slice* largest_user_key) {
  const bool disjoint_sorted = level > 0;
  return SomeFileOverlapsRange(vset_->icmp_, disjoint_sorted, files_[level],
                               smallest_user_key, largest_user_key);
}
// Chooses the level for memtable output covering
// [smallest_user_key, largest_user_key]. Returns 0 if the range overlaps
// level 0. Otherwise the output is pushed down one level at a time, up to
// config::kMaxMemCompactLevel, while:
//  - the next level has no overlapping file, and
//  - the files it would overlap two levels down (when such a level exists)
//    total at most kMaxGrandParentOverlapBytes.
int Version::PickLevelForMemTableOutput(
    const Slice& smallest_user_key,
    const Slice& largest_user_key) {
  int level = 0;
  if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) {
    InternalKey start(smallest_user_key, kMaxSequenceNumber,
                      kValueTypeForSeek);
    InternalKey limit(largest_user_key, 0, static_cast<ValueType>(0));
    std::vector<FileMetaData*> overlaps;
    while (level < config::kMaxMemCompactLevel) {
      if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
        break;
      }
      // The grandparent check only makes sense when level + 2 exists.
      // Without this guard, a kMaxMemCompactLevel of kNumLevels - 1 would
      // index files_ past the last level.
      if (level + 2 < config::kNumLevels) {
        GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
        const int64_t sum = TotalFileSize(overlaps);
        if (sum > kMaxGrandParentOverlapBytes) {
          break;
        }
      }
      level++;
    }
  }
  return level;
}
// Store in "*inputs" all files in "level" that overlap [begin,end]
// (comparing user keys). A NULL begin/end leaves that side unbounded.
// Any previous contents of *inputs are discarded.
// For level 0, whose files may overlap each other, the search range is
// widened to cover each matched file that extends past it and the scan is
// restarted. The result therefore holds every level-0 file that overlaps
// the final (widened) range.
void Version::GetOverlappingInputs(
int level,
const InternalKey* begin,
const InternalKey* end,
std::vector<FileMetaData*>* inputs) {
inputs->clear();
Slice user_begin, user_end;
if (begin != NULL) {
user_begin = begin->user_key();
}
if (end != NULL) {
user_end = end->user_key();
}
const Comparator* user_cmp = vset_->icmp_.user_comparator();
// "i" is advanced inside the body so that a level-0 restart can reset it.
for (size_t i = 0; i < files_[level].size(); ) {
FileMetaData* f = files_[level][i++];
const Slice file_start = f->smallest.user_key();
const Slice file_limit = f->largest.user_key();
if (begin != NULL && user_cmp->Compare(file_limit, user_begin) < 0) {
// "f" is completely before specified range; skip it
} else if (end != NULL && user_cmp->Compare(file_start, user_end) > 0) {
// "f" is completely after specified range; skip it
} else {
inputs->push_back(f);
if (level == 0) {
// Level-0 files may overlap each other. So check if the newly
// added file has expanded the range. If so, restart search.
if (begin != NULL && user_cmp->Compare(file_start, user_begin) < 0) {
user_begin = file_start;
inputs->clear();
i = 0;
} else if (end != NULL && user_cmp->Compare(file_limit, user_end) > 0) {
user_end = file_limit;
inputs->clear();
i = 0;
}
}
}
}
}
std::string Version::DebugString() const {
std::string r;
for (int level = 0; level < config::kNumLevels; level++) {
// E.g.,
// --- level 1 ---
// 17:123['a' .. 'd']
// 20:43['e' .. 'g']
r.append("--- level ");
AppendNumberTo(&r, level);
r.append(" ---\n");
const std::vector<FileMetaData*>& files = files_[level];
for (size_t i = 0; i < files.size(); i++) {
r.push_back(' ');
AppendNumberTo(&r, files[i]->number);
r.push_back(':');
AppendNumberTo(&r, files[i]->file_size);
r.append("[");
r.append(files[i]->smallest.DebugString());
r.append(" .. ");
r.append(files[i]->largest.DebugString());
r.append("]\n");
}
}
return r;
}
// A helper class so we can efficiently apply a whole sequence
// of edits to a particular state without creating intermediate
// Versions that contain full copies of the intermediate state.
// Usage (as in LogAndApply and Recover): construct on a base Version, call
// Apply() once per VersionEdit, then SaveTo() a fresh Version.
class VersionSet::Builder {
private:
// Orders files by smallest internal key (per vset_->icmp_), breaking ties
// by file number. Keeps each level's added files sorted.
struct BySmallestKey {
const InternalKeyComparator* internal_comparator;
bool operator()(FileMetaData* f1, FileMetaData* f2) const {
int r = internal_comparator->Compare(f1->smallest, f2->smallest);
if (r != 0) {
return (r < 0);
} else {
// Break ties by file number
return (f1->number < f2->number);
}
}
};
typedef std::set<FileMetaData*, BySmallestKey> FileSet;
// Net changes to one level accumulated across all applied edits.
struct LevelState {
// Numbers of files removed from this level.
std::set<uint64_t> deleted_files;
// Files added to this level; each holds one reference taken in Apply().
FileSet* added_files;
};
VersionSet* vset_;
// Version the edits are layered on; referenced for the builder's lifetime.
Version* base_;
LevelState levels_[config::kNumLevels];
public:
// Initialize a builder with the files from *base and other info from *vset
Builder(VersionSet* vset, Version* base)
: vset_(vset),
base_(base) {
base_->Ref();
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < config::kNumLevels; level++) {
levels_[level].added_files = new FileSet(cmp);
}
}
// Releases the reference held on every added file (deleting any whose
// count reaches zero), frees the per-level sets, and unrefs base_.
~Builder() {
for (int level = 0; level < config::kNumLevels; level++) {
const FileSet* added = levels_[level].added_files;
// Copy the members out and destroy the set before unreferencing, so no
// FileMetaData is freed while the set still holds a pointer to it.
std::vector<FileMetaData*> to_unref;
to_unref.reserve(added->size());
for (FileSet::const_iterator it = added->begin();
it != added->end(); ++it) {
to_unref.push_back(*it);
}
delete added;
for (uint32_t i = 0; i < to_unref.size(); i++) {
FileMetaData* f = to_unref[i];
f->refs--;
if (f->refs <= 0) {
delete f;
}
}
}
base_->Unref();
}
// Apply all of the edits in *edit to the current state.
void Apply(VersionEdit* edit) {
// Update compaction pointers (written straight into the VersionSet, not
// buffered in the builder)
for (size_t i = 0; i < edit->compact_pointers_.size(); i++) {
const int level = edit->compact_pointers_[i].first;
vset_->compact_pointer_[level] =
edit->compact_pointers_[i].second.Encode().ToString();
}
// Delete files
const VersionEdit::DeletedFileSet& del = edit->deleted_files_;
for (VersionEdit::DeletedFileSet::const_iterator iter = del.begin();
iter != del.end();
++iter) {
const int level = iter->first;
const uint64_t number = iter->second;
levels_[level].deleted_files.insert(number);
}
// Add new files
for (size_t i = 0; i < edit->new_files_.size(); i++) {
const int level = edit->new_files_[i].first;
FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
f->refs = 1;
// We arrange to automatically compact this file after
// a certain number of seeks. Let's assume:
// (1) One seek costs 10ms
// (2) Writing or reading 1MB costs 10ms (100MB/s)
// (3) A compaction of 1MB does 25MB of IO:
// 1MB read from this level
// 10-12MB read from next level (boundaries may be misaligned)
// 10-12MB written to next level
// This implies that 25 seeks cost the same as the compaction
// of 1MB of data. I.e., one seek costs approximately the
// same as the compaction of 40KB of data. We are a little
// conservative and allow approximately one seek for every 16KB
// of data before triggering a compaction.
f->allowed_seeks = (f->file_size / 16384);
if (f->allowed_seeks < 100) f->allowed_seeks = 100;
// Re-adding a file number cancels an earlier deletion of it at this level.
levels_[level].deleted_files.erase(f->number);
levels_[level].added_files->insert(f);
}
}
// Save the current state in *v.
void SaveTo(Version* v) {
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < config::kNumLevels; level++) {
// Merge the set of added files with the set of pre-existing files.
// Drop any deleted files. Store the result in *v.
const std::vector<FileMetaData*>& base_files = base_->files_[level];
std::vector<FileMetaData*>::const_iterator base_iter = base_files.begin();
std::vector<FileMetaData*>::const_iterator base_end = base_files.end();
const FileSet* added = levels_[level].added_files;
v->files_[level].reserve(base_files.size() + added->size());
for (FileSet::const_iterator added_iter = added->begin();
added_iter != added->end();
++added_iter) {
// Add all smaller files listed in base_ (everything up to the first
// base file that sorts after *added_iter), then the added file itself,
// so the merged list stays in BySmallestKey order.
for (std::vector<FileMetaData*>::const_iterator bpos
= std::upper_bound(base_iter, base_end, *added_iter, cmp);
base_iter != bpos;
++base_iter) {
MaybeAddFile(v, level, *base_iter);
}
MaybeAddFile(v, level, *added_iter);
}
// Add remaining base files
for (; base_iter != base_end; ++base_iter) {
MaybeAddFile(v, level, *base_iter);
}
#ifndef NDEBUG
// Make sure there is no overlap in levels > 0
if (level > 0) {
for (uint32_t i = 1; i < v->files_[level].size(); i++) {
const InternalKey& prev_end = v->files_[level][i-1]->largest;
const InternalKey& this_begin = v->files_[level][i]->smallest;
if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) {
fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
prev_end.DebugString().c_str(),
this_begin.DebugString().c_str());
abort();
}
}
}
#endif
}
}
// Appends f to v's file list for "level", taking a reference on it, unless
// an applied edit deleted it. For level > 0, asserts that f starts after the
// last file already appended.
void MaybeAddFile(Version* v, int level, FileMetaData* f) {
if (levels_[level].deleted_files.count(f->number) > 0) {
// File is deleted: do nothing
} else {
std::vector<FileMetaData*>* files = &v->files_[level];
if (level > 0 && !files->empty()) {
// Must not overlap
assert(vset_->icmp_.Compare((*files)[files->size()-1]->largest,
f->smallest) < 0);
}
f->refs++;
files->push_back(f);
}
}
};
// Creates a VersionSet for database "dbname" and installs a fresh Version
// as current. No MANIFEST is open yet (descriptor_file_/descriptor_log_ are
// NULL); next_file_number_ starts at 2, and manifest_file_number_ is
// assigned later by Recover().
// NOTE(review): the initializer list presumably mirrors member declaration
// order; keep it that way to avoid -Wreorder surprises.
VersionSet::VersionSet(const std::string& dbname,
const Options* options,
TableCache* table_cache,
const InternalKeyComparator* cmp)
: env_(options->env),
dbname_(dbname),
options_(options),
table_cache_(table_cache),
icmp_(*cmp),
next_file_number_(2),
manifest_file_number_(0), // Filled by Recover()
last_sequence_(0),
log_number_(0),
prev_log_number_(0),
descriptor_file_(NULL),
descriptor_log_(NULL),
dummy_versions_(this),
current_(NULL) {
AppendVersion(new Version(this));
}
// Drops the set's reference to the current Version, which must leave the
// version list empty (every other Version already released), then closes
// the MANIFEST log writer and its underlying file, in that order.
VersionSet::~VersionSet() {
current_->Unref();
assert(dummy_versions_.next_ == &dummy_versions_); // List must be empty
// The writer was constructed on descriptor_file_, so it is deleted first.
delete descriptor_log_;
delete descriptor_file_;
}
// Makes "v" the current Version: releases the previous current Version,
// takes a reference on v, and links v at the tail of the circular version
// list (immediately before dummy_versions_).
void VersionSet::AppendVersion(Version* v) {
  assert(v->refs_ == 0);
  assert(v != current_);
  if (current_ != NULL) {
    current_->Unref();
  }
  current_ = v;
  v->Ref();

  Version* tail = dummy_versions_.prev_;
  v->prev_ = tail;
  v->next_ = &dummy_versions_;
  tail->next_ = v;
  dummy_versions_.prev_ = v;
}
// Applies *edit on top of the current Version, records it in the MANIFEST,
// and on success installs the resulting Version as current and adopts the
// edit's log numbers. If no MANIFEST is open yet, a new one is created,
// seeded with a snapshot of the current state, and pointed to by CURRENT.
// REQUIRES: *mu is held on entry. It is released around the MANIFEST write
// and re-acquired before returning.
// On failure the new Version is discarded, and a MANIFEST created by this
// call is closed and deleted.
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
// Default any log numbers the edit leaves unset to the current ones, and
// stamp it with the current next-file number and last sequence.
if (edit->has_log_number_) {
assert(edit->log_number_ >= log_number_);
assert(edit->log_number_ < next_file_number_);
} else {
edit->SetLogNumber(log_number_);
}
if (!edit->has_prev_log_number_) {
edit->SetPrevLogNumber(prev_log_number_);
}
edit->SetNextFile(next_file_number_);
edit->SetLastSequence(last_sequence_);
// Build the candidate Version off to the side; it is only installed if the
// MANIFEST update succeeds.
Version* v = new Version(this);
{
Builder builder(this, current_);
builder.Apply(edit);
builder.SaveTo(v);
}
Finalize(v);
// Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version.
std::string new_manifest_file;
Status s;
if (descriptor_log_ == NULL) {
// No reason to unlock *mu here since we only hit this path in the
// first call to LogAndApply (when opening the database).
assert(descriptor_file_ == NULL);
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
edit->SetNextFile(next_file_number_);
s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
if (s.ok()) {
descriptor_log_ = new log::Writer(descriptor_file_);
s = WriteSnapshot(descriptor_log_);
}
}
// Unlock during expensive MANIFEST log write
{
mu->Unlock();
// Write new record to MANIFEST log
if (s.ok()) {
std::string record;
edit->EncodeTo(&record);
s = descriptor_log_->AddRecord(record);
if (s.ok()) {
s = descriptor_file_->Sync();
}
+ // A failed write/sync may still have left the record in the MANIFEST;
+ // if it did, treat the edit as committed so memory matches the log.
+ if (!s.ok()) {
+ Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
+ if (ManifestContains(record)) {
+ Log(options_->info_log,
+ "MANIFEST contains log record despite error; advancing to new "
+ "version to prevent mismatch between in-memory and logged state");
+ s = Status::OK();
+ }
+ }
}
// If we just created a new descriptor file, install it by writing a
// new CURRENT file that points to it.
if (s.ok() && !new_manifest_file.empty()) {
s = SetCurrentFile(env_, dbname_, manifest_file_number_);
+ // No need to double-check MANIFEST in case of error since it
+ // will be discarded below.
}
mu->Lock();
}
// Install the new version
if (s.ok()) {
AppendVersion(v);
log_number_ = edit->log_number_;
prev_log_number_ = edit->prev_log_number_;
} else {
delete v;
// Tear down a MANIFEST that this call created but could not install.
if (!new_manifest_file.empty()) {
delete descriptor_log_;
delete descriptor_file_;
descriptor_log_ = NULL;
descriptor_file_ = NULL;
env_->DeleteFile(new_manifest_file);
}
}
return s;
}
// Rebuilds the current Version from the MANIFEST named in the CURRENT file.
// Every VersionEdit record is replayed through a Builder, and each record's
// comparator name is checked against icmp_. The result is installed together
// with the recovered next-file number, last sequence number, and
// (previous) log numbers.
// Returns the first error seen: I/O failure, a corrupt or undecodable
// record, a comparator mismatch, or a missing mandatory field (next file
// number, log number, last sequence).
Status VersionSet::Recover() {
  // Captures the first corruption reported by the log reader into *status.
  struct LogReporter : public log::Reader::Reporter {
    Status* status;
    virtual void Corruption(size_t bytes, const Status& s) {
      if (this->status->ok()) *this->status = s;
    }
  };

  // Read "CURRENT" file, which contains a pointer to the current manifest
  // file. (Pass the address of "current"; the text previously read "¤t",
  // an HTML-entity garble of "&current" that does not compile.)
  std::string current;
  Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
  if (!s.ok()) {
    return s;
  }
  if (current.empty() || current[current.size()-1] != '\n') {
    return Status::Corruption("CURRENT file does not end with newline");
  }
  current.resize(current.size() - 1);

  std::string dscname = dbname_ + "/" + current;
  SequentialFile* file;
  s = env_->NewSequentialFile(dscname, &file);
  if (!s.ok()) {
    return s;
  }

  bool have_log_number = false;
  bool have_prev_log_number = false;
  bool have_next_file = false;
  bool have_last_sequence = false;
  uint64_t next_file = 0;
  uint64_t last_sequence = 0;
  uint64_t log_number = 0;
  uint64_t prev_log_number = 0;
  Builder builder(this, current_);

  {
    LogReporter reporter;
    reporter.status = &s;
    log::Reader reader(file, &reporter, true/*checksum*/,
                       0/*initial_offset*/);
    Slice record;
    std::string scratch;
    while (reader.ReadRecord(&record, &scratch) && s.ok()) {
      VersionEdit edit;
      s = edit.DecodeFrom(record);
      if (s.ok()) {
        if (edit.has_comparator_ &&
            edit.comparator_ != icmp_.user_comparator()->Name()) {
          s = Status::InvalidArgument(
              edit.comparator_ + " does not match existing comparator ",
              icmp_.user_comparator()->Name());
        }
      }

      if (s.ok()) {
        builder.Apply(&edit);
      }

      // Later records override earlier ones for these fields.
      if (edit.has_log_number_) {
        log_number = edit.log_number_;
        have_log_number = true;
      }

      if (edit.has_prev_log_number_) {
        prev_log_number = edit.prev_log_number_;
        have_prev_log_number = true;
      }

      if (edit.has_next_file_number_) {
        next_file = edit.next_file_number_;
        have_next_file = true;
      }

      if (edit.has_last_sequence_) {
        last_sequence = edit.last_sequence_;
        have_last_sequence = true;
      }
    }
  }
  delete file;
  file = NULL;

  if (s.ok()) {
    if (!have_next_file) {
      s = Status::Corruption("no meta-nextfile entry in descriptor");
    } else if (!have_log_number) {
      s = Status::Corruption("no meta-lognumber entry in descriptor");
    } else if (!have_last_sequence) {
      s = Status::Corruption("no last-sequence-number entry in descriptor");
    }

    if (!have_prev_log_number) {
      prev_log_number = 0;
    }

    // Never hand out a file number already used by a log file.
    MarkFileNumberUsed(prev_log_number);
    MarkFileNumberUsed(log_number);
  }

  if (s.ok()) {
    Version* v = new Version(this);
    builder.SaveTo(v);
    // Install recovered version
    Finalize(v);
    AppendVersion(v);
    manifest_file_number_ = next_file;
    next_file_number_ = next_file + 1;
    last_sequence_ = last_sequence;
    log_number_ = log_number;
    prev_log_number_ = prev_log_number;
  }

  return s;
}
// Ensures file numbers handed out from now on are greater than "number".
void VersionSet::MarkFileNumberUsed(uint64_t number) {
  if (number >= next_file_number_) {
    next_file_number_ = number + 1;
  }
}
// Precomputes which level of v most needs compaction and its score, stored
// in v->compaction_level_ / v->compaction_score_. PickCompaction() treats a
// score >= 1 as "over budget". The last level is never scored.
//
// Level 0 is scored by file count relative to config::kL0_CompactionTrigger
// rather than by bytes, for two reasons:
//  (1) with larger write-buffer sizes, it is nice not to do too many
//      level-0 compactions;
//  (2) level-0 files are merged on every read, so their number should stay
//      bounded even when individual files are small (small write buffer,
//      high compression ratios, or many overwrites/deletions).
// Other levels are scored by total bytes relative to MaxBytesForLevel().
void VersionSet::Finalize(Version* v) {
  int best_level = -1;
  double best_score = -1;
  for (int level = 0; level + 1 < config::kNumLevels; level++) {
    double score;
    if (level != 0) {
      const uint64_t level_bytes = TotalFileSize(v->files_[level]);
      score = static_cast<double>(level_bytes) / MaxBytesForLevel(level);
    } else {
      score = v->files_[level].size() /
              static_cast<double>(config::kL0_CompactionTrigger);
    }
    if (score > best_score) {
      best_level = level;
      best_score = score;
    }
  }
  v->compaction_level_ = best_level;
  v->compaction_score_ = best_score;
}
// Appends to "log" one VersionEdit record describing the complete current
// state: comparator name, every non-empty compaction pointer, and every
// file of current_. LogAndApply uses it to seed a newly created MANIFEST.
Status VersionSet::WriteSnapshot(log::Writer* log) {
  // TODO: Break up into multiple records to reduce memory usage on recovery?
  VersionEdit snapshot;
  snapshot.SetComparatorName(icmp_.user_comparator()->Name());

  for (int level = 0; level < config::kNumLevels; level++) {
    if (compact_pointer_[level].empty()) {
      continue;
    }
    InternalKey pointer;
    pointer.DecodeFrom(compact_pointer_[level]);
    snapshot.SetCompactPointer(level, pointer);
  }

  for (int level = 0; level < config::kNumLevels; level++) {
    const std::vector<FileMetaData*>& level_files = current_->files_[level];
    for (std::vector<FileMetaData*>::const_iterator it = level_files.begin();
         it != level_files.end(); ++it) {
      const FileMetaData* f = *it;
      snapshot.AddFile(level, f->number, f->file_size, f->smallest,
                       f->largest);
    }
  }

  std::string encoded;
  snapshot.EncodeTo(&encoded);
  return log->AddRecord(encoded);
}
int VersionSet::NumLevelFiles(int level) const {
assert(level >= 0);
assert(level < config::kNumLevels);
return current_->files_[level].size();
}
// Writes the current Version's per-level file counts into scratch->buffer
// as "files[ n0 n1 n2 n3 n4 n5 n6 ]" and returns that buffer.
const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const {
  // The format string below hard-codes seven levels.
  assert(config::kNumLevels == 7);
  const Version* v = current_;
  snprintf(scratch->buffer, sizeof(scratch->buffer),
           "files[ %d %d %d %d %d %d %d ]",
           static_cast<int>(v->files_[0].size()),
           static_cast<int>(v->files_[1].size()),
           static_cast<int>(v->files_[2].size()),
           static_cast<int>(v->files_[3].size()),
           static_cast<int>(v->files_[4].size()),
           static_cast<int>(v->files_[5].size()),
           static_cast<int>(v->files_[6].size()));
  return scratch->buffer;
}
+// Return true iff the manifest contains the specified record.
+// Opens the descriptor file for manifest_file_number_ and scans its records
+// (checksums verified) for one that is byte-for-byte equal to "record".
+// Returns false if the file cannot be opened or no record matches; the
+// file name, any open error, and the result are logged to
+// options_->info_log.
+bool VersionSet::ManifestContains(const std::string& record) const {
+ std::string fname = DescriptorFileName(dbname_, manifest_file_number_);
+ Log(options_->info_log, "ManifestContains: checking %s\n", fname.c_str());
+ SequentialFile* file = NULL;
+ Status s = env_->NewSequentialFile(fname, &file);
+ if (!s.ok()) {
+ Log(options_->info_log, "ManifestContains: %s\n", s.ToString().c_str());
+ return false;
+ }
+ // NOTE(review): no Reporter is supplied, so corruption encountered while
+ // reading is presumably not surfaced here — confirm against log::Reader.
+ log::Reader reader(file, NULL, true/*checksum*/, 0);
+ Slice r;
+ std::string scratch;
+ bool result = false;
+ while (reader.ReadRecord(&r, &scratch)) {
+ if (r == Slice(record)) {
+ result = true;
+ break;
+ }
+ }
+ delete file;
+ Log(options_->info_log, "ManifestContains: result = %d\n", result ? 1 : 0);
+ return result;
+}
+
// Estimates the byte offset of "ikey" within the data of Version v. The
// estimate is the summed size of every file that ends at or before ikey,
// plus, for each file whose range contains ikey, that table's own
// approximate offset of ikey.
uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
  uint64_t offset = 0;
  for (int level = 0; level < config::kNumLevels; level++) {
    const std::vector<FileMetaData*>& files = v->files_[level];
    for (size_t i = 0; i < files.size(); i++) {
      const FileMetaData* f = files[i];
      if (icmp_.Compare(f->largest, ikey) <= 0) {
        // The whole file precedes ikey.
        offset += f->file_size;
        continue;
      }
      if (icmp_.Compare(f->smallest, ikey) > 0) {
        // The whole file follows ikey. Levels > 0 are sorted by smallest
        // key, so every later file of such a level follows it too.
        if (level > 0) {
          break;
        }
        continue;
      }
      // ikey falls inside this file: ask the table where it would be.
      Table* table = NULL;
      Iterator* it = table_cache_->NewIterator(
          ReadOptions(), f->number, f->file_size, &table);
      if (table != NULL) {
        offset += table->ApproximateOffsetOf(ikey.Encode());
      }
      delete it;
    }
  }
  return offset;
}
void VersionSet::AddLiveFiles(std::set<uint64_t>* live) {
for (Version* v = dummy_versions_.next_;
v != &dummy_versions_;
v = v->next_) {
for (int level = 0; level < config::kNumLevels; level++) {
const std::vector<FileMetaData*>& files = v->files_[level];
for (size_t i = 0; i < files.size(); i++) {
live->insert(files[i]->number);
}
}
}
}
int64_t VersionSet::NumLevelBytes(int level) const {
assert(level >= 0);
assert(level < config::kNumLevels);
return TotalFileSize(current_->files_[level]);
}
int64_t VersionSet::MaxNextLevelOverlappingBytes() {
int64_t result = 0;
std::vector<FileMetaData*> overlaps;
for (int level = 1; level < config::kNumLevels - 1; level++) {
for (size_t i = 0; i < current_->files_[level].size(); i++) {
const FileMetaData* f = current_->files_[level][i];
current_->GetOverlappingInputs(level+1, &f->smallest, &f->largest,
&overlaps);
const int64_t sum = TotalFileSize(overlaps);
if (sum > result) {
result = sum;
}
}
}
return result;
}
// Stores in *smallest and *largest the tightest internal-key range that
// covers every file in "inputs".
// REQUIRES: inputs is not empty
void VersionSet::GetRange(const std::vector<FileMetaData*>& inputs,
                          InternalKey* smallest,
                          InternalKey* largest) {
  assert(!inputs.empty());
  smallest->Clear();
  largest->Clear();
  if (inputs.empty()) {
    return;  // Reachable only with assertions off; both keys stay cleared.
  }
  *smallest = inputs[0]->smallest;
  *largest = inputs[0]->largest;
  for (size_t i = 1; i < inputs.size(); i++) {
    const FileMetaData* f = inputs[i];
    if (icmp_.Compare(f->smallest, *smallest) < 0) {
      *smallest = f->smallest;
    }
    if (icmp_.Compare(f->largest, *largest) > 0) {
      *largest = f->largest;
    }
  }
}
// Stores in *smallest and *largest the tightest internal-key range that
// covers every file of inputs1 and inputs2 combined.
// REQUIRES: inputs1 and inputs2 are not both empty
void VersionSet::GetRange2(const std::vector<FileMetaData*>& inputs1,
                           const std::vector<FileMetaData*>& inputs2,
                           InternalKey* smallest,
                           InternalKey* largest) {
  std::vector<FileMetaData*> combined;
  combined.reserve(inputs1.size() + inputs2.size());
  combined.insert(combined.end(), inputs1.begin(), inputs1.end());
  combined.insert(combined.end(), inputs2.begin(), inputs2.end());
  GetRange(combined, smallest, largest);
}
// Builds a merging iterator over every input file of compaction c.
// Level-0 inputs may overlap, so each gets its own table iterator. Inputs
// from any other level are covered by one concatenating two-level iterator.
// Reads verify checksums when options_->paranoid_checks is set and do not
// populate the block cache.
Iterator* VersionSet::MakeInputIterator(Compaction* c) {
  ReadOptions options;
  options.verify_checksums = options_->paranoid_checks;
  options.fill_cache = false;

  // TODO(opt): use concatenating iterator for level-0 if there is no overlap
  // Slots: one per level-0 file plus one for level 1, or two concatenating
  // iterators when the compaction starts above level 0.
  const int space = (c->level() == 0 ? c->inputs_[0].size() + 1 : 2);
  Iterator** children = new Iterator*[space];
  int count = 0;
  for (int which = 0; which < 2; which++) {
    const std::vector<FileMetaData*>& files = c->inputs_[which];
    if (files.empty()) {
      continue;
    }
    if (c->level() + which != 0) {
      // Create concatenating iterator for the files from this level.
      children[count++] = NewTwoLevelIterator(
          new Version::LevelFileNumIterator(icmp_, &files),
          &GetFileIterator, table_cache_, options);
      continue;
    }
    for (size_t i = 0; i < files.size(); i++) {
      children[count++] = table_cache_->NewIterator(
          options, files[i]->number, files[i]->file_size);
    }
  }
  assert(count <= space);
  Iterator* result = NewMergingIterator(&icmp_, children, count);
  delete[] children;
  return result;
}
// Chooses the next compaction for the current Version, or returns NULL if
// none is needed. A level over its size budget (compaction_score_ >= 1)
// takes priority over a file that has exhausted its allowed seeks. The
// returned Compaction references current_ through input_version_.
Compaction* VersionSet::PickCompaction() {
  const bool size_compaction = (current_->compaction_score_ >= 1);
  const bool seek_compaction = (current_->file_to_compact_ != NULL);
  if (!size_compaction && !seek_compaction) {
    return NULL;
  }

  int level;
  Compaction* c;
  if (size_compaction) {
    level = current_->compaction_level_;
    assert(level >= 0);
    assert(level+1 < config::kNumLevels);
    c = new Compaction(level);
    // Start from the first file whose largest key lies past this level's
    // compaction pointer, wrapping around to the level's first file.
    const std::vector<FileMetaData*>& candidates = current_->files_[level];
    FileMetaData* chosen = NULL;
    for (size_t i = 0; chosen == NULL && i < candidates.size(); i++) {
      if (compact_pointer_[level].empty() ||
          icmp_.Compare(candidates[i]->largest.Encode(),
                        compact_pointer_[level]) > 0) {
        chosen = candidates[i];
      }
    }
    c->inputs_[0].push_back(chosen != NULL ? chosen : candidates[0]);
  } else {
    level = current_->file_to_compact_level_;
    c = new Compaction(level);
    c->inputs_[0].push_back(current_->file_to_compact_);
  }

  c->input_version_ = current_;
  c->input_version_->Ref();

  if (level == 0) {
    // Level-0 files may overlap each other: replace the picked file with
    // every level-0 file overlapping its range (the picked one included).
    InternalKey smallest, largest;
    GetRange(c->inputs_[0], &smallest, &largest);
    current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
    assert(!c->inputs_[0].empty());
  }

  SetupOtherInputs(c);
  return c;
}
// Completes compaction c given its level-"level" inputs in c->inputs_[0]:
//  - inputs_[1] becomes the level+1 files overlapping inputs_[0]'s range;
//  - if inputs_[1] is non-empty, inputs_[0] may be grown to every level
//    file within the combined range. This only happens when the growth adds
//    no level+1 files (same count) and the total stays below
//    kExpandedCompactionByteSizeLimit;
//  - grandparents_ becomes the level+2 files (if that level exists)
//    overlapping the final combined range;
//  - compact_pointer_[level] and c->edit_ advance to the largest key of the
//    final inputs_[0].
void VersionSet::SetupOtherInputs(Compaction* c) {
const int level = c->level();
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest);
current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1]);
// Get entire range covered by compaction
InternalKey all_start, all_limit;
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
// See if we can grow the number of inputs in "level" without
// changing the number of "level+1" files we pick up.
if (!c->inputs_[1].empty()) {
std::vector<FileMetaData*> expanded0;
current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
const int64_t expanded0_size = TotalFileSize(expanded0);
if (expanded0.size() > c->inputs_[0].size() &&
inputs1_size + expanded0_size < kExpandedCompactionByteSizeLimit) {
InternalKey new_start, new_limit;
GetRange(expanded0, &new_start, &new_limit);
std::vector<FileMetaData*> expanded1;
current_->GetOverlappingInputs(level+1, &new_start, &new_limit,
&expanded1);
if (expanded1.size() == c->inputs_[1].size()) {
// expanded1 has as many files as inputs_[1] (presumably the same files),
// so inputs1_size is also logged as the expanded level+1 size.
Log(options_->info_log,
"Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n",
level,
int(c->inputs_[0].size()),
int(c->inputs_[1].size()),
long(inputs0_size), long(inputs1_size),
int(expanded0.size()),
int(expanded1.size()),
long(expanded0_size), long(inputs1_size));
smallest = new_start;
largest = new_limit;
c->inputs_[0] = expanded0;
c->inputs_[1] = expanded1;
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
}
}
}
// Compute the set of grandparent files that overlap this compaction
// (parent == level+1; grandparent == level+2)
if (level + 2 < config::kNumLevels) {
current_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
&c->grandparents_);
}
// Debug logging of the chosen range; compiled but permanently disabled.
if (false) {
Log(options_->info_log, "Compacting %d '%s' .. '%s'",
level,
smallest.DebugString().c_str(),
largest.DebugString().c_str());
}
// Update the place where we will do the next compaction for this level.
// We update this immediately instead of waiting for the VersionEdit
// to be applied so that if the compaction fails, we will try a different
// key range next time.
compact_pointer_[level] = largest.Encode().ToString();
c->edit_.SetCompactPointer(level, largest);
}
// Return a compaction for the files at "level" that overlap [begin,end]
// (NULL bounds are passed straight through to GetOverlappingInputs), or
// NULL if no file overlaps. The caller owns and must delete the result.
Compaction* VersionSet::CompactRange(
    int level,
    const InternalKey* begin,
    const InternalKey* end) {
  std::vector<FileMetaData*> inputs;
  current_->GetOverlappingInputs(level, begin, end, &inputs);
  if (inputs.empty()) {
    return NULL;
  }

  // Avoid compacting too much in one shot in case the range is large.
  // But we cannot do this for level-0: level-0 files may overlap each
  // other, and we must not pick one file and drop another, older file
  // when the two overlap (the older one would be left behind in level 0
  // while the newer one is pushed down). So truncate only for level > 0.
  if (level > 0) {
    const uint64_t limit = MaxFileSizeForLevel(level);
    uint64_t total = 0;
    for (size_t i = 0; i < inputs.size(); i++) {
      total += inputs[i]->file_size;
      if (total >= limit) {
        inputs.resize(i + 1);
        break;
      }
    }
  }

  Compaction* c = new Compaction(level);
  c->input_version_ = current_;
  c->input_version_->Ref();  // Released by ~Compaction / ReleaseInputs().
  c->inputs_[0] = inputs;
  SetupOtherInputs(c);
  return c;
}
// Create an empty compaction for "level". VersionSet (a friend) fills in
// the inputs and the input version afterwards. Every per-level cursor in
// level_ptrs_ (used by IsBaseLevelForKey) starts at index 0.
Compaction::Compaction(int level)
    : level_(level),
      max_output_file_size_(MaxFileSizeForLevel(level)),
      input_version_(NULL),
      grandparent_index_(0),
      seen_key_(false),
      overlapped_bytes_(0) {
  for (int lvl = config::kNumLevels - 1; lvl >= 0; --lvl) {
    level_ptrs_[lvl] = 0;
  }
}
// Drop the reference on the input version, if one is still held
// (ReleaseInputs() may already have released it and cleared the pointer).
Compaction::~Compaction() {
  if (input_version_ == NULL) {
    return;
  }
  input_version_->Unref();
}
// A compaction is a trivial move when it consists of exactly one file from
// "level" and nothing from "level+1".
bool Compaction::IsTrivialMove() const {
  if (num_input_files(0) != 1 || num_input_files(1) != 0) {
    return false;
  }
  // Avoid a move if there is lots of overlapping grandparent data.
  // Otherwise, the move could create a parent file that will require
  // a very expensive merge later on.
  return TotalFileSize(grandparents_) <= kMaxGrandParentOverlapBytes;
}
// Record in *edit the deletion of every input file: inputs_[0] live at
// level_, and inputs_[1] live at level_ + 1.
void Compaction::AddInputDeletions(VersionEdit* edit) {
  for (int which = 0; which < 2; ++which) {
    const std::vector<FileMetaData*>& files = inputs_[which];
    const int file_level = level_ + which;
    for (std::vector<FileMetaData*>::const_iterator it = files.begin();
         it != files.end(); ++it) {
      edit->DeleteFile(file_level, (*it)->number);
    }
  }
}
bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
// Maybe use binary search to find right entry instead of linear search?
const Comparator* user_cmp = input_version_->vset_->icmp_.user_comparator();
for (int lvl = level_ + 2; lvl < config::kNumLevels; lvl++) {
const std::vector<FileMetaData*>& files = input_version_->files_[lvl];
for (; level_ptrs_[lvl] < files.size(); ) {
FileMetaData* f = files[level_ptrs_[lvl]];
if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) {
// We've advanced far enough
if (user_cmp->Compare(user_key, f->smallest.user_key()) >= 0) {
// Key falls in this file's range, so definitely not base level
return false;
}
break;
}
level_ptrs_[lvl]++;
}
}
return true;
}
bool Compaction::ShouldStopBefore(const Slice& internal_key) {
// Scan to find earliest grandparent file that contains key.
const InternalKeyComparator* icmp = &input_version_->vset_->icmp_;
while (grandparent_index_ < grandparents_.size() &&
icmp->Compare(internal_key,
grandparents_[grandparent_index_]->largest.Encode()) > 0) {
if (seen_key_) {
overlapped_bytes_ += grandparents_[grandparent_index_]->file_size;
}
grandparent_index_++;
}
seen_key_ = true;
if (overlapped_bytes_ > kMaxGrandParentOverlapBytes) {
// Too much overlap for current output; start new output
overlapped_bytes_ = 0;
return true;
} else {
return false;
}
}
// Drop the reference on the input version early and clear the pointer,
// so the destructor will not release it a second time.
void Compaction::ReleaseInputs() {
  if (input_version_ == NULL) {
    return;
  }
  input_version_->Unref();
  input_version_ = NULL;
}
} // namespace leveldb
diff --git a/src/leveldb/db/version_set.h b/src/leveldb/db/version_set.h
index 792899b7f..9d084fdb7 100644
--- a/src/leveldb/db/version_set.h
+++ b/src/leveldb/db/version_set.h
@@ -1,381 +1,383 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// The representation of a DBImpl consists of a set of Versions. The
// newest version is called "current". Older versions may be kept
// around to provide a consistent view to live iterators.
//
// Each Version keeps track of a set of Table files per level. The
// entire set of versions is maintained in a VersionSet.
//
// Version,VersionSet are thread-compatible, but require external
// synchronization on all accesses.
#ifndef STORAGE_LEVELDB_DB_VERSION_SET_H_
#define STORAGE_LEVELDB_DB_VERSION_SET_H_
#include <map>
#include <set>
#include <vector>
#include "db/dbformat.h"
#include "db/version_edit.h"
#include "port/port.h"
#include "port/thread_annotations.h"
namespace leveldb {
namespace log { class Writer; }
class Compaction;
class Iterator;
class MemTable;
class TableBuilder;
class TableCache;
class Version;
class VersionSet;
class WritableFile;
// Return the smallest index i such that files[i]->largest >= key.
// Return files.size() if there is no such file.
// Keys are compared with "icmp", so "key" is presumably an encoded
// internal key (TODO confirm against callers).
// REQUIRES: "files" contains a sorted list of non-overlapping files.
extern int FindFile(const InternalKeyComparator& icmp,
const std::vector<FileMetaData*>& files,
const Slice& key);
// Returns true iff some file in "files" overlaps the user key range
// [*smallest_user_key,*largest_user_key].
// smallest_user_key==NULL represents a key smaller than all keys in the DB.
// largest_user_key==NULL represents a key larger than all keys in the DB.
// REQUIRES: If disjoint_sorted_files, files[] contains disjoint ranges
// in sorted order.
extern bool SomeFileOverlapsRange(
const InternalKeyComparator& icmp,
bool disjoint_sorted_files,
const std::vector<FileMetaData*>& files,
const Slice* smallest_user_key,
const Slice* largest_user_key);
// A Version holds, for each level, the list of table files that make up
// one state of the database. Versions are reference counted (Ref/Unref)
// and linked into a list owned by their VersionSet.
class Version {
public:
// Append to *iters a sequence of iterators that will
// yield the contents of this Version when merged together.
// REQUIRES: This version has been saved (see VersionSet::SaveTo)
// NOTE(review): VersionSet declares no SaveTo in this header; the
// requirement above may be stale.
void AddIterators(const ReadOptions&, std::vector<Iterator*>* iters);
// Lookup the value for key. If found, store it in *val and
// return OK. Else return a non-OK status. Fills *stats.
// REQUIRES: lock is not held
struct GetStats {
// Presumably the file (and its level) to charge a seek to; consumed by
// UpdateStats(). TODO confirm.
FileMetaData* seek_file;
int seek_file_level;
};
Status Get(const ReadOptions&, const LookupKey& key, std::string* val,
GetStats* stats);
// Adds "stats" into the current state. Returns true if a new
// compaction may need to be triggered, false otherwise.
// REQUIRES: lock is held
bool UpdateStats(const GetStats& stats);
// Reference count management (so Versions do not disappear out from
// under live iterators)
void Ref();
void Unref();
// Store in *inputs the files at "level" that overlap [begin,end].
// Any prior contents of *inputs are replaced.
void GetOverlappingInputs(
int level,
const InternalKey* begin, // NULL means before all keys
const InternalKey* end, // NULL means after all keys
std::vector<FileMetaData*>* inputs);
// Returns true iff some file in the specified level overlaps
// some part of [*smallest_user_key,*largest_user_key].
// smallest_user_key==NULL represents a key smaller than all keys in the DB.
// largest_user_key==NULL represents a key larger than all keys in the DB.
bool OverlapInLevel(int level,
const Slice* smallest_user_key,
const Slice* largest_user_key);
// Return the level at which we should place a new memtable compaction
// result that covers the range [smallest_user_key,largest_user_key].
int PickLevelForMemTableOutput(const Slice& smallest_user_key,
const Slice& largest_user_key);
// Number of table files at "level".
int NumFiles(int level) const { return files_[level].size(); }
// Return a human readable string that describes this version's contents.
std::string DebugString() const;
private:
friend class Compaction;
friend class VersionSet;
class LevelFileNumIterator;
// Presumably returns an iterator over all files of "level" read in
// order, one after another. TODO confirm against the implementation.
Iterator* NewConcatenatingIterator(const ReadOptions&, int level) const;
VersionSet* vset_; // VersionSet to which this Version belongs
Version* next_; // Next version in linked list
Version* prev_; // Previous version in linked list
int refs_; // Number of live refs to this version
// List of files per level
std::vector<FileMetaData*> files_[config::kNumLevels];
// Next file to compact based on seek stats.
FileMetaData* file_to_compact_;
int file_to_compact_level_;
// Level that should be compacted next and its compaction score.
// Score < 1 means compaction is not strictly needed. These fields
// are initialized by Finalize().
double compaction_score_;
int compaction_level_;
// Private: only friends (VersionSet, Compaction) can create a Version.
// A new Version has no references and links only to itself.
explicit Version(VersionSet* vset)
: vset_(vset), next_(this), prev_(this), refs_(0),
file_to_compact_(NULL),
file_to_compact_level_(-1),
compaction_score_(-1),
compaction_level_(-1) {
}
// Private; presumably invoked from Unref() once refs_ drops to zero.
~Version();
// No copying allowed
Version(const Version&);
void operator=(const Version&);
};
// A VersionSet owns the list of live Versions (the newest is "current")
// together with the descriptor (manifest) state, file-number allocation
// and per-level compaction pointers.
class VersionSet {
public:
VersionSet(const std::string& dbname,
const Options* options,
TableCache* table_cache,
const InternalKeyComparator*);
~VersionSet();
// Apply *edit to the current version to form a new descriptor that
// is both saved to persistent state and installed as the new
// current version. Will release *mu while actually writing to the file.
// REQUIRES: *mu is held on entry.
// REQUIRES: no other thread concurrently calls LogAndApply()
Status LogAndApply(VersionEdit* edit, port::Mutex* mu)
EXCLUSIVE_LOCKS_REQUIRED(mu);
// Recover the last saved descriptor from persistent storage.
Status Recover();
// Return the current version.
Version* current() const { return current_; }
// Return the current manifest file number
uint64_t ManifestFileNumber() const { return manifest_file_number_; }
// Allocate and return a new file number
uint64_t NewFileNumber() { return next_file_number_++; }
// Arrange to reuse "file_number" unless a newer file number has
// already been allocated.
// (Only the most recently allocated number can be given back.)
// REQUIRES: "file_number" was returned by a call to NewFileNumber().
void ReuseFileNumber(uint64_t file_number) {
if (next_file_number_ == file_number + 1) {
next_file_number_ = file_number;
}
}
// Return the number of Table files at the specified level.
int NumLevelFiles(int level) const;
// Return the combined file size of all files at the specified level.
int64_t NumLevelBytes(int level) const;
// Return the last sequence number.
uint64_t LastSequence() const { return last_sequence_; }
// Set the last sequence number to s.
// The sequence number must never move backwards (checked by assert).
void SetLastSequence(uint64_t s) {
assert(s >= last_sequence_);
last_sequence_ = s;
}
// Mark the specified file number as used.
void MarkFileNumberUsed(uint64_t number);
// Return the current log file number.
uint64_t LogNumber() const { return log_number_; }
// Return the log file number for the log file that is currently
// being compacted, or zero if there is no such log file.
uint64_t PrevLogNumber() const { return prev_log_number_; }
// Pick level and inputs for a new compaction.
// Returns NULL if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
// describes the compaction. Caller should delete the result.
Compaction* PickCompaction();
// Return a compaction object for compacting the range [begin,end] in
// the specified level. Returns NULL if there is nothing in that
// level that overlaps the specified range. Caller should delete
// the result.
Compaction* CompactRange(
int level,
const InternalKey* begin,
const InternalKey* end);
// Return the maximum overlapping data (in bytes) at next level for any
// file at a level >= 1.
int64_t MaxNextLevelOverlappingBytes();
// Create an iterator that reads over the compaction inputs for "*c".
// The caller should delete the iterator when no longer needed.
Iterator* MakeInputIterator(Compaction* c);
// Returns true iff some level needs a compaction.
// That is: the current version's compaction score is >= 1, or a file
// has been flagged for compaction by seek statistics.
bool NeedsCompaction() const {
Version* v = current_;
return (v->compaction_score_ >= 1) || (v->file_to_compact_ != NULL);
}
// Add all files listed in any live version to *live.
// May also mutate some internal state.
void AddLiveFiles(std::set<uint64_t>* live);
// Return the approximate offset in the database of the data for
// "key" as of version "v".
uint64_t ApproximateOffsetOf(Version* v, const InternalKey& key);
// Return a human-readable short (single-line) summary of the number
// of files per level. Uses *scratch as backing store.
struct LevelSummaryStorage {
char buffer[100];
};
const char* LevelSummary(LevelSummaryStorage* scratch) const;
private:
class Builder;
friend class Compaction;
friend class Version;
// Computes the compaction_score_/compaction_level_ fields of "v".
void Finalize(Version* v);
// Store in *smallest/*largest the key range covered by "inputs".
void GetRange(const std::vector<FileMetaData*>& inputs,
InternalKey* smallest,
InternalKey* largest);
// Like GetRange(), but over the union of "inputs1" and "inputs2".
void GetRange2(const std::vector<FileMetaData*>& inputs1,
const std::vector<FileMetaData*>& inputs2,
InternalKey* smallest,
InternalKey* largest);
// Given c->inputs_[0], fill in the level+1 inputs and grandparents of "c"
// (possibly growing c->inputs_[0]) and advance compact_pointer_.
void SetupOtherInputs(Compaction* c);
// Save current contents to *log
Status WriteSnapshot(log::Writer* log);
// Presumably installs "v" as the new current_ version, linking it into
// the dummy_versions_ list. TODO confirm against the implementation.
void AppendVersion(Version* v);
+ bool ManifestContains(const std::string& record) const;
+
Env* const env_;
const std::string dbname_;
const Options* const options_;
TableCache* const table_cache_;
const InternalKeyComparator icmp_;
uint64_t next_file_number_;
uint64_t manifest_file_number_;
uint64_t last_sequence_;
uint64_t log_number_;
uint64_t prev_log_number_; // 0 or backing store for memtable being compacted
// Opened lazily
WritableFile* descriptor_file_;
log::Writer* descriptor_log_;
Version dummy_versions_; // Head of circular doubly-linked list of versions.
Version* current_; // == dummy_versions_.prev_
// Per-level key at which the next compaction at that level should start.
// Either an empty string, or a valid InternalKey.
std::string compact_pointer_[config::kNumLevels];
// No copying allowed
VersionSet(const VersionSet&);
void operator=(const VersionSet&);
};
// A Compaction encapsulates information about a compaction.
class Compaction {
public:
// Releases the reference on the input version, if still held.
~Compaction();
// Return the level that is being compacted. Inputs from "level"
// and "level+1" will be merged to produce a set of "level+1" files.
int level() const { return level_; }
// Return the object that holds the edits to the descriptor done
// by this compaction.
VersionEdit* edit() { return &edit_; }
// "which" must be either 0 or 1
int num_input_files(int which) const { return inputs_[which].size(); }
// Return the ith input file at "level()+which" ("which" must be 0 or 1).
FileMetaData* input(int which, int i) const { return inputs_[which][i]; }
// Maximum size of files to build during this compaction.
uint64_t MaxOutputFileSize() const { return max_output_file_size_; }
// Is this a trivial compaction that can be implemented by just
// moving a single input file to the next level (no merging or splitting)
bool IsTrivialMove() const;
// Add all inputs to this compaction as delete operations to *edit.
void AddInputDeletions(VersionEdit* edit);
// Returns true if the information we have available guarantees that
// the compaction is producing data in "level+1" for which no data exists
// in levels greater than "level+1".
// Advances level_ptrs_ and never rewinds it, so successive calls should
// presumably pass user keys in non-decreasing order.
bool IsBaseLevelForKey(const Slice& user_key);
// Returns true iff we should stop building the current output
// before processing "internal_key".
bool ShouldStopBefore(const Slice& internal_key);
// Release the input version for the compaction, once the compaction
// is successful.
void ReleaseInputs();
private:
friend class Version;
friend class VersionSet;
// Private: compactions are created by VersionSet (PickCompaction /
// CompactRange), which also fills in the inputs.
explicit Compaction(int level);
int level_;
uint64_t max_output_file_size_;
Version* input_version_;
VersionEdit edit_;
// Each compaction reads inputs from "level_" and "level_+1"
std::vector<FileMetaData*> inputs_[2]; // The two sets of inputs
// State used to check for number of overlapping grandparent files
// (parent == level_ + 1, grandparent == level_ + 2)
std::vector<FileMetaData*> grandparents_;
size_t grandparent_index_; // Index in grandparents_
bool seen_key_; // Some output key has been seen
int64_t overlapped_bytes_; // Bytes of overlap between current output
// and grandparent files
// State for implementing IsBaseLevelForKey
// level_ptrs_ holds indices into input_version_->files_: our state
// is that we are positioned at one of the file ranges for each
// higher level than the ones involved in this compaction (i.e. for
// all L >= level_ + 2).
size_t level_ptrs_[config::kNumLevels];
};
} // namespace leveldb
#endif // STORAGE_LEVELDB_DB_VERSION_SET_H_
diff --git a/src/leveldb/include/leveldb/db.h b/src/leveldb/include/leveldb/db.h
index f1e70a01e..29d367447 100644
--- a/src/leveldb/include/leveldb/db.h
+++ b/src/leveldb/include/leveldb/db.h
@@ -1,161 +1,161 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef STORAGE_LEVELDB_INCLUDE_DB_H_
#define STORAGE_LEVELDB_INCLUDE_DB_H_
#include <stdint.h>
#include <stdio.h>
#include "leveldb/iterator.h"
#include "leveldb/options.h"
namespace leveldb {
// Update Makefile if you change these
static const int kMajorVersion = 1;
-static const int kMinorVersion = 8;
+static const int kMinorVersion = 9;
struct Options;
struct ReadOptions;
struct WriteOptions;
class WriteBatch;
// Abstract handle to a particular state of a DB.
// A Snapshot is an immutable object and can therefore be safely
// accessed from multiple threads without any external synchronization.
class Snapshot {
protected:
// Protected so clients cannot delete a Snapshot through this type;
// snapshots are handed back via DB::ReleaseSnapshot() instead.
virtual ~Snapshot();
};
// A range of keys: the half-open interval [start, limit).
// Used, e.g., by DB::GetApproximateSizes().
struct Range {
Slice start; // Included in the range
Slice limit; // Not included in the range
Range() { }
Range(const Slice& s, const Slice& l) : start(s), limit(l) { }
};
// A DB is a persistent ordered map from keys to values.
// A DB is safe for concurrent access from multiple threads without
// any external synchronization.
class DB {
public:
// Open the database with the specified "name".
// Stores a pointer to a heap-allocated database in *dbptr and returns
// OK on success.
// Stores NULL in *dbptr and returns a non-OK status on error.
// Caller should delete *dbptr when it is no longer needed.
static Status Open(const Options& options,
const std::string& name,
DB** dbptr);
// DB is an abstract interface; concrete implementations subclass it.
DB() { }
virtual ~DB();
// Set the database entry for "key" to "value". Returns OK on success,
// and a non-OK status on error.
// Note: consider setting options.sync = true.
virtual Status Put(const WriteOptions& options,
const Slice& key,
const Slice& value) = 0;
// Remove the database entry (if any) for "key". Returns OK on
// success, and a non-OK status on error. It is not an error if "key"
// did not exist in the database.
// Note: consider setting options.sync = true.
virtual Status Delete(const WriteOptions& options, const Slice& key) = 0;
// Apply the specified updates to the database.
// Returns OK on success, non-OK on failure.
// Note: consider setting options.sync = true.
virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0;
// If the database contains an entry for "key", store the
// corresponding value in *value and return OK.
//
// If there is no entry for "key", leave *value unchanged and return
// a status for which Status::IsNotFound() returns true.
//
// May return some other Status on an error.
virtual Status Get(const ReadOptions& options,
const Slice& key, std::string* value) = 0;
// Return a heap-allocated iterator over the contents of the database.
// The result of NewIterator() is initially invalid (caller must
// call one of the Seek methods on the iterator before using it).
//
// Caller should delete the iterator when it is no longer needed.
// The returned iterator should be deleted before this db is deleted.
virtual Iterator* NewIterator(const ReadOptions& options) = 0;
// Return a handle to the current DB state. Iterators created with
// this handle will all observe a stable snapshot of the current DB
// state. The caller must call ReleaseSnapshot(result) when the
// snapshot is no longer needed.
virtual const Snapshot* GetSnapshot() = 0;
// Release a previously acquired snapshot. The caller must not
// use "snapshot" after this call.
virtual void ReleaseSnapshot(const Snapshot* snapshot) = 0;
// DB implementations can export properties about their state
// via this method. If "property" is a valid property understood by this
// DB implementation, fills "*value" with its current value and returns
// true. Otherwise returns false.
//
// Valid property names include:
//
// "leveldb.num-files-at-level<N>" - return the number of files at level <N>,
// where <N> is an ASCII representation of a level number (e.g. "0").
// "leveldb.stats" - returns a multi-line string that describes statistics
// about the internal operation of the DB.
// "leveldb.sstables" - returns a multi-line string that describes all
// of the sstables that make up the db contents.
virtual bool GetProperty(const Slice& property, std::string* value) = 0;
// For each i in [0,n-1], store in "sizes[i]", the approximate
// file system space used by keys in "[range[i].start .. range[i].limit)".
//
// Note that the returned sizes measure file system space usage, so
// if the user data compresses by a factor of ten, the returned
// sizes will be one-tenth the size of the corresponding user data size.
//
// The results may not include the sizes of recently written data.
virtual void GetApproximateSizes(const Range* range, int n,
uint64_t* sizes) = 0;
// Compact the underlying storage for the key range [*begin,*end].
// In particular, deleted and overwritten versions are discarded,
// and the data is rearranged to reduce the cost of operations
// needed to access the data. This operation should typically only
// be invoked by users who understand the underlying implementation.
//
// begin==NULL is treated as a key before all keys in the database.
// end==NULL is treated as a key after all keys in the database.
// Therefore the following call will compact the entire database:
// db->CompactRange(NULL, NULL);
virtual void CompactRange(const Slice* begin, const Slice* end) = 0;
private:
// No copying allowed
DB(const DB&);
void operator=(const DB&);
};
// Destroy the contents of the specified database.
// Be very careful using this method.
// Returns a Status describing whether the operation succeeded.
Status DestroyDB(const std::string& name, const Options& options);
// If a DB cannot be opened, you may attempt to call this method to
// resurrect as much of the contents of the database as possible.
// Some data may be lost, so be careful when calling this function
// on a database that contains important information.
// Returns a Status describing whether the repair succeeded.
Status RepairDB(const std::string& dbname, const Options& options);
} // namespace leveldb
#endif // STORAGE_LEVELDB_INCLUDE_DB_H_
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sun, Mar 2, 10:16 (1 d, 4 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5187315
Default Alt Text
(134 KB)
Attached To
rABC Bitcoin ABC
Event Timeline
Log In to Comment