mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
filer.sync: fix synchronization logic in active-active mode
fix https://github.com/seaweedfs/seaweedfs/issues/3328
This commit is contained in:
parent
303bd067b5
commit
036566629a
|
@ -114,9 +114,12 @@ func DoMinusChunksBySourceFileId(as, bs []*filer_pb.FileChunk) (delta []*filer_p
|
||||||
fileIds := make(map[string]bool)
|
fileIds := make(map[string]bool)
|
||||||
for _, interval := range bs {
|
for _, interval := range bs {
|
||||||
fileIds[interval.GetFileIdString()] = true
|
fileIds[interval.GetFileIdString()] = true
|
||||||
|
fileIds[interval.GetSourceFileId()] = true
|
||||||
}
|
}
|
||||||
for _, chunk := range as {
|
for _, chunk := range as {
|
||||||
if _, found := fileIds[chunk.GetSourceFileId()]; !found {
|
_, sourceFileIdFound := fileIds[chunk.GetSourceFileId()]
|
||||||
|
_, fileIdFound := fileIds[chunk.GetFileId()]
|
||||||
|
if !sourceFileIdFound && !fileIdFound {
|
||||||
delta = append(delta, chunk)
|
delta = append(delta, chunk)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,13 +1,52 @@
|
||||||
package filer
|
package filer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
"golang.org/x/exp/slices"
|
"golang.org/x/exp/slices"
|
||||||
|
"log"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestDoMinusChunks(t *testing.T) {
|
||||||
|
// https://github.com/seaweedfs/seaweedfs/issues/3328
|
||||||
|
|
||||||
|
// clusterA and clusterB using filer.sync to sync file: hello.txt
|
||||||
|
// clusterA append a new line and then clusterB also append a new line
|
||||||
|
// clusterA append a new line again
|
||||||
|
chunksInA := []*filer_pb.FileChunk{
|
||||||
|
{Offset: 0, Size: 3, FileId: "11", Mtime: 100},
|
||||||
|
{Offset: 3, Size: 3, FileId: "22", SourceFileId: "2", Mtime: 200},
|
||||||
|
{Offset: 6, Size: 3, FileId: "33", Mtime: 300},
|
||||||
|
}
|
||||||
|
chunksInB := []*filer_pb.FileChunk{
|
||||||
|
{Offset: 0, Size: 3, FileId: "1", SourceFileId: "11", Mtime: 100},
|
||||||
|
{Offset: 3, Size: 3, FileId: "2", Mtime: 200},
|
||||||
|
{Offset: 6, Size: 3, FileId: "3", SourceFileId: "33", Mtime: 300},
|
||||||
|
}
|
||||||
|
|
||||||
|
// clusterB using command "echo 'content' > hello.txt" to overwrite file
|
||||||
|
// clusterA will receive two evenNotification, need to empty the whole file content first and add new content
|
||||||
|
// the first one is oldEntry is chunksInB and newEntry is empty fileChunks
|
||||||
|
firstOldEntry := chunksInB
|
||||||
|
var firstNewEntry []*filer_pb.FileChunk
|
||||||
|
|
||||||
|
// clusterA received the first one event, gonna empty the whole chunk, according the code in filer_sink 194
|
||||||
|
// we can get the deleted chunks and newChunks
|
||||||
|
firstDeletedChunks := DoMinusChunks(firstOldEntry, firstNewEntry)
|
||||||
|
log.Println("first deleted chunks:", firstDeletedChunks)
|
||||||
|
//firstNewEntry := DoMinusChunks(firstNewEntry, firstOldEntry)
|
||||||
|
|
||||||
|
// clusterA need to delete all chunks in firstDeletedChunks
|
||||||
|
emptiedChunksInA := DoMinusChunksBySourceFileId(chunksInA, firstDeletedChunks)
|
||||||
|
// chunksInA supposed to be empty by minus the deletedChunks but it just delete the chunk which sync from clusterB
|
||||||
|
log.Println("clusterA synced empty chunks event result:", emptiedChunksInA)
|
||||||
|
// clusterB emptied it's chunks and clusterA must sync the change and empty chunks too
|
||||||
|
assert.Equalf(t, firstNewEntry, emptiedChunksInA, "empty")
|
||||||
|
}
|
||||||
|
|
||||||
func TestCompactFileChunksRealCase(t *testing.T) {
|
func TestCompactFileChunksRealCase(t *testing.T) {
|
||||||
|
|
||||||
chunks := []*filer_pb.FileChunk{
|
chunks := []*filer_pb.FileChunk{
|
||||||
|
|
Loading…
Reference in a new issue