From c012902c16878613c30707f9b83c66890c753eb0 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 18 Mar 2021 22:02:40 -0700 Subject: [PATCH] testing udp This reverts commit 69694a17be1ef8817d9afb3a1265e605298e7ab3. --- weed/command/volume.go | 11 +- weed/server/volume_server_udp_handlers.go | 4 +- weed/wdclient/penet/penet.go | 1048 +++++++++++++++++++++ weed/wdclient/volume_udp_client.go | 40 +- 4 files changed, 1067 insertions(+), 36 deletions(-) create mode 100644 weed/wdclient/penet/penet.go diff --git a/weed/command/volume.go b/weed/command/volume.go index ab3c63a9a..eb89027ab 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -3,7 +3,7 @@ package command import ( "fmt" "github.com/chrislusf/seaweedfs/weed/storage/types" - "github.com/chrislusf/seaweedfs/weed/udptransfer" + "github.com/chrislusf/seaweedfs/weed/wdclient/penet" "net" "net/http" httppprof "net/http/pprof" @@ -401,13 +401,7 @@ func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeSer func (v VolumeServerOptions) startUdpService(volumeServer *weed_server.VolumeServer) { listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20001) - listener, err := udptransfer.NewEndpoint(&udptransfer.Params{ - LocalAddr: listeningAddress, - Bandwidth: 100, - FastRetransmit: true, - FlatTraffic: true, - IsServ: true, - }) + listener, err := penet.Listen("", listeningAddress) if err != nil { glog.Fatalf("Volume server listen on %s:%v", listeningAddress, err) } @@ -416,7 +410,6 @@ func (v VolumeServerOptions) startUdpService(volumeServer *weed_server.VolumeSer for { conn, err := listener.Accept() if err == nil { - glog.V(0).Infof("Client from %s", conn.RemoteAddr()) go volumeServer.HandleUdpConnection(conn) } else if isTemporaryError(err) { continue diff --git a/weed/server/volume_server_udp_handlers.go b/weed/server/volume_server_udp_handlers.go index 974ec78d2..cd6cb5e6e 100644 --- a/weed/server/volume_server_udp_handlers.go +++ b/weed/server/volume_server_udp_handlers.go @@ -10,8 +10,6 @@ import ( func (vs *VolumeServer) HandleUdpConnection(c net.Conn) { defer c.Close() - glog.V(0).Infof("Serving writes from %s", c.RemoteAddr().String()) - bufReader := bufio.NewReaderSize(c, 1024*1024) bufWriter := bufio.NewWriterSize(c, 1024*1024) @@ -19,7 +17,7 @@ func (vs *VolumeServer) HandleUdpConnection(c net.Conn) { cmd, err := bufReader.ReadString('\n') if err != nil { if err != io.EOF { - glog.Errorf("read command from %s: %v", c.RemoteAddr().String(), err) + // glog.Errorf("read command from: %v", err) } return } diff --git a/weed/wdclient/penet/penet.go b/weed/wdclient/penet/penet.go new file mode 100644 index 000000000..9aa00cc7d --- /dev/null +++ b/weed/wdclient/penet/penet.go @@ -0,0 +1,1048 @@ +package penet + +import ( + "container/list" + crand "crypto/rand" + "encoding/binary" + "errors" + "github.com/chrislusf/seaweedfs/weed/glog" + mrand "math/rand" + "net" + "runtime" + "runtime/debug" + "sync" + "time" +) + +type DataSend struct { + Seq uint32 + Acked bool + Resend byte + Fast bool + Code byte + Time uint32 + Data []byte +} + +type DataRecv struct { + Seq uint32 + AckCnt byte + Code byte + Data []byte +} + +type UdpSend struct { + Id uint64 + sock *net.UDPConn + remote *net.UDPAddr + seq uint32 + rtt float64 + rttMax float64 + rttMin float64 + rate uint32 + mss uint32 + interval uint32 + data []DataSend + dataBegin int + dataLen int + sendRnd int + sendList *list.List + sendListLock sync.Mutex + writable chan bool + writeMax int + isClose bool + closing bool + opening byte + conn *Conn + resendCnt int + name string + acked uint32 + recvWnd uint32 +} + +const ( + TypeData uint8 = 1 + TypeAck uint8 = 2 + TypeClose uint8 = 3 + TypeSYN uint8 = 8 +) + +var ( + ErrClose = errors.New("conn close") + ErrTimeout = errors.New("conn timeout") + + mss uint32 = 1200 + defaultRate = mss * 3000 + dropRate = 0.0 + writeMaxSep = 5 + resendLimit = true +) + +func NewUdpSend(conn *Conn, id uint64, sock *net.UDPConn, remote *net.UDPAddr, name string) *UdpSend { + u := &UdpSend{ + conn: conn, + Id: id, + sock: sock, + remote: remote, + mss: mss, + interval: 20, + data: make([]DataSend, 4), + seq: 1, + rate: defaultRate, // 修复bug: 太高导致压测的时候内存爆炸,速度慢 + rtt: 200, + rttMax: 200, + rttMin: 200, + sendList: list.New(), + writable: make(chan bool, 1), + name: name, + } + u.writeMax = int(u.rate/u.mss) / writeMaxSep // fixed bug: 初始化 修复bug: 写入太多,应该一点点写 + u.recvWnd = uint32(u.writeMax) + return u +} + +func (u *UdpSend) send(nowTime time.Time, buf []byte) { + var sendMax = int(u.rate / u.mss) + u.writeMax = sendMax / writeMaxSep + // glog.V(4).Info("send", u.dataLen, u.name, sendMax, uint32(u.rtt)) + var sendCount = uint32(u.rate / u.mss / (1000 / u.interval)) + var sendTotal = sendCount + if sendCount > u.recvWnd/5 { + sendCount = u.recvWnd / 5 + } + if sendCount <= 0 { + sendCount = 1 + } + + now := uint32(nowTime.UnixNano() / int64(time.Millisecond)) + resendCnt := 0 + for i := 0; i < u.dataLen; i++ { + if sendCount <= 0 { + break + } + index := (i + u.dataBegin) % len(u.data) + d := &u.data[index] + if d.Acked == false && now-d.Time >= uint32(u.rtt*2.0) { + glog.V(4).Infof("resend, seq:%v id:%v rtt:%v name:%v", d.Seq, u.Id, uint32(u.rtt), u.name) + // data包: type0 flag1 id2 len3 seq4 tm5 sndwnd6 + // 发送窗口就是配置值,其实发送窗口不需要发送出去 + // 接收端能发送了,要告诉对面开始发送。这个逻辑可以通过接受端的发送通道来发送。 + headLen := structPack(buf, "BBQHIII", uint8(TypeData), + d.Code, uint64(u.Id), uint16(len(d.Data)+4+4+4), d.Seq, uint32(now), uint32(sendMax)) + copy(buf[headLen:], d.Data) + u.sock.WriteToUDP(buf[:headLen+len(d.Data)], u.remote) + d.Time = now + d.Resend++ + d.Fast = false + sendCount-- + resendCnt++ + u.resendCnt++ + } + + } + + u.sendRnd++ + if u.sendRnd > 100 && u.dataLen > 0 { + u.sendRnd = 0 + + var resendMax = uint32(float64(u.dataLen) * 0.6) + if u.resendCnt > 2 { + glog.V(0).Infof("resendCnt:%v resendMax:%v remain:%v rtt:%v id:%v", u.resendCnt, + resendMax, u.dataLen, uint32(u.rtt), u.Id) + u.resendCnt = 0 + } + // 修复bug: 之前使用lastrecvtime,存在新增发送数据数据那瞬间,出现超时情况 + if u.data[u.dataBegin].Resend > 10 { + glog.V(0).Infof("resend too much, close, id:%v name:%v drop seq:%v remain:%v", + u.Id, u.name, u.data[u.dataBegin].Seq, u.dataLen) + u.conn.close(true, true) + } + } + + var resendToomuch bool + if uint32(resendCnt) > sendTotal/3 && resendLimit { // 修复bug: 限速 + resendToomuch = true + glog.V(4).Infof("resend too much, slow, %v %v", resendCnt, sendTotal) + } + + u.sendListLock.Lock() + for ; sendCount > 0; sendCount-- { + sendData := u.sendList.Front() + if sendData == nil { + break + } + + if sendMax-u.dataLen <= 0 || resendToomuch { + break + } + if u.dataLen >= len(u.data) { + newData := make([]DataSend, 2*len(u.data)) + copy(newData, u.data[u.dataBegin:]) + copy(newData[len(u.data)-u.dataBegin:], u.data[:u.dataBegin]) + u.dataBegin = 0 + u.data = newData + } + + u.sendList.Remove(sendData) + hType := uint8(0) + sdata, ok := sendData.Value.([]byte) + if !ok { + // 发送控制消息 + mesCode, _ := sendData.Value.(byte) + hType = mesCode + } + headLen := structPack(buf, "BBQHIII", uint8(TypeData), hType, + uint64(u.Id), uint16(len(sdata)+4+4+4), u.seq, uint32(now), uint32(sendMax)) + copy(buf[headLen:], sdata) + u.sock.WriteToUDP(buf[:headLen+len(sdata)], u.remote) + glog.V(4).Infof("send, seq:%v id:%v %v", u.seq, u.Id, u.name) + + index := (u.dataBegin + u.dataLen) % len(u.data) + u.data[index] = DataSend{ + Seq: u.seq, + Time: now, + Code: hType, + Data: sdata, + } + u.seq++ + u.dataLen++ + } + listRemain := u.writeMax - u.sendList.Len() + u.sendListLock.Unlock() + + if listRemain > 0 { + select { + case u.writable <- !u.isClose: + default: + } + } + + if u.isClose && u.dataLen == 0 && u.closing == false { // 修复bug: 之前5秒删除,实际发送没完成 + // 修复bug: 接受端write端close为true, 导致5秒后呗删除,来不及接受数据。 + glog.V(1).Infof("close and emtpy, rm conn, id:%v name:%v seq:%v listlen:%v", + u.Id, u.name, u.seq, u.sendList.Len()) + u.closing = true + u.conn.close(false, true) + } +} + +func testDrop() bool { + if dropRate < 0.01 { + return false + } + var v uint32 + var b [4]byte + if _, err := crand.Read(b[:]); err != nil { + v = mrand.Uint32() + } else { + v = binary.BigEndian.Uint32(b[:]) + } + if v%1000 < uint32(dropRate*1000) { + return true + } + return false +} + +func (u *UdpSend) recv(buf []byte) { + + if testDrop() { + return + } + + now := uint32(time.Now().UnixNano() / int64(time.Millisecond)) + + // ack包: type0 flag1 id2 len3 tm4 rcvwnd5 acked6 + head, headLen := structUnPack(buf, "BBQHIII") + if head[0] == uint64(TypeAck) && u.dataLen > 0 { + + firstSeq := u.data[u.dataBegin].Seq + acked := uint32(head[6]) + offset := int(int32(acked - firstSeq)) + glog.V(4).Infof("id:%v recv ack: %v offset:%v databegin:%v dataLen:%v firstSeq:%v name:%v", + u.Id, acked, offset, u.dataBegin, u.dataLen, firstSeq, u.name) + if offset >= 0 && offset < u.dataLen { + offset++ + for i := 0; i < offset; i++ { // 修复bug: 清除引用等 + index := (u.dataBegin + i) % len(u.data) // 修复bug,i写成offset + d := &u.data[index] + d.Data = nil + d.Acked = true + } + u.acked += uint32(offset) + u.dataBegin += offset + u.dataBegin = u.dataBegin % len(u.data) + u.dataLen -= offset + glog.V(4).Infof("2 acked ok:%v, id:%v %v", u.acked, u.Id, u.name) + } + if u.dataLen > 0 { + curSeq := u.data[u.dataBegin].Seq + // var ackedSeq = []uint32{} + for i := headLen; i < len(buf); i += 4 { + seq := binary.BigEndian.Uint32(buf[i:]) + offset := int(int32(seq - curSeq)) + if offset < 0 || offset >= u.dataLen { + continue + } + index := (u.dataBegin + offset) % len(u.data) + d := &u.data[index] + if d.Seq == seq { + d.Acked = true + d.Data = nil // 修复bug: memory leak + // ackedSeq = append(ackedSeq, seq) + } else { + panic("index not correct") + } + } + // if len(ackedSeq) > 0 { + // glog.V(0).Info("seq:", ackedSeq) + // } + } + + var i = 0 + for ; i < u.dataLen; i++ { + index := (i + u.dataBegin) % len(u.data) + d := &u.data[index] + if d.Acked == false { + break + } + // fmt.Println("acked->", d.Seq) + // if d.Seq == 7 { + // fmt.Println("data:", u.data[u.dataBegin:u.dataBegin+5]) + // } + } + if i > 0 { + u.acked += uint32(i) + u.dataBegin += i + u.dataBegin = u.dataBegin % len(u.data) + u.dataLen -= i + glog.V(4).Infof("3 acked ok:%v, id:%v %v %v", u.acked, u.Id, u.dataBegin, u.name) + } + + sendTime := uint32(head[4]) + rtt := now - sendTime + if rtt > 0 { + if firstSeq < 3 { + u.rtt = float64(rtt) // 初始值 + } else { + u.rtt = u.rtt*0.8 + float64(rtt)*0.2 + } + if u.rtt < 50.0 { + u.rtt = 50 + } + // glog.V(4).Infof("rtt:%v u.rtt:%v id:%v", rtt, u.rtt, u.Id) + } + + u.recvWnd = uint32(head[5]) + } + +} + +func structPack(b []byte, format string, param ...interface{}) int { + j := 0 + for i, s := range format { + switch s { + case 'I': + p, _ := param[i].(uint32) + binary.BigEndian.PutUint32(b[j:], p) + j += 4 + case 'B': + p, _ := param[i].(uint8) + b[j] = p + j++ + case 'H': + p, _ := param[i].(uint16) + binary.BigEndian.PutUint16(b[j:], p) + j += 2 + case 'Q': + p, _ := param[i].(uint64) + binary.BigEndian.PutUint64(b[j:], p) + j += 8 + default: + panic("structPack not found") + } + } + return j +} + +func structUnPack(b []byte, format string) ([]uint64, int) { + var re = make([]uint64, 0, len(format)) + defer func() { + if err := recover(); err != nil { + // log.Error(err) + re[0] = 0 + } + }() + j := 0 + for _, s := range format { + switch s { + case 'I': + re = append(re, uint64(binary.BigEndian.Uint32(b[j:]))) + j += 4 + case 'B': + re = append(re, uint64(b[j])) + j++ + case 'H': + re = append(re, uint64(binary.BigEndian.Uint16(b[j:]))) + j += 2 + case 'Q': + re = append(re, uint64(binary.BigEndian.Uint64(b[j:]))) + j += 8 + default: + panic("structUnPack not found") + } + } + return re, j +} + +type UdpRecv struct { + conn *Conn + Id uint64 + sock *net.UDPConn + remote *net.UDPAddr + acked uint32 + lastTm uint32 + sndWnd uint32 + recvCnt uint32 + isClose bool + isNew byte + isRecved bool + recvList *list.List + recvListLock sync.Mutex + readable chan byte + readDeadline *time.Time + seqData map[uint32]*DataRecv + dataList *list.List + name string +} + +func NewUdpRecv(conn *Conn, id uint64, sock *net.UDPConn, remote *net.UDPAddr, name string) *UdpRecv { + return &UdpRecv{ + Id: id, + conn: conn, + sock: sock, + remote: remote, + acked: 0, + recvList: list.New(), + dataList: list.New(), + seqData: make(map[uint32]*DataRecv), + readable: make(chan byte, 1), + isNew: 50, + name: name, + sndWnd: 1000, + } +} + +func (u *UdpRecv) SetReadDeadline(t time.Time) { + u.readDeadline = &t +} + +func (u *UdpRecv) sendAck(nowTime time.Time, buf []byte) { + + u.recvListLock.Lock() + for { + if d, ok := u.seqData[u.acked+1]; ok { + u.acked++ + if d.Code == TypeClose { + if u.isClose == false { + glog.V(1).Info("recv close:", u.Id) + u.conn.close(false, true) + u.isClose = true + } + } else { + u.recvList.PushBack(d.Data) + } + d.Data = nil + delete(u.seqData, u.acked) + } else { + break + } + } + recvListLen := u.recvList.Len() + u.recvListLock.Unlock() + + // glog.V(4).Info("acked:", u.acked, u.Id, recvListLen, u.name) + if recvListLen > 0 { // 修复bug,用标志可能会有读不到的数据 修复bug: 去掉 && u.isClose == false,导致读取延迟 + select { + case u.readable <- 1: + default: + } + } else if u.isClose == true { // 修复bug:快速close + select { + case u.readable <- 0: + default: + } + } + if u.readDeadline != nil && !u.readDeadline.IsZero() { + if u.readDeadline.Before(nowTime) { // 修复bug after + select { + case u.readable <- 2: + glog.V(0).Info("read dealline: ", u.Id) + default: + } + } + } + + var b = buf[:mss] + buf = buf[mss:] + var n = 0 + for i := u.dataList.Front(); i != nil; { + d := i.Value.(*DataRecv) + + next := i.Next() + if d.AckCnt > 6 || before(d.Seq, u.acked+1) { // 发几次够了,不反复发 + // d.Removed = true + u.dataList.Remove(i) // 修复bug,删除逻辑不对,需要保存next + // delete(u.seqData, d.Seq) // 修复bug: 对面已经确认,这里删了,没有重发,也没有了数据。已经收到的数据并且发了ack的数据,不要删了! + i = next + continue + } + i = next + + if d.AckCnt%3 == 0 { + binary.BigEndian.PutUint32(b[n:], d.Seq) + n += 4 + if n >= len(b) { + wnd := int(u.sndWnd) - recvListLen - len(u.seqData) + if wnd < 0 { + wnd = 0 + } + headLen := structPack(buf, "BBQHIII", uint8(TypeAck), uint8(0), uint64(u.Id), uint16(4+4+4+n), + u.lastTm, uint32(wnd), u.acked) + copy(buf[headLen:], b[:n]) + u.sock.WriteToUDP(buf[:headLen+n], u.remote) + glog.V(4).Infof("id:%v send ack n:%v", u.Id, n) + n = 0 // 修复bug,没有置零 + u.isRecved = false // 修复bug: 有时候发多一条数据 + } + } + d.AckCnt++ + } + if n > 0 || u.isRecved { // 修复bug: 一直发数据 修复bug: 有时候发多一条数据 + wnd := int(u.sndWnd) - recvListLen - len(u.seqData) + if wnd < 0 { + wnd = 0 + } + headLen := structPack(buf, "BBQHIII", uint8(TypeAck), uint8(0), uint64(u.Id), uint16(4+4+4+n), + u.lastTm, uint32(wnd), u.acked) + copy(buf[headLen:], b[:n]) + u.sock.WriteToUDP(buf[:headLen+n], u.remote) + glog.V(4).Infof("id:%v send ack n:%v datalen:%v", u.Id, n, u.dataList.Len()) + } + u.isRecved = false + + // 修复bug: 如果自己只是发数据,那么自己的acked通道会没用到。所以要判断自己是否在发送数据。 + if u.isNew > 0 { // 完成的优化:超时不确认第一包,就删除链接。防止旧链接不断发包。 + u.isNew-- + if u.isNew == 0 && u.acked == 0 { + glog.V(0).Infof("not recv first packet!!! close, id:%v name:%v", u.Id, u.name) + u.conn.Close() + } + if u.acked >= 1 || u.conn.s.seq > 1 { + u.isNew = 0 + } + } + +} + +// before seq1比seq2小 +func before(seq1, seq2 uint32) bool { + return (int32)(seq1-seq2) < 0 +} + +func after(seq1, seq2 uint32) bool { + return (int32)(seq2-seq1) < 0 +} + +func (u *UdpRecv) recv(buf []byte) { + + if testDrop() { + return + } + + u.isRecved = true + u.recvCnt++ + // data包: type0 flag1 id2 len3 seq4 tm5 sndwnd6 + head, headLen := structUnPack(buf, "BBQHIII") + if head[0] == uint64(TypeData) { + + seq := uint32(head[4]) + u.lastTm = uint32(head[5]) + u.sndWnd = uint32(head[6]) + + glog.V(4).Info(u.Id, " recv seq: ", seq, " len: ", u.dataList.Len()) + + if before(seq, u.acked+1) { // 修复bug: 收到before的数据,seqData不会回收。 + // glog.V(0).Info("seq before u.acked:", seq, u.acked, u.Id) + return + } + + // 修复bug: 修复没重发问题。之前由于一直会重发,这个逻辑有意义。现在只重发几次。 + // if d, ok := u.seqData[seq]; ok { + // d.AckCnt = 0 + // if d.Removed == false { // 修复bug:可能没ack导致一直重发,要直到acked全部覆盖。 + // return + // } + // } + + // glog.V(4).Info(u.Id, " recv 2 seq: ", seq, " len: ", u.dataList.Len()) + + d := &DataRecv{ + Seq: seq, + Data: buf[headLen:], + Code: byte(head[1]), + } + u.dataList.PushBack(d) + u.seqData[seq] = d + + } + +} + +func SetRate(rate uint32) { + defaultRate = rate +} + +func SetDropRate(rate float64) { + if rate > 0.001 { + dropRate = rate + } +} + +type Conn struct { + Id uint64 + s *UdpSend + r *UdpRecv + responsed chan bool + isClose bool + isSendClose bool + isRmConn bool + conns *Conns +} + +func NewConn(conns *Conns, Id uint64, localConn *net.UDPConn, remote *net.UDPAddr, name string) *Conn { + conn := &Conn{ + Id: Id, + conns: conns, + responsed: make(chan bool, 1), + } + conn.s = NewUdpSend(conn, conn.Id, localConn, remote, name) + conn.r = NewUdpRecv(conn, conn.Id, localConn, remote, name) + return conn +} + +func (c *Conn) Write(bb []byte) (n int, err error) { + + if c.isClose { + return 0, ErrClose + } + + // 1.对方告诉你满了,是要叫你不要发数据了,而不是还发数据。 + // 2.没有窗口就没法快速地确定对面有没有满,如果你要等到自己的的buffer满,那么可能会比较慢感知到 + // 固定buffer + 停止通知 + b := make([]byte, len(bb)) // 修复bug1:没有拷贝 + copy(b, bb) + for { + c.s.sendListLock.Lock() + remain := c.s.writeMax - c.s.sendList.Len() + for { + if len(b) <= 0 { + break + } + if remain <= 0 { + break + } + remain-- + var sendLen = int(mss) + if len(b) < sendLen { + // 实际发送值 + sendLen = len(b) + } + n += sendLen + c.s.sendList.PushBack(b[:sendLen]) + // fmt.Println("write:", b[:10]) + b = b[sendLen:] + } + c.s.sendListLock.Unlock() + if len(b) <= 0 { + break + } + // glog.V(0).Info("wait write: ", c.Id) + w := <-c.s.writable + if w == false { + return n, ErrClose + } + } + + return n, nil +} + +func (c *Conn) Read(b []byte) (n int, err error) { + for { + c.r.recvListLock.Lock() + for { + f := c.r.recvList.Front() + if f != nil { + data := f.Value.([]byte) + copy(b[n:], data) + maxCap := len(b[n:]) + if maxCap < len(data) { + // b已满 + f.Value = data[maxCap:] + n += maxCap + break + } else { + // b未满 + c.r.recvList.Remove(f) + n += len(data) + } + } else { + // 读完数据了 + break + } + } + c.r.recvListLock.Unlock() + if n <= 0 { + // glog.V(4).Info("wait read", c.Id) + // wait for chan + r := <-c.r.readable + if r == 0 { // close之后总是返回初始值 + c.r.recvListLock.Lock() + rlen := c.r.recvList.Len() + c.r.recvListLock.Unlock() + if rlen <= 0 { // 修复bug: 等到read完所有数据才让read返回错误 + return n, ErrClose + } + } + if r == 2 { + return n, ErrTimeout + } + } else { + break + } + } + + return +} + +func (c *Conn) LocalAddr() net.Addr { + return c.conns.sock.LocalAddr() +} + +func (c *Conn) RemoteAddr() net.Addr { + return c.conns.sock.RemoteAddr() +} + +func (c *Conn) SetDeadline(t time.Time) error { + c.r.SetReadDeadline(t) + return nil +} + +func (c *Conn) SetReadDeadline(t time.Time) error { + c.r.SetReadDeadline(t) + return nil +} + +func (c *Conn) SetWriteDeadline(t time.Time) error { + return nil +} + +func (c *Conn) close(sendClose, rmConn bool) { + if c.isClose == false { + c.isClose = true + c.r.isClose = true + } + if sendClose && c.isSendClose == false { // 修复bug: + c.isSendClose = true + c.s.sendListLock.Lock() + c.s.sendList.PushBack(byte(TypeClose)) + c.s.sendListLock.Unlock() + } + if rmConn && c.isRmConn == false { + c.isRmConn = true + time.AfterFunc(time.Second*5, func() { + select { + case c.conns.input <- Input{ + typ: ActRmConn, + param: c, + }: + default: + } + }) + } +} + +func (c *Conn) Close() error { + if c.isClose == false { + c.isClose = true + c.s.isClose = true + // bug: 接收不能主动关闭 + c.isSendClose = true + c.s.sendListLock.Lock() + c.s.sendList.PushBack(byte(TypeClose)) + c.s.sendListLock.Unlock() + // bug: close之后,5秒数据可能无法完成发送 + } + return nil +} + +type Conns struct { + conns map[uint64]*Conn + sock *net.UDPConn + accept chan *Conn + isClose bool + isDial bool + timerRnd uint32 + input chan Input +} + +func NewConns() *Conns { + return &Conns{ + conns: make(map[uint64]*Conn), + accept: make(chan *Conn, 256), + input: make(chan Input, 2048), + } +} + +func Listen(network, address string) (net.Listener, error) { + addr, err := net.ResolveUDPAddr("udp", address) + if err != nil { + return nil, err + } + conn, err := net.ListenUDP("udp", addr) + if err != nil { + return nil, err + } + listener := NewConns() + listener.sock = conn + go listener.loop() + return listener, nil +} + +var dialConns *Conns +var dialConnsLock sync.Mutex + +func Dial(network, address string) (net.Conn, error) { + return DialTimeout(network, address, time.Second*3) +} + +func DialTimeout(network, address string, timeout time.Duration) (net.Conn, error) { + addr, err := net.ResolveUDPAddr("udp", address) + if err != nil { + return nil, err + } + var b [8]byte + if _, err := crand.Read(b[:]); err != nil { + return nil, err + } + id := binary.LittleEndian.Uint64(b[:]) + + glog.V(0).Info("dial new 3:", id) + + dialConnsLock.Lock() + if dialConns == nil { + dialConns = NewConns() + } + if dialConns.sock == nil { + s, err := net.ListenUDP("udp", &net.UDPAddr{}) + if err != nil { + dialConnsLock.Unlock() + return nil, err + } + dialConns.sock = s + dialConns.isDial = true + go dialConns.loop() + } + dialConnsLock.Unlock() + + conn := NewConn(dialConns, id, dialConns.sock, addr, "reqer") + dialConns.input <- Input{ + typ: ActAddConn, + param: conn, + } + + // conn.s.sendListLock.Lock() + // conn.s.sendList.PushBack(byte(TypeSYN)) + // conn.s.sendListLock.Unlock() + + return conn, nil +} + +func (c *Conns) Accept() (net.Conn, error) { + for { + if c.isClose { + return nil, errors.New("listener close") + } + conn := <-c.accept + if conn == nil { + return nil, errors.New("listener close") + } + return conn, nil + } +} + +func (c *Conns) Close() error { + c.isClose = true + c.sock.Close() + c.input <- Input{ + typ: ActEnd, + } + return nil +} + +func (c *Conns) Addr() net.Addr { + return c.sock.LocalAddr() +} + +const ( + ActData = 1 + ActTimer = 2 + ActAddConn = 3 + ActRmConn = 4 + ActEnd = 5 +) + +type Input struct { + typ uint8 + data []byte + param interface{} +} + +func (c *Conns) loop() { + // 这里输入时间和数据 + // 只起一个timer,给所有conn发 + // 之前用setreaddeadline这个不太好,容易出现长时间没超时 + + runtime.LockOSThread() + + glog.V(0).Info("loop: ", c.isDial) + + go func() { + var buf = make([]byte, 2048) + for { + n, remote, err := c.sock.ReadFromUDP(buf) + if n <= 0 || err != nil { + c.Close() + return + } + b := make([]byte, n) + copy(b, buf[:n]) + c.input <- Input{ + typ: ActData, + data: b, + param: remote, + } + if c.isClose { + return + } + } + }() + + var timerRunning bool + var releaseMemory uint32 + var buf = make([]byte, mss*3) + + for { + data := <-c.input + switch data.typ { + case ActData: + head, _ := structUnPack(data.data, "BBQ") + // fmt.Println(head[0], head[2]) + var dataType = head[0] + if conn, ok := c.conns[head[2]]; ok { + // TODO: 给dial中的链接发送成功 -> 暂时不需要,现在dial不判断这些,默认成功 + if dataType == uint64(TypeData) { + conn.r.recv(data.data) + } else if dataType == uint64(TypeAck) { + conn.s.recv(data.data) + } + } else { + if c.isDial == false && dataType == uint64(TypeData) && c.isClose == false { // 不需要TypeSYN + // glog.V(0).Info("create new:", head[2]) + // 只有主动listen的,才有新链接,而dial自己就会创建新连接,不用创建 + conn := NewConn(c, head[2], c.sock, data.param.(*net.UDPAddr), "rsper") + c.conns[conn.Id] = conn + conn.r.recv(data.data) + select { + case c.accept <- conn: + default: + } + if timerRunning == false { + timerRunning = true + go c.runTimer(c.timerRnd) + glog.V(0).Info("start timer, round:", c.timerRnd, c.isDial) + } + } + } + case ActTimer: + now := time.Now() + for _, conn := range c.conns { + conn.s.send(now, buf) + conn.r.sendAck(now, buf) + } + if len(c.conns) == 0 && timerRunning { + glog.V(0).Info("no conn, stop timer, round:", c.timerRnd, c.isDial) + c.timerRnd++ + timerRunning = false + debug.FreeOSMemory() + } + releaseMemory++ + if releaseMemory > 50*60 { + releaseMemory = 0 + go func() { + debug.FreeOSMemory() // 修复bug: go在windows不回收内存 + }() + } + case ActAddConn: + conn := data.param.(*Conn) + c.conns[conn.Id] = conn + if timerRunning == false { + timerRunning = true + go c.runTimer(c.timerRnd) + glog.V(0).Info("start timer, round:", c.timerRnd, c.isDial) + } + case ActRmConn: + conn := data.param.(*Conn) + // fmt.Println("rm conn:", conn.Id) + conn.isClose = true + conn.s.isClose = true + conn.r.isClose = true + if _, ok := c.conns[conn.Id]; ok { + glog.V(0).Info("rm conn ok:", conn.Id, c.isDial, len(c.conns)) + close(conn.r.readable) + close(conn.s.writable) // 修复bug: 没有close + delete(c.conns, conn.Id) + } + case ActEnd: + c.timerRnd++ + timerRunning = false + c.isClose = true + for _, conn := range c.conns { + conn.isClose = true + conn.s.isClose = true + conn.r.isClose = true + close(conn.r.readable) + close(conn.s.writable) + } + close(c.accept) + c.conns = make(map[uint64]*Conn) + return + } + + } + +} + +func (c *Conns) runTimer(rnd uint32) { + // runtime.LockOSThread() + for { + time.Sleep(20 * time.Millisecond) + // C.usleep(20 * 1000) + if rnd != c.timerRnd { + glog.V(0).Info("timer stop, round:", rnd, c.isDial) + } + if c.isClose || rnd != c.timerRnd { + return + } + c.input <- Input{typ: ActTimer} + } +} diff --git a/weed/wdclient/volume_udp_client.go b/weed/wdclient/volume_udp_client.go index d0365f110..e8a5fa34d 100644 --- a/weed/wdclient/volume_udp_client.go +++ b/weed/wdclient/volume_udp_client.go @@ -3,18 +3,22 @@ package wdclient import ( "bufio" "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/udptransfer" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/wdclient/penet" "io" "net" + "time" ) // VolumeUdpClient put/get/delete file chunks directly on volume servers without replication type VolumeUdpClient struct { + Conn net.Conn + bufWriter *bufio.Writer + bufReader *bufio.Reader } type VolumeUdpConn struct { - net.Conn + Conn net.Conn bufWriter *bufio.Writer bufReader *bufio.Reader } @@ -30,41 +34,29 @@ func (c *VolumeUdpClient) PutFileChunk(volumeServerAddress string, fileId string return parseErr } - listener, err := udptransfer.NewEndpoint(&udptransfer.Params{ - LocalAddr: "", - Bandwidth: 100, - FastRetransmit: true, - FlatTraffic: true, - IsServ: false, - }) - if err != nil { - return err + if c.Conn == nil { + c.Conn, err = penet.DialTimeout("", udpAddress, 500*time.Millisecond) + if err != nil { + return err + } + c.bufWriter = bufio.NewWriter(c.Conn) } - defer listener.Close() - - conn, err := listener.Dial(udpAddress) - if err != nil { - return err - } - defer conn.Close() - - bufWriter := bufio.NewWriter(conn) buf := []byte("+" + fileId + "\n") - _, err = bufWriter.Write([]byte(buf)) + _, err = c.bufWriter.Write([]byte(buf)) if err != nil { return } util.Uint32toBytes(buf[0:4], fileSize) - _, err = bufWriter.Write(buf[0:4]) + _, err = c.bufWriter.Write(buf[0:4]) if err != nil { return } - _, err = io.Copy(bufWriter, fileReader) + _, err = io.Copy(c.bufWriter, fileReader) if err != nil { return } - bufWriter.Flush() + c.bufWriter.Flush() return nil }