diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
index 9d1fb3417..64754321b 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
@@ -119,9 +119,8 @@ public class SeaweedInputStream extends InputStream {
 
         long bytesRead = 0;
         int len = buf.remaining();
-        int start = (int) this.position;
-        if (start + len <= entry.getContent().size()) {
-            entry.getContent().substring(start, start + len).copyTo(buf);
+        if (this.position < Integer.MAX_VALUE && (this.position + len) <= entry.getContent().size()) {
+            entry.getContent().substring((int) this.position, (int) (this.position + len)).copyTo(buf);
         } else {
             bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry));
         }
diff --git a/weed/s3api/bucket_metadata.go b/weed/s3api/bucket_metadata.go
index e660237de..f4088e6b3 100644
--- a/weed/s3api/bucket_metadata.go
+++ b/weed/s3api/bucket_metadata.go
@@ -1,16 +1,12 @@
 package s3api
 
 import (
-	"bytes"
 	"encoding/json"
-	"github.com/aws/aws-sdk-go/private/protocol/json/jsonutil"
 	"github.com/aws/aws-sdk-go/service/s3"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3account"
-
-	//"github.com/seaweedfs/seaweedfs/weed/s3api"
 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
 	"github.com/seaweedfs/seaweedfs/weed/util"
 	"math"
@@ -23,7 +19,7 @@ var loadBucketMetadataFromFiler = func(r *BucketRegistry, bucketName string) (*B
 		return nil, err
 	}
 
-	return buildBucketMetadata(entry), nil
+	return buildBucketMetadata(r.s3a.accountManager, entry), nil
 }
 
 type BucketMetaData struct {
@@ -77,13 +73,13 @@ func (r *BucketRegistry) init() error {
 }
 
 func (r *BucketRegistry) LoadBucketMetadata(entry *filer_pb.Entry) {
-	bucketMetadata := buildBucketMetadata(entry)
+	bucketMetadata := buildBucketMetadata(r.s3a.accountManager, entry)
 	r.metadataCacheLock.Lock()
 	defer r.metadataCacheLock.Unlock()
 	r.metadataCache[entry.Name] = bucketMetadata
 }
 
-func buildBucketMetadata(entry *filer_pb.Entry) *BucketMetaData {
+func buildBucketMetadata(accountManager *s3account.AccountManager, entry *filer_pb.Entry) *BucketMetaData {
 	entryJson, _ := json.Marshal(entry)
 	glog.V(3).Infof("build bucket metadata,entry=%s", entryJson)
 	bucketMetadata := &BucketMetaData{
@@ -112,22 +108,29 @@ func buildBucketMetadata(entry *filer_pb.Entry) *BucketMetaData {
 		}
 
 		//access control policy
-		acpBytes, ok := entry.Extended[s3_constants.ExtAcpKey]
-		if ok {
-			var acp s3.AccessControlPolicy
-			err := jsonutil.UnmarshalJSON(&acp, bytes.NewReader(acpBytes))
-			if err == nil {
-				//validate owner
-				if acp.Owner != nil && acp.Owner.ID != nil {
-					bucketMetadata.Owner = acp.Owner
-				} else {
-					glog.Warningf("bucket ownerId is empty! bucket: %s", bucketMetadata.Name)
-				}
-
-				//acl
-				bucketMetadata.Acl = acp.Grants
+		//owner
+		acpOwnerBytes, ok := entry.Extended[s3_constants.ExtAmzOwnerKey]
+		if ok && len(acpOwnerBytes) > 0 {
+			ownerAccountId := string(acpOwnerBytes)
+			ownerAccountName, exists := accountManager.IdNameMapping[ownerAccountId]
+			if !exists {
+				glog.Warningf("owner[id=%s] is invalid, bucket: %s", ownerAccountId, bucketMetadata.Name)
 			} else {
-				glog.Warningf("Unmarshal ACP: %s(%v), bucket: %s", string(acpBytes), err, bucketMetadata.Name)
+				bucketMetadata.Owner = &s3.Owner{
+					ID:          &ownerAccountId,
+					DisplayName: &ownerAccountName,
+				}
+			}
+		}
+		//grants
+		acpGrantsBytes, ok := entry.Extended[s3_constants.ExtAmzAclKey]
+		if ok && len(acpGrantsBytes) > 0 {
+			var grants []*s3.Grant
+			err := json.Unmarshal(acpGrantsBytes, &grants)
+			if err == nil {
+				bucketMetadata.Acl = grants
+			} else {
+				glog.Warningf("Unmarshal ACP grants: %s(%v), bucket: %s", string(acpGrantsBytes), err, bucketMetadata.Name)
 			}
 		}
 	}
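The hunk above replaces the single JSON-serialized s3.AccessControlPolicy (stored under ExtAcpKey) with two extended-attribute values: the raw owner account id under Seaweed-X-Amz-Owner and a JSON-encoded []*s3.Grant under Seaweed-X-Amz-Acl. A standalone sketch of that layout and its decoding follows; it is illustrative only, and the owner id, the grant values, and the inline id-to-name map are made-up stand-ins for the s3account.AccountManager wiring.

package main

import (
	"encoding/json"
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	// Owner is stored as the raw account id (hypothetical value here).
	ownerId := "admin"

	// Grants are stored as a JSON-encoded []*s3.Grant, e.g. public read.
	grants := []*s3.Grant{{
		Grantee: &s3.Grantee{
			Type: aws.String(s3.TypeGroup),
			URI:  aws.String("http://acs.amazonaws.com/groups/global/AllUsers"),
		},
		Permission: aws.String(s3.PermissionRead),
	}}
	grantBytes, _ := json.Marshal(grants)

	// This is the shape buildBucketMetadata now expects in entry.Extended.
	extended := map[string][]byte{
		"Seaweed-X-Amz-Owner": []byte(ownerId),
		"Seaweed-X-Amz-Acl":   grantBytes,
	}

	// Decoding mirrors the new code path: the owner id is resolved to a
	// display name via an id-to-name map, and the grants are unmarshalled
	// straight back into []*s3.Grant.
	idNameMapping := map[string]string{"admin": "admin"} // stand-in for the account manager
	if name, ok := idNameMapping[string(extended["Seaweed-X-Amz-Owner"])]; ok {
		owner := &s3.Owner{ID: aws.String(ownerId), DisplayName: aws.String(name)}
		fmt.Println(*owner.DisplayName)
	}
	var decoded []*s3.Grant
	if err := json.Unmarshal(extended["Seaweed-X-Amz-Acl"], &decoded); err == nil {
		fmt.Println(*decoded[0].Permission) // READ
	}
}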
bucket: %s", bucketMetadata.Name) - } - - //acl - bucketMetadata.Acl = acp.Grants + //owner + acpOwnerBytes, ok := entry.Extended[s3_constants.ExtAmzOwnerKey] + if ok && len(acpOwnerBytes) > 0 { + ownerAccountId := string(acpOwnerBytes) + ownerAccountName, exists := accountManager.IdNameMapping[ownerAccountId] + if !exists { + glog.Warningf("owner[id=%s] is invalid, bucket: %s", ownerAccountId, bucketMetadata.Name) } else { - glog.Warningf("Unmarshal ACP: %s(%v), bucket: %s", string(acpBytes), err, bucketMetadata.Name) + bucketMetadata.Owner = &s3.Owner{ + ID: &ownerAccountId, + DisplayName: &ownerAccountName, + } + } + } + //grants + acpGrantsBytes, ok := entry.Extended[s3_constants.ExtAmzAclKey] + if ok && len(acpGrantsBytes) > 0 { + var grants []*s3.Grant + err := json.Unmarshal(acpGrantsBytes, &grants) + if err == nil { + bucketMetadata.Acl = grants + } else { + glog.Warningf("Unmarshal ACP grants: %s(%v), bucket: %s", string(acpGrantsBytes), err, bucketMetadata.Name) } } } diff --git a/weed/s3api/bucket_metadata_test.go b/weed/s3api/bucket_metadata_test.go index 23af6417b..f852a272a 100644 --- a/weed/s3api/bucket_metadata_test.go +++ b/weed/s3api/bucket_metadata_test.go @@ -1,8 +1,8 @@ package s3api import ( + "encoding/json" "fmt" - "github.com/aws/aws-sdk-go/private/protocol/json/jsonutil" "github.com/aws/aws-sdk-go/service/s3" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" @@ -26,18 +26,13 @@ var ( } //good entry - goodEntryAcp, _ = jsonutil.BuildJSON(&s3.AccessControlPolicy{ - Owner: &s3.Owner{ - DisplayName: &s3account.AccountAdmin.Name, - ID: &s3account.AccountAdmin.Id, - }, - Grants: s3_constants.PublicRead, - }) - goodEntry = &filer_pb.Entry{ + goodEntryAcl, _ = json.Marshal(s3_constants.PublicRead) + goodEntry = &filer_pb.Entry{ Name: "entryWithValidAcp", Extended: map[string][]byte{ s3_constants.ExtOwnershipKey: []byte(s3_constants.OwnershipBucketOwnerEnforced), - s3_constants.ExtAcpKey: goodEntryAcp, + s3_constants.ExtAmzOwnerKey: []byte(s3account.AccountAdmin.Name), + s3_constants.ExtAmzAclKey: goodEntryAcl, }, } @@ -57,35 +52,28 @@ var ( }, } - //acp is "" + //owner is "" acpEmptyStr = &filer_pb.Entry{ Name: "acpEmptyStr", Extended: map[string][]byte{ - s3_constants.ExtAcpKey: []byte(""), + s3_constants.ExtAmzOwnerKey: []byte(""), }, } - //acp is empty object - acpEmptyObjectAcp, _ = jsonutil.BuildJSON(&s3.AccessControlPolicy{ - Owner: nil, - Grants: nil, - }) + //owner not exists acpEmptyObject = &filer_pb.Entry{ Name: "acpEmptyObject", Extended: map[string][]byte{ - s3_constants.ExtAcpKey: acpEmptyObjectAcp, + s3_constants.ExtAmzOwnerKey: []byte("xxxxx"), }, } - //acp owner is nil - acpOwnerNilAcp, _ = jsonutil.BuildJSON(&s3.AccessControlPolicy{ - Owner: nil, - Grants: make([]*s3.Grant, 1), - }) - acpOwnerNil = &filer_pb.Entry{ + //grants is nil + acpOwnerNilAcp, _ = json.Marshal(make([]*s3.Grant, 0)) + acpOwnerNil = &filer_pb.Entry{ Name: "acpOwnerNil", Extended: map[string][]byte{ - s3_constants.ExtAcpKey: acpOwnerNilAcp, + s3_constants.ExtAmzAclKey: acpOwnerNilAcp, }, } @@ -175,8 +163,14 @@ var tcs = []*BucketMetadataTestCase{ } func TestBuildBucketMetadata(t *testing.T) { + accountManager := &s3account.AccountManager{ + IdNameMapping: map[string]string{ + s3account.AccountAdmin.Id: s3account.AccountAdmin.Name, + s3account.AccountAnonymous.Id: s3account.AccountAnonymous.Name, + }, + } for _, tc := range tcs { - resultBucketMetadata := buildBucketMetadata(tc.filerEntry) + resultBucketMetadata := 
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 96e5018da..7064cac02 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -375,10 +375,16 @@ func SaveAmzMetaData(r *http.Request, existing map[string][]byte, isReplace bool
 		}
 	}
 
-	//acp
-	acp := r.Header.Get(s3_constants.ExtAcpKey)
-	if len(acp) > 0 {
-		metadata[s3_constants.ExtAcpKey] = []byte(acp)
+	//acp-owner
+	acpOwner := r.Header.Get(s3_constants.ExtAmzOwnerKey)
+	if len(acpOwner) > 0 {
+		metadata[s3_constants.ExtAmzOwnerKey] = []byte(acpOwner)
+	}
+
+	//acp-grants
+	acpGrants := r.Header.Get(s3_constants.ExtAmzAclKey)
+	if len(acpGrants) > 0 {
+		metadata[s3_constants.ExtAmzAclKey] = []byte(acpGrants)
 	}
 
 	return
diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go
index 682c88f78..0e49da959 100644
--- a/weed/storage/needle_map.go
+++ b/weed/storage/needle_map.go
@@ -48,7 +48,6 @@ type TempNeedleMapper interface {
 	NeedleMapper
 	DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) error
 	UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options) error
-	UpdateNeedleMapMetric(indexFile *os.File) error
 }
 
 func (nm *baseNeedleMapper) IndexFileSize() uint64 {
diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go
index e77651b78..30ed96c3b 100644
--- a/weed/storage/needle_map_leveldb.go
+++ b/weed/storage/needle_map_leveldb.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"strings"
 
 	"github.com/syndtr/goleveldb/leveldb/errors"
 	"github.com/syndtr/goleveldb/leveldb/opt"
@@ -179,6 +180,7 @@ func levelDbWrite(db *leveldb.DB, key NeedleId, offset Offset, size Size, update
 	}
 	return nil
 }
+
 func levelDbDelete(db *leveldb.DB, key NeedleId) error {
 	bytes := make([]byte, NeedleIdSize)
 	NeedleIdToBytes(bytes, key)
@@ -305,23 +307,45 @@ func (m *LevelDbNeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startF
 	}
 
 	err = idx.WalkIndexFile(indexFile, startFrom, func(key NeedleId, offset Offset, size Size) (e error) {
-		if !offset.IsZero() && size.IsValid() {
+		m.mapMetric.FileCounter++
+		bytes := make([]byte, NeedleIdSize)
+		NeedleIdToBytes(bytes[0:NeedleIdSize], key)
+		// fresh loading
+		if startFrom == 0 {
+			m.mapMetric.FileByteCounter += uint64(size)
+			e = levelDbWrite(db, key, offset, size, false, 0)
+			return e
+		}
+		// increment loading
+		data, err := db.Get(bytes, nil)
+		if err != nil {
+			if !strings.Contains(strings.ToLower(err.Error()), "not found") {
+				// unexpected error
+				return err
+			}
+			// new needle, unlikely happen
+			m.mapMetric.FileByteCounter += uint64(size)
 			e = levelDbWrite(db, key, offset, size, false, 0)
 		} else {
-			e = levelDbDelete(db, key)
+			// needle is found
+			oldSize := BytesToSize(data[OffsetSize : OffsetSize+SizeSize])
+			oldOffset := BytesToOffset(data[0:OffsetSize])
+			if !offset.IsZero() && size.IsValid() {
+				// updated needle
+				m.mapMetric.FileByteCounter += uint64(size)
+				if !oldOffset.IsZero() && oldSize.IsValid() {
+					m.mapMetric.DeletionCounter++
+					m.mapMetric.DeletionByteCounter += uint64(oldSize)
+				}
+				e = levelDbWrite(db, key, offset, size, false, 0)
+			} else {
+				// deleted needle
+				m.mapMetric.DeletionCounter++
+				m.mapMetric.DeletionByteCounter += uint64(oldSize)
+				e = levelDbDelete(db, key)
+			}
 		}
 		return e
 	})
-	if err != nil {
-		return err
-	}
-
-	if startFrom != 0 {
-		return needleMapMetricFromIndexFile(indexFile, &m.mapMetric)
-	}
-	return nil
-}
-
-func (m *LevelDbNeedleMap) UpdateNeedleMapMetric(indexFile *os.File) error {
-	return needleMapMetricFromIndexFile(indexFile, &m.mapMetric)
+	return err
 }
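The reworked DoOffsetLoading above maintains the volume metrics during the index walk itself, which is why UpdateNeedleMapMetric is dropped from the TempNeedleMapper interface and its implementations: every walked entry increments FileCounter, a live entry adds its size to FileByteCounter, and an entry that supersedes or tombstones an existing needle adds the old copy to the deletion counters. A minimal, self-contained sketch of that bookkeeping, using a plain map and hypothetical types in place of LevelDB:

package main

import "fmt"

// Hypothetical stand-ins for the needle map; not SeaweedFS types.
type metric struct {
	FileCounter, DeletionCounter         uint64
	FileByteCounter, DeletionByteCounter uint64
}

type record struct {
	offset uint32
	size   int32 // size <= 0 marks a deleted needle in this sketch
}

// apply mirrors the per-entry decision in the incremental branch of
// DoOffsetLoading: insert, update (old copy becomes garbage), or delete.
func apply(db map[uint64]record, m *metric, key uint64, offset uint32, size int32) {
	m.FileCounter++
	live := offset != 0 && size > 0
	old, found := db[key]
	if !found {
		// needle not seen before: plain insert
		if live {
			m.FileByteCounter += uint64(size)
			db[key] = record{offset, size}
		}
		return
	}
	if live {
		// updated needle: the previous copy is now reclaimable
		m.FileByteCounter += uint64(size)
		if old.offset != 0 && old.size > 0 {
			m.DeletionCounter++
			m.DeletionByteCounter += uint64(old.size)
		}
		db[key] = record{offset, size}
	} else {
		// deleted needle: drop it and count the reclaimed bytes
		m.DeletionCounter++
		m.DeletionByteCounter += uint64(old.size)
		delete(db, key)
	}
}

func main() {
	db, m := map[uint64]record{}, &metric{}
	apply(db, m, 1, 8, 100)  // create
	apply(db, m, 1, 16, 120) // update: the 100-byte copy becomes garbage
	apply(db, m, 1, 0, 0)    // delete: the 120-byte copy becomes garbage
	fmt.Printf("%+v\n", *m)  // {FileCounter:3 DeletionCounter:2 FileByteCounter:220 DeletionByteCounter:220}
}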
diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go
index 93b1fa4f5..7721980ee 100644
--- a/weed/storage/needle_map_memory.go
+++ b/weed/storage/needle_map_memory.go
@@ -129,7 +129,3 @@ func (nm *NeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startFrom ui
 
 	return e
 }
-
-func (m *NeedleMap) UpdateNeedleMapMetric(indexFile *os.File) error {
-	return nil
-}
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 2862ca94d..38b5c0080 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -219,15 +219,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
 		return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err)
 	}
 	if indexSize == 0 || uint64(indexSize) <= v.lastCompactIndexOffset {
-		if v.needleMapKind == NeedleMapInMemory {
-			return nil
-		}
-		newIdx, err := os.OpenFile(newIdxFileName, os.O_RDWR, 0644)
-		if err != nil {
-			return fmt.Errorf("open idx file %s failed: %v", newIdxFileName, err)
-		}
-		defer newIdx.Close()
-		return v.tmpNm.UpdateNeedleMapMetric(newIdx)
+		return nil
 	}
 
 	// fail if the old .dat file has changed to a new revision
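The two ACL values travel from the S3 gateway to the filer as HTTP headers named after the same s3_constants keys and are persisted by SaveAmzMetaData (the filer_server_handlers_write_autochunk.go hunk above). A small, hypothetical round-trip sketch (copyAclHeaders and the request URL are illustrative, not part of the patch), with each header guarded by its own length check:

package main

import (
	"fmt"
	"net/http"
)

// copyAclHeaders is an illustrative reduction of the header-to-extended-
// metadata copy in SaveAmzMetaData; the real function handles many more keys.
func copyAclHeaders(r *http.Request, metadata map[string][]byte) {
	const (
		extAmzOwnerKey = "Seaweed-X-Amz-Owner"
		extAmzAclKey   = "Seaweed-X-Amz-Acl"
	)
	if acpOwner := r.Header.Get(extAmzOwnerKey); len(acpOwner) > 0 {
		metadata[extAmzOwnerKey] = []byte(acpOwner)
	}
	if acpGrants := r.Header.Get(extAmzAclKey); len(acpGrants) > 0 {
		metadata[extAmzAclKey] = []byte(acpGrants)
	}
}

func main() {
	r, _ := http.NewRequest(http.MethodPut, "http://localhost:8888/buckets/example", nil)
	r.Header.Set("Seaweed-X-Amz-Owner", "admin")
	r.Header.Set("Seaweed-X-Amz-Acl", `[{"Permission":"READ"}]`)

	metadata := map[string][]byte{}
	copyAclHeaders(r, metadata)
	fmt.Println(string(metadata["Seaweed-X-Amz-Owner"]), string(metadata["Seaweed-X-Amz-Acl"]))
}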