fix leader master /dir/lookup api

Signed-off-by: Lei Liu <lei01.liu@horizon.ai>
This commit is contained in:
Lei Liu 2019-10-30 15:49:58 +08:00
parent 57e441d67b
commit f2f90436ef
4 changed files with 15 additions and 9 deletions

View file

@ -15,12 +15,12 @@ func NewMemorySequencer() (m *MemorySequencer) {
return return
} }
func (m *MemorySequencer) NextFileId(count uint64) (uint64, uint64) { func (m *MemorySequencer) NextFileId(count uint64) uint64 {
m.sequenceLock.Lock() m.sequenceLock.Lock()
defer m.sequenceLock.Unlock() defer m.sequenceLock.Unlock()
ret := m.counter ret := m.counter
m.counter += uint64(count) m.counter += count
return ret, count return ret
} }
func (m *MemorySequencer) SetMax(seenValue uint64) { func (m *MemorySequencer) SetMax(seenValue uint64) {

View file

@ -1,7 +1,7 @@
package sequence package sequence
type Sequencer interface { type Sequencer interface {
NextFileId(count uint64) (uint64, uint64) NextFileId(count uint64) uint64
SetMax(uint64) SetMax(uint64)
Peek() uint64 Peek() uint64
} }

View file

@ -65,11 +65,17 @@ func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.Loo
var err error var err error
if ms.Topo.IsLeader() { if ms.Topo.IsLeader() {
volumeId, newVolumeIdErr := needle.NewVolumeId(vid) volumeId, newVolumeIdErr := needle.NewVolumeId(vid)
machines := ms.Topo.Lookup(collection, volumeId) if newVolumeIdErr != nil {
for _, loc := range machines { err = fmt.Errorf("Unknown volume id %s", vid)
locations = append(locations, operation.Location{Url: loc.Url(), PublicUrl: loc.PublicUrl}) } else {
machines := ms.Topo.Lookup(collection, volumeId)
for _, loc := range machines {
locations = append(locations, operation.Location{Url: loc.Url(), PublicUrl: loc.PublicUrl})
}
if locations == nil {
err = fmt.Errorf("volume id %s not found", vid)
}
} }
err = newVolumeIdErr
} else { } else {
machines, getVidLocationsErr := ms.MasterClient.GetVidLocations(vid) machines, getVidLocationsErr := ms.MasterClient.GetVidLocations(vid)
for _, loc := range machines { for _, loc := range machines {

View file

@ -125,7 +125,7 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string,
if datanodes.Length() == 0 { if datanodes.Length() == 0 {
return "", 0, nil, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String()) return "", 0, nil, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String())
} }
fileId, count := t.Sequence.NextFileId(count) fileId := t.Sequence.NextFileId(count)
return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
} }