Poller module

The Hardware Repository package from mxCuBE v2 branch brings a new Poller
module.

The Poller module can be used to poll control system objects
(such as Taco or Tango servers). The goal is to provide a well-integrated
facility for this task instead of re-inventing the wheel for every
control system. It also uses multi-threading so that calls
to the control system object do not block the mxCuBE graphical interface
(provided that the Python GIL is released and re-acquired appropriately
by the underlying control system library).

How to use the Poller module

The Poller module comes with a "poll" function, taking 6 arguments:

def poll(polled_call, 
         polled_call_args = (),
         polling_period = 1000,
         value_changed_callback = None, 
         error_callback = None, 
         compare = True)

"polled_call" is the callable object (e.g. a Python function or method)
that will be called regularly in a separate thread. The second argument
is a tuple with the arguments to use for the call. The polling period
is in milliseconds. When the polled call returns and "compare" is True
(the default), "value_changed_callback" is called only if the returned
value has changed. If "compare" is False, it is called after every poll.
If the polled call raises an exception, "error_callback" is called.
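
The effect of "compare" can be illustrated with a small stand-alone model.
This is only a sketch, not the Poller module's code: "simulate_polling" is a
hypothetical helper, and it assumes that with compare enabled the callback
fires for the first result and afterwards only when a result differs from
the previous one.

```python
def simulate_polling(results, compare=True):
    """Return the results for which value_changed_callback would fire.

    Hypothetical model: 'results' stands for the successive return
    values of the polled call.
    """
    notified = []
    first = True
    previous = None
    for value in results:
        # Assumption: the first result is always reported; afterwards,
        # with compare=True, only results that differ from the previous one.
        if first or not compare or value != previous:
            notified.append(value)
        previous = value
        first = False
    return notified


print(simulate_polling([1, 1, 2, 2, 3]))                 # compare=True
print(simulate_polling([1, 1, 2, 2, 3], compare=False))  # every result
```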

No reference is kept to the callable object - if the callable object
is garbage collected, polling stops (without raising an error).
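
This behaviour matches what Python weak references provide. The sketch
below uses the standard weakref module (Python 3.4 or later for WeakMethod)
to show a bound method becoming unreachable once its object is collected.
Whether the Poller module relies on weakref or on another mechanism is not
stated here, and "Motor" and "read_position" are hypothetical names.

```python
import gc
import weakref


class Motor(object):
    """Hypothetical control system object."""

    def read_position(self):
        return 42.0


motor = Motor()
# A weak reference does not keep the object (or its bound method) alive.
polled_call_ref = weakref.WeakMethod(motor.read_position)

polled_call = polled_call_ref()
alive_before = polled_call is not None   # the method can still be called
value = polled_call()
del polled_call

del motor
gc.collect()
# After collection the reference is dead: a polling loop can detect
# this and stop quietly instead of raising an error.
alive_after = polled_call_ref() is not None
```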

The "poll" function returns a Poller object, which you can ignore if
you do not need it: internally, the Poller module keeps track of all
Poller objects. Keeping the Poller object is useful, however, for calling
its methods:
  1. stop() : stops polling
  2. restart(delay=0) : restart polling with the new specified
    polled call, after delay milliseconds have passed
  3. get_polling_period()
  4. set_polling_period(milliseconds)
  5. get_id() : returns a unique id for the Poller object

If an exception occurs while executing the polled call, polling is not
restarted automatically: the error callback is fired, and polling
has to be restarted using the "restart" call.

When using the "restart" call, a new Poller object is returned.

The error callback receives the original exception that occurred at
polling time and the poller id, so you can use the same error callback
for different Poller objects.

The error callback signature is shown in the following typical example:

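import logging

# "Poller" below is the Poller module, assumed to be imported already

def example_error_callback(exception, poller_id):
  # re-raise the exception so that logging.exception can record it
  try:
    raise exception
  except Exception:
    logging.exception("Uncaught exception while polling")

  # reinitialize/reconnect to underlying control system ?
  # then restart polling after a delay
  poller = Poller.get_poller(poller_id)
  logging.info("restarting polling in 1 second")
  # restart(delay=0) takes the delay in milliseconds
  poller.restart(1000)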

How Poller ensures thread safety

The Poller calls the polled callable object in a separate thread.
However, the callbacks (value changed and error) are executed in
the main mxCuBE thread.

Starting from mxCuBE v2, two loops run in the main thread:
  • Qt's loop
  • gevent loop

(more precisely, in the current implementation there is only one loop,
the Qt one, which also runs gevent processing)

The Poller interacts with the gevent loop, because it does not belong
to the GUI part of mxCuBE.

gevent has a mechanism to deal with multiple threads called
the "asynchronous watcher". Each Poller object creates an async
watcher in its constructor:

class _Poller(threading.Thread):
    def __init__(self, polled_call,
                 polled_call_args=(),
                 polling_period=1000,
                 value_changed_callback=None,
                 error_callback=None,
                 compare=True):
        threading.Thread.__init__(self)
        ...
        self.async_watcher = gevent.get_hub().loop.async()

The watcher is associated with a callback as soon as the Poller thread
starts, using the "start" method of the async watcher object:

self.async_watcher.start(self.new_event)

From the Poller thread, calling "self.async_watcher.send()" is enough
to have the "self.new_event" callback executed by the gevent loop
as soon as it can, in a thread-safe manner.

To pass data between the threads, a Queue object can be used. While
the "new_event" callback is being executed by the gevent loop, the other
thread is not blocked and keeps running, so the way data is passed has
to be thread-safe too.

Here is a schematic view of the whole thing:

         THREAD A (gevent loop)             |               THREAD B
                                            |
aw = gevent.get_hub().loop.async()          | self.aw = aw
q = Queue.Queue()                           | self.q = q
def my_thread_safe_callback():              |
  ...                                       |
aw.start(my_thread_safe_callback)           |
                                            | self.run() => now in thread body
                                            | ...
                                            | from thread body: self.q.put(my_data)
                                            |                   self.aw.send()
=> in my_thread_safe_callback:              |
   my_data = q.get()                        |
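
The same handshake can be tried without gevent. In the sketch below a
threading.Event stands in for the async watcher (set() plays the role of
send()), so it only illustrates the data flow and is not the Poller
implementation. "Worker" and "collect" are hypothetical names, and the
queue module is called "queue" in Python 3 ("Queue" in Python 2).

```python
import queue
import threading


class Worker(threading.Thread):
    """Thread B: produces data and notifies thread A."""

    def __init__(self, data_queue, wakeup, items):
        threading.Thread.__init__(self)
        self.q = data_queue
        self.wakeup = wakeup
        self.items = items

    def run(self):
        for item in self.items:
            self.q.put(item)    # thread-safe hand-over of the data
            self.wakeup.set()   # stand-in for self.aw.send()
        self.q.put(None)        # sentinel: nothing more to come
        self.wakeup.set()


def collect(items):
    """Thread A: wait for notifications and drain the queue."""
    q = queue.Queue()
    wakeup = threading.Event()
    worker = Worker(q, wakeup, items)
    worker.start()
    received = []
    done = False
    while not done:
        wakeup.wait()           # sleep until thread B signals
        wakeup.clear()
        while True:
            try:
                data = q.get_nowait()
            except queue.Empty:
                break
            if data is None:
                done = True
                break
            received.append(data)
    worker.join()
    return received


print(collect([1, 2, 3]))  # prints [1, 2, 3]
```

The data is always put in the queue before the notification is sent, so
the receiving side never wakes up for data that is not there yet.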

Real world examples of Poller module usage within mxCuBE

The Taco Command module

The Tango Command module

In addition to using the Poller, the Tango Command module also
uses gevent async watcher objects to handle Tango events.

Indeed, Tango events are received in a callback called from the Tango
threads, whereas the final "user" callback has to be executed within
the gevent loop, so the same trick as described above is used.

In general, any thread-safety issue between a gevent loop and
a thread can be solved as described above.

To be done with Poller: Tine Command module