mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
volume: avoid sharing volume dat file handle
possibly help on https://github.com/chrislusf/seaweedfs/issues/1184
This commit is contained in:
parent
bb1be61602
commit
c8b2dac6c1
|
@ -72,7 +72,7 @@ func (v *Volume) Compact2(preallocate int64) error {
|
||||||
v.lastCompactIndexOffset = v.IndexFileSize()
|
v.lastCompactIndexOffset = v.IndexFileSize()
|
||||||
v.lastCompactRevision = v.SuperBlock.CompactionRevision
|
v.lastCompactRevision = v.SuperBlock.CompactionRevision
|
||||||
glog.V(3).Infof("creating copies for volume %d ...", v.Id)
|
glog.V(3).Infof("creating copies for volume %d ...", v.Id)
|
||||||
return v.copyDataBasedOnIndexFile(filePath+".cpd", filePath+".cpx", preallocate)
|
return copyDataBasedOnIndexFile(filePath+".dat", filePath+".idx", filePath+".cpd", filePath+".cpx", v.SuperBlock, v.Version(), preallocate)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Volume) CommitCompact() error {
|
func (v *Volume) CommitCompact() error {
|
||||||
|
@ -353,26 +353,31 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string, preallocate int64) (err error) {
|
func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate int64) (err error) {
|
||||||
var (
|
var (
|
||||||
dstDatBackend backend.BackendStorageFile
|
srcDatBackend, dstDatBackend backend.BackendStorageFile
|
||||||
|
dataFile *os.File
|
||||||
)
|
)
|
||||||
if dstDatBackend, err = createVolumeFile(dstName, preallocate, 0); err != nil {
|
if dstDatBackend, err = createVolumeFile(dstDatName, preallocate, 0); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer dstDatBackend.Close()
|
defer dstDatBackend.Close()
|
||||||
|
|
||||||
oldNm := needle_map.NewMemDb()
|
oldNm := needle_map.NewMemDb()
|
||||||
newNm := needle_map.NewMemDb()
|
newNm := needle_map.NewMemDb()
|
||||||
if err = oldNm.LoadFromIdx(v.FileName()+".idx"); err != nil {
|
if err = oldNm.LoadFromIdx(srcIdxName); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if dataFile, err = os.Open(srcDatName); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
srcDatBackend = backend.NewDiskFile(dataFile)
|
||||||
|
|
||||||
now := uint64(time.Now().Unix())
|
now := uint64(time.Now().Unix())
|
||||||
|
|
||||||
v.SuperBlock.CompactionRevision++
|
sb.CompactionRevision++
|
||||||
dstDatBackend.WriteAt(v.SuperBlock.Bytes(), 0)
|
dstDatBackend.WriteAt(sb.Bytes(), 0)
|
||||||
newOffset := int64(v.SuperBlock.BlockSize())
|
newOffset := int64(sb.BlockSize())
|
||||||
|
|
||||||
oldNm.AscendingVisit(func(value needle_map.NeedleValue) error {
|
oldNm.AscendingVisit(func(value needle_map.NeedleValue) error {
|
||||||
|
|
||||||
|
@ -383,28 +388,28 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string, preallocate i
|
||||||
}
|
}
|
||||||
|
|
||||||
n := new(needle.Needle)
|
n := new(needle.Needle)
|
||||||
err := n.ReadData(v.DataBackend, offset.ToAcutalOffset(), size, v.Version())
|
err := n.ReadData(srcDatBackend, offset.ToAcutalOffset(), size, version)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
|
if n.HasTtl() && now >= n.LastModified+uint64(sb.Ttl.Minutes()*60) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = newNm.Set(n.Id, ToOffset(newOffset), n.Size); err != nil {
|
if err = newNm.Set(n.Id, ToOffset(newOffset), n.Size); err != nil {
|
||||||
return fmt.Errorf("cannot put needle: %s", err)
|
return fmt.Errorf("cannot put needle: %s", err)
|
||||||
}
|
}
|
||||||
if _, _, _, err = n.Append(dstDatBackend, v.Version()); err != nil {
|
if _, _, _, err = n.Append(dstDatBackend, sb.Version); err != nil {
|
||||||
return fmt.Errorf("cannot append needle: %s", err)
|
return fmt.Errorf("cannot append needle: %s", err)
|
||||||
}
|
}
|
||||||
newOffset += n.DiskSize(v.Version())
|
newOffset += n.DiskSize(version)
|
||||||
glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size)
|
glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
newNm.SaveToIdx(idxName)
|
newNm.SaveToIdx(datIdxName)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue