mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
refactor: move ReadNeedleDataInto into volume_read.go
This commit is contained in:
parent
9b084d4c88
commit
2bfc8970d2
|
@ -1,7 +1,6 @@
|
||||||
package needle
|
package needle
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
|
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
|
||||||
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
|
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||||
|
@ -9,36 +8,6 @@ import (
|
||||||
"io"
|
"io"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ReadNeedleDataInto uses a needle without n.Data to read the content into an io.Writer
|
|
||||||
func (n *Needle) ReadNeedleDataInto(r backend.BackendStorageFile, volumeOffset int64, buf []byte, writer io.Writer, needleOffset int64, size int64) (err error) {
|
|
||||||
crc := CRC(0)
|
|
||||||
for x := needleOffset; x < needleOffset+size; x += int64(len(buf)) {
|
|
||||||
count, err := n.ReadNeedleData(r, volumeOffset, buf, x)
|
|
||||||
toWrite := min(int64(count), needleOffset+size-x)
|
|
||||||
if toWrite > 0 {
|
|
||||||
crc = crc.Update(buf[0:toWrite])
|
|
||||||
if _, err = writer.Write(buf[0:toWrite]); err != nil {
|
|
||||||
return fmt.Errorf("ReadNeedleData write: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
if err == io.EOF {
|
|
||||||
err = nil
|
|
||||||
break
|
|
||||||
}
|
|
||||||
return fmt.Errorf("ReadNeedleData: %v", err)
|
|
||||||
}
|
|
||||||
if count <= 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if needleOffset == 0 && size == int64(n.DataSize) && (n.Checksum != crc && uint32(n.Checksum) != crc.Value()) {
|
|
||||||
// the crc.Value() function is to be deprecated. this double checking is for backward compatible.
|
|
||||||
return fmt.Errorf("ReadNeedleData checksum %v expected %v", crc, n.Checksum)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReadNeedleData uses a needle without n.Data to read the content
|
// ReadNeedleData uses a needle without n.Data to read the content
|
||||||
// volumeOffset: the offset within the volume
|
// volumeOffset: the offset within the volume
|
||||||
// needleOffset: the offset within the needle Data
|
// needleOffset: the offset within the needle Data
|
||||||
|
|
|
@ -1,82 +0,0 @@
|
||||||
package needle
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
|
|
||||||
"io"
|
|
||||||
"os"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestPageRead(t *testing.T) {
|
|
||||||
baseFileName := "43"
|
|
||||||
offset := int64(8)
|
|
||||||
size := types.Size(1153890) // actual file size 1153862
|
|
||||||
|
|
||||||
datFile, err := os.OpenFile(baseFileName+".dat", os.O_RDONLY, 0644)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Open Volume Data File [ERROR]: %v", err)
|
|
||||||
}
|
|
||||||
datBackend := backend.NewDiskFile(datFile)
|
|
||||||
defer datBackend.Close()
|
|
||||||
{
|
|
||||||
n := new(Needle)
|
|
||||||
|
|
||||||
bytes, err := ReadNeedleBlob(datBackend, offset, size, Version3)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("readNeedleBlob: %v", err)
|
|
||||||
}
|
|
||||||
if err = n.ReadBytes(bytes, offset, size, Version3); err != nil {
|
|
||||||
t.Fatalf("readNeedleBlob: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("bytes len %d\n", len(bytes))
|
|
||||||
fmt.Printf("name %s size %d\n", n.Name, n.Size)
|
|
||||||
|
|
||||||
fmt.Printf("id %d\n", n.Id)
|
|
||||||
fmt.Printf("DataSize %d\n", n.DataSize)
|
|
||||||
fmt.Printf("Flags %v\n", n.Flags)
|
|
||||||
fmt.Printf("NameSize %d\n", n.NameSize)
|
|
||||||
fmt.Printf("MimeSize %d\n", n.MimeSize)
|
|
||||||
fmt.Printf("PairsSize %d\n", n.PairsSize)
|
|
||||||
fmt.Printf("LastModified %d\n", n.LastModified)
|
|
||||||
fmt.Printf("AppendAtNs %d\n", n.AppendAtNs)
|
|
||||||
fmt.Printf("Checksum %d\n", n.Checksum)
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
n, bytes, bodyLength, err := ReadNeedleHeader(datBackend, Version3, offset)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("ReadNeedleHeader: %v", err)
|
|
||||||
}
|
|
||||||
fmt.Printf("bytes len %d\n", len(bytes))
|
|
||||||
fmt.Printf("name %s size %d bodyLength:%d\n", n.Name, n.Size, bodyLength)
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
n := new(Needle)
|
|
||||||
err := n.ReadNeedleMeta(datBackend, offset, size, Version3)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("ReadNeedleHeader: %v", err)
|
|
||||||
}
|
|
||||||
fmt.Printf("name %s size %d\n", n.Name, n.Size)
|
|
||||||
fmt.Printf("id %d\n", n.Id)
|
|
||||||
fmt.Printf("DataSize %d\n", n.DataSize)
|
|
||||||
fmt.Printf("Flags %v\n", n.Flags)
|
|
||||||
fmt.Printf("NameSize %d\n", n.NameSize)
|
|
||||||
fmt.Printf("MimeSize %d\n", n.MimeSize)
|
|
||||||
fmt.Printf("PairsSize %d\n", n.PairsSize)
|
|
||||||
fmt.Printf("LastModified %d\n", n.LastModified)
|
|
||||||
fmt.Printf("AppendAtNs %d\n", n.AppendAtNs)
|
|
||||||
fmt.Printf("Checksum %d\n", n.Checksum)
|
|
||||||
|
|
||||||
buf := make([]byte, 1024)
|
|
||||||
if err = n.ReadNeedleDataInto(datBackend, offset, buf, io.Discard, 0, int64(n.DataSize)); err != nil {
|
|
||||||
t.Fatalf("ReadNeedleDataInto: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -132,7 +132,37 @@ func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, wr
|
||||||
actualOffset += int64(MaxPossibleVolumeSize)
|
actualOffset += int64(MaxPossibleVolumeSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
return n.ReadNeedleDataInto(v.DataBackend, actualOffset, buf, writer, offset, size)
|
// read needle data
|
||||||
|
crc := needle.CRC(0)
|
||||||
|
r := v.DataBackend
|
||||||
|
volumeOffset := actualOffset
|
||||||
|
needleOffset := offset
|
||||||
|
for x := needleOffset; x < needleOffset+size; x += int64(len(buf)) {
|
||||||
|
count, err := n.ReadNeedleData(r, volumeOffset, buf, x)
|
||||||
|
toWrite := min(count, int(needleOffset+size-x))
|
||||||
|
if toWrite > 0 {
|
||||||
|
crc = crc.Update(buf[0:toWrite])
|
||||||
|
if _, err = writer.Write(buf[0:toWrite]); err != nil {
|
||||||
|
return fmt.Errorf("ReadNeedleData write: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
err = nil
|
||||||
|
break
|
||||||
|
}
|
||||||
|
return fmt.Errorf("ReadNeedleData: %v", err)
|
||||||
|
}
|
||||||
|
if count <= 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if needleOffset == 0 && size == int64(n.DataSize) && (n.Checksum != crc && uint32(n.Checksum) != crc.Value()) {
|
||||||
|
// the crc.Value() function is to be deprecated. this double checking is for backward compatible.
|
||||||
|
return fmt.Errorf("ReadNeedleData checksum %v expected %v", crc, n.Checksum)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func min(x, y int) int {
|
func min(x, y int) int {
|
||||||
|
|
Loading…
Reference in a new issue