// Modified from upstream sources
// https://github.com/golang/leveldb/blob/master/record/record.go
// Changes:
//   - Add ability to use different CRC algorithm

// Copyright 2011 The LevelDB-Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package record reads and writes sequences of records. Each record is a stream
// of bytes that completes before the next record starts.
//
// When reading, call Next to obtain an io.Reader for the next record. Next will
// return io.EOF when there are no more records. It is valid to call Next
// without reading the current record to exhaustion.
//
// When writing, call Next to obtain an io.Writer for the next record. Calling
// Next finishes the current record. Call Close to finish the final record.
//
// Optionally, call Flush to finish the current record and flush the underlying
// writer without starting a new record. To start a new record after flushing,
// call Next.
//
// Neither Readers nor Writers are safe to use concurrently.
//
// Example code:
//
//	func read(r io.Reader) ([]string, error) {
//		var ss []string
//		records := record.NewReader(r)
//		for {
//			rec, err := records.Next()
//			if err == io.EOF {
//				break
//			}
//			if err != nil {
//				log.Printf("recovering from %v", err)
//				records.Recover()
//				continue
//			}
//			s, err := ioutil.ReadAll(rec)
//			if err != nil {
//				log.Printf("recovering from %v", err)
//				records.Recover()
//				continue
//			}
//			ss = append(ss, string(s))
//		}
//		return ss, nil
//	}
//
//	func write(w io.Writer, ss []string) error {
//		records := record.NewWriter(w)
//		for _, s := range ss {
//			rec, err := records.Next()
//			if err != nil {
//				return err
//			}
//			if _, err := rec.Write([]byte(s)); err != nil {
//				return err
//			}
//		}
//		return records.Close()
//	}
//
// The wire format is that the stream is divided into 32KiB blocks, and each
// block contains a number of tightly packed chunks. Chunks cannot cross block
// boundaries. The last block may be shorter than 32 KiB. Any unused bytes in a
// block must be zero.
//
// A record maps to one or more chunks. Each chunk has a 7 byte header (a 4
// byte checksum, a 2 byte little-endian uint16 length, and a 1 byte chunk type)
// followed by a payload. The checksum is over the chunk type and the payload.
//
// There are four chunk types: whether the chunk is the full record, or the
// first, middle or last chunk of a multi-chunk record. A multi-chunk record
// has one first chunk, zero or more middle chunks, and one last chunk.
//
// The wire format allows for limited recovery in the face of data corruption:
// on a format error (such as a checksum mismatch), the reader moves to the
// next block and looks for the next full or first chunk.
package leveldb

// The C++ Level-DB code calls this the log, but it has been renamed to record
// to avoid clashing with the standard log package, and because it is generally
// useful outside of logging. The C++ code also uses the term "physical record"
// instead of "chunk", but "chunk" is shorter and less confusing.

import (
	"encoding/binary"
	"errors"
	"io"
)

// These constants are part of the wire format and should not be changed.
// They identify whether a chunk carries an entire record or the first,
// middle, or last piece of a record that spans multiple chunks.
const (
	fullChunkType = iota + 1 // 1: the chunk holds the whole record
	firstChunkType           // 2: first chunk of a multi-chunk record
	middleChunkType          // 3: interior chunk of a multi-chunk record
	lastChunkType            // 4: final chunk of a multi-chunk record
)

// Fixed sizes of the on-disk layout: each block is 32 KiB, and every chunk
// begins with a 7-byte header (4-byte checksum, 2-byte length, 1-byte type).
const (
	blockSize     = 1 << 15 // 32 KiB
	blockSizeMask = blockSize - 1
	headerSize    = 7
)

// ErrNotAnIOSeeker is returned if the io.Reader underlying a Reader does not
// implement io.Seeker.
var ErrNotAnIOSeeker = errors.New("leveldb/record: reader does not implement io.Seeker")

// ErrNoLastRecord is returned if LastRecordOffset is called and there is no
// previous record.
var ErrNoLastRecord = errors.New("leveldb/record: no last record exists")

// flusher is satisfied by writers that can flush buffered data to their
// underlying sink. Writer uses it (when available) to implement Flush.
type flusher interface {
	Flush() error
}

// Reader reads records from an underlying io.Reader.
type Reader struct {
	// r is the underlying reader.
	r io.Reader
	// seq is the sequence number of the current record. It is bumped on every
	// Next/Recover/SeekRecord call to invalidate outstanding singleReaders.
	seq int
	// buf[i:j] is the unread portion of the current chunk's payload.
	// The low bound, i, excludes the chunk header.
	i, j int
	// n is the number of bytes of buf that are valid. Once reading has started,
	// only the final block can have n < blockSize.
	n int
	// started is whether Next has been called at all.
	started bool
	// recovering is true when recovering from corruption.
	recovering bool
	// last is whether the current chunk is the last chunk of the record.
	last bool
	// err is any accumulated error.
	err error
	// buf is the buffer holding the current 32 KiB block.
	buf [blockSize]byte
	// crc computes the checksum used to verify each chunk against its header.
	crc func([]byte) uint32
}

147// NewReader returns a new reader.
148func NewReaderExt(r io.Reader, algo CRCAlgo) *Reader {149crc := CRCCustom150if algo == CRCAlgoIEEE {151crc = CRCStandard152}153return &Reader{154r: r,155crc: crc,156}157}
158
159func NewReader(r io.Reader) *Reader {160return NewReaderExt(r, CRCAlgoCustom)161}
162
// nextChunk sets r.buf[r.i:r.j] to hold the next chunk's payload, reading the
// next block into the buffer if necessary.
//
// If wantFirst is true, chunks are skipped until one that can begin a record
// (a "full" or "first" chunk) is found.
func (r *Reader) nextChunk(wantFirst bool) error {
	for {
		if r.j+headerSize <= r.n {
			// Parse the 7-byte chunk header: 4-byte checksum, 2-byte
			// little-endian length, then a 1-byte chunk type.
			checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4])
			length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6])
			chunkType := r.buf[r.j+6]

			if checksum == 0 && length == 0 && chunkType == 0 {
				if wantFirst || r.recovering {
					// Skip the rest of the block, if it looks like it is all
					// zeroes. This is common if the record file was created
					// via mmap.
					//
					// Set r.err to be an error so r.Recover actually recovers.
					r.err = errors.New("leveldb/record: block appears to be zeroed")
					r.Recover()
					continue
				}
				return errors.New("leveldb/record: invalid chunk")
			}

			r.i = r.j + headerSize
			r.j = r.j + headerSize + int(length)
			if r.j > r.n {
				// The chunk claims to extend past the valid bytes of the block.
				if r.recovering {
					r.Recover()
					continue
				}
				return errors.New("leveldb/record: invalid chunk (length overflows block)")
			}
			// The checksum covers the chunk type byte plus the payload, hence
			// the r.i-1 low bound.
			if checksum != r.crc(r.buf[r.i-1 : r.j]) {
				if r.recovering {
					r.Recover()
					continue
				}
				return errors.New("leveldb/record: invalid chunk (checksum mismatch)")
			}
			if wantFirst {
				if chunkType != fullChunkType && chunkType != firstChunkType {
					// Not a chunk that can start a record; keep scanning.
					continue
				}
			}
			r.last = chunkType == fullChunkType || chunkType == lastChunkType
			r.recovering = false
			return nil
		}
		// Not enough room left in this block for another chunk header.
		if r.n < blockSize && r.started {
			if r.j != r.n {
				// A short block must end exactly at the last chunk; otherwise
				// the stream was truncated mid-chunk.
				return io.ErrUnexpectedEOF
			}
			return io.EOF
		}
		// Read the next block; a short read is allowed only via
		// io.ErrUnexpectedEOF (the final, partial block).
		n, err := io.ReadFull(r.r, r.buf[:])
		if err != nil && err != io.ErrUnexpectedEOF {
			return err
		}
		r.i, r.j, r.n = 0, 0, n
	}
}

// Next returns a reader for the next record. It returns io.EOF if there are no
// more records. The reader returned becomes stale after the next Next call,
// and should no longer be used.
func (r *Reader) Next() (io.Reader, error) {
	// Bump seq so any previously returned singleReader becomes stale.
	r.seq++
	if r.err != nil {
		return nil, r.err
	}
	// Skip whatever remains of the current chunk's payload.
	r.i = r.j
	r.err = r.nextChunk(true)
	if r.err != nil {
		return nil, r.err
	}
	r.started = true
	return singleReader{r, r.seq}, nil
}

242// Recover clears any errors read so far, so that calling Next will start
243// reading from the next good 32KiB block. If there are no such blocks, Next
244// will return io.EOF. Recover also marks the current reader, the one most
245// recently returned by Next, as stale. If Recover is called without any
246// prior error, then Recover is a no-op.
247func (r *Reader) Recover() {248if r.err == nil {249return250}251r.recovering = true252r.err = nil253// Discard the rest of the current block.254r.i, r.j, r.last = r.n, r.n, false255// Invalidate any outstanding singleReader.256r.seq++257}
258
// SeekRecord seeks in the underlying io.Reader such that calling r.Next
// returns the record whose first chunk header starts at the provided offset.
// Its behavior is undefined if the argument given is not such an offset, as
// the bytes at that offset may coincidentally appear to be a valid header.
//
// It returns ErrNotAnIOSeeker if the underlying io.Reader does not implement
// io.Seeker.
//
// SeekRecord will fail and return an error if the Reader previously
// encountered an error, including io.EOF. Such errors can be cleared by
// calling Recover. Calling SeekRecord after Recover will make calling Next
// return the record at the given offset, instead of the record at the next
// good 32KiB block as Recover normally would. Calling SeekRecord before
// Recover has no effect on Recover's semantics other than changing the
// starting point for determining the next good 32KiB block.
//
// The offset is always relative to the start of the underlying io.Reader, so
// negative values will result in an error as per io.Seeker.
func (r *Reader) SeekRecord(offset int64) error {
	// Invalidate any outstanding singleReader.
	r.seq++
	if r.err != nil {
		return r.err
	}

	s, ok := r.r.(io.Seeker)
	if !ok {
		return ErrNotAnIOSeeker
	}

	// Only seek to an exact block offset; c is the chunk's offset within
	// that block.
	c := int(offset & blockSizeMask)
	if _, r.err = s.Seek(offset&^blockSizeMask, io.SeekStart); r.err != nil {
		return r.err
	}

	// Clear the state of the internal reader.
	r.i, r.j, r.n = 0, 0, 0
	r.started, r.recovering, r.last = false, false, false
	if r.err = r.nextChunk(false); r.err != nil {
		return r.err
	}

	// Now skip to the offset requested within the block. A subsequent
	// call to Next will return the block at the requested offset.
	r.i, r.j = c, c

	return nil
}

// singleReader is the io.Reader returned by Reader.Next. It reads a single
// record's payload and is invalidated (via seq) when the Reader moves on.
type singleReader struct {
	r   *Reader
	seq int
}

// Read reads from the current record's payload, advancing through the
// record's subsequent chunks as needed. It returns io.EOF once the record's
// last chunk is exhausted.
func (x singleReader) Read(p []byte) (int, error) {
	r := x.r
	if r.seq != x.seq {
		// The Reader has since moved on (Next/Recover/SeekRecord).
		return 0, errors.New("leveldb/record: stale reader")
	}
	if r.err != nil {
		return 0, r.err
	}
	// Advance chunks while the current payload is fully consumed.
	for r.i == r.j {
		if r.last {
			return 0, io.EOF
		}
		if r.err = r.nextChunk(false); r.err != nil {
			return 0, r.err
		}
	}
	n := copy(p, r.buf[r.i:r.j])
	r.i += n
	return n, nil
}

// Writer writes records to an underlying io.Writer.
type Writer struct {
	// w is the underlying writer.
	w io.Writer
	// seq is the sequence number of the current record. It is bumped on every
	// Close/Flush/Next call to invalidate outstanding singleWriters.
	seq int
	// f is w as a flusher, or nil if w does not implement flusher.
	f flusher
	// buf[i:j] is the bytes that will become the current chunk.
	// The low bound, i, includes the chunk header.
	i, j int
	// buf[:written] has already been written to w.
	// written is zero unless Flush has been called.
	written int
	// baseOffset is the base offset in w at which writing started. If
	// w implements io.Seeker, it's relative to the start of w, 0 otherwise.
	baseOffset int64
	// blockNumber is the zero based block number currently held in buf.
	blockNumber int64
	// lastRecordOffset is the offset in w where the last record was
	// written (including the chunk header). It is a relative offset to
	// baseOffset, thus the absolute offset of the last record is
	// baseOffset + lastRecordOffset.
	lastRecordOffset int64
	// first is whether the current chunk is the first chunk of the record.
	first bool
	// pending is whether a chunk is buffered but not yet written.
	pending bool
	// err is any accumulated error.
	err error
	// buf is the buffer holding the current 32 KiB block.
	buf [blockSize]byte
	// crc computes the checksum stored in each chunk's header.
	crc func([]byte) uint32
}

370// NewWriter returns a new Writer.
371func NewWriterExt(w io.Writer, algo CRCAlgo) *Writer {372f, _ := w.(flusher)373
374var o int64375if s, ok := w.(io.Seeker); ok {376var err error377if o, err = s.Seek(0, io.SeekCurrent); err != nil {378o = 0379}380}381crc := CRCCustom382if algo == CRCAlgoIEEE {383crc = CRCStandard384}385
386return &Writer{387w: w,388f: f,389baseOffset: o,390lastRecordOffset: -1,391crc: crc,392}393}
394
395func NewWriter(w io.Writer) *Writer {396return NewWriterExt(w, CRCAlgoCustom)397}
398
// fillHeader fills in the header for the pending chunk.
//
// last reports whether this chunk ends its record; together with w.first it
// selects among the four chunk types (full/first/middle/last).
func (w *Writer) fillHeader(last bool) {
	if w.i+headerSize > w.j || w.j > blockSize {
		panic("leveldb/record: bad writer state")
	}
	if last {
		if w.first {
			w.buf[w.i+6] = fullChunkType
		} else {
			w.buf[w.i+6] = lastChunkType
		}
	} else {
		if w.first {
			w.buf[w.i+6] = firstChunkType
		} else {
			w.buf[w.i+6] = middleChunkType
		}
	}
	// Checksum covers the chunk type byte and the payload.
	binary.LittleEndian.PutUint32(w.buf[w.i+0:w.i+4], w.crc(w.buf[w.i+6:w.j]))
	// Length counts the payload only, excluding the 7-byte header.
	binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-headerSize))
}

421// writeBlock writes the buffered block to the underlying writer, and reserves
422// space for the next chunk's header.
423func (w *Writer) writeBlock() {424_, w.err = w.w.Write(w.buf[w.written:])425w.i = 0426w.j = headerSize427w.written = 0428w.blockNumber++429}
430
// writePending finishes the current record and writes the buffer to the
// underlying writer.
func (w *Writer) writePending() {
	if w.err != nil {
		return
	}
	if w.pending {
		// Finalize the buffered chunk as the record's last chunk.
		w.fillHeader(true)
		w.pending = false
	}
	_, w.err = w.w.Write(w.buf[w.written:w.j])
	w.written = w.j
}

445// Close finishes the current record and closes the writer.
446func (w *Writer) Close() error {447w.seq++448w.writePending()449if w.err != nil {450return w.err451}452w.err = errors.New("leveldb/record: closed Writer")453return nil454}
455
456// Flush finishes the current record, writes to the underlying writer, and
457// flushes it if that writer implements interface{ Flush() error }.
458func (w *Writer) Flush() error {459w.seq++460w.writePending()461if w.err != nil {462return w.err463}464if w.f != nil {465w.err = w.f.Flush()466return w.err467}468return nil469}
470
// Next returns a writer for the next record. The writer returned becomes stale
// after the next Close, Flush or Next call, and should no longer be used.
func (w *Writer) Next() (io.Writer, error) {
	// Bump seq so any previously returned singleWriter becomes stale.
	w.seq++
	if w.err != nil {
		return nil, w.err
	}
	if w.pending {
		// Finish the previous record's final chunk.
		w.fillHeader(true)
	}
	w.i = w.j
	w.j += headerSize
	// Check if there is room in the block for the header.
	if w.j > blockSize {
		// Fill in the rest of the block with zeroes.
		for k := w.i; k < blockSize; k++ {
			w.buf[k] = 0
		}
		w.writeBlock()
		if w.err != nil {
			return nil, w.err
		}
	}
	// Record where this record's first chunk header starts, for
	// LastRecordOffset.
	w.lastRecordOffset = w.baseOffset + w.blockNumber*blockSize + int64(w.i)
	w.first = true
	w.pending = true
	return singleWriter{w, w.seq}, nil
}

500// LastRecordOffset returns the offset in the underlying io.Writer of the last
501// record so far - the one created by the most recent Next call. It is the
502// offset of the first chunk header, suitable to pass to Reader.SeekRecord.
503//
504// If that io.Writer also implements io.Seeker, the return value is an absolute
505// offset, in the sense of io.SeekStart, regardless of whether the io.Writer
506// was initially at the zero position when passed to NewWriter. Otherwise, the
507// return value is a relative offset, being the number of bytes written between
508// the NewWriter call and any records written prior to the last record.
509//
510// If there is no last record, i.e. nothing was written, LastRecordOffset will
511// return ErrNoLastRecord.
512func (w *Writer) LastRecordOffset() (int64, error) {513if w.err != nil {514return 0, w.err515}516if w.lastRecordOffset < 0 {517return 0, ErrNoLastRecord518}519return w.lastRecordOffset, nil520}
521
// singleWriter is the io.Writer returned by Writer.Next. It appends to a
// single record and is invalidated (via seq) when the Writer moves on.
type singleWriter struct {
	w   *Writer
	seq int
}

// Write appends p to the current record, splitting it across chunks and
// flushing full blocks to the underlying writer as the buffer fills.
func (x singleWriter) Write(p []byte) (int, error) {
	w := x.w
	if w.seq != x.seq {
		// The Writer has since moved on (Close/Flush/Next).
		return 0, errors.New("leveldb/record: stale writer")
	}
	if w.err != nil {
		return 0, w.err
	}
	n0 := len(p)
	for len(p) > 0 {
		// Write a block, if it is full.
		if w.j == blockSize {
			w.fillHeader(false)
			w.writeBlock()
			if w.err != nil {
				return 0, w.err
			}
			// Subsequent chunks of this record are middle/last, not
			// first/full.
			w.first = false
		}
		// Copy bytes into the buffer.
		n := copy(w.buf[w.j:], p)
		w.j += n
		p = p[n:]
	}
	return n0, nil
}
