AGLOW sensors

Derived from the Airflow sensor class, the AGLOW sensors track long-running events using GRID and LOFAR tools.

gliteSensor

The purpose of the gliteSensor is to monitor a job submitted with glite-wms-job-submit. Given a URL pointing to the job, this sensor polls the job's status at a regular interval using the Airflow sensor's poke feature. It parses the output and exits when all of the jobs have exited (whether completed successfully or not).

class AGLOW.airflow.sensors.glite_wms_sensor.gliteSensor(**kwargs)[source]

Bases: airflow.operators.sensors.BaseSensorOperator

A sensor initialized with the glite-wms job ID. It tracks the status of the job and returns only when all the jobs have exited (finished OK or not).

Parameters:
  • submit_task (string) – The task which submitted the jobs (should return a glite-wms job ID)
  • success_threshold – Currently a dummy parameter
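
A minimal usage sketch, wiring the sensor behind a submitting task. The DAG, task ids, JDL file, and poke_interval are illustrative assumptions, not taken from AGLOW:

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from AGLOW.airflow.sensors.glite_wms_sensor import gliteSensor

    dag = DAG('grid_pipeline', start_date=datetime(2019, 1, 1),
              schedule_interval=None)

    # Submits the jobs; the command's output (the glite-wms job ID/URL)
    # is pushed to XCom so the sensor can find it.
    submit = BashOperator(task_id='submit_jobs',
                          bash_command='glite-wms-job-submit -a jobs.jdl',
                          xcom_push=True,
                          dag=dag)

    # Polls glite-wms-job-status until every job has exited.
    watch = gliteSensor(task_id='watch_jobs',
                        submit_task='submit_jobs',
                        success_threshold=0.9,  # currently a dummy
                        poke_interval=120,      # seconds between pokes
                        dag=dag)

    submit >> watch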
__init__(**kwargs)[source]

x.__init__(…) initializes x; see help(type(x)) for signature

add_only_new(item_set, item)

Adds only new items to item set

clear(**kwargs)

Clears the state of task instances associated with the task, following the parameters specified.

count_successes(jobs)[source]

Counts the number of Completed jobs in the glite-wms-job-status output. Returns all the job statuses and sets self.job_status if it's Done (a sketch of this parsing follows the parameter list).

Parameters:
  • jobs (str) – A string containing the full output of glite-wms-job-status
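
A minimal sketch of this kind of counting, assuming the output holds one 'Current Status:' line per job and status strings such as 'Done (Success)' or 'Aborted' (the real output format may differ):

    def count_successes(self, jobs):
        """Sketch: extract per-job statuses and flag completion."""
        statuses = [line.split(':', 1)[1].strip()
                    for line in jobs.splitlines()
                    if line.strip().startswith('Current Status')]
        # A job has exited once it reports Done/Aborted/Cancelled
        # (assumed status strings).
        if statuses and all(s.startswith(('Done', 'Aborted', 'Cancelled'))
                            for s in statuses):
            self.job_status = 'Done'
        return statuses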
dag

Returns the Operator’s DAG if set, otherwise raises an error

dag_id

Returns the DAG id if the task has one; otherwise 'adhoc_' + the task owner.

deps

Adds one additional dependency for all sensor operators that checks if a sensor task instance can be rescheduled.

downstream_list

@property: list of tasks directly downstream

downstream_task_ids

@property: list of ids of tasks directly downstream

dry_run()

Performs a dry run for the operator: just renders the template fields.

execute(context)

This is the main method to derive when creating an operator. Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.
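
For reference, a minimal operator deriving execute(); the class and its message field are made up for illustration:

    from airflow.models import BaseOperator
    from airflow.utils.decorators import apply_defaults

    class EchoOperator(BaseOperator):
        """Toy operator: logs a templated message when it runs."""
        template_fields = ('message',)  # rendered with the same context

        @apply_defaults
        def __init__(self, message, *args, **kwargs):
            super(EchoOperator, self).__init__(*args, **kwargs)
            self.message = message

        def execute(self, context):
            # context is the dict also used for Jinja rendering,
            # e.g. context['ds'] is the execution date stamp.
            self.log.info('ds=%s: %s', context['ds'], self.message)
            return self.message  # stored in XCom under 'return_value'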

extra_links

@property: extra links for the task.

get_direct_relative_ids(upstream=False)

Get the direct relative ids to the current task, upstream or downstream.

get_direct_relatives(upstream=False)

Get the direct relatives to the current task, upstream or downstream.

get_extra_links(dttm, link_name)

For an operator, gets the URL that the external links specified in extra_links should point to.

Raises:

ValueError – The error message of a ValueError will be passed on through to the frontend to show up as a tooltip on the disabled link

Parameters:
  • dttm – The datetime parsed execution date for the URL being searched for
  • link_name – The name of the link we’re looking for the URL for. Should be one of the options specified in extra_links
Returns:

A URL

get_flat_relative_ids(upstream=False, found_descendants=None)

Get a flat list of relatives’ ids, either upstream or downstream.

get_flat_relatives(upstream=False)

Get a flat list of relatives, either upstream or downstream.

classmethod get_serialized_fields()

Stringified DAGs and operators contain exactly these fields.

get_task_instances(**kwargs)

Get the set of task instances related to this task for a specific date range.

get_template_env()

Fetch a Jinja template environment from the DAG or instantiate empty environment if no DAG.

global_operator_extra_link_dict

Returns dictionary of all global extra links.

has_dag()

Returns True if the Operator has been assigned to a DAG.

log
logger
next()
on_kill()

Override this method to cleanup subprocesses when a task instance gets killed. Any use of the threading, subprocess or multiprocessing module within an operator needs to be cleaned up or it will leave ghost processes behind.

operator_extra_link_dict

Returns dictionary of all extra links for the operator.

parse_glite_jobs(jobs)[source]
poke(context)[source]

Function called at every poke interval (by default every 2 minutes). It calls glite-wms-job-status on the job ID and exits once all the jobs have finished or crashed.
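
A hedged sketch of that poke cycle; the subprocess invocation and the parsing are assumptions, not the actual implementation:

    import subprocess

    def poke(self, context):
        """Sketch: return True once all glite-wms jobs have exited."""
        # The submitting task pushed the job ID/URL to XCom.
        job_id = context['ti'].xcom_pull(task_ids=self.submit_task)
        output = subprocess.check_output(
            ['glite-wms-job-status', job_id]).decode()
        self.count_successes(output)  # sets self.job_status when done
        # False -> Airflow pokes again after poke_interval seconds;
        # True -> the sensor succeeds and downstream tasks may run.
        return self.job_status == 'Done'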

pool = ''
post_execute(context, *args, **kwargs)

This hook is triggered right after self.execute() is called. It is passed the execution context and any results returned by the operator.

pre_execute(context, *args, **kwargs)

This hook is triggered right before self.execute() is called.

prepare_template()

Hook that is triggered after the templated fields get replaced by their content. If you need your operator to alter the content of the file before the template is rendered, it should override this method to do so.

priority_weight_total

Total priority weight for the task. It might include all upstream or downstream tasks, depending on the weight rule (see the sketch after this list).

  • WeightRule.ABSOLUTE - only own weight
  • WeightRule.DOWNSTREAM - adds priority weight of all downstream tasks
  • WeightRule.UPSTREAM - adds priority weight of all upstream tasks
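
A small example of how the rules play out; the DAG and task names are illustrative:

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.utils.weight_rule import WeightRule

    dag = DAG('weights_demo', start_date=datetime(2019, 1, 1),
              schedule_interval=None)

    # Each task defaults to priority_weight=1. Under WeightRule.DOWNSTREAM
    # (the default), priority_weight_total is the task's own weight plus
    # that of everything downstream: extract -> 3, transform -> 2, load -> 1.
    extract = DummyOperator(task_id='extract',
                            weight_rule=WeightRule.DOWNSTREAM, dag=dag)
    transform = DummyOperator(task_id='transform', dag=dag)
    load = DummyOperator(task_id='load', dag=dag)
    extract >> transform >> load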
render_template(content, context, jinja_env=None, seen_oids=None)

Render a templated string. The content can be a collection holding multiple templated strings and will be templated recursively.

Parameters:
  • content (Any) – Content to template. Only strings can be templated (may be inside collection).
  • context (dict) – Dict with values to apply on templated content
  • jinja_env (jinja2.Environment) – Jinja environment. Can be provided to avoid re-creating Jinja environments during recursion.
  • seen_oids (set) – template fields already rendered (to avoid RecursionError on circular dependencies)
Returns:

Templated content
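
For instance, templating a single string or a collection; the context dict here is handcrafted, whereas Airflow builds it at runtime:

    from airflow.operators.dummy_operator import DummyOperator

    task = DummyOperator(task_id='render_demo')
    context = {'ds': '2019-01-01'}  # normally supplied by Airflow

    # Strings are templated; collections are walked recursively.
    task.render_template('{{ ds }}/output', context)
    # -> '2019-01-01/output'
    task.render_template(['{{ ds }}/a.log', '{{ ds }}/b.log'], context)
    # -> ['2019-01-01/a.log', '2019-01-01/b.log']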

render_template_fields(context, jinja_env=None)

Template all attributes listed in template_fields. Note this operation is irreversible.

Parameters:
  • context (dict) – Dict with values to apply on content
  • jinja_env (jinja2.Environment) – Jinja environment
reschedule
resolve_template_files()
run(start_date=None, end_date=None, ignore_first_depends_on_past=False, ignore_ti_state=False, mark_success=False)

Run a set of task instances for a date range.

schedule_interval

The schedule interval of the DAG always wins over individual tasks so that tasks within a DAG always line up. The task still needs a schedule_interval as it may not be attached to a DAG.

set_downstream(task_or_task_list)

Set a task or a task list to be directly downstream from the current task.

set_upstream(task_or_task_list)

Set a task or a task list to be directly upstream from the current task.
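
These calls are what the >> and << bitshift operators delegate to, so the following declarations are all equivalent (registering the same edge more than once only logs a duplicate-dependency warning):

    from airflow.operators.dummy_operator import DummyOperator

    t1 = DummyOperator(task_id='t1')
    t2 = DummyOperator(task_id='t2')

    t1.set_downstream(t2)   # t2 runs after t1
    t2.set_upstream(t1)     # same relationship, declared the other way
    t1 >> t2                # bitshift sugar for set_downstream
    t2 << t1                # bitshift sugar for set_upstream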

shallow_copy_attrs = ()
skip(**kwargs)

Sets task instances to skipped from the same dag run.

Parameters:
  • dag_run – the DagRun for which to set the tasks to skipped
  • execution_date – execution_date
  • tasks – tasks to skip (not task_ids)
  • session – db session to use
skip_all_except(ti, branch_task_ids)

This method implements the logic for a branching operator; given a single task ID or list of task IDs to follow, this skips all other tasks immediately downstream of this operator.
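
This is the mechanism behind BranchPythonOperator; a minimal sketch with illustrative task names:

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import BranchPythonOperator

    dag = DAG('branch_demo', start_date=datetime(2019, 1, 1),
              schedule_interval=None)

    # The callable returns the task_id(s) to follow; the operator then
    # uses skip_all_except() to skip every other direct downstream task.
    branch = BranchPythonOperator(task_id='branch',
                                  python_callable=lambda: 'fast_path',
                                  dag=dag)
    fast = DummyOperator(task_id='fast_path', dag=dag)
    slow = DummyOperator(task_id='slow_path', dag=dag)
    branch >> [fast, slow]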

task_type

@property: type of the task

template_ext = ()
template_fields = ()
ui_color = '#7c7287'
ui_fgcolor = '#000'
upstream_list

@property: list of tasks directly upstream

upstream_task_ids

@property: list of ids of tasks directly upstream

valid_modes = ['poke', 'reschedule']
xcom_pull(context, task_ids=None, dag_id=None, key='return_value', include_prior_dates=None)

See TaskInstance.xcom_pull()

xcom_push(context, key, value, execution_date=None)

See TaskInstance.xcom_push()
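
In this sensor's setup, XCom is the channel through which the glite-wms job ID travels from the submitting task. A toy operator showing the push/pull pair; the key, task id, and URL are illustrative:

    from airflow.models import BaseOperator

    class XComDemoOperator(BaseOperator):
        """Toy operator demonstrating xcom_push/xcom_pull."""

        def execute(self, context):
            # Push a value under an explicit key for downstream tasks.
            self.xcom_push(context, key='job_id',
                           value='https://wms.example.org/job/123')
            # Pull what another task returned; the default
            # key='return_value' is how a sensor like gliteSensor can
            # retrieve the job ID pushed by its submit_task.
            job_id = self.xcom_pull(context, task_ids='submit_jobs')
            self.log.info('pulled job id: %s', job_id)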