Changeset View
Changeset View
Standalone View
Standalone View
src/leveldb/util/env_posix.cc
// Copyright (c) 2011 The LevelDB Authors. All rights reserved. | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | ||||
// Use of this source code is governed by a BSD-style license that can be | // Use of this source code is governed by a BSD-style license that can be | ||||
// found in the LICENSE file. See the AUTHORS file for names of contributors. | // found in the LICENSE file. See the AUTHORS file for names of contributors. | ||||
#if !defined(LEVELDB_PLATFORM_WINDOWS) | #if !defined(LEVELDB_PLATFORM_WINDOWS) | ||||
#include <dirent.h> | #include <dirent.h> | ||||
#include <errno.h> | #include <errno.h> | ||||
#include <fcntl.h> | #include <fcntl.h> | ||||
#include <pthread.h> | #include <pthread.h> | ||||
#include <stdio.h> | #include <stdio.h> | ||||
#include <stdlib.h> | #include <stdlib.h> | ||||
#include <string.h> | #include <string.h> | ||||
#include <sys/mman.h> | #include <sys/mman.h> | ||||
#include <sys/resource.h> | |||||
#include <sys/stat.h> | #include <sys/stat.h> | ||||
#include <sys/time.h> | #include <sys/time.h> | ||||
#include <sys/types.h> | #include <sys/types.h> | ||||
#include <time.h> | #include <time.h> | ||||
#include <unistd.h> | #include <unistd.h> | ||||
#include <deque> | #include <deque> | ||||
#include <limits> | |||||
#include <set> | #include <set> | ||||
#include "leveldb/env.h" | #include "leveldb/env.h" | ||||
#include "leveldb/slice.h" | #include "leveldb/slice.h" | ||||
#include "port/port.h" | #include "port/port.h" | ||||
#include "util/logging.h" | #include "util/logging.h" | ||||
#include "util/mutexlock.h" | #include "util/mutexlock.h" | ||||
#include "util/posix_logger.h" | #include "util/posix_logger.h" | ||||
#include "util/env_posix_test_helper.h" | |||||
namespace leveldb { | namespace leveldb { | ||||
namespace { | namespace { | ||||
// Cached resource limits, read and lazily filled in by MaxOpenFiles() and
// MaxMmaps(). Both start at -1; those functions treat any negative value as
// "not decided yet" and compute a default on first use. The
// EnvPosixTestHelper setters assign to these variables directly.
static int open_read_only_file_limit = -1; | |||||
static int mmap_limit = -1; | |||||
// Builds an IOError status for |context|, using the strerror() text of
// |err_number| as the detail message.
static Status IOError(const std::string& context, int err_number) {
  const char* const reason = strerror(err_number);
  return Status::IOError(context, reason);
}
// Helper class to limit resource usage to avoid exhaustion. | |||||
// Currently used to limit read-only file descriptors and mmap file usage | |||||
// so that we do not end up running out of file descriptors, virtual memory, | |||||
// or running into kernel performance problems for very large databases. | |||||
class Limiter { | |||||
public: | |||||
// Limit maximum number of resources to |n|. | |||||
Limiter(intptr_t n) { | |||||
SetAllowed(n); | |||||
} | |||||
// If another resource is available, acquire it and return true. | |||||
// Else return false. | |||||
bool Acquire() { | |||||
if (GetAllowed() <= 0) { | |||||
return false; | |||||
} | |||||
MutexLock l(&mu_); | |||||
intptr_t x = GetAllowed(); | |||||
if (x <= 0) { | |||||
return false; | |||||
} else { | |||||
SetAllowed(x - 1); | |||||
return true; | |||||
} | |||||
} | |||||
// Release a resource acquired by a previous call to Acquire() that returned | |||||
// true. | |||||
void Release() { | |||||
MutexLock l(&mu_); | |||||
SetAllowed(GetAllowed() + 1); | |||||
} | |||||
private: | |||||
port::Mutex mu_; | |||||
port::AtomicPointer allowed_; | |||||
intptr_t GetAllowed() const { | |||||
return reinterpret_cast<intptr_t>(allowed_.Acquire_Load()); | |||||
} | |||||
// REQUIRES: mu_ must be held | |||||
void SetAllowed(intptr_t v) { | |||||
allowed_.Release_Store(reinterpret_cast<void*>(v)); | |||||
} | |||||
Limiter(const Limiter&); | |||||
void operator=(const Limiter&); | |||||
}; | |||||
class PosixSequentialFile: public SequentialFile { | class PosixSequentialFile: public SequentialFile { | ||||
private: | private: | ||||
std::string filename_; | std::string filename_; | ||||
FILE* file_; | FILE* file_; | ||||
public: | public: | ||||
PosixSequentialFile(const std::string& fname, FILE* f) | PosixSequentialFile(const std::string& fname, FILE* f) | ||||
: filename_(fname), file_(f) { } | : filename_(fname), file_(f) { } | ||||
Show All 21 Lines | virtual Status Skip(uint64_t n) { | ||||
return Status::OK(); | return Status::OK(); | ||||
} | } | ||||
}; | }; | ||||
// pread() based random-access | // pread() based random-access | ||||
class PosixRandomAccessFile: public RandomAccessFile { | class PosixRandomAccessFile: public RandomAccessFile { | ||||
private: | private: | ||||
std::string filename_; | std::string filename_; | ||||
bool temporary_fd_; // If true, fd_ is -1 and we open on every read. | |||||
int fd_; | int fd_; | ||||
Limiter* limiter_; | |||||
public: | public: | ||||
PosixRandomAccessFile(const std::string& fname, int fd) | PosixRandomAccessFile(const std::string& fname, int fd, Limiter* limiter) | ||||
: filename_(fname), fd_(fd) { } | : filename_(fname), fd_(fd), limiter_(limiter) { | ||||
virtual ~PosixRandomAccessFile() { close(fd_); } | temporary_fd_ = !limiter->Acquire(); | ||||
if (temporary_fd_) { | |||||
virtual Status Read(uint64_t offset, size_t n, Slice* result, | // Open file on every access. | ||||
char* scratch) const { | close(fd_); | ||||
Status s; | fd_ = -1; | ||||
ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset)); | |||||
*result = Slice(scratch, (r < 0) ? 0 : r); | |||||
if (r < 0) { | |||||
// An error: return a non-ok status | |||||
s = IOError(filename_, errno); | |||||
} | } | ||||
return s; | |||||
} | } | ||||
}; | |||||
// Helper class to limit mmap file usage so that we do not end up | virtual ~PosixRandomAccessFile() { | ||||
// running out virtual memory or running into kernel performance | if (!temporary_fd_) { | ||||
// problems for very large databases. | close(fd_); | ||||
class MmapLimiter { | limiter_->Release(); | ||||
public: | |||||
// Up to 1000 mmaps for 64-bit binaries; none for smaller pointer sizes. | |||||
MmapLimiter() { | |||||
SetAllowed(sizeof(void*) >= 8 ? 1000 : 0); | |||||
} | } | ||||
// If another mmap slot is available, acquire it and return true. | |||||
// Else return false. | |||||
bool Acquire() { | |||||
if (GetAllowed() <= 0) { | |||||
return false; | |||||
} | } | ||||
MutexLock l(&mu_); | |||||
intptr_t x = GetAllowed(); | virtual Status Read(uint64_t offset, size_t n, Slice* result, | ||||
if (x <= 0) { | char* scratch) const { | ||||
return false; | int fd = fd_; | ||||
} else { | if (temporary_fd_) { | ||||
SetAllowed(x - 1); | fd = open(filename_.c_str(), O_RDONLY); | ||||
return true; | if (fd < 0) { | ||||
return IOError(filename_, errno); | |||||
} | } | ||||
} | } | ||||
// Release a slot acquired by a previous call to Acquire() that returned true. | Status s; | ||||
void Release() { | ssize_t r = pread(fd, scratch, n, static_cast<off_t>(offset)); | ||||
MutexLock l(&mu_); | *result = Slice(scratch, (r < 0) ? 0 : r); | ||||
SetAllowed(GetAllowed() + 1); | if (r < 0) { | ||||
// An error: return a non-ok status | |||||
s = IOError(filename_, errno); | |||||
} | } | ||||
if (temporary_fd_) { | |||||
private: | // Close the temporary file descriptor opened earlier. | ||||
port::Mutex mu_; | close(fd); | ||||
port::AtomicPointer allowed_; | |||||
intptr_t GetAllowed() const { | |||||
return reinterpret_cast<intptr_t>(allowed_.Acquire_Load()); | |||||
} | } | ||||
return s; | |||||
// REQUIRES: mu_ must be held | |||||
void SetAllowed(intptr_t v) { | |||||
allowed_.Release_Store(reinterpret_cast<void*>(v)); | |||||
} | } | ||||
MmapLimiter(const MmapLimiter&); | |||||
void operator=(const MmapLimiter&); | |||||
}; | }; | ||||
// mmap() based random-access | // mmap() based random-access | ||||
class PosixMmapReadableFile: public RandomAccessFile { | class PosixMmapReadableFile: public RandomAccessFile { | ||||
private: | private: | ||||
std::string filename_; | std::string filename_; | ||||
void* mmapped_region_; | void* mmapped_region_; | ||||
size_t length_; | size_t length_; | ||||
MmapLimiter* limiter_; | Limiter* limiter_; | ||||
public: | public: | ||||
// base[0,length-1] contains the mmapped contents of the file. | // base[0,length-1] contains the mmapped contents of the file. | ||||
PosixMmapReadableFile(const std::string& fname, void* base, size_t length, | PosixMmapReadableFile(const std::string& fname, void* base, size_t length, | ||||
MmapLimiter* limiter) | Limiter* limiter) | ||||
: filename_(fname), mmapped_region_(base), length_(length), | : filename_(fname), mmapped_region_(base), length_(length), | ||||
limiter_(limiter) { | limiter_(limiter) { | ||||
} | } | ||||
virtual ~PosixMmapReadableFile() { | virtual ~PosixMmapReadableFile() { | ||||
munmap(mmapped_region_, length_); | munmap(mmapped_region_, length_); | ||||
limiter_->Release(); | limiter_->Release(); | ||||
} | } | ||||
▲ Show 20 Lines • Show All 64 Lines • ▼ Show 20 Lines | if (sep == NULL) { | ||||
basename = sep + 1; | basename = sep + 1; | ||||
} | } | ||||
Status s; | Status s; | ||||
if (basename.starts_with("MANIFEST")) { | if (basename.starts_with("MANIFEST")) { | ||||
int fd = open(dir.c_str(), O_RDONLY); | int fd = open(dir.c_str(), O_RDONLY); | ||||
if (fd < 0) { | if (fd < 0) { | ||||
s = IOError(dir, errno); | s = IOError(dir, errno); | ||||
} else { | } else { | ||||
if (fsync(fd) < 0) { | if (fsync(fd) < 0 && errno != EINVAL) { | ||||
s = IOError(dir, errno); | s = IOError(dir, errno); | ||||
} | } | ||||
close(fd); | close(fd); | ||||
} | } | ||||
} | } | ||||
return s; | return s; | ||||
} | } | ||||
▲ Show 20 Lines • Show All 85 Lines • ▼ Show 20 Lines | if (fd < 0) { | ||||
s = IOError(fname, errno); | s = IOError(fname, errno); | ||||
} | } | ||||
} | } | ||||
close(fd); | close(fd); | ||||
if (!s.ok()) { | if (!s.ok()) { | ||||
mmap_limit_.Release(); | mmap_limit_.Release(); | ||||
} | } | ||||
} else { | } else { | ||||
*result = new PosixRandomAccessFile(fname, fd); | *result = new PosixRandomAccessFile(fname, fd, &fd_limit_); | ||||
} | } | ||||
return s; | return s; | ||||
} | } | ||||
virtual Status NewWritableFile(const std::string& fname, | virtual Status NewWritableFile(const std::string& fname, | ||||
WritableFile** result) { | WritableFile** result) { | ||||
Status s; | Status s; | ||||
FILE* f = fopen(fname.c_str(), "w"); | FILE* f = fopen(fname.c_str(), "w"); | ||||
▲ Show 20 Lines • Show All 183 Lines • ▼ Show 20 Lines | private: | ||||
bool started_bgthread_; | bool started_bgthread_; | ||||
// Entry per Schedule() call | // Entry per Schedule() call | ||||
struct BGItem { void* arg; void (*function)(void*); }; | struct BGItem { void* arg; void (*function)(void*); }; | ||||
typedef std::deque<BGItem> BGQueue; | typedef std::deque<BGItem> BGQueue; | ||||
BGQueue queue_; | BGQueue queue_; | ||||
PosixLockTable locks_; | PosixLockTable locks_; | ||||
MmapLimiter mmap_limit_; | Limiter mmap_limit_; | ||||
Limiter fd_limit_; | |||||
}; | }; | ||||
PosixEnv::PosixEnv() : started_bgthread_(false) { | // Return the maximum number of concurrent mmaps. | ||||
// Yields the cap on simultaneously mmapped files. A non-negative value
// already stored in mmap_limit is returned as is; otherwise the default is
// computed, stored in mmap_limit, and returned.
static int MaxMmaps() {
  if (mmap_limit < 0) {
    // Up to 1000 mmaps for 64-bit binaries; none for smaller pointer sizes.
    if (sizeof(void*) >= 8) {
      mmap_limit = 1000;
    } else {
      mmap_limit = 0;
    }
  }
  return mmap_limit;
}
// Return the maximum number of read-only files to keep open.
//
// A non-negative value already stored in open_read_only_file_limit (for
// example one set through EnvPosixTestHelper::SetReadOnlyFDLimit) is returned
// unchanged. Otherwise the limit is derived from RLIMIT_NOFILE, stored in
// open_read_only_file_limit, and returned.
static intptr_t MaxOpenFiles() {
  if (open_read_only_file_limit >= 0) {
    return open_read_only_file_limit;
  }
  struct rlimit rlim;
  if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
    // getrlimit failed, fallback to hard-coded default.
    open_read_only_file_limit = 50;
  } else if (rlim.rlim_cur == RLIM_INFINITY) {
    open_read_only_file_limit = std::numeric_limits<int>::max();
  } else {
    // Allow use of 20% of available file descriptors for read-only files.
    // rlim_t may be wider than int, so clamp before narrowing; an unclamped
    // conversion could produce a truncated or negative limit.
    const rlim_t share = rlim.rlim_cur / 5;
    const rlim_t ceiling =
        static_cast<rlim_t>(std::numeric_limits<int>::max());
    if (share > ceiling) {
      open_read_only_file_limit = std::numeric_limits<int>::max();
    } else {
      open_read_only_file_limit = static_cast<int>(share);
    }
  }
  return open_read_only_file_limit;
}
// Constructs the environment: no background thread started yet, the two
// limiters sized from MaxMmaps() and MaxOpenFiles(), then the mutex and the
// background-signal condition variable are initialized.
PosixEnv::PosixEnv()
    : started_bgthread_(false),
      mmap_limit_(MaxMmaps()),
      fd_limit_(MaxOpenFiles()) {
  const int mutex_rc = pthread_mutex_init(&mu_, NULL);
  PthreadCall("mutex_init", mutex_rc);

  const int cond_rc = pthread_cond_init(&bgsignal_, NULL);
  PthreadCall("cvar_init", cond_rc);
}
void PosixEnv::Schedule(void (*function)(void*), void* arg) { | void PosixEnv::Schedule(void (*function)(void*), void* arg) { | ||||
PthreadCall("lock", pthread_mutex_lock(&mu_)); | PthreadCall("lock", pthread_mutex_lock(&mu_)); | ||||
// Start background thread if necessary | // Start background thread if necessary | ||||
▲ Show 20 Lines • Show All 58 Lines • ▼ Show 20 Lines | |||||
} | } | ||||
} // namespace | } // namespace | ||||
static pthread_once_t once = PTHREAD_ONCE_INIT; | static pthread_once_t once = PTHREAD_ONCE_INIT; | ||||
static Env* default_env; | static Env* default_env; | ||||
static void InitDefaultEnv() { default_env = new PosixEnv; } | static void InitDefaultEnv() { default_env = new PosixEnv; } | ||||
// Stores |limit| in open_read_only_file_limit, the value MaxOpenFiles()
// returns when it is non-negative.
// Asserts that the default Env has not been created yet.
void EnvPosixTestHelper::SetReadOnlyFDLimit(int limit) {
  assert(NULL == default_env);
  open_read_only_file_limit = limit;
}
// Stores |limit| in mmap_limit, the value MaxMmaps() returns when it is
// non-negative.
// Asserts that the default Env has not been created yet.
void EnvPosixTestHelper::SetReadOnlyMMapLimit(int limit) {
  assert(NULL == default_env);
  mmap_limit = limit;
}
// Returns the shared default environment. InitDefaultEnv is passed to
// pthread_once with the file-level once-control, so the environment is
// created on first use.
Env* Env::Default() {
  pthread_once(&once, &InitDefaultEnv);
  Env* const env = default_env;
  return env;
}
} // namespace leveldb | } // namespace leveldb | ||||
#endif | #endif |