diff --git a/weed/command/volume.go b/weed/command/volume.go index 1468aa2ee..ab3c63a9a 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -417,7 +417,7 @@ func (v VolumeServerOptions) startUdpService(volumeServer *weed_server.VolumeSer conn, err := listener.Accept() if err == nil { glog.V(0).Infof("Client from %s", conn.RemoteAddr()) - go volumeServer.HandleTcpConnection(conn) + go volumeServer.HandleUdpConnection(conn) } else if isTemporaryError(err) { continue } else { diff --git a/weed/server/volume_server_udp_handlers.go b/weed/server/volume_server_udp_handlers.go index 2f7563f4c..b1f95446c 100644 --- a/weed/server/volume_server_udp_handlers.go +++ b/weed/server/volume_server_udp_handlers.go @@ -1,81 +1,48 @@ package weed_server import ( + "bufio" "github.com/chrislusf/seaweedfs/weed/glog" - "pack.ag/tftp" + "io" + "net" ) -func (vs *VolumeServer) ServeTFTP(r tftp.ReadRequest) { +func (vs *VolumeServer) HandleUdpConnection(c net.Conn) { + defer c.Close() - filename := r.Name() + glog.V(0).Infof("Serving writes from %s", c.RemoteAddr().String()) - volumeId, n, err := vs.parseFileId(filename) - if err != nil { - glog.Errorf("parse file id %s: %v", filename, err) - return - } + bufReader := bufio.NewReaderSize(c, 1024*1024) + bufWriter := bufio.NewWriterSize(c, 1024*1024) - hasVolume := vs.store.HasVolume(volumeId) - _, hasEcVolume := vs.store.FindEcVolume(volumeId) - - if hasVolume { - if _, err = vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil { - glog.Errorf("ReadVolumeNeedle %s: %v", filename, err) - return - } - } - if hasEcVolume { - if _, err = vs.store.ReadEcShardNeedle(volumeId, n); err != nil { - glog.Errorf("ReadEcShardNeedle %s: %v", filename, err) - return - } - } - - if _, err = r.Write(n.Data); err != nil { - glog.Errorf("UDP Write data %s: %v", filename, err) - return - } - -} - -func (vs *VolumeServer) ReceiveTFTP(w tftp.WriteRequest) { - - filename := w.Name() - println("+ ", filename) - - // Get the file size - size, err := w.Size() - - // Note: The size value is sent by the client, the client could send more data than - // it indicated in the size option. To be safe we'd want to allocate a buffer - // with the size we're expecting and use w.Read(buf) rather than ioutil.ReadAll. - - if filename[0] == '-' { - err = vs.handleTcpDelete(filename[1:]) + for { + cmd, err := bufReader.ReadString('\n') if err != nil { - glog.Errorf("handleTcpDelete %s: %v", filename, err) + if err != io.EOF { + glog.Errorf("read command from %s: %v", c.RemoteAddr().String(), err) + } return } - } + cmd = cmd[:len(cmd)-1] + switch cmd[0] { + case '+': + fileId := cmd[1:] + err = vs.handleTcpPut(fileId, bufReader) + if err != nil { + glog.Errorf("put %s: %v", fileId, err) + } + case '-': + fileId := cmd[1:] + err = vs.handleTcpDelete(fileId) + if err != nil { + glog.Errorf("del %s: %v", fileId, err) + } + case '?': + fileId := cmd[1:] + err = vs.handleTcpGet(fileId, bufWriter) + case '!': + } - volumeId, n, err := vs.parseFileId(filename) - if err != nil { - glog.Errorf("parse file id %s: %v", filename, err) - return } - volume := vs.store.GetVolume(volumeId) - if volume == nil { - glog.Errorf("volume %d not found", volumeId) - return - } - - err = volume.StreamWrite(n, w, uint32(size)) - if err != nil { - glog.Errorf("StreamWrite %s: %v", filename, err) - return - } - - println("- ", filename) - } diff --git a/weed/wdclient/volume_udp_client.go b/weed/wdclient/volume_udp_client.go index 93fd2b227..470d7a82d 100644 --- a/weed/wdclient/volume_udp_client.go +++ b/weed/wdclient/volume_udp_client.go @@ -2,9 +2,6 @@ package wdclient import ( "bufio" - "bytes" - "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/udptransfer" "github.com/chrislusf/seaweedfs/weed/util" @@ -70,45 +67,41 @@ func (c *VolumeUdpClient) PutFileChunk(volumeServerAddress string, fileId string return parseErr } - c.cp.Register("udp", udpAddress) - udpConn, getErr := c.cp.Get("udp", udpAddress) - if getErr != nil { - return fmt.Errorf("get connection to %s: %v", udpAddress, getErr) + listener, err := udptransfer.NewEndpoint(&udptransfer.Params{ + LocalAddr: "", + Bandwidth: 100, + FastRetransmit: true, + FlatTraffic: true, + IsServ: false, + }) + if err != nil { + return err } - conn := udpConn.RawConn().(*VolumeUdpConn) - defer func() { - if err != nil { - udpConn.DiscardConnection() - } else { - udpConn.ReleaseConnection() - } - }() + + conn, err := listener.Dial(udpAddress) + if err != nil { + return err + } + defer conn.Close() + + bufWriter := bufio.NewWriter(conn) buf := []byte("+" + fileId + "\n") - _, err = conn.bufWriter.Write([]byte(buf)) + _, err = bufWriter.Write([]byte(buf)) if err != nil { return } util.Uint32toBytes(buf[0:4], fileSize) - _, err = conn.bufWriter.Write(buf[0:4]) + _, err = bufWriter.Write(buf[0:4]) if err != nil { return } - _, err = io.Copy(conn.bufWriter, fileReader) + _, err = io.Copy(bufWriter, fileReader) if err != nil { return } - conn.bufWriter.Write([]byte("!\n")) - conn.bufWriter.Flush() - - ret, _, err := conn.bufReader.ReadLine() - if err != nil { - glog.V(0).Infof("upload by udp: %v", err) - return - } - if !bytes.HasPrefix(ret, []byte("+OK")) { - glog.V(0).Infof("upload by udp: %v", string(ret)) - } + bufWriter.Write([]byte("!\n")) + bufWriter.Flush() return nil }