Project

General

Profile

Asynchronous processing: Problems in event linking » processdistributor.py

Joel Lüthi, 01/20/2016 12:40 PM

 
#
# COPYRIGHT:
# The Leginon software is Copyright 2003
# The Scripps Research Institute, La Jolla, CA
# For terms of the license agreement
# see http://ami.scripps.edu/software/leginon-license
#

import targetwatcher
import calibrationclient
from leginon import leginondata
import event
import instrument
import imagewatcher
import mosaic
import threading
import node
import targethandler
import appclient
from pyami import convolver, imagefun, mrc, numpil
import numpy
import pyami.quietscipy
import scipy.ndimage as nd
import gui.wx.DnaTargetFinder
import gui.wx.DnaClickTargetFinder
import os
import shortpath
import math
import presets
import time
import version
import subprocess32 as subprocess

# Compatibility shim: very old Python versions have no builtin ``set``.
# If the builtin exists, rebind it to itself at module scope; otherwise
# fall back to ``sets.Set`` under the same name.
try:
    set = set
except NameError:
    import sets

    set = sets.Set


# process = subprocess.Popen()
# http://stackoverflow.com/questions/2581817/python-subprocess-callback-when-cmd-exits

class ProcessDistributor(imagewatcher.ImageWatcher, targethandler.TargetWaitHandler, targetwatcher.TargetWatcher):
    '''
    Node that receives published acquisition images and decides when the
    "image process done" notification is sent back for each of them.

    Images whose filename marks them as the last image of a square are not
    acknowledged immediately: their database id is queued in
    ``square_acquisition_finished_id`` and released by ``startAcquisition``.
    All other well-formed images are acknowledged right away.
    '''
    panelclass = gui.wx.DnaTargetFinder.Panel
    settingsclass = leginondata.TargetFinderSettingsData
    defaultsettings = {
        'queue': False,
        'wait for done': True,
        'ignore images': False,
        'user check': False,
        'queue drift': True,
        'sort target': False,
        'allow append': False,
    }
    eventinputs = (imagewatcher.ImageWatcher.eventinputs
                   + [event.AcquisitionImagePublishEvent]
                   + [event.ImageTargetListPublishEvent]
                   + targethandler.TargetWaitHandler.eventinputs)
    eventoutputs = (imagewatcher.ImageWatcher.eventoutputs
                    + targethandler.TargetWaitHandler.eventoutputs)
    targetnames = ['acquisition', 'focus', 'preview', 'reference', 'done', 'meter']

    def __init__(self, id, session, managerlocation, **kwargs):
        '''
        Initialise the parent nodes, the instrument proxy, the presets and
        calibration clients, and the bookkeeping state used by processData
        and startAcquisition.

        id, session, managerlocation and kwargs are passed unchanged to
        ImageWatcher.__init__.
        '''
        imagewatcher.ImageWatcher.__init__(self, id, session, managerlocation,
                                           **kwargs)
        targethandler.TargetWaitHandler.__init__(self)
        self.instrument = instrument.Proxy(self.objectservice, self.session)
        self.presetsclient = presets.PresetsClient(self)
        self.calclients = {
            'image shift': calibrationclient.ImageShiftCalibrationClient(self),
            'stage position': calibrationclient.StageCalibrationClient(self),
            'modeled stage position':
                calibrationclient.ModeledStageCalibrationClient(self),
            'beam size':
                calibrationclient.BeamSizeCalibrationClient(self),
        }
        self.parent_imageid = None
        self.current_image_pixelsize = None
        self.focusing_targetlist = None
        self.last_acq_node = None
        self.next_acq_node = None
        self.targetimagevector = (0, 0)
        self.targetbeamradius = 0
        self.resetLastFocusedTargetList(None)
        self.currentlyProcessing = False
        self.currentlyAcquiring = False
        self.to_be_processed = []
        self.processing_finished = []
        # Ids of the last sub-square image of each acquired square.  Passing
        # one of them to self.publishImageProcessDone(imageid) lets the
        # acquisition of the next square at low magnification start.
        self.square_acquisition_finished_id = []
        # Set by processData; initialised here so that reading them before
        # the first image arrives does not raise AttributeError.
        self.center_image = None
        self.imageStartString = None

    def processData(self, idata):
        '''
        Handle published image data (AcquisitionImagePublishEvent).

        ImageData: the filename is inspected to find the centre image and the
        last image of a square.  The last image is queued and
        startAcquisition is called; every other well-formed image is
        acknowledged immediately with publishImageProcessDone.

        ImageListData: the list is processed with processImageListData and
        every referenced image, then the list itself, is acknowledged.

        Raises TypeError for any other kind of data.
        '''
        if isinstance(idata, leginondata.ImageData):
            self.setStatus('processing')
            self.logger.debug('Imagewatcher.processData (ImageData)')
            imageid = idata.dbid
            filename = idata['filename']
            if len(filename) == 45:
                # Beginning of the filename of the image that will be
                # processed in dnaForkTargetFinderPipeline.
                self.imageStartString = filename[:-10]

                # The centre image carries index 6; remember its image data.
                if filename[-6] == '6':
                    self.center_image = idata

                if filename[-7:-5] == '10':
                    # Last file of the square: acquisition just finished.
                    self.currentlyAcquiring = False
                    print("start processing")
                    self.square_acquisition_finished_id.append(imageid)
                    # Start the next acquisition (low magnification, or high
                    # magnification if a prior square finished processing).
                    self.startAcquisition()
                else:
                    self.publishImageProcessDone(imageid)
            else:
                self.imageStartString = None
                # NOTE(review): no publishImageProcessDone is sent on this
                # path -- confirm that the sender is not left waiting.
                self.logger.error('%s is not in the expected format. Are the preset names changed?'
                                  % (filename,))
                self.logger.info('Expected format is "yymmmddd_x_xxxxxgrid_xxxxx550x_v01_xxxxx2900x"')
            self.setStatus('idle')

        elif isinstance(idata, leginondata.ImageListData):
            self.setStatus('processing')
            self.logger.debug('Imagewathcer.processData (ImageListData)')
            self.processImageListData(idata)
            if 'images' in idata and idata['images'] is not None:
                for ref in idata['images']:
                    self.publishImageProcessDone(imageid=ref.dbid)
            self.logger.debug('Imagewathcer.processData (ImageListData) done')
            self.setStatus('idle')
            self.publishImageProcessDone(idata.dbid)

        else:
            self.setStatus('idle')
            raise TypeError('data to be processed must be an ImageData instance')

    def handleApplicationEvent(self, evt):
        '''
        Find the Acquisition class or its subclass instance bound
        to this node upon application loading.

        Stores the nodes found through the ImageTargetListPublishEvent
        binding in self.last_acq_node and self.next_acq_node.
        '''
        app = evt['application']
        self.last_acq_node = appclient.getLastNodeThruBinding(
            app, self.name, 'ImageTargetListPublishEvent', 'Acquisition')
        self.next_acq_node = appclient.getNextNodeThruBinding(
            app, self.name, 'ImageTargetListPublishEvent', 'Acquisition')

    def startAcquisition(self):
        '''
        Start the next acquisition step.

        If a processed square is waiting in self.processing_finished it is
        removed from that list (the high magnification acquisition itself is
        not implemented yet).  Otherwise the oldest queued square image id is
        acknowledged with publishImageProcessDone.
        '''
        if self.processing_finished:
            # A square has been processed but its targets have not been
            # acquired yet: it should be submitted for acquisition here.
            # NOTE(review): the entry is only dropped for now, and the queued
            # image id is not published on this path.
            self.processing_finished.pop(0)
            print("Implement call to high mag acquisition & submit the correct targets")
        else:
            imageid = self.square_acquisition_finished_id.pop(0)
            self.publishImageProcessDone(imageid)

    def processTargetData(self, targetdata, attempt=None):
        '''
        This is called by TargetWatcher.processData when targets available
        If called with targetdata=None, this simulates what occurs at
        a target (going to presets, acquiring images, etc.)

        Currently does nothing and returns None: the implementation below is
        disabled by being wrapped in a string literal.
        '''
        # Submit the same target list instantly

        # Disabled draft, kept for reference only; it is a plain string
        # expression and is never executed.
        '''
        zlp_preset_name = self.settings['preset order'][-1]
        self.tuneEnergyFilter(zlp_preset_name)
        try:
            self.validatePresets()
        except InvalidPresetsSequence, e:
            if targetdata is None or targetdata['type'] == 'simulated':
                ## don't want to repeat in this case
                self.logger.error(str(e))
                return 'aborted'
            else:
                raise

        presetnames = self.settings['preset order']
        ret = 'ok'
        self.onTarget = False
        for preset_index, newpresetname in enumerate(presetnames):
            if self.alreadyAcquired(targetdata, newpresetname):
                continue

            if targetdata is not None and targetdata['type'] != 'simulated' and self.settings['adjust for transform'] != 'no':
                if self.settings['drift between'] and self.goodnumber > 0:
                    self.declareDrift('between targets')
                targetonimage = targetdata['delta column'],targetdata['delta row']
                targetdata = self.adjustTargetForTransform(targetdata)
                self.logger.info('target adjusted by (%.1f,%.1f) (column, row)' % (targetdata['delta column']-targetonimage[0],targetdata['delta row']-targetonimage[1]))
            offset = {'x':self.settings['target offset col'],'y':self.settings['target offset row']}
            if offset['x'] or offset['y']:
                targetdata = self.makeTransformTarget(targetdata,offset)

            # set stage z first before move
            z = self.moveToLastFocusedStageZ(targetdata)
            self.testprint('preset manager moved to LastFocusedStageZ %s' % (z,))

            ### determine how to move to target
            try:
                emtarget = self.targetToEMTargetData(targetdata, z)
            except InvalidStagePosition:
                return 'invalid'

            presetdata = self.presetsclient.getPresetByName(newpresetname)

            pause_between_time = self.settings['pause between time']
            if preset_index > 0 and pause_between_time > 0.0:
                self.logger.info('Pausing for extra %.1f before acquisition with %s' % (pause_between_time,newpresetname))
                time.sleep(pause_between_time)
            ### acquire film or CCD
            self.startTimer('acquire')
            ret = self.acquire(presetdata, emtarget, attempt=attempt, target=targetdata)
            self.stopTimer('acquire')
            # in these cases, return immediately
            if ret in ('aborted', 'repeat'):
                self.reportStatus('acquisition', 'Acquisition state is "%s"' % ret)
                break
            if ret == 'repeat':
                return repeat

        self.reportStatus('processing', 'Processing complete')

        return ret
        '''

    def getImageFromDB(self, filename):
        '''
        Look up an acquisition image in the database by file name.

        Only the base name without directory and extension is used for the
        query.  Returns the first matching image data, or None when the
        query yields no result.
        '''
        # only want filename without path and extension; splitext leaves a
        # name that has no extension intact instead of reducing it to ''
        basename = os.path.basename(filename)
        filename = os.path.splitext(basename)[0]
        q = leginondata.AcquisitionImageData(filename=filename)
        results = self.research(datainstance=q)
        if not results:
            return None
        return results[0]
(3-3/3)