Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions providers/imap/src/airflow/providers/imap/hooks/imap.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,17 +289,20 @@ def _check_mail_body(

def _create_files(self, mail_attachments: list, local_output_directory: str) -> None:
for name, payload in mail_attachments:
if self._is_symlink(name):
if self._is_symlink(name, local_output_directory):
self.log.error("Can not create file because it is a symlink!")
elif self._is_escaping_current_directory(name):
self.log.error("Can not create file because it is escaping the current directory!")
else:
self._create_file(name, payload, local_output_directory)

def _is_symlink(self, name: str) -> bool:
def _is_symlink(self, name: str, local_output_directory: str) -> bool:
# Check the resolved destination, not the bare attachment name: an
# attacker-supplied name is written under local_output_directory, so a
# symlink planted there is what would be followed by the open() below.
# IMPORTANT NOTE: os.path.islink is not working for windows symlinks
# See: https://stackoverflow.com/a/11068434
return os.path.islink(name)
return os.path.islink(self._correct_path(name, local_output_directory))

def _is_escaping_current_directory(self, name: str) -> bool:
return f"..{os.sep}" in name
Expand Down
2 changes: 1 addition & 1 deletion providers/imap/tests/unit/imap/hooks/test_imap.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ def test_download_mail_attachments_with_symlink(self, mock_imaplib, mock_open_me
with ImapHook() as imap_hook:
imap_hook.download_mail_attachments(name="symlink", local_output_directory="test_directory")

assert mock_is_symlink.call_count == 1
mock_is_symlink.assert_called_once_with("test_directory/symlink")
mock_open_method.assert_not_called()
mock_open_method.return_value.write.assert_not_called()

Expand Down
Loading