fix spark read s3 bug (sc.binaryFiles)

This commit is contained in:
limd 2020-08-01 01:08:30 +08:00
parent 28764f237c
commit b41b7ea4d0
2 changed files with 18 additions and 5 deletions

View file

@ -230,6 +230,11 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des
resp, postErr := client.Do(proxyReq)
if resp.ContentLength == -1 {
writeErrorResponse(w, ErrNoSuchKey, r.URL)
return
}
if postErr != nil {
glog.Errorf("post to filer: %v", postErr)
writeErrorResponse(w, ErrInternalError, r.URL)

View file

@ -38,7 +38,7 @@ func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ
marker = startAfter
}
response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker)
response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter)
if err != nil {
writeErrorResponse(w, ErrInternalError, r.URL)
@ -66,7 +66,7 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ
return
}
response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker)
response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter)
if err != nil {
writeErrorResponse(w, ErrInternalError, r.URL)
@ -76,8 +76,7 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ
writeSuccessResponseXML(w, encodeResponse(response))
}
func (s3a *S3ApiServer) listFilerEntries(bucket, originalPrefix string, maxKeys int, marker string) (response ListBucketResult, err error) {
func (s3a *S3ApiServer) listFilerEntries(bucket, originalPrefix string, maxKeys int, marker string, delimiter string) (response ListBucketResult, err error) {
// convert full path prefix into directory name and prefix for entry name
dir, prefix := filepath.Split(originalPrefix)
if strings.HasPrefix(dir, "/") {
@ -125,9 +124,18 @@ func (s3a *S3ApiServer) listFilerEntries(bucket, originalPrefix string, maxKeys
lastEntryName = entry.Name
if entry.IsDirectory {
if entry.Name != ".uploads" {
prefix = fmt.Sprintf("%s%s/", dir, entry.Name)
commonPrefixes = append(commonPrefixes, PrefixEntry{
Prefix: fmt.Sprintf("%s%s/", dir, entry.Name),
Prefix: prefix,
})
if delimiter != "/" {
response, _ := s3a.listFilerEntries(bucket, prefix, maxKeys, marker, delimiter)
for _, content := range response.Contents {
contents = append(contents, content)
}
}
}
} else {
contents = append(contents, ListEntry{