package weed_server import ( "bufio" "fmt" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" "net" "strings" ) func (vs *VolumeServer) HandleTcpConnection(c net.Conn) { defer c.Close() glog.V(0).Infof("Serving writes from %s", c.RemoteAddr().String()) bufReader := bufio.NewReaderSize(c, 1024*1024) bufWriter := bufio.NewWriterSize(c, 1024*1024) for { cmd, err := bufReader.ReadString('\n') if err != nil { glog.Errorf("read command from %s: %v", c.RemoteAddr().String(), err) return } switch cmd[0] { case '+': fileId := cmd[1:] err = vs.handleTcpPut(fileId, bufReader) if err == nil { bufWriter.Write([]byte("+OK\n")) } else { bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n")) } case '-': fileId := cmd[1:] err = vs.handleTcpDelete(fileId) if err == nil { bufWriter.Write([]byte("+OK\n")) } else { bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n")) } case '?': fileId := cmd[1:] err = vs.handleTcpGet(fileId, bufWriter) case '!': bufWriter.Flush() } } } func (vs *VolumeServer) handleTcpGet(fileId string, writer *bufio.Writer) (err error) { volumeId, n, err2 := vs.parseFileId(fileId) if err2 != nil { return err2 } volume := vs.store.GetVolume(volumeId) if volume == nil { return fmt.Errorf("volume %d not found", volumeId) } err = volume.StreamRead(n, writer) if err != nil { return err } return nil } func (vs *VolumeServer) handleTcpPut(fileId string, bufReader *bufio.Reader) (err error) { volumeId, n, err2 := vs.parseFileId(fileId) if err2 != nil { return err2 } volume := vs.store.GetVolume(volumeId) if volume == nil { return fmt.Errorf("volume %d not found", volumeId) } sizeBuf := make([]byte, 4) if _, err = bufReader.Read(sizeBuf); err != nil { return err } dataSize := util.BytesToUint32(sizeBuf) err = volume.StreamWrite(n, bufReader, dataSize) if err != nil { return err } return nil } func (vs *VolumeServer) handleTcpDelete(fileId string) (err error) { volumeId, n, err2 := vs.parseFileId(fileId) if err2 != nil { return err2 } _, err = vs.store.DeleteVolumeNeedle(volumeId, n) if err != nil { return err } return nil } func (vs *VolumeServer) parseFileId(fileId string) (needle.VolumeId, *needle.Needle, error) { commaIndex := strings.LastIndex(fileId, ",") if commaIndex <= 0 { return 0, nil, fmt.Errorf("unknown fileId %s", fileId) } vid, fid := fileId[0:commaIndex], fileId[commaIndex+1:] volumeId, ve := needle.NewVolumeId(vid) if ve != nil { return 0, nil, fmt.Errorf("unknown volume id in fileId %s", fileId) } n := new(needle.Needle) n.ParsePath(fid) return volumeId, n, nil }