volume vacuum: avoid timeout with streaming progress report

fix https://github.com/chrislusf/seaweedfs/issues/2396
Chris Lu 2021-10-24 01:55:34 -07:00
parent 07dd4873db
commit 3be3c17f59
9 changed files with 979 additions and 899 deletions


@@ -120,7 +120,7 @@ func runBackup(cmd *Command, args []string) bool {
 	}
 	if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) {
-		if err = v.Compact2(30*1024*1024*1024, 0); err != nil {
+		if err = v.Compact2(30*1024*1024*1024, 0, nil); err != nil {
 			fmt.Printf("Compact Volume before synchronizing %v\n", err)
 			return true
 		}


@@ -50,7 +50,7 @@ func runCompact(cmd *Command, args []string) bool {
 			glog.Fatalf("Compact Volume [ERROR] %s\n", err)
 		}
 	} else {
-		if err = v.Compact2(preallocate, 0); err != nil {
+		if err = v.Compact2(preallocate, 0, nil); err != nil {
 			glog.Fatalf("Compact Volume [ERROR] %s\n", err)
 		}
 	}


@@ -14,7 +14,7 @@ service VolumeServer {
     rpc VacuumVolumeCheck (VacuumVolumeCheckRequest) returns (VacuumVolumeCheckResponse) {
     }
-    rpc VacuumVolumeCompact (VacuumVolumeCompactRequest) returns (VacuumVolumeCompactResponse) {
+    rpc VacuumVolumeCompact (VacuumVolumeCompactRequest) returns (stream VacuumVolumeCompactResponse) {
     }
     rpc VacuumVolumeCommit (VacuumVolumeCommitRequest) returns (VacuumVolumeCommitResponse) {
     }
@@ -142,6 +142,7 @@ message VacuumVolumeCompactRequest {
     int64 preallocate = 2;
 }
 message VacuumVolumeCompactResponse {
+    int64 processed_bytes = 1;
 }
 message VacuumVolumeCommitRequest {

File diff suppressed because it is too large
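
The suppressed diff is presumably the regenerated volume_server_pb Go code. For a server-streaming RPC, the gRPC-Go code generator produces interfaces roughly shaped like the sketch below, which the server handler and the topology client in this commit rely on; this is an illustrative sketch under that assumption, not the literal generated file.

```go
// Assumed shape of the relevant regenerated volume_server_pb pieces after the
// proto change above; not the literal generated file.
package volume_server_pb

import "google.golang.org/grpc"

type VacuumVolumeCompactRequest struct {
	VolumeId    uint32
	Preallocate int64
}

// The new processed_bytes field carries the progress report.
type VacuumVolumeCompactResponse struct {
	ProcessedBytes int64
}

// Server side: the handler now takes the request plus a send-only stream
// instead of returning a single response.
type VolumeServer_VacuumVolumeCompactServer interface {
	Send(*VacuumVolumeCompactResponse) error
	grpc.ServerStream
}

// Client side: the call returns a stream; Recv yields one progress message
// per report and io.EOF once the compaction finishes.
type VolumeServer_VacuumVolumeCompactClient interface {
	Recv() (*VacuumVolumeCompactResponse, error)
	grpc.ClientStream
}
```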


@@ -24,19 +24,35 @@ func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_serve
 }
 
-func (vs *VolumeServer) VacuumVolumeCompact(ctx context.Context, req *volume_server_pb.VacuumVolumeCompactRequest) (*volume_server_pb.VacuumVolumeCompactResponse, error) {
+func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCompactRequest, stream volume_server_pb.VolumeServer_VacuumVolumeCompactServer) error {
 
 	resp := &volume_server_pb.VacuumVolumeCompactResponse{}
+	reportInterval := int64(1024 * 1024 * 128)
+	nextReportTarget := reportInterval
 
-	err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond)
+	var sendErr error
+	err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond, func(processed int64) bool {
+		if processed > nextReportTarget {
+			resp.ProcessedBytes = processed
+			if sendErr = stream.Send(resp); sendErr != nil {
+				return false
+			}
+			nextReportTarget = processed + reportInterval
+		}
+		return true
+	})
 
 	if err != nil {
 		glog.Errorf("compact volume %d: %v", req.VolumeId, err)
-	} else {
-		glog.V(1).Infof("compact volume %d", req.VolumeId)
+		return err
+	}
+	if sendErr != nil {
+		glog.Errorf("compact volume %d report progress: %v", req.VolumeId, sendErr)
+		return sendErr
 	}
 
-	return resp, err
+	glog.V(1).Infof("compact volume %d", req.VolumeId)
+	return nil
 }
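
A small self-contained sketch of the throttling behavior above: a report is only forwarded once another reportInterval bytes have been processed, and a failed send makes the callback return false, which aborts the compaction. The `send` function here is a hypothetical stand-in for `stream.Send`.

```go
// Standalone simulation of the progress-throttling pattern used in the
// handler above; "send" is a hypothetical stand-in for stream.Send.
package main

import "fmt"

func main() {
	reportInterval := int64(1024 * 1024 * 128) // report roughly every 128 MiB
	nextReportTarget := reportInterval

	send := func(processed int64) error {
		fmt.Printf("processed %d bytes\n", processed)
		return nil
	}

	progressFn := func(processed int64) bool {
		if processed > nextReportTarget {
			if err := send(processed); err != nil {
				return false // returning false aborts the compaction
			}
			nextReportTarget = processed + reportInterval
		}
		return true
	}

	// Offsets seen while copying needles: only the 200 MiB and 400 MiB
	// offsets cross a report target, so exactly two reports are sent.
	for _, offset := range []int64{64 << 20, 200 << 20, 210 << 20, 400 << 20} {
		progressFn(offset)
	}
}
```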


@@ -15,13 +15,13 @@ func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) {
 	}
 	return 0, fmt.Errorf("volume id %d is not found during check compact", volumeId)
 }
-func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64) error {
+func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64, progressFn ProgressFunc) error {
 	if v := s.findVolume(vid); v != nil {
 		s := stats.NewDiskStatus(v.dir)
 		if int64(s.Free) < preallocate {
 			return fmt.Errorf("free space: %d bytes, not enough for %d bytes", s.Free, preallocate)
 		}
-		return v.Compact2(preallocate, compactionBytePerSecond)
+		return v.Compact2(preallocate, compactionBytePerSecond, progressFn)
 	}
 	return fmt.Errorf("volume id %d is not found during compact", vid)
 }


@@ -17,6 +17,8 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/util"
 )
 
+type ProgressFunc func(processed int64) bool
+
 func (v *Volume) garbageLevel() float64 {
 	if v.ContentSize() == 0 {
 		return 0
@@ -62,7 +64,7 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error
 }
 
 // compact a volume based on deletions in .idx files
-func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64) error {
+func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64, progressFn ProgressFunc) error {
 
 	if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory
 		return nil
@@ -83,7 +85,7 @@ func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64) erro
 	if err := v.nm.Sync(); err != nil {
 		glog.V(0).Infof("compact2 fail to sync volume idx %d: %v", v.Id, err)
 	}
-	return copyDataBasedOnIndexFile(v.FileName(".dat"), v.FileName(".idx"), v.FileName(".cpd"), v.FileName(".cpx"), v.SuperBlock, v.Version(), preallocate, compactionBytePerSecond)
+	return copyDataBasedOnIndexFile(v.FileName(".dat"), v.FileName(".idx"), v.FileName(".cpd"), v.FileName(".cpx"), v.SuperBlock, v.Version(), preallocate, compactionBytePerSecond, progressFn)
 }
 
 func (v *Volume) CommitCompact() error {
@@ -382,7 +384,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
 	return
 }
 
-func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate int64, compactionBytePerSecond int64) (err error) {
+func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate, compactionBytePerSecond int64, progressFn ProgressFunc) (err error) {
 	var (
 		srcDatBackend, dstDatBackend backend.BackendStorageFile
 		dataFile                     *os.File
@@ -421,6 +423,12 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str
 			return nil
 		}
 
+		if progressFn != nil {
+			if !progressFn(offset.ToActualOffset()) {
+				return fmt.Errorf("interrupted")
+			}
+		}
+
 		n := new(needle.Needle)
 		err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, version)
 		if err != nil {
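
Callers that want progress feedback can pass a ProgressFunc as the new third argument of Compact2; the backup and compact commands in this commit simply pass nil. The helper below is a hypothetical caller sketch (compactWithProgress and its logging are not part of the commit).

```go
// Hypothetical caller sketch for the new Compact2 signature; the commands
// touched by this commit pass nil instead of a ProgressFunc.
package example

import (
	"log"

	"github.com/chrislusf/seaweedfs/weed/storage"
)

func compactWithProgress(v *storage.Volume, preallocate int64) error {
	progress := func(processed int64) bool {
		log.Printf("volume %d: compacted %d bytes", v.Id, processed)
		return true // returning false stops the copy with an "interrupted" error
	}
	// 0 = no rate limit on compaction throughput
	return v.Compact2(preallocate, 0, progress)
}
```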


@@ -83,7 +83,7 @@ func TestCompaction(t *testing.T) {
 	}
 
 	startTime := time.Now()
-	v.Compact2(0, 0)
+	v.Compact2(0, 0, nil)
 	speed := float64(v.ContentSize()) / time.Now().Sub(startTime).Seconds()
 	t.Logf("compaction speed: %.2f bytes/s", speed)


@@ -3,6 +3,7 @@ package topology
 import (
 	"context"
 	"github.com/chrislusf/seaweedfs/weed/pb"
+	"io"
 	"sync/atomic"
 	"time"
@@ -70,11 +71,26 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *
 		go func(index int, url pb.ServerAddress, vid needle.VolumeId) {
 			glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
 			err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
-				_, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
+				stream, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
 					VolumeId:    uint32(vid),
 					Preallocate: preallocate,
 				})
-				return err
+				if err != nil {
+					return err
+				}
+				for {
+					resp, recvErr := stream.Recv()
+					if recvErr != nil {
+						if recvErr == io.EOF {
+							break
+						} else {
+							return recvErr
+						}
+					}
+					glog.V(0).Infof("%d vacuum %d on %s processed %d bytes", index, vid, url, resp.ProcessedBytes)
+				}
+				return nil
 			})
 			if err != nil {
 				glog.Errorf("Error when vacuuming %d on %s: %v", vid, url, err)