summary | refs | log | tree | commit | diff | stats
path: root/pywrap/test_pcipywrap.py
blob: 257b4a5d703456affda6762398c9aa6ace86c87c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import threading
import pcipywrap
import random
import os
import json
import requests
import time

class test_pcipywrap():
   """Manual stress-test driver for the ``pcipywrap`` bindings.

   The three ``test*`` methods hammer a ``pcipywrap.Pcipywrap`` instance (or
   the HTTP/JSON server addressed by ``server_host``/``server_port``) in a
   loop.  By default each loop runs until the user presses Ctrl-C; pass
   ``max_cycles`` to stop after a fixed number of iterations instead.

   All output is produced with single-argument ``print(...)`` calls so the
   module parses on both Python 2 and Python 3 while printing the same text.
   """

   def __init__(self, device, model, num_threads = 150,
   write_percentage = 0.1, register = 'test_prop2',
   server_host = 'http://localhost', server_port = 12412,
   server_message_delay = 0):
      """Prepare the environment and open the device.

      device               -- device path handed to ``pcipywrap.Pcipywrap``
      model                -- model name handed to ``pcipywrap.Pcipywrap``
      num_threads          -- threads started per test cycle
      write_percentage     -- fraction (0..1) of thread runs that write
                              instead of read in testThreadSafeReadWrite
      register             -- register name used for the write accesses
      server_host          -- scheme + host of the server used by testServer
      server_port          -- port of the server used by testServer
      server_message_delay -- seconds slept before each testServer cycle
      """
      # Initialize environment variables, keeping any value that is already set.
      if 'APP_PATH' not in os.environ:
         file_dir = os.path.dirname(os.path.abspath(__file__))
         # The application root is two directories above this file.
         os.environ["APP_PATH"] = str(os.path.abspath(file_dir + '/../..'))

      if 'PCILIB_MODEL_DIR' not in os.environ:
         os.environ['PCILIB_MODEL_DIR'] = os.environ["APP_PATH"] + "/xml"
      # NOTE(review): LD_LIBRARY_PATH is set after this process has started;
      # presumably it only matters for libraries loaded later -- confirm.
      if 'LD_LIBRARY_PATH' not in os.environ:
         os.environ['LD_LIBRARY_PATH'] = os.environ["APP_PATH"] + "/pcilib"

      random.seed()
      # Create the pcilib instance.
      self.pcilib = pcipywrap.Pcipywrap(device, model)
      self.num_threads = num_threads
      self.write_percentage = write_percentage
      self.register = register
      self.server_message_delay = server_message_delay
      self.server_port = server_port
      self.server_host = server_host

   def _run_thread_cycles(self, worker, max_cycles = None, delay = None):
      """Repeatedly start ``self.num_threads`` threads running ``worker``.

      Each cycle starts all threads, joins all of them and prints
      'cycle done'.  ``max_cycles=None`` loops forever; ``delay`` (seconds),
      when given, is slept before every cycle.  KeyboardInterrupt is not
      handled here so that callers decide how to report it.
      """
      completed = 0
      while max_cycles is None or completed < max_cycles:
         if delay is not None:
            time.sleep(delay)
         threads = [threading.Thread(target=worker) for _ in range(self.num_threads)]
         for thread in threads:
            thread.start()
         for thread in threads:
            thread.join()
         print('cycle done')
         completed += 1

   def testThreadSafeReadWrite(self, max_cycles = None):
      """Access the device concurrently from many threads.

      Every thread either reads the property '/test/prop2' or writes a random
      value to ``self.register``; ``self.write_percentage`` controls the mix.
      Runs until Ctrl-C unless ``max_cycles`` is given.
      """
      def threadFunc():
         if random.randint(0, 100) >= (self.write_percentage * 100):
            # NOTE(review): the read goes through the hard-coded property
            # '/test/prop2' while the output is labelled with self.register;
            # presumably both refer to the same value -- confirm.
            ret = self.pcilib.get_property('/test/prop2')
            print('%s : %s' % (self.register, ret))
         else:
            val = random.randint(0, 65536)
            print('set value: %s' % (val,))
            self.pcilib.write_register(val, self.register)

      try:
         self._run_thread_cycles(threadFunc, max_cycles)
      except KeyboardInterrupt:
         print('testing done')

   def testMemoryLeak(self, max_cycles = None):
      """Call read-only pcilib queries in a tight loop.

      Intended to be watched with an external memory monitor.  Runs until
      Ctrl-C unless ``max_cycles`` is given.
      """
      completed = 0
      try:
         while max_cycles is None or completed < max_cycles:
            #print(self.pcilib.create_pcilib_instance('/dev/fpga0','test_pywrap'))

            print(self.pcilib.get_property_list('/test'))
            print(self.pcilib.get_register_info('test_prop1'))
            #print(self.pcilib.get_registers_list())

            #print(self.pcilib.read_register('reg1'))
            #print(self.pcilib.write_register(12, 'reg1'))

            #print(self.pcilib.get_property('/test/prop2'))
            #print(self.pcilib.set_property(12, '/test/prop2'))
            completed += 1
      except KeyboardInterrupt:
         print('testing done')

   def testServer(self, max_cycles = None):
      """Send random JSON commands to the server from many threads.

      A single 'help' request is sent first; afterwards every cycle sleeps
      ``self.server_message_delay`` seconds and then lets each thread send one
      randomly chosen command with a random 'value'.  Runs until Ctrl-C unless
      ``max_cycles`` is given.
      """
      url = str(self.server_host + ':' + str(self.server_port))
      headers = {'content-type': 'application/json'}
      # Index 0 is a malformed request; the random selection below starts at
      # index 1, so it is never sent.
      payload =[{'com': 'open', 'data2' : '12341'},
      #{'command': 'open', 'device' : '/dev/fpga0', 'model': 'test_pywrap'},
      {'command': 'help'},
      {'command': 'get_registers_list'},
      {'command': 'get_register_info', 'reg': 'reg1'},
      {'command': 'get_property_list'},
      {'command': 'read_register', 'reg': 'reg1'},
      {'command': 'write_register', 'reg': 'reg1'},
      {'command': 'get_property', 'prop': '/test/prop2'},
      {'command': 'set_property', 'prop': '/test/prop2'}]

      def sendRandomMessage():
         message_number = random.randint(1, len(payload) - 1)
         print('message number:  %s' % (message_number,))
         # Work on a private copy: the templates are shared by all threads and
         # must not change while another thread is serialising them.
         message = dict(payload[message_number])
         message['value'] = random.randint(0, 65535)
         r = requests.get(url, data=json.dumps(message), headers=headers)
         print(json.dumps(r.json(), sort_keys=True, indent=4, separators=(',', ': ')))

      try:
         r = requests.get(url, data=json.dumps(payload[1]), headers=headers)
         print(json.dumps(r.json(), sort_keys=True, indent=3, separators=(',', ': ')))

         self._run_thread_cycles(sendRandomMessage, max_cycles,
                                 delay = self.server_message_delay)
      except KeyboardInterrupt:
         print('testing done')

if __name__ == '__main__':
   # Stand-alone run: open the test device and exercise concurrent
   # read/write access until interrupted.
   settings = dict(
      num_threads = 150,
      write_percentage = 0.1,
      register = 'test_prop2',
      server_host = 'http://localhost',
      server_port = 12412,
      server_message_delay = 0,
   )
   tester = test_pcipywrap('/dev/fpga0', 'test_pywrap', **settings)
   tester.testThreadSafeReadWrite()