AGLOW utils¶
The AGLOW utils module includes small functions that can be used with a PythonOperator to interface with LOFAR data, LOFAR fields processing, GRID storage and others.
-
AGLOW.airflow.utils.AGLOW_utils.
archive_all_tokens
(token_type, archive_location, delete=False)[source]¶
-
AGLOW.airflow.utils.AGLOW_utils.
archive_tokens_from_task
(token_task, delete=False, **context)[source]¶ Determines whic tokens to archive and saves them. delete if necessary
-
AGLOW.airflow.utils.AGLOW_utils.
check_folder_for_files
(folder, number=1)[source]¶ Raises an exception (IE FAILS) if the (gsiftp) folder has less than ‘number’ files
By default fails if folder is empty
-
AGLOW.airflow.utils.AGLOW_utils.
check_folder_for_files_from_task
(taskid, xcom_key, number, **context)[source]¶ Either uses number to see how many files should be there or checks the number of tokens in the view TODO:make this the right way
-
AGLOW.airflow.utils.AGLOW_utils.
check_folder_for_files_from_tokens
(task_id, dummy, number, **context)[source]¶
-
AGLOW.airflow.utils.AGLOW_utils.
check_if_running_field
(fields_file)[source]¶ Checks if there are fields that are running (ie not completed, or Error) Returns True if running, False if not
-
AGLOW.airflow.utils.AGLOW_utils.
copy_to_archive
(src_dir=u'gsiftp://gridftp.grid.sara.nl:2811/pnfs/grid.sara.nl/data/lofar/user/sksp/distrib/SKSP', dest_dir=u'gsiftp://gridftp.grid.sara.nl:2811/pnfs/grid.sara.nl/data/lofar/user/sksp/archive/SKSP/', **context)[source]¶
-
AGLOW.airflow.utils.AGLOW_utils.
count_from_task
(srmlist_task, srmfile_name, task_if_less, task_if_more, pipeline=u'SKSP', step=u'pref_cal2', min_num_files=1, parent_dag=False, **context)[source]¶
-
AGLOW.airflow.utils.AGLOW_utils.
count_grid_files
(srmlist_file, task_if_less, task_if_more, pipeline=u'SKSP', step=u'pref_cal1', min_num_files=1)[source]¶ An airflow function that branches depending on whether there are calibrator or target solutions matching the files in the srmlist.
-
AGLOW.airflow.utils.AGLOW_utils.
delete_gsiftp_from_task
(root_dir, OBSID_task, **context)[source]¶ A task that returns OBSID, and a root directory come together in this function to gracefully delete all files in this here directory
-
AGLOW.airflow.utils.AGLOW_utils.
get_field_location_from_srmlist
(srmlist_task, srmfile_key=u'targ_srmfile', **context)[source]¶ Gets the srmlist from a task and returns the location of the field IE the LTA location where the raw data is stored
-
AGLOW.airflow.utils.AGLOW_utils.
get_next_field
(fields_file, indicator=u'SND', **context)[source]¶ Determines the next field to be processed, uses the set_field_status function to set its status to started(). By default it locks SARA tokens (marked SND in the first column of the fields_file), however using the indicator variable, other files can be locked
-
AGLOW.airflow.utils.AGLOW_utils.
get_result_files_from_tokenlist
(token_type, token_ids, key=u'Results_location', **kwargs)[source]¶
-
AGLOW.airflow.utils.AGLOW_utils.
get_results_from_subdag
(subdag_id, task=u'tokens', key=u'Results_location', return_key=None, **context)[source]¶
-
AGLOW.airflow.utils.AGLOW_utils.
get_srmfile_from_dir
(srmdir, field_task, var_calib=u'SKSP_Prod_Calibrator_srm_file', var_targ=u'SKSP_Prod_Target_srm_file', **context)[source]¶
-
AGLOW.airflow.utils.AGLOW_utils.
get_user_proxy
(username)[source]¶ Gets the X509 file by the user in order to perform authorized operations by them as opposed as by the DAG Executor
-
AGLOW.airflow.utils.AGLOW_utils.
get_var_from_task_decorator
(Cls, upstream_task_id=u'', upstream_return_key=u'', u_task=None)[source]¶ wrapper for functions that require an fixed input, which is here provided by a previous task
-
AGLOW.airflow.utils.AGLOW_utils.
make_srmfile_from_step_results
(prev_step_token_task, parent_dag=None)[source]¶ Makes a list of srms using the results of all the tokens in a previous task
-
AGLOW.airflow.utils.AGLOW_utils.
modify_parset
(parset_path, freq_res, time_res, OBSID, flags, demix_sources)[source]¶ Takes in a base_parset path and changes the time and frequency resolution parameters of this parset. Saves it into a tempfile. Returns the tempfile_path
-
AGLOW.airflow.utils.AGLOW_utils.
modify_parset_from_fields_task
(parsets_dict={}, fields_task=None, time_avg=8, freq_avg=2, flags=None, **context)[source]¶ Takes a dict of ‘original’ parsets and the task with the fields information which will be used to update the averaging paremetes. Returns a dict of the ‘name’:’location’ of the modified parsets, so they can be used by the upload tokens task
-
AGLOW.airflow.utils.AGLOW_utils.
set_field_status
(fields_file, cal_OBSID, targ_OBSID, field_name, status)[source]¶
-
AGLOW.airflow.utils.AGLOW_utils.
set_field_status_from_task_return
(fields_file, task_id, status_task, **context)[source]¶ Sets the field status based on the (String) reuturned by the status_task variable