diff --git a/.github/workflows/base-run-bulk-upload.yml b/.github/workflows/base-run-bulk-upload.yml index 48c08af40e..65d348c6d8 100644 --- a/.github/workflows/base-run-bulk-upload.yml +++ b/.github/workflows/base-run-bulk-upload.yml @@ -35,6 +35,21 @@ on: required: false type: "string" default: "3" + lambda_type: + description: "Which lambda to trigger (BulkUploadMetadataLambda or BulkUploadMetadataProcessor)" + required: false + type: "string" + default: "BulkUploadMetadataLambda" + run_bulk_upload_setup: + description: "Run bulk upload setup step" + required: false + type: "boolean" + default: true + run_document_review_setup: + description: "Run document review setup step" + required: false + type: "boolean" + default: true secrets: AWS_ASSUME_ROLE: required: true @@ -67,6 +82,7 @@ jobs: working-directory: ./tests/bulk-upload/scripts - name: Setup Bulk Upload + if: ${{ inputs.run_bulk_upload_setup }} run: | python setup_bulk_upload.py \ --environment "${{ inputs.sandbox }}" \ @@ -80,6 +96,7 @@ jobs: working-directory: ./tests/bulk-upload/scripts - name: Setup Document Review + if: ${{ inputs.run_document_review_setup }} run: | python setup_document_review.py working-directory: ./tests/bulk-upload/scripts @@ -90,5 +107,6 @@ jobs: run: | python run_bulk_upload.py \ --environment "${{ inputs.sandbox }}" \ + --lambda-type "${{ inputs.lambda_type }}" \ --start-bulk-upload working-directory: ./tests/bulk-upload/scripts diff --git a/.github/workflows/full-deploy-to-sandbox.yml b/.github/workflows/full-deploy-to-sandbox.yml index 078a517da9..bf5621cf20 100644 --- a/.github/workflows/full-deploy-to-sandbox.yml +++ b/.github/workflows/full-deploy-to-sandbox.yml @@ -25,6 +25,14 @@ on: required: true type: boolean default: false + lambda_type: + description: "Which lambda to trigger for bulk upload (only applies if bulk_upload is true)" + required: false + type: choice + options: + - "Bulk upload metadata" + - "NEW bulk metadata processor" + default: "Bulk upload metadata" disable_pds: description: "Do you want to disable the PDS stub?" 
required: true @@ -169,5 +177,6 @@ jobs: combi_settings: "combi300" base_branch: ${{ inputs.build_branch }} file_count: "3" + lambda_type: ${{ inputs.lambda_type == 'NEW bulk metadata processor' && 'BulkUploadMetadataProcessor' || 'BulkUploadMetadataLambda' }} secrets: AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }} diff --git a/.github/workflows/run-bulk-upload-dev.yml b/.github/workflows/run-bulk-upload-dev.yml index 02732631a3..ce064cb2eb 100644 --- a/.github/workflows/run-bulk-upload-dev.yml +++ b/.github/workflows/run-bulk-upload-dev.yml @@ -1,6 +1,6 @@ name: Run Bulk Upload - Dev -run-name: "${{ github.event.inputs.sandbox }} | ${{ github.event.inputs.combi_settings }} | ${{ github.event.inputs.file_count }}" +run-name: "${{ github.event.inputs.sandbox }} | ${{ github.event.inputs.combi_settings }} | ${{ github.event.inputs.file_count }} | ${{ github.event.inputs.lambda_type }}" permissions: pull-requests: write @@ -27,6 +27,14 @@ on: required: true type: "string" default: "3" + lambda_type: + description: "Which lambda to trigger" + required: true + type: choice + options: + - "BulkUploadMetadataLambda" + - "BulkUploadMetadataProcessor" + default: "BulkUploadMetadataLambda" jobs: bulk_upload: @@ -36,5 +44,6 @@ jobs: sandbox: "${{ inputs.sandbox }}" combi_settings: "${{ inputs.combi_settings }}" file_count: "${{ inputs.file_count }}" + lambda_type: "${{ inputs.lambda_type }}" secrets: AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }} diff --git a/lambdas/handlers/bulk_upload_metadata_processor_handler.py b/lambdas/handlers/bulk_upload_metadata_processor_handler.py index 4df3b8bc6d..450f770c7e 100644 --- a/lambdas/handlers/bulk_upload_metadata_processor_handler.py +++ b/lambdas/handlers/bulk_upload_metadata_processor_handler.py @@ -1,8 +1,10 @@ +from enums.feature_flags import FeatureFlags from enums.lloyd_george_pre_process_format import LloydGeorgePreProcessFormat from services.bulk_upload_metadata_processor_service import ( BulkUploadMetadataProcessorService, get_formatter_service, ) +from services.feature_flags_service import FeatureFlagService from services.metadata_mapping_validator_service import MetadataMappingValidatorService from utils.audit_logging_setup import LoggingService from utils.decorators.ensure_env_var import ensure_environment_variables @@ -20,6 +22,17 @@ ) @handle_lambda_exceptions def lambda_handler(event, _context): + feature_flag_service = FeatureFlagService() + send_to_review_flag_object = feature_flag_service.get_feature_flags_by_flag( + FeatureFlags.BULK_UPLOAD_SEND_TO_REVIEW_ENABLED.value + ) + send_to_review_enabled = send_to_review_flag_object[ + FeatureFlags.BULK_UPLOAD_SEND_TO_REVIEW_ENABLED.value + ] + + if send_to_review_enabled: + logger.info("Bulk upload send to review queue is enabled for metadata processor") + raw_pre_format_type = event.get( "preFormatType", LloydGeorgePreProcessFormat.GENERAL ) @@ -32,6 +45,7 @@ def lambda_handler(event, _context): metadata_formatter_service=metadata_formatter_service, metadata_heading_remap=remappings, input_file_location=input_file_location, + send_to_review_enabled=send_to_review_enabled, ) if "source" in event and event.get("source") == "aws.s3": @@ -60,6 +74,7 @@ def lambda_handler(event, _context): metadata_formatter_service=metadata_formatter_service, metadata_heading_remap=remappings, fixed_values=fixed_values, - input_file_location=input_file_location + input_file_location=input_file_location, + send_to_review_enabled=send_to_review_enabled, ) metadata_service.process_metadata() diff --git 
a/lambdas/repositories/bulk_upload/bulk_upload_sqs_repository.py b/lambdas/repositories/bulk_upload/bulk_upload_sqs_repository.py index 78f738f411..e987c34e40 100644 --- a/lambdas/repositories/bulk_upload/bulk_upload_sqs_repository.py +++ b/lambdas/repositories/bulk_upload/bulk_upload_sqs_repository.py @@ -16,8 +16,8 @@ class BulkUploadSqsRepository: def __init__(self): self.sqs_repository = SQSService() - self.metadata_queue_url = os.environ["METADATA_SQS_QUEUE_URL"] - self.review_queue_url = os.environ["REVIEW_SQS_QUEUE_URL"] + self.metadata_queue_url = os.getenv("METADATA_SQS_QUEUE_URL") + self.review_queue_url = os.getenv("REVIEW_SQS_QUEUE_URL") def put_staging_metadata_back_to_queue(self, staging_metadata: StagingSqsMetadata): request_context.patient_nhs_no = staging_metadata.nhs_number @@ -40,7 +40,7 @@ def send_message_to_review_queue( review_files = [ ReviewMessageFile( file_name=file.stored_file_name.split("/")[-1], - file_path=file.file_path, + file_path=file.file_path.lstrip("/"), ) for file in staging_metadata.files ] diff --git a/lambdas/services/bulk_upload/metadata_general_preprocessor.py b/lambdas/services/bulk_upload/metadata_general_preprocessor.py index afec1cae2f..027974aab4 100644 --- a/lambdas/services/bulk_upload/metadata_general_preprocessor.py +++ b/lambdas/services/bulk_upload/metadata_general_preprocessor.py @@ -19,45 +19,41 @@ class MetadataGeneralPreprocessor(MetadataPreprocessorService): def validate_record_filename(self, file_name: str, *args, **kwargs) -> str: - try: - file_path_prefix, current_file_name = ( - extract_document_path_for_lloyd_george_record(file_name) - ) - first_document_number, second_document_number, current_file_name = ( - extract_document_number_bulk_upload_file_name(current_file_name) - ) - current_file_name = extract_lloyd_george_record_from_bulk_upload_file_name( - current_file_name - ) - patient_name, current_file_name = ( - extract_patient_name_from_bulk_upload_file_name(current_file_name) - ) + file_path_prefix, current_file_name = ( + extract_document_path_for_lloyd_george_record(file_name) + ) + first_document_number, second_document_number, current_file_name = ( + extract_document_number_bulk_upload_file_name(current_file_name) + ) + current_file_name = extract_lloyd_george_record_from_bulk_upload_file_name( + current_file_name + ) + patient_name, current_file_name = ( + extract_patient_name_from_bulk_upload_file_name(current_file_name) + ) - if sum(c.isdigit() for c in current_file_name) != 18: - logger.info("Failed to find NHS number or date") - raise InvalidFileNameException("Incorrect NHS number or date format") + if sum(c.isdigit() for c in current_file_name) != 18: + logger.info("Failed to find NHS number or date") + raise InvalidFileNameException("Incorrect NHS number or date format") - nhs_number, current_file_name = ( - extract_nhs_number_from_bulk_upload_file_name(current_file_name) - ) - date, current_file_name = extract_date_from_bulk_upload_file_name( - current_file_name - ) - file_extension = extract_file_extension_from_bulk_upload_file_name( - current_file_name - ) - file_name = assemble_lg_valid_file_name_full_path( - file_path_prefix, - first_document_number, - second_document_number, - patient_name, - nhs_number, - date, - file_extension, - ) - logger.info(f"Finished processing, new file name is: {file_name}") - return file_name + nhs_number, current_file_name = ( + extract_nhs_number_from_bulk_upload_file_name(current_file_name) + ) + date, current_file_name = extract_date_from_bulk_upload_file_name( + 
current_file_name + ) + file_extension = extract_file_extension_from_bulk_upload_file_name( + current_file_name + ) + file_name = assemble_lg_valid_file_name_full_path( + file_path_prefix, + first_document_number, + second_document_number, + patient_name, + nhs_number, + date, + file_extension, + ) + logger.info(f"Finished processing, new file name is: {file_name}") + return file_name - except InvalidFileNameException as error: - logger.error(f"Failed to process {file_name} due to error: {error}") - raise error diff --git a/lambdas/services/bulk_upload_metadata_processor_service.py b/lambdas/services/bulk_upload_metadata_processor_service.py index 80aaa8a2b1..7fd4aa8141 100644 --- a/lambdas/services/bulk_upload_metadata_processor_service.py +++ b/lambdas/services/bulk_upload_metadata_processor_service.py @@ -11,6 +11,7 @@ import pydantic from botocore.exceptions import ClientError +from enums.document_review_reason import DocumentReviewReason from enums.lloyd_george_pre_process_format import LloydGeorgePreProcessFormat from enums.upload_status import UploadStatus from enums.virus_scan_result import VirusScanResult @@ -23,6 +24,7 @@ BulkUploadDynamoRepository, ) from repositories.bulk_upload.bulk_upload_s3_repository import BulkUploadS3Repository +from repositories.bulk_upload.bulk_upload_sqs_repository import BulkUploadSqsRepository from services.base.s3_service import S3Service from services.base.sqs_service import SQSService from services.bulk_upload.metadata_general_preprocessor import ( @@ -58,6 +60,7 @@ def __init__( metadata_heading_remap: dict, input_file_location: str = "", fixed_values: dict = None, + send_to_review_enabled: bool = False, ): self.staging_bucket_name = os.getenv("STAGING_STORE_BUCKET_NAME") self.metadata_queue_url = os.getenv("METADATA_SQS_QUEUE_URL") @@ -65,6 +68,7 @@ def __init__( self.s3_service = S3Service() self.sqs_service = SQSService() self.dynamo_repository = BulkUploadDynamoRepository() + self.sqs_repository = BulkUploadSqsRepository() self.s3_repo = BulkUploadS3Repository() self.virus_scan_service = get_virus_scan_service() @@ -77,6 +81,7 @@ def __init__( self.metadata_mapping_validator_service = MetadataMappingValidatorService() self.metadata_formatter_service = metadata_formatter_service + self.send_to_review_enabled = send_to_review_enabled def download_metadata_from_s3(self) -> str: local_file_path = f"{self.temp_download_dir}/{self.file_key.split('/')[-1]}" @@ -131,15 +136,16 @@ def csv_to_sqs_metadata(self, csv_file_path: str) -> list[StagingSqsMetadata]: patients: defaultdict[tuple[str, str], list[BulkUploadQueueMetadata]] = ( defaultdict(list) ) + failed_files: defaultdict[tuple[str, str], list[BulkUploadQueueMetadata]] = ( + defaultdict(list) + ) with open( csv_file_path, mode="r", encoding="utf-8-sig", errors="replace" ) as csv_file: csv_reader = csv.DictReader(csv_file) if csv_reader.fieldnames is None: - raise BulkUploadMetadataException( - f"Metdata file is empty or missing headers." - ) + raise BulkUploadMetadataException("Metadata file is empty or missing headers.") headers = [h.strip() for h in csv_reader.fieldnames] records = list(csv_reader) @@ -166,7 +172,14 @@ def csv_to_sqs_metadata(self, csv_file_path: str) -> list[StagingSqsMetadata]: ) for row in validated_rows: - self.process_metadata_row(row, patients) + self.process_metadata_row(row, patients, failed_files) + + if self.send_to_review_enabled: + self.send_failed_files_to_review_queue(failed_files) + else: + logger.info( + f"Send to review is disabled. 
Skipping review queue for {len(failed_files)} failed file" + ) return [ StagingSqsMetadata(nhs_number=nhs_number, files=files) @@ -174,9 +187,11 @@ def csv_to_sqs_metadata(self, csv_file_path: str) -> list[StagingSqsMetadata]: ] def process_metadata_row( - self, row: dict, patients: dict[tuple[str, str], list[BulkUploadQueueMetadata]] + self, row: dict, + patients: dict[tuple[str, str], list[BulkUploadQueueMetadata]], + failed_files: dict[tuple[str, str], list[BulkUploadQueueMetadata]] ) -> None: - """Validate individual file metadata and attach to patient group.""" + """Validate individual file metadata and attach to a patient group.""" file_metadata = MetadataFile.model_validate(row) if self.fixed_values: @@ -187,15 +202,22 @@ def process_metadata_row( try: correct_file_name = self.validate_and_correct_filename(file_metadata) except InvalidFileNameException as error: - self.handle_invalid_filename(file_metadata, error, nhs_number) + self.handle_invalid_filename(file_metadata, error, nhs_number, ods_code, failed_files) return sqs_metadata = self.convert_to_sqs_metadata(file_metadata, correct_file_name) - sqs_metadata.file_path = self.file_key.rsplit("/", 1)[0] + "/" + sqs_metadata.file_path.lstrip("/") + + sqs_metadata.file_path = self.add_directory_path_to_file_path(file_metadata) + patients[(nhs_number, ods_code)].append(sqs_metadata) - def apply_fixed_values(self, file_metadata: MetadataFile) -> MetadataFile: + def add_directory_path_to_file_path(self, file_metadata): + if "/" in self.file_key: + directory_path = self.file_key.rsplit("/", 1)[0] + return directory_path + "/" + file_metadata.file_path.lstrip("/") + return file_metadata.file_path.lstrip("/") + def apply_fixed_values(self, file_metadata: MetadataFile) -> MetadataFile: metadata_dict = file_metadata.model_dump(by_alias=True) for field_name, fixed_value in self.fixed_values.items(): @@ -215,7 +237,7 @@ def convert_to_sqs_metadata( ) def create_expedite_sqs_metadata(self, key) -> StagingSqsMetadata: - """Build a single-patient SQS metadata payload for an expedite upload.""" + """Build a single-patient SQS metadata payload for an expedited upload.""" nhs_number, file_path, ods_code, scan_date = self.validate_expedite_file(key) return StagingSqsMetadata( nhs_number=nhs_number, @@ -235,7 +257,7 @@ def extract_patient_info(file_metadata: MetadataFile) -> tuple[str, str]: return file_metadata.nhs_number, file_metadata.gp_practice_code def validate_and_correct_filename(self, file_metadata: MetadataFile) -> str: - """Validate and normalize file name.""" + """Validate and normalise file name.""" try: validate_file_name(file_metadata.file_path.split("/")[-1]) return file_metadata.file_path @@ -267,7 +289,7 @@ def validate_expedite_file(self, s3_object_key: str): return nhs_number, file_name, ods_code, scan_date def handle_expedite_event(self, event): - """Process S3 EventBridge expedite uploads: enforce virus scan, ensure 1of1, extract identifiers + """Process S3 EventBridge expedite uploads: enforce virus scan, ensure 1of1, extract identifiers, and send metadata to SQS.""" try: unparsed_s3_object_key = event["detail"]["object"]["key"] @@ -299,19 +321,39 @@ def handle_invalid_filename( file_metadata: MetadataFile, error: InvalidFileNameException, nhs_number: str, + ods_code: str, + failed_files: dict[tuple[str, str], list[BulkUploadQueueMetadata]] ) -> None: - """Handle invalid filenames by logging and storing failure in Dynamo.""" + """Handle invalid filenames by logging, storing failure in Dynamo, and tracking for review.""" 
logger.error( f"Failed to process {file_metadata.file_path} due to error: {error}" ) failed_file = self.convert_to_sqs_metadata( file_metadata, file_metadata.file_path ) + failed_file.file_path = self.add_directory_path_to_file_path(file_metadata) + failed_files[(nhs_number, ods_code)].append(failed_file) + failed_entry = StagingSqsMetadata(nhs_number=nhs_number, files=[failed_file]) self.dynamo_repository.write_report_upload_to_dynamo( failed_entry, UploadStatus.FAILED, str(error) ) + def send_failed_files_to_review_queue( + self, + failed_files: dict[tuple[str, str], list[BulkUploadQueueMetadata]] + ) -> None: + for (nhs_number, ods_code), files in failed_files.items(): + staging_metadata = StagingSqsMetadata(nhs_number=nhs_number, files=files) + logger.info( + f"Sending {len(files)} failed file(s) to review queue for NHS number: {nhs_number}" + ) + self.sqs_repository.send_message_to_review_queue( + staging_metadata=staging_metadata, + uploader_ods=ods_code, + failure_reason=DocumentReviewReason.UNSUCCESSFUL_UPLOAD, + ) + def send_metadata_to_fifo_sqs( self, staging_sqs_metadata_list: list[StagingSqsMetadata] ) -> None: @@ -354,7 +396,7 @@ def copy_metadata_to_dated_folder(self): self.s3_service.delete_object(self.staging_bucket_name, self.file_key) def clear_temp_storage(self): - """Delete temporary working directory.""" + """Delete the temporary working directory.""" logger.info("Clearing temp storage directory") try: shutil.rmtree(self.temp_download_dir) diff --git a/lambdas/services/document_review_processor_service.py b/lambdas/services/document_review_processor_service.py index e1ac005056..ee2881cf50 100644 --- a/lambdas/services/document_review_processor_service.py +++ b/lambdas/services/document_review_processor_service.py @@ -16,10 +16,10 @@ from utils.exceptions import ( InvalidResourceIdException, PatientNotFoundException, - PdsErrorException, + PdsErrorException, InvalidNhsNumberException, ) from utils.request_context import request_context -from utils.utilities import get_pds_service +from utils.utilities import get_pds_service, validate_nhs_number logger = LoggingService(__name__) @@ -66,15 +66,16 @@ def _get_patient_custodian(self, review_message: ReviewMessageBody) -> str: "No valid NHS number found in message. Using uploader ODS as custodian" ) return review_message.uploader_ods + validate_nhs_number(review_message.nhs_number) pds_service = get_pds_service() patient_details = pds_service.fetch_patient_details( review_message.nhs_number ) return patient_details.general_practice_ods - except (PdsErrorException, InvalidResourceIdException): + except PdsErrorException: logger.info("Error when searching PDS. Using uploader ODS as custodian") return review_message.uploader_ods - except PatientNotFoundException: + except (PatientNotFoundException, InvalidResourceIdException, InvalidNhsNumberException): logger.info( "Patient not found in PDS. 
Using uploader ODS as custodian, and nhs number placeholder" ) diff --git a/lambdas/tests/unit/handlers/test_bulk_upload_metadata_processor_handler.py b/lambdas/tests/unit/handlers/test_bulk_upload_metadata_processor_handler.py index d5a7da797a..603f4b0d69 100644 --- a/lambdas/tests/unit/handlers/test_bulk_upload_metadata_processor_handler.py +++ b/lambdas/tests/unit/handlers/test_bulk_upload_metadata_processor_handler.py @@ -26,7 +26,7 @@ def eventbridge_event_with_s3_key(key: str): def test_metadata_processor_lambda_handler_valid_event( - set_env, context, mock_metadata_service + set_env, context, mock_metadata_service, mock_validation_strict_enabled_send_to_review_disabled ): lambda_handler({"inputFileLocation": "test"}, context) @@ -34,7 +34,7 @@ def test_metadata_processor_lambda_handler_valid_event( def test_metadata_processor_lambda_handler_empty_event( - set_env, context, mock_metadata_service + set_env, context, mock_metadata_service, mock_validation_strict_enabled_send_to_review_disabled ): lambda_handler({}, context) @@ -42,7 +42,7 @@ def test_metadata_processor_lambda_handler_empty_event( def test_metadata_processor_lambda_handler_s3_event_triggers_expedite( - set_env, context, mock_metadata_service + set_env, context, mock_metadata_service, mock_validation_strict_enabled_send_to_review_disabled ): event = { "source": "aws.s3", @@ -60,7 +60,7 @@ def test_metadata_processor_lambda_handler_s3_event_triggers_expedite( def test_s3_event_with_non_expedite_key_is_rejected( - set_env, context, mock_metadata_service, caplog + set_env, context, mock_metadata_service, caplog, mock_validation_strict_enabled_send_to_review_disabled ): key_string = "uploads/1of1_Lloyd_George_Record_[John Michael SMITH]_[1234567890]_[15-05-1990].pdf" event = eventbridge_event_with_s3_key(key_string) @@ -72,7 +72,7 @@ def test_s3_event_with_non_expedite_key_is_rejected( mock_metadata_service.process_metadata.assert_not_called() -def test_s3_event_with_expedite_key_processes(set_env, context, mock_metadata_service): +def test_s3_event_with_expedite_key_processes(set_env, context, mock_metadata_service, mock_validation_strict_enabled_send_to_review_disabled): event = eventbridge_event_with_s3_key( "expedite%2F1of1_Lloyd_George_Record_[John Michael SMITH]_[1234567890]_[15-05-1990].pdf" ) diff --git a/lambdas/tests/unit/services/test_bulk_upload_metadata_processor_service.py b/lambdas/tests/unit/services/test_bulk_upload_metadata_processor_service.py index 987f9b713e..05abfaf60e 100644 --- a/lambdas/tests/unit/services/test_bulk_upload_metadata_processor_service.py +++ b/lambdas/tests/unit/services/test_bulk_upload_metadata_processor_service.py @@ -52,7 +52,7 @@ MOCK_TEMP_FOLDER = "tests/unit/helpers/data/bulk_upload" SERVICE_PATH = "services.bulk_upload_metadata_processor_service" - +TEST_MOCK_METADATA_CSV = "path/to/metadata.csv" class MockMetadataPreprocessorService(MetadataPreprocessorService): def validate_record_filename(self, original_filename: str, *args, **kwargs) -> str: @@ -70,16 +70,20 @@ def test_service(mocker, set_env, mock_tempfile): mocker.patch( "services.bulk_upload_metadata_processor_service.BulkUploadS3Repository" ) + mocker.patch( + "services.bulk_upload_metadata_processor_service.BulkUploadSqsRepository" + ) mocker.patch( "services.bulk_upload_metadata_processor_service.get_virus_scan_service" ) service = BulkUploadMetadataProcessorService( metadata_formatter_service=MockMetadataPreprocessorService( - practice_directory="test_practice_directory" + practice_directory=TEST_MOCK_METADATA_CSV 
), metadata_heading_remap={}, - input_file_location="test_input_file_location", + input_file_location=TEST_MOCK_METADATA_CSV, + send_to_review_enabled=False, ) mocker.patch.object(service, "s3_service") @@ -178,7 +182,7 @@ def test_process_metadata_catch_and_log_error_when_fail_to_get_metadata_csv_from mock_s3_service.download_file.side_effect = ClientError( {"Error": {"Code": "403", "Message": "Forbidden"}}, "S3:HeadObject" ) - expected_err_msg = 'Could not retrieve the following metadata file: test_input_file_location' + expected_err_msg = f'Could not retrieve the following metadata file: {TEST_MOCK_METADATA_CSV}' with pytest.raises(BulkUploadMetadataException) as e: test_service.process_metadata() @@ -279,7 +283,7 @@ def test_download_metadata_from_s3(mock_s3_service, test_service): expected_file_key = test_service.file_key expected_download_path = os.path.join( - test_service.temp_download_dir, expected_file_key + test_service.temp_download_dir, expected_file_key.split('/')[-1] ) mock_s3_service.download_file.assert_called_once_with( @@ -299,41 +303,11 @@ def test_download_metadata_from_s3_raise_error_when_failed_to_download( "s3_get_object", ) with pytest.raises(BulkUploadMetadataException, - match=r"Could not retrieve the following metadata file: test_input_file_location"): + match=f"Could not retrieve the following metadata file: {TEST_MOCK_METADATA_CSV}"): test_service.download_metadata_from_s3() -class TestMetadataPreprocessorService(MetadataPreprocessorService): - __test__ = False - - def validate_record_filename(self, original_filename: str, *args, **kwargs) -> str: - return original_filename - - -@pytest.fixture -def bulk_upload_service(mocker, set_env, mock_tempfile): - mocker.patch("services.bulk_upload_metadata_processor_service.S3Service") - mocker.patch("services.bulk_upload_metadata_processor_service.SQSService") - mocker.patch( - "services.bulk_upload_metadata_processor_service.BulkUploadDynamoRepository" - ) - mocker.patch( - "services.bulk_upload_metadata_processor_service.BulkUploadS3Repository" - ) - mocker.patch( - "services.bulk_upload_metadata_processor_service.get_virus_scan_service" - ) - - return BulkUploadMetadataProcessorService( - metadata_formatter_service=TestMetadataPreprocessorService( - practice_directory="test_practice_directory" - ), - metadata_heading_remap={}, - input_file_location="test_input_file_location", - ) - - -def test_duplicates_csv_to_sqs_metadata(mocker, bulk_upload_service): +def test_duplicates_csv_to_sqs_metadata(mocker, test_service): header = "FILEPATH,PAGE COUNT,GP-PRACTICE-CODE,NHS-NO,SECTION,SUB-SECTION,SCAN-DATE,SCAN-ID,USER-ID,UPLOAD" line1 = ( '/1234567890/1of2_Lloyd_George_Record_[Joe Bloggs]_[1234567890]_[25-12-2019].pdf,"","Y12345",' @@ -376,18 +350,18 @@ def test_duplicates_csv_to_sqs_metadata(mocker, bulk_upload_service): mocker.patch("os.path.isfile", return_value=True) mocker.patch.object( - bulk_upload_service.metadata_mapping_validator_service, + test_service.metadata_mapping_validator_service, "validate_and_normalize_metadata", side_effect=lambda records, fixed_values, remappings: (records, [], []), ) - actual = bulk_upload_service.csv_to_sqs_metadata("fake/path.csv") + actual = test_service.csv_to_sqs_metadata("fake/path.csv") expected = copy.deepcopy(EXPECTED_PARSED_METADATA_2) for metadata in expected: for file in metadata.files: file.file_path = ( - f"test_input_file_location/{file.stored_file_name.lstrip('/')}" + f"{TEST_MOCK_METADATA_CSV.rsplit('/', 1)[0]}/{file.stored_file_name.lstrip('/')}" ) assert actual == 
expected @@ -444,6 +418,7 @@ def test_clear_temp_storage(set_env, mocker, mock_tempfile, test_service): def test_process_metadata_row_success(mocker, test_service): patients = defaultdict(list) + failed_files = defaultdict(list) row = { "FILEPATH": "some/path/file.pdf", "GP-PRACTICE-CODE": "Y12345", @@ -468,14 +443,14 @@ def test_process_metadata_row_success(mocker, test_service): "validate_record_filename", return_value="corrected.pdf", ) - test_service.process_metadata_row(row, patients) + test_service.process_metadata_row(row, patients, failed_files) key = ("1234567890", "Y12345") assert key in patients expected_sqs_metadata = BulkUploadQueueMetadata.model_validate( { - "file_path": "test_input_file_location/some/path/file.pdf", + "file_path": "path/to/some/path/file.pdf", "nhs_number": "1234567890", "gp_practice_code": "Y12345", "scan_date": "01/01/2023", @@ -484,20 +459,22 @@ def test_process_metadata_row_success(mocker, test_service): ) assert patients[key] == [expected_sqs_metadata] + assert len(failed_files) == 0 def test_process_metadata_row_adds_to_existing_entry(mocker): key = ("1234567890", "Y12345") mock_metadata_existing = BulkUploadQueueMetadata.model_validate( { - "file_path": "test_practice_directory/some/path/file1.pdf", + "file_path": "some/path/file1.pdf", "nhs_number": "1234567890", "gp_practice_code": "Y12345", "scan_date": "01/01/2023", - "stored_file_name": "test_practice_directory/some/path/file1.pdf", + "stored_file_name": "some/path/file1.pdf", } ) patients = {key: [mock_metadata_existing]} + failed_files = defaultdict(list) row = { "FILEPATH": "/some/path/file2.pdf", @@ -519,21 +496,22 @@ def test_process_metadata_row_adds_to_existing_entry(mocker): return_value=metadata, ) - preprocessor = mocker.Mock(practice_directory="test_practice_directory") + preprocessor = mocker.Mock(practice_directory="test_practice_directory/metadata.csv") preprocessor.validate_record_filename.return_value = "/some/path/file2.pdf" service = BulkUploadMetadataProcessorService( metadata_formatter_service=preprocessor, metadata_heading_remap={}, - input_file_location="test_input_file_location", + input_file_location="test_practice_directory/metadata.csv", + send_to_review_enabled=False, ) - service.process_metadata_row(row, patients) + service.process_metadata_row(row, patients, failed_files) assert len(patients[key]) == 2 assert patients[key][0] == mock_metadata_existing assert isinstance(patients[key][1], BulkUploadQueueMetadata) - assert patients[key][1].file_path == "test_input_file_location/some/path/file2.pdf" + assert patients[key][1].file_path == "test_practice_directory/some/path/file2.pdf" assert patients[key][1].stored_file_name == "/some/path/file2.pdf" @@ -548,6 +526,8 @@ def test_handle_invalid_filename_writes_failed_entry_to_dynamo( mocker, test_service, base_metadata_file ): nhs_number = "1234567890" + ods_code = "Y12345" + failed_files = defaultdict(list) error = InvalidFileNameException("Invalid filename format") mock_staging_metadata = mocker.patch( @@ -558,11 +538,12 @@ def test_handle_invalid_filename_writes_failed_entry_to_dynamo( test_service.dynamo_repository, "write_report_upload_to_dynamo" ) - test_service.handle_invalid_filename(base_metadata_file, error, nhs_number) + test_service.handle_invalid_filename(base_metadata_file, error, nhs_number, ods_code, failed_files) expected_file = test_service.convert_to_sqs_metadata( base_metadata_file, base_metadata_file.file_path ) + expected_file.file_path = f"{TEST_MOCK_METADATA_CSV.rsplit('/', 
1)[0]}/{base_metadata_file.file_path}" mock_staging_metadata.assert_called_once_with( nhs_number=nhs_number, @@ -575,6 +556,120 @@ def test_handle_invalid_filename_writes_failed_entry_to_dynamo( str(error), ) + assert (nhs_number, ods_code) in failed_files + assert failed_files[(nhs_number, ods_code)] == [expected_file] + + +def test_csv_to_sqs_metadata_sends_failed_files_to_review_queue_when_enabled( + mocker, set_env, mock_tempfile +): + """Test that failed files are sent to review queue when flag is enabled""" + mocker.patch("services.bulk_upload_metadata_processor_service.S3Service") + mocker.patch("services.bulk_upload_metadata_processor_service.SQSService") + mocker.patch( + "services.bulk_upload_metadata_processor_service.BulkUploadDynamoRepository" + ) + mocker.patch( + "services.bulk_upload_metadata_processor_service.BulkUploadS3Repository" + ) + mock_sqs_repo = mocker.patch( + "services.bulk_upload_metadata_processor_service.BulkUploadSqsRepository" + ) + mocker.patch( + "services.bulk_upload_metadata_processor_service.get_virus_scan_service" + ) + + # Create mock preprocessor that raises InvalidFileNameException + class MockFailingPreprocessor(MetadataPreprocessorService): + def validate_record_filename(self, original_filename: str, *args, **kwargs): + raise InvalidFileNameException("Invalid filename") + + service = BulkUploadMetadataProcessorService( + metadata_formatter_service=MockFailingPreprocessor( + practice_directory="test_practice_directory" + ), + metadata_heading_remap={}, + input_file_location="test_input_file_location", + send_to_review_enabled=True, + ) + + mocker.patch( + "services.bulk_upload_metadata_processor_service.validate_file_name", + side_effect=LGInvalidFilesException("invalid"), + ) + + result = service.csv_to_sqs_metadata(MOCK_METADATA_CSV) + + # Should have sent to review queue + assert mock_sqs_repo.return_value.send_message_to_review_queue.called + assert len(result) == 0 # No valid patients + + +def test_csv_to_sqs_metadata_does_not_send_to_review_when_disabled( + mocker, set_env, mock_tempfile +): + """Test that failed files are NOT sent to review queue when flag is disabled""" + mocker.patch("services.bulk_upload_metadata_processor_service.S3Service") + mocker.patch("services.bulk_upload_metadata_processor_service.SQSService") + mocker.patch( + "services.bulk_upload_metadata_processor_service.BulkUploadDynamoRepository" + ) + mocker.patch( + "services.bulk_upload_metadata_processor_service.BulkUploadS3Repository" + ) + mock_sqs_repo = mocker.patch( + "services.bulk_upload_metadata_processor_service.BulkUploadSqsRepository" + ) + mocker.patch( + "services.bulk_upload_metadata_processor_service.get_virus_scan_service" + ) + + # Create mock preprocessor that raises InvalidFileNameException + class MockFailingPreprocessor(MetadataPreprocessorService): + def validate_record_filename(self, original_filename: str, *args, **kwargs): + raise InvalidFileNameException("Invalid filename") + + service = BulkUploadMetadataProcessorService( + metadata_formatter_service=MockFailingPreprocessor( + practice_directory="test_practice_directory" + ), + metadata_heading_remap={}, + input_file_location="test_input_file_location", + send_to_review_enabled=False, + ) + + mocker.patch( + "services.bulk_upload_metadata_processor_service.validate_file_name", + side_effect=LGInvalidFilesException("invalid"), + ) + + result = service.csv_to_sqs_metadata(MOCK_METADATA_CSV) + + # Should NOT have sent to review queue + assert not 
mock_sqs_repo.return_value.send_message_to_review_queue.called + assert len(result) == 0 # No valid patients + + +def test_csv_to_sqs_metadata_does_not_send_to_review_when_no_failures(mocker, test_service): + """Test that review queue is not called when there are no failures""" + mock_send_to_review = mocker.patch.object( + test_service.sqs_repository, "send_message_to_review_queue" + ) + + mocker.patch( + "services.bulk_upload_metadata_processor_service.validate_file_name", + return_value=True, + ) + + result = test_service.csv_to_sqs_metadata(MOCK_METADATA_CSV) + + # Should not send to review when all files succeed + assert not mock_send_to_review.called + assert len(result) > 0 # Should have valid patients + + +def test_csv_to_sqs_metadata_groups_multiple_failed_files_by_nhs_number(mocker, test_service): + pass def test_convert_to_sqs_metadata(base_metadata_file): stored_file_name = "corrected_file.pdf" @@ -721,42 +816,42 @@ def mock_csv_content(): return "\n".join([header, *rows]) -def test_csv_to_sqs_metadata_happy_path(mocker, bulk_upload_service, mock_csv_content): +def test_csv_to_sqs_metadata_happy_path(mocker, test_service, mock_csv_content): mocker.patch("builtins.open", mocker.mock_open(read_data=mock_csv_content)) mocker.patch.object( - bulk_upload_service.metadata_mapping_validator_service, + test_service.metadata_mapping_validator_service, "validate_and_normalize_metadata", side_effect=lambda records, fixed_values, remappings: (records, [], []), ) mock_process_metadata_row = mocker.patch.object( - bulk_upload_service, "process_metadata_row" + test_service, "process_metadata_row" ) - result = bulk_upload_service.csv_to_sqs_metadata("fake/path.csv") + result = test_service.csv_to_sqs_metadata("fake/path.csv") - bulk_upload_service.metadata_mapping_validator_service.validate_and_normalize_metadata.assert_called_once() + test_service.metadata_mapping_validator_service.validate_and_normalize_metadata.assert_called_once() assert mock_process_metadata_row.call_count == 2 assert all(isinstance(item, StagingSqsMetadata) for item in result) def test_csv_to_sqs_metadata_raises_BulkUploadMetadataException_if_no_headers( - mocker, bulk_upload_service + mocker, test_service ): mocker.patch("builtins.open", mocker.mock_open(read_data="")) with pytest.raises(BulkUploadMetadataException, match="empty or missing headers"): - bulk_upload_service.csv_to_sqs_metadata("fake/path.csv") + test_service.csv_to_sqs_metadata("fake/path.csv") def test_csv_to_sqs_metadata_raises_BulkUploadMetadataException_if_all_rows_rejected( - mocker, bulk_upload_service, mock_csv_content + mocker, test_service, mock_csv_content ): mocker.patch("builtins.open", mocker.mock_open(read_data=mock_csv_content)) mocker.patch.object( - bulk_upload_service.metadata_mapping_validator_service, + test_service.metadata_mapping_validator_service, "validate_and_normalize_metadata", return_value=( [], @@ -768,10 +863,10 @@ def test_csv_to_sqs_metadata_raises_BulkUploadMetadataException_if_all_rows_reje with pytest.raises( BulkUploadMetadataException, match="No valid metadata rows found" ): - bulk_upload_service.csv_to_sqs_metadata("fake/path.csv") + test_service.csv_to_sqs_metadata("fake/path.csv") -def test_csv_to_sqs_metadata_groups_patients_correctly(mocker, bulk_upload_service): +def test_csv_to_sqs_metadata_groups_patients_correctly(mocker, test_service): header = "FILEPATH,GP-PRACTICE-CODE,NHS-NO,SCAN-DATE" data = "\n".join( [ @@ -784,18 +879,18 @@ def test_csv_to_sqs_metadata_groups_patients_correctly(mocker, 
bulk_upload_servi mocker.patch("builtins.open", mocker.mock_open(read_data=data)) mocker.patch.object( - bulk_upload_service.metadata_mapping_validator_service, + test_service.metadata_mapping_validator_service, "validate_and_normalize_metadata", side_effect=lambda records, fixed_values, remappings: (records, [], []), ) mocker.patch.object( - bulk_upload_service, + test_service, "process_metadata_row", - wraps=bulk_upload_service.process_metadata_row, + wraps=test_service.process_metadata_row, ) - result = bulk_upload_service.csv_to_sqs_metadata("fake/path.csv") + result = test_service.csv_to_sqs_metadata("fake/path.csv") assert isinstance(result, list) assert all(isinstance(x, StagingSqsMetadata) for x in result) @@ -829,7 +924,7 @@ def mock_service_remapping_mandatory_fields(mocker, set_env, mock_tempfile): service = BulkUploadMetadataProcessorService( metadata_formatter_service=MockMetadataPreprocessorService( - practice_directory="test_practice_directory" + practice_directory="test_practice_directory/metadata.csv" ), metadata_heading_remap={ "NHS-NO": "NhsNumber", @@ -839,7 +934,8 @@ def mock_service_remapping_mandatory_fields(mocker, set_env, mock_tempfile): "USER-ID": "User ID", "UPLOAD": "Upload Date", }, - input_file_location="test_input_file_location", + input_file_location="test_practice_directory/metadata.csv", + send_to_review_enabled=False, ) mocker.patch.object( @@ -879,7 +975,7 @@ def test_remapping_mandatory_fields( nhs_number="123456789", files=[ BulkUploadQueueMetadata( - file_path="test_input_file_location/path/1.pdf", + file_path="test_practice_directory/path/1.pdf", gp_practice_code="Y12345", scan_date="02/01/2023", stored_file_name="/path/1.pdf", @@ -909,10 +1005,10 @@ def mock_service_no_remapping(mocker, set_env, mock_tempfile): service = BulkUploadMetadataProcessorService( metadata_formatter_service=MockMetadataPreprocessorService( - practice_directory="test_practice_directory" + practice_directory="test_practice_directory/metadata.csv" ), metadata_heading_remap={}, - input_file_location="test_input_file_location", + input_file_location="test_practice_directory/metadata.csv", ) mocker.patch.object( @@ -950,7 +1046,7 @@ def test_no_remapping_logic( nhs_number="123456789", files=[ BulkUploadQueueMetadata( - file_path="test_input_file_location/path/1.pdf", + file_path="test_practice_directory/path/1.pdf", gp_practice_code="Y12345", scan_date="02/01/2023", stored_file_name="/path/1.pdf", @@ -1181,6 +1277,7 @@ def test_apply_fixed_values_single_field(mocker, base_metadata_file): ), metadata_heading_remap={}, fixed_values={"SECTION": "AR"}, + send_to_review_enabled=False, ) mocker.patch.object(service, "s3_service") @@ -1202,6 +1299,7 @@ def test_apply_fixed_values_multiple_fields(mocker, base_metadata_file): "SUB-SECTION": "Mental Health", "SCAN-ID": "FIXED_SCAN_ID", }, + send_to_review_enabled=False, ) mocker.patch.object(service, "s3_service") @@ -1223,6 +1321,7 @@ def test_apply_fixed_values_overwrites_existing_value(mocker, base_metadata_file ), metadata_heading_remap={}, fixed_values={"SECTION": "AR"}, + send_to_review_enabled=False, ) mocker.patch.object(service, "s3_service") @@ -1239,6 +1338,7 @@ def test_apply_fixed_values_logs_applied_values(mocker, base_metadata_file, capl ), metadata_heading_remap={}, fixed_values={"SECTION": "AR", "SCAN-ID": "TEST_ID"}, + send_to_review_enabled=False, ) mocker.patch.object(service, "s3_service") @@ -1256,6 +1356,7 @@ def test_apply_fixed_values_returns_valid_metadata_file(mocker, base_metadata_fi ), 
metadata_heading_remap={}, fixed_values={"SECTION": "AR"}, + send_to_review_enabled=False, ) mocker.patch.object(service, "s3_service") diff --git a/lambdas/tests/unit/services/test_document_review_processor_service.py b/lambdas/tests/unit/services/test_document_review_processor_service.py index c1e00dfdec..8978ac3783 100644 --- a/lambdas/tests/unit/services/test_document_review_processor_service.py +++ b/lambdas/tests/unit/services/test_document_review_processor_service.py @@ -486,7 +486,7 @@ def test_get_patient_custodian_returns_uploader_ods_on_invalid_resource_id( result = service_under_test._get_patient_custodian(sample_review_message) assert result == "Y12345" - assert sample_review_message.nhs_number == "9000000009" + assert sample_review_message.nhs_number == "0000000000" def test_get_patient_custodian_handles_patient_not_found_sets_placeholder( diff --git a/tests/bulk-upload/scripts/run_bulk_upload.py b/tests/bulk-upload/scripts/run_bulk_upload.py index b125f34407..01edc2238c 100644 --- a/tests/bulk-upload/scripts/run_bulk_upload.py +++ b/tests/bulk-upload/scripts/run_bulk_upload.py @@ -4,7 +4,7 @@ import boto3 -def invoke_lambda(lambda_name, payload={}): +def invoke_lambda(lambda_name, payload): client = boto3.client("lambda") response = client.invoke( FunctionName=lambda_name, @@ -21,6 +21,11 @@ def invoke_lambda(lambda_name, payload={}): "--environment", help="The name of the environment", ) + parser.add_argument( + "--lambda-type", + help="Which lambda to trigger (BulkUploadMetadataLambda or BulkUploadMetadataProcessor)", + default="BulkUploadMetadataLambda", + ) parser.add_argument( "--start-bulk-upload", action="store_true", @@ -32,7 +37,18 @@ def invoke_lambda(lambda_name, payload={}): if not args.environment: args.environment = input("Please enter the name of the environment: ") + payload = {} + if args.lambda_type == "BulkUploadMetadataProcessor": + lambda_name = f"{args.environment}_BulkUploadMetadataProcessor" + payload = { + "inputFileLocation": "metadata.csv" + } + else: + lambda_name = f"{args.environment}_BulkUploadMetadataLambda" + + print(f"Using lambda: {lambda_name}") + if args.start_bulk_upload or input( "Would you like to start the Bulk Upload Process:" ): - invoke_lambda(f"{args.environment}_BulkUploadMetadataLambda") \ No newline at end of file + invoke_lambda(lambda_name, payload)