mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
parent
ba984a4e29
commit
f2723c1bc8
|
@ -158,9 +158,6 @@ func getVolumeFiles(v uint32, addr string) (map[types.NeedleId]needleState, int6
|
||||||
files := map[types.NeedleId]needleState{}
|
files := map[types.NeedleId]needleState{}
|
||||||
err = idx.WalkIndexFile(idxFile, func(key types.NeedleId, offset types.Offset, size types.Size) error {
|
err = idx.WalkIndexFile(idxFile, func(key types.NeedleId, offset types.Offset, size types.Size) error {
|
||||||
if offset.IsZero() || size.IsDeleted() {
|
if offset.IsZero() || size.IsDeleted() {
|
||||||
if size < 0 {
|
|
||||||
size = -size
|
|
||||||
}
|
|
||||||
files[key] = needleState{
|
files[key] = needleState{
|
||||||
state: stateDeleted,
|
state: stateDeleted,
|
||||||
size: size,
|
size: size,
|
||||||
|
|
|
@ -30,7 +30,6 @@ var (
|
||||||
fixVolumePath = cmdFix.Flag.String("dir", ".", "data directory to store files")
|
fixVolumePath = cmdFix.Flag.String("dir", ".", "data directory to store files")
|
||||||
fixVolumeCollection = cmdFix.Flag.String("collection", "", "the volume collection name")
|
fixVolumeCollection = cmdFix.Flag.String("collection", "", "the volume collection name")
|
||||||
fixVolumeId = cmdFix.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
|
fixVolumeId = cmdFix.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
|
||||||
fixIncludeDeleted = cmdFix.Flag.Bool("includeDeleted", false, "include deleted entries in the index file")
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type VolumeFileScanner4Fix struct {
|
type VolumeFileScanner4Fix struct {
|
||||||
|
@ -51,14 +50,9 @@ func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64
|
||||||
glog.V(2).Infof("key %d offset %d size %d disk_size %d compressed %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed())
|
glog.V(2).Infof("key %d offset %d size %d disk_size %d compressed %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed())
|
||||||
if n.Size.IsValid() {
|
if n.Size.IsValid() {
|
||||||
pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size)
|
pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size)
|
||||||
glog.V(2).Infof("saved %s %d bytes with error %v", n.Id.String(), n.Size, pe)
|
glog.V(2).Infof("saved %d with error %v", n.Size, pe)
|
||||||
} else {
|
} else {
|
||||||
if val, found := scanner.nm.Get(n.Id); *fixIncludeDeleted && found && val.Size > 0 {
|
glog.V(2).Infof("skipping deleted file ...")
|
||||||
pe := scanner.nm.Set(n.Id, val.Offset, -val.Size)
|
|
||||||
glog.V(2).Infof("update deleted %s %d bytes with error %v", n.Id.String(), -val.Size, pe)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
glog.V(1).Infof("skipping deleted file %s size %d ...", n.Id.String(), n.Size)
|
|
||||||
return scanner.nm.Delete(n.Id)
|
return scanner.nm.Delete(n.Id)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -89,7 +83,7 @@ func runFix(cmd *Command, args []string) bool {
|
||||||
os.Remove(indexFileName)
|
os.Remove(indexFileName)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := nm.SaveToIdx(indexFileName, *fixIncludeDeleted); err != nil {
|
if err := nm.SaveToIdx(indexFileName); err != nil {
|
||||||
glog.Fatalf("save to .idx File: %v", err)
|
glog.Fatalf("save to .idx File: %v", err)
|
||||||
os.Remove(indexFileName)
|
os.Remove(indexFileName)
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,7 @@ const (
|
||||||
type NeedleMapper interface {
|
type NeedleMapper interface {
|
||||||
Put(key NeedleId, offset Offset, size Size) error
|
Put(key NeedleId, offset Offset, size Size) error
|
||||||
Get(key NeedleId) (element *needle_map.NeedleValue, ok bool)
|
Get(key NeedleId) (element *needle_map.NeedleValue, ok bool)
|
||||||
Delete(key NeedleId) error
|
Delete(key NeedleId, offset Offset) error
|
||||||
Close()
|
Close()
|
||||||
Destroy() error
|
Destroy() error
|
||||||
ContentSize() uint64
|
ContentSize() uint64
|
||||||
|
|
|
@ -80,7 +80,7 @@ func (cm *MemDb) AscendingVisit(visit func(NeedleValue) error) (ret error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cm *MemDb) SaveToIdx(idxName string, includeDeleted bool) (ret error) {
|
func (cm *MemDb) SaveToIdx(idxName string) (ret error) {
|
||||||
idxFile, err := os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
|
idxFile, err := os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
@ -88,10 +88,7 @@ func (cm *MemDb) SaveToIdx(idxName string, includeDeleted bool) (ret error) {
|
||||||
defer idxFile.Close()
|
defer idxFile.Close()
|
||||||
|
|
||||||
return cm.AscendingVisit(func(value NeedleValue) error {
|
return cm.AscendingVisit(func(value NeedleValue) error {
|
||||||
if value.Offset.IsZero() {
|
if value.Offset.IsZero() || value.Size.IsDeleted() {
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if !includeDeleted && value.Size.IsDeleted() {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
_, err := idxFile.Write(value.ToBytes())
|
_, err := idxFile.Write(value.ToBytes())
|
||||||
|
|
|
@ -74,7 +74,7 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
|
||||||
}
|
}
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
return idx.WalkIndexFile(indexFile, func(key NeedleId, offset Offset, size Size) error {
|
return idx.WalkIndexFile(indexFile, func(key NeedleId, offset Offset, size Size) error {
|
||||||
if !offset.IsZero() {
|
if !offset.IsZero() && size.IsValid() {
|
||||||
levelDbWrite(db, key, offset, size)
|
levelDbWrite(db, key, offset, size)
|
||||||
} else {
|
} else {
|
||||||
levelDbDelete(db, key)
|
levelDbDelete(db, key)
|
||||||
|
@ -123,7 +123,7 @@ func levelDbDelete(db *leveldb.DB, key NeedleId) error {
|
||||||
return db.Delete(bytes, nil)
|
return db.Delete(bytes, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *LevelDbNeedleMap) Delete(key NeedleId) error {
|
func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
|
||||||
oldNeedle, found := m.Get(key)
|
oldNeedle, found := m.Get(key)
|
||||||
if !found || oldNeedle.Size.IsDeleted() {
|
if !found || oldNeedle.Size.IsDeleted() {
|
||||||
return nil
|
return nil
|
||||||
|
@ -131,7 +131,7 @@ func (m *LevelDbNeedleMap) Delete(key NeedleId) error {
|
||||||
m.logDelete(oldNeedle.Size)
|
m.logDelete(oldNeedle.Size)
|
||||||
|
|
||||||
// write to index file first
|
// write to index file first
|
||||||
if err := m.appendToIndexFile(key, oldNeedle.Offset, -oldNeedle.Size); err != nil {
|
if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,18 +30,13 @@ func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) {
|
||||||
func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
|
func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
|
||||||
e := idx.WalkIndexFile(file, func(key NeedleId, offset Offset, size Size) error {
|
e := idx.WalkIndexFile(file, func(key NeedleId, offset Offset, size Size) error {
|
||||||
nm.MaybeSetMaxFileKey(key)
|
nm.MaybeSetMaxFileKey(key)
|
||||||
if !offset.IsZero() {
|
if !offset.IsZero() && size.IsValid() {
|
||||||
nm.FileCounter++
|
nm.FileCounter++
|
||||||
nm.FileByteCounter = nm.FileByteCounter + uint64(size)
|
nm.FileByteCounter = nm.FileByteCounter + uint64(size)
|
||||||
|
|
||||||
oldOffset, oldSize := nm.m.Set(NeedleId(key), offset, size)
|
oldOffset, oldSize := nm.m.Set(NeedleId(key), offset, size)
|
||||||
if !oldOffset.IsZero() && oldSize.IsValid() {
|
if !oldOffset.IsZero() && oldSize.IsValid() {
|
||||||
nm.DeletionCounter++
|
nm.DeletionCounter++
|
||||||
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
|
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
|
||||||
} else if size < 0 {
|
|
||||||
// deletion
|
|
||||||
nm.DeletionCounter++
|
|
||||||
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(-size)
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
oldSize := nm.m.Delete(NeedleId(key))
|
oldSize := nm.m.Delete(NeedleId(key))
|
||||||
|
@ -59,18 +54,14 @@ func (nm *NeedleMap) Put(key NeedleId, offset Offset, size Size) error {
|
||||||
nm.logPut(key, oldSize, size)
|
nm.logPut(key, oldSize, size)
|
||||||
return nm.appendToIndexFile(key, offset, size)
|
return nm.appendToIndexFile(key, offset, size)
|
||||||
}
|
}
|
||||||
func (nm *NeedleMap) Get(key NeedleId) (existingValue *needle_map.NeedleValue, ok bool) {
|
func (nm *NeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) {
|
||||||
existingValue, ok = nm.m.Get(NeedleId(key))
|
element, ok = nm.m.Get(NeedleId(key))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
func (nm *NeedleMap) Delete(key NeedleId) error {
|
func (nm *NeedleMap) Delete(key NeedleId, offset Offset) error {
|
||||||
existingValue, ok := nm.m.Get(NeedleId(key))
|
|
||||||
if !ok || existingValue.Size.IsDeleted() {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
deletedBytes := nm.m.Delete(NeedleId(key))
|
deletedBytes := nm.m.Delete(NeedleId(key))
|
||||||
nm.logDelete(deletedBytes)
|
nm.logDelete(deletedBytes)
|
||||||
return nm.appendToIndexFile(key, existingValue.Offset, -existingValue.Size)
|
return nm.appendToIndexFile(key, offset, TombstoneFileSize)
|
||||||
}
|
}
|
||||||
func (nm *NeedleMap) Close() {
|
func (nm *NeedleMap) Close() {
|
||||||
indexFileName := nm.indexFile.Name()
|
indexFileName := nm.indexFile.Name()
|
||||||
|
|
|
@ -17,7 +17,7 @@ func TestFastLoadingNeedleMapMetrics(t *testing.T) {
|
||||||
for i := 0; i < 10000; i++ {
|
for i := 0; i < 10000; i++ {
|
||||||
nm.Put(Uint64ToNeedleId(uint64(i+1)), Uint32ToOffset(uint32(0)), Size(1))
|
nm.Put(Uint64ToNeedleId(uint64(i+1)), Uint32ToOffset(uint32(0)), Size(1))
|
||||||
if rand.Float32() < 0.2 {
|
if rand.Float32() < 0.2 {
|
||||||
nm.Delete(Uint64ToNeedleId(uint64(rand.Int63n(int64(i)) + 1)))
|
nm.Delete(Uint64ToNeedleId(uint64(rand.Int63n(int64(i))+1)), Uint32ToOffset(uint32(0)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -69,9 +69,9 @@ func (m *SortedFileNeedleMap) Put(key NeedleId, offset Offset, size Size) error
|
||||||
return os.ErrInvalid
|
return os.ErrInvalid
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *SortedFileNeedleMap) Delete(key NeedleId) error {
|
func (m *SortedFileNeedleMap) Delete(key NeedleId, offset Offset) error {
|
||||||
|
|
||||||
offset, size, err := erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, nil)
|
_, size, err := erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, nil)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == erasure_coding.NotFoundError {
|
if err == erasure_coding.NotFoundError {
|
||||||
|
@ -85,7 +85,7 @@ func (m *SortedFileNeedleMap) Delete(key NeedleId) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// write to index file first
|
// write to index file first
|
||||||
if err := m.appendToIndexFile(key, offset, -size); err != nil {
|
if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, _, err = erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, erasure_coding.MarkNeedleDeleted)
|
_, _, err = erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, erasure_coding.MarkNeedleDeleted)
|
||||||
|
|
|
@ -256,5 +256,5 @@ func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *needle.Needle, offset in
|
||||||
if n.Size > 0 && n.Size.IsValid() {
|
if n.Size > 0 && n.Size.IsValid() {
|
||||||
return scanner.v.nm.Put(n.Id, ToOffset(offset), n.Size)
|
return scanner.v.nm.Put(n.Id, ToOffset(offset), n.Size)
|
||||||
}
|
}
|
||||||
return scanner.v.nm.Delete(n.Id)
|
return scanner.v.nm.Delete(n.Id, ToOffset(offset))
|
||||||
}
|
}
|
||||||
|
|
|
@ -200,12 +200,12 @@ func (v *Volume) syncDelete(n *needle.Needle) (Size, error) {
|
||||||
size := nv.Size
|
size := nv.Size
|
||||||
n.Data = nil
|
n.Data = nil
|
||||||
n.AppendAtNs = uint64(time.Now().UnixNano())
|
n.AppendAtNs = uint64(time.Now().UnixNano())
|
||||||
_, _, _, err := n.Append(v.DataBackend, v.Version())
|
offset, _, _, err := n.Append(v.DataBackend, v.Version())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return size, err
|
return size, err
|
||||||
}
|
}
|
||||||
v.lastAppendAtNs = n.AppendAtNs
|
v.lastAppendAtNs = n.AppendAtNs
|
||||||
if err = v.nm.Delete(n.Id); err != nil {
|
if err = v.nm.Delete(n.Id, ToOffset(int64(offset))); err != nil {
|
||||||
return size, err
|
return size, err
|
||||||
}
|
}
|
||||||
return size, err
|
return size, err
|
||||||
|
@ -238,12 +238,12 @@ func (v *Volume) doDeleteRequest(n *needle.Needle) (Size, error) {
|
||||||
size := nv.Size
|
size := nv.Size
|
||||||
n.Data = nil
|
n.Data = nil
|
||||||
n.AppendAtNs = uint64(time.Now().UnixNano())
|
n.AppendAtNs = uint64(time.Now().UnixNano())
|
||||||
_, _, _, err := n.Append(v.DataBackend, v.Version())
|
offset, _, _, err := n.Append(v.DataBackend, v.Version())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return size, err
|
return size, err
|
||||||
}
|
}
|
||||||
v.lastAppendAtNs = n.AppendAtNs
|
v.lastAppendAtNs = n.AppendAtNs
|
||||||
if err = v.nm.Delete(n.Id); err != nil {
|
if err = v.nm.Delete(n.Id, ToOffset(int64(offset))); err != nil {
|
||||||
return size, err
|
return size, err
|
||||||
}
|
}
|
||||||
return size, err
|
return size, err
|
||||||
|
@ -263,7 +263,7 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro
|
||||||
readSize := nv.Size
|
readSize := nv.Size
|
||||||
if readSize.IsDeleted() {
|
if readSize.IsDeleted() {
|
||||||
if readOption != nil && readOption.ReadDeleted && readSize != TombstoneFileSize {
|
if readOption != nil && readOption.ReadDeleted && readSize != TombstoneFileSize {
|
||||||
glog.V(3).Infof("reading deleted %s size %d", n.String(), readSize)
|
glog.V(3).Infof("reading deleted %s", n.String())
|
||||||
readSize = -readSize
|
readSize = -readSize
|
||||||
} else {
|
} else {
|
||||||
return -1, errors.New("already deleted")
|
return -1, errors.New("already deleted")
|
||||||
|
|
|
@ -374,7 +374,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err = nm.SaveToIdx(idxName, false)
|
err = nm.SaveToIdx(idxName)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -441,7 +441,7 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
newNm.SaveToIdx(datIdxName, false)
|
newNm.SaveToIdx(datIdxName)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue