#!/usr/bin/env python
"""Tango device server that exposes a libuca camera.

Every GObject property of the camera whose type can be mapped to a Tango
type is published as a dynamic Tango attribute.  Frames are made available
through the ``image`` attribute and can be written to disk with the
``Store`` command.
"""

import sys
import time
import numpy as np
import PyTango
from gi.repository import Uca, GObject
from PyTango import Attr, AttrWriteType, DevState
from PyTango.server import Device, DeviceMeta, attribute, device_property, command, server_run

try:
    import tifffile
    HAVE_TIFFFILE = True
except ImportError:
    print("Could not import tifffile, consider to install it")
    HAVE_TIFFFILE = False


def get_tango_type(obj_type):
    """Return the Tango argument type matching a GObject type.

    :param obj_type: GObject type of a camera property.
    :returns: the corresponding ``PyTango.CmdArgType`` member, or ``None``
        when the type has no known Tango equivalent.
    """
    mapping = {
        GObject.TYPE_BOOLEAN: PyTango.CmdArgType.DevBoolean,
        GObject.TYPE_CHAR: PyTango.CmdArgType.DevUChar,
        GObject.TYPE_UCHAR: PyTango.CmdArgType.DevUChar,
        GObject.TYPE_FLOAT: PyTango.CmdArgType.DevFloat,
        GObject.TYPE_INT: PyTango.CmdArgType.DevShort,  # DevInt is invalid?
        GObject.TYPE_UINT: PyTango.CmdArgType.DevUShort,  # check
        GObject.TYPE_LONG: PyTango.CmdArgType.DevLong,
        GObject.TYPE_DOUBLE: PyTango.CmdArgType.DevDouble,
        GObject.TYPE_STRING: PyTango.CmdArgType.DevString,
        GObject.TYPE_ENUM: PyTango.CmdArgType.DevShort,
    }

    tango_type = mapping.get(obj_type, None)

    if tango_type is not None:
        return tango_type

    # Types without an exact entry in the table may still derive from the
    # generic enum type.
    # NOTE(review): these are reported as DevUShort while TYPE_ENUM itself
    # maps to DevShort above -- confirm the difference is intended.
    if obj_type.is_a(GObject.TYPE_ENUM):
        return PyTango.CmdArgType.DevUShort

    # Unsupported type; callers treat None as "do not expose".
    return None


def get_tango_write_type(prop):
    """Return the Tango access mode matching a property's param flags.

    :param prop: GObject param spec providing ``flags`` and ``name``.
    :returns: ``READ_WRITE``, ``WRITE`` or ``READ`` from ``AttrWriteType``.
    :raises RuntimeError: if the property is neither readable nor writable.
    """
    if prop.flags & GObject.ParamFlags.WRITABLE:
        if prop.flags & GObject.ParamFlags.READABLE:
            return AttrWriteType.READ_WRITE

        return AttrWriteType.WRITE

    if prop.flags & GObject.ParamFlags.READABLE:
        return AttrWriteType.READ

    raise RuntimeError("{} has no valid param flag".format(prop.name))


def prop_to_attr_name(name):
    """Turn a property name into an attribute name ('-' becomes '_')."""
    return name.replace('-', '_')


def attr_to_prop_name(name):
    """Turn an attribute name into a property name ('_' becomes '-')."""
    return name.replace('_', '-')


class Camera(Device):
    """Tango device wrapping a camera obtained from the Uca plugin manager."""

    __metaclass__ = DeviceMeta

    # Name of the camera plugin requested from the plugin manager.
    camera = device_property(dtype=str, default_value='mock')

    image = attribute(label="Image", dtype=[[np.uint16]],
                      max_dim_x=4096, max_dim_y=4096)

    def init_device(self):
        """Create the camera and publish its properties as attributes."""
        Device.init_device(self)
        self.set_state(DevState.ON)
        self.pm = Uca.PluginManager()
        self.device = self.pm.get_camerah(self.camera, None)
        # Dynamic attributes, keyed by the original property name.
        self.attrs = {}

        for prop in self.device.props:
            tango_type = get_tango_type(prop.value_type)
            # Evaluated before the type check on purpose: a property without
            # valid param flags raises regardless of its type.
            write_type = get_tango_write_type(prop)

            if not tango_type:
                # No Tango equivalent for this property type, skip it.
                continue

            name = prop_to_attr_name(prop.name)
            attr = Attr(name, tango_type, write_type)
            attr_props = PyTango.UserDefaultAttrProp()
            attr_props.set_description(prop.blurb)
            attr.set_default_properties(attr_props)

            # Only writable attributes get a write handler.
            write_func = None

            if write_type in (AttrWriteType.WRITE, AttrWriteType.READ_WRITE):
                write_func = self.do_write

            self.attrs[prop.name] = self.add_attribute(attr, self.do_read, write_func)

    def grab(self):
        """Grab one frame from the camera and return it as a numpy array.

        Uses ``self.shape`` and ``self.dtype``, which are assigned by the
        ``Start`` command.
        """
        array = np.empty(self.shape, dtype=self.dtype)
        # Address of the array's buffer, handed to the camera's grab call.
        data = array.__array_interface__['data'][0]
        self.device.grab(data)
        return array

    def do_read(self, attr):
        """Read handler for dynamic attributes: fetch the camera property."""
        name = attr_to_prop_name(attr.get_name())
        attr.set_value(self.device.get_property(name))

    def do_write(self, attr):
        """Write handler for dynamic attributes: set the camera property."""
        name = attr_to_prop_name(attr.get_name())
        self.device.set_property(name, attr.get_write_value())

    @command
    def Start(self):
        """Record the frame geometry and pixel type, then start recording."""
        self.shape = (self.device.props.roi_height, self.device.props.roi_width)
        self.dtype = np.uint16 if self.device.props.sensor_bitdepth > 8 else np.uint8
        self.device.start_recording()

    @command
    def Stop(self):
        """Stop recording."""
        self.device.stop_recording()

    @command(dtype_in=str, doc_in="Path and filename to store frame")
    def Store(self, path):
        """Grab a frame and write it to ``path``.

        The frame is written with tifffile when it is available, otherwise
        with ``np.savez``.
        """
        frame = self.grab()

        if HAVE_TIFFFILE:
            tifffile.imsave(path, frame)
        else:
            # Close the file deterministically; previously the handle was
            # never closed and was left for the garbage collector.
            with open(path, 'wb') as handle:
                np.savez(handle, frame)

    @command
    def Trigger(self):
        """Forward a trigger request to the camera."""
        self.device.trigger()

    def read_image(self):
        """Read handler of the ``image`` attribute: return a fresh frame."""
        return self.grab()


if __name__ == '__main__':
    server_run((Camera,))