Mirror of https://github.com/restic/restic.git

s3: Update to new changes in minio-go

Fixes #388
Author: Harshavardhana
Date:   2016-01-20 02:14:50 -08:00
commit 06edb1ee60 (parent 4d7e802c44)
19 changed files with 672 additions and 290 deletions

Godeps/Godeps.json (generated)

@@ -24,8 +24,8 @@
},
{
"ImportPath": "github.com/minio/minio-go",
"Comment": "v0.2.5-209-g77f35ea",
"Rev": "77f35ea56099f50b0425d0e2f3949773dae723c0"
"Comment": "v0.2.5-225-g651c8bc",
"Rev": "651c8bc0e7739585ca1540ecb04815d969595f81"
},
{
"ImportPath": "github.com/pkg/sftp",


@@ -10,8 +10,8 @@ env:
- ARCH=i686
go:
- 1.5.1
- 1.5.2
- 1.5.3
script:
- go vet ./...


@@ -161,8 +161,8 @@ func HTTPRespToErrorResponse(resp *http.Response, bucketName, objectName string)
}
// ErrEntityTooLarge - Input size is larger than supported maximum.
func ErrEntityTooLarge(totalSize int64, bucketName, objectName string) error {
msg := fmt.Sprintf("Your proposed upload size %d exceeds the maximum allowed object size '5GiB' for single PUT operation.", totalSize)
func ErrEntityTooLarge(totalSize, maxObjectSize int64, bucketName, objectName string) error {
msg := fmt.Sprintf("Your proposed upload size %d exceeds the maximum allowed object size %d for single PUT operation.", totalSize, maxObjectSize)
return ErrorResponse{
Code: "EntityTooLarge",
Message: msg,


@@ -447,23 +447,29 @@ func (c Client) listObjectParts(bucketName, objectName, uploadID string) (partsI
}
// findUploadID lists all incomplete uploads and finds the uploadID of the matching object name.
func (c Client) findUploadID(bucketName, objectName string) (string, error) {
func (c Client) findUploadID(bucketName, objectName string) (uploadID string, err error) {
// Make list incomplete uploads recursive.
isRecursive := true
// Turn off size aggregation of individual parts, in this request.
isAggregateSize := false
// NOTE: done Channel is set to 'nil', this will drain the go routine until exhaustion.
for mpUpload := range c.listIncompleteUploads(bucketName, objectName, isRecursive, isAggregateSize, nil) {
// latestUpload to track the latest multipart info for objectName.
var latestUpload ObjectMultipartInfo
// Create done channel to cleanup the routine.
doneCh := make(chan struct{})
defer close(doneCh)
// List all incomplete uploads.
for mpUpload := range c.listIncompleteUploads(bucketName, objectName, isRecursive, isAggregateSize, doneCh) {
if mpUpload.Err != nil {
return "", mpUpload.Err
}
// if object name found, return the upload id.
if objectName == mpUpload.Key {
return mpUpload.UploadID, nil
if mpUpload.Initiated.Sub(latestUpload.Initiated) > 0 {
latestUpload = mpUpload
}
}
}
// No upload id was found, return success and empty upload id.
return "", nil
// Return the latest upload id.
return latestUpload.UploadID, nil
}
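
The doneCh introduced above is the standard Go pattern for cancellable channel producers: the listing goroutine selects on doneCh and exits as soon as the caller closes it, instead of having to be drained to exhaustion as with the old nil channel. A minimal standalone sketch of the same pattern (listItems and its values are illustrative, not the minio-go API):

package main

import "fmt"

// listItems streams values until exhaustion or until doneCh is closed.
// Selecting on doneCh lets the consumer abandon the stream early without
// leaking this goroutine; doneCh plays the same role in findUploadID.
func listItems(doneCh <-chan struct{}) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 0; i < 100; i++ {
			select {
			case ch <- i:
			case <-doneCh:
				return // consumer gave up; stop producing
			}
		}
	}()
	return ch
}

func main() {
	doneCh := make(chan struct{})
	defer close(doneCh) // mirrors the 'defer close(doneCh)' above
	for item := range listItems(doneCh) {
		if item == 3 {
			break // safe early exit: close(doneCh) unblocks the producer
		}
		fmt.Println(item)
	}
}
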
// getTotalMultipartSize - calculate total uploaded size for a given multipart object.


@@ -101,7 +101,7 @@ func (c Client) makeBucketRequest(bucketName string, acl BucketACL, location str
if bucketName != "" {
// If endpoint supports virtual host style use that always.
// Currently only S3 and Google Cloud Storage would support this.
if isVirtualHostSupported(c.endpointURL) {
if isVirtualHostSupported(c.endpointURL, bucketName) {
targetURL.Host = bucketName + "." + c.endpointURL.Host
targetURL.Path = "/"
} else {


@@ -21,6 +21,7 @@ import (
"crypto/sha256"
"hash"
"io"
"math"
"os"
)
@@ -42,8 +43,59 @@ func isReadAt(reader io.Reader) (ok bool) {
return
}
// shouldUploadPart - verify if part should be uploaded.
func shouldUploadPart(objPart objectPart, objectParts map[int]objectPart) bool {
// If part not found should upload the part.
uploadedPart, found := objectParts[objPart.PartNumber]
if !found {
return true
}
// if size mismatches should upload the part.
if objPart.Size != uploadedPart.Size {
return true
}
// if md5sum mismatches should upload the part.
if objPart.ETag != uploadedPart.ETag {
return true
}
return false
}
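
shouldUploadPart is what makes resumed uploads cheap: a part is re-sent only when it is missing from the previously uploaded parts, its size changed, or its MD5 (ETag) no longer matches. A hypothetical resume scenario, with illustrative sizes and ETags:

uploaded := map[int]objectPart{
	1: {PartNumber: 1, Size: 5 * 1024 * 1024, ETag: "aaa"},
}
shouldUploadPart(objectPart{PartNumber: 1, Size: 5 * 1024 * 1024, ETag: "aaa"}, uploaded) // false: skip, already uploaded
shouldUploadPart(objectPart{PartNumber: 1, Size: 5 * 1024 * 1024, ETag: "bbb"}, uploaded) // true: content changed
shouldUploadPart(objectPart{PartNumber: 2, Size: 5 * 1024 * 1024, ETag: "ccc"}, uploaded) // true: never uploaded
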
// optimalPartInfo - calculate the optimal part info for a given
// object size.
//
// NOTE: Assumption here is that for any object to be uploaded to any S3 compatible
// object storage it will have the following parameters as constants.
//
// maxPartsCount - 10000
// minPartSize - 5MiB
// maxMultipartPutObjectSize - 5TiB
//
func optimalPartInfo(objectSize int64) (totalPartsCount int, partSize int64, lastPartSize int64, err error) {
// object size is '-1' set it to 5TiB.
if objectSize == -1 {
objectSize = maxMultipartPutObjectSize
}
// object size is larger than supported maximum.
if objectSize > maxMultipartPutObjectSize {
err = ErrEntityTooLarge(objectSize, maxMultipartPutObjectSize, "", "")
return
}
// Use floats for part size for all calculations to avoid
// overflows during float64 to int64 conversions.
partSizeFlt := math.Ceil(float64(objectSize / maxPartsCount))
partSizeFlt = math.Ceil(partSizeFlt/minPartSize) * minPartSize
// Total parts count.
totalPartsCount = int(math.Ceil(float64(objectSize) / partSizeFlt))
// Part size.
partSize = int64(partSizeFlt)
// Last part size.
lastPartSize = objectSize - int64(totalPartsCount-1)*partSize
return totalPartsCount, partSize, lastPartSize, nil
}
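
Tracing optimalPartInfo at the 5 TiB maximum shows where the expected values in TestPartSize further down come from:

objectSize       = 5497558138880                      (5 TiB)
objectSize/10000 = 549755813                          (integer division; the outer Ceil is a no-op)
partSize         = Ceil(549755813/5242880) * 5242880
                 = 105 * 5242880 = 550502400          (rounded up to a multiple of minPartSize, 5 MiB)
totalPartsCount  = Ceil(5497558138880/550502400) = 9987
lastPartSize     = 5497558138880 - 9986*550502400 = 241172480
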
// hashCopyN - Calculates Md5sum and SHA256sum for upto partSize amount of bytes.
func (c Client) hashCopyN(writer io.ReadWriteSeeker, reader io.Reader, partSize int64) (md5Sum, sha256Sum []byte, size int64, err error) {
func (c Client) hashCopyN(writer io.ReadWriter, reader io.Reader, partSize int64) (md5Sum, sha256Sum []byte, size int64, err error) {
// MD5 and SHA256 hasher.
var hashMD5, hashSHA256 hash.Hash
// MD5 and SHA256 hasher.
@@ -63,11 +115,6 @@ func (c Client) hashCopyN(writer io.ReadWriteSeeker, reader io.Reader, partSize
}
}
// Seek back to beginning of input, any error fail right here.
if _, err := writer.Seek(0, 0); err != nil {
return nil, nil, 0, err
}
// Finalize md5shum and sha256 sum.
md5Sum = hashMD5.Sum(nil)
if c.signature.isV4() {
@@ -140,28 +187,3 @@ func (c Client) computeHash(reader io.ReadSeeker) (md5Sum, sha256Sum []byte, siz
}
return md5Sum, sha256Sum, size, nil
}
// Fetch all parts info, including total uploaded size, maximum part
// size and max part number.
func (c Client) getPartsInfo(bucketName, objectName, uploadID string) (prtsInfo map[int]objectPart, totalSize int64, maxPrtSize int64, maxPrtNumber int, err error) {
// Fetch previously upload parts.
prtsInfo, err = c.listObjectParts(bucketName, objectName, uploadID)
if err != nil {
return nil, 0, 0, 0, err
}
// Peek through all the parts and calculate totalSize, maximum
// part size and last part number.
for _, prtInfo := range prtsInfo {
// Save previously uploaded size.
totalSize += prtInfo.Size
// Choose the maximum part size.
if prtInfo.Size >= maxPrtSize {
maxPrtSize = prtInfo.Size
}
// Choose the maximum part number.
if maxPrtNumber < prtInfo.PartNumber {
maxPrtNumber = prtInfo.PartNumber
}
}
return prtsInfo, totalSize, maxPrtSize, maxPrtNumber, nil
}


@@ -54,7 +54,7 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
// Check for largest object size allowed.
if fileSize > int64(maxMultipartPutObjectSize) {
return 0, ErrEntityTooLarge(fileSize, bucketName, objectName)
return 0, ErrEntityTooLarge(fileSize, maxMultipartPutObjectSize, bucketName, objectName)
}
// NOTE: Google Cloud Storage multipart Put is not compatible with Amazon S3 APIs.
@@ -87,7 +87,7 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
}
// Small object upload is initiated for uploads for input data size smaller than 5MiB.
if fileSize < minimumPartSize {
if fileSize < minPartSize {
return c.putObjectSingle(bucketName, objectName, fileReader, fileSize, contentType)
}
// Upload all large objects as multipart.
@@ -99,7 +99,7 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
if errResp.Code == "NotImplemented" {
// If size of file is greater than '5GiB' fail.
if fileSize > maxSinglePutObjectSize {
return 0, ErrEntityTooLarge(fileSize, bucketName, objectName)
return 0, ErrEntityTooLarge(fileSize, maxSinglePutObjectSize, bucketName, objectName)
}
// Fall back to uploading as single PutObject operation.
return c.putObjectSingle(bucketName, objectName, fileReader, fileSize, contentType)
@@ -139,9 +139,6 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe
// Complete multipart upload.
var completeMultipartUpload completeMultipartUpload
// Previous maximum part size
var prevMaxPartSize int64
// A map of all uploaded parts.
var partsInfo = make(map[int]objectPart)
@@ -149,43 +146,39 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe
// previously uploaded parts info.
if !isNew {
// Fetch previously upload parts and maximum part size.
partsInfo, _, prevMaxPartSize, _, err = c.getPartsInfo(bucketName, objectName, uploadID)
partsInfo, err = c.listObjectParts(bucketName, objectName, uploadID)
if err != nil {
return 0, err
}
}
// Calculate the optimal part size for a given file size.
partSize := optimalPartSize(fileSize)
// Use prevMaxPartSize if available.
if prevMaxPartSize != 0 {
partSize = prevMaxPartSize
// Calculate the optimal parts info for a given size.
totalPartsCount, partSize, _, err := optimalPartInfo(fileSize)
if err != nil {
return 0, err
}
// Part number always starts with '0'.
partNumber := 0
// Upload each part until fileSize.
for totalUploadedSize < fileSize {
// Increment part number.
partNumber++
// Part number always starts with '1'.
partNumber := 1
for partNumber <= totalPartsCount {
// Get a section reader on a particular offset.
sectionReader := io.NewSectionReader(fileReader, totalUploadedSize, partSize)
// Calculates MD5 and SHA256 sum for a section reader.
md5Sum, sha256Sum, size, err := c.computeHash(sectionReader)
md5Sum, sha256Sum, prtSize, err := c.computeHash(sectionReader)
if err != nil {
return 0, err
}
// Verify if part was not uploaded.
if !isPartUploaded(objectPart{
// Verify if part should be uploaded.
if shouldUploadPart(objectPart{
ETag: hex.EncodeToString(md5Sum),
PartNumber: partNumber,
Size: prtSize,
}, partsInfo) {
// Proceed to upload the part.
objPart, err := c.uploadPart(bucketName, objectName, uploadID, ioutil.NopCloser(sectionReader), partNumber, md5Sum, sha256Sum, size)
objPart, err := c.uploadPart(bucketName, objectName, uploadID, ioutil.NopCloser(sectionReader), partNumber, md5Sum, sha256Sum, prtSize)
if err != nil {
return totalUploadedSize, err
}
@@ -194,7 +187,10 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe
}
// Save successfully uploaded size.
totalUploadedSize += size
totalUploadedSize += prtSize
// Increment part number.
partNumber++
}
// Verify if we uploaded all data.
@@ -210,8 +206,8 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe
completeMultipartUpload.Parts = append(completeMultipartUpload.Parts, complPart)
}
// Verify if partNumber is different than total list of parts.
if partNumber != len(completeMultipartUpload.Parts) {
// Verify if totalPartsCount is not equal to total list of parts.
if totalPartsCount != len(completeMultipartUpload.Parts) {
return totalUploadedSize, ErrInvalidParts(partNumber, len(completeMultipartUpload.Parts))
}


@@ -42,7 +42,7 @@ import (
// is where each part is re-downloaded, checksummed and verified
// before upload.
func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Reader, size int64, contentType string) (n int64, err error) {
if size > 0 && size >= minimumPartSize {
if size > 0 && size >= minPartSize {
// Verify if reader is *os.File, then use file system functionalities.
if isFile(reader) {
return c.putObjectMultipartFromFile(bucketName, objectName, reader.(*os.File), size, contentType)
@@ -73,6 +73,15 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string, reader i
return 0, err
}
// Total data read and written to server. should be equal to 'size' at the end of the call.
var totalUploadedSize int64
// Complete multipart upload.
var completeMultipartUpload completeMultipartUpload
// A map of all previously uploaded parts.
var partsInfo = make(map[int]objectPart)
// getUploadID for an object, initiates a new multipart request
// if it cannot find any previously partially uploaded object.
uploadID, isNew, err := c.getUploadID(bucketName, objectName, contentType)
@@ -80,83 +89,70 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string, reader i
return 0, err
}
// Total data read and written to server. should be equal to 'size' at the end of the call.
var totalUploadedSize int64
// Complete multipart upload.
var completeMultipartUpload completeMultipartUpload
// Previous maximum part size
var prevMaxPartSize int64
// A map of all previously uploaded parts.
var partsInfo = make(map[int]objectPart)
// If This session is a continuation of a previous session fetch all
// previously uploaded parts info.
if !isNew {
// Fetch previously uploaded parts and maximum part size.
partsInfo, _, prevMaxPartSize, _, err = c.getPartsInfo(bucketName, objectName, uploadID)
partsInfo, err = c.listObjectParts(bucketName, objectName, uploadID)
if err != nil {
return 0, err
}
}
// Calculate the optimal part size for a given size.
partSize := optimalPartSize(size)
// Use prevMaxPartSize if available.
if prevMaxPartSize != 0 {
partSize = prevMaxPartSize
// Calculate the optimal parts info for a given size.
totalPartsCount, partSize, _, err := optimalPartInfo(size)
if err != nil {
return 0, err
}
// Part number always starts with '0'.
partNumber := 0
// Part number always starts with '1'.
partNumber := 1
// Upload each part until EOF.
for {
// Increment part number.
partNumber++
// Initialize a temporary buffer.
tmpBuffer := new(bytes.Buffer)
// Initialize a new temporary file.
tmpFile, err := newTempFile("multiparts$-putobject-stream")
if err != nil {
return 0, err
}
// Calculates MD5 and SHA256 sum while copying partSize bytes into tmpFile.
md5Sum, sha256Sum, size, rErr := c.hashCopyN(tmpFile, reader, partSize)
for partNumber <= totalPartsCount {
// Calculates MD5 and SHA256 sum while copying partSize bytes
// into tmpBuffer.
md5Sum, sha256Sum, prtSize, rErr := c.hashCopyN(tmpBuffer, reader, partSize)
if rErr != nil {
if rErr != io.EOF {
return 0, rErr
}
}
// Verify if part was not uploaded.
if !isPartUploaded(objectPart{
// Verify if part should be uploaded.
if shouldUploadPart(objectPart{
ETag: hex.EncodeToString(md5Sum),
PartNumber: partNumber,
Size: prtSize,
}, partsInfo) {
// Proceed to upload the part.
objPart, err := c.uploadPart(bucketName, objectName, uploadID, tmpFile, partNumber, md5Sum, sha256Sum, size)
objPart, err := c.uploadPart(bucketName, objectName, uploadID, ioutil.NopCloser(tmpBuffer), partNumber,
md5Sum, sha256Sum, prtSize)
if err != nil {
// Close the temporary file upon any error.
tmpFile.Close()
// Reset the temporary buffer upon any error.
tmpBuffer.Reset()
return 0, err
}
// Save successfully uploaded part metadata.
partsInfo[partNumber] = objPart
}
// Close the temporary file.
tmpFile.Close()
// Reset the temporary buffer.
tmpBuffer.Reset()
// Save successfully uploaded size.
totalUploadedSize += size
totalUploadedSize += prtSize
// If read error was an EOF, break out of the loop.
if rErr == io.EOF {
// For unknown size, we break away on read EOF.
// We do not have to upload till totalPartsCount.
if size < 0 && rErr == io.EOF {
break
}
// Increment part number.
partNumber++
}
// Verify if we uploaded all the data.
@@ -174,9 +170,11 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string, reader i
completeMultipartUpload.Parts = append(completeMultipartUpload.Parts, complPart)
}
// Verify if partNumber is different than total list of parts.
if partNumber != len(completeMultipartUpload.Parts) {
return totalUploadedSize, ErrInvalidParts(partNumber, len(completeMultipartUpload.Parts))
if size > 0 {
// Verify if totalPartsCount is not equal to total list of parts.
if totalPartsCount != len(completeMultipartUpload.Parts) {
return totalUploadedSize, ErrInvalidParts(partNumber, len(completeMultipartUpload.Parts))
}
}
// Sort all completed parts.
@@ -255,7 +253,7 @@ func (c Client) uploadPart(bucketName, objectName, uploadID string, reader io.Re
return objectPart{}, err
}
if size > maxPartSize {
return objectPart{}, ErrEntityTooLarge(size, bucketName, objectName)
return objectPart{}, ErrEntityTooLarge(size, maxPartSize, bucketName, objectName)
}
if size <= -1 {
return objectPart{}, ErrEntityTooSmall(size, bucketName, objectName)


@@ -17,14 +17,30 @@
package minio
import (
"bytes"
"crypto/md5"
"crypto/sha256"
"errors"
"hash"
"io"
"io/ioutil"
"sort"
)
// shouldUploadPartReadAt - verify if part should be uploaded.
func shouldUploadPartReadAt(objPart objectPart, objectParts map[int]objectPart) bool {
// If part not found part should be uploaded.
uploadedPart, found := objectParts[objPart.PartNumber]
if !found {
return true
}
// if size mismatches part should be uploaded.
if uploadedPart.Size != objPart.Size {
return true
}
return false
}
// putObjectMultipartFromReadAt - Uploads files bigger than 5MiB. Supports reader
// of type which implements io.ReaderAt interface (ReadAt method).
//
@@ -57,42 +73,70 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
// Complete multipart upload.
var completeMultipartUpload completeMultipartUpload
// Previous maximum part size
var prevMaxPartSize int64
// Previous part number.
var prevPartNumber int
// A map of all uploaded parts.
var partsInfo = make(map[int]objectPart)
// Fetch all parts info previously uploaded.
if !isNew {
partsInfo, totalUploadedSize, prevMaxPartSize, prevPartNumber, err = c.getPartsInfo(bucketName, objectName, uploadID)
partsInfo, err = c.listObjectParts(bucketName, objectName, uploadID)
if err != nil {
return 0, err
}
}
// Calculate the optimal part size for a given file size.
partSize := optimalPartSize(size)
// If prevMaxPartSize is set use that.
if prevMaxPartSize != 0 {
partSize = prevMaxPartSize
// Calculate the optimal parts info for a given size.
totalPartsCount, partSize, lastPartSize, err := optimalPartInfo(size)
if err != nil {
return 0, err
}
// MD5 and SHA256 hasher.
var hashMD5, hashSHA256 hash.Hash
// Part number always starts with prevPartNumber + 1. i.e The next part number.
partNumber := prevPartNumber + 1
// Used for readability, lastPartNumber is always
// totalPartsCount.
lastPartNumber := totalPartsCount
// Upload each part until totalUploadedSize reaches input reader size.
for totalUploadedSize < size {
// Initialize a new temporary file.
tmpFile, err := newTempFile("multiparts$-putobject-partial")
if err != nil {
return 0, err
// partNumber always starts with '1'.
partNumber := 1
// Initialize a temporary buffer.
tmpBuffer := new(bytes.Buffer)
// Upload all the missing parts.
for partNumber <= lastPartNumber {
// Verify if the object part was already uploaded.
verifyObjPart := objectPart{
PartNumber: partNumber,
Size: partSize,
}
// Special case if we see a last part number, save last part
// size as the proper part size.
if partNumber == lastPartNumber {
verifyObjPart = objectPart{
PartNumber: lastPartNumber,
Size: lastPartSize,
}
}
// Verify if part should be uploaded.
if !shouldUploadPartReadAt(verifyObjPart, partsInfo) {
// Increment part number when not uploaded.
partNumber++
continue
}
// If partNumber was not uploaded we calculate the missing
// part offset and size. For all other part numbers we
// calculate offset based on multiples of partSize.
readAtOffset := int64(partNumber-1) * partSize
missingPartSize := partSize
// As a special case if partNumber is lastPartNumber, we
// calculate the offset based on the last part size.
if partNumber == lastPartNumber {
readAtOffset = (size - lastPartSize)
missingPartSize = lastPartSize
}
// Create a hash multiwriter.
@@ -102,10 +146,7 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
hashSHA256 = sha256.New()
hashWriter = io.MultiWriter(hashMD5, hashSHA256)
}
writer := io.MultiWriter(tmpFile, hashWriter)
// Choose totalUploadedSize as the current readAtOffset.
readAtOffset := totalUploadedSize
writer := io.MultiWriter(tmpBuffer, hashWriter)
// Read until partSize.
var totalReadPartSize int64
@@ -116,7 +157,7 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
// Following block reads data at an offset from the input
// reader and copies data to into local temporary file.
// Temporary file data is limited to the partSize.
for totalReadPartSize < partSize {
for totalReadPartSize < missingPartSize {
readAtSize, rerr := reader.ReadAt(readAtBuffer, readAtOffset)
if rerr != nil {
if rerr != io.EOF {
@@ -137,11 +178,6 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
}
}
// Seek back to beginning of the temporary file.
if _, err := tmpFile.Seek(0, 0); err != nil {
return 0, err
}
var md5Sum, sha256Sum []byte
md5Sum = hashMD5.Sum(nil)
// Signature version '4'.
@@ -150,26 +186,22 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
}
// Proceed to upload the part.
objPart, err := c.uploadPart(bucketName, objectName, uploadID, tmpFile, partNumber, md5Sum, sha256Sum, totalReadPartSize)
objPart, err := c.uploadPart(bucketName, objectName, uploadID, ioutil.NopCloser(tmpBuffer),
partNumber, md5Sum, sha256Sum, totalReadPartSize)
if err != nil {
// Close the read closer.
tmpFile.Close()
// Reset the buffer upon any error.
tmpBuffer.Reset()
return totalUploadedSize, err
}
// Save successfully uploaded size.
totalUploadedSize += totalReadPartSize
// Save successfully uploaded part metadata.
partsInfo[partNumber] = objPart
// Move to next part.
// Increment part number here after successful part upload.
partNumber++
}
// Verify if we uploaded all the data.
if totalUploadedSize != size {
return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
// Reset the buffer.
tmpBuffer.Reset()
}
// Loop over uploaded parts to save them in a Parts array before completing the multipart request.
@@ -177,9 +209,20 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
var complPart completePart
complPart.ETag = part.ETag
complPart.PartNumber = part.PartNumber
totalUploadedSize += part.Size
completeMultipartUpload.Parts = append(completeMultipartUpload.Parts, complPart)
}
// Verify if we uploaded all the data.
if totalUploadedSize != size {
return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
}
// Verify if totalPartsCount is not equal to total list of parts.
if totalPartsCount != len(completeMultipartUpload.Parts) {
return totalUploadedSize, ErrInvalidParts(totalPartsCount, len(completeMultipartUpload.Parts))
}
// Sort all completed parts.
sort.Sort(completedParts(completeMultipartUpload.Parts))
_, err = c.completeMultipartUpload(bucketName, objectName, uploadID, completeMultipartUpload)


@@ -22,37 +22,87 @@ import (
"io/ioutil"
"net/http"
"os"
"reflect"
"runtime"
"strings"
)
// getReaderSize gets the size of the underlying reader, if possible.
// getReaderSize - Determine the size of Reader if available.
func getReaderSize(reader io.Reader) (size int64, err error) {
var result []reflect.Value
size = -1
if reader != nil {
switch v := reader.(type) {
case *bytes.Buffer:
size = int64(v.Len())
case *bytes.Reader:
size = int64(v.Len())
case *strings.Reader:
size = int64(v.Len())
case *os.File:
var st os.FileInfo
st, err = v.Stat()
if err != nil {
return 0, err
// Verify if there is a method by name 'Size'.
lenFn := reflect.ValueOf(reader).MethodByName("Size")
if lenFn.IsValid() {
if lenFn.Kind() == reflect.Func {
// Call the 'Size' function and save its return value.
result = lenFn.Call([]reflect.Value{})
if result != nil && len(result) == 1 {
lenValue := result[0]
if lenValue.IsValid() {
switch lenValue.Kind() {
case reflect.Int:
fallthrough
case reflect.Int8:
fallthrough
case reflect.Int16:
fallthrough
case reflect.Int32:
fallthrough
case reflect.Int64:
size = lenValue.Int()
}
}
}
}
size = st.Size()
case *Object:
var st ObjectInfo
st, err = v.Stat()
if err != nil {
return 0, err
} else {
// Fallback to Stat() method, two possible Stat() structs
// exist.
switch v := reader.(type) {
case *os.File:
var st os.FileInfo
st, err = v.Stat()
if err != nil {
// Handle this case specially for "windows": for certain
// files, for example 'Stdin', 'Stdout' and 'Stderr', it
// is not allowed to fetch file information.
if runtime.GOOS == "windows" {
if strings.Contains(err.Error(), "GetFileInformationByHandle") {
return -1, nil
}
}
return
}
// Ignore if input is a directory, throw an error.
if st.Mode().IsDir() {
return -1, ErrInvalidArgument("Input file cannot be a directory.")
}
// Ignore 'Stdin', 'Stdout' and 'Stderr', since they
// represent *os.File type but internally do not
// implement Seekable calls. Ignore them and treat
// them like a stream with unknown length.
switch st.Name() {
case "stdin":
fallthrough
case "stdout":
fallthrough
case "stderr":
return
}
size = st.Size()
case *Object:
var st ObjectInfo
st, err = v.Stat()
if err != nil {
return
}
size = st.Size
}
size = st.Size
}
}
return size, nil
// Returns the size here.
return size, err
}
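
A standalone sketch of the reflection probe used above; sizedReader is illustrative, and the real code also accepts Int, Int8, Int16 and Int32 results:

package main

import (
	"fmt"
	"reflect"
)

type sizedReader struct{}

func (sizedReader) Read(p []byte) (int, error) { return 0, nil }
func (sizedReader) Size() int64                { return 42 }

func main() {
	size := int64(-1)
	// Look up a method named 'Size' at runtime and, if present,
	// call it and read back the integer it returns.
	fn := reflect.ValueOf(sizedReader{}).MethodByName("Size")
	if fn.IsValid() && fn.Kind() == reflect.Func {
		if out := fn.Call(nil); len(out) == 1 && out[0].Kind() == reflect.Int64 {
			size = out[0].Int()
		}
	}
	fmt.Println(size) // 42
}
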
// completedParts is a collection of parts sortable by their part numbers.
@@ -85,7 +135,7 @@ func (c Client) PutObject(bucketName, objectName string, reader io.Reader, conte
return 0, err
}
// get reader size.
// Get reader size.
size, err := getReaderSize(reader)
if err != nil {
return 0, err
@@ -93,7 +143,7 @@ func (c Client) PutObject(bucketName, objectName string, reader io.Reader, conte
// Check for largest object size allowed.
if size > int64(maxMultipartPutObjectSize) {
return 0, ErrEntityTooLarge(size, bucketName, objectName)
return 0, ErrEntityTooLarge(size, maxMultipartPutObjectSize, bucketName, objectName)
}
// NOTE: Google Cloud Storage does not implement Amazon S3 Compatible multipart PUT.
@@ -108,7 +158,7 @@ func (c Client) PutObject(bucketName, objectName string, reader io.Reader, conte
}
}
if size > maxSinglePutObjectSize {
return 0, ErrEntityTooLarge(size, bucketName, objectName)
return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
}
// Do not compute MD5 for Google Cloud Storage. Uploads upto 5GiB in size.
return c.putObjectNoChecksum(bucketName, objectName, reader, size, contentType)
@@ -125,14 +175,14 @@ func (c Client) PutObject(bucketName, objectName string, reader io.Reader, conte
}
}
if size > maxSinglePutObjectSize {
return 0, ErrEntityTooLarge(size, bucketName, objectName)
return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
}
// Do not compute MD5 for anonymous requests to Amazon S3. Uploads upto 5GiB in size.
return c.putObjectNoChecksum(bucketName, objectName, reader, size, contentType)
}
// putSmall object.
if size < minimumPartSize && size > 0 {
if size < minPartSize && size > 0 {
return c.putObjectSingle(bucketName, objectName, reader, size, contentType)
}
// For all sizes greater than 5MiB do multipart.
@@ -144,7 +194,7 @@ func (c Client) PutObject(bucketName, objectName string, reader io.Reader, conte
if errResp.Code == "NotImplemented" {
// Verify if size of reader is greater than '5GiB'.
if size > maxSinglePutObjectSize {
return 0, ErrEntityTooLarge(size, bucketName, objectName)
return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
}
// Fall back to uploading as single PutObject operation.
return c.putObjectSingle(bucketName, objectName, reader, size, contentType)
@@ -165,7 +215,7 @@ func (c Client) putObjectNoChecksum(bucketName, objectName string, reader io.Rea
return 0, err
}
if size > maxSinglePutObjectSize {
return 0, ErrEntityTooLarge(size, bucketName, objectName)
return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
}
// This function does not calculate sha256 and md5sum for payload.
// Execute put object.
@@ -190,25 +240,40 @@ func (c Client) putObjectSingle(bucketName, objectName string, reader io.Reader,
return 0, err
}
if size > maxSinglePutObjectSize {
return 0, ErrEntityTooLarge(size, bucketName, objectName)
return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
}
// If size is a stream, upload upto 5GiB.
if size <= -1 {
size = maxSinglePutObjectSize
}
// Initialize a new temporary file.
tmpFile, err := newTempFile("single$-putobject-single")
if err != nil {
return 0, err
var md5Sum, sha256Sum []byte
var readCloser io.ReadCloser
if size <= minPartSize {
// Initialize a new temporary buffer.
tmpBuffer := new(bytes.Buffer)
md5Sum, sha256Sum, size, err = c.hashCopyN(tmpBuffer, reader, size)
readCloser = ioutil.NopCloser(tmpBuffer)
} else {
// Initialize a new temporary file.
tmpFile, err := newTempFile("single$-putobject-single")
if err != nil {
return 0, err
}
md5Sum, sha256Sum, size, err = c.hashCopyN(tmpFile, reader, size)
// Seek back to beginning of the temporary file.
if _, err := tmpFile.Seek(0, 0); err != nil {
return 0, err
}
readCloser = tmpFile
}
md5Sum, sha256Sum, size, err := c.hashCopyN(tmpFile, reader, size)
// Return error if its not io.EOF.
if err != nil {
if err != io.EOF {
return 0, err
}
}
// Execute put object.
st, err := c.putObjectDo(bucketName, objectName, tmpFile, md5Sum, sha256Sum, size, contentType)
st, err := c.putObjectDo(bucketName, objectName, readCloser, md5Sum, sha256Sum, size, contentType)
if err != nil {
return 0, err
}
@@ -234,7 +299,7 @@ func (c Client) putObjectDo(bucketName, objectName string, reader io.ReadCloser,
}
if size > maxSinglePutObjectSize {
return ObjectInfo{}, ErrEntityTooLarge(size, bucketName, objectName)
return ObjectInfo{}, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
}
if strings.TrimSpace(contentType) == "" {


@@ -17,6 +17,7 @@
package minio
import (
"bytes"
"encoding/base64"
"encoding/hex"
"fmt"
@@ -213,6 +214,7 @@ type requestMetadata struct {
expires int64
// Generated by our internal code.
bucketLocation string
contentBody io.ReadCloser
contentLength int64
contentSHA256Bytes []byte
@@ -285,9 +287,22 @@ func (c Client) dumpHTTP(req *http.Request, resp *http.Response) error {
return err
}
} else {
respTrace, err = httputil.DumpResponse(resp, false)
if err != nil {
return err
// WORKAROUND for https://github.com/golang/go/issues/13942.
// httputil.DumpResponse does not print response headers for
// all successful calls which have response ContentLength set
// to zero. Keep this workaround until the above bug is fixed.
if resp.ContentLength == 0 {
var buffer bytes.Buffer
if err := resp.Header.Write(&buffer); err != nil {
return err
}
respTrace = buffer.Bytes()
respTrace = append(respTrace, []byte("\r\n")...)
} else {
respTrace, err = httputil.DumpResponse(resp, false)
if err != nil {
return err
}
}
}
// Write response to trace output.
@@ -324,24 +339,12 @@ func (c Client) do(req *http.Request) (*http.Response, error) {
}
// newRequest - instantiate a new HTTP request for a given method.
func (c Client) newRequest(method string, metadata requestMetadata) (*http.Request, error) {
func (c Client) newRequest(method string, metadata requestMetadata) (req *http.Request, err error) {
// If no method is supplied default to 'POST'.
if method == "" {
method = "POST"
}
// construct a new target URL.
targetURL, err := c.makeTargetURL(metadata.bucketName, metadata.objectName, metadata.queryValues)
if err != nil {
return nil, err
}
// get a new HTTP request for the method.
req, err := http.NewRequest(method, targetURL.String(), nil)
if err != nil {
return nil, err
}
// Gather location only if bucketName is present.
location := "us-east-1" // Default all other requests to "us-east-1".
if metadata.bucketName != "" {
@@ -351,8 +354,23 @@ func (c Client) newRequest(method string, metadata requestMetadata) (*http.Reque
}
}
// If presigned request, return quickly.
if metadata.expires != 0 {
// Save location.
metadata.bucketLocation = location
// Construct a new target URL.
targetURL, err := c.makeTargetURL(metadata.bucketName, metadata.objectName, metadata.bucketLocation, metadata.queryValues)
if err != nil {
return nil, err
}
// Initialize a new HTTP request for the method.
req, err = http.NewRequest(method, targetURL.String(), nil)
if err != nil {
return nil, err
}
// Generate presign url if needed, return right here.
if metadata.expires != 0 && metadata.presignURL {
if c.anonymous {
return nil, ErrInvalidArgument("Requests cannot be presigned with anonymous credentials.")
}
@@ -401,7 +419,7 @@ func (c Client) newRequest(method string, metadata requestMetadata) (*http.Reque
req.Header.Set("Content-MD5", base64.StdEncoding.EncodeToString(metadata.contentMD5Bytes))
}
// Sign the request if not anonymous.
// Sign the request for all authenticated requests.
if !c.anonymous {
if c.signature.isV2() {
// Add signature version '2' authorization header.
@@ -411,7 +429,8 @@ func (c Client) newRequest(method string, metadata requestMetadata) (*http.Reque
req = SignV4(*req, c.accessKeyID, c.secretAccessKey, location)
}
}
// return request.
// Return request.
return req, nil
}
@@ -424,16 +443,30 @@ func (c Client) setUserAgent(req *http.Request) {
}
// makeTargetURL make a new target url.
func (c Client) makeTargetURL(bucketName, objectName string, queryValues url.Values) (*url.URL, error) {
urlStr := c.endpointURL.Scheme + "://" + c.endpointURL.Host + "/"
func (c Client) makeTargetURL(bucketName, objectName, bucketLocation string, queryValues url.Values) (*url.URL, error) {
// Save if target url will have buckets which suppport virtual
// host.
isVirtualHostStyle := isVirtualHostSupported(c.endpointURL, bucketName)
// Save host.
host := c.endpointURL.Host
// For Amazon S3 endpoint, try to fetch location based endpoint.
if isAmazonEndpoint(c.endpointURL) {
// Fetch new host based on the bucket location.
host = getS3Endpoint(bucketLocation)
}
// Save scheme.
scheme := c.endpointURL.Scheme
urlStr := scheme + "://" + host + "/"
// Make URL only if bucketName is available, otherwise use the
// endpoint URL.
if bucketName != "" {
// If endpoint supports virtual host style use that always.
// Currently only S3 and Google Cloud Storage would support
// this.
if isVirtualHostSupported(c.endpointURL) {
urlStr = c.endpointURL.Scheme + "://" + bucketName + "." + c.endpointURL.Host + "/"
// virtual host style.
if isVirtualHostStyle {
urlStr = scheme + "://" + bucketName + "." + host + "/"
if objectName != "" {
urlStr = urlStr + urlEncodePath(objectName)
}


@@ -17,13 +17,195 @@
package minio
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"strings"
"testing"
)
type customReader struct{}
func (c *customReader) Read(p []byte) (n int, err error) {
return 0, nil
}
func (c *customReader) Size() (n int64) {
return 10
}
// Tests getReaderSize() for various Reader types.
func TestGetReaderSize(t *testing.T) {
var reader io.Reader
size, err := getReaderSize(reader)
if err != nil {
t.Fatal("Error:", err)
}
if size != -1 {
t.Fatal("Reader shouldn't have any length.")
}
bytesReader := bytes.NewReader([]byte("Hello World"))
size, err = getReaderSize(bytesReader)
if err != nil {
t.Fatal("Error:", err)
}
if size != int64(len("Hello World")) {
t.Fatalf("Reader length doesn't match got: %v, want: %v", size, len("Hello World"))
}
size, err = getReaderSize(new(customReader))
if err != nil {
t.Fatal("Error:", err)
}
if size != int64(10) {
t.Fatalf("Reader length doesn't match got: %v, want: %v", size, 10)
}
stringsReader := strings.NewReader("Hello World")
size, err = getReaderSize(stringsReader)
if err != nil {
t.Fatal("Error:", err)
}
if size != int64(len("Hello World")) {
t.Fatalf("Reader length doesn't match got: %v, want: %v", size, len("Hello World"))
}
// Create request channel.
reqCh := make(chan readRequest)
// Create response channel.
resCh := make(chan readResponse)
// Create done channel.
doneCh := make(chan struct{})
// objectInfo.
objectInfo := ObjectInfo{Size: 10}
objectReader := newObject(reqCh, resCh, doneCh, objectInfo)
defer objectReader.Close()
size, err = getReaderSize(objectReader)
if err != nil {
t.Fatal("Error:", err)
}
if size != int64(10) {
t.Fatalf("Reader length doesn't match got: %v, want: %v", size, 10)
}
fileReader, err := ioutil.TempFile(os.TempDir(), "prefix")
if err != nil {
t.Fatal("Error:", err)
}
defer fileReader.Close()
defer os.RemoveAll(fileReader.Name())
size, err = getReaderSize(fileReader)
if err != nil {
t.Fatal("Error:", err)
}
if size == -1 {
t.Fatal("Reader length for file cannot be -1.")
}
// Verify for standard input, output and error file descriptors.
size, err = getReaderSize(os.Stdin)
if err != nil {
t.Fatal("Error:", err)
}
if size != -1 {
t.Fatal("Stdin should have length of -1.")
}
size, err = getReaderSize(os.Stdout)
if err != nil {
t.Fatal("Error:", err)
}
if size != -1 {
t.Fatal("Stdout should have length of -1.")
}
size, err = getReaderSize(os.Stderr)
if err != nil {
t.Fatal("Error:", err)
}
if size != -1 {
t.Fatal("Stderr should have length of -1.")
}
file, err := os.Open(os.TempDir())
if err != nil {
t.Fatal("Error:", err)
}
defer file.Close()
size, err = getReaderSize(file)
if err == nil {
t.Fatal("Input file as directory should throw an error.")
}
}
// Tests valid hosts for location.
func TestValidBucketLocation(t *testing.T) {
s3Hosts := []struct {
bucketLocation string
endpoint string
}{
{"us-east-1", "s3.amazonaws.com"},
{"unknown", "s3.amazonaws.com"},
{"ap-southeast-1", "s3-ap-southeast-1.amazonaws.com"},
}
for _, s3Host := range s3Hosts {
endpoint := getS3Endpoint(s3Host.bucketLocation)
if endpoint != s3Host.endpoint {
t.Fatal("Error: invalid bucket location", endpoint)
}
}
}
// Tests valid bucket names.
func TestBucketNames(t *testing.T) {
buckets := []struct {
name string
valid error
}{
{".mybucket", ErrInvalidBucketName("Bucket name cannot start or end with a '.' dot.")},
{"mybucket.", ErrInvalidBucketName("Bucket name cannot start or end with a '.' dot.")},
{"mybucket-", ErrInvalidBucketName("Bucket name contains invalid characters.")},
{"my", ErrInvalidBucketName("Bucket name cannot be smaller than 3 characters.")},
{"", ErrInvalidBucketName("Bucket name cannot be empty.")},
{"my.bucket.com", nil},
{"my-bucket", nil},
{"123my-bucket", nil},
}
for _, b := range buckets {
err := isValidBucketName(b.name)
if err != b.valid {
t.Fatal("Error:", err)
}
}
}
// Tests temp file.
func TestTempFile(t *testing.T) {
tmpFile, err := newTempFile("testing")
if err != nil {
t.Fatal("Error:", err)
}
fileName := tmpFile.Name()
// Closing temporary file purges the file.
err = tmpFile.Close()
if err != nil {
t.Fatal("Error:", err)
}
st, err := os.Stat(fileName)
if err != nil && !os.IsNotExist(err) {
t.Fatal("Error:", err)
}
if err == nil && st != nil {
t.Fatal("Error: file should be deleted and should not exist.")
}
}
// Tests url encoding.
func TestEncodeURL2Path(t *testing.T) {
type urlStrings struct {
objName string
@@ -66,6 +248,7 @@ func TestEncodeURL2Path(t *testing.T) {
}
}
// Tests error response structure.
func TestErrorResponse(t *testing.T) {
var err error
err = ErrorResponse{
@@ -90,6 +273,7 @@ func TestErrorResponse(t *testing.T) {
}
}
// Tests signature calculation.
func TestSignatureCalculation(t *testing.T) {
req, err := http.NewRequest("GET", "https://s3.amazonaws.com", nil)
if err != nil {
@@ -136,6 +320,7 @@ func TestSignatureCalculation(t *testing.T) {
}
}
// Tests signature type.
func TestSignatureType(t *testing.T) {
clnt := Client{}
if !clnt.signature.isV4() {
@@ -154,7 +339,8 @@ func TestSignatureType(t *testing.T) {
}
}
func TestACLTypes(t *testing.T) {
// Tests bucket acl types.
func TestBucketACLTypes(t *testing.T) {
want := map[string]bool{
"private": true,
"public-read": true,
@@ -169,20 +355,48 @@ func TestACLTypes(t *testing.T) {
}
}
// Tests optimal part size.
func TestPartSize(t *testing.T) {
var maxPartSize int64 = 1024 * 1024 * 1024 * 5
partSize := optimalPartSize(5000000000000000000)
if partSize > minimumPartSize {
if partSize > maxPartSize {
t.Fatal("invalid result, cannot be bigger than maxPartSize 5GiB")
}
totalPartsCount, partSize, lastPartSize, err := optimalPartInfo(5000000000000000000)
if err == nil {
t.Fatal("Error: should fail")
}
partSize = optimalPartSize(50000000000)
if partSize > minimumPartSize {
t.Fatal("invalid result, cannot be bigger than minimumPartSize 5MiB")
totalPartsCount, partSize, lastPartSize, err = optimalPartInfo(5497558138880)
if err != nil {
t.Fatal("Error: ", err)
}
if totalPartsCount != 9987 {
t.Fatalf("Error: expecting total parts count of 9987: got %v instead", totalPartsCount)
}
if partSize != 550502400 {
t.Fatalf("Error: expecting part size of 550502400: got %v instead", partSize)
}
if lastPartSize != 241172480 {
t.Fatalf("Error: expecting last part size of 241172480: got %v instead", lastPartSize)
}
totalPartsCount, partSize, lastPartSize, err = optimalPartInfo(5000000000)
if err != nil {
t.Fatal("Error:", err)
}
if partSize != minPartSize {
t.Fatalf("Error: expecting part size of %v: got %v instead", minPartSize, partSize)
}
totalPartsCount, partSize, lastPartSize, err = optimalPartInfo(-1)
if err != nil {
t.Fatal("Error:", err)
}
if totalPartsCount != 9987 {
t.Fatalf("Error: expecting total parts count of 9987: got %v instead", totalPartsCount)
}
if partSize != 550502400 {
t.Fatalf("Error: expecting part size of 550502400: got %v instead", partSize)
}
if lastPartSize != 241172480 {
t.Fatalf("Error: expecting last part size of 241172480: got %v instead", lastPartSize)
}
}
// Tests url encoding.
func TestURLEncoding(t *testing.T) {
type urlStrings struct {
name string
@@ -223,6 +437,7 @@ func TestURLEncoding(t *testing.T) {
}
}
// Tests constructing valid endpoint url.
func TestGetEndpointURL(t *testing.T) {
if _, err := getEndpointURL("s3.amazonaws.com", false); err != nil {
t.Fatal("Error:", err)
@@ -241,7 +456,8 @@ func TestGetEndpointURL(t *testing.T) {
}
}
func TestValidIP(t *testing.T) {
// Tests valid ip address.
func TestValidIPAddr(t *testing.T) {
type validIP struct {
ip string
valid bool
@@ -273,6 +489,7 @@ func TestValidIP(t *testing.T) {
}
}
// Tests valid endpoint domain.
func TestValidEndpointDomain(t *testing.T) {
type validEndpoint struct {
endpointDomain string
@@ -329,6 +546,7 @@ func TestValidEndpointDomain(t *testing.T) {
}
}
// Tests valid endpoint url.
func TestValidEndpointURL(t *testing.T) {
type validURL struct {
url string


@@ -18,12 +18,12 @@ package minio
/// Multipart upload defaults.
// minimumPartSize - minimum part size 5MiB per object after which
// minPartSize - minimum part size 5MiB per object after which
// putObject behaves internally as multipart.
const minimumPartSize = 1024 * 1024 * 5
const minPartSize = 1024 * 1024 * 5
// maxParts - maximum parts for a single multipart session.
const maxParts = 10000
// maxPartsCount - maximum number of parts for a single multipart session.
const maxPartsCount = 10000
// maxPartSize - maximum part size 5GiB for a single multipart upload
// operation.


@@ -39,7 +39,7 @@ func main() {
}
// Create a done channel to control 'ListObjects' go routine.
doneCh := make(struct{})
doneCh := make(chan struct{})
// Indicate to our routine to exit cleanly upon return.
defer close(doneCh)


@@ -39,7 +39,7 @@ func main() {
}
// Create a done channel to control 'ListObjects' go routine.
doneCh := make(struct{})
doneCh := make(chan struct{})
// Indicate to our routine to exit cleanly upon return.
defer close(doneCh)


@@ -40,7 +40,7 @@ func main() {
}
// Create a done channel to control 'ListObjects' go routine.
doneCh := make(struct{})
doneCh := make(chan struct{})
// Indicate to our routine to exit cleanly upon return.
defer close(doneCh)


@@ -40,7 +40,7 @@ func main() {
}
// Create a done channel to control 'ListObjects' go routine.
doneCh := make(struct{})
doneCh := make(chan struct{})
// Indicate to our routine to exit cleanly upon return.
defer close(doneCh)


@@ -0,0 +1,40 @@
/*
* Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package minio
// awsS3EndpointMap Amazon S3 endpoint map.
var awsS3EndpointMap = map[string]string{
"us-east-1": "s3.amazonaws.com",
"us-west-2": "s3-us-west-2.amazonaws.com",
"us-west-1": "s3-us-west-1.amazonaws.com",
"eu-west-1": "s3-eu-west-1.amazonaws.com",
"eu-central-1": "s3-eu-central-1.amazonaws.com",
"ap-southeast-1": "s3-ap-southeast-1.amazonaws.com",
"ap-northeast-1": "s3-ap-northeast-1.amazonaws.com",
"ap-northeast-2": "s3-ap-northeast-2.amazonaws.com",
"sa-east-1": "s3-sa-east-1.amazonaws.com",
}
// getS3Endpoint get Amazon S3 endpoint based on the bucket location.
func getS3Endpoint(bucketLocation string) (s3Endpoint string) {
s3Endpoint, ok := awsS3EndpointMap[bucketLocation]
if !ok {
// Default to 's3.amazonaws.com' endpoint.
s3Endpoint = "s3.amazonaws.com"
}
return s3Endpoint
}
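
Lookup is by exact location name, with a default for anything unknown; this is exactly what TestValidBucketLocation exercises. For example:

getS3Endpoint("ap-southeast-1") // "s3-ap-southeast-1.amazonaws.com"
getS3Endpoint("unknown")        // falls back to "s3.amazonaws.com"
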


@@ -52,15 +52,6 @@ func sumHMAC(key []byte, data []byte) []byte {
return hash.Sum(nil)
}
// isPartUploaded - true if part is already uploaded.
func isPartUploaded(objPart objectPart, objectParts map[int]objectPart) (isUploaded bool) {
_, isUploaded = objectParts[objPart.PartNumber]
if isUploaded {
isUploaded = (objPart.ETag == objectParts[objPart.PartNumber].ETag)
}
return
}
// getEndpointURL - construct a new endpoint.
func getEndpointURL(endpoint string, inSecure bool) (*url.URL, error) {
if strings.Contains(endpoint, ":") {
@@ -151,9 +142,16 @@ func closeResponse(resp *http.Response) {
}
}
// isVirtualHostSupported - verify if host supports virtual hosted style.
// Currently only Amazon S3 and Google Cloud Storage would support this.
func isVirtualHostSupported(endpointURL *url.URL) bool {
// isVirtualHostSupported - verifies if bucketName can be part of
// virtual host. Currently only Amazon S3 and Google Cloud Storage would
// support this.
func isVirtualHostSupported(endpointURL *url.URL, bucketName string) bool {
// bucketName can be valid but '.' in the hostname will fail SSL
// certificate validation. So do not use host-style for such buckets.
if endpointURL.Scheme == "https" && strings.Contains(bucketName, ".") {
return false
}
// Return true for all other cases
return isAmazonEndpoint(endpointURL) || isGoogleEndpoint(endpointURL)
}
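
Together with the relaxed validBucketName below, a name such as "my.bucket.com" is now accepted, but over HTTPS it is addressed path-style, because "my.bucket.com.s3.amazonaws.com" would not match the "*.s3.amazonaws.com" wildcard certificate. Illustrative outcomes, assuming u is the parsed https://s3.amazonaws.com endpoint:

isVirtualHostSupported(u, "mybucket")      // true:  https://mybucket.s3.amazonaws.com/...
isVirtualHostSupported(u, "my.bucket.com") // false: path-style https://s3.amazonaws.com/my.bucket.com/...
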
@@ -212,13 +210,9 @@ func isValidExpiry(expires time.Duration) error {
return nil
}
/// Excerpts from - http://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html
/// When using virtual hosted-style buckets with SSL, the SSL wild card
/// certificate only matches buckets that do not contain periods.
/// To work around this, use HTTP or write your own certificate verification logic.
// We decided to not support bucketNames with '.' in them.
var validBucketName = regexp.MustCompile(`^[a-z0-9][a-z0-9\-]{1,61}[a-z0-9]$`)
// We support '.' with bucket names but we fallback to using path
// style requests instead for such buckets.
var validBucketName = regexp.MustCompile(`^[a-z0-9][a-z0-9\.\-]{1,61}[a-z0-9]$`)
// isValidBucketName - verify bucket name in accordance with
// - http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingBucket.html
@ -267,39 +261,6 @@ func isValidObjectPrefix(objectPrefix string) error {
return nil
}
// optimalPartSize - calculate the optimal part size for the given objectSize.
//
// NOTE: Assumption here is that for any object to be uploaded to any S3 compatible
// object storage it will have the following parameters as constants.
//
// maxParts - 10000
// minimumPartSize - 5MiB
// maximumPartSize - 5GiB
//
// if the partSize after division with maxParts is greater than minimumPartSize
// then choose that as the new part size, if not return minimumPartSize.
//
// Special cases
//
// - if input object size is -1 then return maxPartSize.
// - if it happens to be that partSize is indeed bigger
// than the maximum part size just return maxPartSize.
func optimalPartSize(objectSize int64) int64 {
// if object size is -1 choose part size as 5GiB.
if objectSize == -1 {
return maxPartSize
}
// make sure last part has enough buffer and handle this properly.
partSize := (objectSize / (maxParts - 1))
if partSize > minimumPartSize {
if partSize > maxPartSize {
return maxPartSize
}
return partSize
}
return minimumPartSize
}
// urlEncodePath encode the strings from UTF-8 byte representations to HTML hex escape sequences
//
// This is necessary since regular url.Parse() and url.Encode() functions do not support UTF-8