From 9e72e9e4b8d58e0f6f99088f0e449d1c9c55e562 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 29 Apr 2020 17:40:08 -0700 Subject: [PATCH] able to subscribe any topic from any point of time --- weed/filer2/filer_notify.go | 4 +- weed/filer2/reader_at.go | 54 ++++++++++--------- weed/filer2/stream.go | 12 ++++- weed/filesys/dir.go | 3 +- .../broker/broker_grpc_server_subscribe.go | 45 ++++++++++++++++ weed/pb/filer_pb/filer_client.go | 16 ++++-- weed/pb/filer_pb/filer_client_bfs.go | 3 +- weed/s3api/filer_util.go | 3 +- weed/server/webdav_server.go | 3 +- weed/shell/command_bucket_list.go | 3 +- weed/shell/command_fs_du.go | 3 +- weed/shell/command_fs_ls.go | 5 +- weed/shell/command_fs_tree.go | 6 +-- 13 files changed, 117 insertions(+), 43 deletions(-) diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go index 6b83d8e63..28ade51cc 100644 --- a/weed/filer2/filer_notify.go +++ b/weed/filer2/filer_notify.go @@ -109,7 +109,7 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func( } // println("processing", hourMinuteEntry.FullPath) chunkedFileReader := NewChunkStreamReaderFromFiler(f.MasterClient, hourMinuteEntry.Chunks) - if err := readEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil { + if err := ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil { chunkedFileReader.Close() if err == io.EOF { break @@ -123,7 +123,7 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func( return nil } -func readEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) error { +func ReadEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) error { for { n, err := r.Read(sizeBuf) if err != nil { diff --git a/weed/filer2/reader_at.go b/weed/filer2/reader_at.go index f56ef6388..a9772da5b 100644 --- a/weed/filer2/reader_at.go +++ b/weed/filer2/reader_at.go @@ -27,34 +27,40 @@ type ChunkReadAt struct { // var _ = io.ReaderAt(&ChunkReadAt{}) +type LookupFileIdFunctionType func(fileId string) (targetUrl string, err error) + +func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType { + return func(fileId string) (targetUrl string, err error) { + err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + vid := VolumeId(fileId) + resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ + VolumeIds: []string{vid}, + }) + if err != nil { + return err + } + + locations := resp.LocationsMap[vid] + if locations == nil || len(locations.Locations) == 0 { + glog.V(0).Infof("failed to locate %s", fileId) + return fmt.Errorf("failed to locate %s", fileId) + } + + volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url) + + targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId) + + return nil + }) + return + } +} + func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache *chunk_cache.ChunkCache) *ChunkReadAt { return &ChunkReadAt{ chunkViews: chunkViews, - lookupFileId: func(fileId string) (targetUrl string, err error) { - err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - vid := VolumeId(fileId) - resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ - VolumeIds: []string{vid}, - }) - if err != nil { - return err - } - - locations := resp.LocationsMap[vid] - if locations == nil || len(locations.Locations) == 0 { - glog.V(0).Infof("failed to locate %s", fileId) - return fmt.Errorf("failed to locate %s", fileId) - } - - volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url) - - targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId) - - return nil - }) - return - }, + lookupFileId: LookupFn(filerClient), bufferOffset: -1, chunkCache: chunkCache, } diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go index 4e785fade..4c8213b07 100644 --- a/weed/filer2/stream.go +++ b/weed/filer2/stream.go @@ -81,7 +81,7 @@ type ChunkStreamReader struct { bufferOffset int64 bufferPos int chunkIndex int - lookupFileId func(fileId string) (targetUrl string, err error) + lookupFileId LookupFileIdFunctionType } var _ = io.ReadSeeker(&ChunkStreamReader{}) @@ -98,6 +98,16 @@ func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks [ } } +func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { + + chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32) + + return &ChunkStreamReader{ + chunkViews: chunkViews, + lookupFileId: LookupFn(filerClient), + } +} + func (c *ChunkStreamReader) Read(p []byte) (n int, err error) { for n < len(p) { if c.isBufferEmpty() { diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index b91aa3a72..c3dab919c 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -263,7 +263,7 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { glog.V(3).Infof("dir ReadDirAll %s", dir.FullPath()) cacheTtl := 5 * time.Minute - processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) { + processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error { fullpath := util.NewFullPath(dir.FullPath(), entry.Name) inode := fullpath.AsInode() if entry.IsDirectory { @@ -274,6 +274,7 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { ret = append(ret, dirent) } dir.wfs.cacheSet(fullpath, entry, cacheTtl) + return nil } if dir.wfs.option.AsyncMetaDataCaching { diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index 290c84e34..c5e033420 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -3,10 +3,12 @@ package broker import ( "fmt" "io" + "strings" "time" "github.com/golang/protobuf/proto" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" @@ -57,6 +59,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs case messaging_pb.SubscriberMessage_InitMessage_LATEST: case messaging_pb.SubscriberMessage_InitMessage_EARLIEST: } + var processedTsNs int64 // how to process each message // an error returned will end the subscription @@ -81,9 +84,18 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err) return err } + processedTsNs = logEntry.TsNs return nil } + if err := broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil { + return err + } + + if processedTsNs != 0 { + lastReadTime = time.Unix(0, processedTsNs) + } + messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool { lock.Mutex.Lock() lock.cond.Wait() @@ -94,3 +106,36 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs return err } + +func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) { + startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day()) + startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute()) + + sizeBuf := make([]byte, 4) + startTsNs := startTime.UnixNano() + + topicDir := fmt.Sprintf("/topics/%s/%s", tp.Namespace, tp.Topic) + + return filer_pb.List(broker, topicDir, "", func(dayEntry *filer_pb.Entry, isLast bool) error { + dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name) + return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error { + if dayEntry.Name == startDate { + if strings.Compare(hourMinuteEntry.Name, startHourMinute) < 0 { + return nil + } + } + // println("processing", hourMinuteEntry.FullPath) + chunkedFileReader := filer2.NewChunkStreamReader(broker, hourMinuteEntry.Chunks) + defer chunkedFileReader.Close() + if err := filer2.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil { + chunkedFileReader.Close() + if err == io.EOF { + return nil + } + return fmt.Errorf("reading %s/%s: %v", dayDir, hourMinuteEntry.Name, err) + } + return nil + }, "", false, 24*60) + }, startDate, true, 366) + +} diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go index c5d863262..73b66472d 100644 --- a/weed/pb/filer_pb/filer_client.go +++ b/weed/pb/filer_pb/filer_client.go @@ -56,19 +56,21 @@ func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry return } -func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn func(entry *Entry, isLast bool)) (err error) { +type EachEntryFunciton func(entry *Entry, isLast bool) error + +func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton) (err error) { return doList(filerClient, fullDirPath, prefix, fn, "", false, math.MaxUint32) } -func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn func(entry *Entry, isLast bool), startFrom string, inclusive bool, limit uint32) (err error) { +func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) { return doList(filerClient, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit) } -func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn func(entry *Entry, isLast bool), startFrom string, inclusive bool, limit uint32) (err error) { +func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) { err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error { @@ -92,7 +94,9 @@ func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, f if recvErr != nil { if recvErr == io.EOF { if prevEntry != nil { - fn(prevEntry, true) + if err := fn(prevEntry, true); err != nil { + return err + } } break } else { @@ -100,7 +104,9 @@ func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, f } } if prevEntry != nil { - fn(prevEntry, false) + if err := fn(prevEntry, false); err != nil { + return err + } } prevEntry = resp.Entry } diff --git a/weed/pb/filer_pb/filer_client_bfs.go b/weed/pb/filer_pb/filer_client_bfs.go index 7c9a8ae28..4e5b65f12 100644 --- a/weed/pb/filer_pb/filer_client_bfs.go +++ b/weed/pb/filer_pb/filer_client_bfs.go @@ -45,7 +45,7 @@ func TraverseBfs(filerClient FilerClient, parentPath util.FullPath, fn func(pare func processOneDirectory(filerClient FilerClient, parentPath util.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup, fn func(parentPath util.FullPath, entry *Entry)) (err error) { - return ReadDirAllEntries(filerClient, parentPath, "", func(entry *Entry, isLast bool) { + return ReadDirAllEntries(filerClient, parentPath, "", func(entry *Entry, isLast bool) error { fn(parentPath, entry) @@ -57,6 +57,7 @@ func processOneDirectory(filerClient FilerClient, parentPath util.FullPath, queu jobQueueWg.Add(1) queue.Enqueue(util.FullPath(subDir)) } + return nil }) } diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go index 51249cf39..7f49c320e 100644 --- a/weed/s3api/filer_util.go +++ b/weed/s3api/filer_util.go @@ -23,8 +23,9 @@ func (s3a *S3ApiServer) mkFile(parentDirectoryPath string, fileName string, chun func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, inclusive bool, limit uint32) (entries []*filer_pb.Entry, err error) { - err = filer_pb.List(s3a, parentDirectoryPath, prefix, func(entry *filer_pb.Entry, isLast bool) { + err = filer_pb.List(s3a, parentDirectoryPath, prefix, func(entry *filer_pb.Entry, isLast bool) error { entries = append(entries, entry) + return nil }, startFrom, inclusive, limit) return diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index f195b09f7..a4a1d8b8b 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -511,7 +511,7 @@ func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) { dir, _ := util.FullPath(f.name).DirAndName() - err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) { + err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error { fi := FileInfo{ size: int64(filer2.TotalSize(entry.GetChunks())), name: entry.Name, @@ -525,6 +525,7 @@ func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) { } glog.V(4).Infof("entry: %v", fi.name) ret = append(ret, &fi) + return nil }) old := f.off diff --git a/weed/shell/command_bucket_list.go b/weed/shell/command_bucket_list.go index b982ff646..2e446b6b2 100644 --- a/weed/shell/command_bucket_list.go +++ b/weed/shell/command_bucket_list.go @@ -45,12 +45,13 @@ func (c *commandBucketList) Do(args []string, commandEnv *CommandEnv, writer io. return fmt.Errorf("read buckets: %v", err) } - err = filer_pb.List(commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) { + err = filer_pb.List(commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error { if entry.Attributes.Replication == "" || entry.Attributes.Replication == "000" { fmt.Fprintf(writer, " %s\n", entry.Name) } else { fmt.Fprintf(writer, " %s\t\t\treplication: %s\n", entry.Name, entry.Attributes.Replication) } + return nil }, "", false, math.MaxUint32) if err != nil { return fmt.Errorf("list buckets under %v: %v", filerBucketsPath, err) diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go index 08c553e7c..96551dd5a 100644 --- a/weed/shell/command_fs_du.go +++ b/weed/shell/command_fs_du.go @@ -54,7 +54,7 @@ func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer func duTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, dir, name string) (blockCount, byteCount uint64, err error) { - err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) { + err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) error { var fileBlockCount, fileByteCount uint64 @@ -78,6 +78,7 @@ func duTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, dir if name != "" && !entry.IsDirectory { fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s/%s\n", fileBlockCount, fileByteCount, dir, entry.Name) } + return nil }) return } diff --git a/weed/shell/command_fs_ls.go b/weed/shell/command_fs_ls.go index 569b3bb9b..36133992f 100644 --- a/weed/shell/command_fs_ls.go +++ b/weed/shell/command_fs_ls.go @@ -62,10 +62,10 @@ func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer dir, name := util.FullPath(path).DirAndName() entryCount := 0 - err = filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) { + err = filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) error { if !showHidden && strings.HasPrefix(entry.Name, ".") { - return + return nil } entryCount++ @@ -100,6 +100,7 @@ func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer fmt.Fprintf(writer, "%s\n", entry.Name) } + return nil }) if isLongFormat && err == nil { diff --git a/weed/shell/command_fs_tree.go b/weed/shell/command_fs_tree.go index b0752ea03..a8c5b2018 100644 --- a/weed/shell/command_fs_tree.go +++ b/weed/shell/command_fs_tree.go @@ -51,10 +51,10 @@ func treeTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, d prefix.addMarker(level) - err = filer_pb.ReadDirAllEntries(filerClient, dir, name, func(entry *filer_pb.Entry, isLast bool) { + err = filer_pb.ReadDirAllEntries(filerClient, dir, name, func(entry *filer_pb.Entry, isLast bool) error { if level < 0 && name != "" { if entry.Name != name { - return + return nil } } @@ -70,7 +70,7 @@ func treeTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, d } else { fileCount++ } - + return nil }) return }