You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							94 lines
						
					
					
						
							3.3 KiB
						
					
					
				
			
		
		
	
	
							94 lines
						
					
					
						
							3.3 KiB
						
					
					
				| import select
 | |
| from serial import Serial
 | |
| from crcmod import mkCrcFun
 | |
| from struct import pack, unpack_from, calcsize
 | |
| 
 | |
| class ModemDiag:
 | |
|   def __init__(self):
 | |
|     self.serial = self.open_serial()
 | |
|     self.pend = b''
 | |
| 
 | |
|   def open_serial(self):
 | |
|     serial = Serial("/dev/ttyUSB0", baudrate=115200, rtscts=True, dsrdtr=True, timeout=0, exclusive=True)
 | |
|     serial.flush()
 | |
|     serial.reset_input_buffer()
 | |
|     serial.reset_output_buffer()
 | |
|     return serial
 | |
| 
 | |
|   ccitt_crc16 = mkCrcFun(0x11021, initCrc=0, xorOut=0xffff)
 | |
|   ESCAPE_CHAR = b'\x7d'
 | |
|   TRAILER_CHAR = b'\x7e'
 | |
| 
 | |
|   def hdlc_encapsulate(self, payload):
 | |
|     payload += pack('<H', ModemDiag.ccitt_crc16(payload))
 | |
|     payload = payload.replace(self.ESCAPE_CHAR, bytes([self.ESCAPE_CHAR[0], self.ESCAPE_CHAR[0] ^ 0x20]))
 | |
|     payload = payload.replace(self.TRAILER_CHAR, bytes([self.ESCAPE_CHAR[0], self.TRAILER_CHAR[0] ^ 0x20]))
 | |
|     payload += self.TRAILER_CHAR
 | |
|     return payload
 | |
| 
 | |
|   def hdlc_decapsulate(self, payload):
 | |
|     assert len(payload) >= 3
 | |
|     assert payload[-1:] == self.TRAILER_CHAR
 | |
|     payload = payload[:-1]
 | |
|     payload = payload.replace(bytes([self.ESCAPE_CHAR[0], self.TRAILER_CHAR[0] ^ 0x20]), self.TRAILER_CHAR)
 | |
|     payload = payload.replace(bytes([self.ESCAPE_CHAR[0], self.ESCAPE_CHAR[0] ^ 0x20]), self.ESCAPE_CHAR)
 | |
|     assert payload[-2:] == pack('<H', ModemDiag.ccitt_crc16(payload[:-2]))
 | |
|     return payload[:-2]
 | |
| 
 | |
|   def recv(self):
 | |
|     # self.serial.read_until makes tons of syscalls!
 | |
|     raw_payload = [self.pend]
 | |
|     while self.TRAILER_CHAR not in raw_payload[-1]:
 | |
|       select.select([self.serial.fd], [], [])
 | |
|       raw = self.serial.read(0x10000)
 | |
|       raw_payload.append(raw)
 | |
|     raw_payload = b''.join(raw_payload)
 | |
|     raw_payload, self.pend = raw_payload.split(self.TRAILER_CHAR, 1)
 | |
|     raw_payload += self.TRAILER_CHAR
 | |
|     unframed_message = self.hdlc_decapsulate(raw_payload)
 | |
|     return unframed_message[0], unframed_message[1:]
 | |
| 
 | |
|   def send(self, packet_type, packet_payload):
 | |
|     self.serial.write(self.hdlc_encapsulate(bytes([packet_type]) + packet_payload))
 | |
| 
 | |
| # *** end class ***
 | |
| 
 | |
| DIAG_LOG_F = 16
 | |
| DIAG_LOG_CONFIG_F = 115
 | |
| LOG_CONFIG_RETRIEVE_ID_RANGES_OP = 1
 | |
| LOG_CONFIG_SET_MASK_OP = 3
 | |
| LOG_CONFIG_SUCCESS_S = 0
 | |
| 
 | |
| def send_recv(diag, packet_type, packet_payload):
 | |
|   diag.send(packet_type, packet_payload)
 | |
|   while 1:
 | |
|     opcode, payload = diag.recv()
 | |
|     if opcode != DIAG_LOG_F:
 | |
|       break
 | |
|   return opcode, payload
 | |
| 
 | |
| def setup_logs(diag, types_to_log):
 | |
|   opcode, payload = send_recv(diag, DIAG_LOG_CONFIG_F, pack('<3xI', LOG_CONFIG_RETRIEVE_ID_RANGES_OP))
 | |
| 
 | |
|   header_spec = '<3xII'
 | |
|   operation, status = unpack_from(header_spec, payload)
 | |
|   assert operation == LOG_CONFIG_RETRIEVE_ID_RANGES_OP
 | |
|   assert status == LOG_CONFIG_SUCCESS_S
 | |
| 
 | |
|   log_masks = unpack_from('<16I', payload, calcsize(header_spec))
 | |
| 
 | |
|   for log_type, log_mask_bitsize in enumerate(log_masks):
 | |
|     if log_mask_bitsize:
 | |
|       log_mask = [0] * ((log_mask_bitsize+7)//8)
 | |
|       for i in range(log_mask_bitsize):
 | |
|         if ((log_type<<12)|i) in types_to_log:
 | |
|           log_mask[i//8] |= 1 << (i%8)
 | |
|       opcode, payload = send_recv(diag, DIAG_LOG_CONFIG_F, pack('<3xIII',
 | |
|           LOG_CONFIG_SET_MASK_OP,
 | |
|           log_type,
 | |
|           log_mask_bitsize
 | |
|       ) + bytes(log_mask))
 | |
|       assert opcode == DIAG_LOG_CONFIG_F
 | |
|       operation, status = unpack_from(header_spec, payload)
 | |
|       assert operation == LOG_CONFIG_SET_MASK_OP
 | |
|       assert status == LOG_CONFIG_SUCCESS_S
 | |
| 
 |