Mbed Host Tests
conn_primitive_serial.py
Go to the documentation of this file.
1#!/usr/bin/env python
2"""
3mbed SDK
4Copyright (c) 2011-2016 ARM Limited
5
6Licensed under the Apache License, Version 2.0 (the "License");
7you may not use this file except in compliance with the License.
8You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing, software
13distributed under the License is distributed on an "AS IS" BASIS,
14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15See the License for the specific language governing permissions and
16limitations under the License.
17"""
18
19
20import time
21from serial import Serial, SerialException
22from mbed_host_tests import host_tests_plugins
23from mbed_host_tests.host_tests_plugins.host_test_plugins import HostTestPluginBase
24from .conn_primitive import ConnectorPrimitive, ConnectorPrimitiveException
25
26
27class SerialConnectorPrimitive(ConnectorPrimitive):
28 def __init__(self, name, port, baudrate, config):
29 ConnectorPrimitive.__init__(self, name)
30 self.port = port
31 self.baudrate = int(baudrate)
32 self.read_timeout = 0.01 # 10 milli sec
34 self.config = config
35 self.target_id = self.config.get('target_id', None)
36 self.polling_timeoutpolling_timeout = config.get('polling_timeout', 60)
37 self.forced_reset_timeout = config.get('forced_reset_timeout', 1)
38 self.skip_reset = config.get('skip_reset', False)
39 self.serial = None
40
41 # Check if serial port for given target_id changed
42 # If it does we will use new port to open connections and make sure reset plugin
43 # later can reuse opened already serial port
44 #
45 # Note: This listener opens serial port and keeps connection so reset plugin uses
46 # serial port object not serial port name!
47 serial_port = HostTestPluginBase().check_serial_port_ready(self.port, target_id=self.target_id, timeout=self.polling_timeoutpolling_timeout)
48 if serial_port is None:
49 raise ConnectorPrimitiveException("Serial port not ready!")
50
51 if serial_port != self.port:
52 # Serial port changed for given targetID
53 self.logger.prn_inf("serial port changed from '%s to '%s')"% (self.port, serial_port))
54 self.port = serial_port
55
56 startTime = time.time()
57 self.logger.prn_inf("serial(port=%s, baudrate=%d, read_timeout=%s, write_timeout=%d)"% (self.port, self.baudrate, self.read_timeout, self.write_timeout))
58 while time.time() - startTime < self.polling_timeoutpolling_timeout:
59 try:
60 # TIMEOUT: While creating Serial object timeout is delibrately passed as 0. Because blocking in Serial.read
61 # impacts thread and mutliprocess functioning in Python. Hence, instead in self.read() s delay (sleep()) is
62 # inserted to let serial buffer collect data and avoid spinning on non blocking read().
63 self.serial = Serial(self.port, baudrate=self.baudrate, timeout=0, write_timeout=self.write_timeout)
64 except SerialException as e:
65 self.serial = None
66 self.LAST_ERRORLAST_ERROR = "connection lost, serial.Serial(%s, %d, %d, %d): %s"% (self.port,
67 self.baudrate,
68 self.read_timeout,
69 self.write_timeout,
70 str(e))
71 self.logger.prn_err(str(e))
72 self.logger.prn_err("Retry after 1 sec until %s seconds" % self.polling_timeoutpolling_timeout)
73 else:
74 if not self.skip_reset:
76 break
77 time.sleep(1)
78
79 def reset_dev_via_serial(self, delay=1):
80 """! Reset device using selected method, calls one of the reset plugins """
81 reset_type = self.config.get('reset_type', 'default')
82 if not reset_type:
83 reset_type = 'default'
84 disk = self.config.get('disk', None)
85
86 self.logger.prn_inf("reset device using '%s' plugin..."% reset_type)
87 result = host_tests_plugins.call_plugin('ResetMethod',
88 reset_type,
89 serial=self.serial,
90 disk=disk,
91 target_id=self.target_id,
92 polling_timeout=self.config.get('polling_timeout'))
93 # Post-reset sleep
94 if delay:
95 self.logger.prn_inf("waiting %.2f sec after reset"% delay)
96 time.sleep(delay)
97 self.logger.prn_inf("wait for it...")
98 return result
99
100 def read(self, count):
101 """! Read data from serial port RX buffer """
102 # TIMEOUT: Since read is called in a loop, wait for self.timeout period before calling serial.read(). See
103 # comment on serial.Serial() call above about timeout.
104 time.sleep(self.read_timeout)
105 c = str()
106 try:
107 if self.serial:
108 c = self.serial.read(count)
109 except SerialException as e:
110 self.serial = None
111 self.LAST_ERRORLAST_ERROR = "connection lost, serial.read(%d): %s"% (count, str(e))
112 self.logger.prn_err(str(e))
113 return c
114
115 def write(self, payload, log=False):
116 """! Write data to serial port TX buffer """
117 try:
118 if self.serial:
119 self.serial.write(payload.encode('utf-8'))
120 if log:
121 self.logger.prn_txd(payload)
122 return True
123 except SerialException as e:
124 self.serial = None
125 self.LAST_ERRORLAST_ERROR = "connection lost, serial.write(%d bytes): %s"% (len(payload), str(e))
126 self.logger.prn_err(str(e))
127 return False
128
129 def flush(self):
130 if self.serial:
131 self.serial.flush()
132
133 def connected(self):
134 return bool(self.serial)
135
136 def finish(self):
137 if self.serial:
138 self.serial.close()
139
140 def reset(self):
142
143 def __del__(self):
144 self.finishfinish()
def finish(self)
Handle DUT dtor like (close resource) operations here.
def reset_dev_via_serial(self, delay=1)
Reset device using selected method, calls one of the reset plugins.
def finish(self)
Handle DUT dtor like (close resource) operations here.
def write(self, payload, log=False)
Write data to serial port TX buffer.