mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
udp hangs
This commit is contained in:
parent
2a68ddb963
commit
f9a7c45e9a
1
go.mod
1
go.mod
|
@ -57,6 +57,7 @@ require (
|
|||
github.com/olivere/elastic/v7 v7.0.19
|
||||
github.com/peterh/liner v1.1.0
|
||||
github.com/pierrec/lz4 v2.2.7+incompatible // indirect
|
||||
github.com/pin/tftp v2.1.0+incompatible // indirect
|
||||
github.com/prometheus/client_golang v1.3.0
|
||||
github.com/rakyll/statik v0.1.7
|
||||
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 // indirect
|
||||
|
|
2
go.sum
2
go.sum
|
@ -619,6 +619,8 @@ github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0
|
|||
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
|
||||
github.com/pierrec/lz4 v2.2.7+incompatible h1:Eerk9aiqeZo2QzsbWOAsELUf9ddvAxEdMY9LYze/DEc=
|
||||
github.com/pierrec/lz4 v2.2.7+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
|
||||
github.com/pin/tftp v2.1.0+incompatible h1:Yng4J7jv6lOc6IF4XoB5mnd3P7ZrF60XQq+my3FAMus=
|
||||
github.com/pin/tftp v2.1.0+incompatible/go.mod h1:xVpZOMCXTy+A5QMjEVN0Glwa1sUvaJhFXbr/aAxuxGY=
|
||||
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
|
||||
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
|
|
|
@ -42,6 +42,7 @@ type BenchmarkOptions struct {
|
|||
masterClient *wdclient.MasterClient
|
||||
fsync *bool
|
||||
useTcp *bool
|
||||
useUdp *bool
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -68,7 +69,8 @@ func init() {
|
|||
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
|
||||
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
|
||||
b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write")
|
||||
b.useTcp = cmdBenchmark.Flag.Bool("useTcp", false, "send data via tcp")
|
||||
b.useTcp = cmdBenchmark.Flag.Bool("useTcp", false, "write data via tcp")
|
||||
b.useUdp = cmdBenchmark.Flag.Bool("useUdp", false, "write data via udp")
|
||||
sharedBytes = make([]byte, 1024)
|
||||
}
|
||||
|
||||
|
@ -226,6 +228,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
|
|||
random := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
|
||||
volumeTcpClient := wdclient.NewVolumeTcpClient()
|
||||
volumeUdpClient := wdclient.NewVolumeUdpClient()
|
||||
|
||||
for id := range idChan {
|
||||
start := time.Now()
|
||||
|
@ -255,6 +258,14 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
|
|||
} else {
|
||||
s.failed++
|
||||
}
|
||||
} else if *b.useUdp {
|
||||
if uploadByUdp(volumeUdpClient, fp) {
|
||||
fileIdLineChan <- fp.Fid
|
||||
s.completed++
|
||||
s.transferred += fileSize
|
||||
} else {
|
||||
s.failed++
|
||||
}
|
||||
} else if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil {
|
||||
if random.Intn(100) < *b.deletePercentage {
|
||||
s.total++
|
||||
|
@ -352,6 +363,17 @@ func uploadByTcp(volumeTcpClient *wdclient.VolumeTcpClient, fp *operation.FilePa
|
|||
return true
|
||||
}
|
||||
|
||||
func uploadByUdp(volumeUdpClient *wdclient.VolumeUdpClient, fp *operation.FilePart) bool {
|
||||
|
||||
err := volumeUdpClient.PutFileChunk(fp.Server, fp.Fid, uint32(fp.FileSize), fp.Reader)
|
||||
if err != nil {
|
||||
glog.Errorf("upload chunk err: %v", err)
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func readFileIds(fileName string, fileIdLineChan chan string) {
|
||||
file, err := os.Open(fileName) // For read access.
|
||||
if err != nil {
|
||||
|
|
|
@ -29,6 +29,7 @@ import (
|
|||
stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
"github.com/pin/tftp"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -256,6 +257,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
|
|||
// starting tcp server
|
||||
if *v.enableTcp {
|
||||
go v.startTcpService(volumeServer)
|
||||
go v.startUdpService(volumeServer)
|
||||
}
|
||||
|
||||
// starting the cluster http server
|
||||
|
@ -378,10 +380,10 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd
|
|||
|
||||
func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeServer) {
|
||||
listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20000)
|
||||
glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "tcp at", listeningAddress)
|
||||
glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "TCP at", listeningAddress)
|
||||
listener, e := util.NewListener(listeningAddress, 0)
|
||||
if e != nil {
|
||||
glog.Fatalf("Volume server listener error on %s:%v", listeningAddress, e)
|
||||
glog.Fatalf("Volume server TCP on %s:%v", listeningAddress, e)
|
||||
}
|
||||
defer listener.Close()
|
||||
|
||||
|
@ -394,3 +396,13 @@ func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeSer
|
|||
go volumeServer.HandleTcpConnection(c)
|
||||
}
|
||||
}
|
||||
|
||||
func (v VolumeServerOptions) startUdpService(volumeServer *weed_server.VolumeServer) {
|
||||
tftpServer := tftp.NewServer(volumeServer.UdpReadHandler, volumeServer.UdpWriteHandler)
|
||||
listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20001)
|
||||
|
||||
glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "UDP at", listeningAddress)
|
||||
if e:= tftpServer.ListenAndServe(listeningAddress); e != nil {
|
||||
glog.Fatalf("Volume server UDP on %s:%v", listeningAddress, e)
|
||||
}
|
||||
}
|
||||
|
|
65
weed/server/volume_server_udp_handlers.go
Normal file
65
weed/server/volume_server_udp_handlers.go
Normal file
|
@ -0,0 +1,65 @@
|
|||
package weed_server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
)
|
||||
|
||||
func (vs *VolumeServer) UdpReadHandler(filename string, rf io.ReaderFrom) error {
|
||||
|
||||
volumeId, n, err := vs.parseFileId(filename)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hasVolume := vs.store.HasVolume(volumeId)
|
||||
_, hasEcVolume := vs.store.FindEcVolume(volumeId)
|
||||
|
||||
if hasVolume {
|
||||
if _, err = vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if hasEcVolume {
|
||||
if _, err = vs.store.ReadEcShardNeedle(volumeId, n); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if _, err = rf.ReadFrom(bytes.NewBuffer(n.Data)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) UdpWriteHandler(filename string, wt io.WriterTo) error {
|
||||
|
||||
if filename[0] == '-' {
|
||||
return vs.handleTcpDelete(filename[1:])
|
||||
}
|
||||
|
||||
volumeId, n, err := vs.parseFileId(filename)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
volume := vs.store.GetVolume(volumeId)
|
||||
if volume == nil {
|
||||
return fmt.Errorf("volume %d not found", volumeId)
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
written, err := wt.WriteTo(&buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = volume.StreamWrite(n, &buf, uint32(written))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
42
weed/wdclient/volume_udp_client.go
Normal file
42
weed/wdclient/volume_udp_client.go
Normal file
|
@ -0,0 +1,42 @@
|
|||
package wdclient
|
||||
|
||||
import (
|
||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||
"github.com/pin/tftp"
|
||||
"io"
|
||||
)
|
||||
|
||||
// VolumeTcpClient put/get/delete file chunks directly on volume servers without replication
|
||||
type VolumeUdpClient struct {
|
||||
udpClient *tftp.Client
|
||||
}
|
||||
|
||||
func NewVolumeUdpClient() *VolumeUdpClient {
|
||||
return &VolumeUdpClient{
|
||||
}
|
||||
}
|
||||
|
||||
func (c *VolumeUdpClient) PutFileChunk(volumeServerAddress string, fileId string, fileSize uint32, fileReader io.Reader) (err error) {
|
||||
|
||||
udpAddress, parseErr := pb.ParseServerAddress(volumeServerAddress, 20001)
|
||||
if parseErr != nil {
|
||||
return parseErr
|
||||
}
|
||||
|
||||
if c.udpClient == nil {
|
||||
c.udpClient, err = tftp.NewClient(udpAddress)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
rf, err := c.udpClient.Send(fileId, "octet")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
_, err = rf.ReadFrom(fileReader)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
Loading…
Reference in a new issue