Page MenuHomePhabricator

No OneTemporary

diff --git a/Makefile b/Makefile
index 344ff2972a..f8903b69e4 100644
--- a/Makefile
+++ b/Makefile
@@ -1,214 +1,215 @@
# Copyright (c) 2011 The LevelDB Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file. See the AUTHORS file for names of contributors.
#-----------------------------------------------
# Uncomment exactly one of the lines labelled (A), (B), and (C) below
# to switch between compilation modes.
OPT ?= -O2 -DNDEBUG # (A) Production use (optimized mode)
# OPT ?= -g2 # (B) Debug mode, w/ full line-level debugging symbols
# OPT ?= -O2 -g2 -DNDEBUG # (C) Profiling mode: opt, but w/debugging symbols
#-----------------------------------------------
# detect what platform we're building on
$(shell CC="$(CC)" CXX="$(CXX)" TARGET_OS="$(TARGET_OS)" \
./build_detect_platform build_config.mk ./)
# this file is generated by the previous line to set build flags and sources
include build_config.mk
CFLAGS += -I. -I./include $(PLATFORM_CCFLAGS) $(OPT)
CXXFLAGS += -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT)
LDFLAGS += $(PLATFORM_LDFLAGS)
LIBS += $(PLATFORM_LIBS)
LIBOBJECTS = $(SOURCES:.cc=.o)
MEMENVOBJECTS = $(MEMENV_SOURCES:.cc=.o)
TESTUTIL = ./util/testutil.o
TESTHARNESS = ./util/testharness.o $(TESTUTIL)
TESTS = \
arena_test \
autocompact_test \
bloom_test \
c_test \
cache_test \
coding_test \
corruption_test \
crc32c_test \
db_test \
dbformat_test \
env_test \
filename_test \
filter_block_test \
issue178_test \
issue200_test \
log_test \
memenv_test \
skiplist_test \
table_test \
version_edit_test \
version_set_test \
write_batch_test
PROGRAMS = db_bench leveldbutil $(TESTS)
BENCHMARKS = db_bench_sqlite3 db_bench_tree_db
LIBRARY = libleveldb.a
MEMENVLIBRARY = libmemenv.a
default: all
# Should we build shared libraries?
ifneq ($(PLATFORM_SHARED_EXT),)
ifneq ($(PLATFORM_SHARED_VERSIONED),true)
SHARED1 = libleveldb.$(PLATFORM_SHARED_EXT)
SHARED2 = $(SHARED1)
SHARED3 = $(SHARED1)
SHARED = $(SHARED1)
else
# Update db.h if you change these.
SHARED_MAJOR = 1
-SHARED_MINOR = 15
+SHARED_MINOR = 17
SHARED1 = libleveldb.$(PLATFORM_SHARED_EXT)
SHARED2 = $(SHARED1).$(SHARED_MAJOR)
SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR)
SHARED = $(SHARED1) $(SHARED2) $(SHARED3)
$(SHARED1): $(SHARED3)
ln -fs $(SHARED3) $(SHARED1)
$(SHARED2): $(SHARED3)
ln -fs $(SHARED3) $(SHARED2)
endif
$(SHARED3):
$(CXX) $(LDFLAGS) $(PLATFORM_SHARED_LDFLAGS)$(SHARED2) $(CXXFLAGS) $(PLATFORM_SHARED_CFLAGS) $(SOURCES) -o $(SHARED3) $(LIBS)
endif # PLATFORM_SHARED_EXT
# These targets name actions, not files.  Declaring them phony keeps a stray
# file called "all", "check" or "clean" from making the rule look up to date.
.PHONY: default all check clean

# Build the static library plus whatever shared-library names were selected
# above (SHARED is empty when the platform has no shared-library extension).
all: $(SHARED) $(LIBRARY)

# Build every program and test, then run the tests one by one, stopping at the
# first one that exits non-zero.
check: all $(PROGRAMS) $(TESTS)
	for t in $(TESTS); do echo "***** Running $$t"; ./$$t || exit 1; done

# Remove build products and the generated build_config.mk.  The leading "-"
# lets make carry on even if rm reports an error.
clean:
	-rm -f $(PROGRAMS) $(BENCHMARKS) $(LIBRARY) $(SHARED) $(MEMENVLIBRARY) */*.o */*/*.o ios-x86/*/*.o ios-arm/*/*.o build_config.mk
	-rm -rf ios-x86/* ios-arm/*
$(LIBRARY): $(LIBOBJECTS)
rm -f $@
$(AR) -rs $@ $(LIBOBJECTS)
db_bench: db/db_bench.o $(LIBOBJECTS) $(TESTUTIL)
$(CXX) $(LDFLAGS) db/db_bench.o $(LIBOBJECTS) $(TESTUTIL) -o $@ $(LIBS)
db_bench_sqlite3: doc/bench/db_bench_sqlite3.o $(LIBOBJECTS) $(TESTUTIL)
$(CXX) $(LDFLAGS) doc/bench/db_bench_sqlite3.o $(LIBOBJECTS) $(TESTUTIL) -o $@ -lsqlite3 $(LIBS)
db_bench_tree_db: doc/bench/db_bench_tree_db.o $(LIBOBJECTS) $(TESTUTIL)
$(CXX) $(LDFLAGS) doc/bench/db_bench_tree_db.o $(LIBOBJECTS) $(TESTUTIL) -o $@ -lkyotocabinet $(LIBS)
leveldbutil: db/leveldb_main.o $(LIBOBJECTS)
$(CXX) $(LDFLAGS) db/leveldb_main.o $(LIBOBJECTS) -o $@ $(LIBS)
arena_test: util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
autocompact_test: db/autocompact_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/autocompact_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
bloom_test: util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
c_test: db/c_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/c_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
cache_test: util/cache_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/cache_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
coding_test: util/coding_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/coding_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
corruption_test: db/corruption_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/corruption_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
crc32c_test: util/crc32c_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/crc32c_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
db_test: db/db_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/db_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
dbformat_test: db/dbformat_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/dbformat_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
env_test: util/env_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/env_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
filename_test: db/filename_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/filename_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
filter_block_test: table/filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) table/filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
issue178_test: issues/issue178_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) issues/issue178_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
issue200_test: issues/issue200_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) issues/issue200_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
log_test: db/log_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/log_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
table_test: table/table_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) table/table_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
skiplist_test: db/skiplist_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/skiplist_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
version_edit_test: db/version_edit_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/version_edit_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
version_set_test: db/version_set_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/version_set_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
write_batch_test: db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
$(MEMENVLIBRARY) : $(MEMENVOBJECTS)
rm -f $@
$(AR) -rs $@ $(MEMENVOBJECTS)
memenv_test : helpers/memenv/memenv_test.o $(MEMENVLIBRARY) $(LIBRARY) $(TESTHARNESS)
$(CXX) $(LDFLAGS) helpers/memenv/memenv_test.o $(MEMENVLIBRARY) $(LIBRARY) $(TESTHARNESS) -o $@ $(LIBS)
ifeq ($(PLATFORM), IOS)
# For iOS, create universal object files to be used on both the simulator and
# a device.
PLATFORMSROOT=/Applications/Xcode.app/Contents/Developer/Platforms
SIMULATORROOT=$(PLATFORMSROOT)/iPhoneSimulator.platform/Developer
DEVICEROOT=$(PLATFORMSROOT)/iPhoneOS.platform/Developer
IOSVERSION=$(shell defaults read $(PLATFORMSROOT)/iPhoneOS.platform/version CFBundleShortVersionString)
+IOSARCH=-arch armv6 -arch armv7 -arch armv7s -arch arm64
.cc.o:
mkdir -p ios-x86/$(dir $@)
- $(CXX) $(CXXFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -c $< -o ios-x86/$@
+ $(CXX) $(CXXFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -arch x86_64 -c $< -o ios-x86/$@
mkdir -p ios-arm/$(dir $@)
- xcrun -sdk iphoneos $(CXX) $(CXXFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 -c $< -o ios-arm/$@
+ xcrun -sdk iphoneos $(CXX) $(CXXFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk $(IOSARCH) -c $< -o ios-arm/$@
lipo ios-x86/$@ ios-arm/$@ -create -output $@
.c.o:
mkdir -p ios-x86/$(dir $@)
- $(CC) $(CFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -c $< -o ios-x86/$@
+ $(CC) $(CFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -arch x86_64 -c $< -o ios-x86/$@
mkdir -p ios-arm/$(dir $@)
- xcrun -sdk iphoneos $(CC) $(CFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 -c $< -o ios-arm/$@
+ xcrun -sdk iphoneos $(CC) $(CFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk $(IOSARCH) -c $< -o ios-arm/$@
lipo ios-x86/$@ ios-arm/$@ -create -output $@
else
.cc.o:
$(CXX) $(CXXFLAGS) -c $< -o $@
.c.o:
$(CC) $(CFLAGS) -c $< -o $@
endif
diff --git a/db/filename.cc b/db/filename.cc
index 27d750697b..da32946d99 100644
--- a/db/filename.cc
+++ b/db/filename.cc
@@ -1,149 +1,144 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <ctype.h>
#include <stdio.h>
#include "db/filename.h"
#include "db/dbformat.h"
#include "leveldb/env.h"
#include "util/logging.h"
namespace leveldb {
// A utility routine: write "data" to the named file and Sync() it.
extern Status WriteStringToFileSync(Env* env, const Slice& data,
const std::string& fname);
// Builds "<name>/<number>.<suffix>".  The number is printed in decimal and
// zero-padded on the left to a minimum width of six digits.
static std::string MakeFileName(const std::string& name, uint64_t number,
                                const char* suffix) {
  const unsigned long long printable =
      static_cast<unsigned long long>(number);
  char tail[100];
  snprintf(tail, sizeof(tail), "/%06llu.%s", printable, suffix);

  std::string path(name);
  path.append(tail);
  return path;
}
// Returns the path of the ".log" file numbered "number" under "name".
// "number" must be non-zero (asserted).
std::string LogFileName(const std::string& name, uint64_t number) {
  assert(number > 0);
  const std::string log_path = MakeFileName(name, number, "log");
  return log_path;
}
-// TableFileName returns the filenames we usually write to, while
-// SSTTableFileName returns the alternative filenames we also try to read from
-// for backward compatibility. For now, swap them around.
-// TODO: when compatibility is no longer necessary, swap them back
-// (TableFileName to use "ldb" and SSTTableFileName to use "sst").
std::string TableFileName(const std::string& name, uint64_t number) {
assert(number > 0);
- return MakeFileName(name, number, "sst");
+ return MakeFileName(name, number, "ldb");
}
std::string SSTTableFileName(const std::string& name, uint64_t number) {
assert(number > 0);
- return MakeFileName(name, number, "ldb");
+ return MakeFileName(name, number, "sst");
}
// Returns "<dbname>/MANIFEST-<number>", with the number printed in decimal and
// zero-padded to at least six digits.  "number" must be non-zero (asserted).
std::string DescriptorFileName(const std::string& dbname, uint64_t number) {
  assert(number > 0);

  const unsigned long long printable =
      static_cast<unsigned long long>(number);
  char tail[100];
  snprintf(tail, sizeof(tail), "/MANIFEST-%06llu", printable);

  std::string path(dbname);
  path += tail;
  return path;
}
// Returns the path of the CURRENT file inside "dbname".
std::string CurrentFileName(const std::string& dbname) {
  std::string path(dbname);
  path += "/CURRENT";
  return path;
}
// Returns the path of the LOCK file inside "dbname".
std::string LockFileName(const std::string& dbname) {
  std::string path(dbname);
  path += "/LOCK";
  return path;
}
// Returns the path of the ".dbtmp" file numbered "number" under "dbname".
// "number" must be non-zero (asserted).
std::string TempFileName(const std::string& dbname, uint64_t number) {
  assert(number > 0);
  const std::string temp_path = MakeFileName(dbname, number, "dbtmp");
  return temp_path;
}
// Returns the path of the LOG file inside "dbname".
std::string InfoLogFileName(const std::string& dbname) {
  std::string path(dbname);
  path += "/LOG";
  return path;
}
// Returns the path of the previous info log ("LOG.old") inside "dbname".
std::string OldInfoLogFileName(const std::string& dbname) {
  std::string path(dbname);
  path += "/LOG.old";
  return path;
}
// Owned filenames have the form:
// dbname/CURRENT
// dbname/LOCK
// dbname/LOG
// dbname/LOG.old
// dbname/MANIFEST-[0-9]+
// dbname/[0-9]+.(log|sst|ldb)
bool ParseFileName(const std::string& fname,
uint64_t* number,
FileType* type) {
Slice rest(fname);
if (rest == "CURRENT") {
*number = 0;
*type = kCurrentFile;
} else if (rest == "LOCK") {
*number = 0;
*type = kDBLockFile;
} else if (rest == "LOG" || rest == "LOG.old") {
*number = 0;
*type = kInfoLogFile;
} else if (rest.starts_with("MANIFEST-")) {
rest.remove_prefix(strlen("MANIFEST-"));
uint64_t num;
if (!ConsumeDecimalNumber(&rest, &num)) {
return false;
}
if (!rest.empty()) {
return false;
}
*type = kDescriptorFile;
*number = num;
} else {
// Avoid strtoull() to keep filename format independent of the
// current locale
uint64_t num;
if (!ConsumeDecimalNumber(&rest, &num)) {
return false;
}
Slice suffix = rest;
if (suffix == Slice(".log")) {
*type = kLogFile;
} else if (suffix == Slice(".sst") || suffix == Slice(".ldb")) {
*type = kTableFile;
} else if (suffix == Slice(".dbtmp")) {
*type = kTempFile;
} else {
return false;
}
*number = num;
}
return true;
}
// Points the CURRENT file of "dbname" at the manifest numbered
// "descriptor_number".  The manifest's name, relative to the database
// directory and followed by a newline, is written and synced to a temporary
// file which is then renamed to CURRENT.  If either step fails the temporary
// file is deleted and the failing status is returned.
Status SetCurrentFile(Env* env, const std::string& dbname,
                      uint64_t descriptor_number) {
  // Strip the leading "dbname/" so only the manifest's own name is stored.
  const std::string manifest_path =
      DescriptorFileName(dbname, descriptor_number);
  Slice relative_name = manifest_path;
  assert(relative_name.starts_with(dbname + "/"));
  relative_name.remove_prefix(dbname.size() + 1);

  const std::string tmp_path = TempFileName(dbname, descriptor_number);
  Status result =
      WriteStringToFileSync(env, relative_name.ToString() + "\n", tmp_path);
  if (result.ok()) {
    result = env->RenameFile(tmp_path, CurrentFileName(dbname));
  }
  if (result.ok()) {
    return result;
  }

  // Either the write or the rename failed: do not leave the temp file behind.
  env->DeleteFile(tmp_path);
  return result;
}
} // namespace leveldb
diff --git a/db/log_reader.cc b/db/log_reader.cc
index b35f115aad..4919216d04 100644
--- a/db/log_reader.cc
+++ b/db/log_reader.cc
@@ -1,259 +1,266 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/log_reader.h"
#include <stdio.h>
#include "leveldb/env.h"
#include "util/coding.h"
#include "util/crc32c.h"
namespace leveldb {
namespace log {
Reader::Reporter::~Reporter() {
}
// Creates a reader that pulls log records from "*file".
//
// file:           source of the raw log bytes.
// reporter:       receives notifications about dropped bytes; may be NULL, in
//                 which case drops are not reported.
// checksum:       when true, each physical record's CRC is verified.
// initial_offset: physical records that start before this file offset are
//                 skipped rather than returned.
//
// Allocates a kBlockSize-byte backing store for block reads; the read buffer
// starts out empty and all tracked offsets start at zero.
Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum,
uint64_t initial_offset)
: file_(file),
reporter_(reporter),
checksum_(checksum),
backing_store_(new char[kBlockSize]),
buffer_(),
eof_(false),
last_record_offset_(0),
end_of_buffer_offset_(0),
initial_offset_(initial_offset) {
}
Reader::~Reader() {
delete[] backing_store_;
}
bool Reader::SkipToInitialBlock() {
size_t offset_in_block = initial_offset_ % kBlockSize;
uint64_t block_start_location = initial_offset_ - offset_in_block;
// Don't search a block if we'd be in the trailer
if (offset_in_block > kBlockSize - 6) {
offset_in_block = 0;
block_start_location += kBlockSize;
}
end_of_buffer_offset_ = block_start_location;
// Skip to start of first block that can contain the initial record
if (block_start_location > 0) {
Status skip_status = file_->Skip(block_start_location);
if (!skip_status.ok()) {
ReportDrop(block_start_location, skip_status);
return false;
}
}
return true;
}
bool Reader::ReadRecord(Slice* record, std::string* scratch) {
if (last_record_offset_ < initial_offset_) {
if (!SkipToInitialBlock()) {
return false;
}
}
scratch->clear();
record->clear();
bool in_fragmented_record = false;
// Record offset of the logical record that we're reading
// 0 is a dummy value to make compilers happy
uint64_t prospective_record_offset = 0;
Slice fragment;
while (true) {
uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
const unsigned int record_type = ReadPhysicalRecord(&fragment);
switch (record_type) {
case kFullType:
if (in_fragmented_record) {
// Handle bug in earlier versions of log::Writer where
// it could emit an empty kFirstType record at the tail end
// of a block followed by a kFullType or kFirstType record
// at the beginning of the next block.
if (scratch->empty()) {
in_fragmented_record = false;
} else {
ReportCorruption(scratch->size(), "partial record without end(1)");
}
}
prospective_record_offset = physical_record_offset;
scratch->clear();
*record = fragment;
last_record_offset_ = prospective_record_offset;
return true;
case kFirstType:
if (in_fragmented_record) {
// Handle bug in earlier versions of log::Writer where
// it could emit an empty kFirstType record at the tail end
// of a block followed by a kFullType or kFirstType record
// at the beginning of the next block.
if (scratch->empty()) {
in_fragmented_record = false;
} else {
ReportCorruption(scratch->size(), "partial record without end(2)");
}
}
prospective_record_offset = physical_record_offset;
scratch->assign(fragment.data(), fragment.size());
in_fragmented_record = true;
break;
case kMiddleType:
if (!in_fragmented_record) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(1)");
} else {
scratch->append(fragment.data(), fragment.size());
}
break;
case kLastType:
if (!in_fragmented_record) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(2)");
} else {
scratch->append(fragment.data(), fragment.size());
*record = Slice(*scratch);
last_record_offset_ = prospective_record_offset;
return true;
}
break;
case kEof:
if (in_fragmented_record) {
- ReportCorruption(scratch->size(), "partial record without end(3)");
+ // This can be caused by the writer dying immediately after
+ // writing a physical record but before completing the next; don't
+ // treat it as a corruption, just ignore the entire logical record.
scratch->clear();
}
return false;
case kBadRecord:
if (in_fragmented_record) {
ReportCorruption(scratch->size(), "error in middle of record");
in_fragmented_record = false;
scratch->clear();
}
break;
default: {
char buf[40];
snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
ReportCorruption(
(fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
buf);
in_fragmented_record = false;
scratch->clear();
break;
}
}
}
return false;
}
uint64_t Reader::LastRecordOffset() {
return last_record_offset_;
}
void Reader::ReportCorruption(size_t bytes, const char* reason) {
ReportDrop(bytes, Status::Corruption(reason));
}
// Forwards a drop of "bytes" bytes to the reporter, if there is one, but only
// when the dropped region begins at or after initial_offset_.
void Reader::ReportDrop(size_t bytes, const Status& reason) {
  if (reporter_ == NULL) {
    return;
  }
  if (end_of_buffer_offset_ - buffer_.size() - bytes < initial_offset_) {
    return;
  }
  reporter_->Corruption(bytes, reason);
}
unsigned int Reader::ReadPhysicalRecord(Slice* result) {
while (true) {
if (buffer_.size() < kHeaderSize) {
if (!eof_) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
end_of_buffer_offset_ += buffer_.size();
if (!status.ok()) {
buffer_.clear();
ReportDrop(kBlockSize, status);
eof_ = true;
return kEof;
} else if (buffer_.size() < kBlockSize) {
eof_ = true;
}
continue;
- } else if (buffer_.size() == 0) {
- // End of file
- return kEof;
} else {
- size_t drop_size = buffer_.size();
+ // Note that if buffer_ is non-empty, we have a truncated header at the
+ // end of the file, which can be caused by the writer crashing in the
+ // middle of writing the header. Instead of considering this an error,
+ // just report EOF.
buffer_.clear();
- ReportCorruption(drop_size, "truncated record at end of file");
return kEof;
}
}
// Parse the header
const char* header = buffer_.data();
const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
const unsigned int type = header[6];
const uint32_t length = a | (b << 8);
if (kHeaderSize + length > buffer_.size()) {
size_t drop_size = buffer_.size();
buffer_.clear();
- ReportCorruption(drop_size, "bad record length");
- return kBadRecord;
+ if (!eof_) {
+ ReportCorruption(drop_size, "bad record length");
+ return kBadRecord;
+ }
+ // If the end of the file has been reached without reading |length| bytes
+ // of payload, assume the writer died in the middle of writing the record.
+ // Don't report a corruption.
+ return kEof;
}
if (type == kZeroType && length == 0) {
// Skip zero length record without reporting any drops since
// such records are produced by the mmap based writing code in
// env_posix.cc that preallocates file regions.
buffer_.clear();
return kBadRecord;
}
// Check crc
if (checksum_) {
uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
if (actual_crc != expected_crc) {
// Drop the rest of the buffer since "length" itself may have
// been corrupted and if we trust it, we could find some
// fragment of a real log record that just happens to look
// like a valid log record.
size_t drop_size = buffer_.size();
buffer_.clear();
ReportCorruption(drop_size, "checksum mismatch");
return kBadRecord;
}
}
buffer_.remove_prefix(kHeaderSize + length);
// Skip physical record that started before initial_offset_
if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
initial_offset_) {
result->clear();
return kBadRecord;
}
*result = Slice(header + kHeaderSize, length);
return type;
}
}
} // namespace log
} // namespace leveldb
diff --git a/db/log_test.cc b/db/log_test.cc
index 4c5cf87573..91d3caafc3 100644
--- a/db/log_test.cc
+++ b/db/log_test.cc
@@ -1,500 +1,530 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "leveldb/env.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/random.h"
#include "util/testharness.h"
namespace leveldb {
namespace log {
// Returns a string of exactly "n" characters built by repeating
// "partial_string" and cutting off the excess.
static std::string BigString(const std::string& partial_string, size_t n) {
  std::string repeated;
  for (; repeated.size() < n; repeated += partial_string) {
    // Keep appending whole copies until there are at least n characters.
  }
  repeated.resize(n);
  return repeated;
}
// Formats "n" in decimal followed by a '.', e.g. 42 -> "42.".
static std::string NumberString(int n) {
  char text[50];
  snprintf(text, sizeof(text), "%d.", n);
  std::string formatted;
  formatted.assign(text);
  return formatted;
}
// Returns NumberString(i) repeated/truncated to a length drawn from
// rnd->Skewed(17), so the result may be long.
static std::string RandomSkewedString(int i, Random* rnd) {
  const std::string unit = NumberString(i);
  const size_t length = rnd->Skewed(17);
  return BigString(unit, length);
}
class LogTest {
private:
class StringDest : public WritableFile {
public:
std::string contents_;
virtual Status Close() { return Status::OK(); }
virtual Status Flush() { return Status::OK(); }
virtual Status Sync() { return Status::OK(); }
virtual Status Append(const Slice& slice) {
contents_.append(slice.data(), slice.size());
return Status::OK();
}
};
// In-memory SequentialFile that serves bytes out of contents_.
class StringSource : public SequentialFile {
 public:
  Slice contents_;         // Bytes not yet handed out.
  bool force_error_;       // When set, the next Read() fails once.
  bool returned_partial_;  // Set once a Read() came up short or failed.

  StringSource() : force_error_(false), returned_partial_(false) { }

  // Hands out up to "n" bytes of contents_ through *result.  "scratch" is
  // unused because *result points straight into contents_.  A forced error or
  // a short read marks the source as finished; reading again after that is a
  // test failure.
  virtual Status Read(size_t n, Slice* result, char* scratch) {
    ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error";

    if (force_error_) {
      force_error_ = false;
      returned_partial_ = true;
      return Status::Corruption("read error");
    }

    if (contents_.size() < n) {
      n = contents_.size();
      returned_partial_ = true;
    }
    *result = Slice(contents_.data(), n);
    contents_.remove_prefix(n);
    return Status::OK();
  }

  // Discards the next "n" bytes.  Skipping beyond the end empties the source
  // and returns NotFound.
  virtual Status Skip(uint64_t n) {
    if (n > contents_.size()) {
      contents_.clear();
      return Status::NotFound("in-memory file skipped past end");
    }

    contents_.remove_prefix(n);

    return Status::OK();
  }
};
class ReportCollector : public Reader::Reporter {
public:
size_t dropped_bytes_;
std::string message_;
ReportCollector() : dropped_bytes_(0) { }
virtual void Corruption(size_t bytes, const Status& status) {
dropped_bytes_ += bytes;
message_.append(status.ToString());
}
};
StringDest dest_;
StringSource source_;
ReportCollector report_;
bool reading_;
Writer writer_;
Reader reader_;
// Record metadata for testing initial offset functionality
static size_t initial_offset_record_sizes_[];
static uint64_t initial_offset_last_record_offsets_[];
public:
LogTest() : reading_(false),
writer_(&dest_),
reader_(&source_, &report_, true/*checksum*/,
0/*initial_offset*/) {
}
void Write(const std::string& msg) {
ASSERT_TRUE(!reading_) << "Write() after starting to read";
writer_.AddRecord(Slice(msg));
}
size_t WrittenBytes() const {
return dest_.contents_.size();
}
std::string Read() {
if (!reading_) {
reading_ = true;
source_.contents_ = Slice(dest_.contents_);
}
std::string scratch;
Slice record;
if (reader_.ReadRecord(&record, &scratch)) {
return record.ToString();
} else {
return "EOF";
}
}
void IncrementByte(int offset, int delta) {
dest_.contents_[offset] += delta;
}
void SetByte(int offset, char new_byte) {
dest_.contents_[offset] = new_byte;
}
void ShrinkSize(int bytes) {
dest_.contents_.resize(dest_.contents_.size() - bytes);
}
void FixChecksum(int header_offset, int len) {
// Compute crc of type/len/data
uint32_t crc = crc32c::Value(&dest_.contents_[header_offset+6], 1 + len);
crc = crc32c::Mask(crc);
EncodeFixed32(&dest_.contents_[header_offset], crc);
}
void ForceError() {
source_.force_error_ = true;
}
size_t DroppedBytes() const {
return report_.dropped_bytes_;
}
std::string ReportMessage() const {
return report_.message_;
}
// Returns OK iff recorded error message contains "msg"
std::string MatchError(const std::string& msg) const {
if (report_.message_.find(msg) == std::string::npos) {
return report_.message_;
} else {
return "OK";
}
}
void WriteInitialOffsetLog() {
for (int i = 0; i < 4; i++) {
std::string record(initial_offset_record_sizes_[i],
static_cast<char>('a' + i));
Write(record);
}
}
void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
WriteInitialOffsetLog();
reading_ = true;
source_.contents_ = Slice(dest_.contents_);
Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
WrittenBytes() + offset_past_end);
Slice record;
std::string scratch;
ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
delete offset_reader;
}
void CheckInitialOffsetRecord(uint64_t initial_offset,
int expected_record_offset) {
WriteInitialOffsetLog();
reading_ = true;
source_.contents_ = Slice(dest_.contents_);
Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
initial_offset);
Slice record;
std::string scratch;
ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
record.size());
ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
offset_reader->LastRecordOffset());
ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
delete offset_reader;
}
};
size_t LogTest::initial_offset_record_sizes_[] =
{10000, // Two sizable records in first block
10000,
2 * log::kBlockSize - 1000, // Span three blocks
1};
uint64_t LogTest::initial_offset_last_record_offsets_[] =
{0,
kHeaderSize + 10000,
2 * (kHeaderSize + 10000),
2 * (kHeaderSize + 10000) +
(2 * log::kBlockSize - 1000) + 3 * kHeaderSize};
TEST(LogTest, Empty) {
ASSERT_EQ("EOF", Read());
}
TEST(LogTest, ReadWrite) {
Write("foo");
Write("bar");
Write("");
Write("xxxx");
ASSERT_EQ("foo", Read());
ASSERT_EQ("bar", Read());
ASSERT_EQ("", Read());
ASSERT_EQ("xxxx", Read());
ASSERT_EQ("EOF", Read());
ASSERT_EQ("EOF", Read()); // Make sure reads at eof work
}
TEST(LogTest, ManyBlocks) {
for (int i = 0; i < 100000; i++) {
Write(NumberString(i));
}
for (int i = 0; i < 100000; i++) {
ASSERT_EQ(NumberString(i), Read());
}
ASSERT_EQ("EOF", Read());
}
TEST(LogTest, Fragmentation) {
Write("small");
Write(BigString("medium", 50000));
Write(BigString("large", 100000));
ASSERT_EQ("small", Read());
ASSERT_EQ(BigString("medium", 50000), Read());
ASSERT_EQ(BigString("large", 100000), Read());
ASSERT_EQ("EOF", Read());
}
TEST(LogTest, MarginalTrailer) {
// Make a trailer that is exactly the same length as an empty record.
const int n = kBlockSize - 2*kHeaderSize;
Write(BigString("foo", n));
ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
Write("");
Write("bar");
ASSERT_EQ(BigString("foo", n), Read());
ASSERT_EQ("", Read());
ASSERT_EQ("bar", Read());
ASSERT_EQ("EOF", Read());
}
TEST(LogTest, MarginalTrailer2) {
// Make a trailer that is exactly the same length as an empty record.
const int n = kBlockSize - 2*kHeaderSize;
Write(BigString("foo", n));
ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
Write("bar");
ASSERT_EQ(BigString("foo", n), Read());
ASSERT_EQ("bar", Read());
ASSERT_EQ("EOF", Read());
ASSERT_EQ(0, DroppedBytes());
ASSERT_EQ("", ReportMessage());
}
TEST(LogTest, ShortTrailer) {
const int n = kBlockSize - 2*kHeaderSize + 4;
Write(BigString("foo", n));
ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
Write("");
Write("bar");
ASSERT_EQ(BigString("foo", n), Read());
ASSERT_EQ("", Read());
ASSERT_EQ("bar", Read());
ASSERT_EQ("EOF", Read());
}
TEST(LogTest, AlignedEof) {
const int n = kBlockSize - 2*kHeaderSize + 4;
Write(BigString("foo", n));
ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
ASSERT_EQ(BigString("foo", n), Read());
ASSERT_EQ("EOF", Read());
}
TEST(LogTest, RandomRead) {
const int N = 500;
Random write_rnd(301);
for (int i = 0; i < N; i++) {
Write(RandomSkewedString(i, &write_rnd));
}
Random read_rnd(301);
for (int i = 0; i < N; i++) {
ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read());
}
ASSERT_EQ("EOF", Read());
}
// Tests of all the error paths in log_reader.cc follow:
TEST(LogTest, ReadError) {
Write("foo");
ForceError();
ASSERT_EQ("EOF", Read());
ASSERT_EQ(kBlockSize, DroppedBytes());
ASSERT_EQ("OK", MatchError("read error"));
}
TEST(LogTest, BadRecordType) {
Write("foo");
// Type is stored in header[6]
IncrementByte(6, 100);
FixChecksum(0, 3);
ASSERT_EQ("EOF", Read());
ASSERT_EQ(3, DroppedBytes());
ASSERT_EQ("OK", MatchError("unknown record type"));
}
-TEST(LogTest, TruncatedTrailingRecord) {
+TEST(LogTest, TruncatedTrailingRecordIsIgnored) {
Write("foo");
ShrinkSize(4); // Drop all payload as well as a header byte
ASSERT_EQ("EOF", Read());
- ASSERT_EQ(kHeaderSize - 1, DroppedBytes());
- ASSERT_EQ("OK", MatchError("truncated record at end of file"));
+ // Truncated last record is ignored, not treated as an error.
+ ASSERT_EQ(0, DroppedBytes());
+ ASSERT_EQ("", ReportMessage());
}
TEST(LogTest, BadLength) {
+ const int kPayloadSize = kBlockSize - kHeaderSize;
+ Write(BigString("bar", kPayloadSize));
+ Write("foo");
+ // Least significant size byte is stored in header[4].
+ IncrementByte(4, 1);
+ ASSERT_EQ("foo", Read());
+ ASSERT_EQ(kBlockSize, DroppedBytes());
+ ASSERT_EQ("OK", MatchError("bad record length"));
+}
+
+TEST(LogTest, BadLengthAtEndIsIgnored) {
Write("foo");
ShrinkSize(1);
ASSERT_EQ("EOF", Read());
- ASSERT_EQ(kHeaderSize + 2, DroppedBytes());
- ASSERT_EQ("OK", MatchError("bad record length"));
+ ASSERT_EQ(0, DroppedBytes());
+ ASSERT_EQ("", ReportMessage());
}
TEST(LogTest, ChecksumMismatch) {
Write("foo");
IncrementByte(0, 10);
ASSERT_EQ("EOF", Read());
ASSERT_EQ(10, DroppedBytes());
ASSERT_EQ("OK", MatchError("checksum mismatch"));
}
TEST(LogTest, UnexpectedMiddleType) {
Write("foo");
SetByte(6, kMiddleType);
FixChecksum(0, 3);
ASSERT_EQ("EOF", Read());
ASSERT_EQ(3, DroppedBytes());
ASSERT_EQ("OK", MatchError("missing start"));
}
TEST(LogTest, UnexpectedLastType) {
Write("foo");
SetByte(6, kLastType);
FixChecksum(0, 3);
ASSERT_EQ("EOF", Read());
ASSERT_EQ(3, DroppedBytes());
ASSERT_EQ("OK", MatchError("missing start"));
}
TEST(LogTest, UnexpectedFullType) {
Write("foo");
Write("bar");
SetByte(6, kFirstType);
FixChecksum(0, 3);
ASSERT_EQ("bar", Read());
ASSERT_EQ("EOF", Read());
ASSERT_EQ(3, DroppedBytes());
ASSERT_EQ("OK", MatchError("partial record without end"));
}
TEST(LogTest, UnexpectedFirstType) {
Write("foo");
Write(BigString("bar", 100000));
SetByte(6, kFirstType);
FixChecksum(0, 3);
ASSERT_EQ(BigString("bar", 100000), Read());
ASSERT_EQ("EOF", Read());
ASSERT_EQ(3, DroppedBytes());
ASSERT_EQ("OK", MatchError("partial record without end"));
}
+TEST(LogTest, MissingLastIsIgnored) {
+ Write(BigString("bar", kBlockSize));
+ // Remove the LAST block, including header.
+ ShrinkSize(14);
+ ASSERT_EQ("EOF", Read());
+ ASSERT_EQ("", ReportMessage());
+ ASSERT_EQ(0, DroppedBytes());
+}
+
+TEST(LogTest, PartialLastIsIgnored) {
+ Write(BigString("bar", kBlockSize));
+ // Cause a bad record length in the LAST block.
+ ShrinkSize(1);
+ ASSERT_EQ("EOF", Read());
+ ASSERT_EQ("", ReportMessage());
+ ASSERT_EQ(0, DroppedBytes());
+}
+
TEST(LogTest, ErrorJoinsRecords) {
  // Scenario: two fragmented records are laid out as
  //    first(R1) last(R1) first(R2) last(R2)
  // and the two middle fragments are lost.  The reader must not glue
  // first(R1) onto last(R2) and hand the result back as a valid record.

  // Two large records followed by one small one.
  Write(BigString("foo", kBlockSize));
  Write(BigString("bar", kBlockSize));
  Write("correct");

  // Overwrite every byte of the middle block.
  const int wipe_end = 2 * kBlockSize;
  for (int pos = kBlockSize; pos < wipe_end; ++pos) {
    SetByte(pos, 'x');
  }

  // Only the small trailing record survives.
  ASSERT_EQ("correct", Read());
  ASSERT_EQ("EOF", Read());

  // The amount dropped is bounded on both sides.
  const int lost = DroppedBytes();
  ASSERT_LE(lost, 2 * kBlockSize + 100);
  ASSERT_GE(lost, 2 * kBlockSize);
}
// Initial-offset tests.  Each case passes a byte offset and a record index
// to CheckInitialOffsetRecord().
// NOTE(review): presumably the second argument is the index of the first
// record the reader should return when started at that offset; confirm
// against the helper's definition.
TEST(LogTest, ReadStart) {
CheckInitialOffsetRecord(0, 0);
}
TEST(LogTest, ReadSecondOneOff) {
CheckInitialOffsetRecord(1, 1);
}
TEST(LogTest, ReadSecondTenThousand) {
CheckInitialOffsetRecord(10000, 1);
}
TEST(LogTest, ReadSecondStart) {
CheckInitialOffsetRecord(10007, 1);
}
TEST(LogTest, ReadThirdOneOff) {
CheckInitialOffsetRecord(10008, 2);
}
TEST(LogTest, ReadThirdStart) {
CheckInitialOffsetRecord(20014, 2);
}
TEST(LogTest, ReadFourthOneOff) {
CheckInitialOffsetRecord(20015, 3);
}
TEST(LogTest, ReadFourthFirstBlockTrailer) {
CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
}
TEST(LogTest, ReadFourthMiddleBlock) {
CheckInitialOffsetRecord(log::kBlockSize + 1, 3);
}
TEST(LogTest, ReadFourthLastBlock) {
CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3);
}
TEST(LogTest, ReadFourthStart) {
CheckInitialOffsetRecord(
2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
3);
}
// Offsets handed to CheckOffsetPastEndReturnsNoRecords().
// NOTE(review): presumably measured relative to the end of the written
// data; confirm against the helper.
TEST(LogTest, ReadEnd) {
CheckOffsetPastEndReturnsNoRecords(0);
}
TEST(LogTest, ReadPastEnd) {
CheckOffsetPastEndReturnsNoRecords(5);
}
} // namespace log
} // namespace leveldb
// Test binary entry point.  The command-line arguments are not consulted;
// the process exit code is whatever the test harness returns.
int main(int argc, char** argv) {
  const int exit_code = leveldb::test::RunAllTests();
  return exit_code;
}
diff --git a/db/repair.cc b/db/repair.cc
index 96c9b37af1..7727fafc58 100644
--- a/db/repair.cc
+++ b/db/repair.cc
@@ -1,462 +1,461 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// We recover the contents of the descriptor from the other files we find.
// (1) Any log files are first converted to tables
// (2) We scan every table to compute
// (a) smallest/largest for the table
// (b) largest sequence number in the table
// (3) We generate descriptor contents:
// - log number is set to zero
// - next-file-number is set to 1 + largest file number we found
// - last-sequence-number is set to largest sequence# found across
// all tables (see 2b)
// - compaction pointers are cleared
// - every table file is added at level 0
//
// Possible optimization 1:
// (a) Compute total size and use to pick appropriate max-level M
// (b) Sort tables by largest sequence# in the table
// (c) For each table: if it overlaps earlier table, place in level-0,
// else place in level-M.
// Possible optimization 2:
// Store per-table metadata (smallest, largest, largest-seq#, ...)
// in the table's meta section to speed up ScanTable.
#include "db/builder.h"
#include "db/db_impl.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/table_cache.h"
#include "db/version_edit.h"
#include "db/write_batch_internal.h"
#include "leveldb/comparator.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
namespace leveldb {
namespace {
class Repairer {
public:
// Builds a repairer for the database directory "dbname".
//
// The caller's options are passed through SanitizeOptions(); when that
// call substitutes its own info_log or block_cache (i.e. the sanitized
// pointer differs from the caller's), the repairer records that it owns
// the substitute so the destructor can delete it.
Repairer(const std::string& dbname, const Options& options)
: dbname_(dbname),
env_(options.env),
icmp_(options.comparator),
ipolicy_(options.filter_policy),
options_(SanitizeOptions(dbname, &icmp_, &ipolicy_, options)),
owns_info_log_(options_.info_log != options.info_log),
owns_cache_(options_.block_cache != options.block_cache),
next_file_number_(1) {
// TableCache can be small since we expect each table to be opened once.
table_cache_ = new TableCache(dbname_, &options_, 10);
}
// Releases the table cache, then whichever of the info log and block
// cache were created on this repairer's behalf (see the ownership flags
// computed in the constructor).
~Repairer() {
  delete table_cache_;
  if (owns_info_log_) delete options_.info_log;
  if (owns_cache_) delete options_.block_cache;
}
// Runs the whole repair: discover files, convert logs to tables, scan the
// tables, and write a fresh descriptor.  Returns the first failing status
// from file discovery or descriptor writing; on success a summary line is
// written to the info log.
Status Run() {
  Status status = FindFiles();
  if (!status.ok()) {
    return status;
  }

  ConvertLogFilesToTables();
  ExtractMetaData();
  status = WriteDescriptor();
  if (!status.ok()) {
    return status;
  }

  // Total size of every table that made it into the new descriptor.
  unsigned long long bytes = 0;
  for (std::vector<TableInfo>::const_iterator it = tables_.begin();
       it != tables_.end(); ++it) {
    bytes += it->meta.file_size;
  }
  Log(options_.info_log,
      "**** Repaired leveldb %s; "
      "recovered %d files; %llu bytes. "
      "Some data may have been lost. "
      "****",
      dbname_.c_str(),
      static_cast<int>(tables_.size()),
      bytes);
  return status;
}
private:
// Everything recorded about one table that will be listed in the new
// descriptor.
struct TableInfo {
FileMetaData meta;
// Largest sequence number among the table's parsable keys.
SequenceNumber max_sequence;
};
std::string const dbname_;
Env* const env_;
InternalKeyComparator const icmp_;
InternalFilterPolicy const ipolicy_;
// Sanitized copy of the caller's options.
Options const options_;
// True when options_.info_log / options_.block_cache differ from the
// caller's pointers and must be deleted by the destructor.
bool owns_info_log_;
bool owns_cache_;
TableCache* table_cache_;
// Edit that WriteDescriptor() fills in and encodes into the new manifest.
VersionEdit edit_;
// Names of descriptor files found by FindFiles(); archived once a new
// descriptor has been written.
std::vector<std::string> manifests_;
// Numbers of table files found on disk plus tables produced from logs.
std::vector<uint64_t> table_numbers_;
// Numbers of log files found on disk.
std::vector<uint64_t> logs_;
// Tables accepted by ScanTable() or RepairTable().
std::vector<TableInfo> tables_;
// One more than the largest non-descriptor file number seen so far; also
// used to number files created during the repair.
uint64_t next_file_number_;
// Lists the database directory and sorts the recognised file names into
// manifests_, logs_ and table_numbers_, raising next_file_number_ above
// every non-descriptor file number seen.  Fails if the directory cannot
// be listed or is empty.
Status FindFiles() {
  std::vector<std::string> children;
  Status status = env_->GetChildren(dbname_, &children);
  if (!status.ok()) {
    return status;
  }
  if (children.empty()) {
    return Status::IOError(dbname_, "repair found no files");
  }

  uint64_t number;
  FileType type;
  for (std::vector<std::string>::const_iterator it = children.begin();
       it != children.end(); ++it) {
    const std::string& name = *it;
    if (!ParseFileName(name, &number, &type)) {
      continue;  // Not a file name we understand
    }
    if (type == kDescriptorFile) {
      manifests_.push_back(name);
      continue;
    }

    if (number + 1 > next_file_number_) {
      next_file_number_ = number + 1;
    }
    if (type == kLogFile) {
      logs_.push_back(number);
    } else if (type == kTableFile) {
      table_numbers_.push_back(number);
    }
    // Any other file type is ignored.
  }
  return status;
}
void ConvertLogFilesToTables() {
for (size_t i = 0; i < logs_.size(); i++) {
std::string logname = LogFileName(dbname_, logs_[i]);
Status status = ConvertLogToTable(logs_[i]);
if (!status.ok()) {
Log(options_.info_log, "Log #%llu: ignoring conversion error: %s",
(unsigned long long) logs_[i],
status.ToString().c_str());
}
ArchiveFile(logname);
}
}
// Replays log file number "log" into a fresh memtable and writes that
// memtable out as a new table file numbered from next_file_number_.
// Corrupt or unusable records are logged and skipped rather than aborting
// the conversion.  Returns the status of opening the log or of building
// the table.
Status ConvertLogToTable(uint64_t log) {
// Reporter that only logs what the reader drops.
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
uint64_t lognum;
virtual void Corruption(size_t bytes, const Status& s) {
// We print error messages for corruption, but continue repairing.
Log(info_log, "Log #%llu: dropping %d bytes; %s",
(unsigned long long) lognum,
static_cast<int>(bytes),
s.ToString().c_str());
}
};
// Open the log file
std::string logname = LogFileName(dbname_, log);
SequentialFile* lfile;
Status status = env_->NewSequentialFile(logname, &lfile);
if (!status.ok()) {
return status;
}
// Create the log reader.
LogReporter reporter;
reporter.env = env_;
reporter.info_log = options_.info_log;
reporter.lognum = log;
// We intentionally make log::Reader do checksumming so that
// corruptions cause entire commits to be skipped instead of
// propagating bad information (like overly large sequence
// numbers).
// NOTE(review): the paragraph above says checksumming is wanted, yet the
// call below passes false ("do not checksum").  One of the two is stale;
// confirm the intended behavior before changing either.
log::Reader reader(lfile, &reporter, false/*do not checksum*/,
0/*initial_offset*/);
// Read all the records and add to a memtable
std::string scratch;
Slice record;
WriteBatch batch;
MemTable* mem = new MemTable(icmp_);
mem->Ref();
int counter = 0;
while (reader.ReadRecord(&record, &scratch)) {
// NOTE(review): 12 is presumably the fixed WriteBatch header size; confirm.
if (record.size() < 12) {
reporter.Corruption(
record.size(), Status::Corruption("log record too small"));
continue;
}
WriteBatchInternal::SetContents(&batch, record);
status = WriteBatchInternal::InsertInto(&batch, mem);
if (status.ok()) {
counter += WriteBatchInternal::Count(&batch);
} else {
Log(options_.info_log, "Log #%llu: ignoring %s",
(unsigned long long) log,
status.ToString().c_str());
status = Status::OK(); // Keep going with rest of file
}
}
delete lfile;
// Do not record a version edit for this conversion to a Table
// since ExtractMetaData() will also generate edits.
FileMetaData meta;
meta.number = next_file_number_++;
Iterator* iter = mem->NewIterator();
status = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
delete iter;
mem->Unref();
mem = NULL;
if (status.ok()) {
// Only a non-empty output file is queued for scanning.
if (meta.file_size > 0) {
table_numbers_.push_back(meta.number);
}
}
Log(options_.info_log, "Log #%llu: %d ops saved to Table #%llu %s",
(unsigned long long) log,
counter,
(unsigned long long) meta.number,
status.ToString().c_str());
return status;
}
// Scans every known table number; ScanTable() decides whether each table
// is kept, repaired, or dropped.
void ExtractMetaData() {
- std::vector<TableInfo> kept;
for (size_t i = 0; i < table_numbers_.size(); i++) {
ScanTable(table_numbers_[i]);
}
}
// Returns a table-cache iterator over the table described by "meta".
// Checksum verification follows options_.paranoid_checks, mirroring what
// compaction iterators do.
Iterator* NewTableIterator(const FileMetaData& meta) {
  ReadOptions read_options;
  read_options.verify_checksums = options_.paranoid_checks;
  return table_cache_->NewIterator(read_options, meta.number, meta.file_size);
}
// Examines table file "number" and either records it in tables_, hands it
// to RepairTable(), or archives it.
//
// The file size is looked up under the regular table name first and the
// alternate (SST) name second.  If neither exists, both names are archived
// and the table is dropped.  Otherwise every entry is iterated to collect
// the smallest key, largest key and largest sequence number; unparsable
// keys are logged and skipped.  A clean iteration adds the table to
// tables_; an iterator error sends it to RepairTable().
void ScanTable(uint64_t number) {
TableInfo t;
t.meta.number = number;
std::string fname = TableFileName(dbname_, number);
Status status = env_->GetFileSize(fname, &t.meta.file_size);
if (!status.ok()) {
// Try alternate file name.
fname = SSTTableFileName(dbname_, number);
Status s2 = env_->GetFileSize(fname, &t.meta.file_size);
if (s2.ok()) {
status = Status::OK();
}
}
if (!status.ok()) {
ArchiveFile(TableFileName(dbname_, number));
ArchiveFile(SSTTableFileName(dbname_, number));
Log(options_.info_log, "Table #%llu: dropped: %s",
(unsigned long long) t.meta.number,
status.ToString().c_str());
return;
}
// Extract metadata by scanning through table.
int counter = 0;
Iterator* iter = NewTableIterator(t.meta);
bool empty = true;
ParsedInternalKey parsed;
t.max_sequence = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
Slice key = iter->key();
if (!ParseInternalKey(key, &parsed)) {
Log(options_.info_log, "Table #%llu: unparsable key %s",
(unsigned long long) t.meta.number,
EscapeString(key).c_str());
continue;
}
counter++;
// The first parsable key becomes "smallest"; every parsable key
// overwrites "largest", so the last one wins.
if (empty) {
empty = false;
t.meta.smallest.DecodeFrom(key);
}
t.meta.largest.DecodeFrom(key);
if (parsed.sequence > t.max_sequence) {
t.max_sequence = parsed.sequence;
}
}
if (!iter->status().ok()) {
status = iter->status();
}
delete iter;
Log(options_.info_log, "Table #%llu: %d entries %s",
(unsigned long long) t.meta.number,
counter,
status.ToString().c_str());
if (status.ok()) {
tables_.push_back(t);
} else {
RepairTable(fname, t); // RepairTable archives input file.
}
}
// Salvages the readable entries of the damaged table file "src".
//
// Every entry the table iterator can still produce is copied into a new
// table file; "src" is then archived.  If at least one entry was copied
// and the new file was finished and closed successfully, the new file is
// renamed to the table's regular name and "t" (with its updated file
// size) is appended to tables_.  In every other case the new file is
// deleted so that no stray file is left behind; this includes the case
// where nothing could be salvaged and the builder was abandoned.
//
// If the new file cannot be created, the function returns without doing
// anything.
void RepairTable(const std::string& src, TableInfo t) {
  // We will copy src contents to a new table and then rename the
  // new table over the source.

  // Create builder.
  std::string copy = TableFileName(dbname_, next_file_number_++);
  WritableFile* file;
  Status s = env_->NewWritableFile(copy, &file);
  if (!s.ok()) {
    return;
  }
  TableBuilder* builder = new TableBuilder(options_, file);

  // Copy data.
  Iterator* iter = NewTableIterator(t.meta);
  int counter = 0;
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    builder->Add(iter->key(), iter->value());
    counter++;
  }
  delete iter;

  ArchiveFile(src);
  if (counter == 0) {
    builder->Abandon();  // Nothing to save
  } else {
    s = builder->Finish();
    if (s.ok()) {
      t.meta.file_size = builder->FileSize();
    }
  }
  delete builder;
  builder = NULL;

  if (s.ok()) {
    s = file->Close();
  }
  delete file;
  file = NULL;

  if (counter > 0 && s.ok()) {
    std::string orig = TableFileName(dbname_, t.meta.number);
    s = env_->RenameFile(copy, orig);
    if (s.ok()) {
      Log(options_.info_log, "Table #%llu: %d entries repaired",
          (unsigned long long) t.meta.number, counter);
      tables_.push_back(t);
    }
  }

  // Remove the scratch file whenever it was not installed: either an
  // error occurred, or nothing was salvaged and the builder was abandoned.
  if (counter == 0 || !s.ok()) {
    env_->DeleteFile(copy);
  }
}
// Writes a new descriptor (manifest) listing every table in tables_ at
// level 0 and makes it current.
//
// The descriptor is first written to a temporary file.  On a write or
// close failure the temporary file is deleted and the error returned.
// Otherwise the old manifests are archived, the temporary file is renamed
// to descriptor number 1, and the CURRENT file is pointed at it.
Status WriteDescriptor() {
std::string tmp = TempFileName(dbname_, 1);
WritableFile* file;
Status status = env_->NewWritableFile(tmp, &file);
if (!status.ok()) {
return status;
}
// The last sequence number is the largest one found in any kept table.
SequenceNumber max_sequence = 0;
for (size_t i = 0; i < tables_.size(); i++) {
if (max_sequence < tables_[i].max_sequence) {
max_sequence = tables_[i].max_sequence;
}
}
edit_.SetComparatorName(icmp_.user_comparator()->Name());
edit_.SetLogNumber(0);
edit_.SetNextFile(next_file_number_);
edit_.SetLastSequence(max_sequence);
for (size_t i = 0; i < tables_.size(); i++) {
// TODO(opt): separate out into multiple levels
const TableInfo& t = tables_[i];
edit_.AddFile(0, t.meta.number, t.meta.file_size,
t.meta.smallest, t.meta.largest);
}
//fprintf(stderr, "NewDescriptor:\n%s\n", edit_.DebugString().c_str());
// The whole edit is written as a single log record.
{
log::Writer log(file);
std::string record;
edit_.EncodeTo(&record);
status = log.AddRecord(record);
}
if (status.ok()) {
status = file->Close();
}
delete file;
file = NULL;
if (!status.ok()) {
env_->DeleteFile(tmp);
} else {
// Discard older manifests
for (size_t i = 0; i < manifests_.size(); i++) {
ArchiveFile(dbname_ + "/" + manifests_[i]);
}
// Install new manifest
status = env_->RenameFile(tmp, DescriptorFileName(dbname_, 1));
if (status.ok()) {
status = SetCurrentFile(env_, dbname_, 1);
} else {
env_->DeleteFile(tmp);
}
}
return status;
}
// Moves "fname" into a "lost" subdirectory next to it, e.g.
//    dir/foo  ->  dir/lost/foo
// The directory is created if needed (errors from that are ignored) and
// the outcome of the rename is written to the info log.
//
// A name without any '/' is archived into a relative "lost" directory
// instead of "/lost" at the filesystem root.
void ArchiveFile(const std::string& fname) {
  const std::string::size_type slash = fname.rfind('/');
  std::string new_dir;
  std::string base_name;
  if (slash == std::string::npos) {
    // No directory component: stay relative to the working directory.
    new_dir = "lost";
    base_name = fname;
  } else {
    new_dir = fname.substr(0, slash);
    new_dir.append("/lost");
    base_name = fname.substr(slash + 1);
  }
  env_->CreateDir(new_dir);  // Ignore error
  std::string new_file = new_dir;
  new_file.append("/");
  new_file.append(base_name);
  Status s = env_->RenameFile(fname, new_file);
  Log(options_.info_log, "Archiving %s: %s\n",
      fname.c_str(), s.ToString().c_str());
}
};
} // namespace
// Public entry point: constructs a Repairer for "dbname" with "options"
// and returns the status of its Run().
Status RepairDB(const std::string& dbname, const Options& options) {
  Repairer repairer(dbname, options);
  const Status result = repairer.Run();
  return result;
}
} // namespace leveldb
diff --git a/db/version_set.cc b/db/version_set.cc
index 517edd3b18..aa83df55e4 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -1,1498 +1,1484 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/version_set.h"
#include <algorithm>
#include <stdio.h>
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/table_cache.h"
#include "leveldb/env.h"
#include "leveldb/table_builder.h"
#include "table/merger.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/logging.h"
namespace leveldb {
// File size returned by MaxFileSizeForLevel() for every level
// (2 * 1048576 bytes).
static const int kTargetFileSize = 2 * 1048576;
// Maximum bytes of overlaps in grandparent (i.e., level+2) before we
// stop building a single file in a level->level+1 compaction.
static const int64_t kMaxGrandParentOverlapBytes = 10 * kTargetFileSize;
// Maximum number of bytes in all compacted files. We avoid expanding
// the lower level file set of a compaction if it would make the
// total compaction cover more than this many bytes.
static const int64_t kExpandedCompactionByteSizeLimit = 25 * kTargetFileSize;
// Returns the byte budget for "level": 10 * 1048576 bytes for level 1 and
// below, multiplied by ten for each level above 1.  The level-0 value is
// not really used, since the level-0 compaction threshold is based on the
// number of files.
static double MaxBytesForLevel(int level) {
  double limit = 10 * 1048576.0;
  for (int remaining = level; remaining > 1; --remaining) {
    limit *= 10;
  }
  return limit;
}
// Returns the file size limit for "level".  Every level currently shares
// kTargetFileSize; varying it per level could reduce the number of files.
static uint64_t MaxFileSizeForLevel(int level) {
  const uint64_t limit = kTargetFileSize;
  return limit;
}
// Returns the sum of file_size over every entry of "files" (0 when the
// list is empty).
static int64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
  int64_t total = 0;
  for (std::vector<FileMetaData*>::const_iterator it = files.begin();
       it != files.end(); ++it) {
    total += (*it)->file_size;
  }
  return total;
}
-namespace {
-// Renders the set as "{a,b,c}": the elements in iteration order, separated
-// by commas and enclosed in braces.  An empty set yields "{}".
-std::string IntSetToString(const std::set<uint64_t>& s) {
- std::string result = "{";
- for (std::set<uint64_t>::const_iterator it = s.begin();
- it != s.end();
- ++it) {
- result += (result.size() > 1) ? "," : "";
- result += NumberToString(*it);
- }
- result += "}";
- return result;
-}
-} // namespace
-
// Unlinks this version from the doubly linked list it lives in and drops
// one reference on every file it holds, deleting files whose count
// reaches zero.  Must only run once refs_ is 0.
Version::~Version() {
  assert(refs_ == 0);

  // Splice ourselves out of the list.
  prev_->next_ = next_;
  next_->prev_ = prev_;

  // Release the per-level file references.
  for (int level = 0; level < config::kNumLevels; level++) {
    std::vector<FileMetaData*>& level_files = files_[level];
    for (std::vector<FileMetaData*>::iterator it = level_files.begin();
         it != level_files.end(); ++it) {
      FileMetaData* meta = *it;
      assert(meta->refs > 0);
      meta->refs--;
      if (meta->refs <= 0) {
        delete meta;
      }
    }
  }
}
// Binary-searches "files" and returns the smallest index i such that
// files[i]->largest >= key under "icmp", or files.size() if there is no
// such file.
int FindFile(const InternalKeyComparator& icmp,
             const std::vector<FileMetaData*>& files,
             const Slice& key) {
  uint32_t lo = 0;
  uint32_t hi = files.size();
  while (lo < hi) {
    const uint32_t mid = (lo + hi) / 2;
    const bool mid_is_before_key =
        icmp.InternalKeyComparator::Compare(files[mid]->largest.Encode(),
                                            key) < 0;
    if (mid_is_before_key) {
      // files[mid] ends before the target, so nothing at or before
      // "mid" can be the answer.
      lo = mid + 1;
    } else {
      // files[mid] ends at or after the target, so nothing after
      // "mid" can be the answer.
      hi = mid;
    }
  }
  return hi;
}
// Returns true iff *user_key sorts after the largest user key of *f.
// A NULL user_key stands for a key before all keys, so it is never after.
static bool AfterFile(const Comparator* ucmp,
                      const Slice* user_key, const FileMetaData* f) {
  if (user_key == NULL) {
    return false;
  }
  return ucmp->Compare(*user_key, f->largest.user_key()) > 0;
}
// Returns true iff *user_key sorts before the smallest user key of *f.
// A NULL user_key stands for a key after all keys, so it is never before.
static bool BeforeFile(const Comparator* ucmp,
                       const Slice* user_key, const FileMetaData* f) {
  if (user_key == NULL) {
    return false;
  }
  return ucmp->Compare(*user_key, f->smallest.user_key()) < 0;
}
// Returns true iff some file in "files" overlaps the user-key range
// [*smallest_user_key, *largest_user_key].  A NULL bound leaves that side
// of the range open.  When "disjoint_sorted_files" is true the files are
// taken to be sorted and non-overlapping, which allows a binary search;
// otherwise every file is checked.
bool SomeFileOverlapsRange(
    const InternalKeyComparator& icmp,
    bool disjoint_sorted_files,
    const std::vector<FileMetaData*>& files,
    const Slice* smallest_user_key,
    const Slice* largest_user_key) {
  const Comparator* ucmp = icmp.user_comparator();

  if (!disjoint_sorted_files) {
    // Linear scan: the first file that is neither wholly before nor
    // wholly after the range is an overlap.
    for (size_t i = 0; i < files.size(); i++) {
      const FileMetaData* f = files[i];
      const bool disjoint = AfterFile(ucmp, smallest_user_key, f) ||
                            BeforeFile(ucmp, largest_user_key, f);
      if (!disjoint) {
        return true;
      }
    }
    return false;
  }

  // Binary search for the first file that could contain the range start.
  uint32_t index = 0;
  if (smallest_user_key != NULL) {
    // Earliest possible internal key for smallest_user_key.
    InternalKey small(*smallest_user_key, kMaxSequenceNumber,kValueTypeForSeek);
    index = FindFile(icmp, files, small.Encode());
  }

  if (index >= files.size()) {
    // The range starts after every file.
    return false;
  }

  const bool range_ends_before_file =
      BeforeFile(ucmp, largest_user_key, files[index]);
  return !range_ends_before_file;
}
// Internal iterator over the files of one level of a version.  For each
// entry, key() is the largest key that occurs in the file and value() is
// a 16-byte value holding the file number followed by the file size, both
// encoded with EncodeFixed64.
class Version::LevelFileNumIterator : public Iterator {
 public:
  // Starts out invalid (positioned one past the last file).
  LevelFileNumIterator(const InternalKeyComparator& icmp,
                       const std::vector<FileMetaData*>* flist)
      : icmp_(icmp),
        flist_(flist),
        index_(flist->size()) {
  }

  virtual bool Valid() const {
    return index_ < flist_->size();
  }

  virtual void Seek(const Slice& target) {
    index_ = FindFile(icmp_, *flist_, target);
  }

  virtual void SeekToFirst() { index_ = 0; }

  virtual void SeekToLast() {
    // On an empty list index 0 equals size(), i.e. the iterator is invalid.
    if (flist_->empty()) {
      index_ = 0;
    } else {
      index_ = flist_->size() - 1;
    }
  }

  virtual void Next() {
    assert(Valid());
    index_++;
  }

  virtual void Prev() {
    assert(Valid());
    // Stepping back from the first entry invalidates the iterator.
    index_ = (index_ == 0) ? flist_->size() : index_ - 1;
  }

  Slice key() const {
    assert(Valid());
    return (*flist_)[index_]->largest.Encode();
  }

  Slice value() const {
    assert(Valid());
    const FileMetaData* f = (*flist_)[index_];
    EncodeFixed64(value_buf_, f->number);
    EncodeFixed64(value_buf_+8, f->file_size);
    return Slice(value_buf_, sizeof(value_buf_));
  }

  virtual Status status() const { return Status::OK(); }

 private:
  const InternalKeyComparator icmp_;
  const std::vector<FileMetaData*>* const flist_;
  uint32_t index_;

  // Scratch space backing the Slice returned by value(): file number,
  // then file size.
  mutable char value_buf_[16];
};
// Opens an iterator over one table file.  "arg" is the TableCache to use
// and "file_value" must be exactly 16 bytes: the file number followed by
// the file size, each a fixed 64-bit value.  Any other length yields an
// error iterator carrying a Corruption status.
static Iterator* GetFileIterator(void* arg,
                                 const ReadOptions& options,
                                 const Slice& file_value) {
  if (file_value.size() != 16) {
    return NewErrorIterator(
        Status::Corruption("FileReader invoked with unexpected value"));
  }
  TableCache* cache = reinterpret_cast<TableCache*>(arg);
  const uint64_t file_number = DecodeFixed64(file_value.data());
  const uint64_t file_size = DecodeFixed64(file_value.data() + 8);
  return cache->NewIterator(options, file_number, file_size);
}
// Returns a two-level iterator for "level": a LevelFileNumIterator walks
// the level's file list, and GetFileIterator opens each file through the
// version set's table cache.
Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
                                            int level) const {
  Iterator* file_index =
      new LevelFileNumIterator(vset_->icmp_, &files_[level]);
  return NewTwoLevelIterator(file_index, &GetFileIterator,
                             vset_->table_cache_, options);
}
void Version::AddIterators(const ReadOptions& options,
std::vector<Iterator*>* iters) {
// Merge all level zero files together since they may overlap
for (size_t i = 0; i < files_[0].size(); i++) {
iters->push_back(
vset_->table_cache_->NewIterator(
options, files_[0][i]->number, files_[0][i]->file_size));
}
// For levels > 0, we can use a concatenating iterator that sequentially
// walks through the non-overlapping files in the level, opening them
// lazily.
for (int level = 1; level < config::kNumLevels; level++) {
if (!files_[level].empty()) {
iters->push_back(NewConcatenatingIterator(options, level));
}
}
}
// Callback from TableCache::Get()
namespace {
// Outcome of a single-table lookup, as recorded by SaveValue().
enum SaverState {
// No entry for the user key has been seen.
kNotFound,
// An entry of type kTypeValue was seen; its value has been copied out.
kFound,
// An entry of some other type was seen for the user key.
kDeleted,
// The internal key handed to the callback could not be parsed.
kCorrupt,
};
// State shared between Version::Get() and the SaveValue() callback.
struct Saver {
SaverState state;
const Comparator* ucmp;
// User key being looked up.
Slice user_key;
// Destination for the value when the state becomes kFound.
std::string* value;
};
}
static void SaveValue(void* arg, const Slice& ikey, const Slice& v) {
Saver* s = reinterpret_cast<Saver*>(arg);
ParsedInternalKey parsed_key;
if (!ParseInternalKey(ikey, &parsed_key)) {
s->state = kCorrupt;
} else {
if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) {
s->state = (parsed_key.type == kTypeValue) ? kFound : kDeleted;
if (s->state == kFound) {
s->value->assign(v.data(), v.size());
}
}
}
}
// Sort predicate that places files with larger file numbers first.
static bool NewestFirst(FileMetaData* a, FileMetaData* b) {
  return b->number < a->number;
}
// Calls func(arg, level, f) for every file that may contain "user_key":
// first the overlapping level-0 files from newest to oldest, then at most
// one file per higher level.  Stops as soon as "func" returns false.
void Version::ForEachOverlapping(Slice user_key, Slice internal_key,
                                 void* arg,
                                 bool (*func)(void*, int, FileMetaData*)) {
  // TODO(sanjay): Change Version::Get() to use this function.
  const Comparator* ucmp = vset_->icmp_.user_comparator();

  // Level 0: collect every file whose range covers user_key.
  std::vector<FileMetaData*> candidates;
  candidates.reserve(files_[0].size());
  for (uint32_t i = 0; i < files_[0].size(); i++) {
    FileMetaData* f = files_[0][i];
    if (ucmp->Compare(user_key, f->smallest.user_key()) < 0) {
      continue;
    }
    if (ucmp->Compare(user_key, f->largest.user_key()) > 0) {
      continue;
    }
    candidates.push_back(f);
  }
  if (!candidates.empty()) {
    std::sort(candidates.begin(), candidates.end(), NewestFirst);
    for (uint32_t i = 0; i < candidates.size(); i++) {
      if (!(*func)(arg, 0, candidates[i])) {
        return;
      }
    }
  }

  // Higher levels: at most one file per level can hold the key.
  for (int level = 1; level < config::kNumLevels; level++) {
    const size_t num_files = files_[level].size();
    if (num_files == 0) continue;

    // Earliest file whose largest key is >= internal_key.
    const uint32_t index = FindFile(vset_->icmp_, files_[level], internal_key);
    if (index >= num_files) {
      continue;
    }
    FileMetaData* f = files_[level][index];
    if (ucmp->Compare(user_key, f->smallest.user_key()) < 0) {
      continue;  // The whole file lies past user_key
    }
    if (!(*func)(arg, level, f)) {
      return;
    }
  }
}
// Looks up key "k" in this version's files, level by level.
//
// On a hit, *value receives the stored value and an OK status is
// returned.  A deletion entry, or no entry at all, yields NotFound; an
// unparsable key yields Corruption; a table-cache error is returned as
// is.  When more than one file had to be consulted, *stats names the
// first file read and its level; otherwise stats->seek_file stays NULL.
Status Version::Get(const ReadOptions& options,
const LookupKey& k,
std::string* value,
GetStats* stats) {
Slice ikey = k.internal_key();
Slice user_key = k.user_key();
const Comparator* ucmp = vset_->icmp_.user_comparator();
Status s;
stats->seek_file = NULL;
stats->seek_file_level = -1;
FileMetaData* last_file_read = NULL;
int last_file_read_level = -1;
// We can search level-by-level since entries never hop across
// levels. Therefore we are guaranteed that if we find data
// in a smaller level, later levels are irrelevant.
std::vector<FileMetaData*> tmp;
FileMetaData* tmp2;
for (int level = 0; level < config::kNumLevels; level++) {
size_t num_files = files_[level].size();
if (num_files == 0) continue;
// Get the list of files to search in this level
FileMetaData* const* files = &files_[level][0];
if (level == 0) {
// Level-0 files may overlap each other. Find all files that
// overlap user_key and process them in order from newest to oldest.
tmp.reserve(num_files);
for (uint32_t i = 0; i < num_files; i++) {
FileMetaData* f = files[i];
if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
tmp.push_back(f);
}
}
if (tmp.empty()) continue;
std::sort(tmp.begin(), tmp.end(), NewestFirst);
files = &tmp[0];
num_files = tmp.size();
} else {
// Binary search to find earliest index whose largest key >= ikey.
uint32_t index = FindFile(vset_->icmp_, files_[level], ikey);
if (index >= num_files) {
files = NULL;
num_files = 0;
} else {
tmp2 = files[index];
if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) {
// All of "tmp2" is past any data for user_key
files = NULL;
num_files = 0;
} else {
// Search exactly this one file; "files" now points at the local tmp2.
files = &tmp2;
num_files = 1;
}
}
}
for (uint32_t i = 0; i < num_files; ++i) {
if (last_file_read != NULL && stats->seek_file == NULL) {
// We have had more than one seek for this read. Charge the 1st file.
stats->seek_file = last_file_read;
stats->seek_file_level = last_file_read_level;
}
FileMetaData* f = files[i];
last_file_read = f;
last_file_read_level = level;
Saver saver;
saver.state = kNotFound;
saver.ucmp = ucmp;
saver.user_key = user_key;
saver.value = value;
s = vset_->table_cache_->Get(options, f->number, f->file_size,
ikey, &saver, SaveValue);
if (!s.ok()) {
return s;
}
switch (saver.state) {
case kNotFound:
break; // Keep searching in other files
case kFound:
return s;
case kDeleted:
s = Status::NotFound(Slice()); // Use empty error message for speed
return s;
case kCorrupt:
s = Status::Corruption("corrupted key for ", user_key);
return s;
}
}
}
return Status::NotFound(Slice()); // Use an empty error message for speed
}
// Charges one seek to stats.seek_file, if any.  When that file's seek
// allowance is used up and no file is queued for compaction yet, the file
// and its level are recorded as the compaction candidate and true is
// returned.  Returns false in every other case.
bool Version::UpdateStats(const GetStats& stats) {
  FileMetaData* f = stats.seek_file;
  if (f == NULL) {
    return false;
  }
  f->allowed_seeks--;
  if (f->allowed_seeks > 0 || file_to_compact_ != NULL) {
    return false;
  }
  file_to_compact_ = f;
  file_to_compact_level_ = stats.seek_file_level;
  return true;
}
// Records a read sample taken at "internal_key".  If at least two files
// overlap the key, the first of them is charged a seek via UpdateStats()
// and that call's result is returned.  Returns false when the key cannot
// be parsed or fewer than two files overlap it.
bool Version::RecordReadSample(Slice internal_key) {
  ParsedInternalKey ikey;
  if (!ParseInternalKey(internal_key, &ikey)) {
    return false;
  }

  struct State {
    GetStats stats;  // Filled in from the first matching file
    int matches;

    static bool Match(void* arg, int level, FileMetaData* f) {
      State* self = reinterpret_cast<State*>(arg);
      self->matches++;
      if (self->matches == 1) {
        // The first match is the one that gets charged.
        self->stats.seek_file = f;
        self->stats.seek_file_level = level;
      }
      // A second match is all we need; stop iterating after it.
      return self->matches < 2;
    }
  };

  State state;
  state.matches = 0;
  ForEachOverlapping(ikey.user_key, internal_key, &state, &State::Match);

  // At least two matches are required because the goal is to merge across
  // files.  A single file holding many overwrites and deletions is not
  // detected here; it may need a separate mechanism.
  if (state.matches < 2) {
    return false;
  }
  // 1MB cost is about 1 seek (see comment in Builder::Apply).
  return UpdateStats(state.stats);
}
// Adds one reference to this version.
void Version::Ref() {
  refs_ += 1;
}
// Drops one reference and deletes this version when the count reaches
// zero.  Must not be called on the version set's dummy list head.
void Version::Unref() {
  assert(this != &vset_->dummy_versions_);
  assert(refs_ >= 1);
  refs_ -= 1;
  if (refs_ != 0) {
    return;
  }
  delete this;
}
// Returns true iff some file in "level" overlaps the given user-key
// range.  Levels above 0 are passed to SomeFileOverlapsRange() as
// disjoint and sorted; level 0 is not.
bool Version::OverlapInLevel(int level,
                             const Slice* smallest_user_key,
                             const Slice* largest_user_key) {
  const bool disjoint_sorted = (level > 0);
  return SomeFileOverlapsRange(vset_->icmp_, disjoint_sorted, files_[level],
                               smallest_user_key, largest_user_key);
}
// Chooses the level at which a memtable dump covering
// [smallest_user_key, largest_user_key] should be placed.  Returns 0 if
// the range overlaps level 0.  Otherwise the level is raised, up to
// config::kMaxMemCompactLevel, for as long as the next level has no
// overlap and the overlap two levels down stays within
// kMaxGrandParentOverlapBytes.
int Version::PickLevelForMemTableOutput(
    const Slice& smallest_user_key,
    const Slice& largest_user_key) {
  if (OverlapInLevel(0, &smallest_user_key, &largest_user_key)) {
    return 0;
  }

  InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
  InternalKey limit(largest_user_key, 0, static_cast<ValueType>(0));
  std::vector<FileMetaData*> overlaps;

  int level = 0;
  for (; level < config::kMaxMemCompactLevel; level++) {
    if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
      break;
    }
    if (level + 2 < config::kNumLevels) {
      // Stop if the file would overlap too many grandparent bytes.
      GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
      const int64_t grandparent_bytes = TotalFileSize(overlaps);
      if (grandparent_bytes > kMaxGrandParentOverlapBytes) {
        break;
      }
    }
  }
  return level;
}
// Store in "*inputs" all files in "level" that overlap [begin,end]
//
// A NULL "begin" or "end" leaves that side of the range unbounded.
// Comparisons are made on user keys.  *inputs is cleared first.  For
// level 0, whenever an added file extends past the current range, the
// range is widened to that file's bound and the scan restarts from the
// first file.
void Version::GetOverlappingInputs(
int level,
const InternalKey* begin,
const InternalKey* end,
std::vector<FileMetaData*>* inputs) {
assert(level >= 0);
assert(level < config::kNumLevels);
inputs->clear();
Slice user_begin, user_end;
if (begin != NULL) {
user_begin = begin->user_key();
}
if (end != NULL) {
user_end = end->user_key();
}
const Comparator* user_cmp = vset_->icmp_.user_comparator();
// "i" is advanced inside the body so that a restart can reset it to 0.
for (size_t i = 0; i < files_[level].size(); ) {
FileMetaData* f = files_[level][i++];
const Slice file_start = f->smallest.user_key();
const Slice file_limit = f->largest.user_key();
if (begin != NULL && user_cmp->Compare(file_limit, user_begin) < 0) {
// "f" is completely before specified range; skip it
} else if (end != NULL && user_cmp->Compare(file_start, user_end) > 0) {
// "f" is completely after specified range; skip it
} else {
inputs->push_back(f);
if (level == 0) {
// Level-0 files may overlap each other. So check if the newly
// added file has expanded the range. If so, restart search.
if (begin != NULL && user_cmp->Compare(file_start, user_begin) < 0) {
user_begin = file_start;
inputs->clear();
i = 0;
} else if (end != NULL && user_cmp->Compare(file_limit, user_end) > 0) {
user_end = file_limit;
inputs->clear();
i = 0;
}
}
}
}
}
std::string Version::DebugString() const {
std::string r;
for (int level = 0; level < config::kNumLevels; level++) {
// E.g.,
// --- level 1 ---
// 17:123['a' .. 'd']
// 20:43['e' .. 'g']
r.append("--- level ");
AppendNumberTo(&r, level);
r.append(" ---\n");
const std::vector<FileMetaData*>& files = files_[level];
for (size_t i = 0; i < files.size(); i++) {
r.push_back(' ');
AppendNumberTo(&r, files[i]->number);
r.push_back(':');
AppendNumberTo(&r, files[i]->file_size);
r.append("[");
r.append(files[i]->smallest.DebugString());
r.append(" .. ");
r.append(files[i]->largest.DebugString());
r.append("]\n");
}
}
return r;
}
// A helper class so we can efficiently apply a whole sequence
// of edits to a particular state without creating intermediate
// Versions that contain full copies of the intermediate state.
class VersionSet::Builder {
private:
// Helper to sort by v->files_[file_number].smallest
struct BySmallestKey {
const InternalKeyComparator* internal_comparator;
bool operator()(FileMetaData* f1, FileMetaData* f2) const {
int r = internal_comparator->Compare(f1->smallest, f2->smallest);
if (r != 0) {
return (r < 0);
} else {
// Break ties by file number
return (f1->number < f2->number);
}
}
};
typedef std::set<FileMetaData*, BySmallestKey> FileSet;
struct LevelState {
std::set<uint64_t> deleted_files;
FileSet* added_files;
};
VersionSet* vset_;
Version* base_;
LevelState levels_[config::kNumLevels];
public:
// Initialize a builder with the files from *base and other info from *vset
Builder(VersionSet* vset, Version* base)
: vset_(vset),
base_(base) {
base_->Ref();
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < config::kNumLevels; level++) {
levels_[level].added_files = new FileSet(cmp);
}
}
~Builder() {
for (int level = 0; level < config::kNumLevels; level++) {
const FileSet* added = levels_[level].added_files;
std::vector<FileMetaData*> to_unref;
to_unref.reserve(added->size());
for (FileSet::const_iterator it = added->begin();
it != added->end(); ++it) {
to_unref.push_back(*it);
}
delete added;
for (uint32_t i = 0; i < to_unref.size(); i++) {
FileMetaData* f = to_unref[i];
f->refs--;
if (f->refs <= 0) {
delete f;
}
}
}
base_->Unref();
}
// Folds *edit into the builder's accumulated state: compaction pointers are
// written straight into the version set, deletions and additions are
// recorded per level.
void Apply(VersionEdit* edit) {
  // Compaction pointers
  for (size_t i = 0; i < edit->compact_pointers_.size(); i++) {
    const int level = edit->compact_pointers_[i].first;
    vset_->compact_pointer_[level] =
        edit->compact_pointers_[i].second.Encode().ToString();
  }

  // Deleted files: each entry is a (level, file number) pair.
  const VersionEdit::DeletedFileSet& deleted = edit->deleted_files_;
  VersionEdit::DeletedFileSet::const_iterator del_it = deleted.begin();
  while (del_it != deleted.end()) {
    levels_[del_it->first].deleted_files.insert(del_it->second);
    ++del_it;
  }

  // New files: each entry is a (level, file metadata) pair.
  for (size_t i = 0; i < edit->new_files_.size(); i++) {
    LevelState& state = levels_[edit->new_files_[i].first];
    FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
    f->refs = 1;

    // A file is compacted automatically once it has absorbed a certain
    // number of seeks.  The budget is derived from these assumptions:
    //   (1) One seek costs 10ms
    //   (2) Writing or reading 1MB costs 10ms (100MB/s)
    //   (3) A compaction of 1MB does 25MB of IO:
    //         1MB read from this level
    //         10-12MB read from next level (boundaries may be misaligned)
    //         10-12MB written to next level
    // So 25 seeks cost about as much as compacting 1MB, i.e. one seek is
    // worth roughly 40KB of compaction.  We stay conservative and allow
    // about one seek per 16KB of data, with a floor of 100 seeks.
    f->allowed_seeks = (f->file_size / 16384);
    if (f->allowed_seeks < 100) f->allowed_seeks = 100;

    // A file that is added again is no longer considered deleted.
    state.deleted_files.erase(f->number);
    state.added_files->insert(f);
  }
}
// Save the current state in *v.
//
// For every level, merges the base version's file list with the files added
// through Apply() and appends the result to v->files_[level]. Each candidate
// goes through MaybeAddFile(), which skips files recorded as deleted.
// When NDEBUG is not defined, aborts the process if two adjacent files in a
// level > 0 have overlapping ranges.
void SaveTo(Version* v) {
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < config::kNumLevels; level++) {
// Merge the set of added files with the set of pre-existing files.
// Drop any deleted files. Store the result in *v.
const std::vector<FileMetaData*>& base_files = base_->files_[level];
// base_iter is shared by both loops below: it only moves forward, so each
// base file is visited exactly once.
std::vector<FileMetaData*>::const_iterator base_iter = base_files.begin();
std::vector<FileMetaData*>::const_iterator base_end = base_files.end();
const FileSet* added = levels_[level].added_files;
v->files_[level].reserve(base_files.size() + added->size());
for (FileSet::const_iterator added_iter = added->begin();
added_iter != added->end();
++added_iter) {
// Add all smaller files listed in base_
// bpos is the first remaining base file that orders after *added_iter
// under cmp; everything before it is emitted ahead of the added file.
for (std::vector<FileMetaData*>::const_iterator bpos
= std::upper_bound(base_iter, base_end, *added_iter, cmp);
base_iter != bpos;
++base_iter) {
MaybeAddFile(v, level, *base_iter);
}
MaybeAddFile(v, level, *added_iter);
}
// Add remaining base files
for (; base_iter != base_end; ++base_iter) {
MaybeAddFile(v, level, *base_iter);
}
#ifndef NDEBUG
// Make sure there is no overlap in levels > 0
if (level > 0) {
for (uint32_t i = 1; i < v->files_[level].size(); i++) {
// Compare each file's largest key with the next file's smallest key.
const InternalKey& prev_end = v->files_[level][i-1]->largest;
const InternalKey& this_begin = v->files_[level][i]->smallest;
if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) {
fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
prev_end.DebugString().c_str(),
this_begin.DebugString().c_str());
abort();
}
}
}
#endif
}
}
// Appends *f to v->files_[level] and takes a reference on it, unless f's
// number is recorded as deleted for that level, in which case nothing
// happens.
void MaybeAddFile(Version* v, int level, FileMetaData* f) {
  if (levels_[level].deleted_files.count(f->number) > 0) {
    // File is deleted: do nothing
    return;
  }

  std::vector<FileMetaData*>* files = &v->files_[level];
  if (level > 0 && !files->empty()) {
    // Must not overlap
    assert(vset_->icmp_.Compare((*files)[files->size()-1]->largest,
                                f->smallest) < 0);
  }
  f->refs++;
  files->push_back(f);
}
};
// Constructs a version set for the database "dbname".  The comparator *cmp
// is copied; env comes from options->env.  An empty Version is created and
// installed as the current version.
VersionSet::VersionSet(const std::string& dbname,
                       const Options* options,
                       TableCache* table_cache,
                       const InternalKeyComparator* cmp)
    : env_(options->env),
      dbname_(dbname),
      options_(options),
      table_cache_(table_cache),
      icmp_(*cmp),
      next_file_number_(2),
      manifest_file_number_(0),  // Filled by Recover()
      last_sequence_(0),
      log_number_(0),
      prev_log_number_(0),
      descriptor_file_(NULL),
      descriptor_log_(NULL),
      dummy_versions_(this),
      current_(NULL) {
  Version* initial = new Version(this);
  AppendVersion(initial);
}
// Releases the reference on the current version, then frees the descriptor
// log writer and the descriptor file.
VersionSet::~VersionSet() {
  current_->Unref();

  // Once the current version is released, no version may remain linked.
  assert(dummy_versions_.next_ == &dummy_versions_);  // List must be empty

  delete descriptor_log_;
  delete descriptor_file_;
}
// Installs *v as the current version and links it at the tail of the
// circular list of versions.  *v must be unreferenced and must not already
// be the current version.
void VersionSet::AppendVersion(Version* v) {
  assert(v->refs_ == 0);
  assert(v != current_);

  // Swap the "current" reference over to v.
  if (current_ != NULL) {
    current_->Unref();
  }
  current_ = v;
  v->Ref();

  // Insert v between the present tail and the dummy head.
  Version* const head = &dummy_versions_;
  Version* const tail = head->prev_;
  v->prev_ = tail;
  v->next_ = head;
  tail->next_ = v;
  head->prev_ = v;
}
// Applies *edit on top of the current version to form a new version, appends
// the encoded edit to the MANIFEST (descriptor log) and syncs it, and on
// success installs the new version as current.
//
// *mu is unlocked while the MANIFEST record is written and re-locked
// afterwards; presumably the caller holds *mu on entry -- confirm against
// the callers.
//
// Returns the status of the manifest creation/write. On failure the new
// version is deleted and, if this call created a new manifest file, that
// file and its writer are discarded.
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
// Fill in any bookkeeping fields the caller left unset.
if (edit->has_log_number_) {
assert(edit->log_number_ >= log_number_);
assert(edit->log_number_ < next_file_number_);
} else {
edit->SetLogNumber(log_number_);
}
if (!edit->has_prev_log_number_) {
edit->SetPrevLogNumber(prev_log_number_);
}
edit->SetNextFile(next_file_number_);
edit->SetLastSequence(last_sequence_);
// Build the candidate version; the builder is scoped so it is destroyed
// before Finalize() runs.
Version* v = new Version(this);
{
Builder builder(this, current_);
builder.Apply(edit);
builder.SaveTo(v);
}
Finalize(v);
// Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version.
// new_manifest_file stays empty unless a manifest is created here; the
// code below uses that to tell the two cases apart.
std::string new_manifest_file;
Status s;
if (descriptor_log_ == NULL) {
// No reason to unlock *mu here since we only hit this path in the
// first call to LogAndApply (when opening the database).
assert(descriptor_file_ == NULL);
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
edit->SetNextFile(next_file_number_);
s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
if (s.ok()) {
descriptor_log_ = new log::Writer(descriptor_file_);
s = WriteSnapshot(descriptor_log_);
}
}
// Unlock during expensive MANIFEST log write
{
mu->Unlock();
// Write new record to MANIFEST log
if (s.ok()) {
std::string record;
edit->EncodeTo(&record);
s = descriptor_log_->AddRecord(record);
if (s.ok()) {
s = descriptor_file_->Sync();
}
if (!s.ok()) {
Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
}
}
// If we just created a new descriptor file, install it by writing a
// new CURRENT file that points to it.
if (s.ok() && !new_manifest_file.empty()) {
s = SetCurrentFile(env_, dbname_, manifest_file_number_);
}
mu->Lock();
}
// Install the new version
if (s.ok()) {
AppendVersion(v);
log_number_ = edit->log_number_;
prev_log_number_ = edit->prev_log_number_;
} else {
delete v;
// Only tear down the descriptor state if it was created by this call.
if (!new_manifest_file.empty()) {
delete descriptor_log_;
delete descriptor_file_;
descriptor_log_ = NULL;
descriptor_file_ = NULL;
env_->DeleteFile(new_manifest_file);
}
}
return s;
}
// Rebuilds the version state from disk: reads the CURRENT file to find the
// manifest, replays every VersionEdit record in it through a Builder, and
// installs the resulting version together with the recovered file-number,
// sequence and log-number bookkeeping.
//
// Returns a non-OK status if CURRENT or the manifest cannot be read, if
// CURRENT does not end with a newline, if a record fails to decode or names
// a different comparator, or if a required metadata entry is missing.
Status VersionSet::Recover() {
// Forwards the first corruption reported by the log reader into *status.
struct LogReporter : public log::Reader::Reporter {
Status* status;
virtual void Corruption(size_t bytes, const Status& s) {
if (this->status->ok()) *this->status = s;
}
};
// Read "CURRENT" file, which contains a pointer to the current manifest file
std::string current;
Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
if (!s.ok()) {
return s;
}
if (current.empty() || current[current.size()-1] != '\n') {
return Status::Corruption("CURRENT file does not end with newline");
}
// Strip the trailing newline to get the manifest file name.
current.resize(current.size() - 1);
std::string dscname = dbname_ + "/" + current;
SequentialFile* file;
s = env_->NewSequentialFile(dscname, &file);
if (!s.ok()) {
return s;
}
// The most recent value seen for each metadata field, and whether any
// record supplied it.
bool have_log_number = false;
bool have_prev_log_number = false;
bool have_next_file = false;
bool have_last_sequence = false;
uint64_t next_file = 0;
uint64_t last_sequence = 0;
uint64_t log_number = 0;
uint64_t prev_log_number = 0;
Builder builder(this, current_);
{
LogReporter reporter;
reporter.status = &s;
log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/);
Slice record;
std::string scratch;
// The loop ends when the reader has no more records or when s is non-OK
// after a record has been read.
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (s.ok()) {
if (edit.has_comparator_ &&
edit.comparator_ != icmp_.user_comparator()->Name()) {
s = Status::InvalidArgument(
edit.comparator_ + " does not match existing comparator ",
icmp_.user_comparator()->Name());
}
}
if (s.ok()) {
builder.Apply(&edit);
}
// The fields below are copied regardless of s; if s is non-OK the loop
// condition stops the replay on the next iteration.
if (edit.has_log_number_) {
log_number = edit.log_number_;
have_log_number = true;
}
if (edit.has_prev_log_number_) {
prev_log_number = edit.prev_log_number_;
have_prev_log_number = true;
}
if (edit.has_next_file_number_) {
next_file = edit.next_file_number_;
have_next_file = true;
}
if (edit.has_last_sequence_) {
last_sequence = edit.last_sequence_;
have_last_sequence = true;
}
}
}
delete file;
file = NULL;
if (s.ok()) {
// next-file, log-number and last-sequence are mandatory; the previous
// log number is optional and defaults to 0.
if (!have_next_file) {
s = Status::Corruption("no meta-nextfile entry in descriptor");
} else if (!have_log_number) {
s = Status::Corruption("no meta-lognumber entry in descriptor");
} else if (!have_last_sequence) {
s = Status::Corruption("no last-sequence-number entry in descriptor");
}
if (!have_prev_log_number) {
prev_log_number = 0;
}
// These two calls run even when a Corruption status was just assigned
// above, because they are inside the same s.ok() block.
MarkFileNumberUsed(prev_log_number);
MarkFileNumberUsed(log_number);
}
if (s.ok()) {
Version* v = new Version(this);
builder.SaveTo(v);
// Install recovered version
Finalize(v);
AppendVersion(v);
// The recovered next-file number is used as the manifest file number,
// and the next file number is set to the value after it.
manifest_file_number_ = next_file;
next_file_number_ = next_file + 1;
last_sequence_ = last_sequence;
log_number_ = log_number;
prev_log_number_ = prev_log_number;
}
return s;
}
// Ensures the next file number to be handed out is greater than "number".
void VersionSet::MarkFileNumberUsed(uint64_t number) {
  if (number >= next_file_number_) {
    next_file_number_ = number + 1;
  }
}
// Computes, for *v, which level has the highest compaction score and stores
// that level and score in v->compaction_level_ / v->compaction_score_.
// The last level is not considered.
void VersionSet::Finalize(Version* v) {
  int winner_level = -1;
  double winner_score = -1;

  const int level_limit = config::kNumLevels-1;
  for (int level = 0; level < level_limit; level++) {
    double score;
    if (level > 0) {
      // Ratio of the level's current size to its size limit.
      const uint64_t level_bytes = TotalFileSize(v->files_[level]);
      score = static_cast<double>(level_bytes) / MaxBytesForLevel(level);
    } else {
      // Level-0 is scored by file count rather than byte count, for two
      // reasons:
      //
      // (1) With larger write-buffer sizes, it is nice not to do too
      // many level-0 compactions.
      //
      // (2) The files in level-0 are merged on every read and
      // therefore we wish to avoid too many files when the individual
      // file size is small (perhaps because of a small write-buffer
      // setting, or very high compression ratios, or lots of
      // overwrites/deletions).
      score = v->files_[level].size() /
          static_cast<double>(config::kL0_CompactionTrigger);
    }

    // Strict comparison: on a tie the lower level wins.
    if (score > winner_score) {
      winner_level = level;
      winner_score = score;
    }
  }

  v->compaction_level_ = winner_level;
  v->compaction_score_ = winner_score;
}
// Writes one record to *log that captures the comparator name, the non-empty
// compaction pointers, and every file of the current version.  Returns the
// status of the AddRecord() call.
Status VersionSet::WriteSnapshot(log::Writer* log) {
  // TODO: Break up into multiple records to reduce memory usage on recovery?

  VersionEdit edit;

  // Metadata
  edit.SetComparatorName(icmp_.user_comparator()->Name());

  // Compaction pointers (levels without one are skipped)
  for (int level = 0; level < config::kNumLevels; level++) {
    if (compact_pointer_[level].empty()) {
      continue;
    }
    InternalKey key;
    key.DecodeFrom(compact_pointer_[level]);
    edit.SetCompactPointer(level, key);
  }

  // Files
  for (int level = 0; level < config::kNumLevels; level++) {
    const std::vector<FileMetaData*>& files = current_->files_[level];
    for (std::vector<FileMetaData*>::const_iterator it = files.begin();
         it != files.end(); ++it) {
      const FileMetaData* f = *it;
      edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest);
    }
  }

  std::string record;
  edit.EncodeTo(&record);
  return log->AddRecord(record);
}
int VersionSet::NumLevelFiles(int level) const {
assert(level >= 0);
assert(level < config::kNumLevels);
return current_->files_[level].size();
}
// Formats the per-level file counts of the current version into
// scratch->buffer and returns a pointer to that buffer.
const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const {
  // The format string below lists exactly seven levels.
  // Update code if kNumLevels changes
  assert(config::kNumLevels == 7);

  const Version& cur = *current_;
  snprintf(scratch->buffer, sizeof(scratch->buffer),
           "files[ %d %d %d %d %d %d %d ]",
           static_cast<int>(cur.files_[0].size()),
           static_cast<int>(cur.files_[1].size()),
           static_cast<int>(cur.files_[2].size()),
           static_cast<int>(cur.files_[3].size()),
           static_cast<int>(cur.files_[4].size()),
           static_cast<int>(cur.files_[5].size()),
           static_cast<int>(cur.files_[6].size()));
  return scratch->buffer;
}
// Returns an approximate byte offset of "ikey" within the data of version
// *v: the sizes of all files that lie entirely before "ikey", plus, for each
// file whose range contains "ikey", the table's own approximate offset of
// the key.
uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
  uint64_t result = 0;
  for (int level = 0; level < config::kNumLevels; level++) {
    const std::vector<FileMetaData*>& files = v->files_[level];
    for (size_t i = 0; i < files.size(); i++) {
      if (icmp_.Compare(files[i]->largest, ikey) <= 0) {
        // Entire file is before "ikey", so just add the file size
        result += files[i]->file_size;
      } else if (icmp_.Compare(files[i]->smallest, ikey) > 0) {
        // Entire file is after "ikey", so ignore
        if (level > 0) {
          // Files other than level 0 are sorted by meta->smallest, so
          // no further files in this level will contain data for
          // "ikey".
          break;
        }
      } else {
        // "ikey" falls in the range for this table.  Add the
        // approximate offset of "ikey" within the table.
        //
        // Start from NULL so the check below is well-defined even if
        // NewIterator() returns without storing a table pointer.
        Table* tableptr = NULL;
        Iterator* iter = table_cache_->NewIterator(
            ReadOptions(), files[i]->number, files[i]->file_size, &tableptr);
        if (tableptr != NULL) {
          result += tableptr->ApproximateOffsetOf(ikey.Encode());
        }
        // The iterator is only needed to obtain the table pointer.
        delete iter;
      }
    }
  }
  return result;
}
void VersionSet::AddLiveFiles(std::set<uint64_t>* live) {
for (Version* v = dummy_versions_.next_;
v != &dummy_versions_;
v = v->next_) {
for (int level = 0; level < config::kNumLevels; level++) {
const std::vector<FileMetaData*>& files = v->files_[level];
for (size_t i = 0; i < files.size(); i++) {
live->insert(files[i]->number);
}
}
}
}
int64_t VersionSet::NumLevelBytes(int level) const {
assert(level >= 0);
assert(level < config::kNumLevels);
return TotalFileSize(current_->files_[level]);
}
int64_t VersionSet::MaxNextLevelOverlappingBytes() {
int64_t result = 0;
std::vector<FileMetaData*> overlaps;
for (int level = 1; level < config::kNumLevels - 1; level++) {
for (size_t i = 0; i < current_->files_[level].size(); i++) {
const FileMetaData* f = current_->files_[level][i];
current_->GetOverlappingInputs(level+1, &f->smallest, &f->largest,
&overlaps);
const int64_t sum = TotalFileSize(overlaps);
if (sum > result) {
result = sum;
}
}
}
return result;
}
// Computes the tightest [*smallest, *largest] key range that spans every
// file in "inputs".
// REQUIRES: inputs is not empty
void VersionSet::GetRange(const std::vector<FileMetaData*>& inputs,
                          InternalKey* smallest,
                          InternalKey* largest) {
  assert(!inputs.empty());
  smallest->Clear();
  largest->Clear();
  for (size_t i = 0; i < inputs.size(); i++) {
    FileMetaData* f = inputs[i];
    // The first file seeds both bounds; later files only widen them.
    if (i == 0 || icmp_.Compare(f->smallest, *smallest) < 0) {
      *smallest = f->smallest;
    }
    if (i == 0 || icmp_.Compare(f->largest, *largest) > 0) {
      *largest = f->largest;
    }
  }
}
// Computes the tightest [*smallest, *largest] key range that spans every
// file in inputs1 and inputs2 combined.
// REQUIRES: inputs1 and inputs2 are not both empty
void VersionSet::GetRange2(const std::vector<FileMetaData*>& inputs1,
                           const std::vector<FileMetaData*>& inputs2,
                           InternalKey* smallest,
                           InternalKey* largest) {
  // Concatenate the two lists and defer to GetRange().
  std::vector<FileMetaData*> all;
  all.reserve(inputs1.size() + inputs2.size());
  all.insert(all.end(), inputs1.begin(), inputs1.end());
  all.insert(all.end(), inputs2.begin(), inputs2.end());
  GetRange(all, smallest, largest);
}
// Builds a merging iterator over all input files of compaction *c and
// returns it.  Block-cache filling is disabled for these reads, and
// checksum verification follows options_->paranoid_checks.
Iterator* VersionSet::MakeInputIterator(Compaction* c) {
  ReadOptions options;
  options.verify_checksums = options_->paranoid_checks;
  options.fill_cache = false;

  // Level-0 files have to be merged together, so each gets its own
  // iterator.  Every other level contributes one concatenating iterator.
  // TODO(opt): use concatenating iterator for level-0 if there is no overlap
  const int space = (c->level() == 0 ? c->inputs_[0].size() + 1 : 2);
  Iterator** list = new Iterator*[space];
  int num = 0;
  for (int which = 0; which < 2; which++) {
    const std::vector<FileMetaData*>& files = c->inputs_[which];
    if (files.empty()) {
      continue;
    }
    if (c->level() + which != 0) {
      // Create concatenating iterator for the files from this level
      list[num++] = NewTwoLevelIterator(
          new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]),
          &GetFileIterator, table_cache_, options);
      continue;
    }
    // Level-0 input: one table iterator per file.
    for (size_t i = 0; i < files.size(); i++) {
      list[num++] = table_cache_->NewIterator(
          options, files[i]->number, files[i]->file_size);
    }
  }
  assert(num <= space);

  // The local array is freed as soon as the merging iterator is built.
  Iterator* result = NewMergingIterator(&icmp_, list, num);
  delete[] list;
  return result;
}
// Chooses the next compaction to run against the current version.  Returns
// NULL when neither a size-triggered nor a seek-triggered compaction is
// pending; otherwise returns a heap-allocated Compaction that holds a
// reference on the current version.
Compaction* VersionSet::PickCompaction() {
  // We prefer compactions triggered by too much data in a level over
  // the compactions triggered by seeks.
  const bool size_compaction = (current_->compaction_score_ >= 1);
  const bool seek_compaction = (current_->file_to_compact_ != NULL);
  if (!size_compaction && !seek_compaction) {
    return NULL;
  }

  Compaction* c;
  int level;
  if (size_compaction) {
    level = current_->compaction_level_;
    assert(level >= 0);
    assert(level+1 < config::kNumLevels);
    c = new Compaction(level);

    // Pick the first file that comes after compact_pointer_[level]
    const std::vector<FileMetaData*>& candidates = current_->files_[level];
    for (size_t i = 0; i < candidates.size(); i++) {
      FileMetaData* f = candidates[i];
      if (compact_pointer_[level].empty() ||
          icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
        c->inputs_[0].push_back(f);
        break;
      }
    }
    if (c->inputs_[0].empty()) {
      // Wrap-around to the beginning of the key space
      c->inputs_[0].push_back(candidates[0]);
    }
  } else {
    // Seek-triggered: compact the single file flagged on the version.
    level = current_->file_to_compact_level_;
    c = new Compaction(level);
    c->inputs_[0].push_back(current_->file_to_compact_);
  }

  c->input_version_ = current_;
  c->input_version_->Ref();

  // Files in level 0 may overlap each other, so pick up all overlapping ones
  if (level == 0) {
    InternalKey smallest, largest;
    GetRange(c->inputs_[0], &smallest, &largest);
    // Note that the next call will discard the file we placed in
    // c->inputs_[0] earlier and replace it with an overlapping set
    // which will include the picked file.
    current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
    assert(!c->inputs_[0].empty());
  }

  SetupOtherInputs(c);
  return c;
}
// Completes the inputs of compaction *c, whose c->inputs_[0] has already
// been chosen:
//   - fills c->inputs_[1] with the level+1 files overlapping inputs_[0];
//   - tries to grow inputs_[0] with more files from "level" as long as that
//     does not change the number of level+1 files and stays under
//     kExpandedCompactionByteSizeLimit;
//   - fills c->grandparents_ with the overlapping level+2 files, if that
//     level exists;
//   - advances the compaction pointer for "level", both in this VersionSet
//     and in c->edit_.
void VersionSet::SetupOtherInputs(Compaction* c) {
  const int level = c->level();
  InternalKey smallest, largest;
  GetRange(c->inputs_[0], &smallest, &largest);

  current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1]);

  // Get entire range covered by compaction
  InternalKey all_start, all_limit;
  GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);

  // See if we can grow the number of inputs in "level" without
  // changing the number of "level+1" files we pick up.
  if (!c->inputs_[1].empty()) {
    std::vector<FileMetaData*> expanded0;
    current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
    const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
    const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
    const int64_t expanded0_size = TotalFileSize(expanded0);
    if (expanded0.size() > c->inputs_[0].size() &&
        inputs1_size + expanded0_size < kExpandedCompactionByteSizeLimit) {
      InternalKey new_start, new_limit;
      GetRange(expanded0, &new_start, &new_limit);
      std::vector<FileMetaData*> expanded1;
      current_->GetOverlappingInputs(level+1, &new_start, &new_limit,
                                     &expanded1);
      // Only accept the expansion if it pulls in no extra level+1 files.
      if (expanded1.size() == c->inputs_[1].size()) {
        Log(options_->info_log,
            "Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n",
            level,
            int(c->inputs_[0].size()),
            int(c->inputs_[1].size()),
            long(inputs0_size), long(inputs1_size),
            int(expanded0.size()),
            int(expanded1.size()),
            long(expanded0_size), long(inputs1_size));
        smallest = new_start;
        largest = new_limit;
        c->inputs_[0] = expanded0;
        c->inputs_[1] = expanded1;
        GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
      }
    }
  }

  // Compute the set of grandparent files that overlap this compaction
  // (parent == level+1; grandparent == level+2)
  if (level + 2 < config::kNumLevels) {
    current_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
                                   &c->grandparents_);
  }

  // Update the place where we will do the next compaction for this level.
  // We update this immediately instead of waiting for the VersionEdit
  // to be applied so that if the compaction fails, we will try a different
  // key range next time.
  compact_pointer_[level] = largest.Encode().ToString();
  c->edit_.SetCompactPointer(level, largest);
}
// Builds a compaction covering the files of "level" that overlap the range
// [*begin, *end].  Returns NULL if no file overlaps the range; otherwise
// returns a heap-allocated Compaction that holds a reference on the
// current version.
Compaction* VersionSet::CompactRange(
    int level,
    const InternalKey* begin,
    const InternalKey* end) {
  std::vector<FileMetaData*> inputs;
  current_->GetOverlappingInputs(level, begin, end, &inputs);
  if (inputs.empty()) {
    return NULL;
  }

  // Avoid compacting too much in one shot in case the range is large.
  // But we cannot do this for level-0 since level-0 files can overlap
  // and we must not pick one file and drop another older file if the
  // two files overlap.
  if (level > 0) {
    const uint64_t limit = MaxFileSizeForLevel(level);
    uint64_t total = 0;
    for (size_t i = 0; i < inputs.size(); i++) {
      total += inputs[i]->file_size;
      if (total >= limit) {
        // Keep the file that crossed the limit and drop the rest.
        inputs.resize(i + 1);
        break;
      }
    }
  }

  Compaction* c = new Compaction(level);
  c->input_version_ = current_;
  c->input_version_->Ref();
  c->inputs_[0] = inputs;
  SetupOtherInputs(c);
  return c;
}
// Creates an empty compaction for "level": no input version yet, all
// per-level scan positions and grandparent-overlap counters at zero.
Compaction::Compaction(int level)
    : level_(level),
      max_output_file_size_(MaxFileSizeForLevel(level)),
      input_version_(NULL),
      grandparent_index_(0),
      seen_key_(false),
      overlapped_bytes_(0) {
  for (int lvl = 0; lvl < config::kNumLevels; ++lvl) {
    level_ptrs_[lvl] = 0;
  }
}
// Drops the reference on the input version, if one is still held.
Compaction::~Compaction() {
  if (input_version_ == NULL) {
    return;
  }
  input_version_->Unref();
}
// Returns true iff the compaction has exactly one input file at its level,
// none at the next level, and the grandparent files total at most
// kMaxGrandParentOverlapBytes.
bool Compaction::IsTrivialMove() const {
  if (num_input_files(0) != 1) {
    return false;
  }
  if (num_input_files(1) != 0) {
    return false;
  }
  // Avoid a move if there is lots of overlapping grandparent data.
  // Otherwise, the move could create a parent file that will require
  // a very expensive merge later on.
  return TotalFileSize(grandparents_) <= kMaxGrandParentOverlapBytes;
}
// Records in *edit the deletion of every input file of this compaction:
// inputs_[0] at level_, inputs_[1] at level_ + 1.
void Compaction::AddInputDeletions(VersionEdit* edit) {
  for (int which = 0; which < 2; which++) {
    const std::vector<FileMetaData*>& files = inputs_[which];
    for (std::vector<FileMetaData*>::const_iterator it = files.begin();
         it != files.end(); ++it) {
      edit->DeleteFile(level_ + which, (*it)->number);
    }
  }
}
// Returns false if some file at level_ + 2 or deeper in the input version
// has a user-key range containing "user_key"; returns true otherwise.
// The per-level scan positions in level_ptrs_ are advanced and kept across
// calls.
bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
  // Maybe use binary search to find right entry instead of linear search?
  const Comparator* user_cmp = input_version_->vset_->icmp_.user_comparator();
  for (int lvl = level_ + 2; lvl < config::kNumLevels; lvl++) {
    const std::vector<FileMetaData*>& files = input_version_->files_[lvl];
    while (level_ptrs_[lvl] < files.size()) {
      FileMetaData* f = files[level_ptrs_[lvl]];
      if (user_cmp->Compare(user_key, f->largest.user_key()) > 0) {
        // This file ends before the key: move on to the next one.
        level_ptrs_[lvl]++;
        continue;
      }
      // We've advanced far enough
      if (user_cmp->Compare(user_key, f->smallest.user_key()) >= 0) {
        // Key falls in this file's range, so definitely not base level
        return false;
      }
      break;
    }
  }
  return true;
}
bool Compaction::ShouldStopBefore(const Slice& internal_key) {
// Scan to find earliest grandparent file that contains key.
const InternalKeyComparator* icmp = &input_version_->vset_->icmp_;
while (grandparent_index_ < grandparents_.size() &&
icmp->Compare(internal_key,
grandparents_[grandparent_index_]->largest.Encode()) > 0) {
if (seen_key_) {
overlapped_bytes_ += grandparents_[grandparent_index_]->file_size;
}
grandparent_index_++;
}
seen_key_ = true;
if (overlapped_bytes_ > kMaxGrandParentOverlapBytes) {
// Too much overlap for current output; start new output
overlapped_bytes_ = 0;
return true;
} else {
return false;
}
}
// Drops the reference on the input version, if any, and forgets it so that
// a later call (or the destructor) does not release it again.
void Compaction::ReleaseInputs() {
  if (input_version_ == NULL) {
    return;
  }
  input_version_->Unref();
  input_version_ = NULL;
}
} // namespace leveldb
diff --git a/include/leveldb/c.h b/include/leveldb/c.h
index 1fa58866c3..1048fe3b86 100644
--- a/include/leveldb/c.h
+++ b/include/leveldb/c.h
@@ -1,291 +1,290 @@
/* Copyright (c) 2011 The LevelDB Authors. All rights reserved.
Use of this source code is governed by a BSD-style license that can be
found in the LICENSE file. See the AUTHORS file for names of contributors.
C bindings for leveldb. May be useful as a stable ABI that can be
used by programs that keep leveldb in a shared library, or for
a JNI api.
Does not support:
. getters for the option types
. custom comparators that implement key shortening
- . capturing post-write-snapshot
. custom iter, db, env, cache implementations using just the C bindings
Some conventions:
(1) We expose just opaque struct pointers and functions to clients.
This allows us to change internal representations without having to
recompile clients.
(2) For simplicity, there is no equivalent to the Slice type. Instead,
the caller has to pass the pointer and length as separate
arguments.
(3) Errors are represented by a null-terminated c string. NULL
means no error. All operations that can raise an error are passed
a "char** errptr" as the last argument. One of the following must
be true on entry:
*errptr == NULL
*errptr points to a malloc()ed null-terminated error message
(On Windows, *errptr must have been malloc()-ed by this library.)
On success, a leveldb routine leaves *errptr unchanged.
On failure, leveldb frees the old value of *errptr and
sets *errptr to a malloc()ed error message.
(4) Bools have the type unsigned char (0 == false; rest == true)
(5) All of the pointer arguments must be non-NULL.
*/
#ifndef STORAGE_LEVELDB_INCLUDE_C_H_
#define STORAGE_LEVELDB_INCLUDE_C_H_
#ifdef __cplusplus
extern "C" {
#endif
#include <stdarg.h>
#include <stddef.h>
#include <stdint.h>
/* Exported types */
/* Each type below is declared but not defined in this header, so clients
   can only hold pointers to it (convention (1) in the header comment). */
typedef struct leveldb_t leveldb_t;
typedef struct leveldb_cache_t leveldb_cache_t;
typedef struct leveldb_comparator_t leveldb_comparator_t;
typedef struct leveldb_env_t leveldb_env_t;
typedef struct leveldb_filelock_t leveldb_filelock_t;
typedef struct leveldb_filterpolicy_t leveldb_filterpolicy_t;
typedef struct leveldb_iterator_t leveldb_iterator_t;
typedef struct leveldb_logger_t leveldb_logger_t;
typedef struct leveldb_options_t leveldb_options_t;
typedef struct leveldb_randomfile_t leveldb_randomfile_t;
typedef struct leveldb_readoptions_t leveldb_readoptions_t;
typedef struct leveldb_seqfile_t leveldb_seqfile_t;
typedef struct leveldb_snapshot_t leveldb_snapshot_t;
typedef struct leveldb_writablefile_t leveldb_writablefile_t;
typedef struct leveldb_writebatch_t leveldb_writebatch_t;
typedef struct leveldb_writeoptions_t leveldb_writeoptions_t;
/* DB operations */
/* Functions with a trailing "char** errptr" report failures through it as
   described by convention (3) in the header comment. Keys and values are
   passed as pointer + length pairs (convention (2)). */
extern leveldb_t* leveldb_open(
const leveldb_options_t* options,
const char* name,
char** errptr);
extern void leveldb_close(leveldb_t* db);
extern void leveldb_put(
leveldb_t* db,
const leveldb_writeoptions_t* options,
const char* key, size_t keylen,
const char* val, size_t vallen,
char** errptr);
extern void leveldb_delete(
leveldb_t* db,
const leveldb_writeoptions_t* options,
const char* key, size_t keylen,
char** errptr);
extern void leveldb_write(
leveldb_t* db,
const leveldb_writeoptions_t* options,
leveldb_writebatch_t* batch,
char** errptr);
/* Returns NULL if not found. A malloc()ed array otherwise.
Stores the length of the array in *vallen. */
extern char* leveldb_get(
leveldb_t* db,
const leveldb_readoptions_t* options,
const char* key, size_t keylen,
size_t* vallen,
char** errptr);
extern leveldb_iterator_t* leveldb_create_iterator(
leveldb_t* db,
const leveldb_readoptions_t* options);
extern const leveldb_snapshot_t* leveldb_create_snapshot(
leveldb_t* db);
extern void leveldb_release_snapshot(
leveldb_t* db,
const leveldb_snapshot_t* snapshot);
/* Returns NULL if property name is unknown.
Else returns a pointer to a malloc()-ed null-terminated value. */
extern char* leveldb_property_value(
leveldb_t* db,
const char* propname);
/* The four range_* arrays and sizes are parallel arrays; presumably each has
   num_ranges entries and sizes[i] receives the result for range i --
   confirm against the implementation. */
extern void leveldb_approximate_sizes(
leveldb_t* db,
int num_ranges,
const char* const* range_start_key, const size_t* range_start_key_len,
const char* const* range_limit_key, const size_t* range_limit_key_len,
uint64_t* sizes);
/* Takes the start and limit keys of the range as pointer + length pairs. */
extern void leveldb_compact_range(
leveldb_t* db,
const char* start_key, size_t start_key_len,
const char* limit_key, size_t limit_key_len);
/* Management operations */
/* Both functions take the database by name rather than by open handle and
   report failures through errptr (convention (3) in the header comment). */
extern void leveldb_destroy_db(
const leveldb_options_t* options,
const char* name,
char** errptr);
extern void leveldb_repair_db(
const leveldb_options_t* options,
const char* name,
char** errptr);
/* Iterator */
/* leveldb_iter_valid returns a bool encoded as unsigned char
   (convention (4) in the header comment). */
extern void leveldb_iter_destroy(leveldb_iterator_t*);
extern unsigned char leveldb_iter_valid(const leveldb_iterator_t*);
extern void leveldb_iter_seek_to_first(leveldb_iterator_t*);
extern void leveldb_iter_seek_to_last(leveldb_iterator_t*);
extern void leveldb_iter_seek(leveldb_iterator_t*, const char* k, size_t klen);
extern void leveldb_iter_next(leveldb_iterator_t*);
extern void leveldb_iter_prev(leveldb_iterator_t*);
/* key/value return a pointer and store the length through klen/vlen.
   NOTE(review): the lifetime of the returned pointer is not stated here --
   confirm against the implementation before retaining it. */
extern const char* leveldb_iter_key(const leveldb_iterator_t*, size_t* klen);
extern const char* leveldb_iter_value(const leveldb_iterator_t*, size_t* vlen);
/* Reports the iterator's error, if any, through errptr (convention (3)). */
extern void leveldb_iter_get_error(const leveldb_iterator_t*, char** errptr);
/* Write batch */
/* Keys and values are passed as pointer + length pairs (convention (2) in
   the header comment). */

/* Declared with (void): in C an empty parameter list is not a prototype and
   would let callers pass arbitrary arguments unchecked. */
extern leveldb_writebatch_t* leveldb_writebatch_create(void);
extern void leveldb_writebatch_destroy(leveldb_writebatch_t*);
extern void leveldb_writebatch_clear(leveldb_writebatch_t*);
extern void leveldb_writebatch_put(
    leveldb_writebatch_t*,
    const char* key, size_t klen,
    const char* val, size_t vlen);
extern void leveldb_writebatch_delete(
    leveldb_writebatch_t*,
    const char* key, size_t klen);
/* Takes a caller-supplied state pointer and two callbacks. Presumably "put"
   and "deleted" are invoked once per batched operation with "state" as
   their first argument -- confirm against the implementation. */
extern void leveldb_writebatch_iterate(
    leveldb_writebatch_t*,
    void* state,
    void (*put)(void*, const char* k, size_t klen, const char* v, size_t vlen),
    void (*deleted)(void*, const char* k, size_t klen));
/* Options */
/* Boolean setters take an unsigned char (0 == false; rest == true), per
   convention (4) in the header comment. */

/* Declared with (void): in C an empty parameter list is not a prototype and
   would let callers pass arbitrary arguments unchecked. */
extern leveldb_options_t* leveldb_options_create(void);
extern void leveldb_options_destroy(leveldb_options_t*);
extern void leveldb_options_set_comparator(
    leveldb_options_t*,
    leveldb_comparator_t*);
extern void leveldb_options_set_filter_policy(
    leveldb_options_t*,
    leveldb_filterpolicy_t*);
extern void leveldb_options_set_create_if_missing(
    leveldb_options_t*, unsigned char);
extern void leveldb_options_set_error_if_exists(
    leveldb_options_t*, unsigned char);
extern void leveldb_options_set_paranoid_checks(
    leveldb_options_t*, unsigned char);
extern void leveldb_options_set_env(leveldb_options_t*, leveldb_env_t*);
extern void leveldb_options_set_info_log(leveldb_options_t*, leveldb_logger_t*);
extern void leveldb_options_set_write_buffer_size(leveldb_options_t*, size_t);
extern void leveldb_options_set_max_open_files(leveldb_options_t*, int);
extern void leveldb_options_set_cache(leveldb_options_t*, leveldb_cache_t*);
extern void leveldb_options_set_block_size(leveldb_options_t*, size_t);
extern void leveldb_options_set_block_restart_interval(leveldb_options_t*, int);

/* Compression identifiers; presumably the values accepted by
   leveldb_options_set_compression -- confirm against the implementation. */
enum {
  leveldb_no_compression = 0,
  leveldb_snappy_compression = 1
};
extern void leveldb_options_set_compression(leveldb_options_t*, int);
/* Comparator */
/* Creates a comparator from a caller-supplied state pointer and three
   callbacks. Each callback takes a void* first argument; presumably that is
   "state" -- confirm against the implementation. "compare" receives two
   keys as pointer + length pairs (convention (2) in the header comment). */
extern leveldb_comparator_t* leveldb_comparator_create(
void* state,
void (*destructor)(void*),
int (*compare)(
void*,
const char* a, size_t alen,
const char* b, size_t blen),
const char* (*name)(void*));
extern void leveldb_comparator_destroy(leveldb_comparator_t*);
/* Filter policy */
/* Creates a filter policy from a caller-supplied state pointer and four
   callbacks. Each callback takes a void* first argument; presumably that is
   "state" -- confirm against the implementation.
   create_filter receives parallel arrays of key pointers and key lengths
   with num_keys entries, returns the filter bytes and stores their length
   in *filter_length.
   key_may_match returns a bool encoded as unsigned char (convention (4) in
   the header comment). */
extern leveldb_filterpolicy_t* leveldb_filterpolicy_create(
void* state,
void (*destructor)(void*),
char* (*create_filter)(
void*,
const char* const* key_array, const size_t* key_length_array,
int num_keys,
size_t* filter_length),
unsigned char (*key_may_match)(
void*,
const char* key, size_t length,
const char* filter, size_t filter_length),
const char* (*name)(void*));
extern void leveldb_filterpolicy_destroy(leveldb_filterpolicy_t*);
/* Creates a bloom filter policy configured with bits_per_key. */
extern leveldb_filterpolicy_t* leveldb_filterpolicy_create_bloom(
int bits_per_key);
/* Read options */
/* Boolean setters take an unsigned char (0 == false; rest == true), per
   convention (4) in the header comment. */

/* Declared with (void): in C an empty parameter list is not a prototype and
   would let callers pass arbitrary arguments unchecked. */
extern leveldb_readoptions_t* leveldb_readoptions_create(void);
extern void leveldb_readoptions_destroy(leveldb_readoptions_t*);
extern void leveldb_readoptions_set_verify_checksums(
    leveldb_readoptions_t*,
    unsigned char);
extern void leveldb_readoptions_set_fill_cache(
    leveldb_readoptions_t*, unsigned char);
extern void leveldb_readoptions_set_snapshot(
    leveldb_readoptions_t*,
    const leveldb_snapshot_t*);
/* Write options */

/* Declared with (void): in C an empty parameter list is not a prototype and
   would let callers pass arbitrary arguments unchecked. */
extern leveldb_writeoptions_t* leveldb_writeoptions_create(void);
extern void leveldb_writeoptions_destroy(leveldb_writeoptions_t*);
/* The flag is a bool encoded as unsigned char (0 == false; rest == true),
   per convention (4) in the header comment. */
extern void leveldb_writeoptions_set_sync(
    leveldb_writeoptions_t*, unsigned char);
/* Cache */
/* Creates an LRU cache handle from a capacity given as size_t.
   NOTE(review): the unit of "capacity" is not stated here -- confirm
   against the implementation. */
extern leveldb_cache_t* leveldb_cache_create_lru(size_t capacity);
extern void leveldb_cache_destroy(leveldb_cache_t* cache);
/* Env */

/* Declared with (void): in C an empty parameter list is not a prototype and
   would let callers pass arbitrary arguments unchecked. */
extern leveldb_env_t* leveldb_create_default_env(void);
extern void leveldb_env_destroy(leveldb_env_t*);
/* Utility */

/* Calls free(ptr).
   REQUIRES: ptr was malloc()-ed and returned by one of the routines
   in this file.  Note that in certain cases (typically on Windows), you
   may need to call this routine instead of free(ptr) to dispose of
   malloc()-ed memory returned by this library. */
extern void leveldb_free(void* ptr);

/* The version getters are declared with (void): in C an empty parameter
   list is not a prototype and would let callers pass arbitrary arguments
   unchecked. */

/* Return the major version number for this release. */
extern int leveldb_major_version(void);

/* Return the minor version number for this release. */
extern int leveldb_minor_version(void);
#ifdef __cplusplus
} /* end extern "C" */
#endif
#endif /* STORAGE_LEVELDB_INCLUDE_C_H_ */
diff --git a/include/leveldb/db.h b/include/leveldb/db.h
index 5ffb29d526..40851b2aa8 100644
--- a/include/leveldb/db.h
+++ b/include/leveldb/db.h
@@ -1,161 +1,161 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef STORAGE_LEVELDB_INCLUDE_DB_H_
#define STORAGE_LEVELDB_INCLUDE_DB_H_
#include <stdint.h>
#include <stdio.h>
#include "leveldb/iterator.h"
#include "leveldb/options.h"
namespace leveldb {
// Update Makefile if you change these
static const int kMajorVersion = 1;
-static const int kMinorVersion = 15;
+static const int kMinorVersion = 17;
struct Options;
struct ReadOptions;
struct WriteOptions;
class WriteBatch;
// Abstract handle to particular state of a DB.
// A Snapshot is an immutable object and can therefore be safely
// accessed from multiple threads without any external synchronization.
class Snapshot {
protected:
virtual ~Snapshot();
};
// A range of keys
struct Range {
Slice start; // Included in the range
Slice limit; // Not included in the range
Range() { }
Range(const Slice& s, const Slice& l) : start(s), limit(l) { }
};
// A DB is a persistent ordered map from keys to values.
// A DB is safe for concurrent access from multiple threads without
// any external synchronization.
class DB {
public:
// Open the database with the specified "name".
// Stores a pointer to a heap-allocated database in *dbptr and returns
// OK on success.
// Stores NULL in *dbptr and returns a non-OK status on error.
// Caller should delete *dbptr when it is no longer needed.
static Status Open(const Options& options,
const std::string& name,
DB** dbptr);
DB() { }
virtual ~DB();
// Set the database entry for "key" to "value". Returns OK on success,
// and a non-OK status on error.
// Note: consider setting options.sync = true.
virtual Status Put(const WriteOptions& options,
const Slice& key,
const Slice& value) = 0;
// Remove the database entry (if any) for "key". Returns OK on
// success, and a non-OK status on error. It is not an error if "key"
// did not exist in the database.
// Note: consider setting options.sync = true.
virtual Status Delete(const WriteOptions& options, const Slice& key) = 0;
// Apply the specified updates to the database.
// Returns OK on success, non-OK on failure.
// Note: consider setting options.sync = true.
virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0;
// If the database contains an entry for "key" store the
// corresponding value in *value and return OK.
//
// If there is no entry for "key" leave *value unchanged and return
// a status for which Status::IsNotFound() returns true.
//
// May return some other Status on an error.
virtual Status Get(const ReadOptions& options,
const Slice& key, std::string* value) = 0;
// Return a heap-allocated iterator over the contents of the database.
// The result of NewIterator() is initially invalid (caller must
// call one of the Seek methods on the iterator before using it).
//
// Caller should delete the iterator when it is no longer needed.
// The returned iterator should be deleted before this db is deleted.
virtual Iterator* NewIterator(const ReadOptions& options) = 0;
// Return a handle to the current DB state. Iterators created with
// this handle will all observe a stable snapshot of the current DB
// state. The caller must call ReleaseSnapshot(result) when the
// snapshot is no longer needed.
virtual const Snapshot* GetSnapshot() = 0;
// Release a previously acquired snapshot. The caller must not
// use "snapshot" after this call.
virtual void ReleaseSnapshot(const Snapshot* snapshot) = 0;
// DB implementations can export properties about their state
// via this method. If "property" is a valid property understood by this
// DB implementation, fills "*value" with its current value and returns
// true. Otherwise returns false.
//
//
// Valid property names include:
//
// "leveldb.num-files-at-level<N>" - return the number of files at level <N>,
// where <N> is an ASCII representation of a level number (e.g. "0").
// "leveldb.stats" - returns a multi-line string that describes statistics
// about the internal operation of the DB.
// "leveldb.sstables" - returns a multi-line string that describes all
// of the sstables that make up the db contents.
virtual bool GetProperty(const Slice& property, std::string* value) = 0;
// For each i in [0,n-1], store in "sizes[i]", the approximate
// file system space used by keys in "[range[i].start .. range[i].limit)".
//
// Note that the returned sizes measure file system space usage, so
// if the user data compresses by a factor of ten, the returned
// sizes will be one-tenth the size of the corresponding user data size.
//
// The results may not include the sizes of recently written data.
virtual void GetApproximateSizes(const Range* range, int n,
uint64_t* sizes) = 0;
// Compact the underlying storage for the key range [*begin,*end].
// In particular, deleted and overwritten versions are discarded,
// and the data is rearranged to reduce the cost of operations
// needed to access the data. This operation should typically only
// be invoked by users who understand the underlying implementation.
//
// begin==NULL is treated as a key before all keys in the database.
// end==NULL is treated as a key after all keys in the database.
// Therefore the following call will compact the entire database:
// db->CompactRange(NULL, NULL);
virtual void CompactRange(const Slice* begin, const Slice* end) = 0;
private:
// No copying allowed
DB(const DB&);
void operator=(const DB&);
};
// Destroy the contents of the specified database.
// Be very careful using this method.
Status DestroyDB(const std::string& name, const Options& options);
// If a DB cannot be opened, you may attempt to call this method to
// resurrect as much of the contents of the database as possible.
// Some data may be lost, so be careful when calling this function
// on a database that contains important information.
Status RepairDB(const std::string& dbname, const Options& options);
} // namespace leveldb
#endif // STORAGE_LEVELDB_INCLUDE_DB_H_
diff --git a/include/leveldb/slice.h b/include/leveldb/slice.h
index 74ea8fa49a..bc367986f7 100644
--- a/include/leveldb/slice.h
+++ b/include/leveldb/slice.h
@@ -1,109 +1,109 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// Slice is a simple structure containing a pointer into some external
// storage and a size. The user of a Slice must ensure that the slice
// is not used after the corresponding external storage has been
// deallocated.
//
// Multiple threads can invoke const methods on a Slice without
// external synchronization, but if any of the threads may call a
// non-const method, all threads accessing the same Slice must use
// external synchronization.
#ifndef STORAGE_LEVELDB_INCLUDE_SLICE_H_
#define STORAGE_LEVELDB_INCLUDE_SLICE_H_
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include <string>
namespace leveldb {
class Slice {
public:
// Create an empty slice.
Slice() : data_(""), size_(0) { }
// Create a slice that refers to d[0,n-1].
Slice(const char* d, size_t n) : data_(d), size_(n) { }
// Create a slice that refers to the contents of "s"
Slice(const std::string& s) : data_(s.data()), size_(s.size()) { }
// Create a slice that refers to s[0,strlen(s)-1]
Slice(const char* s) : data_(s), size_(strlen(s)) { }
// Return a pointer to the beginning of the referenced data
const char* data() const { return data_; }
// Return the length (in bytes) of the referenced data
size_t size() const { return size_; }
// Return true iff the length of the referenced data is zero
bool empty() const { return size_ == 0; }
// Return the ith byte in the referenced data.
// REQUIRES: n < size()
char operator[](size_t n) const {
assert(n < size());
return data_[n];
}
// Change this slice to refer to an empty array
void clear() { data_ = ""; size_ = 0; }
// Drop the first "n" bytes from this slice.
void remove_prefix(size_t n) {
assert(n <= size());
data_ += n;
size_ -= n;
}
// Return a string that contains the copy of the referenced data.
std::string ToString() const { return std::string(data_, size_); }
// Three-way comparison. Returns value:
// < 0 iff "*this" < "b",
// == 0 iff "*this" == "b",
// > 0 iff "*this" > "b"
int compare(const Slice& b) const;
// Return true iff "x" is a prefix of "*this"
bool starts_with(const Slice& x) const {
return ((size_ >= x.size_) &&
(memcmp(data_, x.data_, x.size_) == 0));
}
private:
const char* data_;
size_t size_;
// Intentionally copyable
};
inline bool operator==(const Slice& x, const Slice& y) {
return ((x.size() == y.size()) &&
(memcmp(x.data(), y.data(), x.size()) == 0));
}
inline bool operator!=(const Slice& x, const Slice& y) {
return !(x == y);
}
inline int Slice::compare(const Slice& b) const {
- const int min_len = (size_ < b.size_) ? size_ : b.size_;
+ const size_t min_len = (size_ < b.size_) ? size_ : b.size_;
int r = memcmp(data_, b.data_, min_len);
if (r == 0) {
if (size_ < b.size_) r = -1;
else if (size_ > b.size_) r = +1;
}
return r;
}
} // namespace leveldb
#endif // STORAGE_LEVELDB_INCLUDE_SLICE_H_

File Metadata

Mime Type
text/x-diff
Expires
Sun, Apr 27, 10:21 (21 h, 39 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5573234
Default Alt Text
(119 KB)

Event Timeline