// Mirror of https://github.com/seaweedfs/seaweedfs.git
// (synced 2024-01-19 02:48:24 +00:00), commit c012902c16:
// "This reverts commit 69694a17be."
// File: 1049 lines, 24 KiB, Go.
package penet
|
||
|
||
import (
|
||
"container/list"
|
||
crand "crypto/rand"
|
||
"encoding/binary"
|
||
"errors"
|
||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||
mrand "math/rand"
|
||
"net"
|
||
"runtime"
|
||
"runtime/debug"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
// DataSend is one slot of the sender's in-flight ring buffer: a packet that
// has been transmitted and is waiting to be acknowledged.
type DataSend struct {
	Seq    uint32 // sequence number assigned on first transmission
	Acked  bool   // set once an ACK covering Seq has been processed
	Resend byte   // number of retransmissions so far
	Fast   bool   // cleared on retransmission; never read in the visible code
	Code   byte   // header flag byte: 0 for payload, otherwise a control code
	Time   uint32 // millisecond timestamp of the latest transmission
	Data   []byte // payload; released (nil) once acknowledged
}
|
||
|
||
// DataRecv is a data packet as stored by the receiver until it can be
// delivered in order and has been acknowledged to the sender.
type DataRecv struct {
	Seq    uint32 // sequence number taken from the packet header
	AckCnt byte   // passes through sendAck; listed in an ACK when AckCnt%3 == 0, dropped once > 6
	Code   byte   // header flag byte (e.g. TypeClose)
	Data   []byte // payload following the header
}
|
||
|
||
type UdpSend struct {
|
||
Id uint64
|
||
sock *net.UDPConn
|
||
remote *net.UDPAddr
|
||
seq uint32
|
||
rtt float64
|
||
rttMax float64
|
||
rttMin float64
|
||
rate uint32
|
||
mss uint32
|
||
interval uint32
|
||
data []DataSend
|
||
dataBegin int
|
||
dataLen int
|
||
sendRnd int
|
||
sendList *list.List
|
||
sendListLock sync.Mutex
|
||
writable chan bool
|
||
writeMax int
|
||
isClose bool
|
||
closing bool
|
||
opening byte
|
||
conn *Conn
|
||
resendCnt int
|
||
name string
|
||
acked uint32
|
||
recvWnd uint32
|
||
}
|
||
|
||
// Packet type and control codes. TypeData and TypeAck are written to the
// first header byte; TypeClose travels in the flag byte of a data packet.
const (
	TypeData  uint8 = 1 // data packet
	TypeAck   uint8 = 2 // acknowledgement packet
	TypeClose uint8 = 3 // control code: the peer closed the stream
	TypeSYN   uint8 = 8 // only referenced from commented-out code in the visible source
)
|
||
|
||
var (
	// ErrClose is returned by Read and Write once the connection is closed.
	ErrClose = errors.New("conn close")
	// ErrTimeout is returned by Read when the read deadline has expired.
	ErrTimeout = errors.New("conn timeout")

	// mss is the maximum payload carried by one data packet, in bytes.
	mss uint32 = 1200
	// defaultRate is the rate given to new senders; SetRate overrides it.
	defaultRate = mss * 3000
	// dropRate is the simulated inbound loss probability used by testDrop.
	dropRate = 0.0
	// writeMaxSep divides the in-flight limit to obtain the write-queue limit.
	writeMaxSep = 5
	// resendLimit makes send hold back new packets on ticks dominated by
	// retransmissions.
	resendLimit = true
)
|
||
|
||
func NewUdpSend(conn *Conn, id uint64, sock *net.UDPConn, remote *net.UDPAddr, name string) *UdpSend {
|
||
u := &UdpSend{
|
||
conn: conn,
|
||
Id: id,
|
||
sock: sock,
|
||
remote: remote,
|
||
mss: mss,
|
||
interval: 20,
|
||
data: make([]DataSend, 4),
|
||
seq: 1,
|
||
rate: defaultRate, // 修复bug: 太高导致压测的时候内存爆炸,速度慢
|
||
rtt: 200,
|
||
rttMax: 200,
|
||
rttMin: 200,
|
||
sendList: list.New(),
|
||
writable: make(chan bool, 1),
|
||
name: name,
|
||
}
|
||
u.writeMax = int(u.rate/u.mss) / writeMaxSep // fixed bug: 初始化 修复bug: 写入太多,应该一点点写
|
||
u.recvWnd = uint32(u.writeMax)
|
||
return u
|
||
}
|
||
|
||
func (u *UdpSend) send(nowTime time.Time, buf []byte) {
|
||
var sendMax = int(u.rate / u.mss)
|
||
u.writeMax = sendMax / writeMaxSep
|
||
// glog.V(4).Info("send", u.dataLen, u.name, sendMax, uint32(u.rtt))
|
||
var sendCount = uint32(u.rate / u.mss / (1000 / u.interval))
|
||
var sendTotal = sendCount
|
||
if sendCount > u.recvWnd/5 {
|
||
sendCount = u.recvWnd / 5
|
||
}
|
||
if sendCount <= 0 {
|
||
sendCount = 1
|
||
}
|
||
|
||
now := uint32(nowTime.UnixNano() / int64(time.Millisecond))
|
||
resendCnt := 0
|
||
for i := 0; i < u.dataLen; i++ {
|
||
if sendCount <= 0 {
|
||
break
|
||
}
|
||
index := (i + u.dataBegin) % len(u.data)
|
||
d := &u.data[index]
|
||
if d.Acked == false && now-d.Time >= uint32(u.rtt*2.0) {
|
||
glog.V(4).Infof("resend, seq:%v id:%v rtt:%v name:%v", d.Seq, u.Id, uint32(u.rtt), u.name)
|
||
// data包: type0 flag1 id2 len3 seq4 tm5 sndwnd6
|
||
// 发送窗口就是配置值,其实发送窗口不需要发送出去
|
||
// 接收端能发送了,要告诉对面开始发送。这个逻辑可以通过接受端的发送通道来发送。
|
||
headLen := structPack(buf, "BBQHIII", uint8(TypeData),
|
||
d.Code, uint64(u.Id), uint16(len(d.Data)+4+4+4), d.Seq, uint32(now), uint32(sendMax))
|
||
copy(buf[headLen:], d.Data)
|
||
u.sock.WriteToUDP(buf[:headLen+len(d.Data)], u.remote)
|
||
d.Time = now
|
||
d.Resend++
|
||
d.Fast = false
|
||
sendCount--
|
||
resendCnt++
|
||
u.resendCnt++
|
||
}
|
||
|
||
}
|
||
|
||
u.sendRnd++
|
||
if u.sendRnd > 100 && u.dataLen > 0 {
|
||
u.sendRnd = 0
|
||
|
||
var resendMax = uint32(float64(u.dataLen) * 0.6)
|
||
if u.resendCnt > 2 {
|
||
glog.V(0).Infof("resendCnt:%v resendMax:%v remain:%v rtt:%v id:%v", u.resendCnt,
|
||
resendMax, u.dataLen, uint32(u.rtt), u.Id)
|
||
u.resendCnt = 0
|
||
}
|
||
// 修复bug: 之前使用lastrecvtime,存在新增发送数据数据那瞬间,出现超时情况
|
||
if u.data[u.dataBegin].Resend > 10 {
|
||
glog.V(0).Infof("resend too much, close, id:%v name:%v drop seq:%v remain:%v",
|
||
u.Id, u.name, u.data[u.dataBegin].Seq, u.dataLen)
|
||
u.conn.close(true, true)
|
||
}
|
||
}
|
||
|
||
var resendToomuch bool
|
||
if uint32(resendCnt) > sendTotal/3 && resendLimit { // 修复bug: 限速
|
||
resendToomuch = true
|
||
glog.V(4).Infof("resend too much, slow, %v %v", resendCnt, sendTotal)
|
||
}
|
||
|
||
u.sendListLock.Lock()
|
||
for ; sendCount > 0; sendCount-- {
|
||
sendData := u.sendList.Front()
|
||
if sendData == nil {
|
||
break
|
||
}
|
||
|
||
if sendMax-u.dataLen <= 0 || resendToomuch {
|
||
break
|
||
}
|
||
if u.dataLen >= len(u.data) {
|
||
newData := make([]DataSend, 2*len(u.data))
|
||
copy(newData, u.data[u.dataBegin:])
|
||
copy(newData[len(u.data)-u.dataBegin:], u.data[:u.dataBegin])
|
||
u.dataBegin = 0
|
||
u.data = newData
|
||
}
|
||
|
||
u.sendList.Remove(sendData)
|
||
hType := uint8(0)
|
||
sdata, ok := sendData.Value.([]byte)
|
||
if !ok {
|
||
// 发送控制消息
|
||
mesCode, _ := sendData.Value.(byte)
|
||
hType = mesCode
|
||
}
|
||
headLen := structPack(buf, "BBQHIII", uint8(TypeData), hType,
|
||
uint64(u.Id), uint16(len(sdata)+4+4+4), u.seq, uint32(now), uint32(sendMax))
|
||
copy(buf[headLen:], sdata)
|
||
u.sock.WriteToUDP(buf[:headLen+len(sdata)], u.remote)
|
||
glog.V(4).Infof("send, seq:%v id:%v %v", u.seq, u.Id, u.name)
|
||
|
||
index := (u.dataBegin + u.dataLen) % len(u.data)
|
||
u.data[index] = DataSend{
|
||
Seq: u.seq,
|
||
Time: now,
|
||
Code: hType,
|
||
Data: sdata,
|
||
}
|
||
u.seq++
|
||
u.dataLen++
|
||
}
|
||
listRemain := u.writeMax - u.sendList.Len()
|
||
u.sendListLock.Unlock()
|
||
|
||
if listRemain > 0 {
|
||
select {
|
||
case u.writable <- !u.isClose:
|
||
default:
|
||
}
|
||
}
|
||
|
||
if u.isClose && u.dataLen == 0 && u.closing == false { // 修复bug: 之前5秒删除,实际发送没完成
|
||
// 修复bug: 接受端write端close为true, 导致5秒后呗删除,来不及接受数据。
|
||
glog.V(1).Infof("close and emtpy, rm conn, id:%v name:%v seq:%v listlen:%v",
|
||
u.Id, u.name, u.seq, u.sendList.Len())
|
||
u.closing = true
|
||
u.conn.close(false, true)
|
||
}
|
||
}
|
||
|
||
func testDrop() bool {
|
||
if dropRate < 0.01 {
|
||
return false
|
||
}
|
||
var v uint32
|
||
var b [4]byte
|
||
if _, err := crand.Read(b[:]); err != nil {
|
||
v = mrand.Uint32()
|
||
} else {
|
||
v = binary.BigEndian.Uint32(b[:])
|
||
}
|
||
if v%1000 < uint32(dropRate*1000) {
|
||
return true
|
||
}
|
||
return false
|
||
}
|
||
|
||
func (u *UdpSend) recv(buf []byte) {
|
||
|
||
if testDrop() {
|
||
return
|
||
}
|
||
|
||
now := uint32(time.Now().UnixNano() / int64(time.Millisecond))
|
||
|
||
// ack包: type0 flag1 id2 len3 tm4 rcvwnd5 acked6
|
||
head, headLen := structUnPack(buf, "BBQHIII")
|
||
if head[0] == uint64(TypeAck) && u.dataLen > 0 {
|
||
|
||
firstSeq := u.data[u.dataBegin].Seq
|
||
acked := uint32(head[6])
|
||
offset := int(int32(acked - firstSeq))
|
||
glog.V(4).Infof("id:%v recv ack: %v offset:%v databegin:%v dataLen:%v firstSeq:%v name:%v",
|
||
u.Id, acked, offset, u.dataBegin, u.dataLen, firstSeq, u.name)
|
||
if offset >= 0 && offset < u.dataLen {
|
||
offset++
|
||
for i := 0; i < offset; i++ { // 修复bug: 清除引用等
|
||
index := (u.dataBegin + i) % len(u.data) // 修复bug,i写成offset
|
||
d := &u.data[index]
|
||
d.Data = nil
|
||
d.Acked = true
|
||
}
|
||
u.acked += uint32(offset)
|
||
u.dataBegin += offset
|
||
u.dataBegin = u.dataBegin % len(u.data)
|
||
u.dataLen -= offset
|
||
glog.V(4).Infof("2 acked ok:%v, id:%v %v", u.acked, u.Id, u.name)
|
||
}
|
||
if u.dataLen > 0 {
|
||
curSeq := u.data[u.dataBegin].Seq
|
||
// var ackedSeq = []uint32{}
|
||
for i := headLen; i < len(buf); i += 4 {
|
||
seq := binary.BigEndian.Uint32(buf[i:])
|
||
offset := int(int32(seq - curSeq))
|
||
if offset < 0 || offset >= u.dataLen {
|
||
continue
|
||
}
|
||
index := (u.dataBegin + offset) % len(u.data)
|
||
d := &u.data[index]
|
||
if d.Seq == seq {
|
||
d.Acked = true
|
||
d.Data = nil // 修复bug: memory leak
|
||
// ackedSeq = append(ackedSeq, seq)
|
||
} else {
|
||
panic("index not correct")
|
||
}
|
||
}
|
||
// if len(ackedSeq) > 0 {
|
||
// glog.V(0).Info("seq:", ackedSeq)
|
||
// }
|
||
}
|
||
|
||
var i = 0
|
||
for ; i < u.dataLen; i++ {
|
||
index := (i + u.dataBegin) % len(u.data)
|
||
d := &u.data[index]
|
||
if d.Acked == false {
|
||
break
|
||
}
|
||
// fmt.Println("acked->", d.Seq)
|
||
// if d.Seq == 7 {
|
||
// fmt.Println("data:", u.data[u.dataBegin:u.dataBegin+5])
|
||
// }
|
||
}
|
||
if i > 0 {
|
||
u.acked += uint32(i)
|
||
u.dataBegin += i
|
||
u.dataBegin = u.dataBegin % len(u.data)
|
||
u.dataLen -= i
|
||
glog.V(4).Infof("3 acked ok:%v, id:%v %v %v", u.acked, u.Id, u.dataBegin, u.name)
|
||
}
|
||
|
||
sendTime := uint32(head[4])
|
||
rtt := now - sendTime
|
||
if rtt > 0 {
|
||
if firstSeq < 3 {
|
||
u.rtt = float64(rtt) // 初始值
|
||
} else {
|
||
u.rtt = u.rtt*0.8 + float64(rtt)*0.2
|
||
}
|
||
if u.rtt < 50.0 {
|
||
u.rtt = 50
|
||
}
|
||
// glog.V(4).Infof("rtt:%v u.rtt:%v id:%v", rtt, u.rtt, u.Id)
|
||
}
|
||
|
||
u.recvWnd = uint32(head[5])
|
||
}
|
||
|
||
}
|
||
|
||
// structPack serialises param into b in big-endian order and returns the
// number of bytes written. Each character of format describes one argument:
// 'B' uint8, 'H' uint16, 'I' uint32, 'Q' uint64.
//
// An argument whose dynamic type does not match its format character is
// written as zero. An unknown format character panics, as does a buffer
// that is too small or a missing argument.
func structPack(b []byte, format string, param ...interface{}) int {
	off := 0
	for idx, code := range format {
		switch code {
		case 'B':
			v, _ := param[idx].(uint8)
			b[off] = v
			off++
		case 'H':
			v, _ := param[idx].(uint16)
			binary.BigEndian.PutUint16(b[off:], v)
			off += 2
		case 'I':
			v, _ := param[idx].(uint32)
			binary.BigEndian.PutUint32(b[off:], v)
			off += 4
		case 'Q':
			v, _ := param[idx].(uint64)
			binary.BigEndian.PutUint64(b[off:], v)
			off += 8
		default:
			panic("structPack not found")
		}
	}
	return off
}
|
||
|
||
// structUnPack decodes big-endian fields from b according to format ('B'
// uint8, 'H' uint16, 'I' uint32, 'Q' uint64). It returns one value per
// format character and the number of bytes consumed.
//
// If b is too short for the format, or the format contains an unknown
// character, it returns a slice of len(format) zeros and 0 consumed bytes.
// Callers therefore see a packet type of 0 and can index every field
// safely. The previous implementation relied on recover(), which left the
// unnamed results at (nil, 0) and made callers panic on head[0].
func structUnPack(b []byte, format string) ([]uint64, int) {
	fields := make([]uint64, 0, len(format))
	off := 0
	for _, code := range format {
		var size int
		switch code {
		case 'B':
			size = 1
		case 'H':
			size = 2
		case 'I':
			size = 4
		case 'Q':
			size = 8
		default:
			return make([]uint64, len(format)), 0
		}
		if len(b)-off < size {
			return make([]uint64, len(format)), 0
		}

		// Big-endian decode: fold the bytes most significant first.
		var v uint64
		for _, c := range b[off : off+size] {
			v = v<<8 | uint64(c)
		}
		fields = append(fields, v)
		off += size
	}
	return fields, off
}
|
||
|
||
type UdpRecv struct {
|
||
conn *Conn
|
||
Id uint64
|
||
sock *net.UDPConn
|
||
remote *net.UDPAddr
|
||
acked uint32
|
||
lastTm uint32
|
||
sndWnd uint32
|
||
recvCnt uint32
|
||
isClose bool
|
||
isNew byte
|
||
isRecved bool
|
||
recvList *list.List
|
||
recvListLock sync.Mutex
|
||
readable chan byte
|
||
readDeadline *time.Time
|
||
seqData map[uint32]*DataRecv
|
||
dataList *list.List
|
||
name string
|
||
}
|
||
|
||
func NewUdpRecv(conn *Conn, id uint64, sock *net.UDPConn, remote *net.UDPAddr, name string) *UdpRecv {
|
||
return &UdpRecv{
|
||
Id: id,
|
||
conn: conn,
|
||
sock: sock,
|
||
remote: remote,
|
||
acked: 0,
|
||
recvList: list.New(),
|
||
dataList: list.New(),
|
||
seqData: make(map[uint32]*DataRecv),
|
||
readable: make(chan byte, 1),
|
||
isNew: 50,
|
||
name: name,
|
||
sndWnd: 1000,
|
||
}
|
||
}
|
||
|
||
func (u *UdpRecv) SetReadDeadline(t time.Time) {
|
||
u.readDeadline = &t
|
||
}
|
||
|
||
func (u *UdpRecv) sendAck(nowTime time.Time, buf []byte) {
|
||
|
||
u.recvListLock.Lock()
|
||
for {
|
||
if d, ok := u.seqData[u.acked+1]; ok {
|
||
u.acked++
|
||
if d.Code == TypeClose {
|
||
if u.isClose == false {
|
||
glog.V(1).Info("recv close:", u.Id)
|
||
u.conn.close(false, true)
|
||
u.isClose = true
|
||
}
|
||
} else {
|
||
u.recvList.PushBack(d.Data)
|
||
}
|
||
d.Data = nil
|
||
delete(u.seqData, u.acked)
|
||
} else {
|
||
break
|
||
}
|
||
}
|
||
recvListLen := u.recvList.Len()
|
||
u.recvListLock.Unlock()
|
||
|
||
// glog.V(4).Info("acked:", u.acked, u.Id, recvListLen, u.name)
|
||
if recvListLen > 0 { // 修复bug,用标志可能会有读不到的数据 修复bug: 去掉 && u.isClose == false,导致读取延迟
|
||
select {
|
||
case u.readable <- 1:
|
||
default:
|
||
}
|
||
} else if u.isClose == true { // 修复bug:快速close
|
||
select {
|
||
case u.readable <- 0:
|
||
default:
|
||
}
|
||
}
|
||
if u.readDeadline != nil && !u.readDeadline.IsZero() {
|
||
if u.readDeadline.Before(nowTime) { // 修复bug after
|
||
select {
|
||
case u.readable <- 2:
|
||
glog.V(0).Info("read dealline: ", u.Id)
|
||
default:
|
||
}
|
||
}
|
||
}
|
||
|
||
var b = buf[:mss]
|
||
buf = buf[mss:]
|
||
var n = 0
|
||
for i := u.dataList.Front(); i != nil; {
|
||
d := i.Value.(*DataRecv)
|
||
|
||
next := i.Next()
|
||
if d.AckCnt > 6 || before(d.Seq, u.acked+1) { // 发几次够了,不反复发
|
||
// d.Removed = true
|
||
u.dataList.Remove(i) // 修复bug,删除逻辑不对,需要保存next
|
||
// delete(u.seqData, d.Seq) // 修复bug: 对面已经确认,这里删了,没有重发,也没有了数据。已经收到的数据并且发了ack的数据,不要删了!
|
||
i = next
|
||
continue
|
||
}
|
||
i = next
|
||
|
||
if d.AckCnt%3 == 0 {
|
||
binary.BigEndian.PutUint32(b[n:], d.Seq)
|
||
n += 4
|
||
if n >= len(b) {
|
||
wnd := int(u.sndWnd) - recvListLen - len(u.seqData)
|
||
if wnd < 0 {
|
||
wnd = 0
|
||
}
|
||
headLen := structPack(buf, "BBQHIII", uint8(TypeAck), uint8(0), uint64(u.Id), uint16(4+4+4+n),
|
||
u.lastTm, uint32(wnd), u.acked)
|
||
copy(buf[headLen:], b[:n])
|
||
u.sock.WriteToUDP(buf[:headLen+n], u.remote)
|
||
glog.V(4).Infof("id:%v send ack n:%v", u.Id, n)
|
||
n = 0 // 修复bug,没有置零
|
||
u.isRecved = false // 修复bug: 有时候发多一条数据
|
||
}
|
||
}
|
||
d.AckCnt++
|
||
}
|
||
if n > 0 || u.isRecved { // 修复bug: 一直发数据 修复bug: 有时候发多一条数据
|
||
wnd := int(u.sndWnd) - recvListLen - len(u.seqData)
|
||
if wnd < 0 {
|
||
wnd = 0
|
||
}
|
||
headLen := structPack(buf, "BBQHIII", uint8(TypeAck), uint8(0), uint64(u.Id), uint16(4+4+4+n),
|
||
u.lastTm, uint32(wnd), u.acked)
|
||
copy(buf[headLen:], b[:n])
|
||
u.sock.WriteToUDP(buf[:headLen+n], u.remote)
|
||
glog.V(4).Infof("id:%v send ack n:%v datalen:%v", u.Id, n, u.dataList.Len())
|
||
}
|
||
u.isRecved = false
|
||
|
||
// 修复bug: 如果自己只是发数据,那么自己的acked通道会没用到。所以要判断自己是否在发送数据。
|
||
if u.isNew > 0 { // 完成的优化:超时不确认第一包,就删除链接。防止旧链接不断发包。
|
||
u.isNew--
|
||
if u.isNew == 0 && u.acked == 0 {
|
||
glog.V(0).Infof("not recv first packet!!! close, id:%v name:%v", u.Id, u.name)
|
||
u.conn.Close()
|
||
}
|
||
if u.acked >= 1 || u.conn.s.seq > 1 {
|
||
u.isNew = 0
|
||
}
|
||
}
|
||
|
||
}
|
||
|
||
// before reports whether seq1 precedes seq2. The comparison uses the signed
// 32-bit difference, so it stays correct across sequence-number wrap-around.
func before(seq1, seq2 uint32) bool {
	delta := int32(seq1 - seq2)
	return delta < 0
}
|
||
|
||
// after reports whether seq1 follows seq2. The comparison uses the signed
// 32-bit difference, so it stays correct across sequence-number wrap-around.
func after(seq1, seq2 uint32) bool {
	delta := int32(seq2 - seq1)
	return delta < 0
}
|
||
|
||
func (u *UdpRecv) recv(buf []byte) {
|
||
|
||
if testDrop() {
|
||
return
|
||
}
|
||
|
||
u.isRecved = true
|
||
u.recvCnt++
|
||
// data包: type0 flag1 id2 len3 seq4 tm5 sndwnd6
|
||
head, headLen := structUnPack(buf, "BBQHIII")
|
||
if head[0] == uint64(TypeData) {
|
||
|
||
seq := uint32(head[4])
|
||
u.lastTm = uint32(head[5])
|
||
u.sndWnd = uint32(head[6])
|
||
|
||
glog.V(4).Info(u.Id, " recv seq: ", seq, " len: ", u.dataList.Len())
|
||
|
||
if before(seq, u.acked+1) { // 修复bug: 收到before的数据,seqData不会回收。
|
||
// glog.V(0).Info("seq before u.acked:", seq, u.acked, u.Id)
|
||
return
|
||
}
|
||
|
||
// 修复bug: 修复没重发问题。之前由于一直会重发,这个逻辑有意义。现在只重发几次。
|
||
// if d, ok := u.seqData[seq]; ok {
|
||
// d.AckCnt = 0
|
||
// if d.Removed == false { // 修复bug:可能没ack导致一直重发,要直到acked全部覆盖。
|
||
// return
|
||
// }
|
||
// }
|
||
|
||
// glog.V(4).Info(u.Id, " recv 2 seq: ", seq, " len: ", u.dataList.Len())
|
||
|
||
d := &DataRecv{
|
||
Seq: seq,
|
||
Data: buf[headLen:],
|
||
Code: byte(head[1]),
|
||
}
|
||
u.dataList.PushBack(d)
|
||
u.seqData[seq] = d
|
||
|
||
}
|
||
|
||
}
|
||
|
||
func SetRate(rate uint32) {
|
||
defaultRate = rate
|
||
}
|
||
|
||
func SetDropRate(rate float64) {
|
||
if rate > 0.001 {
|
||
dropRate = rate
|
||
}
|
||
}
|
||
|
||
type Conn struct {
|
||
Id uint64
|
||
s *UdpSend
|
||
r *UdpRecv
|
||
responsed chan bool
|
||
isClose bool
|
||
isSendClose bool
|
||
isRmConn bool
|
||
conns *Conns
|
||
}
|
||
|
||
func NewConn(conns *Conns, Id uint64, localConn *net.UDPConn, remote *net.UDPAddr, name string) *Conn {
|
||
conn := &Conn{
|
||
Id: Id,
|
||
conns: conns,
|
||
responsed: make(chan bool, 1),
|
||
}
|
||
conn.s = NewUdpSend(conn, conn.Id, localConn, remote, name)
|
||
conn.r = NewUdpRecv(conn, conn.Id, localConn, remote, name)
|
||
return conn
|
||
}
|
||
|
||
func (c *Conn) Write(bb []byte) (n int, err error) {
|
||
|
||
if c.isClose {
|
||
return 0, ErrClose
|
||
}
|
||
|
||
// 1.对方告诉你满了,是要叫你不要发数据了,而不是还发数据。
|
||
// 2.没有窗口就没法快速地确定对面有没有满,如果你要等到自己的的buffer满,那么可能会比较慢感知到
|
||
// 固定buffer + 停止通知
|
||
b := make([]byte, len(bb)) // 修复bug1:没有拷贝
|
||
copy(b, bb)
|
||
for {
|
||
c.s.sendListLock.Lock()
|
||
remain := c.s.writeMax - c.s.sendList.Len()
|
||
for {
|
||
if len(b) <= 0 {
|
||
break
|
||
}
|
||
if remain <= 0 {
|
||
break
|
||
}
|
||
remain--
|
||
var sendLen = int(mss)
|
||
if len(b) < sendLen {
|
||
// 实际发送值
|
||
sendLen = len(b)
|
||
}
|
||
n += sendLen
|
||
c.s.sendList.PushBack(b[:sendLen])
|
||
// fmt.Println("write:", b[:10])
|
||
b = b[sendLen:]
|
||
}
|
||
c.s.sendListLock.Unlock()
|
||
if len(b) <= 0 {
|
||
break
|
||
}
|
||
// glog.V(0).Info("wait write: ", c.Id)
|
||
w := <-c.s.writable
|
||
if w == false {
|
||
return n, ErrClose
|
||
}
|
||
}
|
||
|
||
return n, nil
|
||
}
|
||
|
||
func (c *Conn) Read(b []byte) (n int, err error) {
|
||
for {
|
||
c.r.recvListLock.Lock()
|
||
for {
|
||
f := c.r.recvList.Front()
|
||
if f != nil {
|
||
data := f.Value.([]byte)
|
||
copy(b[n:], data)
|
||
maxCap := len(b[n:])
|
||
if maxCap < len(data) {
|
||
// b已满
|
||
f.Value = data[maxCap:]
|
||
n += maxCap
|
||
break
|
||
} else {
|
||
// b未满
|
||
c.r.recvList.Remove(f)
|
||
n += len(data)
|
||
}
|
||
} else {
|
||
// 读完数据了
|
||
break
|
||
}
|
||
}
|
||
c.r.recvListLock.Unlock()
|
||
if n <= 0 {
|
||
// glog.V(4).Info("wait read", c.Id)
|
||
// wait for chan
|
||
r := <-c.r.readable
|
||
if r == 0 { // close之后总是返回初始值
|
||
c.r.recvListLock.Lock()
|
||
rlen := c.r.recvList.Len()
|
||
c.r.recvListLock.Unlock()
|
||
if rlen <= 0 { // 修复bug: 等到read完所有数据才让read返回错误
|
||
return n, ErrClose
|
||
}
|
||
}
|
||
if r == 2 {
|
||
return n, ErrTimeout
|
||
}
|
||
} else {
|
||
break
|
||
}
|
||
}
|
||
|
||
return
|
||
}
|
||
|
||
func (c *Conn) LocalAddr() net.Addr {
|
||
return c.conns.sock.LocalAddr()
|
||
}
|
||
|
||
func (c *Conn) RemoteAddr() net.Addr {
|
||
return c.conns.sock.RemoteAddr()
|
||
}
|
||
|
||
func (c *Conn) SetDeadline(t time.Time) error {
|
||
c.r.SetReadDeadline(t)
|
||
return nil
|
||
}
|
||
|
||
func (c *Conn) SetReadDeadline(t time.Time) error {
|
||
c.r.SetReadDeadline(t)
|
||
return nil
|
||
}
|
||
|
||
func (c *Conn) SetWriteDeadline(t time.Time) error {
|
||
return nil
|
||
}
|
||
|
||
func (c *Conn) close(sendClose, rmConn bool) {
|
||
if c.isClose == false {
|
||
c.isClose = true
|
||
c.r.isClose = true
|
||
}
|
||
if sendClose && c.isSendClose == false { // 修复bug:
|
||
c.isSendClose = true
|
||
c.s.sendListLock.Lock()
|
||
c.s.sendList.PushBack(byte(TypeClose))
|
||
c.s.sendListLock.Unlock()
|
||
}
|
||
if rmConn && c.isRmConn == false {
|
||
c.isRmConn = true
|
||
time.AfterFunc(time.Second*5, func() {
|
||
select {
|
||
case c.conns.input <- Input{
|
||
typ: ActRmConn,
|
||
param: c,
|
||
}:
|
||
default:
|
||
}
|
||
})
|
||
}
|
||
}
|
||
|
||
func (c *Conn) Close() error {
|
||
if c.isClose == false {
|
||
c.isClose = true
|
||
c.s.isClose = true
|
||
// bug: 接收不能主动关闭
|
||
c.isSendClose = true
|
||
c.s.sendListLock.Lock()
|
||
c.s.sendList.PushBack(byte(TypeClose))
|
||
c.s.sendListLock.Unlock()
|
||
// bug: close之后,5秒数据可能无法完成发送
|
||
}
|
||
return nil
|
||
}
|
||
|
||
type Conns struct {
|
||
conns map[uint64]*Conn
|
||
sock *net.UDPConn
|
||
accept chan *Conn
|
||
isClose bool
|
||
isDial bool
|
||
timerRnd uint32
|
||
input chan Input
|
||
}
|
||
|
||
func NewConns() *Conns {
|
||
return &Conns{
|
||
conns: make(map[uint64]*Conn),
|
||
accept: make(chan *Conn, 256),
|
||
input: make(chan Input, 2048),
|
||
}
|
||
}
|
||
|
||
func Listen(network, address string) (net.Listener, error) {
|
||
addr, err := net.ResolveUDPAddr("udp", address)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
conn, err := net.ListenUDP("udp", addr)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
listener := NewConns()
|
||
listener.sock = conn
|
||
go listener.loop()
|
||
return listener, nil
|
||
}
|
||
|
||
var dialConns *Conns
|
||
var dialConnsLock sync.Mutex
|
||
|
||
func Dial(network, address string) (net.Conn, error) {
|
||
return DialTimeout(network, address, time.Second*3)
|
||
}
|
||
|
||
func DialTimeout(network, address string, timeout time.Duration) (net.Conn, error) {
|
||
addr, err := net.ResolveUDPAddr("udp", address)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
var b [8]byte
|
||
if _, err := crand.Read(b[:]); err != nil {
|
||
return nil, err
|
||
}
|
||
id := binary.LittleEndian.Uint64(b[:])
|
||
|
||
glog.V(0).Info("dial new 3:", id)
|
||
|
||
dialConnsLock.Lock()
|
||
if dialConns == nil {
|
||
dialConns = NewConns()
|
||
}
|
||
if dialConns.sock == nil {
|
||
s, err := net.ListenUDP("udp", &net.UDPAddr{})
|
||
if err != nil {
|
||
dialConnsLock.Unlock()
|
||
return nil, err
|
||
}
|
||
dialConns.sock = s
|
||
dialConns.isDial = true
|
||
go dialConns.loop()
|
||
}
|
||
dialConnsLock.Unlock()
|
||
|
||
conn := NewConn(dialConns, id, dialConns.sock, addr, "reqer")
|
||
dialConns.input <- Input{
|
||
typ: ActAddConn,
|
||
param: conn,
|
||
}
|
||
|
||
// conn.s.sendListLock.Lock()
|
||
// conn.s.sendList.PushBack(byte(TypeSYN))
|
||
// conn.s.sendListLock.Unlock()
|
||
|
||
return conn, nil
|
||
}
|
||
|
||
func (c *Conns) Accept() (net.Conn, error) {
|
||
for {
|
||
if c.isClose {
|
||
return nil, errors.New("listener close")
|
||
}
|
||
conn := <-c.accept
|
||
if conn == nil {
|
||
return nil, errors.New("listener close")
|
||
}
|
||
return conn, nil
|
||
}
|
||
}
|
||
|
||
func (c *Conns) Close() error {
|
||
c.isClose = true
|
||
c.sock.Close()
|
||
c.input <- Input{
|
||
typ: ActEnd,
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (c *Conns) Addr() net.Addr {
|
||
return c.sock.LocalAddr()
|
||
}
|
||
|
||
// Event kinds carried in Input.typ and handled by the Conns event loop.
const (
	ActData    = 1 // a datagram was read from the socket
	ActTimer   = 2 // timer tick: run send and sendAck for every connection
	ActAddConn = 3 // register a dialed connection
	ActRmConn  = 4 // remove a closed connection
	ActEnd     = 5 // shut the event loop down
)
|
||
|
||
// Input is one event for the Conns event loop.
type Input struct {
	typ   uint8       // event kind, one of the Act* constants
	data  []byte      // datagram bytes for ActData
	param interface{} // *net.UDPAddr for ActData, *Conn for ActAddConn and ActRmConn
}
|
||
|
||
func (c *Conns) loop() {
|
||
// 这里输入时间和数据
|
||
// 只起一个timer,给所有conn发
|
||
// 之前用setreaddeadline这个不太好,容易出现长时间没超时
|
||
|
||
runtime.LockOSThread()
|
||
|
||
glog.V(0).Info("loop: ", c.isDial)
|
||
|
||
go func() {
|
||
var buf = make([]byte, 2048)
|
||
for {
|
||
n, remote, err := c.sock.ReadFromUDP(buf)
|
||
if n <= 0 || err != nil {
|
||
c.Close()
|
||
return
|
||
}
|
||
b := make([]byte, n)
|
||
copy(b, buf[:n])
|
||
c.input <- Input{
|
||
typ: ActData,
|
||
data: b,
|
||
param: remote,
|
||
}
|
||
if c.isClose {
|
||
return
|
||
}
|
||
}
|
||
}()
|
||
|
||
var timerRunning bool
|
||
var releaseMemory uint32
|
||
var buf = make([]byte, mss*3)
|
||
|
||
for {
|
||
data := <-c.input
|
||
switch data.typ {
|
||
case ActData:
|
||
head, _ := structUnPack(data.data, "BBQ")
|
||
// fmt.Println(head[0], head[2])
|
||
var dataType = head[0]
|
||
if conn, ok := c.conns[head[2]]; ok {
|
||
// TODO: 给dial中的链接发送成功 -> 暂时不需要,现在dial不判断这些,默认成功
|
||
if dataType == uint64(TypeData) {
|
||
conn.r.recv(data.data)
|
||
} else if dataType == uint64(TypeAck) {
|
||
conn.s.recv(data.data)
|
||
}
|
||
} else {
|
||
if c.isDial == false && dataType == uint64(TypeData) && c.isClose == false { // 不需要TypeSYN
|
||
// glog.V(0).Info("create new:", head[2])
|
||
// 只有主动listen的,才有新链接,而dial自己就会创建新连接,不用创建
|
||
conn := NewConn(c, head[2], c.sock, data.param.(*net.UDPAddr), "rsper")
|
||
c.conns[conn.Id] = conn
|
||
conn.r.recv(data.data)
|
||
select {
|
||
case c.accept <- conn:
|
||
default:
|
||
}
|
||
if timerRunning == false {
|
||
timerRunning = true
|
||
go c.runTimer(c.timerRnd)
|
||
glog.V(0).Info("start timer, round:", c.timerRnd, c.isDial)
|
||
}
|
||
}
|
||
}
|
||
case ActTimer:
|
||
now := time.Now()
|
||
for _, conn := range c.conns {
|
||
conn.s.send(now, buf)
|
||
conn.r.sendAck(now, buf)
|
||
}
|
||
if len(c.conns) == 0 && timerRunning {
|
||
glog.V(0).Info("no conn, stop timer, round:", c.timerRnd, c.isDial)
|
||
c.timerRnd++
|
||
timerRunning = false
|
||
debug.FreeOSMemory()
|
||
}
|
||
releaseMemory++
|
||
if releaseMemory > 50*60 {
|
||
releaseMemory = 0
|
||
go func() {
|
||
debug.FreeOSMemory() // 修复bug: go在windows不回收内存
|
||
}()
|
||
}
|
||
case ActAddConn:
|
||
conn := data.param.(*Conn)
|
||
c.conns[conn.Id] = conn
|
||
if timerRunning == false {
|
||
timerRunning = true
|
||
go c.runTimer(c.timerRnd)
|
||
glog.V(0).Info("start timer, round:", c.timerRnd, c.isDial)
|
||
}
|
||
case ActRmConn:
|
||
conn := data.param.(*Conn)
|
||
// fmt.Println("rm conn:", conn.Id)
|
||
conn.isClose = true
|
||
conn.s.isClose = true
|
||
conn.r.isClose = true
|
||
if _, ok := c.conns[conn.Id]; ok {
|
||
glog.V(0).Info("rm conn ok:", conn.Id, c.isDial, len(c.conns))
|
||
close(conn.r.readable)
|
||
close(conn.s.writable) // 修复bug: 没有close
|
||
delete(c.conns, conn.Id)
|
||
}
|
||
case ActEnd:
|
||
c.timerRnd++
|
||
timerRunning = false
|
||
c.isClose = true
|
||
for _, conn := range c.conns {
|
||
conn.isClose = true
|
||
conn.s.isClose = true
|
||
conn.r.isClose = true
|
||
close(conn.r.readable)
|
||
close(conn.s.writable)
|
||
}
|
||
close(c.accept)
|
||
c.conns = make(map[uint64]*Conn)
|
||
return
|
||
}
|
||
|
||
}
|
||
|
||
}
|
||
|
||
func (c *Conns) runTimer(rnd uint32) {
|
||
// runtime.LockOSThread()
|
||
for {
|
||
time.Sleep(20 * time.Millisecond)
|
||
// C.usleep(20 * 1000)
|
||
if rnd != c.timerRnd {
|
||
glog.V(0).Info("timer stop, round:", rnd, c.isDial)
|
||
}
|
||
if c.isClose || rnd != c.timerRnd {
|
||
return
|
||
}
|
||
c.input <- Input{typ: ActTimer}
|
||
}
|
||
}
|