Rewrite of the Data Collection part of mxCuBE

1. The problem

In the current mxCuBE, a central role is played by the DataCollect Hardware Object,
actually split into two files for historical reasons [1]:
  • DataCollect.py, 3600+ lines of code
  • MultiCollect.py, 400 lines of code

Those "Hardware Objects" make the link between Data Collect GUI components like
DataCollectBrick2, DataCollectParameters, etc. and the whole data acquisition
sequence on the beamline, including sample handling and data acquisition status
report to a LIMS thanks to other Hardware Objects like SampleChanger and
ISPyBClient.

The code suffers from several flaws that make maintenance and development of new
features difficult; it also depends on an external beamline acquisition sequencer
like "spec" for the main data collection sequence, thus creating a gap between
beamlines running spec, where all ESRF macros can be shared, and non-spec sites,
where everything has to be written from scratch by local mxCuBE developers.
Moreover, this has a negative impact on the mxCuBE collaboration, since new or
improved data collection schemes cannot easily be shared between collaboration
members.
Adaptation to other sites is even more difficult because DataCollect relies on the
presence of a LIMS and a sample changer, which are not available everywhere.

Some of those problems can easily be fixed:
  • making the LIMS and the Sample Changer optional
  • removing ESRF-specific code
  • adding more configuration options to properly handle differences between institutes,
    while still using the same codebase
  • writing better documentation
Other problems need a deeper look. Hardware Objects live within an event-driven
architecture; external events (like "motor X moved" or "hutch door is opened") are
handled in callbacks while particular procedures are running. Some events even trigger
the next steps of procedures. Think of the data collection procedure; it needs to load
a sample, then to start the data acquisition sequence. While the sample is loading
(it can take up to 30 seconds), the graphical interface needs to stay responsive, and
the user must even be able to cancel what she is doing. This has a direct impact on
the code:
  • threads are used to run such procedures, keeping them separate from the GUI main loop
  • synchronization is an issue
  • special care has to be taken when accessing the GUI thread
  • receiving events leads to callbacks, which themselves wait on other events,
    producing long callback chains that are hard to follow and hard to debug or modify

In those conditions, it is hard to think of including the lower-level data
collection sequence into mxCuBE: it would just add complicated code on top of
a big spaghetti soup.

Following the 2nd mxCuBE meeting at Soleil, it was decided to rewrite
the DataCollect Hardware Object, trying to fix the problems described above.

2. Using "green threads" instead of OS-level threads

As stated above, the DataCollect Hardware Object of mxCuBE uses threads for blocking
operations like waiting on a condition (e.g. waiting for a sample to be mounted by
the sample changer) or, more generally speaking, waiting on I/O operations; this way,
more complex procedures can be written in Python by chaining blocking calls one
after the other.

Using preemptive OS-level threads for I/O operations is a common abuse of
multithreading.
It makes everything more complicated because of the very nature of OS-level
threads: code running concurrently within its parent process, scheduled by the
operating system and sharing memory with it. One has to use synchronization primitives
(locks, mutexes, semaphores, etc.) to ensure code is "thread-safe".

Threads are very good for CPU-intensive tasks, like doing a long calculation for
example. They can execute code in parallel, to make it even faster. Of course, to reach
maximum performance, parallel execution requires the number of simultaneously running
threads to be less than or equal to the number of cores in the computer. Techniques
like thread pools exist to help with this.

Having many more threads than cores can even degrade performance compared to
other techniques.

OS threads are a low-level resource, necessary for low-level system programming
or CPU-bound code, but it is quite "dangerous" to let them proliferate if it can be
avoided.

"Green threads" represent a solution to this issue: just like traditional OS-level
threads, they run an independent unit of code, sharing memory with their parent.
But contrary to OS threads, they are not pre-empted by the system: their execution
mode is cooperative, meaning that it is the job of its green thread to "pass"
execution to another one ("yielding"). It reduces a lot problems with synchronization,
since the programmer knows when a switch will occur.

The unit of code that is run by a green thread is called a coroutine. Coroutines
generalize subroutines by allowing multiple entry points, so that execution can be
suspended and resumed at certain locations.

Recently, the Go programming language made the coroutine concept more visible to the
community ("goroutines"); however, many programming languages support coroutines
natively (Go, Javascript 1.7+, Haskell, Scheme, Lua, Ruby, Tcl 8.6+, Erlang...).
This is not the case for Python, which only supports generators; luckily, several
Python libraries exist that bring true coroutine support (inspired by Stackless Python).
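
To give a concrete feeling of cooperative execution, here is a minimal sketch based on
the greenlet module (one of those libraries); the function names and messages are
arbitrary, chosen only for illustration:

from greenlet import greenlet

def ping():
    print("ping: start")
    gr_pong.switch()       # explicitly yield control to the other green thread
    print("ping: resumed")

def pong():
    print("pong: start")
    gr_ping.switch()       # yield back: ping resumes right after its switch()

gr_ping = greenlet(ping)
gr_pong = greenlet(pong)
gr_ping.switch()           # runs ping, then pong, then the end of ping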

On top of coroutines, a process-level scheduler has to dispatch execution time among
them.

One piece is still missing for green threads to replace OS-level threads in I/O-bound
applications: all I/O operations need to be non-blocking. A blocking call would break
the cooperative execution of coroutines.

Traditionally, non-blocking I/O is achieved thanks to the "select" system call.
Nowadays, more advanced mechanisms exist (the epoll call on Linux, kqueue on FreeBSD,
etc.). Libraries like libevent handle scheduling and bring non-blocking I/O, and more
(timers, for example), to host applications.
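
To give a flavour of the traditional approach, here is a minimal, self-contained sketch
using select on a local socket pair (nothing mxCuBE-specific, Unix only):

import select
import socket

# create a connected pair of sockets and make one side readable
a, b = socket.socketpair()
b.send(b"ready")

# wait at most 1 second for data to arrive on 'a', without busy-waiting
readable, _, _ = select.select([a], [], [], 1.0)
if readable:
    print(a.recv(5))

a.close()
b.close()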

The idea for mxCuBE is to put everything together, in order to get rid of the
problems with Data Collection procedures (see 1. The problem): coroutines will greatly
simplify the writing of procedures/sequences, non-blocking I/O will be used for
communicating with the underlying control systems, and an event loop will run next to
Qt's loop to take care of scheduling.

3. Introducing gevent

gevent is an open-source project started in 2010. It is a networking library that
uses the greenlet module [2] to provide a synchronous API on top of the libevent [3]
event loop.

Features of interest for mxCuBE include:
  • a fast event loop based on libevent
  • lightweight execution units based on greenlet
  • a cooperative socket module
  • the ability to transparently use the standard library and 3rd-party modules written
    for standard blocking sockets, thanks to "monkey patching" of the Python standard
    library (see the sketch just below)
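
As a small illustration (the hostnames are arbitrary and nothing here comes from mxCuBE),
monkey patching turns ordinary blocking calls into cooperative ones:

import gevent
from gevent import monkey
monkey.patch_all()   # patch the standard library: blocking calls now yield to the event loop

import socket

def resolve(host):
    # socket.gethostbyname() normally blocks the whole process; once patched,
    # it only suspends the current greenlet while the others keep running
    print("%s -> %s" % (host, socket.gethostbyname(host)))

hosts = ["www.esrf.fr", "www.python.org", "www.gevent.org"]
jobs = [gevent.spawn(resolve, host) for host in hosts]
gevent.joinall(jobs, timeout=5)   # all lookups run concurrently in a single OS thread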

Recently, Guido van Rossum (the father of Python, BDFL of the Python project!) mentioned
during his keynote address at the PyCon 2012 conference that "he likes the way that gevent
presents asynchronous usage to developers". He went on to say that he is not a fan of
anything that requires him to write a callback.

I think we can all agree with Guido based on our experience with mxCuBE...

gevent is going to be introduced at the Bliss Framework level, thus making green threads,
asynchronous I/O and scheduling available for all components: Hardware Objects,
GUI Bricks, etc.

Some parts of the Bliss Framework will need a small refactoring in order to work well
with gevent. This "background work" will be transparent to mxCuBE developers.

4. Implementation of the new DataCollect Hardware Object

THIS IS A FIRST DRAFT VERSION

The new implementation proposed here is still compatible with the current versions of
the corresponding mxCuBE GUI components.

Soon, mxCuBE will get a completely new GUI for data collection in general, and this
will be the occasion to remove old code and to change the communication between the
DataCollect Hardware Object and its GUI bricks.

Following the new guideline decided during the 2nd mxCuBE meeting at Soleil, the
DataCollect Hardware Object is made of an abstract class and a concrete class. The
abstract class should stay the same at every site, whereas the concrete class can
differ widely.

The new DataCollect code contains both data acquisition logic (dealing with detector, beamline
motors etc.) and higher-level logic (storing data collections in LIMS, sample handling with
a sample changer, etc.).

A lot of methods are just defined with "pass" in the abstract class: the idea is
to have them properly implemented in the concrete class, making calls to the
control system to perform the required operations. The Bliss Framework will provide
asynchronous I/O calls for spec, Taco and Tango servers. Of course, it is foreseen to add
the same facility for Tine.
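
For example, a concrete class could implement one of these methods with a direct call to
its control system. The sketch below is purely hypothetical (made-up class and Tango
device names, raw PyTango instead of the Bliss Framework channels) and assumes the
AbstractMultiCollect class and the task decorator defined later in this document:

import PyTango

class SiteMultiCollect(AbstractMultiCollect):
    @task
    def set_transmission(self, transmission_percent):
        # write the requested transmission to a (made-up) attenuator Tango device
        attenuators = PyTango.DeviceProxy("id00/attenuators/1")
        attenuators.write_attribute("transmission", transmission_percent)

    # ...the other abstract methods would be implemented in a similar way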

The real data acquisition loop is located in the "loop" method. Each single data collection
is performed by the "do_collect" method. The "collect" method is the wrapper that
starts the loop.

In order to distinguish between beamline control and beamline configuration, two
new members are used in the code:
  • self.bl_control
  • self.bl_config

"self.bl_control" corresponds to the following:

BeamlineControl = collections.namedtuple('BeamlineControl',
                                         ['diffractometer',
                                          'sample_changer',
                                          #'slitbox',
                                          'lims',
                                          'safety_shutter',
                                          'machine_current',
                                          #'cryo_stream',
                                          'energy',
                                          'detector_distance',
                                          'transmission'])

All members are optional. For example, if a beamline doesn't have a sample changer, it is valid
to simply leave that member uninitialized. Likewise, a single-wavelength beamline can leave the
"energy" BeamlineControl component empty. Otherwise, the concrete Hardware Object provides a
Hardware Object for each BeamlineControl component.
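
For illustration, here is a hypothetical, self-contained example of such a control structure
for a beamline without a sample changer and with a fixed wavelength (the *_hwobj names are
mere placeholders for the site's real Hardware Objects):

import collections

BeamlineControl = collections.namedtuple('BeamlineControl',
                                         ['diffractometer', 'sample_changer', 'lims',
                                          'safety_shutter', 'machine_current', 'energy',
                                          'detector_distance', 'transmission'])

# placeholders standing in for real Hardware Objects
diffractometer_hwobj = object()
lims_hwobj = object()
safety_shutter_hwobj = object()
machine_current_hwobj = object()
detector_distance_hwobj = object()
transmission_hwobj = object()

bl_control = BeamlineControl(diffractometer=diffractometer_hwobj,
                             sample_changer=None,    # no sample changer on this beamline
                             lims=lims_hwobj,
                             safety_shutter=safety_shutter_hwobj,
                             machine_current=machine_current_hwobj,
                             energy=None,            # single-wavelength beamline
                             detector_distance=detector_distance_hwobj,
                             transmission=transmission_hwobj)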

Each one of those components will get an abstract class of its own, making it easier to
implement them at every site.
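
Those abstract component classes are not defined yet; as a purely hypothetical sketch,
an abstract sample changer could for instance expose the two methods used by the data
collection code below:

import abc

class AbstractSampleChanger(object):      # hypothetical name, not part of the draft
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def loadSample(self, sample_code, sample_location, sample_info=None):
        """Mount the requested sample on the goniometer."""
        pass

    @abc.abstractmethod
    def getLoadedSampleLocation(self):
        """Return the location of the currently mounted sample, e.g. (basket, vial)."""
        pass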

"self.bl_config" corresponds to the following :

BeamlineConfig = collections.namedtuple('BeamlineConfig',
                                        ['directory_prefix',
                                         'default_exposure_time',
                                         'default_number_of_passes',
                                         'maximum_radiation_exposure',
                                         'nominal_beam_intensity',
                                         'minimum_exposure_time',
                                         'minimum_phi_speed',
                                         'minimum_phi_oscillation',
                                         'maximum_phi_speed',
                                         'detector_fileext',
                                         'detector_type',
                                         'detector_mode',
                                         'beam_ax',
                                         'beam_ay',
                                         'beam_bx',
                                         'beam_by'])

The goal of this object is to gather all beamline configuration information in one place.
This preliminary implementation is far from final; it simply takes the same elements as
the "mxlocal" Hardware Object used by the previous DataCollect code.

import abc
import collections
import errno
import logging
import os
import time

import gevent
from gevent.event import AsyncResult

# the task decorator and the cleanup/error_cleanup helpers used below are
# defined at the end of this document

class AbstractMultiCollect(object):
    __metaclass__ = abc.ABCMeta

    def __init__(self):
        self.bl_control = BeamlineControl(*[None]*8)
        self.bl_config = BeamlineConfig(*[None]*16)
        self.sample_centring_result = AsyncResult()
        self.data_collect_task = None
        self.oscillations_history = []

    def setControlObjects(self, **control_objects):
        self.bl_control = BeamlineControl(**control_objects)

    def setBeamlineConfiguration(self, **configuration_parameters):
        self.bl_config = BeamlineConfig(**configuration_parameters)

    @abc.abstractmethod
    @task
    def set_transmission(self, transmission_percent):
        pass

    @abc.abstractmethod
    @task
    def set_wavelength(self, wavelength):
        pass

    @abc.abstractmethod
    @task
    def close_fast_shutter(self):
        pass

    @abc.abstractmethod
    @task
    def move_detector(self, distance):
        pass

    @abc.abstractmethod
    @task
    def move_motors(self, motor_position_dict):
        return

    @abc.abstractmethod
    @task
    def take_background_image(self):
        pass

    @abc.abstractmethod
    @task
    def open_safety_shutter(self):
        pass

    @abc.abstractmethod
    @task
    def close_safety_shutter(self):
        pass

    @abc.abstractmethod
    @task
    def adjust_i0_i1_diode_gains(self):
        pass

    @abc.abstractmethod
    @task
    def prepare_oscillation(self, start, osc_range, exptime, npass):
        """Should return osc_start and osc_end positions -
        gonio should be ready for data collection after this ;
        Remember to check for still image if range is too small !
        """ 
        pass

    @abc.abstractmethod
    @task
    def do_oscillation(self, start, end, exptime, npass):
        pass

    @abc.abstractmethod
    @task
    def start_acquisition(self, filename, exptime, npass, first_frame=False):
        pass

    @abc.abstractmethod
    @task
    def stop_acquisition(self, last_frame=False):
        pass

    @abc.abstractmethod
    @task
    def reset_detector(self):
        pass

    def sample_centring_done(self, success, centring_info):
        if success:
            self.sample_centring_result.set(centring_info)
        else:
            # make the waiting do_collect task raise, instead of receiving the exception as a value
            self.sample_centring_result.set_exception(RuntimeError("centring failed !"))

    def get_sample_info_from_parameters(self, parameters):
        """Returns sample_id, sample_location and sample_code from data collection parameters""" 
        sample_info = parameters.get("sample_reference")

        try:
            sample_id = int(sample_info["blSampleId"])
        except:
            sample_id = None

        try:
            sample_code = sample_info["code"]
        except:
            sample_code = None

        sample_location = None

        try:
            sample_container_number = int(sample_info['container_reference'])
        except:
            pass
        else:
            try:
                vial_number = int(sample_info["sample_location"])
            except:
                pass
            else:
                sample_location = (sample_container_number, vial_number)

        return sample_id, sample_location, sample_code

    def create_directories(self, images_directory, process_directory):
        for directory in (images_directory, process_directory):
            try:
                os.makedirs(directory)
            except os.error, e:
                if e.errno != errno.EEXIST:
                    raise

    @task
    def load_sample(self, sample_location, sample_code=None, sample_id=None):
        sample_info = None
        if sample_id is not None and self.bl_control.lims:
            # try to find more sample info from the LIMS
            sample_info = self.bl_control.lims.getBLSample(sample_id)

        self.emit("collectMountingSample", (sample_code, sample_location, False))

        self.bl_control.sample_changer.loadSample(sample_code, sample_location, sample_info)

        self.emit("collectMountingSample", (sample_code, sample_location, True))

    def prepare_wedges_to_collect(self, start, nframes, reference_interval, inverse_beam):
        # code to prepare the list of frames to collect: [(start, wedge_size), ...]
        wedge_sizes_list = [reference_interval]*(nframes/reference_interval)
        remaining_frames = nframes % reference_interval
        if remaining_frames:
            wedge_sizes_list.append(remaining_frames)
        wedges_to_collect = []
        for wedge_size in wedge_sizes_list:
            wedges_to_collect.append((start, wedge_size))
            if inverse_beam:
                wedges_to_collect.append((start+180, wedge_size))
            start += wedge_size
        return wedges_to_collect

    def update_oscillations_history(self, data_collect_parameters):
        sample_id, sample_location, sample_code = self.get_sample_info_from_parameters(data_collect_parameters)
        self.oscillations_history.append((sample_id, sample_code, sample_location, data_collect_parameters))
        return len(self.oscillations_history), sample_id, sample_code, sample_location

    @task
    def do_collect(self, owner, data_collect_parameters, sample_loading_timeout = None, sample_centring_timeout = None):
        # adjusting parameters
        # creating directories for images and processing files
        file_parameters = data_collect_parameters["fileinfo"]

        self.create_directories(file_parameters['directory'],  file_parameters['process_directory'])

        file_parameters["suffix"] = self.bl_config.detector_fileext
        image_file_template = "%(prefix)s_%(run_number)s_%%04d.%(suffix)s" % file_parameters
        file_parameters["template"] = image_file_template

        # database filling
        if self.bl_control.lims:
            start_time = time.strftime("%Y-%m-%d %H:%M:%S")
            data_collect_parameters["collection_start_time"] = start_time
            if self.bl_control.machine_current:
                data_collect_parameters["synchrotron_mode"] = self.bl_control.machine_current.getFillMode()
            self.bl_control.lims.storeDataCollection(data_collect_parameters, None) #TODO: self.bl_config instead of None

        # sample loading
        sample_was_loaded = False
        if self.bl_control.sample_changer:
            sample_id, sample_location, sample_code = self.get_sample_info_from_parameters(data_collect_parameters)

            if sample_location is not None:
                loaded_sample_location = self.bl_control.sample_changer.getLoadedSampleLocation()

                if loaded_sample_location != sample_location:
                    self.emit("collectMountingSample", (sample_code, sample_location, None))
                    try:
                        self.load_sample(sample_location, sample_code, sample_id, timeout = sample_loading_timeout)
                    except:
                        self.emit("collectMountingSample", (sample_code, sample_location, False))
                        raise
                    else:
                        self.emit("collectMountingSample", (sample_code, sample_location, True))
                    sample_was_loaded = True
        else:
            sample_id = None
            sample_location = None
            sample_code = None

        # sample centring
        if sample_was_loaded:
            self.sample_centring_result = AsyncResult()

            try:
                automatic_centring = str(data_collect_parameters["start_auto_centring"]).upper()=='TRUE'
            except:
                automatic_centring = False

            # TO DO: handle automatic centring case
            # here it is just normal centring :
            self.emit("collectValidateCentring", (True, file_parameters))

            centring_info = self.sample_centring_result.get(timeout = sample_centring_timeout)

            # TO DO: save snapshots, store sample centring positions and snapshots in database
            pass

        # data collection
        oscillation_parameters = data_collect_parameters["oscillation_sequence"][0]

        move_detector_task = self.move_detector(oscillation_parameters.get("detector_distance"), wait = False)

        self.set_transmission(oscillation_parameters.get("transmission", 100))

        self.set_wavelength(oscillation_parameters.get("wavelength"))

        self.close_fast_shutter()

        self.move_motors({})

        move_detector_task.get() #wait end of detector motion

        if data_collect_parameters.get("dark"):
            self.take_background_image()

        skip_existing_images = data_collect_parameters["skip_images"]

        with cleanup(self.close_safety_shutter):
            self.open_safety_shutter()

            self.adjust_i0_i1_diode_gains()

            inverse_beam = "reference_interval" in oscillation_parameters
            reference_interval = oscillation_parameters.get("reference_interval", 1)
            wedges_to_collect = self.prepare_wedges_to_collect(oscillation_parameters["start"],
                                                               oscillation_parameters["number_of_images"],
                                                               reference_interval,
                                                               inverse_beam)

            nframes = len(wedges_to_collect)
            self.emit("collectNumberOfFrames", nframes)

            frame = 0
            image_number = oscillation_parameters["start_image_number"]
            osc_range = oscillation_parameters["range"]
            exptime = oscillation_parameters["exposure_time"]
            npass = oscillation_parameters["number_of_passes"]
            for start, wedge_size in wedges_to_collect:
                end = start + osc_range

                filename = os.path.join(file_parameters["directory"], image_file_template % image_number)

                if skip_existing_images and os.path.isfile(filename):
                    logging.info("Skipping existing image %s", filename)
                    pass
                else:
                    logging.info("Frame %d, %7.3f to %7.3f degrees", frame+1, start, end)

                    osc_start, osc_end = self.prepare_oscillation(start, osc_range, exptime, npass)

                    with error_cleanup(self.reset_detector):
                        self.start_acquisition(filename, exptime, npass, first_frame = frame==0)
                        self.do_oscillation(osc_start, osc_end, exptime, npass)
                        self.stop_acquisition(last_frame = frame==nframes-1)

                        # TODO: store image in database
                        pass

                        self.emit("collectImageTaken", frame+1)

                start = end
                frame += 1
                image_number += 1

    @task
    def loop(self, owner, data_collect_parameters_list, finished_callback=None):
        try:
            self.emit("collectReady", (False, ))

            for data_collect_parameters in data_collect_parameters_list:
                logging.debug("collect parameters = %r", data_collect_parameters)

                if data_collect_parameters == data_collect_parameters_list[0]:
                    # first data collect => force taking a dark current image
                    data_collect_parameters["dark"] = True

                try:
                    # emit signals to make bricks happy
                    self.emit("collectStarted", (owner, 1))
                    osc_id, sample_id, sample_code, sample_location = self.update_oscillations_history(data_collect_parameters)
                    self.emit('collectOscillationStarted', (owner, sample_id, sample_code, sample_location, data_collect_parameters, osc_id))
                    # now really start collect sequence
                    self.do_collect(owner, data_collect_parameters)
                except:
                    logging.exception("Data collection failed")
                    # TO DO: update database after collect
                    data_collection_id = None  # should be: id in ISPyB
                    self.emit("collectOscillationFailed", (owner, False, "collection failed!", data_collection_id, osc_id))
                    if callable(finished_callback):
                        finished_callback(False,)
                    self.emit("collectFailed", owner, False, "collection failed!")
                    break
                else:
                    # TO DO: update database after collect
                    data_collection_id = None  # should be: id in ISPyB
                    self.emit("collectOscillationFinished", (owner, True, "collection successful", data_collection_id, osc_id))
                    if callable(finished_callback):
                        finished_callback()
                    self.emit("collectEnded", owner, True, "collection successful")
        finally:
            self.emit("collectReady", (True, ))

    def collect(self, owner, data_collect_parameters_list, finished_callback=None):
        self.data_collect_task = self.loop(owner, data_collect_parameters_list, finished_callback, wait = False)

    #TODO: rename to stop_collect
    def stopCollect(self, owner):
        if self.data_collect_task is not None:
            self.data_collect_task.kill()

The code above makes use of a "task" decorator. This is a helper decorator that runs
a plain Python method as a coroutine (a greenlet):

def task(func):
    def start_task(*args, **kwargs):
        logging.info("Starting %s%s", func.__name__, args)

        # "wait" and "timeout" are consumed by the decorator, not passed to the wrapped method
        wait = kwargs.get("wait", True)
        timeout = kwargs.get("timeout")

        t = gevent.spawn(func, *args)
        try:
            if wait:
                return t.get(timeout=timeout)
            else:
                return t
        except:
            logging.error("Exception happened in %s, exiting", func.__name__)
            t.kill()
            raise

    return start_task
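
A small usage example, outside of any Hardware Object, just to show the wait/timeout
semantics (it assumes the task decorator above and gevent are in scope; move_motor is a
made-up function):

@task
def move_motor(position):
    gevent.sleep(1)                    # stands for a slow, cooperative operation
    return position

print(move_motor(10))                  # waits (cooperatively) until done, prints 10
t = move_motor(20, wait=False)         # returns a greenlet immediately
print(t.get(timeout=2))                # explicitly wait for the result, prints 20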

In addition to this helper decorator, two other helper classes are used in order
to implement "cleanup" functionality; the idea is to use context managers (the "with"
keyword in Python) to have cleanup methods executed either only on error, or in all
cases (see an example of usage in the "do_collect" method):

class cleanup:
    def __init__(self, cleanup_func, **keys):
        self.cleanup_func = cleanup_func
        self.keys = keys

    def __enter__(self):
        pass

    def __exit__(self, *args):
        if self.cleanup_func is not None:
            self.cleanup_func(**self.keys)


class error_cleanup:
    def __init__(self, error_func, **keys):
        self.error_func = error_func
        self.keys = keys

    def __enter__(self):
        pass

    def __exit__(self, *args):
        if args[0] is not None and self.error_func is not None:
            self.error_func(**self.keys)
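
And a tiny standalone example of the two helpers (print statements only, unrelated to
any beamline code):

def close_shutter():
    print("shutter closed")

def reset_detector():
    print("detector reset")

# cleanup: close_shutter() runs whether or not the block raises
with cleanup(close_shutter):
    print("collecting...")

# error_cleanup: reset_detector() runs only because the block raises
try:
    with error_cleanup(reset_detector):
        raise RuntimeError("acquisition failed")
except RuntimeError:
    pass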

[1] The first implementation of mxCuBE didn't support executing several data
acquisition sequences in a row (a "data collection pipeline"); it was decided to
make a higher-level wrapper around the original code, called "MultiCollect", in
order to circumvent this limitation.

[2] http://pypi.python.org/pypi/greenlet

[3] http://libevent.org