mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
volume server: accept fsync=true in write requests
This commit is contained in:
parent
5987810e5e
commit
e4af63a721
|
@ -90,7 +90,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv
|
||||||
defer glog.V(1).Infof("receive tailing volume %d finished", v.Id)
|
defer glog.V(1).Infof("receive tailing volume %d finished", v.Id)
|
||||||
|
|
||||||
return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
|
return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
|
||||||
_, err := vs.store.WriteVolumeNeedle(v.Id, n)
|
_, err := vs.store.WriteVolumeNeedle(v.Id, n, false)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -252,7 +252,7 @@ func (s *Store) Close() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (isUnchanged bool, err error) {
|
func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle, fsync bool) (isUnchanged bool, err error) {
|
||||||
if v := s.findVolume(i); v != nil {
|
if v := s.findVolume(i); v != nil {
|
||||||
if v.IsReadOnly() {
|
if v.IsReadOnly() {
|
||||||
err = fmt.Errorf("volume %d is read only", i)
|
err = fmt.Errorf("volume %d is read only", i)
|
||||||
|
@ -260,7 +260,7 @@ func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (isUnchan
|
||||||
}
|
}
|
||||||
// using len(n.Data) here instead of n.Size before n.Size is populated in v.writeNeedle(n)
|
// using len(n.Data) here instead of n.Size before n.Size is populated in v.writeNeedle(n)
|
||||||
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(uint32(len(n.Data)), v.Version())) {
|
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(uint32(len(n.Data)), v.Version())) {
|
||||||
_, _, isUnchanged, err = v.writeNeedle(n)
|
_, _, isUnchanged, err = v.writeNeedle(n, fsync)
|
||||||
} else {
|
} else {
|
||||||
err = fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
|
err = fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,7 +63,7 @@ func (v *Volume) Destroy() (err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUnchanged bool, err error) {
|
func (v *Volume) writeNeedle(n *needle.Needle, fsync bool) (offset uint64, size uint32, isUnchanged bool, err error) {
|
||||||
// glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
|
// glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
|
||||||
v.dataFileAccessLock.Lock()
|
v.dataFileAccessLock.Lock()
|
||||||
defer v.dataFileAccessLock.Unlock()
|
defer v.dataFileAccessLock.Unlock()
|
||||||
|
@ -98,6 +98,11 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUn
|
||||||
if offset, size, _, err = n.Append(v.DataBackend, v.Version()); err != nil {
|
if offset, size, _, err = n.Append(v.DataBackend, v.Version()); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if fsync {
|
||||||
|
if err = v.DataBackend.Sync(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
v.lastAppendAtNs = n.AppendAtNs
|
v.lastAppendAtNs = n.AppendAtNs
|
||||||
|
|
||||||
// add to needle map
|
// add to needle map
|
||||||
|
|
|
@ -129,7 +129,7 @@ func TestCompaction(t *testing.T) {
|
||||||
}
|
}
|
||||||
func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) {
|
func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) {
|
||||||
n := newRandomNeedle(uint64(i))
|
n := newRandomNeedle(uint64(i))
|
||||||
_, size, _, err := v.writeNeedle(n)
|
_, size, _, err := v.writeNeedle(n, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("write file %d: %v", i, err)
|
t.Fatalf("write file %d: %v", i, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,8 +22,10 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.Volume
|
||||||
//check JWT
|
//check JWT
|
||||||
jwt := security.GetJwt(r)
|
jwt := security.GetJwt(r)
|
||||||
|
|
||||||
|
// check whether this is a replicated write request
|
||||||
var remoteLocations []operation.Location
|
var remoteLocations []operation.Location
|
||||||
if r.FormValue("type") != "replicate" {
|
if r.FormValue("type") != "replicate" {
|
||||||
|
// this is the initial request
|
||||||
remoteLocations, err = getWritableRemoteReplications(s, volumeId, masterNode)
|
remoteLocations, err = getWritableRemoteReplications(s, volumeId, masterNode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infoln(err)
|
glog.V(0).Infoln(err)
|
||||||
|
@ -31,8 +33,14 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.Volume
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// read fsync value
|
||||||
|
fsync := false
|
||||||
|
if r.FormValue("fsync") == "true" {
|
||||||
|
fsync = true
|
||||||
|
}
|
||||||
|
|
||||||
if s.GetVolume(volumeId) != nil {
|
if s.GetVolume(volumeId) != nil {
|
||||||
isUnchanged, err = s.WriteVolumeNeedle(volumeId, n)
|
isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, fsync)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("failed to write to local disk: %v", err)
|
err = fmt.Errorf("failed to write to local disk: %v", err)
|
||||||
glog.V(0).Infoln(err)
|
glog.V(0).Infoln(err)
|
||||||
|
|
Loading…
Reference in a new issue