From 56953b207d771a0bf8b799576d8a6e50bc2cd5af Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 13 Mar 2021 10:59:29 -0800 Subject: [PATCH] compiles, but fail to send --- go.mod | 1 + go.sum | 2 + weed/command/volume.go | 11 +++-- weed/server/volume_server_udp_handlers.go | 59 ++++++++++++++--------- weed/wdclient/volume_udp_client.go | 21 +++----- 5 files changed, 55 insertions(+), 39 deletions(-) diff --git a/go.mod b/go.mod index 08b41cb09..c8a914ab0 100644 --- a/go.mod +++ b/go.mod @@ -99,6 +99,7 @@ require ( gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect gopkg.in/jcmturner/gokrb5.v7 v7.3.0 // indirect gopkg.in/karlseguin/expect.v1 v1.0.1 // indirect + pack.ag/tftp v1.0.0 // indirect ) // replace github.com/seaweedfs/fuse => /Users/chris/go/src/github.com/seaweedfs/fuse diff --git a/go.sum b/go.sum index e66b0ecb9..ab1ef4739 100644 --- a/go.sum +++ b/go.sum @@ -1243,6 +1243,8 @@ modernc.org/mathutil v1.1.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6 modernc.org/strutil v1.1.0 h1:+1/yCzZxY2pZwwrsbH+4T7BQMoLQ9QiBshRC9eicYsc= modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs= pack.ag/amqp v0.11.2/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4= +pack.ag/tftp v1.0.0 h1:q7iP8mKRtqTAWfxbQ4XY5/flZ5JmuThvrEmHPn8cN9A= +pack.ag/tftp v1.0.0/go.mod h1:N1Pyo5YG+K90XHoR2vfLPhpRuE8ziqbgMn/r/SghZas= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/weed/command/volume.go b/weed/command/volume.go index 002227a10..0f3dba361 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -29,7 +29,7 @@ import ( stats_collect "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/pin/tftp" + "pack.ag/tftp" ) var ( @@ -398,11 +398,16 @@ func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeSer } func (v VolumeServerOptions) startUdpService(volumeServer *weed_server.VolumeServer) { - tftpServer := tftp.NewServer(volumeServer.UdpReadHandler, volumeServer.UdpWriteHandler) listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20001) + tftpServer, err := tftp.NewServer(listeningAddress) + if err != nil { + glog.Fatalf("Volume server listen on %s:%v", listeningAddress, err) + } + tftpServer.WriteHandler(volumeServer) + tftpServer.ReadHandler(volumeServer) glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "UDP at", listeningAddress) - if e:= tftpServer.ListenAndServe(listeningAddress); e != nil { + if e:= tftpServer.ListenAndServe(); e != nil { glog.Fatalf("Volume server UDP on %s:%v", listeningAddress, e) } } diff --git a/weed/server/volume_server_udp_handlers.go b/weed/server/volume_server_udp_handlers.go index 99c43e3f4..e0dc94310 100644 --- a/weed/server/volume_server_udp_handlers.go +++ b/weed/server/volume_server_udp_handlers.go @@ -1,16 +1,18 @@ package weed_server import ( - "bytes" - "fmt" - "io" + "github.com/chrislusf/seaweedfs/weed/glog" + "pack.ag/tftp" ) -func (vs *VolumeServer) UdpReadHandler(filename string, rf io.ReaderFrom) error { +func (vs *VolumeServer) ServeTFTP(r tftp.ReadRequest) { + + filename := r.Name() volumeId, n, err := vs.parseFileId(filename) if err != nil { - return err + glog.Errorf("parse file id %s: %v", filename, err) + return } hasVolume := vs.store.HasVolume(volumeId) @@ -18,48 +20,59 @@ func (vs *VolumeServer) UdpReadHandler(filename string, rf io.ReaderFrom) error if hasVolume { if _, err = vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil { - return err + glog.Errorf("ReadVolumeNeedle %s: %v", filename, err) + return } } if hasEcVolume { if _, err = vs.store.ReadEcShardNeedle(volumeId, n); err != nil { - return err + glog.Errorf("ReadEcShardNeedle %s: %v", filename, err) + return } } - if _, err = rf.ReadFrom(bytes.NewBuffer(n.Data)); err != nil { - return err + if _, err = r.Write(n.Data); err != nil { + glog.Errorf("UDP Write data %s: %v", filename, err) + return } - return nil } -func (vs *VolumeServer) UdpWriteHandler(filename string, wt io.WriterTo) error { +func (vs *VolumeServer) ReceiveTFTP(w tftp.WriteRequest) { + + filename := w.Name() + + // Get the file size + size, err := w.Size() + + // Note: The size value is sent by the client, the client could send more data than + // it indicated in the size option. To be safe we'd want to allocate a buffer + // with the size we're expecting and use w.Read(buf) rather than ioutil.ReadAll. if filename[0] == '-' { - return vs.handleTcpDelete(filename[1:]) + err = vs.handleTcpDelete(filename[1:]) + if err != nil { + glog.Errorf("handleTcpDelete %s: %v", filename, err) + return + } } volumeId, n, err := vs.parseFileId(filename) if err != nil { - return err + glog.Errorf("parse file id %s: %v", filename, err) + return } volume := vs.store.GetVolume(volumeId) if volume == nil { - return fmt.Errorf("volume %d not found", volumeId) + glog.Errorf("volume %d not found", volumeId) + return } - var buf bytes.Buffer - written, err := wt.WriteTo(&buf) + err = volume.StreamWrite(n, w, uint32(size)) if err != nil { - return err + glog.Errorf("StreamWrite %s: %v", filename, err) + return } - err = volume.StreamWrite(n, &buf, uint32(written)) - if err != nil { - return err - } - - return nil } diff --git a/weed/wdclient/volume_udp_client.go b/weed/wdclient/volume_udp_client.go index 0b7e8cf6d..2daf37bfc 100644 --- a/weed/wdclient/volume_udp_client.go +++ b/weed/wdclient/volume_udp_client.go @@ -1,14 +1,14 @@ package wdclient import ( + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/pin/tftp" + "pack.ag/tftp" "io" ) // VolumeTcpClient put/get/delete file chunks directly on volume servers without replication type VolumeUdpClient struct { - udpClient *tftp.Client } func NewVolumeUdpClient() *VolumeUdpClient { @@ -23,18 +23,13 @@ func (c *VolumeUdpClient) PutFileChunk(volumeServerAddress string, fileId string return parseErr } - if c.udpClient == nil { - c.udpClient, err = tftp.NewClient(udpAddress) - if err != nil { - return - } - } - rf, err := c.udpClient.Send(fileId, "octet") - if err != nil { - return - } - _, err = rf.ReadFrom(fileReader) + udpClient, _ := tftp.NewClient() + + fileUrl := "tftp://"+udpAddress+"/"+fileId + + err = udpClient.Put(fileUrl, fileReader, int64(fileSize)) if err != nil { + glog.Errorf("udp put %s: %v", fileUrl, err) return }