volume: change all writes to fsync during graceful stopping

fix https://github.com/chrislusf/seaweedfs/issues/2193
This commit is contained in:
Chris Lu 2021-07-13 01:29:57 -07:00
parent 01adc567aa
commit 49c66e88a0
3 changed files with 12 additions and 1 deletions

View file

@ -259,6 +259,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
// Stop heartbeats // Stop heartbeats
if !volumeServer.StopHeartbeat() { if !volumeServer.StopHeartbeat() {
volumeServer.SetStopping()
glog.V(0).Infof("stop send heartbeat and wait %d seconds until shutdown ...", *v.preStopSeconds) glog.V(0).Infof("stop send heartbeat and wait %d seconds until shutdown ...", *v.preStopSeconds)
time.Sleep(time.Duration(*v.preStopSeconds) * time.Second) time.Sleep(time.Duration(*v.preStopSeconds) * time.Second)
} }

View file

@ -112,6 +112,11 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
return vs return vs
} }
func (vs *VolumeServer) SetStopping() {
glog.V(0).Infoln("Stopping volume server...")
vs.store.SetStopping()
}
func (vs *VolumeServer) Shutdown() { func (vs *VolumeServer) Shutdown() {
glog.V(0).Infoln("Shutting down volume server...") glog.V(0).Infoln("Shutting down volume server...")
vs.store.Close() vs.store.Close()

View file

@ -46,6 +46,7 @@ type Store struct {
DeletedVolumesChan chan master_pb.VolumeShortInformationMessage DeletedVolumesChan chan master_pb.VolumeShortInformationMessage
NewEcShardsChan chan master_pb.VolumeEcShardInformationMessage NewEcShardsChan chan master_pb.VolumeEcShardInformationMessage
DeletedEcShardsChan chan master_pb.VolumeEcShardInformationMessage DeletedEcShardsChan chan master_pb.VolumeEcShardInformationMessage
isStopping bool
} }
func (s *Store) String() (str string) { func (s *Store) String() (str string) {
@ -321,6 +322,10 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
} }
func (s *Store) SetStopping() {
s.isStopping = true
}
func (s *Store) Close() { func (s *Store) Close() {
for _, location := range s.Locations { for _, location := range s.Locations {
location.Close() location.Close()
@ -333,7 +338,7 @@ func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle, fsync boo
err = fmt.Errorf("volume %d is read only", i) err = fmt.Errorf("volume %d is read only", i)
return return
} }
_, _, isUnchanged, err = v.writeNeedle2(n, fsync) _, _, isUnchanged, err = v.writeNeedle2(n, fsync && s.isStopping)
return return
} }
glog.V(0).Infoln("volume", i, "not found!") glog.V(0).Infoln("volume", i, "not found!")