From 1fe4406f374291ab2e86e95a97341fd9c475fcb8 Mon Sep 17 00:00:00 2001 From: Jun Wako Date: Fri, 24 Apr 2015 16:26:14 +0900 Subject: Squashed 'tmk_core/' changes from 7967731..b9e0ea0 b9e0ea0 Merge commit '7fa9d8bdea3773d1195b04d98fcf27cf48ddd81d' as 'tool/mbed/mbed-sdk' 7fa9d8b Squashed 'tool/mbed/mbed-sdk/' content from commit 7c21ce5 git-subtree-dir: tmk_core git-subtree-split: b9e0ea08cb940de20b3610ecdda18e9d8cd7c552 --- .../workspace_tools/host_tests/__init__.py | 59 +++ .../workspace_tools/host_tests/default_auto.py | 36 ++ .../workspace_tools/host_tests/detect_auto.py | 55 +++ .../workspace_tools/host_tests/dev_null_auto.py | 50 +++ .../mbed-sdk/workspace_tools/host_tests/echo.py | 59 +++ .../host_tests/echo_flow_control.py | 48 +++ .../host_tests/example/BroadcastReceive.py | 25 ++ .../host_tests/example/BroadcastSend.py | 30 ++ .../host_tests/example/MulticastReceive.py | 31 ++ .../host_tests/example/MulticastSend.py | 30 ++ .../host_tests/example/TCPEchoClient.py | 28 ++ .../host_tests/example/TCPEchoServer.py | 30 ++ .../host_tests/example/UDPEchoClient.py | 28 ++ .../host_tests/example/UDPEchoServer.py | 27 ++ .../workspace_tools/host_tests/example/__init__.py | 16 + .../workspace_tools/host_tests/hello_auto.py | 34 ++ .../workspace_tools/host_tests/host_registry.py | 36 ++ .../workspace_tools/host_tests/host_test.py | 397 +++++++++++++++++++++ .../host_tests/host_tests_plugins/__init__.py | 68 ++++ .../host_tests_plugins/host_test_plugins.py | 118 ++++++ .../host_tests_plugins/host_test_registry.py | 89 +++++ .../host_tests_plugins/module_copy_firefox.py | 76 ++++ .../host_tests_plugins/module_copy_mbed.py | 71 ++++ .../host_tests_plugins/module_copy_mps2.py | 107 ++++++ .../host_tests_plugins/module_copy_shell.py | 64 ++++ .../host_tests_plugins/module_copy_silabs.py | 61 ++++ .../host_tests_plugins/module_reset_mbed.py | 72 ++++ .../host_tests_plugins/module_reset_mps2.py | 74 ++++ .../host_tests_plugins/module_reset_silabs.py | 66 ++++ .../mbed-sdk/workspace_tools/host_tests/mbedrpc.py | 287 +++++++++++++++ .../mbed-sdk/workspace_tools/host_tests/midi.py | 72 ++++ .../workspace_tools/host_tests/net_test.py | 27 ++ .../mbed-sdk/workspace_tools/host_tests/rpc.py | 56 +++ .../workspace_tools/host_tests/rtc_auto.py | 50 +++ .../workspace_tools/host_tests/stdio_auto.py | 56 +++ .../workspace_tools/host_tests/tcpecho_client.py | 57 +++ .../host_tests/tcpecho_client_auto.py | 87 +++++ .../workspace_tools/host_tests/tcpecho_server.py | 50 +++ .../host_tests/tcpecho_server_auto.py | 84 +++++ .../host_tests/tcpecho_server_loop.py | 40 +++ .../host_tests/udp_link_layer_auto.py | 145 ++++++++ .../workspace_tools/host_tests/udpecho_client.py | 55 +++ .../host_tests/udpecho_client_auto.py | 77 ++++ .../workspace_tools/host_tests/udpecho_server.py | 29 ++ .../host_tests/udpecho_server_auto.py | 68 ++++ .../workspace_tools/host_tests/wait_us_auto.py | 69 ++++ 46 files changed, 3194 insertions(+) create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/__init__.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/default_auto.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/detect_auto.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/dev_null_auto.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/echo.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/echo_flow_control.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/example/BroadcastReceive.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/example/BroadcastSend.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/example/MulticastReceive.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/example/MulticastSend.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/example/TCPEchoClient.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/example/TCPEchoServer.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/example/UDPEchoClient.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/example/UDPEchoServer.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/example/__init__.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/hello_auto.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/host_registry.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/host_test.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/__init__.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/host_test_plugins.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/host_test_registry.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_firefox.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_mbed.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_mps2.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_shell.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_silabs.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_mbed.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_mps2.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_silabs.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/mbedrpc.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/midi.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/net_test.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/rpc.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/rtc_auto.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/stdio_auto.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_client.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_client_auto.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server_auto.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server_loop.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/udp_link_layer_auto.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_client.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_client_auto.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_server.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_server_auto.py create mode 100644 tool/mbed/mbed-sdk/workspace_tools/host_tests/wait_us_auto.py (limited to 'tool/mbed/mbed-sdk/workspace_tools/host_tests') diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/__init__.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/__init__.py new file mode 100644 index 0000000000..cae0e20908 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/__init__.py @@ -0,0 +1,59 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from host_registry import HostRegistry + +# Host test supervisors +from echo import EchoTest +from rtc_auto import RTCTest +from stdio_auto import StdioTest +from hello_auto import HelloTest +from detect_auto import DetectPlatformTest +from default_auto import DefaultAuto +from dev_null_auto import DevNullTest +from wait_us_auto import WaitusTest +from tcpecho_server_auto import TCPEchoServerTest +from udpecho_server_auto import UDPEchoServerTest +from tcpecho_client_auto import TCPEchoClientTest +from udpecho_client_auto import UDPEchoClientTest + +# Populate registry with supervising objects +HOSTREGISTRY = HostRegistry() +HOSTREGISTRY.register_host_test("echo", EchoTest()) +HOSTREGISTRY.register_host_test("default", DefaultAuto()) +HOSTREGISTRY.register_host_test("rtc_auto", RTCTest()) +HOSTREGISTRY.register_host_test("hello_auto", HelloTest()) +HOSTREGISTRY.register_host_test("stdio_auto", StdioTest()) +HOSTREGISTRY.register_host_test("detect_auto", DetectPlatformTest()) +HOSTREGISTRY.register_host_test("default_auto", DefaultAuto()) +HOSTREGISTRY.register_host_test("wait_us_auto", WaitusTest()) +HOSTREGISTRY.register_host_test("dev_null_auto", DevNullTest()) +HOSTREGISTRY.register_host_test("tcpecho_server_auto", TCPEchoServerTest()) +HOSTREGISTRY.register_host_test("udpecho_server_auto", UDPEchoServerTest()) +HOSTREGISTRY.register_host_test("tcpecho_client_auto", TCPEchoClientTest()) +HOSTREGISTRY.register_host_test("udpecho_client_auto", UDPEchoClientTest()) + +############################################################################### +# Functional interface for test supervisor registry +############################################################################### + + +def get_host_test(ht_name): + return HOSTREGISTRY.get_host_test(ht_name) + +def is_host_test(ht_name): + return HOSTREGISTRY.is_host_test(ht_name) diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/default_auto.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/default_auto.py new file mode 100644 index 0000000000..0883d79d53 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/default_auto.py @@ -0,0 +1,36 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from sys import stdout + +class DefaultAuto(): + """ Simple, basic host test's test runner waiting for serial port + output from MUT, no supervision over test running in MUT is executed. + """ + def test(self, selftest): + result = selftest.RESULT_SUCCESS + try: + while True: + c = selftest.mbed.serial_read(512) + if c is None: + return selftest.RESULT_IO_SERIAL + stdout.write(c) + stdout.flush() + except KeyboardInterrupt, _: + selftest.notify("\r\n[CTRL+C] exit") + result = selftest.RESULT_ERROR + return result diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/detect_auto.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/detect_auto.py new file mode 100644 index 0000000000..2999946c08 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/detect_auto.py @@ -0,0 +1,55 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import re + +class DetectPlatformTest(): + PATTERN_MICRO_NAME = "Target '(\w+)'" + re_detect_micro_name = re.compile(PATTERN_MICRO_NAME) + + def test(self, selftest): + result = True + + c = selftest.mbed.serial_readline() # {{start}} preamble + if c is None: + return selftest.RESULT_IO_SERIAL + + selftest.notify(c.strip()) + selftest.notify("HOST: Detecting target name...") + + c = selftest.mbed.serial_readline() + if c is None: + return selftest.RESULT_IO_SERIAL + selftest.notify(c.strip()) + + # Check for target name + m = self.re_detect_micro_name.search(c) + if m and len(m.groups()): + micro_name = m.groups()[0] + micro_cmp = selftest.mbed.options.micro == micro_name + result = result and micro_cmp + selftest.notify("HOST: MUT Target name '%s', expected '%s'... [%s]"% (micro_name, + selftest.mbed.options.micro, + "OK" if micro_cmp else "FAIL")) + + for i in range(0, 2): + c = selftest.mbed.serial_readline() + if c is None: + return selftest.RESULT_IO_SERIAL + selftest.notify(c.strip()) + + return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/dev_null_auto.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/dev_null_auto.py new file mode 100644 index 0000000000..4538f6d79e --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/dev_null_auto.py @@ -0,0 +1,50 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +class DevNullTest(): + + def check_readline(self, selftest, text): + """ Reads line from serial port and checks if text was part of read string + """ + result = False + c = selftest.mbed.serial_readline() + if c and text in c: + result = True + return result + + def test(self, selftest): + result = True + # Test should print some text and later stop printing + # 'MBED: re-routing stdout to /null' + res = self.check_readline(selftest, "re-routing stdout to /null") + if not res: + # We haven't read preamble line + result = False + else: + # Check if there are printed characters + str = '' + for i in range(3): + c = selftest.mbed.serial_read(32) + if c is None: + return selftest.RESULT_IO_SERIAL + else: + str += c + if len(str) > 0: + result = False + break + selftest.notify("Received %d bytes: %s"% (len(str), str)) + return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/echo.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/echo.py new file mode 100644 index 0000000000..75e534fb84 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/echo.py @@ -0,0 +1,59 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import sys +import uuid +from sys import stdout + +class EchoTest(): + + # Test parameters + TEST_SERIAL_BAUDRATE = 115200 + TEST_LOOP_COUNT = 50 + + def test(self, selftest): + """ This host test will use mbed serial port with + baudrate 115200 to perform echo test on that port. + """ + # Custom initialization for echo test + selftest.mbed.init_serial_params(serial_baud=self.TEST_SERIAL_BAUDRATE) + selftest.mbed.init_serial() + + # Test function, return True or False to get standard test notification on stdout + selftest.mbed.flush() + selftest.notify("HOST: Starting the ECHO test") + result = True + + """ This ensures that there are no parasites left in the serial buffer. + """ + for i in range(0, 2): + selftest.mbed.serial_write("\n") + c = selftest.mbed.serial_readline() + + for i in range(0, self.TEST_LOOP_COUNT): + TEST_STRING = str(uuid.uuid4()) + "\n" + selftest.mbed.serial_write(TEST_STRING) + c = selftest.mbed.serial_readline() + if c is None: + return selftest.RESULT_IO_SERIAL + if c.strip() != TEST_STRING.strip(): + selftest.notify('HOST: "%s" != "%s"'% (c, TEST_STRING)) + result = False + else: + sys.stdout.write('.') + stdout.flush() + return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/echo_flow_control.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/echo_flow_control.py new file mode 100644 index 0000000000..7ea11e9736 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/echo_flow_control.py @@ -0,0 +1,48 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from host_test import Test + + +class EchoTest(Test): + def __init__(self): + Test.__init__(self) + self.mbed.init_serial() + self.mbed.extra_serial.rtscts = True + self.mbed.reset() + + def test(self): + self.mbed.flush() + self.notify("Starting the ECHO test") + TEST="longer serial test" + check = True + for i in range(1, 100): + self.mbed.extra_serial.write(TEST + "\n") + l = self.mbed.extra_serial.readline().strip() + if not l: continue + + if l != TEST: + check = False + self.notify('"%s" != "%s"' % (l, TEST)) + else: + if (i % 10) == 0: + self.notify('.') + + return check + + +if __name__ == '__main__': + EchoTest().run() diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/BroadcastReceive.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/BroadcastReceive.py new file mode 100644 index 0000000000..2e846ca19e --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/BroadcastReceive.py @@ -0,0 +1,25 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import socket + +BROADCAST_PORT = 58083 + +s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +s.bind(('0.0.0.0', BROADCAST_PORT)) + +while True: + print s.recvfrom(256) diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/BroadcastSend.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/BroadcastSend.py new file mode 100644 index 0000000000..0a5f8c3201 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/BroadcastSend.py @@ -0,0 +1,30 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import socket +from time import sleep, time + +BROADCAST_PORT = 58083 + +s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +s.bind(('', 0)) +s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + +while True: + print "Broadcasting..." + data = 'Hello World: ' + repr(time()) + '\n' + s.sendto(data, ('', BROADCAST_PORT)) + sleep(1) diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/MulticastReceive.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/MulticastReceive.py new file mode 100644 index 0000000000..9001f40b7d --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/MulticastReceive.py @@ -0,0 +1,31 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import socket +import struct + +MCAST_GRP = '224.1.1.1' +MCAST_PORT = 5007 + +sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) +sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +sock.bind(('', MCAST_PORT)) +mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY) + +sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) + +while True: + print sock.recv(10240) diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/MulticastSend.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/MulticastSend.py new file mode 100644 index 0000000000..8efd4534ae --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/MulticastSend.py @@ -0,0 +1,30 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import socket +from time import sleep, time + +MCAST_GRP = '224.1.1.1' +MCAST_PORT = 5007 + +sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) +sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) + +while True: + print "Multicast to group: %s\n" % MCAST_GRP + data = 'Hello World: ' + repr(time()) + '\n' + sock.sendto(data, (MCAST_GRP, MCAST_PORT)) + sleep(1) diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/TCPEchoClient.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/TCPEchoClient.py new file mode 100644 index 0000000000..dfa9bfdae7 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/TCPEchoClient.py @@ -0,0 +1,28 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import socket + +ECHO_SERVER_ADDRESS = "10.2.202.45" +ECHO_PORT = 7 + +s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +s.connect((ECHO_SERVER_ADDRESS, ECHO_PORT)) + +s.sendall('Hello, world') +data = s.recv(1024) +s.close() +print 'Received', repr(data) diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/TCPEchoServer.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/TCPEchoServer.py new file mode 100644 index 0000000000..1324edbe64 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/TCPEchoServer.py @@ -0,0 +1,30 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import socket + +s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +s.bind(('', 7)) +s.listen(1) + +while True: + conn, addr = s.accept() + print 'Connected by', addr + while True: + data = conn.recv(1024) + if not data: break + conn.sendall(data) + conn.close() diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/UDPEchoClient.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/UDPEchoClient.py new file mode 100644 index 0000000000..6a6cf8c902 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/UDPEchoClient.py @@ -0,0 +1,28 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import socket + +ECHO_SERVER_ADDRESS = '10.2.202.45' +ECHO_PORT = 7 + +sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + +sock.sendto("Hello World\n", (ECHO_SERVER_ADDRESS, ECHO_PORT)) +response = sock.recv(256) +sock.close() + +print response diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/UDPEchoServer.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/UDPEchoServer.py new file mode 100644 index 0000000000..38503489ee --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/UDPEchoServer.py @@ -0,0 +1,27 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import socket + +ECHO_PORT = 7 + +sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +sock.bind(('', ECHO_PORT)) + +while True: + data, address = sock.recvfrom(256) + print "datagram from", address + sock.sendto(data, address) diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/__init__.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/__init__.py new file mode 100644 index 0000000000..10e7e1d1de --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/example/__init__.py @@ -0,0 +1,16 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" \ No newline at end of file diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/hello_auto.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/hello_auto.py new file mode 100644 index 0000000000..69b39bf6bb --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/hello_auto.py @@ -0,0 +1,34 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +class HelloTest(): + HELLO_WORLD = "Hello World" + + def test(self, selftest): + c = selftest.mbed.serial_readline() + if c is None: + return selftest.RESULT_IO_SERIAL + selftest.notify("Read %d bytes:"% len(c)) + selftest.notify(c.strip()) + + result = True + # Because we can have targetID here let's try to decode + if len(c) < len(self.HELLO_WORLD): + result = False + else: + result = self.HELLO_WORLD in c + return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_registry.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_registry.py new file mode 100644 index 0000000000..d52384834a --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_registry.py @@ -0,0 +1,36 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +class HostRegistry: + """ Class stores registry with host tests and objects representing them + """ + HOST_TESTS = {} # host_test_name -> host_test_ojbect + + def register_host_test(self, ht_name, ht_object): + if ht_name not in self.HOST_TESTS: + self.HOST_TESTS[ht_name] = ht_object + + def unregister_host_test(self): + if ht_name in HOST_TESTS: + self.HOST_TESTS[ht_name] = None + + def get_host_test(self, ht_name): + return self.HOST_TESTS[ht_name] if ht_name in self.HOST_TESTS else None + + def is_host_test(self, ht_name): + return ht_name in self.HOST_TESTS + \ No newline at end of file diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_test.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_test.py new file mode 100644 index 0000000000..4dd16505a2 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_test.py @@ -0,0 +1,397 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +# Check if 'serial' module is installed +try: + from serial import Serial +except ImportError, e: + print "Error: Can't import 'serial' module: %s"% e + exit(-1) + +import os +import re +import types +from sys import stdout +from time import sleep, time +from optparse import OptionParser + +import host_tests_plugins + +# This is a little tricky. We need to add upper directory to path so +# we can find packages we want from the same level as other files do +import sys +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../..'))) + + +class Mbed: + """ Base class for a host driven test + """ + def __init__(self): + parser = OptionParser() + + parser.add_option("-m", "--micro", + dest="micro", + help="The target microcontroller", + metavar="MICRO") + + parser.add_option("-p", "--port", + dest="port", + help="The serial port of the target mbed", + metavar="PORT") + + parser.add_option("-d", "--disk", + dest="disk", + help="The target disk path", + metavar="DISK_PATH") + + parser.add_option("-f", "--image-path", + dest="image_path", + help="Path with target's image", + metavar="IMAGE_PATH") + + parser.add_option("-c", "--copy", + dest="copy_method", + help="Copy method selector", + metavar="COPY_METHOD") + + parser.add_option("-C", "--program_cycle_s", + dest="program_cycle_s", + help="Program cycle sleep. Define how many seconds you want wait after copying bianry onto target", + type="float", + metavar="COPY_METHOD") + + parser.add_option("-t", "--timeout", + dest="timeout", + help="Timeout", + metavar="TIMEOUT") + + parser.add_option("-r", "--reset", + dest="forced_reset_type", + help="Forces different type of reset") + + parser.add_option("-R", "--reset-timeout", + dest="forced_reset_timeout", + metavar="NUMBER", + type="int", + help="When forcing a reset using option -r you can set up after reset timeout in seconds") + + (self.options, _) = parser.parse_args() + + self.DEFAULT_RESET_TOUT = 0 + self.DEFAULT_TOUT = 10 + + if self.options.port is None: + raise Exception("The serial port of the target mbed have to be provided as command line arguments") + + # Options related to copy / reset mbed device + self.port = self.options.port + self.disk = self.options.disk + self.image_path = self.options.image_path.strip('"') + self.copy_method = self.options.copy_method + self.program_cycle_s = float(self.options.program_cycle_s) + + self.serial = None + self.serial_baud = 9600 + self.serial_timeout = 1 + + self.timeout = self.DEFAULT_TOUT if self.options.timeout is None else self.options.timeout + print 'MBED: Instrumentation: "%s" and disk: "%s"' % (self.port, self.disk) + + def init_serial_params(self, serial_baud=9600, serial_timeout=1): + """ Initialize port parameters. + This parameters will be used by self.init_serial() function to open serial port + """ + self.serial_baud = serial_baud + self.serial_timeout = serial_timeout + + def init_serial(self, serial_baud=None, serial_timeout=None): + """ Initialize serial port. + Function will return error is port can't be opened or initialized + """ + # Overload serial port configuration from default to parameters' values if they are specified + serial_baud = serial_baud if serial_baud is not None else self.serial_baud + serial_timeout = serial_timeout if serial_timeout is not None else self.serial_timeout + + # Clear serial port + if self.serial: + self.serial.close() + self.serial = None + + # We will pool for serial to be re-mounted if it was unmounted after device reset + result = self.pool_for_serial_init(serial_baud, serial_timeout) # Blocking + + # Port can be opened + if result: + self.flush() + return result + + def pool_for_serial_init(self, serial_baud, serial_timeout, pooling_loops=40, init_delay=0.5, loop_delay=0.25): + """ Functions pools for serial port readiness + """ + result = True + last_error = None + # This loop is used to check for serial port availability due to + # some delays and remounting when devices are being flashed with new software. + for i in range(pooling_loops): + sleep(loop_delay if i else init_delay) + try: + self.serial = Serial(self.port, baudrate=serial_baud, timeout=serial_timeout) + except Exception as e: + result = False + last_error = "MBED: %s"% str(e) + stdout.write('.') + stdout.flush() + else: + print "...port ready!" + result = True + break + if not result and last_error: + print last_error + return result + + def set_serial_timeout(self, timeout): + """ Wraps self.mbed.serial object timeout property + """ + result = None + if self.serial: + self.serial.timeout = timeout + result = True + return result + + def serial_read(self, count=1): + """ Wraps self.mbed.serial object read method + """ + result = None + if self.serial: + try: + result = self.serial.read(count) + except: + result = None + return result + + def serial_readline(self, timeout=5): + """ Wraps self.mbed.serial object read method to read one line from serial port + """ + result = '' + start = time() + while (time() - start) < timeout: + if self.serial: + try: + c = self.serial.read(1) + result += c + except Exception as e: + print "MBED: %s"% str(e) + result = None + break + if c == '\n': + break + return result + + def serial_write(self, write_buffer): + """ Wraps self.mbed.serial object write method + """ + result = None + if self.serial: + try: + result = self.serial.write(write_buffer) + except: + result = None + return result + + def reset_timeout(self, timeout): + """ Timeout executed just after reset command is issued + """ + for n in range(0, timeout): + sleep(1) + + def reset(self): + """ Calls proper reset plugin to do the job. + Please refer to host_test_plugins functionality + """ + # Flush serials to get only input after reset + self.flush() + if self.options.forced_reset_type: + result = host_tests_plugins.call_plugin('ResetMethod', self.options.forced_reset_type, disk=self.disk) + else: + result = host_tests_plugins.call_plugin('ResetMethod', 'default', serial=self.serial) + # Give time to wait for the image loading + reset_tout_s = self.options.forced_reset_timeout if self.options.forced_reset_timeout is not None else self.DEFAULT_RESET_TOUT + self.reset_timeout(reset_tout_s) + return result + + def copy_image(self, image_path=None, disk=None, copy_method=None): + """ Closure for copy_image_raw() method. + Method which is actually copying image to mbed + """ + # Set closure environment + image_path = image_path if image_path is not None else self.image_path + disk = disk if disk is not None else self.disk + copy_method = copy_method if copy_method is not None else self.copy_method + # Call proper copy method + result = self.copy_image_raw(image_path, disk, copy_method) + sleep(self.program_cycle_s) + return result + + def copy_image_raw(self, image_path=None, disk=None, copy_method=None): + """ Copy file depending on method you want to use. Handles exception + and return code from shell copy commands. + """ + if copy_method is not None: + # image_path - Where is binary with target's firmware + result = host_tests_plugins.call_plugin('CopyMethod', copy_method, image_path=image_path, destination_disk=disk) + else: + copy_method = 'default' + result = host_tests_plugins.call_plugin('CopyMethod', copy_method, image_path=image_path, destination_disk=disk) + return result; + + def flush(self): + """ Flush serial ports + """ + result = False + if self.serial: + self.serial.flushInput() + self.serial.flushOutput() + result = True + return result + + +class HostTestResults: + """ Test results set by host tests + """ + def __init__(self): + self.RESULT_SUCCESS = 'success' + self.RESULT_FAILURE = 'failure' + self.RESULT_ERROR = 'error' + self.RESULT_IO_SERIAL = 'ioerr_serial' + self.RESULT_NO_IMAGE = 'no_image' + self.RESULT_IOERR_COPY = "ioerr_copy" + self.RESULT_PASSIVE = "passive" + self.RESULT_NOT_DETECTED = "not_detected" + self.RESULT_MBED_ASSERT = "mbed_assert" + + +import workspace_tools.host_tests as host_tests + + +class Test(HostTestResults): + """ Base class for host test's test runner + """ + # Select default host_test supervision (replaced after autodetection) + test_supervisor = host_tests.get_host_test("default") + + def __init__(self): + self.mbed = Mbed() + + def detect_test_config(self, verbose=False): + """ Detects test case configuration + """ + result = {} + while True: + line = self.mbed.serial_readline() + if "{start}" in line: + self.notify("HOST: Start test...") + break + else: + # Detect if this is property from TEST_ENV print + m = re.search('{([\w_]+);([\w\d\+ ]+)}}', line[:-1]) + if m and len(m.groups()) == 2: + # This is most likely auto-detection property + result[m.group(1)] = m.group(2) + if verbose: + self.notify("HOST: Property '%s' = '%s'"% (m.group(1), m.group(2))) + else: + # We can check if this is TArget Id in mbed specific format + m2 = re.search('^([\$]+)([a-fA-F0-9]+)', line[:-1]) + if m2 and len(m2.groups()) == 2: + if verbose: + target_id = m2.group(1) + m2.group(2) + self.notify("HOST: TargetID '%s'"% target_id) + self.notify(line[len(target_id):-1]) + else: + self.notify("HOST: Unknown property: %s"% line.strip()) + return result + + def run(self): + """ Test runner for host test. This function will start executing + test and forward test result via serial port to test suite + """ + # Copy image to device + self.notify("HOST: Copy image onto target...") + result = self.mbed.copy_image() + if not result: + self.print_result(self.RESULT_IOERR_COPY) + + # Initialize and open target's serial port (console) + self.notify("HOST: Initialize serial port...") + result = self.mbed.init_serial() + if not result: + self.print_result(self.RESULT_IO_SERIAL) + + # Reset device + self.notify("HOST: Reset target...") + result = self.mbed.reset() + if not result: + self.print_result(self.RESULT_IO_SERIAL) + + # Run test + try: + CONFIG = self.detect_test_config(verbose=True) # print CONFIG + + if "host_test_name" in CONFIG: + if host_tests.is_host_test(CONFIG["host_test_name"]): + self.test_supervisor = host_tests.get_host_test(CONFIG["host_test_name"]) + result = self.test_supervisor.test(self) #result = self.test() + + if result is not None: + self.print_result(result) + else: + self.notify("HOST: Passive mode...") + except Exception, e: + print str(e) + self.print_result(self.RESULT_ERROR) + + def setup(self): + """ Setup and check if configuration for test is + correct. E.g. if serial port can be opened. + """ + result = True + if not self.mbed.serial: + result = False + self.print_result(self.RESULT_IO_SERIAL) + return result + + def notify(self, message): + """ On screen notification function + """ + print message + stdout.flush() + + def print_result(self, result): + """ Test result unified printing function + """ + self.notify("\r\n{{%s}}\r\n{{end}}" % result) + + +class DefaultTestSelector(Test): + """ Test class with serial port initialization + """ + def __init__(self): + HostTestResults.__init__(self) + Test.__init__(self) + +if __name__ == '__main__': + DefaultTestSelector().run() diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/__init__.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/__init__.py new file mode 100644 index 0000000000..913da02648 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/__init__.py @@ -0,0 +1,68 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import host_test_registry + +# This plugins provide 'flashing' methods to host test scripts +import module_copy_mbed +import module_copy_shell +import module_copy_silabs +#import module_copy_firefox +#import module_copy_mps2 + +# Plugins used to reset certain platform +import module_reset_mbed +import module_reset_silabs +#import module_reset_mps2 + + +# Plugin registry instance +HOST_TEST_PLUGIN_REGISTRY = host_test_registry.HostTestRegistry() + +# Static plugin registration +# Some plugins are commented out if they are not stable or not commonly used +HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_mbed.load_plugin()) +HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_shell.load_plugin()) +HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_reset_mbed.load_plugin()) +#HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_firefox.load_plugin()) + +# Extra platforms support +#HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_mps2.load_plugin()) +#HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_reset_mps2.load_plugin()) +HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_copy_silabs.load_plugin()) +HOST_TEST_PLUGIN_REGISTRY.register_plugin(module_reset_silabs.load_plugin()) + +# TODO: extend plugin loading to files with name module_*.py loaded ad-hoc + +############################################################################### +# Functional interface for host test plugin registry +############################################################################### +def call_plugin(type, capability, *args, **kwargs): + """ Interface to call plugin registry functional way + """ + return HOST_TEST_PLUGIN_REGISTRY.call_plugin(type, capability, *args, **kwargs) + +def get_plugin_caps(type): + """ Returns list of all capabilities for plugin family with the same type. + If there are no capabilities empty list is returned + """ + return HOST_TEST_PLUGIN_REGISTRY.get_plugin_caps(type) + +def print_plugin_info(): + """ Prints plugins' information in user friendly way + """ + print HOST_TEST_PLUGIN_REGISTRY diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/host_test_plugins.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/host_test_plugins.py new file mode 100644 index 0000000000..8bc1da35d3 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/host_test_plugins.py @@ -0,0 +1,118 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from os import access, F_OK +from sys import stdout +from time import sleep +from subprocess import call + + +class HostTestPluginBase: + """ Base class for all plug-ins used with host tests. + """ + ########################################################################### + # Interface: + ########################################################################### + + ########################################################################### + # Interface attributes defining plugin name, type etc. + ########################################################################### + name = "HostTestPluginBase" # Plugin name, can be plugin class name + type = "BasePlugin" # Plugin type: ResetMethod, Copymethod etc. + capabilities = [] # Capabilities names: what plugin can achieve + # (e.g. reset using some external command line tool) + stable = False # Determine if plugin is stable and can be used + + ########################################################################### + # Interface methods + ########################################################################### + def setup(self, *args, **kwargs): + """ Configure plugin, this function should be called before plugin execute() method is used. + """ + return False + + def execute(self, capabilitity, *args, **kwargs): + """ Executes capability by name. + Each capability e.g. may directly just call some command line + program or execute building pythonic function + """ + return False + + ########################################################################### + # Interface helper methods - overload only if you need to have custom behaviour + ########################################################################### + def print_plugin_error(self, text): + """ Function prints error in console and exits always with False + """ + print "Plugin error: %s::%s: %s"% (self.name, self.type, text) + return False + + def print_plugin_info(self, text, NL=True): + """ Function prints notification in console and exits always with True + """ + if NL: + print "Plugin info: %s::%s: %s"% (self.name, self.type, text) + else: + print "Plugin info: %s::%s: %s"% (self.name, self.type, text), + return True + + def print_plugin_char(self, char): + """ Function prints char on stdout + """ + stdout.write(char) + stdout.flush() + return True + + def check_mount_point_ready(self, destination_disk, init_delay=0.2, loop_delay=0.25): + """ Checks if destination_disk is ready and can be accessed by e.g. copy commands + @init_delay - Initial delay time before first access check + @loop_delay - pooling delay for access check + """ + if not access(destination_disk, F_OK): + self.print_plugin_info("Waiting for mount point '%s' to be ready..."% destination_disk, NL=False) + sleep(init_delay) + while not access(destination_disk, F_OK): + sleep(loop_delay) + self.print_plugin_char('.') + + def check_parameters(self, capabilitity, *args, **kwargs): + """ This function should be ran each time we call execute() + to check if none of the required parameters is missing. + """ + missing_parameters = [] + for parameter in self.required_parameters: + if parameter not in kwargs: + missing_parameters.append(parameter) + if len(missing_parameters) > 0: + self.print_plugin_error("execute parameter(s) '%s' missing!"% (', '.join(parameter))) + return False + return True + + def run_command(self, cmd, shell=True): + """ Runs command from command line. + """ + result = True + try: + ret = call(cmd, shell=shell) + if ret: + self.print_plugin_error("[ret=%d] Command: %s"% (int(ret), cmd)) + return False + except Exception as e: + result = False + self.print_plugin_error("[ret=%d] Command: %s"% (int(ret), cmd)) + self.print_plugin_error(str(e)) + return result diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/host_test_registry.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/host_test_registry.py new file mode 100644 index 0000000000..5237b9a254 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/host_test_registry.py @@ -0,0 +1,89 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +class HostTestRegistry: + """ Simple class used to register and store + host test plugins for further usage + """ + # Here we actually store all the plugins + PLUGINS = {} # 'Plugin Name' : Plugin Object + + def print_error(self, text): + print "Plugin load failed. Reason: %s"% text + + def register_plugin(self, plugin): + """ Registers and stores plugin inside registry for further use. + Method also calls plugin's setup() function to configure plugin if needed. + + Note: Different groups of plugins may demand different extra parameter. Plugins + should be at least for one type of plugin configured with the same parameters + because we do not know which of them will actually use particular parameter. + """ + # TODO: + # - check for unique caps for specified type + if plugin.name not in self.PLUGINS: + if plugin.setup(): # Setup plugin can be completed without errors + self.PLUGINS[plugin.name] = plugin + return True + else: + self.print_error("%s setup failed"% plugin.name) + else: + self.print_error("%s already loaded"% plugin.name) + return False + + def call_plugin(self, type, capability, *args, **kwargs): + """ Execute plugin functionality respectively to its purpose + """ + for plugin_name in self.PLUGINS: + plugin = self.PLUGINS[plugin_name] + if plugin.type == type and capability in plugin.capabilities: + return plugin.execute(capability, *args, **kwargs) + return False + + def get_plugin_caps(self, type): + """ Returns list of all capabilities for plugin family with the same type. + If there are no capabilities empty list is returned + """ + result = [] + for plugin_name in self.PLUGINS: + plugin = self.PLUGINS[plugin_name] + if plugin.type == type: + result.extend(plugin.capabilities) + return sorted(result) + + def load_plugin(self, name): + """ Used to load module from + """ + mod = __import__("module_%s"% name) + return mod + + def __str__(self): + """ User friendly printing method to show hooked plugins + """ + from prettytable import PrettyTable + column_names = ['name', 'type', 'capabilities', 'stable'] + pt = PrettyTable(column_names) + for column in column_names: + pt.align[column] = 'l' + for plugin_name in sorted(self.PLUGINS.keys()): + name = self.PLUGINS[plugin_name].name + type = self.PLUGINS[plugin_name].type + stable = self.PLUGINS[plugin_name].stable + capabilities = ', '.join(self.PLUGINS[plugin_name].capabilities) + row = [name, type, capabilities, stable] + pt.add_row(row) + return pt.get_string() diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_firefox.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_firefox.py new file mode 100644 index 0000000000..360835e498 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_firefox.py @@ -0,0 +1,76 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from os.path import join, basename +from host_test_plugins import HostTestPluginBase + + +class HostTestPluginCopyMethod_Firefox(HostTestPluginBase): + + def file_store_firefox(self, file_path, dest_disk): + try: + from selenium import webdriver + profile = webdriver.FirefoxProfile() + profile.set_preference('browser.download.folderList', 2) # custom location + profile.set_preference('browser.download.manager.showWhenStarting', False) + profile.set_preference('browser.download.dir', dest_disk) + profile.set_preference('browser.helperApps.neverAsk.saveToDisk', 'application/octet-stream') + # Launch browser with profile and get file + browser = webdriver.Firefox(profile) + browser.get(file_path) + browser.close() + except: + return False + return True + + # Plugin interface + name = 'HostTestPluginCopyMethod_Firefox' + type = 'CopyMethod' + capabilities = ['firefox'] + required_parameters = ['image_path', 'destination_disk'] + + def setup(self, *args, **kwargs): + """ Configure plugin, this function should be called before plugin execute() method is used. + """ + try: + from selenium import webdriver + except ImportError, e: + self.print_plugin_error("Error: firefox copy method requires selenium library. %s"% e) + return False + return True + + def execute(self, capabilitity, *args, **kwargs): + """ Executes capability by name. + Each capability may directly just call some command line + program or execute building pythonic function + """ + result = False + if self.check_parameters(capabilitity, *args, **kwargs) is True: + image_path = kwargs['image_path'] + destination_disk = kwargs['destination_disk'] + # Prepare correct command line parameter values + image_base_name = basename(image_path) + destination_path = join(destination_disk, image_base_name) + if capabilitity == 'firefox': + self.file_store_firefox(image_path, destination_path) + return result + + +def load_plugin(): + """ Returns plugin available in this module + """ + return HostTestPluginCopyMethod_Firefox() diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_mbed.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_mbed.py new file mode 100644 index 0000000000..18fe0c42ae --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_mbed.py @@ -0,0 +1,71 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from shutil import copy +from host_test_plugins import HostTestPluginBase + + +class HostTestPluginCopyMethod_Mbed(HostTestPluginBase): + + def generic_mbed_copy(self, image_path, destination_disk): + """ Generic mbed copy method for "mbed enabled" devices. + It uses standard python shuitl function to copy + image_file (target specific binary) to device's disk. + """ + result = True + if not destination_disk.endswith('/') and not destination_disk.endswith('\\'): + destination_disk += '/' + try: + copy(image_path, destination_disk) + except Exception, e: + self.print_plugin_error("shutil.copy('%s', '%s')"% (image_path, destination_disk)) + self.print_plugin_error("Error: %s"% str(e)) + result = False + return result + + # Plugin interface + name = 'HostTestPluginCopyMethod_Mbed' + type = 'CopyMethod' + stable = True + capabilities = ['default'] + required_parameters = ['image_path', 'destination_disk'] + + def setup(self, *args, **kwargs): + """ Configure plugin, this function should be called before plugin execute() method is used. + """ + return True + + def execute(self, capabilitity, *args, **kwargs): + """ Executes capability by name. + Each capability may directly just call some command line + program or execute building pythonic function + """ + result = False + if self.check_parameters(capabilitity, *args, **kwargs) is True: + if capabilitity == 'default': + image_path = kwargs['image_path'] + destination_disk = kwargs['destination_disk'] + # Wait for mount point to be ready + self.check_mount_point_ready(destination_disk) # Blocking + result = self.generic_mbed_copy(image_path, destination_disk) + return result + + +def load_plugin(): + """ Returns plugin available in this module + """ + return HostTestPluginCopyMethod_Mbed() diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_mps2.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_mps2.py new file mode 100644 index 0000000000..f7768873f9 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_mps2.py @@ -0,0 +1,107 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import re +from os.path import join +from host_test_plugins import HostTestPluginBase + + +class HostTestPluginCopyMethod_MPS2(HostTestPluginBase): + + # MPS2 specific flashing / binary setup funcitons + def mps2_set_board_image_file(self, disk, images_cfg_path, image0file_path, image_name='images.txt'): + """ This function will alter image cfg file. + Main goal of this function is to change number of images to 1, comment all + existing image entries and append at the end of file new entry with test path. + @return True when all steps succeed. + """ + MBED_SDK_TEST_STAMP = 'test suite entry' + image_path = join(disk, images_cfg_path, image_name) + new_file_lines = [] # New configuration file lines (entries) + + # Check each line of the image configuration file + try: + with open(image_path, 'r') as file: + for line in file: + if re.search('^TOTALIMAGES', line): + # Check number of total images, should be 1 + new_file_lines.append(re.sub('^TOTALIMAGES:[\t ]*[\d]+', 'TOTALIMAGES: 1', line)) + elif re.search('; - %s[\n\r]*$'% MBED_SDK_TEST_STAMP, line): + # Look for test suite entries and remove them + pass # Omit all test suite entries + elif re.search('^IMAGE[\d]+FILE', line): + # Check all image entries and mark the ';' + new_file_lines.append(';' + line) # Comment non test suite lines + else: + # Append line to new file + new_file_lines.append(line) + except IOError as e: + return False + + # Add new image entry with proper commented stamp + new_file_lines.append('IMAGE0FILE: %s ; - %s\r\n'% (image0file_path, MBED_SDK_TEST_STAMP)) + + # Write all lines to file + try: + with open(image_path, 'w') as file: + for line in new_file_lines: + file.write(line), + except IOError: + return False + + return True + + def mps2_select_core(self, disk, mobo_config_name=""): + """ Function selects actual core + """ + # TODO: implement core selection + pass + + def mps2_switch_usb_auto_mounting_after_restart(self, disk, usb_config_name=""): + """ Function alters configuration to allow USB MSD to be mounted after restarts + """ + # TODO: implement USB MSD restart detection + pass + + # Plugin interface + name = 'HostTestPluginCopyMethod_MPS2' + type = 'CopyMethod' + capabilities = ['mps2'] + required_parameters = ['image_path', 'destination_disk'] + + def setup(self, *args, **kwargs): + """ Configure plugin, this function should be called before plugin execute() method is used. + """ + return True + + def execute(self, capabilitity, *args, **kwargs): + """ Executes capability by name. + Each capability may directly just call some command line + program or execute building pythonic function + """ + result = False + if self.check_parameters(capabilitity, *args, **kwargs) is True: + if capabilitity == 'mps2': + # TODO: Implement MPS2 firmware setup here + pass + return result + + +def load_plugin(): + """ Returns plugin available in this module + """ + return HostTestPluginCopyMethod_MPS2() diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_shell.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_shell.py new file mode 100644 index 0000000000..f7fb23b0a7 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_shell.py @@ -0,0 +1,64 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import os +from os.path import join, basename +from host_test_plugins import HostTestPluginBase + + +class HostTestPluginCopyMethod_Shell(HostTestPluginBase): + + # Plugin interface + name = 'HostTestPluginCopyMethod_Shell' + type = 'CopyMethod' + stable = True + capabilities = ['shell', 'cp', 'copy', 'xcopy'] + required_parameters = ['image_path', 'destination_disk'] + + def setup(self, *args, **kwargs): + """ Configure plugin, this function should be called before plugin execute() method is used. + """ + return True + + def execute(self, capabilitity, *args, **kwargs): + """ Executes capability by name. + Each capability may directly just call some command line + program or execute building pythonic function + """ + result = False + if self.check_parameters(capabilitity, *args, **kwargs) is True: + image_path = kwargs['image_path'] + destination_disk = kwargs['destination_disk'] + # Wait for mount point to be ready + self.check_mount_point_ready(destination_disk) # Blocking + # Prepare correct command line parameter values + image_base_name = basename(image_path) + destination_path = join(destination_disk, image_base_name) + if capabilitity == 'shell': + if os.name == 'nt': capabilitity = 'copy' + elif os.name == 'posix': capabilitity = 'cp' + if capabilitity == 'cp' or capabilitity == 'copy' or capabilitity == 'copy': + copy_method = capabilitity + cmd = [copy_method, image_path, destination_path] + result = self.run_command(cmd) + return result + + +def load_plugin(): + """ Returns plugin available in this module + """ + return HostTestPluginCopyMethod_Shell() diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_silabs.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_silabs.py new file mode 100644 index 0000000000..1572bbc6ee --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_copy_silabs.py @@ -0,0 +1,61 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from host_test_plugins import HostTestPluginBase + + +class HostTestPluginCopyMethod_Silabs(HostTestPluginBase): + + # Plugin interface + name = 'HostTestPluginCopyMethod_Silabs' + type = 'CopyMethod' + capabilities = ['eACommander', 'eACommander-usb'] + required_parameters = ['image_path', 'destination_disk'] + + def setup(self, *args, **kwargs): + """ Configure plugin, this function should be called before plugin execute() method is used. + """ + self.EACOMMANDER_CMD = 'eACommander.exe' + return True + + def execute(self, capabilitity, *args, **kwargs): + """ Executes capability by name. + Each capability may directly just call some command line + program or execute building pythonic function + """ + result = False + if self.check_parameters(capabilitity, *args, **kwargs) is True: + image_path = kwargs['image_path'] + destination_disk = kwargs['destination_disk'] + if capabilitity == 'eACommander': + cmd = [self.EACOMMANDER_CMD, + '--serialno', destination_disk, + '--flash', image_path, + '--resettype', '2', '--reset'] + result = self.run_command(cmd) + elif capabilitity == 'eACommander-usb': + cmd = [self.EACOMMANDER_CMD, + '--usb', destination_disk, + '--flash', image_path] + result = self.run_command(cmd) + return result + + +def load_plugin(): + """ Returns plugin available in this module + """ + return HostTestPluginCopyMethod_Silabs() diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_mbed.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_mbed.py new file mode 100644 index 0000000000..0390d84ba6 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_mbed.py @@ -0,0 +1,72 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from host_test_plugins import HostTestPluginBase + + +class HostTestPluginResetMethod_Mbed(HostTestPluginBase): + + def safe_sendBreak(self, serial): + """ Wraps serial.sendBreak() to avoid serial::serialposix.py exception on Linux + Traceback (most recent call last): + File "make.py", line 189, in + serial.sendBreak() + File "/usr/lib/python2.7/dist-packages/serial/serialposix.py", line 511, in sendBreak + termios.tcsendbreak(self.fd, int(duration/0.25)) + error: (32, 'Broken pipe') + """ + result = True + try: + serial.sendBreak() + except: + # In linux a termios.error is raised in sendBreak and in setBreak. + # The following setBreak() is needed to release the reset signal on the target mcu. + try: + serial.setBreak(False) + except: + result = False + return result + + # Plugin interface + name = 'HostTestPluginResetMethod_Mbed' + type = 'ResetMethod' + stable = True + capabilities = ['default'] + required_parameters = ['serial'] + + def setup(self, *args, **kwargs): + """ Configure plugin, this function should be called before plugin execute() method is used. + """ + return True + + def execute(self, capabilitity, *args, **kwargs): + """ Executes capability by name. + Each capability may directly just call some command line + program or execute building pythonic function + """ + result = False + if self.check_parameters(capabilitity, *args, **kwargs) is True: + if capabilitity == 'default': + serial = kwargs['serial'] + result = self.safe_sendBreak(serial) + return result + + +def load_plugin(): + """ Returns plugin available in this module + """ + return HostTestPluginResetMethod_Mbed() diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_mps2.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_mps2.py new file mode 100644 index 0000000000..22938090bb --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_mps2.py @@ -0,0 +1,74 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import os +from host_test_plugins import HostTestPluginBase + +# Note: This plugin is not fully functional, needs improvements + +class HostTestPluginResetMethod_MPS2(HostTestPluginBase): + """ Plugin used to reset ARM_MPS2 platform + Supports: + reboot.txt - startup from standby state, reboots when in run mode. + shutdown.txt - shutdown from run mode. + reset.txt - reset FPGA during run mode. + """ + def touch_file(self, path): + """ Touch file and set timestamp to items + """ + with open(path, 'a'): + os.utime(path, None) + + # Plugin interface + name = 'HostTestPluginResetMethod_MPS2' + type = 'ResetMethod' + capabilities = ['reboot.txt', 'shutdown.txt', 'reset.txt'] + required_parameters = ['disk'] + + def setup(self, *args, **kwargs): + """ Prepare / configure plugin to work. + This method can receive plugin specific parameters by kwargs and + ignore other parameters which may affect other plugins. + """ + return True + + def execute(self, capabilitity, *args, **kwargs): + """ Executes capability by name. + Each capability may directly just call some command line + program or execute building pythonic function + """ + result = False + if self.check_parameters(capabilitity, *args, **kwargs) is True: + + if capabilitity == 'reboot.txt': + # TODO: Implement touch file for reboot + pass + + elif capabilitity == 'shutdown.txt': + # TODO: Implement touch file for shutdown + pass + + elif capabilitity == 'reset.txt': + # TODO: Implement touch file for reset + pass + + return result + +def load_plugin(): + """ Returns plugin available in this module + """ + return HostTestPluginResetMethod_MPS2() diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_silabs.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_silabs.py new file mode 100644 index 0000000000..2c05cb21c3 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/host_tests_plugins/module_reset_silabs.py @@ -0,0 +1,66 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from host_test_plugins import HostTestPluginBase + + +class HostTestPluginResetMethod_SiLabs(HostTestPluginBase): + + # Plugin interface + name = 'HostTestPluginResetMethod_SiLabs' + type = 'ResetMethod' + stable = True + capabilities = ['eACommander', 'eACommander-usb'] + required_parameters = ['disk'] + + def setup(self, *args, **kwargs): + """ Configure plugin, this function should be called before plugin execute() method is used. + """ + # Note you need to have eACommander.exe on your system path! + self.EACOMMANDER_CMD = 'eACommander.exe' + return True + + def execute(self, capabilitity, *args, **kwargs): + """ Executes capability by name. + Each capability may directly just call some command line + program or execute building pythonic function + """ + result = False + if self.check_parameters(capabilitity, *args, **kwargs) is True: + disk = kwargs['disk'].rstrip('/\\') + + if capabilitity == 'eACommander': + # For this copy method 'disk' will be 'serialno' for eACommander command line parameters + # Note: Commands are executed in the order they are specified on the command line + cmd = [self.EACOMMANDER_CMD, + '--serialno', disk, + '--resettype', '2', '--reset',] + result = self.run_command(cmd) + elif capabilitity == 'eACommander-usb': + # For this copy method 'disk' will be 'usb address' for eACommander command line parameters + # Note: Commands are executed in the order they are specified on the command line + cmd = [self.EACOMMANDER_CMD, + '--usb', disk, + '--resettype', '2', '--reset',] + result = self.run_command(cmd) + return result + + +def load_plugin(): + """ Returns plugin available in this module + """ + return HostTestPluginResetMethod_SiLabs() diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/mbedrpc.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/mbedrpc.py new file mode 100644 index 0000000000..3fbc2e6f0d --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/mbedrpc.py @@ -0,0 +1,287 @@ +""" +mbed SDK +Copyright (c) 2010-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + +Example: +> from mbedRPC import* +> mbed = SerialRPC("COM5",9600); +> myled = DigitalOut(mbed, LED1); +> myled.write(1) +> +""" +import serial, urllib2, time + + +class pin(): + def __init__(self, id): + self.name = id + +LED1 = pin("LED1") +LED2 = pin("LED2") +LED3 = pin("LED3") +LED4 = pin("LED4") + +p5 = pin("p5") +p6 = pin("p6") +p7 = pin("p7") +p8 = pin("p8") +p9 = pin("p9") +p10 = pin("p10") +p11 = pin("p11") +p12 = pin("p12") +p13 = pin("p13") +p14 = pin("p14") +p15 = pin("p15") +p16 = pin("p16") +p17 = pin("p17") +p18 = pin("p18") +p19 = pin("p19") +p20 = pin("p20") +p21 = pin("p21") +p22 = pin("p22") +p23 = pin("p23") +p24 = pin("p24") +p25 = pin("p25") +p26 = pin("p26") +p27 = pin("p27") +p28 = pin("p28") +p29 = pin("p29") +p30 = pin("p30") + + +#mbed super class +class mbed: + def __init__(self): + print("This will work as a demo but no transport mechanism has been selected") + + def rpc(self, name, method, args): + print("Superclass method not overridden") + +#Transport mechanisms, derived from mbed + +class SerialRPC(mbed): + def __init__(self, port, baud=9600, reset=True, debug=False): + self.ser = serial.Serial(port) + self.ser.setBaudrate(baud) + self.ser.flushInput() + self.ser.flushOutput() + self.debug = debug + if reset: + if debug: + print "Reset mbed" + self.ser.sendBreak() + time.sleep(2) + + def rpc(self, name, method, args): + request = "/" + name + "/" + method + " " + " ".join(args) + if self.debug: + print "[RPC::TX] %s" % request + self.ser.write(request + "\n") + + while True: + response = self.ser.readline().strip() + if self.debug: + print "[RPC::RX] %s" % response + + # Ignore comments + if not response.startswith('#'): break + return response + + +class HTTPRPC(mbed): + def __init__(self, ip): + self.host = "http://" + ip + + def rpc(self, name, method, args): + response = urllib2.urlopen(self.host + "/rpc/" + name + "/" + method + "," + ",".join(args)) + return response.read().strip() + + +#mbed Interfaces + +class DigitalOut(): + def __init__(self, this_mbed , mpin): + self.mbed = this_mbed + if isinstance(mpin, str): + self.name = mpin + elif isinstance(mpin, pin): + self.name = self.mbed.rpc("DigitalOut", "new", [mpin.name]) + + def __del__(self): + r = self.mbed.rpc(self.name, "delete", []) + + def write(self, value): + r = self.mbed.rpc(self.name, "write", [str(value)]) + + def read(self): + r = self.mbed.rpc(self.name, "read", []) + return int(r) + + +class AnalogIn(): + def __init__(self, this_mbed , mpin): + self.mbed = this_mbed + if isinstance(mpin, str): + self.name = mpin + elif isinstance(mpin, pin): + self.name = self.mbed.rpc("AnalogIn", "new", [mpin.name]) + + def __del__(self): + r = self.mbed.rpc(self.name, "delete", []) + + def read(self): + r = self.mbed.rpc(self.name, "read", []) + return float(r) + + def read_u16(self): + r = self.mbed.rpc(self.name, "read_u16", []) + return int(r) + + +class AnalogOut(): + def __init__(self, this_mbed , mpin): + self.mbed = this_mbed + if isinstance(mpin, str): + self.name = mpin + elif isinstance(mpin, pin): + self.name = self.mbed.rpc("AnalogOut", "new", [mpin.name]) + + def __del__(self): + r = self.mbed.rpc(self.name, "delete", []) + + def write(self, value): + r = self.mbed.rpc(self.name, "write", [str(value)]) + + def write_u16(self, value): + r = self.mbed.rpc(self.name, "write_u16", [str(value)]) + + def read(self): + r = self.mbed.rpc(self.name, "read", []) + return float(r) + + +class DigitalIn(): + def __init__(self, this_mbed , mpin): + self.mbed = this_mbed + if isinstance(mpin, str): + self.name = mpin + elif isinstance(mpin, pin): + self.name = self.mbed.rpc("DigitalIn", "new", [mpin.name]) + + def __del__(self): + r = self.mbed.rpc(self.name, "delete", []) + + def read(self): + r = self.mbed.rpc(self.name, "read", []) + return int(r) + + +class PwmOut(): + def __init__(self, this_mbed , mpin): + self.mbed = this_mbed + if isinstance(mpin, str): + self.name = mpin + elif isinstance(mpin, pin): + self.name = self.mbed.rpc("PwmOut", "new", [mpin.name]) + + def __del__(self): + r = self.mbed.rpc(self.name, "delete", []) + + def write(self, value): + r = self.mbed.rpc(self.name, "write", [str(value)]) + + def read(self): + r = self.mbed.rpc(self.name, "read", []) + return float(r) + + def period(self, value): + r = self.mbed.rpc(self.name, "period", [str(value)]) + + def period_ms(self, value): + r = self.mbed.rpc(self.name, "period_ms", [str(value)]) + + def period_us(self, value): + r = self.mbed.rpc(self.name, "period_us", [str(value)]) + + def puslewidth(self, value): + r = self.mbed.rpc(self.name, "pulsewidth", [str(value)]) + + def puslewidth_ms(self, value): + r = self.mbed.rpc(self.name, "pulsewidth_ms", [str(value)]) + + def puslewidth_us(self, value): + r = self.mbed.rpc(self.name, "pulsewidth_us", [str(value)]) + + +class Serial(): + def __init__(self, this_mbed , tx, rx = ""): + self.mbed = this_mbed + if isinstance(tx, str): + self.name = mpin + elif isinstance(mpin, pin): + self.name = self.mbed.rpc("Serial", "new", [tx.name, rx.name]) + + def __del__(self): + r = self.mbed.rpc(self.name, "delete", []) + + def putc(self, value): + r = self.mbed.rpc(self.name, "putc", [str(value)]) + + def puts(self, value): + r = self.mbed.rpc(self.name, "puts", [ "\"" + str(value) + "\""]) + + def getc(self): + r = self.mbed.rpc(self.name, "getc", []) + return int(r) + + +class RPCFunction(): + def __init__(self, this_mbed , name): + self.mbed = this_mbed + if isinstance(name, str): + self.name = name + + def __del__(self): + r = self.mbed.rpc(self.name, "delete", []) + + def read(self): + r = self.mbed.rpc(self.name, "read", []) + return int(r) + + def run(self, input): + r = self.mbed.rpc(self.name, "run", [input]) + return r + + +class RPCVariable(): + def __init__(self, this_mbed , name): + self.mbed = this_mbed + if isinstance(name, str): + self.name = name + + def __del__(self): + r = self.mbed.rpc(self.name, "delete", []) + + def write(self, value): + self.mbed.rpc(self.name, "write", [str(value)]) + + def read(self): + r = self.mbed.rpc(self.name, "read", []) + return r + + +def wait(s): + time.sleep(s) diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/midi.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/midi.py new file mode 100644 index 0000000000..67f34ea6f8 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/midi.py @@ -0,0 +1,72 @@ +from __future__ import print_function +import sys +import re +import time +import mido +from mido import Message + + +def test_midi_in(port): + expected_messages_count=0 + while expected_messages_count < 7: + for message in port.iter_pending(): + if message.type in ('note_on', 'note_off', 'program_change', 'sysex'): + yield message + expected_messages_count+=1 + time.sleep(0.1) + +def test_midi_loopback(input_port): + expected_messages_count=0 + while expected_messages_count < 1: + for message in input_port.iter_pending(): + print('Test MIDI OUT loopback received {}'.format(message.hex())) + expected_messages_count+=1 + +def test_midi_out_loopback(output_port,input_port): + print("Test MIDI OUT loopback") + output_port.send(Message('program_change', program=1)) + test_midi_loopback(input_port) + + output_port.send(Message('note_on', note=21)) + test_midi_loopback(input_port) + + output_port.send(Message('note_off', note=21)) + test_midi_loopback(input_port) + + output_port.send(Message('sysex', data=[0x7E,0x7F,0x09,0x01])) + test_midi_loopback(input_port) + + output_port.send(Message('sysex', data=[0x7F,0x7F,0x04,0x01,0x7F,0x7F])) + test_midi_loopback(input_port) + + output_port.send(Message('sysex', data=[0x41,0x10,0x42,0x12,0x40,0x00,0x7F,0x00,0x41])) + test_midi_loopback(input_port) + + output_port.send(Message('sysex', data=[0x41,0x10,0x42,0x12,0x40,0x00,0x04,0x7F,0x3D])) + test_midi_loopback(input_port) + +portname="" + +while portname=="": + print("Wait for MIDI IN plug ...") + for name in mido.get_input_names(): + matchObj = re.match( r'Mbed', name) + + if matchObj: + portname=name + time.sleep( 1 ) + +try: + input_port = mido.open_input(portname) + output_port = mido.open_output(portname) + + print('Using {}'.format(input_port)) + + print("Test MIDI IN") + + for message in test_midi_in(input_port): + print('Test MIDI IN received {}'.format(message.hex())) + + test_midi_out_loopback(output_port,input_port) +except KeyboardInterrupt: + pass \ No newline at end of file diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/net_test.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/net_test.py new file mode 100644 index 0000000000..01b4541abb --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/net_test.py @@ -0,0 +1,27 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from host_test import Test, Simple +from sys import stdout + +class NETTest(Simple): + def __init__(self): + Test.__init__(self) + self.mbed.init_serial(115200) + self.mbed.reset() + +if __name__ == '__main__': + NETTest().run() diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/rpc.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/rpc.py new file mode 100644 index 0000000000..84b85d2cc6 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/rpc.py @@ -0,0 +1,56 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from host_test import Test +from mbedrpc import SerialRPC, DigitalOut, DigitalIn, pin + + +class RpcTest(Test): + def test(self): + self.notify("RPC Test") + s = SerialRPC(self.mbed.port, debug=True) + + self.notify("Init remote objects") + + p_out = pin("p10") + p_in = pin("p11") + + if hasattr(self.mbed.options, 'micro'): + if self.mbed.options.micro == 'M0+': + print "Freedom Board: PTA12 <-> PTC4" + p_out = pin("PTA12") + p_in = pin("PTC4") + + self.output = DigitalOut(s, p_out); + self.input = DigitalIn(s, p_in); + + self.check = True + self.write_read_test(1) + self.write_read_test(0) + return self.check + + def write_read_test(self, v): + self.notify("Check %d" % v) + self.output.write(v) + if self.input.read() != v: + self.notify("ERROR") + self.check = False + else: + self.notify("OK") + + +if __name__ == '__main__': + RpcTest().run() diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/rtc_auto.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/rtc_auto.py new file mode 100644 index 0000000000..d267936517 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/rtc_auto.py @@ -0,0 +1,50 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import re +from time import time, strftime, gmtime + +class RTCTest(): + PATTERN_RTC_VALUE = "\[(\d+)\] \[(\d+-\d+-\d+ \d+:\d+:\d+ [AaPpMm]{2})\]" + re_detect_rtc_value = re.compile(PATTERN_RTC_VALUE) + + def test(self, selftest): + test_result = True + start = time() + sec_prev = 0 + for i in range(0, 5): + # Timeout changed from default: we need to wait longer for some boards to start-up + c = selftest.mbed.serial_readline(timeout=10) + if c is None: + return selftest.RESULT_IO_SERIAL + selftest.notify(c.strip()) + delta = time() - start + m = self.re_detect_rtc_value.search(c) + if m and len(m.groups()): + sec = int(m.groups()[0]) + time_str = m.groups()[1] + correct_time_str = strftime("%Y-%m-%d %H:%M:%S %p", gmtime(float(sec))) + single_result = time_str == correct_time_str and sec > 0 and sec > sec_prev + test_result = test_result and single_result + result_msg = "OK" if single_result else "FAIL" + selftest.notify("HOST: [%s] [%s] received time %+d sec after %.2f sec... %s"% (sec, time_str, sec - sec_prev, delta, result_msg)) + sec_prev = sec + else: + test_result = False + break + start = time() + return selftest.RESULT_SUCCESS if test_result else selftest.RESULT_FAILURE diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/stdio_auto.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/stdio_auto.py new file mode 100644 index 0000000000..1fe18906aa --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/stdio_auto.py @@ -0,0 +1,56 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import re +import random +from time import time + +class StdioTest(): + PATTERN_INT_VALUE = "Your value was: (-?\d+)" + re_detect_int_value = re.compile(PATTERN_INT_VALUE) + + def test(self, selftest): + test_result = True + + c = selftest.mbed.serial_readline() # {{start}} preamble + if c is None: + return selftest.RESULT_IO_SERIAL + selftest.notify(c) + + for i in range(0, 10): + random_integer = random.randint(-99999, 99999) + selftest.notify("HOST: Generated number: " + str(random_integer)) + start = time() + selftest.mbed.serial_write(str(random_integer) + "\n") + + serial_stdio_msg = selftest.mbed.serial_readline() + if serial_stdio_msg is None: + return selftest.RESULT_IO_SERIAL + delay_time = time() - start + selftest.notify(serial_stdio_msg.strip()) + + # Searching for reply with scanned values + m = self.re_detect_int_value.search(serial_stdio_msg) + if m and len(m.groups()): + int_value = m.groups()[0] + int_value_cmp = random_integer == int(int_value) + test_result = test_result and int_value_cmp + selftest.notify("HOST: Number %s read after %.3f sec ... [%s]"% (int_value, delay_time, "OK" if int_value_cmp else "FAIL")) + else: + test_result = False + break + return selftest.RESULT_SUCCESS if test_result else selftest.RESULT_FAILURE diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_client.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_client.py new file mode 100644 index 0000000000..303f002ab8 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_client.py @@ -0,0 +1,57 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import socket +import string, random +from time import time + +from private_settings import SERVER_ADDRESS + +ECHO_PORT = 7 + +LEN_PACKET = 127 +N_PACKETS = 5000 +TOT_BITS = float(LEN_PACKET * N_PACKETS * 8) * 2 +MEGA = float(1024 * 1024) +UPDATE_STEP = (N_PACKETS/10) + +class TCP_EchoClient: + def __init__(self, host): + self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.s.connect((host, ECHO_PORT)) + self.packet = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(LEN_PACKET)) + + def __packet(self): + # Comment out the checks when measuring the throughput + # self.packet = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(LEN_PACKET)) + self.s.send(self.packet) + data = self.s.recv(LEN_PACKET) + # assert self.packet == data, "packet error:\n%s\n%s\n" % (self.packet, data) + + def test(self): + start = time() + for i in range(N_PACKETS): + if (i % UPDATE_STEP) == 0: print '%.2f%%' % ((float(i)/float(N_PACKETS)) * 100.) + self.__packet() + t = time() - start + print 'Throughput: (%.2f)Mbits/s' % ((TOT_BITS / t)/MEGA) + + def __del__(self): + self.s.close() + +while True: + e = TCP_EchoClient(SERVER_ADDRESS) + e.test() diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_client_auto.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_client_auto.py new file mode 100644 index 0000000000..fe915a1ce9 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_client_auto.py @@ -0,0 +1,87 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import sys +import socket +from sys import stdout +from SocketServer import BaseRequestHandler, TCPServer + +class TCPEchoClient_Handler(BaseRequestHandler): + def handle(self): + """ One handle per connection + """ + print "HOST: Connection received...", + count = 1; + while True: + data = self.request.recv(1024) + if not data: break + self.request.sendall(data) + if '{{end}}' in str(data): + print + print str(data) + else: + if not count % 10: + sys.stdout.write('.') + count += 1 + stdout.flush() + +class TCPEchoClientTest(): + def send_server_ip_port(self, selftest, ip_address, port_no): + """ Set up network host. Reset target and and send server IP via serial to Mbed + """ + c = selftest.mbed.serial_readline() # 'TCPCllient waiting for server IP and port...' + if c is None: + self.print_result(selftest.RESULT_IO_SERIAL) + return + + selftest.notify(c.strip()) + selftest.notify("HOST: Sending server IP Address to target...") + + connection_str = ip_address + ":" + str(port_no) + "\n" + selftest.mbed.serial_write(connection_str) + selftest.notify(connection_str) + + # Two more strings about connection should be sent by MBED + for i in range(0, 2): + c = selftest.mbed.serial_readline() + if c is None: + selftest.print_result(self.RESULT_IO_SERIAL) + return + selftest.notify(c.strip()) + + def test(self, selftest): + # We need to discover SERVEP_IP and set up SERVER_PORT + # Note: Port 7 is Echo Protocol: + # + # Port number rationale: + # + # The Echo Protocol is a service in the Internet Protocol Suite defined + # in RFC 862. It was originally proposed for testing and measurement + # of round-trip times[citation needed] in IP networks. + # + # A host may connect to a server that supports the Echo Protocol using + # the Transmission Control Protocol (TCP) or the User Datagram Protocol + # (UDP) on the well-known port number 7. The server sends back an + # identical copy of the data it received. + SERVER_IP = str(socket.gethostbyname(socket.getfqdn())) + SERVER_PORT = 7 + + # Returning none will suppress host test from printing success code + server = TCPServer((SERVER_IP, SERVER_PORT), TCPEchoClient_Handler) + print "HOST: Listening for TCP connections: " + SERVER_IP + ":" + str(SERVER_PORT) + self.send_server_ip_port(selftest, SERVER_IP, SERVER_PORT) + server.serve_forever() diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server.py new file mode 100644 index 0000000000..4a68bd9ee7 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server.py @@ -0,0 +1,50 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from SocketServer import BaseRequestHandler, TCPServer +from time import time + +from private_settings import LOCALHOST + +MAX_INDEX = 126 +MEGA = float(1024 * 1024) + +class TCP_EchoHandler(BaseRequestHandler): + def handle(self): + print "\nconnection received" + start = time() + bytes = 0 + index = 0 + while True: + data = self.request.recv(1024) + if not data: break + + bytes += len(data) + for n in map(ord, data): + if n != index: + print "data error %d != %d" % (n , index) + index += 1 + if index > MAX_INDEX: + index = 0 + + self.request.sendall(data) + t = time() - start + b = float(bytes * 8) * 2 + print "Throughput: (%.2f)Mbits/s" % ((b/t)/MEGA) + +server = TCPServer((LOCALHOST, 7), TCP_EchoHandler) +print "listening for connections" +server.serve_forever() diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server_auto.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server_auto.py new file mode 100644 index 0000000000..8bc0e300d7 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server_auto.py @@ -0,0 +1,84 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import re +import sys +import uuid +import socket +from sys import stdout + +class TCPEchoServerTest(): + ECHO_SERVER_ADDRESS = "" + ECHO_PORT = 0 + ECHO_LOOPs = 100 + s = None # Socket + + PATTERN_SERVER_IP = "Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)" + re_detect_server_ip = re.compile(PATTERN_SERVER_IP) + + def test(self, selftest): + result = False + c = selftest.mbed.serial_readline() + if c is None: + return selftest.RESULT_IO_SERIAL + selftest.notify(c) + + m = self.re_detect_server_ip.search(c) + if m and len(m.groups()): + self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4]) + self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method + selftest.notify("HOST: TCP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT)) + + # We assume this test fails so can't send 'error' message to server + try: + self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.s.connect((self.ECHO_SERVER_ADDRESS, self.ECHO_PORT)) + except Exception, e: + self.s = None + selftest.notify("HOST: Socket error: %s"% e) + return selftest.RESULT_ERROR + + print 'HOST: Sending %d echo strings...'% self.ECHO_LOOPs, + for i in range(0, self.ECHO_LOOPs): + TEST_STRING = str(uuid.uuid4()) + try: + self.s.sendall(TEST_STRING) + data = self.s.recv(128) + except Exception, e: + self.s = None + selftest.notify("HOST: Socket error: %s"% e) + return selftest.RESULT_ERROR + + received_str = repr(data)[1:-1] + if TEST_STRING == received_str: # We need to cut not needed single quotes from the string + sys.stdout.write('.') + stdout.flush() + result = True + else: + print "Expected: " + print "'%s'"% TEST_STRING + print "received: " + print "'%s'"% received_str + result = False + break + + if self.s is not None: + self.s.close() + else: + selftest.notify("HOST: TCP Server not found") + result = False + return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server_loop.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server_loop.py new file mode 100644 index 0000000000..df483974aa --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/tcpecho_server_loop.py @@ -0,0 +1,40 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +# Be sure that the tools directory is in the search path +import sys +from os.path import join, abspath, dirname +ROOT = abspath(join(dirname(__file__), "..", "..")) +sys.path.insert(0, ROOT) + +from workspace_tools.private_settings import LOCALHOST +from SocketServer import BaseRequestHandler, TCPServer + + +class TCP_EchoHandler(BaseRequestHandler): + def handle(self): + print "\nHandle connection from:", self.client_address + while True: + data = self.request.recv(1024) + if not data: break + self.request.sendall(data) + self.request.close() + print "socket closed" + +if __name__ == '__main__': + server = TCPServer((LOCALHOST, 7), TCP_EchoHandler) + print "listening for connections on:", (LOCALHOST, 7) + server.serve_forever() diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/udp_link_layer_auto.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/udp_link_layer_auto.py new file mode 100644 index 0000000000..cb0578fdf6 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/udp_link_layer_auto.py @@ -0,0 +1,145 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +""" +How to use: +make.py -m LPC1768 -t ARM -d E:\ -n NET_14 +udp_link_layer_auto.py -p COM20 -d E:\ -t 10 +""" + +import re +import uuid +import socket +import thread +from sys import stdout +from time import time, sleep +from host_test import DefaultTest +from SocketServer import BaseRequestHandler, UDPServer + + +# Received datagrams (with time) +dict_udp_recv_datagrams = dict() + +# Sent datagrams (with time) +dict_udp_sent_datagrams = dict() + + +class UDPEchoClient_Handler(BaseRequestHandler): + def handle(self): + """ One handle per connection + """ + _data, _socket = self.request + # Process received datagram + data_str = repr(_data)[1:-1] + dict_udp_recv_datagrams[data_str] = time() + + +def udp_packet_recv(threadName, server_ip, server_port): + """ This function will receive packet stream from mbed device + """ + server = UDPServer((server_ip, server_port), UDPEchoClient_Handler) + print "[UDP_COUNTER] Listening for connections... %s:%d"% (server_ip, server_port) + server.serve_forever() + + +class UDPEchoServerTest(DefaultTest): + ECHO_SERVER_ADDRESS = "" # UDP IP of datagram bursts + ECHO_PORT = 0 # UDP port for datagram bursts + CONTROL_PORT = 23 # TCP port used to get stats from mbed device, e.g. counters + s = None # Socket + + TEST_PACKET_COUNT = 1000 # how many packets should be send + TEST_STRESS_FACTOR = 0.001 # stress factor: 10 ms + PACKET_SATURATION_RATIO = 29.9 # Acceptable packet transmission in % + + PATTERN_SERVER_IP = "Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)" + re_detect_server_ip = re.compile(PATTERN_SERVER_IP) + + def get_control_data(self, command="stat\n"): + BUFFER_SIZE = 256 + try: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect((self.ECHO_SERVER_ADDRESS, self.CONTROL_PORT)) + except Exception, e: + data = None + s.send(command) + data = s.recv(BUFFER_SIZE) + s.close() + return data + + def test(self): + serial_ip_msg = self.mbed.serial_readline() + if serial_ip_msg is None: + return self.RESULT_IO_SERIAL + stdout.write(serial_ip_msg) + stdout.flush() + # Searching for IP address and port prompted by server + m = self.re_detect_server_ip.search(serial_ip_msg) + if m and len(m.groups()): + self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4]) + self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method + self.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT)) + + # Open client socket to burst datagrams to UDP server in mbed + try: + self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + except Exception, e: + self.s = None + self.notify("HOST: Error: %s"% e) + return self.RESULT_ERROR + + # UDP replied receiver works in background to get echoed datagrams + SERVER_IP = str(socket.gethostbyname(socket.getfqdn())) + SERVER_PORT = self.ECHO_PORT + 1 + thread.start_new_thread(udp_packet_recv, ("Thread-udp-recv", SERVER_IP, SERVER_PORT)) + sleep(0.5) + + # Burst part + for no in range(self.TEST_PACKET_COUNT): + TEST_STRING = str(uuid.uuid4()) + payload = str(no) + "__" + TEST_STRING + self.s.sendto(payload, (self.ECHO_SERVER_ADDRESS, self.ECHO_PORT)) + dict_udp_sent_datagrams[payload] = time() + sleep(self.TEST_STRESS_FACTOR) + + if self.s is not None: + self.s.close() + + # Wait 5 seconds for packets to come + result = True + self.notify("HOST: Test Summary:") + for d in range(5): + sleep(1.0) + summary_datagram_success = (float(len(dict_udp_recv_datagrams)) / float(self.TEST_PACKET_COUNT)) * 100.0 + self.notify("HOST: Datagrams received after +%d sec: %.3f%% (%d / %d), stress=%.3f ms"% (d, + summary_datagram_success, + len(dict_udp_recv_datagrams), + self.TEST_PACKET_COUNT, + self.TEST_STRESS_FACTOR)) + result = result and (summary_datagram_success >= self.PACKET_SATURATION_RATIO) + stdout.flush() + + # Getting control data from test + self.notify("...") + self.notify("HOST: Mbed Summary:") + mbed_stats = self.get_control_data() + self.notify(mbed_stats) + return self.RESULT_SUCCESS if result else self.RESULT_FAILURE + + +if __name__ == '__main__': + UDPEchoServerTest().run() diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_client.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_client.py new file mode 100644 index 0000000000..1ff833f175 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_client.py @@ -0,0 +1,55 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from socket import socket, AF_INET, SOCK_DGRAM +import string, random +from time import time + +from private_settings import CLIENT_ADDRESS + +ECHO_PORT = 7 + +LEN_PACKET = 127 +N_PACKETS = 5000 +TOT_BITS = float(LEN_PACKET * N_PACKETS * 8) * 2 +MEGA = float(1024 * 1024) +UPDATE_STEP = (N_PACKETS/10) + +class UDP_EchoClient: + s = socket(AF_INET, SOCK_DGRAM) + + def __init__(self, host): + self.host = host + self.packet = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(LEN_PACKET)) + + def __packet(self): + # Comment out the checks when measuring the throughput + # packet = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(LEN_PACKET)) + UDP_EchoClient.s.sendto(packet, (self.host, ECHO_PORT)) + data = UDP_EchoClient.s.recv(LEN_PACKET) + # assert packet == data, "packet error:\n%s\n%s\n" % (packet, data) + + def test(self): + start = time() + for i in range(N_PACKETS): + if (i % UPDATE_STEP) == 0: print '%.2f%%' % ((float(i)/float(N_PACKETS)) * 100.) + self.__packet() + t = time() - start + print 'Throughput: (%.2f)Mbits/s' % ((TOT_BITS / t)/MEGA) + +while True: + e = UDP_EchoClient(CLIENT_ADDRESS) + e.test() diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_client_auto.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_client_auto.py new file mode 100644 index 0000000000..7896127077 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_client_auto.py @@ -0,0 +1,77 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import sys +import socket +from sys import stdout +from SocketServer import BaseRequestHandler, UDPServer + +class UDPEchoClient_Handler(BaseRequestHandler): + def handle(self): + """ One handle per connection + """ + data, socket = self.request + socket.sendto(data, self.client_address) + if '{{end}}' in data: + print + print data + else: + sys.stdout.write('.') + stdout.flush() + +class UDPEchoClientTest(): + + def send_server_ip_port(self, selftest, ip_address, port_no): + c = selftest.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...' + if c is None: + selftest.print_result(selftest.RESULT_IO_SERIAL) + return + selftest.notify(c.strip()) + + selftest.notify("HOST: Sending server IP Address to target...") + connection_str = ip_address + ":" + str(port_no) + "\n" + selftest.mbed.serial_write(connection_str) + + c = selftest.mbed.serial_readline() # 'UDPCllient waiting for server IP and port...' + if c is None: + self.print_result(selftest.RESULT_IO_SERIAL) + return + selftest.notify(c.strip()) + return selftest.RESULT_PASSIVE + + def test(self, selftest): + # We need to discover SERVEP_IP and set up SERVER_PORT + # Note: Port 7 is Echo Protocol: + # + # Port number rationale: + # + # The Echo Protocol is a service in the Internet Protocol Suite defined + # in RFC 862. It was originally proposed for testing and measurement + # of round-trip times[citation needed] in IP networks. + # + # A host may connect to a server that supports the Echo Protocol using + # the Transmission Control Protocol (TCP) or the User Datagram Protocol + # (UDP) on the well-known port number 7. The server sends back an + # identical copy of the data it received. + SERVER_IP = str(socket.gethostbyname(socket.getfqdn())) + SERVER_PORT = 7 + + # Returning none will suppress host test from printing success code + server = UDPServer((SERVER_IP, SERVER_PORT), UDPEchoClient_Handler) + print "HOST: Listening for UDP connections..." + self.send_server_ip_port(selftest, SERVER_IP, SERVER_PORT) + server.serve_forever() diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_server.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_server.py new file mode 100644 index 0000000000..f6074332e4 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_server.py @@ -0,0 +1,29 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from SocketServer import BaseRequestHandler, UDPServer +from private_settings import SERVER_ADDRESS + +class UDP_EchoHandler(BaseRequestHandler): + def handle(self): + data, socket = self.request + print "client:", self.client_address + print "data:", data + socket.sendto(data, self.client_address) + +server = UDPServer((SERVER_ADDRESS, 7195), UDP_EchoHandler) +print "listening for connections" +server.serve_forever() diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_server_auto.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_server_auto.py new file mode 100644 index 0000000000..a7ee026306 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/udpecho_server_auto.py @@ -0,0 +1,68 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import re +import sys +import uuid +from sys import stdout +from socket import socket, AF_INET, SOCK_DGRAM + +class UDPEchoServerTest(): + ECHO_SERVER_ADDRESS = "" + ECHO_PORT = 0 + s = None # Socket + + PATTERN_SERVER_IP = "Server IP Address is (\d+).(\d+).(\d+).(\d+):(\d+)" + re_detect_server_ip = re.compile(PATTERN_SERVER_IP) + + def test(self, selftest): + result = True + serial_ip_msg = selftest.mbed.serial_readline() + if serial_ip_msg is None: + return selftest.RESULT_IO_SERIAL + selftest.notify(serial_ip_msg) + # Searching for IP address and port prompted by server + m = self.re_detect_server_ip.search(serial_ip_msg) + if m and len(m.groups()): + self.ECHO_SERVER_ADDRESS = ".".join(m.groups()[:4]) + self.ECHO_PORT = int(m.groups()[4]) # must be integer for socket.connect method + selftest.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT)) + + # We assume this test fails so can't send 'error' message to server + try: + self.s = socket(AF_INET, SOCK_DGRAM) + except Exception, e: + self.s = None + selftest.notify("HOST: Socket error: %s"% e) + return selftest.RESULT_ERROR + + for i in range(0, 100): + TEST_STRING = str(uuid.uuid4()) + self.s.sendto(TEST_STRING, (self.ECHO_SERVER_ADDRESS, self.ECHO_PORT)) + data = self.s.recv(len(TEST_STRING)) + received_str = repr(data)[1:-1] + if TEST_STRING != received_str: + result = False + break + sys.stdout.write('.') + stdout.flush() + else: + result = False + + if self.s is not None: + self.s.close() + return selftest.RESULT_SUCCESS if result else selftest.RESULT_FAILURE diff --git a/tool/mbed/mbed-sdk/workspace_tools/host_tests/wait_us_auto.py b/tool/mbed/mbed-sdk/workspace_tools/host_tests/wait_us_auto.py new file mode 100644 index 0000000000..2ab66a3b51 --- /dev/null +++ b/tool/mbed/mbed-sdk/workspace_tools/host_tests/wait_us_auto.py @@ -0,0 +1,69 @@ +""" +mbed SDK +Copyright (c) 2011-2013 ARM Limited + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from time import time + +class WaitusTest(): + """ This test is reading single characters from stdio + and measures time between their occurrences. + """ + TICK_LOOP_COUNTER = 13 + TICK_LOOP_SUCCESSFUL_COUNTS = 10 + DEVIATION = 0.10 # +/-10% + + def test(self, selftest): + test_result = True + # First character to start test (to know after reset when test starts) + if selftest.mbed.set_serial_timeout(None) is None: + return selftest.RESULT_IO_SERIAL + c = selftest.mbed.serial_read(1) + if c is None: + return selftest.RESULT_IO_SERIAL + if c == '$': # target will printout TargetID e.g.: $$$$1040e649d5c09a09a3f6bc568adef61375c6 + #Read additional 39 bytes of TargetID + if selftest.mbed.serial_read(39) is None: + return selftest.RESULT_IO_SERIAL + c = selftest.mbed.serial_read(1) # Re-read first 'tick' + if c is None: + return selftest.RESULT_IO_SERIAL + start_serial_pool = time() + start = time() + + success_counter = 0 + + for i in range(0, self.TICK_LOOP_COUNTER): + c = selftest.mbed.serial_read(1) + if c is None: + return selftest.RESULT_IO_SERIAL + delta = time() - start + deviation = abs(delta - 1) + # Round values + delta = round(delta, 2) + deviation = round(deviation, 2) + # Check if time measurements are in given range + deviation_ok = True if delta > 0 and deviation <= self.DEVIATION else False + success_counter = success_counter+1 if deviation_ok else 0 + msg = "OK" if deviation_ok else "FAIL" + selftest.notify("%s in %.2f sec (%.2f) [%s]"% (c, delta, deviation, msg)) + start = time() + if success_counter >= self.TICK_LOOP_SUCCESSFUL_COUNTS: + break + measurement_time = time() - start_serial_pool + selftest.notify("Consecutive OK timer reads: %d"% success_counter) + selftest.notify("Completed in %.2f sec" % (measurement_time)) + test_result = True if success_counter >= self.TICK_LOOP_SUCCESSFUL_COUNTS else False + return selftest.RESULT_SUCCESS if test_result else selftest.RESULT_FAILURE -- cgit v1.2.3