diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index 063fba5f7..d302afe8c 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -2,11 +2,12 @@ package topology import ( "context" - "github.com/seaweedfs/seaweedfs/weed/pb" "io" "sync/atomic" "time" + "github.com/seaweedfs/seaweedfs/weed/pb" + "google.golang.org/grpc" "github.com/seaweedfs/seaweedfs/weed/storage/needle" @@ -119,10 +120,10 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl * return isVacuumSuccess } -func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) bool { +func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, vacuumLocationList, locationList *VolumeLocationList) bool { isCommitSuccess := true isReadOnly := false - for _, dn := range locationlist.list { + for _, dn := range vacuumLocationList.list { glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url()) err := operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{ @@ -140,8 +141,38 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V glog.V(0).Infof("Complete Committing vacuum %d on %s", vid, dn.Url()) } } + + //we should check the status of all replicas + if len(locationList.list) > len(vacuumLocationList.list) { + for _, dn := range locationList.list { + isFound := false + for _, dnVaccum := range vacuumLocationList.list { + if dn.id == dnVaccum.id { + isFound = true + break + } + } + if !isFound { + err := operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + resp, err := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{ + VolumeId: uint32(vid), + }) + if resp != nil && resp.IsReadOnly { + isReadOnly = true + } + return err + }) + if err != nil { + glog.Errorf("Error when checking volume %d status on %s: %v", vid, dn.Url(), err) + //we mark volume read-only, since the volume state is unknown + isReadOnly = true + } + } + } + } + if isCommitSuccess { - for _, dn := range locationlist.list { + for _, dn := range vacuumLocationList.list { vl.SetVolumeAvailable(dn, vid, isReadOnly) } } @@ -226,11 +257,11 @@ func (t *Topology) vacuumOneVolumeId(grpcDialOption grpc.DialOption, volumeLayou return } - glog.V(2).Infof("check vacuum on collection:%s volume:%d", c.Name, vid) + glog.V(1).Infof("check vacuum on collection:%s volume:%d", c.Name, vid) if vacuumLocationList, needVacuum := t.batchVacuumVolumeCheck( grpcDialOption, vid, locationList, garbageThreshold); needVacuum { if t.batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, vacuumLocationList, preallocate) { - t.batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, vacuumLocationList) + t.batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, vacuumLocationList, locationList) } else { t.batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, vacuumLocationList) }