Afs
Abstract File Storage
Install / Use
/learn @viant/AfsREADME
afs - abstract file storage
<a href='https://github.com/jpoles1/gopherbadger' target='_blank'>
</a>
Please refer to CHANGELOG.md if you encounter breaking changes.
- Motivation
- Introduction
- Usage
- Matchers
- Content modifiers
- Streaming data
- Options
- Storage Implementations
- Testing mode
- Storage Manager
- GoCover
- License
- Credits and Acknowledgements
Motivation
When dealing with various storage systems, like cloud storage, SCP, container or local file system, using a shared API for typical storage operations provides an excellent simplification. What's more, the ability to simulate storage-related errors like Auth or EOF allows you to test an app's error handling.
Introduction
This library uses a storage manager abstraction to provide an implementation for a specific storage system with the following operations:
- CRUD Operation:
List(ctx context.Context, URL string, options ...Option) ([]Object, error)
Walk(ctx context.Context, URL string, handler OnVisit, options ...Option) error
Open(ctx context.Context, object Object, options ...Option) (io.ReadCloser, error)
OpenURL(ctx context.Context, URL string, options ...Option) (io.ReadCloser, error)
Upload(ctx context.Context, URL string, mode os.FileMode, reader io.Reader, options ...Option) error
Create(ctx context.Context, URL string, mode os.FileMode, isDir bool, options ...Option) error
Delete(ctx context.Context, URL string, options ...Option) error
- Batch uploader:
type Upload func(ctx context.Context, parent string, info os.FileInfo, reader io.Reader) error
Uploader(ctx context.Context, URL string, options ...Option) (Upload, io.Closer, error)
- Utilities:
Copy(ctx context.Context, sourceURL, destURL string, options ...Option) error
Move(ctx context.Context, sourceURL, destURL string, options ...Option) error
NewWriter(ctx context.Context, URL string, mode os.FileMode, options ...storage.Option) (io.WriteCloser, error)
DownloadWithURL(ctx context.Context, URL string, options ...Option) ([]byte, error)
Download(ctx context.Context, object Object, options ...Option) ([]byte, error)
The URL scheme is used to identify the storage system; alternatively, a relative/absolute path can be used for local file storage. By default, all operations using the same baseURL share the same corresponding storage manager instance. For example, instead of supplying SCP auth details for every operation, the auth option can be used only once.
func main() {
ctx := context.Background()
{
//auth with first call
fs := afs.New()
defer fs.Close()
keyAuth, err := scp.LocalhostKeyAuth("")
if err != nil {
log.Fatal(err)
}
reader1, err := fs.OpenURL(ctx, "scp://host1:22/myfolder/asset.txt", keyAuth)
if err != nil {
log.Fatal(err)
}
...
reader2, err := fs.OpenURL(ctx, "scp://host1:22/myfolder/asset.txt", keyAuth)
}
{
//auth per baseURL
fs := afs.New()
err = fs.Init(ctx, "scp://host1:22/", keyAuth)
if err != nil {
log.Fatal(err)
}
defer fs.Destroy("scp://host1:22/")
reader, err := fs.OpenURL(ctx, "scp://host1:22/myfolder/asset.txt")
}
}
Usage
Downloading location content
// Example: listing a location and reading each file.
// Fix: the original never closed the reader returned by fs.Open, leaking one
// handle per file visited.
func main() {
	fs := afs.New()
	ctx := context.Background()
	objects, err := fs.List(ctx, "/tmp/folder")
	if err != nil {
		log.Fatal(err)
	}
	for _, object := range objects {
		fmt.Printf("%v %v\n", object.Name(), object.URL())
		if object.IsDir() {
			continue
		}
		reader, err := fs.Open(ctx, object)
		if err != nil {
			log.Fatal(err)
		}
		data, err := ioutil.ReadAll(reader)
		// close promptly inside the loop; a defer here would pile up
		// until main returns
		_ = reader.Close()
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%s\n", data)
	}
}
Uploading Content
// Example: uploading content to an SCP destination.
func main() {
	fs := afs.New()
	ctx := context.Background()
	// build an SSH key auth option; "" presumably selects the default key
	// location — confirm with the scp package docs
	keyAuth, err := scp.LocalhostKeyAuth("")
	if err != nil {
		log.Fatal(err)
	}
	// register auth once for the base URL; later calls on the same
	// base URL reuse the cached storage manager
	err = fs.Init(ctx, "scp://127.0.0.1:22/", keyAuth)
	if err != nil {
		log.Fatal(err)
	}
	// upload the reader's content with file mode 0644
	err = fs.Upload(ctx, "scp://127.0.0.1:22/folder/asset.txt", 0644, strings.NewReader("test me"))
	if err != nil {
		log.Fatal(err)
	}
	ok, err := fs.Exists(ctx, "scp://127.0.0.1:22/folder/asset.txt")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("has file: %v\n", ok)
	// best-effort cleanup; error deliberately ignored
	_ = fs.Delete(ctx, "scp://127.0.0.1:22/folder/asset.txt")
}
Uploading Content With Writer
func main() {
fs := afs.New()
ctx := context.Background()
keyAuth, err := scp.LocalhostKeyAuth("")
if err != nil {
log.Fatal(err)
}
err = fs.Init(ctx, "scp://127.0.0.1:22/", keyAuth)
if err != nil {
log.Fatal(err)
}
writer = fs.NewWriter(ctx, "scp://127.0.0.1:22/folder/asset.txt", 0644)
_, err := writer.Write([]byte("test me")))
if err != nil {
log.Fatal(err)
}
err = writer.Close()
if err != nil {
log.Fatal(err)
}
ok, err := fs.Exists(ctx, "scp://127.0.0.1:22/folder/asset.txt")
if err != nil {
log.Fatal(err)
}
fmt.Printf("has file: %v\n", ok)
_ = fs.Delete(ctx, "scp://127.0.0.1:22/folder/asset.txt")
}
Data Copy
// Example: copying between two storage systems with per-side options.
func main() {
	fs := afs.New()
	ctx := context.Background()
	keyAuth, err := scp.LocalhostKeyAuth("")
	if err != nil {
		log.Fatal(err)
	}
	// option.NewSource/NewDest scope options to one side of the copy:
	// here the SCP key auth applies only to the destination
	err = fs.Copy(ctx, "s3://mybucket/myfolder", "scp://127.0.0.1/tmp", option.NewSource(), option.NewDest(keyAuth))
	if err != nil {
		log.Fatal(err)
	}
}
Archiving content
// Example: copying a local folder into a zip archive stored on GCS.
func main() {
	// JWT credentials loaded from a local secret file
	secretPath := path.Join(os.Getenv("HOME"), ".secret", "gcp-e2e.json")
	auth, err := gs.NewJwtConfig(option.NewLocation(secretPath))
	if err != nil {
		return
	}
	sourceURL := "mylocalPath/"
	// compound URL: the gs part addresses the archive object, the zip://
	// part addresses a path inside the archive.
	// NOTE(review): "gs:mybucket" looks like it should be "gs://mybucket" —
	// confirm against the library's URL scheme docs
	destURL := "gs:mybucket/test.zip/zip://localhost/dir1"
	fs := afs.New()
	ctx := context.Background()
	err = fs.Copy(ctx, sourceURL, destURL, option.NewDest(auth))
	if err != nil {
		log.Fatal(err)
	}
}
Archive Walker
Walker can be created for tar or zip archive.
func main() {
ctx := context.Background()
fs := afs.New()
walker := tar.NewWalker(s3afs.New())
err := fs.Copy(ctx, "/tmp/test.tar", "s3:///dest/folder/test", walker)
if err != nil {
log.Fatal(err)
}
Archive Uploader
Uploader can be created for tar or zip archive.
// Example: packing a GCS folder into a local zip via a batch uploader.
func main() {
	ctx := context.Background()
	fs := afs.New()
	// uploader collects copied assets into the destination zip archive
	uploader := zip.NewBatchUploader(gsafs.New())
	err := fs.Copy(ctx, "gs:///tmp/test/data", "/tmp/data.zip", uploader)
	if err != nil {
		log.Fatal(err)
	}
}
Data Move
// Example: moving a local folder to an SCP destination.
func main() {
	fs := afs.New()
	ctx := context.Background()
	keyAuth, err := scp.LocalhostKeyAuth("")
	if err != nil {
		log.Fatal(err)
	}
	// source needs no options; the SCP key auth applies to the destination only
	err = fs.Move(ctx, "/tmp/transient/app", "scp://127.0.0.1/tmp", option.NewSource(), option.NewDest(keyAuth))
	if err != nil {
		log.Fatal(err)
	}
}
Batch Upload
// Example: batch-uploading in-memory assets with fs.Uploader.
// Fixes: `if ! asset.Dir` was not gofmt-clean, and the loop variable `asset`
// shadowed the imported asset package; renamed to res.
func main() {
	fs := afs.New()
	ctx := context.Background()
	upload, closer, err := fs.Uploader(ctx, "/tmp/clone")
	if err != nil {
		log.Fatal(err)
	}
	defer closer.Close()
	assets := []*asset.Resource{
		asset.NewFile("asset1.txt", []byte("test 1"), 0644),
		asset.NewFile("asset2.txt", []byte("test 2"), 0644),
		asset.NewDir("folder1", file.DefaultDirOsMode),
		asset.NewFile("folder1/asset1.txt", []byte("test 3"), 0644),
		asset.NewFile("folder1/asset2.txt", []byte("test 4"), 0644),
	}
	for _, res := range assets {
		relative := ""
		var reader io.Reader
		// names containing '/' carry their parent folder as the relative path
		if strings.Contains(res.Name, "/") {
			relative, _ = path.Split(res.Name)
		}
		// directories are created from the FileInfo alone; reader stays nil
		if !res.Dir {
			reader = bytes.NewReader(res.Data)
		}
		err = upload(ctx, relative, res.Info(), reader)
		if err != nil {
			log.Fatal(err)
		}
	}
}
Matchers
To filter source content you can use Matcher option. The following have been implemented.
// Example: copying only files accepted by a basic matcher.
// Fixes: the error from NewBasic was ignored, and the later `err :=` would not
// compile ("no new variables on left side of :=") since err was already declared.
func main() {
	matcher, err := NewBasic("/data", ".avro", nil)
	if err != nil {
		log.Fatal(err)
	}
	fs := afs.New()
	ctx := context.Background()
	// matcher.Match filters which source assets take part in the copy
	err = fs.Copy(ctx, "/tmp/data", "s3://mybucket/data/", matcher.Match)
	if err != nil {
		log.Fatal(err)
	}
}
Exclusion
// Example: excluding assets by regular expression.
func main() {
	// Exclusion is a regexp; any asset whose path matches it is skipped.
	// (the local name shadows the matcher package, which is legal but
	// worth noting)
	matcher := matcher.Basic{Exclusion: ".+/data/perf/\\d+/.+"}
	fs := afs.New()
	ctx := context.Background()
	err := fs.Copy(ctx, "/tmp/data", "s3://mybucket/data/", matcher.Match)
	if err != nil {
		log.Fatal(err)
	}
}
OS style filepath match, with the following terms:
- '*' matches any sequence of non-Separator characters
- '?' matches any single non-Separator character
- '[' [ '^' ] { character-range } ']'
// Example: filtering copied assets with an OS-style filepath pattern.
func main() {
	// matches only *.avro files, per the filepath glob terms listed above
	matcher := matcher.Filepath("*.avro")
	fs := afs.New()
	ctx := context.Background()
	err := fs.Copy(ctx, "/tmp/data", "gs://mybucket/data/", matcher)
	if err != nil {
		log.Fatal(err)
	}
}
Ignore matcher represents a matcher that matches files that are not covered by the ignore rules. The
