mirror of
https://git.datalinker.icu/vllm-project/vllm.git
synced 2026-05-08 12:13:37 +08:00
debugging
This commit is contained in:
parent
62da375465
commit
252bf0809e
@ -289,10 +289,8 @@ class Worker(WorkerBase):
|
|||||||
if self.profiler is None:
|
if self.profiler is None:
|
||||||
raise RuntimeError("Profiler is not enabled.")
|
raise RuntimeError("Profiler is not enabled.")
|
||||||
if is_start:
|
if is_start:
|
||||||
assert False
|
|
||||||
self.profiler.start()
|
self.profiler.start()
|
||||||
else:
|
else:
|
||||||
assert False
|
|
||||||
self.profiler.stop()
|
self.profiler.stop()
|
||||||
|
|
||||||
def execute_dummy_batch(self) -> None:
|
def execute_dummy_batch(self) -> None:
|
||||||
|
|||||||
@ -54,7 +54,7 @@ class UBatchContext:
|
|||||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
global _CURRENT_CONTEXT
|
global _CURRENT_CONTEXT
|
||||||
_CURRENT_CONTEXT[threading.get_ident()] = None
|
_CURRENT_CONTEXT[threading.get_ident()] = None
|
||||||
print("Finishing ubatch %d\n" % self.id, flush=True)
|
# print("Finishing ubatch %d\n" % self.id, flush=True)
|
||||||
self.cpu_signal_event.set()
|
self.cpu_signal_event.set()
|
||||||
self.cpu_wait_event.clear()
|
self.cpu_wait_event.clear()
|
||||||
self.current_stream = self.compute_stream
|
self.current_stream = self.compute_stream
|
||||||
@ -75,22 +75,22 @@ class UBatchContext:
|
|||||||
# assert not self.cpu_wait_event.is_set()
|
# assert not self.cpu_wait_event.is_set()
|
||||||
pass
|
pass
|
||||||
def _signal_comm_done(self):
|
def _signal_comm_done(self):
|
||||||
self.ctx_valid_state()
|
# self.ctx_valid_state()
|
||||||
self.gpu_comm_done_event.record(self.comm_stream)
|
self.gpu_comm_done_event.record(self.comm_stream)
|
||||||
|
|
||||||
def _signal_compute_done(self):
|
def _signal_compute_done(self):
|
||||||
self.ctx_valid_state()
|
# self.ctx_valid_state()
|
||||||
self.gpu_compute_done_event.record(self.compute_stream)
|
self.gpu_compute_done_event.record(self.compute_stream)
|
||||||
|
|
||||||
def _wait_compute_done(self):
|
def _wait_compute_done(self):
|
||||||
# print(f"{self.id} Waiting on COMPUTE stream", flush=True)
|
# print(f"{self.id} Waiting on COMPUTE stream", flush=True)
|
||||||
self.ctx_valid_state()
|
# self.ctx_valid_state()
|
||||||
self.comm_stream.wait_event(self.gpu_compute_done_event)
|
self.comm_stream.wait_event(self.gpu_compute_done_event)
|
||||||
# print("Compute stream done", flush=True)
|
# print("Compute stream done", flush=True)
|
||||||
|
|
||||||
def _wait_comm_done(self):
|
def _wait_comm_done(self):
|
||||||
# print(f"{self.id} Waiting on COMM stream", flush=True)
|
# print(f"{self.id} Waiting on COMM stream", flush=True)
|
||||||
self.ctx_valid_state()
|
# self.ctx_valid_state()
|
||||||
self.compute_stream.wait_event(self.gpu_comm_done_event)
|
self.compute_stream.wait_event(self.gpu_comm_done_event)
|
||||||
# print("Comm stream done", flush=True)
|
# print("Comm stream done", flush=True)
|
||||||
|
|
||||||
@ -104,22 +104,22 @@ class UBatchContext:
|
|||||||
|
|
||||||
def _cpu_yield(self):
|
def _cpu_yield(self):
|
||||||
# print(f"UBatchContext: {self.id} yielding CPU", flush=True)
|
# print(f"UBatchContext: {self.id} yielding CPU", flush=True)
|
||||||
self.ctx_valid_state()
|
# self.ctx_valid_state()
|
||||||
self.cpu_signal_event.set()
|
self.cpu_signal_event.set()
|
||||||
self.cpu_wait_event.wait()
|
self.cpu_wait_event.wait()
|
||||||
self.cpu_wait_event.clear()
|
self.cpu_wait_event.clear()
|
||||||
self._restore_context()
|
self._restore_context()
|
||||||
self.ctx_valid_state()
|
# self.ctx_valid_state()
|
||||||
# print(f"UBatchContext: {self.id} resuming CPU", flush=True)
|
# print(f"UBatchContext: {self.id} resuming CPU", flush=True)
|
||||||
|
|
||||||
def yield_and_switch_from_compute_to_comm(self):
|
def yield_and_switch_from_compute_to_comm(self):
|
||||||
assert current_stream() == self.compute_stream
|
assert current_stream() == self.compute_stream
|
||||||
# dp_rank = get_dp_group().rank_in_group
|
# dp_rank = get_dp_group().rank_in_group
|
||||||
# print(f"DP: {dp_rank} UB: {self.id} Yield and switch from {self.stream_string()}", flush=True)
|
# print(f"DP: {dp_rank} UB: {self.id} Yield and switch from {self.stream_string()}", flush=True)
|
||||||
self.ctx_valid_state()
|
# self.ctx_valid_state()
|
||||||
self._signal_compute_done()
|
self._signal_compute_done()
|
||||||
self._cpu_yield()
|
self._cpu_yield()
|
||||||
self.ctx_valid_state()
|
# self.ctx_valid_state()
|
||||||
assert self.current_stream == self.compute_stream
|
assert self.current_stream == self.compute_stream
|
||||||
self.update_stream(self.comm_stream)
|
self.update_stream(self.comm_stream)
|
||||||
# print(f"DP: {dp_rank} UB: {self.id} Resuming on stream {self.stream_string()}", flush=True)
|
# print(f"DP: {dp_rank} UB: {self.id} Resuming on stream {self.stream_string()}", flush=True)
|
||||||
@ -129,10 +129,10 @@ class UBatchContext:
|
|||||||
assert current_stream() == self.comm_stream
|
assert current_stream() == self.comm_stream
|
||||||
# dp_rank = get_dp_group().rank_in_group
|
# dp_rank = get_dp_group().rank_in_group
|
||||||
# print(f"DP: {dp_rank} UB: {self.id} Yield and switch from {self.stream_string()}", flush=True)
|
# print(f"DP: {dp_rank} UB: {self.id} Yield and switch from {self.stream_string()}", flush=True)
|
||||||
self.ctx_valid_state()
|
# self.ctx_valid_state()
|
||||||
self._signal_comm_done()
|
self._signal_comm_done()
|
||||||
self._cpu_yield()
|
self._cpu_yield()
|
||||||
self.ctx_valid_state()
|
# self.ctx_valid_state()
|
||||||
assert self.current_stream == self.comm_stream
|
assert self.current_stream == self.comm_stream
|
||||||
self.update_stream(self.compute_stream)
|
self.update_stream(self.compute_stream)
|
||||||
# print(f"DP: {dp_rank} UB: {self.id} Resuming on stream {self.stream_string()}", flush=True)
|
# print(f"DP: {dp_rank} UB: {self.id} Resuming on stream {self.stream_string()}", flush=True)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user