mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
weed replicate: replicate atomic rename to other systems
This commit is contained in:
parent
8c823abe1f
commit
189c890715
|
@ -24,7 +24,8 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
|
||||||
return nil, fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err)
|
return nil, fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
moveErr := fs.moveEntry(ctx, oldParent, oldEntry, filer2.FullPath(filepath.ToSlash(req.NewDirectory)), req.NewName)
|
var events MoveEvents
|
||||||
|
moveErr := fs.moveEntry(ctx, oldParent, oldEntry, filer2.FullPath(filepath.ToSlash(req.NewDirectory)), req.NewName, &events)
|
||||||
if moveErr != nil {
|
if moveErr != nil {
|
||||||
fs.filer.RollbackTransaction(ctx)
|
fs.filer.RollbackTransaction(ctx)
|
||||||
return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, err)
|
return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, err)
|
||||||
|
@ -35,19 +36,26 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, entry := range events.newEntries {
|
||||||
|
fs.filer.NotifyUpdateEvent(nil, entry, false)
|
||||||
|
}
|
||||||
|
for _, entry := range events.oldEntries {
|
||||||
|
fs.filer.NotifyUpdateEvent(entry, nil, false)
|
||||||
|
}
|
||||||
|
|
||||||
return &filer_pb.AtomicRenameEntryResponse{}, nil
|
return &filer_pb.AtomicRenameEntryResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *FilerServer) moveEntry(ctx context.Context, oldParent filer2.FullPath, entry *filer2.Entry, newParent filer2.FullPath, newName string) error {
|
func (fs *FilerServer) moveEntry(ctx context.Context, oldParent filer2.FullPath, entry *filer2.Entry, newParent filer2.FullPath, newName string, events *MoveEvents) error {
|
||||||
if entry.IsDirectory() {
|
if entry.IsDirectory() {
|
||||||
if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName); err != nil {
|
if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName, events); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName)
|
return fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, events)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent filer2.FullPath, entry *filer2.Entry, newParent filer2.FullPath, newName string) error {
|
func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent filer2.FullPath, entry *filer2.Entry, newParent filer2.FullPath, newName string, events *MoveEvents) error {
|
||||||
|
|
||||||
currentDirPath := oldParent.Child(entry.Name())
|
currentDirPath := oldParent.Child(entry.Name())
|
||||||
newDirPath := newParent.Child(newName)
|
newDirPath := newParent.Child(newName)
|
||||||
|
@ -68,7 +76,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent filer
|
||||||
for _, item := range entries {
|
for _, item := range entries {
|
||||||
lastFileName = item.Name()
|
lastFileName = item.Name()
|
||||||
println("processing", lastFileName)
|
println("processing", lastFileName)
|
||||||
err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name())
|
err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name(), events)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -80,18 +88,19 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent filer
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent filer2.FullPath, entry *filer2.Entry, newParent filer2.FullPath, newName string) error {
|
func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent filer2.FullPath, entry *filer2.Entry, newParent filer2.FullPath, newName string, events *MoveEvents) (error) {
|
||||||
|
|
||||||
oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName)
|
oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName)
|
||||||
|
|
||||||
glog.V(1).Infof("moving entry %s => %s", oldPath, newPath)
|
glog.V(1).Infof("moving entry %s => %s", oldPath, newPath)
|
||||||
|
|
||||||
// add to new directory
|
// add to new directory
|
||||||
createErr := fs.filer.CreateEntry(ctx, &filer2.Entry{
|
newEntry := &filer2.Entry{
|
||||||
FullPath: newPath,
|
FullPath: newPath,
|
||||||
Attr: entry.Attr,
|
Attr: entry.Attr,
|
||||||
Chunks: entry.Chunks,
|
Chunks: entry.Chunks,
|
||||||
})
|
}
|
||||||
|
createErr := fs.filer.CreateEntry(ctx, newEntry)
|
||||||
if createErr != nil {
|
if createErr != nil {
|
||||||
return createErr
|
return createErr
|
||||||
}
|
}
|
||||||
|
@ -101,6 +110,14 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent filer2.FullP
|
||||||
if deleteErr != nil {
|
if deleteErr != nil {
|
||||||
return deleteErr
|
return deleteErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
events.oldEntries = append(events.oldEntries, entry)
|
||||||
|
events.newEntries = append(events.newEntries, newEntry)
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type MoveEvents struct {
|
||||||
|
oldEntries []*filer2.Entry
|
||||||
|
newEntries []*filer2.Entry
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue