From 83a6415adb5779c919d38355eca15cd42ef237d0 Mon Sep 17 00:00:00 2001 From: tamal Date: Tue, 14 Mar 2017 04:21:18 -0700 Subject: [PATCH] Copy stow from s3 --- src/restic/backend/stow/backend_test.go | 87 ++++++ src/restic/backend/stow/config.go | 73 +++++ src/restic/backend/stow/config_test.go | 99 +++++++ src/restic/backend/stow/s3.go | 337 ++++++++++++++++++++++++ src/restic/backend/stow/s3_test.go | 73 +++++ 5 files changed, 669 insertions(+) create mode 100644 src/restic/backend/stow/backend_test.go create mode 100644 src/restic/backend/stow/config.go create mode 100644 src/restic/backend/stow/config_test.go create mode 100644 src/restic/backend/stow/s3.go create mode 100644 src/restic/backend/stow/s3_test.go diff --git a/src/restic/backend/stow/backend_test.go b/src/restic/backend/stow/backend_test.go new file mode 100644 index 000000000..6457c5c1e --- /dev/null +++ b/src/restic/backend/stow/backend_test.go @@ -0,0 +1,87 @@ +// DO NOT EDIT, AUTOMATICALLY GENERATED +package stow_test + +import ( + "testing" + + "restic/backend/test" +) + +var SkipMessage string + +func TestS3BackendCreate(t *testing.T) { + if SkipMessage != "" { + t.Skip(SkipMessage) + } + test.TestCreate(t) +} + +func TestS3BackendOpen(t *testing.T) { + if SkipMessage != "" { + t.Skip(SkipMessage) + } + test.TestOpen(t) +} + +func TestS3BackendCreateWithConfig(t *testing.T) { + if SkipMessage != "" { + t.Skip(SkipMessage) + } + test.TestCreateWithConfig(t) +} + +func TestS3BackendLocation(t *testing.T) { + if SkipMessage != "" { + t.Skip(SkipMessage) + } + test.TestLocation(t) +} + +func TestS3BackendConfig(t *testing.T) { + if SkipMessage != "" { + t.Skip(SkipMessage) + } + test.TestConfig(t) +} + +func TestS3BackendLoad(t *testing.T) { + if SkipMessage != "" { + t.Skip(SkipMessage) + } + test.TestLoad(t) +} + +func TestS3BackendSave(t *testing.T) { + if SkipMessage != "" { + t.Skip(SkipMessage) + } + test.TestSave(t) +} + +func TestS3BackendSaveFilenames(t *testing.T) { + if SkipMessage != 
"" { + t.Skip(SkipMessage) + } + test.TestSaveFilenames(t) +} + +func TestS3BackendBackend(t *testing.T) { + if SkipMessage != "" { + t.Skip(SkipMessage) + } + test.TestBackend(t) +} + +func TestS3BackendDelete(t *testing.T) { + if SkipMessage != "" { + t.Skip(SkipMessage) + } + test.TestDelete(t) +} + +func TestS3BackendCleanup(t *testing.T) { + if SkipMessage != "" { + t.Skip(SkipMessage) + } + test.TestCleanup(t) +} diff --git a/src/restic/backend/stow/config.go b/src/restic/backend/stow/config.go new file mode 100644 index 000000000..609695eb7 --- /dev/null +++ b/src/restic/backend/stow/config.go @@ -0,0 +1,73 @@ +package stow + +import ( + "net/url" + "path" + "strings" + + "restic/errors" +) + +// Config contains all configuration necessary to connect to an s3 compatible +// server. +type Config struct { + Endpoint string + UseHTTP bool + KeyID, Secret string + Bucket string + Prefix string +} + +const defaultPrefix = "restic" + +// ParseConfig parses the string s and extracts the s3 config. The two +// supported configuration formats are s3://host/bucketname/prefix and +// s3:host:bucketname/prefix. The host can also be a valid s3 region +// name. If no prefix is given the prefix "restic" will be used. 
+func ParseConfig(s string) (interface{}, error) { + switch { + case strings.HasPrefix(s, "s3:http"): + // assume that a URL has been specified, parse it and + // use the host as the endpoint and the path as the + // bucket name and prefix + url, err := url.Parse(s[3:]) + if err != nil { + return nil, errors.Wrap(err, "url.Parse") + } + + if url.Path == "" { + return nil, errors.New("s3: bucket name not found") + } + + path := strings.SplitN(url.Path[1:], "/", 2) + return createConfig(url.Host, path, url.Scheme == "http") + case strings.HasPrefix(s, "s3://"): + s = s[5:] + case strings.HasPrefix(s, "s3:"): + s = s[3:] + default: + return nil, errors.New("s3: invalid format") + } + // use the first entry of the path as the endpoint and the + // remainder as bucket name and prefix + path := strings.SplitN(s, "/", 3) + return createConfig(path[0], path[1:], false) +} + +func createConfig(endpoint string, p []string, useHTTP bool) (interface{}, error) { + var prefix string + switch { + case len(p) < 1: + return nil, errors.New("s3: invalid format, host/region or bucket name not found") + case len(p) == 1 || p[1] == "": + prefix = defaultPrefix + default: + prefix = path.Clean(p[1]) + } + return Config{ + Endpoint: endpoint, + UseHTTP: useHTTP, + Bucket: p[0], + Prefix: prefix, + }, nil +} diff --git a/src/restic/backend/stow/config_test.go b/src/restic/backend/stow/config_test.go new file mode 100644 index 000000000..5b64eb475 --- /dev/null +++ b/src/restic/backend/stow/config_test.go @@ -0,0 +1,99 @@ +package stow + +import "testing" + +var configTests = []struct { + s string + cfg Config +}{ + {"s3://eu-central-1/bucketname", Config{ + Endpoint: "eu-central-1", + Bucket: "bucketname", + Prefix: "restic", + }}, + {"s3://eu-central-1/bucketname/", Config{ + Endpoint: "eu-central-1", + Bucket: "bucketname", + Prefix: "restic", + }}, + {"s3://eu-central-1/bucketname/prefix/directory", Config{ + Endpoint: "eu-central-1", + Bucket: "bucketname", + Prefix: 
"prefix/directory", + }}, + {"s3://eu-central-1/bucketname/prefix/directory/", Config{ + Endpoint: "eu-central-1", + Bucket: "bucketname", + Prefix: "prefix/directory", + }}, + {"s3:eu-central-1/foobar", Config{ + Endpoint: "eu-central-1", + Bucket: "foobar", + Prefix: "restic", + }}, + {"s3:eu-central-1/foobar/", Config{ + Endpoint: "eu-central-1", + Bucket: "foobar", + Prefix: "restic", + }}, + {"s3:eu-central-1/foobar/prefix/directory", Config{ + Endpoint: "eu-central-1", + Bucket: "foobar", + Prefix: "prefix/directory", + }}, + {"s3:eu-central-1/foobar/prefix/directory/", Config{ + Endpoint: "eu-central-1", + Bucket: "foobar", + Prefix: "prefix/directory", + }}, + {"s3:https://hostname:9999/foobar", Config{ + Endpoint: "hostname:9999", + Bucket: "foobar", + Prefix: "restic", + }}, + {"s3:https://hostname:9999/foobar/", Config{ + Endpoint: "hostname:9999", + Bucket: "foobar", + Prefix: "restic", + }}, + {"s3:http://hostname:9999/foobar", Config{ + Endpoint: "hostname:9999", + Bucket: "foobar", + Prefix: "restic", + UseHTTP: true, + }}, + {"s3:http://hostname:9999/foobar/", Config{ + Endpoint: "hostname:9999", + Bucket: "foobar", + Prefix: "restic", + UseHTTP: true, + }}, + {"s3:http://hostname:9999/bucket/prefix/directory", Config{ + Endpoint: "hostname:9999", + Bucket: "bucket", + Prefix: "prefix/directory", + UseHTTP: true, + }}, + {"s3:http://hostname:9999/bucket/prefix/directory/", Config{ + Endpoint: "hostname:9999", + Bucket: "bucket", + Prefix: "prefix/directory", + UseHTTP: true, + }}, +} + +func TestParseConfig(t *testing.T) { + for i, test := range configTests { + cfg, err := ParseConfig(test.s) + if err != nil { + t.Errorf("test %d:%s failed: %v", i, test.s, err) + continue + } + + if cfg != test.cfg { + t.Errorf("test %d:\ninput:\n %s\n wrong config, want:\n %v\ngot:\n %v", + i, test.s, test.cfg, cfg) + continue + } + } +} diff --git a/src/restic/backend/stow/s3.go b/src/restic/backend/stow/s3.go new file mode 100644 index 000000000..a7a93fae0 --- 
/dev/null +++ b/src/restic/backend/stow/s3.go @@ -0,0 +1,337 @@ +package stow + +import ( + "bytes" + "io" + "net/http" + "path" + "restic" + "strings" + + "restic/backend" + "restic/errors" + + "github.com/minio/minio-go" + + "restic/debug" +) + +const connLimit = 40 + +// s3 is a backend which stores the data on an S3 endpoint. +type s3 struct { + client *minio.Client + connChan chan struct{} + bucketname string + prefix string +} + +// Open opens the S3 backend at bucket and region. The bucket is created if it +// does not exist yet. +func Open(cfg Config) (restic.Backend, error) { + debug.Log("open, config %#v", cfg) + + client, err := minio.New(cfg.Endpoint, cfg.KeyID, cfg.Secret, !cfg.UseHTTP) + if err != nil { + return nil, errors.Wrap(err, "minio.New") + } + + be := &s3{client: client, bucketname: cfg.Bucket, prefix: cfg.Prefix} + + tr := &http.Transport{MaxIdleConnsPerHost: connLimit} + client.SetCustomTransport(tr) + + be.createConnections() + + found, err := client.BucketExists(cfg.Bucket) + if err != nil { + debug.Log("BucketExists(%v) returned err %v", cfg.Bucket, err) + return nil, errors.Wrap(err, "client.BucketExists") + } + + if !found { + // create new bucket with default ACL in default region + err = client.MakeBucket(cfg.Bucket, "") + if err != nil { + return nil, errors.Wrap(err, "client.MakeBucket") + } + } + + return be, nil +} + +func (be *s3) s3path(h restic.Handle) string { + if h.Type == restic.ConfigFile { + return path.Join(be.prefix, string(h.Type)) + } + return path.Join(be.prefix, string(h.Type), h.Name) +} + +func (be *s3) createConnections() { + be.connChan = make(chan struct{}, connLimit) + for i := 0; i < connLimit; i++ { + be.connChan <- struct{}{} + } +} + +// Location returns this backend's location (the bucket name). +func (be *s3) Location() string { + return be.bucketname +} + +// Save stores data in the backend at the handle. 
+func (be *s3) Save(h restic.Handle, rd io.Reader) (err error) { + if err := h.Valid(); err != nil { + return err + } + + debug.Log("Save %v", h) + + objName := be.s3path(h) + + // Check key does not already exist + _, err = be.client.StatObject(be.bucketname, objName) + if err == nil { + debug.Log("%v already exists", h) + return errors.New("key already exists") + } + + <-be.connChan + defer func() { + be.connChan <- struct{}{} + }() + + debug.Log("PutObject(%v, %v)", + be.bucketname, objName) + n, err := be.client.PutObject(be.bucketname, objName, rd, "binary/octet-stream") + debug.Log("%v -> %v bytes, err %#v", objName, n, err) + + return errors.Wrap(err, "client.PutObject") +} + +// wrapReader wraps an io.ReadCloser to run an additional function on Close. +type wrapReader struct { + io.ReadCloser + f func() +} + +func (wr wrapReader) Close() error { + err := wr.ReadCloser.Close() + wr.f() + return err +} + +// Load returns a reader that yields the contents of the file at h at the +// given offset. If length is nonzero, only a portion of the file is +// returned. rd must be closed after use. +func (be *s3) Load(h restic.Handle, length int, offset int64) (io.ReadCloser, error) { + debug.Log("Load %v, length %v, offset %v", h, length, offset) + if err := h.Valid(); err != nil { + return nil, err + } + + if offset < 0 { + return nil, errors.New("offset is negative") + } + + if length < 0 { + return nil, errors.Errorf("invalid length %d", length) + } + + var obj *minio.Object + + objName := be.s3path(h) + + // get token for connection + <-be.connChan + + obj, err := be.client.GetObject(be.bucketname, objName) + if err != nil { + debug.Log(" err %v", err) + + // return token + be.connChan <- struct{}{} + + return nil, errors.Wrap(err, "client.GetObject") + } + + // if we're going to read the whole object, just pass it on. 
+ if length == 0 { + debug.Log("Load %v: pass on object", h) + + _, err = obj.Seek(offset, 0) + if err != nil { + _ = obj.Close() + + // return token + be.connChan <- struct{}{} + + return nil, errors.Wrap(err, "obj.Seek") + } + + rd := wrapReader{ + ReadCloser: obj, + f: func() { + debug.Log("Close()") + // return token + be.connChan <- struct{}{} + }, + } + return rd, nil + } + + defer func() { + // return token + be.connChan <- struct{}{} + }() + + // otherwise use a buffer with ReadAt + info, err := obj.Stat() + if err != nil { + _ = obj.Close() + return nil, errors.Wrap(err, "obj.Stat") + } + + if offset > info.Size { + _ = obj.Close() + return nil, errors.New("offset larger than file size") + } + + l := int64(length) + if offset+l > info.Size { + l = info.Size - offset + } + + buf := make([]byte, l) + n, err := obj.ReadAt(buf, offset) + debug.Log("Load %v: use buffer with ReadAt: %v, %v", h, n, err) + if err == io.EOF { + debug.Log("Load %v: shorten buffer %v -> %v", h, len(buf), n) + buf = buf[:n] + err = nil + } + + if err != nil { + _ = obj.Close() + return nil, errors.Wrap(err, "obj.ReadAt") + } + + return backend.Closer{Reader: bytes.NewReader(buf)}, nil +} + +// Stat returns information about a blob. +func (be *s3) Stat(h restic.Handle) (bi restic.FileInfo, err error) { + debug.Log("%v", h) + + objName := be.s3path(h) + var obj *minio.Object + + obj, err = be.client.GetObject(be.bucketname, objName) + if err != nil { + debug.Log("GetObject() err %v", err) + return restic.FileInfo{}, errors.Wrap(err, "client.GetObject") + } + + // make sure that the object is closed properly. + defer func() { + e := obj.Close() + if err == nil { + err = errors.Wrap(e, "Close") + } + }() + + fi, err := obj.Stat() + if err != nil { + debug.Log("Stat() err %v", err) + return restic.FileInfo{}, errors.Wrap(err, "Stat") + } + + return restic.FileInfo{Size: fi.Size}, nil +} + +// Test returns true if a blob of the given type and name exists in the backend. 
+func (be *s3) Test(h restic.Handle) (bool, error) {
+	found := false
+	objName := be.s3path(h)
+	_, err := be.client.StatObject(be.bucketname, objName)
+	if err == nil {
+		found = true
+	}
+
+	// If error, then not found; StatObject errors are deliberately treated
+	// as "does not exist" here rather than reported.
+	return found, nil
+}
+
+// Remove removes the blob with the given name and type.
+func (be *s3) Remove(h restic.Handle) error {
+	objName := be.s3path(h)
+	err := be.client.RemoveObject(be.bucketname, objName)
+	debug.Log("Remove(%v) -> err %v", h, err)
+	return errors.Wrap(err, "client.RemoveObject")
+}
+
+// List returns a channel that yields all names of blobs of type t. A
+// goroutine is started for this. If the channel done is closed, sending
+// stops.
+func (be *s3) List(t restic.FileType, done <-chan struct{}) <-chan string {
+	debug.Log("listing %v", t)
+	ch := make(chan string)
+
+	prefix := be.s3path(restic.Handle{Type: t}) + "/"
+
+	listresp := be.client.ListObjects(be.bucketname, prefix, true, done)
+
+	go func() {
+		defer close(ch)
+		for obj := range listresp {
+			// strip the type prefix so only the blob name is yielded
+			m := strings.TrimPrefix(obj.Key, prefix)
+			if m == "" {
+				continue
+			}
+
+			select {
+			case ch <- m:
+			case <-done:
+				return
+			}
+		}
+	}()
+
+	return ch
+}
+
+// removeKeys removes all keys of backend type t.
+func (be *s3) removeKeys(t restic.FileType) error {
+	done := make(chan struct{})
+	defer close(done)
+	// FIX: the original hard-coded restic.DataFile in both the List and the
+	// Remove call, ignoring the t parameter entirely — so Delete()'s
+	// per-type loop only ever purged data files. Use t instead.
+	for key := range be.List(t, done) {
+		err := be.Remove(restic.Handle{Type: t, Name: key})
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// Delete removes all restic keys in the bucket. It will not remove the bucket itself.
+func (be *s3) Delete() error { + alltypes := []restic.FileType{ + restic.DataFile, + restic.KeyFile, + restic.LockFile, + restic.SnapshotFile, + restic.IndexFile} + + for _, t := range alltypes { + err := be.removeKeys(t) + if err != nil { + return nil + } + } + + return be.Remove(restic.Handle{Type: restic.ConfigFile}) +} + +// Close does nothing +func (be *s3) Close() error { return nil } diff --git a/src/restic/backend/stow/s3_test.go b/src/restic/backend/stow/s3_test.go new file mode 100644 index 000000000..352c394a2 --- /dev/null +++ b/src/restic/backend/stow/s3_test.go @@ -0,0 +1,73 @@ +package stow_test + +import ( + "fmt" + "net/url" + "os" + "restic" + + "restic/errors" + + "restic/backend/s3" + "restic/backend/test" + . "restic/test" +) + +//go:generate go run ../test/generate_backend_tests.go + +func init() { + if TestS3Server == "" { + SkipMessage = "s3 test server not available" + return + } + + url, err := url.Parse(TestS3Server) + if err != nil { + fmt.Fprintf(os.Stderr, "invalid url: %v\n", err) + return + } + + cfg := s3.Config{ + Endpoint: url.Host, + Bucket: "restictestbucket", + KeyID: os.Getenv("AWS_ACCESS_KEY_ID"), + Secret: os.Getenv("AWS_SECRET_ACCESS_KEY"), + } + + if url.Scheme == "http" { + cfg.UseHTTP = true + } + + test.CreateFn = func() (restic.Backend, error) { + be, err := s3.Open(cfg) + if err != nil { + return nil, err + } + + exists, err := be.Test(restic.Handle{Type: restic.ConfigFile}) + if err != nil { + return nil, err + } + + if exists { + return nil, errors.New("config already exists") + } + + return be, nil + } + + test.OpenFn = func() (restic.Backend, error) { + return s3.Open(cfg) + } + + // test.CleanupFn = func() error { + // if tempBackendDir == "" { + // return nil + // } + + // fmt.Printf("removing test backend at %v\n", tempBackendDir) + // err := os.RemoveAll(tempBackendDir) + // tempBackendDir = "" + // return err + // } +}