Creating flow.py and tasks.py for a New Publisher
Directory Structure
- Top-Level Directories:
  - Create a directory named after the publisher within a main repository folder (e.g., `copernicus`, `mdpi`).
  - For each publisher, include a `flow.py` and a `tasks.py`:
    - `flow.py`: Manages the workflow logic.
    - `tasks.py`: Contains publisher-specific functions for data handling and processing.
- Common and Shared Directories:
  - `common`: Includes shared utilities and configurations, such as:
    - `configuration.py` for loading configurations.
    - `directory.py` for managing directories.
    - `utils.py`: General utility functions reusable across flows.
    - `exceptions.py`: Custom exceptions for error handling.
    - `constants.py`: Stores widely used constants for easy management (real values to be stored in the NAS directory).
- Publisher-Specific Directories:
  - Each publisher has its own folder within `harvesting` (e.g., `mdpi`, `copernicus`).
  - Publisher folders should include:
    - `flow.py`: The main workflow file that coordinates tasks.
    - `tasks.py`: Contains functions specific to the publisher's workflow, such as fetching data or file management.
- Support Files and Configurations:
  - Config Files: configuration files in JSON format for defining workflow details (to be stored in the NAS).
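Putting the directories above together, the repository layout might look like the following sketch (the exact nesting and publisher folders are illustrative, not prescriptive):

```
harvesting/
├── common/
│   ├── configuration.py   # loading configurations
│   ├── directory.py       # managing directories
│   ├── utils.py           # general utilities reusable across flows
│   ├── exceptions.py      # custom exceptions
│   └── constants.py       # widely used constants
├── mdpi/
│   ├── flow.py            # workflow logic
│   └── tasks.py           # publisher-specific tasks
└── copernicus/
    ├── flow.py
    └── tasks.py
```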
Creating flow.py and tasks.py
1. flow.py
   - Import necessary modules, such as Prefect, logging tools, and custom utilities from `common`.
   - Define the flow using `@flow` with a publisher-specific name.
   - Load configurations with `common/configuration.py`.
   - Set up logging and initialize the workflow context, including the module name and paths.
   - Call functions from `tasks.py` in a logical order, such as:
     - Fetch metadata.
     - Synchronize files.
     - Filter records for processing.
   - Handle errors and log details throughout the workflow.
   - Conclude with summary logs on processed data.
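The shape described above can be sketched as a minimal `flow.py` skeleton. This is an assumption-laden illustration, not the actual MDPI flow: the flow name, the commented-out task calls, and the counter variables are all hypothetical, and the `try/except` fallback only exists so the sketch runs where Prefect is not installed (in the real repository you would import `flow` and `get_run_logger` directly).

```python
import logging

try:
    from prefect import flow, get_run_logger
except ImportError:
    # Fallback stand-ins so the sketch runs without Prefect installed.
    def flow(name=None):
        def decorator(fn):
            return fn
        return decorator

    def get_run_logger():
        return logging.getLogger("flow")


@flow(name="publisher_name_harvest")  # publisher-specific name (hypothetical)
def publisher_name():
    logger = get_run_logger()
    logger.info("Starting harvest flow")

    # Placeholders for functions to be defined in tasks.py:
    # metadata = fetch_metadata()
    # sync_files()
    # records = filter_records(metadata)

    processed, skipped, errors = 0, 0, 0  # summary counters
    logger.info("Done: %d processed, %d skipped, %d errors",
                processed, skipped, errors)
    return processed, skipped, errors
```

Keeping the flow a thin coordinator like this, with all real work delegated to `tasks.py`, is what makes the same skeleton reusable across publishers.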
2. tasks.py
   - Define functions that handle specific tasks for the publisher, such as:
     - `fetch_metadata()`: Download metadata required for processing.
     - `sync_files()`: Synchronize files from a remote server.
     - `filter_records()`: Filter data to include only relevant records.
     - `process_file()`: Handle file extraction and validation.
   - Use `utils.py` for repetitive actions, like directory creation or timestamp formatting.
   - Each function should include logging and error handling for easier debugging.
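Two of the task functions named above might be sketched as follows. The function names follow the list; the bodies, record shape, and the fake ISSN are purely illustrative, since the real implementations depend on the publisher's metadata format.

```python
import logging

logger = logging.getLogger(__name__)


def fetch_metadata():
    """Download metadata required for processing (placeholder data)."""
    logger.info("Fetching metadata")
    # The real function would download from the publisher; the ISSN and
    # record shape here are made up for illustration.
    return [{"issn": "1234-5678", "has_fulltext": True},
            {"issn": "0000-0000", "has_fulltext": False}]


def filter_records(records):
    """Filter data to include only relevant records, with logging."""
    kept = [r for r in records if r["has_fulltext"]]
    logger.info("Kept %d of %d records", len(kept), len(records))
    return kept
```

Note how each function logs what it did, matching the practice that every task should include logging and error handling for easier debugging.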
General Practices Followed Throughout
- Modular Design: Ensure each function focuses on a single task to facilitate maintenance.
- Logging: Use structured logging for detailed execution information.
- Error Handling: Implement custom exceptions for publisher-specific issues, such as missing files.
- Documentation: Add comments and docstrings to explain the purpose and usage of functions, especially for publisher-specific nuances.
This guide helps create a scalable solution that maintains consistency while allowing for customization according to different publisher workflows.
Detailed steps on creating flow.py
- Import all necessary packages (see MDPI for reference).
- Define the flow using `@flow` with a publisher-specific name.
- Initialize the logger, get date-time information, and get the module name using the `context` package of Prefect; save all of these to appropriately named variables.
- Obtain the base_path using `utils` (choose the data root (base path) correctly, based on whether it is the backed-up NAS or the not-backed-up NAS) and set the module context using `get_module_context` from the common directory. Also obtain and store the previous_module_context using `get_previous_module` from the common directory.
- Obtain the ISSN list by calling `common.configuration._config_list_to_issn_list()` with the module_context as input.
- Now we can complete `flow.py` by calling functions (from `tasks.py`) that perform specific tasks, such as syncing TOCs from the publisher FTP (or from a specific folder that is regularly updated by an automated script). (These functions can be defined later in `tasks.py`.)
- If fetching files directly from FTP:
  - Create an ISSN-ISBN mapping using a `select_journals_to_harvest` function (which can be defined later in `tasks.py`).
  - Filter the ISSN-ISBN mapping to include only journals listed in the config file.
  - Now sync the full text from the publisher FTP for the required files.
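The mapping-and-filter step above can be illustrated with a small sketch. The function name `filter_mapping`, the mapping shape, and the ISSN/ISBN values are hypothetical; only the idea of keeping journals listed in the config file comes from the text.

```python
def filter_mapping(issn_isbn_mapping, configured_issns):
    """Keep only the journals whose ISSN appears in the config file."""
    return {issn: isbn for issn, isbn in issn_isbn_mapping.items()
            if issn in configured_issns}


# Example with made-up identifiers: only the first journal is configured.
mapping = {"1111-1111": "978-0-00-000000-1",
           "2222-2222": "978-0-00-000000-2"}
filtered = filter_mapping(mapping, {"1111-1111"})
```

After this filtering, only the full text for the configured journals would be synced from the publisher FTP.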
- Now, iterate over each ISSN in the ISSN list and:
  - Set the journal directory path using `utils.get_journal_dir_path()`.
  - In case a copy of the data has to be made and used for further processing, set the journal directory using `utils.work_on_folder_copy(journal_path, test_copy)`.
  - Initialize all variables, including the ones needed for logging (see the MDPI `flow.py` for reference on the variables used). Update variable values accordingly as the script progresses.
- In another loop, scan each subdirectory within the journal path (each representing an article) and:
  - Delete temporary directories created by earlier failed runs using `utils.delete_temp_module_directories()`.
  - Retrieve the latest previous module using `utils.get_previous_module_latest()`, then check whether the previous module was updated using `utils.is_previous_module_updated()` and whether it is marked "not to be processed" using `utils.is_previous_module_marked_not_be_processed()`. If the previous module is updated and NOT marked "not_to_be_processed", proceed.
  - Check whether the full text for the article has already been harvested using `is_full_text_already_harvested()` (which can be defined later in `tasks.py`), and skip processing if it has.
  - If the full text has not been harvested, copy content from the previous module using `utils.copy_content_from_previous_module()`.
  - Create a temporary module directory for processing the current module using `utils.get_module_temp_directory()`.
  - Gather the tarball info and extract it. Write two functions for this, `TarballInfo` and `extract_files_from_tar`, in `tasks.py`.
  - Accommodate an error-handling scenario in case the tarball extraction is not successful. Examine the error code to see whether the error is one of the following:
    - Tarball not found
    - PDF or XML missing
    - Both PDF and XML missing
  - Update the corresponding variables and log the error details.
  - If both the PDF and XML are missing, remove the current_module_temp_dir using `shutil.rmtree(current_module_temp_dir)`.
Final Steps:
- As the last steps of the for loop that iterates through the directory, perform the following:
  - Add documentation using `documentation.add_documentation()` with metadata such as date, agent, task, description, input/output details, etc.
  - Rename the temporary module directory using `utils.rename_module_temp_directory()`.
  - Delete the contents of the previous module if processing is successful, using `delete_contents_previous_module()`.
  - Mark the previous module as processed using `utils.mark_previous_module_processed()`.
- Outside the directory for-loop, as the final steps of the outer for loop that iterates through the ISSN list, use the defined and updated variable values to log a summary of articles processed, skipped, or with errors. Also log specific warnings and errors for missing files or unprocessed articles.
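The summary logging described above can be sketched with plain counters. The counter names and the loop feeding them are hypothetical stand-ins for the variables that the real flow updates as it processes each article.

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("harvest")

# Hypothetical counters updated while iterating over articles.
counters = {"processed": 0, "skipped": 0, "errors": 0}

# Stand-in for the per-article loop: each article yields one outcome.
for outcome in ["processed", "processed", "skipped", "errors"]:
    counters[outcome] += 1

# End-of-ISSN summary log, plus a warning when anything failed.
logger.info("Summary: %(processed)d processed, %(skipped)d skipped, "
            "%(errors)d errors", counters)
if counters["errors"]:
    logger.warning("Some articles could not be processed; check logs above")
```

Keeping the counters in one dict makes the final summary line a single structured log call rather than scattered prints.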
Main Entry Point
At the bottom of flow.py, add:

```python
if __name__ == '__main__':
    publisher_name()
```

This ensures that the flow can be executed directly for testing purposes in a local environment.