diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index 06c997f36..7747c9af6 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -38,9 +38,11 @@ type Filer struct { LocalMetaLogBuffer *log_buffer.LogBuffer metaLogCollection string metaLogReplication string + MetaAggregator *MetaAggregator } -func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerHost string, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer { +func NewFiler(masters []string, grpcDialOption grpc.DialOption, + filerHost string, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer { f := &Filer{ directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters), @@ -56,6 +58,20 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerHost string return f } +func (f *Filer) AggregateFromPeers(self string, filers []string) { + + // set peers + if strings.HasPrefix(f.GetStore().GetName(), "leveldb") && len(filers) > 0 { + glog.Fatalf("filers using separate leveldb stores should not configure %d peers %+v", len(filers), filers) + } + if len(filers) == 0 { + filers = append(filers, self) + } + f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption) + f.MetaAggregator.StartLoopSubscribe(time.Now().UnixNano()) + +} + func (f *Filer) SetStore(store FilerStore) { f.store = NewFilerStoreWrapper(store) } diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 8ef75cf02..4fd38abe5 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -37,10 +37,10 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, lastReadTime = time.Unix(0, processedTsNs) } - err = fs.metaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { - fs.metaAggregator.ListenersLock.Lock() - fs.metaAggregator.ListenersCond.Wait() - fs.metaAggregator.ListenersLock.Unlock() + err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { + fs.filer.MetaAggregator.ListenersLock.Lock() + fs.filer.MetaAggregator.ListenersCond.Wait() + fs.filer.MetaAggregator.ListenersLock.Unlock() return true }, eachLogEntryFn) diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index c6ab6ef0f..6995c7cfe 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -5,7 +5,6 @@ import ( "fmt" "net/http" "os" - "strings" "sync" "time" @@ -60,7 +59,6 @@ type FilerServer struct { option *FilerOption secret security.SigningKey filer *filer2.Filer - metaAggregator *filer2.MetaAggregator grpcDialOption grpc.DialOption // notifying clients @@ -121,15 +119,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) readonlyMux.HandleFunc("/", fs.readonlyFilerHandler) } - // set peers - if strings.HasPrefix(fs.filer.GetStore().GetName(), "leveldb") && len(option.Filers) > 0 { - glog.Fatalf("filers using separate leveldb stores should not configure %d peers %+v", len(option.Filers), option.Filers) - } - if len(option.Filers) == 0 { - option.Filers = append(option.Filers, fmt.Sprintf("%s:%d", option.Host, option.Port)) - } - fs.metaAggregator = filer2.NewMetaAggregator(option.Filers, fs.grpcDialOption) - fs.metaAggregator.StartLoopSubscribe(time.Now().UnixNano()) + fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers) fs.filer.LoadBuckets()