1616# Third-party imports
1717from botocore .exceptions import ClientError
1818from pydantic import BaseModel , Field
19+ from sagemaker .core .common_utils import TagsDict
1920from sagemaker .core .helper .session_helper import Session
2021from sagemaker .core .resources import Pipeline , PipelineExecution , Tag
2122from sagemaker .core .telemetry .telemetry_logging import _telemetry_emitter
@@ -38,6 +39,7 @@ def _create_evaluation_pipeline(
3839 pipeline_definition : str ,
3940 session : Optional [Any ] = None ,
4041 region : Optional [str ] = None ,
42+ tags : Optional [List [TagsDict ]] = [],
4143) -> Any :
4244 """Helper method to create a SageMaker pipeline for evaluation.
4345
@@ -49,6 +51,7 @@ def _create_evaluation_pipeline(
4951 pipeline_definition (str): JSON pipeline definition (Jinja2 template).
5052 session (Optional[Any]): SageMaker session object.
5153 region (Optional[str]): AWS region.
54+ tags (Optional[List[TagsDict]]): List of tags to include in pipeline
5255
5356 Returns:
5457 Any: Created Pipeline instance (ready for execution).
@@ -65,9 +68,9 @@ def _create_evaluation_pipeline(
6568 resolved_pipeline_definition = template .render (pipeline_name = pipeline_name )
6669
6770 # Create tags for the pipeline
68- tags = [
71+ tags . extend ( [
6972 {"key" : _TAG_SAGEMAKER_MODEL_EVALUATION , "value" : "true" }
70- ]
73+ ])
7174
7275 pipeline = Pipeline .create (
7376 pipeline_name = pipeline_name ,
@@ -163,7 +166,8 @@ def _get_or_create_pipeline(
163166 pipeline_definition : str ,
164167 role_arn : str ,
165168 session : Optional [Session ] = None ,
166- region : Optional [str ] = None
169+ region : Optional [str ] = None ,
170+ create_tags : Optional [List [TagsDict ]] = [],
167171) -> Pipeline :
168172 """Get existing pipeline or create/update it.
169173
@@ -177,6 +181,7 @@ def _get_or_create_pipeline(
177181 role_arn: IAM role ARN for pipeline execution
178182 session: Boto3 session (optional)
179183 region: AWS region (optional)
184+ create_tags (Optional[List[TagsDict]]): List of tags to include in pipeline
180185
181186 Returns:
182187 Pipeline instance (existing updated or newly created)
@@ -225,19 +230,19 @@ def _get_or_create_pipeline(
225230
226231 # No matching pipeline found, create new one
227232 logger .info (f"No existing pipeline found with prefix { pipeline_name_prefix } , creating new one" )
228- return _create_evaluation_pipeline (eval_type , role_arn , pipeline_definition , session , region )
233+ return _create_evaluation_pipeline (eval_type , role_arn , pipeline_definition , session , region , create_tags )
229234
230235 except ClientError as e :
231236 error_code = e .response ['Error' ]['Code' ]
232237 if "ResourceNotFound" in error_code :
233- return _create_evaluation_pipeline (eval_type , role_arn , pipeline_definition , session , region )
238+ return _create_evaluation_pipeline (eval_type , role_arn , pipeline_definition , session , region , create_tags )
234239 else :
235240 raise
236241
237242 except Exception as e :
238243 # If search fails for other reasons, try to create
239244 logger .info (f"Error searching for pipeline ({ str (e )} ), attempting to create new pipeline" )
240- return _create_evaluation_pipeline (eval_type , role_arn , pipeline_definition , session , region )
245+ return _create_evaluation_pipeline (eval_type , role_arn , pipeline_definition , session , region , create_tags )
241246
242247
243248def _start_pipeline_execution (
@@ -505,7 +510,8 @@ def start(
505510 role_arn : str ,
506511 s3_output_path : Optional [str ] = None ,
507512 session : Optional [Session ] = None ,
508- region : Optional [str ] = None
513+ region : Optional [str ] = None ,
514+ tags : Optional [List [TagsDict ]] = [],
509515 ) -> 'EvaluationPipelineExecution' :
510516 """Create sagemaker pipeline execution. Optionally creates pipeline.
511517
@@ -517,6 +523,7 @@ def start(
517523 s3_output_path (Optional[str]): S3 location where evaluation results are stored.
518524 session (Optional[Session]): Boto3 session for API calls.
519525 region (Optional[str]): AWS region for the pipeline.
526+ tags (Optional[List[TagsDict]]): List of tags to include in pipeline
520527
521528 Returns:
522529 EvaluationPipelineExecution: Started pipeline execution instance.
@@ -547,7 +554,8 @@ def start(
547554 pipeline_definition = pipeline_definition ,
548555 role_arn = role_arn ,
549556 session = session ,
550- region = region
557+ region = region ,
558+ create_tags = tags ,
551559 )
552560
553561 # Start pipeline execution via boto3
0 commit comments