refactor ScanVolumeFileFrom()

This commit is contained in:
Chris Lu 2019-10-22 00:50:30 -07:00
parent c9a183eb69
commit fc412e428b
8 changed files with 55 additions and 69 deletions

View file

@ -40,7 +40,7 @@ func (scanner *VolumeFileScanner4SeeDat) ReadNeedleBody() bool {
return true
}
func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64) error {
func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
if scanner.dat == nil {
newDatFile, err := os.Create(filepath.Join(*volumePath, "dat_fixed"))

View file

@ -28,7 +28,7 @@ func (scanner *VolumeFileScanner4SeeDat) ReadNeedleBody() bool {
return true
}
func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64) error {
func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
t := time.Unix(int64(n.AppendAtNs)/int64(time.Second), int64(n.AppendAtNs)%int64(time.Second))
glog.V(0).Infof("%d,%s%x offset %d size %d cookie %x appendedAt %v", *volumeId, n.Id, n.Cookie, offset, n.Size, n.Cookie, t)
return nil

View file

@ -102,7 +102,7 @@ func (scanner *VolumeFileScanner4Export) ReadNeedleBody() bool {
return true
}
func (scanner *VolumeFileScanner4Export) VisitNeedle(n *needle.Needle, offset int64) error {
func (scanner *VolumeFileScanner4Export) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
needleMap := scanner.needleMap
vid := scanner.vid

View file

@ -43,7 +43,7 @@ func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool {
return false
}
func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64) error {
func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsGzipped())
if n.Size > 0 && n.Size != types.TombstoneFileSize {
pe := scanner.nm.Put(n.Id, types.ToOffset(offset), n.Size)

View file

@ -67,34 +67,13 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServe
return lastTimestampNs, sendErr
}
err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error {
scanner := &VolumeFileScanner4Tailing{
stream:stream,
}
isLastChunk := false
err = storage.ScanVolumeFileFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), scanner)
// need to send body by chunks
for i := 0; i < len(needleBody); i += BufferSizeLimit {
stopOffset := i + BufferSizeLimit
if stopOffset >= len(needleBody) {
isLastChunk = true
stopOffset = len(needleBody)
}
sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{
NeedleHeader: needleHeader,
NeedleBody: needleBody[i:stopOffset],
IsLastChunk: isLastChunk,
})
if sendErr != nil {
return sendErr
}
}
lastProcessedTimestampNs = needleAppendAtNs
return nil
})
return
return scanner.lastProcessedTimestampNs, err
}
@ -115,3 +94,42 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv
})
}
// generate the volume idx
type VolumeFileScanner4Tailing struct {
stream volume_server_pb.VolumeServer_VolumeTailSenderServer
lastProcessedTimestampNs uint64
}
func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock storage.SuperBlock) error {
return nil
}
func (scanner *VolumeFileScanner4Tailing) ReadNeedleBody() bool {
return true
}
func (scanner *VolumeFileScanner4Tailing) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
isLastChunk := false
// need to send body by chunks
for i := 0; i < len(needleBody); i += BufferSizeLimit {
stopOffset := i + BufferSizeLimit
if stopOffset >= len(needleBody) {
isLastChunk = true
stopOffset = len(needleBody)
}
sendErr := scanner.stream.Send(&volume_server_pb.VolumeTailSenderResponse{
NeedleHeader: needleHeader,
NeedleBody: needleBody[i:stopOffset],
IsLastChunk: isLastChunk,
})
if sendErr != nil {
return sendErr
}
}
scanner.lastProcessedTimestampNs = n.AppendAtNs
return nil
}

View file

@ -251,7 +251,7 @@ func (scanner *VolumeFileScanner4GenIdx) ReadNeedleBody() bool {
return false
}
func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *needle.Needle, offset int64) error {
func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
if n.Size > 0 && n.Size != TombstoneFileSize {
return scanner.v.nm.Put(n.Id, ToOffset(offset), n.Size)
}

View file

@ -179,7 +179,7 @@ func (v *Volume) readNeedle(n *needle.Needle) (int, error) {
type VolumeFileScanner interface {
VisitSuperBlock(SuperBlock) error
ReadNeedleBody() bool
VisitNeedle(n *needle.Needle, offset int64) error
VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error
}
func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
@ -202,7 +202,7 @@ func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
}
func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64, volumeFileScanner VolumeFileScanner) (err error) {
n, _, rest, e := needle.ReadNeedleHeader(dataFile, version, offset)
n, nh, rest, e := needle.ReadNeedleHeader(dataFile, version, offset)
if e != nil {
if e == io.EOF {
return nil
@ -210,14 +210,15 @@ func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64,
return fmt.Errorf("cannot read %s at offset %d: %v", dataFile.Name(), offset, e)
}
for n != nil {
var needleBody []byte
if volumeFileScanner.ReadNeedleBody() {
if _, err = n.ReadNeedleBody(dataFile, version, offset+NeedleHeaderSize, rest); err != nil {
if needleBody, err = n.ReadNeedleBody(dataFile, version, offset+NeedleHeaderSize, rest); err != nil {
glog.V(0).Infof("cannot read needle body: %v", err)
//err = fmt.Errorf("cannot read needle body: %v", err)
//return
}
}
err := volumeFileScanner.VisitNeedle(n, offset)
err := volumeFileScanner.VisitNeedle(n, offset, nh, needleBody)
if err == io.EOF {
return nil
}
@ -237,36 +238,3 @@ func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64,
}
return nil
}
func ScanVolumeFileNeedleFrom(version needle.Version, dataFile *os.File, offset int64, fn func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error) (err error) {
n, nh, rest, e := needle.ReadNeedleHeader(dataFile, version, offset)
if e != nil {
if e == io.EOF {
return nil
}
return fmt.Errorf("cannot read %s at offset %d: %v", dataFile.Name(), offset, e)
}
for n != nil {
var needleBody []byte
if needleBody, err = n.ReadNeedleBody(dataFile, version, offset+NeedleHeaderSize, rest); err != nil {
glog.V(0).Infof("cannot read needle body: %v", err)
//err = fmt.Errorf("cannot read needle body: %v", err)
//return
}
err = fn(nh, needleBody, n.AppendAtNs)
if err != nil {
glog.V(0).Infof("visit needle error: %v", err)
return
}
offset += NeedleHeaderSize + rest
glog.V(4).Infof("==> new entry offset %d", offset)
if n, nh, rest, err = needle.ReadNeedleHeader(dataFile, version, offset); err != nil {
if err == io.EOF {
return nil
}
return fmt.Errorf("cannot read needle header at offset %d: %v", offset, err)
}
glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
}
return nil
}

View file

@ -286,7 +286,7 @@ func (scanner *VolumeFileScanner4Vacuum) ReadNeedleBody() bool {
return true
}
func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset int64) error {
func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
if n.HasTtl() && scanner.now >= n.LastModified+uint64(scanner.v.Ttl.Minutes()*60) {
return nil
}