mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
filer: load filer conf when starting
This commit is contained in:
parent
0a406f652e
commit
590f02179d
|
@ -41,6 +41,7 @@ type Filer struct {
|
||||||
metaLogReplication string
|
metaLogReplication string
|
||||||
MetaAggregator *MetaAggregator
|
MetaAggregator *MetaAggregator
|
||||||
Signature int32
|
Signature int32
|
||||||
|
FilerConf *FilerConf
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFiler(masters []string, grpcDialOption grpc.DialOption,
|
func NewFiler(masters []string, grpcDialOption grpc.DialOption,
|
||||||
|
@ -49,6 +50,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption,
|
||||||
MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, dataCenter, masters),
|
MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, dataCenter, masters),
|
||||||
fileIdDeletionQueue: util.NewUnboundedQueue(),
|
fileIdDeletionQueue: util.NewUnboundedQueue(),
|
||||||
GrpcDialOption: grpcDialOption,
|
GrpcDialOption: grpcDialOption,
|
||||||
|
FilerConf: NewFilerConf(),
|
||||||
}
|
}
|
||||||
f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn)
|
f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn)
|
||||||
f.metaLogCollection = collection
|
f.metaLogCollection = collection
|
||||||
|
|
|
@ -1,48 +1,92 @@
|
||||||
package filer
|
package filer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
"github.com/viant/ptrie"
|
"github.com/viant/ptrie"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
const (
|
||||||
|
DirectoryEtc = "/etc"
|
||||||
|
FilerConfName = "filer.conf"
|
||||||
|
)
|
||||||
|
|
||||||
type FilerConf struct {
|
type FilerConf struct {
|
||||||
rules ptrie.Trie
|
rules ptrie.Trie
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFilerConf(data []byte) (fc *FilerConf) {
|
func NewFilerConf() (fc *FilerConf) {
|
||||||
fc = &FilerConf{
|
fc = &FilerConf{
|
||||||
rules: ptrie.New(),
|
rules: ptrie.New(),
|
||||||
}
|
}
|
||||||
|
|
||||||
conf := &filer_pb.FilerConf{}
|
|
||||||
err := proto.UnmarshalText(string(data), conf)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("unable to parse filer conf: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
fc.doLoadConf(conf)
|
|
||||||
return fc
|
return fc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fc *FilerConf) doLoadConf(conf *filer_pb.FilerConf) {
|
func (fc *FilerConf) loadFromFiler(filer *Filer) (err error){
|
||||||
|
filerConfPath := util.NewFullPath(DirectoryEtc, FilerConfName)
|
||||||
|
entry, err := filer.FindEntry(context.Background(), filerConfPath)
|
||||||
|
if err != nil {
|
||||||
|
if err == filer_pb.ErrNotFound {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
glog.Errorf("read filer conf entry %s: %v", filerConfPath, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
return fc.loadFromChunks(filer, entry.Chunks)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fc *FilerConf) loadFromChunks(filer *Filer, chunks []*filer_pb.FileChunk) (err error) {
|
||||||
|
data, err := filer.readEntry(chunks)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("read filer conf content: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
return fc.loadFromBytes(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
func (fc *FilerConf) loadFromBytes(data []byte) (err error) {
|
||||||
|
conf := &filer_pb.FilerConf{}
|
||||||
|
err = proto.UnmarshalText(string(data), conf)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("unable to parse filer conf: %v", err)
|
||||||
|
// this is not recoverable
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return fc.doLoadConf(conf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fc *FilerConf) doLoadConf(conf *filer_pb.FilerConf) (err error) {
|
||||||
for _, location := range conf.Locations {
|
for _, location := range conf.Locations {
|
||||||
err := fc.rules.Put([]byte(location.LocationPrefix), location)
|
err = fc.rules.Put([]byte(location.LocationPrefix), location)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("put location prefix: %v", err)
|
glog.Errorf("put location prefix: %v", err)
|
||||||
|
// this is not recoverable
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
EmptyFilerConfPathConf = &filer_pb.FilerConf_PathConf{}
|
||||||
|
)
|
||||||
|
|
||||||
func (fc *FilerConf) MatchStorageRule(path string) (pathConf *filer_pb.FilerConf_PathConf){
|
func (fc *FilerConf) MatchStorageRule(path string) (pathConf *filer_pb.FilerConf_PathConf){
|
||||||
fc.rules.MatchPrefix([]byte(path), func(key []byte, value interface{}) bool {
|
fc.rules.MatchPrefix([]byte(path), func(key []byte, value interface{}) bool {
|
||||||
pathConf = value.(*filer_pb.FilerConf_PathConf)
|
pathConf = value.(*filer_pb.FilerConf_PathConf)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
if pathConf == nil {
|
if pathConf == nil {
|
||||||
return &filer_pb.FilerConf_PathConf{}
|
return EmptyFilerConfPathConf
|
||||||
}
|
}
|
||||||
return pathConf
|
return pathConf
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,14 +5,12 @@ import (
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/viant/ptrie"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestFilerConf(t *testing.T) {
|
func TestFilerConf(t *testing.T) {
|
||||||
|
|
||||||
fc := &FilerConf{
|
fc := NewFilerConf()
|
||||||
rules: ptrie.New(),
|
|
||||||
}
|
|
||||||
conf := &filer_pb.FilerConf{Locations: []*filer_pb.FilerConf_PathConf{
|
conf := &filer_pb.FilerConf{Locations: []*filer_pb.FilerConf_PathConf{
|
||||||
{
|
{
|
||||||
LocationPrefix: "/buckets/abc",
|
LocationPrefix: "/buckets/abc",
|
||||||
|
|
|
@ -6,10 +6,7 @@ import (
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
)
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
|
|
||||||
const (
|
|
||||||
DirectoryEtc = "/etc"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// onMetadataChangeEvent is triggered after filer processed change events from local or remote filers
|
// onMetadataChangeEvent is triggered after filer processed change events from local or remote filers
|
||||||
|
@ -26,15 +23,15 @@ func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse)
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(0).Infof("procesing %v", event)
|
glog.V(0).Infof("procesing %v", event)
|
||||||
if entry.Name == "filer.conf" {
|
if entry.Name == FilerConfName {
|
||||||
f.reloadFilerConfiguration(entry)
|
f.reloadFilerConfiguration(entry)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *Filer) readEntry(entry *filer_pb.Entry) ([]byte, error){
|
func (f *Filer) readEntry(chunks []*filer_pb.FileChunk) ([]byte, error) {
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
err := StreamContent(f.MasterClient, &buf, entry.Chunks, 0, math.MaxInt64)
|
err := StreamContent(f.MasterClient, &buf, chunks, 0, math.MaxInt64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -42,13 +39,23 @@ func (f *Filer) readEntry(entry *filer_pb.Entry) ([]byte, error){
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *Filer) reloadFilerConfiguration(entry *filer_pb.Entry) {
|
func (f *Filer) reloadFilerConfiguration(entry *filer_pb.Entry) {
|
||||||
data, err := f.readEntry(entry)
|
fc := NewFilerConf()
|
||||||
|
err := fc.loadFromChunks(f, entry.Chunks)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Warningf("read entry %s: %v", entry.Name, err)
|
glog.Errorf("read filer conf chunks: %v", err)
|
||||||
return
|
return
|
||||||
|
|
||||||
}
|
}
|
||||||
|
f.FilerConf = fc
|
||||||
println(string(data))
|
}
|
||||||
|
|
||||||
|
func (f *Filer) LoadFilerConf() {
|
||||||
|
fc := NewFilerConf()
|
||||||
|
err := util.Retry("loadFilerConf", func() error {
|
||||||
|
return fc.loadFromFiler(f)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("read filer conf: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
f.FilerConf = fc
|
||||||
}
|
}
|
||||||
|
|
|
@ -131,6 +131,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
|
||||||
|
|
||||||
fs.filer.LoadBuckets()
|
fs.filer.LoadBuckets()
|
||||||
|
|
||||||
|
fs.filer.LoadFilerConf()
|
||||||
|
|
||||||
grace.OnInterrupt(func() {
|
grace.OnInterrupt(func() {
|
||||||
fs.filer.Shutdown()
|
fs.filer.Shutdown()
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in a new issue