Changeset 4539
- Timestamp:
- 07/18/08 19:20:29 (5 years ago)
- Location:
- developers/john_lee/pyfso
- Files:
-
- 5 edited
-
__init__.py (modified) (1 diff)
-
accelerometer.py (modified) (3 diffs)
-
dialer.py (modified) (6 diffs)
-
fso_backend.py (modified) (5 diffs)
-
general.py (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
-
developers/john_lee/pyfso/__init__.py
r4537 r4539 5 5 GPLv2 or later 6 6 """ 7 from accelerometer import Accelerometer, MockAccelerometer, \ 8 InputDevAccelerometer, Gta02Accelerometer, get_xy_theta 9 7 from accelerometer import MockAccelerometer, FSOAccelerometer 10 8 from dialer import Dialer, MockDialer, FSODialer 9 from fso_backend import FSOObject -
developers/john_lee/pyfso/accelerometer.py
r4537 r4539 5 5 GPLv2 or later 6 6 """ 7 from fso_backend import FSOObject 7 from dbus import Interface, SystemBus 8 from dbus.exceptions import DBusException 8 9 from general import Subject, DumpObserver 9 10 import math 10 11 12 class MockAccelerometer(Subject): 13 """>>> m = MockAccelerometer() 14 >>> d = DumpObserver(m) 15 (0, -1000, 0) {} 16 """ 17 def __init__(self, *args, **kwargs): 18 super(MockAccelerometer, self).__init__() 19 20 def attach(self, function): 21 super(MockAccelerometer, self).attach(function) 22 self._notify(0, -1000, 0) 23 24 11 25 class FSOAccelerometer(Subject): 12 def __init__(self, fso _obj):26 def __init__(self, fso): 13 27 super(FSOAccelerometer, self).__init__() 14 self.fso = fso_obj 15 self.fso.onAccelerometer.append(self.on_accelerometer) 16 17 def on_accelerometer(self, x, y, z): 18 self._notify(x, y, z) 28 fso.onAccelerometer.append(self._notify) 19 29 20 30 … … 42 52 43 53 44 def _test ():54 def _test_fso(): 45 55 from dbus.mainloop.glib import DBusGMainLoop 46 fso = FSOObject() 47 fso.initialize(DBusGMainLoop()) 48 accelerometer = FSOAccelerometer(fso) 56 from fso_backend import FSOObject 57 accelerometer = FSOAccelerometer(FSOObject(SystemBus(mainloop=DBusGMainLoop()))) 49 58 DumpObserver(accelerometer) 50 59 import gobject … … 57 66 if __name__ == '__main__': 58 67 _doctest() 59 _test ()68 _test_fso() -
developers/john_lee/pyfso/dialer.py
r4537 r4539 6 6 """ 7 7 from general import Subject 8 from fso_backend import FSOObject 9 from dbus import DBusException 8 from dbus import DBusException, SystemBus 10 9 from threading import Timer 11 10 … … 74 73 """A Dialer implementation based on fso dbus API. 75 74 """ 76 def __init__(self, fso _obj):75 def __init__(self, fso): 77 76 super(FSODialer, self).__init__() 78 77 self.callid = None 79 78 self.initialized = False 80 79 self.registered = False 81 self.fso = fso_obj 80 self.fso = fso 81 self.fso.onCallStatus.append(self.on_call_status) 82 82 self._set_state('release') 83 83 from threading import Thread … … 85 85 86 86 def _init(self): 87 if not self.fso.fullinit:88 self.fso.initialize()89 self.fso.onCallStatus.append(self.on_call_status)90 if not self._register():91 return92 self.registered = True93 # FIXME: get state from fso and set self state94 return True95 96 def _register(self):97 87 if self.fso.gsm_network_iface.GetStatus()['registration'] == 'unregistered': 98 88 try: … … 100 90 self.fso.gsm_network_iface.Register() 101 91 print 'registered' 92 self.registered = True 102 93 except DBusException, e: 103 94 # FIXME pin number? 104 95 print e 105 96 self.registered = False 106 return False107 return True108 97 109 98 @need_registered … … 125 114 126 115 127 def _test ():116 def _test_fso(): 128 117 from dbus.mainloop.glib import DBusGMainLoop 129 fso = FSOObject() 130 fso.initialize(DBusGMainLoop()) 131 dialer = FSODialer(fso) 118 from fso_backend import FSOObject 119 try: 120 dialer = FSODialer(FSOObject(SystemBus(mainloop=DBusGMainLoop()))) 121 except DBusException: 122 return 132 123 import gobject 133 124 try: … … 137 128 138 129 if __name__ == "__main__": 139 _test ()130 _test_fso() -
developers/john_lee/pyfso/fso_backend.py
r4537 r4539 1 1 """ 2 2 Originated from zhone 3 3 (C) 2008 John Lee <john_lee@openmoko.com> 4 4 (C) 2008 Openmoko, Inc. … … 12 12 """Based on the code from zhone. 13 13 """ 14 def __init__(self): 14 def __init__(self, bus): 15 15 16 self.objects = {} 16 17 self.onResourceChanged = [] … … 21 22 self.onAccelerometer = [] 22 23 self.ignoreSuspend = False 24 self.bus = bus 23 25 24 self.f ramework_obj= None25 self. gsm_device_obj= None26 self.fw_iface = None 27 self.usage_iface = None 26 28 self.gsm_device_iface = None 27 self.usage_iface = None 29 self.gsm_sim_iface = None 30 self.gsm_network_iface = None 31 self.gsm_call_iface = None 32 self.gsm_test_iface = None 28 33 self.device_iface = None 29 34 self.device_power_iface = None 30 self.idlenotifier_obj = None31 35 self.idlenotifier_iface = None 32 self.inputnotifier_obj = None33 36 self.inputnotifier_iface = None 34 self.display_obj = None35 37 self.display_iface = None 38 self.accelerometer_iface = None 36 39 37 self.fullinit = False 40 # Framework -- at least we should have this. if not, let exception raise. 41 fw_obj = self.bus.get_object('org.freesmartphone.frameworkd', 42 '/org/freesmartphone/Framework') 43 self.fw_iface = Interface(fw_obj, "org.freesmartphone.Objects") 44 45 # Usage 46 usage_obj = self.tryGetProxy('org.freesmartphone.ousaged', 47 '/org/freesmartphone/Usage') 48 if usage_obj: 49 print 'creating usage interface', 50 self.usage_iface = Interface(usage_obj, 'org.freesmartphone.Usage') 51 self.usage_iface.connect_to_signal("ResourceChanged", self.cbResourceChanged) 52 self.usage_iface.RequestResource("GSM") 53 print 'ok' 54 55 # Phone 56 gsm_device_obj = self.tryGetProxy('org.freesmartphone.ogsmd', 57 '/org/freesmartphone/GSM/Device') 58 if gsm_device_obj: 59 print "creating gsm interfaces", 60 self.gsm_device_iface = Interface(gsm_device_obj, 'org.freesmartphone.GSM.Device') 61 self.gsm_sim_iface = Interface(gsm_device_obj, 'org.freesmartphone.GSM.SIM') 62 self.gsm_network_iface = Interface(gsm_device_obj, 'org.freesmartphone.GSM.Network') 63 self.gsm_call_iface = Interface(gsm_device_obj, 'org.freesmartphone.GSM.Call') 64 self.gsm_test_iface = Interface(gsm_device_obj, 'org.freesmartphone.GSM.Test') 65 self.gsm_call_iface.connect_to_signal("CallStatus", self.cbCallStatus) 66 self.gsm_network_iface.connect_to_signal("Status", self.cbNetworkStatus) 67 self.gsm_network_iface.connect_to_signal('SignalStrength', self.cbSignalStrength) 68 print 'ok' 69 70 device_obj = self.tryGetProxy('org.freesmartphone.odeviced', 71 '/org/freesmartphone/Device') 72 if device_obj: 73 self.device_iface = Interface(device_obj, 'org.freesmartphone.Device') 74 print 'device interface ok' 75 76 self.device_power_iface = self.tryGetFirstInterface( 77 "org.freesmartphone.odeviced", "org.freesmartphone.Device.PowerSupply") 78 79 self.idlenotifier_iface = self.tryGetFirstInterface( 80 "org.freesmartphone.odeviced", "org.freesmartphone.Device.IdleNotifier") 81 if self.idlenotifier_iface: 82 self.idlenotifier_iface.connect_to_signal("State", self.cbIdleStateChanged) 83 84 inputnotifier_obj = self.tryGetProxy("org.freesmartphone.odeviced", "/org/freesmartphone/Device/Input") 85 if inputnotifier_obj: 86 self.inputnotifier_iface = Interface(inputnotifier_obj, "org.freesmartphone.Device.Input") 87 self.inputnotifier_iface.connect_to_signal("Event", self.cbEvent) 88 89 self.display_iface = self.tryGetFirstInterface( 90 "org.freesmartphone.odeviced", "org.freesmartphone.Device.Display") 91 if self.display_iface: 92 self.display_iface.SetBrightness(90) 93 94 accelerometer_obj = self.tryGetProxy( 95 "org.freesmartphone.odeviced", "/org/freesmartphone/Device/Accelerometer") 96 if accelerometer_obj: 97 self.accelerometer_iface = Interface(accelerometer_obj, "org.freesmartphone.Device.Accelerometer") 98 self.accelerometer_iface.connect_to_signal("Event", self.cbAccelerometer) 99 100 def tryGetFirstInterface(self, busname, interface): 101 if self.fw_iface: 102 list = self.fw_iface.ListObjectsByInterface(interface) 103 if list and (len(list) > 1): 104 obj = self.tryGetProxy(busname, list[0]) 105 return Interface(obj, interface) 38 106 39 107 def tryGetProxy( self, busname, objname ): … … 49 117 self.objects[ "%s:%s" % ( busname, objname ) ] = obj 50 118 return obj 51 52 def initialize(self, mainloop):53 if self.fullinit:54 return True55 try:56 self.bus = SystemBus( mainloop=mainloop )57 except DBusException, e:58 print "could not connect to dbus_object system bus:", e59 return False60 61 # Framework62 fw_obj = self.tryGetProxy( 'org.freesmartphone.frameworkd', '/org/freesmartphone/Framework' )63 if fw_obj is None:64 print ( "could not connect to org.freesmartphone.frameworkd -- is the framework daemon started?" )65 return False66 else:67 self.fw = Interface( fw_obj, "org.freesmartphone.Objects" )68 failcount = 069 70 # Usage71 self.usage_obj = self.tryGetProxy( 'org.freesmartphone.ousaged', '/org/freesmartphone/Usage' )72 if ( self.usage_obj is not None ) and ( self.usage_iface is None ):73 self.usage_iface = Interface(self.usage_obj, 'org.freesmartphone.Usage')74 self.usage_iface.connect_to_signal( "ResourceChanged", self.cbResourceChanged )75 self.usage_iface.RequestResource("GSM")76 if self.usage_obj is None:77 failcount += 178 else:79 print "usage ok", self.usage_iface80 81 # Phone82 self.gsm_device_obj = self.tryGetProxy( 'org.freesmartphone.ogpsd', '/org/freesmartphone/GSM/Device' )83 print self.gsm_device_obj84 85 if ( self.gsm_device_obj is not None ) and ( self.gsm_device_iface is None ):86 print "creating gsm interfaces"87 self.gsm_device_iface = Interface(self.gsm_device_obj, 'org.freesmartphone.GSM.Device')88 self.gsm_sim_iface = Interface(self.gsm_device_obj, 'org.freesmartphone.GSM.SIM')89 self.gsm_network_iface = Interface(self.gsm_device_obj, 'org.freesmartphone.GSM.Network')90 self.gsm_call_iface = Interface(self.gsm_device_obj, 'org.freesmartphone.GSM.Call')91 self.gsm_test_iface = Interface(self.gsm_device_obj, 'org.freesmartphone.GSM.Test')92 self.gsm_call_iface.connect_to_signal( "CallStatus", self.cbCallStatus )93 self.gsm_network_iface.connect_to_signal( "Status", self.cbNetworkStatus )94 self.gsm_network_iface.connect_to_signal('SignalStrength', self.cbSignalStrength)95 if self.gsm_device_obj is None:96 failcount += 197 else:98 print "gsm ok", self.gsm_network_iface99 100 self.device_obj = self.tryGetProxy( 'org.freesmartphone.odeviced', '/org/freesmartphone/Device' )101 if ( self.device_obj is not None ) and ( self.device_iface is None ):102 self.device_iface = Interface( self.device_obj, 'org.freesmartphone.Device' )103 104 self.device_power_obj = self.tryGetProxy( "org.freesmartphone.odeviced", self.fw.ListObjectsByInterface( "org.freesmartphone.Device.PowerSupply" )[0] )105 self.device_power_iface = Interface(self.device_power_obj, 'org.freesmartphone.Device.PowerSupply')106 107 self.idlenotifier_obj = self.tryGetProxy( "org.freesmartphone.odeviced", self.fw.ListObjectsByInterface( "org.freesmartphone.Device.IdleNotifier" )[0] )108 109 self.idlenotifier_iface = Interface( self.idlenotifier_obj, "org.freesmartphone.Device.IdleNotifier" )110 self.idlenotifier_iface.connect_to_signal( "State", self.cbIdleStateChanged )111 112 self.inputnotifier_obj = self.bus.get_object( "org.freesmartphone.odeviced", "/org/freesmartphone/Device/Input" )113 self.inputnotifier_iface = Interface( self.inputnotifier_obj, "org.freesmartphone.Device.Input" )114 self.inputnotifier_iface.connect_to_signal( "Event", self.cbEvent )115 116 print "displays:", self.fw.ListObjectsByInterface( "org.freesmartphone.Device.Display" )117 self.display_obj = self.tryGetProxy( "org.freesmartphone.odeviced", self.fw.ListObjectsByInterface( "org.freesmartphone.Device.Display" )[0] )118 if self.display_obj is not None:119 self.display_iface = Interface( self.display_obj, "org.freesmartphone.Device.Display" )120 self.display_iface.SetBrightness( 90 )121 122 self.accelerometer_obj = self.bus.get_object("org.freesmartphone.odeviced", "/org/freesmartphone/Device/Accelerometer")123 self.accelerometer_iface = Interface(self.accelerometer_obj, "org.freesmartphone.Device.Accelerometer")124 self.accelerometer_iface.connect_to_signal("Event", self.cbAccelerometer)125 126 if self.device_obj is None:127 failcount += 1128 else:129 print "device ok", self.device_iface130 131 print "failcount=", failcount132 if failcount == 0:133 self.fullinit = True134 return self.fullinit135 119 136 120 def cbResourceChanged( self, resourcename ): … … 194 178 from dbus.mainloop.glib import DBusGMainLoop 195 179 import sys 196 fso = FSOObject() 197 fso.initialize(DBusGMainLoop()) 180 fso = FSOObject(SystemBus(mainloop=DBusGMainLoop())) 198 181 import gobject 199 182 try: -
developers/john_lee/pyfso/general.py
r4537 r4539 1 1 """ 2 3 2 (C) 2008 John Lee <john_lee@openmoko.com> 4 3 (C) 2008 Openmoko, Inc. … … 16 15 self._observers = [] 17 16 18 def attach(self, observer):17 def attach(self, function): 19 18 """attach as an observer 20 19 """ 21 20 with self._observers_lock: 22 self._observers.append( observer)21 self._observers.append(function) 23 22 24 def detach(self, observer):23 def detach(self, function): 25 24 """detach from observers 26 25 """ 27 26 with self._observers_lock: 28 self._observers.remove( observer)27 self._observers.remove(function) 29 28 30 29 def _notify(self, *args, **kwargs): 31 30 with self._observers_lock: 32 for oin self._observers:33 o.update(*args, **kwargs)31 for f in self._observers: 32 f(*args, **kwargs) 34 33 35 34 … … 54 53 55 54 class DumpObserver(object): 55 """ dump everything the subject notifies. 56 >>> s = Subject() 57 >>> d = DumpObserver(s) 58 >>> s._notify('test') 59 ('test',) {} 60 """ 56 61 def __init__(self, subject): 57 subject.attach(self )62 subject.attach(self.update) 58 63 59 64 def update(self, *args, **kwargs):
Note: See TracChangeset
for help on using the changeset viewer.
