diff options
Diffstat (limited to 'pywrap/test_pcilib.py')
-rw-r--r-- | pywrap/test_pcilib.py | 270 |
1 files changed, 270 insertions, 0 deletions
diff --git a/pywrap/test_pcilib.py b/pywrap/test_pcilib.py new file mode 100644 index 0000000..398d975 --- /dev/null +++ b/pywrap/test_pcilib.py @@ -0,0 +1,270 @@ +import threading +import pcilib +import random +import os +import json +import requests +import time +from optparse import OptionParser, OptionGroup + +class test_pcilib(): + def __init__(self, + device, + model, + num_threads = 150, + write_percentage = 0.1, + register = 'reg1', + prop = '/test/prop1', + branch = '/test', + server_host = 'http://localhost', + server_port = 12412, + server_message_delay = 0): + #initialize enviroment variables + if not 'APP_PATH' in os.environ: + APP_PATH = '' + file_dir = os.path.dirname(os.path.abspath(__file__)) + APP_PATH = str(os.path.abspath(file_dir + '/../..')) + os.environ["APP_PATH"] = APP_PATH + + if not 'PCILIB_MODEL_DIR' in os.environ: + os.environ['PCILIB_MODEL_DIR'] = os.environ["APP_PATH"] + "/xml" + if not 'LD_LIBRARY_PATH' in os.environ: + os.environ['LD_LIBRARY_PATH'] = os.environ["APP_PATH"] + "/pcilib" + + random.seed() + #create pcilib_instance + self.device = device + self.model = model + self.pcilib = pcilib.pcilib(device, model) + self.num_threads = num_threads + self.write_percentage = write_percentage + self.register = register + self.prop = prop + self.branch = branch + self.server_message_delay = server_message_delay + self.server_port = server_port + self.server_host = server_host + + def testLocking(self, message, lock_id = None): + url = str(self.server_host + ':' + str(self.server_port)) + headers = {'content-type': 'application/json'} + payload =[{'command': 'lock', 'lock_id': lock_id}, + {'command': 'try_lock', 'lock_id': lock_id}, + {'command': 'unlock', 'lock_id': lock_id}, + {'command': 'lock_global'}, + {'command': 'unlock_global'}, + {'command': 'help'}] + r = requests.get(url, data=json.dumps(payload[message]), headers=headers) + print(json.dumps(r.json(), sort_keys=True, indent=3, separators=(',', ': '))) + + def testThreadSafeReadWrite(self): + def threadFunc(): + if random.randint(0, 100) >= (self.write_percentage * 100): + ret = self.pcilib.get_property(self.prop) + print(self.register, ':', ret) + del ret + else: + val = random.randint(0, 65536) + print('set value:', val) + self.pcilib.set_property(val, self.prop) + try: + while(1): + thread_list = [threading.Thread(target=threadFunc) for i in range(0, self.num_threads)] + for i in range(0, self.num_threads): + thread_list[i].start() + for i in range(0, self.num_threads): + thread_list[i].join() + print('cycle done') + except KeyboardInterrupt: + print('testing done') + pass + + def testMemoryLeak(self): + try: + while(1): + val = long(random.randint(0, 8096)) + self.pcilib = pcilib.pcilib(self.device, self.model) + print(self.pcilib.get_property_list(self.branch)) + print(self.pcilib.get_register_info(self.register)) + print(self.pcilib.get_registers_list()) + print(self.pcilib.write_register(val, self.register)) + print(self.pcilib.read_register(self.register)) + print(self.pcilib.set_property(val, self.prop)) + print(self.pcilib.get_property(self.prop)) + except KeyboardInterrupt: + print('testing done') + pass + + def testServer(self): + url = str(self.server_host + ':' + str(self.server_port)) + headers = {'content-type': 'application/json'} + payload =[{'com': 'open', 'data2' : '12341'}, + #{'command': 'open', 'device' : '/dev/fpga0', 'model': 'test_pywrap'}, + {'command': 'help'}, + {'command': 'get_registers_list'}, + {'command': 'get_register_info', 'reg': self.register}, + {'command': 'get_property_list', 'branch': self.branch}, + {'command': 'read_register', 'reg': self.register}, + {'command': 'write_register', 'reg': self.register}, + {'command': 'get_property', 'prop': self.prop}, + {'command': 'set_property', 'prop': self.prop}] + + def sendRandomMessage(): + message_number = random.randint(1, len(payload) - 1) + print('message number: ', message_number) + payload[message_number]['value'] = random.randint(0, 8096) + r = requests.get(url, data=json.dumps(payload[message_number]), headers=headers) + if(r.headers['content-type'] == 'application/json'): + print(json.dumps(r.json(), sort_keys=True, indent=3, separators=(',', ': '))) + else: + print(r.content) + try: + r = requests.get(url, data=json.dumps(payload[1]), headers=headers) + if(r.headers['content-type'] == 'application/json'): + print(json.dumps(r.json(), sort_keys=True, indent=3, separators=(',', ': '))) + else: + print(r.content) + + while(1): + time.sleep(self.server_message_delay) + thread_list = [threading.Thread(target=sendRandomMessage) for i in range(0, self.num_threads)] + for i in range(0, self.num_threads): + thread_list[i].start() + for i in range(0, self.num_threads): + thread_list[i].join() + print('cycle done') + + except KeyboardInterrupt: + print('testing done') + pass + +if __name__ == '__main__': + #parce command line options + parser = OptionParser() + parser.add_option("-d", "--device", action="store", + type="string", dest="device", default=str('/dev/fpga0'), + help="FPGA device (/dev/fpga0)") + parser.add_option("-m", "--model", action="store", + type="string", dest="model", default=str('test'), + help="Memory model (autodetected)") + parser.add_option("-t", "--threads", action="store", + type="int", dest="threads", default=150, + help="Threads number (150)") + + server_group = OptionGroup(parser, "Server Options", + "Options for testing server.") + server_group.add_option("-p", "--port", action="store", + type="int", dest="port", default=9000, + help="Set testing server port (9000)") + server_group.add_option("--host", action="store", + type="string", dest="host", default='http://localhost', + help="Set testing server host (http://localhost)") + server_group.add_option("--delay", action="store", + type="float", dest="delay", default=0.0, + help="Set delay in seconds between sending messages to setver (0.0)") + parser.add_option_group(server_group) + + rw_group = OptionGroup(parser, "Registers/properties Options", + "Group stores testing register and property options") + rw_group.add_option("--write_percentage", action="store", + type="float", dest="write_percentage", default=0.5, + help="Set percentage (0.0 - 1.0) of write commands in multithread (0.5)" + "read/write test") + rw_group.add_option("-r", "--register", action="store", + type="string", dest="register", default='reg1', + help="Set register name (reg1)") + rw_group.add_option("--property", action="store", + type="string", dest="prop", default='/test/prop1', + help="Set property name (/test/prop1)") + rw_group.add_option("-b", "--branch", action="store", + type="string", dest="branch", default='/test', + help="Set property branch (/test)") + + parser.add_option_group(rw_group) + + test_group = OptionGroup(parser, "Test commands group", + "This group conatains aviable commands for testing. " + "If user add more than one command, they will process" + "sequientally. To stop test, press Ctrl-C." + ) + test_group.add_option("--test_mt_rw", action="store_true", + dest="test_mt_rw", default=False, + help="Multithread read/write test. This test will execute " + "get_property/set_property commands with random values in multi-thread mode") + test_group.add_option("--test_memory_leak", action="store_true", + dest="test_memory_leak", default=False, + help="Python wrap memory leak test. This test will execute all" + "Python wrap memory leak test. This test will execute all" + ) + test_group.add_option("--test_server", action="store_true", + dest="test_server", default=False, + help="Python server test. This test will send " + "random commands to server in multi-thread mode" + ) + parser.add_option_group(test_group) + + lock_group = OptionGroup(parser, "Locking test group") + + lock_group.add_option("--lock", action="store", + dest="lock_id", default=None, type="string", + help="Lock id." + ) + lock_group.add_option("--try_lock", action="store", + dest="try_lock_id", default=None, type="string", + help="Try to lock id." + ) + lock_group.add_option("--unlock", action="store", + dest="unlock_id", default=None, type="string", + help="Unlock lock with id." + ) + lock_group.add_option("--lock_global", action="store_true", + dest="lock_global", default=False, + help="Global pcilib lock." + ) + lock_group.add_option("--unlock_global", action="store_true", + dest="unlock_global", default=False, + help="Global pcilib unlock." + ) + + lock_group.add_option("--get_server_message", action="store_true", + dest="get_server_message", default=False, + help="Global pcilib unlock." + ) + + + parser.add_option_group(lock_group) + + opts = parser.parse_args()[0] + + #create pcilib test instance + lib = test_pcilib(opts.device, + opts.model, + num_threads = opts.threads, + write_percentage = opts.write_percentage, + register = opts.register, + prop = opts.prop, + branch = opts.branch, + server_host = opts.host, + server_port = opts.port, + server_message_delay = opts.delay) + + #make some tests + if opts.test_mt_rw: + lib.testThreadSafeReadWrite() + if opts.test_memory_leak: + lib.testMemoryLeak() + if opts.test_server: + lib.testServer() + if opts.lock_id: + lib.testLocking(0, opts.lock_id) + if opts.try_lock_id: + lib.testLocking(1, opts.try_lock_id) + if opts.unlock_id: + lib.testLocking(2, opts.unlock_id) + if opts.lock_global: + lib.testLocking(3) + if opts.unlock_global: + lib.testLocking(4) + if(opts.get_server_message): + lib.testLocking(5) + |