Bases: object
Synchronizes between cache and store
Max. throughput for files < 1MB is max_writer_threads * 100KB per second. :param max_writer_threads: Max. number of writer threads to use.
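As a rough illustration of the throughput bound above, here is a minimal sketch. The helper name is hypothetical, and it assumes 1 KB = 1000 bytes, which matches the MB definition used by the statistics interface but is not stated for this parameter.

```python
def max_small_file_throughput(max_writer_threads, per_thread_bytes_per_s=100 * 1000):
    """Upper bound in bytes per second for files smaller than 1 MB.

    Assumes each writer thread contributes at most 100 KB/s (1 KB = 1000 bytes).
    """
    return max_writer_threads * per_thread_bytes_per_s


# Bound in bytes per second for 30 writer threads.
print(max_small_file_throughput(30))
```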
Sleep until seconds have passed since last call
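The behaviour described above could be sketched as follows. The class name `Throttle` and its injectable `clock` and `sleep` arguments are assumptions made for illustration and are not part of the documented interface.

```python
import time


class Throttle(object):
    """Sleep until `seconds` have passed since the previous call (hypothetical helper)."""

    def __init__(self, clock=time.time, sleep=time.sleep):
        self._clock = clock
        self._sleep = sleep
        self._last_call = None  # no previous call yet

    def wait(self, seconds):
        now = self._clock()
        if self._last_call is not None:
            remaining = seconds - (now - self._last_call)
            if remaining > 0:
                self._sleep(remaining)
        # Remember when this call finished, so the next call measures from here.
        self._last_call = self._clock()
```

Calling `wait(1.0)` at the top of a loop would then limit the loop to roughly one iteration per second, without sleeping at all when an iteration already took longer than that.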
Returns: | the number of upload worker threads that should be used, according to the file size and the average time needed to upload a file |
---|
Start new writer jobs with dirty cache entries.
Start new writer jobs with expired least recently used cache entries.
Get download rate in MB/s
Get amount of data downloaded from a store in MB
Get upload rate in MB/s
Get amount of data uploaded to a store in MB
Returns: | True iff path is currently uploaded or being removed |
---|
Get time since last heartbeat in seconds.
Remove finished workers and restart unsuccessful delete jobs.
Bases: cloudfusion.store.caching_store.MultiprocessingCachingStore, cloudfusion.store.transparent_store.TransparentStore
Implements the cloudfusion.store.transparent_store.TransparentStore interface to get statistics about a cache wrapping a store.
Parameters: |
|
---|
Returns: | True if the hard limit of the cache is exceeded, which should slow down write operations |
---|
Returns: | the size of the cache in MB |
---|
Get download rate in MB/s
Get amount of data downloaded from a store in MB
Returns: | the hard limit of the cache in MB; exceeding this limit should slow down write operations |
---|
Get upload rate in MB/s
Get amount of data uploaded to a store in MB
Created on Jun 10, 2013
@author: joe
Bases: cloudfusion.store.store.Store
Like CachingStore, but does not make guarantees as to the consistency of the wrapped store. Uses a best-effort strategy to synchronize the store. Employs multiple threads for increased throughput. Therefore, it can only use stores with a thread-safe put_file method. Unlike CachingStore, guarantees that write operations do not block for transfer, until the cache size limit is reached. Unlike CachingStore, guarantees that no write operations on the wrapped store are invoked until a cached item expires.
Parameters: |
|
---|
Reloads the locally cached file path_to_file from the wrapped store, if the wrapped store version is newer. The cached file’s last updated time is set to the current point of time. Makes a new cache entry if it does not exist yet. If the file was changed in the wrapped store after the cached file’s modified time, the cached file...
Raises: | NoSuchFilesytemObjectError if file does not exist in wrapped store. |
---|
TODO: exception raising does not work and needs to be implemented
Returns: | the time in seconds until any cache entry is expired |
---|
If the file was updated in the wrapped store, then its content in the cache will be updated if its entry is expired but not dirty. :returns: string – the data of the file with the path path_to_file
Return maximum number of bytes per file
Returns: | True if the store’s version is newer than the cached entry or does not exist and False otherwise. |
---|
Stores a fileobject to the cloudfusion.util.cache.Cache and if the existing fileobject has expired it is also written to the wrapped store. The cached file’s updated and modified attributes will be reset to the current point of time. The cached file’s dirty flag is set to False if the entry has expired and was hence written to the store. Otherwise it is set to True.
Parameters: |
|
---|
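The dirty-flag rule described above can be sketched as follows. Plain dictionaries stand in for the cache and the wrapped store, and `Entry`, `store_to_cache` and `expiration_s` are hypothetical names, so this is an illustration of the rule rather than the library's implementation.

```python
import time


class Entry(object):
    """Hypothetical cache entry with the attributes named in the description."""

    def __init__(self, data, now):
        self.data = data
        self.updated = now   # reset to the current point of time
        self.modified = now  # reset to the current point of time
        self.dirty = True    # not yet written to the wrapped store


def store_to_cache(cache, store, path, data, expiration_s, now=None):
    now = time.time() if now is None else now
    old = cache.get(path)
    expired = old is not None and now - old.updated > expiration_s
    entry = Entry(data, now)
    if expired:
        store[path] = data   # the existing entry expired: write through
        entry.dirty = False
    cache[path] = entry
    return entry
```

A first write therefore only touches the cache and leaves the entry dirty, while a write to an entry that has already expired also reaches the wrapped store and leaves the entry clean.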
Created on 08.04.2011
Bases: object
Returns: | A dictionary mapping the path of every file object in directory to a dictionary with the keys ‘modified’, ‘bytes’ and ‘is_dir’ containing the corresponding metadata for the file object. |
---|
The value for ‘modified’ is a date in seconds, stating when the file object was last modified. The value for ‘bytes’ is the number of bytes of the file object. It is 0 if the object is a directory. The value for ‘is_dir’ is True if the file object is a directory and False otherwise.
Raises: | NoSuchFilesytemObjectError if the directory does not exist |
---|
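To make the shape of the returned dictionary concrete, here is a small sketch. The paths, timestamps and sizes are invented values, and the two helper functions are hypothetical.

```python
# Invented example of a listing for the directory /docs.
listing = {
    "/docs/report.txt": {"modified": 1370822400, "bytes": 2048, "is_dir": False},
    "/docs/images": {"modified": 1370822500, "bytes": 0, "is_dir": True},
}


def total_bytes(listing):
    """Sum of the sizes of all entries; directories contribute 0."""
    return sum(meta["bytes"] for meta in listing.values())


def subdirectories(listing):
    """Sorted paths of all entries that are directories."""
    return sorted(path for path, meta in listing.items() if meta["is_dir"])
```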
Created on 08.04.2011
Bases: exceptions.Exception
Bases: exceptions.Exception
Bases: exceptions.ValueError
Bases: object
Central interface for any cloud storage provider. Any cloud storage provider that is used by CloudFusion needs to implement this interface. After implementing the interface for a new provider, you can add file system access to it by introducing a new branch to the if statement in cloudfusion.pyfusebox.configurable_pyfusebox.ConfigurablePyFuseBox.__get_new_store(). Advanced functionality such as caching and concurrency is supplied by wrappers, which are already implemented. Path parameters are always absolute paths of a file system, starting with a ‘/’.
Returns: | a human readable string describing account info like provider, name, statistics |
---|
Create the remote directory directory. :param directory: the absolute path name of the directory to create :raises: AlreadyExistsError if the directory already exists
Delete file or directory tree at path. :param path: path to the file or directory to delete :param is_dir: True iff path points to a directory :raises:[Errno 39] Directory not empty:
Duplicate file or directory from path_to_src to path_to_dest. If path_to_dest exists before, it is deleted or overwritten. If path_to_src is a directory, the directory is duplicated with all its files and directories. Either this method or move() needs to be implemented in a subclass. :param path_to_src: must never be the same as path_to_dest :param path_to_dest: must end in the name of the child directory or the file specified by path_to_src
Returns: | True if a remote file or directory exists at path, and False otherwise |
---|
Returns: | the number of bytes of the file at path, or 0 if path is a directory |
---|
Get configuration options during runtime. The method is normally called by cloudfusion.pyfusebox.configurable_pyfusebox.ConfigurablePyFuseBox, when the user reads CloudFusion’s configuration file in /config/config. It can return a dictionary with variables to display in /config/config. :returns: a dictionary with variable names and corresponding values
Returns: | list of absolute file paths of files in directory |
---|
Returns: | the data of the remote file at path_to_file as a string |
---|---|
Raises: | NoSuchFilesytemObjectError if the object does not exist |
Returns: | free space in bytes |
---|
Get the name of the logging handler used by a subclass, so that the wrappers may use the same logger. Wrappers are responsible for extended functionality like caching data or concurrency, e.g. cloudfusion.store.transparent_caching_store.TransparentMultiprocessingCachingStore. This method might simply return get_name(), even if the subclass does not use a logger. :return: the name of the logging handler used by a subclass and its wrappers.
Return maximum number of bytes per file. Some cloud storage services limit the size of files to be uploaded.
This method is a hook that must be implemented by subclasses. If it is implemented, the methods exists(), get_bytes(), is_dir() work out of the box.
Returns: | A dictionary with the keys ‘modified’, ‘bytes’ and ‘is_dir’ containing the corresponding metadata for path |
---|
The value for ‘modified’ is a date in seconds, stating when the object corresponding to path was last modified. The value for ‘bytes’ is the number of bytes of the object corresponding to path. It is 0 if the object is a directory. The value for ‘is_dir’ is True if the object is a directory and False otherwise.
Raises: | NoSuchFilesytemObjectError if the object does not exist |
---|
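The way a single metadata hook can drive exists(), get_bytes() and is_dir() might look like the sketch below. The hook name `_get_metadata`, the exception `NoSuchObject` and both classes are stand-ins chosen for illustration, since the actual names are not given here.

```python
class NoSuchObject(Exception):
    """Stand-in for the error raised when an object does not exist."""


class SketchStore(object):
    def _get_metadata(self, path):  # hypothetical name for the hook
        raise NotImplementedError

    def exists(self, path):
        try:
            self._get_metadata(path)
            return True
        except NoSuchObject:
            return False

    def get_bytes(self, path):
        return self._get_metadata(path)["bytes"]

    def is_dir(self, path):
        return self._get_metadata(path)["is_dir"]


class DictStore(SketchStore):
    """Subclass that only implements the hook, backed by a dictionary."""

    def __init__(self, objects):
        self._objects = objects

    def _get_metadata(self, path):
        try:
            return self._objects[path]
        except KeyError:
            raise NoSuchObject(path)
```

The subclass implements one method, and the three derived methods work without further code.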
Returns: | the time path was modified in seconds from the epoch |
---|
Returns: | the name of the service, e.g. Amazon S3 or Dropbox |
---|
Returns: | overall space in bytes |
---|
Returns: | space used by files in bytes |
---|
Returns: | True if path is a remote file, and False if it is a remote directory |
---|---|
Raises: | NoSuchFilesytemObjectError if the remote object does not exist |
Rename a remote file or directory path_to_src to path_to_dest. If path_to_dest exists before, it is deleted or overwritten. If path_to_src is a directory, the directory is renamed to path_to_dest. Default implementation relies on an implementation of duplicate() in a subclass, but it should be overwritten. Either this method or duplicate() needs to be implemented in a subclass. :param path_to_src: path to a remote file or directory :param path_to_dest: path of the new remote file or directory
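One plausible reading of a default move that relies on duplicate() is a copy followed by deleting the source. The sketch below assumes exactly that; the class, its dictionary backing and the method bodies are hypothetical and not taken from the library.

```python
class CopyOnlyStore(object):
    """Hypothetical store that implements duplicate() and inherits a default move()."""

    def __init__(self):
        self.files = {}

    def duplicate(self, path_to_src, path_to_dest):
        # Overwrites path_to_dest if it already exists.
        self.files[path_to_dest] = self.files[path_to_src]

    def delete(self, path):
        del self.files[path]

    def move(self, path_to_src, path_to_dest):
        # Assumed default: copy to the destination, then remove the source.
        self.duplicate(path_to_src, path_to_dest)
        self.delete(path_to_src)
```

A provider with a native rename operation would normally override move(), since copy plus delete transfers the data twice.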
Try to reconnect to the service.
Set configuration options during runtime. The method is normally called by cloudfusion.pyfusebox.configurable_pyfusebox.ConfigurablePyFuseBox, when the user changes CloudFusion’s configuration file in /config/config. :param config: a dictionary with configuration options
Store the local file path_to_file to directory dest_dir on the store. :param path_to_file: local file path :param dest_dir: remote destination directory to store the contents of the local file to :param remote_file_name: the file name on the store; by default this is the original file name if this parameter is None. :param interrupt_event: (optional) If the value is not None, listen for an interrupt event with interrupt_event.wait() until the file has been stored. Abort the upload if interrupt_event.wait() returns. :returns: (optional) the date in seconds, when the file was updated
Store the contents of fileobject to path on the store. :param fileobject: A file like object. The position of the fileobject needs to be at 0 (use fileobject.seek(0) before calling this method) :param path: The remote file path to store the contents of fileobject to :param interrupt_event: (optional) If the value is not None, listen for an interrupt event with interrupt_event.wait() until the file has been stored. Abort the upload if interrupt_event.wait() returns. :returns: (optional) the date in seconds, when the file was updated
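A minimal sketch of an interruptible upload follows. It polls `interrupt_event.is_set()` between chunks instead of blocking on `interrupt_event.wait()` as the description says, which is a simplification. The function name `upload`, the `sink` argument and `chunk_size` are hypothetical.

```python
import io
import threading


def upload(fileobject, sink, interrupt_event=None, chunk_size=4):
    """Copy fileobject to sink chunk by chunk; return False if interrupted."""
    fileobject.seek(0)  # the position needs to be at 0 before reading
    while True:
        if interrupt_event is not None and interrupt_event.is_set():
            return False  # abort the upload
        chunk = fileobject.read(chunk_size)
        if not chunk:
            return True   # everything has been written
        sink.write(chunk)


source = io.BytesIO(b"hello world")
target = io.BytesIO()
finished = upload(source, target, interrupt_event=threading.Event())
```

Another thread can call `set()` on the event to make the copy loop stop before the next chunk is written.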
Bases: exceptions.Exception
Bases: object
Archive representation used in ChunkFactory
Bases: object
Chunk representation: :param parent_dir: directory location to store the chunk :param fileobject: file in the local file system with its location in fileobject.name :param filepaths: list of paths of files in the chunk
Bases: object
Factory that returns added files packed together as archives, if they are in the same directory.
Parameters: |
|
---|
Create an actual tar archive in the file system. :returns: fileobject with path of the tar archive in the file system in the name property
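Creating such an archive with the standard library could look like the sketch below. The function name `create_archive` and the choice to store members under their base names are assumptions; only `tarfile` and `tempfile` calls from the standard library are used.

```python
import os
import tarfile
import tempfile


def create_archive(local_paths):
    """Pack local files into a tar archive and return the open file object.

    The archive path is available as the `name` attribute of the result.
    The caller is responsible for closing and deleting the file.
    """
    archive = tempfile.NamedTemporaryFile(suffix=".tar", delete=False)
    with tarfile.open(fileobj=archive, mode="w") as tar:
        for path in local_paths:
            # Assumption: members are stored under their base name.
            tar.add(path, arcname=os.path.basename(path))
    archive.flush()
    archive.seek(0)
    return archive
```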
Get the archive to store filepath in, creating a new one if it does not exist.
Returns: | True iff the archive is ready to be uploaded |
---|
Checks for all archives if they are ready to be uploaded, and swaps them to self.completed_archives
Adds a new file to the chunk factory. :param local_file: fileobject with its absolute path in property name :param filepath: the filepath that local_file should be stored to in the wrapped store
Force the factory to return chunks for all added files, ignoring max_time_to_upload, and max_chunk_size. :returns: a list of Chunk instances or an empty list if no chunk is available
Get a chunk, if one is ready for upload according to max_time_to_upload, and max_chunk_size. The chunk file returned needs to be deleted if it is not used anymore. It is stored in chunk.name. The chunk is removed from the list of archives that are ready to be uploaded. :returns: Instance of Chunk or None if no chunk is available.
Returns: | size in bytes of the next chunk returned by get_new_chunk() or 0 if there is none. |
---|
Removes added file if possible. :returns: True iff the file could be removed
Bases: object
Synchronizes between cache and store
Sleep until seconds have passed since last call
Garbage collect stale chunks
Returns: | the number of upload worker threads that should be used, according to the file size and the average time needed to upload a file |
---|
Returns: | active reader that downloads path or None if there is no such reader |
---|
Returns: | active writer that uploads path or None if there is no such writer |
---|
Delete path from the remote store.
Start new writer jobs with dirty cache entries to synchronize all files.
Start new writer jobs with expired least recently used cache entries.
Get download rate in MB/s
Get amount of data downloaded from a store in MB
Get upload rate in MB/s
Get amount of data uploaded to a store in MB
Returns: | True if the store’s version is newer than the cached entry or does not exist and False otherwise. |
---|
Returns: | True iff path is currently uploaded or being removed |
---|
Get time since last heartbeat in seconds.
Remove finished workers and restart unsuccessful delete jobs.
Bases: object
Persistently maps between chunk names and filepaths of files in the chunk
Adds an alias of a file in the chunk with id chunk_uuid
Returns: | the uuid of the chunk that filepath is stored in or None if it does not exist |
---|
Returns: | chunks for garbage collection |
---|
Parameters: | chunk_name – absolute path to the chunk |
---|---|
Returns: | list of absolute filepaths for files in the chunk |
Get globally unique identifier for the next chunk
Parameters: |
|
---|---|
Returns: | a generator iterating over the files in the chunk |
Adds a chunk mapping between chunk_uuid and filepaths
Remove filepath from the mapping.
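The mapping operations listed above can be illustrated with an in-memory sketch. It is not persistent, all method names are hypothetical, and it assumes that a file lives in at most one chunk and that a chunk becomes a garbage-collection candidate once it contains no live files.

```python
import uuid


class ChunkMappingSketch(object):
    def __init__(self):
        self._files_by_chunk = {}  # chunk uuid -> set of filepaths
        self._chunk_by_file = {}   # filepath -> chunk uuid

    def new_chunk_uuid(self):
        return uuid.uuid4().hex

    def put(self, chunk_uuid, filepaths):
        files = self._files_by_chunk.setdefault(chunk_uuid, set())
        for path in filepaths:
            self.remove_file(path)  # assumption: at most one chunk per file
            files.add(path)
            self._chunk_by_file[path] = chunk_uuid

    def get_chunk_uuid(self, filepath):
        """Return the uuid of the chunk holding filepath, or None."""
        return self._chunk_by_file.get(filepath)

    def remove_file(self, filepath):
        chunk_uuid = self._chunk_by_file.pop(filepath, None)
        if chunk_uuid is not None:
            self._files_by_chunk[chunk_uuid].discard(filepath)

    def empty_chunks(self):
        """Chunks without live files, assumed to be garbage-collection candidates."""
        return [c for c, files in self._files_by_chunk.items() if not files]
```

A persistent variant would keep the same two lookups in a database or on-disk store instead of dictionaries.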
Bases: object
Statistics of a certain exception. :param name: the identifier of the exception :param exception_list: the list of the actual exception instances :param desc: the description of the exception (defaults to str(exception) ) :param count: number of occurrences of the exception (defaults to 1) :param lasttime: the last time the exception occurred in seconds from the epoch (defaults to current time)
Add an ExceptionStats object to the dictionary exceptions_log or update time of occurrence, exception_list, and count if there is a similar exception in the log. :param exception: the exception to add to the log :param exceptions_log: an existing dictionary of exceptions mapping their name to an ExceptionStats instance (might be empty) :param name: the identifier of the exception (defaults to type(exception) or repr(exception) ) :param desc: the description of the exception (defaults to str(exception) ) :param count: number of occurrences of the exception (defaults to last count plus 1 or to 1 if no exception with the same identifier exists) :param lasttime: the last time the exception occurred in seconds from the epoch (defaults to current time) :returns: the updated exception log
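The insert-or-update behaviour described above might be sketched like this. `ExceptionStatsSketch` and `add_exception` are hypothetical stand-ins, and using the exception's class name as the default identifier is an assumption.

```python
import time


class ExceptionStatsSketch(object):
    """Hypothetical stand-in holding the fields listed above."""

    def __init__(self, name, exception_list, desc, count=1, lasttime=None):
        self.name = name
        self.exception_list = exception_list
        self.desc = desc
        self.count = count
        self.lasttime = time.time() if lasttime is None else lasttime


def add_exception(exception, exceptions_log, name=None, desc=None, lasttime=None):
    """Insert a new entry, or update the entry with the same name."""
    name = name or type(exception).__name__  # assumption: class name as identifier
    lasttime = time.time() if lasttime is None else lasttime
    stats = exceptions_log.get(name)
    if stats is None:
        exceptions_log[name] = ExceptionStatsSketch(
            name, [exception], desc or str(exception), 1, lasttime)
    else:
        stats.exception_list.append(exception)
        stats.count += 1
        stats.lasttime = lasttime
    return exceptions_log
```

Repeated exceptions of the same kind then share one log entry whose count and time of last occurrence are updated.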
Bases: object
Interface to get statistics about a store. The unit MB is 1000000 Bytes.
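With the decimal definition of MB given above, a transfer rate could be computed as in this sketch. The function name is hypothetical.

```python
BYTES_PER_MB = 1000000  # the unit MB is 1000000 bytes


def rate_mb_per_s(num_bytes, seconds):
    """Transfer rate in MB/s for num_bytes moved in the given number of seconds."""
    return float(num_bytes) / BYTES_PER_MB / seconds
```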
Get number of files that were accessed while they were cached
Get number of files that were accessed while they were not in cache
Get a list of file paths to files that are not already synchronized with the store
Get download rate in MB/s
Get amount of data downloaded from a store in MB
Get dict of exception statistics with exception names mapping to ExceptionStats
Get arbitrary string describing status of the store
Get upload rate in MB/s
Get amount of data uploaded to a store in MB
Bases: cloudfusion.store.chunk_caching_store.ChunkMultiprocessingCachingStore, cloudfusion.store.transparent_store.TransparentStore
Implements the cloudfusion.store.transparent_store.TransparentStore interface to get statistics about a cache wrapping a store.
Parameters: |
|
---|
Returns: | True if the hard limit of the cache is exceeded, which should slow down write operations |
---|
Returns: | the size of the cache in MB |
---|
Returns: | the hard limit of the cache in MB; exceeding this limit should slow down write operations |
---|
Bases: cloudfusion.store.store.Store
Puts small files into an archive, to upload and download them together. Garbage collects archives with stale contents. Employs multiple threads for increased throughput. Therefore, it can only use stores with a thread-safe put_file method. Write operations do not block for transfer, until the cache size limit is reached. No write operations on the wrapped store are invoked until a cached item expires.
Parameters: |
|
---|
Reloads the locally cached file path_to_file from the wrapped store. Currently only retrieves files from store if they are not cached. #TODO: implement file version resolution over different sessions. #:raises: NoSuchFilesytemObjectError if file does not exist in wrapped store.
Returns: | the time in seconds until any cache entry is expired |
---|
If the file was updated in the wrapped store, then its content in the cache will be updated if its entry is expired but not dirty. :returns: string – the data of the file with the path path_to_file
Returns: | the maximum size of an archive in MB |
---|
Return maximum number of bytes per file
Set the maximum size of an archive in MB
Stores a fileobject to the cloudfusion.util.cache.Cache and if the existing fileobject has expired it is also written to the wrapped store. The cached file’s updated and modified attributes will be reset to the current point of time. The cached file’s dirty flag is set to False if the entry has expired and was hence written to the store. Otherwise it is set to True.
Parameters: |
|
---|
Bases: object
Worker to cyclically poll for free space on store.
Bases: object
To replace multiprocessing.Value.
Bases: object
Get duration of download in seconds
Get the end time of the download in seconds from the epoch
Get size of the file to write in bytes
Get the data of the read file. This only works once after a successful read and is a blocking call. Use is_successful to check if the read has been successful without blocking.
Get the start time of the download in seconds from the epoch
Bases: object
Bases: object
Statistics about workers. Resets statistics automatically after 100*1000 workers.
Get download time considering parallel downloads.
Get upload time considering parallel uploads.
Resets all statistics.
Bases: object
Uploads a single file in a separate process. The start method is used to begin the upload. The method is_finished can be used to check if the worker is done. is_successful shows if it has been successful after it is done. If not successful, an error message can be retrieved with get_error. Other methods can be used to get upload statistics.
Parameters: |
|
---|
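The start, is_finished, is_successful and get_error life cycle described above can be sketched as follows. This stand-in runs the upload in a thread rather than a separate process, and the class name, constructor arguments and `join` method are hypothetical.

```python
import threading


class UploadWorkerSketch(object):
    def __init__(self, upload, path, data):
        self._upload, self._path, self._data = upload, path, data
        self._error = None
        self._done = threading.Event()
        self._thread = threading.Thread(target=self._run)

    def _run(self):
        try:
            self._upload(self._path, self._data)
        except Exception as e:
            self._error = str(e)  # keep the message for get_error()
        finally:
            self._done.set()

    def start(self):
        self._thread.start()

    def is_finished(self):
        return self._done.is_set()

    def is_successful(self):
        return self.is_finished() and self._error is None

    def get_error(self):
        return self._error

    def join(self):
        self._thread.join()
```

A caller would start the worker, poll `is_finished()` without blocking, and only then consult `is_successful()` or `get_error()`.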
Get duration of upload in seconds
Get the end time of the upload in seconds from the epoch
Get size of the file to write in bytes
Get the start time of the upload in seconds from the epoch
Get the point of time the file has been updated in the store in seconds from the epoch
Returns: | True iff the worker does nothing. |
---|
Forcefully stop the upload process.
Bases: object
Process pool for WriteWorker.
Parameters: | store – An implementation of the Store interface. |
---|
Parameters: |
|
---|---|
Returns: | a worker process from the pool. |
Put the worker process back into the pool for reuse.
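Taking a worker from the pool and putting it back for reuse might be sketched with a queue, as below. The class and method names are hypothetical, and blocking until a worker is free is an assumption about the behaviour.

```python
import queue


class WorkerPoolSketch(object):
    def __init__(self, make_worker, size):
        self._idle = queue.Queue()
        for _ in range(size):
            self._idle.put(make_worker())

    def get_worker(self):
        # Assumption: block until an idle worker is available.
        return self._idle.get()

    def recycle_worker(self, worker):
        # Put the worker back into the pool for reuse.
        self._idle.put(worker)
```

Reusing workers this way avoids paying the cost of creating a new process for every upload.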
Bases: object
Bases: cloudfusion.store.store.Store
Delete all expired cache entries only if last called after cache_expiration_time seconds as defined in the constructor.
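The rate-limited cleanup described above could look like the following sketch. The function name is hypothetical, and the cache is modelled as a dictionary mapping paths to the time of their last update.

```python
def clean_expired(cache, last_cleaned, now, cache_expiration_time):
    """Delete expired entries and return the new 'last cleaned' time.

    Does nothing if fewer than cache_expiration_time seconds have passed
    since the previous cleanup.
    """
    if now - last_cleaned < cache_expiration_time:
        return last_cleaned  # called too soon
    expired = [path for path, updated in cache.items()
               if now - updated > cache_expiration_time]
    for path in expired:
        del cache[path]
    return now
```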
This method needs to be called before uploading data.
Add existing files or directories to dir_entry because they might have been uploaded recently and might not be retrievable by a directory listing from the storage provider.
Add listing for parent directory of path to cache if it does not yet exist
:returns: True if path does not exist in the cached directory listing
This method needs to be called after uploading data.
Delete all expired cache entries iff no upload is going on. :returns: True iff the cache could be cleaned.
Return maximum number of bytes per file
As a side effect cleans expired cache entries from time to time