The store_sync_thread Module

class cloudfusion.store.store_sync_thread.StoreSyncThread(cache, store, logger, max_writer_threads=30)

Bases: object

Synchronizes between cache and store

Maximum throughput for files smaller than 1 MB is max_writer_threads * 100 KB per second.

Parameters:max_writer_threads – Maximum number of writer threads to use.
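Here is the bound worked through: the default max_writer_threads=30 gives at most 30 * 100 KB = 3000 KB per second (roughly 3 MB/s) for small files. The helper below is hypothetical and only restates that arithmetic. It is not part of cloudfusion.

```python
# Hypothetical helper (not part of cloudfusion) restating the documented bound.
PER_WRITER_KB_PER_S = 100  # documented throughput per writer thread for files < 1 MB


def max_small_file_throughput_kb_per_s(max_writer_threads=30):
    """Upper bound on upload throughput in KB/s for files smaller than 1 MB."""
    return max_writer_threads * PER_WRITER_KB_PER_S


default_bound = max_small_file_throughput_kb_per_s()    # 30 * 100 = 3000 KB/s (about 3 MB/s)
reduced_bound = max_small_file_throughput_kb_per_s(10)  # 10 * 100 = 1000 KB/s
```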

_StoreSyncThread__get_file_size_in_mb(fileobject)
_StoreSyncThread__sleep(seconds)

Sleep until seconds have passed since last call
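The sketch below shows one way to implement this interval-based sleep. It assumes a monotonic clock, and it assumes the very first call returns without sleeping, which the docstring does not specify. IntervalSleeper and the fake clock are illustrative names, not cloudfusion code.

```python
import time


class IntervalSleeper:
    """Hypothetical sketch of a 'sleep until N seconds have passed since the last call' helper."""

    def __init__(self, clock=time.monotonic, sleep=time.sleep):
        self._clock = clock          # injectable so the behaviour can be tested
        self._sleep = sleep
        self._last_call = None

    def sleep(self, seconds):
        now = self._clock()
        if self._last_call is not None:
            # Only sleep for whatever part of the interval has not elapsed yet.
            remaining = seconds - (now - self._last_call)
            if remaining > 0:
                self._sleep(remaining)
        # Assumption: the first call returns immediately.
        self._last_call = self._clock()


# Demonstration with a fake clock instead of real time.
fake_now = [0.0]
slept = []


def fake_sleep(duration):
    slept.append(duration)
    fake_now[0] += duration


sleeper = IntervalSleeper(clock=lambda: fake_now[0], sleep=fake_sleep)
sleeper.sleep(1.0)      # first call: no sleep
fake_now[0] += 0.25
sleeper.sleep(1.0)      # 0.25 s elapsed, so it sleeps the remaining 0.75 s
fake_now[0] += 2.0
sleeper.sleep(1.0)      # more than 1 s elapsed: no sleep
```

Because the clock and sleep functions are injected, the pacing can be tested without real delays.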

_acquire_two_locks()
_check_for_failed_writers()
_get_max_threads(size_in_mb)
Returns:the number of upload worker threads that should be used according to the file size and the average time needed to upload a file.

_get_reader(path)
_get_writer(path)
_profiled_run(*args, **kw)
_reconnect()
_release_two_locks()
_remove_finished_readers()
_remove_finished_writers()
_remove_sleeping_writers()
_remove_successful_removers()
_restart_unsuccessful_removers()
_run()
blocking_read(path)
delete(path, is_dir)
delete_cache_entry(path)
enqueue_dirty_entries()

Start new writer jobs with dirty cache entries.

enqueue_lru_entries()

Start new writer jobs with expired least recently used cache entries.

get_download_rate()

Get download rate in MB/s

get_downloaded()

Get amount of data downloaded from a store in MB

get_exception_stats()
get_upload_rate()

Get upload rate in MB/s

get_uploaded()

Get amount of data uploaded to a store in MB

is_in_progress(path)
Returns:True iff path is currently being uploaded or removed
last_heartbeat()

Get time since last heartbeat in seconds.

read(path)
refresh_cache_entry(path, contents, modified)
restart()
run()
set_dirty_cache_entry(path, is_dirty)
set_modified_cache_entry(path, updatetime)
start()
stop()
sync()
tidy_up()

Remove finished workers and restart unsuccessful delete jobs.

write_cache_entry(path, contents)

The transparent_caching_store Module

class cloudfusion.store.transparent_caching_store.TransparentMultiprocessingCachingStore(store, cache_expiration_time=60, cache_size_in_mb=2000, hard_cache_size_limit_in_mb=10000, cache_id=None, cache_dir='/tmp/cloudfusion')

Bases: cloudfusion.store.caching_store.MultiprocessingCachingStore, cloudfusion.store.transparent_store.TransparentStore

Implements the cloudfusion.store.transparent_store.TransparentStore interface to get statistics about a cache wrapping a store.

Parameters:
  • store – the store whose access should be cached
  • cache_expiration_time – the time in seconds until any cache entry is expired
  • cache_size_in_mb – Approximate (soft) limit of the cache in MB.
  • hard_cache_size_limit_in_mb – Hard limit of the cache in MB, exceeding this limit should slow down write operations.
  • cache_id – Serves as identifier for a persistent cache instance.
  • cache_dir – Cache directory on the local hard drive; default value is /tmp/cloudfusion.
exceeds_hard_limit()
Returns:True if the hard limit of the cache is exceeded, which should slow down write operations
get_cache_hits()
get_cache_misses()
get_cachesize()
Returns:the size of the cache in MB
get_dirty_files()
get_download_rate()

Get download rate in MB/s

get_downloaded()

Get amount of data downloaded from a store in MB

get_exception_stats()
get_file(path_to_file)
get_hard_limit()
Returns:the hard limit of the cache in MB, exceeding this limit should slow down write operations
get_status_information()
get_upload_rate()

Get upload rate in MB/s

get_uploaded()

Get amount of data uploaded to a store in MB

store_fileobject(fileobject, path)

The caching_store Module

Created on Jun 10, 2013

@author: joe

class cloudfusion.store.caching_store.MultiprocessingCachingStore(store, cache_expiration_time=60, cache_size_in_mb=2000, cache_id=None, cache_dir='/tmp/cloudfusion/')

Bases: cloudfusion.store.store.Store

Like CachingStore, but does not make guarantees as to the consistency of the wrapped store. Uses a best-effort strategy to synchronize the store. Employs multiple threads for increased throughput; therefore, it can only use stores with a thread-safe put_file method. Unlike CachingStore, guarantees that write operations do not block for transfer until the cache size limit is reached. Unlike CachingStore, guarantees that no write operations on the wrapped store are invoked until a cached item expires.

Parameters:
  • store – the store whose access should be cached
  • cache_expiration_time – the time in seconds until any cache entry is expired
  • cache_size_in_mb – Approximate limit of the cache in MB.
  • cache_id – Serves as identifier for a persistent cache instance.
  • cache_dir – Cache directory on the local hard drive; default value is /tmp/cloudfusion.
_get_actual_modified_date(path)
_is_valid_path(path)
_raise_error_if_invalid_path(path)
_refresh_cache(path_to_file)

Reloads the locally cached file path_to_file from the wrapped store, if the wrapped store version is newer. The cached file’s last updated time is set to the current point of time. Makes a new cache entry if it does not exist yet. If the file was changed in the wrapped store after the cached file’s modified time, the cached file...

  • is updated with the contents of the file in the wrapped store
  • is set to not dirty
  • gets the modified time stamp of the file in the wrapped store
Raises:NoSuchFilesytemObjectError if the file does not exist in the wrapped store.

TODO: exception raising does not work and needs to be implemented

account_info()
create_directory(directory)
delete(path, is_dir)
duplicate(path_to_src, path_to_dest)
exists(path)
get_bytes(path)
get_cache_expiration_time()
Returns:the time in seconds until any cache entry is expired
get_configuration(config)
get_directory_listing(directory)
get_file(path_to_file)

If the file was updated in the wrapped store, then its content in the cache will be updated if its entry is expired but not dirty.

Returns:string – the data of the file with the path path_to_file

get_free_space()
get_logging_handler()
get_max_filesize()

Return maximum number of bytes per file

get_metadata(path)
get_modified(path)
get_name()
get_overall_space()
get_used_space()
is_cached_version_invalid(path)
Returns:True if the store’s version is newer than the cached entry or does not exist, and False otherwise.
is_dir(path)
move(path_to_src, path_to_dest)
reconnect()
set_configuration(config)
store_file(path_to_file, dest_dir='/', remote_file_name=None)
store_fileobject(fileobject, path)

Stores a fileobject to the cloudfusion.util.cache.Cache and if the existing fileobject has expired it is also written to the wrapped store. The cached file’s updated and modified attributes will be reset to the current point of time. The cached file’s dirty flag is set to False if the entry has expired and was hence written to the store. Otherwise it is set to True.

Parameters:
  • fileobject – The file object with the method read() returning data as a string
  • path – The path where the file object’s data should be stored, including the filename
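The dirty-flag rules above can be modelled with a small synchronous sketch. It assumes an entry counts as expired when more than cache_expiration_time seconds have passed since its updated time. It also uses a plain dict in place of the wrapped store. CacheEntry and WriteBackCacheSketch are hypothetical names, and the real class presumably hands uploads to background writer threads rather than writing inline.

```python
import io
import time


class CacheEntry:
    def __init__(self, data, now):
        self.data = data
        self.updated = now      # reset on every write
        self.modified = now     # reset on every write
        self.dirty = True       # not yet written to the wrapped store


class WriteBackCacheSketch:
    """Hypothetical, synchronous model of the documented store_fileobject() rules."""

    def __init__(self, wrapped_store, cache_expiration_time=60, clock=time.time):
        self.wrapped_store = wrapped_store      # a dict standing in for the wrapped store
        self.cache_expiration_time = cache_expiration_time
        self.clock = clock
        self.entries = {}

    def store_fileobject(self, fileobject, path):
        now = self.clock()
        data = fileobject.read()
        existing = self.entries.get(path)
        expired = (existing is not None
                   and now - existing.updated > self.cache_expiration_time)
        entry = CacheEntry(data, now)
        if expired:
            # The existing entry has expired, so the data is also written through.
            self.wrapped_store[path] = data
            entry.dirty = False
        self.entries[path] = entry


fake_now = [0]
remote = {}
cache = WriteBackCacheSketch(remote, cache_expiration_time=60, clock=lambda: fake_now[0])
cache.store_fileobject(io.BytesIO(b"v1"), "/a.txt")   # new entry: cached only, dirty
first_dirty = cache.entries["/a.txt"].dirty
fake_now[0] = 100
cache.store_fileobject(io.BytesIO(b"v2"), "/a.txt")   # old entry expired: written through
```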

The bulk_get_metadata Module

Created on 08.04.2011

class cloudfusion.store.bulk_get_metadata.BulkGetMetadata

Bases: object

get_bulk_metadata(directory)
Returns:A dictionary mapping the path of every file object in directory to a dictionary with the keys ‘modified’, ‘bytes’ and ‘is_dir’ containing the corresponding metadata for the file object.

The value for ‘modified’ is a date in seconds, stating when the file object was last modified. The value for ‘bytes’ is the number of bytes of the file object. It is 0 if the object is a directory. The value for ‘is_dir’ is True if the file object is a directory and False otherwise.

Raises:NoSuchFilesytemObjectError if the directory does not exist
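The sketch below illustrates the documented return format by building the same dictionary for a directory on the local file system with os.scandir. It is a hypothetical helper, not a cloudfusion store. For a missing directory it raises FileNotFoundError rather than NoSuchFilesytemObjectError.

```python
import os
import tempfile


def get_bulk_metadata_local(directory):
    """Hypothetical sketch: build the documented get_bulk_metadata() result
    for a directory on the local file system."""
    result = {}
    # os.scandir raises FileNotFoundError for a missing directory, where a
    # cloudfusion store would raise NoSuchFilesytemObjectError instead.
    with os.scandir(directory) as entries:
        for entry in entries:
            is_dir = entry.is_dir()
            stat = entry.stat()
            result[entry.path] = {
                'modified': stat.st_mtime,             # seconds since the epoch
                'bytes': 0 if is_dir else stat.st_size,
                'is_dir': is_dir,
            }
    return result


demo_dir = tempfile.mkdtemp()
with open(os.path.join(demo_dir, 'notes.txt'), 'wb') as fh:
    fh.write(b'abc')
os.mkdir(os.path.join(demo_dir, 'photos'))
listing = get_bulk_metadata_local(demo_dir)
```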

The chunk_store_worker Module

class cloudfusion.store.chunk_store_worker.ChunkWriteWorker(store, chunk_dir, chunk_uuid, chunk, filepaths, pool, logger)

Bases: cloudfusion.store.store_worker.WriteWorker

The store Module

Created on 08.04.2011

exception cloudfusion.store.store.AlreadyExistsError(msg, status=0)

Bases: cloudfusion.store.store.StoreAccessError

exception cloudfusion.store.store.DateParseError(msg)

Bases: exceptions.Exception

exception cloudfusion.store.store.InterruptedException(msg)

Bases: exceptions.Exception

exception cloudfusion.store.store.InvalidPathValueError(path)

Bases: exceptions.ValueError

exception cloudfusion.store.store.NoSuchFilesytemObjectError(path, status=0)

Bases: cloudfusion.store.store.StoreAccessError

class cloudfusion.store.store.Store

Bases: object

Central interface for any cloud storage provider. Any cloud storage provider that is used by CloudFusion needs to implement this interface. After implementing the interface for a new provider, you can add file system access to it by introducing a new branch to the if statement in cloudfusion.pyfusebox.configurable_pyfusebox.ConfigurablePyFuseBox.__get_new_store(). Advanced functionality such as caching and concurrency is supplied by wrappers, which are already implemented. Path parameters are always absolute paths of a file system, starting with a ‘/’.

_is_valid_path(path)
_raise_error_if_invalid_path(path)
account_info()
Returns:a human readable string describing account info like provider, name, statistics
create_directory(directory)

Create the remote directory directory.

Parameters:directory – the absolute path name of the directory to create
Raises:AlreadyExistsError if the directory already exists

delete(path, is_dir)

Delete the file or directory tree at path.

Parameters:
  • path – path to the file or directory to delete
  • is_dir – True iff path points to a directory
Raises:[Errno 39] Directory not empty

duplicate(path_to_src, path_to_dest)

Duplicate a file or directory from path_to_src to path_to_dest. If path_to_dest already exists, it is deleted or overwritten. If path_to_src is a directory, the directory is duplicated with all its files and directories. Either this method or move() needs to be implemented in a subclass.

Parameters:
  • path_to_src – must never be the same as path_to_dest
  • path_to_dest – must end in the name of the child directory or the file specified by path_to_src

exists(path)
Returns:True if a remote file or directory exists at path, and False otherwise
get_bytes(path)
Returns:the number of bytes of the file at path, or 0 if path is a directory
get_configuration(config)

Get configuration options during runtime. The method is normally called by cloudfusion.pyfusebox.configurable_pyfusebox.ConfigurablePyFuseBox, when the user reads CloudFusion’s configuration file in /config/config. It can return a dictionary with variables to display in /config/config.

Returns:a dictionary with variable names and corresponding values

get_directory_listing(directory)
Returns:list of absolute file paths of files in directory
get_file(path_to_file)
Returns:the data of the remote file at path_to_file as a string
Raises:NoSuchFilesytemObjectError if the object does not exist
get_free_space()
Returns:free space in bytes
get_logging_handler()

Get the name of the logging handler used by a subclass, so that the wrappers may use the same logger. Wrappers are responsible for extended functionality like caching data or concurrency, e.g. cloudfusion.store.transparent_caching_store.TransparentMultiprocessingCachingStore. This method might simply return get_name(), even if the subclass does not use a logger.

Returns:the name of the logging handler used by a subclass and its wrappers.

get_max_filesize()

Return the maximum number of bytes per file; some cloud storage providers limit the size of files to be uploaded.

get_metadata(path)

This method is a hook that must be implemented by subclasses. If it is implemented, the methods exists(), get_bytes(), is_dir() work out of the box.

Returns:A dictionary with the keys ‘modified’, ‘bytes’ and ‘is_dir’ containing the corresponding metadata for path

The value for ‘modified’ is a date in seconds, stating when the object corresponding to path was last modified. The value for ‘bytes’ is the number of bytes of the object corresponding to path. It is 0 if the object is a directory. The value for ‘is_dir’ is True if the object is a directory and False otherwise.

Raises:NoSuchFilesytemObjectError if the object does not exist
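The sketch below shows how exists(), get_bytes() and is_dir() can all be derived from the single get_metadata() hook. The base class, the in-memory subclass and the local NoSuchFilesytemObjectError stand-in are hypothetical simplifications, not the actual Store implementation.

```python
class NoSuchFilesytemObjectError(Exception):
    """Local stand-in for cloudfusion.store.store.NoSuchFilesytemObjectError."""


class MetadataBackedStore:
    """Hypothetical base class: exists(), get_bytes() and is_dir() are derived
    from the single get_metadata() hook."""

    def get_metadata(self, path):
        raise NotImplementedError

    def exists(self, path):
        try:
            self.get_metadata(path)
        except NoSuchFilesytemObjectError:
            return False
        return True

    def get_bytes(self, path):
        return self.get_metadata(path)['bytes']

    def is_dir(self, path):
        return self.get_metadata(path)['is_dir']


class InMemoryStore(MetadataBackedStore):
    def __init__(self):
        # absolute path -> (contents, or None for a directory; modified time in seconds)
        self.objects = {'/': (None, 0.0)}

    def get_metadata(self, path):
        if path not in self.objects:
            raise NoSuchFilesytemObjectError(path)
        contents, modified = self.objects[path]
        is_dir = contents is None
        return {'modified': modified,
                'bytes': 0 if is_dir else len(contents),
                'is_dir': is_dir}


store = InMemoryStore()
store.objects['/notes.txt'] = (b'hello', 1389000000.0)
```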
get_modified(path)
Returns:the time path was modified in seconds since the epoch
get_name()
Returns:the name of the service, e.g. Amazon S3 or Dropbox
get_overall_space()
Returns:overall space in bytes
get_used_space()
Returns:space used by files in bytes
is_dir(path)
Returns:True if path is a remote directory, and False if it is a remote file
Raises:NoSuchFilesytemObjectError if the remote object does not exist
move(path_to_src, path_to_dest)

Rename a remote file or directory path_to_src to path_to_dest. If path_to_dest already exists, it is deleted or overwritten. If path_to_src is a directory, the directory is renamed to path_to_dest. The default implementation relies on an implementation of duplicate() in a subclass, but it should be overridden. Either this method or duplicate() needs to be implemented in a subclass.

Parameters:
  • path_to_src – path to a remote file or directory
  • path_to_dest – path of the new remote file or directory
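The default strategy of moving by duplicating and then deleting can be sketched with a dict-backed store that only holds files. DuplicateBasedStore is a hypothetical name, and directories are ignored for brevity.

```python
class DuplicateBasedStore:
    """Hypothetical dict-backed store (files only) whose move() falls back on
    duplicate() followed by delete()."""

    def __init__(self):
        self.objects = {}   # absolute path -> file contents

    def duplicate(self, path_to_src, path_to_dest):
        if path_to_src == path_to_dest:
            raise ValueError("path_to_src must never be the same as path_to_dest")
        self.objects[path_to_dest] = self.objects[path_to_src]  # overwrites an existing destination

    def delete(self, path, is_dir):
        del self.objects[path]

    def move(self, path_to_src, path_to_dest):
        # Copy first, then remove the source: two operations instead of one rename.
        self.duplicate(path_to_src, path_to_dest)
        self.delete(path_to_src, is_dir=False)


store = DuplicateBasedStore()
store.objects = {'/a.txt': b'new', '/b.txt': b'old'}
store.move('/a.txt', '/b.txt')
```

Copy-then-delete needs two operations against the provider and is not atomic. That is presumably why the docstring recommends overriding move() when a provider offers a native rename.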

reconnect()

Try to reconnect to the service.

set_configuration(config)

Set configuration options during runtime. The method is normally called by cloudfusion.pyfusebox.configurable_pyfusebox.ConfigurablePyFuseBox, when the user changes CloudFusion’s configuration file in /config/config.

Parameters:config – a dictionary with configuration options

store_file(path_to_file, dest_dir='/', remote_file_name=None, interrupt_event=None)

Store the local file path_to_file to directory dest_dir on the store.

Parameters:
  • path_to_file – local file path
  • dest_dir – remote destination directory to store the contents of the local file to
  • remote_file_name – the file name on the store; if this parameter is None, the original file name is used
  • interrupt_event – (optional) If the value is not None, listen for an interrupt event with interrupt_event.wait() until the file has been stored. Abort the upload if interrupt_event.wait() returns.
Returns:(optional) the date in seconds when the file was updated

store_fileobject(fileobject, path, interrupt_event=None)

Store the contents of fileobject to path on the store.

Parameters:
  • fileobject – A file-like object. Its position needs to be at 0 (call fileobject.seek(0) before calling this method).
  • path – The remote file path to store the contents of fileobject to
  • interrupt_event – (optional) If the value is not None, listen for an interrupt event with interrupt_event.wait() until the file has been stored. Abort the upload if interrupt_event.wait() returns.
Returns:(optional) the date in seconds when the file was updated
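Here is a sketch of an upload that honours an interrupt event. It uses a simpler technique than the one described above: it checks interrupt_event.is_set() between fixed-size chunks instead of blocking in interrupt_event.wait(). The function name, the chunk_size parameter and the dict used as the remote store are assumptions for illustration.

```python
import io
import threading
import time


def store_fileobject_sketch(fileobject, remote, path, interrupt_event=None, chunk_size=4):
    """Hypothetical chunked upload into `remote` (a dict standing in for a provider).

    Checks interrupt_event.is_set() between chunks instead of blocking in
    interrupt_event.wait(). Returns the update time in seconds since the epoch,
    or None if the upload was aborted.
    """
    fileobject.seek(0)                      # the docs require position 0; seek defensively
    parts = []
    while True:
        if interrupt_event is not None and interrupt_event.is_set():
            return None                     # abort; nothing is written to `remote`
        chunk = fileobject.read(chunk_size)
        if not chunk:
            break
        parts.append(chunk)
    remote[path] = b''.join(parts)
    return time.time()


remote = {}
completed = store_fileobject_sketch(io.BytesIO(b'hello world'), remote, '/hello.txt')

stop = threading.Event()
stop.set()                                  # simulate an interrupt before the upload starts
aborted = store_fileobject_sketch(io.BytesIO(b'data'), remote, '/other.txt', stop)
```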

exception cloudfusion.store.store.StoreAccessError(msg, status=0)

Bases: exceptions.Exception

exception cloudfusion.store.store.StoreAutorizationError(msg, status=0)

Bases: cloudfusion.store.store.StoreAccessError

exception cloudfusion.store.store.StoreSpaceLimitError(msg='', status=0)

Bases: cloudfusion.store.store.StoreAccessError

The chunk_store_sync_thread Module

class cloudfusion.store.chunk_store_sync_thread.Archive(directory)

Bases: object

Archive representation used in ChunkFactory

class cloudfusion.store.chunk_store_sync_thread.Chunk(parent_dir, fileobject, filepaths)

Bases: object

Chunk representation.

Parameters:
  • parent_dir – directory location to store the chunk
  • fileobject – file in the local file system with its location in fileobject.name
  • filepaths – list of paths of files in the chunk

class cloudfusion.store.chunk_store_sync_thread.ChunkFactory(logger, max_time_to_upload=200, max_chunk_size=4)

Bases: object

Factory that returns added files packed together as archives, if they are in the same directory.

Parameters:
  • max_time_to_upload – time in seconds until the ChunkFactory returns a file inside a chunk after the file has been added to it
  • max_chunk_size – size of the chunk in MB that should not be exceeded when packing several files together
_create_archive(archive)

Create an actual tar archive in the file system.

Returns:fileobject with the path of the tar archive in the file system in the name property
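The sketch below builds such a tar archive with Python's tarfile module and returns a file object whose name property holds the archive path. The helper name and the choice of naming archive members after their remote paths are assumptions. The real ChunkFactory tracks the mapping between chunks and file paths separately.

```python
import os
import tarfile
import tempfile


def create_archive_sketch(files):
    """Hypothetical helper: pack local files into an uncompressed tar archive.

    `files` maps the remote file path to the local file holding its data.
    Returns an open file object whose .name is the archive's local path.
    """
    fd, archive_path = tempfile.mkstemp(suffix='.tar')
    os.close(fd)                            # tarfile reopens the path itself
    with tarfile.open(archive_path, 'w') as tar:
        for remote_path, local_path in sorted(files.items()):
            # Assumption: members are named after the remote path without the leading '/'.
            tar.add(local_path, arcname=remote_path.lstrip('/'))
    return open(archive_path, 'rb')


work_dir = tempfile.mkdtemp()
local_files = {}
for name, data in [('a.txt', b'first'), ('b.txt', b'second')]:
    local_path = os.path.join(work_dir, name)
    with open(local_path, 'wb') as fh:
        fh.write(data)
    local_files['/docs/' + name] = local_path

chunk_file = create_archive_sketch(local_files)
with tarfile.open(chunk_file.name) as tar:
    member_names = tar.getnames()
chunk_file.close()
```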

_get_archive(filepath)

Get the archive to store filepath in, creating a new one if it does not exist.

_is_complete(archive)
Returns:True iff the archive is ready to be uploaded
_swap_out_completed_archives()

Checks for all archives whether they are ready to be uploaded, and swaps the ready ones to self.completed_archives.

add(local_file, filepath)

Adds a new file to the chunk factory.

Parameters:
  • local_file – fileobject with its absolute path in property name
  • filepath – the filepath that local_file should be stored to in the wrapped store

force_get_all_chunks()

Force the factory to return chunks for all added files, ignoring max_time_to_upload and max_chunk_size.

Returns:a list of Chunk instances or an empty list if no chunk is available

get_new_chunk()

Get a chunk, if one is ready for upload according to max_time_to_upload and max_chunk_size. The returned chunk file needs to be deleted once it is no longer used. It is stored in chunk.name. The returned chunk is removed from the list of archives that are ready to be uploaded.

Returns:Instance of Chunk or None if no chunk is available.

get_size_of_next_chunk()
Returns:size in bytes of the next chunk returned by get_new_chunk() or 0 if there is none.
in_progress(filepath)
remove(filepath)

Removes an added file if possible.

Returns:True iff the file could be removed

class cloudfusion.store.chunk_store_sync_thread.ChunkStoreSyncThread(cache, store, temp_dir, logger, max_writer_threads=30)

Bases: object

Synchronizes between cache and store

_ChunkStoreSyncThread__sleep(seconds)

Sleep until seconds have passed since last call

_acquire_two_locks()
_check_for_failed_writers()
_garbage_collect_chunks()

Garbage collect stale chunks

_get_max_threads(size_in_mb)
Returns:the number of upload worker threads that should be used according to the file size and the average time needed to upload a file.

_get_reader(path)
Returns:active reader that downloads path or None if there is no such reader
_get_time_to_sleep()
_get_writer(path)
Returns:active writer that uploads path or None if there is no such writer
_reconnect()
_release_two_locks()
_remove_finished_readers()
_remove_finished_writers()
_remove_sleeping_writers()
_remove_successful_removers()
_restart_unsuccessful_removers()
blocking_read(path)
delete(path, is_dir)

Delete path from the remote store.

delete_cache_entry(path)
duplicate(path_to_src, path_to_dest)
enqueue_dirty_entries()

Start new writer jobs with dirty cache entries to synchronize all files.

enqueue_lru_entries()

Start new writer jobs with expired least recently used cache entries.

get_download_rate()

Get download rate in MB/s

get_downloaded()

Get amount of data downloaded from a store in MB

get_exception_stats()
get_upload_rate()

Get upload rate in MB/s

get_uploaded()

Get amount of data uploaded to a store in MB

is_cached_version_invalid(path)
Returns:True if the store’s version is newer than the cached entry or does not exist, and False otherwise.
is_in_progress(path)
Returns:True iff path is currently being uploaded or removed
last_heartbeat()

Get time since last heartbeat in seconds.

read(path)
refresh_cache_entry(path, contents, modified)
restart()
run()
set_dirty_cache_entry(path, is_dirty)
set_modified_cache_entry(path, updatetime)
start()
stop()
sync()
tidy_up()

Remove finished workers and restart unsuccessful delete jobs.

write_cache_entry(path, contents)
class cloudfusion.store.chunk_store_sync_thread.PersistentChunkMapper(temp_dir, logger)

Bases: object

Persistently maps between chunk names and filepaths of files in the chunk

_PersistentChunkMapper__num_to_alpha(num)

From: http://stackoverflow.com/questions/10326118/encoding-a-numeric-string-into-a-shortened-alphanumeric-string-and-back-again
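The linked answer is about encoding numbers as short alphanumeric strings. A common way to do that is base-62 encoding, sketched below. The exact alphabet and ordering used by PersistentChunkMapper are not documented here, so ALPHABET, num_to_alpha() and alpha_to_num() should be read as assumptions.

```python
import string

ALPHABET = string.digits + string.ascii_letters  # 62 symbols: 0-9, a-z, A-Z


def num_to_alpha(num):
    """Encode a non-negative integer as a short base-62 string (hypothetical sketch)."""
    if num < 0:
        raise ValueError("num must be non-negative")
    if num == 0:
        return ALPHABET[0]
    digits = []
    while num:
        num, rem = divmod(num, len(ALPHABET))
        digits.append(ALPHABET[rem])
    return ''.join(reversed(digits))


def alpha_to_num(text):
    """Inverse of num_to_alpha()."""
    num = 0
    for ch in text:
        num = num * len(ALPHABET) + ALPHABET.index(ch)
    return num
```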

_close()
add_aliases(chunk_uuid, alias_filepaths, new_alias_filepaths)

Adds an alias of a file in the chunk with id chunk_uuid

get_chunk_uuid(filepath)
Returns:the uuid of the chunk that filepath is stored in or None if it does not exist
get_empty_chunks()
Returns:chunks for garbage collection
get_files_in_chunk(chunk_name)
Parameters:chunk_name – absolute path to the chunk
Returns:list of absolute filepaths for files in the chunk
get_next_chunk_uuid()

Get globally unique identifier for the next chunk

iterate_files_from_chunk(chunk_content, chunk_id)
Parameters:
  • chunk_content – raw chunk content
  • chunk_id – unique id of the chunk
Returns:

a generator iterating over the files in the chunk

put(chunk_uuid, filepaths)

Adds a chunk mapping between chunk_uuid and filepaths

remove_file(filepath)

Remove filepath from the mapping.

cloudfusion.store.chunk_store_sync_thread.get_parent_dir(path)

The transparent_store Module

class cloudfusion.store.transparent_store.ExceptionStats(name='', exception_list=None, desc='', count=1, lasttime=1420728936.660947)

Bases: object

Statistics of a certain exception.

Parameters:
  • name – the identifier of the exception
  • exception_list – the list of the actual exception instances
  • desc – the description of the exception (defaults to str(exception))
  • count – number of occurrences of the exception (defaults to 1)
  • lasttime – the last time the exception occurred in seconds since the epoch (defaults to the current time)

static add_exception(exception, exceptions_log, name=None, desc=None, count=-1, lasttime=None)

Add an ExceptionStats object to the dictionary exceptions_log, or update the time of occurrence, exception_list, and count if there is a similar exception in the log.

Parameters:
  • exception – the exception to add to the log
  • exceptions_log – an existing dictionary of exceptions mapping their name to an ExceptionStats instance (might be empty)
  • name – the identifier of the exception (defaults to type(exception) or repr(exception))
  • desc – the description of the exception (defaults to str(exception))
  • count – number of occurrences of the exception (defaults to the last count plus 1, or to 1 if no exception with the same identifier exists)
  • lasttime – the last time the exception occurred in seconds since the epoch (defaults to the current time)
Returns:the updated exception log
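Here is a minimal sketch of the update rule described above, using a stand-in ExceptionStats class with the documented fields. Two choices are assumptions: the exception's class name serves as the default identifier, and the first description is kept when a similar exception is logged again.

```python
import time


class ExceptionStats:
    """Minimal stand-in with the documented fields (not the cloudfusion class)."""

    def __init__(self, name, exception_list, desc, count, lasttime):
        self.name = name
        self.exception_list = exception_list
        self.desc = desc
        self.count = count
        self.lasttime = lasttime


def add_exception(exception, exceptions_log, name=None, desc=None, count=-1, lasttime=None):
    if name is None:
        name = type(exception).__name__     # assumption: the class name is the identifier
    if desc is None:
        desc = str(exception)
    if lasttime is None:
        lasttime = time.time()
    previous = exceptions_log.get(name)
    if previous is None:
        exceptions_log[name] = ExceptionStats(
            name, [exception], desc, 1 if count == -1 else count, lasttime)
    else:
        # A similar exception was logged before: update list, count and time only.
        previous.exception_list.append(exception)
        previous.count = previous.count + 1 if count == -1 else count
        previous.lasttime = lasttime
    return exceptions_log


log = {}
add_exception(TimeoutError('upload timed out'), log, lasttime=10.0)
add_exception(TimeoutError('upload timed out again'), log, lasttime=20.0)
add_exception(ValueError('bad path'), log)
```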

class cloudfusion.store.transparent_store.TransparentStore

Bases: object

Interface to get statistics about a store. The unit MB is 1000000 Bytes.

get_cache_hits()

Get number of files that were accessed while they were cached

get_cache_misses()

Get number of files that were accessed while they were not in cache

get_dirty_files()

Get a list of file paths to files that are not yet synchronized with the store

get_download_rate()

Get download rate in MB/s

get_downloaded()

Get amount of data downloaded from a store in MB

get_exception_stats()

Get dict of exception statistics with exception names mapping to ExceptionStats

get_status_information()

Get arbitrary string describing status of the store

get_upload_rate()

Get upload rate in MB/s

get_uploaded()

Get amount of data uploaded to a store in MB

The transparent_chunk_caching_store Module

class cloudfusion.store.transparent_chunk_caching_store.TransparentChunkMultiprocessingCachingStore(store, cache_expiration_time=60, cache_size_in_mb=2000, hard_cache_size_limit_in_mb=3000, cache_id=None, max_archive_size_in_mb=4, cache_dir='/tmp/cloudfusion')

Bases: cloudfusion.store.chunk_caching_store.ChunkMultiprocessingCachingStore, cloudfusion.store.transparent_store.TransparentStore

Implements the cloudfusion.store.transparent_store.TransparentStore interface to get statistics about a cache wrapping a store.

Parameters:
  • store – the store whose access should be cached
  • cache_expiration_time – the time in seconds until any cache entry is expired
  • cache_size_in_mb – Approximate (soft) limit of the cache in MB.
  • hard_cache_size_limit_in_mb – Hard limit of the cache in MB, exceeding this limit should slow down write operations.
  • cache_id – Serves as identifier for a persistent cache instance.
  • max_archive_size_in_mb – the maximum size of an archive in MB
  • cache_dir – Cache directory on the local hard drive; default value is /tmp/cloudfusion.
exceeds_hard_limit()
Returns:True if the hard limit of the cache is exceeded, which should slow down write operations
get_cache_hits()
get_cache_misses()
get_cachesize()
Returns:the size of the cache in MB
get_dirty_files()
get_download_rate()
get_downloaded()
get_exception_stats()
get_file(path_to_file)
get_hard_limit()
Returns:the hard limit of the cache in MB, exceeding this limit should slow down write operations
get_status_information()
get_upload_rate()
get_uploaded()
store_fileobject(fileobject, path)

The chunk_caching_store Module

class cloudfusion.store.chunk_caching_store.ChunkMultiprocessingCachingStore(store, cache_expiration_time=60, cache_size_in_mb=2000, cache_id=None, max_archive_size_in_mb=4, cache_dir='/tmp/cloudfusion')

Bases: cloudfusion.store.store.Store

Puts small files into an archive, to upload and download them together. Garbage collects archives with stale contents. Employs multiple threads for increased throughput. Therefore, it can only use stores with a thread-safe put_file method. Write operations do not block for transfer, until the cache size limit is reached. No write operations on the wrapped store are invoked until a cached item expires.

Parameters:
  • store – the store whose access should be cached
  • max_archive_size_in_mb – the maximum size of an archive in MB
  • cache_expiration_time – the time in seconds until any cache entry is expired
  • cache_size_in_mb – Approximate (soft) limit of the cache in MB.
  • hard_cache_size_limit_in_mb – Hard limit of the cache in MB, exceeding this limit should slow down write operations.
  • cache_id – Serves as identifier for a persistent cache instance.
  • cache_dir – Cache directory on the local hard drive; default value is /tmp/cloudfusion.
_is_valid_path(path)
_raise_error_if_invalid_path(path)
_refresh_cache(path_to_file)

Reloads the locally cached file path_to_file from the wrapped store. Currently only retrieves files from the store if they are not cached.

TODO: implement file version resolution over different sessions, and raise NoSuchFilesytemObjectError if the file does not exist in the wrapped store.

account_info()
create_directory(directory)
delete(path, is_dir)
duplicate(path_to_src, path_to_dest)
exists(path)
get_bytes(path)
get_cache_expiration_time()
Returns:the time in seconds until any cache entry is expired
get_configuration(config)
get_directory_listing(directory)
get_file(path_to_file)

If the file was updated in the wrapped store, then its content in the cache will be updated if its entry is expired but not dirty.

Returns:string – the data of the file with the path path_to_file

get_free_space()
get_logging_handler()
get_max_archive_size()
Returns:the maximum size of an archive in MB
get_max_filesize()

Return maximum number of bytes per file

get_metadata(path)
get_modified(path)
get_name()
get_overall_space()
get_used_space()
is_dir(path)
reconnect()
set_configuration(config)
set_max_archive_size(max_archive_size_in_mb)

Set the maximum size of an archive in MB

store_file(path_to_file, dest_dir='/', remote_file_name=None)
store_fileobject(fileobject, path)

Stores a fileobject to the cloudfusion.util.cache.Cache and if the existing fileobject has expired it is also written to the wrapped store. The cached file’s updated and modified attributes will be reset to the current point of time. The cached file’s dirty flag is set to False if the entry has expired and was hence written to the store. Otherwise it is set to True.

Parameters:
  • fileobject – The file object with the method read() returning data as a string
  • path – The path where the file object’s data should be stored, including the filename

The store_worker Module

class cloudfusion.store.store_worker.GetFreeSpaceWorker(store, logger, poll_wait_time_in_s=600)

Bases: object

Worker to cyclically poll for free space on store.

_run()
get_free_bytes_in_remote_store()
is_alive()
start()
stop()
class cloudfusion.store.store_worker.LeightWeightValue(val)

Bases: object

Lightweight replacement for multiprocessing.Value.

class cloudfusion.store.store_worker.ReadWorker(store, path, logger)

Bases: object

_check_result()
_clean_up()
_get_result()
_run(result_queue, end_time)
get_duration()

Get duration of download in seconds

get_endtime()

Get the end time of the download in seconds since the epoch

get_error()
get_filesize()

Get size of the file to read in bytes

get_result()

Get the data of the read file. This only works once after a successful read and is a blocking call. Use is_successful to check if the read has been successful without blocking.

get_starttime()

Get the start time of the download in seconds since the epoch

is_finished()
is_successful()
start()
stop()
class cloudfusion.store.store_worker.RemoveWorker(store, path, is_dir, logger)

Bases: object

_run()
is_finished()
is_successful()
start()
stop()
class cloudfusion.store.store_worker.WorkerStats

Bases: object

Statistics about workers. Resets statistics automatically after 100*1000 workers.

_log_exception(exception)
add_finished_worker(worker)
get_download_rate()
get_download_time()

Get download time considering parallel downloads.
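One way to obtain a download time that accounts for parallel transfers is to measure the length of the union of the workers' (start, end) intervals. Overlapping downloads are then counted only once. The busy_time() helper below is a hypothetical sketch of that idea, not the WorkerStats implementation.

```python
def busy_time(intervals):
    """Total time covered by at least one (start, end) interval, so that
    overlapping (parallel) transfers are not counted twice. Hypothetical sketch."""
    total = 0.0
    current_start = current_end = None
    for start, end in sorted(intervals):
        if current_end is None or start > current_end:
            # Gap before this interval: close the previous merged block.
            if current_end is not None:
                total += current_end - current_start
            current_start, current_end = start, end
        else:
            # Overlap: extend the current merged block.
            current_end = max(current_end, end)
    if current_end is not None:
        total += current_end - current_start
    return total


# Two 5 MB downloads running fully in parallel for 10 s each.
parallel_rate = (5 + 5) / busy_time([(0, 10), (0, 10)])   # MB per second
```

Dividing the transferred amount by this busy time, rather than by the sum of individual durations, avoids underestimating the rate when transfers overlap.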

get_upload_rate()
get_upload_time()

Get upload time considering parallel uploads.

reset()

Resets all statistics.

class cloudfusion.store.store_worker.WriteWorker(store, path, file, pool, logger)

Bases: object

Uploads a single file in a separate process. The start method is used to begin the upload. The method is_finished can be used to check if the worker is done. is_successful shows if it has been successful after it is done. If not successful, an error message can be retrieved with get_error. Other methods can be used to get upload statistics.

Parameters:
  • store – Store instance that will be deepcopied and used in a newly created process to upload file
  • path – path that is used in the store as a reference to the uploaded file
  • file – fileobject with a name attribute; file.name needs to be a file on the local hard drive; the file is removed after the worker is finished
  • pool – a process pool of type WriteWorkerProcesses
  • logger – a multiprocessing logger
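The lifecycle described above (start(), then poll is_finished(), then check is_successful() and get_error()) can be sketched with a thread-based stand-in. FakeWriteWorker and wait_for() are hypothetical. The real WriteWorker uploads in a separate process from a WriteWorkerProcesses pool.

```python
import threading
import time


class FakeWriteWorker:
    """Hypothetical stand-in exposing the documented WriteWorker lifecycle methods."""

    def __init__(self, upload):
        self._upload = upload          # callable performing the "upload"
        self._thread = threading.Thread(target=self._run)
        self._error = None
        self._done = threading.Event()

    def _run(self):
        try:
            self._upload()
        except Exception as exc:       # record the failure for get_error()
            self._error = str(exc)
        finally:
            self._done.set()

    def start(self):
        self._thread.start()

    def is_finished(self):
        return self._done.is_set()

    def is_successful(self):
        return self.is_finished() and self._error is None

    def get_error(self):
        return self._error


def wait_for(worker, poll_interval=0.01):
    """Start a worker, poll until it is done, then report success and error."""
    worker.start()
    while not worker.is_finished():
        time.sleep(poll_interval)
    return worker.is_successful(), worker.get_error()


def failing_upload():
    raise OSError("quota exceeded")


ok = wait_for(FakeWriteWorker(lambda: None))
failed = wait_for(FakeWriteWorker(failing_upload))
```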
_check_result()
_clean_up()
get_duration()

Get duration of upload in seconds

get_endtime()

Get the end time of the upload in seconds since the epoch

get_error()
get_filesize()

Get size of the file to write in bytes

get_starttime()

Get the start time of the upload in seconds since the epoch

get_updatetime()

Get the point of time the file has been updated in the store in seconds since the epoch

is_finished()
is_sleeping()
Returns:True iff the worker does nothing.
is_successful()
kill()

Forcefully stop the upload process.

start()
stop()
class cloudfusion.store.store_worker.WriteWorkerProcesses(store, logger)

Bases: object

Process pool for WriteWorker.

Parameters:store – An implementation of the Store interface.
END_TIME = 'end_time'
LOCAL_FILEPATH = 'local_filepath'
REMOTE_FILEPATH = 'remote_filepath'
_run(result_queue, interrupt_event, parameters, store)
get_worker(local_filepath, remote_filepath)
Parameters:
  • local_filepath – The path of the local file.
  • remote_filepath – The remote path to store the local file.
Returns:

a worker process from the pool.

recycle_worker(worker)

Put the worker process back into the pool for reuse.

The metadata_caching_store Module

class cloudfusion.store.metadata_caching_store.Entry

Bases: object

add_to_listing(path)
remove_from_listing(path)
set_is_dir()
set_is_file()
set_modified(modified=None)
class cloudfusion.store.metadata_caching_store.MetadataCachingStore(store, cache_expiration_time=60)

Bases: cloudfusion.store.store.Store

_MetadataCachingStore__clean_cache()

Delete all expired cache entries, but only if this method was last called at least cache_expiration_time seconds ago (as defined in the constructor).

_acquire_uploading_lock()

This method needs to be called before uploading data.

_add_existing_items(dir_entry, dir_entry_path)

Add existing files or directories to dir_entry because they might have been uploaded recently and might not be retrievable by a directory listing from the storage provider.

_add_parent_dir_listing(path)

Add listing for parent directory of path to cache if it does not yet exist

_add_to_parent_dir_listing(path)
_does_not_exist_in_parent_dir_listing(path)

Returns:True if path does not exist in the cached directory listing

_is_valid_path(path)
_prefetch_directory(path)
_prepare_entry(path, metadata)
_raise_error_if_invalid_path(path)
_release_uploading_lock()

This method needs to be called after uploading data.

_remove_from_parent_dir_listing(path)
account_info()
clean_expired_cache_entries()

Delete all expired cache entries iff no upload is going on.

Returns:True iff the cache could be cleaned.
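The sketch below cleans expired metadata only while no upload is in progress. It models the uploading lock as a counter guarded by a threading.Lock, which is an assumption about the mechanism. MetadataCacheSketch and its fields are hypothetical names.

```python
import threading
import time


class MetadataCacheSketch:
    """Hypothetical model: expired entries are purged only while no upload runs."""

    def __init__(self, cache_expiration_time=60, clock=time.time):
        self.cache_expiration_time = cache_expiration_time
        self.clock = clock
        self.entries = {}                   # path -> (metadata, time it was cached)
        self._lock = threading.Lock()
        self._uploads_in_progress = 0

    def _acquire_uploading_lock(self):
        with self._lock:
            self._uploads_in_progress += 1

    def _release_uploading_lock(self):
        with self._lock:
            self._uploads_in_progress -= 1

    def clean_expired_cache_entries(self):
        with self._lock:
            if self._uploads_in_progress:
                return False                # an upload is running: leave the cache alone
            now = self.clock()
            self.entries = {path: (meta, cached_at)
                            for path, (meta, cached_at) in self.entries.items()
                            if now - cached_at <= self.cache_expiration_time}
            return True


fake_now = [0]
cache = MetadataCacheSketch(cache_expiration_time=60, clock=lambda: fake_now[0])
cache.entries['/old.txt'] = ({'bytes': 1}, 0)
cache.entries['/new.txt'] = ({'bytes': 2}, 100)
fake_now[0] = 120
cache._acquire_uploading_lock()
during_upload = cache.clean_expired_cache_entries()
cache._release_uploading_lock()
after_upload = cache.clean_expired_cache_entries()
```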

create_directory(directory)
delete(path, is_dir)
duplicate(path_to_src, path_to_dest)
exists(path)
get_bytes(path)
get_configuration(config)
get_directory_listing(directory)
get_file(path_to_file)
get_free_space()
get_logging_handler()
get_max_filesize()

Return maximum number of bytes per file

get_metadata(path)

As a side effect cleans expired cache entries from time to time

get_modified(path)
get_name()
get_overall_space()
get_used_space()
is_dir(path)
move(path_to_src, path_to_dest)
set_configuration(config)
store_file(path_to_file, dest_dir='/', remote_file_name=None, interrupt_event=None)
store_fileobject(fileobject, path, interrupt_event=None)