diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
index 7f7c8f84b..042621a4c 100644
--- a/weed/mq/broker/broker_grpc_configure.go
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -3,12 +3,16 @@ package broker
 import (
 	"context"
 	"fmt"
+	"github.com/seaweedfs/seaweedfs/weed/filer"
+	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
 	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	"time"
 )
 
 // ConfigureTopic Runs on any broker, but proxied to the balancer if not the balancer
@@ -73,8 +77,9 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
 	self := pb.ServerAddress(fmt.Sprintf("%s:%d", b.option.Ip, b.option.Port))
 
 	// drain existing topic partition subscriptions
-	for _, brokerPartition := range request.BrokerPartitionAssignments {
-		localPartition := topic.FromPbBrokerPartitionAssignment(self, brokerPartition)
+	for _, assignment := range request.BrokerPartitionAssignments {
+		topicPartition := topic.FromPbPartition(assignment.Partition)
+		localPartition := topic.FromPbBrokerPartitionAssignment(self, assignment, b.genLogFlushFunc(request.Topic, topicPartition))
 		if request.IsDraining {
 			// TODO drain existing topic partition subscriptions
 
@@ -105,3 +110,29 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
 
 	return ret, nil
 }
+
+func (b *MessageQueueBroker) genLogFlushFunc(t *mq_pb.Topic, partition topic.Partition) log_buffer.LogFlushFuncType {
+	topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
+	partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
+	partitionDir := fmt.Sprintf("%s/%s/%4d-%4d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop)
+
+	return func(startTime, stopTime time.Time, buf []byte) {
+		if len(buf) == 0 {
+			return
+		}
+
+		startTime, stopTime = startTime.UTC(), stopTime.UTC()
+		fileName := startTime.Format(topic.TIME_FORMAT)
+
+		targetFile := fmt.Sprintf("%s/%s", partitionDir, fileName)
+
+		for {
+			if err := b.appendToFile(targetFile, buf); err != nil {
+				glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err)
+				time.Sleep(737 * time.Millisecond)
+			} else {
+				break
+			}
+		}
+	}
+}
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 34a263032..615964621 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -28,6 +28,7 @@ type MessageQueueBrokerOption struct {
 	Ip                 string
 	Port               int
 	Cipher             bool
+	VolumeServerAccess string // how to access volume servers
 }
 
 type MessageQueueBroker struct {
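The `genLogFlushFunc` closure above pins down the on-disk layout for flushed log segments: one directory per topic, one subdirectory per partition generation (the partition's `UnixTimeNs` rendered through `topic.TIME_FORMAT`), one subdirectory per partition range, and one file per flush window named by the window's start time. A minimal, self-contained sketch of that layout, assuming `filer.TopicsDir` resolves to `/topics` (the namespace and topic names below are purely illustrative):

```go
package main

import (
	"fmt"
	"time"
)

// timeFormat mirrors topic.TIME_FORMAT introduced in this change.
const timeFormat = "2006-01-02-15-04-05"

func main() {
	// A partition "generation" is named after its creation instant (UnixTimeNs).
	partitionStart := time.Unix(0, 1700000000000000000).UTC()
	generation := partitionStart.Format(timeFormat)

	// Layout: <TopicsDir>/<namespace>/<topic>/<generation>/<rangeStart>-<rangeStop>
	partitionDir := fmt.Sprintf("/topics/%s/%s/%s/%4d-%4d",
		"my-namespace", "my-topic", generation, 0, 1024)

	// Each flushed buffer becomes one file named by the flush window's start time.
	flushStart := time.Now().UTC()
	fmt.Println(partitionDir + "/" + flushStart.Format(timeFormat))
}
```

Two things worth flagging for review: `%4d` pads with spaces rather than zeros, so narrow range values produce directory names with embedded spaces, and the flush closure retries failed appends forever at 737ms intervals, which will block further flushes for that buffer until a write succeeds.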
diff --git a/weed/mq/broker/broker_write.go b/weed/mq/broker/broker_write.go
new file mode 100644
index 000000000..866cd17c2
--- /dev/null
+++ b/weed/mq/broker/broker_write.go
@@ -0,0 +1,82 @@
+package broker
+
+import (
+	"fmt"
+	"github.com/seaweedfs/seaweedfs/weed/filer"
+	"github.com/seaweedfs/seaweedfs/weed/operation"
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/util"
+	"os"
+	"time"
+)
+
+func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error {
+
+	fileId, uploadResult, err2 := b.assignAndUpload(targetFile, data)
+	if err2 != nil {
+		return err2
+	}
+
+	// find out existing entry
+	fullpath := util.FullPath(targetFile)
+	dir, name := fullpath.DirAndName()
+	entry, err := filer_pb.GetEntry(b, fullpath)
+	var offset int64 = 0
+	if err == filer_pb.ErrNotFound {
+		entry = &filer_pb.Entry{
+			Name:        name,
+			IsDirectory: false,
+			Attributes: &filer_pb.FuseAttributes{
+				Crtime:   time.Now().Unix(),
+				Mtime:    time.Now().Unix(),
+				FileMode: uint32(os.FileMode(0644)),
+				Uid:      uint32(os.Getuid()),
+				Gid:      uint32(os.Getgid()),
+			},
+		}
+	} else if err != nil {
+		return fmt.Errorf("find %s: %v", fullpath, err)
+	} else {
+		offset = int64(filer.TotalSize(entry.GetChunks()))
+	}
+
+	// append to existing chunks
+	entry.Chunks = append(entry.GetChunks(), uploadResult.ToPbFileChunk(fileId, offset, time.Now().UnixNano()))
+
+	// update the entry
+	return b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+		return filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
+			Directory: dir,
+			Entry:     entry,
+		})
+	})
+}
+
+func (b *MessageQueueBroker) assignAndUpload(targetFile string, data []byte) (fileId string, uploadResult *operation.UploadResult, err error) {
+
+	reader := util.NewBytesReader(data)
+	fileId, uploadResult, err, _ = operation.UploadWithRetry(
+		b,
+		&filer_pb.AssignVolumeRequest{
+			Count:       1,
+			Replication: b.option.DefaultReplication,
+			Collection:  "topics",
+			// TtlSec: wfs.option.TtlSec,
+			// DiskType: string(wfs.option.DiskType),
+			DataCenter: b.option.DataCenter,
+			Path:       targetFile,
+		},
+		&operation.UploadOption{
+			Cipher: b.option.Cipher,
+		},
+		func(host, fileId string) string {
+			fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
+			if b.option.VolumeServerAccess == "filerProxy" {
+				fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", b.currentFiler, fileId)
+			}
+			return fileUrl
+		},
+		reader,
+	)
+	return
+}
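`appendToFile` never rewrites existing bytes: each flush is uploaded as a fresh volume chunk via `assignAndUpload`, and the chunk is appended to the filer entry at an offset equal to the entry's current total size (`filer.TotalSize`), so a segment file grows as an ordered chunk list. A hypothetical sketch of that bookkeeping (the `chunk` type and `appendChunk` helper are illustrative, not part of the codebase):

```go
package main

import "fmt"

// chunk models only the fields relevant to offset bookkeeping.
type chunk struct {
	fileId string
	offset int64
	size   int64
}

// appendChunk mirrors the offset computation in appendToFile: the new chunk
// starts where the existing chunks end.
func appendChunk(chunks []chunk, fileId string, size int64) []chunk {
	var total int64
	for _, c := range chunks {
		total += c.size
	}
	return append(chunks, chunk{fileId: fileId, offset: total, size: size})
}

func main() {
	var chunks []chunk
	chunks = appendChunk(chunks, "3,0000000001", 1024)
	chunks = appendChunk(chunks, "3,0000000002", 512)
	fmt.Printf("%+v\n", chunks) // the second chunk lands at offset 1024
}
```

Note that the final `filer_pb.CreateEntry` call overwrites the whole entry with the extended chunk list; concurrent appenders to the same segment file would race, which appears safe here only because each partition's flushes come from a single broker.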
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 36712bbbd..5cf315ddb 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -21,7 +21,8 @@ type LocalPartition struct {
 	Subscribers *LocalPartitionSubscribers
 }
 
-func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb.ServerAddress) *LocalPartition {
+var TIME_FORMAT = "2006-01-02-15-04-05"
+func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb.ServerAddress, logFlushFn log_buffer.LogFlushFuncType) *LocalPartition {
 	return &LocalPartition{
 		Partition: partition,
 		isLeader:  isLeader,
@@ -29,9 +30,7 @@ func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb.
 		logBuffer: log_buffer.NewLogBuffer(
 			fmt.Sprintf("%d/%4d-%4d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
 			2*time.Minute,
-			func(startTime, stopTime time.Time, buf []byte) {
-
-			},
+			logFlushFn,
 			func() {
 
 			},
@@ -63,13 +62,13 @@ func (p *LocalPartition) GetEarliestInMemoryMessagePosition() log_buffer.Message
 	return p.logBuffer.GetEarliestPosition()
 }
 
-func FromPbBrokerPartitionAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionAssignment) *LocalPartition {
+func FromPbBrokerPartitionAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionAssignment, logFlushFn log_buffer.LogFlushFuncType) *LocalPartition {
 	isLeader := assignment.LeaderBroker == string(self)
 	followers := make([]pb.ServerAddress, len(assignment.FollowerBrokers))
 	for i, followerBroker := range assignment.FollowerBrokers {
 		followers[i] = pb.ServerAddress(followerBroker)
 	}
-	return NewLocalPartition(FromPbPartition(assignment.Partition), isLeader, followers)
+	return NewLocalPartition(FromPbPartition(assignment.Partition), isLeader, followers, logFlushFn)
 }
 
 func (p *LocalPartition) closePublishers() {
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index 2cb6f8c41..567d660ef 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -22,6 +22,8 @@ type dataToFlush struct {
 	data *bytes.Buffer
 }
 
+type LogFlushFuncType func(startTime, stopTime time.Time, buf []byte)
+
 type LogBuffer struct {
 	name        string
 	prevBuffers *SealedBuffers
@@ -34,7 +36,7 @@ type LogBuffer struct {
 	lastFlushTime time.Time
 	sizeBuf       []byte
 	flushInterval time.Duration
-	flushFn       func(startTime, stopTime time.Time, buf []byte)
+	flushFn       LogFlushFuncType
 	notifyFn      func()
 	isStopping    *atomic.Bool
 	flushChan     chan *dataToFlush
@@ -42,7 +44,7 @@ type LogBuffer struct {
 	sync.RWMutex
 }
 
-func NewLogBuffer(name string, flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte), notifyFn func()) *LogBuffer {
+func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFuncType, notifyFn func()) *LogBuffer {
 	lb := &LogBuffer{
 		name:        name,
 		prevBuffers: newSealedBuffers(PreviousBufferCount),
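With the callback now injected end to end, a `LocalPartition`'s in-memory log is persisted on the buffer's 2-minute flush interval instead of being discarded by the old empty closure. A hedged usage sketch of the new `NewLogBuffer` signature (the flush body below is illustrative, and the import assumes the module path shown in the diff):

```go
package main

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)

func main() {
	// Any persistence strategy can be injected; this one just logs the window.
	flushFn := log_buffer.LogFlushFuncType(func(startTime, stopTime time.Time, buf []byte) {
		fmt.Printf("flushed %d bytes covering %v .. %v\n", len(buf), startTime, stopTime)
	})

	lb := log_buffer.NewLogBuffer("example", 2*time.Minute, flushFn, func() {})
	_ = lb
}
```

Naming the function type `LogFlushFuncType` also keeps the `LogBuffer` struct field and the constructor signature in sync, so future changes to the flush contract happen in one place.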