Added better debug messages.

这个提交包含在:
Craig Warren
2021-03-17 15:09:57 +00:00
父节点 e25af13d63
当前提交 c9a442c41b

查看文件

@@ -167,7 +167,7 @@ class MPIExecutor(object):
self.busy = [False] * len(self.workers)
if self.is_master():
-logger.basic(f'\nMPIExecutor with comm: {self.comm.name}, Master: {self.master}, Workers: {self.workers}')
+logger.basic(f'\n({self.comm.name}) - Master: {self.master}, Workers: {self.workers}')
def __enter__(self):
"""Context manager enter.
@@ -222,7 +222,7 @@ class MPIExecutor(object):
raise RuntimeError('Start has already been called')
self._up = True
-logger.debug('Starting up MPIExecutor master/workers...')
+logger.debug(f'({self.comm.name}) - Starting up MPIExecutor master/workers...')
if self.is_worker():
self.__wait()
@@ -231,12 +231,12 @@ class MPIExecutor(object):
"""
if self.is_master():
-logger.debug('Terminating. Sending sentinel to all workers.')
+logger.debug(f'({self.comm.name}) - Terminating. Sending sentinel to all workers.')
# Send sentinel to all workers
for worker in self.workers:
self.comm.send(None, dest=worker, tag=Tags.EXIT)
-logger.debug('Waiting for all workers to terminate.')
+logger.debug(f'({self.comm.name}) - Waiting for all workers to terminate.')
down = [False] * len(self.workers)
while True:
@@ -248,7 +248,7 @@ class MPIExecutor(object):
break
self._up = False
-logger.debug('All workers terminated.')
+logger.debug(f'({self.comm.name}) - All workers terminated.')
def submit(self, jobs, sleep=0.0):
"""Submits a list of jobs to the workers and returns the results.
@@ -281,7 +281,7 @@ class MPIExecutor(object):
for i, worker in enumerate(self.workers):
if self.comm.Iprobe(source=worker, tag=Tags.DONE):
job_idx, result = self.comm.recv(source=worker, tag=Tags.DONE)
-logger.debug(f'Received finished job {job_idx} from worker {worker:d}.')
+logger.debug(f'({self.comm.name}) - Received finished job {job_idx} from worker {worker:d}.')
results[job_idx] = result
self.busy[i] = False
elif self.comm.Iprobe(source=worker, tag=Tags.READY):
@@ -289,16 +289,16 @@ class MPIExecutor(object):
self.comm.recv(source=worker, tag=Tags.READY)
self.busy[i] = True
job_idx = num_jobs - len(my_jobs)
-logger.debug(f'Sending job {job_idx} to worker {worker:d}.')
+logger.debug(f'({self.comm.name}) - Sending job {job_idx} to worker {worker:d}.')
self.comm.send((job_idx, my_jobs.pop(0)), dest=worker, tag=Tags.START)
elif self.comm.Iprobe(source=worker, tag=Tags.EXIT):
-logger.debug(f'Worker on rank {worker:d} has terminated.')
+logger.debug(f'({self.comm.name}) - Worker on rank {worker:d} has terminated.')
self.comm.recv(source=worker, tag=Tags.EXIT)
self.busy[i] = False
time.sleep(sleep)
-logger.debug('Finished all jobs.')
+logger.debug(f'({self.comm.name}) - Finished all jobs.')
return results
@@ -313,26 +313,26 @@ class MPIExecutor(object):
status = MPI.Status()
-logger.debug('Starting up worker.')
+logger.debug(f'({self.comm.name}) - Starting up worker.')
while True:
self.comm.send(None, dest=self.master, tag=Tags.READY)
-logger.debug(f'Worker on rank {self.rank} waiting for job.')
+logger.debug(f'({self.comm.name}) - Worker on rank {self.rank} waiting for job.')
data = self.comm.recv(source=self.master, tag=MPI.ANY_TAG, status=status)
tag = status.tag
if tag == Tags.START:
job_idx, work = data
-logger.debug(f'Received job {job_idx} (work={work}).')
+logger.debug(f'({self.comm.name}) - Received job {job_idx} (work={work}).')
result = self.__guarded_work(work)
-logger.debug('Finished job. Sending results to master.')
+logger.debug(f'({self.comm.name}) - Finished job. Sending results to master.')
self.comm.send((job_idx, result), dest=self.master, tag=Tags.DONE)
elif tag == Tags.EXIT:
-logger.debug('Received sentinel from master.')
+logger.debug(f'({self.comm.name}) - Received sentinel from master.')
break
-logger.debug('Terminating worker.')
+logger.debug(f'({self.comm.name}) - Terminating worker.')
self.comm.send(None, dest=self.master, tag=Tags.EXIT)
def __guarded_work(self, work):