Signed-off-by: Robert Shaw <robshaw@redhat.com>
This commit is contained in:
Robert Shaw 2025-07-20 21:06:47 +00:00
parent 0018dd01c9
commit a46bc0a6a3

View File

@ -901,6 +901,10 @@ class DPAsyncMPClient(AsyncMPClient):
assert self.stats_update_address is not None
assert len(self.engine_ranks_managed) > 1
start_idx = self.engine_ranks_managed[0]
end_idx = self.engine_ranks_managed[-1]
async def run_engine_stats_update_task():
with make_zmq_socket(self.ctx, self.stats_update_address,
zmq.XSUB) as socket, make_zmq_socket(
@ -947,6 +951,7 @@ class DPAsyncMPClient(AsyncMPClient):
self.engines_running = True
msg = msgspec.msgpack.encode(
(target_eng_index, self.current_wave))
logger.info("FIRST_REQ")
await socket.send(msg)
buf = None
@ -966,7 +971,7 @@ class DPAsyncMPClient(AsyncMPClient):
self.engines_running = running
# NOTE: counts include all global Cores. Slice
# to get get the Core's managed by this client.
self.lb_engines = counts[self.engine_ranks_managed]
self.lb_engines = counts[start_idx:end_idx]
resources.stats_update_task = asyncio.create_task(
run_engine_stats_update_task())