refactoring

This commit is contained in:
Chris Lu 2020-04-16 02:55:09 -07:00
parent ce4b369be2
commit f5a748d33c
5 changed files with 104 additions and 30 deletions

View file

@ -8,13 +8,12 @@ import (
"google.golang.org/grpc/reflection" "google.golang.org/grpc/reflection"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/messaging"
"github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
"github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/security"
weed_server "github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
@ -79,7 +78,7 @@ func (msgBrokerOpt *QueueOptions) startQueueServer() bool {
} }
} }
qs, err := weed_server.NewMessageBroker(&weed_server.MessageBrokerOption{ qs, err := messaging.NewMessageBroker(&messaging.MessageBrokerOption{
Filers: []string{*msgBrokerOpt.filer}, Filers: []string{*msgBrokerOpt.filer},
DefaultReplication: "", DefaultReplication: "",
MaxMB: 0, MaxMB: 0,

View file

@ -0,0 +1,98 @@
package messaging
import (
"context"
"fmt"
"io"
"time"
"github.com/golang/protobuf/proto"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
)
func (broker *MessageBroker) Subscribe(request *messaging_pb.SubscribeRequest, server messaging_pb.SeaweedMessaging_SubscribeServer) error {
panic("implement me")
}
func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_PublishServer) error {
// process initial request
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
namespace, topic, partition := in.Namespace, in.Topic, in.Partition
updatesChan := make(chan int32)
go func() {
for update := range updatesChan {
if err := stream.Send(&messaging_pb.PublishResponse{
PartitionCount: update,
}); err != nil {
glog.V(0).Infof("err sending publish response: %v", err)
return
}
}
}()
logBuffer := log_buffer.NewLogBuffer(time.Minute, func(startTime, stopTime time.Time, buf []byte) {
//targetFile :=
fmt.Sprintf("%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
filer2.TopicsDir, namespace, topic,
startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
partition,
)
/*
if err := f.appendToFile(targetFile, buf); err != nil {
glog.V(0).Infof("log write failed %s: %v", targetFile, err)
}
*/
}, func() {
// notify subscribers
})
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
m := &messaging_pb.Message{
Timestamp: time.Now().UnixNano(),
Key: in.Key,
Value: in.Value,
Headers: in.Headers,
}
data, err := proto.Marshal(m)
if err != nil {
glog.Errorf("marshall error: %v\n", err)
continue
}
logBuffer.AddToBuffer(in.Key, data)
}
}
func (broker *MessageBroker) ConfigureTopic(c context.Context, request *messaging_pb.ConfigureTopicRequest) (*messaging_pb.ConfigureTopicResponse, error) {
panic("implement me")
}
func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *messaging_pb.GetTopicConfigurationRequest) (*messaging_pb.GetTopicConfigurationResponse, error) {
panic("implement me")
}

View file

@ -1,4 +1,4 @@
package weed_server package messaging
import ( import (
"context" "context"

View file

@ -1,23 +0,0 @@
package weed_server
import (
"context"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
)
func (broker *MessageBroker) Subscribe(request *messaging_pb.SubscribeRequest, server messaging_pb.SeaweedMessaging_SubscribeServer) error {
panic("implement me")
}
func (broker *MessageBroker) Publish(server messaging_pb.SeaweedMessaging_PublishServer) error {
panic("implement me")
}
func (broker *MessageBroker) ConfigureTopic(c context.Context, request *messaging_pb.ConfigureTopicRequest) (*messaging_pb.ConfigureTopicResponse, error) {
panic("implement me")
}
func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *messaging_pb.GetTopicConfigurationRequest) (*messaging_pb.GetTopicConfigurationResponse, error) {
panic("implement me")
}

View file

@ -51,7 +51,7 @@ func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime
return lb return lb
} }
func (m *LogBuffer) AddToBuffer(key, data []byte) { func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
m.Lock() m.Lock()
defer func() { defer func() {
@ -65,7 +65,7 @@ func (m *LogBuffer) AddToBuffer(key, data []byte) {
ts := time.Now() ts := time.Now()
logEntry := &filer_pb.LogEntry{ logEntry := &filer_pb.LogEntry{
TsNs: ts.UnixNano(), TsNs: ts.UnixNano(),
PartitionKeyHash: util.HashToInt32(key), PartitionKeyHash: util.HashToInt32(partitionKey),
Data: data, Data: data,
} }