diff --git a/weed/util/skiplist/name_batch.go b/weed/util/skiplist/name_batch.go new file mode 100644 index 000000000..18427d341 --- /dev/null +++ b/weed/util/skiplist/name_batch.go @@ -0,0 +1,102 @@ +package skiplist + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/golang/protobuf/proto" + "sort" + "strings" +) + +type NameBatch struct { + key string + names map[string]struct{} +} + +func (nb *NameBatch) ContainsName(name string) (found bool) { + _, found = nb.names[name] + return +} +func (nb *NameBatch) WriteName(name string) { + if nb.key == "" || strings.Compare(nb.key, name) > 0 { + nb.key = name + } + nb.names[name] = struct{}{} +} +func (nb *NameBatch) DeleteName(name string) { + delete(nb.names, name) + if nb.key == name { + nb.key = "" + for n := range nb.names { + if nb.key == "" || strings.Compare(nb.key, n) > 0 { + nb.key = n + } + } + } +} +func (nb *NameBatch) ListNames(startFrom string, visitNamesFn func(name string) bool) bool { + var names []string + needFilter := startFrom == "" + for n := range nb.names { + if !needFilter || strings.Compare(n, startFrom) >= 0 { + names = append(names, n) + } + } + sort.Slice(names, func(i, j int) bool { + return strings.Compare(names[i], names[j]) < 0 + }) + for _, n := range names { + if !visitNamesFn(n) { + return false + } + } + return true +} + +func NewNameBatch() *NameBatch { + return &NameBatch{ + names: make(map[string]struct{}), + } +} + +func LoadNameBatch(data []byte) *NameBatch { + t := &NameBatchData{} + if len(data) > 0 { + err := proto.Unmarshal(data, t) + if err != nil { + glog.Errorf("unmarshal into NameBatchData{} : %v", err) + return nil + } + } + nb := NewNameBatch() + for _, n := range t.Names { + name := string(n) + if nb.key == "" || strings.Compare(nb.key, name) > 0 { + nb.key = name + } + nb.names[name] = struct{}{} + } + return nb +} + +func (nb *NameBatch) ToBytes() []byte { + t := &NameBatchData{} + for n := range nb.names { + t.Names = append(t.Names, []byte(n)) + } + data, _ := proto.Marshal(t) + return data +} + +func (nb *NameBatch) SplitBy(name string) (x, y *NameBatch) { + x, y = NewNameBatch(), NewNameBatch() + + for n := range nb.names { + // there should be no equal case though + if strings.Compare(n, name) <= 0 { + x.WriteName(n) + } else { + y.WriteName(n) + } + } + return +} diff --git a/weed/util/skiplist/name_list.go b/weed/util/skiplist/name_list.go new file mode 100644 index 000000000..db328afba --- /dev/null +++ b/weed/util/skiplist/name_list.go @@ -0,0 +1,303 @@ +package skiplist + +import ( + "bytes" +) + +type NameList struct { + skipList *SkipList + batchSize int +} + +func NewNameList(store ListStore, batchSize int) *NameList { + return &NameList{ + skipList: New(store), + batchSize: batchSize, + } +} + +/* +Be reluctant to create new nodes. Try to fit into either previous node or next node. +Prefer to add to previous node. + +There are multiple cases after finding the name for greater or equal node + 1. found and node.Key == name + The node contains a batch with leading key the same as the name + nothing to do + 2. no such node found or node.Key > name + + if no such node found + prevNode = list.LargestNode + + // case 2.1 + if previousNode contains name + nothing to do + + // prefer to add to previous node + if prevNode != nil { + // case 2.2 + if prevNode has capacity + prevNode.add name, and save + return + // case 2.3 + split prevNode by name + } + + // case 2.4 + // merge into next node. Avoid too many nodes if adding data in reverse order. + if nextNode is not nil and nextNode has capacity + delete nextNode.Key + nextNode.Key = name + nextNode.batch.add name + insert nodeNode.Key + return + + // case 2.5 + if prevNode is nil + insert new node with key = name, value = batch{name} + return + +*/ +func (nl *NameList) WriteName(name string) error { + lookupKey := []byte(name) + prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey) + if err != nil { + return err + } + // case 1: the name already exists as one leading key in the batch + if found && bytes.Compare(nextNode.Key, lookupKey) == 0 { + return nil + } + + if !found { + prevNode, err = nl.skipList.GetLargestNode() + if err != nil { + return err + } + } + + if nextNode != nil && prevNode == nil { + prevNode, err = nl.skipList.loadElement(nextNode.Prev) + if err != nil { + return err + } + } + + if prevNode != nil { + prevNameBatch := LoadNameBatch(prevNode.Value) + // case 2.1 + if prevNameBatch.ContainsName(name) { + return nil + } + + // case 2.2 + if len(prevNameBatch.names) < nl.batchSize { + prevNameBatch.WriteName(name) + return nl.skipList.ChangeValue(prevNode, prevNameBatch.ToBytes()) + } + + // case 2.3 + x, y := prevNameBatch.SplitBy(name) + addToX := len(x.names) <= len(y.names) + if len(x.names) != len(prevNameBatch.names) { + if addToX { + x.WriteName(name) + } + if x.key == prevNameBatch.key { + if err := nl.skipList.ChangeValue(prevNode, x.ToBytes()); err != nil { + return err + } + } else { + if err := nl.skipList.Insert([]byte(x.key), x.ToBytes()); err != nil { + return err + } + } + } + if len(y.names) != len(prevNameBatch.names) { + if !addToX { + y.WriteName(name) + } + if y.key == prevNameBatch.key { + if err := nl.skipList.ChangeValue(prevNode, y.ToBytes()); err != nil { + return err + } + } else { + if err := nl.skipList.Insert([]byte(y.key), y.ToBytes()); err != nil { + return err + } + } + } + return nil + + } + + // case 2.4 + if nextNode != nil { + nextNameBatch := LoadNameBatch(nextNode.Value) + if len(nextNameBatch.names) < nl.batchSize { + if err := nl.skipList.Delete(nextNode.Key); err != nil { + return err + } + nextNameBatch.WriteName(name) + if err := nl.skipList.Insert([]byte(nextNameBatch.key), nextNameBatch.ToBytes()); err != nil { + return err + } + return nil + } + } + + // case 2.5 + // now prevNode is nil + newNameBatch := NewNameBatch() + newNameBatch.WriteName(name) + if err := nl.skipList.Insert([]byte(newNameBatch.key), newNameBatch.ToBytes()); err != nil { + return err + } + + return nil +} + +/* +// case 1: exists in nextNode +if nextNode != nil && nextNode.Key == name { + remove from nextNode, update nextNode + // TODO: merge with prevNode if possible? + return +} +if nextNode is nil + prevNode = list.Largestnode +if prevNode == nil and nextNode.Prev != nil + prevNode = load(nextNode.Prev) + +// case 2: does not exist +// case 2.1 +if prevNode == nil { + return +} +// case 2.2 +if prevNameBatch does not contain name { + return +} + +// case 3 +delete from prevNameBatch +if prevNameBatch + nextNode < capacityList + // case 3.1 + merge +else + // case 3.2 + update prevNode + + +*/ +func (nl *NameList) DeleteName(name string) error { + lookupKey := []byte(name) + prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey) + if err != nil { + return err + } + + // case 1 + var nextNameBatch *NameBatch + if nextNode != nil { + nextNameBatch = LoadNameBatch(nextNode.Value) + } + if found && bytes.Compare(nextNode.Key, lookupKey) == 0 { + if err := nl.skipList.Delete(nextNode.Key); err != nil { + return err + } + nextNameBatch.DeleteName(name) + if len(nextNameBatch.names) > 0 { + if err := nl.skipList.Insert([]byte(nextNameBatch.key), nextNameBatch.ToBytes()); err != nil { + return err + } + } + return nil + } + + if !found { + prevNode, err = nl.skipList.GetLargestNode() + if err != nil { + return err + } + } + + if nextNode != nil && prevNode == nil { + prevNode, err = nl.skipList.loadElement(nextNode.Prev) + if err != nil { + return err + } + } + + // case 2 + if prevNode == nil { + // case 2.1 + return nil + } + prevNameBatch := LoadNameBatch(prevNode.Value) + if !prevNameBatch.ContainsName(name) { + // case 2.2 + return nil + } + + // case 3 + prevNameBatch.DeleteName(name) + if len(prevNameBatch.names) == 0 { + if err := nl.skipList.Delete(prevNode.Key); err != nil { + return err + } + return nil + } + if nextNameBatch != nil && len(nextNameBatch.names) + len(prevNameBatch.names) < nl.batchSize { + // case 3.1 merge nextNode and prevNode + if err := nl.skipList.Delete(nextNode.Key); err != nil { + return err + } + for nextName := range nextNameBatch.names { + prevNameBatch.WriteName(nextName) + } + return nl.skipList.ChangeValue(prevNode, prevNameBatch.ToBytes()) + } else { + // case 3.2 update prevNode + return nl.skipList.ChangeValue(prevNode, prevNameBatch.ToBytes()) + } + + return nil +} + +func (nl *NameList) ListNames(startFrom string, visitNamesFn func(name string) bool) error { + lookupKey := []byte(startFrom) + prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey) + if err != nil { + return err + } + if found && bytes.Compare(nextNode.Key, lookupKey) == 0 { + prevNode = nil + } + if !found { + prevNode, err = nl.skipList.GetLargestNode() + if err != nil { + return err + } + } + + if prevNode != nil { + prevNameBatch := LoadNameBatch(prevNode.Value) + if !prevNameBatch.ListNames(startFrom, visitNamesFn) { + return nil + } + } + + for nextNode != nil { + nextNameBatch := LoadNameBatch(nextNode.Value) + if !nextNameBatch.ListNames(startFrom, visitNamesFn) { + return nil + } + nextNode, err = nl.skipList.loadElement(nextNode.Next[0]) + if err != nil { + return err + } + } + + return nil +} diff --git a/weed/util/skiplist/name_list_test.go b/weed/util/skiplist/name_list_test.go new file mode 100644 index 000000000..811a101f2 --- /dev/null +++ b/weed/util/skiplist/name_list_test.go @@ -0,0 +1,73 @@ +package skiplist + +import ( + "math/rand" + "strconv" + "testing" +) + +const ( + maxNameCount = 100 +) + +func String(x int) string { + return strconv.Itoa(x) +} + +func TestNameList(t *testing.T) { + list := NewNameList(memStore, 7) + + for i := 0; i < maxNameCount; i++ { + list.WriteName(String(i)) + } + + counter := 0 + list.ListNames("", func(name string) bool { + counter++ + print(name, " ") + return true + }) + if counter != maxNameCount { + t.Fail() + } + + // list.skipList.println() + + deleteBase := 5 + deleteCount := maxNameCount - 3 * deleteBase + + for i := deleteBase; i < deleteBase+deleteCount; i++ { + list.DeleteName(String(i)) + } + + counter = 0 + list.ListNames("", func(name string) bool { + counter++ + return true + }) + // list.skipList.println() + if counter != maxNameCount-deleteCount { + t.Fail() + } + + // randomized deletion + list = NewNameList(memStore, 7) + // Delete elements at random positions in the list. + rList := rand.Perm(maxN) + for _, i := range rList { + list.WriteName(String(i)) + } + for _, i := range rList { + list.DeleteName(String(i)) + } + counter = 0 + list.ListNames("", func(name string) bool { + counter++ + print(name, " ") + return true + }) + if counter != 0 { + t.Fail() + } + +} diff --git a/weed/util/skiplist/skiplist.pb.go b/weed/util/skiplist/skiplist.pb.go index 82afec453..adb121bfc 100644 --- a/weed/util/skiplist/skiplist.pb.go +++ b/weed/util/skiplist/skiplist.pb.go @@ -238,6 +238,53 @@ func (x *SkipListElement) GetPrev() *SkipListElementReference { return nil } +type NameBatchData struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Names [][]byte `protobuf:"bytes,1,rep,name=names,proto3" json:"names,omitempty"` +} + +func (x *NameBatchData) Reset() { + *x = NameBatchData{} + if protoimpl.UnsafeEnabled { + mi := &file_skiplist_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *NameBatchData) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*NameBatchData) ProtoMessage() {} + +func (x *NameBatchData) ProtoReflect() protoreflect.Message { + mi := &file_skiplist_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use NameBatchData.ProtoReflect.Descriptor instead. +func (*NameBatchData) Descriptor() ([]byte, []int) { + return file_skiplist_proto_rawDescGZIP(), []int{3} +} + +func (x *NameBatchData) GetNames() [][]byte { + if x != nil { + return x.Names + } + return nil +} + var File_skiplist_proto protoreflect.FileDescriptor var file_skiplist_proto_rawDesc = []byte{ @@ -275,10 +322,13 @@ var file_skiplist_proto_rawDesc = []byte{ 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x2e, 0x53, 0x6b, 0x69, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x52, 0x04, 0x70, 0x72, 0x65, 0x76, - 0x42, 0x33, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, - 0x68, 0x72, 0x69, 0x73, 0x6c, 0x75, 0x73, 0x66, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, - 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x75, 0x74, 0x69, 0x6c, 0x2f, 0x73, 0x6b, 0x69, - 0x70, 0x6c, 0x69, 0x73, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x22, 0x25, 0x0a, 0x0d, 0x4e, 0x61, 0x6d, 0x65, 0x42, 0x61, 0x74, 0x63, 0x68, 0x44, 0x61, 0x74, + 0x61, 0x12, 0x14, 0x0a, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0c, + 0x52, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x42, 0x33, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x68, 0x72, 0x69, 0x73, 0x6c, 0x75, 0x73, 0x66, 0x2f, + 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x75, + 0x74, 0x69, 0x6c, 0x2f, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -293,11 +343,12 @@ func file_skiplist_proto_rawDescGZIP() []byte { return file_skiplist_proto_rawDescData } -var file_skiplist_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_skiplist_proto_msgTypes = make([]protoimpl.MessageInfo, 4) var file_skiplist_proto_goTypes = []interface{}{ (*SkipListProto)(nil), // 0: skiplist.SkipListProto (*SkipListElementReference)(nil), // 1: skiplist.SkipListElementReference (*SkipListElement)(nil), // 2: skiplist.SkipListElement + (*NameBatchData)(nil), // 3: skiplist.NameBatchData } var file_skiplist_proto_depIdxs = []int32{ 1, // 0: skiplist.SkipListProto.start_levels:type_name -> skiplist.SkipListElementReference @@ -353,6 +404,18 @@ func file_skiplist_proto_init() { return nil } } + file_skiplist_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*NameBatchData); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -360,7 +423,7 @@ func file_skiplist_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_skiplist_proto_rawDesc, NumEnums: 0, - NumMessages: 3, + NumMessages: 4, NumExtensions: 0, NumServices: 0, }, diff --git a/weed/util/skiplist/skiplist.proto b/weed/util/skiplist/skiplist.proto index bfb190b33..2991ad830 100644 --- a/weed/util/skiplist/skiplist.proto +++ b/weed/util/skiplist/skiplist.proto @@ -24,3 +24,7 @@ message SkipListElement { bytes value = 5; SkipListElementReference prev = 6; } + +message NameBatchData { + repeated bytes names = 1; +} \ No newline at end of file