AGLOW utils

The AGLOW utils module includes small functions that can be used with a PythonOperator to interface with LOFAR data, LOFAR fields processing, GRID storage and others.

AGLOW.airflow.utils.AGLOW_utils.archive_all_tokens(token_type, archive_location, delete=False)[source]
AGLOW.airflow.utils.AGLOW_utils.archive_tokens_from_task(token_task, delete=False, **context)[source]

Determines which tokens to archive and saves them, deleting them afterwards if requested.

AGLOW.airflow.utils.AGLOW_utils.check_folder_for_files(folder, number=1)[source]

Raises an exception (i.e. the task fails) if the (gsiftp) folder contains fewer than ‘number’ files.

By default it fails if the folder is empty.
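The real function lists a gsiftp URL (e.g. with uberftp); a minimal sketch of the same check logic, assuming a plain local filesystem path instead of a grid URL:

```python
import os

def check_folder_for_files(folder, number=1):
    """Raise if `folder` contains fewer than `number` files.

    Local-filesystem sketch; the real AGLOW function lists a gsiftp
    directory on grid storage instead of a local path.
    """
    num_files = len(os.listdir(folder))
    if num_files < number:
        raise RuntimeError(
            "Expected at least %d files in %s, found %d"
            % (number, folder, num_files))
    return num_files
```

Raising (rather than returning False) is what makes the Airflow task itself fail when the folder is too empty.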

AGLOW.airflow.utils.AGLOW_utils.check_folder_for_files_from_task(taskid, xcom_key, number, **context)[source]

Either uses ‘number’ to determine how many files should be present, or checks the number of tokens in the view. TODO: make this the right way

AGLOW.airflow.utils.AGLOW_utils.check_folder_for_files_from_tokens(task_id, dummy, number, **context)[source]
AGLOW.airflow.utils.AGLOW_utils.check_if_running_field(fields_file)[source]

Checks whether any fields are currently running (i.e. neither completed nor in an Error state). Returns True if a field is running, False if not.

AGLOW.airflow.utils.AGLOW_utils.copy_gsifile(src, dest)[source]
AGLOW.airflow.utils.AGLOW_utils.copy_to_archive(src_dir=u'gsiftp://gridftp.grid.sara.nl:2811/pnfs/grid.sara.nl/data/lofar/user/sksp/distrib/SKSP', dest_dir=u'gsiftp://gridftp.grid.sara.nl:2811/pnfs/grid.sara.nl/data/lofar/user/sksp/archive/SKSP/', **context)[source]
AGLOW.airflow.utils.AGLOW_utils.count_files_uberftp(directory)[source]
AGLOW.airflow.utils.AGLOW_utils.count_from_task(srmlist_task, srmfile_name, task_if_less, task_if_more, pipeline=u'SKSP', step=u'pref_cal2', min_num_files=1, parent_dag=False, **context)[source]
AGLOW.airflow.utils.AGLOW_utils.count_grid_files(srmlist_file, task_if_less, task_if_more, pipeline=u'SKSP', step=u'pref_cal1', min_num_files=1)[source]

An Airflow function that branches depending on whether calibrator or target solutions exist that match the files in the srmlist.

AGLOW.airflow.utils.AGLOW_utils.create_gsiftp_directory(gsiftp_directory)[source]
AGLOW.airflow.utils.AGLOW_utils.delete_gsiftp_files(gsiftp_directory)[source]
AGLOW.airflow.utils.AGLOW_utils.delete_gsiftp_from_task(root_dir, OBSID_task, **context)[source]

Combines the OBSID returned by a task with a root directory, and gracefully deletes all files in the resulting directory.

AGLOW.airflow.utils.AGLOW_utils.get_cal_from_dir(base_dir, return_key=None, **context)[source]
AGLOW.airflow.utils.AGLOW_utils.get_field_location_from_srmlist(srmlist_task, srmfile_key=u'targ_srmfile', **context)[source]

Gets the srmlist from a task and returns the location of the field, i.e. the LTA location where the raw data is stored.

AGLOW.airflow.utils.AGLOW_utils.get_list_from_dir(base_dir, **context)[source]
AGLOW.airflow.utils.AGLOW_utils.get_next_field(fields_file, indicator=u'SND', **context)[source]

Determines the next field to be processed and uses the set_field_status function to set its status to started. By default it locks SARA fields (marked SND in the first column of the fields_file); using the indicator variable, fields at other locations can be locked instead.
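A minimal sketch of the selection logic, assuming a whitespace-separated fields_file whose first column is the location indicator and whose last column is the field status (this column layout is an assumption, not confirmed by the source):

```python
def get_next_field(fields_file, indicator="SND"):
    """Return the columns of the first field row matching `indicator`
    that has not yet been started, or None if none remain.

    Sketch only: the column layout (indicator first, status last) is
    assumed, and the real function also locks the field by setting its
    status to started.
    """
    with open(fields_file) as f:
        for line in f:
            cols = line.split()
            if not cols or cols[0] != indicator:
                continue
            # Skip fields already started, completed, or in Error
            if cols[-1] in ("started", "completed", "Error"):
                continue
            return cols
    return None
```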

AGLOW.airflow.utils.AGLOW_utils.get_result_files_from_tokenlist(token_type, token_ids, key=u'Results_location', **kwargs)[source]
AGLOW.airflow.utils.AGLOW_utils.get_results_from_subdag(subdag_id, task=u'tokens', key=u'Results_location', return_key=None, **context)[source]
AGLOW.airflow.utils.AGLOW_utils.get_srmfile_from_dir(srmdir, field_task, var_calib=u'SKSP_Prod_Calibrator_srm_file', var_targ=u'SKSP_Prod_Target_srm_file', **context)[source]
AGLOW.airflow.utils.AGLOW_utils.get_task_instance(context, key, parent_dag=False)[source]
AGLOW.airflow.utils.AGLOW_utils.get_user_proxy(username)[source]

Gets the user's X509 proxy file so that operations are performed with the user's authorization rather than the DAG executor's.

AGLOW.airflow.utils.AGLOW_utils.get_var_from_task_decorator(Cls, upstream_task_id=u'', upstream_return_key=u'', u_task=None)[source]

Wrapper for functions that require a fixed input, which is here provided by a previous task.

AGLOW.airflow.utils.AGLOW_utils.launch_processing_subdag(prev_task, **context)[source]
AGLOW.airflow.utils.AGLOW_utils.make_srmfile_from_step_results(prev_step_token_task, parent_dag=None)[source]

Makes a list of srms using the results of all the tokens in a previous task
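The core of this operation is writing one result location per line into a temporary srmfile. A simplified stand-in (the real function pulls each token's Results_location from the previous task rather than taking a list argument):

```python
import tempfile

def make_srmfile_from_results(results_locations):
    """Write one srm per line to a tempfile and return its path.

    `results_locations` stands in for the Results_location values that
    the real function collects from the tokens of a previous task.
    """
    srmfile = tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", delete=False)
    for location in results_locations:
        srmfile.write(location + "\n")
    srmfile.close()
    return srmfile.name
```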

AGLOW.airflow.utils.AGLOW_utils.modify_parset(parset_path, freq_res, time_res, OBSID, flags, demix_sources)[source]

Takes a base parset path, changes the time and frequency resolution parameters of that parset, saves the result to a tempfile, and returns the tempfile path.
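A sketch of the rewrite-and-save step, assuming hypothetical parset key names `avg.freqstep` and `avg.timestep` (the actual keys depend on the pipeline parsets and are not confirmed by the source; the OBSID, flags, and demix arguments are omitted for brevity):

```python
import tempfile

def modify_parset(parset_path, freq_res, time_res):
    """Rewrite the averaging keys of a parset into a tempfile and
    return the tempfile path. Key names are assumptions."""
    with open(parset_path) as f:
        lines = f.readlines()
    out = tempfile.NamedTemporaryFile(
        mode="w", suffix=".parset", delete=False)
    for line in lines:
        key = line.split("=")[0].strip()
        if key == "avg.freqstep":      # assumed key name
            line = "avg.freqstep = %s\n" % freq_res
        elif key == "avg.timestep":    # assumed key name
            line = "avg.timestep = %s\n" % time_res
        out.write(line)
    out.close()
    return out.name
```

Writing to a tempfile leaves the base parset untouched, so the same template can be reused for every field.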

AGLOW.airflow.utils.AGLOW_utils.modify_parset_from_fields_task(parsets_dict={}, fields_task=None, time_avg=8, freq_avg=2, flags=None, **context)[source]

Takes a dict of ‘original’ parsets and the task holding the fields information, which is used to update the averaging parameters. Returns a ‘name’:’location’ dict of the modified parsets, so they can be used by the upload-tokens task.

AGLOW.airflow.utils.AGLOW_utils.set_field_status(fields_file, cal_OBSID, targ_OBSID, field_name, status)[source]
AGLOW.airflow.utils.AGLOW_utils.set_field_status_from_task_return(fields_file, task_id, status_task, **context)[source]

Sets the field status based on the string returned by the task named in the status_task variable.

AGLOW.airflow.utils.AGLOW_utils.set_field_status_from_taskid(fields_file, task_id, status, **context)[source]

Sets the field status to the value of the status input variable.

AGLOW.airflow.utils.AGLOW_utils.set_user_proxy_var(proxy_location)[source]
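Grid tools discover the proxy through the standard X509_USER_PROXY environment variable, so a plausible minimal sketch of set_user_proxy_var is just a guarded environment update (the real implementation may do more):

```python
import os

def set_user_proxy_var(proxy_location):
    """Point grid tools at the given X509 proxy by setting the
    standard X509_USER_PROXY environment variable."""
    if not os.path.exists(proxy_location):
        raise IOError("Proxy file not found: %s" % proxy_location)
    os.environ["X509_USER_PROXY"] = proxy_location
    return proxy_location
```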
AGLOW.airflow.utils.AGLOW_utils.stage_if_needed(stage_task, run_if_staged, run_if_not_staged, **context)[source]

Takes the result of the check_if_staged task and stages the files if None was returned. Otherwise it passes the srmlist to the ‘join’ task and processing continues.
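The branch decision itself can be sketched as a plain function that returns the id of the task to follow, mirroring Airflow's BranchPythonOperator convention (simplified: the real function pulls the srmlist from the stage_task via XCom rather than taking it as an argument):

```python
def stage_if_needed(staged_srmlist, run_if_staged, run_if_not_staged):
    """Return the task_id the DAG should follow next.

    `staged_srmlist` stands in for the value produced by the
    check_if_staged task: None means the files still need staging.
    """
    if staged_srmlist is None:
        return run_if_not_staged
    return run_if_staged
```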