diff --git a/weed/command/backup.go b/weed/command/backup.go index 950cbf68e..4c37c2763 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -112,7 +112,7 @@ func runBackup(cmd *Command, args []string) bool { return true } } - v, err := storage.NewVolume(util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0) + v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0) if err != nil { fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) return true @@ -137,7 +137,7 @@ func runBackup(cmd *Command, args []string) bool { // remove the old data v.Destroy() // recreate an empty volume - v, err = storage.NewVolume(util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0) + v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0) if err != nil { fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) return true diff --git a/weed/command/compact.go b/weed/command/compact.go index 6117cf9f3..92e25f474 100644 --- a/weed/command/compact.go +++ b/weed/command/compact.go @@ -41,8 +41,7 @@ func runCompact(cmd *Command, args []string) bool { preallocate := *compactVolumePreallocate * (1 << 20) vid := needle.VolumeId(*compactVolumeId) - v, err := storage.NewVolume(util.ResolvePath(*compactVolumePath), *compactVolumeCollection, vid, - storage.NeedleMapInMemory, nil, nil, preallocate, 0) + v, err := storage.NewVolume(util.ResolvePath(*compactVolumePath), util.ResolvePath(*compactVolumePath), *compactVolumeCollection, vid, storage.NeedleMapInMemory, nil, nil, preallocate, 0) if err != nil { glog.Fatalf("Load Volume [ERROR] %s\n", err) } diff --git a/weed/command/server.go b/weed/command/server.go index edf626fe7..0c6731eb2 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -105,7 +105,7 @@ func init() { serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server") serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") - serverOptions.v.idxFolder = cmdServer.Flag.String("volume.dir.idx", "", "WIP directory to store .idx files") + serverOptions.v.idxFolder = cmdServer.Flag.String("volume.dir.idx", "", "directory to store .idx files") s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port") s3Options.domainName = cmdServer.Flag.String("s3.domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}") diff --git a/weed/command/volume.go b/weed/command/volume.go index fec783514..9597e843a 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -84,7 +84,7 @@ func init() { v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory") v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") - v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "WIP directory to store .idx files") + v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files") } var cmdVolume = &Command{ diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 2aecb140f..cfa3710a8 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -48,7 +48,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo // send .dat file // confirm size and timestamp var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse - var volumeFileName, idxFileName, datFileName string + var dataBaseFileName, indexBaseFileName, idxFileName, datFileName string err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { var err error volFileInfoResp, err = client.ReadVolumeFileStatus(context.Background(), @@ -59,24 +59,25 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo return fmt.Errorf("read volume file status failed, %v", err) } - volumeFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId)) + dataBaseFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId)) + indexBaseFileName = storage.VolumeFileName(location.IdxDirectory, volFileInfoResp.Collection, int(req.VolumeId)) - ioutil.WriteFile(volumeFileName+".note", []byte(fmt.Sprintf("copying from %s", req.SourceDataNode)), 0755) + ioutil.WriteFile(dataBaseFileName+".note", []byte(fmt.Sprintf("copying from %s", req.SourceDataNode)), 0755) // println("source:", volFileInfoResp.String()) - if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil { + if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true); err != nil { return err } - if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil { + if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false); err != nil { return err } - if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".vif", false, true); err != nil { + if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true); err != nil { return err } - os.Remove(volumeFileName + ".note") + os.Remove(dataBaseFileName + ".note") return nil }) @@ -84,18 +85,18 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo if err != nil { return nil, err } - if volumeFileName == "" { + if dataBaseFileName == "" { return nil, fmt.Errorf("not found volume %d file", req.VolumeId) } - idxFileName = volumeFileName + ".idx" - datFileName = volumeFileName + ".dat" + idxFileName = indexBaseFileName + ".idx" + datFileName = dataBaseFileName + ".dat" defer func() { - if err != nil && volumeFileName != "" { + if err != nil && dataBaseFileName != "" { os.Remove(idxFileName) os.Remove(datFileName) - os.Remove(volumeFileName + ".vif") + os.Remove(dataBaseFileName + ".vif") } }() @@ -223,7 +224,7 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v if uint32(v.CompactionRevision) != req.CompactionRevision && req.CompactionRevision != math.MaxUint32 { return fmt.Errorf("volume %d is compacted", req.VolumeId) } - fileName = v.FileName() + req.Ext + fileName = v.FileName(req.Ext) } else { baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + req.Ext for _, location := range vs.store.Locations { @@ -231,6 +232,10 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v if util.FileExists(tName) { fileName = tName } + tName = util.Join(location.IdxDirectory, baseFileName) + if util.FileExists(tName) { + fileName = tName + } } if fileName == "" { if req.IgnoreSourceFileNotFound { diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 55e0261c8..186c6eafc 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -8,7 +8,6 @@ import ( "math" "os" "path" - "path/filepath" "strings" "github.com/chrislusf/seaweedfs/weed/glog" @@ -44,7 +43,7 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_ if v == nil { return nil, fmt.Errorf("volume %d not found", req.VolumeId) } - baseFileName := v.FileName() + baseFileName := v.DataFileName() if v.Collection != req.Collection { return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection) @@ -56,8 +55,8 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_ } // write .ecx file - if err := erasure_coding.WriteSortedFileFromIdx(baseFileName, ".ecx"); err != nil { - return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", baseFileName, err) + if err := erasure_coding.WriteSortedFileFromIdx(v.IndexFileName(), ".ecx"); err != nil { + return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", v.IndexFileName(), err) } // write .vif files @@ -78,17 +77,18 @@ func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_s var rebuiltShardIds []uint32 for _, location := range vs.store.Locations { - if util.FileExists(path.Join(location.Directory, baseFileName+".ecx")) { + if util.FileExists(path.Join(location.IdxDirectory, baseFileName+".ecx")) { // write .ec00 ~ .ec13 files - baseFileName = path.Join(location.Directory, baseFileName) - if generatedShardIds, err := erasure_coding.RebuildEcFiles(baseFileName); err != nil { - return nil, fmt.Errorf("RebuildEcFiles %s: %v", baseFileName, err) + dataBaseFileName := path.Join(location.Directory, baseFileName) + if generatedShardIds, err := erasure_coding.RebuildEcFiles(dataBaseFileName); err != nil { + return nil, fmt.Errorf("RebuildEcFiles %s: %v", dataBaseFileName, err) } else { rebuiltShardIds = generatedShardIds } - if err := erasure_coding.RebuildEcxFile(baseFileName); err != nil { - return nil, fmt.Errorf("RebuildEcxFile %s: %v", baseFileName, err) + indexBaseFileName := path.Join(location.IdxDirectory, baseFileName) + if err := erasure_coding.RebuildEcxFile(indexBaseFileName); err != nil { + return nil, fmt.Errorf("RebuildEcxFile %s: %v", dataBaseFileName, err) } break @@ -110,13 +110,14 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv return nil, fmt.Errorf("no space left") } - baseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId)) + dataBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId)) + indexBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId)) err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { // copy ec data slices for _, shardId := range req.ShardIds { - if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil { + if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil { return err } } @@ -124,7 +125,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv if req.CopyEcxFile { // copy ecx file - if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false, false); err != nil { + if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false); err != nil { return err } return nil @@ -132,14 +133,14 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv if req.CopyEcjFile { // copy ecj file - if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true, true); err != nil { + if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecj", true, true); err != nil { return err } } if req.CopyVifFile { // copy vif file - if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".vif", false, true); err != nil { + if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, ".vif", false, true); err != nil { return err } } @@ -157,17 +158,19 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv // the shard should not be mounted before calling this. func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) { - baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + bName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) glog.V(0).Infof("ec volume %d shard delete %v", req.VolumeId, req.ShardIds) found := false + var indexBaseFilename, dataBaseFilename string for _, location := range vs.store.Locations { - if util.FileExists(path.Join(location.Directory, baseFilename+".ecx")) { + if util.FileExists(path.Join(location.IdxDirectory, bName+".ecx")) { found = true - baseFilename = path.Join(location.Directory, baseFilename) + indexBaseFilename = path.Join(location.IdxDirectory, bName) + dataBaseFilename = path.Join(location.Directory, bName) for _, shardId := range req.ShardIds { - os.Remove(baseFilename + erasure_coding.ToExt(int(shardId))) + os.Remove(dataBaseFilename + erasure_coding.ToExt(int(shardId))) } break } @@ -182,12 +185,18 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se hasIdxFile := false existingShardCount := 0 - bName := filepath.Base(baseFilename) for _, location := range vs.store.Locations { fileInfos, err := ioutil.ReadDir(location.Directory) if err != nil { continue } + if location.IdxDirectory != location.Directory { + idxFileInfos, err := ioutil.ReadDir(location.IdxDirectory) + if err != nil { + continue + } + fileInfos = append(fileInfos, idxFileInfos...) + } for _, fileInfo := range fileInfos { if fileInfo.Name() == bName+".ecx" || fileInfo.Name() == bName+".ecj" { hasEcxFile = true @@ -204,14 +213,14 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se } if hasEcxFile && existingShardCount == 0 { - if err := os.Remove(baseFilename + ".ecx"); err != nil { + if err := os.Remove(indexBaseFilename + ".ecx"); err != nil { return nil, err } - os.Remove(baseFilename + ".ecj") + os.Remove(indexBaseFilename + ".ecj") } if !hasIdxFile { // .vif is used for ec volumes and normal volumes - os.Remove(baseFilename + ".vif") + os.Remove(dataBaseFilename + ".vif") } return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil @@ -365,26 +374,26 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_ if !found { return nil, fmt.Errorf("ec volume %d not found", req.VolumeId) } - baseFileName := v.FileName() if v.Collection != req.Collection { return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection) } + dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName() // calculate .dat file size - datFileSize, err := erasure_coding.FindDatFileSize(baseFileName) + datFileSize, err := erasure_coding.FindDatFileSize(dataBaseFileName, indexBaseFileName) if err != nil { - return nil, fmt.Errorf("FindDatFileSize %s: %v", baseFileName, err) + return nil, fmt.Errorf("FindDatFileSize %s: %v", dataBaseFileName, err) } // write .dat file from .ec00 ~ .ec09 files - if err := erasure_coding.WriteDatFile(baseFileName, datFileSize); err != nil { - return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err) + if err := erasure_coding.WriteDatFile(dataBaseFileName, datFileSize); err != nil { + return nil, fmt.Errorf("WriteEcFiles %s: %v", dataBaseFileName, err) } // write .idx file from .ecx and .ecj files - if err := erasure_coding.WriteIdxFileFromEcIndex(baseFileName); err != nil { - return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", baseFileName, err) + if err := erasure_coding.WriteIdxFileFromEcIndex(indexBaseFileName); err != nil { + return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", v.IndexBaseFileName(), err) } return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil diff --git a/weed/server/volume_grpc_tier_download.go b/weed/server/volume_grpc_tier_download.go index 7b3982e40..73d8ae7cb 100644 --- a/weed/server/volume_grpc_tier_download.go +++ b/weed/server/volume_grpc_tier_download.go @@ -58,9 +58,9 @@ func (vs *VolumeServer) VolumeTierMoveDatFromRemote(req *volume_server_pb.Volume }) } // copy the data file - _, err := backendStorage.DownloadFile(v.FileName()+".dat", storageKey, fn) + _, err := backendStorage.DownloadFile(v.FileName(".dat"), storageKey, fn) if err != nil { - return fmt.Errorf("backend %s copy file %s: %v", storageName, v.FileName()+".dat", err) + return fmt.Errorf("backend %s copy file %s: %v", storageName, v.FileName(".dat"), err) } if req.KeepRemoteDatFile { diff --git a/weed/server/volume_grpc_tier_upload.go b/weed/server/volume_grpc_tier_upload.go index c9694df59..e51de5f1d 100644 --- a/weed/server/volume_grpc_tier_upload.go +++ b/weed/server/volume_grpc_tier_upload.go @@ -93,7 +93,7 @@ func (vs *VolumeServer) VolumeTierMoveDatToRemote(req *volume_server_pb.VolumeTi } if !req.KeepLocalDatFile { - os.Remove(v.FileName() + ".dat") + os.Remove(v.FileName(".dat")) } return nil diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 4031cd237..75b0f28da 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -157,12 +157,12 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId IgnoreSourceFileNotFound: false, }) if err != nil { - return fmt.Errorf("failed to start copying volume %d.idx: %v", volumeId, err) + return fmt.Errorf("failed to start copying volume %d%s: %v", volumeId, ext, err) } err = writeToFile(copyFileClient, getVolumeFileIdFile(tempFolder, volumeId)) if err != nil { - return fmt.Errorf("failed to copy %d.idx from %s: %v", volumeId, vinfo.server, err) + return fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, vinfo.server, err) } return nil diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 7b68a86fc..2d4d120af 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -53,8 +53,8 @@ func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32 } func volumeIdFromFileName(filename string) (needle.VolumeId, string, error) { - if strings.HasSuffix(filename, ".idx") { - base := filename[:len(filename)-len(".idx")] + if strings.HasSuffix(filename, ".idx") || strings.HasSuffix(filename, ".vif") { + base := filename[:len(filename)-4] collection, volumeId, err := parseCollectionVolumeId(base) return volumeId, collection, err } @@ -76,10 +76,10 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne if fileInfo.IsDir() { return false } - if !strings.HasSuffix(basename, ".idx") { + if !strings.HasSuffix(basename, ".idx") && !strings.HasSuffix(basename, ".vif") { return false } - volumeName := basename[:len(basename)-len(".idx")] + volumeName := basename[:len(basename)-4] // check for incomplete volume noteFile := l.Directory + "/" + volumeName + ".note" @@ -108,7 +108,7 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne } // load the volume - v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0, 0) + v, e := NewVolume(l.Directory, l.IdxDirectory, collection, vid, needleMapKind, nil, nil, 0, 0) if e != nil { glog.V(0).Infof("new volume %s error %s", volumeName, e) return false diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index 07fab96d9..d1237b40f 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -68,7 +68,7 @@ func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shard defer l.ecVolumesLock.Unlock() ecVolume, found := l.ecVolumes[vid] if !found { - ecVolume, err = erasure_coding.NewEcVolume(l.Directory, collection, vid) + ecVolume, err = erasure_coding.NewEcVolume(l.Directory, l.IdxDirectory, collection, vid) if err != nil { return fmt.Errorf("failed to create ec volume %d: %v", vid, err) } @@ -122,6 +122,13 @@ func (l *DiskLocation) loadAllEcShards() (err error) { if err != nil { return fmt.Errorf("load all ec shards in dir %s: %v", l.Directory, err) } + if l.IdxDirectory != l.Directory { + indexFileInfos, err := ioutil.ReadDir(l.IdxDirectory) + if err != nil { + return fmt.Errorf("load all ec shards in dir %s: %v", l.IdxDirectory, err) + } + fileInfos = append(fileInfos, indexFileInfos...) + } sort.Slice(fileInfos, func(i, j int) bool { return fileInfos[i].Name() < fileInfos[j].Name() diff --git a/weed/storage/erasure_coding/ec_decoder.go b/weed/storage/erasure_coding/ec_decoder.go index 795a7d523..bc86d9c04 100644 --- a/weed/storage/erasure_coding/ec_decoder.go +++ b/weed/storage/erasure_coding/ec_decoder.go @@ -45,14 +45,14 @@ func WriteIdxFileFromEcIndex(baseFileName string) (err error) { // FindDatFileSize calculate .dat file size from max offset entry // there may be extra deletions after that entry // but they are deletions anyway -func FindDatFileSize(baseFileName string) (datSize int64, err error) { +func FindDatFileSize(dataBaseFileName, indexBaseFileName string) (datSize int64, err error) { - version, err := readEcVolumeVersion(baseFileName) + version, err := readEcVolumeVersion(dataBaseFileName) if err != nil { - return 0, fmt.Errorf("read ec volume %s version: %v", baseFileName, err) + return 0, fmt.Errorf("read ec volume %s version: %v", dataBaseFileName, err) } - err = iterateEcxFile(baseFileName, func(key types.NeedleId, offset types.Offset, size types.Size) error { + err = iterateEcxFile(indexBaseFileName, func(key types.NeedleId, offset types.Offset, size types.Size) error { if size.IsDeleted() { return nil diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index 71fe884df..2183e43d6 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -25,6 +25,7 @@ type EcVolume struct { VolumeId needle.VolumeId Collection string dir string + dirIdx string ecxFile *os.File ecxFileSize int64 ecxCreatedAt time.Time @@ -37,33 +38,34 @@ type EcVolume struct { ecjFileAccessLock sync.Mutex } -func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) { - ev = &EcVolume{dir: dir, Collection: collection, VolumeId: vid} +func NewEcVolume(dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) { + ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid} - baseFileName := EcShardFileName(collection, dir, int(vid)) + dataBaseFileName := EcShardFileName(collection, dir, int(vid)) + indexBaseFileName := EcShardFileName(collection, dirIdx, int(vid)) // open ecx file - if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDWR, 0644); err != nil { - return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", baseFileName, err) + if ev.ecxFile, err = os.OpenFile(indexBaseFileName+".ecx", os.O_RDWR, 0644); err != nil { + return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", indexBaseFileName, err) } ecxFi, statErr := ev.ecxFile.Stat() if statErr != nil { - return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", baseFileName, statErr) + return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", indexBaseFileName, statErr) } ev.ecxFileSize = ecxFi.Size() ev.ecxCreatedAt = ecxFi.ModTime() // open ecj file - if ev.ecjFile, err = os.OpenFile(baseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil { - return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", baseFileName, err) + if ev.ecjFile, err = os.OpenFile(indexBaseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil { + return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", indexBaseFileName, err) } // read volume info ev.Version = needle.Version3 - if volumeInfo, found, _ := pb.MaybeLoadVolumeInfo(baseFileName + ".vif"); found { + if volumeInfo, found, _ := pb.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found { ev.Version = needle.Version(volumeInfo.Version) } else { - pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)}) + pb.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)}) } ev.ShardLocations = make(map[ShardId][]string) @@ -134,15 +136,26 @@ func (ev *EcVolume) Destroy() { for _, s := range ev.Shards { s.Destroy() } - os.Remove(ev.FileName() + ".ecx") - os.Remove(ev.FileName() + ".ecj") - os.Remove(ev.FileName() + ".vif") + os.Remove(ev.FileName(".ecx")) + os.Remove(ev.FileName(".ecj")) + os.Remove(ev.FileName(".vif")) } -func (ev *EcVolume) FileName() string { +func (ev *EcVolume) FileName(ext string) string { + switch ext { + case ".ecx", ".ecj": + return ev.IndexBaseFileName() + ext + } + // .vif + return ev.DataBaseFileName() + ext +} +func (ev *EcVolume) DataBaseFileName() string { return EcShardFileName(ev.Collection, ev.dir, int(ev.VolumeId)) +} +func (ev *EcVolume) IndexBaseFileName() string { + return EcShardFileName(ev.Collection, ev.dirIdx, int(ev.VolumeId)) } func (ev *EcVolume) ShardSize() uint64 { diff --git a/weed/storage/needle_map_sorted_file.go b/weed/storage/needle_map_sorted_file.go index 1ca113ca9..3449ff9dc 100644 --- a/weed/storage/needle_map_sorted_file.go +++ b/weed/storage/needle_map_sorted_file.go @@ -16,18 +16,18 @@ type SortedFileNeedleMap struct { dbFileSize int64 } -func NewSortedFileNeedleMap(baseFileName string, indexFile *os.File) (m *SortedFileNeedleMap, err error) { - m = &SortedFileNeedleMap{baseFileName: baseFileName} +func NewSortedFileNeedleMap(indexBaseFileName string, indexFile *os.File) (m *SortedFileNeedleMap, err error) { + m = &SortedFileNeedleMap{baseFileName: indexBaseFileName} m.indexFile = indexFile - fileName := baseFileName + ".sdx" + fileName := indexBaseFileName + ".sdx" if !isSortedFileFresh(fileName, indexFile) { glog.V(0).Infof("Start to Generate %s from %s", fileName, indexFile.Name()) - erasure_coding.WriteSortedFileFromIdx(baseFileName, ".sdx") + erasure_coding.WriteSortedFileFromIdx(indexBaseFileName, ".sdx") glog.V(0).Infof("Finished Generating %s from %s", fileName, indexFile.Name()) } glog.V(1).Infof("Opening %s...", fileName) - if m.dbFile, err = os.Open(baseFileName + ".sdx"); err != nil { + if m.dbFile, err = os.Open(indexBaseFileName + ".sdx"); err != nil { return } dbStat, _ := m.dbFile.Stat() diff --git a/weed/storage/store.go b/weed/storage/store.go index ce4e500d6..7e5768417 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -121,7 +121,7 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind if location := s.FindFreeLocation(); location != nil { glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", location.Directory, vid, collection, replicaPlacement, ttl) - if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil { + if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil { location.SetVolume(vid, volume) glog.V(0).Infof("add volume %d", vid) s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{ diff --git a/weed/storage/volume.go b/weed/storage/volume.go index a7a963a59..a03846d3d 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -21,6 +21,7 @@ import ( type Volume struct { Id needle.VolumeId dir string + dirIdx string Collection string DataBackend backend.BackendStorageFile nm NeedleMapper @@ -47,9 +48,9 @@ type Volume struct { location *DiskLocation } -func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) { +func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) { // if replicaPlacement is nil, the superblock will be loaded from disk - v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb, + v = &Volume{dir: dirname, dirIdx: dirIdx, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb, asyncRequestsChan: make(chan *needle.AsyncRequest, 128)} v.SuperBlock = super_block.SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} v.needleMapKind = needleMapKind @@ -61,7 +62,7 @@ func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapK func (v *Volume) String() string { v.noWriteLock.RLock() defer v.noWriteLock.RUnlock() - return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, noWrite:%v canDelete:%v", v.Id, v.dir, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete) + return fmt.Sprintf("Id:%v dir:%s dirIdx:%s Collection:%s dataFile:%v nm:%v noWrite:%v canDelete:%v", v.Id, v.dir, v.dirIdx, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete) } func VolumeFileName(dir string, collection string, id int) (fileName string) { @@ -74,10 +75,23 @@ func VolumeFileName(dir string, collection string, id int) (fileName string) { return } -func (v *Volume) FileName() (fileName string) { +func (v *Volume) DataFileName() (fileName string) { return VolumeFileName(v.dir, v.Collection, int(v.Id)) } +func (v *Volume) IndexFileName() (fileName string) { + return VolumeFileName(v.dirIdx, v.Collection, int(v.Id)) +} + +func (v *Volume) FileName(ext string) (fileName string) { + switch ext { + case ".idx", ".cpx", ".ldb": + return VolumeFileName(v.dirIdx, v.Collection, int(v.Id))+ext + } + // .dat, .cpd, .vif + return VolumeFileName(v.dir, v.Collection, int(v.Id))+ext +} + func (v *Volume) Version() needle.Version { if v.volumeInfo.Version != 0 { v.SuperBlock.Version = needle.Version(v.volumeInfo.Version) diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go index 595bd8a35..62004d4da 100644 --- a/weed/storage/volume_backup.go +++ b/weed/storage/volume_backup.go @@ -124,9 +124,9 @@ func (v *Volume) findLastAppendAtNs() (uint64, error) { } func (v *Volume) locateLastAppendEntry() (Offset, error) { - indexFile, e := os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644) + indexFile, e := os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644) if e != nil { - return Offset{}, fmt.Errorf("cannot read %s.idx: %v", v.FileName(), e) + return Offset{}, fmt.Errorf("cannot read %s: %v", v.FileName(".idx"), e) } defer indexFile.Close() @@ -156,9 +156,9 @@ func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) { n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset()) if err != nil { - return 0, fmt.Errorf("ReadNeedleHeader: %v", err) + return 0, fmt.Errorf("ReadNeedleHeader %s [%d,%d): %v", v.DataBackend.Name(), offset.ToAcutalOffset(), offset.ToAcutalOffset()+NeedleHeaderSize, err) } - _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength) + _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset()+NeedleHeaderSize, bodyLength) if err != nil { return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToAcutalOffset(), bodyLength, err) } @@ -168,9 +168,9 @@ func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) { // on server side func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast bool, err error) { - indexFile, openErr := os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644) + indexFile, openErr := os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644) if openErr != nil { - err = fmt.Errorf("cannot read %s.idx: %v", v.FileName(), openErr) + err = fmt.Errorf("cannot read %s: %v", v.FileName(".idx"), openErr) return } defer indexFile.Close() diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index 05684cbdb..fe4980e31 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -23,7 +23,6 @@ func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeI } func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType, preallocate int64) (err error) { - fileName := v.FileName() alreadyHasSuperBlock := false hasVolumeInfoFile := v.maybeLoadVolumeInfo() && v.volumeInfo.Version != 0 @@ -34,17 +33,17 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo.Files) v.LoadRemoteFile() alreadyHasSuperBlock = true - } else if exists, canRead, canWrite, modifiedTime, fileSize := util.CheckFile(fileName + ".dat"); exists { + } else if exists, canRead, canWrite, modifiedTime, fileSize := util.CheckFile(v.FileName(".dat")); exists { // open dat file if !canRead { - return fmt.Errorf("cannot read Volume Data file %s.dat", fileName) + return fmt.Errorf("cannot read Volume Data file %s", v.FileName(".dat")) } var dataFile *os.File if canWrite { - dataFile, err = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) + dataFile, err = os.OpenFile(v.FileName(".dat"), os.O_RDWR|os.O_CREATE, 0644) } else { - glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode") - dataFile, err = os.Open(fileName + ".dat") + glog.V(0).Infof("opening %s in READONLY mode", v.FileName(".dat")) + dataFile, err = os.Open(v.FileName(".dat")) v.noWriteOrDelete = true } v.lastModifiedTsSeconds = uint64(modifiedTime.Unix()) @@ -54,17 +53,17 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind v.DataBackend = backend.NewDiskFile(dataFile) } else { if createDatIfMissing { - v.DataBackend, err = backend.CreateVolumeFile(fileName+".dat", preallocate, v.MemoryMapMaxSizeMb) + v.DataBackend, err = backend.CreateVolumeFile(v.FileName(".dat"), preallocate, v.MemoryMapMaxSizeMb) } else { - return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName) + return fmt.Errorf("volume data file %s does not exist", v.FileName(".dat")) } } if err != nil { if !os.IsPermission(err) { - return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, err) + return fmt.Errorf("cannot load volume data %s: %v", v.FileName(".dat"), err) } else { - return fmt.Errorf("load data file %s.dat: %v", fileName, err) + return fmt.Errorf("load data file %s: %v", v.FileName(".dat"), err) } } @@ -72,21 +71,27 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind err = v.readSuperBlock() } else { if !v.SuperBlock.Initialized() { - return fmt.Errorf("volume %s.dat not initialized", fileName) + return fmt.Errorf("volume %s not initialized", v.FileName(".dat")) } err = v.maybeWriteSuperBlock() } if err == nil && alsoLoadIndex { + // adjust for existing volumes with .idx together with .dat files + if v.dirIdx != v.dir { + if util.FileExists(v.DataFileName()+".idx") { + v.dirIdx = v.dir + } + } var indexFile *os.File if v.noWriteOrDelete { - glog.V(0).Infoln("open to read file", fileName+".idx") - if indexFile, err = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); err != nil { - return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, err) + glog.V(0).Infoln("open to read file", v.FileName(".idx")) + if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644); err != nil { + return fmt.Errorf("cannot read Volume Index %s: %v", v.FileName(".idx"), err) } } else { - glog.V(1).Infoln("open to write file", fileName+".idx") - if indexFile, err = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); err != nil { - return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, err) + glog.V(1).Infoln("open to write file", v.FileName(".idx")) + if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDWR|os.O_CREATE, 0644); err != nil { + return fmt.Errorf("cannot write Volume Index %s: %v", v.FileName(".idx"), err) } } if v.lastAppendAtNs, err = CheckAndFixVolumeDataIntegrity(v, indexFile); err != nil { @@ -95,45 +100,45 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind } if v.noWriteOrDelete || v.noWriteCanDelete { - if v.nm, err = NewSortedFileNeedleMap(fileName, indexFile); err != nil { - glog.V(0).Infof("loading sorted db %s error: %v", fileName+".sdx", err) + if v.nm, err = NewSortedFileNeedleMap(v.IndexFileName(), indexFile); err != nil { + glog.V(0).Infof("loading sorted db %s error: %v", v.FileName(".sdx"), err) } } else { switch needleMapKind { case NeedleMapInMemory: - glog.V(0).Infoln("loading index", fileName+".idx", "to memory") + glog.V(0).Infoln("loading index", v.FileName(".idx"), "to memory") if v.nm, err = LoadCompactNeedleMap(indexFile); err != nil { - glog.V(0).Infof("loading index %s to memory error: %v", fileName+".idx", err) + glog.V(0).Infof("loading index %s to memory error: %v", v.FileName(".idx"), err) } case NeedleMapLevelDb: - glog.V(0).Infoln("loading leveldb", fileName+".ldb") + glog.V(0).Infoln("loading leveldb", v.FileName(".ldb")) opts := &opt.Options{ BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB CompactionTableSizeMultiplier: 10, // default value is 1 } - if v.nm, err = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); err != nil { - glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", err) + if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil { + glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err) } case NeedleMapLevelDbMedium: - glog.V(0).Infoln("loading leveldb medium", fileName+".ldb") + glog.V(0).Infoln("loading leveldb medium", v.FileName(".ldb")) opts := &opt.Options{ BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB CompactionTableSizeMultiplier: 10, // default value is 1 } - if v.nm, err = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); err != nil { - glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", err) + if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil { + glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err) } case NeedleMapLevelDbLarge: - glog.V(0).Infoln("loading leveldb large", fileName+".ldb") + glog.V(0).Infoln("loading leveldb large", v.FileName(".ldb")) opts := &opt.Options{ BlockCacheCapacity: 8 * 1024 * 1024, // default value is 8MiB WriteBuffer: 4 * 1024 * 1024, // default value is 4MiB CompactionTableSizeMultiplier: 10, // default value is 1 } - if v.nm, err = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); err != nil { - glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", err) + if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil { + glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err) } } } diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 1a9beea7f..6dc4cb4a5 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -56,7 +56,8 @@ func (v *Volume) Destroy() (err error) { } } v.Close() - removeVolumeFiles(v.FileName()) + removeVolumeFiles(v.DataFileName()) + removeVolumeFiles(v.IndexFileName()) return } diff --git a/weed/storage/volume_tier.go b/weed/storage/volume_tier.go index fd7b08654..77efd8a14 100644 --- a/weed/storage/volume_tier.go +++ b/weed/storage/volume_tier.go @@ -14,7 +14,7 @@ func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo { func (v *Volume) maybeLoadVolumeInfo() (found bool) { - v.volumeInfo, v.hasRemoteFile, _ = pb.MaybeLoadVolumeInfo(v.FileName() + ".vif") + v.volumeInfo, v.hasRemoteFile, _ = pb.MaybeLoadVolumeInfo(v.FileName(".vif")) if v.hasRemoteFile { glog.V(0).Infof("volume %d is tiered to %s as %s and read only", v.Id, @@ -43,7 +43,7 @@ func (v *Volume) LoadRemoteFile() error { func (v *Volume) SaveVolumeInfo() error { - tierFileName := v.FileName() + ".vif" + tierFileName := v.FileName(".vif") return pb.SaveVolumeInfo(tierFileName, v.volumeInfo) diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index a3e5800df..f7b16b7db 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -49,7 +49,6 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error v.isCompacting = false }() - filePath := v.FileName() v.lastCompactIndexOffset = v.IndexFileSize() v.lastCompactRevision = v.SuperBlock.CompactionRevision glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset) @@ -59,7 +58,7 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error if err := v.nm.Sync(); err != nil { glog.V(0).Infof("compact fail to sync volume idx %d", v.Id) } - return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond) + return v.copyDataAndGenerateIndexFile(v.FileName(".cpd"), v.FileName(".cpx"), preallocate, compactionBytePerSecond) } // compact a volume based on deletions in .idx files @@ -75,7 +74,6 @@ func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64) erro v.isCompacting = false }() - filePath := v.FileName() v.lastCompactIndexOffset = v.IndexFileSize() v.lastCompactRevision = v.SuperBlock.CompactionRevision glog.V(3).Infof("creating copies for volume %d ...", v.Id) @@ -85,7 +83,7 @@ func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64) erro if err := v.nm.Sync(); err != nil { glog.V(0).Infof("compact2 fail to sync volume idx %d: %v", v.Id, err) } - return copyDataBasedOnIndexFile(filePath+".dat", filePath+".idx", filePath+".cpd", filePath+".cpx", v.SuperBlock, v.Version(), preallocate, compactionBytePerSecond) + return copyDataBasedOnIndexFile(v.FileName(".dat"), v.FileName(".idx"), v.FileName(".cpd"), v.FileName(".cpx"), v.SuperBlock, v.Version(), preallocate, compactionBytePerSecond) } func (v *Volume) CommitCompact() error { @@ -113,40 +111,40 @@ func (v *Volume) CommitCompact() error { stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec() var e error - if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil { + if e = v.makeupDiff(v.FileName(".cpd"), v.FileName(".cpx"), v.FileName(".dat"), v.FileName(".idx")); e != nil { glog.V(0).Infof("makeupDiff in CommitCompact volume %d failed %v", v.Id, e) - e = os.Remove(v.FileName() + ".cpd") + e = os.Remove(v.FileName(".cpd")) if e != nil { return e } - e = os.Remove(v.FileName() + ".cpx") + e = os.Remove(v.FileName(".cpx")) if e != nil { return e } } else { if runtime.GOOS == "windows" { - e = os.RemoveAll(v.FileName() + ".dat") + e = os.RemoveAll(v.FileName(".dat")) if e != nil { return e } - e = os.RemoveAll(v.FileName() + ".idx") + e = os.RemoveAll(v.FileName(".idx")) if e != nil { return e } } var e error - if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil { - return fmt.Errorf("rename %s: %v", v.FileName()+".cpd", e) + if e = os.Rename(v.FileName(".cpd"), v.FileName(".dat")); e != nil { + return fmt.Errorf("rename %s: %v", v.FileName(".cpd"), e) } - if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil { - return fmt.Errorf("rename %s: %v", v.FileName()+".cpx", e) + if e = os.Rename(v.FileName(".cpx"), v.FileName(".idx")); e != nil { + return fmt.Errorf("rename %s: %v", v.FileName(".cpx"), e) } } //glog.V(3).Infof("Pretending to be vacuuming...") //time.Sleep(20 * time.Second) - os.RemoveAll(v.FileName() + ".ldb") + os.RemoveAll(v.FileName(".ldb")) glog.V(3).Infof("Loading volume %d commit file...", v.Id) if e = v.load(true, false, v.needleMapKind, 0); e != nil { @@ -158,8 +156,8 @@ func (v *Volume) CommitCompact() error { func (v *Volume) cleanupCompact() error { glog.V(0).Infof("Cleaning up volume %d vacuuming...", v.Id) - e1 := os.Remove(v.FileName() + ".cpd") - e2 := os.Remove(v.FileName() + ".cpx") + e1 := os.Remove(v.FileName(".cpd")) + e2 := os.Remove(v.FileName(".cpx")) if e1 != nil { return e1 } diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go index f96e9b0cf..cd5a4f430 100644 --- a/weed/storage/volume_vacuum_test.go +++ b/weed/storage/volume_vacuum_test.go @@ -69,7 +69,7 @@ func TestCompaction(t *testing.T) { } defer os.RemoveAll(dir) // clean up - v, err := NewVolume(dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0) + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0) if err != nil { t.Fatalf("volume creation: %v", err) } @@ -96,7 +96,7 @@ func TestCompaction(t *testing.T) { v.Close() - v, err = NewVolume(dir, "", 1, NeedleMapInMemory, nil, nil, 0, 0) + v, err = NewVolume(dir, dir, "", 1, NeedleMapInMemory, nil, nil, 0, 0) if err != nil { t.Fatalf("volume reloading: %v", err) }