record.go
// Modified from upstream sources
// https://github.com/golang/leveldb/blob/master/record/record.go
// Changes:
// - Add ability to use different CRC algorithm

// Copyright 2011 The LevelDB-Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package record reads and writes sequences of records. Each record is a stream
// of bytes that completes before the next record starts.
//
// When reading, call Next to obtain an io.Reader for the next record. Next will
// return io.EOF when there are no more records. It is valid to call Next
// without reading the current record to exhaustion.
//
// When writing, call Next to obtain an io.Writer for the next record. Calling
// Next finishes the current record. Call Close to finish the final record.
//
// Optionally, call Flush to finish the current record and flush the underlying
// writer without starting a new record. To start a new record after flushing,
// call Next.
//
// Neither Readers nor Writers are safe to use concurrently.
//
// Example code:
//
//	func read(r io.Reader) ([]string, error) {
//		var ss []string
//		records := record.NewReader(r)
//		for {
//			rec, err := records.Next()
//			if err == io.EOF {
//				break
//			}
//			if err != nil {
//				log.Printf("recovering from %v", err)
//				records.Recover()
//				continue
//			}
//			s, err := ioutil.ReadAll(rec)
//			if err != nil {
//				log.Printf("recovering from %v", err)
//				records.Recover()
//				continue
//			}
//			ss = append(ss, string(s))
//		}
//		return ss, nil
//	}
//
//	func write(w io.Writer, ss []string) error {
//		records := record.NewWriter(w)
//		for _, s := range ss {
//			rec, err := records.Next()
//			if err != nil {
//				return err
//			}
//			if _, err := rec.Write([]byte(s)); err != nil {
//				return err
//			}
//		}
//		return records.Close()
//	}
//
// The wire format is that the stream is divided into 32KiB blocks, and each
// block contains a number of tightly packed chunks. Chunks cannot cross block
// boundaries. The last block may be shorter than 32KiB. Any unused bytes in a
// block must be zero.
//
// A record maps to one or more chunks. Each chunk has a 7 byte header (a 4
// byte checksum, a 2 byte little-endian uint16 length, and a 1 byte chunk type)
// followed by a payload. The checksum is over the chunk type and the payload.
//
// There are four chunk types: whether the chunk is the full record, or the
// first, middle or last chunk of a multi-chunk record. A multi-chunk record
// has one first chunk, zero or more middle chunks, and one last chunk.
//
// The wire format allows for limited recovery in the face of data corruption:
// on a format error (such as a checksum mismatch), the reader moves to the
// next block and looks for the next full or first chunk.
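//
// For reference, the chunk layout described above is:
//
//	+----------+---------------+------------+----------------+
//	| checksum | length        | chunk type | payload        |
//	| 4 bytes  | 2 bytes (LE)  | 1 byte     | length bytes   |
//	+----------+---------------+------------+----------------+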
package leveldb

// The C++ Level-DB code calls this the log, but it has been renamed to record
// to avoid clashing with the standard log package, and because it is generally
// useful outside of logging. The C++ code also uses the term "physical record"
// instead of "chunk", but "chunk" is shorter and less confusing.

import (
	"encoding/binary"
	"errors"
	"io"
)

// These constants are part of the wire format and should not be changed.
const (
	fullChunkType   = 1
	firstChunkType  = 2
	middleChunkType = 3
	lastChunkType   = 4
)

const (
	blockSize     = 32 * 1024
	blockSizeMask = blockSize - 1
	headerSize    = 7
)

var (
	// ErrNotAnIOSeeker is returned if the io.Reader underlying a Reader does not implement io.Seeker.
	ErrNotAnIOSeeker = errors.New("leveldb/record: reader does not implement io.Seeker")

	// ErrNoLastRecord is returned if LastRecordOffset is called and there is no previous record.
	ErrNoLastRecord = errors.New("leveldb/record: no last record exists")
)

type flusher interface {
	Flush() error
}

// Reader reads records from an underlying io.Reader.
type Reader struct {
	// r is the underlying reader.
	r io.Reader
	// seq is the sequence number of the current record.
	seq int
	// buf[i:j] is the unread portion of the current chunk's payload.
	// The low bound, i, excludes the chunk header.
	i, j int
	// n is the number of bytes of buf that are valid. Once reading has started,
	// only the final block can have n < blockSize.
	n int
	// started is whether Next has been called at all.
	started bool
	// recovering is true when recovering from corruption.
	recovering bool
	// last is whether the current chunk is the last chunk of the record.
	last bool
	// err is any accumulated error.
	err error
	// buf is the buffer.
	buf [blockSize]byte
	// crc is the checksum function used to verify each chunk.
	crc func([]byte) uint32
}

// NewReaderExt returns a new Reader that verifies chunks using the given CRC
// algorithm.
func NewReaderExt(r io.Reader, algo CRCAlgo) *Reader {
	crc := CRCCustom
	if algo == CRCAlgoIEEE {
		crc = CRCStandard
	}
	return &Reader{
		r:   r,
		crc: crc,
	}
}

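// NewReader returns a new Reader that verifies chunks using the custom CRC
// algorithm (CRCAlgoCustom).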
func NewReader(r io.Reader) *Reader {
	return NewReaderExt(r, CRCAlgoCustom)
}

// nextChunk sets r.buf[r.i:r.j] to hold the next chunk's payload, reading the
// next block into the buffer if necessary.
func (r *Reader) nextChunk(wantFirst bool) error {
	for {
		if r.j+headerSize <= r.n {
			checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4])
			length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6])
			chunkType := r.buf[r.j+6]

			if checksum == 0 && length == 0 && chunkType == 0 {
				if wantFirst || r.recovering {
					// Skip the rest of the block, if it looks like it is all
					// zeroes. This is common if the record file was created
					// via mmap.
					//
					// Set r.err to be an error so r.Recover actually recovers.
					r.err = errors.New("leveldb/record: block appears to be zeroed")
					r.Recover()
					continue
				}
				return errors.New("leveldb/record: invalid chunk")
			}

			r.i = r.j + headerSize
			r.j = r.j + headerSize + int(length)
			if r.j > r.n {
				if r.recovering {
					r.Recover()
					continue
				}
				return errors.New("leveldb/record: invalid chunk (length overflows block)")
			}
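			// The checksum covers the chunk type byte (at r.i-1) and the
			// payload, per the wire format described in the package comment.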
			if checksum != r.crc(r.buf[r.i-1:r.j]) {
				if r.recovering {
					r.Recover()
					continue
				}
				return errors.New("leveldb/record: invalid chunk (checksum mismatch)")
			}
			if wantFirst {
				if chunkType != fullChunkType && chunkType != firstChunkType {
					continue
				}
			}
			r.last = chunkType == fullChunkType || chunkType == lastChunkType
			r.recovering = false
			return nil
		}
		if r.n < blockSize && r.started {
			if r.j != r.n {
				return io.ErrUnexpectedEOF
			}
			return io.EOF
		}
		n, err := io.ReadFull(r.r, r.buf[:])
		if err != nil && err != io.ErrUnexpectedEOF {
			return err
		}
		r.i, r.j, r.n = 0, 0, n
	}
}

// Next returns a reader for the next record. It returns io.EOF if there are no
// more records. The reader returned becomes stale after the next Next call,
// and should no longer be used.
func (r *Reader) Next() (io.Reader, error) {
	r.seq++
	if r.err != nil {
		return nil, r.err
	}
	r.i = r.j
	r.err = r.nextChunk(true)
	if r.err != nil {
		return nil, r.err
	}
	r.started = true
	return singleReader{r, r.seq}, nil
}

// Recover clears any errors read so far, so that calling Next will start
// reading from the next good 32KiB block. If there are no such blocks, Next
// will return io.EOF. Recover also marks the current reader, the one most
// recently returned by Next, as stale. If Recover is called without any
// prior error, then Recover is a no-op.
func (r *Reader) Recover() {
	if r.err == nil {
		return
	}
	r.recovering = true
	r.err = nil
	// Discard the rest of the current block.
	r.i, r.j, r.last = r.n, r.n, false
	// Invalidate any outstanding singleReader.
	r.seq++
}

// SeekRecord seeks in the underlying io.Reader such that calling r.Next
// returns the record whose first chunk header starts at the provided offset.
// Its behavior is undefined if the argument given is not such an offset, as
// the bytes at that offset may coincidentally appear to be a valid header.
//
// It returns ErrNotAnIOSeeker if the underlying io.Reader does not implement
// io.Seeker.
//
// SeekRecord will fail and return an error if the Reader previously
// encountered an error, including io.EOF. Such errors can be cleared by
// calling Recover. Calling SeekRecord after Recover will make calling Next
// return the record at the given offset, instead of the record at the next
// good 32KiB block as Recover normally would. Calling SeekRecord before
// Recover has no effect on Recover's semantics other than changing the
// starting point for determining the next good 32KiB block.
//
// The offset is always relative to the start of the underlying io.Reader, so
// negative values will result in an error as per io.Seeker.
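//
// For illustration, a sketch of pairing LastRecordOffset with SeekRecord;
// f is assumed to be an *os.File (or similar io.ReadWriteSeeker), payload is
// a placeholder, and error handling is elided:
//
//	w := record.NewWriter(f)
//	rec, _ := w.Next()
//	rec.Write(payload)
//	off, _ := w.LastRecordOffset()
//	w.Close()
//
//	r := record.NewReader(f)
//	r.SeekRecord(off)
//	got, _ := r.Next() // got reads the record written at off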
func (r *Reader) SeekRecord(offset int64) error {
	r.seq++
	if r.err != nil {
		return r.err
	}

	s, ok := r.r.(io.Seeker)
	if !ok {
		return ErrNotAnIOSeeker
	}

	// Only seek to an exact block offset.
	c := int(offset & blockSizeMask)
	if _, r.err = s.Seek(offset&^blockSizeMask, io.SeekStart); r.err != nil {
		return r.err
	}

	// Clear the state of the internal reader.
	r.i, r.j, r.n = 0, 0, 0
	r.started, r.recovering, r.last = false, false, false
	if r.err = r.nextChunk(false); r.err != nil {
		return r.err
	}

	// Now skip to the offset requested within the block. A subsequent
	// call to Next will return the record at the requested offset.
	r.i, r.j = c, c

	return nil
}

type singleReader struct {
	r   *Reader
	seq int
}

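// Read implements io.Reader for the current record, copying out the unread
// portion of the current chunk and loading the next chunk as needed.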
func (x singleReader) Read(p []byte) (int, error) {
	r := x.r
	if r.seq != x.seq {
		return 0, errors.New("leveldb/record: stale reader")
	}
	if r.err != nil {
		return 0, r.err
	}
	for r.i == r.j {
		if r.last {
			return 0, io.EOF
		}
		if r.err = r.nextChunk(false); r.err != nil {
			return 0, r.err
		}
	}
	n := copy(p, r.buf[r.i:r.j])
	r.i += n
	return n, nil
}

// Writer writes records to an underlying io.Writer.
type Writer struct {
	// w is the underlying writer.
	w io.Writer
	// seq is the sequence number of the current record.
	seq int
	// f is w as a flusher.
	f flusher
	// buf[i:j] holds the bytes that will become the current chunk.
	// The low bound, i, includes the chunk header.
	i, j int
	// buf[:written] has already been written to w.
	// written is zero unless Flush has been called.
	written int
	// baseOffset is the base offset in w at which writing started. If
	// w implements io.Seeker, it's relative to the start of w, 0 otherwise.
	baseOffset int64
	// blockNumber is the zero based block number currently held in buf.
	blockNumber int64
	// lastRecordOffset is the offset in w where the last record was
	// written (including the chunk header). It is a relative offset to
	// baseOffset, thus the absolute offset of the last record is
	// baseOffset + lastRecordOffset.
	lastRecordOffset int64
	// first is whether the current chunk is the first chunk of the record.
	first bool
	// pending is whether a chunk is buffered but not yet written.
	pending bool
	// err is any accumulated error.
	err error
	// buf is the buffer.
	buf [blockSize]byte
	// crc is the checksum function used when writing chunks.
	crc func([]byte) uint32
}

// NewWriterExt returns a new Writer that checksums chunks using the given CRC
// algorithm.
func NewWriterExt(w io.Writer, algo CRCAlgo) *Writer {
	f, _ := w.(flusher)

	var o int64
	if s, ok := w.(io.Seeker); ok {
		var err error
		if o, err = s.Seek(0, io.SeekCurrent); err != nil {
			o = 0
		}
	}
	crc := CRCCustom
	if algo == CRCAlgoIEEE {
		crc = CRCStandard
	}

	return &Writer{
		w:                w,
		f:                f,
		baseOffset:       o,
		lastRecordOffset: -1,
		crc:              crc,
	}
}

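// NewWriter returns a new Writer that checksums chunks using the custom CRC
// algorithm (CRCAlgoCustom).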
func NewWriter(w io.Writer) *Writer {
	return NewWriterExt(w, CRCAlgoCustom)
}

// fillHeader fills in the header for the pending chunk.
func (w *Writer) fillHeader(last bool) {
	if w.i+headerSize > w.j || w.j > blockSize {
		panic("leveldb/record: bad writer state")
	}
	if last {
		if w.first {
			w.buf[w.i+6] = fullChunkType
		} else {
			w.buf[w.i+6] = lastChunkType
		}
	} else {
		if w.first {
			w.buf[w.i+6] = firstChunkType
		} else {
			w.buf[w.i+6] = middleChunkType
		}
	}
	binary.LittleEndian.PutUint32(w.buf[w.i+0:w.i+4], w.crc(w.buf[w.i+6:w.j]))
	binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-headerSize))
}

// writeBlock writes the buffered block to the underlying writer, and reserves
// space for the next chunk's header.
func (w *Writer) writeBlock() {
	_, w.err = w.w.Write(w.buf[w.written:])
	w.i = 0
	w.j = headerSize
	w.written = 0
	w.blockNumber++
}

// writePending finishes the current record and writes the buffer to the
// underlying writer.
func (w *Writer) writePending() {
	if w.err != nil {
		return
	}
	if w.pending {
		w.fillHeader(true)
		w.pending = false
	}
	_, w.err = w.w.Write(w.buf[w.written:w.j])
	w.written = w.j
}

// Close finishes the current record and closes the writer.
func (w *Writer) Close() error {
	w.seq++
	w.writePending()
	if w.err != nil {
		return w.err
	}
	w.err = errors.New("leveldb/record: closed Writer")
	return nil
}

// Flush finishes the current record, writes to the underlying writer, and
// flushes it if that writer implements interface{ Flush() error }.
func (w *Writer) Flush() error {
	w.seq++
	w.writePending()
	if w.err != nil {
		return w.err
	}
	if w.f != nil {
		w.err = w.f.Flush()
		return w.err
	}
	return nil
}

// Next returns a writer for the next record. The writer returned becomes stale
// after the next Close, Flush or Next call, and should no longer be used.
func (w *Writer) Next() (io.Writer, error) {
	w.seq++
	if w.err != nil {
		return nil, w.err
	}
	if w.pending {
		w.fillHeader(true)
	}
	w.i = w.j
	w.j += headerSize
	// Check if there is room in the block for the header.
	if w.j > blockSize {
		// Fill in the rest of the block with zeroes.
		for k := w.i; k < blockSize; k++ {
			w.buf[k] = 0
		}
		w.writeBlock()
		if w.err != nil {
			return nil, w.err
		}
	}
	w.lastRecordOffset = w.baseOffset + w.blockNumber*blockSize + int64(w.i)
	w.first = true
	w.pending = true
	return singleWriter{w, w.seq}, nil
}

// LastRecordOffset returns the offset in the underlying io.Writer of the last
// record so far - the one created by the most recent Next call. It is the
// offset of the first chunk header, suitable to pass to Reader.SeekRecord.
//
// If that io.Writer also implements io.Seeker, the return value is an absolute
// offset, in the sense of io.SeekStart, regardless of whether the io.Writer
// was initially at the zero position when passed to NewWriter. Otherwise, the
// return value is a relative offset, being the number of bytes written between
// the NewWriter call and any records written prior to the last record.
//
// If there is no last record, i.e. nothing was written, LastRecordOffset will
// return ErrNoLastRecord.
func (w *Writer) LastRecordOffset() (int64, error) {
	if w.err != nil {
		return 0, w.err
	}
	if w.lastRecordOffset < 0 {
		return 0, ErrNoLastRecord
	}
	return w.lastRecordOffset, nil
}

type singleWriter struct {
	w   *Writer
	seq int
}

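// Write implements io.Writer for the current record, buffering p into chunks
// and flushing each block to the underlying writer as it fills.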
func (x singleWriter) Write(p []byte) (int, error) {
	w := x.w
	if w.seq != x.seq {
		return 0, errors.New("leveldb/record: stale writer")
	}
	if w.err != nil {
		return 0, w.err
	}
	n0 := len(p)
	for len(p) > 0 {
		// Write a block, if it is full.
		if w.j == blockSize {
			w.fillHeader(false)
			w.writeBlock()
			if w.err != nil {
				return 0, w.err
			}
			w.first = false
		}
		// Copy bytes into the buffer.
		n := copy(w.buf[w.j:], p)
		w.j += n
		p = p[n:]
	}
	return n0, nil
}