You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
			
				
					95 lines
				
				3.3 KiB
			
		
		
			
		
	
	
					95 lines
				
				3.3 KiB
			| 
								 
											4 years ago
										 
									 | 
							
								import select
							 | 
						||
| 
								 
											4 years ago
										 
									 | 
							
								from serial import Serial
							 | 
						||
| 
								 | 
							
								from crcmod import mkCrcFun
							 | 
						||
| 
								 | 
							
								from struct import pack, unpack_from, calcsize
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class ModemDiag:
							 | 
						||
| 
								 | 
							
								  def __init__(self):
							 | 
						||
| 
								 | 
							
								    self.serial = self.open_serial()
							 | 
						||
| 
								 
											4 years ago
										 
									 | 
							
								    self.pend = b''
							 | 
						||
| 
								 
											4 years ago
										 
									 | 
							
								
							 | 
						||
| 
								 | 
							
								  def open_serial(self):
							 | 
						||
| 
								 
											3 years ago
										 
									 | 
							
								    serial = Serial("/dev/ttyUSB0", baudrate=115200, rtscts=True, dsrdtr=True, timeout=0, exclusive=True)
							 | 
						||
| 
								 
											4 years ago
										 
									 | 
							
								    serial.flush()
							 | 
						||
| 
								 
											4 years ago
										 
									 | 
							
								    serial.reset_input_buffer()
							 | 
						||
| 
								 | 
							
								    serial.reset_output_buffer()
							 | 
						||
| 
								 
											4 years ago
										 
									 | 
							
								    return serial
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  ccitt_crc16 = mkCrcFun(0x11021, initCrc=0, xorOut=0xffff)
							 | 
						||
| 
								 | 
							
								  ESCAPE_CHAR = b'\x7d'
							 | 
						||
| 
								 | 
							
								  TRAILER_CHAR = b'\x7e'
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  def hdlc_encapsulate(self, payload):
							 | 
						||
| 
								 | 
							
								    payload += pack('<H', ModemDiag.ccitt_crc16(payload))
							 | 
						||
| 
								 | 
							
								    payload = payload.replace(self.ESCAPE_CHAR, bytes([self.ESCAPE_CHAR[0], self.ESCAPE_CHAR[0] ^ 0x20]))
							 | 
						||
| 
								 | 
							
								    payload = payload.replace(self.TRAILER_CHAR, bytes([self.ESCAPE_CHAR[0], self.TRAILER_CHAR[0] ^ 0x20]))
							 | 
						||
| 
								 | 
							
								    payload += self.TRAILER_CHAR
							 | 
						||
| 
								 | 
							
								    return payload
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  def hdlc_decapsulate(self, payload):
							 | 
						||
| 
								 | 
							
								    assert len(payload) >= 3
							 | 
						||
| 
								 | 
							
								    assert payload[-1:] == self.TRAILER_CHAR
							 | 
						||
| 
								 | 
							
								    payload = payload[:-1]
							 | 
						||
| 
								 | 
							
								    payload = payload.replace(bytes([self.ESCAPE_CHAR[0], self.TRAILER_CHAR[0] ^ 0x20]), self.TRAILER_CHAR)
							 | 
						||
| 
								 | 
							
								    payload = payload.replace(bytes([self.ESCAPE_CHAR[0], self.ESCAPE_CHAR[0] ^ 0x20]), self.ESCAPE_CHAR)
							 | 
						||
| 
								 | 
							
								    assert payload[-2:] == pack('<H', ModemDiag.ccitt_crc16(payload[:-2]))
							 | 
						||
| 
								 | 
							
								    return payload[:-2]
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  def recv(self):
							 | 
						||
| 
								 
											4 years ago
										 
									 | 
							
								    # self.serial.read_until makes tons of syscalls!
							 | 
						||
| 
								 | 
							
								    raw_payload = [self.pend]
							 | 
						||
| 
								 | 
							
								    while self.TRAILER_CHAR not in raw_payload[-1]:
							 | 
						||
| 
								 | 
							
								      select.select([self.serial.fd], [], [])
							 | 
						||
| 
								 | 
							
								      raw = self.serial.read(0x10000)
							 | 
						||
| 
								 | 
							
								      raw_payload.append(raw)
							 | 
						||
| 
								 
											4 years ago
										 
									 | 
							
								    raw_payload = b''.join(raw_payload)
							 | 
						||
| 
								 
											4 years ago
										 
									 | 
							
								    raw_payload, self.pend = raw_payload.split(self.TRAILER_CHAR, 1)
							 | 
						||
| 
								 | 
							
								    raw_payload += self.TRAILER_CHAR
							 | 
						||
| 
								 
											4 years ago
										 
									 | 
							
								    unframed_message = self.hdlc_decapsulate(raw_payload)
							 | 
						||
| 
								 | 
							
								    return unframed_message[0], unframed_message[1:]
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  def send(self, packet_type, packet_payload):
							 | 
						||
| 
								 | 
							
								    self.serial.write(self.hdlc_encapsulate(bytes([packet_type]) + packet_payload))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								# *** end class ***
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								DIAG_LOG_F = 16
							 | 
						||
| 
								 | 
							
								DIAG_LOG_CONFIG_F = 115
							 | 
						||
| 
								 | 
							
								LOG_CONFIG_RETRIEVE_ID_RANGES_OP = 1
							 | 
						||
| 
								 | 
							
								LOG_CONFIG_SET_MASK_OP = 3
							 | 
						||
| 
								 | 
							
								LOG_CONFIG_SUCCESS_S = 0
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def send_recv(diag, packet_type, packet_payload):
							 | 
						||
| 
								 | 
							
								  diag.send(packet_type, packet_payload)
							 | 
						||
| 
								 | 
							
								  while 1:
							 | 
						||
| 
								 | 
							
								    opcode, payload = diag.recv()
							 | 
						||
| 
								 | 
							
								    if opcode != DIAG_LOG_F:
							 | 
						||
| 
								 | 
							
								      break
							 | 
						||
| 
								 | 
							
								  return opcode, payload
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def setup_logs(diag, types_to_log):
							 | 
						||
| 
								 | 
							
								  opcode, payload = send_recv(diag, DIAG_LOG_CONFIG_F, pack('<3xI', LOG_CONFIG_RETRIEVE_ID_RANGES_OP))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  header_spec = '<3xII'
							 | 
						||
| 
								 | 
							
								  operation, status = unpack_from(header_spec, payload)
							 | 
						||
| 
								 | 
							
								  assert operation == LOG_CONFIG_RETRIEVE_ID_RANGES_OP
							 | 
						||
| 
								 | 
							
								  assert status == LOG_CONFIG_SUCCESS_S
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  log_masks = unpack_from('<16I', payload, calcsize(header_spec))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  for log_type, log_mask_bitsize in enumerate(log_masks):
							 | 
						||
| 
								 | 
							
								    if log_mask_bitsize:
							 | 
						||
| 
								 | 
							
								      log_mask = [0] * ((log_mask_bitsize+7)//8)
							 | 
						||
| 
								 | 
							
								      for i in range(log_mask_bitsize):
							 | 
						||
| 
								 | 
							
								        if ((log_type<<12)|i) in types_to_log:
							 | 
						||
| 
								 | 
							
								          log_mask[i//8] |= 1 << (i%8)
							 | 
						||
| 
								 | 
							
								      opcode, payload = send_recv(diag, DIAG_LOG_CONFIG_F, pack('<3xIII',
							 | 
						||
| 
								 | 
							
								          LOG_CONFIG_SET_MASK_OP,
							 | 
						||
| 
								 | 
							
								          log_type,
							 | 
						||
| 
								 | 
							
								          log_mask_bitsize
							 | 
						||
| 
								 | 
							
								      ) + bytes(log_mask))
							 | 
						||
| 
								 | 
							
								      assert opcode == DIAG_LOG_CONFIG_F
							 | 
						||
| 
								 | 
							
								      operation, status = unpack_from(header_spec, payload)
							 | 
						||
| 
								 | 
							
								      assert operation == LOG_CONFIG_SET_MASK_OP
							 | 
						||
| 
								 | 
							
								      assert status == LOG_CONFIG_SUCCESS_S
							 |