Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions providers/amazon/docs/operators/emr/emr_serverless.rst
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,72 @@ To monitor the state of an EMR Serverless Application you can use
:start-after: [START howto_sensor_emr_serverless_application]
:end-before: [END howto_sensor_emr_serverless_application]

.. _howto/operator:EmrServerlessStartSessionOperator:

Start an EMR Serverless interactive session
===========================================

To start an EMR Serverless interactive session that a Spark Connect client can attach to, use
:class:`~airflow.providers.amazon.aws.operators.emr.EmrServerlessStartSessionOperator`.
Set ``deferrable=True`` to release the worker slot while the session warms up.

.. note::
Interactive sessions require Amazon EMR release ``emr-7.13.0`` or later, and the session APIs
(``StartSession``, ``GetSession``, ``GetSessionEndpoint``, ``TerminateSession``) are only
available in ``botocore>=1.43.0``. Deferrable mode additionally needs ``aiobotocore>=3.6.0``,
the first release whose ``botocore`` pin allows 1.43.0. The Amazon provider keeps a lower
minimum for these libraries, so install compatible versions to use interactive sessions.

.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_emr_serverless_session.py
:language: python
:dedent: 4
:start-after: [START howto_operator_emr_serverless_start_session]
:end-before: [END howto_operator_emr_serverless_start_session]

.. _howto/operator:EmrServerlessGetSessionEndpointOperator:

Get an interactive session endpoint
===================================

To resolve the Spark Connect endpoint and a short-lived auth token for a running session, use
:class:`~airflow.providers.amazon.aws.operators.emr.EmrServerlessGetSessionEndpointOperator`.
The token expires (about one hour), so run this immediately before connecting.

.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_emr_serverless_session.py
:language: python
:dedent: 4
:start-after: [START howto_operator_emr_serverless_get_session_endpoint]
:end-before: [END howto_operator_emr_serverless_get_session_endpoint]

.. _howto/operator:EmrServerlessStopSessionOperator:

Stop an EMR Serverless interactive session
==========================================

To terminate an interactive session, use
:class:`~airflow.providers.amazon.aws.operators.emr.EmrServerlessStopSessionOperator`.
Set ``trigger_rule=ALL_DONE`` so it runs even if a downstream task fails.

.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_emr_serverless_session.py
:language: python
:dedent: 4
:start-after: [START howto_operator_emr_serverless_stop_session]
:end-before: [END howto_operator_emr_serverless_stop_session]

.. _howto/sensor:EmrServerlessSessionSensor:

Wait on an EMR Serverless interactive session state
====================================================

To wait until an interactive session reaches a ready state (``STARTED`` or ``IDLE``), use
:class:`~airflow.providers.amazon.aws.sensors.emr.EmrServerlessSessionSensor`.

.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_emr_serverless_session.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_emr_serverless_session]
:end-before: [END howto_sensor_emr_serverless_session]

Reference
---------

Expand Down
54 changes: 54 additions & 0 deletions providers/amazon/src/airflow/providers/amazon/aws/hooks/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,10 @@ class EmrServerlessHook(AwsBaseHook):
APPLICATION_FAILURE_STATES = {"STOPPED", "TERMINATED"}
APPLICATION_SUCCESS_STATES = {"CREATED", "STARTED"}

SESSION_INTERMEDIATE_STATES = {"SUBMITTED", "STARTING"}
SESSION_FAILURE_STATES = {"FAILED", "TERMINATING", "TERMINATED"}
SESSION_SUCCESS_STATES = {"STARTED", "IDLE"}

def __init__(self, *args: Any, **kwargs: Any) -> None:
kwargs["client_type"] = "emr-serverless"
super().__init__(*args, **kwargs)
Expand Down Expand Up @@ -311,6 +315,56 @@ def cancel_running_jobs(

return count

def start_session(
self,
application_id: str,
execution_role_arn: str,
name: str | None = None,
idle_timeout_minutes: int | None = None,
configuration_overrides: dict | None = None,
) -> str:
"""
Start an EMR Serverless interactive session and return its id.

:param application_id: The id of the EMR Serverless application to run the session on.
:param execution_role_arn: The IAM role ARN the session assumes to access data.
:param name: An optional name for the session.
:param idle_timeout_minutes: Auto-stop the session after this many idle minutes.
:param configuration_overrides: Optional Spark/monitoring configuration overrides.
"""
params: dict[str, Any] = {
"applicationId": application_id,
"executionRoleArn": execution_role_arn,
}
if name is not None:
params["name"] = name
if idle_timeout_minutes is not None:
params["idleTimeoutMinutes"] = idle_timeout_minutes
if configuration_overrides is not None:
params["configurationOverrides"] = configuration_overrides
return self.conn.start_session(**params)["sessionId"]

def get_session_state(self, application_id: str, session_id: str) -> str:
"""Return the current state of an interactive session."""
return self.conn.get_session(applicationId=application_id, sessionId=session_id)["session"]["state"]

def get_session_endpoint(self, application_id: str, session_id: str) -> dict:
"""
Return the raw ``GetSessionEndpoint`` boto3 response for a session.

The response includes the Spark Connect ``endpoint`` URL, a short-lived ``authToken``
(valid for about one hour), and its ``authTokenExpiresAt`` timestamp. Callers should
fetch it immediately before connecting rather than caching it.

.. seealso::
- :external+boto3:py:meth:`EMRServerless.Client.get_session_endpoint`
"""
return self.conn.get_session_endpoint(applicationId=application_id, sessionId=session_id)

def terminate_session(self, application_id: str, session_id: str) -> None:
"""Terminate an interactive session."""
self.conn.terminate_session(applicationId=application_id, sessionId=session_id)


def is_connection_being_updated_exception(exception: BaseException) -> bool:
return (
Expand Down
Loading
Loading