re-enable system logs

This commit is contained in:
Chris Lu 2020-04-12 14:03:07 -07:00
parent 6f948e4887
commit d30483d642
9 changed files with 53 additions and 31 deletions

View file

@ -9,12 +9,13 @@ import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
var ( var (
logdataFile = flag.String("logdata", "", "log data file saved under /.meta/log/...") logdataFile = flag.String("logdata", "", "log data file saved under "+ filer2.SystemLogDir)
) )
func main() { func main() {

View file

@ -36,9 +36,11 @@ type Filer struct {
buckets *FilerBuckets buckets *FilerBuckets
Cipher bool Cipher bool
metaLogBuffer *log_buffer.LogBuffer metaLogBuffer *log_buffer.LogBuffer
metaLogCollection string
metaLogReplication string
} }
func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort uint32, notifyFn func()) *Filer { func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer {
f := &Filer{ f := &Filer{
directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerGrpcPort, masters), MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerGrpcPort, masters),
@ -46,6 +48,8 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort ui
GrpcDialOption: grpcDialOption, GrpcDialOption: grpcDialOption,
} }
f.metaLogBuffer = log_buffer.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn) f.metaLogBuffer = log_buffer.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn)
f.metaLogCollection = collection
f.metaLogReplication = replication
go f.loopProcessingDeletion() go f.loopProcessingDeletion()

View file

@ -25,7 +25,7 @@ func (f *Filer) NotifyUpdateEvent(oldEntry, newEntry *Entry, deleteChunks bool)
// println("fullpath:", fullpath) // println("fullpath:", fullpath)
if strings.HasPrefix(fullpath, "/.meta") { if strings.HasPrefix(fullpath, SystemLogDir) {
return return
} }
@ -69,11 +69,10 @@ func (f *Filer) logMetaEvent(fullpath string, eventNotification *filer_pb.EventN
func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) { func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) {
return targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.segment", SystemLogDir,
targetFile := fmt.Sprintf("/.meta/log/%04d/%02d/%02d/%02d/%02d/%02d.%09d.log",
startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
startTime.Second(), startTime.Nanosecond()) // startTime.Second(), startTime.Nanosecond(),
)
if err := f.appendToFile(targetFile, buf); err != nil { if err := f.appendToFile(targetFile, buf); err != nil {
glog.V(0).Infof("log write failed %s: %v", targetFile, err) glog.V(0).Infof("log write failed %s: %v", targetFile, err)

View file

@ -13,25 +13,10 @@ import (
func (f *Filer) appendToFile(targetFile string, data []byte) error { func (f *Filer) appendToFile(targetFile string, data []byte) error {
// assign a volume location assignResult, err, uploadResult, err2 := f.assignAndUpload(data)
assignRequest := &operation.VolumeAssignRequest{ if err2 != nil {
Count: 1, return err2
} }
assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest)
if err != nil {
return fmt.Errorf("AssignVolume: %v", err)
}
if assignResult.Error != "" {
return fmt.Errorf("AssignVolume error: %v", assignResult.Error)
}
// upload data
targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
uploadResult, err := operation.UploadData(targetUrl, "", false, data, false, "", nil, assignResult.Auth)
if err != nil {
return fmt.Errorf("upload data %s: %v", targetUrl, err)
}
// println("uploaded to", targetUrl)
// find out existing entry // find out existing entry
fullpath := util.FullPath(targetFile) fullpath := util.FullPath(targetFile)
@ -68,3 +53,29 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error {
return err return err
} }
func (f *Filer) assignAndUpload(data []byte) (*operation.AssignResult, error, *operation.UploadResult, error) {
// assign a volume location
assignRequest := &operation.VolumeAssignRequest{
Count: 1,
Collection: f.metaLogCollection,
Replication: f.metaLogReplication,
WritableVolumeCount: 1,
}
assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest)
if err != nil {
return nil, nil, nil, fmt.Errorf("AssignVolume: %v", err)
}
if assignResult.Error != "" {
return nil, nil, nil, fmt.Errorf("AssignVolume error: %v", assignResult.Error)
}
// upload data
targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
uploadResult, err := operation.UploadData(targetUrl, "", false, data, false, "", nil, assignResult.Auth)
if err != nil {
return nil, nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
}
// println("uploaded to", targetUrl)
return assignResult, err, uploadResult, nil
}

View file

@ -11,7 +11,7 @@ import (
) )
func TestCreateAndFind(t *testing.T) { func TestCreateAndFind(t *testing.T) {
filer := filer2.NewFiler(nil, nil, 0, nil) filer := filer2.NewFiler(nil, nil, 0, "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
store := &LevelDBStore{} store := &LevelDBStore{}
@ -66,7 +66,7 @@ func TestCreateAndFind(t *testing.T) {
} }
func TestEmptyRoot(t *testing.T) { func TestEmptyRoot(t *testing.T) {
filer := filer2.NewFiler(nil, nil, 0, nil) filer := filer2.NewFiler(nil, nil, 0, "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2") dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
store := &LevelDBStore{} store := &LevelDBStore{}

View file

@ -11,7 +11,7 @@ import (
) )
func TestCreateAndFind(t *testing.T) { func TestCreateAndFind(t *testing.T) {
filer := filer2.NewFiler(nil, nil, 0, nil) filer := filer2.NewFiler(nil, nil, 0, "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
store := &LevelDB2Store{} store := &LevelDB2Store{}
@ -66,7 +66,7 @@ func TestCreateAndFind(t *testing.T) {
} }
func TestEmptyRoot(t *testing.T) { func TestEmptyRoot(t *testing.T) {
filer := filer2.NewFiler(nil, nil, 0, nil) filer := filer2.NewFiler(nil, nil, 0, "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2") dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
store := &LevelDB2Store{} store := &LevelDB2Store{}

6
weed/filer2/topics.go Normal file
View file

@ -0,0 +1,6 @@
package filer2
const(
TopicsDir = "/topics"
SystemLogDir = TopicsDir + "/.system/log"
)

View file

@ -4,6 +4,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
@ -37,7 +38,7 @@ func (fs *FilerServer) ListenForEvents(req *filer_pb.ListenForEventsRequest, str
fullpath := util.Join(dirPath, entryName) fullpath := util.Join(dirPath, entryName)
// skip on filer internal meta logs // skip on filer internal meta logs
if strings.HasPrefix(fullpath, "/.meta") { if strings.HasPrefix(fullpath, filer2.SystemLogDir) {
return nil return nil
} }

View file

@ -73,7 +73,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
glog.Fatal("master list is required!") glog.Fatal("master list is required!")
} }
fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Port+10000, fs.notifyMetaListeners) fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Port+10000, option.Collection, option.DefaultReplication, fs.notifyMetaListeners)
fs.filer.Cipher = option.Cipher fs.filer.Cipher = option.Cipher
maybeStartMetrics(fs, option) maybeStartMetrics(fs, option)