mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Only when tailing volume, the zero-ed cookie should skip checking.
This only happens when checkCookie == false and fsync == false.
This commit is contained in:
parent
a8617c1a39
commit
78e8ddf910
|
@ -48,7 +48,7 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser
|
||||||
// copied from *Needle.prepareWriteBuffer()
|
// copied from *Needle.prepareWriteBuffer()
|
||||||
n.Size = 4 + types.Size(n.DataSize) + 1
|
n.Size = 4 + types.Size(n.DataSize) + 1
|
||||||
n.Checksum = needle.NewCRC(n.Data)
|
n.Checksum = needle.NewCRC(n.Data)
|
||||||
if _, err = vs.store.WriteVolumeNeedle(v.Id, n, false); err != nil {
|
if _, err = vs.store.WriteVolumeNeedle(v.Id, n, true, false); err != nil {
|
||||||
return nil, fmt.Errorf("write needle %d size %d: %v", req.NeedleId, req.Size, err)
|
return nil, fmt.Errorf("write needle %d size %d: %v", req.NeedleId, req.Size, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -90,7 +90,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv
|
||||||
defer glog.V(1).Infof("receive tailing volume %d finished", v.Id)
|
defer glog.V(1).Infof("receive tailing volume %d finished", v.Id)
|
||||||
|
|
||||||
return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
|
return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
|
||||||
_, err := vs.store.WriteVolumeNeedle(v.Id, n, false)
|
_, err := vs.store.WriteVolumeNeedle(v.Id, n, false, false)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -332,13 +332,13 @@ func (s *Store) Close() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle, fsync bool) (isUnchanged bool, err error) {
|
func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle, checkCookie bool, fsync bool) (isUnchanged bool, err error) {
|
||||||
if v := s.findVolume(i); v != nil {
|
if v := s.findVolume(i); v != nil {
|
||||||
if v.IsReadOnly() {
|
if v.IsReadOnly() {
|
||||||
err = fmt.Errorf("volume %d is read only", i)
|
err = fmt.Errorf("volume %d is read only", i)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
_, _, isUnchanged, err = v.writeNeedle2(n, fsync && s.isStopping)
|
_, _, isUnchanged, err = v.writeNeedle2(n, checkCookie, fsync && s.isStopping)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
glog.V(0).Infoln("volume", i, "not found!")
|
glog.V(0).Infoln("volume", i, "not found!")
|
||||||
|
|
|
@ -129,7 +129,7 @@ func TestCompaction(t *testing.T) {
|
||||||
}
|
}
|
||||||
func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) {
|
func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) {
|
||||||
n := newRandomNeedle(uint64(i))
|
n := newRandomNeedle(uint64(i))
|
||||||
_, size, _, err := v.writeNeedle2(n, false)
|
_, size, _, err := v.writeNeedle2(n, true, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("write file %d: %v", i, err)
|
t.Fatalf("write file %d: %v", i, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -91,7 +91,7 @@ func (v *Volume) asyncRequestAppend(request *needle.AsyncRequest) {
|
||||||
v.asyncRequestsChan <- request
|
v.asyncRequestsChan <- request
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size Size, isUnchanged bool, err error) {
|
func (v *Volume) syncWrite(n *needle.Needle, checkCookie bool) (offset uint64, size Size, isUnchanged bool, err error) {
|
||||||
// glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
|
// glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
|
||||||
actualSize := needle.GetActualSize(Size(len(n.Data)), v.Version())
|
actualSize := needle.GetActualSize(Size(len(n.Data)), v.Version())
|
||||||
|
|
||||||
|
@ -103,10 +103,10 @@ func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size Size, isUnchan
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
return v.doWriteRequest(n)
|
return v.doWriteRequest(n, checkCookie)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Volume) writeNeedle2(n *needle.Needle, fsync bool) (offset uint64, size Size, isUnchanged bool, err error) {
|
func (v *Volume) writeNeedle2(n *needle.Needle, checkCookie bool, fsync bool) (offset uint64, size Size, isUnchanged bool, err error) {
|
||||||
// glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
|
// glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
|
||||||
if n.Ttl == needle.EMPTY_TTL && v.Ttl != needle.EMPTY_TTL {
|
if n.Ttl == needle.EMPTY_TTL && v.Ttl != needle.EMPTY_TTL {
|
||||||
n.SetHasTtl()
|
n.SetHasTtl()
|
||||||
|
@ -114,7 +114,7 @@ func (v *Volume) writeNeedle2(n *needle.Needle, fsync bool) (offset uint64, size
|
||||||
}
|
}
|
||||||
|
|
||||||
if !fsync {
|
if !fsync {
|
||||||
return v.syncWrite(n)
|
return v.syncWrite(n, checkCookie)
|
||||||
} else {
|
} else {
|
||||||
asyncRequest := needle.NewAsyncRequest(n, true)
|
asyncRequest := needle.NewAsyncRequest(n, true)
|
||||||
// using len(n.Data) here instead of n.Size before n.Size is populated in n.Append()
|
// using len(n.Data) here instead of n.Size before n.Size is populated in n.Append()
|
||||||
|
@ -127,7 +127,7 @@ func (v *Volume) writeNeedle2(n *needle.Needle, fsync bool) (offset uint64, size
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size Size, isUnchanged bool, err error) {
|
func (v *Volume) doWriteRequest(n *needle.Needle, checkCookie bool) (offset uint64, size Size, isUnchanged bool, err error) {
|
||||||
// glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
|
// glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
|
||||||
if v.isFileUnchanged(n) {
|
if v.isFileUnchanged(n) {
|
||||||
size = Size(n.DataSize)
|
size = Size(n.DataSize)
|
||||||
|
@ -143,10 +143,12 @@ func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size Size, isU
|
||||||
err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr)
|
err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if n.Cookie == 0 {
|
if n.Cookie == 0 && !checkCookie {
|
||||||
// this is from batch deletion, and read back again when tailing a remote volume
|
// this is from batch deletion, and read back again when tailing a remote volume
|
||||||
|
// which only happens when checkCookie == false and fsync == false
|
||||||
n.Cookie = existingNeedle.Cookie
|
n.Cookie = existingNeedle.Cookie
|
||||||
} else if existingNeedle.Cookie != n.Cookie {
|
}
|
||||||
|
if existingNeedle.Cookie != n.Cookie {
|
||||||
glog.V(0).Infof("write cookie mismatch: existing %s, new %s",
|
glog.V(0).Infof("write cookie mismatch: existing %s, new %s",
|
||||||
needle.NewFileIdFromNeedle(v.Id, existingNeedle), needle.NewFileIdFromNeedle(v.Id, n))
|
needle.NewFileIdFromNeedle(v.Id, existingNeedle), needle.NewFileIdFromNeedle(v.Id, n))
|
||||||
err = fmt.Errorf("mismatching cookie %x", n.Cookie)
|
err = fmt.Errorf("mismatching cookie %x", n.Cookie)
|
||||||
|
@ -274,7 +276,7 @@ func (v *Volume) startWorker() {
|
||||||
|
|
||||||
for i := 0; i < len(currentRequests); i++ {
|
for i := 0; i < len(currentRequests); i++ {
|
||||||
if currentRequests[i].IsWriteRequest {
|
if currentRequests[i].IsWriteRequest {
|
||||||
offset, size, isUnchanged, err := v.doWriteRequest(currentRequests[i].N)
|
offset, size, isUnchanged, err := v.doWriteRequest(currentRequests[i].N, true)
|
||||||
currentRequests[i].UpdateResult(offset, uint64(size), isUnchanged, err)
|
currentRequests[i].UpdateResult(offset, uint64(size), isUnchanged, err)
|
||||||
} else {
|
} else {
|
||||||
size, err := v.doDeleteRequest(currentRequests[i].N)
|
size, err := v.doDeleteRequest(currentRequests[i].N)
|
||||||
|
|
|
@ -42,7 +42,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.GetVolume(volumeId) != nil {
|
if s.GetVolume(volumeId) != nil {
|
||||||
isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, fsync)
|
isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, true, fsync)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("failed to write to local disk: %v", err)
|
err = fmt.Errorf("failed to write to local disk: %v", err)
|
||||||
glog.V(0).Infoln(err)
|
glog.V(0).Infoln(err)
|
||||||
|
|
Loading…
Reference in a new issue