diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index e1a8999f3..1855e643c 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -31,21 +31,21 @@ var ( ) type CopyOptions struct { - include *string - replication *string - collection *string - ttl *string - diskType *string - maxMB *int - masterClient *wdclient.MasterClient - concurrentFiles *int - concurrentChunks *int - grpcDialOption grpc.DialOption - masters []string - cipher bool - ttlSec int32 - checkSize *bool - verbose *bool + include *string + replication *string + collection *string + ttl *string + diskType *string + maxMB *int + masterClient *wdclient.MasterClient + concurrentFiles *int + concurrentChunks *int + grpcDialOption grpc.DialOption + masters []string + cipher bool + ttlSec int32 + checkSize *bool + verbose *bool volumeServerAccess *string } diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go index 892af17a0..d4305b666 100644 --- a/weed/command/filer_remote_sync_dir.go +++ b/weed/command/filer_remote_sync_dir.go @@ -80,7 +80,7 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, metadataFollowOption, processEventFnWithOffset) } -func (option *RemoteSyncOptions) makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) { +func (option *RemoteSyncOptions) makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) { client, err := remote_storage.GetRemoteStorage(remoteStorage) if err != nil { return nil, err diff --git a/weed/mq/broker/broker_grpc_sub_coordinator.go b/weed/mq/broker/broker_grpc_sub_coordinator.go index 349db3178..b8438b61f 100644 --- a/weed/mq/broker/broker_grpc_sub_coordinator.go +++ b/weed/mq/broker/broker_grpc_sub_coordinator.go @@ -37,24 +37,24 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess // process ack messages go func() { - for { - _, err := stream.Recv() - if err != nil { - glog.V(0).Infof("subscriber %s/%s/%s receive: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err) - } - - select { - case <-ctx.Done(): - err := ctx.Err() - if err == context.Canceled { - // Client disconnected - return + for { + _, err := stream.Recv() + if err != nil { + glog.V(0).Infof("subscriber %s/%s/%s receive: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err) + } + + select { + case <-ctx.Done(): + err := ctx.Err() + if err == context.Canceled { + // Client disconnected + return + } + return + default: + // Continue processing the request } - return - default: - // Continue processing the request } - } }() // send commands to subscriber @@ -68,7 +68,7 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess } glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err) return err - case message := <- cgi.ResponseChan: + case message := <-cgi.ResponseChan: if err := stream.Send(message); err != nil { glog.V(0).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err) } diff --git a/weed/mq/pub_balancer/balancer.go b/weed/mq/pub_balancer/balancer.go index 988b971af..5e8c8275e 100644 --- a/weed/mq/pub_balancer/balancer.go +++ b/weed/mq/pub_balancer/balancer.go @@ -32,9 +32,10 @@ type Balancer struct { // Collected from all brokers when they connect to the broker leader TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name } + func NewBalancer() *Balancer { return &Balancer{ - Brokers: cmap.New[*BrokerStats](), + Brokers: cmap.New[*BrokerStats](), TopicToBrokers: cmap.New[*PartitionSlotToBrokerList](), } } diff --git a/weed/mq/pub_balancer/partition_list_broker.go b/weed/mq/pub_balancer/partition_list_broker.go index 7ceb2a9fc..9dc6140b3 100644 --- a/weed/mq/pub_balancer/partition_list_broker.go +++ b/weed/mq/pub_balancer/partition_list_broker.go @@ -6,8 +6,8 @@ import ( ) type PartitionSlotToBroker struct { - RangeStart int32 - RangeStop int32 + RangeStart int32 + RangeStop int32 AssignedBroker string } @@ -36,12 +36,12 @@ func (ps *PartitionSlotToBrokerList) AddBroker(partition *mq_pb.Partition, broke } } ps.PartitionSlots = append(ps.PartitionSlots, &PartitionSlotToBroker{ - RangeStart: partition.RangeStart, - RangeStop: partition.RangeStop, + RangeStart: partition.RangeStart, + RangeStop: partition.RangeStop, AssignedBroker: broker, }) } -func (ps *PartitionSlotToBrokerList) RemoveBroker(broker string) { +func (ps *PartitionSlotToBrokerList) RemoveBroker(broker string) { for _, partitionSlot := range ps.PartitionSlots { if partitionSlot.AssignedBroker == broker { partitionSlot.AssignedBroker = "" diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go index be06a01f8..566a26ef7 100644 --- a/weed/mq/sub_coordinator/consumer_group.go +++ b/weed/mq/sub_coordinator/consumer_group.go @@ -6,28 +6,29 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" ) + type ConsumerGroupInstance struct { InstanceId string // the consumer group instance may not have an active partition - Partitions []*topic.Partition - ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse + Partitions []*topic.Partition + ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse } type ConsumerGroup struct { // map a consumer group instance id to a consumer group instance ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance] - mapping *PartitionConsumerMapping + mapping *PartitionConsumerMapping } func NewConsumerGroup() *ConsumerGroup { return &ConsumerGroup{ ConsumerGroupInstances: cmap.New[*ConsumerGroupInstance](), - mapping: NewPartitionConsumerMapping(pub_balancer.MaxPartitionCount), + mapping: NewPartitionConsumerMapping(pub_balancer.MaxPartitionCount), } } func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance { return &ConsumerGroupInstance{ - InstanceId: instanceId, + InstanceId: instanceId, ResponseChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1), } } diff --git a/weed/mq/sub_coordinator/coordinator.go b/weed/mq/sub_coordinator/coordinator.go index f4d65ea5b..7ca536c6b 100644 --- a/weed/mq/sub_coordinator/coordinator.go +++ b/weed/mq/sub_coordinator/coordinator.go @@ -6,7 +6,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" ) - type TopicConsumerGroups struct { // map a consumer group name to a consumer group ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup] @@ -19,13 +18,13 @@ type TopicConsumerGroups struct { type Coordinator struct { // map topic name to consumer groups TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups] - balancer *pub_balancer.Balancer + balancer *pub_balancer.Balancer } func NewCoordinator(balancer *pub_balancer.Balancer) *Coordinator { return &Coordinator{ TopicSubscribers: cmap.New[*TopicConsumerGroups](), - balancer: balancer, + balancer: balancer, } } @@ -50,7 +49,7 @@ func toTopicName(topic *mq_pb.Topic) string { return topicName } -func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) *ConsumerGroupInstance{ +func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) *ConsumerGroupInstance { tcg := c.GetTopicConsumerGroups(topic) cg, _ := tcg.ConsumerGroups.Get(consumerGroup) if cg == nil {