Add Google's LevelDB support
[novacoin.git] / src / leveldb / db / db_test.cc
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5 #include "leveldb/db.h"
6 #include "leveldb/filter_policy.h"
7 #include "db/db_impl.h"
8 #include "db/filename.h"
9 #include "db/version_set.h"
10 #include "db/write_batch_internal.h"
11 #include "leveldb/cache.h"
12 #include "leveldb/env.h"
13 #include "leveldb/table.h"
14 #include "util/hash.h"
15 #include "util/logging.h"
16 #include "util/mutexlock.h"
17 #include "util/testharness.h"
18 #include "util/testutil.h"
19
20 namespace leveldb {
21
22 static std::string RandomString(Random* rnd, int len) {
23   std::string r;
24   test::RandomString(rnd, len, &r);
25   return r;
26 }
27
28 namespace {
29 class AtomicCounter {
30  private:
31   port::Mutex mu_;
32   int count_;
33  public:
34   AtomicCounter() : count_(0) { }
35   void Increment() {
36     IncrementBy(1);
37   }
38   void IncrementBy(int count) {
39     MutexLock l(&mu_);
40     count_ += count;
41   }
42   int Read() {
43     MutexLock l(&mu_);
44     return count_;
45   }
46   void Reset() {
47     MutexLock l(&mu_);
48     count_ = 0;
49   }
50 };
51
52 void DelayMilliseconds(int millis) {
53   Env::Default()->SleepForMicroseconds(millis * 1000);
54 }
55 }
56
57 // Special Env used to delay background operations
58 class SpecialEnv : public EnvWrapper {
59  public:
60   // sstable Sync() calls are blocked while this pointer is non-NULL.
61   port::AtomicPointer delay_sstable_sync_;
62
63   // Simulate no-space errors while this pointer is non-NULL.
64   port::AtomicPointer no_space_;
65
66   // Simulate non-writable file system while this pointer is non-NULL
67   port::AtomicPointer non_writable_;
68
69   // Force sync of manifest files to fail while this pointer is non-NULL
70   port::AtomicPointer manifest_sync_error_;
71
72   // Force write to manifest files to fail while this pointer is non-NULL
73   port::AtomicPointer manifest_write_error_;
74
75   bool count_random_reads_;
76   AtomicCounter random_read_counter_;
77
78   AtomicCounter sleep_counter_;
79   AtomicCounter sleep_time_counter_;
80
81   explicit SpecialEnv(Env* base) : EnvWrapper(base) {
82     delay_sstable_sync_.Release_Store(NULL);
83     no_space_.Release_Store(NULL);
84     non_writable_.Release_Store(NULL);
85     count_random_reads_ = false;
86     manifest_sync_error_.Release_Store(NULL);
87     manifest_write_error_.Release_Store(NULL);
88   }
89
90   Status NewWritableFile(const std::string& f, WritableFile** r) {
91     class SSTableFile : public WritableFile {
92      private:
93       SpecialEnv* env_;
94       WritableFile* base_;
95
96      public:
97       SSTableFile(SpecialEnv* env, WritableFile* base)
98           : env_(env),
99             base_(base) {
100       }
101       ~SSTableFile() { delete base_; }
102       Status Append(const Slice& data) {
103         if (env_->no_space_.Acquire_Load() != NULL) {
104           // Drop writes on the floor
105           return Status::OK();
106         } else {
107           return base_->Append(data);
108         }
109       }
110       Status Close() { return base_->Close(); }
111       Status Flush() { return base_->Flush(); }
112       Status Sync() {
113         while (env_->delay_sstable_sync_.Acquire_Load() != NULL) {
114           DelayMilliseconds(100);
115         }
116         return base_->Sync();
117       }
118     };
119     class ManifestFile : public WritableFile {
120      private:
121       SpecialEnv* env_;
122       WritableFile* base_;
123      public:
124       ManifestFile(SpecialEnv* env, WritableFile* b) : env_(env), base_(b) { }
125       ~ManifestFile() { delete base_; }
126       Status Append(const Slice& data) {
127         if (env_->manifest_write_error_.Acquire_Load() != NULL) {
128           return Status::IOError("simulated writer error");
129         } else {
130           return base_->Append(data);
131         }
132       }
133       Status Close() { return base_->Close(); }
134       Status Flush() { return base_->Flush(); }
135       Status Sync() {
136         if (env_->manifest_sync_error_.Acquire_Load() != NULL) {
137           return Status::IOError("simulated sync error");
138         } else {
139           return base_->Sync();
140         }
141       }
142     };
143
144     if (non_writable_.Acquire_Load() != NULL) {
145       return Status::IOError("simulated write error");
146     }
147
148     Status s = target()->NewWritableFile(f, r);
149     if (s.ok()) {
150       if (strstr(f.c_str(), ".sst") != NULL) {
151         *r = new SSTableFile(this, *r);
152       } else if (strstr(f.c_str(), "MANIFEST") != NULL) {
153         *r = new ManifestFile(this, *r);
154       }
155     }
156     return s;
157   }
158
159   Status NewRandomAccessFile(const std::string& f, RandomAccessFile** r) {
160     class CountingFile : public RandomAccessFile {
161      private:
162       RandomAccessFile* target_;
163       AtomicCounter* counter_;
164      public:
165       CountingFile(RandomAccessFile* target, AtomicCounter* counter)
166           : target_(target), counter_(counter) {
167       }
168       virtual ~CountingFile() { delete target_; }
169       virtual Status Read(uint64_t offset, size_t n, Slice* result,
170                           char* scratch) const {
171         counter_->Increment();
172         return target_->Read(offset, n, result, scratch);
173       }
174     };
175
176     Status s = target()->NewRandomAccessFile(f, r);
177     if (s.ok() && count_random_reads_) {
178       *r = new CountingFile(*r, &random_read_counter_);
179     }
180     return s;
181   }
182
183   virtual void SleepForMicroseconds(int micros) {
184     sleep_counter_.Increment();
185     sleep_time_counter_.IncrementBy(micros);
186   }
187
188 };
189
190 class DBTest {
191  private:
192   const FilterPolicy* filter_policy_;
193
194   // Sequence of option configurations to try
195   enum OptionConfig {
196     kDefault,
197     kFilter,
198     kUncompressed,
199     kEnd
200   };
201   int option_config_;
202
203  public:
204   std::string dbname_;
205   SpecialEnv* env_;
206   DB* db_;
207
208   Options last_options_;
209
210   DBTest() : option_config_(kDefault),
211              env_(new SpecialEnv(Env::Default())) {
212     filter_policy_ = NewBloomFilterPolicy(10);
213     dbname_ = test::TmpDir() + "/db_test";
214     DestroyDB(dbname_, Options());
215     db_ = NULL;
216     Reopen();
217   }
218
219   ~DBTest() {
220     delete db_;
221     DestroyDB(dbname_, Options());
222     delete env_;
223     delete filter_policy_;
224   }
225
226   // Switch to a fresh database with the next option configuration to
227   // test.  Return false if there are no more configurations to test.
228   bool ChangeOptions() {
229     option_config_++;
230     if (option_config_ >= kEnd) {
231       return false;
232     } else {
233       DestroyAndReopen();
234       return true;
235     }
236   }
237
238   // Return the current option configuration.
239   Options CurrentOptions() {
240     Options options;
241     switch (option_config_) {
242       case kFilter:
243         options.filter_policy = filter_policy_;
244         break;
245       case kUncompressed:
246         options.compression = kNoCompression;
247         break;
248       default:
249         break;
250     }
251     return options;
252   }
253
254   DBImpl* dbfull() {
255     return reinterpret_cast<DBImpl*>(db_);
256   }
257
258   void Reopen(Options* options = NULL) {
259     ASSERT_OK(TryReopen(options));
260   }
261
262   void Close() {
263     delete db_;
264     db_ = NULL;
265   }
266
267   void DestroyAndReopen(Options* options = NULL) {
268     delete db_;
269     db_ = NULL;
270     DestroyDB(dbname_, Options());
271     ASSERT_OK(TryReopen(options));
272   }
273
274   Status TryReopen(Options* options) {
275     delete db_;
276     db_ = NULL;
277     Options opts;
278     if (options != NULL) {
279       opts = *options;
280     } else {
281       opts = CurrentOptions();
282       opts.create_if_missing = true;
283     }
284     last_options_ = opts;
285
286     return DB::Open(opts, dbname_, &db_);
287   }
288
289   Status Put(const std::string& k, const std::string& v) {
290     return db_->Put(WriteOptions(), k, v);
291   }
292
293   Status Delete(const std::string& k) {
294     return db_->Delete(WriteOptions(), k);
295   }
296
297   std::string Get(const std::string& k, const Snapshot* snapshot = NULL) {
298     ReadOptions options;
299     options.snapshot = snapshot;
300     std::string result;
301     Status s = db_->Get(options, k, &result);
302     if (s.IsNotFound()) {
303       result = "NOT_FOUND";
304     } else if (!s.ok()) {
305       result = s.ToString();
306     }
307     return result;
308   }
309
310   // Return a string that contains all key,value pairs in order,
311   // formatted like "(k1->v1)(k2->v2)".
312   std::string Contents() {
313     std::vector<std::string> forward;
314     std::string result;
315     Iterator* iter = db_->NewIterator(ReadOptions());
316     for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
317       std::string s = IterStatus(iter);
318       result.push_back('(');
319       result.append(s);
320       result.push_back(')');
321       forward.push_back(s);
322     }
323
324     // Check reverse iteration results are the reverse of forward results
325     int matched = 0;
326     for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
327       ASSERT_LT(matched, forward.size());
328       ASSERT_EQ(IterStatus(iter), forward[forward.size() - matched - 1]);
329       matched++;
330     }
331     ASSERT_EQ(matched, forward.size());
332
333     delete iter;
334     return result;
335   }
336
337   std::string AllEntriesFor(const Slice& user_key) {
338     Iterator* iter = dbfull()->TEST_NewInternalIterator();
339     InternalKey target(user_key, kMaxSequenceNumber, kTypeValue);
340     iter->Seek(target.Encode());
341     std::string result;
342     if (!iter->status().ok()) {
343       result = iter->status().ToString();
344     } else {
345       result = "[ ";
346       bool first = true;
347       while (iter->Valid()) {
348         ParsedInternalKey ikey;
349         if (!ParseInternalKey(iter->key(), &ikey)) {
350           result += "CORRUPTED";
351         } else {
352           if (last_options_.comparator->Compare(ikey.user_key, user_key) != 0) {
353             break;
354           }
355           if (!first) {
356             result += ", ";
357           }
358           first = false;
359           switch (ikey.type) {
360             case kTypeValue:
361               result += iter->value().ToString();
362               break;
363             case kTypeDeletion:
364               result += "DEL";
365               break;
366           }
367         }
368         iter->Next();
369       }
370       if (!first) {
371         result += " ";
372       }
373       result += "]";
374     }
375     delete iter;
376     return result;
377   }
378
379   int NumTableFilesAtLevel(int level) {
380     std::string property;
381     ASSERT_TRUE(
382         db_->GetProperty("leveldb.num-files-at-level" + NumberToString(level),
383                          &property));
384     return atoi(property.c_str());
385   }
386
387   int TotalTableFiles() {
388     int result = 0;
389     for (int level = 0; level < config::kNumLevels; level++) {
390       result += NumTableFilesAtLevel(level);
391     }
392     return result;
393   }
394
395   // Return spread of files per level
396   std::string FilesPerLevel() {
397     std::string result;
398     int last_non_zero_offset = 0;
399     for (int level = 0; level < config::kNumLevels; level++) {
400       int f = NumTableFilesAtLevel(level);
401       char buf[100];
402       snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
403       result += buf;
404       if (f > 0) {
405         last_non_zero_offset = result.size();
406       }
407     }
408     result.resize(last_non_zero_offset);
409     return result;
410   }
411
412   int CountFiles() {
413     std::vector<std::string> files;
414     env_->GetChildren(dbname_, &files);
415     return static_cast<int>(files.size());
416   }
417
418   uint64_t Size(const Slice& start, const Slice& limit) {
419     Range r(start, limit);
420     uint64_t size;
421     db_->GetApproximateSizes(&r, 1, &size);
422     return size;
423   }
424
425   void Compact(const Slice& start, const Slice& limit) {
426     db_->CompactRange(&start, &limit);
427   }
428
429   // Do n memtable compactions, each of which produces an sstable
430   // covering the range [small,large].
431   void MakeTables(int n, const std::string& small, const std::string& large) {
432     for (int i = 0; i < n; i++) {
433       Put(small, "begin");
434       Put(large, "end");
435       dbfull()->TEST_CompactMemTable();
436     }
437   }
438
439   // Prevent pushing of new sstables into deeper levels by adding
440   // tables that cover a specified range to all levels.
441   void FillLevels(const std::string& smallest, const std::string& largest) {
442     MakeTables(config::kNumLevels, smallest, largest);
443   }
444
445   void DumpFileCounts(const char* label) {
446     fprintf(stderr, "---\n%s:\n", label);
447     fprintf(stderr, "maxoverlap: %lld\n",
448             static_cast<long long>(
449                 dbfull()->TEST_MaxNextLevelOverlappingBytes()));
450     for (int level = 0; level < config::kNumLevels; level++) {
451       int num = NumTableFilesAtLevel(level);
452       if (num > 0) {
453         fprintf(stderr, "  level %3d : %d files\n", level, num);
454       }
455     }
456   }
457
458   std::string DumpSSTableList() {
459     std::string property;
460     db_->GetProperty("leveldb.sstables", &property);
461     return property;
462   }
463
464   std::string IterStatus(Iterator* iter) {
465     std::string result;
466     if (iter->Valid()) {
467       result = iter->key().ToString() + "->" + iter->value().ToString();
468     } else {
469       result = "(invalid)";
470     }
471     return result;
472   }
473
474   bool DeleteAnSSTFile() {
475     std::vector<std::string> filenames;
476     ASSERT_OK(env_->GetChildren(dbname_, &filenames));
477     uint64_t number;
478     FileType type;
479     for (size_t i = 0; i < filenames.size(); i++) {
480       if (ParseFileName(filenames[i], &number, &type) && type == kTableFile) {
481         ASSERT_OK(env_->DeleteFile(TableFileName(dbname_, number)));
482         return true;
483       }
484     }
485     return false;
486   }
487 };
488
489 TEST(DBTest, Empty) {
490   do {
491     ASSERT_TRUE(db_ != NULL);
492     ASSERT_EQ("NOT_FOUND", Get("foo"));
493   } while (ChangeOptions());
494 }
495
496 TEST(DBTest, ReadWrite) {
497   do {
498     ASSERT_OK(Put("foo", "v1"));
499     ASSERT_EQ("v1", Get("foo"));
500     ASSERT_OK(Put("bar", "v2"));
501     ASSERT_OK(Put("foo", "v3"));
502     ASSERT_EQ("v3", Get("foo"));
503     ASSERT_EQ("v2", Get("bar"));
504   } while (ChangeOptions());
505 }
506
507 TEST(DBTest, PutDeleteGet) {
508   do {
509     ASSERT_OK(db_->Put(WriteOptions(), "foo", "v1"));
510     ASSERT_EQ("v1", Get("foo"));
511     ASSERT_OK(db_->Put(WriteOptions(), "foo", "v2"));
512     ASSERT_EQ("v2", Get("foo"));
513     ASSERT_OK(db_->Delete(WriteOptions(), "foo"));
514     ASSERT_EQ("NOT_FOUND", Get("foo"));
515   } while (ChangeOptions());
516 }
517
518 TEST(DBTest, GetFromImmutableLayer) {
519   do {
520     Options options = CurrentOptions();
521     options.env = env_;
522     options.write_buffer_size = 100000;  // Small write buffer
523     Reopen(&options);
524
525     ASSERT_OK(Put("foo", "v1"));
526     ASSERT_EQ("v1", Get("foo"));
527
528     env_->delay_sstable_sync_.Release_Store(env_);   // Block sync calls
529     Put("k1", std::string(100000, 'x'));             // Fill memtable
530     Put("k2", std::string(100000, 'y'));             // Trigger compaction
531     ASSERT_EQ("v1", Get("foo"));
532     env_->delay_sstable_sync_.Release_Store(NULL);   // Release sync calls
533   } while (ChangeOptions());
534 }
535
536 TEST(DBTest, GetFromVersions) {
537   do {
538     ASSERT_OK(Put("foo", "v1"));
539     dbfull()->TEST_CompactMemTable();
540     ASSERT_EQ("v1", Get("foo"));
541   } while (ChangeOptions());
542 }
543
544 TEST(DBTest, GetSnapshot) {
545   do {
546     // Try with both a short key and a long key
547     for (int i = 0; i < 2; i++) {
548       std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x');
549       ASSERT_OK(Put(key, "v1"));
550       const Snapshot* s1 = db_->GetSnapshot();
551       ASSERT_OK(Put(key, "v2"));
552       ASSERT_EQ("v2", Get(key));
553       ASSERT_EQ("v1", Get(key, s1));
554       dbfull()->TEST_CompactMemTable();
555       ASSERT_EQ("v2", Get(key));
556       ASSERT_EQ("v1", Get(key, s1));
557       db_->ReleaseSnapshot(s1);
558     }
559   } while (ChangeOptions());
560 }
561
562 TEST(DBTest, GetLevel0Ordering) {
563   do {
564     // Check that we process level-0 files in correct order.  The code
565     // below generates two level-0 files where the earlier one comes
566     // before the later one in the level-0 file list since the earlier
567     // one has a smaller "smallest" key.
568     ASSERT_OK(Put("bar", "b"));
569     ASSERT_OK(Put("foo", "v1"));
570     dbfull()->TEST_CompactMemTable();
571     ASSERT_OK(Put("foo", "v2"));
572     dbfull()->TEST_CompactMemTable();
573     ASSERT_EQ("v2", Get("foo"));
574   } while (ChangeOptions());
575 }
576
577 TEST(DBTest, GetOrderedByLevels) {
578   do {
579     ASSERT_OK(Put("foo", "v1"));
580     Compact("a", "z");
581     ASSERT_EQ("v1", Get("foo"));
582     ASSERT_OK(Put("foo", "v2"));
583     ASSERT_EQ("v2", Get("foo"));
584     dbfull()->TEST_CompactMemTable();
585     ASSERT_EQ("v2", Get("foo"));
586   } while (ChangeOptions());
587 }
588
589 TEST(DBTest, GetPicksCorrectFile) {
590   do {
591     // Arrange to have multiple files in a non-level-0 level.
592     ASSERT_OK(Put("a", "va"));
593     Compact("a", "b");
594     ASSERT_OK(Put("x", "vx"));
595     Compact("x", "y");
596     ASSERT_OK(Put("f", "vf"));
597     Compact("f", "g");
598     ASSERT_EQ("va", Get("a"));
599     ASSERT_EQ("vf", Get("f"));
600     ASSERT_EQ("vx", Get("x"));
601   } while (ChangeOptions());
602 }
603
604 TEST(DBTest, GetEncountersEmptyLevel) {
605   do {
606     // Arrange for the following to happen:
607     //   * sstable A in level 0
608     //   * nothing in level 1
609     //   * sstable B in level 2
610     // Then do enough Get() calls to arrange for an automatic compaction
611     // of sstable A.  A bug would cause the compaction to be marked as
612     // occuring at level 1 (instead of the correct level 0).
613
614     // Step 1: First place sstables in levels 0 and 2
615     int compaction_count = 0;
616     while (NumTableFilesAtLevel(0) == 0 ||
617            NumTableFilesAtLevel(2) == 0) {
618       ASSERT_LE(compaction_count, 100) << "could not fill levels 0 and 2";
619       compaction_count++;
620       Put("a", "begin");
621       Put("z", "end");
622       dbfull()->TEST_CompactMemTable();
623     }
624
625     // Step 2: clear level 1 if necessary.
626     dbfull()->TEST_CompactRange(1, NULL, NULL);
627     ASSERT_EQ(NumTableFilesAtLevel(0), 1);
628     ASSERT_EQ(NumTableFilesAtLevel(1), 0);
629     ASSERT_EQ(NumTableFilesAtLevel(2), 1);
630
631     // Step 3: read a bunch of times
632     for (int i = 0; i < 1000; i++) {
633       ASSERT_EQ("NOT_FOUND", Get("missing"));
634     }
635
636     // Step 4: Wait for compaction to finish
637     DelayMilliseconds(1000);
638
639     ASSERT_EQ(NumTableFilesAtLevel(0), 0);
640   } while (ChangeOptions());
641 }
642
643 TEST(DBTest, IterEmpty) {
644   Iterator* iter = db_->NewIterator(ReadOptions());
645
646   iter->SeekToFirst();
647   ASSERT_EQ(IterStatus(iter), "(invalid)");
648
649   iter->SeekToLast();
650   ASSERT_EQ(IterStatus(iter), "(invalid)");
651
652   iter->Seek("foo");
653   ASSERT_EQ(IterStatus(iter), "(invalid)");
654
655   delete iter;
656 }
657
658 TEST(DBTest, IterSingle) {
659   ASSERT_OK(Put("a", "va"));
660   Iterator* iter = db_->NewIterator(ReadOptions());
661
662   iter->SeekToFirst();
663   ASSERT_EQ(IterStatus(iter), "a->va");
664   iter->Next();
665   ASSERT_EQ(IterStatus(iter), "(invalid)");
666   iter->SeekToFirst();
667   ASSERT_EQ(IterStatus(iter), "a->va");
668   iter->Prev();
669   ASSERT_EQ(IterStatus(iter), "(invalid)");
670
671   iter->SeekToLast();
672   ASSERT_EQ(IterStatus(iter), "a->va");
673   iter->Next();
674   ASSERT_EQ(IterStatus(iter), "(invalid)");
675   iter->SeekToLast();
676   ASSERT_EQ(IterStatus(iter), "a->va");
677   iter->Prev();
678   ASSERT_EQ(IterStatus(iter), "(invalid)");
679
680   iter->Seek("");
681   ASSERT_EQ(IterStatus(iter), "a->va");
682   iter->Next();
683   ASSERT_EQ(IterStatus(iter), "(invalid)");
684
685   iter->Seek("a");
686   ASSERT_EQ(IterStatus(iter), "a->va");
687   iter->Next();
688   ASSERT_EQ(IterStatus(iter), "(invalid)");
689
690   iter->Seek("b");
691   ASSERT_EQ(IterStatus(iter), "(invalid)");
692
693   delete iter;
694 }
695
696 TEST(DBTest, IterMulti) {
697   ASSERT_OK(Put("a", "va"));
698   ASSERT_OK(Put("b", "vb"));
699   ASSERT_OK(Put("c", "vc"));
700   Iterator* iter = db_->NewIterator(ReadOptions());
701
702   iter->SeekToFirst();
703   ASSERT_EQ(IterStatus(iter), "a->va");
704   iter->Next();
705   ASSERT_EQ(IterStatus(iter), "b->vb");
706   iter->Next();
707   ASSERT_EQ(IterStatus(iter), "c->vc");
708   iter->Next();
709   ASSERT_EQ(IterStatus(iter), "(invalid)");
710   iter->SeekToFirst();
711   ASSERT_EQ(IterStatus(iter), "a->va");
712   iter->Prev();
713   ASSERT_EQ(IterStatus(iter), "(invalid)");
714
715   iter->SeekToLast();
716   ASSERT_EQ(IterStatus(iter), "c->vc");
717   iter->Prev();
718   ASSERT_EQ(IterStatus(iter), "b->vb");
719   iter->Prev();
720   ASSERT_EQ(IterStatus(iter), "a->va");
721   iter->Prev();
722   ASSERT_EQ(IterStatus(iter), "(invalid)");
723   iter->SeekToLast();
724   ASSERT_EQ(IterStatus(iter), "c->vc");
725   iter->Next();
726   ASSERT_EQ(IterStatus(iter), "(invalid)");
727
728   iter->Seek("");
729   ASSERT_EQ(IterStatus(iter), "a->va");
730   iter->Seek("a");
731   ASSERT_EQ(IterStatus(iter), "a->va");
732   iter->Seek("ax");
733   ASSERT_EQ(IterStatus(iter), "b->vb");
734   iter->Seek("b");
735   ASSERT_EQ(IterStatus(iter), "b->vb");
736   iter->Seek("z");
737   ASSERT_EQ(IterStatus(iter), "(invalid)");
738
739   // Switch from reverse to forward
740   iter->SeekToLast();
741   iter->Prev();
742   iter->Prev();
743   iter->Next();
744   ASSERT_EQ(IterStatus(iter), "b->vb");
745
746   // Switch from forward to reverse
747   iter->SeekToFirst();
748   iter->Next();
749   iter->Next();
750   iter->Prev();
751   ASSERT_EQ(IterStatus(iter), "b->vb");
752
753   // Make sure iter stays at snapshot
754   ASSERT_OK(Put("a",  "va2"));
755   ASSERT_OK(Put("a2", "va3"));
756   ASSERT_OK(Put("b",  "vb2"));
757   ASSERT_OK(Put("c",  "vc2"));
758   ASSERT_OK(Delete("b"));
759   iter->SeekToFirst();
760   ASSERT_EQ(IterStatus(iter), "a->va");
761   iter->Next();
762   ASSERT_EQ(IterStatus(iter), "b->vb");
763   iter->Next();
764   ASSERT_EQ(IterStatus(iter), "c->vc");
765   iter->Next();
766   ASSERT_EQ(IterStatus(iter), "(invalid)");
767   iter->SeekToLast();
768   ASSERT_EQ(IterStatus(iter), "c->vc");
769   iter->Prev();
770   ASSERT_EQ(IterStatus(iter), "b->vb");
771   iter->Prev();
772   ASSERT_EQ(IterStatus(iter), "a->va");
773   iter->Prev();
774   ASSERT_EQ(IterStatus(iter), "(invalid)");
775
776   delete iter;
777 }
778
779 TEST(DBTest, IterSmallAndLargeMix) {
780   ASSERT_OK(Put("a", "va"));
781   ASSERT_OK(Put("b", std::string(100000, 'b')));
782   ASSERT_OK(Put("c", "vc"));
783   ASSERT_OK(Put("d", std::string(100000, 'd')));
784   ASSERT_OK(Put("e", std::string(100000, 'e')));
785
786   Iterator* iter = db_->NewIterator(ReadOptions());
787
788   iter->SeekToFirst();
789   ASSERT_EQ(IterStatus(iter), "a->va");
790   iter->Next();
791   ASSERT_EQ(IterStatus(iter), "b->" + std::string(100000, 'b'));
792   iter->Next();
793   ASSERT_EQ(IterStatus(iter), "c->vc");
794   iter->Next();
795   ASSERT_EQ(IterStatus(iter), "d->" + std::string(100000, 'd'));
796   iter->Next();
797   ASSERT_EQ(IterStatus(iter), "e->" + std::string(100000, 'e'));
798   iter->Next();
799   ASSERT_EQ(IterStatus(iter), "(invalid)");
800
801   iter->SeekToLast();
802   ASSERT_EQ(IterStatus(iter), "e->" + std::string(100000, 'e'));
803   iter->Prev();
804   ASSERT_EQ(IterStatus(iter), "d->" + std::string(100000, 'd'));
805   iter->Prev();
806   ASSERT_EQ(IterStatus(iter), "c->vc");
807   iter->Prev();
808   ASSERT_EQ(IterStatus(iter), "b->" + std::string(100000, 'b'));
809   iter->Prev();
810   ASSERT_EQ(IterStatus(iter), "a->va");
811   iter->Prev();
812   ASSERT_EQ(IterStatus(iter), "(invalid)");
813
814   delete iter;
815 }
816
817 TEST(DBTest, IterMultiWithDelete) {
818   do {
819     ASSERT_OK(Put("a", "va"));
820     ASSERT_OK(Put("b", "vb"));
821     ASSERT_OK(Put("c", "vc"));
822     ASSERT_OK(Delete("b"));
823     ASSERT_EQ("NOT_FOUND", Get("b"));
824
825     Iterator* iter = db_->NewIterator(ReadOptions());
826     iter->Seek("c");
827     ASSERT_EQ(IterStatus(iter), "c->vc");
828     iter->Prev();
829     ASSERT_EQ(IterStatus(iter), "a->va");
830     delete iter;
831   } while (ChangeOptions());
832 }
833
834 TEST(DBTest, Recover) {
835   do {
836     ASSERT_OK(Put("foo", "v1"));
837     ASSERT_OK(Put("baz", "v5"));
838
839     Reopen();
840     ASSERT_EQ("v1", Get("foo"));
841
842     ASSERT_EQ("v1", Get("foo"));
843     ASSERT_EQ("v5", Get("baz"));
844     ASSERT_OK(Put("bar", "v2"));
845     ASSERT_OK(Put("foo", "v3"));
846
847     Reopen();
848     ASSERT_EQ("v3", Get("foo"));
849     ASSERT_OK(Put("foo", "v4"));
850     ASSERT_EQ("v4", Get("foo"));
851     ASSERT_EQ("v2", Get("bar"));
852     ASSERT_EQ("v5", Get("baz"));
853   } while (ChangeOptions());
854 }
855
856 TEST(DBTest, RecoveryWithEmptyLog) {
857   do {
858     ASSERT_OK(Put("foo", "v1"));
859     ASSERT_OK(Put("foo", "v2"));
860     Reopen();
861     Reopen();
862     ASSERT_OK(Put("foo", "v3"));
863     Reopen();
864     ASSERT_EQ("v3", Get("foo"));
865   } while (ChangeOptions());
866 }
867
868 // Check that writes done during a memtable compaction are recovered
869 // if the database is shutdown during the memtable compaction.
870 TEST(DBTest, RecoverDuringMemtableCompaction) {
871   do {
872     Options options = CurrentOptions();
873     options.env = env_;
874     options.write_buffer_size = 1000000;
875     Reopen(&options);
876
877     // Trigger a long memtable compaction and reopen the database during it
878     ASSERT_OK(Put("foo", "v1"));                         // Goes to 1st log file
879     ASSERT_OK(Put("big1", std::string(10000000, 'x')));  // Fills memtable
880     ASSERT_OK(Put("big2", std::string(1000, 'y')));      // Triggers compaction
881     ASSERT_OK(Put("bar", "v2"));                         // Goes to new log file
882
883     Reopen(&options);
884     ASSERT_EQ("v1", Get("foo"));
885     ASSERT_EQ("v2", Get("bar"));
886     ASSERT_EQ(std::string(10000000, 'x'), Get("big1"));
887     ASSERT_EQ(std::string(1000, 'y'), Get("big2"));
888   } while (ChangeOptions());
889 }
890
891 static std::string Key(int i) {
892   char buf[100];
893   snprintf(buf, sizeof(buf), "key%06d", i);
894   return std::string(buf);
895 }
896
897 TEST(DBTest, MinorCompactionsHappen) {
898   Options options = CurrentOptions();
899   options.write_buffer_size = 10000;
900   Reopen(&options);
901
902   const int N = 500;
903
904   int starting_num_tables = TotalTableFiles();
905   for (int i = 0; i < N; i++) {
906     ASSERT_OK(Put(Key(i), Key(i) + std::string(1000, 'v')));
907   }
908   int ending_num_tables = TotalTableFiles();
909   ASSERT_GT(ending_num_tables, starting_num_tables);
910
911   for (int i = 0; i < N; i++) {
912     ASSERT_EQ(Key(i) + std::string(1000, 'v'), Get(Key(i)));
913   }
914
915   Reopen();
916
917   for (int i = 0; i < N; i++) {
918     ASSERT_EQ(Key(i) + std::string(1000, 'v'), Get(Key(i)));
919   }
920 }
921
922 TEST(DBTest, RecoverWithLargeLog) {
923   {
924     Options options = CurrentOptions();
925     Reopen(&options);
926     ASSERT_OK(Put("big1", std::string(200000, '1')));
927     ASSERT_OK(Put("big2", std::string(200000, '2')));
928     ASSERT_OK(Put("small3", std::string(10, '3')));
929     ASSERT_OK(Put("small4", std::string(10, '4')));
930     ASSERT_EQ(NumTableFilesAtLevel(0), 0);
931   }
932
933   // Make sure that if we re-open with a small write buffer size that
934   // we flush table files in the middle of a large log file.
935   Options options = CurrentOptions();
936   options.write_buffer_size = 100000;
937   Reopen(&options);
938   ASSERT_EQ(NumTableFilesAtLevel(0), 3);
939   ASSERT_EQ(std::string(200000, '1'), Get("big1"));
940   ASSERT_EQ(std::string(200000, '2'), Get("big2"));
941   ASSERT_EQ(std::string(10, '3'), Get("small3"));
942   ASSERT_EQ(std::string(10, '4'), Get("small4"));
943   ASSERT_GT(NumTableFilesAtLevel(0), 1);
944 }
945
946 TEST(DBTest, CompactionsGenerateMultipleFiles) {
947   Options options = CurrentOptions();
948   options.write_buffer_size = 100000000;        // Large write buffer
949   Reopen(&options);
950
951   Random rnd(301);
952
953   // Write 8MB (80 values, each 100K)
954   ASSERT_EQ(NumTableFilesAtLevel(0), 0);
955   std::vector<std::string> values;
956   for (int i = 0; i < 80; i++) {
957     values.push_back(RandomString(&rnd, 100000));
958     ASSERT_OK(Put(Key(i), values[i]));
959   }
960
961   // Reopening moves updates to level-0
962   Reopen(&options);
963   dbfull()->TEST_CompactRange(0, NULL, NULL);
964
965   ASSERT_EQ(NumTableFilesAtLevel(0), 0);
966   ASSERT_GT(NumTableFilesAtLevel(1), 1);
967   for (int i = 0; i < 80; i++) {
968     ASSERT_EQ(Get(Key(i)), values[i]);
969   }
970 }
971
972 TEST(DBTest, RepeatedWritesToSameKey) {
973   Options options = CurrentOptions();
974   options.env = env_;
975   options.write_buffer_size = 100000;  // Small write buffer
976   Reopen(&options);
977
978   // We must have at most one file per level except for level-0,
979   // which may have up to kL0_StopWritesTrigger files.
980   const int kMaxFiles = config::kNumLevels + config::kL0_StopWritesTrigger;
981
982   Random rnd(301);
983   std::string value = RandomString(&rnd, 2 * options.write_buffer_size);
984   for (int i = 0; i < 5 * kMaxFiles; i++) {
985     Put("key", value);
986     ASSERT_LE(TotalTableFiles(), kMaxFiles);
987     fprintf(stderr, "after %d: %d files\n", int(i+1), TotalTableFiles());
988   }
989 }
990
991 TEST(DBTest, SparseMerge) {
992   Options options = CurrentOptions();
993   options.compression = kNoCompression;
994   Reopen(&options);
995
996   FillLevels("A", "Z");
997
998   // Suppose there is:
999   //    small amount of data with prefix A
1000   //    large amount of data with prefix B
1001   //    small amount of data with prefix C
1002   // and that recent updates have made small changes to all three prefixes.
1003   // Check that we do not do a compaction that merges all of B in one shot.
1004   const std::string value(1000, 'x');
1005   Put("A", "va");
1006   // Write approximately 100MB of "B" values
1007   for (int i = 0; i < 100000; i++) {
1008     char key[100];
1009     snprintf(key, sizeof(key), "B%010d", i);
1010     Put(key, value);
1011   }
1012   Put("C", "vc");
1013   dbfull()->TEST_CompactMemTable();
1014   dbfull()->TEST_CompactRange(0, NULL, NULL);
1015
1016   // Make sparse update
1017   Put("A",    "va2");
1018   Put("B100", "bvalue2");
1019   Put("C",    "vc2");
1020   dbfull()->TEST_CompactMemTable();
1021
1022   // Compactions should not cause us to create a situation where
1023   // a file overlaps too much data at the next level.
1024   ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20*1048576);
1025   dbfull()->TEST_CompactRange(0, NULL, NULL);
1026   ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20*1048576);
1027   dbfull()->TEST_CompactRange(1, NULL, NULL);
1028   ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20*1048576);
1029 }
1030
1031 static bool Between(uint64_t val, uint64_t low, uint64_t high) {
1032   bool result = (val >= low) && (val <= high);
1033   if (!result) {
1034     fprintf(stderr, "Value %llu is not in range [%llu, %llu]\n",
1035             (unsigned long long)(val),
1036             (unsigned long long)(low),
1037             (unsigned long long)(high));
1038   }
1039   return result;
1040 }
1041
1042 TEST(DBTest, ApproximateSizes) {
1043   do {
1044     Options options = CurrentOptions();
1045     options.write_buffer_size = 100000000;        // Large write buffer
1046     options.compression = kNoCompression;
1047     DestroyAndReopen();
1048
1049     ASSERT_TRUE(Between(Size("", "xyz"), 0, 0));
1050     Reopen(&options);
1051     ASSERT_TRUE(Between(Size("", "xyz"), 0, 0));
1052
1053     // Write 8MB (80 values, each 100K)
1054     ASSERT_EQ(NumTableFilesAtLevel(0), 0);
1055     const int N = 80;
1056     static const int S1 = 100000;
1057     static const int S2 = 105000;  // Allow some expansion from metadata
1058     Random rnd(301);
1059     for (int i = 0; i < N; i++) {
1060       ASSERT_OK(Put(Key(i), RandomString(&rnd, S1)));
1061     }
1062
1063     // 0 because GetApproximateSizes() does not account for memtable space
1064     ASSERT_TRUE(Between(Size("", Key(50)), 0, 0));
1065
1066     // Check sizes across recovery by reopening a few times
1067     for (int run = 0; run < 3; run++) {
1068       Reopen(&options);
1069
1070       for (int compact_start = 0; compact_start < N; compact_start += 10) {
1071         for (int i = 0; i < N; i += 10) {
1072           ASSERT_TRUE(Between(Size("", Key(i)), S1*i, S2*i));
1073           ASSERT_TRUE(Between(Size("", Key(i)+".suffix"), S1*(i+1), S2*(i+1)));
1074           ASSERT_TRUE(Between(Size(Key(i), Key(i+10)), S1*10, S2*10));
1075         }
1076         ASSERT_TRUE(Between(Size("", Key(50)), S1*50, S2*50));
1077         ASSERT_TRUE(Between(Size("", Key(50)+".suffix"), S1*50, S2*50));
1078
1079         std::string cstart_str = Key(compact_start);
1080         std::string cend_str = Key(compact_start + 9);
1081         Slice cstart = cstart_str;
1082         Slice cend = cend_str;
1083         dbfull()->TEST_CompactRange(0, &cstart, &cend);
1084       }
1085
1086       ASSERT_EQ(NumTableFilesAtLevel(0), 0);
1087       ASSERT_GT(NumTableFilesAtLevel(1), 0);
1088     }
1089   } while (ChangeOptions());
1090 }
1091
1092 TEST(DBTest, ApproximateSizes_MixOfSmallAndLarge) {
1093   do {
1094     Options options = CurrentOptions();
1095     options.compression = kNoCompression;
1096     Reopen();
1097
1098     Random rnd(301);
1099     std::string big1 = RandomString(&rnd, 100000);
1100     ASSERT_OK(Put(Key(0), RandomString(&rnd, 10000)));
1101     ASSERT_OK(Put(Key(1), RandomString(&rnd, 10000)));
1102     ASSERT_OK(Put(Key(2), big1));
1103     ASSERT_OK(Put(Key(3), RandomString(&rnd, 10000)));
1104     ASSERT_OK(Put(Key(4), big1));
1105     ASSERT_OK(Put(Key(5), RandomString(&rnd, 10000)));
1106     ASSERT_OK(Put(Key(6), RandomString(&rnd, 300000)));
1107     ASSERT_OK(Put(Key(7), RandomString(&rnd, 10000)));
1108
1109     // Check sizes across recovery by reopening a few times
1110     for (int run = 0; run < 3; run++) {
1111       Reopen(&options);
1112
1113       ASSERT_TRUE(Between(Size("", Key(0)), 0, 0));
1114       ASSERT_TRUE(Between(Size("", Key(1)), 10000, 11000));
1115       ASSERT_TRUE(Between(Size("", Key(2)), 20000, 21000));
1116       ASSERT_TRUE(Between(Size("", Key(3)), 120000, 121000));
1117       ASSERT_TRUE(Between(Size("", Key(4)), 130000, 131000));
1118       ASSERT_TRUE(Between(Size("", Key(5)), 230000, 231000));
1119       ASSERT_TRUE(Between(Size("", Key(6)), 240000, 241000));
1120       ASSERT_TRUE(Between(Size("", Key(7)), 540000, 541000));
1121       ASSERT_TRUE(Between(Size("", Key(8)), 550000, 560000));
1122
1123       ASSERT_TRUE(Between(Size(Key(3), Key(5)), 110000, 111000));
1124
1125       dbfull()->TEST_CompactRange(0, NULL, NULL);
1126     }
1127   } while (ChangeOptions());
1128 }
1129
1130 TEST(DBTest, IteratorPinsRef) {
1131   Put("foo", "hello");
1132
1133   // Get iterator that will yield the current contents of the DB.
1134   Iterator* iter = db_->NewIterator(ReadOptions());
1135
1136   // Write to force compactions
1137   Put("foo", "newvalue1");
1138   for (int i = 0; i < 100; i++) {
1139     ASSERT_OK(Put(Key(i), Key(i) + std::string(100000, 'v'))); // 100K values
1140   }
1141   Put("foo", "newvalue2");
1142
1143   iter->SeekToFirst();
1144   ASSERT_TRUE(iter->Valid());
1145   ASSERT_EQ("foo", iter->key().ToString());
1146   ASSERT_EQ("hello", iter->value().ToString());
1147   iter->Next();
1148   ASSERT_TRUE(!iter->Valid());
1149   delete iter;
1150 }
1151
1152 TEST(DBTest, Snapshot) {
1153   do {
1154     Put("foo", "v1");
1155     const Snapshot* s1 = db_->GetSnapshot();
1156     Put("foo", "v2");
1157     const Snapshot* s2 = db_->GetSnapshot();
1158     Put("foo", "v3");
1159     const Snapshot* s3 = db_->GetSnapshot();
1160
1161     Put("foo", "v4");
1162     ASSERT_EQ("v1", Get("foo", s1));
1163     ASSERT_EQ("v2", Get("foo", s2));
1164     ASSERT_EQ("v3", Get("foo", s3));
1165     ASSERT_EQ("v4", Get("foo"));
1166
1167     db_->ReleaseSnapshot(s3);
1168     ASSERT_EQ("v1", Get("foo", s1));
1169     ASSERT_EQ("v2", Get("foo", s2));
1170     ASSERT_EQ("v4", Get("foo"));
1171
1172     db_->ReleaseSnapshot(s1);
1173     ASSERT_EQ("v2", Get("foo", s2));
1174     ASSERT_EQ("v4", Get("foo"));
1175
1176     db_->ReleaseSnapshot(s2);
1177     ASSERT_EQ("v4", Get("foo"));
1178   } while (ChangeOptions());
1179 }
1180
1181 TEST(DBTest, HiddenValuesAreRemoved) {
1182   do {
1183     Random rnd(301);
1184     FillLevels("a", "z");
1185
1186     std::string big = RandomString(&rnd, 50000);
1187     Put("foo", big);
1188     Put("pastfoo", "v");
1189     const Snapshot* snapshot = db_->GetSnapshot();
1190     Put("foo", "tiny");
1191     Put("pastfoo2", "v2");        // Advance sequence number one more
1192
1193     ASSERT_OK(dbfull()->TEST_CompactMemTable());
1194     ASSERT_GT(NumTableFilesAtLevel(0), 0);
1195
1196     ASSERT_EQ(big, Get("foo", snapshot));
1197     ASSERT_TRUE(Between(Size("", "pastfoo"), 50000, 60000));
1198     db_->ReleaseSnapshot(snapshot);
1199     ASSERT_EQ(AllEntriesFor("foo"), "[ tiny, " + big + " ]");
1200     Slice x("x");
1201     dbfull()->TEST_CompactRange(0, NULL, &x);
1202     ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]");
1203     ASSERT_EQ(NumTableFilesAtLevel(0), 0);
1204     ASSERT_GE(NumTableFilesAtLevel(1), 1);
1205     dbfull()->TEST_CompactRange(1, NULL, &x);
1206     ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]");
1207
1208     ASSERT_TRUE(Between(Size("", "pastfoo"), 0, 1000));
1209   } while (ChangeOptions());
1210 }
1211
1212 TEST(DBTest, DeletionMarkers1) {
1213   Put("foo", "v1");
1214   ASSERT_OK(dbfull()->TEST_CompactMemTable());
1215   const int last = config::kMaxMemCompactLevel;
1216   ASSERT_EQ(NumTableFilesAtLevel(last), 1);   // foo => v1 is now in last level
1217
1218   // Place a table at level last-1 to prevent merging with preceding mutation
1219   Put("a", "begin");
1220   Put("z", "end");
1221   dbfull()->TEST_CompactMemTable();
1222   ASSERT_EQ(NumTableFilesAtLevel(last), 1);
1223   ASSERT_EQ(NumTableFilesAtLevel(last-1), 1);
1224
1225   Delete("foo");
1226   Put("foo", "v2");
1227   ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
1228   ASSERT_OK(dbfull()->TEST_CompactMemTable());  // Moves to level last-2
1229   ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
1230   Slice z("z");
1231   dbfull()->TEST_CompactRange(last-2, NULL, &z);
1232   // DEL eliminated, but v1 remains because we aren't compacting that level
1233   // (DEL can be eliminated because v2 hides v1).
1234   ASSERT_EQ(AllEntriesFor("foo"), "[ v2, v1 ]");
1235   dbfull()->TEST_CompactRange(last-1, NULL, NULL);
1236   // Merging last-1 w/ last, so we are the base level for "foo", so
1237   // DEL is removed.  (as is v1).
1238   ASSERT_EQ(AllEntriesFor("foo"), "[ v2 ]");
1239 }
1240
1241 TEST(DBTest, DeletionMarkers2) {
1242   Put("foo", "v1");
1243   ASSERT_OK(dbfull()->TEST_CompactMemTable());
1244   const int last = config::kMaxMemCompactLevel;
1245   ASSERT_EQ(NumTableFilesAtLevel(last), 1);   // foo => v1 is now in last level
1246
1247   // Place a table at level last-1 to prevent merging with preceding mutation
1248   Put("a", "begin");
1249   Put("z", "end");
1250   dbfull()->TEST_CompactMemTable();
1251   ASSERT_EQ(NumTableFilesAtLevel(last), 1);
1252   ASSERT_EQ(NumTableFilesAtLevel(last-1), 1);
1253
1254   Delete("foo");
1255   ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
1256   ASSERT_OK(dbfull()->TEST_CompactMemTable());  // Moves to level last-2
1257   ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
1258   dbfull()->TEST_CompactRange(last-2, NULL, NULL);
1259   // DEL kept: "last" file overlaps
1260   ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
1261   dbfull()->TEST_CompactRange(last-1, NULL, NULL);
1262   // Merging last-1 w/ last, so we are the base level for "foo", so
1263   // DEL is removed.  (as is v1).
1264   ASSERT_EQ(AllEntriesFor("foo"), "[ ]");
1265 }
1266
1267 TEST(DBTest, OverlapInLevel0) {
1268   do {
1269     ASSERT_EQ(config::kMaxMemCompactLevel, 2) << "Fix test to match config";
1270
1271     // Fill levels 1 and 2 to disable the pushing of new memtables to levels > 0.
1272     ASSERT_OK(Put("100", "v100"));
1273     ASSERT_OK(Put("999", "v999"));
1274     dbfull()->TEST_CompactMemTable();
1275     ASSERT_OK(Delete("100"));
1276     ASSERT_OK(Delete("999"));
1277     dbfull()->TEST_CompactMemTable();
1278     ASSERT_EQ("0,1,1", FilesPerLevel());
1279
1280     // Make files spanning the following ranges in level-0:
1281     //  files[0]  200 .. 900
1282     //  files[1]  300 .. 500
1283     // Note that files are sorted by smallest key.
1284     ASSERT_OK(Put("300", "v300"));
1285     ASSERT_OK(Put("500", "v500"));
1286     dbfull()->TEST_CompactMemTable();
1287     ASSERT_OK(Put("200", "v200"));
1288     ASSERT_OK(Put("600", "v600"));
1289     ASSERT_OK(Put("900", "v900"));
1290     dbfull()->TEST_CompactMemTable();
1291     ASSERT_EQ("2,1,1", FilesPerLevel());
1292
1293     // Compact away the placeholder files we created initially
1294     dbfull()->TEST_CompactRange(1, NULL, NULL);
1295     dbfull()->TEST_CompactRange(2, NULL, NULL);
1296     ASSERT_EQ("2", FilesPerLevel());
1297
1298     // Do a memtable compaction.  Before bug-fix, the compaction would
1299     // not detect the overlap with level-0 files and would incorrectly place
1300     // the deletion in a deeper level.
1301     ASSERT_OK(Delete("600"));
1302     dbfull()->TEST_CompactMemTable();
1303     ASSERT_EQ("3", FilesPerLevel());
1304     ASSERT_EQ("NOT_FOUND", Get("600"));
1305   } while (ChangeOptions());
1306 }
1307
1308 TEST(DBTest, L0_CompactionBug_Issue44_a) {
1309   Reopen();
1310   ASSERT_OK(Put("b", "v"));
1311   Reopen();
1312   ASSERT_OK(Delete("b"));
1313   ASSERT_OK(Delete("a"));
1314   Reopen();
1315   ASSERT_OK(Delete("a"));
1316   Reopen();
1317   ASSERT_OK(Put("a", "v"));
1318   Reopen();
1319   Reopen();
1320   ASSERT_EQ("(a->v)", Contents());
1321   DelayMilliseconds(1000);  // Wait for compaction to finish
1322   ASSERT_EQ("(a->v)", Contents());
1323 }
1324
1325 TEST(DBTest, L0_CompactionBug_Issue44_b) {
1326   Reopen();
1327   Put("","");
1328   Reopen();
1329   Delete("e");
1330   Put("","");
1331   Reopen();
1332   Put("c", "cv");
1333   Reopen();
1334   Put("","");
1335   Reopen();
1336   Put("","");
1337   DelayMilliseconds(1000);  // Wait for compaction to finish
1338   Reopen();
1339   Put("d","dv");
1340   Reopen();
1341   Put("","");
1342   Reopen();
1343   Delete("d");
1344   Delete("b");
1345   Reopen();
1346   ASSERT_EQ("(->)(c->cv)", Contents());
1347   DelayMilliseconds(1000);  // Wait for compaction to finish
1348   ASSERT_EQ("(->)(c->cv)", Contents());
1349 }
1350
1351 TEST(DBTest, ComparatorCheck) {
1352   class NewComparator : public Comparator {
1353    public:
1354     virtual const char* Name() const { return "leveldb.NewComparator"; }
1355     virtual int Compare(const Slice& a, const Slice& b) const {
1356       return BytewiseComparator()->Compare(a, b);
1357     }
1358     virtual void FindShortestSeparator(std::string* s, const Slice& l) const {
1359       BytewiseComparator()->FindShortestSeparator(s, l);
1360     }
1361     virtual void FindShortSuccessor(std::string* key) const {
1362       BytewiseComparator()->FindShortSuccessor(key);
1363     }
1364   };
1365   NewComparator cmp;
1366   Options new_options = CurrentOptions();
1367   new_options.comparator = &cmp;
1368   Status s = TryReopen(&new_options);
1369   ASSERT_TRUE(!s.ok());
1370   ASSERT_TRUE(s.ToString().find("comparator") != std::string::npos)
1371       << s.ToString();
1372 }
1373
1374 TEST(DBTest, CustomComparator) {
1375   class NumberComparator : public Comparator {
1376    public:
1377     virtual const char* Name() const { return "test.NumberComparator"; }
1378     virtual int Compare(const Slice& a, const Slice& b) const {
1379       return ToNumber(a) - ToNumber(b);
1380     }
1381     virtual void FindShortestSeparator(std::string* s, const Slice& l) const {
1382       ToNumber(*s);     // Check format
1383       ToNumber(l);      // Check format
1384     }
1385     virtual void FindShortSuccessor(std::string* key) const {
1386       ToNumber(*key);   // Check format
1387     }
1388    private:
1389     static int ToNumber(const Slice& x) {
1390       // Check that there are no extra characters.
1391       ASSERT_TRUE(x.size() >= 2 && x[0] == '[' && x[x.size()-1] == ']')
1392           << EscapeString(x);
1393       int val;
1394       char ignored;
1395       ASSERT_TRUE(sscanf(x.ToString().c_str(), "[%i]%c", &val, &ignored) == 1)
1396           << EscapeString(x);
1397       return val;
1398     }
1399   };
1400   NumberComparator cmp;
1401   Options new_options = CurrentOptions();
1402   new_options.create_if_missing = true;
1403   new_options.comparator = &cmp;
1404   new_options.filter_policy = NULL;     // Cannot use bloom filters
1405   new_options.write_buffer_size = 1000;  // Compact more often
1406   DestroyAndReopen(&new_options);
1407   ASSERT_OK(Put("[10]", "ten"));
1408   ASSERT_OK(Put("[0x14]", "twenty"));
1409   for (int i = 0; i < 2; i++) {
1410     ASSERT_EQ("ten", Get("[10]"));
1411     ASSERT_EQ("ten", Get("[0xa]"));
1412     ASSERT_EQ("twenty", Get("[20]"));
1413     ASSERT_EQ("twenty", Get("[0x14]"));
1414     ASSERT_EQ("NOT_FOUND", Get("[15]"));
1415     ASSERT_EQ("NOT_FOUND", Get("[0xf]"));
1416     Compact("[0]", "[9999]");
1417   }
1418
1419   for (int run = 0; run < 2; run++) {
1420     for (int i = 0; i < 1000; i++) {
1421       char buf[100];
1422       snprintf(buf, sizeof(buf), "[%d]", i*10);
1423       ASSERT_OK(Put(buf, buf));
1424     }
1425     Compact("[0]", "[1000000]");
1426   }
1427 }
1428
1429 TEST(DBTest, ManualCompaction) {
1430   ASSERT_EQ(config::kMaxMemCompactLevel, 2)
1431       << "Need to update this test to match kMaxMemCompactLevel";
1432
1433   MakeTables(3, "p", "q");
1434   ASSERT_EQ("1,1,1", FilesPerLevel());
1435
1436   // Compaction range falls before files
1437   Compact("", "c");
1438   ASSERT_EQ("1,1,1", FilesPerLevel());
1439
1440   // Compaction range falls after files
1441   Compact("r", "z");
1442   ASSERT_EQ("1,1,1", FilesPerLevel());
1443
1444   // Compaction range overlaps files
1445   Compact("p1", "p9");
1446   ASSERT_EQ("0,0,1", FilesPerLevel());
1447
1448   // Populate a different range
1449   MakeTables(3, "c", "e");
1450   ASSERT_EQ("1,1,2", FilesPerLevel());
1451
1452   // Compact just the new range
1453   Compact("b", "f");
1454   ASSERT_EQ("0,0,2", FilesPerLevel());
1455
1456   // Compact all
1457   MakeTables(1, "a", "z");
1458   ASSERT_EQ("0,1,2", FilesPerLevel());
1459   db_->CompactRange(NULL, NULL);
1460   ASSERT_EQ("0,0,1", FilesPerLevel());
1461 }
1462
1463 TEST(DBTest, DBOpen_Options) {
1464   std::string dbname = test::TmpDir() + "/db_options_test";
1465   DestroyDB(dbname, Options());
1466
1467   // Does not exist, and create_if_missing == false: error
1468   DB* db = NULL;
1469   Options opts;
1470   opts.create_if_missing = false;
1471   Status s = DB::Open(opts, dbname, &db);
1472   ASSERT_TRUE(strstr(s.ToString().c_str(), "does not exist") != NULL);
1473   ASSERT_TRUE(db == NULL);
1474
1475   // Does not exist, and create_if_missing == true: OK
1476   opts.create_if_missing = true;
1477   s = DB::Open(opts, dbname, &db);
1478   ASSERT_OK(s);
1479   ASSERT_TRUE(db != NULL);
1480
1481   delete db;
1482   db = NULL;
1483
1484   // Does exist, and error_if_exists == true: error
1485   opts.create_if_missing = false;
1486   opts.error_if_exists = true;
1487   s = DB::Open(opts, dbname, &db);
1488   ASSERT_TRUE(strstr(s.ToString().c_str(), "exists") != NULL);
1489   ASSERT_TRUE(db == NULL);
1490
1491   // Does exist, and error_if_exists == false: OK
1492   opts.create_if_missing = true;
1493   opts.error_if_exists = false;
1494   s = DB::Open(opts, dbname, &db);
1495   ASSERT_OK(s);
1496   ASSERT_TRUE(db != NULL);
1497
1498   delete db;
1499   db = NULL;
1500 }
1501
1502 TEST(DBTest, Locking) {
1503   DB* db2 = NULL;
1504   Status s = DB::Open(CurrentOptions(), dbname_, &db2);
1505   ASSERT_TRUE(!s.ok()) << "Locking did not prevent re-opening db";
1506 }
1507
1508 // Check that number of files does not grow when we are out of space
1509 TEST(DBTest, NoSpace) {
1510   Options options = CurrentOptions();
1511   options.env = env_;
1512   Reopen(&options);
1513
1514   ASSERT_OK(Put("foo", "v1"));
1515   ASSERT_EQ("v1", Get("foo"));
1516   Compact("a", "z");
1517   const int num_files = CountFiles();
1518   env_->no_space_.Release_Store(env_);   // Force out-of-space errors
1519   env_->sleep_counter_.Reset();
1520   for (int i = 0; i < 5; i++) {
1521     for (int level = 0; level < config::kNumLevels-1; level++) {
1522       dbfull()->TEST_CompactRange(level, NULL, NULL);
1523     }
1524   }
1525   env_->no_space_.Release_Store(NULL);
1526   ASSERT_LT(CountFiles(), num_files + 3);
1527
1528   // Check that compaction attempts slept after errors
1529   ASSERT_GE(env_->sleep_counter_.Read(), 5);
1530 }
1531
1532 TEST(DBTest, ExponentialBackoff) {
1533   Options options = CurrentOptions();
1534   options.env = env_;
1535   Reopen(&options);
1536
1537   ASSERT_OK(Put("foo", "v1"));
1538   ASSERT_EQ("v1", Get("foo"));
1539   Compact("a", "z");
1540   env_->non_writable_.Release_Store(env_);  // Force errors for new files
1541   env_->sleep_counter_.Reset();
1542   env_->sleep_time_counter_.Reset();
1543   for (int i = 0; i < 5; i++) {
1544     dbfull()->TEST_CompactRange(2, NULL, NULL);
1545   }
1546   env_->non_writable_.Release_Store(NULL);
1547
1548   // Wait for compaction to finish
1549   DelayMilliseconds(1000);
1550
1551   ASSERT_GE(env_->sleep_counter_.Read(), 5);
1552   ASSERT_LT(env_->sleep_counter_.Read(), 10);
1553   ASSERT_GE(env_->sleep_time_counter_.Read(), 10e6);
1554 }
1555
1556 TEST(DBTest, NonWritableFileSystem) {
1557   Options options = CurrentOptions();
1558   options.write_buffer_size = 1000;
1559   options.env = env_;
1560   Reopen(&options);
1561   ASSERT_OK(Put("foo", "v1"));
1562   env_->non_writable_.Release_Store(env_);  // Force errors for new files
1563   std::string big(100000, 'x');
1564   int errors = 0;
1565   for (int i = 0; i < 20; i++) {
1566     fprintf(stderr, "iter %d; errors %d\n", i, errors);
1567     if (!Put("foo", big).ok()) {
1568       errors++;
1569       DelayMilliseconds(100);
1570     }
1571   }
1572   ASSERT_GT(errors, 0);
1573   env_->non_writable_.Release_Store(NULL);
1574 }
1575
1576 TEST(DBTest, ManifestWriteError) {
1577   // Test for the following problem:
1578   // (a) Compaction produces file F
1579   // (b) Log record containing F is written to MANIFEST file, but Sync() fails
1580   // (c) GC deletes F
1581   // (d) After reopening DB, reads fail since deleted F is named in log record
1582
1583   // We iterate twice.  In the second iteration, everything is the
1584   // same except the log record never makes it to the MANIFEST file.
1585   for (int iter = 0; iter < 2; iter++) {
1586     port::AtomicPointer* error_type = (iter == 0)
1587         ? &env_->manifest_sync_error_
1588         : &env_->manifest_write_error_;
1589
1590     // Insert foo=>bar mapping
1591     Options options = CurrentOptions();
1592     options.env = env_;
1593     options.create_if_missing = true;
1594     options.error_if_exists = false;
1595     DestroyAndReopen(&options);
1596     ASSERT_OK(Put("foo", "bar"));
1597     ASSERT_EQ("bar", Get("foo"));
1598
1599     // Memtable compaction (will succeed)
1600     dbfull()->TEST_CompactMemTable();
1601     ASSERT_EQ("bar", Get("foo"));
1602     const int last = config::kMaxMemCompactLevel;
1603     ASSERT_EQ(NumTableFilesAtLevel(last), 1);   // foo=>bar is now in last level
1604
1605     // Merging compaction (will fail)
1606     error_type->Release_Store(env_);
1607     dbfull()->TEST_CompactRange(last, NULL, NULL);  // Should fail
1608     ASSERT_EQ("bar", Get("foo"));
1609
1610     // Recovery: should not lose data
1611     error_type->Release_Store(NULL);
1612     Reopen(&options);
1613     ASSERT_EQ("bar", Get("foo"));
1614   }
1615 }
1616
1617 TEST(DBTest, MissingSSTFile) {
1618   ASSERT_OK(Put("foo", "bar"));
1619   ASSERT_EQ("bar", Get("foo"));
1620
1621   // Dump the memtable to disk.
1622   dbfull()->TEST_CompactMemTable();
1623   ASSERT_EQ("bar", Get("foo"));
1624
1625   Close();
1626   ASSERT_TRUE(DeleteAnSSTFile());
1627   Options options = CurrentOptions();
1628   options.paranoid_checks = true;
1629   Status s = TryReopen(&options);
1630   ASSERT_TRUE(!s.ok());
1631   ASSERT_TRUE(s.ToString().find("issing") != std::string::npos)
1632       << s.ToString();
1633 }
1634
1635 TEST(DBTest, FilesDeletedAfterCompaction) {
1636   ASSERT_OK(Put("foo", "v2"));
1637   Compact("a", "z");
1638   const int num_files = CountFiles();
1639   for (int i = 0; i < 10; i++) {
1640     ASSERT_OK(Put("foo", "v2"));
1641     Compact("a", "z");
1642   }
1643   ASSERT_EQ(CountFiles(), num_files);
1644 }
1645
1646 TEST(DBTest, BloomFilter) {
1647   env_->count_random_reads_ = true;
1648   Options options = CurrentOptions();
1649   options.env = env_;
1650   options.block_cache = NewLRUCache(0);  // Prevent cache hits
1651   options.filter_policy = NewBloomFilterPolicy(10);
1652   Reopen(&options);
1653
1654   // Populate multiple layers
1655   const int N = 10000;
1656   for (int i = 0; i < N; i++) {
1657     ASSERT_OK(Put(Key(i), Key(i)));
1658   }
1659   Compact("a", "z");
1660   for (int i = 0; i < N; i += 100) {
1661     ASSERT_OK(Put(Key(i), Key(i)));
1662   }
1663   dbfull()->TEST_CompactMemTable();
1664
1665   // Prevent auto compactions triggered by seeks
1666   env_->delay_sstable_sync_.Release_Store(env_);
1667
1668   // Lookup present keys.  Should rarely read from small sstable.
1669   env_->random_read_counter_.Reset();
1670   for (int i = 0; i < N; i++) {
1671     ASSERT_EQ(Key(i), Get(Key(i)));
1672   }
1673   int reads = env_->random_read_counter_.Read();
1674   fprintf(stderr, "%d present => %d reads\n", N, reads);
1675   ASSERT_GE(reads, N);
1676   ASSERT_LE(reads, N + 2*N/100);
1677
1678   // Lookup present keys.  Should rarely read from either sstable.
1679   env_->random_read_counter_.Reset();
1680   for (int i = 0; i < N; i++) {
1681     ASSERT_EQ("NOT_FOUND", Get(Key(i) + ".missing"));
1682   }
1683   reads = env_->random_read_counter_.Read();
1684   fprintf(stderr, "%d missing => %d reads\n", N, reads);
1685   ASSERT_LE(reads, 3*N/100);
1686
1687   env_->delay_sstable_sync_.Release_Store(NULL);
1688   Close();
1689   delete options.block_cache;
1690   delete options.filter_policy;
1691 }
1692
1693 // Multi-threaded test:
1694 namespace {
1695
1696 static const int kNumThreads = 4;
1697 static const int kTestSeconds = 10;
1698 static const int kNumKeys = 1000;
1699
1700 struct MTState {
1701   DBTest* test;
1702   port::AtomicPointer stop;
1703   port::AtomicPointer counter[kNumThreads];
1704   port::AtomicPointer thread_done[kNumThreads];
1705 };
1706
1707 struct MTThread {
1708   MTState* state;
1709   int id;
1710 };
1711
1712 static void MTThreadBody(void* arg) {
1713   MTThread* t = reinterpret_cast<MTThread*>(arg);
1714   int id = t->id;
1715   DB* db = t->state->test->db_;
1716   uintptr_t counter = 0;
1717   fprintf(stderr, "... starting thread %d\n", id);
1718   Random rnd(1000 + id);
1719   std::string value;
1720   char valbuf[1500];
1721   while (t->state->stop.Acquire_Load() == NULL) {
1722     t->state->counter[id].Release_Store(reinterpret_cast<void*>(counter));
1723
1724     int key = rnd.Uniform(kNumKeys);
1725     char keybuf[20];
1726     snprintf(keybuf, sizeof(keybuf), "%016d", key);
1727
1728     if (rnd.OneIn(2)) {
1729       // Write values of the form <key, my id, counter>.
1730       // We add some padding for force compactions.
1731       snprintf(valbuf, sizeof(valbuf), "%d.%d.%-1000d",
1732                key, id, static_cast<int>(counter));
1733       ASSERT_OK(db->Put(WriteOptions(), Slice(keybuf), Slice(valbuf)));
1734     } else {
1735       // Read a value and verify that it matches the pattern written above.
1736       Status s = db->Get(ReadOptions(), Slice(keybuf), &value);
1737       if (s.IsNotFound()) {
1738         // Key has not yet been written
1739       } else {
1740         // Check that the writer thread counter is >= the counter in the value
1741         ASSERT_OK(s);
1742         int k, w, c;
1743         ASSERT_EQ(3, sscanf(value.c_str(), "%d.%d.%d", &k, &w, &c)) << value;
1744         ASSERT_EQ(k, key);
1745         ASSERT_GE(w, 0);
1746         ASSERT_LT(w, kNumThreads);
1747         ASSERT_LE(c, reinterpret_cast<uintptr_t>(
1748             t->state->counter[w].Acquire_Load()));
1749       }
1750     }
1751     counter++;
1752   }
1753   t->state->thread_done[id].Release_Store(t);
1754   fprintf(stderr, "... stopping thread %d after %d ops\n", id, int(counter));
1755 }
1756
1757 }  // namespace
1758
1759 TEST(DBTest, MultiThreaded) {
1760   do {
1761     // Initialize state
1762     MTState mt;
1763     mt.test = this;
1764     mt.stop.Release_Store(0);
1765     for (int id = 0; id < kNumThreads; id++) {
1766       mt.counter[id].Release_Store(0);
1767       mt.thread_done[id].Release_Store(0);
1768     }
1769
1770     // Start threads
1771     MTThread thread[kNumThreads];
1772     for (int id = 0; id < kNumThreads; id++) {
1773       thread[id].state = &mt;
1774       thread[id].id = id;
1775       env_->StartThread(MTThreadBody, &thread[id]);
1776     }
1777
1778     // Let them run for a while
1779     DelayMilliseconds(kTestSeconds * 1000);
1780
1781     // Stop the threads and wait for them to finish
1782     mt.stop.Release_Store(&mt);
1783     for (int id = 0; id < kNumThreads; id++) {
1784       while (mt.thread_done[id].Acquire_Load() == NULL) {
1785         DelayMilliseconds(100);
1786       }
1787     }
1788   } while (ChangeOptions());
1789 }
1790
1791 namespace {
1792 typedef std::map<std::string, std::string> KVMap;
1793 }
1794
1795 class ModelDB: public DB {
1796  public:
1797   class ModelSnapshot : public Snapshot {
1798    public:
1799     KVMap map_;
1800   };
1801
1802   explicit ModelDB(const Options& options): options_(options) { }
1803   ~ModelDB() { }
1804   virtual Status Put(const WriteOptions& o, const Slice& k, const Slice& v) {
1805     return DB::Put(o, k, v);
1806   }
1807   virtual Status Delete(const WriteOptions& o, const Slice& key) {
1808     return DB::Delete(o, key);
1809   }
1810   virtual Status Get(const ReadOptions& options,
1811                      const Slice& key, std::string* value) {
1812     assert(false);      // Not implemented
1813     return Status::NotFound(key);
1814   }
1815   virtual Iterator* NewIterator(const ReadOptions& options) {
1816     if (options.snapshot == NULL) {
1817       KVMap* saved = new KVMap;
1818       *saved = map_;
1819       return new ModelIter(saved, true);
1820     } else {
1821       const KVMap* snapshot_state =
1822           &(reinterpret_cast<const ModelSnapshot*>(options.snapshot)->map_);
1823       return new ModelIter(snapshot_state, false);
1824     }
1825   }
1826   virtual const Snapshot* GetSnapshot() {
1827     ModelSnapshot* snapshot = new ModelSnapshot;
1828     snapshot->map_ = map_;
1829     return snapshot;
1830   }
1831
1832   virtual void ReleaseSnapshot(const Snapshot* snapshot) {
1833     delete reinterpret_cast<const ModelSnapshot*>(snapshot);
1834   }
1835   virtual Status Write(const WriteOptions& options, WriteBatch* batch) {
1836     class Handler : public WriteBatch::Handler {
1837      public:
1838       KVMap* map_;
1839       virtual void Put(const Slice& key, const Slice& value) {
1840         (*map_)[key.ToString()] = value.ToString();
1841       }
1842       virtual void Delete(const Slice& key) {
1843         map_->erase(key.ToString());
1844       }
1845     };
1846     Handler handler;
1847     handler.map_ = &map_;
1848     return batch->Iterate(&handler);
1849   }
1850
1851   virtual bool GetProperty(const Slice& property, std::string* value) {
1852     return false;
1853   }
1854   virtual void GetApproximateSizes(const Range* r, int n, uint64_t* sizes) {
1855     for (int i = 0; i < n; i++) {
1856       sizes[i] = 0;
1857     }
1858   }
1859   virtual void CompactRange(const Slice* start, const Slice* end) {
1860   }
1861
1862  private:
1863   class ModelIter: public Iterator {
1864    public:
1865     ModelIter(const KVMap* map, bool owned)
1866         : map_(map), owned_(owned), iter_(map_->end()) {
1867     }
1868     ~ModelIter() {
1869       if (owned_) delete map_;
1870     }
1871     virtual bool Valid() const { return iter_ != map_->end(); }
1872     virtual void SeekToFirst() { iter_ = map_->begin(); }
1873     virtual void SeekToLast() {
1874       if (map_->empty()) {
1875         iter_ = map_->end();
1876       } else {
1877         iter_ = map_->find(map_->rbegin()->first);
1878       }
1879     }
1880     virtual void Seek(const Slice& k) {
1881       iter_ = map_->lower_bound(k.ToString());
1882     }
1883     virtual void Next() { ++iter_; }
1884     virtual void Prev() { --iter_; }
1885     virtual Slice key() const { return iter_->first; }
1886     virtual Slice value() const { return iter_->second; }
1887     virtual Status status() const { return Status::OK(); }
1888    private:
1889     const KVMap* const map_;
1890     const bool owned_;  // Do we own map_
1891     KVMap::const_iterator iter_;
1892   };
1893   const Options options_;
1894   KVMap map_;
1895 };
1896
1897 static std::string RandomKey(Random* rnd) {
1898   int len = (rnd->OneIn(3)
1899              ? 1                // Short sometimes to encourage collisions
1900              : (rnd->OneIn(100) ? rnd->Skewed(10) : rnd->Uniform(10)));
1901   return test::RandomKey(rnd, len);
1902 }
1903
1904 static bool CompareIterators(int step,
1905                              DB* model,
1906                              DB* db,
1907                              const Snapshot* model_snap,
1908                              const Snapshot* db_snap) {
1909   ReadOptions options;
1910   options.snapshot = model_snap;
1911   Iterator* miter = model->NewIterator(options);
1912   options.snapshot = db_snap;
1913   Iterator* dbiter = db->NewIterator(options);
1914   bool ok = true;
1915   int count = 0;
1916   for (miter->SeekToFirst(), dbiter->SeekToFirst();
1917        ok && miter->Valid() && dbiter->Valid();
1918        miter->Next(), dbiter->Next()) {
1919     count++;
1920     if (miter->key().compare(dbiter->key()) != 0) {
1921       fprintf(stderr, "step %d: Key mismatch: '%s' vs. '%s'\n",
1922               step,
1923               EscapeString(miter->key()).c_str(),
1924               EscapeString(dbiter->key()).c_str());
1925       ok = false;
1926       break;
1927     }
1928
1929     if (miter->value().compare(dbiter->value()) != 0) {
1930       fprintf(stderr, "step %d: Value mismatch for key '%s': '%s' vs. '%s'\n",
1931               step,
1932               EscapeString(miter->key()).c_str(),
1933               EscapeString(miter->value()).c_str(),
1934               EscapeString(miter->value()).c_str());
1935       ok = false;
1936     }
1937   }
1938
1939   if (ok) {
1940     if (miter->Valid() != dbiter->Valid()) {
1941       fprintf(stderr, "step %d: Mismatch at end of iterators: %d vs. %d\n",
1942               step, miter->Valid(), dbiter->Valid());
1943       ok = false;
1944     }
1945   }
1946   fprintf(stderr, "%d entries compared: ok=%d\n", count, ok);
1947   delete miter;
1948   delete dbiter;
1949   return ok;
1950 }
1951
1952 TEST(DBTest, Randomized) {
1953   Random rnd(test::RandomSeed());
1954   do {
1955     ModelDB model(CurrentOptions());
1956     const int N = 10000;
1957     const Snapshot* model_snap = NULL;
1958     const Snapshot* db_snap = NULL;
1959     std::string k, v;
1960     for (int step = 0; step < N; step++) {
1961       if (step % 100 == 0) {
1962         fprintf(stderr, "Step %d of %d\n", step, N);
1963       }
1964       // TODO(sanjay): Test Get() works
1965       int p = rnd.Uniform(100);
1966       if (p < 45) {                               // Put
1967         k = RandomKey(&rnd);
1968         v = RandomString(&rnd,
1969                          rnd.OneIn(20)
1970                          ? 100 + rnd.Uniform(100)
1971                          : rnd.Uniform(8));
1972         ASSERT_OK(model.Put(WriteOptions(), k, v));
1973         ASSERT_OK(db_->Put(WriteOptions(), k, v));
1974
1975       } else if (p < 90) {                        // Delete
1976         k = RandomKey(&rnd);
1977         ASSERT_OK(model.Delete(WriteOptions(), k));
1978         ASSERT_OK(db_->Delete(WriteOptions(), k));
1979
1980
1981       } else {                                    // Multi-element batch
1982         WriteBatch b;
1983         const int num = rnd.Uniform(8);
1984         for (int i = 0; i < num; i++) {
1985           if (i == 0 || !rnd.OneIn(10)) {
1986             k = RandomKey(&rnd);
1987           } else {
1988             // Periodically re-use the same key from the previous iter, so
1989             // we have multiple entries in the write batch for the same key
1990           }
1991           if (rnd.OneIn(2)) {
1992             v = RandomString(&rnd, rnd.Uniform(10));
1993             b.Put(k, v);
1994           } else {
1995             b.Delete(k);
1996           }
1997         }
1998         ASSERT_OK(model.Write(WriteOptions(), &b));
1999         ASSERT_OK(db_->Write(WriteOptions(), &b));
2000       }
2001
2002       if ((step % 100) == 0) {
2003         ASSERT_TRUE(CompareIterators(step, &model, db_, NULL, NULL));
2004         ASSERT_TRUE(CompareIterators(step, &model, db_, model_snap, db_snap));
2005         // Save a snapshot from each DB this time that we'll use next
2006         // time we compare things, to make sure the current state is
2007         // preserved with the snapshot
2008         if (model_snap != NULL) model.ReleaseSnapshot(model_snap);
2009         if (db_snap != NULL) db_->ReleaseSnapshot(db_snap);
2010
2011         Reopen();
2012         ASSERT_TRUE(CompareIterators(step, &model, db_, NULL, NULL));
2013
2014         model_snap = model.GetSnapshot();
2015         db_snap = db_->GetSnapshot();
2016       }
2017     }
2018     if (model_snap != NULL) model.ReleaseSnapshot(model_snap);
2019     if (db_snap != NULL) db_->ReleaseSnapshot(db_snap);
2020   } while (ChangeOptions());
2021 }
2022
2023 std::string MakeKey(unsigned int num) {
2024   char buf[30];
2025   snprintf(buf, sizeof(buf), "%016u", num);
2026   return std::string(buf);
2027 }
2028
2029 void BM_LogAndApply(int iters, int num_base_files) {
2030   std::string dbname = test::TmpDir() + "/leveldb_test_benchmark";
2031   DestroyDB(dbname, Options());
2032
2033   DB* db = NULL;
2034   Options opts;
2035   opts.create_if_missing = true;
2036   Status s = DB::Open(opts, dbname, &db);
2037   ASSERT_OK(s);
2038   ASSERT_TRUE(db != NULL);
2039
2040   delete db;
2041   db = NULL;
2042
2043   Env* env = Env::Default();
2044
2045   port::Mutex mu;
2046   MutexLock l(&mu);
2047
2048   InternalKeyComparator cmp(BytewiseComparator());
2049   Options options;
2050   VersionSet vset(dbname, &options, NULL, &cmp);
2051   ASSERT_OK(vset.Recover());
2052   VersionEdit vbase;
2053   uint64_t fnum = 1;
2054   for (int i = 0; i < num_base_files; i++) {
2055     InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
2056     InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);
2057     vbase.AddFile(2, fnum++, 1 /* file size */, start, limit);
2058   }
2059   ASSERT_OK(vset.LogAndApply(&vbase, &mu));
2060
2061   uint64_t start_micros = env->NowMicros();
2062
2063   for (int i = 0; i < iters; i++) {
2064     VersionEdit vedit;
2065     vedit.DeleteFile(2, fnum);
2066     InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
2067     InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);
2068     vedit.AddFile(2, fnum++, 1 /* file size */, start, limit);
2069     vset.LogAndApply(&vedit, &mu);
2070   }
2071   uint64_t stop_micros = env->NowMicros();
2072   unsigned int us = stop_micros - start_micros;
2073   char buf[16];
2074   snprintf(buf, sizeof(buf), "%d", num_base_files);
2075   fprintf(stderr,
2076           "BM_LogAndApply/%-6s   %8d iters : %9u us (%7.0f us / iter)\n",
2077           buf, iters, us, ((float)us) / iters);
2078 }
2079
2080 }  // namespace leveldb
2081
2082 int main(int argc, char** argv) {
2083   if (argc > 1 && std::string(argv[1]) == "--benchmark") {
2084     leveldb::BM_LogAndApply(1000, 1);
2085     leveldb::BM_LogAndApply(1000, 100);
2086     leveldb::BM_LogAndApply(1000, 10000);
2087     leveldb::BM_LogAndApply(100, 100000);
2088     return 0;
2089   }
2090
2091   return leveldb::test::RunAllTests();
2092 }