diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index e7cbb6441..eb6946e81 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -48,6 +48,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs Partition: in.Init.Partition, } lock := broker.topicManager.RequestLock(tp, topicConfig, false) + subscription := lock.subscriptions.AddSubscription(subscriberId) defer broker.topicManager.ReleaseLock(tp, false) lastReadTime := time.Now() @@ -102,9 +103,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs } err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool { - lock.Mutex.Lock() - lock.cond.Wait() - lock.Mutex.Unlock() + subscription.Wait() return true }, eachLogEntryFn) diff --git a/weed/messaging/broker/subscription.go b/weed/messaging/broker/subscription.go new file mode 100644 index 000000000..f74b0c546 --- /dev/null +++ b/weed/messaging/broker/subscription.go @@ -0,0 +1,82 @@ +package broker + +import ( + "sync" +) + +type TopicPartitionSubscription struct { + sync.Mutex + name string + lastReadTsNs int64 + cond *sync.Cond +} + +func NewTopicPartitionSubscription(name string) *TopicPartitionSubscription { + t := &TopicPartitionSubscription{ + name: name, + } + t.cond = sync.NewCond(t) + return t +} + +func (s *TopicPartitionSubscription) Wait() { + s.Mutex.Lock() + s.cond.Wait() + s.Mutex.Unlock() +} + +func (s *TopicPartitionSubscription) NotifyOne() { + // notify one waiting goroutine + s.cond.Signal() +} + +type TopicPartitionSubscriptions struct { + sync.Mutex + cond *sync.Cond + subscriptions map[string]*TopicPartitionSubscription + subscriptionsLock sync.RWMutex +} + +func NewTopicPartitionSubscriptions() *TopicPartitionSubscriptions { + m := &TopicPartitionSubscriptions{ + subscriptions: make(map[string]*TopicPartitionSubscription), + } + m.cond = sync.NewCond(m) + return m +} + +func (m *TopicPartitionSubscriptions) AddSubscription(subscription string) *TopicPartitionSubscription { + m.subscriptionsLock.Lock() + defer m.subscriptionsLock.Unlock() + + if s, found := m.subscriptions[subscription]; found { + return s + } + + s := NewTopicPartitionSubscription(subscription) + m.subscriptions[subscription] = s + + return s + +} + +func (m *TopicPartitionSubscriptions) NotifyAll() { + + m.subscriptionsLock.RLock() + defer m.subscriptionsLock.RUnlock() + + for name, tps := range m.subscriptions { + println("notifying", name) + tps.NotifyOne() + } + +} + +func (m *TopicPartitionSubscriptions) Wait() { + m.Mutex.Lock() + m.cond.Wait() + for _, tps := range m.subscriptions { + tps.NotifyOne() + } + m.Mutex.Unlock() +} diff --git a/weed/messaging/broker/topic_manager.go b/weed/messaging/broker/topic_manager.go index 21594dea5..e5cebb42d 100644 --- a/weed/messaging/broker/topic_manager.go +++ b/weed/messaging/broker/topic_manager.go @@ -25,28 +25,29 @@ func (tp *TopicPartition) String() string { return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition) } -type TopicControl struct { +type TopicCursor struct { sync.Mutex cond *sync.Cond subscriberCount int publisherCount int logBuffer *log_buffer.LogBuffer + subscriptions *TopicPartitionSubscriptions } type TopicManager struct { sync.Mutex - topicControls map[TopicPartition]*TopicControl + topicControls map[TopicPartition]*TopicCursor broker *MessageBroker } func NewTopicManager(messageBroker *MessageBroker) *TopicManager { return &TopicManager{ - topicControls: make(map[TopicPartition]*TopicControl), + topicControls: make(map[TopicPartition]*TopicCursor), broker: messageBroker, } } -func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer { +func (tm *TopicManager) buildLogBuffer(tl *TopicCursor, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer { flushFn := func(startTime, stopTime time.Time, buf []byte) { @@ -68,21 +69,21 @@ func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topi } } logBuffer := log_buffer.NewLogBuffer(time.Minute, flushFn, func() { - tl.cond.Broadcast() + tl.subscriptions.NotifyAll() }) return logBuffer } -func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicControl { +func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicCursor { tm.Lock() defer tm.Unlock() lock, found := tm.topicControls[partition] if !found { - lock = &TopicControl{} - lock.cond = sync.NewCond(&lock.Mutex) + lock = &TopicCursor{} tm.topicControls[partition] = lock + lock.subscriptions = NewTopicPartitionSubscriptions() lock.logBuffer = tm.buildLogBuffer(lock, partition, topicConfig) } if isPublisher { diff --git a/weed/messaging/msgclient/sub_chan.go b/weed/messaging/msgclient/sub_chan.go index ed25a850c..5465c5913 100644 --- a/weed/messaging/msgclient/sub_chan.go +++ b/weed/messaging/msgclient/sub_chan.go @@ -17,7 +17,7 @@ type SubChannel struct { md5hash hash.Hash } -func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) { +func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubChannel, error) { tp := broker.TopicPartition{ Namespace: "chan", Topic: chanName, @@ -27,7 +27,7 @@ func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) { if err != nil { return nil, err } - sc, err := setupSubscriberClient(grpcConnection, "", "chan", chanName, 0, time.Unix(0, 0)) + sc, err := setupSubscriberClient(grpcConnection, subscriberId, "chan", chanName, 0, time.Unix(0, 0)) if err != nil { return nil, err }