Source code for shapelink.shapelink_plugin

"""Receive data in real-time from a Shape-In instance via zmq"""
import abc

import numpy as np
from PySide2 import QtCore
import zmq

from . import msg_def
from .util import qstream_read_array


[docs]class EventData: def __init__(self): self.id = -1 self.scalars = list() self.traces = list() self.images = list()
[docs]class ShapeLinkPlugin(abc.ABC): def __init__(self, bind_to='tcp://*:6666', verbose=False): """Shape-Link plug-in meta class Parameters ---------- bind_to: str IP and port to bind to (where Shape-In runs) verbose: bool Set to `True` to see additional debugging information. """ super(ShapeLinkPlugin, self).__init__() self.verbose = verbose if self.verbose: print(" Init Shape-Link") print(" Bind to: ", bind_to) self.zmq_context = zmq.Context.instance() self.socket = self.zmq_context.socket(zmq.REP) self.socket.RCVTIMEO = 5000 self.socket.SNDTIMEO = 5000 self.socket.bind(bind_to) self.scalar_len = 0 self.vector_len = 0 self.image_len = 0 self.registered_data_format = EventData() self.registered = False
[docs] def after_register(self): """Called after registration with Shape-In is complete""" pass
[docs] def after_transmission(self): """Called after Shape-In ends data transmission"""
[docs] def handle_messages(self): """Handle messages from Shape-In Please don't override this function. Use :func:`ShapeLinkPlugin.handle_event` for your customized plugins. """ # read first byte try: # get message from socket rcv = QtCore.QByteArray(self.socket.recv()) except zmq.error.ZMQError: if self.verbose: print(" ZMQ Error - timed out") return rcv_stream = QtCore.QDataStream(rcv, QtCore.QIODevice.ReadOnly) r = rcv_stream.readInt64() send_data = QtCore.QByteArray() send_stream = QtCore.QDataStream(send_data, QtCore.QIODevice.WriteOnly) if r == msg_def.MSG_ID_REGISTER: # register self.registered_data_format.scalars = rcv_stream.readQStringList() self.registered_data_format.traces = rcv_stream.readQStringList() self.registered_data_format.images = rcv_stream.readQStringList() send_stream.writeInt64(msg_def.MSG_ID_REGISTER_ACK) if self.verbose: print(" Registered data container formats:") print(" scalars: ", self.registered_data_format.scalars) print(" traces: ", self.registered_data_format.traces) print(" images: ", self.registered_data_format.images) self.after_register() elif r == msg_def.MSG_ID_EOT: # EOT message send_stream.writeInt64(msg_def.MSG_ID_EOT_ACK) self.after_transmission() elif r >= 0: # data package with id r # check if id was received already # unpack data e = EventData() e.id = r e.scalars = qstream_read_array(rcv_stream, np.float64) assert len(e.scalars) == len(self.registered_data_format.scalars) n_traces = rcv_stream.readUInt32() assert n_traces == len(self.registered_data_format.traces) # read traces piece by piece for i in range(n_traces): e.traces.append(qstream_read_array(rcv_stream, np.int16)) n_images = rcv_stream.readUInt32() assert n_images == len(self.registered_data_format.images) # read images piece by piece for i in range(n_images): e.images.append(qstream_read_array(rcv_stream, np.uint8)) # and re-shape # ??? # pass event object to user-defined method ret = self.handle_event(e) send_stream.writeBool(ret) else: # unknown message raise ValueError("Received unknown message header!") self.socket.send(send_data)
[docs] @abc.abstractmethod def handle_event(self, event_data: EventData) -> bool: """Abstract method to be overridden by plugins implementations""" return False