diff --git a/weed/pb/volume_server.proto b/weed/pb/volume_server.proto
index 3b5b36a21..1cef07dce 100644
--- a/weed/pb/volume_server.proto
+++ b/weed/pb/volume_server.proto
@@ -171,6 +171,7 @@ message ReadVolumeFileStatusResponse {
     uint64 idx_file_size = 3;
     uint64 dat_file_timestamp = 4;
     uint64 dat_file_size = 5;
+    uint64 file_count = 6;
 }
 
 message DiskStatus {
diff --git a/weed/pb/volume_server_pb/volume_server.pb.go b/weed/pb/volume_server_pb/volume_server.pb.go
index 0f3b47ee0..81cae93c6 100644
--- a/weed/pb/volume_server_pb/volume_server.pb.go
+++ b/weed/pb/volume_server_pb/volume_server.pb.go
@@ -656,6 +656,7 @@ type ReadVolumeFileStatusResponse struct {
 	IdxFileSize      uint64 `protobuf:"varint,3,opt,name=idx_file_size,json=idxFileSize" json:"idx_file_size,omitempty"`
 	DatFileTimestamp uint64 `protobuf:"varint,4,opt,name=dat_file_timestamp,json=datFileTimestamp" json:"dat_file_timestamp,omitempty"`
 	DatFileSize      uint64 `protobuf:"varint,5,opt,name=dat_file_size,json=datFileSize" json:"dat_file_size,omitempty"`
+	FileCount        uint64 `protobuf:"varint,6,opt,name=file_count,json=fileCount" json:"file_count,omitempty"`
 }
 
 func (m *ReadVolumeFileStatusResponse) Reset() { *m = ReadVolumeFileStatusResponse{} }
@@ -698,6 +699,13 @@ func (m *ReadVolumeFileStatusResponse) GetDatFileSize() uint64 {
 	return 0
 }
 
+func (m *ReadVolumeFileStatusResponse) GetFileCount() uint64 {
+	if m != nil {
+		return m.FileCount
+	}
+	return 0
+}
+
 type DiskStatus struct {
 	Dir string `protobuf:"bytes,1,opt,name=dir" json:"dir,omitempty"`
 	All uint64 `protobuf:"varint,2,opt,name=all" json:"all,omitempty"`
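The new `file_count` field rides along on the existing ReadVolumeFileStatus RPC, so any volume server client can read it back. A minimal sketch of doing so (the server address and volume id are illustrative, not part of this change):

```go
package main

import (
	"context"
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"google.golang.org/grpc"
)

// readFileCount asks one volume server for a volume's file status and
// returns the new file_count value via the generated getter.
func readFileCount(dialOption grpc.DialOption, volumeServerAddr string, vid uint32) (uint64, error) {
	var count uint64
	err := operation.WithVolumeServerClient(volumeServerAddr, dialOption, func(client volume_server_pb.VolumeServerClient) error {
		resp, err := client.ReadVolumeFileStatus(context.Background(),
			&volume_server_pb.ReadVolumeFileStatusRequest{VolumeId: vid})
		if err != nil {
			return err
		}
		count = resp.GetFileCount()
		return nil
	})
	return count, err
}

func main() {
	count, err := readFileCount(grpc.WithInsecure(), "localhost:18080", 7) // assumed local test server
	fmt.Println(count, err)
}
```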
diff --git a/weed/server/volume_grpc_replicate.go b/weed/server/volume_grpc_replicate.go
index 1a31a37f3..c991a496e 100644
--- a/weed/server/volume_grpc_replicate.go
+++ b/weed/server/volume_grpc_replicate.go
@@ -3,6 +3,7 @@ package weed_server
 import (
 	"context"
 	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
 	"github.com/chrislusf/seaweedfs/weed/storage"
@@ -34,22 +35,28 @@ func (vs *VolumeServer) ReplicateVolume(ctx context.Context, req *volume_server_
 	// send .idx file
 	// send .dat file
 	// confirm size and timestamp
-
+	var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse
+	datFileName := volumeFileName + ".dat"
+	idxFileName := volumeFileName + ".idx"
 	err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
-
-		// TODO read file sizes before copying
-		client.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{})
+		var err error
+		volFileInfoResp, err = client.ReadVolumeFileStatus(ctx,
+			&volume_server_pb.ReadVolumeFileStatusRequest{
+				VolumeId: req.VolumeId,
+			})
+		if err != nil {
+			return fmt.Errorf("read volume file status failed, %v", err)
+		}
 
 		copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
 			VolumeId:  req.VolumeId,
 			IsIdxFile: true,
 		})
-
 		if err != nil {
 			return fmt.Errorf("failed to start copying volume %d idx file: %v", req.VolumeId, err)
 		}
-		err = writeToFile(copyFileClient, volumeFileName+".idx")
+		err = writeToFile(copyFileClient, idxFileName)
 		if err != nil {
 			return fmt.Errorf("failed to copy volume %d idx file: %v", req.VolumeId, err)
 		}
@@ -58,24 +65,26 @@ func (vs *VolumeServer) ReplicateVolume(ctx context.Context, req *volume_server_
 			VolumeId:  req.VolumeId,
 			IsDatFile: true,
 		})
-
 		if err != nil {
 			return fmt.Errorf("failed to start copying volume %d dat file: %v", req.VolumeId, err)
 		}
-		err = writeToFile(copyFileClient, volumeFileName+".dat")
+		err = writeToFile(copyFileClient, datFileName)
 		if err != nil {
 			return fmt.Errorf("failed to copy volume %d dat file: %v", req.VolumeId, err)
 		}
 
 		return nil
 	})
-
 	if err != nil {
+		os.Remove(idxFileName)
+		os.Remove(datFileName)
 		return nil, err
 	}
 
-	// TODO: check the timestamp and size
+	if err = checkCopyFiles(volFileInfoResp, idxFileName, datFileName); err != nil {
+		return nil, err
+	}
 
 	// mount the volume
 	err = vs.store.MountVolume(storage.VolumeId(req.VolumeId))
@@ -84,11 +93,35 @@ func (vs *VolumeServer) ReplicateVolume(ctx context.Context, req *volume_server_
 	}
 
 	return &volume_server_pb.ReplicateVolumeResponse{}, err
+}
+
+// checkCopyFiles verifies that the copied .idx and .dat files have the same
+// sizes as the origin volume reported.
+// TODO: maybe also check the volume's received count and deleted count,
+// not just the file sizes.
+func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse, idxFileName, datFileName string) error {
+	stat, err := os.Stat(idxFileName)
+	if err != nil {
+		return fmt.Errorf("get idx file info failed, %v", err)
+	}
+	if originFileInf.IdxFileSize != uint64(stat.Size()) {
+		return fmt.Errorf("the idx file size [%v] does not match the origin file size [%v]",
+			stat.Size(), originFileInf.IdxFileSize)
+	}
+
+	stat, err = os.Stat(datFileName)
+	if err != nil {
+		return fmt.Errorf("get dat file info failed, %v", err)
+	}
+	if originFileInf.DatFileSize != uint64(stat.Size()) {
+		return fmt.Errorf("the dat file size [%v] does not match the origin file size [%v]",
+			stat.Size(), originFileInf.DatFileSize)
+	}
+	return nil
 }
 
 func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error {
-	println("writing to ", fileName)
+	glog.V(4).Infof("writing to %s", fileName)
 	dst, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
 	if err != nil {
 		return nil
 	}
@@ -110,6 +143,17 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s
 
 func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) {
 	resp := &volume_server_pb.ReadVolumeFileStatusResponse{}
+	v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
+	if v == nil {
+		return nil, fmt.Errorf("volume id %d not found", req.VolumeId)
+	}
+
+	resp.VolumeId = req.VolumeId
+	resp.DatFileSize = v.DataFileSize()
+	resp.IdxFileSize = v.IndexFileSize()
+	resp.DatFileTimestamp = v.LastModifiedTime()
+	resp.IdxFileTimestamp = v.LastModifiedTime()
+	resp.FileCount = uint64(v.FileCount())
 	return resp, nil
 }
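The heart of checkCopyFiles is a plain os.Stat size comparison against what the origin reported. A self-contained sketch of that check, exercised on a scratch file (the helper name sameSize is made up for illustration):

```go
package main

import (
	"fmt"
	"io/ioutil"
	"os"
)

// sameSize mirrors the checkCopyFiles comparison: the local copy must have
// exactly the size the origin volume server reported.
func sameSize(fileName string, expected uint64) error {
	stat, err := os.Stat(fileName)
	if err != nil {
		return fmt.Errorf("get file info failed, %v", err)
	}
	if uint64(stat.Size()) != expected {
		return fmt.Errorf("the file size [%d] does not match the origin file size [%d]", stat.Size(), expected)
	}
	return nil
}

func main() {
	f, err := ioutil.TempFile("", "vol-*.idx")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	f.Write(make([]byte, 16))
	f.Close()

	fmt.Println(sameSize(f.Name(), 16)) // <nil>
	fmt.Println(sameSize(f.Name(), 32)) // size mismatch error
}
```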
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index 807fefa38..280963c2c 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -79,6 +79,25 @@ func (v *Volume) Size() int64 {
 	return 0 // -1 causes integer overflow and the volume to become unwritable.
 }
 
+func (v *Volume) IndexFileSize() uint64 {
+	return v.nm.IndexFileSize()
+}
+
+func (v *Volume) DataFileSize() uint64 {
+	return uint64(v.Size())
+}
+
+// LastModifiedTime returns the volume's last modified time,
+// expressed as Unix time in seconds, so replicas can be compared
+// for staleness.
+func (v *Volume) LastModifiedTime() uint64 {
+	return v.lastModifiedTime
+}
+
+func (v *Volume) FileCount() uint {
+	return uint(v.nm.FileCount())
+}
+
 // Close cleanly shuts down this volume
 func (v *Volume) Close() {
 	v.dataFileAccessLock.Lock()
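These accessors feed the replication checker below, which queries every replica concurrently: it collects responses with a WaitGroup plus a dedicated closer goroutine, so the receiving loop can simply range over the channel. A minimal, self-contained sketch of that fan-out/fan-in pattern, with a stub fetch standing in for the ReadVolumeFileStatus RPC:

```go
package main

import (
	"fmt"
	"sync"
)

// fetch stands in for the per-replica RPC call.
func fetch(url string) (string, error) {
	return "status of " + url, nil
}

func main() {
	urls := []string{"vs1:8080", "vs2:8080", "vs3:8080"}
	results := make(chan string, len(urls))
	var wg sync.WaitGroup
	wg.Add(len(urls))
	for _, u := range urls {
		go func(u string) {
			defer wg.Done()
			if r, err := fetch(u); err == nil {
				results <- r
			}
		}(u)
	}
	go func() { // close the channel once every worker is done
		wg.Wait()
		close(results)
	}()
	for r := range results { // terminates when the channel is closed
		fmt.Println(r)
	}
}
```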
diff --git a/weed/topology/replication_health_checker.go b/weed/topology/replication_health_checker.go
new file mode 100644
index 000000000..947e7d45c
--- /dev/null
+++ b/weed/topology/replication_health_checker.go
@@ -0,0 +1,296 @@
+package topology
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"strings"
+	"sync"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/operation"
+	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+	"github.com/chrislusf/seaweedfs/weed/storage"
+	"google.golang.org/grpc"
+)
+
+// RepairUnhealthyReplicationInLayout checks the replication health of one
+// volume in the given layout and repairs the replicas that have fallen behind.
+func (t *Topology) RepairUnhealthyReplicationInLayout(grpcDialOption grpc.DialOption, layout *VolumeLayout, eVid storage.VolumeId) error {
+	ctx := context.Background()
+	locations, exist := layout.vid2location[eVid]
+	if !exist {
+		retErr := fmt.Errorf("the volume %v has no locations", eVid)
+		glog.V(0).Info(retErr.Error())
+		return retErr
+	}
+
+	fileStat, err := getReplicationInfo(grpcDialOption, ctx, eVid, locations)
+	if err != nil {
+		glog.Errorf("get replication status failed, %v", err)
+		return err
+	}
+
+	if isSameVolumeReplications(fileStat, layout.volumeSizeLimit) {
+		glog.V(0).Infof("the volume %v has %d identical replicas, no repair needed", eVid, len(fileStat))
+		return nil
+	}
+
+	// compact all the replicas of the volume
+	{
+		glog.V(4).Infof("begin compacting all the replicas of volume %v", eVid)
+		allUrls := make([]string, 0, len(fileStat))
+		for _, fs := range fileStat {
+			allUrls = append(allUrls, fs.url)
+		}
+
+		if !tryBatchCompactVolume(ctx, grpcDialOption, eVid, allUrls) {
+			err := fmt.Errorf("failed to compact all the replicas of volume %v", eVid)
+			glog.Error(err.Error())
+			return err
+		}
+		glog.V(4).Infof("successfully compacted all the replicas of volume %v", eVid)
+	}
+
+	// get the replication status again
+	fileStat, err = getReplicationInfo(grpcDialOption, ctx, eVid, locations)
+	if err != nil {
+		return err
+	}
+
+	okUrls, errUrls := filterErrorReplication(fileStat)
+	if len(errUrls) == 0 {
+		return nil // all replicas agree
+	}
+
+	if len(okUrls) == 0 {
+		return fmt.Errorf("no healthy replica of volume %v found", eVid)
+	}
+
+	glog.V(4).Infof("replicas that need repair: %v", errUrls)
+	if len(locations.list) == 0 {
+		return fmt.Errorf("the location list of volume %v is unexpectedly empty", eVid)
+	}
+	for _, url := range errUrls {
+		vInfo := locations.list[0].volumes[eVid]
+		err = syncReplication(grpcDialOption, okUrls[0], url, vInfo)
+		if err != nil {
+			glog.Error(err)
+			return err
+		}
+	}
+	return nil
+}
+
+type FileStatus struct {
+	url      string
+	fileStat *volume_server_pb.ReadVolumeFileStatusResponse
+}
+
+func getReplicationInfo(grpcDialOption grpc.DialOption, ctx context.Context, vid storage.VolumeId, locs *VolumeLocationList) (fs []FileStatus, err error) {
+	type ResponsePair struct {
+		url    string
+		status *volume_server_pb.ReadVolumeFileStatusResponse
+		err    error
+	}
+
+	var wg sync.WaitGroup
+	resultChan := make(chan ResponsePair, len(locs.list))
+	wg.Add(len(locs.list))
+
+	getFileStatFunc := func(url string, volumeId storage.VolumeId) {
+		defer wg.Done()
+		glog.V(4).Infof("volumeId:%v, location:%v", volumeId, url)
+		err := operation.WithVolumeServerClient(url, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+			req := &volume_server_pb.ReadVolumeFileStatusRequest{
+				VolumeId: uint32(volumeId),
+			}
+			respTmp, err := client.ReadVolumeFileStatus(ctx, req)
+			resultChan <- ResponsePair{
+				url:    url,
+				status: respTmp,
+				err:    err,
+			}
+			return nil
+		})
+		if err != nil {
+			glog.Error(err)
+		}
+	}
+	for _, node := range locs.list {
+		go getFileStatFunc(node.Url(), vid)
+	}
+
+	go func() { // close the channel once every request has finished
+		wg.Wait()
+		close(resultChan)
+	}()
+
+	var errs []string
+	for result := range resultChan {
+		if result.err == nil {
+			fs = append(fs, FileStatus{
+				url:      result.url,
+				fileStat: result.status,
+			})
+			continue
+		}
+		errs = append(errs, fmt.Sprintf("url : %s, error : %v", result.url, result.err))
+	}
+
+	if len(fs) == len(locs.list) {
+		return fs, nil
+	}
+	err = fmt.Errorf("get volume[%v] replication status failed, err : %s", vid, strings.Join(errs, "; "))
+	return nil, err
+}
+
+// filterErrorReplication partitions the replicas by file count: the replicas
+// with the highest file count are considered correct, the rest need repair.
+// The file count is the total number of files the volume has received from
+// user clients.
+// TODO: this policy is not rigorous and needs refinement.
+func filterErrorReplication(fileStat []FileStatus) (okUrls, errUrls []string) {
+	sort.Slice(fileStat, func(i, j int) bool {
+		return fileStat[i].fileStat.FileCount > fileStat[j].fileStat.FileCount
+	})
+	if fileStat[0].fileStat.FileCount != fileStat[len(fileStat)-1].fileStat.FileCount {
+		okFileCounter := fileStat[0].fileStat.FileCount
+		for _, v := range fileStat {
+			if okFileCounter == v.fileStat.FileCount {
+				okUrls = append(okUrls, v.url)
+			} else {
+				errUrls = append(errUrls, v.url)
+			}
+		}
+	}
+	return
+}
+
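To make the repair policy concrete: replicas are sorted by file count, descending, and every replica matching the maximum count is treated as correct. A toy demonstration with made-up URLs and counts:

```go
package main

import (
	"fmt"
	"sort"
)

type replica struct {
	url       string
	fileCount uint64
}

func main() {
	replicas := []replica{
		{"vs1:8080", 100},
		{"vs2:8080", 97},
		{"vs3:8080", 100},
	}
	sort.Slice(replicas, func(i, j int) bool {
		return replicas[i].fileCount > replicas[j].fileCount
	})
	maxCount := replicas[0].fileCount
	for _, r := range replicas {
		if r.fileCount == maxCount {
			fmt.Println("ok:    ", r.url) // considered correct
		} else {
			fmt.Println("repair:", r.url) // fell behind, needs sync
		}
	}
}
```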
+// compactVolume executes the compact transaction on one replica.
+func compactVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeUrl string, vid storage.VolumeId) bool {
+	glog.V(0).Infoln("Start vacuuming", vid, "on", volumeUrl)
+	err := operation.WithVolumeServerClient(volumeUrl, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+		_, err := volumeServerClient.VacuumVolumeCompact(ctx, &volume_server_pb.VacuumVolumeCompactRequest{
+			VolumeId: uint32(vid),
+		})
+		return err
+	})
+	if err != nil {
+		glog.Errorf("Error when vacuuming %d on %s: %v", vid, volumeUrl, err)
+		return false
+	}
+	glog.V(0).Infof("Complete vacuuming volume:%v on %s", vid, volumeUrl)
+	return true
+}
+
+// commitCompactedVolume commits the compact transaction after compactVolume() returns true.
+func commitCompactedVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeUrl string, vid storage.VolumeId) bool {
+	err := operation.WithVolumeServerClient(volumeUrl, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+		_, err := volumeServerClient.VacuumVolumeCommit(ctx, &volume_server_pb.VacuumVolumeCommitRequest{
+			VolumeId: uint32(vid),
+		})
+		return err
+	})
+	if err != nil {
+		glog.Errorf("Error when committing vacuum %d on %s: %v", vid, volumeUrl, err)
+		return false
+	}
+	glog.V(0).Infof("Complete committing vacuum %d on %s", vid, volumeUrl)
+	return true
+}
+
+// cleanupCompactedVolume rolls back the compact transaction after compactVolume() returns false.
+func cleanupCompactedVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeUrl string, vid storage.VolumeId) bool {
+	glog.V(0).Infoln("Start cleaning up", vid, "on", volumeUrl)
+	err := operation.WithVolumeServerClient(volumeUrl, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+		_, err := volumeServerClient.VacuumVolumeCleanup(ctx, &volume_server_pb.VacuumVolumeCleanupRequest{
+			VolumeId: uint32(vid),
+		})
+		return err
+	})
+	if err != nil {
+		glog.Errorf("Error when cleaning up vacuum %d on %s: %v", vid, volumeUrl, err)
+		return false
+	}
+	glog.V(0).Infof("Complete cleaning up vacuum %d on %s", vid, volumeUrl)
+	// return false even after a successful cleanup: the compact itself failed
+	return false
+}
+
+func tryCompactVolume(ctx context.Context, grpcDialOption grpc.DialOption, vid storage.VolumeId, volumeUrl string) bool {
+	if !compactVolume(ctx, grpcDialOption, volumeUrl, vid) {
+		return cleanupCompactedVolume(ctx, grpcDialOption, volumeUrl, vid)
+	}
+	return commitCompactedVolume(ctx, grpcDialOption, volumeUrl, vid)
+}
+
+func tryBatchCompactVolume(ctx context.Context, grpcDialOption grpc.DialOption,
+	vid storage.VolumeId, urls []string) bool {
+	resultChan := make(chan error)
+	var wg sync.WaitGroup
+	wg.Add(len(urls))
+	for _, url := range urls {
+		go func(volumeUrl string) {
+			defer wg.Done()
+			if !tryCompactVolume(ctx, grpcDialOption, vid, volumeUrl) {
+				resultChan <- fmt.Errorf("url:%s", volumeUrl)
+			}
+		}(url)
+	}
+
+	go func() {
+		wg.Wait()
+		close(resultChan)
+	}()
+
+	var errs []string
+	for result := range resultChan {
+		if result != nil {
+			errs = append(errs, result.Error())
+		}
+	}
+	if len(errs) > 0 {
+		glog.Errorf("compacting the replicas of volume %v failed on: %s", vid, strings.Join(errs, "; "))
+		return false
+	}
+	return true
+}
+
+// isSameVolumeReplications reports whether all the replicas agree on dat file
+// size, file count, and last modified time, and whether the volume has reached
+// its size limit, i.e. it has stopped growing.
+func isSameVolumeReplications(fileStat []FileStatus, volumeSizeLimit uint64) bool {
+	fileSizeSet := make(map[uint64]bool)
+	fileCountSet := make(map[uint64]bool)
+	lastModifiedSet := make(map[uint64]bool)
+	var oneFileSize uint64
+	for _, v := range fileStat {
+		fileCountSet[v.fileStat.FileCount] = true
+		lastModifiedSet[v.fileStat.DatFileTimestamp] = true
+		fileSizeSet[v.fileStat.DatFileSize] = true
+		oneFileSize = v.fileStat.DatFileSize
+	}
+
+	return len(lastModifiedSet) == 1 && len(fileCountSet) == 1 &&
+		len(fileSizeSet) == 1 && oneFileSize >= volumeSizeLimit
+}
+
+func syncReplication(grpcDialOption grpc.DialOption, srcUrl, destUrl string, vinfo storage.VolumeInfo) error {
+	ctx := context.Background()
+	return operation.WithVolumeServerClient(destUrl, grpcDialOption,
+		func(client volume_server_pb.VolumeServerClient) error {
+			if _, err := client.ReplicateVolume(ctx, &volume_server_pb.ReplicateVolumeRequest{
+				VolumeId:       uint32(vinfo.Id),
+				Collection:     vinfo.Collection,
+				Replication:    vinfo.ReplicaPlacement.String(),
+				Ttl:            vinfo.Ttl.String(),
+				SourceDataNode: srcUrl,
+			}); err != nil {
+				glog.Errorf("sync replication failed, %v", err)
+				return err
+			}
+			return nil
+		})
+}
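Nothing in this change wires the checker into a scheduler yet. A hypothetical sketch of how a caller might drive it, assuming it already holds the topology, a layout, and the volume ids to check (repairAll and all wiring here are assumptions, not part of this change):

```go
package maintenance

import (
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/storage"
	"github.com/chrislusf/seaweedfs/weed/topology"
	"google.golang.org/grpc"
)

// repairAll is a hypothetical driver: it walks a set of volume ids and asks
// the topology to repair each volume's replicas, logging but not aborting on error.
func repairAll(t *topology.Topology, dialOption grpc.DialOption, layout *topology.VolumeLayout, vids []storage.VolumeId) {
	for _, vid := range vids {
		if err := t.RepairUnhealthyReplicationInLayout(dialOption, layout, vid); err != nil {
			glog.Errorf("repair volume %v failed: %v", vid, err)
		}
	}
}
```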