commit 3575d41009 (parent 6daa932f5c)
Author: Chris Lu
Date: 2021-02-17 20:57:08 -08:00

15 changed files with 39 additions and 39 deletions


@@ -72,7 +72,7 @@ func runBackup(cmd *Command, args []string) bool {
     vid := needle.VolumeId(*s.volumeId)
     // find volume location, replication, ttl info
-    lookup, err := operation.Lookup(func()string{return *s.master}, vid.String())
+    lookup, err := operation.Lookup(func() string { return *s.master }, vid.String())
     if err != nil {
         fmt.Printf("Error looking up volume %d: %v\n", vid, err)
         return true
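Most hunks in this commit are the same formatting-only normalization, consistent with a gofmt pass: a function-literal argument written as func()string{return *s.master} becomes the canonical func() string { return *s.master }. Behavior is unchanged; the closure simply lets the callee re-read the master address on every call. A minimal, self-contained sketch of the pattern (lookupVolume and getMaster are invented names for this sketch, not SeaweedFS APIs):

package main

import "fmt"

// lookupVolume takes a provider function rather than a fixed address, so the
// master address is re-evaluated on every call instead of being captured once.
func lookupVolume(getMaster func() string, volumeId string) {
    fmt.Printf("looking up volume %s via master %s\n", volumeId, getMaster())
}

func main() {
    master := "localhost:9333"
    // gofmt renders the literal as "func() string { return master }";
    // the unspaced "func()string{return master}" compiles to the same thing.
    lookupVolume(func() string { return master }, "7")
}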


@@ -44,7 +44,7 @@ var cmdDownload = &Command{
 func runDownload(cmd *Command, args []string) bool {
     for _, fid := range args {
-        if e := downloadToFile(func()string{return *d.server}, fid, util.ResolvePath(*d.dir)); e != nil {
+        if e := downloadToFile(func() string { return *d.server }, fid, util.ResolvePath(*d.dir)); e != nil {
             fmt.Println("Download Error: ", fid, e)
         }
     }


@@ -96,7 +96,7 @@ func runUpload(cmd *Command, args []string) bool {
         if e != nil {
             return e
         }
-        results, e := operation.SubmitFiles(func()string {return *upload.master}, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
+        results, e := operation.SubmitFiles(func() string { return *upload.master }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
         bytes, _ := json.Marshal(results)
         fmt.Println(string(bytes))
         if e != nil {

@@ -113,7 +113,7 @@ func runUpload(cmd *Command, args []string) bool {
     if e != nil {
         fmt.Println(e.Error())
     }
-    results, _ := operation.SubmitFiles(func()string {return *upload.master}, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
+    results, _ := operation.SubmitFiles(func() string { return *upload.master }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
     bytes, _ := json.Marshal(results)
     fmt.Println(string(bytes))
 }


@@ -47,7 +47,7 @@ func (store *MysqlStore) initialize(user, password, hostname string, port int, d
     store.SupportBucketTable = false
     store.SqlGenerator = &SqlGenMysql{
         CreateTableSqlTemplate: "",
-        DropTableSqlTemplate: "drop table `%s`",
+        DropTableSqlTemplate:   "drop table `%s`",
     }
     sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database)


@@ -1,10 +1,10 @@
 package postgres
 import (
-    `fmt`
-    `github.com/chrislusf/seaweedfs/weed/filer/abstract_sql`
-    _ `github.com/lib/pq`
+    "fmt"
+    "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
+    _ "github.com/lib/pq"
 )
 type SqlGenPostgres struct {
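The change above is only the quoting style of the import paths: raw (backquoted) string literals are replaced by the conventional interpreted (double-quoted) form. Both are legal Go import paths; the commit just adopts the style used throughout the standard library. A tiny, hypothetical illustration:

package main

// A Go import path may be any string literal, so the raw form `fmt` compiles
// exactly like the interpreted form "fmt". The double-quoted style below is
// the conventional one this commit switches to.
import "fmt"

func main() {
    fmt.Println("import paths written as double-quoted literals")
}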


@@ -50,7 +50,7 @@ func (store *PostgresStore2) initialize(createTable, user, password, hostname st
     store.SupportBucketTable = true
     store.SqlGenerator = &postgres.SqlGenPostgres{
         CreateTableSqlTemplate: createTable,
-        DropTableSqlTemplate: `drop table "%s"`,
+        DropTableSqlTemplate:   `drop table "%s"`,
     }
     sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode)


@@ -100,7 +100,7 @@ func retriedUploadData(uploadUrl string, filename string, cipher bool, data []by
         } else {
             glog.Warningf("uploading to %s: %v", uploadUrl, err)
         }
-        time.Sleep(time.Millisecond * time.Duration(237 * (i+1)))
+        time.Sleep(time.Millisecond * time.Duration(237*(i+1)))
     }
     return
 }
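Only the spacing inside the arithmetic changes here; the surrounding code is a retry loop whose sleep grows linearly with the attempt number (237ms, 474ms, 711ms, ...). A standalone sketch of that backoff pattern, with a hypothetical tryUpload standing in for the real upload call:

package main

import (
    "errors"
    "fmt"
    "time"
)

// tryUpload is a hypothetical stand-in for the real upload call; it fails on
// the first two attempts and then succeeds.
func tryUpload(attempt int) error {
    if attempt < 2 {
        return errors.New("transient upload error")
    }
    return nil
}

func main() {
    const retries = 3
    for i := 0; i < retries; i++ {
        if err := tryUpload(i); err == nil {
            fmt.Println("upload succeeded on attempt", i+1)
            return
        }
        // Linear backoff, mirroring the diff above:
        // 237ms on the first retry, 474ms on the second, and so on.
        time.Sleep(time.Millisecond * time.Duration(237*(i+1)))
    }
    fmt.Println("upload failed after", retries, "attempts")
}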


@@ -125,13 +125,13 @@ func (ms *MasterServer) selfUrl(r *http.Request) string {
 }
 func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
     if ms.Topo.IsLeader() {
-        submitForClientHandler(w, r, func()string{return ms.selfUrl(r)}, ms.grpcDialOption)
+        submitForClientHandler(w, r, func() string { return ms.selfUrl(r) }, ms.grpcDialOption)
     } else {
         masterUrl, err := ms.Topo.Leader()
         if err != nil {
             writeJsonError(w, r, http.StatusInternalServerError, err)
         } else {
-            submitForClientHandler(w, r, func()string{return masterUrl}, ms.grpcDialOption)
+            submitForClientHandler(w, r, func() string { return masterUrl }, ms.grpcDialOption)
         }
     }
 }


@@ -219,14 +219,14 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
     case <-vs.stopChan:
         var volumeMessages []*master_pb.VolumeInformationMessage
         emptyBeat := &master_pb.Heartbeat{
-            Ip: vs.store.Ip,
-            Port: uint32(vs.store.Port),
-            PublicUrl: vs.store.PublicUrl,
-            MaxFileKey: uint64(0),
-            DataCenter: vs.store.GetDataCenter(),
-            Rack: vs.store.GetRack(),
-            Volumes: volumeMessages,
-            HasNoVolumes: len(volumeMessages) == 0,
+            Ip:           vs.store.Ip,
+            Port:         uint32(vs.store.Port),
+            PublicUrl:    vs.store.PublicUrl,
+            MaxFileKey:   uint64(0),
+            DataCenter:   vs.store.GetDataCenter(),
+            Rack:         vs.store.GetRack(),
+            Volumes:      volumeMessages,
+            HasNoVolumes: len(volumeMessages) == 0,
         }
         glog.V(1).Infof("volume server %s:%d stops and deletes all volumes", vs.store.Ip, vs.store.Port)
         if err = stream.Send(emptyBeat); err != nil {


@@ -126,7 +126,7 @@ func TestCommandEcBalanceVolumeEvenButRackUneven(t *testing.T) {
 func newEcNode(dc string, rack string, dataNodeId string, freeEcSlot int) *EcNode {
     return &EcNode{
         info: &master_pb.DataNodeInfo{
-            Id: dataNodeId,
+            Id:        dataNodeId,
             DiskInfos: make(map[string]*master_pb.DiskInfo),
         },
         dc: dc,


@@ -285,7 +285,7 @@ func (c *commandVolumeFsck) collectVolumeIds(verbose bool, writer io.Writer) (vo
     }
     eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) {
-        for _, diskInfo := range t.DiskInfos{
+        for _, diskInfo := range t.DiskInfos {
             for _, vi := range diskInfo.VolumeInfos {
                 volumeIdToServer[vi.Id] = VInfo{
                     server: t.Id,


@@ -285,15 +285,15 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
     }
     return &master_pb.Heartbeat{
-        Ip: s.Ip,
-        Port: uint32(s.Port),
-        PublicUrl: s.PublicUrl,
-        MaxVolumeCounts: maxVolumeCounts,
-        MaxFileKey: NeedleIdToUint64(maxFileKey),
-        DataCenter: s.dataCenter,
-        Rack: s.rack,
-        Volumes: volumeMessages,
-        HasNoVolumes: len(volumeMessages) == 0,
+        Ip:              s.Ip,
+        Port:            uint32(s.Port),
+        PublicUrl:       s.PublicUrl,
+        MaxVolumeCounts: maxVolumeCounts,
+        MaxFileKey:      NeedleIdToUint64(maxFileKey),
+        DataCenter:      s.dataCenter,
+        Rack:            s.rack,
+        Volumes:         volumeMessages,
+        HasNoVolumes:    len(volumeMessages) == 0,
     }
 }
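The Heartbeat literals above and the DataNodeInfo literal in the test hunk change only in whitespace: gofmt vertically aligns the values of adjacent key: value pairs in a composite literal, padding shorter keys out to the width of the longest key. A small hypothetical example of the rule (the heartbeat type below is invented for illustration):

package main

import "fmt"

type heartbeat struct {
    IP           string
    Port         uint32
    HasNoVolumes bool
}

func main() {
    // gofmt aligns the values in a keyed composite literal, so short keys
    // (IP, Port) are padded to line up with the longest key (HasNoVolumes).
    hb := heartbeat{
        IP:           "127.0.0.1",
        Port:         8080,
        HasNoVolumes: true,
    }
    fmt.Printf("%+v\n", hb)
}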


@@ -25,7 +25,7 @@ func ToDiskType(vt string) (diskType DiskType) {
     return
 }
-func (diskType DiskType) String() string{
+func (diskType DiskType) String() string {
     if diskType == "" {
         return ""
     }


@@ -48,11 +48,11 @@ func (d *DiskUsages) negative() *DiskUsages {
     t := newDiskUsages()
     for diskType, b := range d.usages {
         a := t.getOrCreateDisk(diskType)
-        a.volumeCount = - b.volumeCount
-        a.remoteVolumeCount = - b.remoteVolumeCount
-        a.activeVolumeCount = - b.activeVolumeCount
-        a.ecShardCount = - b.ecShardCount
-        a.maxVolumeCount = - b.maxVolumeCount
+        a.volumeCount = -b.volumeCount
+        a.remoteVolumeCount = -b.remoteVolumeCount
+        a.activeVolumeCount = -b.activeVolumeCount
+        a.ecShardCount = -b.ecShardCount
+        a.maxVolumeCount = -b.maxVolumeCount
     }
     return t

@@ -68,7 +68,7 @@ func (d *DiskUsages) ToMap() interface{} {
     return ret
 }
-func (d *DiskUsages) ToDiskInfo() (map[string]*master_pb.DiskInfo) {
+func (d *DiskUsages) ToDiskInfo() map[string]*master_pb.DiskInfo {
     ret := make(map[string]*master_pb.DiskInfo)
     for diskType, diskUsageCounts := range d.usages {
         m := &master_pb.DiskInfo{


@@ -39,7 +39,7 @@ type NodeImpl struct {
     diskUsages *DiskUsages
     id NodeId
     parent Node
-    sync.RWMutex  // lock children
+    sync.RWMutex // lock children
     children map[NodeId]Node
     maxVolumeId needle.VolumeId