use fixed list of masters in both filer and volume servers

This commit is contained in:
Chris Lu 2018-06-01 00:39:39 -07:00
parent a6f7f9b0b8
commit 43e3f5724c
22 changed files with 307 additions and 280 deletions

View file

@ -13,6 +13,7 @@ import (
"github.com/soheilhy/cmux" "github.com/soheilhy/cmux"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/reflection" "google.golang.org/grpc/reflection"
"strings"
) )
var ( var (
@ -20,7 +21,7 @@ var (
) )
type FilerOptions struct { type FilerOptions struct {
master *string masters *string
ip *string ip *string
port *int port *int
publicPort *int publicPort *int
@ -34,7 +35,7 @@ type FilerOptions struct {
func init() { func init() {
cmdFiler.Run = runFiler // break init cycle cmdFiler.Run = runFiler // break init cycle
f.master = cmdFiler.Flag.String("master", "localhost:9333", "master server location") f.masters = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers")
f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this collection") f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this collection")
f.ip = cmdFiler.Flag.String("ip", "", "filer server http listen ip address") f.ip = cmdFiler.Flag.String("ip", "", "filer server http listen ip address")
f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port") f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port")
@ -47,8 +48,8 @@ func init() {
} }
var cmdFiler = &Command{ var cmdFiler = &Command{
UsageLine: "filer -port=8888 -master=<ip:port>", UsageLine: "filer -port=8888 -master=<ip:port>[,<ip:port>]*",
Short: "start a file server that points to a master server", Short: "start a file server that points to a master server, or a list of master servers",
Long: `start a file server which accepts REST operation for any files. Long: `start a file server which accepts REST operation for any files.
//create or overwrite the file, the directories /path/to will be automatically created //create or overwrite the file, the directories /path/to will be automatically created
@ -83,8 +84,10 @@ func (fo *FilerOptions) start() {
publicVolumeMux = http.NewServeMux() publicVolumeMux = http.NewServeMux()
} }
masters := *f.masters
fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux,
*fo.ip, *fo.port, *fo.master, *fo.collection, *fo.ip, *fo.port, strings.Split(masters, ","), *fo.collection,
*fo.defaultReplicaPlacement, *fo.redirectOnRead, *fo.disableDirListing, *fo.defaultReplicaPlacement, *fo.redirectOnRead, *fo.disableDirListing,
*fo.maxMB, *fo.maxMB,
*fo.secretKey, *fo.secretKey,

View file

@ -83,7 +83,6 @@ var (
func init() { func init() {
serverOptions.cpuprofile = cmdServer.Flag.String("cpuprofile", "", "cpu profile output file") serverOptions.cpuprofile = cmdServer.Flag.String("cpuprofile", "", "cpu profile output file")
filerOptions.master = cmdServer.Flag.String("filer.master", "", "default to current master server")
filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection") filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")
filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port") filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
filerOptions.publicPort = cmdServer.Flag.Int("filer.port.public", 0, "filer server public http listen port") filerOptions.publicPort = cmdServer.Flag.Int("filer.port.public", 0, "filer server public http listen port")
@ -108,7 +107,7 @@ func runServer(cmd *Command, args []string) bool {
*isStartingFiler = true *isStartingFiler = true
} }
*filerOptions.master = *serverIp + ":" + strconv.Itoa(*masterPort) master := *serverIp + ":" + strconv.Itoa(*masterPort)
filerOptions.ip = serverIp filerOptions.ip = serverIp
if *filerOptions.defaultReplicaPlacement == "" { if *filerOptions.defaultReplicaPlacement == "" {
@ -251,7 +250,7 @@ func runServer(cmd *Command, args []string) bool {
*serverIp, *volumePort, *volumeServerPublicUrl, *serverIp, *volumePort, *volumeServerPublicUrl,
folders, maxCounts, folders, maxCounts,
volumeNeedleMapKind, volumeNeedleMapKind,
*serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack, []string{master}, *volumePulse, *serverDataCenter, *serverRack,
serverWhiteList, *volumeFixJpgOrientation, *volumeReadRedirect, serverWhiteList, *volumeFixJpgOrientation, *volumeReadRedirect,
) )

View file

@ -26,7 +26,7 @@ type VolumeServerOptions struct {
ip *string ip *string
publicUrl *string publicUrl *string
bindIp *string bindIp *string
master *string masters *string
pulseSeconds *int pulseSeconds *int
idleConnectionTimeout *int idleConnectionTimeout *int
maxCpu *int maxCpu *int
@ -47,7 +47,7 @@ func init() {
v.ip = cmdVolume.Flag.String("ip", "", "ip or server name") v.ip = cmdVolume.Flag.String("ip", "", "ip or server name")
v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address") v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address")
v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
v.master = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location") v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers")
v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds") v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds")
v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
@ -132,11 +132,14 @@ func runVolume(cmd *Command, args []string) bool {
case "btree": case "btree":
volumeNeedleMapKind = storage.NeedleMapBtree volumeNeedleMapKind = storage.NeedleMapBtree
} }
masters := *v.masters
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
*v.ip, *v.port, *v.publicUrl, *v.ip, *v.port, *v.publicUrl,
v.folders, v.folderMaxLimits, v.folders, v.folderMaxLimits,
volumeNeedleMapKind, volumeNeedleMapKind,
*v.master, *v.pulseSeconds, *v.dataCenter, *v.rack, strings.Split(masters, ","), *v.pulseSeconds, *v.dataCenter, *v.rack,
v.whiteList, v.whiteList,
*v.fixJpgOrientation, *v.readRedirect, *v.fixJpgOrientation, *v.readRedirect,
) )

View file

@ -13,14 +13,16 @@ import (
) )
type Filer struct { type Filer struct {
master string masters []string
store FilerStore store FilerStore
directoryCache *ccache.Cache directoryCache *ccache.Cache
currentMaster string
} }
func NewFiler(master string) *Filer { func NewFiler(masters []string) *Filer {
return &Filer{ return &Filer{
master: master, masters: masters,
directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
} }
} }
@ -175,14 +177,12 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) {
} }
func (f *Filer) deleteChunks(entry *Entry) { func (f *Filer) deleteChunks(entry *Entry) {
if f.master == "" {
return
}
if entry == nil { if entry == nil {
return return
} }
for _, chunk := range entry.Chunks { for _, chunk := range entry.Chunks {
if err := operation.DeleteFile(f.master, chunk.FileId, ""); err != nil { if err := operation.DeleteFile(f.GetMaster(), chunk.FileId, ""); err != nil {
glog.V(0).Infof("deleting file %s: %v", chunk.FileId, err) glog.V(0).Infof("deleting file %s: %v", chunk.FileId, err)
} }
} }

View file

@ -0,0 +1,60 @@
package filer2
import (
"fmt"
"context"
"time"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/glog"
"google.golang.org/grpc"
)
func (fs *Filer) GetMaster() string {
return fs.currentMaster
}
func (fs *Filer) KeepConnectedToMaster() {
glog.V(0).Infof("Filer bootstraps with masters %v", fs.masters)
for _, master := range fs.masters {
glog.V(0).Infof("Connecting to %v", master)
withMasterClient(master, func(client master_pb.SeaweedClient) error {
stream, err := client.KeepConnected(context.Background())
if err != nil {
glog.V(0).Infof("failed to keep connected to %s: %v", master, err)
return err
}
glog.V(0).Infof("Connected to %v", master)
fs.currentMaster = master
for {
time.Sleep(time.Duration(float32(10*1e3)*0.25) * time.Millisecond)
if err = stream.Send(&master_pb.Empty{}); err != nil {
glog.V(0).Infof("failed to send to %s: %v", master, err)
return err
}
if _, err = stream.Recv(); err != nil {
glog.V(0).Infof("failed to receive from %s: %v", master, err)
return err
}
}
})
fs.currentMaster = ""
}
}
func withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error {
grpcConnection, err := grpc.Dial(master, grpc.WithInsecure())
if err != nil {
return fmt.Errorf("fail to dial %s: %v", master, err)
}
defer grpcConnection.Close()
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
}

View file

@ -8,7 +8,7 @@ import (
) )
func TestCreateAndFind(t *testing.T) { func TestCreateAndFind(t *testing.T) {
filer := filer2.NewFiler("") filer := filer2.NewFiler(nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
store := &LevelDBStore{} store := &LevelDBStore{}

View file

@ -6,7 +6,7 @@ import (
) )
func TestCreateAndFind(t *testing.T) { func TestCreateAndFind(t *testing.T) {
filer := filer2.NewFiler("") filer := filer2.NewFiler(nil)
store := &MemDbStore{} store := &MemDbStore{}
store.Initialize(nil) store.Initialize(nil)
filer.SetStore(store) filer.SetStore(store)
@ -43,7 +43,7 @@ func TestCreateAndFind(t *testing.T) {
} }
func TestCreateFileAndList(t *testing.T) { func TestCreateFileAndList(t *testing.T) {
filer := filer2.NewFiler("") filer := filer2.NewFiler(nil)
store := &MemDbStore{} store := &MemDbStore{}
store.Initialize(nil) store.Initialize(nil)
filer.SetStore(store) filer.SetStore(store)

View file

@ -12,6 +12,7 @@ It has these top-level messages:
Heartbeat Heartbeat
HeartbeatResponse HeartbeatResponse
VolumeInformationMessage VolumeInformationMessage
Empty
*/ */
package master_pb package master_pb
@ -235,10 +236,19 @@ func (m *VolumeInformationMessage) GetTtl() uint32 {
return 0 return 0
} }
type Empty struct {
}
func (m *Empty) Reset() { *m = Empty{} }
func (m *Empty) String() string { return proto.CompactTextString(m) }
func (*Empty) ProtoMessage() {}
func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
func init() { func init() {
proto.RegisterType((*Heartbeat)(nil), "master_pb.Heartbeat") proto.RegisterType((*Heartbeat)(nil), "master_pb.Heartbeat")
proto.RegisterType((*HeartbeatResponse)(nil), "master_pb.HeartbeatResponse") proto.RegisterType((*HeartbeatResponse)(nil), "master_pb.HeartbeatResponse")
proto.RegisterType((*VolumeInformationMessage)(nil), "master_pb.VolumeInformationMessage") proto.RegisterType((*VolumeInformationMessage)(nil), "master_pb.VolumeInformationMessage")
proto.RegisterType((*Empty)(nil), "master_pb.Empty")
} }
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
@ -253,6 +263,7 @@ const _ = grpc.SupportPackageIsVersion4
type SeaweedClient interface { type SeaweedClient interface {
SendHeartbeat(ctx context.Context, opts ...grpc.CallOption) (Seaweed_SendHeartbeatClient, error) SendHeartbeat(ctx context.Context, opts ...grpc.CallOption) (Seaweed_SendHeartbeatClient, error)
KeepConnected(ctx context.Context, opts ...grpc.CallOption) (Seaweed_KeepConnectedClient, error)
} }
type seaweedClient struct { type seaweedClient struct {
@ -294,10 +305,42 @@ func (x *seaweedSendHeartbeatClient) Recv() (*HeartbeatResponse, error) {
return m, nil return m, nil
} }
func (c *seaweedClient) KeepConnected(ctx context.Context, opts ...grpc.CallOption) (Seaweed_KeepConnectedClient, error) {
stream, err := grpc.NewClientStream(ctx, &_Seaweed_serviceDesc.Streams[1], c.cc, "/master_pb.Seaweed/KeepConnected", opts...)
if err != nil {
return nil, err
}
x := &seaweedKeepConnectedClient{stream}
return x, nil
}
type Seaweed_KeepConnectedClient interface {
Send(*Empty) error
Recv() (*Empty, error)
grpc.ClientStream
}
type seaweedKeepConnectedClient struct {
grpc.ClientStream
}
func (x *seaweedKeepConnectedClient) Send(m *Empty) error {
return x.ClientStream.SendMsg(m)
}
func (x *seaweedKeepConnectedClient) Recv() (*Empty, error) {
m := new(Empty)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// Server API for Seaweed service // Server API for Seaweed service
type SeaweedServer interface { type SeaweedServer interface {
SendHeartbeat(Seaweed_SendHeartbeatServer) error SendHeartbeat(Seaweed_SendHeartbeatServer) error
KeepConnected(Seaweed_KeepConnectedServer) error
} }
func RegisterSeaweedServer(s *grpc.Server, srv SeaweedServer) { func RegisterSeaweedServer(s *grpc.Server, srv SeaweedServer) {
@ -330,6 +373,32 @@ func (x *seaweedSendHeartbeatServer) Recv() (*Heartbeat, error) {
return m, nil return m, nil
} }
func _Seaweed_KeepConnected_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(SeaweedServer).KeepConnected(&seaweedKeepConnectedServer{stream})
}
type Seaweed_KeepConnectedServer interface {
Send(*Empty) error
Recv() (*Empty, error)
grpc.ServerStream
}
type seaweedKeepConnectedServer struct {
grpc.ServerStream
}
func (x *seaweedKeepConnectedServer) Send(m *Empty) error {
return x.ServerStream.SendMsg(m)
}
func (x *seaweedKeepConnectedServer) Recv() (*Empty, error) {
m := new(Empty)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
var _Seaweed_serviceDesc = grpc.ServiceDesc{ var _Seaweed_serviceDesc = grpc.ServiceDesc{
ServiceName: "master_pb.Seaweed", ServiceName: "master_pb.Seaweed",
HandlerType: (*SeaweedServer)(nil), HandlerType: (*SeaweedServer)(nil),
@ -341,6 +410,12 @@ var _Seaweed_serviceDesc = grpc.ServiceDesc{
ServerStreams: true, ServerStreams: true,
ClientStreams: true, ClientStreams: true,
}, },
{
StreamName: "KeepConnected",
Handler: _Seaweed_KeepConnected_Handler,
ServerStreams: true,
ClientStreams: true,
},
}, },
Metadata: "seaweed.proto", Metadata: "seaweed.proto",
} }
@ -348,37 +423,39 @@ var _Seaweed_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("seaweed.proto", fileDescriptor0) } func init() { proto.RegisterFile("seaweed.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{ var fileDescriptor0 = []byte{
// 504 bytes of a gzipped FileDescriptorProto // 540 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x74, 0x93, 0xdf, 0x6e, 0xd3, 0x30, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x74, 0x93, 0xcf, 0x6e, 0xd3, 0x4e,
0x14, 0xc6, 0x49, 0x16, 0xda, 0xfa, 0x74, 0x1d, 0x9d, 0x85, 0x90, 0x05, 0x03, 0x4a, 0xb9, 0x89, 0x10, 0xc7, 0x7f, 0x76, 0xdd, 0xa4, 0x9e, 0x34, 0xfd, 0xa5, 0x2b, 0x84, 0xac, 0x52, 0x20, 0x84,
0x04, 0xaa, 0xd0, 0xb8, 0xe6, 0x86, 0x49, 0x88, 0x69, 0x20, 0x26, 0x17, 0xb8, 0x8d, 0xdc, 0xe4, 0x8b, 0x25, 0x50, 0x84, 0xca, 0x89, 0x03, 0x17, 0x22, 0x10, 0x55, 0x40, 0x54, 0x8e, 0xe0, 0x6a,
0x0c, 0x59, 0x73, 0xfe, 0xc8, 0x76, 0x47, 0xb3, 0x77, 0xe2, 0x2d, 0x78, 0x30, 0xe4, 0x93, 0xa6, 0x6d, 0xec, 0x29, 0x5a, 0x75, 0xbd, 0xb6, 0x76, 0x37, 0x25, 0xee, 0x4b, 0xf0, 0x24, 0xbc, 0x05,
0x9d, 0x10, 0xdc, 0x1d, 0xff, 0xce, 0xe7, 0xf8, 0xe4, 0xfb, 0x6c, 0x98, 0x38, 0x54, 0x3f, 0x11, 0x0f, 0x86, 0x76, 0x36, 0x4e, 0x22, 0xfe, 0xdc, 0x66, 0x3f, 0xf3, 0x1d, 0xcf, 0x78, 0xbe, 0xbb,
0x8b, 0x45, 0x63, 0x6b, 0x5f, 0x73, 0x56, 0x2a, 0xe7, 0xd1, 0x66, 0xcd, 0x6a, 0xfe, 0x2b, 0x06, 0x30, 0x34, 0xc8, 0xbf, 0x21, 0x96, 0xd3, 0x46, 0xd7, 0xb6, 0x66, 0x71, 0xc5, 0x8d, 0x45, 0x9d,
0xf6, 0x11, 0x95, 0xf5, 0x2b, 0x54, 0x9e, 0x1f, 0x41, 0xac, 0x1b, 0x11, 0xcd, 0xa2, 0x94, 0xc9, 0x37, 0xcb, 0xc9, 0x8f, 0x10, 0xe2, 0xf7, 0xc8, 0xb5, 0x5d, 0x22, 0xb7, 0xec, 0x04, 0x42, 0xd1,
0x58, 0x37, 0x9c, 0x43, 0xd2, 0xd4, 0xd6, 0x8b, 0x78, 0x16, 0xa5, 0x13, 0x49, 0x35, 0x7f, 0x0a, 0x24, 0xc1, 0x38, 0x48, 0xe3, 0x2c, 0x14, 0x0d, 0x63, 0x10, 0x35, 0xb5, 0xb6, 0x49, 0x38, 0x0e,
0xd0, 0xac, 0x57, 0x46, 0xe7, 0xd9, 0xda, 0x1a, 0x71, 0x40, 0x5a, 0xd6, 0x91, 0x6f, 0xd6, 0xf0, 0xd2, 0x61, 0x46, 0x31, 0x7b, 0x08, 0xd0, 0xac, 0x96, 0x52, 0x14, 0xf9, 0x4a, 0xcb, 0xe4, 0x80,
0x14, 0xa6, 0xa5, 0xda, 0x64, 0x37, 0xb5, 0x59, 0x97, 0x98, 0xe5, 0xf5, 0xba, 0xf2, 0x22, 0xa1, 0xb4, 0xb1, 0x27, 0x9f, 0xb5, 0x64, 0x29, 0x8c, 0x2a, 0xbe, 0xce, 0x6f, 0x6b, 0xb9, 0xaa, 0x30,
0xed, 0x47, 0xa5, 0xda, 0x7c, 0x27, 0x7c, 0x16, 0x28, 0x9f, 0xc1, 0x61, 0x50, 0x5e, 0x69, 0x83, 0x2f, 0xea, 0x95, 0xb2, 0x49, 0x44, 0xe5, 0x27, 0x15, 0x5f, 0x7f, 0x21, 0x3c, 0x73, 0x94, 0x8d,
0xd9, 0x35, 0xb6, 0xe2, 0xfe, 0x2c, 0x4a, 0x13, 0x09, 0xa5, 0xda, 0x7c, 0xd0, 0x06, 0x2f, 0xb0, 0xe1, 0xd8, 0x29, 0xaf, 0x85, 0xc4, 0xfc, 0x06, 0xdb, 0xe4, 0x70, 0x1c, 0xa4, 0x51, 0x06, 0x15,
0xe5, 0xcf, 0x61, 0x5c, 0x28, 0xaf, 0xb2, 0x1c, 0x2b, 0x8f, 0x56, 0x0c, 0xe8, 0x2c, 0x08, 0xe8, 0x5f, 0xbf, 0x13, 0x12, 0xe7, 0xd8, 0xb2, 0xc7, 0x30, 0x28, 0xb9, 0xe5, 0x79, 0x81, 0xca, 0xa2,
0x8c, 0x48, 0x98, 0xcf, 0xaa, 0xfc, 0x5a, 0x0c, 0xa9, 0x43, 0x75, 0x98, 0x4f, 0x15, 0xa5, 0xae, 0x4e, 0x7a, 0xd4, 0x0b, 0x1c, 0x9a, 0x11, 0x71, 0xf3, 0x69, 0x5e, 0xdc, 0x24, 0x7d, 0xca, 0x50,
0x32, 0x9a, 0x7c, 0x44, 0x47, 0x33, 0x22, 0x97, 0x61, 0xfc, 0x77, 0x30, 0xec, 0x66, 0x73, 0x82, 0xec, 0xe6, 0xe3, 0x65, 0x25, 0x54, 0x4e, 0x93, 0x1f, 0x51, 0xeb, 0x98, 0xc8, 0x95, 0x1b, 0xff,
0xcd, 0x0e, 0xd2, 0xf1, 0xe9, 0xcb, 0xc5, 0xce, 0x8d, 0x45, 0x37, 0xde, 0x79, 0x75, 0x55, 0xdb, 0x35, 0xf4, 0xfd, 0x6c, 0x26, 0x89, 0xc7, 0x07, 0xe9, 0xe0, 0xe2, 0xe9, 0x74, 0xbb, 0x8d, 0xa9,
0x52, 0x79, 0x5d, 0x57, 0x9f, 0xd1, 0x39, 0xf5, 0x03, 0x65, 0xbf, 0x67, 0xee, 0xe0, 0x78, 0x67, 0x1f, 0xef, 0x52, 0x5d, 0xd7, 0xba, 0xe2, 0x56, 0xd4, 0xea, 0x23, 0x1a, 0xc3, 0xbf, 0x62, 0xd6,
0x97, 0x44, 0xd7, 0xd4, 0x95, 0x43, 0x9e, 0xc2, 0x83, 0xae, 0xbf, 0xd4, 0xb7, 0xf8, 0x49, 0x97, 0xd5, 0x4c, 0x0c, 0x9c, 0x6e, 0xd7, 0x95, 0xa1, 0x69, 0x6a, 0x65, 0x90, 0xa5, 0xf0, 0xbf, 0xcf,
0xda, 0x93, 0x87, 0x89, 0xfc, 0x1b, 0xf3, 0x13, 0x60, 0x0e, 0x73, 0x8b, 0xfe, 0x02, 0x5b, 0x72, 0x2f, 0xc4, 0x1d, 0x7e, 0x10, 0x95, 0xb0, 0xb4, 0xc3, 0x28, 0xfb, 0x1d, 0xb3, 0x73, 0x88, 0x0d,
0x95, 0xc9, 0x3d, 0xe0, 0x8f, 0x60, 0x60, 0x50, 0x15, 0x68, 0xb7, 0xb6, 0x6e, 0x57, 0xf3, 0xdf, 0x16, 0x1a, 0xed, 0x1c, 0x5b, 0xda, 0x6a, 0x9c, 0xed, 0x00, 0xbb, 0x0f, 0x3d, 0x89, 0xbc, 0x44,
0x31, 0x88, 0xff, 0x8d, 0x46, 0x99, 0x15, 0x74, 0xde, 0x44, 0xc6, 0xba, 0x08, 0x9e, 0x38, 0x7d, 0xbd, 0x59, 0xeb, 0xe6, 0x34, 0xf9, 0x19, 0x42, 0xf2, 0xaf, 0xd1, 0xc8, 0xb3, 0x92, 0xfa, 0x0d,
0x8b, 0xf4, 0xf5, 0x44, 0x52, 0xcd, 0x9f, 0x01, 0xe4, 0xb5, 0x31, 0x98, 0x87, 0x8d, 0xdb, 0x8f, 0xb3, 0x50, 0x94, 0x6e, 0x27, 0x46, 0xdc, 0x21, 0x7d, 0x3d, 0xca, 0x28, 0x66, 0x8f, 0x00, 0x8a,
0xdf, 0x21, 0xc1, 0x33, 0x8a, 0x61, 0x1f, 0x57, 0x22, 0x59, 0x20, 0x5d, 0x52, 0x2f, 0xe0, 0xb0, 0x5a, 0x4a, 0x2c, 0x5c, 0xe1, 0xe6, 0xe3, 0x7b, 0xc4, 0xed, 0x8c, 0x6c, 0xd8, 0xd9, 0x15, 0x65,
0x40, 0x83, 0xbe, 0x17, 0x74, 0x49, 0x8d, 0x3b, 0xd6, 0x49, 0x5e, 0x03, 0xef, 0x96, 0x45, 0xb6, 0xb1, 0x23, 0xde, 0xa9, 0x27, 0x70, 0x5c, 0xa2, 0x44, 0xdb, 0x09, 0xbc, 0x53, 0x03, 0xcf, 0xbc,
0x6a, 0x77, 0xc2, 0x01, 0x09, 0xa7, 0xdb, 0xce, 0xfb, 0xb6, 0x57, 0x3f, 0x01, 0x66, 0x51, 0x15, 0xe4, 0x39, 0x30, 0x7f, 0x2c, 0xf3, 0x65, 0xbb, 0x15, 0xf6, 0x48, 0x38, 0xda, 0x64, 0xde, 0xb4,
0x59, 0x5d, 0x99, 0x96, 0xc2, 0x1b, 0xc9, 0x51, 0x00, 0x5f, 0x2a, 0xd3, 0xf2, 0x57, 0x70, 0x6c, 0x9d, 0xfa, 0x01, 0xc4, 0x1a, 0x79, 0x99, 0xd7, 0x4a, 0xb6, 0x64, 0xde, 0x51, 0x76, 0xe4, 0xc0,
0xb1, 0x31, 0x3a, 0x57, 0x59, 0x63, 0x54, 0x8e, 0x25, 0x56, 0x7d, 0x8e, 0xd3, 0x6d, 0xe3, 0xb2, 0x27, 0x25, 0x5b, 0xf6, 0x0c, 0x4e, 0x35, 0x36, 0x52, 0x14, 0x3c, 0x6f, 0x24, 0x2f, 0xb0, 0x42,
0xe7, 0x5c, 0xc0, 0xf0, 0x06, 0xad, 0x0b, 0xbf, 0xc5, 0x48, 0xd2, 0x2f, 0xf9, 0x14, 0x0e, 0xbc, 0xd5, 0xf9, 0x38, 0xda, 0x24, 0xae, 0x3a, 0xce, 0x12, 0xe8, 0xdf, 0xa2, 0x36, 0xee, 0xb7, 0x62,
0x37, 0x02, 0x88, 0x86, 0xf2, 0xf4, 0x2b, 0x0c, 0x97, 0xdd, 0x3b, 0xe0, 0xe7, 0x30, 0x59, 0x62, 0x92, 0x74, 0x47, 0x36, 0x82, 0x03, 0x6b, 0x65, 0x02, 0x44, 0x5d, 0x38, 0xe9, 0xc3, 0xe1, 0xdb,
0x55, 0xec, 0x6f, 0xfe, 0xc3, 0x3b, 0xb7, 0x60, 0x47, 0x1f, 0x9f, 0xfc, 0x8b, 0xf6, 0xb1, 0xcf, 0xaa, 0xb1, 0xed, 0xc5, 0xf7, 0x00, 0xfa, 0x0b, 0xff, 0x22, 0xd8, 0x25, 0x0c, 0x17, 0xa8, 0xca,
0xef, 0xa5, 0xd1, 0x9b, 0x68, 0x35, 0xa0, 0x37, 0xf5, 0xf6, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xdd, 0x1b, 0xb8, 0xb7, 0x77, 0x1f, 0xb6, 0xf4, 0xec, 0xfc, 0x6f, 0xb4, 0xbb, 0x00, 0x93, 0xff,
0x01, 0x14, 0xbb, 0x3a, 0x64, 0x03, 0x00, 0x00, 0xd2, 0xe0, 0x45, 0xc0, 0x5e, 0xc1, 0x70, 0x8e, 0xd8, 0xcc, 0x6a, 0xa5, 0xb0, 0xb0, 0x58, 0xb2,
0xd1, 0x5e, 0x11, 0x75, 0x3e, 0xfb, 0x83, 0xf8, 0xd2, 0x65, 0x8f, 0x1e, 0xe6, 0xcb, 0x5f, 0x01,
0x00, 0x00, 0xff, 0xff, 0x41, 0x64, 0x8a, 0xd9, 0xa9, 0x03, 0x00, 0x00,
} }

View file

@ -5,7 +5,10 @@ package master_pb;
////////////////////////////////////////////////// //////////////////////////////////////////////////
service Seaweed { service Seaweed {
rpc SendHeartbeat(stream Heartbeat) returns (stream HeartbeatResponse) {} rpc SendHeartbeat (stream Heartbeat) returns (stream HeartbeatResponse) {
}
rpc KeepConnected (stream Empty) returns (stream Empty) {
}
} }
////////////////////////////////////////////////// //////////////////////////////////////////////////
@ -21,6 +24,7 @@ message Heartbeat {
uint32 admin_port = 8; uint32 admin_port = 8;
repeated VolumeInformationMessage volumes = 9; repeated VolumeInformationMessage volumes = 9;
} }
message HeartbeatResponse { message HeartbeatResponse {
uint64 volumeSizeLimit = 1; uint64 volumeSizeLimit = 1;
string secretKey = 2; string secretKey = 2;
@ -39,3 +43,6 @@ message VolumeInformationMessage {
uint32 version = 9; uint32 version = 9;
uint32 ttl = 10; uint32 ttl = 10;
} }
message Empty {
}

View file

@ -88,7 +88,7 @@ func (fs *FilerServer) GetEntryAttributes(ctx context.Context, req *filer_pb.Get
func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVolumeRequest) (*filer_pb.LookupVolumeResponse, error) { func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVolumeRequest) (*filer_pb.LookupVolumeResponse, error) {
lookupResult, err := operation.LookupVolumeIds(fs.getMasterNode(), req.VolumeIds) lookupResult, err := operation.LookupVolumeIds(fs.filer.GetMaster(), req.VolumeIds)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -172,11 +172,11 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
if err = fs.filer.UpdateEntry(newEntry); err == nil { if err = fs.filer.UpdateEntry(newEntry); err == nil {
for _, garbage := range unusedChunks { for _, garbage := range unusedChunks {
glog.V(0).Infof("deleting %s old chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size)) glog.V(0).Infof("deleting %s old chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size))
operation.DeleteFile(fs.master, garbage.FileId, fs.jwt(garbage.FileId)) operation.DeleteFile(fs.filer.GetMaster(), garbage.FileId, fs.jwt(garbage.FileId))
} }
for _, garbage := range garbages { for _, garbage := range garbages {
glog.V(0).Infof("deleting %s garbage chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size)) glog.V(0).Infof("deleting %s garbage chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size))
operation.DeleteFile(fs.master, garbage.FileId, fs.jwt(garbage.FileId)) operation.DeleteFile(fs.filer.GetMaster(), garbage.FileId, fs.jwt(garbage.FileId))
} }
} }
@ -190,7 +190,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) { func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) {
assignResult, err := operation.Assign(fs.master, &operation.VolumeAssignRequest{ assignResult, err := operation.Assign(fs.filer.GetMaster(), &operation.VolumeAssignRequest{
Count: uint64(req.Count), Count: uint64(req.Count),
Replication: req.Replication, Replication: req.Replication,
Collection: req.Collection, Collection: req.Collection,

View file

@ -1,12 +1,8 @@
package weed_server package weed_server
import ( import (
"math/rand"
"net/http" "net/http"
"strconv" "strconv"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2"
_ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra" _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
_ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
@ -14,16 +10,13 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer2/mysql" _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
_ "github.com/chrislusf/seaweedfs/weed/filer2/postgres" _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
_ "github.com/chrislusf/seaweedfs/weed/filer2/redis" _ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
) )
type FilerServer struct { type FilerServer struct {
port string port string
master string masters []string
mnLock sync.RWMutex
collection string collection string
defaultReplication string defaultReplication string
redirectOnRead bool redirectOnRead bool
@ -31,16 +24,15 @@ type FilerServer struct {
secret security.Secret secret security.Secret
filer *filer2.Filer filer *filer2.Filer
maxMB int maxMB int
masterNodes *storage.MasterNodes
} }
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, ip string, port int, master string, collection string, func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, ip string, port int, masters []string, collection string,
replication string, redirectOnRead bool, disableDirListing bool, replication string, redirectOnRead bool, disableDirListing bool,
maxMB int, maxMB int,
secret string, secret string,
) (fs *FilerServer, err error) { ) (fs *FilerServer, err error) {
fs = &FilerServer{ fs = &FilerServer{
master: master, masters: masters,
collection: collection, collection: collection,
defaultReplication: replication, defaultReplication: replication,
redirectOnRead: redirectOnRead, redirectOnRead: redirectOnRead,
@ -48,7 +40,14 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, ip string, port int,
maxMB: maxMB, maxMB: maxMB,
port: ip + ":" + strconv.Itoa(port), port: ip + ":" + strconv.Itoa(port),
} }
fs.filer = filer2.NewFiler(master)
if len(masters) == 0 {
glog.Fatal("master list is required!")
}
fs.filer = filer2.NewFiler(masters)
go fs.filer.KeepConnectedToMaster()
fs.filer.LoadConfiguration() fs.filer.LoadConfiguration()
@ -59,78 +58,9 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, ip string, port int,
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler) readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
} }
go func() {
connected := true
fs.masterNodes = storage.NewMasterNodes(fs.master)
glog.V(0).Infof("Filer server bootstraps with master %s", fs.getMasterNode())
for {
glog.V(4).Infof("Filer server sending to master %s", fs.getMasterNode())
master, err := fs.detectHealthyMaster(fs.getMasterNode())
if err == nil {
if !connected {
connected = true
if fs.getMasterNode() != master {
fs.setMasterNode(master)
}
glog.V(0).Infoln("Filer Server Connected with master at", master)
}
} else {
glog.V(1).Infof("Filer Server Failed to talk with master %s: %v", fs.getMasterNode(), err)
if connected {
connected = false
}
}
if connected {
time.Sleep(time.Duration(float32(10*1e3)*(1+rand.Float32())) * time.Millisecond)
} else {
time.Sleep(time.Duration(float32(10*1e3)*0.25) * time.Millisecond)
}
}
}()
return fs, nil return fs, nil
} }
func (fs *FilerServer) jwt(fileId string) security.EncodedJwt { func (fs *FilerServer) jwt(fileId string) security.EncodedJwt {
return security.GenJwt(fs.secret, fileId) return security.GenJwt(fs.secret, fileId)
} }
func (fs *FilerServer) getMasterNode() string {
fs.mnLock.RLock()
defer fs.mnLock.RUnlock()
return fs.master
}
func (fs *FilerServer) setMasterNode(masterNode string) {
fs.mnLock.Lock()
defer fs.mnLock.Unlock()
fs.master = masterNode
}
func (fs *FilerServer) detectHealthyMaster(masterNode string) (master string, e error) {
if e = checkMaster(masterNode); e != nil {
fs.masterNodes.Reset()
for i := 0; i <= 3; i++ {
master, e = fs.masterNodes.FindMaster()
if e != nil {
continue
} else {
if e = checkMaster(master); e == nil {
break
}
}
}
} else {
master = masterNode
}
return
}
func checkMaster(masterNode string) error {
statUrl := "http://" + masterNode + "/stats/health"
glog.V(4).Infof("Connecting to %s ...", statUrl)
_, e := util.Get(statUrl)
return e
}

View file

@ -63,7 +63,7 @@ func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request,
fileId := entry.Chunks[0].FileId fileId := entry.Chunks[0].FileId
urlString, err := operation.LookupFileId(fs.getMasterNode(), fileId) urlString, err := operation.LookupFileId(fs.filer.GetMaster(), fileId)
if err != nil { if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err) glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err)
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
@ -225,7 +225,7 @@ func (fs *FilerServer) writeContent(w io.Writer, entry *filer2.Entry, offset int
for _, chunkView := range chunkViews { for _, chunkView := range chunkViews {
urlString, err := operation.LookupFileId(fs.getMasterNode(), chunkView.FileId) urlString, err := operation.LookupFileId(fs.filer.GetMaster(), chunkView.FileId)
if err != nil { if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err return err

View file

@ -32,7 +32,7 @@ func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Reques
writeJsonError(w, r, http.StatusInternalServerError, err) writeJsonError(w, r, http.StatusInternalServerError, err)
} else { } else {
fileId = entry.Chunks[0].FileId fileId = entry.Chunks[0].FileId
urlLocation, err = operation.LookupFileId(fs.getMasterNode(), fileId) urlLocation, err = operation.LookupFileId(fs.filer.GetMaster(), fileId)
if err != nil { if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err is %s", fileId, err.Error()) glog.V(1).Infof("operation LookupFileId %s failed, err is %s", fileId, err.Error())
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
@ -48,7 +48,7 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request,
Collection: collection, Collection: collection,
Ttl: r.URL.Query().Get("ttl"), Ttl: r.URL.Query().Get("ttl"),
} }
assignResult, ae := operation.Assign(fs.getMasterNode(), ar) assignResult, ae := operation.Assign(fs.filer.GetMaster(), ar)
if ae != nil { if ae != nil {
glog.V(0).Infoln("failing to assign a file id", ae.Error()) glog.V(0).Infoln("failing to assign a file id", ae.Error())
writeJsonError(w, r, http.StatusInternalServerError, ae) writeJsonError(w, r, http.StatusInternalServerError, ae)
@ -145,7 +145,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
if ret.Name != "" { if ret.Name != "" {
path += ret.Name path += ret.Name
} else { } else {
operation.DeleteFile(fs.getMasterNode(), fileId, fs.jwt(fileId)) //clean up operation.DeleteFile(fs.filer.GetMaster(), fileId, fs.jwt(fileId)) //clean up
glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
writeJsonError(w, r, http.StatusInternalServerError, writeJsonError(w, r, http.StatusInternalServerError,
errors.New("Can not to write to folder "+path+" without a file name")) errors.New("Can not to write to folder "+path+" without a file name"))
@ -157,7 +157,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "PUT" { if r.Method != "PUT" {
if entry, err := fs.filer.FindEntry(filer2.FullPath(path)); err == nil { if entry, err := fs.filer.FindEntry(filer2.FullPath(path)); err == nil {
oldFid := entry.Chunks[0].FileId oldFid := entry.Chunks[0].FileId
operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid)) operation.DeleteFile(fs.filer.GetMaster(), oldFid, fs.jwt(oldFid))
} else if err != nil && err != filer2.ErrNotFound { } else if err != nil && err != filer2.ErrNotFound {
glog.V(0).Infof("error %v occur when finding %s in filer store", err, path) glog.V(0).Infof("error %v occur when finding %s in filer store", err, path)
} }
@ -176,7 +176,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}}, }},
} }
if db_err := fs.filer.CreateEntry(entry); db_err != nil { if db_err := fs.filer.CreateEntry(entry); db_err != nil {
operation.DeleteFile(fs.getMasterNode(), fileId, fs.jwt(fileId)) //clean up operation.DeleteFile(fs.filer.GetMaster(), fileId, fs.jwt(fileId)) //clean up
glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
writeJsonError(w, r, http.StatusInternalServerError, db_err) writeJsonError(w, r, http.StatusInternalServerError, db_err)
return return

View file

@ -147,7 +147,7 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte
if entry, err := fs.filer.FindEntry(filer2.FullPath(path)); err == nil { if entry, err := fs.filer.FindEntry(filer2.FullPath(path)); err == nil {
for _, chunk := range entry.Chunks { for _, chunk := range entry.Chunks {
oldFid := chunk.FileId oldFid := chunk.FileId
operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid)) operation.DeleteFile(fs.filer.GetMaster(), oldFid, fs.jwt(oldFid))
} }
} else if err != nil { } else if err != nil {
glog.V(0).Infof("error %v occur when finding %s in filer store", err, path) glog.V(0).Infof("error %v occur when finding %s in filer store", err, path)

View file

@ -77,3 +77,15 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
} }
} }
} }
func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServer) error {
for {
_, err := stream.Recv()
if err != nil {
return err
}
if err := stream.Send(&master_pb.Empty{}); err != nil {
return err
}
}
}

View file

@ -7,49 +7,51 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
func (vs *VolumeServer) GetMaster() string {
return vs.currentMaster
}
func (vs *VolumeServer) heartbeat() { func (vs *VolumeServer) heartbeat() {
glog.V(0).Infof("Volume server bootstraps with master %s", vs.GetMasterNode()) glog.V(0).Infof("Volume server start with masters: %v", vs.MasterNodes)
vs.masterNodes = storage.NewMasterNodes(vs.masterNode)
vs.store.SetDataCenter(vs.dataCenter) vs.store.SetDataCenter(vs.dataCenter)
vs.store.SetRack(vs.rack) vs.store.SetRack(vs.rack)
var err error
var newLeader string
for { for {
err := vs.doHeartbeat(time.Duration(vs.pulseSeconds) * time.Second) for _, master := range vs.MasterNodes {
if newLeader != "" {
master = newLeader
}
newLeader, err = vs.doHeartbeat(master, time.Duration(vs.pulseSeconds)*time.Second)
if err != nil { if err != nil {
glog.V(0).Infof("heartbeat error: %v", err) glog.V(0).Infof("heartbeat error: %v", err)
time.Sleep(time.Duration(vs.pulseSeconds) * time.Second) time.Sleep(time.Duration(vs.pulseSeconds) * time.Second)
} }
} }
} }
func (vs *VolumeServer) doHeartbeat(sleepInterval time.Duration) error {
vs.masterNodes.Reset()
masterNode, err := vs.masterNodes.FindMaster()
if err != nil {
return fmt.Errorf("No master found: %v", err)
} }
func (vs *VolumeServer) doHeartbeat(masterNode string, sleepInterval time.Duration) (newLeader string, err error) {
grpcConection, err := grpc.Dial(masterNode, grpc.WithInsecure()) grpcConection, err := grpc.Dial(masterNode, grpc.WithInsecure())
if err != nil { if err != nil {
return fmt.Errorf("fail to dial: %v", err) return "", fmt.Errorf("fail to dial: %v", err)
} }
defer grpcConection.Close() defer grpcConection.Close()
client := master_pb.NewSeaweedClient(grpcConection) client := master_pb.NewSeaweedClient(grpcConection)
stream, err := client.SendHeartbeat(context.Background()) stream, err := client.SendHeartbeat(context.Background())
if err != nil { if err != nil {
glog.V(0).Infof("%v.SendHeartbeat(_) = _, %v", client, err) glog.V(0).Infof("SendHeartbeat to %s: %v", masterNode, err)
return err return "", err
} }
vs.SetMasterNode(masterNode) glog.V(0).Infof("Heartbeat to: %v", masterNode)
glog.V(0).Infof("Heartbeat to %s", masterNode) vs.currentMaster = masterNode
vs.store.Client = stream vs.store.Client = stream
defer func() { vs.store.Client = nil }() defer func() { vs.store.Client = nil }()
@ -70,7 +72,8 @@ func (vs *VolumeServer) doHeartbeat(sleepInterval time.Duration) error {
vs.guard.SecretKey = security.Secret(in.GetSecretKey()) vs.guard.SecretKey = security.Secret(in.GetSecretKey())
} }
if in.GetLeader() != "" && masterNode != in.GetLeader() { if in.GetLeader() != "" && masterNode != in.GetLeader() {
vs.masterNodes.SetPossibleLeader(in.GetLeader()) glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode)
newLeader = in.GetLeader()
doneChan <- nil doneChan <- nil
return return
} }
@ -79,7 +82,7 @@ func (vs *VolumeServer) doHeartbeat(sleepInterval time.Duration) error {
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
return err return "", err
} }
tickChan := time.Tick(sleepInterval) tickChan := time.Tick(sleepInterval)
@ -89,11 +92,10 @@ func (vs *VolumeServer) doHeartbeat(sleepInterval time.Duration) error {
case <-tickChan: case <-tickChan:
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
return err return "", err
} }
case err := <-doneChan: case <-doneChan:
glog.V(0).Infof("Volume Server heart beat stops with %v", err) return
return err
} }
} }
} }

View file

@ -2,22 +2,19 @@ package weed_server
import ( import (
"net/http" "net/http"
"sync"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage"
) )
type VolumeServer struct { type VolumeServer struct {
masterNode string MasterNodes []string
mnLock sync.RWMutex currentMaster string
pulseSeconds int pulseSeconds int
dataCenter string dataCenter string
rack string rack string
store *storage.Store store *storage.Store
guard *security.Guard guard *security.Guard
masterNodes *storage.MasterNodes
needleMapKind storage.NeedleMapType needleMapKind storage.NeedleMapType
FixJpgOrientation bool FixJpgOrientation bool
@ -28,7 +25,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
port int, publicUrl string, port int, publicUrl string,
folders []string, maxCounts []int, folders []string, maxCounts []int,
needleMapKind storage.NeedleMapType, needleMapKind storage.NeedleMapType,
masterNode string, pulseSeconds int, masterNodes []string, pulseSeconds int,
dataCenter string, rack string, dataCenter string, rack string,
whiteList []string, whiteList []string,
fixJpgOrientation bool, fixJpgOrientation bool,
@ -41,7 +38,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
FixJpgOrientation: fixJpgOrientation, FixJpgOrientation: fixJpgOrientation,
ReadRedirect: readRedirect, ReadRedirect: readRedirect,
} }
vs.SetMasterNode(masterNode) vs.MasterNodes = masterNodes
vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind) vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind)
vs.guard = security.NewGuard(whiteList, "") vs.guard = security.NewGuard(whiteList, "")
@ -76,18 +73,6 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
return vs return vs
} }
func (vs *VolumeServer) GetMasterNode() string {
vs.mnLock.RLock()
defer vs.mnLock.RUnlock()
return vs.masterNode
}
func (vs *VolumeServer) SetMasterNode(masterNode string) {
vs.mnLock.Lock()
defer vs.mnLock.Unlock()
vs.masterNode = masterNode
}
func (vs *VolumeServer) Shutdown() { func (vs *VolumeServer) Shutdown() {
glog.V(0).Infoln("Shutting down volume server...") glog.V(0).Infoln("Shutting down volume server...")
vs.store.Close() vs.store.Close()

View file

@ -46,7 +46,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
return return
} }
lookupResult, err := operation.Lookup(vs.GetMasterNode(), volumeId.String()) lookupResult, err := operation.Lookup(vs.GetMaster(), volumeId.String())
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err) glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
if err == nil && len(lookupResult.Locations) > 0 { if err == nil && len(lookupResult.Locations) > 0 {
u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl)) u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl))
@ -176,7 +176,7 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string,
chunkedFileReader := &operation.ChunkedFileReader{ chunkedFileReader := &operation.ChunkedFileReader{
Manifest: chunkManifest, Manifest: chunkManifest,
Master: vs.GetMasterNode(), Master: vs.GetMaster(),
} }
defer chunkedFileReader.Close() defer chunkedFileReader.Close()
if e := writeResponseContent(fileName, mType, chunkedFileReader, w, r); e != nil { if e := writeResponseContent(fileName, mType, chunkedFileReader, w, r); e != nil {

View file

@ -21,14 +21,14 @@ func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request)
} }
args := struct { args := struct {
Version string Version string
Master string Masters []string
Volumes interface{} Volumes interface{}
DiskStatuses interface{} DiskStatuses interface{}
Stats interface{} Stats interface{}
Counters *stats.ServerStats Counters *stats.ServerStats
}{ }{
util.VERSION, util.VERSION,
vs.masterNode, vs.MasterNodes,
vs.store.Status(), vs.store.Status(),
ds, ds,
infos, infos,

View file

@ -31,7 +31,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
} }
ret := operation.UploadResult{} ret := operation.UploadResult{}
size, errorStatus := topology.ReplicatedWrite(vs.GetMasterNode(), size, errorStatus := topology.ReplicatedWrite(vs.GetMaster(),
vs.store, volumeId, needle, r) vs.store, volumeId, needle, r)
httpStatus := http.StatusCreated httpStatus := http.StatusCreated
if errorStatus != "" { if errorStatus != "" {
@ -80,14 +80,14 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
// make sure all chunks had deleted before delete manifest // make sure all chunks had deleted before delete manifest
if e := chunkManifest.DeleteChunks(vs.GetMasterNode()); e != nil { if e := chunkManifest.DeleteChunks(vs.GetMaster()); e != nil {
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e)) writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e))
return return
} }
count = chunkManifest.Size count = chunkManifest.Size
} }
_, err := topology.ReplicatedDelete(vs.GetMasterNode(), vs.store, volumeId, n, r) _, err := topology.ReplicatedDelete(vs.GetMaster(), vs.store, volumeId, n, r)
if err == nil { if err == nil {
m := make(map[string]int64) m := make(map[string]int64)

View file

@ -72,8 +72,8 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<h2>System Stats</h2> <h2>System Stats</h2>
<table class="table table-condensed table-striped"> <table class="table table-condensed table-striped">
<tr> <tr>
<th>Master</th> <th>Masters</th>
<td><a href="http://{{.Master}}/ui/index.html">{{.Master}}</a></td> <td>{{.Masters}}</td>
</tr> </tr>
<tr> <tr>
<th>Weekly # ReadRequests</th> <th>Weekly # ReadRequests</th>

View file

@ -1,13 +1,11 @@
package storage package storage
import ( import (
"errors"
"fmt" "fmt"
"strconv" "strconv"
"strings" "strings"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
) )
@ -15,55 +13,6 @@ const (
MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
) )
type MasterNodes struct {
nodes []string
leader string
possibleLeader string
}
func (mn *MasterNodes) String() string {
return fmt.Sprintf("nodes:%v, leader:%s", mn.nodes, mn.leader)
}
func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) {
mn = &MasterNodes{nodes: []string{bootstrapNode}, leader: ""}
return
}
func (mn *MasterNodes) Reset() {
if mn.leader != "" {
mn.leader = ""
glog.V(0).Infof("Resetting master nodes: %v", mn)
}
}
func (mn *MasterNodes) SetPossibleLeader(possibleLeader string) {
// TODO try to check this leader first
mn.possibleLeader = possibleLeader
}
func (mn *MasterNodes) FindMaster() (leader string, err error) {
if len(mn.nodes) == 0 {
return "", errors.New("No master node found!")
}
if mn.leader == "" {
for _, m := range mn.nodes {
glog.V(4).Infof("Listing masters on %s", m)
if leader, masters, e := operation.ListMasters(m); e == nil {
if leader != "" {
mn.nodes = append(masters, m)
mn.leader = leader
glog.V(2).Infof("current master nodes is %v", mn)
break
}
} else {
glog.V(4).Infof("Failed listing masters on %s: %v", m, e)
}
}
}
if mn.leader == "" {
return "", errors.New("No master node available!")
}
return mn.leader, nil
}
/* /*
* A VolumeServer contains one Store * A VolumeServer contains one Store
*/ */