Bump version to 0.5.9
[novacoin.git] / src / leveldb / util / env_posix.cc
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 #if !defined(LEVELDB_PLATFORM_WINDOWS)
5
6 #include <dirent.h>
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <pthread.h>
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13 #include <sys/mman.h>
14 #include <sys/stat.h>
15 #include <sys/time.h>
16 #include <sys/types.h>
17 #include <time.h>
18 #include <unistd.h>
19 #include <deque>
20 #include <set>
21 #include "leveldb/env.h"
22 #include "leveldb/slice.h"
23 #include "port/port.h"
24 #include "util/logging.h"
25 #include "util/mutexlock.h"
26 #include "util/posix_logger.h"
27
28 namespace leveldb {
29
30 namespace {
31
32 static Status IOError(const std::string& context, int err_number) {
33   return Status::IOError(context, strerror(err_number));
34 }
35
36 class PosixSequentialFile: public SequentialFile {
37  private:
38   std::string filename_;
39   FILE* file_;
40
41  public:
42   PosixSequentialFile(const std::string& fname, FILE* f)
43       : filename_(fname), file_(f) { }
44   virtual ~PosixSequentialFile() { fclose(file_); }
45
46   virtual Status Read(size_t n, Slice* result, char* scratch) {
47     Status s;
48     size_t r = fread_unlocked(scratch, 1, n, file_);
49     *result = Slice(scratch, r);
50     if (r < n) {
51       if (feof(file_)) {
52         // We leave status as ok if we hit the end of the file
53       } else {
54         // A partial read with an error: return a non-ok status
55         s = IOError(filename_, errno);
56       }
57     }
58     return s;
59   }
60
61   virtual Status Skip(uint64_t n) {
62     if (fseek(file_, n, SEEK_CUR)) {
63       return IOError(filename_, errno);
64     }
65     return Status::OK();
66   }
67 };
68
69 // pread() based random-access
70 class PosixRandomAccessFile: public RandomAccessFile {
71  private:
72   std::string filename_;
73   int fd_;
74
75  public:
76   PosixRandomAccessFile(const std::string& fname, int fd)
77       : filename_(fname), fd_(fd) { }
78   virtual ~PosixRandomAccessFile() { close(fd_); }
79
80   virtual Status Read(uint64_t offset, size_t n, Slice* result,
81                       char* scratch) const {
82     Status s;
83     ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
84     *result = Slice(scratch, (r < 0) ? 0 : r);
85     if (r < 0) {
86       // An error: return a non-ok status
87       s = IOError(filename_, errno);
88     }
89     return s;
90   }
91 };
92
93 // Helper class to limit mmap file usage so that we do not end up
94 // running out virtual memory or running into kernel performance
95 // problems for very large databases.
96 class MmapLimiter {
97  public:
98   // Up to 1000 mmaps for 64-bit binaries; none for smaller pointer sizes.
99   MmapLimiter() {
100     SetAllowed(sizeof(void*) >= 8 ? 1000 : 0);
101   }
102
103   // If another mmap slot is available, acquire it and return true.
104   // Else return false.
105   bool Acquire() {
106     if (GetAllowed() <= 0) {
107       return false;
108     }
109     MutexLock l(&mu_);
110     intptr_t x = GetAllowed();
111     if (x <= 0) {
112       return false;
113     } else {
114       SetAllowed(x - 1);
115       return true;
116     }
117   }
118
119   // Release a slot acquired by a previous call to Acquire() that returned true.
120   void Release() {
121     MutexLock l(&mu_);
122     SetAllowed(GetAllowed() + 1);
123   }
124
125  private:
126   port::Mutex mu_;
127   port::AtomicPointer allowed_;
128
129   intptr_t GetAllowed() const {
130     return reinterpret_cast<intptr_t>(allowed_.Acquire_Load());
131   }
132
133   // REQUIRES: mu_ must be held
134   void SetAllowed(intptr_t v) {
135     allowed_.Release_Store(reinterpret_cast<void*>(v));
136   }
137
138   MmapLimiter(const MmapLimiter&);
139   void operator=(const MmapLimiter&);
140 };
141
142 // mmap() based random-access
143 class PosixMmapReadableFile: public RandomAccessFile {
144  private:
145   std::string filename_;
146   void* mmapped_region_;
147   size_t length_;
148   MmapLimiter* limiter_;
149
150  public:
151   // base[0,length-1] contains the mmapped contents of the file.
152   PosixMmapReadableFile(const std::string& fname, void* base, size_t length,
153                         MmapLimiter* limiter)
154       : filename_(fname), mmapped_region_(base), length_(length),
155         limiter_(limiter) {
156   }
157
158   virtual ~PosixMmapReadableFile() {
159     munmap(mmapped_region_, length_);
160     limiter_->Release();
161   }
162
163   virtual Status Read(uint64_t offset, size_t n, Slice* result,
164                       char* scratch) const {
165     Status s;
166     if (offset + n > length_) {
167       *result = Slice();
168       s = IOError(filename_, EINVAL);
169     } else {
170       *result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n);
171     }
172     return s;
173   }
174 };
175
176 class PosixWritableFile : public WritableFile {
177  private:
178   std::string filename_;
179   FILE* file_;
180
181  public:
182   PosixWritableFile(const std::string& fname, FILE* f)
183       : filename_(fname), file_(f) { }
184
185   ~PosixWritableFile() {
186     if (file_ != NULL) {
187       // Ignoring any potential errors
188       fclose(file_);
189     }
190   }
191
192   virtual Status Append(const Slice& data) {
193     size_t r = fwrite_unlocked(data.data(), 1, data.size(), file_);
194     if (r != data.size()) {
195       return IOError(filename_, errno);
196     }
197     return Status::OK();
198   }
199
200   virtual Status Close() {
201     Status result;
202     if (fclose(file_) != 0) {
203       result = IOError(filename_, errno);
204     }
205     file_ = NULL;
206     return result;
207   }
208
209   virtual Status Flush() {
210     if (fflush_unlocked(file_) != 0) {
211       return IOError(filename_, errno);
212     }
213     return Status::OK();
214   }
215
216   Status SyncDirIfManifest() {
217     const char* f = filename_.c_str();
218     const char* sep = strrchr(f, '/');
219     Slice basename;
220     std::string dir;
221     if (sep == NULL) {
222       dir = ".";
223       basename = f;
224     } else {
225       dir = std::string(f, sep - f);
226       basename = sep + 1;
227     }
228     Status s;
229     if (basename.starts_with("MANIFEST")) {
230       int fd = open(dir.c_str(), O_RDONLY);
231       if (fd < 0) {
232         s = IOError(dir, errno);
233       } else {
234         if (fsync(fd) < 0) {
235           s = IOError(dir, errno);
236         }
237         close(fd);
238       }
239     }
240     return s;
241   }
242
243   virtual Status Sync() {
244     // Ensure new files referred to by the manifest are in the filesystem.
245     Status s = SyncDirIfManifest();
246     if (!s.ok()) {
247       return s;
248     }
249     if (fflush_unlocked(file_) != 0 ||
250         fdatasync(fileno(file_)) != 0) {
251       s = Status::IOError(filename_, strerror(errno));
252     }
253     return s;
254   }
255 };
256
257 static int LockOrUnlock(int fd, bool lock) {
258   errno = 0;
259   struct flock f;
260   memset(&f, 0, sizeof(f));
261   f.l_type = (lock ? F_WRLCK : F_UNLCK);
262   f.l_whence = SEEK_SET;
263   f.l_start = 0;
264   f.l_len = 0;        // Lock/unlock entire file
265   return fcntl(fd, F_SETLK, &f);
266 }
267
268 class PosixFileLock : public FileLock {
269  public:
270   int fd_;
271   std::string name_;
272 };
273
274 // Set of locked files.  We keep a separate set instead of just
275 // relying on fcntrl(F_SETLK) since fcntl(F_SETLK) does not provide
276 // any protection against multiple uses from the same process.
277 class PosixLockTable {
278  private:
279   port::Mutex mu_;
280   std::set<std::string> locked_files_;
281  public:
282   bool Insert(const std::string& fname) {
283     MutexLock l(&mu_);
284     return locked_files_.insert(fname).second;
285   }
286   void Remove(const std::string& fname) {
287     MutexLock l(&mu_);
288     locked_files_.erase(fname);
289   }
290 };
291
292 class PosixEnv : public Env {
293  public:
294   PosixEnv();
295   virtual ~PosixEnv() {
296     char msg[] = "Destroying Env::Default()\n";
297     fwrite(msg, 1, sizeof(msg), stderr);
298     abort();
299   }
300
301   virtual Status NewSequentialFile(const std::string& fname,
302                                    SequentialFile** result) {
303     FILE* f = fopen(fname.c_str(), "r");
304     if (f == NULL) {
305       *result = NULL;
306       return IOError(fname, errno);
307     } else {
308       *result = new PosixSequentialFile(fname, f);
309       return Status::OK();
310     }
311   }
312
313   virtual Status NewRandomAccessFile(const std::string& fname,
314                                      RandomAccessFile** result) {
315     *result = NULL;
316     Status s;
317     int fd = open(fname.c_str(), O_RDONLY);
318     if (fd < 0) {
319       s = IOError(fname, errno);
320     } else if (mmap_limit_.Acquire()) {
321       uint64_t size;
322       s = GetFileSize(fname, &size);
323       if (s.ok()) {
324         void* base = mmap(NULL, size, PROT_READ, MAP_SHARED, fd, 0);
325         if (base != MAP_FAILED) {
326           *result = new PosixMmapReadableFile(fname, base, size, &mmap_limit_);
327         } else {
328           s = IOError(fname, errno);
329         }
330       }
331       close(fd);
332       if (!s.ok()) {
333         mmap_limit_.Release();
334       }
335     } else {
336       *result = new PosixRandomAccessFile(fname, fd);
337     }
338     return s;
339   }
340
341   virtual Status NewWritableFile(const std::string& fname,
342                                  WritableFile** result) {
343     Status s;
344     FILE* f = fopen(fname.c_str(), "w");
345     if (f == NULL) {
346       *result = NULL;
347       s = IOError(fname, errno);
348     } else {
349       *result = new PosixWritableFile(fname, f);
350     }
351     return s;
352   }
353
354   virtual bool FileExists(const std::string& fname) {
355     return access(fname.c_str(), F_OK) == 0;
356   }
357
358   virtual Status GetChildren(const std::string& dir,
359                              std::vector<std::string>* result) {
360     result->clear();
361     DIR* d = opendir(dir.c_str());
362     if (d == NULL) {
363       return IOError(dir, errno);
364     }
365     struct dirent* entry;
366     while ((entry = readdir(d)) != NULL) {
367       result->push_back(entry->d_name);
368     }
369     closedir(d);
370     return Status::OK();
371   }
372
373   virtual Status DeleteFile(const std::string& fname) {
374     Status result;
375     if (unlink(fname.c_str()) != 0) {
376       result = IOError(fname, errno);
377     }
378     return result;
379   }
380
381   virtual Status CreateDir(const std::string& name) {
382     Status result;
383     if (mkdir(name.c_str(), 0755) != 0) {
384       result = IOError(name, errno);
385     }
386     return result;
387   }
388
389   virtual Status DeleteDir(const std::string& name) {
390     Status result;
391     if (rmdir(name.c_str()) != 0) {
392       result = IOError(name, errno);
393     }
394     return result;
395   }
396
397   virtual Status GetFileSize(const std::string& fname, uint64_t* size) {
398     Status s;
399     struct stat sbuf;
400     if (stat(fname.c_str(), &sbuf) != 0) {
401       *size = 0;
402       s = IOError(fname, errno);
403     } else {
404       *size = sbuf.st_size;
405     }
406     return s;
407   }
408
409   virtual Status RenameFile(const std::string& src, const std::string& target) {
410     Status result;
411     if (rename(src.c_str(), target.c_str()) != 0) {
412       result = IOError(src, errno);
413     }
414     return result;
415   }
416
417   virtual Status LockFile(const std::string& fname, FileLock** lock) {
418     *lock = NULL;
419     Status result;
420     int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
421     if (fd < 0) {
422       result = IOError(fname, errno);
423     } else if (!locks_.Insert(fname)) {
424       close(fd);
425       result = Status::IOError("lock " + fname, "already held by process");
426     } else if (LockOrUnlock(fd, true) == -1) {
427       result = IOError("lock " + fname, errno);
428       close(fd);
429       locks_.Remove(fname);
430     } else {
431       PosixFileLock* my_lock = new PosixFileLock;
432       my_lock->fd_ = fd;
433       my_lock->name_ = fname;
434       *lock = my_lock;
435     }
436     return result;
437   }
438
439   virtual Status UnlockFile(FileLock* lock) {
440     PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
441     Status result;
442     if (LockOrUnlock(my_lock->fd_, false) == -1) {
443       result = IOError("unlock", errno);
444     }
445     locks_.Remove(my_lock->name_);
446     close(my_lock->fd_);
447     delete my_lock;
448     return result;
449   }
450
451   virtual void Schedule(void (*function)(void*), void* arg);
452
453   virtual void StartThread(void (*function)(void* arg), void* arg);
454
455   virtual Status GetTestDirectory(std::string* result) {
456     const char* env = getenv("TEST_TMPDIR");
457     if (env && env[0] != '\0') {
458       *result = env;
459     } else {
460       char buf[100];
461       snprintf(buf, sizeof(buf), "/tmp/leveldbtest-%d", int(geteuid()));
462       *result = buf;
463     }
464     // Directory may already exist
465     CreateDir(*result);
466     return Status::OK();
467   }
468
469   static uint64_t gettid() {
470     pthread_t tid = pthread_self();
471     uint64_t thread_id = 0;
472     memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
473     return thread_id;
474   }
475
476   virtual Status NewLogger(const std::string& fname, Logger** result) {
477     FILE* f = fopen(fname.c_str(), "w");
478     if (f == NULL) {
479       *result = NULL;
480       return IOError(fname, errno);
481     } else {
482       *result = new PosixLogger(f, &PosixEnv::gettid);
483       return Status::OK();
484     }
485   }
486
487   virtual uint64_t NowMicros() {
488     struct timeval tv;
489     gettimeofday(&tv, NULL);
490     return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
491   }
492
493   virtual void SleepForMicroseconds(int micros) {
494     usleep(micros);
495   }
496
497  private:
498   void PthreadCall(const char* label, int result) {
499     if (result != 0) {
500       fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
501       abort();
502     }
503   }
504
505   // BGThread() is the body of the background thread
506   void BGThread();
507   static void* BGThreadWrapper(void* arg) {
508     reinterpret_cast<PosixEnv*>(arg)->BGThread();
509     return NULL;
510   }
511
512   pthread_mutex_t mu_;
513   pthread_cond_t bgsignal_;
514   pthread_t bgthread_;
515   bool started_bgthread_;
516
517   // Entry per Schedule() call
518   struct BGItem { void* arg; void (*function)(void*); };
519   typedef std::deque<BGItem> BGQueue;
520   BGQueue queue_;
521
522   PosixLockTable locks_;
523   MmapLimiter mmap_limit_;
524 };
525
526 PosixEnv::PosixEnv() : started_bgthread_(false) {
527   PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL));
528   PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL));
529 }
530
531 void PosixEnv::Schedule(void (*function)(void*), void* arg) {
532   PthreadCall("lock", pthread_mutex_lock(&mu_));
533
534   // Start background thread if necessary
535   if (!started_bgthread_) {
536     started_bgthread_ = true;
537     PthreadCall(
538         "create thread",
539         pthread_create(&bgthread_, NULL,  &PosixEnv::BGThreadWrapper, this));
540   }
541
542   // If the queue is currently empty, the background thread may currently be
543   // waiting.
544   if (queue_.empty()) {
545     PthreadCall("signal", pthread_cond_signal(&bgsignal_));
546   }
547
548   // Add to priority queue
549   queue_.push_back(BGItem());
550   queue_.back().function = function;
551   queue_.back().arg = arg;
552
553   PthreadCall("unlock", pthread_mutex_unlock(&mu_));
554 }
555
556 void PosixEnv::BGThread() {
557   while (true) {
558     // Wait until there is an item that is ready to run
559     PthreadCall("lock", pthread_mutex_lock(&mu_));
560     while (queue_.empty()) {
561       PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
562     }
563
564     void (*function)(void*) = queue_.front().function;
565     void* arg = queue_.front().arg;
566     queue_.pop_front();
567
568     PthreadCall("unlock", pthread_mutex_unlock(&mu_));
569     (*function)(arg);
570   }
571 }
572
573 namespace {
574 struct StartThreadState {
575   void (*user_function)(void*);
576   void* arg;
577 };
578 }
579 static void* StartThreadWrapper(void* arg) {
580   StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
581   state->user_function(state->arg);
582   delete state;
583   return NULL;
584 }
585
586 void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
587   pthread_t t;
588   StartThreadState* state = new StartThreadState;
589   state->user_function = function;
590   state->arg = arg;
591   PthreadCall("start thread",
592               pthread_create(&t, NULL,  &StartThreadWrapper, state));
593 }
594
595 }  // namespace
596
597 static pthread_once_t once = PTHREAD_ONCE_INIT;
598 static Env* default_env;
599 static void InitDefaultEnv() { default_env = new PosixEnv; }
600
601 Env* Env::Default() {
602   pthread_once(&once, InitDefaultEnv);
603   return default_env;
604 }
605
606 }  // namespace leveldb
607
608 #endif