From 07d7abe428186c7771f51589cc397ecefa6453d2 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 9 May 2020 00:31:34 -0700 Subject: [PATCH] add deleteTopic, refactoring --- weed/messaging/broker/broker_grpc_server.go | 20 +++++- .../broker/broker_grpc_server_subscribe.go | 2 +- .../{pub_sub_chan.go => pub_chan.go} | 54 ---------------- weed/messaging/msgclient/sub_chan.go | 63 +++++++++++++++++++ 4 files changed, 83 insertions(+), 56 deletions(-) rename weed/messaging/msgclient/{pub_sub_chan.go => pub_chan.go} (58%) create mode 100644 weed/messaging/msgclient/sub_chan.go diff --git a/weed/messaging/broker/broker_grpc_server.go b/weed/messaging/broker/broker_grpc_server.go index 32dab6813..305213622 100644 --- a/weed/messaging/broker/broker_grpc_server.go +++ b/weed/messaging/broker/broker_grpc_server.go @@ -2,7 +2,10 @@ package broker import ( "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" ) @@ -11,9 +14,24 @@ func (broker *MessageBroker) ConfigureTopic(c context.Context, request *messagin } func (broker *MessageBroker) DeleteTopic(c context.Context, request *messaging_pb.DeleteTopicRequest) (*messaging_pb.DeleteTopicResponse, error) { - panic("implement me") + resp := &messaging_pb.DeleteTopicResponse{} + dir, entry := genTopicDirEntry(request.Namespace, request.Topic) + if exists, err := filer_pb.Exists(broker, dir, entry, true); err != nil { + return nil, err + } else if exists { + err = filer_pb.Remove(broker, dir, entry, true, true, true) + } + return resp, nil } func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *messaging_pb.GetTopicConfigurationRequest) (*messaging_pb.GetTopicConfigurationResponse, error) { panic("implement me") } + +func genTopicDir(namespace, topic string) string { + return fmt.Sprintf("%s/%s/%s", filer2.TopicsDir, namespace, topic) +} + +func genTopicDirEntry(namespace, topic string) (dir, entry string) { + return fmt.Sprintf("%s/%s/%s", filer2.TopicsDir, namespace), topic +} diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index 761129e80..f8fd16a14 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -119,7 +119,7 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim sizeBuf := make([]byte, 4) startTsNs := startTime.UnixNano() - topicDir := fmt.Sprintf("/topics/%s/%s", tp.Namespace, tp.Topic) + topicDir := genTopicDir(tp.Namespace, tp.Topic) partitionSuffix := fmt.Sprintf(".part%02d", tp.Partition) return filer_pb.List(broker, topicDir, "", func(dayEntry *filer_pb.Entry, isLast bool) error { diff --git a/weed/messaging/msgclient/pub_sub_chan.go b/weed/messaging/msgclient/pub_chan.go similarity index 58% rename from weed/messaging/msgclient/pub_sub_chan.go rename to weed/messaging/msgclient/pub_chan.go index a11240080..ccf301a6a 100644 --- a/weed/messaging/msgclient/pub_sub_chan.go +++ b/weed/messaging/msgclient/pub_chan.go @@ -3,7 +3,6 @@ package msgclient import ( "io" "log" - "time" "google.golang.org/grpc" @@ -63,56 +62,3 @@ func (pc *PubChannel) Close() error { } return nil } - -type SubChannel struct { - ch chan []byte - stream messaging_pb.SeaweedMessaging_SubscribeClient -} - -func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) { - tp := broker.TopicPartition{ - Namespace: "chan", - Topic: chanName, - Partition: 0, - } - grpcConnection, err := mc.findBroker(tp) - if err != nil { - return nil, err - } - sc, err := setupSubscriberClient(grpcConnection, "", "chan", chanName, 0, time.Unix(0, 0)) - if err != nil { - return nil, err - } - - t := &SubChannel{ - ch: make(chan []byte), - stream: sc, - } - - go func() { - for { - resp, subErr := t.stream.Recv() - if subErr == io.EOF { - return - } - if subErr != nil { - log.Printf("fail to receive from netchan %s: %v", chanName, subErr) - return - } - if resp.Data.IsClose { - t.stream.Send(&messaging_pb.SubscriberMessage{ - IsClose: true, - }) - close(t.ch) - return - } - t.ch <- resp.Data.Value - } - }() - - return t, nil -} - -func (sc *SubChannel) Channel() chan []byte { - return sc.ch -} diff --git a/weed/messaging/msgclient/sub_chan.go b/weed/messaging/msgclient/sub_chan.go new file mode 100644 index 000000000..edd4d1049 --- /dev/null +++ b/weed/messaging/msgclient/sub_chan.go @@ -0,0 +1,63 @@ +package msgclient + +import ( + "io" + "log" + "time" + + "github.com/chrislusf/seaweedfs/weed/messaging/broker" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" +) + +type SubChannel struct { + ch chan []byte + stream messaging_pb.SeaweedMessaging_SubscribeClient +} + +func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) { + tp := broker.TopicPartition{ + Namespace: "chan", + Topic: chanName, + Partition: 0, + } + grpcConnection, err := mc.findBroker(tp) + if err != nil { + return nil, err + } + sc, err := setupSubscriberClient(grpcConnection, "", "chan", chanName, 0, time.Unix(0, 0)) + if err != nil { + return nil, err + } + + t := &SubChannel{ + ch: make(chan []byte), + stream: sc, + } + + go func() { + for { + resp, subErr := t.stream.Recv() + if subErr == io.EOF { + return + } + if subErr != nil { + log.Printf("fail to receive from netchan %s: %v", chanName, subErr) + return + } + if resp.Data.IsClose { + t.stream.Send(&messaging_pb.SubscriberMessage{ + IsClose: true, + }) + close(t.ch) + return + } + t.ch <- resp.Data.Value + } + }() + + return t, nil +} + +func (sc *SubChannel) Channel() chan []byte { + return sc.ch +}