Merge branch 'new_master' into hashicorp_raft

# Conflicts:
#	weed/pb/master_pb/master.pb.go
Konstantin Lebedev 2022-04-07 18:50:27 +05:00
commit f5246b748d
24 changed files with 341 additions and 76 deletions

View file

@@ -20,7 +20,7 @@ jobs:
-
name: Docker meta
id: docker_meta
uses: docker/metadata-action@e5622373a38e60fb6d795a4421e56882f2d7a681 # v3
uses: docker/metadata-action@f2a13332ac1ce8c0a71aeac48a150dbb1838ab67 # v3
with:
images: |
chrislusf/seaweedfs

View file

@@ -21,7 +21,7 @@ jobs:
-
name: Docker meta
id: docker_meta
uses: docker/metadata-action@e5622373a38e60fb6d795a4421e56882f2d7a681 # v3
uses: docker/metadata-action@f2a13332ac1ce8c0a71aeac48a150dbb1838ab67 # v3
with:
images: |
chrislusf/seaweedfs

View file

@@ -20,7 +20,7 @@ jobs:
-
name: Docker meta
id: docker_meta
uses: docker/metadata-action@e5622373a38e60fb6d795a4421e56882f2d7a681 # v3
uses: docker/metadata-action@f2a13332ac1ce8c0a71aeac48a150dbb1838ab67 # v3
with:
images: |
chrislusf/seaweedfs

View file

@@ -21,7 +21,7 @@ jobs:
-
name: Docker meta
id: docker_meta
uses: docker/metadata-action@e5622373a38e60fb6d795a4421e56882f2d7a681 # v3
uses: docker/metadata-action@f2a13332ac1ce8c0a71aeac48a150dbb1838ab67 # v3
with:
images: |
chrislusf/seaweedfs

View file

@@ -21,7 +21,7 @@ jobs:
-
name: Docker meta
id: docker_meta
uses: docker/metadata-action@e5622373a38e60fb6d795a4421e56882f2d7a681 # v3
uses: docker/metadata-action@f2a13332ac1ce8c0a71aeac48a150dbb1838ab67 # v3
with:
images: |
chrislusf/seaweedfs

4
go.mod
View file

@@ -10,7 +10,7 @@ require (
github.com/Azure/azure-storage-blob-go v0.14.0
github.com/OneOfOne/xxhash v1.2.8
github.com/Shopify/sarama v1.32.0
github.com/aws/aws-sdk-go v1.43.31
github.com/aws/aws-sdk-go v1.43.33
github.com/beorn7/perks v1.0.1 // indirect
github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72
github.com/bwmarrin/snowflake v0.3.0
@@ -116,7 +116,7 @@ require (
github.com/xdg-go/stringprep v1.0.2 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
go.etcd.io/etcd/client/v3 v3.5.2
go.mongodb.org/mongo-driver v1.8.4
go.mongodb.org/mongo-driver v1.9.0
go.opencensus.io v0.23.0 // indirect
gocloud.dev v0.25.0
gocloud.dev/pubsub/natspubsub v0.25.0

7
go.sum
View file

@@ -151,8 +151,9 @@ github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgI
github.com/aws/aws-sdk-go v1.15.27/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0=
github.com/aws/aws-sdk-go v1.37.0/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/aws/aws-sdk-go v1.38.68/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/aws/aws-sdk-go v1.43.31 h1:yJZIr8nMV1hXjAvvOLUFqZRJcHV7udPQBfhJqawDzI0=
github.com/aws/aws-sdk-go v1.43.31/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aws/aws-sdk-go v1.43.33 h1:QeX6NSZv5gmji+SCEShL3LqKk3ldtPoTmsuy/YbM+uk=
github.com/aws/aws-sdk-go v1.43.33/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aws/aws-sdk-go-v2 v1.7.0/go.mod h1:tb9wi5s61kTDA5qCkcDbt3KRVV74GGslQkl/DRdX/P4=
github.com/aws/aws-sdk-go-v2 v1.16.2 h1:fqlCk6Iy3bnCumtrLz9r3mJ/2gUT0pJ0wLFVIdWh+JA=
github.com/aws/aws-sdk-go-v2 v1.16.2/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX+OET6TLNNU=
@@ -933,8 +934,8 @@ go.etcd.io/etcd/client/v2 v2.305.0/go.mod h1:h9puh54ZTgAKtEbut2oe9P4L/oqKCVB6xsX
go.etcd.io/etcd/client/v3 v3.5.0/go.mod h1:AIKXXVX/DQXtfTEqBryiLTUXwON+GuvO6Z7lLS/oTh0=
go.etcd.io/etcd/client/v3 v3.5.2 h1:WdnejrUtQC4nCxK0/dLTMqKOB+U5TP/2Ya0BJL+1otA=
go.etcd.io/etcd/client/v3 v3.5.2/go.mod h1:kOOaWFFgHygyT0WlSmL8TJiXmMysO/nNUlEsSsN6W4o=
go.mongodb.org/mongo-driver v1.8.4 h1:NruvZPPL0PBcRJKmbswoWSrmHeUvzdxA3GCPfD/NEOA=
go.mongodb.org/mongo-driver v1.8.4/go.mod h1:0sQWfOeY63QTntERDJJ/0SuKK0T1uVSgKCuAROlKEPY=
go.mongodb.org/mongo-driver v1.9.0 h1:f3aLGJvQmBl8d9S40IL+jEyBC6hfLPbJjv9t5hEM9ck=
go.mongodb.org/mongo-driver v1.9.0/go.mod h1:0sQWfOeY63QTntERDJJ/0SuKK0T1uVSgKCuAROlKEPY=
go.opencensus.io v0.15.0/go.mod h1:UffZAU+4sDEINUGP/B7UfBBkq4fqLu9zXAX7ke6CHW0=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=

View file

@@ -108,9 +108,9 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
mountDirHash = -mountDirHash
}
*option.localSocket = fmt.Sprintf("/tmp/seaweefs-mount-%d.sock", mountDirHash)
if err := os.Remove(*option.localSocket); err != nil && !os.IsNotExist(err) {
glog.Fatalf("Failed to remove %s, error: %s", *option.localSocket, err.Error())
}
}
if err := os.Remove(*option.localSocket); err != nil && !os.IsNotExist(err) {
glog.Fatalf("Failed to remove %s, error: %s", *option.localSocket, err.Error())
}
montSocketListener, err := net.Listen("unix", *option.localSocket)
if err != nil {

View file

@@ -23,6 +23,9 @@ func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
}
func FileSize(entry *filer_pb.Entry) (size uint64) {
if entry == nil || entry.Attributes == nil {
return 0
}
fileSize := entry.Attributes.FileSize
if entry.RemoteEntry != nil {
if entry.RemoteEntry.RemoteMtime > entry.Attributes.Mtime {

View file

@@ -187,6 +187,9 @@ func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic
func (up *UploadPipeline) Shutdown() {
up.swapFile.FreeResource()
up.sealedChunksLock.Lock()
defer up.sealedChunksLock.Unlock()
for logicChunkIndex, sealedChunk := range up.sealedChunks {
sealedChunk.FreeReference(fmt.Sprintf("%s uploadpipeline shutdown chunk %d", up.filepath, logicChunkIndex))
}

View file

@@ -131,6 +131,9 @@ func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle
}
var found bool
if fh, found = wfs.fhmap.FindFileHandle(inode); found {
if fh.entry.Attributes == nil {
fh.entry.Attributes = &filer_pb.FuseAttributes{}
}
return path, fh, fh.entry, fuse.OK
}
entry, status = wfs.maybeLoadEntry(path)

View file

@@ -147,6 +147,8 @@ message VolumeLocation {
string leader = 5; // optional when leader is not itself
string data_center = 6; // optional when DataCenter is in use
uint32 grpc_port = 7;
repeated uint32 new_ec_vids = 8;
repeated uint32 deleted_ec_vids = 9;
}
message ClusterNodeUpdate {

View file

@@ -12,11 +12,11 @@ import (
"time"
)
const slash = "/"
func ParseLocationName(remote string) (locationName string) {
if strings.HasSuffix(string(remote), "/") {
remote = remote[:len(remote)-1]
}
parts := strings.SplitN(string(remote), "/", 2)
remote = strings.TrimSuffix(remote, slash)
parts := strings.SplitN(remote, slash, 2)
if len(parts) >= 1 {
return parts[0]
}
@@ -25,35 +25,31 @@ func ParseLocationName(remote string) (locationName string)
func parseBucketLocation(remote string) (loc *remote_pb.RemoteStorageLocation) {
loc = &remote_pb.RemoteStorageLocation{}
if strings.HasSuffix(string(remote), "/") {
remote = remote[:len(remote)-1]
}
parts := strings.SplitN(string(remote), "/", 3)
remote = strings.TrimSuffix(remote, slash)
parts := strings.SplitN(remote, slash, 3)
if len(parts) >= 1 {
loc.Name = parts[0]
}
if len(parts) >= 2 {
loc.Bucket = parts[1]
}
loc.Path = string(remote[len(loc.Name)+1+len(loc.Bucket):])
loc.Path = remote[len(loc.Name)+1+len(loc.Bucket):]
if loc.Path == "" {
loc.Path = "/"
loc.Path = slash
}
return
}
func parseNoBucketLocation(remote string) (loc *remote_pb.RemoteStorageLocation) {
loc = &remote_pb.RemoteStorageLocation{}
if strings.HasSuffix(string(remote), "/") {
remote = remote[:len(remote)-1]
}
parts := strings.SplitN(string(remote), "/", 2)
remote = strings.TrimSuffix(remote, slash)
parts := strings.SplitN(remote, slash, 2)
if len(parts) >= 1 {
loc.Name = parts[0]
}
loc.Path = string(remote[len(loc.Name):])
loc.Path = remote[len(loc.Name):]
if loc.Path == "" {
loc.Path = "/"
loc.Path = slash
}
return
}
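For reference, the refactored helpers above parse a remote location string of the form <name>/<bucket>/<path> by trimming one trailing slash and splitting on the shared slash constant. A minimal standalone sketch of the same behavior (illustrative only; the function name parse and the sample inputs are hypothetical, not the package's API):

```go
package main

import (
	"fmt"
	"strings"
)

const slash = "/"

// parse mirrors parseBucketLocation above: trim one trailing slash, split into at
// most three parts, and treat whatever remains as the in-bucket path.
func parse(remote string) (name, bucket, path string) {
	remote = strings.TrimSuffix(remote, slash)
	parts := strings.SplitN(remote, slash, 3)
	if len(parts) >= 1 {
		name = parts[0]
	}
	if len(parts) >= 2 {
		bucket = parts[1]
	}
	path = remote[len(name)+1+len(bucket):]
	if path == "" {
		path = slash
	}
	return
}

func main() {
	fmt.Println(parse("cloud1/bucket1/sub/dir")) // cloud1 bucket1 /sub/dir
	fmt.Println(parse("cloud1/bucket1/"))        // cloud1 bucket1 /
}
```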

View file

@@ -135,6 +135,7 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
w.Header().Set("Location", "/" + bucket)
writeSuccessResponseEmpty(w, r)
}

View file

@@ -46,8 +46,10 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
path = ""
}
emptyFolder := true
if len(entries) > 0 {
lastFileName = entries[len(entries)-1].Name()
emptyFolder = false
}
glog.V(4).Infof("listDirectory %s, last file %s, limit %d: %d items", path, lastFileName, limit, len(entries))
@@ -59,12 +61,14 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
Limit int
LastFileName string
ShouldDisplayLoadMore bool
EmptyFolder bool
}{
path,
entries,
limit,
lastFileName,
shouldDisplayLoadMore,
emptyFolder,
})
return
}
@@ -76,6 +80,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
Limit int
LastFileName string
ShouldDisplayLoadMore bool
EmptyFolder bool
}{
path,
ui.ToBreadcrumb(path),
@@ -83,5 +88,6 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
limit,
lastFileName,
shouldDisplayLoadMore,
emptyFolder,
})
}

View file

@@ -164,6 +164,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
}
var entry *filer.Entry
var newChunks []*filer_pb.FileChunk
var mergedChunks []*filer_pb.FileChunk
isAppend := isAppend(r)
@@ -186,7 +187,7 @@
}
entry.FileSize += uint64(chunkOffset)
}
mergedChunks = append(entry.Chunks, fileChunks...)
newChunks = append(entry.Chunks, fileChunks...)
// TODO
if len(entry.Content) > 0 {
@@ -196,7 +197,7 @@
} else {
glog.V(4).Infoln("saving", path)
mergedChunks = fileChunks
newChunks = fileChunks
entry = &filer.Entry{
FullPath: util.FullPath(path),
Attr: filer.Attr{
@@ -217,6 +218,13 @@
}
}
// maybe concatenate small chunks into one whole chunk
mergedChunks, replyerr = fs.maybeMergeChunks(so, newChunks)
if replyerr != nil {
glog.V(0).Infof("merge chunks %s: %v", r.RequestURI, replyerr)
mergedChunks = newChunks
}
// maybe compact entry chunks
mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), mergedChunks)
if replyerr != nil {

View file

@@ -0,0 +1,11 @@
package weed_server
import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) {
//TODO merge consecutive smaller chunks into a large chunk to reduce number of chunks
return inputChunks, nil
}
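The new hook is intentionally a pass-through for now. Purely as an illustration of the direction the TODO describes (not part of this commit; the 1 MiB threshold and the helper name are assumptions), a first step could detect runs of consecutive small chunks from metadata alone; an actual merge would additionally have to read and re-upload the combined data:

```go
package weed_server

import "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"

// assumed threshold for what counts as a "small" chunk; not taken from the source
const smallChunkLimit = 1 << 20

// consecutiveSmallRuns is a hypothetical helper: it returns groups of adjacent
// small chunks (contiguous by offset) that a future maybeMergeChunks could
// combine into one chunk. It only inspects metadata and moves no data.
func consecutiveSmallRuns(chunks []*filer_pb.FileChunk) (runs [][]*filer_pb.FileChunk) {
	var run []*filer_pb.FileChunk
	flush := func() {
		if len(run) > 1 {
			runs = append(runs, run)
		}
		run = nil
	}
	for _, c := range chunks {
		isSmall := c.Size < smallChunkLimit
		contiguous := len(run) == 0 || run[len(run)-1].Offset+int64(run[len(run)-1].Size) == c.Offset
		if isSmall && contiguous {
			run = append(run, c)
			continue
		}
		flush()
		if isSmall {
			run = []*filer_pb.FileChunk{c}
		}
	}
	flush()
	return
}
```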

View file

@@ -26,6 +26,12 @@
border-radius: 2px;
border: 1px solid #ccc;
float: right;
margin-left: 2px;
margin-bottom: 0;
}
label {
font-weight: normal;
}
.button:hover {
@@ -36,6 +42,50 @@
display: none;
}
td, th {
vertical-align: bottom;
}
.danger {
color: red;
background: #fff;
border: 1px solid #fff;
border-radius: 2px;
}
.info {
background: #fff;
border: 1px solid #fff;
border-radius: 2px;
}
.footer {
position: absolute;
bottom: 10px;
right: 10%;
min-width: 30%;
}
.progress-table {
width: 100%;
}
.progress-table-file-name {
text-align: right;
}
.progress-table-percent {
width: 60px;
text-align: right;
}
.add-files {
font-size: 46px;
text-align: center;
border: 1px dashed #999;
padding-bottom: 9px;
margin: 0 2px;
}
</style>
</head>
<body>
@@ -54,6 +104,7 @@
</a>
{{ end }}
<label class="button" for="fileElem">Upload</label>
<label class="button" onclick="handleCreateDir()">New Folder</label>
</div>
</div>
@@ -61,13 +112,13 @@
<form class="upload-form">
<input type="file" id="fileElem" multiple onchange="handleFiles(this.files)">
<table width="90%">
<table width="100%">
{{$path := .Path }}
{{ range $entry_index, $entry := .Entries }}
<tr>
<td>
{{if $entry.IsDirectory}}
<img src="/seaweedfsstatic/images/folder.gif" width="20" height="23">
<img src="/seaweedfsstatic/images/folder.gif" width="20" height="16">
<a href="{{ printpath $path "/" $entry.Name "/"}}" >
{{ $entry.Name }}
</a>
@@ -89,13 +140,25 @@
{{ $entry.Size | humanizeBytes }}&nbsp;
{{end}}
</td>
<td nowrap>
<td align="right" nowrap>
{{ $entry.Timestamp.Format "2006-01-02 15:04" }}
</td>
<td>
{{if $entry.IsDirectory}}
<label class="button danger" onclick="handleDelete('{{ printpath $path "/" $entry.Name "/" }}')">Delete</label>
{{else}}
<label class="button danger" onclick="handleDelete('{{ printpath $path "/" $entry.Name }}')">Delete</label>
{{end}}
<label class="button info" onclick="handleRename('{{ $entry.Name }}', '{{ printpath $path "/" }}')">Rename</label>
</td>
</tr>
{{ end }}
</table>
{{if .EmptyFolder}}
<div class="row add-files">
+
</div>
{{end}}
</form>
</div>
@@ -109,65 +172,171 @@
<br/>
<br/>
<div id="progress-area" class="footer" style="display: none;">
</div>
</div>
</body>
<script type="text/javascript">
// ************************ Drag and drop ***************** //
let dropArea = document.getElementById("drop-area")
let dropArea = document.getElementById("drop-area");
let progressArea = document.getElementById("progress-area");
// Prevent default drag behaviors
;['dragenter', 'dragover', 'dragleave', 'drop'].forEach(eventName => {
dropArea.addEventListener(eventName, preventDefaults, false)
document.body.addEventListener(eventName, preventDefaults, false)
})
dropArea.addEventListener(eventName, preventDefaults, false);
document.body.addEventListener(eventName, preventDefaults, false);
});
// Highlight drop area when item is dragged over it
;['dragenter', 'dragover'].forEach(eventName => {
dropArea.addEventListener(eventName, highlight, false)
})
dropArea.addEventListener(eventName, highlight, false);
});
;['dragleave', 'drop'].forEach(eventName => {
dropArea.addEventListener(eventName, unhighlight, false)
})
dropArea.addEventListener(eventName, unhighlight, false);
});
// Handle dropped files
dropArea.addEventListener('drop', handleDrop, false)
dropArea.addEventListener('drop', handleDrop, false);
function preventDefaults(e) {
e.preventDefault()
e.stopPropagation()
e.preventDefault();
e.stopPropagation();
}
function highlight(e) {
dropArea.classList.add('highlight')
dropArea.classList.add('highlight');
}
function unhighlight(e) {
dropArea.classList.remove('highlight')
dropArea.classList.remove('highlight');
}
function handleDrop(e) {
var dt = e.dataTransfer
var files = dt.files
var dt = e.dataTransfer;
var files = dt.files;
handleFiles(files)
handleFiles(files);
}
var uploadList = {};
function handleFiles(files) {
files = [...files]
files.forEach(uploadFile)
window.location.reload()
files = [...files];
files.forEach(startUpload);
renderProgress();
files.forEach(uploadFile);
}
function startUpload(file, i) {
uploadList[file.name] = {'name': file.name, 'percent': 0, 'finish': false};
}
function renderProgress() {
var values = Object.values(uploadList);
var html = '<table class="progress-table">\n';
for (let i of values) {
html += '<tr>\n<td class="progress-table-file-name">' + i.name + '<\/td>\n';
html += '<td class="progress-table-percent">' + i.percent + '% <\/td>\n<\/tr>\n';
}
html += '<\/table>\n';
progressArea.innerHTML = html;
if (values.length > 0) {
progressArea.attributes.style.value = '';
}
console.log('Render Progress', values);
}
function reportProgress(file, percent) {
var item = uploadList[file]
item.percent = percent;
renderProgress();
}
function finishUpload(file) {
uploadList[file]['finish'] = true;
renderProgress();
var allFinish = true;
for (let i of Object.values(uploadList)) {
if (!i.finish) {
allFinish = false;
break;
}
}
if (allFinish) {
console.log('All Finish');
window.location.reload();
}
}
function uploadFile(file, i) {
var url = window.location.href
var xhr = new XMLHttpRequest()
var formData = new FormData()
xhr.open('POST', url, false)
var url = window.location.href;
var xhr = new XMLHttpRequest();
var fileName = file.name;
xhr.upload.addEventListener('progress', function(e) {
if (e.lengthComputable) {
var percent = Math.ceil((e.loaded / e.total) * 100);
reportProgress(fileName, percent)
}
});
xhr.upload.addEventListener('loadend', function(e) {
finishUpload(fileName);
});
var formData = new FormData();
xhr.open('POST', url, true);
formData.append('file', file);
xhr.send(formData);
}
formData.append('file', file)
xhr.send(formData)
function handleCreateDir() {
var dirName = prompt('Directory Name:', '');
dirName = dirName.trim();
if (dirName == null || dirName == '') {
return;
}
var baseUrl = window.location.href;
if (!baseUrl.endsWith('/')) {
baseUrl += '/';
}
var url = baseUrl + dirName;
if (!url.endsWith('/')) {
url += '/';
}
var xhr = new XMLHttpRequest();
xhr.open('POST', url, false);
xhr.setRequestHeader('Content-Type', '');
xhr.send();
window.location.reload();
}
function handleRename(originName, basePath) {
var newName = prompt('New Name:', originName);
if (newName == null || newName == '') {
return;
}
var url = basePath + newName;
var originPath = basePath + originName;
url += '?mv.from=' + originPath;
var xhr = new XMLHttpRequest();
xhr.open('POST', url, false);
xhr.setRequestHeader('Content-Type', '');
xhr.send();
window.location.reload();
}
function handleDelete(path) {
if (!confirm('Are you sure to delete ' + path + '?')) {
return;
}
var url = path;
if (url.endsWith('/')) {
url += '?recursive=true';
}
var xhr = new XMLHttpRequest();
xhr.open('DELETE', url, false);
xhr.send();
window.location.reload();
}
</script>
</html>

View file

@@ -133,13 +133,13 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
ms.Topo.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
for _, s := range heartbeat.NewEcShards {
message.NewVids = append(message.NewVids, s.Id)
message.NewEcVids = append(message.NewEcVids, s.Id)
}
for _, s := range heartbeat.DeletedEcShards {
if dn.HasVolumesById(needle.VolumeId(s.Id)) {
if dn.HasEcShards(needle.VolumeId(s.Id)) {
continue
}
message.DeletedVids = append(message.DeletedVids, s.Id)
message.DeletedEcVids = append(message.DeletedEcVids, s.Id)
}
}
@@ -151,17 +151,17 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
// broadcast the ec vid changes to master clients
for _, s := range newShards {
message.NewVids = append(message.NewVids, uint32(s.VolumeId))
message.NewEcVids = append(message.NewEcVids, uint32(s.VolumeId))
}
for _, s := range deletedShards {
if dn.HasVolumesById(s.VolumeId) {
continue
}
message.DeletedVids = append(message.DeletedVids, uint32(s.VolumeId))
message.DeletedEcVids = append(message.DeletedEcVids, uint32(s.VolumeId))
}
}
if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 {
if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 || len(message.NewEcVids) > 0 || len(message.DeletedEcVids) > 0 {
ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message})
}

View file

@@ -95,7 +95,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, parallelCopy bool) (err error) {
// find volume location
locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
if !found {
if !found && len(locations) > 0 {
return fmt.Errorf("volume %d not found", vid)
}

Binary file not shown (image changed: 80 B before, 77 B after).

View file

@@ -58,7 +58,7 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
}
for _, ecShards := range actualShards {
if dn.hasEcShards(ecShards.VolumeId) {
if dn.HasEcShards(ecShards.VolumeId) {
continue
}
@@ -79,7 +79,7 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
return
}
func (dn *DataNode) hasEcShards(volumeId needle.VolumeId) (found bool) {
func (dn *DataNode) HasEcShards(volumeId needle.VolumeId) (found bool) {
dn.RLock()
defer dn.RUnlock()
for _, c := range dn.children {

View file

@@ -159,6 +159,14 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
glog.V(1).Infof("%s: %s masterClient removes volume %d", mc.clientType, loc.Url, deletedVid)
mc.deleteLocation(deletedVid, loc)
}
for _, newEcVid := range resp.VolumeLocation.NewEcVids {
glog.V(1).Infof("%s: %s masterClient adds ec volume %d", mc.clientType, loc.Url, newEcVid)
mc.addEcLocation(newEcVid, loc)
}
for _, deletedEcVid := range resp.VolumeLocation.DeletedEcVids {
glog.V(1).Infof("%s: %s masterClient removes ec volume %d", mc.clientType, loc.Url, deletedEcVid)
mc.deleteEcLocation(deletedEcVid, loc)
}
}
if resp.ClusterNodeUpdate != nil {

View file

@@ -36,16 +36,18 @@ func (l Location) ServerAddress() pb.ServerAddress {
type vidMap struct {
sync.RWMutex
vid2Locations map[uint32][]Location
DataCenter string
cursor int32
vid2Locations map[uint32][]Location
ecVid2Locations map[uint32][]Location
DataCenter string
cursor int32
}
func newVidMap(dataCenter string) vidMap {
return vidMap{
vid2Locations: make(map[uint32][]Location),
DataCenter: dataCenter,
cursor: -1,
vid2Locations: make(map[uint32][]Location),
ecVid2Locations: make(map[uint32][]Location),
DataCenter: dataCenter,
cursor: -1,
}
}
@@ -124,7 +126,13 @@ func (vc *vidMap) GetLocations(vid uint32) (locations []Location, found bool) {
vc.RLock()
defer vc.RUnlock()
glog.V(4).Infof("~ lookup volume id %d: %+v ec:%+v", vid, vc.vid2Locations, vc.ecVid2Locations)
locations, found = vc.vid2Locations[vid]
if found && len(locations) > 0 {
return
}
locations, found = vc.ecVid2Locations[vid]
return
}
@@ -132,6 +140,8 @@ func (vc *vidMap) addLocation(vid uint32, location Location) {
vc.Lock()
defer vc.Unlock()
glog.V(4).Infof("+ volume id %d: %+v", vid, location)
locations, found := vc.vid2Locations[vid]
if !found {
vc.vid2Locations[vid] = []Location{location}
@@ -148,10 +158,34 @@
}
func (vc *vidMap) addEcLocation(vid uint32, location Location) {
vc.Lock()
defer vc.Unlock()
glog.V(4).Infof("+ ec volume id %d: %+v", vid, location)
locations, found := vc.ecVid2Locations[vid]
if !found {
vc.ecVid2Locations[vid] = []Location{location}
return
}
for _, loc := range locations {
if loc.Url == location.Url {
return
}
}
vc.ecVid2Locations[vid] = append(locations, location)
}
func (vc *vidMap) deleteLocation(vid uint32, location Location) {
vc.Lock()
defer vc.Unlock()
glog.V(4).Infof("- volume id %d: %+v", vid, location)
locations, found := vc.vid2Locations[vid]
if !found {
return
@@ -165,3 +199,23 @@ func (vc *vidMap) deleteLocation(vid uint32, location Location) {
}
}
func (vc *vidMap) deleteEcLocation(vid uint32, location Location) {
vc.Lock()
defer vc.Unlock()
glog.V(4).Infof("- ec volume id %d: %+v", vid, location)
locations, found := vc.ecVid2Locations[vid]
if !found {
return
}
for i, loc := range locations {
if loc.Url == location.Url {
vc.ecVid2Locations[vid] = append(locations[0:i], locations[i+1:]...)
break
}
}
}
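To make the new lookup fallback concrete, here is a sketch of a test one might add inside the wdclient package (hypothetical, not part of this commit): a volume id registered only as an EC volume should still resolve via ecVid2Locations, and removing the EC location should leave the lookup empty.

```go
package wdclient

import "testing"

func TestGetLocationsFallsBackToEc(t *testing.T) {
	vm := newVidMap("")
	loc := Location{Url: "127.0.0.1:8080"}

	// no regular location registered, only an EC location
	vm.addEcLocation(7, loc)

	locations, found := vm.GetLocations(7)
	if !found || len(locations) != 1 || locations[0].Url != loc.Url {
		t.Fatalf("expected ec fallback to return %v, got %v (found=%v)", loc, locations, found)
	}

	// after the EC location is removed, the lookup should come back empty
	vm.deleteEcLocation(7, loc)
	if locations, _ := vm.GetLocations(7); len(locations) != 0 {
		t.Fatalf("expected no locations, got %v", locations)
	}
}
```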