From d493d626ba3534686dfaabc5c49ee62d6fe9720a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 21 Feb 2021 19:27:38 -0800 Subject: [PATCH 01/32] use basic file object to parse path fix https://github.com/chrislusf/seaweedfs/issues/1825 --- .../java/seaweedfs/client/FilerClient.java | 33 +++++++++---------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index 58269d41f..c2ffe0ac6 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -4,8 +4,7 @@ import com.google.common.base.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.file.Path; -import java.nio.file.Paths; +import java.io.File; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; @@ -94,9 +93,9 @@ public class FilerClient extends FilerGrpcClient { if ("/".equals(path)) { return true; } - Path pathObject = Paths.get(path); - String parent = pathObject.getParent().toString(); - String name = pathObject.getFileName().toString(); + File pathFile = new File(path); + String parent = pathFile.getParent(); + String name = pathFile.getName(); mkdirs(parent, mode, uid, gid, userName, groupNames); @@ -115,13 +114,13 @@ public class FilerClient extends FilerGrpcClient { public boolean mv(String oldPath, String newPath) { - Path oldPathObject = Paths.get(oldPath); - String oldParent = oldPathObject.getParent().toString(); - String oldName = oldPathObject.getFileName().toString(); + File oldPathFile = new File(oldPath); + String oldParent = oldPathFile.getParent(); + String oldName = oldPathFile.getName(); - Path newPathObject = Paths.get(newPath); - String newParent = newPathObject.getParent().toString(); - String newName = newPathObject.getFileName().toString(); + File newPathFile = new File(newPath); + String newParent = newPathFile.getParent(); + String newName = newPathFile.getName(); return atomicRenameEntry(oldParent, oldName, newParent, newName); @@ -129,9 +128,9 @@ public class FilerClient extends FilerGrpcClient { public boolean rm(String path, boolean isRecursive, boolean ignoreRecusiveError) { - Path pathObject = Paths.get(path); - String parent = pathObject.getParent().toString(); - String name = pathObject.getFileName().toString(); + File pathFile = new File(path); + String parent = pathFile.getParent(); + String name = pathFile.getName(); return deleteEntry( parent, @@ -148,9 +147,9 @@ public class FilerClient extends FilerGrpcClient { public boolean touch(String path, int mode, int uid, int gid, String userName, String[] groupNames) { - Path pathObject = Paths.get(path); - String parent = pathObject.getParent().toString(); - String name = pathObject.getFileName().toString(); + File pathFile = new File(path); + String parent = pathFile.getParent(); + String name = pathFile.getName(); FilerProto.Entry entry = lookupEntry(parent, name); if (entry == null) { From 151c281f36f1523593ad2777fd66c347934d73fb Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 21 Feb 2021 19:29:27 -0800 Subject: [PATCH 02/32] 2.27 --- k8s/seaweedfs/Chart.yaml | 4 ++-- k8s/seaweedfs/values.yaml | 2 +- weed/util/constants.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/k8s/seaweedfs/Chart.yaml b/k8s/seaweedfs/Chart.yaml index a16bb0133..ac56e337c 100644 --- a/k8s/seaweedfs/Chart.yaml +++ b/k8s/seaweedfs/Chart.yaml @@ -1,5 +1,5 @@ 
apiVersion: v1 description: SeaweedFS name: seaweedfs -appVersion: "2.26" -version: 2.26 +appVersion: "2.27" +version: 2.27 diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml index 6dc182b29..f7cc521c8 100644 --- a/k8s/seaweedfs/values.yaml +++ b/k8s/seaweedfs/values.yaml @@ -4,7 +4,7 @@ global: registry: "" repository: "" imageName: chrislusf/seaweedfs - # imageTag: "2.26" - started using {.Chart.appVersion} + # imageTag: "2.27" - started using {.Chart.appVersion} imagePullPolicy: IfNotPresent imagePullSecrets: imagepullsecret restartPolicy: Always diff --git a/weed/util/constants.go b/weed/util/constants.go index 1900d3d42..908f782ae 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,7 +5,7 @@ import ( ) var ( - VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 26) + VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 27) COMMIT = "" ) From 1c233ad986bbbf5de3394c734a1bafebaf67b9e0 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 22 Feb 2021 00:28:42 -0800 Subject: [PATCH 03/32] refactoring --- weed/shell/command_ec_common.go | 11 ++--- weed/shell/command_ec_decode.go | 6 +-- weed/shell/command_ec_encode.go | 11 ++--- weed/shell/command_volume_balance.go | 20 ++++----- .../command_volume_configure_replication.go | 9 ++-- weed/shell/command_volume_fix_replication.go | 13 +++--- weed/shell/command_volume_fsck.go | 13 +++--- weed/shell/command_volume_list.go | 10 ++--- weed/shell/command_volume_server_evacuate.go | 22 ++++------ weed/shell/command_volume_tier_download.go | 2 +- weed/shell/command_volume_tier_move.go | 44 ++++++++----------- 11 files changed, 63 insertions(+), 98 deletions(-) diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index 87a138ab6..fd35bb14b 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -201,17 +201,14 @@ type EcRack struct { func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { // list all possible locations - var resp *master_pb.VolumeListResponse - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) - return err - }) + // collect topology information + topologyInfo, _, err := collectTopologyInfo(commandEnv) if err != nil { - return nil, 0, err + return } // find out all volume servers with one slot left. 
- ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(resp.TopologyInfo, selectedDataCenter) + ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter) sortEcNodesByFreeslotsDecending(ecNodes) diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index 3e1499d41..dafdb041a 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -51,7 +51,7 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr vid := needle.VolumeId(*volumeId) // collect topology information - topologyInfo, err := collectTopologyInfo(commandEnv) + topologyInfo, _, err := collectTopologyInfo(commandEnv) if err != nil { return err } @@ -208,7 +208,7 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasur } -func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, err error) { +func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) { var resp *master_pb.VolumeListResponse err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { @@ -219,7 +219,7 @@ func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyIn return } - return resp.TopologyInfo, nil + return resp.TopologyInfo, resp.VolumeSizeLimitMb, nil } diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index e937f4490..edacf22c6 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -265,11 +265,8 @@ func balancedEcDistribution(servers []*EcNode) (allocated [][]uint32) { func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { - var resp *master_pb.VolumeListResponse - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) - return err - }) + // collect topology information + topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv) if err != nil { return } @@ -280,11 +277,11 @@ func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection stri fmt.Printf("ec encode volumes quiet for: %d seconds\n", quietSeconds) vidMap := make(map[uint32]bool) - eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds { - if float64(v.Size) > fullPercentage/100*float64(resp.VolumeSizeLimitMb)*1024*1024 { + if float64(v.Size) > fullPercentage/100*float64(volumeSizeLimitMb)*1024*1024 { vidMap[v.Id] = true } } diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index a1e3c1ab6..e0c41f310 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -1,7 +1,6 @@ package shell import ( - "context" "flag" "fmt" "github.com/chrislusf/seaweedfs/weed/storage/super_block" @@ -75,18 +74,15 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer return nil } - var resp *master_pb.VolumeListResponse - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { - resp, err = 
client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) - return err - }) + // collect topology information + topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv) if err != nil { return err } - volumeServers := collectVolumeServersByDc(resp.TopologyInfo, *dc) - volumeReplicas, _ := collectVolumeReplicaLocations(resp) - diskTypes := collectVolumeDiskTypes(resp.TopologyInfo) + volumeServers := collectVolumeServersByDc(topologyInfo, *dc) + volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo) + diskTypes := collectVolumeDiskTypes(topologyInfo) if *collection == "EACH_COLLECTION" { collections, err := ListCollectionNames(commandEnv, true, false) @@ -94,16 +90,16 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer return err } for _, c := range collections { - if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil { + if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, volumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil { return err } } } else if *collection == "ALL_COLLECTIONS" { - if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil { + if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, volumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil { return err } } else { - if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil { + if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, volumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil { return err } } diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go index ecbe402e8..e3f034873 100644 --- a/weed/shell/command_volume_configure_replication.go +++ b/weed/shell/command_volume_configure_replication.go @@ -56,11 +56,8 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman } replicaPlacementInt32 := uint32(replicaPlacement.Byte()) - var resp *master_pb.VolumeListResponse - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) - return err - }) + // collect topology information + topologyInfo, _, err := collectTopologyInfo(commandEnv) if err != nil { return err } @@ -69,7 +66,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman // find all data nodes with volumes that needs replication change var allLocations []location - eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { loc := newLocation(dc, string(rack), dn) for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 7b2eb6769..bf20d3574 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -64,18 +64,15 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, takeAction := !*skipChange - var resp 
*master_pb.VolumeListResponse - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) - return err - }) + // collect topology information + topologyInfo, _, err := collectTopologyInfo(commandEnv) if err != nil { return err } // find all volumes that needs replication // collect all data nodes - volumeReplicas, allLocations := collectVolumeReplicaLocations(resp) + volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo) if len(allLocations) == 0 { return fmt.Errorf("no data nodes at all") @@ -107,10 +104,10 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, } -func collectVolumeReplicaLocations(resp *master_pb.VolumeListResponse) (map[uint32][]*VolumeReplica, []location) { +func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) { volumeReplicas := make(map[uint32][]*VolumeReplica) var allLocations []location - eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { loc := newLocation(dc, string(rack), dn) for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index f9dcf3b5f..1fbc9ad35 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -73,7 +73,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. defer os.RemoveAll(tempFolder) // collect all volume id locations - volumeIdToVInfo, err := c.collectVolumeIds(*verbose, writer) + volumeIdToVInfo, err := c.collectVolumeIds(commandEnv, *verbose, writer) if err != nil { return fmt.Errorf("failed to collect all volume locations: %v", err) } @@ -268,23 +268,20 @@ type VInfo struct { isEcVolume bool } -func (c *commandVolumeFsck) collectVolumeIds(verbose bool, writer io.Writer) (volumeIdToServer map[uint32]VInfo, err error) { +func (c *commandVolumeFsck) collectVolumeIds(commandEnv *CommandEnv, verbose bool, writer io.Writer) (volumeIdToServer map[uint32]VInfo, err error) { if verbose { fmt.Fprintf(writer, "collecting volume id and locations from master ...\n") } volumeIdToServer = make(map[uint32]VInfo) - var resp *master_pb.VolumeListResponse - err = c.env.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) - return err - }) + // collect topology information + topologyInfo, _, err := collectTopologyInfo(commandEnv) if err != nil { return } - eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) { + eachDataNode(topologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) { for _, diskInfo := range t.DiskInfos { for _, vi := range diskInfo.VolumeInfos { volumeIdToServer[vi.Id] = VInfo{ diff --git a/weed/shell/command_volume_list.go b/weed/shell/command_volume_list.go index dc8783cd1..9856de10b 100644 --- a/weed/shell/command_volume_list.go +++ b/weed/shell/command_volume_list.go @@ -2,7 +2,6 @@ package shell import ( "bytes" - "context" "fmt" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" @@ -32,16 +31,13 @@ func (c *commandVolumeList) Help() string { func (c *commandVolumeList) Do(args []string, commandEnv *CommandEnv, 
writer io.Writer) (err error) { - var resp *master_pb.VolumeListResponse - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) - return err - }) + // collect topology information + topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv) if err != nil { return err } - writeTopologyInfo(writer, resp.TopologyInfo, resp.VolumeSizeLimitMb) + writeTopologyInfo(writer, topologyInfo, volumeSizeLimitMb) return nil } diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 5216de66b..80c5b1d6b 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -1,7 +1,6 @@ package shell import ( - "context" "flag" "fmt" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -71,36 +70,33 @@ func volumeServerEvacuate(commandEnv *CommandEnv, volumeServer string, skipNonMo // 3. move to any other volume server as long as it satisfy the replication requirements // list all the volumes - var resp *master_pb.VolumeListResponse - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) - return err - }) + // collect topology information + topologyInfo, _, err := collectTopologyInfo(commandEnv) if err != nil { return err } - if err := evacuateNormalVolumes(commandEnv, resp, volumeServer, skipNonMoveable, applyChange, writer); err != nil { + if err := evacuateNormalVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil { return err } - if err := evacuateEcVolumes(commandEnv, resp, volumeServer, skipNonMoveable, applyChange, writer); err != nil { + if err := evacuateEcVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil { return err } return nil } -func evacuateNormalVolumes(commandEnv *CommandEnv, resp *master_pb.VolumeListResponse, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { +func evacuateNormalVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { // find this volume server - volumeServers := collectVolumeServersByDc(resp.TopologyInfo, "") + volumeServers := collectVolumeServersByDc(topologyInfo, "") thisNode, otherNodes := nodesOtherThan(volumeServers, volumeServer) if thisNode == nil { return fmt.Errorf("%s is not found in this cluster", volumeServer) } // move away normal volumes - volumeReplicas, _ := collectVolumeReplicaLocations(resp) + volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo) for _, diskInfo := range thisNode.info.DiskInfos { for _, vol := range diskInfo.VolumeInfos { hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) @@ -120,9 +116,9 @@ func evacuateNormalVolumes(commandEnv *CommandEnv, resp *master_pb.VolumeListRes return nil } -func evacuateEcVolumes(commandEnv *CommandEnv, resp *master_pb.VolumeListResponse, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { +func evacuateEcVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { // find this ec volume server - ecNodes, _ := collectEcVolumeServersByDc(resp.TopologyInfo, "") + 
ecNodes, _ := collectEcVolumeServersByDc(topologyInfo, "") thisNode, otherNodes := ecNodesOtherThan(ecNodes, volumeServer) if thisNode == nil { return fmt.Errorf("%s is not found in this cluster\n", volumeServer) diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go index 8aeb34d5c..33166ce65 100644 --- a/weed/shell/command_volume_tier_download.go +++ b/weed/shell/command_volume_tier_download.go @@ -56,7 +56,7 @@ func (c *commandVolumeTierDownload) Do(args []string, commandEnv *CommandEnv, wr vid := needle.VolumeId(*volumeId) // collect topology information - topologyInfo, err := collectTopologyInfo(commandEnv) + topologyInfo, _, err := collectTopologyInfo(commandEnv) if err != nil { return err } diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go index 27c6815d4..42c710db3 100644 --- a/weed/shell/command_volume_tier_move.go +++ b/weed/shell/command_volume_tier_move.go @@ -1,7 +1,6 @@ package shell import ( - "context" "flag" "fmt" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -26,8 +25,10 @@ func (c *commandVolumeTierMove) Name() string { func (c *commandVolumeTierMove) Help() string { return ` change a volume from one disk type to another - volume.tier.move -source=hdd -target=ssd [-collection=""] [-fullPercent=95] [-quietFor=1h] - volume.tier.move -target=hdd [-collection=""] -volumeId= + volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collection=""] [-fullPercent=95] [-quietFor=1h] + + Even if the volume is replicated, only one replica will be changed and the rest replicas will be dropped. + So "volume.fix.replication" and "volume.balance" should be followed. ` } @@ -39,7 +40,6 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer } tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) - volumeId := tierCommand.Int("volumeId", 0, "the volume id") collection := tierCommand.String("collection", "", "the collection name") fullPercentage := tierCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size") quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without no writes for this period") @@ -49,20 +49,21 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer return nil } - if *source == *target { - return fmt.Errorf("source tier %s is the same as target tier %s", *source, *target) + fromDiskType := types.ToDiskType(*source) + toDiskType := types.ToDiskType(*target) + + if fromDiskType == toDiskType { + return fmt.Errorf("source tier %s is the same as target tier %s", fromDiskType, toDiskType) } - vid := needle.VolumeId(*volumeId) - - // volumeId is provided - if vid != 0 { - // return doVolumeTierMove(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile) + // collect topology information + topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv) + if err != nil { + return err } // apply to all volumes in the collection - // reusing collectVolumeIdsForEcEncode for now - volumeIds, err := collectVolumeIdsForTierChange(commandEnv, *source, *collection, *fullPercentage, *quietPeriod) + volumeIds, err := collectVolumeIdsForTierChange(commandEnv, topologyInfo, volumeSizeLimitMb, fromDiskType, *collection, *fullPercentage, *quietPeriod) if err != nil { return err } @@ -71,16 +72,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer return nil } -func collectVolumeIdsForTierChange(commandEnv *CommandEnv, sourceTier 
string, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { - - var resp *master_pb.VolumeListResponse - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) - return err - }) - if err != nil { - return - } +func collectVolumeIdsForTierChange(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, sourceTier types.DiskType, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { quietSeconds := int64(quietPeriod / time.Second) nowUnixSeconds := time.Now().Unix() @@ -88,11 +80,11 @@ func collectVolumeIdsForTierChange(commandEnv *CommandEnv, sourceTier string, se fmt.Printf("collect %s volumes quiet for: %d seconds\n", sourceTier, quietSeconds) vidMap := make(map[uint32]bool) - eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { - if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds && types.ToDiskType(v.DiskType) == types.ToDiskType(sourceTier) { - if float64(v.Size) > fullPercentage/100*float64(resp.VolumeSizeLimitMb)*1024*1024 { + if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds && types.ToDiskType(v.DiskType) == sourceTier { + if float64(v.Size) > fullPercentage/100*float64(volumeSizeLimitMb)*1024*1024 { vidMap[v.Id] = true } } From 6a4546d2c088b5d71f8b2200b758a82f1aad08c6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 22 Feb 2021 01:30:07 -0800 Subject: [PATCH 04/32] shell: add volume.tier.move --- weed/shell/command_volume_fix_replication.go | 8 +-- weed/shell/command_volume_tier_move.go | 59 +++++++++++++++++++- 2 files changed, 61 insertions(+), 6 deletions(-) diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index bf20d3574..538351fd0 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -162,10 +162,10 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) foundNewLocation := false hasSkippedCollection := false - keepDataNodesSorted(allLocations, replica.info.DiskType) + keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType)) + fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType)) for _, dst := range allLocations { // check whether data nodes satisfy the constraints - fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType)) if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { // check collection name pattern if *c.collectionPattern != "" { @@ -216,8 +216,8 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm return nil } -func keepDataNodesSorted(dataNodes []location, diskType string) { - fn := capacityByFreeVolumeCount(types.ToDiskType(diskType)) +func keepDataNodesSorted(dataNodes []location, diskType types.DiskType) { + fn := capacityByFreeVolumeCount(diskType) sort.Slice(dataNodes, func(i, j int) bool { return fn(dataNodes[i].dataNode) > fn(dataNodes[j].dataNode) }) diff 
--git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go index 42c710db3..18385667d 100644 --- a/weed/shell/command_volume_tier_move.go +++ b/weed/shell/command_volume_tier_move.go @@ -23,7 +23,7 @@ func (c *commandVolumeTierMove) Name() string { } func (c *commandVolumeTierMove) Help() string { - return ` change a volume from one disk type to another + return `change a volume from one disk type to another volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collection=""] [-fullPercent=95] [-quietFor=1h] @@ -45,6 +45,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without no writes for this period") source := tierCommand.String("fromDiskType", "", "the source disk type") target := tierCommand.String("toDiskType", "", "the target disk type") + applyChange := tierCommand.Bool("force", false, "actually apply the changes") if err = tierCommand.Parse(args); err != nil { return nil } @@ -62,13 +63,67 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer return err } - // apply to all volumes in the collection + // collect all volumes that should change volumeIds, err := collectVolumeIdsForTierChange(commandEnv, topologyInfo, volumeSizeLimitMb, fromDiskType, *collection, *fullPercentage, *quietPeriod) if err != nil { return err } fmt.Printf("tier move volumes: %v\n", volumeIds) + _, allLocations := collectVolumeReplicaLocations(topologyInfo) + for _, vid := range volumeIds { + if err = doVolumeTierMove(commandEnv, writer, *collection, vid, toDiskType, allLocations, *applyChange); err != nil { + fmt.Printf("tier move volume %d: %v\n", vid, err) + } + } + + return nil +} + +func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, toDiskType types.DiskType, allLocations []location, applyChanges bool) (err error) { + // find volume location + locations, found := commandEnv.MasterClient.GetLocations(uint32(vid)) + if !found { + return fmt.Errorf("volume %d not found", vid) + } + + // find one server with the most empty volume slots with target disk type + hasFoundTarget := false + keepDataNodesSorted(allLocations, toDiskType) + fn := capacityByFreeVolumeCount(toDiskType) + for _, dst := range allLocations { + if fn(dst.dataNode) > 0 { + // ask the volume server to replicate the volume + sourceVolumeServer := "" + for _, loc := range locations { + if loc.Url != dst.dataNode.Id { + sourceVolumeServer = loc.Url + } + } + if sourceVolumeServer == "" { + continue + } + fmt.Fprintf(writer, "moving volume %d %s from %s to dataNode %s with disk type ...\n", vid, sourceVolumeServer, dst.dataNode.Id, toDiskType.String()) + hasFoundTarget = true + + if !applyChanges { + break + } + + // mark all replicas as read only + err = markVolumeReadonly(commandEnv.option.GrpcDialOption, vid, locations) + if err != nil { + return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) + } + return LiveMoveVolume(commandEnv.option.GrpcDialOption, vid, sourceVolumeServer, dst.dataNode.Id, 5*time.Second, toDiskType.String()) + + } + } + + if !hasFoundTarget { + fmt.Fprintf(writer, "can not find disk type %s for volume %d\n", toDiskType.String(), vid) + } + return nil } From 5da63e045ec8aac1f86b31c398a4319ddb5e227a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 22 Feb 2021 01:44:18 -0800 Subject: [PATCH 05/32] avoid moving to another server with the same volume id --- 
weed/shell/command_volume_tier_move.go | 28 +++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go index 18385667d..fb52c5d96 100644 --- a/weed/shell/command_volume_tier_move.go +++ b/weed/shell/command_volume_tier_move.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/wdclient" "io" "time" @@ -80,6 +81,15 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer return nil } +func isOneOf(server string, locations []wdclient.Location) bool { + for _, loc := range locations { + if server == loc.Url { + return true + } + } + return false +} + func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, toDiskType types.DiskType, allLocations []location, applyChanges bool) (err error) { // find volume location locations, found := commandEnv.MasterClient.GetLocations(uint32(vid)) @@ -94,6 +104,9 @@ func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection strin for _, dst := range allLocations { if fn(dst.dataNode) > 0 { // ask the volume server to replicate the volume + if isOneOf(dst.dataNode.Id, locations) { + continue + } sourceVolumeServer := "" for _, loc := range locations { if loc.Url != dst.dataNode.Id { @@ -111,12 +124,21 @@ func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection strin } // mark all replicas as read only - err = markVolumeReadonly(commandEnv.option.GrpcDialOption, vid, locations) - if err != nil { + if err = markVolumeReadonly(commandEnv.option.GrpcDialOption, vid, locations); err != nil { return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) } - return LiveMoveVolume(commandEnv.option.GrpcDialOption, vid, sourceVolumeServer, dst.dataNode.Id, 5*time.Second, toDiskType.String()) + if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, vid, sourceVolumeServer, dst.dataNode.Id, 5*time.Second, toDiskType.String()); err != nil { + return fmt.Errorf("move volume %d %s => %s : %v", vid, locations[0].Url, dst.dataNode.Id, err) + } + // remove the remaining replicas + for _, loc := range locations { + if loc.Url != sourceVolumeServer { + if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.Url); err != nil { + fmt.Fprintf(writer, "failed to delete volume %d on %s\n", vid, loc.Url) + } + } + } } } From 30b30b8fe0bdfef08c27268970b4df89f7f0abae Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 22 Feb 2021 01:59:03 -0800 Subject: [PATCH 06/32] volume.tier.move: passing non-empty disk type --- weed/server/volume_grpc_copy.go | 2 +- weed/shell/command_volume_tier_move.go | 6 +++--- weed/storage/types/volume_disk_type.go | 7 +++++++ 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 3b6524df8..cf9f9f777 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -61,7 +61,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo } location := vs.store.FindFreeLocation(types.ToDiskType(diskType)) if location == nil { - return fmt.Errorf("no space left") + return fmt.Errorf("no space left for disk type %s", types.ToDiskType(diskType).ReadableString()) } dataBaseFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId)) 
diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go index fb52c5d96..a15efd6b3 100644 --- a/weed/shell/command_volume_tier_move.go +++ b/weed/shell/command_volume_tier_move.go @@ -116,7 +116,7 @@ func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection strin if sourceVolumeServer == "" { continue } - fmt.Fprintf(writer, "moving volume %d %s from %s to dataNode %s with disk type ...\n", vid, sourceVolumeServer, dst.dataNode.Id, toDiskType.String()) + fmt.Fprintf(writer, "moving volume %d from %s to %s with disk type %s ...\n", vid, sourceVolumeServer, dst.dataNode.Id, toDiskType.ReadableString()) hasFoundTarget = true if !applyChanges { @@ -127,7 +127,7 @@ func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection strin if err = markVolumeReadonly(commandEnv.option.GrpcDialOption, vid, locations); err != nil { return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) } - if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, vid, sourceVolumeServer, dst.dataNode.Id, 5*time.Second, toDiskType.String()); err != nil { + if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, vid, sourceVolumeServer, dst.dataNode.Id, 5*time.Second, toDiskType.ReadableString()); err != nil { return fmt.Errorf("move volume %d %s => %s : %v", vid, locations[0].Url, dst.dataNode.Id, err) } @@ -143,7 +143,7 @@ func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection strin } if !hasFoundTarget { - fmt.Fprintf(writer, "can not find disk type %s for volume %d\n", toDiskType.String(), vid) + fmt.Fprintf(writer, "can not find disk type %s for volume %d\n", toDiskType.ReadableString(), vid) } return nil diff --git a/weed/storage/types/volume_disk_type.go b/weed/storage/types/volume_disk_type.go index 25056ee10..c9b87d802 100644 --- a/weed/storage/types/volume_disk_type.go +++ b/weed/storage/types/volume_disk_type.go @@ -31,3 +31,10 @@ func (diskType DiskType) String() string { } return string(diskType) } + +func (diskType DiskType) ReadableString() string { + if diskType == "" { + return "hdd" + } + return string(diskType) +} From 62191b08eafe896f0f403bbf759f1cb07a59856f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 22 Feb 2021 02:03:12 -0800 Subject: [PATCH 07/32] disk type support custom tags --- weed/command/benchmark.go | 2 +- weed/command/filer.go | 2 +- weed/command/filer_copy.go | 2 +- weed/command/filer_sync.go | 4 ++-- weed/command/mount.go | 2 +- weed/command/server.go | 4 ++-- weed/command/upload.go | 2 +- weed/command/volume.go | 2 +- weed/command/webdav.go | 2 +- weed/shell/command_fs_configure.go | 2 +- weed/shell/command_volume_move.go | 6 +++--- 11 files changed, 15 insertions(+), 15 deletions(-) diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index e1b6d8d6c..c1bc80c42 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -63,7 +63,7 @@ func init() { b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file") b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection") b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type") - b.diskType = cmdBenchmark.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive") + b.diskType = cmdBenchmark.Flag.String("disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag") b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file") 
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write") diff --git a/weed/command/filer.go b/weed/command/filer.go index 6660bd694..534bd9e04 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -84,7 +84,7 @@ func init() { filerWebDavOptions.port = cmdFiler.Flag.Int("webdav.port", 7333, "webdav server http listen port") filerWebDavOptions.collection = cmdFiler.Flag.String("webdav.collection", "", "collection to create the files") filerWebDavOptions.replication = cmdFiler.Flag.String("webdav.replication", "", "replication to create the files") - filerWebDavOptions.disk = cmdFiler.Flag.String("webdav.disk", "", "[hdd|ssd] hard drive or solid state drive") + filerWebDavOptions.disk = cmdFiler.Flag.String("webdav.disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag") filerWebDavOptions.tlsPrivateKey = cmdFiler.Flag.String("webdav.key.file", "", "path to the TLS private key file") filerWebDavOptions.tlsCertificate = cmdFiler.Flag.String("webdav.cert.file", "", "path to the TLS certificate file") filerWebDavOptions.cacheDir = cmdFiler.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks") diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index bf64e72b3..42f5ec4c3 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -55,7 +55,7 @@ func init() { copy.replication = cmdCopy.Flag.String("replication", "", "replication type") copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name") copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") - copy.diskType = cmdCopy.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive") + copy.diskType = cmdCopy.Flag.String("disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag") copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit") copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines") copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file") diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 725f7d485..f8beb6fda 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -58,8 +58,8 @@ func init() { syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B") syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A") syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B") - syncOptions.aDiskType = cmdFilerSynchronize.Flag.String("a.disk", "", "[hdd|ssd] hard drive or solid state drive on filer A") - syncOptions.bDiskType = cmdFilerSynchronize.Flag.String("b.disk", "", "[hdd|ssd] hard drive or solid state drive on filer B") + syncOptions.aDiskType = cmdFilerSynchronize.Flag.String("a.disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag on filer A") + syncOptions.bDiskType = cmdFilerSynchronize.Flag.String("b.disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag on filer B") syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", false, "read and write file chunks by filer A instead of volume servers") syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers") 
syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files") diff --git a/weed/command/mount.go b/weed/command/mount.go index aa6d91740..ea439af7c 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -42,7 +42,7 @@ func init() { mountOptions.dirAutoCreate = cmdMount.Flag.Bool("dirAutoCreate", false, "auto create the directory to mount to") mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files") mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.") - mountOptions.diskType = cmdMount.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive") + mountOptions.diskType = cmdMount.Flag.String("disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag") mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds") mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files") mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 128, "limit concurrent goroutine writers if not 0") diff --git a/weed/command/server.go b/weed/command/server.go index d7c41b014..611578953 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -102,7 +102,7 @@ func init() { serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port") serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port") serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.") - serverOptions.v.diskType = cmdServer.Flag.String("volume.disk", "", "[hdd|ssd] hard drive or solid state drive") + serverOptions.v.diskType = cmdServer.Flag.String("volume.disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag") serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.") serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.") serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second") @@ -122,7 +122,7 @@ func init() { webdavOptions.port = cmdServer.Flag.Int("webdav.port", 7333, "webdav server http listen port") webdavOptions.collection = cmdServer.Flag.String("webdav.collection", "", "collection to create the files") webdavOptions.replication = cmdServer.Flag.String("webdav.replication", "", "replication to create the files") - webdavOptions.disk = cmdServer.Flag.String("webdav.disk", "", "[hdd|ssd] hard drive or solid state drive") + webdavOptions.disk = cmdServer.Flag.String("webdav.disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag") webdavOptions.tlsPrivateKey = cmdServer.Flag.String("webdav.key.file", "", "path to the TLS private key file") webdavOptions.tlsCertificate = cmdServer.Flag.String("webdav.cert.file", "", "path to the TLS certificate file") webdavOptions.cacheDir = cmdServer.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks") diff --git a/weed/command/upload.go b/weed/command/upload.go index 149d71241..67fde2185 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -41,7 +41,7 @@ func init() { upload.replication = 
cmdUpload.Flag.String("replication", "", "replication type") upload.collection = cmdUpload.Flag.String("collection", "", "optional collection name") upload.dataCenter = cmdUpload.Flag.String("dataCenter", "", "optional data center name") - upload.diskType = cmdUpload.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive") + upload.diskType = cmdUpload.Flag.String("disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag") upload.ttl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") upload.maxMB = cmdUpload.Flag.Int("maxMB", 32, "split files larger than the limit") upload.usePublicUrl = cmdUpload.Flag.Bool("usePublicUrl", false, "upload to public url from volume server") diff --git a/weed/command/volume.go b/weed/command/volume.go index ff951afdc..659c93d96 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -78,7 +78,7 @@ func init() { v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name") v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.") - v.diskType = cmdVolume.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive") + v.diskType = cmdVolume.Flag.String("disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag") v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", false, "Adjust jpg orientation when uploading.") v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.") v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file") diff --git a/weed/command/webdav.go b/weed/command/webdav.go index 3e4532d6e..2bd4a3c61 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -39,7 +39,7 @@ func init() { webDavStandaloneOptions.port = cmdWebDav.Flag.Int("port", 7333, "webdav server http listen port") webDavStandaloneOptions.collection = cmdWebDav.Flag.String("collection", "", "collection to create the files") webDavStandaloneOptions.replication = cmdWebDav.Flag.String("replication", "", "replication to create the files") - webDavStandaloneOptions.disk = cmdWebDav.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive") + webDavStandaloneOptions.disk = cmdWebDav.Flag.String("disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag") webDavStandaloneOptions.tlsPrivateKey = cmdWebDav.Flag.String("key.file", "", "path to the TLS private key file") webDavStandaloneOptions.tlsCertificate = cmdWebDav.Flag.String("cert.file", "", "path to the TLS certificate file") webDavStandaloneOptions.cacheDir = cmdWebDav.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks") diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go index d0db3722a..02cd7ac69 100644 --- a/weed/shell/command_fs_configure.go +++ b/weed/shell/command_fs_configure.go @@ -52,7 +52,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io collection := fsConfigureCommand.String("collection", "", "assign writes to this collection") replication := fsConfigureCommand.String("replication", "", "assign writes with this replication") ttl := fsConfigureCommand.String("ttl", "", "assign writes with this ttl") - diskType := fsConfigureCommand.String("disk", "", "[hdd|ssd] hard drive or solid state drive") + diskType := fsConfigureCommand.String("disk", "", 
"[hdd|ssd|] hard drive or solid state drive or any tag") fsync := fsConfigureCommand.Bool("fsync", false, "fsync for the writes") volumeGrowthCount := fsConfigureCommand.Int("volumeGrowthCount", 0, "the number of physical volumes to add if no writable volumes") isDelete := fsConfigureCommand.Bool("delete", false, "delete the configuration by locationPrefix") diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go index f9462beee..84f33db34 100644 --- a/weed/shell/command_volume_move.go +++ b/weed/shell/command_volume_move.go @@ -29,7 +29,7 @@ func (c *commandVolumeMove) Help() string { return `move a live volume from one volume server to another volume server volume.move -source -target -volumeId - volume.move -source -target -volumeId -disk [hdd|ssd] + volume.move -source -target -volumeId -disk [hdd|ssd|] This command move a live volume from one volume server to another volume server. Here are the steps: @@ -41,7 +41,7 @@ func (c *commandVolumeMove) Help() string { Now the master will mark this volume id as writable. 5. This command asks the source volume server to delete the source volume - The option "-disk [hdd|ssd]" can be used to change the volume disk type. + The option "-disk [hdd|ssd|]" can be used to change the volume disk type. ` } @@ -56,7 +56,7 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io. volumeIdInt := volMoveCommand.Int("volumeId", 0, "the volume id") sourceNodeStr := volMoveCommand.String("source", "", "the source volume server :") targetNodeStr := volMoveCommand.String("target", "", "the target volume server :") - diskTypeStr := volMoveCommand.String("disk", "", "[hdd|ssd] hard drive or solid state drive") + diskTypeStr := volMoveCommand.String("disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag") if err = volMoveCommand.Parse(args); err != nil { return nil } From 44bdfb2d157684e0862408a3a5146179d62839db Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 22 Feb 2021 12:22:49 -0800 Subject: [PATCH 08/32] filer: avoid encryption and compression at the same time fix https://github.com/chrislusf/seaweedfs/issues/1828 --- weed/operation/upload_content.go | 33 ++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 934075ff3..70428bb07 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -130,7 +130,8 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i // gzip if possible // this could be double copying clearDataLen = len(data) - if shouldGzipNow { + clearData := data + if shouldGzipNow && !cipher { compressed, compressErr := util.GzipData(data) // fmt.Printf("data is compressed from %d ==> %d\n", len(data), len(compressed)) if compressErr == nil { @@ -139,7 +140,7 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i } } else if isInputCompressed { // just to get the clear data length - clearData, err := util.DecompressData(data) + clearData, err = util.DecompressData(data) if err == nil { clearDataLen = len(clearData) } @@ -150,7 +151,7 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i // encrypt cipherKey := util.GenCipherKey() - encryptedData, encryptionErr := util.Encrypt(data, cipherKey) + encryptedData, encryptionErr := util.Encrypt(clearData, cipherKey) if encryptionErr != nil { err = fmt.Errorf("encrypt input: %v", encryptionErr) return @@ -161,26 
+162,26 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i _, err = w.Write(encryptedData) return }, "", false, len(encryptedData), "", nil, jwt) - if uploadResult != nil { - uploadResult.Name = filename - uploadResult.Mime = mtype - uploadResult.CipherKey = cipherKey + if uploadResult == nil { + return } + uploadResult.Name = filename + uploadResult.Mime = mtype + uploadResult.CipherKey = cipherKey + uploadResult.Size = uint32(clearDataLen) } else { // upload data uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) { _, err = w.Write(data) return }, filename, contentIsGzipped, len(data), mtype, pairMap, jwt) - } - - if uploadResult == nil { - return - } - - uploadResult.Size = uint32(clearDataLen) - if contentIsGzipped { - uploadResult.Gzip = 1 + if uploadResult == nil { + return + } + uploadResult.Size = uint32(clearDataLen) + if contentIsGzipped { + uploadResult.Gzip = 1 + } } return uploadResult, err From 2270737344f1d5eec90f67d7d07fb139bbec67bd Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 22 Feb 2021 12:52:37 -0800 Subject: [PATCH 09/32] volume: avoid fixed vacuum timeout for large volumes 1GB for 3 minutes, about 5.7MB/s --- weed/topology/topology_vacuum.go | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index 89eefefc7..9feb55b73 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -14,7 +14,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" ) -func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, +func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid needle.VolumeId, locationlist *VolumeLocationList, garbageThreshold float64) (*VolumeLocationList, bool) { ch := make(chan int, locationlist.Length()) errCount := int32(0) @@ -43,7 +43,7 @@ func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vi } vacuumLocationList := NewVolumeLocationList() - waitTimeout := time.NewTimer(30 * time.Minute) + waitTimeout := time.NewTimer(time.Minute * time.Duration(t.volumeSizeLimit/1024/1024/1000+1)) defer waitTimeout.Stop() for range locationlist.list { @@ -58,7 +58,7 @@ func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vi } return vacuumLocationList, errCount == 0 && len(vacuumLocationList.list) > 0 } -func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, +func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool { vl.accessLock.Lock() vl.removeFromWritable(vid) @@ -86,7 +86,7 @@ func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, } isVacuumSuccess := true - waitTimeout := time.NewTimer(30 * time.Minute) + waitTimeout := time.NewTimer(3 * time.Minute * time.Duration(t.volumeSizeLimit/1024/1024/1000+1)) defer waitTimeout.Stop() for range locationlist.list { @@ -99,7 +99,7 @@ func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, } return isVacuumSuccess } -func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) bool { +func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) 
bool { isCommitSuccess := true isReadOnly := false for _, dn := range locationlist.list { @@ -127,7 +127,7 @@ func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, v } return isCommitSuccess } -func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) { +func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) { for _, dn := range locationlist.list { glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url()) err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { @@ -161,13 +161,13 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float for _, vl := range c.storageType2VolumeLayout.Items() { if vl != nil { volumeLayout := vl.(*VolumeLayout) - vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate) + t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate) } } } } -func vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) { +func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) { volumeLayout.accessLock.RLock() tmpMap := make(map[needle.VolumeId]*VolumeLocationList) @@ -187,12 +187,11 @@ func vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeL } glog.V(2).Infof("check vacuum on collection:%s volume:%d", c.Name, vid) - if vacuumLocationList, needVacuum := batchVacuumVolumeCheck( - grpcDialOption, volumeLayout, vid, locationList, garbageThreshold); needVacuum { - if batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, vacuumLocationList, preallocate) { - batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, vacuumLocationList) + if vacuumLocationList, needVacuum := t.batchVacuumVolumeCheck(grpcDialOption, vid, locationList, garbageThreshold); needVacuum { + if t.batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, vacuumLocationList, preallocate) { + t.batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, vacuumLocationList) } else { - batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, vacuumLocationList) + t.batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, vacuumLocationList) } } } From 72b0d9d8c4bc79d1571c6c770443dcade846d5cc Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 22 Feb 2021 16:50:56 -0800 Subject: [PATCH 10/32] avoid unnecessary user home checking --- weed/util/file_util.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/weed/util/file_util.go b/weed/util/file_util.go index 70135180d..f83f80265 100644 --- a/weed/util/file_util.go +++ b/weed/util/file_util.go @@ -69,6 +69,10 @@ func CheckFile(filename string) (exists, canRead, canWrite bool, modTime time.Ti func ResolvePath(path string) string { + if !strings.Contains(path, "~") { + return path + } + usr, _ := user.Current() dir := usr.HomeDir From 90cdf9dcace5595b31104df3a3b7e4038a7db341 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 22 Feb 2021 16:57:27 -0800 Subject: [PATCH 11/32] avoid conflict with "weed scaffold -config=xxx" --- weed/util/fla9/fla9.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/util/fla9/fla9.go b/weed/util/fla9/fla9.go index 335b9ff7a..4a5884e9b 100644 
--- a/weed/util/fla9/fla9.go +++ b/weed/util/fla9/fla9.go @@ -1078,7 +1078,7 @@ func NewFlagSetWithEnvPrefix(name string, prefix string, errorHandling ErrorHand // DefaultConfigFlagName defines the flag name of the optional config file // path. Used to lookup and parse the config file when a default is set and // available on disk. -var DefaultConfigFlagName = "config" +var DefaultConfigFlagName = "conf" // ParseFile parses flags from the file in path. // Same format as commandline arguments, newlines and lines beginning with a From 37f104f88f89225ff8a1bf785cdca833db45de7a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 22 Feb 2021 22:54:34 -0800 Subject: [PATCH 12/32] 2.28 --- k8s/seaweedfs/Chart.yaml | 4 ++-- k8s/seaweedfs/values.yaml | 2 +- weed/util/constants.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/k8s/seaweedfs/Chart.yaml b/k8s/seaweedfs/Chart.yaml index ac56e337c..424307ff6 100644 --- a/k8s/seaweedfs/Chart.yaml +++ b/k8s/seaweedfs/Chart.yaml @@ -1,5 +1,5 @@ apiVersion: v1 description: SeaweedFS name: seaweedfs -appVersion: "2.27" -version: 2.27 +appVersion: "2.28" +version: 2.28 diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml index f7cc521c8..a055de821 100644 --- a/k8s/seaweedfs/values.yaml +++ b/k8s/seaweedfs/values.yaml @@ -4,7 +4,7 @@ global: registry: "" repository: "" imageName: chrislusf/seaweedfs - # imageTag: "2.27" - started using {.Chart.appVersion} + # imageTag: "2.28" - started using {.Chart.appVersion} imagePullPolicy: IfNotPresent imagePullSecrets: imagepullsecret restartPolicy: Always diff --git a/weed/util/constants.go b/weed/util/constants.go index 908f782ae..55f5b52d6 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,7 +5,7 @@ import ( ) var ( - VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 27) + VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 28) COMMIT = "" ) From 9edd964627a1d0eb6549ab4eaab8e0191551c70a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 23 Feb 2021 03:49:14 -0800 Subject: [PATCH 13/32] volume.tier.move: avoid repeated move for replicated volumes fix https://github.com/chrislusf/seaweedfs/issues/1792#issuecomment-784139348 --- weed/shell/command_volume_tier_move.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go index a15efd6b3..f7fa94031 100644 --- a/weed/shell/command_volume_tier_move.go +++ b/weed/shell/command_volume_tier_move.go @@ -102,7 +102,7 @@ func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection strin keepDataNodesSorted(allLocations, toDiskType) fn := capacityByFreeVolumeCount(toDiskType) for _, dst := range allLocations { - if fn(dst.dataNode) > 0 { + if fn(dst.dataNode) > 0 && !hasFoundTarget { // ask the volume server to replicate the volume if isOneOf(dst.dataNode.Id, locations) { continue From 7ba75e3d5a4b059f4b41797f96a0feea97eef66a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 23 Feb 2021 12:25:27 -0800 Subject: [PATCH 14/32] filer: do not return no content for empty files fix https://github.com/chrislusf/seaweedfs/issues/1831 fix https://github.com/chrislusf/seaweedfs/issues/1830 --- weed/server/filer_server_handlers_read.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index f77462adb..160ee9d97 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -61,15 +61,7 @@ func (fs 
*FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, return } - if len(entry.Chunks) == 0 && len(entry.Content) == 0 { - glog.V(1).Infof("no file chunks for %s, attr=%+v", path, entry.Attr) - stats.FilerRequestCounter.WithLabelValues("read.nocontent").Inc() - w.WriteHeader(http.StatusNoContent) - return - } - w.Header().Set("Accept-Ranges", "bytes") - w.Header().Set("Last-Modified", entry.Attr.Mtime.Format(http.TimeFormat)) // mime type mimeType := entry.Attr.Mime From 9d6b6a4aea50afe5f80b7c06f5647e39ad19ccff Mon Sep 17 00:00:00 2001 From: LazyDBA247-Anyvision Date: Wed, 24 Feb 2021 09:54:42 +0200 Subject: [PATCH 15/32] add filter Pattern only if configured --- k8s/seaweedfs/templates/cronjob.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/seaweedfs/templates/cronjob.yaml b/k8s/seaweedfs/templates/cronjob.yaml index c7dcd52b1..4caf4bad1 100644 --- a/k8s/seaweedfs/templates/cronjob.yaml +++ b/k8s/seaweedfs/templates/cronjob.yaml @@ -40,7 +40,7 @@ spec: {{ if .Values.volume.dataCenter }} -dataCenter {{ .Values.volume.dataCenter }}{{ end }}\ {{ if .Values.cronjob.collection }} -collection {{ .Values.cronjob.collection }}{{ end }}\n\ {{- if .Values.cronjob.enableFixReplication }} - volume.fix.replication {{ if .Values.cronjob.collectionPattern }} -collectionPattern={{ .Values.cronjob.collectionPattern }} {{ end }} \n\ + volume.fix.replication -collectionPattern={{ .Values.cronjob.collectionPattern }} \n\ {{- end }} unlock\n" | \ /usr/bin/weed shell \ From b1b21e05641727a894272ab311e5ebf07bf1928e Mon Sep 17 00:00:00 2001 From: LazyDBA247-Anyvision Date: Wed, 24 Feb 2021 09:55:17 +0200 Subject: [PATCH 16/32] make internal filter svc - internal k8s lb --- k8s/seaweedfs/templates/filer-service-client.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/k8s/seaweedfs/templates/filer-service-client.yaml b/k8s/seaweedfs/templates/filer-service-client.yaml index f509086e3..929b6f8bc 100644 --- a/k8s/seaweedfs/templates/filer-service-client.yaml +++ b/k8s/seaweedfs/templates/filer-service-client.yaml @@ -10,6 +10,7 @@ metadata: monitoring: "true" {{- end }} spec: + clusterIP: None ports: - name: "swfs-filer" port: {{ .Values.filer.port }} From 02be7ea2fcbc4e3c4d36bf844e7573b65ecc27a1 Mon Sep 17 00:00:00 2001 From: LazyDBA247-Anyvision Date: Wed, 24 Feb 2021 10:25:11 +0200 Subject: [PATCH 17/32] Volume: PVC support + Host Path idx support on ssd --- k8s/README.md | 8 ++ k8s/seaweedfs/templates/_helpers.tpl | 23 ++++++ .../templates/volume-statefulset.yaml | 80 +++++++++++++++++-- k8s/seaweedfs/values.yaml | 18 +++++ 4 files changed, 121 insertions(+), 8 deletions(-) diff --git a/k8s/README.md b/k8s/README.md index 36230f7b2..c5615522c 100644 --- a/k8s/README.md +++ b/k8s/README.md @@ -29,6 +29,14 @@ please set/update the corresponding affinity rule in values.yaml to an empty one ```affinity: ""``` +### PVC - storage class ### + +on the volume stateful set added support for K8S PVC, currently example +with the simple local-path-provisioner from Rancher (comes included with k3d / k3s) +https://github.com/rancher/local-path-provisioner + +you can use ANY storage class you like, just update the correct storage-class +for your deployment. 
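+
+For example, a minimal sketch (the storage class name and size are placeholders,
+mirroring the commented sample in values.yaml) to keep volume data on a PVC:
+
+```yaml
+volume:
+  data:
+    type: "persistentVolumeClaim"
+    size: "24Ti"
+    storageClass: "local-path-provisioner"
+```
+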
### current instances config (AIO): 1 instance for each type (master/filer+s3/volume) diff --git a/k8s/seaweedfs/templates/_helpers.tpl b/k8s/seaweedfs/templates/_helpers.tpl index f6c4fa570..a9ee89f03 100644 --- a/k8s/seaweedfs/templates/_helpers.tpl +++ b/k8s/seaweedfs/templates/_helpers.tpl @@ -126,3 +126,26 @@ Inject extra environment vars in the format key:value, if populated {{- printf "%s%s%s:%s" $registryName $repositoryName $name $tag -}} {{- end -}} {{- end -}} + + +{{/* check if any PVC exists */}} +{{- define "volume.pvc_exists" -}} +{{- if or (or (eq .Values.volume.data.type "persistentVolumeClaim") (and (eq .Values.volume.idx.type "persistentVolumeClaim") .Values.volume.dir_idx )) (eq .Values.volume.logs.type "persistentVolumeClaim") -}} +{{- printf "true" -}} +{{- else -}} +{{- printf "false" -}} +{{- end -}} +{{- end -}} + +{{/* check if any HostPath exists */}} +{{- define "volume.hostpath_exists" -}} +{{- if or (or (eq .Values.volume.data.type "hostPath") (and (eq .Values.volume.idx.type "hostPath") .Values.volume.dir_idx )) (eq .Values.volume.logs.type "hostPath") -}} +{{- printf "true" -}} +{{- else -}} +{{- if or .Values.global.enableSecurity .Values.volume.extraVolumes -}} +{{- printf "true" -}} +{{- else -}} +{{- printf "false" -}} +{{- end -}} +{{- end -}} +{{- end -}} diff --git a/k8s/seaweedfs/templates/volume-statefulset.yaml b/k8s/seaweedfs/templates/volume-statefulset.yaml index f9e55e0d3..652fd9ea3 100644 --- a/k8s/seaweedfs/templates/volume-statefulset.yaml +++ b/k8s/seaweedfs/templates/volume-statefulset.yaml @@ -45,6 +45,19 @@ spec: priorityClassName: {{ .Values.volume.priorityClassName | quote }} {{- end }} enableServiceLinks: false + {{- if .Values.volume.dir_idx }} + initContainers: + - name: seaweedfs-vol-move-idx + image: {{ template "volume.image" . }} + imagePullPolicy: {{ .Values.global.pullPolicy | default "IfNotPresent" }} + command: [ '/bin/sh', '-c' ] + args: ['if ls {{ .Values.volume.dir }}/*.idx >/dev/null 2>&1; then mv {{ .Values.volume.dir }}/*.idx {{ .Values.volume.dir_idx }}/; fi;'] + volumeMounts: + - name: idx + mountPath: {{ .Values.volume.dir_idx }} + - name: data + mountPath: {{ .Values.volume.dir }} + {{- end }} containers: - name: seaweedfs image: {{ template "volume.image" . }} @@ -118,9 +131,13 @@ spec: -compactionMBps={{ .Values.volume.compactionMBps }} \ -mserver={{ range $index := until (.Values.master.replicas | int) }}${SEAWEEDFS_FULLNAME}-master-{{ $index }}.${SEAWEEDFS_FULLNAME}-master:{{ $.Values.master.port }}{{ if lt $index (sub ($.Values.master.replicas | int) 1) }},{{ end }}{{ end }} volumeMounts: - - name: seaweedfs-volume-storage - mountPath: "/data/" - - name: seaweedfs-volume-log-volume + - name: data + mountPath: "{{ .Values.volume.dir }}/" + {{- if .Values.volume.dir_idx }} + - name: idx + mountPath: "{{ .Values.volume.dir_idx }}/" + {{- end }} + - name: logs mountPath: "/logs/" {{- if .Values.global.enableSecurity }} - name: security-config @@ -173,15 +190,27 @@ spec: resources: {{ tpl .Values.volume.resources . | nindent 12 | trim }} {{- end }} + {{- $hostpath_exists := include "volume.hostpath_exists" . 
-}} + {{- if $hostpath_exists }} volumes: - - name: seaweedfs-volume-log-volume - hostPath: - path: /storage/logs/seaweedfs/volume - type: DirectoryOrCreate - - name: seaweedfs-volume-storage + {{- if eq .Values.volume.data.type "hostPath" }} + - name: data hostPath: path: /storage/object_store/ type: DirectoryOrCreate + {{- end }} + {{- if and (eq .Values.volume.idx.type "hostPath") .Values.volume.dir_idx }} + - name: idx + hostPath: + path: /ssd/seaweedfs-volume-idx/ + type: DirectoryOrCreate + {{- end }} + {{- if eq .Values.volume.logs.type "hostPath" }} + - name: logs + hostPath: + path: /storage/logs/seaweedfs/volume + type: DirectoryOrCreate + {{- end }} {{- if .Values.global.enableSecurity }} - name: security-config configMap: @@ -205,8 +234,43 @@ spec: {{- if .Values.volume.extraVolumes }} {{ tpl .Values.volume.extraVolumes . | indent 8 | trim }} {{- end }} + {{- end }} {{- if .Values.volume.nodeSelector }} nodeSelector: {{ tpl .Values.volume.nodeSelector . | indent 8 | trim }} {{- end }} + {{- $pvc_exists := include "volume.pvc_exists" . -}} + {{- if $pvc_exists }} + volumeClaimTemplates: + {{- if eq .Values.volume.data.type "persistentVolumeClaim"}} + - metadata: + name: data + spec: + accessModes: [ "ReadWriteOnce" ] + storageClassName: {{ .Values.volume.data.storageClass }} + resources: + requests: + storage: {{ .Values.volume.data.size }} + {{- end }} + {{- if and (eq .Values.volume.idx.type "persistentVolumeClaim") .Values.volume.dir_idx }} + - metadata: + name: idx + spec: + accessModes: [ "ReadWriteOnce" ] + storageClassName: {{ .Values.volume.idx.storageClass }} + resources: + requests: + storage: {{ .Values.volume.idx.size }} + {{- end }} + {{- if eq .Values.volume.logs.type "persistentVolumeClaim" }} + - metadata: + name: logs + spec: + accessModes: [ "ReadWriteOnce" ] + storageClassName: {{ .Values.volume.logs.storageClass }} + resources: + requests: + storage: {{ .Values.volume.logs.size }} + {{- end }} + {{- end }} {{- end }} diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml index a055de821..8768b32f6 100644 --- a/k8s/seaweedfs/values.yaml +++ b/k8s/seaweedfs/values.yaml @@ -138,6 +138,24 @@ volume: # minimum free disk space(in percents). 
If free disk space lower this value - all volumes marks as ReadOnly minFreeSpacePercent: 7 +# can use ANY storage-class , example with local-path-provisner +# data: +# type: "persistentVolumeClaim" +# size: "24Ti" +# storageClass: "local-path-provisioner" + data: + type: "hostPath" + size: "" + storageClass: "" + idx: + type: "hostPath" + size: "" + storageClass: "" + + logs: + type: "hostPath" + size: "" + storageClass: "" # limit background compaction or copying speed in mega bytes per second compactionMBps: "50" From 1d848641d76879573c02dafe9bd6941b794c29af Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 24 Feb 2021 07:40:32 -0800 Subject: [PATCH 18/32] Java client: ByteBuffer.clear supports higher JDK versions fix https://github.com/chrislusf/seaweedfs/issues/1836 --- .../src/main/java/seaweedfs/client/SeaweedOutputStream.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index b73e99e69..ba298a713 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -8,6 +8,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; +import java.nio.Buffer; import java.nio.ByteBuffer; import java.util.concurrent.*; @@ -217,7 +218,7 @@ public class SeaweedOutputStream extends OutputStream { private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException { - bufferToWrite.flip(); + ((Buffer)bufferToWrite).flip(); int bytesLength = bufferToWrite.limit() - bufferToWrite.position(); if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) { From c276117fef7d871e09ca82f9d4e7c088ce670204 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 25 Feb 2021 08:24:30 -0800 Subject: [PATCH 19/32] Java: 1.6.2 --- other/java/client/pom.xml | 2 +- other/java/client/pom.xml.deploy | 2 +- other/java/client/pom_debug.xml | 2 +- other/java/examples/pom.xml | 4 ++-- other/java/hdfs2/dependency-reduced-pom.xml | 2 +- other/java/hdfs2/pom.xml | 2 +- other/java/hdfs3/dependency-reduced-pom.xml | 2 +- other/java/hdfs3/pom.xml | 2 +- 8 files changed, 9 insertions(+), 9 deletions(-) diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index 056904ebe..a7da16a93 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.6.1 + 1.6.2 org.sonatype.oss diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy index 69b900017..b45c193ec 100644 --- a/other/java/client/pom.xml.deploy +++ b/other/java/client/pom.xml.deploy @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.6.1 + 1.6.2 org.sonatype.oss diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml index 1447401b7..217f708e9 100644 --- a/other/java/client/pom_debug.xml +++ b/other/java/client/pom_debug.xml @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.6.1 + 1.6.2 org.sonatype.oss diff --git a/other/java/examples/pom.xml b/other/java/examples/pom.xml index 2456113d0..d0f22a44d 100644 --- a/other/java/examples/pom.xml +++ b/other/java/examples/pom.xml @@ -11,13 +11,13 @@ com.github.chrislusf seaweedfs-client - 1.6.1 + 1.6.2 compile com.github.chrislusf seaweedfs-hadoop2-client - 1.6.1 + 1.6.2 compile diff 
--git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml index 0680d86bb..64fb4da75 100644 --- a/other/java/hdfs2/dependency-reduced-pom.xml +++ b/other/java/hdfs2/dependency-reduced-pom.xml @@ -301,7 +301,7 @@ - 1.6.1 + 1.6.2 2.9.2 diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml index 897477066..3bc9709a7 100644 --- a/other/java/hdfs2/pom.xml +++ b/other/java/hdfs2/pom.xml @@ -5,7 +5,7 @@ 4.0.0 - 1.6.1 + 1.6.2 2.9.2 diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml index 2b4a1a494..ea7c08c17 100644 --- a/other/java/hdfs3/dependency-reduced-pom.xml +++ b/other/java/hdfs3/dependency-reduced-pom.xml @@ -309,7 +309,7 @@ - 1.6.1 + 1.6.2 3.1.1 diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml index 49ff8f926..3e0f373f7 100644 --- a/other/java/hdfs3/pom.xml +++ b/other/java/hdfs3/pom.xml @@ -5,7 +5,7 @@ 4.0.0 - 1.6.1 + 1.6.2 3.1.1 From ee21c0042eb519c8775af620d552de554325f4da Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev Date: Fri, 26 Feb 2021 14:18:01 +0500 Subject: [PATCH 20/32] log error write entry --- weed/server/filer_server_handlers_read.go | 1 + 1 file changed, 1 insertion(+) diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 160ee9d97..e210f4bdf 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -156,6 +156,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, } if offset+size <= int64(len(entry.Content)) { _, err := writer.Write(entry.Content[offset : offset+size]) + glog.Errorf("failed to write entry content: %v", err) return err } return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size) From 5f7b024891f0e113961e389d4f9449289db4c60a Mon Sep 17 00:00:00 2001 From: Patrick Schmidt Date: Fri, 26 Feb 2021 13:58:40 +0100 Subject: [PATCH 21/32] Show the real disk usage in stats calls Currently the file size of only one volume location is taken into account in the stats. This commit multiplies the disk usages by the amount of nodes holding a replica of the volume. This will yield the expected amount of disk usage and matches the total size calculations from before. 
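
For illustration only, a minimal runnable sketch (not part of the patch) of the new
accounting, using hypothetical numbers:

    package main

    import "fmt"

    func main() {
        // Hypothetical figures: a writable volume with a 30 GiB size limit held by
        // 3 volume servers (vll.Length() == 3) now contributes 3 * 30 GiB to the
        // reported TotalSize, instead of 30 GiB as before.
        const volumeSizeLimit uint64 = 30 << 30 // bytes
        const replicaCount uint64 = 3
        fmt.Println(volumeSizeLimit * replicaCount) // 96636764160 bytes (90 GiB)
    }
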
--- weed/server/master_grpc_server_volume.go | 7 ++----- weed/topology/volume_layout.go | 2 +- weed/topology/volume_location_list.go | 2 +- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 29aff5c0b..156afd4a1 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -77,7 +77,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest if !ms.Topo.HasWritableVolume(option) { if ms.Topo.AvailableSpaceFor(option) <= 0 { - return nil, fmt.Errorf("no free volumes left for "+option.String()) + return nil, fmt.Errorf("no free volumes left for " + option.String()) } ms.vgLock.Lock() if !ms.Topo.HasWritableVolume(option) { @@ -122,11 +122,8 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, types.ToDiskType(req.DiskType)) stats := volumeLayout.Stats() - - totalSize := ms.Topo.GetDiskUsages().GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024 - resp := &master_pb.StatisticsResponse{ - TotalSize: uint64(totalSize), + TotalSize: stats.TotalSize, UsedSize: stats.UsedSize, FileCount: stats.FileCount, } diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 5784c894b..c7e171248 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -432,7 +432,7 @@ func (vl *VolumeLayout) Stats() *VolumeLayoutStats { if vl.readonlyVolumes.IsTrue(vid) { ret.TotalSize += size } else { - ret.TotalSize += vl.volumeSizeLimit + ret.TotalSize += vl.volumeSizeLimit * uint64(vll.Length()) } } diff --git a/weed/topology/volume_location_list.go b/weed/topology/volume_location_list.go index 64c13ca52..548c4cd25 100644 --- a/weed/topology/volume_location_list.go +++ b/weed/topology/volume_location_list.go @@ -82,7 +82,7 @@ func (dnll *VolumeLocationList) Stats(vid needle.VolumeId, freshThreshHold int64 if dnl.LastSeen < freshThreshHold { vinfo, err := dnl.GetVolumesById(vid) if err == nil { - return vinfo.Size - vinfo.DeletedByteCount, vinfo.FileCount - vinfo.DeleteCount + return (vinfo.Size - vinfo.DeletedByteCount) * uint64(len(dnll.list)), vinfo.FileCount - vinfo.DeleteCount } } } From d3c31c69a73759ed2719848cb16f6387e18744c0 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 27 Feb 2021 12:08:09 -0800 Subject: [PATCH 22/32] avoid confusion: conf and config are too similar --- weed/util/fla9/fla9.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/util/fla9/fla9.go b/weed/util/fla9/fla9.go index 4a5884e9b..7f0f50a14 100644 --- a/weed/util/fla9/fla9.go +++ b/weed/util/fla9/fla9.go @@ -1078,7 +1078,7 @@ func NewFlagSetWithEnvPrefix(name string, prefix string, errorHandling ErrorHand // DefaultConfigFlagName defines the flag name of the optional config file // path. Used to lookup and parse the config file when a default is set and // available on disk. -var DefaultConfigFlagName = "conf" +var DefaultConfigFlagName = "flags" // ParseFile parses flags from the file in path. 
// Same format as commandline arguments, newlines and lines beginning with a From 70434df1056a5aab1ab1b4a1e7f94907edcc1419 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 27 Feb 2021 12:12:53 -0800 Subject: [PATCH 23/32] use "options" --- weed/util/fla9/fla9.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/util/fla9/fla9.go b/weed/util/fla9/fla9.go index 7f0f50a14..7b016c686 100644 --- a/weed/util/fla9/fla9.go +++ b/weed/util/fla9/fla9.go @@ -1078,7 +1078,7 @@ func NewFlagSetWithEnvPrefix(name string, prefix string, errorHandling ErrorHand // DefaultConfigFlagName defines the flag name of the optional config file // path. Used to lookup and parse the config file when a default is set and // available on disk. -var DefaultConfigFlagName = "flags" +var DefaultConfigFlagName = "options" // ParseFile parses flags from the file in path. // Same format as commandline arguments, newlines and lines beginning with a From 3b76a51f5f88c8a35e484fc1183bb580d7f16306 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 27 Feb 2021 12:15:49 -0800 Subject: [PATCH 24/32] add help message --- weed/util/fla9/fla9.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/util/fla9/fla9.go b/weed/util/fla9/fla9.go index 7b016c686..91d3b5ce9 100644 --- a/weed/util/fla9/fla9.go +++ b/weed/util/fla9/fla9.go @@ -886,7 +886,7 @@ func (f *FlagSet) parseOne() (bool, error) { // The return value will be ErrHelp if -help or -h were set but not defined. func (f *FlagSet) Parse(arguments []string) error { if _, ok := f.formal[DefaultConfigFlagName]; !ok { - f.String(DefaultConfigFlagName, "", "config file") + f.String(DefaultConfigFlagName, "", "file with command line options with each line in optionName=optionValue format") } f.parsed = true From 026b1f96bfa89007ea3f5d390af3ddc701f12b86 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 28 Feb 2021 16:13:44 -0800 Subject: [PATCH 25/32] setup 2 clusters, with one as s3 destination --- docker/compose/local-clusters-compose.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docker/compose/local-clusters-compose.yml b/docker/compose/local-clusters-compose.yml index e813ca35f..7bd16aa3f 100644 --- a/docker/compose/local-clusters-compose.yml +++ b/docker/compose/local-clusters-compose.yml @@ -20,4 +20,5 @@ services: - 18085:18080 - 8889:8888 - 18889:18888 - command: "server -ip=server2 -filer -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1" + - 8334:8333 + command: "server -ip=server2 -filer -s3 -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1" From 4ff2c5c4c9ec13f83433cb723f125566afa93d3e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 28 Feb 2021 16:14:21 -0800 Subject: [PATCH 26/32] rename file --- .../repl_util/{replication_utli.go => replication_util.go} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename weed/replication/repl_util/{replication_utli.go => replication_util.go} (85%) diff --git a/weed/replication/repl_util/replication_utli.go b/weed/replication/repl_util/replication_util.go similarity index 85% rename from weed/replication/repl_util/replication_utli.go rename to weed/replication/repl_util/replication_util.go index 3514c6977..f642bb801 100644 --- a/weed/replication/repl_util/replication_utli.go +++ b/weed/replication/repl_util/replication_util.go @@ -20,7 +20,7 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer var shouldRetry bool for _, fileUrl := range fileUrls { - shouldRetry, err = 
util.FastReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) { + shouldRetry, err = util.FastReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) { writeErr = writeFunc(data) }) if err != nil { From 678c54d705e5c29b2fdb580158a4455703b46a0d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 28 Feb 2021 16:19:03 -0800 Subject: [PATCH 27/32] data sink: add incremental mode --- weed/command/scaffold.go | 8 ++++++ weed/replication/replicator.go | 2 +- weed/replication/sink/azuresink/azure_sink.go | 14 +++++++--- weed/replication/sink/b2sink/b2_sink.go | 6 ++++ weed/replication/sink/filersink/filer_sink.go | 6 ++++ weed/replication/sink/gcssink/gcs_sink.go | 6 ++++ weed/replication/sink/localsink/local_sink.go | 4 +++ weed/replication/sink/replication_sink.go | 1 + weed/replication/sink/s3sink/s3_sink.go | 28 ++++++++++++------- 9 files changed, 60 insertions(+), 15 deletions(-) diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 993391a42..c2d53e4bd 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -356,6 +356,9 @@ directory = "/buckets" [sink.local] enabled = false directory = "/data" +# all replicated files are under modified time as yyyy-mm-dd directories +# so each date directory contains all new and updated files. +is_incremental = false [sink.local_incremental] # all replicated files are under modified time as yyyy-mm-dd directories @@ -373,6 +376,7 @@ directory = "/backup" replication = "" collection = "" ttlSec = 0 +is_incremental = false [sink.s3] # read credentials doc at https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html @@ -384,6 +388,7 @@ region = "us-east-2" bucket = "your_bucket_name" # an existing bucket directory = "/" # destination directory endpoint = "" +is_incremental = false [sink.google_cloud_storage] # read credentials doc at https://cloud.google.com/docs/authentication/getting-started @@ -391,6 +396,7 @@ enabled = false google_application_credentials = "/path/to/x.json" # path to json credential file bucket = "your_bucket_seaweedfs" # an existing bucket directory = "/" # destination directory +is_incremental = false [sink.azure] # experimental, let me know if it works @@ -399,6 +405,7 @@ account_name = "" account_key = "" container = "mycontainer" # an existing container directory = "/" # destination directory +is_incremental = false [sink.backblaze] enabled = false @@ -406,6 +413,7 @@ b2_account_id = "" b2_master_application_key = "" bucket = "mybucket" # an existing bucket directory = "/" # destination directory +is_incremental = false ` diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index 7688029e6..d7e609c68 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -42,7 +42,7 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p return nil } var dateKey string - if r.sink.GetName() == "local_incremental" { + if r.sink.IsIncremental() { var mTime int64 if message.NewEntry != nil { mTime = message.NewEntry.Attributes.Mtime diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index df70be64b..865f1b25c 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -18,10 +18,11 @@ import ( ) type AzureSink struct { - containerURL azblob.ContainerURL - container string - dir string - 
filerSource *source.FilerSource + containerURL azblob.ContainerURL + container string + dir string + filerSource *source.FilerSource + isIncremental bool } func init() { @@ -36,7 +37,12 @@ func (g *AzureSink) GetSinkToDirectory() string { return g.dir } +func (g *AzureSink) IsIncremental() bool { + return g.isIncremental +} + func (g *AzureSink) Initialize(configuration util.Configuration, prefix string) error { + g.isIncremental = configuration.GetBool(prefix+"is_incremental") return g.initialize( configuration.GetString(prefix+"account_name"), configuration.GetString(prefix+"account_key"), diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go index 24f0ecbbc..8738231d5 100644 --- a/weed/replication/sink/b2sink/b2_sink.go +++ b/weed/replication/sink/b2sink/b2_sink.go @@ -18,6 +18,7 @@ type B2Sink struct { bucket string dir string filerSource *source.FilerSource + isIncremental bool } func init() { @@ -32,7 +33,12 @@ func (g *B2Sink) GetSinkToDirectory() string { return g.dir } +func (g *B2Sink) IsIncremental() bool { + return g.isIncremental +} + func (g *B2Sink) Initialize(configuration util.Configuration, prefix string) error { + g.isIncremental = configuration.GetBool(prefix+"is_incremental") return g.initialize( configuration.GetString(prefix+"b2_account_id"), configuration.GetString(prefix+"b2_master_application_key"), diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 509f75116..4165e87be 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -30,6 +30,7 @@ type FilerSink struct { grpcDialOption grpc.DialOption address string writeChunkByFiler bool + isIncremental bool } func init() { @@ -44,7 +45,12 @@ func (fs *FilerSink) GetSinkToDirectory() string { return fs.dir } +func (fs *FilerSink) IsIncremental() bool { + return fs.isIncremental +} + func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error { + fs.isIncremental = configuration.GetBool(prefix+"is_incremental") return fs.DoInitialize( "", configuration.GetString(prefix+"grpcAddress"), diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go index badabc32c..02f482862 100644 --- a/weed/replication/sink/gcssink/gcs_sink.go +++ b/weed/replication/sink/gcssink/gcs_sink.go @@ -22,6 +22,7 @@ type GcsSink struct { bucket string dir string filerSource *source.FilerSource + isIncremental bool } func init() { @@ -36,7 +37,12 @@ func (g *GcsSink) GetSinkToDirectory() string { return g.dir } +func (g *GcsSink) IsIncremental() bool { + return g.isIncremental +} + func (g *GcsSink) Initialize(configuration util.Configuration, prefix string) error { + g.isIncremental = configuration.GetBool(prefix+"is_incremental") return g.initialize( configuration.GetString(prefix+"google_application_credentials"), configuration.GetString(prefix+"bucket"), diff --git a/weed/replication/sink/localsink/local_sink.go b/weed/replication/sink/localsink/local_sink.go index 21c625c3f..c76647fcc 100644 --- a/weed/replication/sink/localsink/local_sink.go +++ b/weed/replication/sink/localsink/local_sink.go @@ -50,6 +50,10 @@ func (localsink *LocalSink) GetSinkToDirectory() string { return localsink.Dir } +func (localsink *LocalSink) IsIncremental() bool { + return true +} + func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error { if localsink.isMultiPartEntry(key) { return 
nil diff --git a/weed/replication/sink/replication_sink.go b/weed/replication/sink/replication_sink.go index cfc6e0a4d..4ffd09462 100644 --- a/weed/replication/sink/replication_sink.go +++ b/weed/replication/sink/replication_sink.go @@ -14,6 +14,7 @@ type ReplicationSink interface { UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) GetSinkToDirectory() string SetSourceFiler(s *source.FilerSource) + IsIncremental() bool } var ( diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go index 58432ee6b..ea219ce74 100644 --- a/weed/replication/sink/s3sink/s3_sink.go +++ b/weed/replication/sink/s3sink/s3_sink.go @@ -21,12 +21,13 @@ import ( ) type S3Sink struct { - conn s3iface.S3API - region string - bucket string - dir string - endpoint string - filerSource *source.FilerSource + conn s3iface.S3API + region string + bucket string + dir string + endpoint string + filerSource *source.FilerSource + isIncremental bool } func init() { @@ -41,11 +42,17 @@ func (s3sink *S3Sink) GetSinkToDirectory() string { return s3sink.dir } +func (s3sink *S3Sink) IsIncremental() bool { + return s3sink.isIncremental +} + func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string) error { glog.V(0).Infof("sink.s3.region: %v", configuration.GetString(prefix+"region")) glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket")) glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory")) glog.V(0).Infof("sink.s3.endpoint: %v", configuration.GetString(prefix+"endpoint")) + glog.V(0).Infof("sink.s3.is_incremental: %v", configuration.GetString(prefix+"is_incremental")) + s3sink.isIncremental = configuration.GetBool(prefix + "is_incremental") return s3sink.initialize( configuration.GetString(prefix+"aws_access_key_id"), configuration.GetString(prefix+"aws_secret_access_key"), @@ -67,8 +74,8 @@ func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey, region, buc s3sink.endpoint = endpoint config := &aws.Config{ - Region: aws.String(s3sink.region), - Endpoint: aws.String(s3sink.endpoint), + Region: aws.String(s3sink.region), + Endpoint: aws.String(s3sink.endpoint), } if awsAccessKeyId != "" && awsSecretAccessKey != "" { config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "") @@ -104,7 +111,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures uploadId, err := s3sink.createMultipartUpload(key, entry) if err != nil { - return err + return fmt.Errorf("createMultipartUpload: %v", err) } totalSize := filer.FileSize(entry) @@ -120,6 +127,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures defer wg.Done() if part, uploadErr := s3sink.uploadPart(key, uploadId, partId, chunk); uploadErr != nil { err = uploadErr + glog.Errorf("uploadPart: %v", uploadErr) } else { parts[index] = part } @@ -129,7 +137,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures if err != nil { s3sink.abortMultipartUpload(key, uploadId) - return err + return fmt.Errorf("uploadPart: %v", err) } return s3sink.completeMultipartUpload(context.Background(), key, uploadId, parts) From 9abb041763f711a34b5049dcfce84033d0dc2171 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 28 Feb 2021 16:19:47 -0800 Subject: [PATCH 28/32] filer source: support filerProxy mode --- weed/filer/stream.go | 2 +- 
weed/replication/sink/s3sink/s3_write.go | 7 ++++--- weed/replication/source/filer_source.go | 8 ++++++-- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 075204b79..6a87a2b7d 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -181,7 +181,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { var buffer bytes.Buffer var shouldRetry bool for _, urlString := range urlStrings { - shouldRetry, err = util.FastReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { + shouldRetry, err = util.FastReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { buffer.Write(data) }) if !shouldRetry { diff --git a/weed/replication/sink/s3sink/s3_write.go b/weed/replication/sink/s3sink/s3_write.go index b172ea2c3..bf1ad9b76 100644 --- a/weed/replication/sink/s3sink/s3_write.go +++ b/weed/replication/sink/s3sink/s3_write.go @@ -94,12 +94,13 @@ func (s3sink *S3Sink) completeMultipartUpload(ctx context.Context, key, uploadId result, err := s3sink.conn.CompleteMultipartUpload(input) if err == nil { - glog.V(0).Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result) + glog.V(1).Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result) } else { glog.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err) + return fmt.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err) } - return err + return nil } // To upload a part @@ -163,7 +164,7 @@ func (s3sink *S3Sink) buildReadSeeker(chunk *filer.ChunkView) (io.ReadSeeker, er } buf := make([]byte, chunk.Size) for _, fileUrl := range fileUrls { - _, err = util.ReadUrl(fileUrl+"?readDeleted=true", nil, false, false, chunk.Offset, int(chunk.Size), buf) + _, err = util.ReadUrl(fileUrl, chunk.CipherKey, chunk.IsGzipped, false, chunk.Offset, int(chunk.Size), buf) if err != nil { glog.V(1).Infof("read from %s: %v", fileUrl, err) } else { diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index 3982360b0..e2e3575dc 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -83,8 +83,12 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error) return nil, fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err) } - for _, loc := range locations.Locations { - fileUrls = append(fileUrls, fmt.Sprintf("http://%s/%s", loc.Url, part)) + if !fs.proxyByFiler { + for _, loc := range locations.Locations { + fileUrls = append(fileUrls, fmt.Sprintf("http://%s/%s?readDeleted=true", loc.Url, part)) + } + } else { + fileUrls = append(fileUrls, fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, part)) } return From 984fdd61925560ebadb6f3bb8c80d6bbf94d291c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 28 Feb 2021 16:20:13 -0800 Subject: [PATCH 29/32] always use non bucket prefixing url --- weed/replication/sink/s3sink/s3_sink.go | 1 + 1 file changed, 1 insertion(+) diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go index ea219ce74..9a36573e3 100644 --- a/weed/replication/sink/s3sink/s3_sink.go +++ b/weed/replication/sink/s3sink/s3_sink.go @@ -76,6 +76,7 @@ func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey, region, buc config := &aws.Config{ 
Region: aws.String(s3sink.region), Endpoint: aws.String(s3sink.endpoint), + S3ForcePathStyle: aws.Bool(true), } if awsAccessKeyId != "" && awsSecretAccessKey != "" { config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "") From f2fcb77808a72d3a62a0d922ee3fea8d878bf2ed Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 28 Feb 2021 16:20:47 -0800 Subject: [PATCH 30/32] local file sink: create backup files with permission 0755 --- weed/replication/sink/localsink/local_sink.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/weed/replication/sink/localsink/local_sink.go b/weed/replication/sink/localsink/local_sink.go index c76647fcc..2b9b3e69a 100644 --- a/weed/replication/sink/localsink/local_sink.go +++ b/weed/replication/sink/localsink/local_sink.go @@ -78,13 +78,13 @@ func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signa if _, err := os.Stat(dir); os.IsNotExist(err) { glog.V(4).Infof("Create Direcotry key: %s", dir) - if err = os.MkdirAll(dir, 0); err != nil { + if err = os.MkdirAll(dir, 0755); err != nil { return err } } writeFunc := func(data []byte) error { - writeErr := ioutil.WriteFile(key, data, 0) + writeErr := ioutil.WriteFile(key, data, 0755) return writeErr } From 014a31d11ae8a147bc52c3615d485ffb4310e736 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 28 Feb 2021 16:21:09 -0800 Subject: [PATCH 31/32] minor --- weed/command/filer_meta_tail.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go index f055b19a8..189cacefc 100644 --- a/weed/command/filer_meta_tail.go +++ b/weed/command/filer_meta_tail.go @@ -24,8 +24,8 @@ func init() { var cmdFilerMetaTail = &Command{ UsageLine: "filer.meta.tail [-filer=localhost:8888] [-target=/]", - Short: "see recent changes on a filer", - Long: `See recent changes on a filer. + Short: "see continuous changes on a filer", + Long: `See continuous changes on a filer. weed filer.meta.tail -timeAgo=30h | grep truncate weed filer.meta.tail -timeAgo=30h | jq . 
From e52c94640e9898be5308a77867ecea5ef1567c5b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 28 Feb 2021 16:22:27 -0800 Subject: [PATCH 32/32] filer.backup: added to replace filer.replicate --- weed/Makefile | 4 + weed/command/command.go | 1 + weed/command/filer_backup.go | 158 +++++++++++++++++++++++ weed/command/filer_replication.go | 29 +++-- weed/command/filer_sync.go | 203 +++++++++++++++++------------- 5 files changed, 296 insertions(+), 99 deletions(-) create mode 100644 weed/command/filer_backup.go diff --git a/weed/Makefile b/weed/Makefile index fd0843c22..8f1257d09 100644 --- a/weed/Makefile +++ b/weed/Makefile @@ -33,3 +33,7 @@ debug_webdav: debug_s3: go build -gcflags="all=-N -l" dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 s3 + +debug_filer_copy: + go build -gcflags="all=-N -l" + dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 filer.backup -filer=localhost:8888 -filerProxy -timeAgo=10h diff --git a/weed/command/command.go b/weed/command/command.go index bbc2e0423..5506e6969 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -15,6 +15,7 @@ var Commands = []*Command{ cmdDownload, cmdExport, cmdFiler, + cmdFilerBackup, cmdFilerCat, cmdFilerMetaTail, cmdFilerReplicate, diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go new file mode 100644 index 000000000..8cb7441f6 --- /dev/null +++ b/weed/command/filer_backup.go @@ -0,0 +1,158 @@ +package command + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" + "google.golang.org/grpc" + "io" + "time" +) + +type FilerBackupOptions struct { + isActivePassive *bool + filer *string + path *string + debug *bool + proxyByFiler *bool + timeAgo *time.Duration +} + +var ( + filerBackupOptions FilerBackupOptions +) + +func init() { + cmdFilerBackup.Run = runFilerBackup // break init cycle + filerBackupOptions.filer = cmdFilerBackup.Flag.String("filer", "localhost:8888", "filer of one SeaweedFS cluster") + filerBackupOptions.path = cmdFilerBackup.Flag.String("filerPath", "/", "directory to sync on filer") + filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers") + filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files") + filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") +} + +var cmdFilerBackup = &Command{ + UsageLine: "filer.backup -filer=: ", + Short: "resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml", + Long: `resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml + + filer.backup listens on filer notifications. If any file is updated, it will fetch the updated content, + and write to the destination. This is to replace filer.replicate command since additional message queue is not needed. + + If restarted and "-timeAgo" is not set, the synchronization will resume from the previous checkpoints, persisted every minute. 
+ A fresh sync will start from the earliest metadata logs. To reset the checkpoints, just set "-timeAgo" to a high value. + +`, +} + +func runFilerBackup(cmd *Command, args []string) bool { + + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + + util.LoadConfiguration("security", false) + util.LoadConfiguration("replication", true) + + for { + err := doFilerBackup(grpcDialOption, &filerBackupOptions) + if err != nil { + glog.Errorf("backup from %s: %v", *filerBackupOptions.filer, err) + time.Sleep(1747 * time.Millisecond) + } + } + + return true +} + +const ( + BackupKeyPrefix = "backup." +) + +func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions) error { + + // find data sink + config := util.GetViper() + dataSink := findSink(config) + if dataSink == nil { + return fmt.Errorf("no data sink configured in replication.toml") + } + + sourceFiler := *backupOption.filer + sourcePath := *backupOption.path + timeAgo := *backupOption.timeAgo + targetPath := dataSink.GetSinkToDirectory() + debug := *backupOption.debug + + // get start time for the data sink + startFrom := time.Unix(0, 0) + sinkId := util.HashStringToLong(dataSink.GetName() + dataSink.GetSinkToDirectory()) + if timeAgo.Milliseconds() == 0 { + lastOffsetTsNs, err := getOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId)) + if err != nil { + glog.V(0).Infof("starting from %v", startFrom) + } else { + startFrom = time.Unix(0, lastOffsetTsNs) + glog.V(0).Infof("resuming from %v", startFrom) + } + } else { + startFrom = time.Now().Add(-timeAgo) + glog.V(0).Infof("start time is set to %v", startFrom) + } + + // create filer sink + filerSource := &source.FilerSource{} + filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, *backupOption.proxyByFiler) + dataSink.SetSourceFiler(filerSource) + + processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug) + + return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ + ClientName: "backup_" + dataSink.GetName(), + PathPrefix: sourcePath, + SinceNs: startFrom.UnixNano(), + }) + if err != nil { + return fmt.Errorf("listen: %v", err) + } + + var counter int64 + var lastWriteTime time.Time + for { + resp, listenErr := stream.Recv() + + if listenErr == io.EOF { + return nil + } + if listenErr != nil { + return listenErr + } + + if err := processEventFn(resp); err != nil { + return fmt.Errorf("processEventFn: %v", err) + } + + counter++ + if lastWriteTime.Add(3 * time.Second).Before(time.Now()) { + glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3)) + counter = 0 + lastWriteTime = time.Now() + if err := setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), resp.TsNs); err != nil { + return fmt.Errorf("setOffset: %v", err) + } + } + + } + + }) + +} + diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index e8c06b208..885c95540 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -74,18 +74,7 @@ func runFilerReplicate(cmd *Command, args []string) bool { } } - var dataSink sink.ReplicationSink - for _, sk := range sink.Sinks { - if config.GetBool("sink." 
+ sk.GetName() + ".enabled") { - if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil { - glog.Fatalf("Failed to initialize sink for %s: %+v", - sk.GetName(), err) - } - glog.V(0).Infof("Configure sink to %s", sk.GetName()) - dataSink = sk - break - } - } + dataSink := findSink(config) if dataSink == nil { println("no data sink configured in replication.toml:") @@ -135,6 +124,22 @@ func runFilerReplicate(cmd *Command, args []string) bool { } +func findSink(config *util.ViperProxy) sink.ReplicationSink { + var dataSink sink.ReplicationSink + for _, sk := range sink.Sinks { + if config.GetBool("sink." + sk.GetName() + ".enabled") { + if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil { + glog.Fatalf("Failed to initialize sink for %s: %+v", + sk.GetName(), err) + } + glog.V(0).Infof("Configure sink to %s", sk.GetName()) + dataSink = sk + break + } + } + return dataSink +} + func validateOneEnabledInput(config *util.ViperProxy) { enabledInput := "" for _, input := range sub.NotificationInputs { diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index f8beb6fda..0f34e5701 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -8,6 +8,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication" + "github.com/chrislusf/seaweedfs/weed/replication/sink" "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink" "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/security" @@ -137,7 +138,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so // if first time, start from now // if has previously synced, resume from that point of time - sourceFilerOffsetTsNs, err := readSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature) + sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature) if err != nil { return err } @@ -151,93 +152,17 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler) filerSink.SetSourceFiler(filerSource) + persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug) + processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification - - var sourceOldKey, sourceNewKey util.FullPath - if message.OldEntry != nil { - sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name) - } - if message.NewEntry != nil { - sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name) - } - for _, sig := range message.Signatures { if sig == targetFilerSignature && targetFilerSignature != 0 { fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message) return nil } } - if debug { - fmt.Printf("%s check %s change %s,%s sig %v, target sig: %v\n", targetFiler, sourceFiler, sourceOldKey, sourceNewKey, message.Signatures, targetFilerSignature) - } - - if !strings.HasPrefix(resp.Directory, sourcePath) { - return nil - } - - // handle deletions - if message.OldEntry != nil && message.NewEntry == nil { - if !strings.HasPrefix(string(sourceOldKey), sourcePath) { - return nil - } - key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):]) - return filerSink.DeleteEntry(key, 
message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) - } - - // handle new entries - if message.OldEntry == nil && message.NewEntry != nil { - if !strings.HasPrefix(string(sourceNewKey), sourcePath) { - return nil - } - key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):]) - return filerSink.CreateEntry(key, message.NewEntry, message.Signatures) - } - - // this is something special? - if message.OldEntry == nil && message.NewEntry == nil { - return nil - } - - // handle updates - if strings.HasPrefix(string(sourceOldKey), sourcePath) { - // old key is in the watched directory - if strings.HasPrefix(string(sourceNewKey), sourcePath) { - // new key is also in the watched directory - oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):]) - message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):]) - foundExisting, err := filerSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures) - if foundExisting { - return err - } - - // not able to find old entry - if err = filerSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil { - return fmt.Errorf("delete old entry %v: %v", oldKey, err) - } - - // create the new entry - newKey := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):]) - return filerSink.CreateEntry(newKey, message.NewEntry, message.Signatures) - - } else { - // new key is outside of the watched directory - key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):]) - return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) - } - } else { - // old key is outside of the watched directory - if strings.HasPrefix(string(sourceNewKey), sourcePath) { - // new key is in the watched directory - key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):]) - return filerSink.CreateEntry(key, message.NewEntry, message.Signatures) - } else { - // new key is also outside of the watched directory - // skip - } - } - - return nil + return persistEventFn(resp) } return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { @@ -275,7 +200,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so glog.V(0).Infof("sync %s => %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3)) counter = 0 lastWriteTime = time.Now() - if err := writeSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature, resp.TsNs); err != nil { + if err := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, resp.TsNs); err != nil { return err } } @@ -290,11 +215,11 @@ const ( SyncKeyPrefix = "sync." 
) -func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32) (lastOffsetTsNs int64, readErr error) { +func getOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) { readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - syncKey := []byte(SyncKeyPrefix + "____") - util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature)) + syncKey := []byte(signaturePrefix + "____") + util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature)) resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey}) if err != nil { @@ -317,11 +242,11 @@ func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature } -func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32, offsetTsNs int64) error { +func setOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32, offsetTsNs int64) error { return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - syncKey := []byte(SyncKeyPrefix + "____") - util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature)) + syncKey := []byte(signaturePrefix + "____") + util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature)) valueBuf := make([]byte, 8) util.Uint64toBytes(valueBuf, uint64(offsetTsNs)) @@ -343,3 +268,107 @@ func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignatur }) } + +func genProcessFunction(sourcePath string, targetPath string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error { + // process function + processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + + var sourceOldKey, sourceNewKey util.FullPath + if message.OldEntry != nil { + sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name) + } + if message.NewEntry != nil { + sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name) + } + + if debug { + glog.V(0).Infof("received %v", resp) + } + + if !strings.HasPrefix(resp.Directory, sourcePath) { + return nil + } + + // handle deletions + if message.OldEntry != nil && message.NewEntry == nil { + if !strings.HasPrefix(string(sourceOldKey), sourcePath) { + return nil + } + key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath) + return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) + } + + // handle new entries + if message.OldEntry == nil && message.NewEntry != nil { + if !strings.HasPrefix(string(sourceNewKey), sourcePath) { + return nil + } + key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath) + return dataSink.CreateEntry(key, message.NewEntry, message.Signatures) + } + + // this is something special? 
+ if message.OldEntry == nil && message.NewEntry == nil { + return nil + } + + // handle updates + if strings.HasPrefix(string(sourceOldKey), sourcePath) { + // old key is in the watched directory + if strings.HasPrefix(string(sourceNewKey), sourcePath) { + // new key is also in the watched directory + if !dataSink.IsIncremental() { + oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):]) + message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):]) + foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures) + if foundExisting { + return err + } + + // not able to find old entry + if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil { + return fmt.Errorf("delete old entry %v: %v", oldKey, err) + } + } + // create the new entry + newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath) + return dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures) + + } else { + // new key is outside of the watched directory + if !dataSink.IsIncremental() { + key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath) + return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) + } + } + } else { + // old key is outside of the watched directory + if strings.HasPrefix(string(sourceNewKey), sourcePath) { + // new key is in the watched directory + key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath) + return dataSink.CreateEntry(key, message.NewEntry, message.Signatures) + } else { + // new key is also outside of the watched directory + // skip + } + } + + return nil + } + return processEventFn +} + +func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) string { + if !dataSink.IsIncremental() { + return util.Join(targetPath, string(sourceKey)[len(sourcePath):]) + } + var mTime int64 + if message.NewEntry != nil { + mTime = message.NewEntry.Attributes.Mtime + } else if message.OldEntry != nil { + mTime = message.OldEntry.Attributes.Mtime + } + dateKey := time.Unix(mTime, 0).Format("2006-01-02") + return util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):]) +}
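
A quick usage sketch for the new command, based on the flags defined in filer_backup.go
and the Makefile target above (the filer address, path, and timeAgo values are placeholders):

    # enable one sink in replication.toml, e.g. [sink.local] with is_incremental = false
    weed filer.backup -filer=localhost:8888 -filerPath=/ -timeAgo=10h

On a later restart without -timeAgo, the backup resumes from the checkpoint persisted under
the "backup." key prefix; passing -timeAgo again forces a re-read starting from that far back.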