Add Google's LevelDB support
[novacoin.git] / src / leveldb / db / log_reader.cc
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5 #include "db/log_reader.h"
6
7 #include <stdio.h>
8 #include "leveldb/env.h"
9 #include "util/coding.h"
10 #include "util/crc32c.h"
11
12 namespace leveldb {
13 namespace log {
14
15 Reader::Reporter::~Reporter() {
16 }
17
18 Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum,
19                uint64_t initial_offset)
20     : file_(file),
21       reporter_(reporter),
22       checksum_(checksum),
23       backing_store_(new char[kBlockSize]),
24       buffer_(),
25       eof_(false),
26       last_record_offset_(0),
27       end_of_buffer_offset_(0),
28       initial_offset_(initial_offset) {
29 }
30
31 Reader::~Reader() {
32   delete[] backing_store_;
33 }
34
35 bool Reader::SkipToInitialBlock() {
36   size_t offset_in_block = initial_offset_ % kBlockSize;
37   uint64_t block_start_location = initial_offset_ - offset_in_block;
38
39   // Don't search a block if we'd be in the trailer
40   if (offset_in_block > kBlockSize - 6) {
41     offset_in_block = 0;
42     block_start_location += kBlockSize;
43   }
44
45   end_of_buffer_offset_ = block_start_location;
46
47   // Skip to start of first block that can contain the initial record
48   if (block_start_location > 0) {
49     Status skip_status = file_->Skip(block_start_location);
50     if (!skip_status.ok()) {
51       ReportDrop(block_start_location, skip_status);
52       return false;
53     }
54   }
55
56   return true;
57 }
58
59 bool Reader::ReadRecord(Slice* record, std::string* scratch) {
60   if (last_record_offset_ < initial_offset_) {
61     if (!SkipToInitialBlock()) {
62       return false;
63     }
64   }
65
66   scratch->clear();
67   record->clear();
68   bool in_fragmented_record = false;
69   // Record offset of the logical record that we're reading
70   // 0 is a dummy value to make compilers happy
71   uint64_t prospective_record_offset = 0;
72
73   Slice fragment;
74   while (true) {
75     uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
76     const unsigned int record_type = ReadPhysicalRecord(&fragment);
77     switch (record_type) {
78       case kFullType:
79         if (in_fragmented_record) {
80           // Handle bug in earlier versions of log::Writer where
81           // it could emit an empty kFirstType record at the tail end
82           // of a block followed by a kFullType or kFirstType record
83           // at the beginning of the next block.
84           if (scratch->empty()) {
85             in_fragmented_record = false;
86           } else {
87             ReportCorruption(scratch->size(), "partial record without end(1)");
88           }
89         }
90         prospective_record_offset = physical_record_offset;
91         scratch->clear();
92         *record = fragment;
93         last_record_offset_ = prospective_record_offset;
94         return true;
95
96       case kFirstType:
97         if (in_fragmented_record) {
98           // Handle bug in earlier versions of log::Writer where
99           // it could emit an empty kFirstType record at the tail end
100           // of a block followed by a kFullType or kFirstType record
101           // at the beginning of the next block.
102           if (scratch->empty()) {
103             in_fragmented_record = false;
104           } else {
105             ReportCorruption(scratch->size(), "partial record without end(2)");
106           }
107         }
108         prospective_record_offset = physical_record_offset;
109         scratch->assign(fragment.data(), fragment.size());
110         in_fragmented_record = true;
111         break;
112
113       case kMiddleType:
114         if (!in_fragmented_record) {
115           ReportCorruption(fragment.size(),
116                            "missing start of fragmented record(1)");
117         } else {
118           scratch->append(fragment.data(), fragment.size());
119         }
120         break;
121
122       case kLastType:
123         if (!in_fragmented_record) {
124           ReportCorruption(fragment.size(),
125                            "missing start of fragmented record(2)");
126         } else {
127           scratch->append(fragment.data(), fragment.size());
128           *record = Slice(*scratch);
129           last_record_offset_ = prospective_record_offset;
130           return true;
131         }
132         break;
133
134       case kEof:
135         if (in_fragmented_record) {
136           ReportCorruption(scratch->size(), "partial record without end(3)");
137           scratch->clear();
138         }
139         return false;
140
141       case kBadRecord:
142         if (in_fragmented_record) {
143           ReportCorruption(scratch->size(), "error in middle of record");
144           in_fragmented_record = false;
145           scratch->clear();
146         }
147         break;
148
149       default: {
150         char buf[40];
151         snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
152         ReportCorruption(
153             (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
154             buf);
155         in_fragmented_record = false;
156         scratch->clear();
157         break;
158       }
159     }
160   }
161   return false;
162 }
163
164 uint64_t Reader::LastRecordOffset() {
165   return last_record_offset_;
166 }
167
168 void Reader::ReportCorruption(size_t bytes, const char* reason) {
169   ReportDrop(bytes, Status::Corruption(reason));
170 }
171
172 void Reader::ReportDrop(size_t bytes, const Status& reason) {
173   if (reporter_ != NULL &&
174       end_of_buffer_offset_ - buffer_.size() - bytes >= initial_offset_) {
175     reporter_->Corruption(bytes, reason);
176   }
177 }
178
179 unsigned int Reader::ReadPhysicalRecord(Slice* result) {
180   while (true) {
181     if (buffer_.size() < kHeaderSize) {
182       if (!eof_) {
183         // Last read was a full read, so this is a trailer to skip
184         buffer_.clear();
185         Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
186         end_of_buffer_offset_ += buffer_.size();
187         if (!status.ok()) {
188           buffer_.clear();
189           ReportDrop(kBlockSize, status);
190           eof_ = true;
191           return kEof;
192         } else if (buffer_.size() < kBlockSize) {
193           eof_ = true;
194         }
195         continue;
196       } else if (buffer_.size() == 0) {
197         // End of file
198         return kEof;
199       } else {
200         size_t drop_size = buffer_.size();
201         buffer_.clear();
202         ReportCorruption(drop_size, "truncated record at end of file");
203         return kEof;
204       }
205     }
206
207     // Parse the header
208     const char* header = buffer_.data();
209     const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
210     const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
211     const unsigned int type = header[6];
212     const uint32_t length = a | (b << 8);
213     if (kHeaderSize + length > buffer_.size()) {
214       size_t drop_size = buffer_.size();
215       buffer_.clear();
216       ReportCorruption(drop_size, "bad record length");
217       return kBadRecord;
218     }
219
220     if (type == kZeroType && length == 0) {
221       // Skip zero length record without reporting any drops since
222       // such records are produced by the mmap based writing code in
223       // env_posix.cc that preallocates file regions.
224       buffer_.clear();
225       return kBadRecord;
226     }
227
228     // Check crc
229     if (checksum_) {
230       uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
231       uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
232       if (actual_crc != expected_crc) {
233         // Drop the rest of the buffer since "length" itself may have
234         // been corrupted and if we trust it, we could find some
235         // fragment of a real log record that just happens to look
236         // like a valid log record.
237         size_t drop_size = buffer_.size();
238         buffer_.clear();
239         ReportCorruption(drop_size, "checksum mismatch");
240         return kBadRecord;
241       }
242     }
243
244     buffer_.remove_prefix(kHeaderSize + length);
245
246     // Skip physical record that started before initial_offset_
247     if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
248         initial_offset_) {
249       result->clear();
250       return kBadRecord;
251     }
252
253     *result = Slice(header + kHeaderSize, length);
254     return type;
255   }
256 }
257
258 }  // namespace log
259 }  // namespace leveldb