238 lines
6.9 KiB
Python
238 lines
6.9 KiB
Python
from SDT_Device.Protocol import Protocol
|
|
import threading
|
|
import csv
|
|
|
|
|
|
class PulseGenerator(Protocol):
|
|
MAX_CHANNEL = 12
|
|
|
|
CMD = {
|
|
"SESSION_COMPLETE": 0x00,
|
|
"DEVICE_RESET": 0x10,
|
|
"DEVICE_START": 0x11,
|
|
"SESSION_PERIOD": 0x12,
|
|
"BASE_CHANNEL_ENABLE": 0x20,
|
|
"SESSION_COUNT": 0x30,
|
|
"BASE_CHANNEL_DATA_SIZE": 0x40,
|
|
"BASE_CHANNEL_SESSION_POSITION": 0x50,
|
|
"BASE_CHANNEL_OUTPUT_DELAY": 0x62,
|
|
"BASE_CHANNEL_DATA": 0x70,
|
|
"HEART_BIT": 0xF000,
|
|
}
|
|
|
|
def __init__(self, ip, port):
|
|
super().__init__(ip, port)
|
|
|
|
self.setDeviceId(self.DEVICE_ID["PG"])
|
|
self.setDeviceSerial(0)
|
|
self.heartbitCount = 0
|
|
|
|
self.classGetCallback = self._classGetCallback
|
|
self.classHeartBitCallback = self._heartBitCallback
|
|
self.classDataCallback = self._dataCallback
|
|
|
|
self._getEvent = threading.Event()
|
|
self._dataEvent = threading.Event()
|
|
|
|
self._readBuffer = []
|
|
|
|
def _read(self):
|
|
self._getEvent.wait()
|
|
self._getEvent.clear()
|
|
return self._readBuffer
|
|
|
|
def _classGetCallback(self, command, payload):
|
|
self._readBuffer = payload
|
|
self._getEvent.set()
|
|
|
|
def _heartBitCallback(self):
|
|
self.heartbitCount += 1
|
|
|
|
def _dataCallback(self, command, payload):
|
|
if command == self.CMD["SESSION_COMPLETE"]:
|
|
self._dataEvent.set()
|
|
|
|
def WaitUntilSessionIsCompleted(self):
|
|
self._dataEvent.wait()
|
|
self._dataEvent.clear()
|
|
|
|
# enable = 0에서 reset 적용, 1에서 해제
|
|
def setReset(self, enable):
|
|
args = (self.CLASS["SET"], self.CMD["DEVICE_RESET"], [enable])
|
|
self.send(args)
|
|
|
|
def getReset(self):
|
|
args = (self.CLASS["GET"], self.CMD["DEVICE_RESET"])
|
|
self.send(args)
|
|
return self._read()[0]
|
|
|
|
def setStart(self, enable):
|
|
args = (self.CLASS["SET"], self.CMD["DEVICE_START"], [enable])
|
|
self.send(args)
|
|
|
|
def getStart(self):
|
|
args = (self.CLASS["GET"], self.CMD["DEVICE_START"])
|
|
self.send(args)
|
|
return self._read()[0]
|
|
|
|
def setSessionPeriod(self, period, unit="raw"):
|
|
if unit != "raw":
|
|
if unit.lower() == "s":
|
|
period *= int(1e9)
|
|
elif unit.lower() == "ms":
|
|
period *= int(1e6)
|
|
elif unit.lower() == "us":
|
|
period *= int(1e3)
|
|
|
|
if period < 5:
|
|
period = 5
|
|
_round = round(period / 5)
|
|
else:
|
|
_round = period
|
|
|
|
args = (self.CLASS["SET"], self.CMD["SESSION_PERIOD"], [_round])
|
|
self.send(args)
|
|
|
|
def getSessionPeriod(self, unit="raw"):
|
|
args = (self.CLASS["GET"], self.CMD["SESSION_PERIOD"])
|
|
self.send(args)
|
|
|
|
period = self._read()[0]
|
|
|
|
if unit.lower() == "s":
|
|
period /= int(1e9)
|
|
elif unit.lower() == "ms":
|
|
period /= int(1e6)
|
|
elif unit.lower() == "us":
|
|
period /= int(1e3)
|
|
period *= 5
|
|
|
|
return period
|
|
|
|
def setChannelEnable(self, ch, enable):
|
|
args = (self.CLASS["SET"], self.CMD["BASE_CHANNEL_ENABLE"] + ch, [enable])
|
|
self.send(args)
|
|
|
|
def getChannelEnable(self, ch):
|
|
args = (self.CLASS["GET"], self.CMD["BASE_CHANNEL_ENABLE"] + ch)
|
|
self.send(args)
|
|
return self._read()[0]
|
|
|
|
def setSessionCount(self, count):
|
|
args = (self.CLASS["SET"], self.CMD["SESSION_COUNT"], [count])
|
|
self.send(args)
|
|
|
|
def getSessionCount(self):
|
|
args = (self.CLASS["GET"], self.CMD["SESSION_COUNT"])
|
|
self.send(args)
|
|
return self._read()[0]
|
|
|
|
def setChannelDataSize(self, ch, size):
|
|
if size >= 2048:
|
|
size = 2048
|
|
args = (self.CLASS["SET"], self.CMD["BASE_CHANNEL_DATA_SIZE"] + ch, [size])
|
|
self.send(args)
|
|
|
|
def getChannelDataSize(self, ch):
|
|
args = (self.CLASS["GET"], self.CMD["BASE_CHANNEL_DATA_SIZE"] + ch)
|
|
self.send(args)
|
|
return self._read()[0]
|
|
|
|
def setChannelSessionPosition(self, ch, position):
|
|
args = (
|
|
self.CLASS["SET"],
|
|
self.CMD["BASE_CHANNEL_SESSION_POSITION"] + ch,
|
|
[position],
|
|
)
|
|
self.send(args)
|
|
|
|
def getChannelSessionPosition(self, ch):
|
|
args = (self.CLASS["GET"], self.CMD["BASE_CHANNEL_SESSION_POSITION"] + ch)
|
|
self.send(args)
|
|
return self._read()[0]
|
|
|
|
def setChannelOutputDelay(self, ch, delayTime):
|
|
args = (
|
|
self.CLASS["SET"],
|
|
self.CMD["BASE_CHANNEL_OUTPUT_DELAY"] + ch,
|
|
[delayTime],
|
|
)
|
|
self.send(args)
|
|
|
|
def getChannelOutputDelay(self, ch):
|
|
args = (self.CLASS["GET"], self.CMD["BASE_CHANNEL_OUTPUT_DELAY"] + ch)
|
|
self.send(args)
|
|
return self._read()
|
|
|
|
def setChannelData(self, ch, data, unit="raw"):
|
|
rawData = []
|
|
if len(data) > 0:
|
|
unitContant = 1
|
|
if unit != "raw":
|
|
if unit.lower() == "s":
|
|
unitContant *= int(1e9)
|
|
elif unit.lower() == "ms":
|
|
unitContant *= int(1e6)
|
|
elif unit.lower() == "us":
|
|
unitContant *= int(1e3)
|
|
|
|
for value in data:
|
|
value *= unitContant
|
|
|
|
if unit != "raw":
|
|
if value <= 5:
|
|
value = 5
|
|
_round = round(value / 5) - 1
|
|
else:
|
|
_round = value
|
|
rawData.append(_round)
|
|
|
|
args = (
|
|
self.CLASS["SET"],
|
|
self.CMD["BASE_CHANNEL_DATA"] + ch,
|
|
rawData,
|
|
)
|
|
self.send(args)
|
|
|
|
def getChannelData(self, ch, unit="raw"):
|
|
args = (self.CLASS["GET"], self.CMD["BASE_CHANNEL_DATA"] + ch)
|
|
self.send(args)
|
|
|
|
data_ns = []
|
|
rawData = self._read()
|
|
|
|
unit_const = 1
|
|
|
|
if unit.lower() == "s":
|
|
unit_const /= int(1e9)
|
|
elif unit.lower() == "ms":
|
|
unit_const /= int(1e6)
|
|
elif unit.lower() == "us":
|
|
unit_const /= int(1e3)
|
|
|
|
unit_const *= 5
|
|
|
|
if len(rawData) > 0:
|
|
for value in rawData:
|
|
if unit.lower() != "raw":
|
|
nano_sec_data = (value + 1) * unit_const
|
|
else:
|
|
nano_sec_data = value
|
|
data_ns.append(round(nano_sec_data, 9))
|
|
|
|
return data_ns
|
|
|
|
def readPulseDataFromCSV(self, ch, path):
|
|
ret = []
|
|
|
|
with open(path, newline="") as csvfile:
|
|
csvreader = csv.reader(csvfile)
|
|
next(csvreader)
|
|
for row in csvreader:
|
|
if row[ch] != "":
|
|
ret.append(int(row[ch]))
|
|
else:
|
|
continue
|
|
|
|
return ret
|