mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
parent
e67096656b
commit
6608cb5f43
|
@ -88,6 +88,9 @@ spec:
|
||||||
{{- if .Values.volume.whiteList }}
|
{{- if .Values.volume.whiteList }}
|
||||||
-whiteList={{ .Values.volume.whiteList }} \
|
-whiteList={{ .Values.volume.whiteList }} \
|
||||||
{{- end }}
|
{{- end }}
|
||||||
|
{{- if .Values.volume.imagesFixOrientation }}
|
||||||
|
-images.fix.orientation \
|
||||||
|
{{- end }}
|
||||||
-ip=${POD_NAME}.${SEAWEEDFS_FULLNAME}-volume \
|
-ip=${POD_NAME}.${SEAWEEDFS_FULLNAME}-volume \
|
||||||
-compactionMBps={{ .Values.volume.compactionMBps }} \
|
-compactionMBps={{ .Values.volume.compactionMBps }} \
|
||||||
-mserver={{ range $index := until (.Values.master.replicas | int) }}${SEAWEEDFS_FULLNAME}-master-{{ $index }}.${SEAWEEDFS_FULLNAME}-master:{{ $.Values.master.port }}{{ if lt $index (sub ($.Values.master.replicas | int) 1) }},{{ end }}{{ end }}
|
-mserver={{ range $index := until (.Values.master.replicas | int) }}${SEAWEEDFS_FULLNAME}-master-{{ $index }}.${SEAWEEDFS_FULLNAME}-master:{{ $.Values.master.port }}{{ if lt $index (sub ($.Values.master.replicas | int) 1) }},{{ end }}{{ end }}
|
||||||
|
|
|
@ -123,6 +123,9 @@ volume:
|
||||||
# Comma separated Ip addresses having write permission. No limit if empty.
|
# Comma separated Ip addresses having write permission. No limit if empty.
|
||||||
whiteList: null
|
whiteList: null
|
||||||
|
|
||||||
|
# Adjust jpg orientation when uploading.
|
||||||
|
imagesFixOrientation: false
|
||||||
|
|
||||||
extraVolumes: ""
|
extraVolumes: ""
|
||||||
extraVolumeMounts: ""
|
extraVolumeMounts: ""
|
||||||
|
|
||||||
|
|
|
@ -93,6 +93,7 @@ func init() {
|
||||||
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
|
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
|
||||||
serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
|
serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
|
||||||
serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
|
serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
|
||||||
|
serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.")
|
||||||
serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
|
serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
|
||||||
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
|
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
|
||||||
serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
|
serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
|
||||||
|
|
|
@ -47,6 +47,7 @@ type VolumeServerOptions struct {
|
||||||
rack *string
|
rack *string
|
||||||
whiteList []string
|
whiteList []string
|
||||||
indexType *string
|
indexType *string
|
||||||
|
fixJpgOrientation *bool
|
||||||
readRedirect *bool
|
readRedirect *bool
|
||||||
cpuProfile *string
|
cpuProfile *string
|
||||||
memProfile *string
|
memProfile *string
|
||||||
|
@ -70,6 +71,7 @@ func init() {
|
||||||
v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
|
v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
|
||||||
v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
|
v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
|
||||||
v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
|
v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
|
||||||
|
v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", false, "Adjust jpg orientation when uploading.")
|
||||||
v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.")
|
v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.")
|
||||||
v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file")
|
v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file")
|
||||||
v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
|
v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
|
||||||
|
@ -200,7 +202,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
|
||||||
volumeNeedleMapKind,
|
volumeNeedleMapKind,
|
||||||
strings.Split(masters, ","), 5, *v.dataCenter, *v.rack,
|
strings.Split(masters, ","), 5, *v.dataCenter, *v.rack,
|
||||||
v.whiteList,
|
v.whiteList,
|
||||||
*v.readRedirect,
|
*v.fixJpgOrientation, *v.readRedirect,
|
||||||
*v.compactionMBPerSecond,
|
*v.compactionMBPerSecond,
|
||||||
*v.fileSizeLimitMB,
|
*v.fileSizeLimitMB,
|
||||||
)
|
)
|
||||||
|
|
182
weed/images/orientation.go
Normal file
182
weed/images/orientation.go
Normal file
|
@ -0,0 +1,182 @@
|
||||||
|
package images
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"image"
|
||||||
|
"image/draw"
|
||||||
|
"image/jpeg"
|
||||||
|
"log"
|
||||||
|
|
||||||
|
"github.com/seaweedfs/goexif/exif"
|
||||||
|
)
|
||||||
|
|
||||||
|
//many code is copied from http://camlistore.org/pkg/images/images.go
|
||||||
|
func FixJpgOrientation(data []byte) (oriented []byte) {
|
||||||
|
ex, err := exif.Decode(bytes.NewReader(data))
|
||||||
|
if err != nil {
|
||||||
|
return data
|
||||||
|
}
|
||||||
|
tag, err := ex.Get(exif.Orientation)
|
||||||
|
if err != nil {
|
||||||
|
return data
|
||||||
|
}
|
||||||
|
angle := 0
|
||||||
|
flipMode := FlipDirection(0)
|
||||||
|
orient, err := tag.Int(0)
|
||||||
|
if err != nil {
|
||||||
|
return data
|
||||||
|
}
|
||||||
|
switch orient {
|
||||||
|
case topLeftSide:
|
||||||
|
// do nothing
|
||||||
|
return data
|
||||||
|
case topRightSide:
|
||||||
|
flipMode = 2
|
||||||
|
case bottomRightSide:
|
||||||
|
angle = 180
|
||||||
|
case bottomLeftSide:
|
||||||
|
angle = 180
|
||||||
|
flipMode = 2
|
||||||
|
case leftSideTop:
|
||||||
|
angle = -90
|
||||||
|
flipMode = 2
|
||||||
|
case rightSideTop:
|
||||||
|
angle = -90
|
||||||
|
case rightSideBottom:
|
||||||
|
angle = 90
|
||||||
|
flipMode = 2
|
||||||
|
case leftSideBottom:
|
||||||
|
angle = 90
|
||||||
|
}
|
||||||
|
|
||||||
|
if srcImage, _, err := image.Decode(bytes.NewReader(data)); err == nil {
|
||||||
|
dstImage := flip(rotate(srcImage, angle), flipMode)
|
||||||
|
var buf bytes.Buffer
|
||||||
|
jpeg.Encode(&buf, dstImage, nil)
|
||||||
|
return buf.Bytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
return data
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exif Orientation Tag values
|
||||||
|
// http://sylvana.net/jpegcrop/exif_orientation.html
|
||||||
|
const (
|
||||||
|
topLeftSide = 1
|
||||||
|
topRightSide = 2
|
||||||
|
bottomRightSide = 3
|
||||||
|
bottomLeftSide = 4
|
||||||
|
leftSideTop = 5
|
||||||
|
rightSideTop = 6
|
||||||
|
rightSideBottom = 7
|
||||||
|
leftSideBottom = 8
|
||||||
|
)
|
||||||
|
|
||||||
|
// The FlipDirection type is used by the Flip option in DecodeOpts
|
||||||
|
// to indicate in which direction to flip an image.
|
||||||
|
type FlipDirection int
|
||||||
|
|
||||||
|
// FlipVertical and FlipHorizontal are two possible FlipDirections
|
||||||
|
// values to indicate in which direction an image will be flipped.
|
||||||
|
const (
|
||||||
|
FlipVertical FlipDirection = 1 << iota
|
||||||
|
FlipHorizontal
|
||||||
|
)
|
||||||
|
|
||||||
|
type DecodeOpts struct {
|
||||||
|
// Rotate specifies how to rotate the image.
|
||||||
|
// If nil, the image is rotated automatically based on EXIF metadata.
|
||||||
|
// If an int, Rotate is the number of degrees to rotate
|
||||||
|
// counter clockwise and must be one of 0, 90, -90, 180, or
|
||||||
|
// -180.
|
||||||
|
Rotate interface{}
|
||||||
|
|
||||||
|
// Flip specifies how to flip the image.
|
||||||
|
// If nil, the image is flipped automatically based on EXIF metadata.
|
||||||
|
// Otherwise, Flip is a FlipDirection bitfield indicating how to flip.
|
||||||
|
Flip interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func rotate(im image.Image, angle int) image.Image {
|
||||||
|
var rotated *image.NRGBA
|
||||||
|
// trigonometric (i.e counter clock-wise)
|
||||||
|
switch angle {
|
||||||
|
case 90:
|
||||||
|
newH, newW := im.Bounds().Dx(), im.Bounds().Dy()
|
||||||
|
rotated = image.NewNRGBA(image.Rect(0, 0, newW, newH))
|
||||||
|
for y := 0; y < newH; y++ {
|
||||||
|
for x := 0; x < newW; x++ {
|
||||||
|
rotated.Set(x, y, im.At(newH-1-y, x))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case -90:
|
||||||
|
newH, newW := im.Bounds().Dx(), im.Bounds().Dy()
|
||||||
|
rotated = image.NewNRGBA(image.Rect(0, 0, newW, newH))
|
||||||
|
for y := 0; y < newH; y++ {
|
||||||
|
for x := 0; x < newW; x++ {
|
||||||
|
rotated.Set(x, y, im.At(y, newW-1-x))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case 180, -180:
|
||||||
|
newW, newH := im.Bounds().Dx(), im.Bounds().Dy()
|
||||||
|
rotated = image.NewNRGBA(image.Rect(0, 0, newW, newH))
|
||||||
|
for y := 0; y < newH; y++ {
|
||||||
|
for x := 0; x < newW; x++ {
|
||||||
|
rotated.Set(x, y, im.At(newW-1-x, newH-1-y))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return im
|
||||||
|
}
|
||||||
|
return rotated
|
||||||
|
}
|
||||||
|
|
||||||
|
// flip returns a flipped version of the image im, according to
|
||||||
|
// the direction(s) in dir.
|
||||||
|
// It may flip the imput im in place and return it, or it may allocate a
|
||||||
|
// new NRGBA (if im is an *image.YCbCr).
|
||||||
|
func flip(im image.Image, dir FlipDirection) image.Image {
|
||||||
|
if dir == 0 {
|
||||||
|
return im
|
||||||
|
}
|
||||||
|
ycbcr := false
|
||||||
|
var nrgba image.Image
|
||||||
|
dx, dy := im.Bounds().Dx(), im.Bounds().Dy()
|
||||||
|
di, ok := im.(draw.Image)
|
||||||
|
if !ok {
|
||||||
|
if _, ok := im.(*image.YCbCr); !ok {
|
||||||
|
log.Printf("failed to flip image: input does not satisfy draw.Image")
|
||||||
|
return im
|
||||||
|
}
|
||||||
|
// because YCbCr does not implement Set, we replace it with a new NRGBA
|
||||||
|
ycbcr = true
|
||||||
|
nrgba = image.NewNRGBA(image.Rect(0, 0, dx, dy))
|
||||||
|
di, ok = nrgba.(draw.Image)
|
||||||
|
if !ok {
|
||||||
|
log.Print("failed to flip image: could not cast an NRGBA to a draw.Image")
|
||||||
|
return im
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if dir&FlipHorizontal != 0 {
|
||||||
|
for y := 0; y < dy; y++ {
|
||||||
|
for x := 0; x < dx/2; x++ {
|
||||||
|
old := im.At(x, y)
|
||||||
|
di.Set(x, y, im.At(dx-1-x, y))
|
||||||
|
di.Set(dx-1-x, y, old)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if dir&FlipVertical != 0 {
|
||||||
|
for y := 0; y < dy/2; y++ {
|
||||||
|
for x := 0; x < dx; x++ {
|
||||||
|
old := im.At(x, y)
|
||||||
|
di.Set(x, y, im.At(x, dy-1-y))
|
||||||
|
di.Set(x, dy-1-y, old)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ycbcr {
|
||||||
|
return nrgba
|
||||||
|
}
|
||||||
|
return im
|
||||||
|
}
|
20
weed/images/orientation_test.go
Normal file
20
weed/images/orientation_test.go
Normal file
|
@ -0,0 +1,20 @@
|
||||||
|
package images
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestXYZ(t *testing.T) {
|
||||||
|
fname := "sample1.jpg"
|
||||||
|
|
||||||
|
dat, _ := ioutil.ReadFile(fname)
|
||||||
|
|
||||||
|
fixed_data := FixJpgOrientation(dat)
|
||||||
|
|
||||||
|
ioutil.WriteFile("fixed1.jpg", fixed_data, 0644)
|
||||||
|
|
||||||
|
os.Remove("fixed1.jpg")
|
||||||
|
|
||||||
|
}
|
29
weed/images/preprocess.go
Normal file
29
weed/images/preprocess.go
Normal file
|
@ -0,0 +1,29 @@
|
||||||
|
package images
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"io"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Preprocess image files on client side.
|
||||||
|
* 1. possibly adjust the orientation
|
||||||
|
* 2. resize the image to a width or height limit
|
||||||
|
* 3. remove the exif data
|
||||||
|
* Call this function on any file uploaded to SeaweedFS
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
func MaybePreprocessImage(filename string, data []byte, width, height int) (resized io.ReadSeeker, w int, h int) {
|
||||||
|
ext := filepath.Ext(filename)
|
||||||
|
ext = strings.ToLower(ext)
|
||||||
|
switch ext {
|
||||||
|
case ".png", ".gif":
|
||||||
|
return Resized(ext, bytes.NewReader(data), width, height, "")
|
||||||
|
case ".jpg", ".jpeg":
|
||||||
|
data = FixJpgOrientation(data)
|
||||||
|
return Resized(ext, bytes.NewReader(data), width, height, "")
|
||||||
|
}
|
||||||
|
return bytes.NewReader(data), 0, 0
|
||||||
|
}
|
|
@ -1,5 +1,5 @@
|
||||||
package pb
|
package pb
|
||||||
|
|
||||||
const (
|
const (
|
||||||
AdminShellClient = "adminShell"
|
AdminShellClient = "shell"
|
||||||
)
|
)
|
||||||
|
|
|
@ -25,6 +25,7 @@ type VolumeServer struct {
|
||||||
grpcDialOption grpc.DialOption
|
grpcDialOption grpc.DialOption
|
||||||
|
|
||||||
needleMapKind storage.NeedleMapType
|
needleMapKind storage.NeedleMapType
|
||||||
|
FixJpgOrientation bool
|
||||||
ReadRedirect bool
|
ReadRedirect bool
|
||||||
compactionBytePerSecond int64
|
compactionBytePerSecond int64
|
||||||
MetricsAddress string
|
MetricsAddress string
|
||||||
|
@ -39,6 +40,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
|
||||||
masterNodes []string, pulseSeconds int,
|
masterNodes []string, pulseSeconds int,
|
||||||
dataCenter string, rack string,
|
dataCenter string, rack string,
|
||||||
whiteList []string,
|
whiteList []string,
|
||||||
|
fixJpgOrientation bool,
|
||||||
readRedirect bool,
|
readRedirect bool,
|
||||||
compactionMBPerSecond int,
|
compactionMBPerSecond int,
|
||||||
fileSizeLimitMB int,
|
fileSizeLimitMB int,
|
||||||
|
@ -59,6 +61,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
|
||||||
dataCenter: dataCenter,
|
dataCenter: dataCenter,
|
||||||
rack: rack,
|
rack: rack,
|
||||||
needleMapKind: needleMapKind,
|
needleMapKind: needleMapKind,
|
||||||
|
FixJpgOrientation: fixJpgOrientation,
|
||||||
ReadRedirect: readRedirect,
|
ReadRedirect: readRedirect,
|
||||||
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
|
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
|
||||||
compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
|
compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
|
||||||
|
|
|
@ -42,7 +42,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
reqNeedle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.fileSizeLimitBytes)
|
reqNeedle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes)
|
||||||
if ne != nil {
|
if ne != nil {
|
||||||
writeJsonError(w, r, http.StatusBadRequest, ne)
|
writeJsonError(w, r, http.StatusBadRequest, ne)
|
||||||
return
|
return
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/images"
|
||||||
. "github.com/chrislusf/seaweedfs/weed/storage/types"
|
. "github.com/chrislusf/seaweedfs/weed/storage/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -47,7 +48,7 @@ func (n *Needle) String() (str string) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func CreateNeedleFromRequest(r *http.Request, sizeLimit int64) (n *Needle, originalSize int, e error) {
|
func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, e error) {
|
||||||
n = new(Needle)
|
n = new(Needle)
|
||||||
pu, e := ParseUpload(r, sizeLimit)
|
pu, e := ParseUpload(r, sizeLimit)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
|
@ -94,6 +95,13 @@ func CreateNeedleFromRequest(r *http.Request, sizeLimit int64) (n *Needle, origi
|
||||||
n.SetIsChunkManifest()
|
n.SetIsChunkManifest()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if fixJpgOrientation {
|
||||||
|
loweredName := strings.ToLower(pu.FileName)
|
||||||
|
if pu.MimeType == "image/jpeg" || strings.HasSuffix(loweredName, ".jpg") || strings.HasSuffix(loweredName, ".jpeg") {
|
||||||
|
n.Data = images.FixJpgOrientation(n.Data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
n.Checksum = NewCRC(n.Data)
|
n.Checksum = NewCRC(n.Data)
|
||||||
|
|
||||||
commaSep := strings.LastIndex(r.URL.Path, ",")
|
commaSep := strings.LastIndex(r.URL.Path, ",")
|
||||||
|
|
|
@ -45,7 +45,7 @@ func (mc *MasterClient) WaitUntilConnected() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *MasterClient) KeepConnectedToMaster() {
|
func (mc *MasterClient) KeepConnectedToMaster() {
|
||||||
glog.V(1).Infof("%s masterClient bootstraps with masters %v", mc.clientType, mc.masters)
|
glog.V(1).Infof("%s bootstraps with masters %v", mc.clientType, mc.masters)
|
||||||
for {
|
for {
|
||||||
mc.tryAllMasters()
|
mc.tryAllMasters()
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
|
@ -67,27 +67,27 @@ func (mc *MasterClient) tryAllMasters() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader string) {
|
func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader string) {
|
||||||
glog.V(1).Infof("%s masterClient Connecting to master %v", mc.clientType, master)
|
glog.V(1).Infof("%s Connecting to master %v", mc.clientType, master)
|
||||||
gprcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
|
gprcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
|
||||||
|
|
||||||
stream, err := client.KeepConnected(context.Background())
|
stream, err := client.KeepConnected(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infof("%s masterClient failed to keep connected to %s: %v", mc.clientType, master, err)
|
glog.V(0).Infof("%s failed to keep connected to %s: %v", mc.clientType, master, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = stream.Send(&master_pb.KeepConnectedRequest{Name: mc.clientType, GrpcPort: mc.grpcPort}); err != nil {
|
if err = stream.Send(&master_pb.KeepConnectedRequest{Name: mc.clientType, GrpcPort: mc.grpcPort}); err != nil {
|
||||||
glog.V(0).Infof("%s masterClient failed to send to %s: %v", mc.clientType, master, err)
|
glog.V(0).Infof("%s failed to send to %s: %v", mc.clientType, master, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(1).Infof("%s masterClient Connected to %v", mc.clientType, master)
|
glog.V(1).Infof("%s Connected to %v", mc.clientType, master)
|
||||||
mc.currentMaster = master
|
mc.currentMaster = master
|
||||||
|
|
||||||
for {
|
for {
|
||||||
volumeLocation, err := stream.Recv()
|
volumeLocation, err := stream.Recv()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infof("%s masterClient failed to receive from %s: %v", mc.clientType, master, err)
|
glog.V(0).Infof("%s failed to receive from %s: %v", mc.clientType, master, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,18 +104,18 @@ func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader stri
|
||||||
PublicUrl: volumeLocation.PublicUrl,
|
PublicUrl: volumeLocation.PublicUrl,
|
||||||
}
|
}
|
||||||
for _, newVid := range volumeLocation.NewVids {
|
for _, newVid := range volumeLocation.NewVids {
|
||||||
glog.V(1).Infof("%s: %s masterClient adds volume %d", mc.clientType, loc.Url, newVid)
|
glog.V(1).Infof("%s: %s adds volume %d", mc.clientType, loc.Url, newVid)
|
||||||
mc.addLocation(newVid, loc)
|
mc.addLocation(newVid, loc)
|
||||||
}
|
}
|
||||||
for _, deletedVid := range volumeLocation.DeletedVids {
|
for _, deletedVid := range volumeLocation.DeletedVids {
|
||||||
glog.V(1).Infof("%s: %s masterClient removes volume %d", mc.clientType, loc.Url, deletedVid)
|
glog.V(1).Infof("%s: %s removes volume %d", mc.clientType, loc.Url, deletedVid)
|
||||||
mc.deleteLocation(deletedVid, loc)
|
mc.deleteLocation(deletedVid, loc)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
})
|
})
|
||||||
if gprcErr != nil {
|
if gprcErr != nil {
|
||||||
glog.V(0).Infof("%s masterClient failed to connect with master %v: %v", mc.clientType, master, gprcErr)
|
glog.V(0).Infof("%s failed to connect with master %v: %v", mc.clientType, master, gprcErr)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue