mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
compiles, but fail to send
This commit is contained in:
parent
f9a7c45e9a
commit
56953b207d
1
go.mod
1
go.mod
|
@ -99,6 +99,7 @@ require (
|
||||||
gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
|
gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
|
||||||
gopkg.in/jcmturner/gokrb5.v7 v7.3.0 // indirect
|
gopkg.in/jcmturner/gokrb5.v7 v7.3.0 // indirect
|
||||||
gopkg.in/karlseguin/expect.v1 v1.0.1 // indirect
|
gopkg.in/karlseguin/expect.v1 v1.0.1 // indirect
|
||||||
|
pack.ag/tftp v1.0.0 // indirect
|
||||||
)
|
)
|
||||||
|
|
||||||
// replace github.com/seaweedfs/fuse => /Users/chris/go/src/github.com/seaweedfs/fuse
|
// replace github.com/seaweedfs/fuse => /Users/chris/go/src/github.com/seaweedfs/fuse
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -1243,6 +1243,8 @@ modernc.org/mathutil v1.1.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6
|
||||||
modernc.org/strutil v1.1.0 h1:+1/yCzZxY2pZwwrsbH+4T7BQMoLQ9QiBshRC9eicYsc=
|
modernc.org/strutil v1.1.0 h1:+1/yCzZxY2pZwwrsbH+4T7BQMoLQ9QiBshRC9eicYsc=
|
||||||
modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs=
|
modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs=
|
||||||
pack.ag/amqp v0.11.2/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
|
pack.ag/amqp v0.11.2/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
|
||||||
|
pack.ag/tftp v1.0.0 h1:q7iP8mKRtqTAWfxbQ4XY5/flZ5JmuThvrEmHPn8cN9A=
|
||||||
|
pack.ag/tftp v1.0.0/go.mod h1:N1Pyo5YG+K90XHoR2vfLPhpRuE8ziqbgMn/r/SghZas=
|
||||||
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
|
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
|
||||||
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
|
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
|
||||||
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
|
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
|
||||||
|
|
|
@ -29,7 +29,7 @@ import (
|
||||||
stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
|
stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
|
||||||
"github.com/chrislusf/seaweedfs/weed/storage"
|
"github.com/chrislusf/seaweedfs/weed/storage"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
"github.com/pin/tftp"
|
"pack.ag/tftp"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -398,11 +398,16 @@ func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeSer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v VolumeServerOptions) startUdpService(volumeServer *weed_server.VolumeServer) {
|
func (v VolumeServerOptions) startUdpService(volumeServer *weed_server.VolumeServer) {
|
||||||
tftpServer := tftp.NewServer(volumeServer.UdpReadHandler, volumeServer.UdpWriteHandler)
|
|
||||||
listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20001)
|
listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20001)
|
||||||
|
tftpServer, err := tftp.NewServer(listeningAddress)
|
||||||
|
if err != nil {
|
||||||
|
glog.Fatalf("Volume server listen on %s:%v", listeningAddress, err)
|
||||||
|
}
|
||||||
|
tftpServer.WriteHandler(volumeServer)
|
||||||
|
tftpServer.ReadHandler(volumeServer)
|
||||||
|
|
||||||
glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "UDP at", listeningAddress)
|
glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "UDP at", listeningAddress)
|
||||||
if e:= tftpServer.ListenAndServe(listeningAddress); e != nil {
|
if e:= tftpServer.ListenAndServe(); e != nil {
|
||||||
glog.Fatalf("Volume server UDP on %s:%v", listeningAddress, e)
|
glog.Fatalf("Volume server UDP on %s:%v", listeningAddress, e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,16 +1,18 @@
|
||||||
package weed_server
|
package weed_server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"fmt"
|
"pack.ag/tftp"
|
||||||
"io"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func (vs *VolumeServer) UdpReadHandler(filename string, rf io.ReaderFrom) error {
|
func (vs *VolumeServer) ServeTFTP(r tftp.ReadRequest) {
|
||||||
|
|
||||||
|
filename := r.Name()
|
||||||
|
|
||||||
volumeId, n, err := vs.parseFileId(filename)
|
volumeId, n, err := vs.parseFileId(filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
glog.Errorf("parse file id %s: %v", filename, err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
hasVolume := vs.store.HasVolume(volumeId)
|
hasVolume := vs.store.HasVolume(volumeId)
|
||||||
|
@ -18,48 +20,59 @@ func (vs *VolumeServer) UdpReadHandler(filename string, rf io.ReaderFrom) error
|
||||||
|
|
||||||
if hasVolume {
|
if hasVolume {
|
||||||
if _, err = vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil {
|
if _, err = vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil {
|
||||||
return err
|
glog.Errorf("ReadVolumeNeedle %s: %v", filename, err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if hasEcVolume {
|
if hasEcVolume {
|
||||||
if _, err = vs.store.ReadEcShardNeedle(volumeId, n); err != nil {
|
if _, err = vs.store.ReadEcShardNeedle(volumeId, n); err != nil {
|
||||||
return err
|
glog.Errorf("ReadEcShardNeedle %s: %v", filename, err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err = rf.ReadFrom(bytes.NewBuffer(n.Data)); err != nil {
|
if _, err = r.Write(n.Data); err != nil {
|
||||||
return err
|
glog.Errorf("UDP Write data %s: %v", filename, err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (vs *VolumeServer) UdpWriteHandler(filename string, wt io.WriterTo) error {
|
func (vs *VolumeServer) ReceiveTFTP(w tftp.WriteRequest) {
|
||||||
|
|
||||||
|
filename := w.Name()
|
||||||
|
|
||||||
|
// Get the file size
|
||||||
|
size, err := w.Size()
|
||||||
|
|
||||||
|
// Note: The size value is sent by the client, the client could send more data than
|
||||||
|
// it indicated in the size option. To be safe we'd want to allocate a buffer
|
||||||
|
// with the size we're expecting and use w.Read(buf) rather than ioutil.ReadAll.
|
||||||
|
|
||||||
if filename[0] == '-' {
|
if filename[0] == '-' {
|
||||||
return vs.handleTcpDelete(filename[1:])
|
err = vs.handleTcpDelete(filename[1:])
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("handleTcpDelete %s: %v", filename, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
volumeId, n, err := vs.parseFileId(filename)
|
volumeId, n, err := vs.parseFileId(filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
glog.Errorf("parse file id %s: %v", filename, err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
volume := vs.store.GetVolume(volumeId)
|
volume := vs.store.GetVolume(volumeId)
|
||||||
if volume == nil {
|
if volume == nil {
|
||||||
return fmt.Errorf("volume %d not found", volumeId)
|
glog.Errorf("volume %d not found", volumeId)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var buf bytes.Buffer
|
err = volume.StreamWrite(n, w, uint32(size))
|
||||||
written, err := wt.WriteTo(&buf)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
glog.Errorf("StreamWrite %s: %v", filename, err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = volume.StreamWrite(n, &buf, uint32(written))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,14 +1,14 @@
|
||||||
package wdclient
|
package wdclient
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||||
"github.com/pin/tftp"
|
"pack.ag/tftp"
|
||||||
"io"
|
"io"
|
||||||
)
|
)
|
||||||
|
|
||||||
// VolumeTcpClient put/get/delete file chunks directly on volume servers without replication
|
// VolumeTcpClient put/get/delete file chunks directly on volume servers without replication
|
||||||
type VolumeUdpClient struct {
|
type VolumeUdpClient struct {
|
||||||
udpClient *tftp.Client
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewVolumeUdpClient() *VolumeUdpClient {
|
func NewVolumeUdpClient() *VolumeUdpClient {
|
||||||
|
@ -23,18 +23,13 @@ func (c *VolumeUdpClient) PutFileChunk(volumeServerAddress string, fileId string
|
||||||
return parseErr
|
return parseErr
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.udpClient == nil {
|
udpClient, _ := tftp.NewClient()
|
||||||
c.udpClient, err = tftp.NewClient(udpAddress)
|
|
||||||
if err != nil {
|
fileUrl := "tftp://"+udpAddress+"/"+fileId
|
||||||
return
|
|
||||||
}
|
err = udpClient.Put(fileUrl, fileReader, int64(fileSize))
|
||||||
}
|
|
||||||
rf, err := c.udpClient.Send(fileId, "octet")
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
_, err = rf.ReadFrom(fileReader)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
glog.Errorf("udp put %s: %v", fileUrl, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue