#!/usr/bin/env python
"""Tango device server exposing a Uca camera.

Every GObject property of the camera object whose type can be mapped to a
Tango type is published as a dynamic Tango attribute.  Frames are available
through the ``image`` attribute and can be written to disk with the ``Store``
command.
"""

import sys
import time
import threading

import numpy as np
import PyTango
from gi.repository import Uca, GObject
from PyTango import Attr, AttrWriteType, DevState
from PyTango.server import Device, DeviceMeta, attribute, device_property, command, server_run

try:
    import tifffile
    HAVE_TIFFFILE = True
except ImportError:
    print("Could not import tifffile, consider to install it")
    HAVE_TIFFFILE = False


# Exact GType -> Tango type matches.  Built once at import time instead of on
# every get_tango_type() call.
# NOTE(review): an exact TYPE_ENUM maps to DevShort here, whereas types derived
# from TYPE_ENUM are reported as DevUShort by get_tango_type() -- confirm that
# this asymmetry is intended.
_GTYPE_TO_TANGO = {
    GObject.TYPE_BOOLEAN: PyTango.CmdArgType.DevBoolean,
    GObject.TYPE_CHAR: PyTango.CmdArgType.DevUChar,
    GObject.TYPE_UCHAR: PyTango.CmdArgType.DevUChar,
    GObject.TYPE_FLOAT: PyTango.CmdArgType.DevFloat,
    GObject.TYPE_INT: PyTango.CmdArgType.DevShort,      # DevInt is invalid?
    GObject.TYPE_UINT: PyTango.CmdArgType.DevUShort,    # check
    GObject.TYPE_UINT64: PyTango.CmdArgType.DevULong,
    GObject.TYPE_LONG: PyTango.CmdArgType.DevLong,
    GObject.TYPE_DOUBLE: PyTango.CmdArgType.DevDouble,
    GObject.TYPE_STRING: PyTango.CmdArgType.DevString,
    GObject.TYPE_ENUM: PyTango.CmdArgType.DevShort,
}


def get_tango_type(obj_type):
    """Return the Tango type for the GObject type *obj_type*.

    Exact matches are looked up in the mapping table; any other type that
    derives from ``GObject.TYPE_ENUM`` is reported as ``DevUShort``.  Returns
    ``None`` when the type is not supported.
    """
    tango_type = _GTYPE_TO_TANGO.get(obj_type)

    if tango_type is not None:
        return tango_type

    if obj_type.is_a(GObject.TYPE_ENUM):
        return PyTango.CmdArgType.DevUShort

    return None


def get_tango_write_type(prop):
    """Return the ``AttrWriteType`` matching the param flags of *prop*.

    Raises ``RuntimeError`` if the property is neither readable nor writable.
    """
    writable = prop.flags & GObject.ParamFlags.WRITABLE
    readable = prop.flags & GObject.ParamFlags.READABLE

    if writable and readable:
        return AttrWriteType.READ_WRITE

    if writable:
        return AttrWriteType.WRITE

    if readable:
        return AttrWriteType.READ

    raise RuntimeError("{} has no valid param flag".format(prop.name))


def prop_to_attr_name(name):
    """Turn a GObject property name into an attribute name ('-' becomes '_')."""
    return name.replace('-', '_')


def attr_to_prop_name(name):
    """Turn an attribute name back into a GObject property name ('_' becomes '-')."""
    return name.replace('_', '-')


# Tango device wrapping one camera obtained from the Uca plugin manager.
# (Documented with comments rather than docstrings wherever PyTango
# introspects the object, so no Tango-visible metadata changes.)
class Camera(Device):
    __metaclass__ = DeviceMeta

    # Name of the camera plugin handed to the plugin manager.
    camera = device_property(dtype=str, default_value='mock')

    # Most recent frame; read through read_image().
    image = attribute(label="Image", dtype=[[np.uint16]],
                      max_dim_x=8192, max_dim_y=8192)

    def init_device(self):
        """Create the camera and publish its properties as Tango attributes."""
        Device.init_device(self)
        self.set_state(DevState.ON)
        self.pm = Uca.PluginManager()
        self.device = self.pm.get_camerah(self.camera, None)
        self.attrs = {}

        for prop in self.device.props:
            tango_type = get_tango_type(prop.value_type)
            write_type = get_tango_write_type(prop)

            # Properties without a Tango counterpart are not exported.
            if tango_type is None:
                continue

            name = prop_to_attr_name(prop.name)
            attr = Attr(name, tango_type, write_type)
            attr_props = PyTango.UserDefaultAttrProp()
            attr_props.set_description(prop.blurb)
            attr.set_default_properties(attr_props)

            # Only writable properties get a write handler.
            write_func = None

            if write_type in (AttrWriteType.WRITE, AttrWriteType.READ_WRITE):
                write_func = self.do_write

            self.attrs[prop.name] = self.add_attribute(attr, self.do_read, write_func)

    def grab(self):
        """Grab one frame from the camera and return it as a numpy array.

        ``self.shape`` and ``self.dtype`` are assigned only by ``Start``, so
        calling this before ``Start`` raises ``AttributeError``.
        """
        array = np.empty(self.shape, dtype=self.dtype)
        # The camera is handed the address of the array's data buffer.
        data = array.__array_interface__['data'][0]
        self.device.grab(data)
        return array

    def do_read(self, attr):
        """Read handler for dynamic attributes: forward the camera property value."""
        name = attr_to_prop_name(attr.get_name())
        attr.set_value(self.device.get_property(name))

    def do_write(self, attr):
        """Write handler for dynamic attributes: set the camera property."""
        name = attr_to_prop_name(attr.get_name())
        self.device.set_property(name, attr.get_write_value())

    # Record the frame shape and dtype used by grab(), then start recording.
    @command
    def Start(self):
        self.shape = (self.device.props.roi_height, self.device.props.roi_width)
        self.dtype = np.uint16 if self.device.props.sensor_bitdepth > 8 else np.uint8
        self.device.start_recording()

    # Stop recording.
    @command
    def Stop(self):
        self.device.stop_recording()

    # Grab one frame and write it to `path` in a background thread.  The frame
    # is saved with tifffile when that module is available, otherwise as a
    # numpy .npz archive.
    @command(dtype_in=str, doc_in="Path and filename to store frame")
    def Store(self, path):
        def grab_and_write():
            frame = self.grab()

            if HAVE_TIFFFILE:
                tifffile.imsave(path, frame)
            else:
                # Close the file deterministically instead of leaking the
                # handle returned by open().
                with open(path, 'wb') as stream:
                    np.savez(stream, frame)

        threading.Thread(target=grab_and_write).start()

    # Send a trigger to the camera.
    @command
    def Trigger(self):
        self.device.trigger()

    def read_image(self):
        """Read method of the ``image`` attribute: return a freshly grabbed frame."""
        return self.grab()


if __name__ == '__main__':
    server_run((Camera,))