From e46c3415f752e2e0c252c420adb882c4bcb7416b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 10 Jan 2017 01:01:12 -0800 Subject: [PATCH] gRpc for master~volume heartbeat --- weed/command/master.go | 27 +- weed/command/server.go | 28 +- weed/command/volume.go | 2 +- weed/operation/system_message.pb.go | 203 ----------- weed/operation/system_message_test.go | 59 --- weed/pb/Makefile | 6 + weed/pb/seaweed.pb.go | 384 ++++++++++++++++++++ weed/pb/seaweed.proto | 41 +++ weed/server/master_grpc_server.go | 57 +++ weed/server/master_server.go | 1 - weed/server/master_server_handlers_admin.go | 36 -- weed/server/volume_grpc_client.go | 74 ++++ weed/server/volume_server.go | 34 +- weed/storage/store.go | 102 ++---- weed/storage/volume_info.go | 25 +- weed/topology/data_node.go | 3 +- weed/topology/node.go | 6 - weed/topology/rack.go | 5 - weed/topology/topology.go | 40 +- weed/topology/topology_event_handling.go | 14 - 20 files changed, 664 insertions(+), 483 deletions(-) delete mode 100644 weed/operation/system_message.pb.go delete mode 100644 weed/operation/system_message_test.go create mode 100644 weed/pb/Makefile create mode 100644 weed/pb/seaweed.pb.go create mode 100644 weed/pb/seaweed.proto create mode 100644 weed/server/master_grpc_server.go create mode 100644 weed/server/volume_grpc_client.go diff --git a/weed/command/master.go b/weed/command/master.go index ec54fbd7b..eee22810b 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -10,9 +10,13 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/util" "github.com/gorilla/mux" + "github.com/soheilhy/cmux" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" ) func init() { @@ -39,7 +43,7 @@ var ( mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "Deprecating! xml configuration file") defaultReplicaPlacement = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.") - mTimeout = cmdMaster.Flag.Int("idleTimeout", 10, "connection idle seconds") + mTimeout = cmdMaster.Flag.Int("idleTimeout", 30, "connection idle seconds") mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") @@ -99,8 +103,25 @@ func runMaster(cmd *Command, args []string) bool { ms.SetRaftServer(raftServer) }() - if e := http.Serve(listener, r); e != nil { - glog.Fatalf("Fail to serve: %v", e) + // start grpc and http server + m := cmux.New(listener) + + grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc")) + httpL := m.Match(cmux.Any()) + + // Create your protocol servers. + grpcS := grpc.NewServer() + pb.RegisterSeaweedServer(grpcS, ms) + reflection.Register(grpcS) + + httpS := &http.Server{Handler: r} + + go grpcS.Serve(grpcL) + go httpS.Serve(httpL) + + if err := m.Serve(); err != nil { + glog.Fatalf("master server failed to serve: %v", err) } + return true } diff --git a/weed/command/server.go b/weed/command/server.go index 87146940f..5bde22517 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -11,10 +11,14 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" "github.com/gorilla/mux" + "github.com/soheilhy/cmux" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" ) type ServerOptions struct { @@ -51,7 +55,7 @@ var ( serverIp = cmdServer.Flag.String("ip", "localhost", "ip or server name") serverBindIp = cmdServer.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") serverMaxCpu = cmdServer.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") - serverTimeout = cmdServer.Flag.Int("idleTimeout", 10, "connection idle seconds") + serverTimeout = cmdServer.Flag.Int("idleTimeout", 30, "connection idle seconds") serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name") serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name") serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") @@ -230,9 +234,27 @@ func runServer(cmd *Command, args []string) bool { }() raftWaitForMaster.Done() - if e := http.Serve(masterListener, r); e != nil { - glog.Fatalf("Master Fail to serve:%s", e.Error()) + + // start grpc and http server + m := cmux.New(masterListener) + + grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc")) + httpL := m.Match(cmux.Any()) + + // Create your protocol servers. + grpcS := grpc.NewServer() + pb.RegisterSeaweedServer(grpcS, ms) + reflection.Register(grpcS) + + httpS := &http.Server{Handler: r} + + go grpcS.Serve(grpcL) + go httpS.Serve(httpL) + + if err := m.Serve(); err != nil { + glog.Fatalf("master server failed to serve: %v", err) } + }() volumeWait.Wait() diff --git a/weed/command/volume.go b/weed/command/volume.go index ba498b8e4..0e69325b6 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -48,7 +48,7 @@ func init() { v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") v.master = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location") v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") - v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 10, "connection idle seconds") + v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds") v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name") diff --git a/weed/operation/system_message.pb.go b/weed/operation/system_message.pb.go deleted file mode 100644 index 742a1ca4e..000000000 --- a/weed/operation/system_message.pb.go +++ /dev/null @@ -1,203 +0,0 @@ -// Code generated by protoc-gen-go. -// source: system_message.proto -// DO NOT EDIT! - -/* -Package operation is a generated protocol buffer package. - -It is generated from these files: - system_message.proto - -It has these top-level messages: - VolumeInformationMessage - JoinMessage -*/ -package operation - -import proto "github.com/golang/protobuf/proto" -import math "math" - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = math.Inf - -type VolumeInformationMessage struct { - Id *uint32 `protobuf:"varint,1,req,name=id" json:"id,omitempty"` - Size *uint64 `protobuf:"varint,2,req,name=size" json:"size,omitempty"` - Collection *string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"` - FileCount *uint64 `protobuf:"varint,4,req,name=file_count" json:"file_count,omitempty"` - DeleteCount *uint64 `protobuf:"varint,5,req,name=delete_count" json:"delete_count,omitempty"` - DeletedByteCount *uint64 `protobuf:"varint,6,req,name=deleted_byte_count" json:"deleted_byte_count,omitempty"` - ReadOnly *bool `protobuf:"varint,7,opt,name=read_only" json:"read_only,omitempty"` - ReplicaPlacement *uint32 `protobuf:"varint,8,req,name=replica_placement" json:"replica_placement,omitempty"` - Version *uint32 `protobuf:"varint,9,opt,name=version,def=2" json:"version,omitempty"` - Ttl *uint32 `protobuf:"varint,10,opt,name=ttl" json:"ttl,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *VolumeInformationMessage) Reset() { *m = VolumeInformationMessage{} } -func (m *VolumeInformationMessage) String() string { return proto.CompactTextString(m) } -func (*VolumeInformationMessage) ProtoMessage() {} - -const Default_VolumeInformationMessage_Version uint32 = 2 - -func (m *VolumeInformationMessage) GetId() uint32 { - if m != nil && m.Id != nil { - return *m.Id - } - return 0 -} - -func (m *VolumeInformationMessage) GetSize() uint64 { - if m != nil && m.Size != nil { - return *m.Size - } - return 0 -} - -func (m *VolumeInformationMessage) GetCollection() string { - if m != nil && m.Collection != nil { - return *m.Collection - } - return "" -} - -func (m *VolumeInformationMessage) GetFileCount() uint64 { - if m != nil && m.FileCount != nil { - return *m.FileCount - } - return 0 -} - -func (m *VolumeInformationMessage) GetDeleteCount() uint64 { - if m != nil && m.DeleteCount != nil { - return *m.DeleteCount - } - return 0 -} - -func (m *VolumeInformationMessage) GetDeletedByteCount() uint64 { - if m != nil && m.DeletedByteCount != nil { - return *m.DeletedByteCount - } - return 0 -} - -func (m *VolumeInformationMessage) GetReadOnly() bool { - if m != nil && m.ReadOnly != nil { - return *m.ReadOnly - } - return false -} - -func (m *VolumeInformationMessage) GetReplicaPlacement() uint32 { - if m != nil && m.ReplicaPlacement != nil { - return *m.ReplicaPlacement - } - return 0 -} - -func (m *VolumeInformationMessage) GetVersion() uint32 { - if m != nil && m.Version != nil { - return *m.Version - } - return Default_VolumeInformationMessage_Version -} - -func (m *VolumeInformationMessage) GetTtl() uint32 { - if m != nil && m.Ttl != nil { - return *m.Ttl - } - return 0 -} - -type JoinMessage struct { - IsInit *bool `protobuf:"varint,1,opt,name=is_init" json:"is_init,omitempty"` - Ip *string `protobuf:"bytes,2,req,name=ip" json:"ip,omitempty"` - Port *uint32 `protobuf:"varint,3,req,name=port" json:"port,omitempty"` - PublicUrl *string `protobuf:"bytes,4,opt,name=public_url" json:"public_url,omitempty"` - MaxVolumeCount *uint32 `protobuf:"varint,5,req,name=max_volume_count" json:"max_volume_count,omitempty"` - MaxFileKey *uint64 `protobuf:"varint,6,req,name=max_file_key" json:"max_file_key,omitempty"` - DataCenter *string `protobuf:"bytes,7,opt,name=data_center" json:"data_center,omitempty"` - Rack *string `protobuf:"bytes,8,opt,name=rack" json:"rack,omitempty"` - Volumes []*VolumeInformationMessage `protobuf:"bytes,9,rep,name=volumes" json:"volumes,omitempty"` - AdminPort *uint32 `protobuf:"varint,10,opt,name=admin_port" json:"admin_port,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *JoinMessage) Reset() { *m = JoinMessage{} } -func (m *JoinMessage) String() string { return proto.CompactTextString(m) } -func (*JoinMessage) ProtoMessage() {} - -func (m *JoinMessage) GetIsInit() bool { - if m != nil && m.IsInit != nil { - return *m.IsInit - } - return false -} - -func (m *JoinMessage) GetIp() string { - if m != nil && m.Ip != nil { - return *m.Ip - } - return "" -} - -func (m *JoinMessage) GetPort() uint32 { - if m != nil && m.Port != nil { - return *m.Port - } - return 0 -} - -func (m *JoinMessage) GetPublicUrl() string { - if m != nil && m.PublicUrl != nil { - return *m.PublicUrl - } - return "" -} - -func (m *JoinMessage) GetMaxVolumeCount() uint32 { - if m != nil && m.MaxVolumeCount != nil { - return *m.MaxVolumeCount - } - return 0 -} - -func (m *JoinMessage) GetMaxFileKey() uint64 { - if m != nil && m.MaxFileKey != nil { - return *m.MaxFileKey - } - return 0 -} - -func (m *JoinMessage) GetDataCenter() string { - if m != nil && m.DataCenter != nil { - return *m.DataCenter - } - return "" -} - -func (m *JoinMessage) GetRack() string { - if m != nil && m.Rack != nil { - return *m.Rack - } - return "" -} - -func (m *JoinMessage) GetVolumes() []*VolumeInformationMessage { - if m != nil { - return m.Volumes - } - return nil -} - -func (m *JoinMessage) GetAdminPort() uint32 { - if m != nil && m.AdminPort != nil { - return *m.AdminPort - } - return 0 -} - -func init() { -} diff --git a/weed/operation/system_message_test.go b/weed/operation/system_message_test.go deleted file mode 100644 index d18ca49a4..000000000 --- a/weed/operation/system_message_test.go +++ /dev/null @@ -1,59 +0,0 @@ -package operation - -import ( - "encoding/json" - "log" - "testing" - - "github.com/golang/protobuf/proto" -) - -func TestSerialDeserial(t *testing.T) { - volumeMessage := &VolumeInformationMessage{ - Id: proto.Uint32(12), - Size: proto.Uint64(2341234), - Collection: proto.String("benchmark"), - FileCount: proto.Uint64(2341234), - DeleteCount: proto.Uint64(234), - DeletedByteCount: proto.Uint64(21234), - ReadOnly: proto.Bool(false), - ReplicaPlacement: proto.Uint32(210), - Version: proto.Uint32(2), - } - var volumeMessages []*VolumeInformationMessage - volumeMessages = append(volumeMessages, volumeMessage) - - joinMessage := &JoinMessage{ - IsInit: proto.Bool(true), - Ip: proto.String("127.0.3.12"), - Port: proto.Uint32(34546), - PublicUrl: proto.String("localhost:2342"), - MaxVolumeCount: proto.Uint32(210), - MaxFileKey: proto.Uint64(324234423), - DataCenter: proto.String("dc1"), - Rack: proto.String("rack2"), - Volumes: volumeMessages, - } - - data, err := proto.Marshal(joinMessage) - if err != nil { - log.Fatal("marshaling error: ", err) - } - newMessage := &JoinMessage{} - err = proto.Unmarshal(data, newMessage) - if err != nil { - log.Fatal("unmarshaling error: ", err) - } - log.Println("The pb data size is", len(data)) - - jsonData, jsonError := json.Marshal(joinMessage) - if jsonError != nil { - log.Fatal("json marshaling error: ", jsonError) - } - log.Println("The json data size is", len(jsonData), string(jsonData)) - - // Now test and newTest contain the same data. - if *joinMessage.PublicUrl != *newMessage.PublicUrl { - log.Fatalf("data mismatch %q != %q", *joinMessage.PublicUrl, *newMessage.PublicUrl) - } -} diff --git a/weed/pb/Makefile b/weed/pb/Makefile new file mode 100644 index 000000000..8d0eb7854 --- /dev/null +++ b/weed/pb/Makefile @@ -0,0 +1,6 @@ +all: gen + +.PHONY : gen + +gen: + protoc seaweed.proto --go_out=plugins=grpc:. diff --git a/weed/pb/seaweed.pb.go b/weed/pb/seaweed.pb.go new file mode 100644 index 000000000..02de2d8a6 --- /dev/null +++ b/weed/pb/seaweed.pb.go @@ -0,0 +1,384 @@ +// Code generated by protoc-gen-go. +// source: seaweed.proto +// DO NOT EDIT! + +/* +Package pb is a generated protocol buffer package. + +It is generated from these files: + seaweed.proto + +It has these top-level messages: + Heartbeat + HeartbeatResponse + VolumeInformationMessage +*/ +package pb + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type Heartbeat struct { + IsInit bool `protobuf:"varint,1,opt,name=is_init,json=isInit" json:"is_init,omitempty"` + Ip string `protobuf:"bytes,2,opt,name=ip" json:"ip,omitempty"` + Port uint32 `protobuf:"varint,3,opt,name=port" json:"port,omitempty"` + PublicUrl string `protobuf:"bytes,4,opt,name=public_url,json=publicUrl" json:"public_url,omitempty"` + MaxVolumeCount uint32 `protobuf:"varint,5,opt,name=max_volume_count,json=maxVolumeCount" json:"max_volume_count,omitempty"` + MaxFileKey uint64 `protobuf:"varint,6,opt,name=max_file_key,json=maxFileKey" json:"max_file_key,omitempty"` + DataCenter string `protobuf:"bytes,7,opt,name=data_center,json=dataCenter" json:"data_center,omitempty"` + Rack string `protobuf:"bytes,8,opt,name=rack" json:"rack,omitempty"` + Volumes []*VolumeInformationMessage `protobuf:"bytes,9,rep,name=volumes" json:"volumes,omitempty"` + AdminPort uint32 `protobuf:"varint,10,opt,name=admin_port,json=adminPort" json:"admin_port,omitempty"` +} + +func (m *Heartbeat) Reset() { *m = Heartbeat{} } +func (m *Heartbeat) String() string { return proto.CompactTextString(m) } +func (*Heartbeat) ProtoMessage() {} +func (*Heartbeat) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *Heartbeat) GetIsInit() bool { + if m != nil { + return m.IsInit + } + return false +} + +func (m *Heartbeat) GetIp() string { + if m != nil { + return m.Ip + } + return "" +} + +func (m *Heartbeat) GetPort() uint32 { + if m != nil { + return m.Port + } + return 0 +} + +func (m *Heartbeat) GetPublicUrl() string { + if m != nil { + return m.PublicUrl + } + return "" +} + +func (m *Heartbeat) GetMaxVolumeCount() uint32 { + if m != nil { + return m.MaxVolumeCount + } + return 0 +} + +func (m *Heartbeat) GetMaxFileKey() uint64 { + if m != nil { + return m.MaxFileKey + } + return 0 +} + +func (m *Heartbeat) GetDataCenter() string { + if m != nil { + return m.DataCenter + } + return "" +} + +func (m *Heartbeat) GetRack() string { + if m != nil { + return m.Rack + } + return "" +} + +func (m *Heartbeat) GetVolumes() []*VolumeInformationMessage { + if m != nil { + return m.Volumes + } + return nil +} + +func (m *Heartbeat) GetAdminPort() uint32 { + if m != nil { + return m.AdminPort + } + return 0 +} + +type HeartbeatResponse struct { + VolumeSizeLimit uint64 `protobuf:"varint,1,opt,name=volumeSizeLimit" json:"volumeSizeLimit,omitempty"` + SecretKey string `protobuf:"bytes,2,opt,name=secretKey" json:"secretKey,omitempty"` +} + +func (m *HeartbeatResponse) Reset() { *m = HeartbeatResponse{} } +func (m *HeartbeatResponse) String() string { return proto.CompactTextString(m) } +func (*HeartbeatResponse) ProtoMessage() {} +func (*HeartbeatResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *HeartbeatResponse) GetVolumeSizeLimit() uint64 { + if m != nil { + return m.VolumeSizeLimit + } + return 0 +} + +func (m *HeartbeatResponse) GetSecretKey() string { + if m != nil { + return m.SecretKey + } + return "" +} + +type VolumeInformationMessage struct { + Id uint32 `protobuf:"varint,1,opt,name=id" json:"id,omitempty"` + Size uint64 `protobuf:"varint,2,opt,name=size" json:"size,omitempty"` + Collection string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"` + FileCount uint64 `protobuf:"varint,4,opt,name=file_count,json=fileCount" json:"file_count,omitempty"` + DeleteCount uint64 `protobuf:"varint,5,opt,name=delete_count,json=deleteCount" json:"delete_count,omitempty"` + DeletedByteCount uint64 `protobuf:"varint,6,opt,name=deleted_byte_count,json=deletedByteCount" json:"deleted_byte_count,omitempty"` + ReadOnly bool `protobuf:"varint,7,opt,name=read_only,json=readOnly" json:"read_only,omitempty"` + ReplicaPlacement uint32 `protobuf:"varint,8,opt,name=replica_placement,json=replicaPlacement" json:"replica_placement,omitempty"` + Version uint32 `protobuf:"varint,9,opt,name=version" json:"version,omitempty"` + Ttl uint32 `protobuf:"varint,10,opt,name=ttl" json:"ttl,omitempty"` +} + +func (m *VolumeInformationMessage) Reset() { *m = VolumeInformationMessage{} } +func (m *VolumeInformationMessage) String() string { return proto.CompactTextString(m) } +func (*VolumeInformationMessage) ProtoMessage() {} +func (*VolumeInformationMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func (m *VolumeInformationMessage) GetId() uint32 { + if m != nil { + return m.Id + } + return 0 +} + +func (m *VolumeInformationMessage) GetSize() uint64 { + if m != nil { + return m.Size + } + return 0 +} + +func (m *VolumeInformationMessage) GetCollection() string { + if m != nil { + return m.Collection + } + return "" +} + +func (m *VolumeInformationMessage) GetFileCount() uint64 { + if m != nil { + return m.FileCount + } + return 0 +} + +func (m *VolumeInformationMessage) GetDeleteCount() uint64 { + if m != nil { + return m.DeleteCount + } + return 0 +} + +func (m *VolumeInformationMessage) GetDeletedByteCount() uint64 { + if m != nil { + return m.DeletedByteCount + } + return 0 +} + +func (m *VolumeInformationMessage) GetReadOnly() bool { + if m != nil { + return m.ReadOnly + } + return false +} + +func (m *VolumeInformationMessage) GetReplicaPlacement() uint32 { + if m != nil { + return m.ReplicaPlacement + } + return 0 +} + +func (m *VolumeInformationMessage) GetVersion() uint32 { + if m != nil { + return m.Version + } + return 0 +} + +func (m *VolumeInformationMessage) GetTtl() uint32 { + if m != nil { + return m.Ttl + } + return 0 +} + +func init() { + proto.RegisterType((*Heartbeat)(nil), "pb.Heartbeat") + proto.RegisterType((*HeartbeatResponse)(nil), "pb.HeartbeatResponse") + proto.RegisterType((*VolumeInformationMessage)(nil), "pb.VolumeInformationMessage") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// Client API for Seaweed service + +type SeaweedClient interface { + SendHeartbeat(ctx context.Context, opts ...grpc.CallOption) (Seaweed_SendHeartbeatClient, error) +} + +type seaweedClient struct { + cc *grpc.ClientConn +} + +func NewSeaweedClient(cc *grpc.ClientConn) SeaweedClient { + return &seaweedClient{cc} +} + +func (c *seaweedClient) SendHeartbeat(ctx context.Context, opts ...grpc.CallOption) (Seaweed_SendHeartbeatClient, error) { + stream, err := grpc.NewClientStream(ctx, &_Seaweed_serviceDesc.Streams[0], c.cc, "/pb.Seaweed/SendHeartbeat", opts...) + if err != nil { + return nil, err + } + x := &seaweedSendHeartbeatClient{stream} + return x, nil +} + +type Seaweed_SendHeartbeatClient interface { + Send(*Heartbeat) error + Recv() (*HeartbeatResponse, error) + grpc.ClientStream +} + +type seaweedSendHeartbeatClient struct { + grpc.ClientStream +} + +func (x *seaweedSendHeartbeatClient) Send(m *Heartbeat) error { + return x.ClientStream.SendMsg(m) +} + +func (x *seaweedSendHeartbeatClient) Recv() (*HeartbeatResponse, error) { + m := new(HeartbeatResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// Server API for Seaweed service + +type SeaweedServer interface { + SendHeartbeat(Seaweed_SendHeartbeatServer) error +} + +func RegisterSeaweedServer(s *grpc.Server, srv SeaweedServer) { + s.RegisterService(&_Seaweed_serviceDesc, srv) +} + +func _Seaweed_SendHeartbeat_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(SeaweedServer).SendHeartbeat(&seaweedSendHeartbeatServer{stream}) +} + +type Seaweed_SendHeartbeatServer interface { + Send(*HeartbeatResponse) error + Recv() (*Heartbeat, error) + grpc.ServerStream +} + +type seaweedSendHeartbeatServer struct { + grpc.ServerStream +} + +func (x *seaweedSendHeartbeatServer) Send(m *HeartbeatResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *seaweedSendHeartbeatServer) Recv() (*Heartbeat, error) { + m := new(Heartbeat) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +var _Seaweed_serviceDesc = grpc.ServiceDesc{ + ServiceName: "pb.Seaweed", + HandlerType: (*SeaweedServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "SendHeartbeat", + Handler: _Seaweed_SendHeartbeat_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "seaweed.proto", +} + +func init() { proto.RegisterFile("seaweed.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 511 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x74, 0x93, 0x41, 0x6f, 0xd3, 0x4c, + 0x10, 0x86, 0x3f, 0x3b, 0xfe, 0x92, 0x78, 0x52, 0x97, 0x74, 0x25, 0x84, 0x05, 0x05, 0x4c, 0x4e, + 0x96, 0x40, 0x11, 0x2a, 0x12, 0x17, 0x6e, 0x54, 0xaa, 0xa8, 0x0a, 0xa2, 0xda, 0x08, 0x2e, 0x1c, + 0xac, 0xb5, 0x3d, 0x45, 0xab, 0xae, 0xd7, 0xd6, 0x7a, 0x53, 0xe2, 0xfe, 0x39, 0x2e, 0xfc, 0x30, + 0xb4, 0xb3, 0x49, 0x5a, 0x90, 0xb8, 0xcd, 0x3c, 0xfb, 0x8e, 0x77, 0x67, 0xde, 0x31, 0x24, 0x3d, + 0x8a, 0x1f, 0x88, 0xf5, 0xb2, 0x33, 0xad, 0x6d, 0x59, 0xd8, 0x95, 0x8b, 0x9f, 0x21, 0xc4, 0x1f, + 0x50, 0x18, 0x5b, 0xa2, 0xb0, 0xec, 0x11, 0x4c, 0x64, 0x5f, 0x48, 0x2d, 0x6d, 0x1a, 0x64, 0x41, + 0x3e, 0xe5, 0x63, 0xd9, 0x9f, 0x6b, 0x69, 0xd9, 0x21, 0x84, 0xb2, 0x4b, 0xc3, 0x2c, 0xc8, 0x63, + 0x1e, 0xca, 0x8e, 0x31, 0x88, 0xba, 0xd6, 0xd8, 0x74, 0x94, 0x05, 0x79, 0xc2, 0x29, 0x66, 0x4f, + 0x01, 0xba, 0x75, 0xa9, 0x64, 0x55, 0xac, 0x8d, 0x4a, 0x23, 0xd2, 0xc6, 0x9e, 0x7c, 0x31, 0x8a, + 0xe5, 0x30, 0x6f, 0xc4, 0xa6, 0xb8, 0x69, 0xd5, 0xba, 0xc1, 0xa2, 0x6a, 0xd7, 0xda, 0xa6, 0xff, + 0x53, 0xf9, 0x61, 0x23, 0x36, 0x5f, 0x09, 0x9f, 0x3a, 0xca, 0x32, 0x38, 0x70, 0xca, 0x2b, 0xa9, + 0xb0, 0xb8, 0xc6, 0x21, 0x1d, 0x67, 0x41, 0x1e, 0x71, 0x68, 0xc4, 0xe6, 0x4c, 0x2a, 0xbc, 0xc0, + 0x81, 0x3d, 0x87, 0x59, 0x2d, 0xac, 0x28, 0x2a, 0xd4, 0x16, 0x4d, 0x3a, 0xa1, 0xbb, 0xc0, 0xa1, + 0x53, 0x22, 0xee, 0x7d, 0x46, 0x54, 0xd7, 0xe9, 0x94, 0x4e, 0x28, 0x66, 0x6f, 0x61, 0xe2, 0x2f, + 0xef, 0xd3, 0x38, 0x1b, 0xe5, 0xb3, 0x93, 0xe3, 0x65, 0x57, 0x2e, 0xfd, 0xc5, 0xe7, 0xfa, 0xaa, + 0x35, 0x8d, 0xb0, 0xb2, 0xd5, 0x9f, 0xb0, 0xef, 0xc5, 0x77, 0xe4, 0x3b, 0xb1, 0xeb, 0x4b, 0xd4, + 0x8d, 0xd4, 0x05, 0x75, 0x0c, 0xf4, 0xe4, 0x98, 0xc8, 0x65, 0x6b, 0xec, 0xe2, 0x1b, 0x1c, 0xed, + 0x07, 0xc8, 0xb1, 0xef, 0x5a, 0xdd, 0x23, 0xcb, 0xe1, 0x81, 0x2f, 0x5f, 0xc9, 0x5b, 0xfc, 0x28, + 0x9b, 0xed, 0x40, 0x23, 0xfe, 0x37, 0x66, 0xc7, 0x10, 0xf7, 0x58, 0x19, 0xb4, 0x17, 0x38, 0x6c, + 0x07, 0x7c, 0x07, 0x16, 0xbf, 0x42, 0x48, 0xff, 0xf5, 0x42, 0x32, 0xa5, 0xa6, 0xef, 0x26, 0x3c, + 0x94, 0xb5, 0x6b, 0xba, 0x97, 0xb7, 0x48, 0x5f, 0x89, 0x38, 0xc5, 0xec, 0x19, 0x40, 0xd5, 0x2a, + 0x85, 0x95, 0x2b, 0x24, 0xbb, 0x62, 0x7e, 0x8f, 0xb8, 0xe6, 0x68, 0xce, 0xde, 0x8f, 0x88, 0x2a, + 0x63, 0x47, 0xbc, 0x15, 0x2f, 0xe0, 0xa0, 0x46, 0x85, 0xf6, 0xbe, 0x61, 0x11, 0x9f, 0x79, 0xe6, + 0x25, 0xaf, 0x80, 0xf9, 0xb4, 0x2e, 0xca, 0x61, 0x2f, 0xf4, 0x9e, 0xcd, 0xb7, 0x27, 0xef, 0x87, + 0x9d, 0xfa, 0x09, 0xc4, 0x06, 0x45, 0x5d, 0xb4, 0x5a, 0x0d, 0xe4, 0xdb, 0x94, 0x4f, 0x1d, 0xf8, + 0xac, 0xd5, 0xc0, 0x5e, 0xc2, 0x91, 0xc1, 0x4e, 0xc9, 0x4a, 0x14, 0x9d, 0x12, 0x15, 0x36, 0xa8, + 0x2d, 0x59, 0x98, 0xf0, 0xf9, 0xf6, 0xe0, 0x72, 0xc7, 0x59, 0x0a, 0x93, 0x1b, 0x34, 0xbd, 0x6b, + 0x2b, 0x26, 0xc9, 0x2e, 0x65, 0x73, 0x18, 0x59, 0xab, 0xb6, 0x4e, 0xb9, 0xf0, 0xe4, 0x0c, 0x26, + 0x2b, 0xbf, 0xfa, 0xec, 0x1d, 0x24, 0x2b, 0xd4, 0xf5, 0xdd, 0xce, 0x27, 0x6e, 0x0b, 0xf6, 0xe9, + 0xe3, 0x87, 0x7f, 0xa4, 0x3b, 0x43, 0x17, 0xff, 0xe5, 0xc1, 0xeb, 0xa0, 0x1c, 0xd3, 0x8f, 0xf3, + 0xe6, 0x77, 0x00, 0x00, 0x00, 0xff, 0xff, 0x7d, 0xc1, 0xd3, 0x35, 0x49, 0x03, 0x00, 0x00, +} diff --git a/weed/pb/seaweed.proto b/weed/pb/seaweed.proto new file mode 100644 index 000000000..2dc8343a2 --- /dev/null +++ b/weed/pb/seaweed.proto @@ -0,0 +1,41 @@ +syntax = "proto3"; + +package pb; + +////////////////////////////////////////////////// + +service Seaweed { + rpc SendHeartbeat(stream Heartbeat) returns (stream HeartbeatResponse) {} +} + +////////////////////////////////////////////////// + +message Heartbeat { + bool is_init = 1; + string ip = 2; + uint32 port = 3; + string public_url = 4; + uint32 max_volume_count = 5; + uint64 max_file_key = 6; + string data_center = 7; + string rack = 8; + repeated VolumeInformationMessage volumes = 9; + uint32 admin_port = 10; +} +message HeartbeatResponse { + uint64 volumeSizeLimit = 1; + string secretKey = 2; +} + +message VolumeInformationMessage { + uint32 id = 1; + uint64 size = 2; + string collection = 3; + uint64 file_count = 4; + uint64 delete_count = 5; + uint64 deleted_byte_count = 6; + bool read_only = 7; + uint32 replica_placement = 8; + uint32 version = 9; + uint32 ttl = 10; +} diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go new file mode 100644 index 000000000..29c95a3d4 --- /dev/null +++ b/weed/server/master_grpc_server.go @@ -0,0 +1,57 @@ +package weed_server + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/topology" +) + +func (ms MasterServer) SendHeartbeat(stream pb.Seaweed_SendHeartbeatServer) error { + var dn *topology.DataNode + t := ms.Topo + for { + heartbeat, err := stream.Recv() + if err == nil { + if dn == nil { + t.Sequence.SetMax(heartbeat.MaxFileKey) + dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack) + dc := t.GetOrCreateDataCenter(dcName) + rack := dc.GetOrCreateRack(rackName) + dn = rack.GetOrCreateDataNode(heartbeat.Ip, + int(heartbeat.Port), heartbeat.PublicUrl, + int(heartbeat.MaxVolumeCount)) + glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort()) + if err := stream.Send(&pb.HeartbeatResponse{ + VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024, + SecretKey: string(ms.guard.SecretKey), + }); err != nil { + return err + } + } + + var volumeInfos []storage.VolumeInfo + for _, v := range heartbeat.Volumes { + if vi, err := storage.NewVolumeInfo(v); err == nil { + volumeInfos = append(volumeInfos, vi) + } else { + glog.V(0).Infof("Fail to convert joined volume information: %v", err) + } + } + deletedVolumes := dn.UpdateVolumes(volumeInfos) + for _, v := range volumeInfos { + t.RegisterVolumeLayout(v, dn) + } + for _, v := range deletedVolumes { + t.UnRegisterVolumeLayout(v, dn) + } + + } else { + glog.V(0).Infof("lost volume server %s:%d", dn.Ip, dn.Port) + if dn != nil { + t.UnRegisterDataNode(dn) + } + return err + } + } +} diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 9f59c2400..f02cb2790 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -72,7 +72,6 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, r.HandleFunc("/ui/index.html", ms.uiStatusHandler) r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler))) r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler))) - r.HandleFunc("/dir/join", ms.proxyToLeader(ms.guard.WhiteList(ms.dirJoinHandler))) r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler))) r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler))) r.HandleFunc("/vol/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeLookupHandler))) diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index efe81bf89..b15125576 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -1,21 +1,16 @@ package weed_server import ( - "encoding/json" "errors" "fmt" - "io/ioutil" "math/rand" "net/http" "strconv" - "strings" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/topology" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/golang/protobuf/proto" ) func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) { @@ -34,37 +29,6 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R ms.Topo.DeleteCollection(r.FormValue("collection")) } -func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { - body, err := ioutil.ReadAll(r.Body) - if err != nil { - writeJsonError(w, r, http.StatusBadRequest, err) - return - } - joinMessage := &operation.JoinMessage{} - if err = proto.Unmarshal(body, joinMessage); err != nil { - writeJsonError(w, r, http.StatusBadRequest, err) - return - } - if *joinMessage.Ip == "" { - *joinMessage.Ip = r.RemoteAddr[0:strings.LastIndex(r.RemoteAddr, ":")] - } - if glog.V(4) { - if jsonData, jsonError := json.Marshal(joinMessage); jsonError != nil { - glog.V(0).Infoln("json marshaling error: ", jsonError) - writeJsonError(w, r, http.StatusBadRequest, jsonError) - return - } else { - glog.V(4).Infoln("Proto size", len(body), "json size", len(jsonData), string(jsonData)) - } - } - - ms.Topo.ProcessJoinMessage(joinMessage) - writeJsonQuiet(w, r, http.StatusOK, operation.JoinResult{ - VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024, - SecretKey: string(ms.guard.SecretKey), - }) -} - func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) m["Version"] = util.VERSION diff --git a/weed/server/volume_grpc_client.go b/weed/server/volume_grpc_client.go new file mode 100644 index 000000000..54e2c2f75 --- /dev/null +++ b/weed/server/volume_grpc_client.go @@ -0,0 +1,74 @@ +package weed_server + +import ( + "fmt" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/storage" + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +func (vs *VolumeServer) heartbeat() { + + glog.V(0).Infof("Volume server bootstraps with master %s", vs.GetMasterNode()) + vs.masterNodes = storage.NewMasterNodes(vs.masterNode) + vs.store.SetDataCenter(vs.dataCenter) + vs.store.SetRack(vs.rack) + + for { + err := vs.doHeartbeat(time.Duration(vs.pulseSeconds) * time.Second) + if err != nil { + glog.V(0).Infof("heartbeat error: %v", err) + time.Sleep(time.Duration(3*vs.pulseSeconds) * time.Second) + } + } +} + +func (vs *VolumeServer) doHeartbeat(sleepInterval time.Duration) error { + + masterNode, err := vs.masterNodes.FindMaster() + if err != nil { + return fmt.Errorf("No master found: %v", err) + } + + grpcConection, err := grpc.Dial(masterNode, grpc.WithInsecure()) + if err != nil { + return fmt.Errorf("fail to dial: %v", err) + } + defer grpcConection.Close() + + client := pb.NewSeaweedClient(grpcConection) + stream, err := client.SendHeartbeat(context.Background()) + if err != nil { + glog.V(0).Infof("%v.SendHeartbeat(_) = _, %v", client, err) + return err + } + vs.SetMasterNode(masterNode) + glog.V(0).Infof("Heartbeat to %s", masterNode) + + vs.store.Client = stream + defer func() { vs.store.Client = nil }() + + go func() { + for { + in, err := stream.Recv() + if err != nil { + return + } + vs.store.VolumeSizeLimit = in.GetVolumeSizeLimit() + vs.guard.SecretKey = security.Secret(in.GetSecretKey()) + } + }() + + for { + if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) + return err + } + time.Sleep(sleepInterval) + } +} diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 1a912a169..e86c33bda 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -1,10 +1,8 @@ package weed_server import ( - "math/rand" "net/http" "sync" - "time" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" @@ -19,6 +17,7 @@ type VolumeServer struct { rack string store *storage.Store guard *security.Guard + masterNodes *storage.MasterNodes needleMapKind storage.NeedleMapType FixJpgOrientation bool @@ -70,36 +69,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, publicMux.HandleFunc("/", vs.publicReadOnlyHandler) } - go func() { - connected := true - - glog.V(0).Infof("Volume server bootstraps with master %s", vs.GetMasterNode()) - vs.store.SetBootstrapMaster(vs.GetMasterNode()) - vs.store.SetDataCenter(vs.dataCenter) - vs.store.SetRack(vs.rack) - for { - glog.V(4).Infof("Volume server sending to master %s", vs.GetMasterNode()) - master, secretKey, err := vs.store.SendHeartbeatToMaster() - if err == nil { - if !connected { - connected = true - vs.SetMasterNode(master) - vs.guard.SecretKey = secretKey - glog.V(0).Infoln("Volume Server Connected with master at", master) - } - } else { - glog.V(1).Infof("Volume Server Failed to talk with master %s: %v", vs.masterNode, err) - if connected { - connected = false - } - } - if connected { - time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*(1+rand.Float32())) * time.Millisecond) - } else { - time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*0.25) * time.Millisecond) - } - } - }() + go vs.heartbeat() return vs } diff --git a/weed/storage/store.go b/weed/storage/store.go index be2044d64..c62ac9ab7 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -1,7 +1,6 @@ package storage import ( - "encoding/json" "errors" "fmt" "math/rand" @@ -10,9 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/golang/protobuf/proto" + "github.com/chrislusf/seaweedfs/weed/pb" ) const ( @@ -76,12 +73,12 @@ type Store struct { dataCenter string //optional informaton, overwriting master setting if exists rack string //optional information, overwriting master setting if exists connected bool - volumeSizeLimit uint64 //read from the master - masterNodes *MasterNodes + VolumeSizeLimit uint64 //read from the master + Client pb.Seaweed_SendHeartbeatClient } func (s *Store) String() (str string) { - str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d, masterNodes:%s", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.volumeSizeLimit, s.masterNodes) + str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.VolumeSizeLimit) return } @@ -208,15 +205,8 @@ func (s *Store) SetRack(rack string) { s.rack = rack } -func (s *Store) SetBootstrapMaster(bootstrapMaster string) { - s.masterNodes = NewMasterNodes(bootstrapMaster) -} -func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) { - masterNode, e = s.masterNodes.FindMaster() - if e != nil { - return - } - var volumeMessages []*operation.VolumeInformationMessage +func (s *Store) CollectHeartbeat() *pb.Heartbeat { + var volumeMessages []*pb.VolumeInformationMessage maxVolumeCount := 0 var maxFileKey uint64 for _, location := range s.Locations { @@ -226,18 +216,18 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S if maxFileKey < v.nm.MaxFileKey() { maxFileKey = v.nm.MaxFileKey() } - if !v.expired(s.volumeSizeLimit) { - volumeMessage := &operation.VolumeInformationMessage{ - Id: proto.Uint32(uint32(k)), - Size: proto.Uint64(uint64(v.Size())), - Collection: proto.String(v.Collection), - FileCount: proto.Uint64(uint64(v.nm.FileCount())), - DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())), - DeletedByteCount: proto.Uint64(v.nm.DeletedSize()), - ReadOnly: proto.Bool(v.readOnly), - ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())), - Version: proto.Uint32(uint32(v.Version())), - Ttl: proto.Uint32(v.Ttl.ToUint32()), + if !v.expired(s.VolumeSizeLimit) { + volumeMessage := &pb.VolumeInformationMessage{ + Id: uint32(k), + Size: uint64(v.Size()), + Collection: v.Collection, + FileCount: uint64(v.nm.FileCount()), + DeleteCount: uint64(v.nm.DeletedCount()), + DeletedByteCount: v.nm.DeletedSize(), + ReadOnly: v.readOnly, + ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), + Version: uint32(v.Version()), + Ttl: v.Ttl.ToUint32(), } volumeMessages = append(volumeMessages, volumeMessage) } else { @@ -252,45 +242,17 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S location.Unlock() } - joinMessage := &operation.JoinMessage{ - IsInit: proto.Bool(!s.connected), - Ip: proto.String(s.Ip), - Port: proto.Uint32(uint32(s.Port)), - PublicUrl: proto.String(s.PublicUrl), - MaxVolumeCount: proto.Uint32(uint32(maxVolumeCount)), - MaxFileKey: proto.Uint64(maxFileKey), - DataCenter: proto.String(s.dataCenter), - Rack: proto.String(s.rack), + return &pb.Heartbeat{ + Ip: s.Ip, + Port: uint32(s.Port), + PublicUrl: s.PublicUrl, + MaxVolumeCount: uint32(maxVolumeCount), + MaxFileKey: maxFileKey, + DataCenter: s.dataCenter, + Rack: s.rack, Volumes: volumeMessages, } - data, err := proto.Marshal(joinMessage) - if err != nil { - return "", "", err - } - - joinUrl := "http://" + masterNode + "/dir/join" - glog.V(4).Infof("Connecting to %s ...", joinUrl) - - jsonBlob, err := util.PostBytes(joinUrl, data) - if err != nil { - s.masterNodes.Reset() - return "", "", err - } - var ret operation.JoinResult - if err := json.Unmarshal(jsonBlob, &ret); err != nil { - glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob)) - s.masterNodes.Reset() - return masterNode, "", err - } - if ret.Error != "" { - s.masterNodes.Reset() - return masterNode, "", errors.New(ret.Error) - } - s.volumeSizeLimit = ret.VolumeSizeLimit - secretKey = security.Secret(ret.SecretKey) - s.connected = true - return } func (s *Store) Close() { for _, location := range s.Locations { @@ -307,12 +269,14 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) { size, err = v.writeNeedle(n) } else { - err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.volumeSizeLimit, v.ContentSize()) + err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.VolumeSizeLimit, v.ContentSize()) } - if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) { - glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit) - if _, _, e := s.SendHeartbeatToMaster(); e != nil { - glog.V(0).Infoln("error when reporting size:", e) + if s.VolumeSizeLimit < v.ContentSize()+3*uint64(size) { + glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.VolumeSizeLimit) + if s.Client != nil { + if e := s.Client.Send(s.CollectHeartbeat()); e != nil { + glog.V(0).Infoln("error when reporting size:", e) + } } } return diff --git a/weed/storage/volume_info.go b/weed/storage/volume_info.go index b3068eec3..c73c27fe4 100644 --- a/weed/storage/volume_info.go +++ b/weed/storage/volume_info.go @@ -2,8 +2,9 @@ package storage import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/operation" "sort" + + "github.com/chrislusf/seaweedfs/weed/pb" ) type VolumeInfo struct { @@ -19,23 +20,23 @@ type VolumeInfo struct { ReadOnly bool } -func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err error) { +func NewVolumeInfo(m *pb.VolumeInformationMessage) (vi VolumeInfo, err error) { vi = VolumeInfo{ - Id: VolumeId(*m.Id), - Size: *m.Size, - Collection: *m.Collection, - FileCount: int(*m.FileCount), - DeleteCount: int(*m.DeleteCount), - DeletedByteCount: *m.DeletedByteCount, - ReadOnly: *m.ReadOnly, - Version: Version(*m.Version), + Id: VolumeId(m.Id), + Size: m.Size, + Collection: m.Collection, + FileCount: int(m.FileCount), + DeleteCount: int(m.DeleteCount), + DeletedByteCount: m.DeletedByteCount, + ReadOnly: m.ReadOnly, + Version: Version(m.Version), } - rp, e := NewReplicaPlacementFromByte(byte(*m.ReplicaPlacement)) + rp, e := NewReplicaPlacementFromByte(byte(m.ReplicaPlacement)) if e != nil { return vi, e } vi.ReplicaPlacement = rp - vi.Ttl = LoadTTLFromUint32(*m.Ttl) + vi.Ttl = LoadTTLFromUint32(m.Ttl) return vi, nil } diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go index b7f039559..0ef8ae14e 100644 --- a/weed/topology/data_node.go +++ b/weed/topology/data_node.go @@ -15,7 +15,6 @@ type DataNode struct { Port int PublicUrl string LastSeen int64 // unix time in seconds - Dead bool } func NewDataNode(id string) *DataNode { @@ -30,7 +29,7 @@ func NewDataNode(id string) *DataNode { func (dn *DataNode) String() string { dn.RLock() defer dn.RUnlock() - return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead) + return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl) } func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { diff --git a/weed/topology/node.go b/weed/topology/node.go index 4ce35f4b0..7383f9576 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -242,12 +242,6 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi if n.IsRack() { for _, c := range n.Children() { dn := c.(*DataNode) //can not cast n to DataNode - if dn.LastSeen < freshThreshHold { - if !dn.Dead { - dn.Dead = true - n.GetTopology().chanDeadDataNodes <- dn - } - } for _, v := range dn.GetVolumes() { if uint64(v.Size) >= volumeSizeLimit { //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit) diff --git a/weed/topology/rack.go b/weed/topology/rack.go index 1ca2f8de8..a48d64323 100644 --- a/weed/topology/rack.go +++ b/weed/topology/rack.go @@ -32,11 +32,6 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol dn := c.(*DataNode) if dn.MatchLocation(ip, port) { dn.LastSeen = time.Now().Unix() - if dn.Dead { - dn.Dead = false - r.GetTopology().chanRecoveredDataNodes <- dn - dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount) - } return dn } } diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 04b500053..ffd32ae21 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -7,7 +7,6 @@ import ( "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/sequence" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" @@ -24,11 +23,9 @@ type Topology struct { Sequence sequence.Sequencer - chanDeadDataNodes chan *DataNode - chanRecoveredDataNodes chan *DataNode - chanFullVolumes chan storage.VolumeInfo + chanFullVolumes chan storage.VolumeInfo - configuration *Configuration + Configuration *Configuration RaftServer raft.Server } @@ -45,8 +42,6 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL t.Sequence = seq - t.chanDeadDataNodes = make(chan *DataNode) - t.chanRecoveredDataNodes = make(chan *DataNode) t.chanFullVolumes = make(chan storage.VolumeInfo) err := t.loadConfiguration(confFile) @@ -80,7 +75,7 @@ func (t *Topology) Leader() (string, error) { func (t *Topology) loadConfiguration(configurationFile string) error { b, e := ioutil.ReadFile(configurationFile) if e == nil { - t.configuration, e = NewConfiguration(b) + t.Configuration, e = NewConfiguration(b) return e } glog.V(0).Infoln("Using default configurations.") @@ -147,35 +142,6 @@ func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn) } -func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) { - t.Sequence.SetMax(*joinMessage.MaxFileKey) - dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack) - dc := t.GetOrCreateDataCenter(dcName) - rack := dc.GetOrCreateRack(rackName) - dn := rack.FindDataNode(*joinMessage.Ip, int(*joinMessage.Port)) - if *joinMessage.IsInit && dn != nil { - t.UnRegisterDataNode(dn) - } - dn = rack.GetOrCreateDataNode(*joinMessage.Ip, - int(*joinMessage.Port), *joinMessage.PublicUrl, - int(*joinMessage.MaxVolumeCount)) - var volumeInfos []storage.VolumeInfo - for _, v := range joinMessage.Volumes { - if vi, err := storage.NewVolumeInfo(v); err == nil { - volumeInfos = append(volumeInfos, vi) - } else { - glog.V(0).Infoln("Fail to convert joined volume information:", err.Error()) - } - } - deletedVolumes := dn.UpdateVolumes(volumeInfos) - for _, v := range volumeInfos { - t.RegisterVolumeLayout(v, dn) - } - for _, v := range deletedVolumes { - t.UnRegisterVolumeLayout(v, dn) - } -} - func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { for _, c := range t.Children() { dc := c.(*DataCenter) diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go index 476aaf4d8..40019fdcd 100644 --- a/weed/topology/topology_event_handling.go +++ b/weed/topology/topology_event_handling.go @@ -31,12 +31,6 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { select { case v := <-t.chanFullVolumes: t.SetVolumeCapacityFull(v) - case dn := <-t.chanRecoveredDataNodes: - t.RegisterRecoveredDataNode(dn) - glog.V(0).Infoln("Recovered DataNode: %v", dn) - case dn := <-t.chanDeadDataNodes: - t.UnRegisterDataNode(dn) - glog.V(0).Infof("Dead DataNode: %v", dn) } } }() @@ -64,11 +58,3 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) { dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount()) dn.Parent().UnlinkChildNode(dn.Id()) } -func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { - for _, v := range dn.GetVolumes() { - vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) - if vl.isWritable(&v) { - vl.SetVolumeAvailable(dn, v.Id) - } - } -}