From 7c5c94785c5789a333987348c3a3e462c143e885 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 20 Mar 2014 11:07:15 -0700 Subject: [PATCH] switch to idle timeout instead of read timeout --- go/util/net_timeout.go | 65 +++++++++++++++++++ go/weed/master.go | 17 ++--- go/weed/server.go | 32 ++++----- go/weed/volume.go | 18 ++--- go/weed/weed_server/volume_server_handlers.go | 4 +- 5 files changed, 103 insertions(+), 33 deletions(-) create mode 100644 go/util/net_timeout.go diff --git a/go/util/net_timeout.go b/go/util/net_timeout.go new file mode 100644 index 000000000..410152a79 --- /dev/null +++ b/go/util/net_timeout.go @@ -0,0 +1,65 @@ +package util + +import ( + "net" + "time" +) + +// Listener wraps a net.Listener, and gives a place to store the timeout +// parameters. On Accept, it will wrap the net.Conn with our own Conn for us. +type Listener struct { + net.Listener + ReadTimeout time.Duration + WriteTimeout time.Duration +} + +func (l *Listener) Accept() (net.Conn, error) { + c, err := l.Listener.Accept() + if err != nil { + return nil, err + } + tc := &Conn{ + Conn: c, + ReadTimeout: l.ReadTimeout, + WriteTimeout: l.WriteTimeout, + } + return tc, nil +} + +// Conn wraps a net.Conn, and sets a deadline for every read +// and write operation. +type Conn struct { + net.Conn + ReadTimeout time.Duration + WriteTimeout time.Duration +} + +func (c *Conn) Read(b []byte) (int, error) { + err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout)) + if err != nil { + return 0, err + } + return c.Conn.Read(b) +} + +func (c *Conn) Write(b []byte) (int, error) { + err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout)) + if err != nil { + return 0, err + } + return c.Conn.Write(b) +} + +func NewListener(addr string, timeout time.Duration) (net.Listener, error) { + l, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + + tl := &Listener{ + Listener: l, + ReadTimeout: timeout, + WriteTimeout: timeout, + } + return tl, nil +} diff --git a/go/weed/master.go b/go/weed/master.go index f0b82d414..2d003185f 100644 --- a/go/weed/master.go +++ b/go/weed/master.go @@ -35,7 +35,7 @@ var ( mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file") defaultReplicaPlacement = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.") - mReadTimeout = cmdMaster.Flag.Int("readTimeout", 30, "connection read timeout in seconds") + mTimeout = cmdMaster.Flag.Int("idleTimeout", 10, "connection idle seconds") mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") @@ -62,10 +62,12 @@ func runMaster(cmd *Command, args []string) bool { glog.V(0).Infoln("Start Weed Master", VERSION, "at port", *masterIp+":"+strconv.Itoa(*mport)) - srv := &http.Server{ - Addr: *masterIp + ":" + strconv.Itoa(*mport), - Handler: r, - ReadTimeout: time.Duration(*mReadTimeout) * time.Second, + listener, e := util.NewListener( + *masterIp+":"+strconv.Itoa(*mport), + time.Duration(*mTimeout)*time.Second, + ) + if e != nil { + glog.Fatalf(e.Error()) } go func() { @@ -78,9 +80,8 @@ func runMaster(cmd *Command, args []string) bool { ms.SetRaftServer(raftServer) }() - e := srv.ListenAndServe() - if e != nil { - glog.Fatalf("Fail to start:%s", e) + if e := http.Serve(listener, r); e != nil { + glog.Fatalf("Fail to serve:%s", e.Error()) } return true } diff --git a/go/weed/server.go b/go/weed/server.go index 21543b1d7..bb2f2b447 100644 --- a/go/weed/server.go +++ b/go/weed/server.go @@ -38,7 +38,7 @@ var cmdServer = &Command{ var ( serverIp = cmdServer.Flag.String("ip", "localhost", "ip or server name") serverMaxCpu = cmdServer.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") - serverReadTimeout = cmdServer.Flag.Int("readTimeout", 30, "connection read timeout in seconds. Increase this if uploading large files.") + serverTimeout = cmdServer.Flag.Int("idleTimeout", 10, "connection idle seconds") serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name") serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name") serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") @@ -109,10 +109,12 @@ func runServer(cmd *Command, args []string) bool { ) glog.V(0).Infoln("Start Weed Master", VERSION, "at port", *serverIp+":"+strconv.Itoa(*masterPort)) - masterServer := &http.Server{ - Addr: *serverIp + ":" + strconv.Itoa(*masterPort), - Handler: r, - ReadTimeout: time.Duration(*serverReadTimeout) * time.Second, + masterListener, e := util.NewListener( + *serverIp+":"+strconv.Itoa(*masterPort), + time.Duration(*serverTimeout)*time.Second, + ) + if e != nil { + glog.Fatalf(e.Error()) } go func() { @@ -128,9 +130,8 @@ func runServer(cmd *Command, args []string) bool { }() raftWaitForMaster.Done() - e := masterServer.ListenAndServe() - if e != nil { - glog.Fatalf("Fail to start master:%s", e) + if e := http.Serve(masterListener, r); e != nil { + glog.Fatalf("Master Fail to serve:%s", e.Error()) } }() @@ -142,14 +143,15 @@ func runServer(cmd *Command, args []string) bool { ) glog.V(0).Infoln("Start Weed volume server", VERSION, "at http://"+*serverIp+":"+strconv.Itoa(*volumePort)) - volumeServer := &http.Server{ - Addr: *serverIp + ":" + strconv.Itoa(*volumePort), - Handler: r, - ReadTimeout: (time.Duration(*serverReadTimeout) * time.Second), - } - e := volumeServer.ListenAndServe() + volumeListener, e := util.NewListener( + *serverIp+":"+strconv.Itoa(*volumePort), + time.Duration(*serverTimeout)*time.Second, + ) if e != nil { - glog.Fatalf("Fail to start volume:%s", e.Error()) + glog.Fatalf(e.Error()) + } + if e := http.Serve(volumeListener, r); e != nil { + glog.Fatalf("Fail to serve:%s", e.Error()) } return true diff --git a/go/weed/volume.go b/go/weed/volume.go index dcc2c1097..85ee4618e 100644 --- a/go/weed/volume.go +++ b/go/weed/volume.go @@ -32,7 +32,7 @@ var ( publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible :") masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location") vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") - vReadTimeout = cmdVolume.Flag.Int("readTimeout", 30, "connection read timeout in seconds. Increase this if uploading large files.") + vTimeout = cmdVolume.Flag.Int("idleTimeout", 10, "connection idle seconds") vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name") @@ -79,14 +79,16 @@ func runVolume(cmd *Command, args []string) bool { ) glog.V(0).Infoln("Start Weed volume server", VERSION, "at http://"+*ip+":"+strconv.Itoa(*vport)) - srv := &http.Server{ - Addr: *ip + ":" + strconv.Itoa(*vport), - Handler: r, - ReadTimeout: (time.Duration(*vReadTimeout) * time.Second), - } - e := srv.ListenAndServe() + listener, e := util.NewListener( + *ip+":"+strconv.Itoa(*vport), + time.Duration(*vTimeout)*time.Second, + ) if e != nil { - glog.Fatalf("Fail to start:%s", e.Error()) + glog.Fatalf(e.Error()) + } + + if e := http.Serve(listener, r); e != nil { + glog.Fatalf("Fail to serve:%s", e.Error()) } return true } diff --git a/go/weed/weed_server/volume_server_handlers.go b/go/weed/weed_server/volume_server_handlers.go index c015cfe40..fc7d91783 100644 --- a/go/weed/weed_server/volume_server_handlers.go +++ b/go/weed/weed_server/volume_server_handlers.go @@ -134,7 +134,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, return } if n.Cookie != cookie { - glog.V(0).Infoln("request with unmaching cookie seen:", cookie, "expected:", n.Cookie, "from", r.RemoteAddr, "agent", r.UserAgent()) + glog.V(0).Infoln("request", r.URL.Path, "with unmaching cookie seen:", cookie, "expected:", n.Cookie, "from", r.RemoteAddr, "agent", r.UserAgent()) w.WriteHeader(http.StatusNotFound) return } @@ -235,7 +235,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { } if n.Cookie != cookie { - glog.V(0).Infoln("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) + glog.V(0).Infoln("delete", r.URL.Path, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) return }