keep alive for gRpc calls

This commit is contained in:
Chris Lu 2018-07-03 19:07:55 -07:00
parent 28e5f20c8e
commit 77fc8c5914
8 changed files with 38 additions and 14 deletions

View file

@ -10,7 +10,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"strings"
)
@ -129,7 +128,7 @@ func (fo *FilerOptions) start() {
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
grpcS := grpc.NewServer()
grpcS := util.NewGrpcServer()
filer_pb.RegisterSeaweedFilerServer(grpcS, fs)
reflection.Register(grpcS)
go grpcS.Serve(grpcL)

View file

@ -16,7 +16,6 @@ import (
"strconv"
"io"
"time"
"google.golang.org/grpc"
"context"
"github.com/chrislusf/seaweedfs/weed/util"
)
@ -340,7 +339,7 @@ func detectMimeType(f *os.File) string {
func withFilerClient(filerAddress string, fn func(filer_pb.SeaweedFilerClient) error) error {
grpcConnection, err := grpc.Dial(filerAddress, grpc.WithInsecure())
grpcConnection, err := util.GrpcDial(filerAddress)
if err != nil {
return fmt.Errorf("fail to dial %s: %v", filerAddress, err)
}

View file

@ -14,7 +14,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/gorilla/mux"
"github.com/soheilhy/cmux"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
@ -103,7 +102,7 @@ func runMaster(cmd *Command, args []string) bool {
httpL := m.Match(cmux.Any())
// Create your protocol servers.
grpcS := grpc.NewServer()
grpcS := util.NewGrpcServer()
master_pb.RegisterSeaweedServer(grpcS, ms)
reflection.Register(grpcS)

View file

@ -17,7 +17,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/gorilla/mux"
"github.com/soheilhy/cmux"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
@ -208,7 +207,7 @@ func runServer(cmd *Command, args []string) bool {
httpL := m.Match(cmux.Any())
// Create your protocol servers.
grpcS := grpc.NewServer()
grpcS := util.NewGrpcServer()
master_pb.RegisterSeaweedServer(grpcS, ms)
reflection.Register(grpcS)

View file

@ -7,7 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/glog"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/util"
)
func (fs *Filer) GetMaster() string {
@ -48,7 +48,7 @@ func (fs *Filer) KeepConnectedToMaster() {
func withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error {
grpcConnection, err := grpc.Dial(master, grpc.WithInsecure())
grpcConnection, err := util.GrpcDial(master)
if err != nil {
return fmt.Errorf("fail to dial %s: %v", master, err)
}

View file

@ -5,10 +5,10 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/karlseguin/ccache"
"google.golang.org/grpc"
"sync"
"bazil.org/fuse"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
)
type WFS struct {
@ -43,7 +43,7 @@ func (wfs *WFS) Root() (fs.Node, error) {
func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
grpcConnection, err := grpc.Dial(wfs.filerGrpcAddress, grpc.WithInsecure())
grpcConnection, err := util.GrpcDial(wfs.filerGrpcAddress)
if err != nil {
return fmt.Errorf("fail to dial %s: %v", wfs.filerGrpcAddress, err)
}

View file

@ -8,7 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"golang.org/x/net/context"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/util"
)
func (vs *VolumeServer) GetMaster() string {
@ -38,7 +38,7 @@ func (vs *VolumeServer) heartbeat() {
func (vs *VolumeServer) doHeartbeat(masterNode string, sleepInterval time.Duration) (newLeader string, err error) {
grpcConection, err := grpc.Dial(masterNode, grpc.WithInsecure())
grpcConection, err := util.GrpcDial(masterNode)
if err != nil {
return "", fmt.Errorf("fail to dial: %v", err)
}

View file

@ -0,0 +1,28 @@
package util
import (
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
func NewGrpcServer() *grpc.Server {
return grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{
Time: 10 * time.Second, // wait time before ping if no activity
Timeout: 20 * time.Second, // ping timeout
}), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 60 * time.Second, // min time a client should wait before sending a ping
}))
}
func GrpcDial(address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
opts = append(opts, grpc.WithInsecure())
opts = append(opts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second, // client ping server if no activity for this long
Timeout: 20 * time.Second,
}))
return grpc.Dial(address, opts...)
}