mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Delete volume is empty (#4561)
* use onlyEmpty for deleteVolume https://github.com/seaweedfs/seaweedfs/issues/4559 * fix IsEmpty * fix test --------- Co-authored-by: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.co>
This commit is contained in:
parent
2e42cdeb22
commit
25535e9c36
|
@ -219,6 +219,7 @@ message VolumeUnmountResponse {
|
||||||
|
|
||||||
message VolumeDeleteRequest {
|
message VolumeDeleteRequest {
|
||||||
uint32 volume_id = 1;
|
uint32 volume_id = 1;
|
||||||
|
bool only_empty = 2;
|
||||||
}
|
}
|
||||||
message VolumeDeleteResponse {
|
message VolumeDeleteResponse {
|
||||||
}
|
}
|
||||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -100,7 +100,7 @@ func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.
|
||||||
|
|
||||||
resp := &volume_server_pb.VolumeDeleteResponse{}
|
resp := &volume_server_pb.VolumeDeleteResponse{}
|
||||||
|
|
||||||
err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
|
err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId), req.OnlyEmpty)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("volume delete %v: %v", req, err)
|
glog.Errorf("volume delete %v: %v", req, err)
|
||||||
|
|
|
@ -32,7 +32,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
|
||||||
|
|
||||||
glog.V(0).Infof("volume %d already exists. deleted before copying...", req.VolumeId)
|
glog.V(0).Infof("volume %d already exists. deleted before copying...", req.VolumeId)
|
||||||
|
|
||||||
err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
|
err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId), false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err)
|
return fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -181,7 +181,7 @@ func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection
|
||||||
// ask the source volume server to delete the original volume
|
// ask the source volume server to delete the original volume
|
||||||
for _, location := range existingLocations {
|
for _, location := range existingLocations {
|
||||||
fmt.Printf("delete volume %d from %s\n", volumeId, location.Url)
|
fmt.Printf("delete volume %d from %s\n", volumeId, location.Url)
|
||||||
err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, location.ServerAddress())
|
err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, location.ServerAddress(), false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err)
|
return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,6 +46,6 @@ func (c *commandVolumeDelete) Do(args []string, commandEnv *CommandEnv, writer i
|
||||||
|
|
||||||
volumeId := needle.VolumeId(*volumeIdInt)
|
volumeId := needle.VolumeId(*volumeIdInt)
|
||||||
|
|
||||||
return deleteVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer)
|
return deleteVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, false)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,7 +60,8 @@ func (c *commandVolumeDeleteEmpty) Do(args []string, commandEnv *CommandEnv, wri
|
||||||
if v.Size <= 8 && v.ModifiedAtSecond > 0 && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
|
if v.Size <= 8 && v.ModifiedAtSecond > 0 && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
|
||||||
if *applyBalancing {
|
if *applyBalancing {
|
||||||
log.Printf("deleting empty volume %d from %s", v.Id, dn.Id)
|
log.Printf("deleting empty volume %d from %s", v.Id, dn.Id)
|
||||||
if deleteErr := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), pb.NewServerAddressFromDataNode(dn)); deleteErr != nil {
|
if deleteErr := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id),
|
||||||
|
pb.NewServerAddressFromDataNode(dn), true); deleteErr != nil {
|
||||||
err = deleteErr
|
err = deleteErr
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -223,7 +223,8 @@ func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, wr
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), pb.NewServerAddressFromDataNode(replica.location.dataNode)); err != nil {
|
if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id),
|
||||||
|
pb.NewServerAddressFromDataNode(replica.location.dataNode), false); err != nil {
|
||||||
return fmt.Errorf("deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err)
|
return fmt.Errorf("deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -94,7 +94,7 @@ func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId n
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("deleting volume %d from %s", volumeId, sourceVolumeServer)
|
log.Printf("deleting volume %d from %s", volumeId, sourceVolumeServer)
|
||||||
if err = deleteVolume(grpcDialOption, volumeId, sourceVolumeServer); err != nil {
|
if err = deleteVolume(grpcDialOption, volumeId, sourceVolumeServer, false); err != nil {
|
||||||
return fmt.Errorf("delete volume %d from %s: %v", volumeId, sourceVolumeServer, err)
|
return fmt.Errorf("delete volume %d from %s: %v", volumeId, sourceVolumeServer, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -187,10 +187,11 @@ func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, source
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) {
|
func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, onlyEmpty bool) (err error) {
|
||||||
return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
_, deleteErr := volumeServerClient.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
|
_, deleteErr := volumeServerClient.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
|
||||||
VolumeId: uint32(volumeId),
|
VolumeId: uint32(volumeId),
|
||||||
|
OnlyEmpty: onlyEmpty,
|
||||||
})
|
})
|
||||||
return deleteErr
|
return deleteErr
|
||||||
})
|
})
|
||||||
|
|
|
@ -270,7 +270,7 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i
|
||||||
// remove the remaining replicas
|
// remove the remaining replicas
|
||||||
for _, loc := range locations {
|
for _, loc := range locations {
|
||||||
if loc.Url != dst.dataNode.Id && loc.ServerAddress() != sourceVolumeServer {
|
if loc.Url != dst.dataNode.Id && loc.ServerAddress() != sourceVolumeServer {
|
||||||
if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.ServerAddress()); err != nil {
|
if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.ServerAddress(), false); err != nil {
|
||||||
fmt.Fprintf(writer, "failed to delete volume %d on %s: %v\n", vid, loc.Url, err)
|
fmt.Fprintf(writer, "failed to delete volume %d on %s: %v\n", vid, loc.Url, err)
|
||||||
}
|
}
|
||||||
// reduce volume count? Not really necessary since they are "more" full and will not be a candidate to move to
|
// reduce volume count? Not really necessary since they are "more" full and will not be a candidate to move to
|
||||||
|
|
|
@ -120,7 +120,7 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
fmt.Printf("delete volume %d from %s\n", vid, location.Url)
|
fmt.Printf("delete volume %d from %s\n", vid, location.Url)
|
||||||
err = deleteVolume(commandEnv.option.GrpcDialOption, vid, location.ServerAddress())
|
err = deleteVolume(commandEnv.option.GrpcDialOption, vid, location.ServerAddress(), false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, vid, err)
|
return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, vid, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -507,8 +507,11 @@ func (s *Store) UnmountVolume(i needle.VolumeId) error {
|
||||||
return fmt.Errorf("volume %d not found on disk", i)
|
return fmt.Errorf("volume %d not found on disk", i)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) DeleteVolume(i needle.VolumeId) error {
|
func (s *Store) DeleteVolume(i needle.VolumeId, onlyEmpty bool) error {
|
||||||
v := s.findVolume(i)
|
v := s.findVolume(i)
|
||||||
|
if onlyEmpty && !v.IsEmpty() {
|
||||||
|
return fmt.Errorf("delete volume %d not empty", i)
|
||||||
|
}
|
||||||
if v == nil {
|
if v == nil {
|
||||||
return fmt.Errorf("delete volume %d not found on disk", i)
|
return fmt.Errorf("delete volume %d not found on disk", i)
|
||||||
}
|
}
|
||||||
|
|
|
@ -332,3 +332,6 @@ func (v *Volume) IsReadOnly() bool {
|
||||||
defer v.noWriteLock.RUnlock()
|
defer v.noWriteLock.RUnlock()
|
||||||
return v.noWriteOrDelete || v.noWriteCanDelete || v.location.isDiskSpaceLow
|
return v.noWriteOrDelete || v.noWriteCanDelete || v.location.isDiskSpaceLow
|
||||||
}
|
}
|
||||||
|
func (v *Volume) IsEmpty() bool {
|
||||||
|
return v.ContentSize() == 0
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue