
Creating flow.py and tasks.py for a New Publisher

Directory Structure

  1. Top-Level Directories:

    • Create a directory named after the publisher within the main repository folder (e.g., copernicus, mdpi).
    • For each publisher, include a flow.py and a tasks.py:
      • flow.py: Manages the workflow logic.
      • tasks.py: Contains publisher-specific functions for data handling and processing.

  2. Common and Shared Directories:

    • common: Includes shared utilities and configurations, such as:
      • configuration.py for loading configurations.
      • directory.py for managing directories.
    • utils.py: General utility functions reusable across flows.
    • exceptions.py: Custom exceptions for error handling.
    • constants.py: Stores widely used constants for easy management (real values to be stored in the NAS directory).

  3. Publisher-Specific Directories:

    • Each publisher has its own folder within harvesting (e.g., mdpi, copernicus).
    • Publisher folders should include:
      • flow.py: The main workflow file that coordinates tasks.
      • tasks.py: Contains functions specific to the publisher’s workflow, such as fetching data or file management.

  4. Support Files and Configurations:

    • Config Files: Configuration files in JSON format for defining workflow details (to be stored on the NAS).
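
The layout described above might look like the following tree (publisher names are examples from this guide; the exact placement of utils.py, exceptions.py, and constants.py inside common/ is an assumption):

```
harvesting/
├── common/
│   ├── configuration.py   # loads workflow configurations
│   ├── directory.py       # directory management helpers
│   ├── utils.py           # general utilities reusable across flows
│   ├── exceptions.py      # custom exceptions for error handling
│   └── constants.py       # widely used constants (real values on NAS)
├── mdpi/
│   ├── flow.py            # workflow logic
│   └── tasks.py           # publisher-specific functions
└── copernicus/
    ├── flow.py
    └── tasks.py
```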

Creating flow.py and tasks.py

1. flow.py

  • Import necessary modules, such as Prefect, logging tools, and custom utilities from common.
  • Define the flow using @flow with a publisher-specific name.
  • Load configurations with common/configuration.py.
  • Set up logging and initialize the workflow context, including module name and paths.
  • Call functions from tasks.py in a logical order, such as:
    • Fetch metadata.
    • Synchronize files.
    • Filter records for processing.
  • Handle errors and log details throughout the workflow.
  • Conclude with summary logs on processed data.
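
A minimal sketch of such a flow.py is shown below. The task names follow the tasks.py section of this guide; the flow name, task bodies, and return value are illustrative assumptions, and a no-op fallback decorator keeps the sketch runnable even where Prefect is not installed:

```python
# Minimal publisher flow.py sketch (assumes Prefect 2.x; falls back to a
# plain decorator so the sketch also runs without Prefect installed).
import logging

try:
    from prefect import flow, get_run_logger
except ImportError:  # fallback: behave like plain Python functions
    def flow(fn=None, **kwargs):
        if fn is None:
            return lambda f: f
        return fn

    def get_run_logger():
        return logging.getLogger("flow")


# Placeholder task bodies; real implementations live in tasks.py.
def fetch_metadata():
    return [{"id": "a1"}, {"id": "a2"}]


def sync_files(records):
    return len(records)


def filter_records(records):
    return [r for r in records if r["id"].startswith("a")]


@flow(name="examplepub-harvest")  # publisher-specific flow name
def examplepub_flow():
    logger = get_run_logger()
    records = fetch_metadata()          # 1. fetch metadata
    synced = sync_files(records)        # 2. synchronize files
    selected = filter_records(records)  # 3. filter records
    logger.info("Synced %d files, selected %d records", synced, len(selected))
    return {"synced": synced, "selected": len(selected)}


if __name__ == "__main__":
    examplepub_flow()
```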

2. tasks.py

  • Define functions that handle specific tasks for the publisher, such as:
    • fetch_metadata(): Download metadata required for processing.
    • sync_files(): Synchronize files from a remote server.
    • filter_records(): Filter data to include relevant records only.
    • process_file(): Handle file extraction and validation.
  • Use utils.py for repetitive actions, like directory creation or timestamp formatting.
  • Each function should include logging and error handling for easier debugging.

General Practices Followed Throughout

  • Modular Design: Ensure each function focuses on a single task to facilitate maintenance.
  • Logging: Use structured logging for detailed execution information.
  • Error Handling: Implement custom exceptions for publisher-specific issues, such as missing files.
  • Documentation: Add comments and docstrings to explain the purpose and usage of functions, especially for publisher-specific nuances.
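
Custom exceptions for exceptions.py could look like this; the class names are examples, not the repository's actual names:

```python
# Illustrative custom exceptions for publisher-specific failures.
class HarvestError(Exception):
    """Base class for publisher harvesting errors."""


class MissingFileError(HarvestError):
    """A required file (e.g., PDF or XML) was not found."""

    def __init__(self, path):
        super().__init__(f"Required file missing: {path}")
        self.path = path


# Catching the base class handles any publisher-specific error uniformly.
try:
    raise MissingFileError("/data/mdpi/article1/fulltext.pdf")
except HarvestError as exc:
    caught = str(exc)
```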

This guide helps create a scalable solution that maintains consistency while allowing for customization according to different publisher workflows.

Detailed steps on creating flow.py

  • Import all necessary packages (see MDPI for reference)
  • Define the flow using @flow with a publisher-specific name
  • Initialize the logger, obtain date-time information, and get the module name using Prefect’s context package; save all of these to appropriately named variables.
  • Obtain the base_path using utils (choose the data root (base path) correctly, depending on whether it is the backed-up NAS or the not-backed-up NAS) and set the module context using get_module_context from the common directory. Also obtain and store the previous_module_context using get_previous_module from the common directory.
  • Obtain the ISSN list by using common.configuration._config_list_to_issn_list() and giving the module_context as input.
  • Now, we can complete flow.py by calling functions (from tasks.py) that perform specific tasks, like syncing TOCs from the publisher FTP (or from a specific folder that is regularly updated through an automated script). (These functions can be defined later in tasks.py.)
  • If fetching files directly from ftp:

    • create an issn-isbn mapping using a select_journals_to_harvest function (that can be later defined in tasks)
    • filter the issn-isbn mapping only for journals listed in the config file
    • Now, sync the full text from publisher ftp for the required files
  • Now, iterate over each ISSN in the ISSN list and:

    • Set the journal directory path using utils.get_journal_dir_path()
    • In case a copy of the data has to be made and used for further processing, set the journal directory using utils.work_on_folder_copy(journal_path, test_copy)
    • Initialize all variables, including ones needed for logging. (see : MDPI flow.py, for reference on the variables used). Update variable values accordingly as the script progresses.
    • In another loop, scan each subdirectory within the journal path (each representing an article) and:

      • Delete temporary directories created by earlier failed runs using utils.delete_temp_module_directories()

      • Retrieve the latest previous module using utils.get_previous_module_latest(). Then check whether the previous module was updated using utils.is_previous_module_updated() and whether it is marked "not to be processed" using utils.is_previous_module_marked_not_be_processed(). Proceed only if the previous module is updated and NOT marked "not to be processed".

      • Check whether the full text for the article has already been harvested using is_full_text_already_harvested() (can be defined later in tasks.py), and skip processing if it has.

      • If the full text has not been harvested, copy content from the previous module using utils.copy_content_from_previous_module()

      • Create a temporary module directory for processing the current module using utils.get_module_temp_directory()

      • Gather the tarball info and extract it. Write two functions for this in tasks.py: TarballInfo and extract_files_from_tar.

      • Accommodate an error-handling scenario in case the tarball extraction is not successful. Examine the error code to see whether the error is one of the following:

        • Tar Ball not found
        • PDF or XML missing
        • Both PDF and XML missing
      • Update the corresponding variables and log the error details

      • If both the PDF and XML are missing, remove the current_module_temp_dir using shutil.rmtree(current_module_temp_dir)

Final Steps:

  • As the last steps of the for-loop that iterates through the directory, perform the following:
  • Add documentation using documentation.add_documentation() with metadata such as Date, agent, task, description, input/output details etc.
  • Rename the temporary module directory using utils.rename_module_temp_directory()
  • Delete contents of the previous module if processing is successful, using delete_contents_previous_module()
  • Mark the previous module as processed using utils.mark_previous_module_processed()

  • Outside the directory for-loop, as the final steps of the outer for-loop that iterates through the ISSN list, use the defined and updated variable values to log a summary of articles processed, skipped, or with errors. Also log specific warnings and errors for missing files or unprocessed articles.
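
The per-ISSN summary logging could be sketched as follows; the counter keys, function name, and log wording are examples, not the repository's actual conventions:

```python
# Illustrative end-of-ISSN summary: tally per-article outcomes collected
# during the directory loop and log one summary line per ISSN.
import logging
from collections import Counter

logger = logging.getLogger("harvest")


def log_issn_summary(issn, outcomes):
    """Log a summary given a list of per-article outcome labels."""
    counts = Counter(outcomes)
    logger.info(
        "ISSN %s: %d processed, %d skipped, %d errors",
        issn,
        counts.get("processed", 0),
        counts.get("skipped", 0),
        counts.get("error", 0),
    )
    return dict(counts)
```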

Main Entry Point

At the bottom of flow.py, add:

    if __name__ == '__main__':
        publisher_name()

This ensures that the flow can be executed directly for testing purposes in a local environment.