Source code for shapelink.shapelink_plugin

"""Receive data in real-time from a Shape-In instance via zmq"""
import abc

import numpy as np
from PySide2 import QtCore
import zmq

from .msg_def import message_ids
from .util import qstream_read_array
from .feat_util import map_requested_features_to_defined_features


[docs]class EventData: def __init__(self): self.id = -1 self.scalars = list() self.traces = list() self.images = list()
[docs]class ShapeLinkPlugin(abc.ABC): def __init__(self, bind_to='tcp://*:6666', random_port=False, verbose=False): """Shape-Link plug-in meta class Parameters ---------- bind_to: str IP and port to bind to (where Shape-In runs) random_port: bool If set to `True`, ZMQ will use its `socket.bind_to_random_port` method. This will override only the port number of the `bind_to` ShapeLinkPlugin argument. verbose: bool Set to `True` to see additional debugging information. """ super(ShapeLinkPlugin, self).__init__() self.zmq_context = zmq.Context.instance() self.socket = self.zmq_context.socket(zmq.REP) self.socket.RCVTIMEO = 5000 self.socket.SNDTIMEO = 5000 self.ip_address = bind_to[:-5] self.port_address = bind_to[-4:] if random_port: self.port_address = self.socket.bind_to_random_port( self.ip_address) else: self.socket.bind(bind_to) self.verbose = verbose if self.verbose: print(" Init Shape-Link") print(" Bind to: {}{}".format(self.ip_address, self.port_address)) self.scalar_len = 0 self.vector_len = 0 self.image_len = 0 self.image_shape = None self.image_shape_len = 2 self.reg_features = EventData() self.registered = False self._first_call = True
[docs] def after_register(self): """Called after registration with Shape-In is complete""" pass
[docs] def after_transmission(self): """Called after Shape-In ends data transmission"""
[docs] def handle_messages(self): """Handle messages from Shape-In Please don't override this function. Use :func:`ShapeLinkPlugin.handle_event` for your customized plugins. """ # read first byte if self._first_call: # needed for accurate line_profiler tests try: # get message from socket message = self.socket.recv() rcv = QtCore.QByteArray(message) except zmq.error.ZMQError: if self.verbose: print(" ZMQ Error - timed out") return self._first_call = False else: try: # get message from socket message = self.socket.recv() rcv = QtCore.QByteArray(message) except zmq.error.ZMQError: if self.verbose: print(" ZMQ Error - timed out") return rcv_stream = QtCore.QDataStream(rcv, QtCore.QIODevice.ReadOnly) r = rcv_stream.readInt64() send_data = QtCore.QByteArray() send_stream = QtCore.QDataStream(send_data, QtCore.QIODevice.WriteOnly) if r == message_ids["MSG_ID_FEATURE_REQ"]: # Allow plugin to request features self.run_features_request_message(send_stream) elif r == message_ids["MSG_ID_REGISTER"]: # register self.run_register_message(rcv_stream, send_stream) self.after_register() elif r == message_ids["MSG_ID_EOT"]: # End of Transmission (EOT) message self.run_EOT_message(send_stream) self.after_transmission() elif r >= 0: e = self.run_event_message(r, rcv_stream) # pass event object to user-defined method ret = self.handle_event(e) send_stream.writeBool(ret) else: # unknown message raise ValueError("Received unknown message header: {}".format(r)) self.socket.send(send_data)
[docs] def run_features_request_message(self, send_stream): """Called before registration. The user can specify features for Shape-In to send. This limits the data being transferred. This can be useful for plugins that require only specific features. feats is a list of three lists. The sublists are sc, tr, and im """ user_feats = self.choose_features() feats = map_requested_features_to_defined_features(user_feats) assert isinstance(feats, list), "feats is a list" assert len(feats) == 3 # feats must be sent one by one, list of lists doesn't work for feat in feats: send_stream.writeQStringList(feat) send_stream.writeInt64(message_ids["MSG_ID_FEATURE_REQ_ACK"])
[docs] def run_register_message(self, rcv_stream, send_stream): # register self.reg_features.scalars = rcv_stream.readQStringList() self.reg_features.traces = rcv_stream.readQStringList() self.reg_features.images = rcv_stream.readQStringList() self.image_shape = qstream_read_array(rcv_stream, np.uint16) self.scalar_len = len(self.reg_features.scalars) self.vector_len = len(self.reg_features.traces) self.image_len = len(self.reg_features.images) assert self.image_shape_len == len(self.image_shape) send_stream.writeInt64(message_ids["MSG_ID_REGISTER_ACK"]) if self.verbose: print(" Registered data container formats:") print(" scalars:", self.reg_features.scalars) print(" traces:", self.reg_features.traces) print(" images:", self.reg_features.images) print(" image_shape:", self.image_shape)
[docs] def run_event_message(self, r, rcv_stream): # data package with id r # check if id was received already # unpack data e = EventData() e.id = r if self.scalar_len > 0: e.scalars = qstream_read_array(rcv_stream, np.float64) assert len(e.scalars) == self.scalar_len if self.vector_len > 0: n_traces = rcv_stream.readUInt32() assert n_traces == self.vector_len # read traces piece by piece for i in range(n_traces): e.traces.append(qstream_read_array(rcv_stream, np.int16)) if self.image_len > 0: n_images = rcv_stream.readUInt32() assert n_images == self.image_len # read images piece by piece, checking for binary mask for im_name in self.reg_features.images: if im_name == "mask": mask_data = qstream_read_array(rcv_stream, np.bool_) e.images.append(mask_data.reshape(self.image_shape)) elif im_name == "contour": contour_data = qstream_read_array(rcv_stream, np.uint8) e.images.append( contour_data.reshape(len(contour_data)//2, 2)) elif im_name == "image": image_data = qstream_read_array(rcv_stream, np.uint8) e.images.append(image_data.reshape(self.image_shape)) else: raise ValueError( "Image feature '{}' not recognised".format(im_name)) return e
[docs] def run_EOT_message(self, send_stream): # End of Transmission (EOT) message send_stream.writeInt64(message_ids["MSG_ID_EOT_ACK"])
[docs] @abc.abstractmethod def handle_event(self, event_data: EventData) -> bool: """Abstract method to be overridden by plugins implementations""" return False
[docs] @abc.abstractmethod def choose_features(self): """Abstract method to be overridden by plugins implementations. Notes ----- When features are chosen by a plugin implementation, only those chosen features will be transferred between ShapeIn and the plugin. This has the effect of ignoring any features specified by the user in the --features (-f) option of the command line interface. """ return list(([], [], []))