186 lines
5.8 KiB
Python
186 lines
5.8 KiB
Python
from SDT_Device.Protocol import Protocol
|
|
import threading
|
|
import time
|
|
|
|
|
|
class CCU(Protocol):
|
|
MAX_CHANNEL = 20
|
|
|
|
CMD = {
|
|
"PULSE_COUNT": 0x00,
|
|
"COINCIDENCE_COUNT": 0x01,
|
|
"INPUT_DELAY": 0x10,
|
|
"PULSE_WIDTH": 0x20,
|
|
"COINCIDENCE_CH_SELECT": 0x30,
|
|
"UPDATE_TIMER_PERIOD": 0x40,
|
|
"UPDATE_TRIG_SELECT": 0x50,
|
|
"START": 0x60,
|
|
"COUNTER_MUX": 0x70,
|
|
"INPUT_CH_MUX": 0x80,
|
|
"HEART_BIT": 0xF000,
|
|
}
|
|
|
|
def __init__(self, ip, port):
|
|
super().__init__(ip, port)
|
|
|
|
self.setDeviceId(self.DEVICE_ID["CCU"])
|
|
self.setDeviceSerial(0)
|
|
self.classGetCallback = self._classGetCallback
|
|
self.classDataCallback = self._classDataCallback
|
|
|
|
self._getEvent = threading.Event()
|
|
self._dataEvent = threading.Event()
|
|
|
|
self._readBuffer = []
|
|
self._pulseCount = [0 for i in range(20)]
|
|
self._coincidenceCount = [0 for i in range(20)]
|
|
self.updateCount = 0
|
|
self.updateCoinCount = 0
|
|
|
|
self.debugCount = 0
|
|
|
|
def _classGetCallback(self, command, payload):
|
|
self._readBuffer = payload
|
|
self._getEvent.set()
|
|
|
|
def _read(self):
|
|
self._getEvent.wait()
|
|
self._getEvent.clear()
|
|
return self._readBuffer
|
|
|
|
def _classDataCallback(self, command, payload):
|
|
if command == self.CMD["PULSE_COUNT"]:
|
|
self._pulseCount = payload
|
|
self.updateCount += 1
|
|
elif command == self.CMD["COINCIDENCE_COUNT"]:
|
|
self._coincidenceCount = payload
|
|
self.updateCoinCount += 1
|
|
self._dataEvent.set()
|
|
|
|
def readCountAllNonBlocking(self):
|
|
return self._pulseCount, self._coincidenceCount
|
|
|
|
def readCountAllBlocking(self):
|
|
self._dataEvent.wait()
|
|
self._dataEvent.clear()
|
|
return self._pulseCount, self._coincidenceCount
|
|
|
|
def setInputDelay(self, ch, delay):
|
|
delayList = self.getInputDelayAll()
|
|
delayList[ch] = delay
|
|
args = (self.CLASS["SET"], self.CMD["INPUT_DELAY"], delayList)
|
|
self.send(args)
|
|
|
|
def setInputDelayAll(self, delay: list):
|
|
args = (self.CLASS["SET"], self.CMD["INPUT_DELAY"], delay)
|
|
self.send(args)
|
|
|
|
def getInputDelay(self, ch):
|
|
return self.getInputDelayAll()[ch]
|
|
|
|
def getInputDelayAll(self):
|
|
args = (self.CLASS["GET"], self.CMD["INPUT_DELAY"])
|
|
self.send(args)
|
|
return self._read()
|
|
|
|
def setPulseWidth(self, ch, width):
|
|
widthList = self.getPulseWidthAll()
|
|
widthList[ch] = width
|
|
args = (self.CLASS["SET"], self.CMD["PULSE_WIDTH"], widthList)
|
|
self.send(args)
|
|
|
|
def setPulseWidthAll(self, width: list):
|
|
args = (self.CLASS["SET"], self.CMD["PULSE_WIDTH"], width)
|
|
self.send(args)
|
|
|
|
def getPulseWidth(self, ch):
|
|
return self.getPulseWidthAll()[ch]
|
|
|
|
def getPulseWidthAll(self):
|
|
args = (self.CLASS["GET"], self.CMD["PULSE_WIDTH"])
|
|
self.send(args)
|
|
return self._read()
|
|
|
|
def setCoincidenceChSelect(self, ch, chSelect):
|
|
coincidenceList = self.getCoincidenceChSelectAll()
|
|
coincidenceList[ch] = chSelect
|
|
args = (self.CLASS["SET"], self.CMD["COINCIDENCE_CH_SELECT"], coincidenceList)
|
|
self.send(args)
|
|
|
|
def setCoincidenceChSelectAll(self, chSelect: list):
|
|
args = (self.CLASS["SET"], self.CMD["COINCIDENCE_CH_SELECT"], chSelect)
|
|
self.send(args)
|
|
|
|
def getCoincidenceChSelect(self, ch):
|
|
return self.getCoincidenceChSelectAll()[ch]
|
|
|
|
def getCoincidenceChSelectAll(self):
|
|
args = (self.CLASS["GET"], self.CMD["COINCIDENCE_CH_SELECT"])
|
|
self.send(args)
|
|
return self._read()
|
|
|
|
def setUpdateTimerPeriod(self, period):
|
|
args = (self.CLASS["SET"], self.CMD["UPDATE_TIMER_PERIOD"], [period])
|
|
self.send(args)
|
|
|
|
def getUpdateTimerPeriod(self):
|
|
args = (self.CLASS["GET"], self.CMD["UPDATE_TIMER_PERIOD"])
|
|
self.send(args)
|
|
return self._read()[0]
|
|
|
|
def setTrigSelect(self, trigType):
|
|
args = (self.CLASS["SET"], self.CMD["UPDATE_TRIG_SELECT"], [trigType])
|
|
self.send(args)
|
|
|
|
def getTrigSelect(self):
|
|
args = (self.CLASS["GET"], self.CMD["UPDATE_TRIG_SELECT"])
|
|
self.send(args)
|
|
return self._read()
|
|
|
|
def setStart(self, enable):
|
|
args = (self.CLASS["SET"], self.CMD["START"], [enable])
|
|
self.send(args)
|
|
|
|
def getStart(self):
|
|
args = (self.CLASS["GET"], self.CMD["START"])
|
|
self.send(args)
|
|
return self._read()
|
|
|
|
# val = 0 -> use internal pulse window
|
|
# val = 1 -> use external pulse window
|
|
def setCounterMux(self, ch, val):
|
|
valList = self.getCounterMuxAll()
|
|
valList[ch] = val
|
|
args = (self.CLASS["SET"], self.CMD["COUNTER_MUX"], valList)
|
|
self.send(args)
|
|
|
|
def setCounterMuxAll(self, val: list):
|
|
args = (self.CLASS["SET"], self.CMD["COUNTER_MUX"], val)
|
|
self.send(args)
|
|
|
|
def getCounterMux(self, ch):
|
|
return self.getCounterMuxAll()[ch]
|
|
|
|
def getCounterMuxAll(self):
|
|
args = (self.CLASS["GET"], self.CMD["COUNTER_MUX"])
|
|
self.send(args)
|
|
return self._read()
|
|
|
|
def setInputChMux(self, ch, val):
|
|
valList = self.getInputChMuxAll()
|
|
valList[ch] = val
|
|
args = (self.CLASS["SET"], self.CMD["INPUT_CH_MUX"], valList)
|
|
self.send(args)
|
|
|
|
def setInputChMuxAll(self, val: list):
|
|
args = (self.CLASS["SET"], self.CMD["INPUT_CH_MUX"], val)
|
|
self.send(args)
|
|
|
|
def getInputChMux(self, ch):
|
|
return self.getInputChMuxAll()[ch]
|
|
|
|
def getInputChMuxAll(self):
|
|
args = (self.CLASS["GET"], self.CMD["INPUT_CH_MUX"])
|
|
self.send(args)
|
|
return self._read()
|