Source code for shapelink.shapein_simulator

"""Simulate a Shape-In instance

The communication is based on a simple REQ REP pattern
all methods return when the transmission was acknowledged
by the peer.
"""
import time
from typing import List

import dclab
import numpy as np
from PySide2 import QtCore
import zmq

from .msg_def import message_ids
from .util import qstream_write_array
import dclab.definitions as dfn


[docs]class ShapeInSimulator: def __init__(self, destination="tcp://localhost:6666", verbose=False): self.destination = destination self.verbose = verbose if self.verbose: print("Init ShapeIn Simulator") print("Connect to: ", destination) self.zmq_context = zmq.Context.instance() self.socket = self.zmq_context.socket(zmq.REQ) self.socket.RCVTIMEO = 5000 self.socket.SNDTIMEO = 5000 self.socket.connect(destination) self.scalar_len = 0 self.vector_len = 0 self.image_len = 0 self.image_names = [] self.image_shape_len = 2 self.registered = False self.response = list()
[docs] def send_request_for_features(self): # prepare message in byte stream msg = QtCore.QByteArray() msg_stream = QtCore.QDataStream(msg, QtCore.QIODevice.WriteOnly) msg_stream.writeInt64(message_ids["MSG_ID_FEATURE_REQ"]) try: if self.verbose: print("Send request for features message") # send the message over the socket self.socket.send(msg) # get ACK before return rcv = QtCore.QByteArray(self.socket.recv()) except zmq.error.ZMQError: if self.verbose: print("ZMQ Error") return rcv_stream = QtCore.QDataStream(rcv, QtCore.QIODevice.ReadOnly) feats = [] for i in range(3): feats.append(rcv_stream.readQStringList()) r = rcv_stream.readInt64() if r == message_ids["MSG_ID_FEATURE_REQ_ACK"]: if self.verbose: print("Feature Request ACK") else: print("Feature Request failed!") if len([i for sublist in feats for i in sublist]) == 0: if self.verbose: print("Feature Request List Empty") feats = None return feats
[docs] def register_parameters(self, scalar_reg_features=None, vector_reg_features=None, image_reg_features=None, image_shape=None, settings_names=None, settings_values=None ): """Register parameters that are sent to other processes""" if settings_values is None: settings_values = [] if settings_names is None: settings_names = [] if image_reg_features is None: image_reg_features = [] if image_shape is None: image_shape = [] if vector_reg_features is None: vector_reg_features = [] if scalar_reg_features is None: scalar_reg_features = [] assert len(settings_values) == len( settings_names), "Mismatch setting names and values" self.scalar_len = len(scalar_reg_features) self.vector_len = len(vector_reg_features) self.image_len = len(image_reg_features) self.image_names = image_reg_features self.image_shape_len = len(image_shape) self.response.clear() # prepare message in byte stream msg = QtCore.QByteArray() msg_stream = QtCore.QDataStream(msg, QtCore.QIODevice.WriteOnly) msg_stream.writeInt64(message_ids["MSG_ID_REGISTER"]) # send parameters msg_stream.writeQStringList(scalar_reg_features) msg_stream.writeQStringList(vector_reg_features) msg_stream.writeQStringList(image_reg_features) qstream_write_array(msg_stream, image_shape) # send settings for name, value in zip(settings_names, settings_values): msg_stream.writeQString(name) msg_stream.writeQVariant(value) try: if self.verbose: print("Send registration message") # send the message over the socket self.socket.send(msg) # get ACK before return rcv = QtCore.QByteArray(self.socket.recv()) except zmq.error.ZMQError: if self.verbose: print("ZMQ Error") return rcv_stream = QtCore.QDataStream(rcv, QtCore.QIODevice.ReadOnly) r = rcv_stream.readInt64() if r == message_ids["MSG_ID_REGISTER_ACK"]: if self.verbose: print("Registration ACK") self.registered = True else: print("Registering parameters failed!") self.registered = False
[docs] def send_event(self, event_id: int, scalar_values: np.array, # vector of vector of short vector_values: List[np.array], image_values: List[np.array]) -> bool: """Send a single event to the other process""" # prepare message in byte stream msg = QtCore.QByteArray() msg_stream = QtCore.QDataStream(msg, QtCore.QIODevice.WriteOnly) msg_stream.writeInt64(event_id) assert len(scalar_values) == self.scalar_len assert len(vector_values) == self.vector_len assert len(image_values) == self.image_len assert np.issubdtype(scalar_values.dtype, np.floating) if self.scalar_len > 0: qstream_write_array(msg_stream, scalar_values) if self.vector_len > 0: msg_stream.writeUInt32(self.vector_len) for e in vector_values: assert e.dtype == np.int16, "fluorescence data is int16" qstream_write_array(msg_stream, e) if self.image_len > 0: msg_stream.writeUInt32(self.image_len) for (im_name, e) in zip(self.image_names, image_values): if im_name == "mask": assert e.dtype == np.bool_, "'mask' data is bool" else: assert e.dtype == np.uint8, "'image' data is uint8" qstream_write_array(msg_stream, e.flatten()) try: # send the message over the socket self.socket.send(msg) # get ACK before return rcv_data = QtCore.QByteArray(self.socket.recv()) except zmq.error.ZMQError: if self.verbose: print("ZMQ Error") return rcv_stream = QtCore.QDataStream(rcv_data, QtCore.QIODevice.ReadOnly) self.response.append(rcv_stream.readBool()) return self.response[-1]
[docs] def send_end_of_transmission(self): """Send end of transmission packet""" # prepare message in byte stream msg = QtCore.QByteArray() msg_stream = QtCore.QDataStream(msg, QtCore.QIODevice.WriteOnly) msg_stream.writeInt64(message_ids["MSG_ID_EOT"]) # reset state self.registered = False # print responses if self.verbose: print(self.response) try: if self.verbose: print("Sending EOT:", msg) # send the message over the socket self.socket.send(msg) # get ACK before return rcv_data = QtCore.QByteArray(self.socket.recv()) except zmq.error.ZMQError: print("ZMQ Error - No ACK for EOT") return rcv_stream = QtCore.QDataStream(rcv_data, QtCore.QIODevice.ReadOnly) r = rcv_stream.readInt64() if r != message_ids["MSG_ID_EOT_ACK"]: print("Did not receive ACK for EOT but: ", r) else: if self.verbose: print("EOT success")
[docs]def start_simulator(path, features=None, destination="tcp://localhost:6666", verbose=1): """Run a Shape-In simulator using data from an RT-DC dataset Parameters ---------- path : str File path to a .rtdc file features : list, default None A list of RT-DC features e.g., ["image", "circ", "deform"] destination : str The socket to which the ShapeInSimulator will connect. By default it is set to "tcp://localhost:6666". These are the protocol, host and port in the form "protocol://host:port". verbose : int Prints extra information during the transfer process, such as simulator speed. Increment to increase verbosity. See Also -------- shapelink.cli.run_simulator """ with dclab.new_dataset(path) as ds: if verbose: print("Opened dataset", ds.identifier, ds.title) if features is None: features = ds.features_innate s = ShapeInSimulator(destination=destination) # check for user plugin-defined features, which override the CLI plugin_features = s.send_request_for_features() if plugin_features is not None: sc_features, tr_features, im_features = plugin_features else: for feat in features: if not dfn.feature_exists(feat, scalar_only=False): raise ValueError("Invalid feature name '{}'".format(feat)) sc_features = sorted(set(ds.features_scalar) & set(ds.features) & set(features)) if "trace" in ds and "trace" in features: tr_features = sorted(ds['trace'].keys()) else: tr_features = [] im_features = sorted({"image", "mask"} & set(ds.features) & set(features)) image_shape = np.array(ds["image"][0].shape, dtype=np.uint16) s.register_parameters( scalar_reg_features=sc_features, vector_reg_features=tr_features, image_reg_features=im_features, image_shape=image_shape, settings_names=[], settings_values=[], ) t0 = time.perf_counter_ns() c = 0 if verbose: print("Send event data:") for event_index in range(len(ds)): scalars = list() vectors = list() images = list() for feat in sc_features: scalars.append(ds[feat][event_index]) for feat in tr_features: tr = np.array(ds['trace'][feat][event_index], dtype=np.int16) vectors.append(tr) for feat in im_features: if ds[feat][event_index].dtype == bool: im = np.array(ds[feat][event_index], dtype=bool) else: im = np.array(ds[feat][event_index], dtype=np.uint8) images.append(im) s.send_event(event_index, np.array(scalars, dtype=np.float64), vectors, images) c += 1 t1 = time.perf_counter_ns() # Finally stop with EOT message s.send_end_of_transmission() dt = (t1 - t0) * 1e-9 if verbose: print("Simulation event rate: {:.5g} Hz".format(c / dt)) print("Simulation time: {:.5g} s".format(dt))