Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
5ab5ed6
init
aglinxinyuan Jun 14, 2025
0192bd5
update
aglinxinyuan Jun 14, 2025
86ecfb8
update
aglinxinyuan Jun 14, 2025
81ca4da
update
aglinxinyuan Jun 14, 2025
8d5c444
fix fmt
aglinxinyuan Jun 14, 2025
5522304
fix fmt
aglinxinyuan Jun 14, 2025
0dc7bc1
fix fmt
aglinxinyuan Jun 14, 2025
1c0cda6
fix fmt
aglinxinyuan Jun 14, 2025
8fdec07
fix fmt
aglinxinyuan Jun 14, 2025
57218fa
Merge branch 'master' into xinyuan-rename-ecm
aglinxinyuan Jun 15, 2025
bceae64
init
aglinxinyuan Jun 16, 2025
7c257a7
rename to is_ecm_aligned
aglinxinyuan Jun 16, 2025
64b5fc0
rename to _send_ecm_to_channel
aglinxinyuan Jun 16, 2025
62c6877
rename to isECMAligned
aglinxinyuan Jun 16, 2025
8d910a3
rename to sendECMToChannel
aglinxinyuan Jun 16, 2025
c121bc3
Merge branch 'master' into xinyuan-loop-mvp
aglinxinyuan Jun 17, 2025
056ab91
fix fmt
aglinxinyuan Jun 17, 2025
c16b976
Merge branch 'master' into xinyuan-rename-ecm
aglinxinyuan Jun 17, 2025
5823947
Merge branch 'xinyuan-rename-ecm' into xinyuan-loop-mvp
aglinxinyuan Jun 17, 2025
89276d4
fix fmt
aglinxinyuan Jun 17, 2025
82bde54
update
aglinxinyuan Jun 17, 2025
f157b12
Merge branch 'master' into xinyuan-loop-mvp
aglinxinyuan Jun 17, 2025
6c45784
Merge branch 'master' into xinyuan-loop-mvp
aglinxinyuan Jun 18, 2025
2b2b176
Merge branch 'master' into xinyuan-loop-mvp
aglinxinyuan Jun 22, 2025
34132fc
UPDATE
aglinxinyuan Jun 22, 2025
7ef95a4
done!
aglinxinyuan Jun 22, 2025
f05840b
fix fmt
aglinxinyuan Jun 22, 2025
1ba81ed
fix fmt
aglinxinyuan Jun 22, 2025
7336b52
fix fmt
aglinxinyuan Jun 22, 2025
bca669f
refactor
aglinxinyuan Jun 23, 2025
082b1ff
refactor
aglinxinyuan Jun 23, 2025
d659a31
Merge branch 'master' into xinyuan-loop-mvp
aglinxinyuan Jun 25, 2025
bfb5946
fix fmt
aglinxinyuan Jun 25, 2025
714ca9d
fix fmt
aglinxinyuan Jun 25, 2025
445ce3b
Merge branch 'master' into xinyuan-loop-mvp
aglinxinyuan Jun 26, 2025
fbe951d
Merge branch 'master' into xinyuan-loop-mvp
aglinxinyuan Jun 27, 2025
e719817
fix fmt
aglinxinyuan Jun 27, 2025
591d9da
Merge branch 'master' into xinyuan-loop-mvp
aglinxinyuan Jun 29, 2025
5444727
Merge branch 'master' into xinyuan-loop-mvp
aglinxinyuan Jul 2, 2025
01aaecd
update
aglinxinyuan Jul 3, 2025
0521c1e
update
aglinxinyuan Jul 3, 2025
014656c
update
aglinxinyuan Jul 3, 2025
bc5b05c
update
aglinxinyuan Jul 3, 2025
fc274cf
Merge branch 'master' into xinyuan-loop-mvp
aglinxinyuan Jul 3, 2025
768966b
Merge branch 'master' into xinyuan-loop-mvp
aglinxinyuan Jul 7, 2025
1151bd8
fix fmt
aglinxinyuan Jul 7, 2025
612780d
fix fmt
aglinxinyuan Jul 7, 2025
e2ed665
fix fmt
aglinxinyuan Jul 7, 2025
39d7a03
fix fmt
aglinxinyuan Jul 7, 2025
d766dbc
Update core/workflow-operator/src/main/scala/edu/uci/ics/amber/operat…
aglinxinyuan Jul 7, 2025
e283da5
fix fmt
aglinxinyuan Jul 7, 2025
747e636
fix fmt
aglinxinyuan Jul 7, 2025
39251ac
remove sleep
aglinxinyuan Jul 7, 2025
d01b5eb
Merge branch 'master' into xinyuan-loop-mvp
aglinxinyuan Jul 8, 2025
0d7305b
Merge branch 'master' into xinyuan-loop-mvp
aglinxinyuan Jul 11, 2025
63239c4
Merge branch 'master' into xinyuan-loop-mvp
aglinxinyuan Jul 11, 2025
b1772e1
Merge branch 'master' into xinyuan-loop-mvp
aglinxinyuan Jul 11, 2025
e0d0b0a
Merge branch 'master' into xinyuan-loop-mvp
aglinxinyuan Jul 11, 2025
9724b5c
Merge branch 'master' into xinyuan-loop-mvp
aglinxinyuan Jul 14, 2025
a58671b
Merge branch 'master' into xinyuan-loop-mvp
aglinxinyuan Jul 21, 2025
f158694
Merge branch 'master' into xinyuan-loop-mvp
aglinxinyuan Aug 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ message ControlRequest {
EmptyRequest emptyRequest = 56;
PrepareCheckpointRequest prepareCheckpointRequest = 57;
QueryStatisticsRequest queryStatisticsRequest = 58;
EndIterationRequest endIterationRequest = 59;

// request for testing
Ping ping = 100;
Expand Down Expand Up @@ -271,4 +272,8 @@ message PrepareCheckpointRequest{

message QueryStatisticsRequest{
repeated core.ActorVirtualIdentity filterByWorkers = 1;
}

message EndIterationRequest{
core.ActorVirtualIdentity worker = 1 [(scalapb.field).no_box = true];
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ service WorkerService {
rpc EndWorker(EmptyRequest) returns (EmptyReturn);
rpc StartChannel(EmptyRequest) returns (EmptyReturn);
rpc EndChannel(EmptyRequest) returns (EmptyReturn);
rpc EndIteration(EndIterationRequest) returns (EmptyReturn);
rpc NextIteration(EmptyRequest) returns (EmptyReturn);
rpc DebugCommand(DebugCommandRequest) returns (EmptyReturn);
rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns (EvaluatedValue);
rpc NoOperation(EmptyRequest) returns (EmptyReturn);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from core.architecture.handlers.control.control_handler_base import ControlHandler
from proto.edu.uci.ics.amber.engine.architecture.rpc import (
EmptyReturn,
EndIterationRequest,
)
from core.models.internal_marker import EndIteration


class EndIterationHandler(ControlHandler):
async def end_iteration(self, req: EndIterationRequest) -> EmptyReturn:
self.context.tuple_processing_manager.current_internal_marker = EndIteration(
req.worker
)
return EmptyReturn()
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
)
from core.architecture.handlers.control.start_channel_handler import StartChannelHandler
from core.architecture.handlers.control.end_channel_handler import EndChannelHandler
from core.architecture.handlers.control.end_iteration_handler import EndIterationHandler
from core.architecture.handlers.control.end_worker_handler import EndWorkerHandler
from core.architecture.handlers.control.evaluate_expression_handler import (
EvaluateExpressionHandler,
Expand Down Expand Up @@ -63,6 +64,7 @@ class AsyncRPCHandlerInitializer(
EndWorkerHandler,
StartChannelHandler,
EndChannelHandler,
EndIterationHandler,
NoOperationHandler,
):
pass
8 changes: 8 additions & 0 deletions core/amber/src/main/python/core/models/internal_marker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
# specific language governing permissions and limitations
# under the License.

from dataclasses import dataclass
from proto.edu.uci.ics.amber.core import ActorVirtualIdentity


class InternalMarker:
"""
Expand All @@ -31,3 +34,8 @@ class StartChannel(InternalMarker):

class EndChannel(InternalMarker):
pass


@dataclass
class EndIteration(InternalMarker):
worker: ActorVirtualIdentity
8 changes: 8 additions & 0 deletions core/amber/src/main/python/core/models/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ def close(self) -> None:
"""
pass

def reset(self) -> None:
"""
Reset the operator to its initial state.
"""
self.close()
self.open()

def process_state(self, state: State, port: int) -> Optional[State]:
"""
Process an input State from the given link.
Expand Down Expand Up @@ -238,6 +245,7 @@ def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike

def on_finish(self, port: int) -> Iterator[Optional[TableLike]]:
table = Table(self.__table_data[port])
self.__table_data.clear()
yield from self.process_table(table, port)

@abstractmethod
Expand Down
6 changes: 4 additions & 2 deletions core/amber/src/main/python/core/runnables/data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from typing import Iterator, Optional
from core.architecture.managers import Context
from core.models import ExceptionInfo, State, TupleLike, InternalMarker
from core.models.internal_marker import StartChannel, EndChannel
from core.models.internal_marker import StartChannel, EndChannel, EndIteration
from core.models.table import all_output_to_tuple
from core.util import Stoppable
from core.util.console_message.replace_print import replace_print
Expand Down Expand Up @@ -74,7 +74,9 @@ def process_internal_marker(self, internal_marker: InternalMarker) -> None:
):
if isinstance(internal_marker, StartChannel):
self._set_output_state(executor.produce_state_on_start(port_id))
elif isinstance(internal_marker, EndChannel):
elif isinstance(internal_marker, EndChannel) or isinstance(
internal_marker, EndIteration
):
self._set_output_state(executor.produce_state_on_finish(port_id))
self._switch_context()
self._set_output_tuple(executor.on_finish(port_id))
Expand Down
27 changes: 22 additions & 5 deletions core/amber/src/main/python/core/runnables/main_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
InternalQueue,
Tuple,
)
from core.models.internal_marker import StartChannel, EndChannel
from core.models.internal_marker import StartChannel, EndChannel, EndIteration
from core.models.internal_queue import (
DataElement,
DCMElement,
Expand All @@ -55,6 +55,7 @@
EmbeddedControlMessage,
AsyncRpcContext,
ControlRequest,
EndIterationRequest,
)
from proto.edu.uci.ics.amber.engine.architecture.worker import (
WorkerState,
Expand All @@ -65,6 +66,7 @@
ChannelIdentity,
EmbeddedControlMessageIdentity,
)
from core.util import set_one_of


class MainLoop(StoppableQueueBlockingRunnable):
Expand Down Expand Up @@ -284,6 +286,17 @@ def _process_end_channel(self) -> None:
)
self.complete()

def _process_end_iteration(self) -> None:
worker_id = self.context.tuple_processing_manager.current_internal_marker.worker
self.process_input_state()
self.process_input_tuple()
self._send_ecm_to_data_channels(
"EndIteration",
EmbeddedControlMessageType.PORT_ALIGNMENT,
EndIterationRequest(worker_id),
)
self.context.executor_manager.executor.reset()

def _process_ecm(self, ecm_element: ECMElement):
"""
Processes a received ECM and handles synchronization,
Expand Down Expand Up @@ -335,21 +348,25 @@ def _process_ecm(self, ecm_element: ECMElement):
{
StartChannel: self._process_start_channel,
EndChannel: self._process_end_channel,
EndIteration: self._process_end_iteration,
}[type(self.context.tuple_processing_manager.current_internal_marker)]()

def _send_ecm_to_data_channels(
self, method_name: str, alignment: EmbeddedControlMessageType
self,
method: str,
alignment: EmbeddedControlMessageType,
request: ControlRequest = EmptyRequest(),
) -> None:
for active_channel_id in self.context.output_manager.get_output_channel_ids():
if not active_channel_id.is_control:
ecm = EmbeddedControlMessage(
EmbeddedControlMessageIdentity(method_name),
EmbeddedControlMessageIdentity(method),
alignment,
[],
{
active_channel_id.to_worker_id.name: ControlInvocation(
method_name,
ControlRequest(empty_request=EmptyRequest()),
method,
set_one_of(ControlRequest, request),
AsyncRpcContext(
ActorVirtualIdentity(), ActorVirtualIdentity()
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ class ControlRequest(betterproto.Message):
query_statistics_request: "QueryStatisticsRequest" = betterproto.message_field(
58, group="sealed_value"
)
end_iteration_request: "EndIterationRequest" = betterproto.message_field(
59, group="sealed_value"
)
ping: "Ping" = betterproto.message_field(100, group="sealed_value")
"""request for testing"""

Expand Down Expand Up @@ -404,6 +407,11 @@ class QueryStatisticsRequest(betterproto.Message):
)


@dataclass(eq=False, repr=False)
class EndIterationRequest(betterproto.Message):
worker: "___core__.ActorVirtualIdentity" = betterproto.message_field(1)


@dataclass(eq=False, repr=False)
class ControlReturn(betterproto.Message):
"""The generic return message"""
Expand Down Expand Up @@ -996,6 +1004,40 @@ async def end_channel(
metadata=metadata,
)

async def end_iteration(
self,
end_iteration_request: "EndIterationRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
) -> "EmptyReturn":
return await self._unary_unary(
"/edu.uci.ics.amber.engine.architecture.rpc.WorkerService/EndIteration",
end_iteration_request,
EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)

async def next_iteration(
self,
empty_request: "EmptyRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
) -> "EmptyReturn":
return await self._unary_unary(
"/edu.uci.ics.amber.engine.architecture.rpc.WorkerService/NextIteration",
empty_request,
EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)

async def debug_command(
self,
debug_command_request: "DebugCommandRequest",
Expand Down Expand Up @@ -1551,6 +1593,14 @@ async def start_channel(self, empty_request: "EmptyRequest") -> "EmptyReturn":
async def end_channel(self, empty_request: "EmptyRequest") -> "EmptyReturn":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

async def end_iteration(
self, end_iteration_request: "EndIterationRequest"
) -> "EmptyReturn":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

async def next_iteration(self, empty_request: "EmptyRequest") -> "EmptyReturn":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

async def debug_command(
self, debug_command_request: "DebugCommandRequest"
) -> "EmptyReturn":
Expand Down Expand Up @@ -1684,6 +1734,20 @@ async def __rpc_end_channel(
response = await self.end_channel(request)
await stream.send_message(response)

async def __rpc_end_iteration(
self, stream: "grpclib.server.Stream[EndIterationRequest, EmptyReturn]"
) -> None:
request = await stream.recv_message()
response = await self.end_iteration(request)
await stream.send_message(response)

async def __rpc_next_iteration(
self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
) -> None:
request = await stream.recv_message()
response = await self.next_iteration(request)
await stream.send_message(response)

async def __rpc_debug_command(
self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]"
) -> None:
Expand Down Expand Up @@ -1810,6 +1874,18 @@ def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
EmptyRequest,
EmptyReturn,
),
"/edu.uci.ics.amber.engine.architecture.rpc.WorkerService/EndIteration": grpclib.const.Handler(
self.__rpc_end_iteration,
grpclib.const.Cardinality.UNARY_UNARY,
EndIterationRequest,
EmptyReturn,
),
"/edu.uci.ics.amber.engine.architecture.rpc.WorkerService/NextIteration": grpclib.const.Handler(
self.__rpc_next_iteration,
grpclib.const.Cardinality.UNARY_UNARY,
EmptyRequest,
EmptyReturn,
),
"/edu.uci.ics.amber.engine.architecture.rpc.WorkerService/DebugCommand": grpclib.const.Handler(
self.__rpc_debug_command,
grpclib.const.Cardinality.UNARY_UNARY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,10 @@ class OutputManager(
outputIterator.appendSpecialTupleToEnd(FinalizeExecutor())
}

def finalizeIteration(worker: ActorVirtualIdentity): Unit = {
outputIterator.appendSpecialTupleToEnd(FinalizeIteration(worker))
}

/**
* This method is only used for ensuring correct region execution. Some operators may have input port dependency
* relationships, for which we currently use a two-phase region execution scheme. (See `RegionExecutionCoordinator`
Expand Down
Loading
Loading