cctools
ndcctools.taskvine.dask_executor.DaskVine Class Reference

TaskVine Manager specialized to compute dask graphs.

Inheritance diagram for ndcctools.taskvine.dask_executor.DaskVine:
ndcctools.taskvine.manager.Manager

Public Member Functions

 get (self, dsk, keys, *, environment=None, extra_files=None, worker_transfers=True, env_vars=None, low_memory_mode=False, checkpoint_fn=None, resources=None, resources_mode=None, submit_per_cycle=None, max_pending=None, retries=5, verbose=False, lib_extra_functions=None, lib_resources=None, lib_command=None, lib_modules=None, task_mode='tasks', env_per_task=False, progress_disable=False, progress_label="[green]tasks", wrapper=None, wrapper_proc=print, prune_files=False, hoisting_modules=None, import_modules=None, lazy_transfers=True)
 Execute the task graph dsk and return the results for keys in graph.
 
- Public Member Functions inherited from ndcctools.taskvine.manager.Manager
 __init__ (self, port=cvine.VINE_DEFAULT_PORT, name=None, shutdown=False, run_info_path="vine-run-info", staging_path=None, ssl=None, init_fn=None, status_display_interval=None)
 Create a new manager.
 
 name (self)
 Get the project name of the manager.
 
 port (self)
 Get the listening port of the manager.
 
 using_ssl (self)
 Whether the manager is using ssl to talk to workers.
 
 logging_directory (self)
 Get the logs directory of the manager.
 
 staging_directory (self)
 Get the staging directory of the manager.
 
 library_logging_directory (self)
 Get the library logs directory of the manager.
 
 cache_directory (self)
 Get the caching directory of the manager.
 
 stats (self)
 Get manager statistics.
 
 stats_category (self, category)
 Get the task statistics for the given category.
 
 status (self, request)
 Get manager information as list of dictionaries.
 
 summarize_workers (self)
 Get resource statistics of workers connected.
 
 update_catalog (self)
 Send update to catalog server.
 
 set_category_mode (self, category, mode)
 Turn on or off first-allocation labeling for a given category.
 
 set_category_autolabel_resource (self, category, resource, autolabel)
 Turn on or off first-allocation labeling for a given category and resource.
 
 task_state (self, task_id)
 Get current task state.
 
 enable_monitoring (self, watchdog=True, time_series=False)
 Enables resource monitoring for tasks.
 
 enable_peer_transfers (self)
 Enable P2P worker transfer functionality.
 
 disable_peer_transfers (self)
 Disable P2P worker transfer functionality.
 
 enable_disconnect_slow_workers (self, multiplier)
 Enable disconnect slow workers functionality for a given manager.
 
 enable_disconnect_slow_workers_category (self, name, multiplier)
 Enable disconnect slow workers functionality for a given category.
 
 set_draining_by_hostname (self, hostname, drain_mode=True)
 Turn on or off draining mode for workers at hostname.
 
 empty (self)
 Determine whether there are any known tasks queued, running, or waiting to be collected.
 
 hungry (self)
 Determine whether the manager can support more tasks.
 
 set_scheduler (self, scheduler)
 Set the worker selection scheduler for manager.
 
 set_name (self, name)
 Change the project name for the given manager.
 
 set_manager_preferred_connection (self, mode)
 Set the preference for using hostname over IP address to connect.
 
 set_min_task_id (self, minid)
 Set the minimum task_id of future submitted tasks.
 
 set_priority (self, priority)
 Change the project priority for the given manager.
 
 tasks_left_count (self, ntasks)
 Specify the number of tasks not yet submitted to the manager.
 
 set_catalog_servers (self, catalogs)
 Specify the catalog servers the manager should report to.
 
 set_property (self, name, value)
 Add a global property to the manager which will be included in periodic reports to the catalog server and other telemetry destinations.
 
 set_runtime_info_path (self, dirname)
 Specify a directory to write logs and staging files.
 
 set_password (self, password)
 Add a mandatory password that each worker must present.
 
 set_password_file (self, file)
 Add a mandatory password file that each worker must present.
 
 set_resources_max (self, rmd)
 Specifies the maximum resources allowed for the default category.
 
 set_resources_min (self, rmd)
 Specifies the minimum resources allowed for the default category.
 
 set_category_resources_max (self, category, rmd)
 Specifies the maximum resources allowed for the given category.
 
 set_category_resources_min (self, category, rmd)
 Specifies the minimum resources allowed for the given category.
 
 set_category_first_allocation_guess (self, category, rmd)
 Specifies the first-allocation guess for the given category.
 
 set_category_max_concurrent (self, category, max_concurrent)
 Specifies the maximum number of concurrent tasks allowed for the given category.
 
 initialize_categories (self, filename, rm)
 Initialize first value of categories.
 
 cancel_by_task_id (self, id)
 Cancel task identified by its task_id.
 
 cancel_by_task_tag (self, tag)
 Cancel task identified by its tag.
 
 cancel_by_category (self, category)
 Cancel all tasks of the given category.
 
 cancel_all (self)
 Cancel all tasks.
 
 workers_shutdown (self, n=0)
 Shutdown workers connected to manager.
 
 block_host (self, host)
 Block workers running on host from working for the manager.
 
 blacklist (self, host)
 Replaced by ndcctools.taskvine.manager.Manager.block_host.
 
 block_host_with_timeout (self, host, timeout)
 Block workers running on host for the duration of the given timeout.
 
 blacklist_with_timeout (self, host, timeout)
 See ndcctools.taskvine.manager.Manager.block_host_with_timeout.
 
 unblock_host (self, host=None)
 Unblock the given host, or all hosts if host is not given.
 
 blacklist_clear (self, host=None)
 See ndcctools.taskvine.manager.Manager.unblock_host.
 
 set_keepalive_interval (self, interval)
 Change keepalive interval for a given manager.
 
 set_keepalive_timeout (self, timeout)
 Change keepalive timeout for a given manager.
 
 tune (self, name, value)
 Tune advanced parameters.
 
 submit (self, task)
 Submit a task to the manager.
 
 install_library (self, task)
 Submit a library to install on all connected workers.
 
 remove_library (self, name)
 Remove a library from all connected workers.
 
 check_library_exists (self, library_name)
 Check whether a library exists on the manager or not.
 
 create_library_from_functions (self, library_name, *function_list, poncho_env=None, init_command=None, add_env=True, hoisting_modules=None)
 Turn a list of python functions into a Library Task.
 
 create_library_from_serverized_files (self, library_name, library_path, env=None)
 Turn Library code created with poncho_package_serverize into a Library Task.
 
 create_library_from_command (self, executable_path, name, env=None)
 Create a Library task from arbitrary inputs.
 
 wait (self, timeout="wait_forever")
 Wait for tasks to complete.
 
 wait_for_tag (self, tag, timeout="wait_forever")
 Similar to ndcctools.taskvine.manager.Manager.wait, but guarantees that the returned task has the specified tag.
 
 wait_for_task_id (self, task_id, timeout="wait_forever")
 Similar to ndcctools.taskvine.manager.Manager.wait, but guarantees that the returned task has the specified task_id.
 
 application_info (self)
 Should return a dictionary with information for the status display.
 
 map (self, fn, seq, chunksize=1)
 Maps a function to elements in a sequence using taskvine.
 
 pair (self, fn, seq1, seq2, chunksize=1, env=None)
 Returns the values for a function of each pair from 2 sequences.
 
 tree_reduce (self, fn, seq, chunksize=2)
 Reduces a sequence until only one value is left, and then returns that value.
 
 remote_map (self, fn, seq, library, name, chunksize=1)
 Maps a function to elements in a sequence using taskvine remote task.
 
 remote_pair (self, fn, seq1, seq2, library, name, chunksize=1)
 Returns the values for a function of each pair from 2 sequences using remote task.
 
 remote_tree_reduce (self, fn, seq, library, name, chunksize=2)
 Reduces a sequence until only one value is left, and then returns that value.
 
 declare_file (self, path, cache=False, peer_transfer=True, unlink_when_done=False)
 Declare a file obtained from the local filesystem.
 
 fetch_file (self, file)
 Fetch file contents from the cluster or local disk.
 
 undeclare_file (self, file)
 Un-declare a file that was created by declare_file or similar methods.
 
 undeclare_function (self, fn)
 Remove the manager's local serialized copy of a function used with PythonTask.
 
 declare_temp (self)
 Declare an anonymous file that has no initial content, but is created as the output of a task, and may be consumed by other tasks.
 
 declare_url (self, url, cache=False, peer_transfer=True)
 Declare a file obtained from a remote URL.
 
 declare_buffer (self, buffer=None, cache=False, peer_transfer=True)
 Declare a file created from a buffer in memory.
 
 declare_minitask (self, minitask, source, cache=False, peer_transfer=True)
 Declare a file created by executing a mini-task.
 
 declare_untar (self, tarball, cache=False, peer_transfer=True)
 Declare a file created by unpacking a tar file.
 
 declare_poncho (self, package, cache=False, peer_transfer=True)
 Declare a file that sets up a poncho environment.
 
 declare_starch (self, starch, cache=False, peer_transfer=True)
 Declare a file created by unpacking a starch package.
 
 declare_xrootd (self, source, proxy=None, env=None, cache=False, peer_transfer=True)
 Declare a file accessible from an xrootd server.
 
 declare_chirp (self, server, source, ticket=None, env=None, cache=False, peer_transfer=True)
 Declare a file accessible from a chirp server.
 
 log_txn_app (self, entry)
 Adds a custom APPLICATION entry to the transactions log.
 
 log_debug_app (self, entry)
 Adds a custom APPLICATION entry to the debug log.
 

Detailed Description

TaskVine Manager specialized to compute dask graphs.

Managers created via DaskVine can be used to execute dask graphs via the method ndcctools.taskvine.dask_executor.DaskVine.get as follows:

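import dask
from ndcctools.taskvine.dask_executor import DaskVine

# Initialize as any other manager. @see ndcctools.taskvine.manager.Manager
m = DaskVine(...)
# v is any dask collection (for example, a dask.delayed value).
result = v.compute(scheduler=m.get)
# or set it temporarily as the default for dask:
with dask.config.set(scheduler=m.get):
    result = v.compute()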

Parameters for execution can be set as arguments to the compute function. These arguments are applied to each task executed:

my_env = m.declare_poncho("my_env.tar.gz")
with dask.config.set(scheduler=m.get):
    # Each task uses at most 1 core, runs in the my_env environment, and
    # has its allocation set to the maximum values seen.
    # If resources_mode is not None, the resource monitor is activated.
    result = v.compute(resources={"cores": 1}, resources_mode="max", environment=my_env)

Member Function Documentation

◆ get()

ndcctools.taskvine.dask_executor.DaskVine.get (   self,
  dsk,
  keys,
  environment = None,
  extra_files = None,
  worker_transfers = True,
  env_vars = None,
  low_memory_mode = False,
  checkpoint_fn = None,
  resources = None,
  resources_mode = None,
  submit_per_cycle = None,
  max_pending = None,
  retries = 5,
  verbose = False,
  lib_extra_functions = None,
  lib_resources = None,
  lib_command = None,
  lib_modules = None,
  task_mode = 'tasks',
  env_per_task = False,
  progress_disable = False,
  progress_label = "[green]tasks",
  wrapper = None,
  wrapper_proc = print,
  prune_files = False,
  hoisting_modules = None,
  import_modules = None,
  lazy_transfers = True 
)

Execute the task graph dsk and return the results for keys in graph.
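
To make the roles of dsk and keys concrete, the following is a minimal sketch of a dask-style task graph and a sequential evaluator for it. It assumes the usual dask graph convention (a dictionary mapping keys to literal values or to tuples whose first element is a callable). The function simple_get is a hypothetical stand-in written only for illustration: it is not how DaskVine executes graphs, since DaskVine dispatches the tasks to TaskVine workers.

```python
from operator import add, mul


def simple_get(dsk, keys):
    """Sequentially evaluate a dask-style graph (illustration only)."""
    cache = {}

    def is_key(x):
        # Graph keys are hashable; unhashable values cannot be keys.
        try:
            return x in dsk
        except TypeError:
            return False

    def evaluate(x):
        if isinstance(x, list):
            return [evaluate(item) for item in x]
        if is_key(x):
            # Compute each key at most once and reuse the result.
            if x not in cache:
                cache[x] = evaluate(dsk[x])
            return cache[x]
        if isinstance(x, tuple) and x and callable(x[0]):
            # A task: (function, arg1, arg2, ...)
            fn, *args = x
            return fn(*(evaluate(a) for a in args))
        return x  # a literal value

    def resolve(k):
        # keys may be a single key or a possibly nested list of keys.
        if isinstance(k, list):
            return [resolve(item) for item in k]
        return evaluate(k)

    return resolve(keys)


dsk = {
    "x": 1,
    "y": 2,
    "z": (add, "x", "y"),   # z = x + y
    "w": (mul, "z", 10),    # w = z * 10
}

result_single = simple_get(dsk, "w")
result_nested = simple_get(dsk, ["z", ["w", "x"]])
```

The shape of the result mirrors the shape of keys: a single key yields a single value, and a nested list of keys yields a nested list of values.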

Parameters
dsk                  The task graph to execute.
keys                 A possibly nested list of keys whose values are computed from dsk.
environment          A taskvine file representing an environment in which to run the tasks.
extra_files          A dictionary of {taskvine.File: "remote_name"} to add to each task.
worker_transfers     Whether to keep intermediate results only at workers (True, default) or to bring back each result to the manager (False). True is more I/O efficient, but runs the risk of needing to recompute results if workers are lost.
env_vars             A dictionary of VAR=VALUE environment variables to set per task. A value should be either a string, or a function that accepts as arguments the manager and task, and that returns a string.
low_memory_mode      Split graph vertices to reduce memory needed per function call. This removes some of the dask graph optimizations, so proceed with care.
checkpoint_fn        When using worker_transfers, a predicate with arguments (dag, key) called before submitting a task. If it returns True, the result is brought back to the manager.
resources            A dictionary with optional keys of cores, memory and disk (MB) to set maximum resource usage per task.
lib_extra_functions  Additional functions to include in the execution library (function-calls task_mode).
lib_resources        A dictionary with optional keys of cores, memory and disk in MB (function-calls task_mode).
lib_command          A command to be prefixed to the execution of a Library task (function-calls task_mode).
lib_modules          Hoist these module imports for the execution library (function-calls task_mode).
resources_mode       Automatically resize allocation per task. One of 'fixed' (use the value of 'resources' above), 'max throughput', 'max' (for maximum values seen), 'min_waste', 'greedy bucketing' or 'exhaustive bucketing'. This is done per function type in dsk.
task_mode            Create tasks either as 'tasks' (using PythonTasks) or 'function-calls' (using FunctionCalls).
retries              Number of times to attempt a task. Default is 5.
submit_per_cycle     Maximum number of tasks to submit to the scheduler at once. If None, or less than 1, then all tasks are submitted as they become available.
max_pending          Maximum number of tasks without a result before new ones are submitted to the scheduler. If None, or less than 1, then no limit is set.
verbose              If True, emit additional debugging information.
env_per_task         If True, each task individually expands its own environment. Must use the environment option as a str.
progress_disable     If True, disable the progress bar.
progress_label       Label to use in the progress bar.
wrapper              Function to wrap dask calls. It should take as arguments (key, fn, *args). It should execute fn(*args) at some point during its execution to produce the dask task result. Should return a tuple of (wrapper result, dask call result). Use for debugging.
wrapper_proc         Function to process results from wrapper on completion (default is print).
prune_files          If True, remove files from the cluster after they are no longer needed.
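
As an illustration of the wrapper and wrapper_proc parameters described above, here is a minimal sketch of a wrapper that times each call. The names timing_wrapper, record and collected are hypothetical, and the sketch assumes that wrapper_proc is called with the first element of the tuple returned by wrapper. The last lines only simulate one call locally; they do not involve a manager or workers.

```python
import time


def timing_wrapper(key, fn, *args):
    """Run fn(*args) and report how long it took (hypothetical example)."""
    start = time.perf_counter()
    value = fn(*args)  # produces the dask task result
    elapsed = time.perf_counter() - start
    # Return (wrapper result, dask call result), as the parameter text requires.
    return ({"key": key, "seconds": elapsed}, value)


collected = []


def record(wrapper_result):
    """Hypothetical wrapper_proc: store wrapper results instead of printing them."""
    collected.append(wrapper_result)


# Local simulation of one wrapped call, standing in for what an executor would do.
report, value = timing_wrapper("sum-1", sum, [1, 2, 3])
record(report)
```

With a real manager m, these would presumably be passed as v.compute(scheduler=m.get, wrapper=timing_wrapper, wrapper_proc=record), following the keyword-argument pattern shown in the Detailed Description.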

The documentation for this class was generated from the following file: