Copyright (c) 2011-2016 ARM Limited

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
19from mbed_host_tests import DEFAULT_BAUD_RATE
20from mbed_host_tests.host_tests_conn_proxy.conn_primitive import ConnectorPrimitive
23class RemoteConnectorPrimitive(ConnectorPrimitive):
24 def __init__(self, name, config, importer=__import__):
25 ConnectorPrimitive.__init__(self, name)
29 self.
grm_port = int(config.get(
'grm_port', 8000))
32 self.
baudrate = config.get(
'baudrate', DEFAULT_BAUD_RATE)
36 if self.
config.get(
"tags"):
38 for tag
in config[
"tags"].split(
','):
49 def __remote_init(self, importer):
50 """! Initialize DUT using GRM APIs """
55 except ImportError
as error:
56 self.
logger.prn_err(
"unable to load global resource manager '%s' module!" % self.
grm_module)
57 self.
logger.prn_err(str(error))
61 self.
logger.prn_inf(
"remote resources initialization: remote(host=%s, port=%s)" %
68 resources = self.
client.get_resources()
69 self.
logger.prn_inf(
"remote resources count: %d" % len(resources))
75 except Exception
as error:
76 self.
logger.prn_err(
"can't allocate resource: '%s', reason: %s" % (self.
platform_name, str(error)))
84 except Exception
as error:
85 self.
logger.prn_err(str(error))
90 def __remote_connect(self, baudrate=DEFAULT_BAUD_RATE):
91 """! Open remote connection to DUT """
92 self.
logger.prn_inf(
"opening connection to platform at baudrate='%s'" % baudrate)
94 raise Exception(
"remote resource not exists!")
96 serial_parameters = self.
remote_module.SerialParameters(baudrate=baudrate)
99 self.
logger.prn_inf(
"open_connection() failed")
102 def __remote_disconnect(self):
104 raise Exception(
"remote resource not exists!")
108 except Exception
as error:
109 self.
logger.prn_err(
"RemoteConnectorPrimitive.disconnect() failed, reason: " + str(error))
111 def __remote_reset(self):
112 """! Use GRM remote API to reset DUT """
113 self.
logger.prn_inf(
"remote resources reset...")
115 raise Exception(
"remote resource not exists!")
118 raise Exception(
"remote resources reset failed!")
120 self.
logger.prn_inf(
"reset() failed")
123 def __remote_flashing(self, filename, forceflash=False):
124 """! Use GRM remote API to flash DUT """
125 self.
logger.prn_inf(
"remote resources flashing with '%s'..." % filename)
127 raise Exception(
"remote resource not exists!")
130 raise Exception(
"remote resource flashing failed!")
132 self.
logger.prn_inf(
"flash() failed")
136 """! Read 'count' bytes of data from DUT """
138 raise Exception(
"remote resource not exists!")
142 except Exception
as error:
143 self.
logger.prn_err(
"RemoteConnectorPrimitive.read(%d): %s" % (count, str(error)))
146 def write(self, payload, log=False):
147 """! Write 'payload' to DUT """
152 self.
logger.prn_txd(payload)
154 except Exception
as error:
156 self.
logger.prn_err(str(error))
163 return self.remote_module
and self.selected_resource
and self.selected_resource.is_allocated
168 def __remote_release(self):
173 except Exception
as error:
174 self.
logger.prn_err(
"RemoteConnectorPrimitive.release failed, reason: " + str(error))
def finish(self)
Handle DUT destructor-like (close resource) operations here.
def connected(self)
Check if there is a connection to DUT.
def read(self, count)
Read 'count' bytes of data from DUT.
def finish(self)
Handle DUT destructor-like (close resource) operations here.
def connected(self)
Check if there is a connection to DUT.
def __remote_init(self, importer)
def reset(self)
Reset the DUT.
def __remote_disconnect(self)
def __init__(self, name, config, importer=__import__)
def __remote_connect(self, baudrate=DEFAULT_BAUD_RATE)
def flush(self)
Flush read/write channels of DUT.
def write(self, payload, log=False)
Write 'payload' to DUT.
def __remote_release(self)
def __remote_flashing(self, filename, forceflash=False)