diff --git a/weed/command/server.go b/weed/command/server.go index c47b7fa5d..8dfa63e34 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -130,7 +130,6 @@ func init() { serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server") serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") serverOptions.v.idxFolder = cmdServer.Flag.String("volume.dir.idx", "", "directory to store .idx files") - serverOptions.v.enableTcp = cmdServer.Flag.Bool("volume.tcp", false, " enable tcp port") serverOptions.v.inflightUploadDataTimeout = cmdServer.Flag.Duration("volume.inflightUploadDataTimeout", 60*time.Second, "inflight upload data wait timeout of volume servers") s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port") diff --git a/weed/command/volume.go b/weed/command/volume.go index 3ad8ba1bb..aa300108a 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -65,7 +65,6 @@ type VolumeServerOptions struct { preStopSeconds *int metricsHttpPort *int // pulseSeconds *int - enableTcp *bool inflightUploadDataTimeout *time.Duration } @@ -96,7 +95,6 @@ func init() { v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files") - v.enableTcp = cmdVolume.Flag.Bool("tcp", false, " enable tcp port") v.inflightUploadDataTimeout = cmdVolume.Flag.Duration("inflightUploadDataTimeout", 60*time.Second, "inflight upload data wait timeout of volume servers") } @@ -258,11 +256,6 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v } } - // starting tcp server - if *v.enableTcp { - go v.startTcpService(volumeServer) - } - // starting the cluster http server clusterHttpServer := v.startClusterHttpService(volumeMux) @@ -388,22 +381,3 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd }() return clusterHttpServer } - -func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeServer) { - listeningAddress := util.JoinHostPort(*v.bindIp, *v.port+20000) - glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "tcp at", listeningAddress) - listener, e := util.NewListener(listeningAddress, 0) - if e != nil { - glog.Fatalf("Volume server listener error on %s:%v", listeningAddress, e) - } - defer listener.Close() - - for { - c, err := listener.Accept() - if err != nil { - fmt.Println(err) - return - } - go volumeServer.HandleTcpConnection(c) - } -} diff --git a/weed/server/volume_server_tcp_handlers_write.go b/weed/server/volume_server_tcp_handlers_write.go deleted file mode 100644 index fb2623a2c..000000000 --- a/weed/server/volume_server_tcp_handlers_write.go +++ /dev/null @@ -1,138 +0,0 @@ -package weed_server - -import ( - "bufio" - "fmt" - "io" - "net" - "strings" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/storage/needle" - "github.com/seaweedfs/seaweedfs/weed/util" -) - -func (vs *VolumeServer) HandleTcpConnection(c net.Conn) { - defer c.Close() - - glog.V(0).Infof("Serving writes from %s", c.RemoteAddr().String()) - - bufReader := bufio.NewReaderSize(c, 1024*1024) - bufWriter := bufio.NewWriterSize(c, 1024*1024) - - for { - cmd, err := bufReader.ReadString('\n') - if err != nil { - if err != io.EOF { - glog.Errorf("read command from %s: %v", c.RemoteAddr().String(), err) - } - return - } - cmd = cmd[:len(cmd)-1] - switch cmd[0] { - case '+': - fileId := cmd[1:] - err = vs.handleTcpPut(fileId, bufReader) - if err == nil { - bufWriter.Write([]byte("+OK\n")) - } else { - bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n")) - } - case '-': - fileId := cmd[1:] - err = vs.handleTcpDelete(fileId) - if err == nil { - bufWriter.Write([]byte("+OK\n")) - } else { - bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n")) - } - case '?': - fileId := cmd[1:] - err = vs.handleTcpGet(fileId, bufWriter) - case '!': - bufWriter.Flush() - } - - } - -} - -func (vs *VolumeServer) handleTcpGet(fileId string, writer *bufio.Writer) (err error) { - - volumeId, n, err2 := vs.parseFileId(fileId) - if err2 != nil { - return err2 - } - - volume := vs.store.GetVolume(volumeId) - if volume == nil { - return fmt.Errorf("volume %d not found", volumeId) - } - - err = volume.StreamRead(n, writer) - if err != nil { - return err - } - - return nil -} - -func (vs *VolumeServer) handleTcpPut(fileId string, bufReader *bufio.Reader) (err error) { - - volumeId, n, err2 := vs.parseFileId(fileId) - if err2 != nil { - return err2 - } - - volume := vs.store.GetVolume(volumeId) - if volume == nil { - return fmt.Errorf("volume %d not found", volumeId) - } - - sizeBuf := make([]byte, 4) - if _, err = bufReader.Read(sizeBuf); err != nil { - return err - } - dataSize := util.BytesToUint32(sizeBuf) - - err = volume.StreamWrite(n, bufReader, dataSize) - if err != nil { - return err - } - - return nil -} - -func (vs *VolumeServer) handleTcpDelete(fileId string) (err error) { - - volumeId, n, err2 := vs.parseFileId(fileId) - if err2 != nil { - return err2 - } - - _, err = vs.store.DeleteVolumeNeedle(volumeId, n) - if err != nil { - return err - } - - return nil -} - -func (vs *VolumeServer) parseFileId(fileId string) (needle.VolumeId, *needle.Needle, error) { - - commaIndex := strings.LastIndex(fileId, ",") - if commaIndex <= 0 { - return 0, nil, fmt.Errorf("unknown fileId %s", fileId) - } - - vid, fid := fileId[0:commaIndex], fileId[commaIndex+1:] - - volumeId, ve := needle.NewVolumeId(vid) - if ve != nil { - return 0, nil, fmt.Errorf("unknown volume id in fileId %s", fileId) - } - - n := new(needle.Needle) - n.ParsePath(fid) - return volumeId, n, nil -} diff --git a/weed/storage/volume_stream_write.go b/weed/storage/volume_stream_write.go deleted file mode 100644 index 2496387ff..000000000 --- a/weed/storage/volume_stream_write.go +++ /dev/null @@ -1,105 +0,0 @@ -package storage - -import ( - "bufio" - "fmt" - "io" - "time" - - "github.com/seaweedfs/seaweedfs/weed/util" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/storage/backend" - "github.com/seaweedfs/seaweedfs/weed/storage/needle" - . "github.com/seaweedfs/seaweedfs/weed/storage/types" -) - -func (v *Volume) StreamWrite(n *needle.Needle, data io.Reader, dataSize uint32) (err error) { - - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() - - df, ok := v.DataBackend.(*backend.DiskFile) - if !ok { - return fmt.Errorf("unexpected volume backend") - } - offset, _, _ := v.DataBackend.GetStat() - - header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation - CookieToBytes(header[0:CookieSize], n.Cookie) - NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id) - n.Size = 4 + Size(dataSize) + 1 - SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size) - - n.DataSize = dataSize - - // needle header - df.Write(header[0:NeedleHeaderSize]) - - // data size and data - util.Uint32toBytes(header[0:4], n.DataSize) - df.Write(header[0:4]) - // write and calculate CRC - crcWriter := needle.NewCRCwriter(df) - io.Copy(crcWriter, io.LimitReader(data, int64(dataSize))) - - // flags - util.Uint8toBytes(header[0:1], n.Flags) - df.Write(header[0:1]) - - // data checksum - util.Uint32toBytes(header[0:needle.NeedleChecksumSize], crcWriter.Sum()) - // write timestamp, padding - n.AppendAtNs = uint64(time.Now().UnixNano()) - util.Uint64toBytes(header[needle.NeedleChecksumSize:needle.NeedleChecksumSize+TimestampSize], n.AppendAtNs) - padding := needle.PaddingLength(n.Size, needle.Version3) - df.Write(header[0 : needle.NeedleChecksumSize+TimestampSize+padding]) - - // add to needle map - if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil { - glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) - } - return -} - -func (v *Volume) StreamRead(n *needle.Needle, writer io.Writer) (err error) { - - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() - - nv, ok := v.nm.Get(n.Id) - if !ok || nv.Offset.IsZero() { - return ErrorNotFound - } - - sr := &StreamReader{ - readerAt: v.DataBackend, - offset: nv.Offset.ToActualOffset(), - } - bufReader := bufio.NewReader(sr) - bufReader.Discard(NeedleHeaderSize) - sizeBuf := make([]byte, 4) - bufReader.Read(sizeBuf) - if _, err = writer.Write(sizeBuf); err != nil { - return err - } - dataSize := util.BytesToUint32(sizeBuf) - - _, err = io.Copy(writer, io.LimitReader(bufReader, int64(dataSize))) - - return -} - -type StreamReader struct { - offset int64 - readerAt io.ReaderAt -} - -func (sr *StreamReader) Read(p []byte) (n int, err error) { - n, err = sr.readerAt.ReadAt(p, sr.offset) - if err != nil { - return - } - sr.offset += int64(n) - return -}