seaweedfs/weed/filer/filer.go
Chris Lu 2baed2e1e9 avoid possible metadata subscription data loss
Previous implementation append filer logs into one file. So one file is not always sorted, which can lead to miss reading some entries, especially when different filers have different write throughput.
2021-09-25 01:18:44 -07:00

310 lines
8.8 KiB
Go

package filer
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"os"
"strings"
"time"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
const (
LogFlushInterval = time.Minute
PaginationSize = 1024
FilerStoreId = "filer.store.id"
)
var (
OS_UID = uint32(os.Getuid())
OS_GID = uint32(os.Getgid())
)
type Filer struct {
Store VirtualFilerStore
MasterClient *wdclient.MasterClient
fileIdDeletionQueue *util.UnboundedQueue
GrpcDialOption grpc.DialOption
DirBucketsPath string
FsyncBuckets []string
buckets *FilerBuckets
Cipher bool
LocalMetaLogBuffer *log_buffer.LogBuffer
metaLogCollection string
metaLogReplication string
MetaAggregator *MetaAggregator
Signature int32
FilerConf *FilerConf
RemoteStorage *FilerRemoteStorage
UniqueFileId uint32
}
func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption,
filerHost pb.ServerAddress, collection string, replication string, dataCenter string, notifyFn func()) *Filer {
f := &Filer{
MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, dataCenter, masters),
fileIdDeletionQueue: util.NewUnboundedQueue(),
GrpcDialOption: grpcDialOption,
FilerConf: NewFilerConf(),
RemoteStorage: NewFilerRemoteStorage(),
UniqueFileId: uint32(util.RandomInt32()),
}
f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn)
f.metaLogCollection = collection
f.metaLogReplication = replication
go f.loopProcessingDeletion()
return f
}
func (f *Filer) AggregateFromPeers(self pb.ServerAddress, filers []pb.ServerAddress) {
// set peers
found := false
for _, peer := range filers {
if peer == self {
found = true
}
}
if !found {
filers = append(filers, self)
}
f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption)
f.MetaAggregator.StartLoopSubscribe(f, self)
}
func (f *Filer) SetStore(store FilerStore) {
f.Store = NewFilerStoreWrapper(store)
f.setOrLoadFilerStoreSignature(store)
}
func (f *Filer) setOrLoadFilerStoreSignature(store FilerStore) {
storeIdBytes, err := store.KvGet(context.Background(), []byte(FilerStoreId))
if err == ErrKvNotFound || err == nil && len(storeIdBytes) == 0 {
f.Signature = util.RandomInt32()
storeIdBytes = make([]byte, 4)
util.Uint32toBytes(storeIdBytes, uint32(f.Signature))
if err = store.KvPut(context.Background(), []byte(FilerStoreId), storeIdBytes); err != nil {
glog.Fatalf("set %s=%d : %v", FilerStoreId, f.Signature, err)
}
glog.V(0).Infof("create %s to %d", FilerStoreId, f.Signature)
} else if err == nil && len(storeIdBytes) == 4 {
f.Signature = int32(util.BytesToUint32(storeIdBytes))
glog.V(0).Infof("existing %s = %d", FilerStoreId, f.Signature)
} else {
glog.Fatalf("read %v=%v : %v", FilerStoreId, string(storeIdBytes), err)
}
}
func (f *Filer) GetStore() (store FilerStore) {
return f.Store
}
func (fs *Filer) GetMaster() pb.ServerAddress {
return fs.MasterClient.GetMaster()
}
func (fs *Filer) KeepConnectedToMaster() {
fs.MasterClient.KeepConnectedToMaster()
}
func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error) {
return f.Store.BeginTransaction(ctx)
}
func (f *Filer) CommitTransaction(ctx context.Context) error {
return f.Store.CommitTransaction(ctx)
}
func (f *Filer) RollbackTransaction(ctx context.Context) error {
return f.Store.RollbackTransaction(ctx)
}
func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32) error {
if string(entry.FullPath) == "/" {
return nil
}
oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
/*
if !hasWritePermission(lastDirectoryEntry, entry) {
glog.V(0).Infof("directory %s: %v, entry: uid=%d gid=%d",
lastDirectoryEntry.FullPath, lastDirectoryEntry.Attr, entry.Uid, entry.Gid)
return fmt.Errorf("no write permission in folder %v", lastDirectoryEntry.FullPath)
}
*/
if oldEntry == nil {
dirParts := strings.Split(string(entry.FullPath), "/")
if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil {
return err
}
glog.V(4).Infof("InsertEntry %s: new entry: %v", entry.FullPath, entry.Name())
if err := f.Store.InsertEntry(ctx, entry); err != nil {
glog.Errorf("insert entry %s: %v", entry.FullPath, err)
return fmt.Errorf("insert entry %s: %v", entry.FullPath, err)
}
} else {
if o_excl {
glog.V(3).Infof("EEXIST: entry %s already exists", entry.FullPath)
return fmt.Errorf("EEXIST: entry %s already exists", entry.FullPath)
}
glog.V(4).Infof("UpdateEntry %s: old entry: %v", entry.FullPath, oldEntry.Name())
if err := f.UpdateEntry(ctx, oldEntry, entry); err != nil {
glog.Errorf("update entry %s: %v", entry.FullPath, err)
return fmt.Errorf("update entry %s: %v", entry.FullPath, err)
}
}
f.maybeAddBucket(entry)
f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster, signatures)
f.deleteChunksIfNotNew(oldEntry, entry)
glog.V(4).Infof("CreateEntry %s: created", entry.FullPath)
return nil
}
func (f *Filer) ensureParentDirecotryEntry(ctx context.Context, entry *Entry, dirParts []string, level int, isFromOtherCluster bool) (err error) {
if level == 0 {
return nil
}
dirPath := "/" + util.Join(dirParts[:level]...)
// fmt.Printf("%d directory: %+v\n", i, dirPath)
// check the store directly
glog.V(4).Infof("find uncached directory: %s", dirPath)
dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath))
// no such existing directory
if dirEntry == nil {
// ensure parent directory
if err = f.ensureParentDirecotryEntry(ctx, entry, dirParts, level-1, isFromOtherCluster); err != nil {
return err
}
// create the directory
now := time.Now()
dirEntry = &Entry{
FullPath: util.FullPath(dirPath),
Attr: Attr{
Mtime: now,
Crtime: now,
Mode: os.ModeDir | entry.Mode | 0111,
Uid: entry.Uid,
Gid: entry.Gid,
Collection: entry.Collection,
Replication: entry.Replication,
UserName: entry.UserName,
GroupNames: entry.GroupNames,
},
}
glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
mkdirErr := f.Store.InsertEntry(ctx, dirEntry)
if mkdirErr != nil {
if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound {
glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr)
return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
}
} else {
f.maybeAddBucket(dirEntry)
f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil)
}
} else if !dirEntry.IsDirectory() {
glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath)
return fmt.Errorf("%s is a file", dirPath)
}
return nil
}
func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) {
if oldEntry != nil {
entry.Attr.Crtime = oldEntry.Attr.Crtime
if oldEntry.IsDirectory() && !entry.IsDirectory() {
glog.Errorf("existing %s is a directory", oldEntry.FullPath)
return fmt.Errorf("existing %s is a directory", oldEntry.FullPath)
}
if !oldEntry.IsDirectory() && entry.IsDirectory() {
glog.Errorf("existing %s is a file", oldEntry.FullPath)
return fmt.Errorf("existing %s is a file", oldEntry.FullPath)
}
}
return f.Store.UpdateEntry(ctx, entry)
}
var (
Root = &Entry{
FullPath: "/",
Attr: Attr{
Mtime: time.Now(),
Crtime: time.Now(),
Mode: os.ModeDir | 0755,
Uid: OS_UID,
Gid: OS_GID,
},
}
)
func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, err error) {
if string(p) == "/" {
return Root, nil
}
entry, err = f.Store.FindEntry(ctx, p)
if entry != nil && entry.TtlSec > 0 {
if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
f.Store.DeleteOneEntry(ctx, entry)
return nil, filer_pb.ErrNotFound
}
}
return
}
func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (expiredCount int64, lastFileName string, err error) {
lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
if entry.TtlSec > 0 {
if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
f.Store.DeleteOneEntry(ctx, entry)
expiredCount++
return true
}
}
return eachEntryFunc(entry)
})
if err != nil {
return expiredCount, lastFileName, err
}
return
}
func (f *Filer) Shutdown() {
f.LocalMetaLogBuffer.Shutdown()
f.Store.Shutdown()
}