filesystem.go 6.67 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package api

import (
	"bufio"
21
	"context"
22 23 24
	"fmt"
	"io"
	"os"
25
	"path"
26 27 28 29
	"path/filepath"
	"sync"

	"github.com/ethereum/go-ethereum/common"
30
	"github.com/ethereum/go-ethereum/swarm/log"
31 32 33 34 35 36
	"github.com/ethereum/go-ethereum/swarm/storage"
)

const maxParallelFiles = 5

type FileSystem struct {
37
	api *API
38 39
}

40
func NewFileSystem(api *API) *FileSystem {
41 42 43 44
	return &FileSystem{api}
}

// Upload replicates a local directory as a manifest file and uploads it
45 46
// using FileStore store
// This function waits the chunks to be stored.
47
// TODO: localpath should point to a manifest
48 49
//
// DEPRECATED: Use the HTTP API instead
50
func (fs *FileSystem) Upload(lpath, index string, toEncrypt bool) (string, error) {
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
	var list []*manifestTrieEntry
	localpath, err := filepath.Abs(filepath.Clean(lpath))
	if err != nil {
		return "", err
	}

	f, err := os.Open(localpath)
	if err != nil {
		return "", err
	}
	stat, err := f.Stat()
	if err != nil {
		return "", err
	}

	var start int
	if stat.IsDir() {
		start = len(localpath)
69
		log.Debug(fmt.Sprintf("uploading '%s'", localpath))
70 71 72 73 74 75 76 77
		err = filepath.Walk(localpath, func(path string, info os.FileInfo, err error) error {
			if (err == nil) && !info.IsDir() {
				if len(path) <= start {
					return fmt.Errorf("Path is too short")
				}
				if path[:start] != localpath {
					return fmt.Errorf("Path prefix of '%s' does not match localpath '%s'", path, localpath)
				}
78
				entry := newManifestTrieEntry(&ManifestEntry{Path: filepath.ToSlash(path)}, nil)
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
				list = append(list, entry)
			}
			return err
		})
		if err != nil {
			return "", err
		}
	} else {
		dir := filepath.Dir(localpath)
		start = len(dir)
		if len(localpath) <= start {
			return "", fmt.Errorf("Path is too short")
		}
		if localpath[:start] != dir {
			return "", fmt.Errorf("Path prefix of '%s' does not match dir '%s'", localpath, dir)
		}
95
		entry := newManifestTrieEntry(&ManifestEntry{Path: filepath.ToSlash(localpath)}, nil)
96 97 98
		list = append(list, entry)
	}

99 100 101
	errors := make([]error, len(list))
	sem := make(chan bool, maxParallelFiles)
	defer close(sem)
102 103

	for i, entry := range list {
104 105 106 107
		sem <- true
		go func(i int, entry *manifestTrieEntry) {
			defer func() { <-sem }()

108
			f, err := os.Open(entry.Path)
109 110 111 112 113 114 115 116 117 118
			if err != nil {
				errors[i] = err
				return
			}
			defer f.Close()

			stat, err := f.Stat()
			if err != nil {
				errors[i] = err
				return
119
			}
120 121 122 123 124

			var hash storage.Address
			var wait func(context.Context) error
			ctx := context.TODO()
			hash, wait, err = fs.api.fileStore.Store(ctx, f, stat.Size(), toEncrypt)
125 126 127 128
			if err != nil {
				errors[i] = err
				return
			}
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
			if hash != nil {
				list[i].Hash = hash.Hex()
			}
			if err := wait(ctx); err != nil {
				errors[i] = err
				return
			}

			list[i].ContentType, err = DetectContentType(f.Name(), f)
			if err != nil {
				errors[i] = err
				return
			}

		}(i, entry)
144
	}
145 146
	for i := 0; i < cap(sem); i++ {
		sem <- true
147 148 149
	}

	trie := &manifestTrie{
150
		fileStore: fs.api.fileStore,
151 152 153 154 155 156 157 158
	}
	quitC := make(chan bool)
	for i, entry := range list {
		if errors[i] != nil {
			return "", errors[i]
		}
		entry.Path = RegularSlashes(entry.Path[start:])
		if entry.Path == index {
159
			ientry := newManifestTrieEntry(&ManifestEntry{
160
				ContentType: entry.ContentType,
161 162
			}, nil)
			ientry.Hash = entry.Hash
163 164 165 166 167 168 169 170
			trie.addEntry(ientry, quitC)
		}
		trie.addEntry(entry, quitC)
	}

	err2 := trie.recalcAndStore()
	var hs string
	if err2 == nil {
171
		hs = trie.ref.Hex()
172 173 174 175
	}
	return hs, err2
}

176
// Download replicates the manifest basePath structure on the local filesystem
177
// under localpath
178 179
//
// DEPRECATED: Use the HTTP API instead
180
func (fs *FileSystem) Download(bzzpath, localpath string) error {
181 182 183 184 185 186 187 188 189 190
	lpath, err := filepath.Abs(filepath.Clean(localpath))
	if err != nil {
		return err
	}
	err = os.MkdirAll(lpath, os.ModePerm)
	if err != nil {
		return err
	}

	//resolving host and port
191 192 193 194
	uri, err := Parse(path.Join("bzz:/", bzzpath))
	if err != nil {
		return err
	}
195
	addr, err := fs.api.Resolve(context.TODO(), uri.Addr)
196 197 198
	if err != nil {
		return err
	}
199
	path := uri.Path
200 201 202 203 204 205

	if len(path) > 0 {
		path += "/"
	}

	quitC := make(chan bool)
206
	trie, err := loadManifest(context.TODO(), fs.api.fileStore, addr, quitC, NOOPDecrypt)
207
	if err != nil {
208
		log.Warn(fmt.Sprintf("fs.Download: loadManifestTrie error: %v", err))
209 210 211 212
		return err
	}

	type downloadListEntry struct {
213
		addr storage.Address
214 215 216 217 218 219 220 221
		path string
	}

	var list []*downloadListEntry
	var mde error

	prevPath := lpath
	err = trie.listWithPrefix(path, quitC, func(entry *manifestTrieEntry, suffix string) {
222
		log.Trace(fmt.Sprintf("fs.Download: %#v", entry))
223

224
		addr = common.Hex2Bytes(entry.Hash)
225 226 227 228 229 230 231
		path := lpath + "/" + suffix
		dir := filepath.Dir(path)
		if dir != prevPath {
			mde = os.MkdirAll(dir, os.ModePerm)
			prevPath = dir
		}
		if (mde == nil) && (path != dir+"/") {
232
			list = append(list, &downloadListEntry{addr: addr, path: path})
233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250
		}
	})
	if err != nil {
		return err
	}

	wg := sync.WaitGroup{}
	errC := make(chan error)
	done := make(chan bool, maxParallelFiles)
	for i, entry := range list {
		select {
		case done <- true:
			wg.Add(1)
		case <-quitC:
			return fmt.Errorf("aborted")
		}
		go func(i int, entry *downloadListEntry) {
			defer wg.Done()
251
			err := retrieveToFile(quitC, fs.api.fileStore, entry.addr, entry.path)
252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271
			if err != nil {
				select {
				case errC <- err:
				case <-quitC:
				}
				return
			}
			<-done
		}(i, entry)
	}
	go func() {
		wg.Wait()
		close(errC)
	}()
	select {
	case err = <-errC:
		return err
	case <-quitC:
		return fmt.Errorf("aborted")
	}
272
}
273

274
func retrieveToFile(quitC chan bool, fileStore *storage.FileStore, addr storage.Address, path string) error {
275
	f, err := os.Create(path) // TODO: basePath separators
276 277 278
	if err != nil {
		return err
	}
279
	reader, _ := fileStore.Retrieve(context.TODO(), addr)
280
	writer := bufio.NewWriter(f)
281
	size, err := reader.Size(context.TODO(), quitC)
282 283 284 285 286 287 288 289 290 291
	if err != nil {
		return err
	}
	if _, err = io.CopyN(writer, reader, size); err != nil {
		return err
	}
	if err := writer.Flush(); err != nil {
		return err
	}
	return f.Close()
292
}