Merge pull request #361 from svost/master
[novacoin.git] / src / leveldb / table / table.cc
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5 #include "leveldb/table.h"
6
7 #include "leveldb/cache.h"
8 #include "leveldb/comparator.h"
9 #include "leveldb/env.h"
10 #include "leveldb/filter_policy.h"
11 #include "leveldb/options.h"
12 #include "table/block.h"
13 #include "table/filter_block.h"
14 #include "table/format.h"
15 #include "table/two_level_iterator.h"
16 #include "util/coding.h"
17
18 namespace leveldb {
19
20 struct Table::Rep {
21   ~Rep() {
22     delete filter;
23     delete [] filter_data;
24     delete index_block;
25   }
26
27   Options options;
28   Status status;
29   RandomAccessFile* file;
30   uint64_t cache_id;
31   FilterBlockReader* filter;
32   const char* filter_data;
33
34   BlockHandle metaindex_handle;  // Handle to metaindex_block: saved from footer
35   Block* index_block;
36 };
37
38 Status Table::Open(const Options& options,
39                    RandomAccessFile* file,
40                    uint64_t size,
41                    Table** table) {
42   *table = NULL;
43   if (size < Footer::kEncodedLength) {
44     return Status::Corruption("file is too short to be an sstable");
45   }
46
47   char footer_space[Footer::kEncodedLength];
48   Slice footer_input;
49   Status s = file->Read(size - Footer::kEncodedLength, Footer::kEncodedLength,
50                         &footer_input, footer_space);
51   if (!s.ok()) return s;
52
53   Footer footer;
54   s = footer.DecodeFrom(&footer_input);
55   if (!s.ok()) return s;
56
57   // Read the index block
58   BlockContents contents;
59   Block* index_block = NULL;
60   if (s.ok()) {
61     ReadOptions opt;
62     if (options.paranoid_checks) {
63       opt.verify_checksums = true;
64     }
65     s = ReadBlock(file, opt, footer.index_handle(), &contents);
66     if (s.ok()) {
67       index_block = new Block(contents);
68     }
69   }
70
71   if (s.ok()) {
72     // We've successfully read the footer and the index block: we're
73     // ready to serve requests.
74     Rep* rep = new Table::Rep;
75     rep->options = options;
76     rep->file = file;
77     rep->metaindex_handle = footer.metaindex_handle();
78     rep->index_block = index_block;
79     rep->cache_id = (options.block_cache ? options.block_cache->NewId() : 0);
80     rep->filter_data = NULL;
81     rep->filter = NULL;
82     *table = new Table(rep);
83     (*table)->ReadMeta(footer);
84   } else {
85     if (index_block) delete index_block;
86   }
87
88   return s;
89 }
90
91 void Table::ReadMeta(const Footer& footer) {
92   if (rep_->options.filter_policy == NULL) {
93     return;  // Do not need any metadata
94   }
95
96   // TODO(sanjay): Skip this if footer.metaindex_handle() size indicates
97   // it is an empty block.
98   ReadOptions opt;
99   if (rep_->options.paranoid_checks) {
100     opt.verify_checksums = true;
101   }
102   BlockContents contents;
103   if (!ReadBlock(rep_->file, opt, footer.metaindex_handle(), &contents).ok()) {
104     // Do not propagate errors since meta info is not needed for operation
105     return;
106   }
107   Block* meta = new Block(contents);
108
109   Iterator* iter = meta->NewIterator(BytewiseComparator());
110   std::string key = "filter.";
111   key.append(rep_->options.filter_policy->Name());
112   iter->Seek(key);
113   if (iter->Valid() && iter->key() == Slice(key)) {
114     ReadFilter(iter->value());
115   }
116   delete iter;
117   delete meta;
118 }
119
120 void Table::ReadFilter(const Slice& filter_handle_value) {
121   Slice v = filter_handle_value;
122   BlockHandle filter_handle;
123   if (!filter_handle.DecodeFrom(&v).ok()) {
124     return;
125   }
126
127   // We might want to unify with ReadBlock() if we start
128   // requiring checksum verification in Table::Open.
129   ReadOptions opt;
130   if (rep_->options.paranoid_checks) {
131     opt.verify_checksums = true;
132   }
133   BlockContents block;
134   if (!ReadBlock(rep_->file, opt, filter_handle, &block).ok()) {
135     return;
136   }
137   if (block.heap_allocated) {
138     rep_->filter_data = block.data.data();     // Will need to delete later
139   }
140   rep_->filter = new FilterBlockReader(rep_->options.filter_policy, block.data);
141 }
142
143 Table::~Table() {
144   delete rep_;
145 }
146
147 static void DeleteBlock(void* arg, void* ignored) {
148   delete reinterpret_cast<Block*>(arg);
149 }
150
151 static void DeleteCachedBlock(const Slice& key, void* value) {
152   Block* block = reinterpret_cast<Block*>(value);
153   delete block;
154 }
155
156 static void ReleaseBlock(void* arg, void* h) {
157   Cache* cache = reinterpret_cast<Cache*>(arg);
158   Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(h);
159   cache->Release(handle);
160 }
161
162 // Convert an index iterator value (i.e., an encoded BlockHandle)
163 // into an iterator over the contents of the corresponding block.
164 Iterator* Table::BlockReader(void* arg,
165                              const ReadOptions& options,
166                              const Slice& index_value) {
167   Table* table = reinterpret_cast<Table*>(arg);
168   Cache* block_cache = table->rep_->options.block_cache;
169   Block* block = NULL;
170   Cache::Handle* cache_handle = NULL;
171
172   BlockHandle handle;
173   Slice input = index_value;
174   Status s = handle.DecodeFrom(&input);
175   // We intentionally allow extra stuff in index_value so that we
176   // can add more features in the future.
177
178   if (s.ok()) {
179     BlockContents contents;
180     if (block_cache != NULL) {
181       char cache_key_buffer[16];
182       EncodeFixed64(cache_key_buffer, table->rep_->cache_id);
183       EncodeFixed64(cache_key_buffer+8, handle.offset());
184       Slice key(cache_key_buffer, sizeof(cache_key_buffer));
185       cache_handle = block_cache->Lookup(key);
186       if (cache_handle != NULL) {
187         block = reinterpret_cast<Block*>(block_cache->Value(cache_handle));
188       } else {
189         s = ReadBlock(table->rep_->file, options, handle, &contents);
190         if (s.ok()) {
191           block = new Block(contents);
192           if (contents.cachable && options.fill_cache) {
193             cache_handle = block_cache->Insert(
194                 key, block, block->size(), &DeleteCachedBlock);
195           }
196         }
197       }
198     } else {
199       s = ReadBlock(table->rep_->file, options, handle, &contents);
200       if (s.ok()) {
201         block = new Block(contents);
202       }
203     }
204   }
205
206   Iterator* iter;
207   if (block != NULL) {
208     iter = block->NewIterator(table->rep_->options.comparator);
209     if (cache_handle == NULL) {
210       iter->RegisterCleanup(&DeleteBlock, block, NULL);
211     } else {
212       iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle);
213     }
214   } else {
215     iter = NewErrorIterator(s);
216   }
217   return iter;
218 }
219
220 Iterator* Table::NewIterator(const ReadOptions& options) const {
221   return NewTwoLevelIterator(
222       rep_->index_block->NewIterator(rep_->options.comparator),
223       &Table::BlockReader, const_cast<Table*>(this), options);
224 }
225
226 Status Table::InternalGet(const ReadOptions& options, const Slice& k,
227                           void* arg,
228                           void (*saver)(void*, const Slice&, const Slice&)) {
229   Status s;
230   Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator);
231   iiter->Seek(k);
232   if (iiter->Valid()) {
233     Slice handle_value = iiter->value();
234     FilterBlockReader* filter = rep_->filter;
235     BlockHandle handle;
236     if (filter != NULL &&
237         handle.DecodeFrom(&handle_value).ok() &&
238         !filter->KeyMayMatch(handle.offset(), k)) {
239       // Not found
240     } else {
241       Iterator* block_iter = BlockReader(this, options, iiter->value());
242       block_iter->Seek(k);
243       if (block_iter->Valid()) {
244         (*saver)(arg, block_iter->key(), block_iter->value());
245       }
246       s = block_iter->status();
247       delete block_iter;
248     }
249   }
250   if (s.ok()) {
251     s = iiter->status();
252   }
253   delete iiter;
254   return s;
255 }
256
257
258 uint64_t Table::ApproximateOffsetOf(const Slice& key) const {
259   Iterator* index_iter =
260       rep_->index_block->NewIterator(rep_->options.comparator);
261   index_iter->Seek(key);
262   uint64_t result;
263   if (index_iter->Valid()) {
264     BlockHandle handle;
265     Slice input = index_iter->value();
266     Status s = handle.DecodeFrom(&input);
267     if (s.ok()) {
268       result = handle.offset();
269     } else {
270       // Strange: we can't decode the block handle in the index block.
271       // We'll just return the offset of the metaindex block, which is
272       // close to the whole file size for this case.
273       result = rep_->metaindex_handle.offset();
274     }
275   } else {
276     // key is past the last key in the file.  Approximate the offset
277     // by returning the offset of the metaindex block (which is
278     // right near the end of the file).
279     result = rep_->metaindex_handle.offset();
280   }
281   delete index_iter;
282   return result;
283 }
284
285 }  // namespace leveldb