149 lines
4.4 KiB
Python
149 lines
4.4 KiB
Python
|
from SDT_Device.Protocol import Protocol
|
||
|
import threading
|
||
|
import copy
|
||
|
import timeit
|
||
|
|
||
|
|
||
|
class DTS(Protocol):
|
||
|
MAX_CHANNEL = 4
|
||
|
|
||
|
CMD = {
|
||
|
"ADC_CH_DATA_BASE": 0x00,
|
||
|
"SYSTEM_RESET": 0x10,
|
||
|
"DEBUG_DATA": 0x20,
|
||
|
"AVG_ENABLE_DELAY": 0x30,
|
||
|
"AVG_SAMPLE_RANGE": 0x31,
|
||
|
"AVG_STEP": 0x32,
|
||
|
"AVG_START": 0x33,
|
||
|
"AVG_TIMER_TICK": 0x34,
|
||
|
"HEART_BIT": 0xF000,
|
||
|
}
|
||
|
|
||
|
def __init__(self, ip, port):
|
||
|
super().__init__(ip, port)
|
||
|
|
||
|
self.setDeviceId(self.DEVICE_ID["DTS"])
|
||
|
self.setDeviceSerial(0)
|
||
|
self.setMaxLen(4 * 65536 + 20)
|
||
|
self.classGetCallback = self._classGetCallback
|
||
|
self.classDataCallback = self._classDataCallback
|
||
|
self.classHeartBitCallback = self._classHeartBitCallback
|
||
|
|
||
|
self._getEvnet = threading.Event()
|
||
|
self._dataEvnet = threading.Event()
|
||
|
|
||
|
self._readBuffer = []
|
||
|
self.adcDataCh0 = []
|
||
|
self.adcDataCh1 = []
|
||
|
self.adcDataCh2 = []
|
||
|
self.adcDataCh3 = []
|
||
|
|
||
|
self.hbCount = 0
|
||
|
self.dataCount = [0 for _ in range(4)]
|
||
|
|
||
|
def _classHeartBitCallback(self):
|
||
|
self.hbCount += 1
|
||
|
|
||
|
def _classGetCallback(self, command, payload):
|
||
|
self._readBuffer = payload
|
||
|
self._getEvnet.set()
|
||
|
|
||
|
def _read(self):
|
||
|
self._getEvnet.wait()
|
||
|
self._getEvnet.clear()
|
||
|
return self._readBuffer
|
||
|
|
||
|
def _classDataCallback(self, command, payload):
|
||
|
if command == (self.CMD["ADC_CH_DATA_BASE"] + 0):
|
||
|
self.adcDataCh0 = payload
|
||
|
self.dataCount[0] += 1
|
||
|
elif command == (self.CMD["ADC_CH_DATA_BASE"] + 1):
|
||
|
self.adcDataCh1 = payload
|
||
|
self.dataCount[1] += 1
|
||
|
elif command == (self.CMD["ADC_CH_DATA_BASE"] + 2):
|
||
|
self.adcDataCh2 = payload
|
||
|
self.dataCount[2] += 1
|
||
|
elif command == (self.CMD["ADC_CH_DATA_BASE"] + 3):
|
||
|
self.adcDataCh3 = payload
|
||
|
self.dataCount[3] += 1
|
||
|
|
||
|
# self._dataEvnet.set()
|
||
|
|
||
|
def ReadAdcData(self, ch):
|
||
|
# self._waitAdcData()
|
||
|
ret = []
|
||
|
|
||
|
if ch == 0:
|
||
|
ret = self.adcDataCh0
|
||
|
elif ch == 1:
|
||
|
ret = self.adcDataCh1
|
||
|
elif ch == 2:
|
||
|
ret = self.adcDataCh2
|
||
|
elif ch == 3:
|
||
|
ret = self.adcDataCh3
|
||
|
|
||
|
return ret
|
||
|
|
||
|
def _waitAdcData(self):
|
||
|
self._dataEvnet.wait()
|
||
|
self._dataEvnet.clear()
|
||
|
|
||
|
def setSystemReset(self): # 1번
|
||
|
args = (self.CLASS["SET"], self.CMD["SYSTEM_RESET"], [1])
|
||
|
self.send(args)
|
||
|
|
||
|
def getSystemReset(self):
|
||
|
args = (self.CLASS["GET"], self.CMD["SYSTEM_RESET"])
|
||
|
self.send(args)
|
||
|
return self._read()[0]
|
||
|
|
||
|
def getDebugData(self):
|
||
|
args = (self.CLASS["GET"], self.CMD["DEBUG_DATA"])
|
||
|
self.send(args)
|
||
|
return self._read()
|
||
|
|
||
|
def setAvgEnableDelay(self, ch, val): # 3번
|
||
|
args = (self.CLASS["SET"], self.CMD["AVG_ENABLE_DELAY"], [ch, val])
|
||
|
self.send(args)
|
||
|
|
||
|
def getAvgEnableDelay(self, ch):
|
||
|
args = (self.CLASS["GET"], self.CMD["AVG_ENABLE_DELAY"], [ch])
|
||
|
self.send(args)
|
||
|
return self._read()[1]
|
||
|
|
||
|
def setAvgSampleRange(self, ch, val): # 4번
|
||
|
args = (self.CLASS["SET"], self.CMD["AVG_SAMPLE_RANGE"], [ch, val])
|
||
|
self.send(args)
|
||
|
|
||
|
def getAvgSampleRange(self, ch):
|
||
|
args = (self.CLASS["GET"], self.CMD["AVG_SAMPLE_RANGE"], [ch])
|
||
|
self.send(args)
|
||
|
return self._read()[1]
|
||
|
|
||
|
def setAvgStep(self, ch, val): # 5번
|
||
|
args = (self.CLASS["SET"], self.CMD["AVG_STEP"], [ch, val])
|
||
|
self.send(args)
|
||
|
|
||
|
def getAvgStep(self, ch):
|
||
|
args = (self.CLASS["GET"], self.CMD["AVG_STEP"], [ch])
|
||
|
self.send(args)
|
||
|
return self._read()[1]
|
||
|
|
||
|
def setAvgStart(self, ch, val): # 2번 변수 바꿔서 6번
|
||
|
args = (self.CLASS["SET"], self.CMD["AVG_START"], [ch, val])
|
||
|
self.send(args)
|
||
|
|
||
|
def getAvgStart(self, ch):
|
||
|
args = (self.CLASS["GET"], self.CMD["AVG_START"], [ch])
|
||
|
self.send(args)
|
||
|
return self._read()[1]
|
||
|
|
||
|
def setTimerTick(self, ch, val): # 7번
|
||
|
args = (self.CLASS["SET"], self.CMD["AVG_TIMER_TICK"], [ch, val])
|
||
|
self.send(args)
|
||
|
|
||
|
def getTimerTick(self, ch):
|
||
|
args = (self.CLASS["GET"], self.CMD["AVG_TIMER_TICK"], [ch])
|
||
|
self.send(args)
|
||
|
return self._read()[1]
|