diff --git a/weed/util/lock_table.go b/weed/util/lock_table.go new file mode 100644 index 000000000..32d98bea3 --- /dev/null +++ b/weed/util/lock_table.go @@ -0,0 +1,146 @@ +package util + +import ( + "fmt" + "sync" + "sync/atomic" +) + +// LockTable is a table of locks that can be acquired. +// Locks are acquired in order of request. +type LockTable[T comparable] struct { + mu sync.Mutex + locks map[T]*LockEntry + lockIdSeq int64 +} + +type LockEntry struct { + mu sync.Mutex + waiters []*ActiveLock // ordered waiters that are blocked by exclusive locks + activeLockOwnerCount int32 + lockType LockType + cond *sync.Cond +} + +type LockType int + +const ( + SharedLock LockType = iota + ExclusiveLock +) + +type ActiveLock struct { + ID int64 + isDeleted bool + intention string // for debugging +} + +func NewLockTable[T comparable]() *LockTable[T] { + return &LockTable[T]{ + locks: make(map[T]*LockEntry), + } +} + +func (lt *LockTable[T]) NewActiveLock(intention string) *ActiveLock { + id := atomic.AddInt64(<.lockIdSeq, 1) + l := &ActiveLock{ID: id, intention: intention} + return l +} + +func (lt *LockTable[T]) AcquireLock(intention string, key T, lockType LockType) (lock *ActiveLock) { + lt.mu.Lock() + // Get or create the lock entry for the key + entry, exists := lt.locks[key] + if !exists { + entry = &LockEntry{} + entry.cond = sync.NewCond(&entry.mu) + lt.locks[key] = entry + } + lt.mu.Unlock() + + lock = lt.NewActiveLock(intention) + + // If the lock is held exclusively, wait + entry.mu.Lock() + if len(entry.waiters) > 0 || lockType == ExclusiveLock { + fmt.Printf("ActiveLock %d %s wait for %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeLockOwnerCount) + if len(entry.waiters) > 0 { + for _, waiter := range entry.waiters { + fmt.Printf(" %d", waiter.ID) + } + fmt.Printf("\n") + } + entry.waiters = append(entry.waiters, lock) + if lockType == ExclusiveLock { + for !lock.isDeleted && ((len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) || entry.activeLockOwnerCount > 0) { + entry.cond.Wait() + } + } else { + for !lock.isDeleted && (len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) { + entry.cond.Wait() + } + } + // Remove the transaction from the waiters list + if len(entry.waiters) > 0 && lock.ID == entry.waiters[0].ID { + entry.waiters = entry.waiters[1:] + entry.cond.Broadcast() + } + } + entry.activeLockOwnerCount++ + + // Otherwise, grant the lock + entry.lockType = lockType + fmt.Printf("ActiveLock %d %s locked %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeLockOwnerCount) + if len(entry.waiters) > 0 { + for _, waiter := range entry.waiters { + fmt.Printf(" %d", waiter.ID) + } + fmt.Printf("\n") + } + entry.mu.Unlock() + + return lock +} + +func (lt *LockTable[T]) ReleaseLock(key T, lock *ActiveLock) { + lt.mu.Lock() + defer lt.mu.Unlock() + + entry, exists := lt.locks[key] + if !exists { + return + } + + entry.mu.Lock() + defer entry.mu.Unlock() + + // Remove the transaction from the waiters list + for i, waiter := range entry.waiters { + if waiter == lock { + waiter.isDeleted = true + entry.waiters = append(entry.waiters[:i], entry.waiters[i+1:]...) + break + } + } + + // If there are no waiters, release the lock + if len(entry.waiters) == 0 { + delete(lt.locks, key) + } + + fmt.Printf("ActiveLock %d %s unlocked %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, entry.lockType, len(entry.waiters), entry.activeLockOwnerCount) + if len(entry.waiters) > 0 { + for _, waiter := range entry.waiters { + fmt.Printf(" %d", waiter.ID) + } + fmt.Printf("\n") + } + entry.activeLockOwnerCount-- + + // Notify the next waiter + entry.cond.Broadcast() +} + +func main() { + +} diff --git a/weed/util/lock_table_test.go b/weed/util/lock_table_test.go new file mode 100644 index 000000000..f9037559c --- /dev/null +++ b/weed/util/lock_table_test.go @@ -0,0 +1,42 @@ +package util + +import ( + "fmt" + "math/rand" + "sync" + "testing" + "time" +) + +func TestOrderedLock(t *testing.T) { + lt := NewLockTable[string]() + + var wg sync.WaitGroup + // Simulate transactions requesting locks + for i := 1; i <= 50; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + key := "resource" + lockType := SharedLock + if i%5 == 0 { + lockType = ExclusiveLock + } + + // Simulate attempting to acquire the lock + lock := lt.AcquireLock("", key, lockType) + + // Lock acquired, perform some work + fmt.Printf("ActiveLock %d acquired the lock.\n", lock.ID) + + // Simulate some work + time.Sleep(time.Duration(rand.Int31n(10)*10) * time.Millisecond) + + // Release the lock + lt.ReleaseLock(key, lock) + fmt.Printf("ActiveLock %d released the lock.\n", lock.ID) + }(i) + } + + wg.Wait() +}