Changeset 4537
- Timestamp:
- 07/18/08 19:20:10 (5 years ago)
- Location:
- developers/john_lee/pyfso
- Files:
-
- 5 edited
-
__init__.py (modified) (1 diff)
-
accelerometer.py (modified) (3 diffs)
-
dialer.py (modified) (3 diffs)
-
fso_backend.py (modified) (5 diffs)
-
general.py (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
-
developers/john_lee/pyfso/__init__.py
r4527 r4537 1 """ 2 3 (C) 2008 John Lee <john_lee@openmoko.com> 4 (C) 2008 Openmoko, Inc. 5 GPLv2 or later 6 """ 1 7 from accelerometer import Accelerometer, MockAccelerometer, \ 2 8 InputDevAccelerometer, Gta02Accelerometer, get_xy_theta -
developers/john_lee/pyfso/accelerometer.py
r4527 r4537 1 1 """ 2 Accelerometer class. It can run in two different modes. In pull 3 mode, it accepts read request then retrieve accelerometer values. In 4 daemon mode, it updates itself as often as possible, notifies the 5 observers about value changes and answers request with its current 6 state. 7 2 Get accelerometer signals from dbus and parse them. 8 3 (C) 2008 John Lee <john_lee@openmoko.com> 9 4 (C) 2008 Openmoko, Inc. 10 5 GPLv2 or later 11 6 """ 12 from __future__ import with_statement 13 from general import Subject 7 from fso_backend import FSOObject 8 from general import Subject, DumpObserver 9 import math 14 10 15 import math 16 import os 17 import struct 18 from threading import RLock 19 from time import sleep 11 class FSOAccelerometer(Subject): 12 def __init__(self, fso_obj): 13 super(FSOAccelerometer, self).__init__() 14 self.fso = fso_obj 15 self.fso.onAccelerometer.append(self.on_accelerometer) 20 16 21 class Accelerometer(Subject): 22 """The base class of all accelerometers. 23 """ 24 25 def __new__(cls, *args, **kwargs): 26 cls.sample_rate = property(cls._get_sample_rate, cls._set_sample_rate) 27 obj = object.__new__(cls, *args, **kwargs) 28 return obj 29 30 def __init__(self, device, sample_rate=None): 31 """device: the name of the input 32 sample_rate: how many samples per second 33 """ 34 super(Accelerometer, self).__init__() 35 self._statelock = RLock() 36 self._state = (0, -1000, 0) 37 self._open_device(device) 38 self._daemonized = False 39 if sample_rate is not None: 40 self.sample_rate = sample_rate 41 42 def _open_device(self, device): 43 raise NotImplementedError 44 45 def _get_sample_rate(self): 46 """get sample rate 47 """ 48 raise NotImplementedError 49 50 def _set_sample_rate(self, sample_rate): 51 """set sample rate 52 """ 53 raise NotImplementedError 54 55 def _retrieve(self): 56 """read and parse the current values. could be time 57 consuming. 58 """ 59 raise NotImplementedError 60 61 @property 62 def state(self): 63 """return the current state 64 """ 65 if self._daemonized: 66 with self._statelock: 67 return self._state 68 else: 69 state = self._retrieve() 70 with self._statelock: 71 self._state = state 72 return self._state 73 74 def daemonize(self): 75 """a infinite loop to update self.state. will notify 76 observers about state changes. 77 """ 78 self._daemonized = True 79 while self._daemonized: 80 state = self._retrieve() 81 with self._statelock: 82 self._state = state 83 self._notify(self._state) 84 85 def stop(self): 86 """stop daemon 87 """ 88 self._daemonized = False 17 def on_accelerometer(self, x, y, z): 18 self._notify(x, y, z) 89 19 90 20 91 class MockAccelerometer(Accelerometer): 92 """Mock accelerometer class. 93 >>> g = MockAccelerometer() 94 >>> g.sample_rate 95 100 96 >>> g.sample_rate = 400 97 >>> g.sample_rate 98 400 99 """ 100 101 def __init__(self): 102 super(MockAccelerometer, self).__init__(None) 103 self.sample_rate = 100 104 105 def _open_device(self, device): 106 pass 107 108 def _set_sample_rate(self, sample_rate): 109 self._sample_rate = sample_rate 110 111 def _get_sample_rate(self): 112 return self._sample_rate 113 114 def _retrieve(self): 115 sleep(1/self._sample_rate) 116 return (0, -1000, 0) 117 118 119 class InputDevAccelerometer(Accelerometer): 120 """Read values from kernel input device 121 """ 122 123 # Event types 124 EV_SYN = 0x00 125 EV_KEY = 0x01 126 EV_REL = 0x02 127 EV_ABS = 0x03 128 EV_MSC = 0x04 129 EV_SW = 0x05 130 EV_LED = 0x11 131 EV_SND = 0x12 132 EV_REP = 0x14 133 EV_FF = 0x15 134 EV_PWR = 0x16 135 EV_FF = 0x17 136 EV_MAX = 0x1f 137 EV_CNT = (EV_MAX+1) 138 139 # Relative axes 140 REL_X = 0x00 141 REL_Y = 0x01 142 REL_Z = 0x02 143 REL_RX = 0x03 144 REL_RY = 0x04 145 REL_RZ = 0x05 146 REL_HWHEEL = 0x06 147 REL_DIAL = 0x07 148 REL_WHEEL = 0x08 149 REL_MISC = 0x09 150 REL_MAX = 0x0f 151 REL_CNT = REL_MAX + 1 152 153 input_event_struct = "@llHHi" 154 input_event_size = struct.calcsize(input_event_struct) 155 156 def __init__(self, *args, **kwargs): 157 super(InputDevAccelerometer, self).__init__(*args, **kwargs) 158 159 def _open_device(self, device): 160 self.device = os.open(device, os.O_RDONLY | os.O_SYNC) 161 162 def _unpack(self): 163 """struct input_event { 164 struct timeval time; /* (long, long) */ 165 __u16 type; 166 __u16 code; 167 __s32 value; 168 }; 169 return (tv_sec, tv_usec, type, code, value) 170 """ 171 for i in xrange(0, 5): 172 data = os.read(self.device, InputDevAccelerometer.input_event_size) 173 if len(data) >= InputDevAccelerometer.input_event_size: 174 break; 175 else: 176 raise Exception() 177 178 return struct.unpack(InputDevAccelerometer.input_event_struct, data) 179 180 def _unpack_xyz(self): 181 """return a 3 tuple 182 """ 183 # wait for EV_SYN 184 while self._unpack()[2] != InputDevAccelerometer.EV_SYN: 185 pass 186 # now return (x, y, z) 187 return (self._unpack()[4], self._unpack()[4], self._unpack()[4]) 188 189 190 class Gta02Accelerometer(InputDevAccelerometer): 191 """Read values from gta02. for now we use just one. 192 >>> g = Gta02Accelerometer() 193 >>> g.sample_rate = 400 194 >>> g.sample_rate 195 400 196 >>> g.sample_rate = 100 197 >>> g.sample_rate 198 100 199 """ 200 201 INPUT_DEV = '/dev/input/event3' 202 SYS_SAMPLE_RATE = '/sys/devices/platform/spi_s3c24xx_gpio.1/spi0.1/sample_rate' 203 204 def __init__(self, device=None, sample_rate=None): 205 if device == None: 206 device = Gta02Accelerometer.INPUT_DEV 207 super(Gta02Accelerometer, self).__init__(device, sample_rate) 208 209 def _set_sample_rate(self, sample_rate): 210 """possible values: 100, 400 211 """ 212 f = open(Gta02Accelerometer.SYS_SAMPLE_RATE, 'w', 0) 213 f.write('%d\n' % sample_rate) 214 f.close() 215 216 def _get_sample_rate(self): 217 f = open(Gta02Accelerometer.SYS_SAMPLE_RATE, 'r', 0) 218 sample_rate = int(f.read()) 219 f.close() 220 return sample_rate 221 222 def _retrieve(self): 223 return self._unpack_xyz() 224 225 226 # shamelessly stole from olv 21 # shamelessly stoled from olv 227 22 def get_xy_theta(u): 228 23 """get the angle related to (0, -1), clockwise. … … 238 33 239 34 240 class DumpObserver(object):241 """Does nothing but print the subject state when notified.242 """243 244 def __init__(self, subject=None):245 if subject == None:246 subject = MockAccelerometer()247 subject.attach(self)248 self.subject = subject249 250 def update(self, state):251 t = get_xy_theta(state)252 print 'x = %d, y = %d, z = %d, theta = %f pi' % (state + (t / math.pi, ))253 254 255 def dump():256 try:257 gsensor = Gta02Accelerometer(sample_rate=400)258 except OSError:259 gsensor = MockAccelerometer()260 DumpObserver(gsensor)261 gsensor.daemonize()262 263 264 35 def _doctest(): 265 36 try: … … 271 42 272 43 44 def _test(): 45 from dbus.mainloop.glib import DBusGMainLoop 46 fso = FSOObject() 47 fso.initialize(DBusGMainLoop()) 48 accelerometer = FSOAccelerometer(fso) 49 DumpObserver(accelerometer) 50 import gobject 51 try: 52 gobject.MainLoop().run() 53 except KeyboardInterrupt: 54 return 55 56 273 57 if __name__ == '__main__': 274 58 _doctest() 59 _test() -
developers/john_lee/pyfso/dialer.py
r4536 r4537 1 """ 2 3 (C) 2008 John Lee <john_lee@openmoko.com> 4 (C) 2008 Openmoko, Inc. 5 GPLv2 or later 6 """ 1 7 from general import Subject 2 8 from fso_backend import FSOObject … … 65 71 66 72 67 def need_initialized(real_f):68 def decorator(self, *args, **kwargs):69 if self.initialized:70 return real_f(self, *args, **kwargs)71 else:72 return False73 return decorator74 75 76 73 class FSODialer(Dialer): 77 74 """A Dialer implementation based on fso dbus API. 78 75 """ 79 def __init__(self, mainloop):76 def __init__(self, fso_obj): 80 77 super(FSODialer, self).__init__() 81 78 self.callid = None 82 79 self.initialized = False 83 80 self.registered = False 84 self.fso = FSOObject() 81 self.fso = fso_obj 82 self._set_state('release') 85 83 from threading import Thread 86 # put the slow init function into background 87 Thread(target=self._init, args=(mainloop, )).start() 88 # FIXME: get state from fso and set self state 89 self._set_state('release') 84 Thread(target=self._init).run() 90 85 91 def _init(self , mainloop):92 if not self.fso. initialize(mainloop):93 return86 def _init(self): 87 if not self.fso.fullinit: 88 self.fso.initialize() 94 89 self.fso.onCallStatus.append(self.on_call_status) 95 self.initialized = True96 90 if not self._register(): 97 91 return 98 92 self.registered = True 93 # FIXME: get state from fso and set self state 99 94 return True 100 95 … … 132 127 def _test(): 133 128 from dbus.mainloop.glib import DBusGMainLoop 134 FSODialer(DBusGMainLoop()) 135 129 fso = FSOObject() 130 fso.initialize(DBusGMainLoop()) 131 dialer = FSODialer(fso) 132 import gobject 133 try: 134 gobject.MainLoop().run() 135 except KeyboardInterrupt: 136 return 136 137 137 138 if __name__ == "__main__": -
developers/john_lee/pyfso/fso_backend.py
r4536 r4537 1 """ 2 3 (C) 2008 John Lee <john_lee@openmoko.com> 4 (C) 2008 Openmoko, Inc. 5 GPLv2 or later 6 """ 1 7 import os 2 8 from dbus import DBusException, SystemBus, Interface 3 9 from general import Singleton 4 10 5 class FSOObject( Singleton):6 """ based on the code from zhone but it's a singleton class now.11 class FSOObject(object): 12 """Based on the code from zhone. 7 13 """ 8 9 def __init__( self ): 14 def __init__(self): 10 15 self.objects = {} 11 16 self.onResourceChanged = [] … … 14 19 self.onIdleStateChanged = [] 15 20 self.onSignalStrength = [] 21 self.onAccelerometer = [] 16 22 self.ignoreSuspend = False 17 23 … … 44 50 return obj 45 51 46 def initialize( self, mainloop):52 def initialize(self, mainloop): 47 53 if self.fullinit: 48 54 return True … … 113 119 self.display_iface = Interface( self.display_obj, "org.freesmartphone.Device.Display" ) 114 120 self.display_iface.SetBrightness( 90 ) 121 122 self.accelerometer_obj = self.bus.get_object("org.freesmartphone.odeviced", "/org/freesmartphone/Device/Accelerometer") 123 self.accelerometer_iface = Interface(self.accelerometer_obj, "org.freesmartphone.Device.Accelerometer") 124 self.accelerometer_iface.connect_to_signal("Event", self.cbAccelerometer) 125 115 126 if self.device_obj is None: 116 127 failcount += 1 … … 175 186 print 'SignalStrength:', strength 176 187 188 def cbAccelerometer(self, x, y, z): 189 for cb in self.onAccelerometer: 190 cb(x, y, z) 191 177 192 178 193 def _test(): 179 194 from dbus.mainloop.glib import DBusGMainLoop 195 import sys 180 196 fso = FSOObject() 181 197 fso.initialize(DBusGMainLoop()) 198 import gobject 199 try: 200 gobject.MainLoop().run() 201 except KeyboardInterrupt: 202 return 182 203 183 204 -
developers/john_lee/pyfso/general.py
r4527 r4537 1 """ 2 3 (C) 2008 John Lee <john_lee@openmoko.com> 4 (C) 2008 Openmoko, Inc. 5 GPLv2 or later 6 """ 1 7 from __future__ import with_statement 2 8 from threading import RLock … … 29 35 30 36 class Singleton(object): 31 __single_lock = RLock() 37 """the implementation of the singleton class in python. 38 >>> class A(Singleton): 39 ... def __init__(self): 40 ... print 'init' 41 ... 42 >>> a1 = A.instance() 43 init 44 >>> a2 = A.instance() 45 """ 32 46 __single = None 33 def __new__(cls, *args, **kwargs): 34 with cls.__single_lock: 35 if cls != type(cls.__single): 36 # subclass can create their own 37 cls.__single = object.__new__(cls, *args, **kwargs) 38 return cls.__single 39 #FIXME: the __init__ problem 47 48 @classmethod 49 def instance(cls, *args, **kwargs): 50 if cls != type(cls.__single): 51 cls.__single = cls(*args, **kwargs) 52 return cls.__single 53 54 55 class DumpObserver(object): 56 def __init__(self, subject): 57 subject.attach(self) 58 59 def update(self, *args, **kwargs): 60 print args, kwargs 61 62 63 def _doctest(): 64 try: 65 import doctest 66 except ImportError: 67 return 68 doctest.testmod() 69 70 71 if __name__ == '__main__': 72 _doctest()
Note: See TracChangeset
for help on using the changeset viewer.
