protect against edge cases when locations expires

This commit is contained in:
Chris Lu 2020-11-15 21:26:04 -08:00
parent af658ea970
commit 61d96fde01

View file

@ -28,20 +28,26 @@ public class SeaweedRead {
List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength); List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength);
Map<String, FilerProto.Locations> knownLocations = new HashMap<>();
FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder(); FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
for (ChunkView chunkView : chunkViews) { for (ChunkView chunkView : chunkViews) {
String vid = parseVolumeId(chunkView.fileId); String vid = parseVolumeId(chunkView.fileId);
if (volumeIdCache.getLocations(vid)==null){ FilerProto.Locations locations = volumeIdCache.getLocations(vid);
if (locations == null) {
lookupRequest.addVolumeIds(vid); lookupRequest.addVolumeIds(vid);
} else {
knownLocations.put(vid, locations);
} }
} }
if (lookupRequest.getVolumeIdsCount()>0){ if (lookupRequest.getVolumeIdsCount() > 0) {
FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
.getBlockingStub().lookupVolume(lookupRequest.build()); .getBlockingStub().lookupVolume(lookupRequest.build());
Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap(); Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap();
for (Map.Entry<String,FilerProto.Locations> entry : vid2Locations.entrySet()) { for (Map.Entry<String, FilerProto.Locations> entry : vid2Locations.entrySet()) {
volumeIdCache.setLocations(entry.getKey(), entry.getValue()); volumeIdCache.setLocations(entry.getKey(), entry.getValue());
knownLocations.put(entry.getKey(), entry.getValue());
} }
} }
@ -57,7 +63,7 @@ public class SeaweedRead {
startOffset += gap; startOffset += gap;
} }
FilerProto.Locations locations = volumeIdCache.getLocations(parseVolumeId(chunkView.fileId)); FilerProto.Locations locations = knownLocations.get(parseVolumeId(chunkView.fileId));
if (locations == null || locations.getLocationsCount() == 0) { if (locations == null || locations.getLocationsCount() == 0) {
LOG.error("failed to locate {}", chunkView.fileId); LOG.error("failed to locate {}", chunkView.fileId);
// log here! // log here!