Merge pull request #1408 from hilimd/master

fix spark read s3 bug (sc.binaryFiles)
This commit is contained in:
Chris Lu 2020-08-01 00:13:00 -07:00 committed by GitHub
commit 90ddd7ec92
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 5 deletions

View file

@ -230,6 +230,11 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des
resp, postErr := client.Do(proxyReq) resp, postErr := client.Do(proxyReq)
if resp.ContentLength == -1 {
writeErrorResponse(w, ErrNoSuchKey, r.URL)
return
}
if postErr != nil { if postErr != nil {
glog.Errorf("post to filer: %v", postErr) glog.Errorf("post to filer: %v", postErr)
writeErrorResponse(w, ErrInternalError, r.URL) writeErrorResponse(w, ErrInternalError, r.URL)

View file

@ -38,7 +38,7 @@ func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ
marker = startAfter marker = startAfter
} }
response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker) response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter)
if err != nil { if err != nil {
writeErrorResponse(w, ErrInternalError, r.URL) writeErrorResponse(w, ErrInternalError, r.URL)
@ -66,7 +66,7 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ
return return
} }
response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker) response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter)
if err != nil { if err != nil {
writeErrorResponse(w, ErrInternalError, r.URL) writeErrorResponse(w, ErrInternalError, r.URL)
@ -76,8 +76,7 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ
writeSuccessResponseXML(w, encodeResponse(response)) writeSuccessResponseXML(w, encodeResponse(response))
} }
func (s3a *S3ApiServer) listFilerEntries(bucket, originalPrefix string, maxKeys int, marker string) (response ListBucketResult, err error) { func (s3a *S3ApiServer) listFilerEntries(bucket, originalPrefix string, maxKeys int, marker string, delimiter string) (response ListBucketResult, err error) {
// convert full path prefix into directory name and prefix for entry name // convert full path prefix into directory name and prefix for entry name
dir, prefix := filepath.Split(originalPrefix) dir, prefix := filepath.Split(originalPrefix)
if strings.HasPrefix(dir, "/") { if strings.HasPrefix(dir, "/") {
@ -125,9 +124,18 @@ func (s3a *S3ApiServer) listFilerEntries(bucket, originalPrefix string, maxKeys
lastEntryName = entry.Name lastEntryName = entry.Name
if entry.IsDirectory { if entry.IsDirectory {
if entry.Name != ".uploads" { if entry.Name != ".uploads" {
prefix = fmt.Sprintf("%s%s/", dir, entry.Name)
commonPrefixes = append(commonPrefixes, PrefixEntry{ commonPrefixes = append(commonPrefixes, PrefixEntry{
Prefix: fmt.Sprintf("%s%s/", dir, entry.Name), Prefix: prefix,
}) })
if delimiter != "/" {
response, _ := s3a.listFilerEntries(bucket, prefix, maxKeys, marker, delimiter)
for _, content := range response.Contents {
contents = append(contents, content)
}
}
} }
} else { } else {
contents = append(contents, ListEntry{ contents = append(contents, ListEntry{