seaweedfs/weed/filesys/dir_rename.go

132 lines
3.8 KiB
Go
Raw Normal View History

2018-06-07 06:39:30 +00:00
package filesys
import (
2019-01-17 01:17:19 +00:00
"context"
2021-07-01 08:19:28 +00:00
"github.com/chrislusf/seaweedfs/weed/filer"
2020-07-24 04:08:42 +00:00
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
2021-10-17 11:22:42 +00:00
"io"
2020-07-24 04:08:42 +00:00
2020-01-16 03:09:00 +00:00
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
2020-03-23 07:01:34 +00:00
"github.com/chrislusf/seaweedfs/weed/util"
2018-06-07 06:39:30 +00:00
)
func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error {
newDir := newDirectory.(*Dir)
newPath := util.NewFullPath(newDir.FullPath(), req.NewName)
oldPath := util.NewFullPath(dir.FullPath(), req.OldName)
2020-03-26 05:19:19 +00:00
glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath)
2018-06-07 06:39:30 +00:00
2020-07-24 04:08:42 +00:00
// update remote filer
2021-10-17 11:22:42 +00:00
err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
2018-06-07 06:39:30 +00:00
2021-10-17 11:22:42 +00:00
request := &filer_pb.StreamRenameEntryRequest{
OldDirectory: dir.FullPath(),
OldName: req.OldName,
NewDirectory: newDir.FullPath(),
NewName: req.NewName,
2021-07-01 08:19:28 +00:00
Signatures: []int32{dir.wfs.signature},
2018-06-07 06:39:30 +00:00
}
2021-10-17 11:22:42 +00:00
stream, err := client.StreamRenameEntry(ctx, request)
2018-06-07 06:39:30 +00:00
if err != nil {
2020-09-22 16:16:07 +00:00
glog.Errorf("dir AtomicRenameEntry %s => %s : %v", oldPath, newPath, err)
2020-12-11 07:50:32 +00:00
return fuse.EXDEV
2018-06-07 06:39:30 +00:00
}
2021-10-17 11:22:42 +00:00
for {
resp, recvErr := stream.Recv()
if recvErr != nil {
if recvErr == io.EOF {
break
} else {
return recvErr
}
}
if err = dir.handleRenameResponse(ctx, resp); err != nil {
return err
}
}
return nil
2018-06-07 06:39:30 +00:00
})
2020-07-24 04:08:42 +00:00
if err != nil {
glog.V(0).Infof("dir Rename %s => %s : %v", oldPath, newPath, err)
return fuse.EIO
}
return nil
2018-06-07 06:39:30 +00:00
}
2021-07-01 08:19:28 +00:00
2021-10-17 11:22:42 +00:00
func (dir *Dir) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamRenameEntryResponse) error {
// comes from filer StreamRenameEntry, can only be create or delete entry
if resp.EventNotification.NewEntry != nil {
// with new entry, the old entry name also exists. This is the first step to create new entry
newEntry := filer.FromPbEntry(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry)
if err := dir.wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, "", newEntry); err != nil {
return err
}
2021-07-01 08:19:28 +00:00
2021-10-17 11:22:42 +00:00
oldParent, newParent := util.FullPath(resp.Directory), util.FullPath(resp.EventNotification.NewParentPath)
oldName, newName := resp.EventNotification.OldEntry.Name, resp.EventNotification.NewEntry.Name
2021-07-01 08:19:28 +00:00
2021-10-17 11:22:42 +00:00
oldPath := oldParent.Child(oldName)
newPath := newParent.Child(newName)
oldFsNode := NodeWithId(oldPath.AsInode())
newFsNode := NodeWithId(newPath.AsInode())
2021-07-01 08:19:28 +00:00
newDirNode, found := dir.wfs.Server.FindInternalNode(NodeWithId(newParent.AsInode()))
var newDir *Dir
if found {
newDir = newDirNode.(*Dir)
}
dir.wfs.Server.InvalidateInternalNode(oldFsNode, newFsNode, func(internalNode fs.Node) {
if file, ok := internalNode.(*File); ok {
glog.V(4).Infof("internal file node %s", oldParent.Child(oldName))
file.Name = newName
file.id = uint64(newFsNode)
if found {
file.dir = newDir
}
}
if dir, ok := internalNode.(*Dir); ok {
glog.V(4).Infof("internal dir node %s", oldParent.Child(oldName))
dir.name = newName
dir.id = uint64(newFsNode)
if found {
dir.parent = newDir
}
}
})
// change file handle
inodeId := oldPath.AsInode()
dir.wfs.handlesLock.Lock()
if existingHandle, found := dir.wfs.handles[inodeId]; found && existingHandle == nil {
glog.V(4).Infof("opened file handle %s => %s", oldPath, newPath)
delete(dir.wfs.handles, inodeId)
dir.wfs.handles[newPath.AsInode()] = existingHandle
}
dir.wfs.handlesLock.Unlock()
2021-10-17 11:22:42 +00:00
}else if resp.EventNotification.OldEntry != nil {
// without new entry, only old entry name exists. This is the second step to delete old entry
if err := dir.wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, util.NewFullPath(resp.Directory, resp.EventNotification.OldEntry.Name), nil); err != nil {
return err
2021-07-01 08:19:28 +00:00
}
}
return nil
}