diff --git a/.gitignore b/.gitignore index ba0430d..8ae8ce5 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -__pycache__/ \ No newline at end of file +__pycache__/ +.idea/ diff --git a/pd.py b/pd.py index bb7b7dd..e8fdddc 100644 --- a/pd.py +++ b/pd.py @@ -22,12 +22,26 @@ import sigrokdecode as srd from functools import reduce +from enum import Enum +from dshot.protoDshot import DshotCmd, DshotTelem, BitDshot, DshotSettings, Bit_DshotTelem class SamplerateError(Exception): pass +class State(Enum): + RESET = 0 + CMD = 1 + TELEM = 2 + + +class State_Dshot(Enum): + RESET = 0 + START = 1 + RECV = 2 + + class Decoder(srd.Decoder): api_version = 3 id = 'dshot' @@ -44,8 +58,9 @@ class Decoder(srd.Decoder): options = ( {'id': 'dshot_rate', 'desc': 'DShot Rate', 'default': '150','values': ('150', '300','600','1200')}, - { 'id': 'bidir', 'desc': 'Bidirectional DShot','default': 'False', 'values': ('True', 'False')}, + { 'id': 'bidir', 'desc': 'Bidirectional DShot','default': 'True', 'values': ('True', 'False')}, { 'id': 'log', 'desc': 'Write log file','default': 'no', 'values': ('yes', 'no')}, + {'id': 'edt_force', 'desc': 'Force EDT as telem type', 'default': 'no', 'values': ('True', 'False')}, ) annotations = ( ('bit', 'Bit'), @@ -53,149 +68,202 @@ class Decoder(srd.Decoder): ('throttle', 'Throttle'), ('checksum', 'CRC'), ('errors', 'Errors'), + ('telem_bit', 'Telem Bit'), + ('telem_erpm', 'Telem ERPM'), + ('telem_edt', 'Telem EDT'), + ('telem_errors', 'Telem Errors'), + ('telem_error2', 'Telem Errors2'), ) annotation_rows = ( ('bits', 'Bits', (0,)), ('dshot_data', 'DShot Data', (1,2,3)), ('dshot_errors', 'Dshot Errors', (4,)), + ('telem_bits', 'Telem Bits', (5,)), + ('dshot_telem_erpm', 'Dshot Telem ERPM', (6,)), + ('dshot_telem_edt', 'Dshot Telem', (7,)), + ('dshot_telem_errors', 'Dshot Errors', (8,)), + ('dshot_telem_errors2', 'Dshot Errors', (9,)), ) - dshot_period_lookup = {'150': 6.67e-6, '300': 3.33e-6,'600':1.67e-6,'1200':0.83e-6} + #dshot_period_lookup = {'150': 6.67e-6, '300': 3.33e-6,'600':1.67e-6,'1200':0.83e-6} + def __init__(self): self.reset() def reset(self): + self.state = State.CMD + self.state_telem = State_Dshot.START + self.state_dshot = State_Dshot.START + self.samplerate = None - # self.oldpin = None - # self.ss_packet = None - # self.ss = None - # self.es = None - # self.bits = [] self.inreset = False - self.bidirectional = False - self.dshot_period = 3.33e-6 - self.actual_period = None - self.halfbitwidth = None self.currbit_ss = None self.currbit_es = None - self.samples_toreset = None - self.samples_pp = None - + + self.dshot_cfg = DshotSettings() + + self.debug = False def start(self): - self.bidirectional = True if self.options['bidir'] == 'True' else False - self.dshot_period = self.dshot_period_lookup[self.options['dshot_rate']] - self.samples_pp = int(self.samplerate*self.dshot_period) - self.samples_toreset = self.samples_pp*3 - # self.halfbitwidth = int((self.samplerate / self.dshot_period) / 2.0) - #print("start period",self.dshot_period) + self.dshot_cfg.bidirectional = True if self.options['bidir'] == 'True' else False + self.dshot_cfg.edt_force = True if self.options['edt_force'] == 'True' else False + self.dshot_cfg.dshot_kbaud = int(self.options['dshot_rate'])*1000 + self.dshot_cfg.samplerate = self.samplerate + self.dshot_cfg.update() + self.out_ann = self.register(srd.OUTPUT_ANN) def metadata(self, key, value): if key == srd.SRD_CONF_SAMPLERATE: self.samplerate = value - def handle_bits(self, results): - #ss, es, bit - #print(results) - bits = [result[2] for result in results] - # print(bits) - - - if len(bits) == 16: - dshot_value = int(reduce(lambda a, b: (a << 1) | b, bits[:11])) - telem_request = bits[11] - received_crc = int(reduce(lambda a, b: (a << 1) | b, bits[12:])) - - value_tocrc = int(reduce(lambda a, b: (a << 1) | b, bits[:12])) - - if self.bidirectional: - calculated_crc = int((~(value_tocrc ^ (value_tocrc >> 4) ^ (value_tocrc >> 8)))&0x0F) - else: - calculated_crc = int(((value_tocrc ^ (value_tocrc >> 4) ^ (value_tocrc >> 8)))&0x0F) + def display_bit(self, bitseq, annot): + self.put(bitseq.ss, bitseq.es, self.out_ann, + [annot, ['%d' % bool(bitseq)]]) - if received_crc == calculated_crc: - crc_ok = True - else: - crc_ok = False + def display_dshot(self,dshot): + crc_startsample = dshot.results[12].ss - # TODO: Align this correctly - crc_startsample = results[12][0] - - # Split annotation based on value type - if dshot_value < 48: - # Command - self.put(results[0][0], crc_startsample, self.out_ann, - [1, ['%04d' % dshot_value]]) - else: - # Throttle - self.put(results[0][0], crc_startsample, self.out_ann, - [2, ['%04d' % dshot_value]]) - - self.put(crc_startsample, results[15][1], self.out_ann, [3, ['Calc CRC: '+('%04d' % calculated_crc)+' TXed CRC:'+('%04d' % received_crc)]]) - if not crc_ok: - self.put(crc_startsample, results[15][1], self.out_ann, - [4, ['CRC INVALID']]) - - - self.bits = [] - self.ss_packet = None + # Split annotation based on value type + if dshot.dshot_value < 48: + # Command + self.put(dshot.results[0].ss, crc_startsample, self.out_ann, + [1, ['%04d' % dshot.dshot_value]]) else: - return - # self.put(results[0][0], results[-1::1][1], self.out_ann, - # [1, ['ERROR: INVALID PACKET LENGTH', 'ERR', 'E']]) + # Throttle + self.put(dshot.results[0].ss, crc_startsample, self.out_ann, + [2, ['%04d' % dshot.dshot_value]]) + + self.put(crc_startsample, dshot.results[15].es, self.out_ann, + [3, ['Calc CRC: ' + ('%04d' % dshot.crc_calc) + ' TXed CRC:' + ('%04d' % dshot.crc_recv)]]) + if not dshot.crc_ok: + self.put(crc_startsample, dshot.results[15].es, self.out_ann, + [4, ['CRC INVALID']]) + def display_telem(self, telem): + crc_startsample = telem.results[12].ss + self.put(crc_startsample, telem.results[15].es, self.out_ann, + [3, ['Calc CRC: ' + ('%04d' % telem.crc_calc) + ' TXed CRC:' + ('%04d' % telem.crc_recv)]]) + if not telem.crc_ok: + self.put(crc_startsample, telem.results[15].es, self.out_ann, + [4, ['CRC INVALID']]) - def handle_bit(self, ss, es, nb_ss): - period = nb_ss - ss - duty = es - ss - # Ideal duty for T0H: 33%, T1H: 66%. - bit_ = (duty / period) > 0.5 - self.put(ss, nb_ss, self.out_ann, - [0, ['%d' % bit_]]) - return [ss,nb_ss,bit_] - def decode(self): if not self.samplerate: raise SamplerateError('Cannot decode without samplerate.') - - results = [] + + dshot_value = DshotCmd(self.dshot_cfg) + telem_value = DshotTelem(self.dshot_cfg) + + #bitseq = BitDshot() while True: - if not self.bidirectional: - pins = self.wait([{0: 'r'},{0: 'f'},{'skip':self.samples_toreset}]) - else: - pins = self.wait([{0: 'f'},{0: 'r'},{'skip':self.samples_toreset}]) - - if self.currbit_ss and self.currbit_es and self.matched[2]: - # Assume end of packet if have seen start and end of a potential bit but no further change within 3 periods - # TODO: Confirm wait period this works with spec - results += [self.handle_bit(self.currbit_ss,self.currbit_es,(self.currbit_ss+self.samples_pp))] - self.currbit_ss = None - self.currbit_es = None - - # Pass results to decoder - self.handle_bits(results) - results = [] - - - if self.matched[0] and not self.currbit_ss and not self.currbit_es: - # Start of bit - self.currbit_ss = self.samplenum - elif self.matched[1] and self.currbit_ss and not self.currbit_es: - # End of bit - self.currbit_es = self.samplenum - elif self.matched[0] and self.currbit_es and self.currbit_ss: - # Have complete bit, can handle bit now - result = [self.handle_bit(self.currbit_ss,self.currbit_es,self.samplenum)] - # print(result) - results += result - self.currbit_ss = self.samplenum - self.currbit_es = None + + match self.state: + case State.CMD: + match self.state_dshot: + case State_Dshot.RESET: + dshot_value = DshotCmd(self.dshot_cfg) + self.state_dshot = State_Dshot.START + + case State_Dshot.START: + if not self.dshot_cfg.bidirectional: + pins = self.wait([{0: 'r'}, {0: 'f'}, {'skip': self.dshot_cfg.samples_after_motorcmd}]) + else: + pins = self.wait([{0: 'f'}, {0: 'r'}, {'skip': self.dshot_cfg.samples_after_motorcmd}]) + #TODO: Increase skip to maximum time for effiency + #TODO: Mark any changes in this time as errors? Option to reduce load? + + if self.currbit_ss and self.currbit_es and self.matched[2]: + # Assume end of packet if have seen start and end of a potential bit but no further change within 3 periods + # TODO: Confirm wait period this works with spec + + args = self.currbit_ss, self.currbit_es, (self.currbit_ss + self.dshot_cfg.samples_pp) + curr_bit = BitDshot(*args) + dshot_value.add_bit(curr_bit) + self.display_bit(curr_bit,0) + self.currbit_ss = None + self.currbit_es = None + #print(results) + # Pass results to decoder + + result = dshot_value.handle_bits_dshot() + if result: + self.display_dshot(dshot_value) + self.state_dshot = State_Dshot.RESET + if result and self.dshot_cfg.bidirectional: + self.state = State.TELEM + + + if self.matched[0] and not self.currbit_ss and not self.currbit_es: + # Start of bit + self.currbit_ss = self.samplenum + elif self.matched[1] and self.currbit_ss and not self.currbit_es: + # End of bit + self.currbit_es = self.samplenum + elif self.matched[0] and self.currbit_es and self.currbit_ss: + # Have complete bit, can handle bit now + args = self.currbit_ss, self.currbit_es, self.samplenum + curr_bit = BitDshot(*args) + dshot_value.add_bit(curr_bit) + self.display_bit(curr_bit,0) + + self.currbit_ss = self.samplenum + self.currbit_es = None + case State.TELEM: + match self.state_telem: + case State_Dshot.RESET: + telem_value = DshotTelem(self.dshot_cfg) + self.state_telem = State_Dshot.START + + case State_Dshot.START: + # First wait for falling edge (idle high) + pins = self.wait([{0: 'f'}]) + # Save start pulse + tlm_start = self.samplenum + # Switch to receiving state + self.state_telem = State_Dshot.RECV + # TODO: Check if still low after 1/8 bitlength for error det? + case State_Dshot.RECV: + # First conditions skips half bit width and matches low + # Second condition skips half bit width and matches high + pins = self.wait([{0: 'l', 'skip': self.dshot_cfg.telem_baudrate_midpoint}, + {0: 'h', 'skip': self.dshot_cfg.telem_baudrate_midpoint}]) + + # Append next bit + args = (self.samplenum - self.dshot_cfg.telem_baudrate_midpoint), self.samplenum, (self.samplenum + self.dshot_cfg.telem_baudrate_midpoint) + curr_bit = Bit_DshotTelem(*args,self.matched) + self.display_bit(curr_bit,5) + telem_value.add_bit(curr_bit) + + + # Skip half bitwidth to end of bit + pins = self.wait([{'skip': self.dshot_cfg.telem_baudrate_midpoint}]) + + if telem_value.bits.bit_length() >= 20-1: + telem_value.process_telem() + self.display_telem(telem_value) + # Reset + self.state_telem = State_Dshot.RESET + # Except Dshot packet next + self.state = State.CMD + + + # If not mark as error + + # Then skip x samples and sample + # Repeat for 21 bits (TBC) + + + #TODO: What happens if it gets stuck in the wrong state? + + + + diff --git a/protoDshot/__init__.py b/protoDshot/__init__.py new file mode 100644 index 0000000..d445e06 --- /dev/null +++ b/protoDshot/__init__.py @@ -0,0 +1,2 @@ +from .dshot_bits import * +from .protocols_motor import * diff --git a/protoDshot/dshot_bits.py b/protoDshot/dshot_bits.py new file mode 100644 index 0000000..f1bee5d --- /dev/null +++ b/protoDshot/dshot_bits.py @@ -0,0 +1,64 @@ +class Sequence(): + def __init__(self): + self.ss = None + self.ts = None + self.es = None + + def samples(self): + return self.es-self.ss + +class BitDshot(Sequence): + def __init__(self, ss, ts, es): + super().__init__() + self.period = None + self.duty = None + self.bit_ = None + self.ss, self.ts, self.es = ss, ts, es + self.process_bit() + return + def getBit(self): + if self.bit_ is None: + raise ValueError + return self.bit_ + def process_bit(self): + + self.period = self.es - self.ss + self.duty = self.ts - self.ss + # Ideal duty for T0H: 33%, T1H: 66%. + self.bit_ = (self.duty / self.period) > 0.5 + # TODO: Add tolerance + return self.getBit() + + def __bool__(self): + return bool(self.getBit()) +class Bit_DshotTelem(Sequence): + def __init__(self, ss, ts, es, matched): + super().__init__() + self.period = None + self.duty = None + self.bit_ = None + + # Only start sample on first bit is truly known, all other ss/es are guessed based on midpoint + self.ss, self.ts, self.es = ss, ts, es + self.process_bit(matched) + + def getBit(self): + if self.bit_ is None: + raise ValueError + return self.bit_ + def process_bit(self,matched): + # Low/High @ given sample + if matched == (True, False): + # 0 value + self.bit_ = 0 + + # High + if matched == (False, True): + # 1 value + self.bit_ = 1 + return self.getBit() + + def __bool__(self): + return bool(self.getBit()) + + diff --git a/protoDshot/protocols_motor.py b/protoDshot/protocols_motor.py new file mode 100644 index 0000000..d2ff316 --- /dev/null +++ b/protoDshot/protocols_motor.py @@ -0,0 +1,192 @@ +from functools import reduce +from enum import Enum + +gcr_tables = { + "0b11001": 0x0, + "0b11011": 0x1, + "0b10010": 0x2, + "0b10011": 0x3, + "0b11101": 0x4, + "0b10101": 0x5, + "0b10110": 0x6, + "0b10111": 0x7, + "0b11010": 0x8, + "0b1001": 0x9, + "0b1010": 0xa, + "0b1011": 0xb, + "0b11110": 0xc, + "0b1101": 0xd, + "0b1110": 0xe, + "0b1111": 0xf +} + + + +class DshotSettings(): + def __init__(self): + self.samplerate = 0 + #print(self.samplerate) + self.bidirectional = False + self.dshot_kbaud = 300e3 + self.dshot_period = None + self.samples_after_motorcmd = None + self.samples_after_telempkt = None + self.samples_pp = None + self.telem_baudrate_midpoint = 0 + self.telem_start = None + self.edt_force = False + self.update() + return + + def update(self): + self.dshot_period = 1 / self.dshot_kbaud + self.samples_pp = int(self.samplerate * self.dshot_period) + self.samples_after_motorcmd = self.samples_pp * 3 + self.samples_after_telempkt = self.samples_pp * 3 + self.telem_baudrate_midpoint = int((self.samplerate / (self.dshot_kbaud * (5 / 4))) / 2.0) + +class DshotCommon(): + def __init__(self,settings_Dshot=DshotSettings()): + self.cfg = settings_Dshot + self.crc_recv = None + self.crc_calc = None + self.crc_ok = False + self.bits = 0 + self.results = [] + + def checkCRC(self,data,crc_recv): + self.crc_recv = crc_recv + if self.cfg.bidirectional: + # TODO: Move CRC out? + self.crc_calc = int((~(data ^ (data >> 4) ^ (data >> 8))) & 0x0F) + else: + self.crc_calc = int(((data ^ (data >> 4) ^ (data >> 8))) & 0x0F) + + if not (self.crc_recv == self.crc_calc): + self.crc_ok = False + return False + self.crc_ok = True + return True + + def add_bit(self, seq): + self.results += [seq] + self.bits = self.bits << 1 + self.bits = self.bits | seq.bit_ + +class DshotCmd(DshotCommon): + def __init__(self,*args): + super().__init__(*args) + + self.dshot_value = None + self.telem_request = None + return + def handle_bits_dshot(self): + # ss, es, bit + if len(self.results) != 16: + return False + # Get bits only + bits = [bool(result) for result in self.results] + # Convert to binary from list + bits = reduce(lambda a, b: (a << 1) | b, bits) + # Seperate CRC + crc_recv = bits & 0xF + bits = bits >> 4 + # Remainder is data + data = bits + # Telem request + self.telem_request = bits & 0x1 + # Rest is dshot value + bits = bits >> 1 + self.dshot_value = bits + + if not self.checkCRC(data,crc_recv): + return False + + + return True + # TODO: Align this correctly + + +class BitException(ValueError): + def __init__(self, arg1, arg2): + super().__init__(arg1) + print("Second argument is " + arg2) + + +class DshotTelem(DshotCommon): + def __init__(self,*args): + super().__init__(*args) + + self.dshot_value = None + self.telem_request = None + self.xor = 0b0 + return + + + + def bits_xor_next(self,bits): + return bits ^ (bits >> 1) + + def bits_gcr(self,bits): + try: + return gcr_tables[bin(bits)] + except: + raise + + def process_telem_erpm(self): + # Raw packet + #self.put(start, end, self.out_ann, [6, ['%23s' % bin(packet)]]) + # XOR with next? + bits = self.bits + bits &= 0x0FFFFF + bits = self.bits_xor_next(bits) + #self.put(start,end, self.out_ann, [7, ['%23s' % bin(packet)]]) + # Undo GCR + output = 0b0 + + nibbles = 4 + bitmask = 0b11111 << ((nibbles - 1) * 5) + + for n in range(nibbles): + try: + gcr_n = bitmask & bits + key = gcr_n >> (nibbles - (n + 1)) * 5 + ungcr = self.bits_gcr(key) + output = (output << 4) | ungcr + bitmask = (bitmask >> 5) + except: + raise BitException(bin(key),bin(gcr_n)) + + # Compare CRC + crc_recv = output & 0xF + data = (output >> 4) & 0xFFF + + if not self.checkCRC(data,crc_recv): + raise ValueError + return True + # self.put(end - ((self.telem_baudrate_midpoint * 2) * 4), + # end, self.out_ann, + # [7, ['%23s' % ("RX CRC: " + hex(crc_received) + " Calc CRC: " + hex(crc_calc))]]) + # if crc_calc != crc_received: + # self.put(end - ((self.telem_baudrate_midpoint * 2) * 4), + # end, self.out_ann, + # [8, ['%23s' % ("CRC ERROR!")]]) + # The upper 12 bit contain the eperiod (1/erps) in the following bitwise encoding: + # + # e e e m m m m m m m m m + # + # The 9 bit value M needs to shifted left E times to get the period in micro seconds. + # This gives a range of 1 us to 65408 us. Which translates to a min e-frequency of 15.29 hz or for 14 pole motors 3.82 hz. + return + + def process_telem_edt(self): + + return + + def process_telem(self): + if self.cfg.edt_force: + self.process_telem_edt() + else: + self.process_telem_erpm() + + return \ No newline at end of file diff --git a/protoDshot/test/__init__.py b/protoDshot/test/__init__.py new file mode 100644 index 0000000..d34abe9 --- /dev/null +++ b/protoDshot/test/__init__.py @@ -0,0 +1,2 @@ +from protoDshot.dshot_bits import * +from protoDshot.protocols_motor import * diff --git a/protoDshot/test/test.py b/protoDshot/test/test.py new file mode 100644 index 0000000..ca58462 --- /dev/null +++ b/protoDshot/test/test.py @@ -0,0 +1,51 @@ +import unittest +from protoDshot.dshot_bits import * +from protoDshot.protocols_motor import * + + +class MyTestCase(unittest.TestCase): + def setUp(self): + cfg = DshotSettings() + cfg.bidirectional = True + cfg.edt_force = False + cfg.dshot_kbaud = 300 * 1000 + cfg.samplerate = 4e6 # 4mhz + cfg.update() + + self.telem_value = DshotTelem(cfg) + + def test_xor(self): + self.assertEqual(self.telem_value.bits_xor_next(0b1110100100110001001), 0b1001110110101001101, "xor with next") + self.assertNotEqual(self.telem_value.bits_xor_next(0b11101001001100010010), 0b1001110110101001101, "xor with next") + + def test_gcr(self): + self.assertEqual(self.telem_value.bits_gcr(0b10110),0x6) + with self.assertRaises(KeyError): + self.telem_value.bits_gcr(0b110) + + def test_eprm_long(self): + self.telem_value.bits = 0b11101001001100010010 + with self.assertRaises(BitException): + self.telem_value.process_telem_erpm() + def test_erpm_good(self): + self.telem_value.bits = 0b1110100100110001001 + self.assertTrue(self.telem_value.process_telem_erpm()) + + def test_eprm_zero_packet(self): + # Should return a warning in UI, raise ValueError? + self.telem_value.bits = 0b1010010100101010001 + self.assertTrue(self.telem_value.process_telem_erpm()) + + + + + + + # self.telem_value.bits = 0b1110100100110001000 + # telem_value.process_telem() + # self.assertNotEqual(telem_value.xor, 0b1001110110101001101) + + + +if __name__ == '__main__': + unittest.main()