[pkg-fso-commits] [SCM] FSO frameworkd Debian packaging branch, master, updated. milestone4-368-g700ab82
Daniel Willmann
daniel at totalueberwachung.de
Mon Feb 2 18:51:16 UTC 2009
The following commit has been merged in the master branch:
commit 9b3f9054f35886f1d930c6b6199b35542d2cebab
Author: Daniel Willmann <daniel at totalueberwachung.de>
Date: Fri Nov 14 03:44:19 2008 +0100
ogsmd: Rename AbstractSMS -> SMS, make decodeSMS a class method
diff --git a/framework/subsystems/ogsmd/gsm/sms.py b/framework/subsystems/ogsmd/gsm/sms.py
index 35a96e2..622120a 100644
--- a/framework/subsystems/ogsmd/gsm/sms.py
+++ b/framework/subsystems/ogsmd/gsm/sms.py
@@ -19,7 +19,7 @@ class SMSError(Exception):
pass
# ** Dekodieren
-# smsobject = decodeSMS( pdu )
+# smsobject = SMS.decode( pdu )
# print "von", smsobject.sender(), "um", smsobject.arrivalTime(), "via" smsobject.serviceCenter(), ...
# print "uses charset", smsobject.charset(), ...
# ** Enkodieren
@@ -30,119 +30,6 @@ class SMSError(Exception):
# * SmsMessage (repraesentiert eine -- moeglicherweise Multipart -- Nachricht)
# * Weitere, fuer spezifische SMS-Typen (Status Report) eigene Klassen? Ggfs. zu komplex.
-def decodeSMS( pdu, direction ):
- # first convert the string into a bytestream
- try:
- bytes = [ int( pdu[i:i+2], 16 ) for i in range(0, len(pdu), 2) ]
- except ValueError:
- raise SMSError, "PDU malformed"
-
- sms = AbstractSMS( direction )
-
- offset = 0
- # SCA - Service Center address
- sca_len = bytes[offset]
- offset += 1
- if sca_len > 0:
- sms.sca = PDUAddress.decode( bytes[offset:offset+sca_len] )
- else:
- sms.sca = False
-
- offset += sca_len
- # PDU type
- pdu_type = bytes[offset]
-
- sms.pdu_mti = pdu_type & 0x03
- sms.pdu_rp = pdu_type & 0x80 != 0
- sms.pdu_udhi = pdu_type & 0x40 != 0
- sms.pdu_srr = pdu_type & 0x20 != 0
- sms.pdu_sri = sms.pdu_srr
- sms.pdu_vpf = (pdu_type & 0x18)>>3
- sms.pdu_rd = pdu_type & 0x04 != 0
- sms.pdu_mms = sms.pdu_rd
-
- offset += 1
- if sms.pdu_mti == 1:
- # MR - Message Reference
- sms.mr = bytes[offset]
- offset += 1
-
- # OA/DA - Originating or Destination Address
- # WARNING, the length is coded in digits of the number, not in octets occupied!
- oa_len = 1 + (bytes[offset] + 1) / 2
- offset += 1
- sms.oa = PDUAddress.decode( bytes[offset:offset+oa_len] )
- sms.da = sms.oa
-
- offset += oa_len
- # PID - Protocol identifier
- sms.pid = bytes[offset]
-
- offset += 1
- # DCS - Data Coding Scheme
- sms.dcs = bytes[offset]
-
- offset += 1
- if sms.pdu_mti == 0:
- # SCTS - Service Centre Time Stamp
- sms.scts = decodePDUTime( bytes[offset:offset+7] )
- offset += 7
- else:
- # VP - Validity Period FIXME
- if sms.pdu_vpf == 2:
- # Relative
- sms.vp = bytes[offset]
- offset += 1
- elif sms.pdu_vpf == 3:
- # Absolute
- sms.vp = decodePDUTime( bytes[offset:offset+7] )
- offset += 7
-
- # UD - User Data
- ud_len = bytes[offset]
- offset += 1
- parse_userdata( sms, ud_len, bytes[offset:] )
- return sms
-
-def parse_userdata( sms, ud_len, bytes ):
- offset = 0
- sms.udh = {}
- if sms.pdu_udhi:
- # Decode the headers
- udh_len = bytes[offset]
- offset += 1
- while offset < udh_len:
- # Information Element
- iei = bytes[offset]
- offset += 1
- ie_len = bytes[offset]
- offset += 1
- ie_data = bytes[offset:offset+ie_len]
- offset += ie_len
- # FIXME
- sms.udh[iei] = ie_data
-
- # User Data FIXME
- # We need to look at the DCS in order to be able to decide what
- # to use here
-
- # We need to lose the padding bits before the start of the
- # seven-bit packed data, which means we need to figure out how
- # many there are...
- # See the diagram on page 58 of GSM_03.40_6.0.0.pdf.
-
- userdata = "".join( map( chr, bytes[offset:] ) )
- if sms.dcs_alphabet == "gsm_default":
- padding_size = ((7 * ud_len) - (8 * (offset))) % 7
- userdata = unpack_sevenbit(bytes[offset:], padding_size)
- septets = ud_len - int( math.ceil( (offset*8)/7.0 ) )
- userdata = userdata[:septets]
-
- if not sms.dcs_alphabet is None:
- sms.ud = userdata.decode( sms.dcs_alphabet )
- else:
- sms.ud = userdata
-
class PDUAddress:
@classmethod
def guess( cls, number ):
@@ -201,7 +88,82 @@ class PDUAddress:
return flatten( [length, 0x80 | self.type << 4 | self.dialplan, enc] )
-class AbstractSMS(object):
+class SMS(object):
+ @classmethod
+ def decode( cls, pdu, direction ):
+ # first convert the string into a bytestream
+ try:
+ bytes = [ int( pdu[i:i+2], 16 ) for i in range(0, len(pdu), 2) ]
+ except ValueError:
+ raise SMSError, "PDU malformed"
+
+ sms = cls( direction )
+
+ offset = 0
+ # SCA - Service Center address
+ sca_len = bytes[offset]
+ offset += 1
+ if sca_len > 0:
+ sms.sca = PDUAddress.decode( bytes[offset:offset+sca_len] )
+ else:
+ sms.sca = False
+
+ offset += sca_len
+ # PDU type
+ pdu_type = bytes[offset]
+
+ sms.pdu_mti = pdu_type & 0x03
+ sms.pdu_rp = pdu_type & 0x80 != 0
+ sms.pdu_udhi = pdu_type & 0x40 != 0
+ sms.pdu_srr = pdu_type & 0x20 != 0
+ sms.pdu_sri = sms.pdu_srr
+ sms.pdu_vpf = (pdu_type & 0x18)>>3
+ sms.pdu_rd = pdu_type & 0x04 != 0
+ sms.pdu_mms = sms.pdu_rd
+
+ offset += 1
+ if sms.pdu_mti == 1:
+ # MR - Message Reference
+ sms.mr = bytes[offset]
+ offset += 1
+
+ # OA/DA - Originating or Destination Address
+ # WARNING, the length is coded in digits of the number, not in octets occupied!
+ oa_len = 1 + (bytes[offset] + 1) / 2
+ offset += 1
+ sms.oa = PDUAddress.decode( bytes[offset:offset+oa_len] )
+ sms.da = sms.oa
+
+ offset += oa_len
+ # PID - Protocol identifier
+ sms.pid = bytes[offset]
+
+ offset += 1
+ # DCS - Data Coding Scheme
+ sms.dcs = bytes[offset]
+
+ offset += 1
+ if sms.pdu_mti == 0:
+ # SCTS - Service Centre Time Stamp
+ sms.scts = decodePDUTime( bytes[offset:offset+7] )
+ offset += 7
+ else:
+ # VP - Validity Period FIXME
+ if sms.pdu_vpf == 2:
+ # Relative
+ sms.vp = bytes[offset]
+ offset += 1
+ elif sms.pdu_vpf == 3:
+ # Absolute
+ sms.vp = decodePDUTime( bytes[offset:offset+7] )
+ offset += 7
+
+ # UD - User Data
+ ud_len = bytes[offset]
+ offset += 1
+ sms._parse_userdata( ud_len, bytes[offset:] )
+ return sms
+
def __init__( self, direction ):
self.direction = direction
self.sca = False
@@ -222,6 +184,45 @@ class AbstractSMS(object):
self.dcs_mwi_type = None
self.dcs_mclass = None
+ def _parse_userdata( self, ud_len, bytes ):
+ offset = 0
+ self.udh = {}
+ if self.pdu_udhi:
+ # Decode the headers
+ udh_len = bytes[offset]
+ offset += 1
+ while offset < udh_len:
+ # Information Element
+ iei = bytes[offset]
+ offset += 1
+ ie_len = bytes[offset]
+ offset += 1
+ ie_data = bytes[offset:offset+ie_len]
+ offset += ie_len
+ # FIXME
+ self.udh[iei] = ie_data
+
+ # User Data FIXME
+ # We need to look at the DCS in order to be able to decide what
+ # to use here
+
+ # We need to lose the padding bits before the start of the
+ # seven-bit packed data, which means we need to figure out how
+ # many there are...
+ # See the diagram on page 58 of GSM_03.40_6.0.0.pdf.
+
+ userdata = "".join( map( chr, bytes[offset:] ) )
+ if self.dcs_alphabet == "gsm_default":
+ padding_size = ((7 * ud_len) - (8 * (offset))) % 7
+ userdata = unpack_sevenbit(bytes[offset:], padding_size)
+ septets = ud_len - int( math.ceil( (offset*8)/7.0 ) )
+ userdata = userdata[:septets]
+
+ if not self.dcs_alphabet is None:
+ self.ud = userdata.decode( self.dcs_alphabet )
+ else:
+ self.ud = userdata
+
def _getDCS( self ):
# TODO throw exceptions on invalid combinations
if self.dcs_mwi_type is None:
@@ -435,31 +436,36 @@ Alphabet: %s
Message: %s
""" % (self.sca, self.pdu_vpf, self.pid, self.dcs, self.oa, self.udh, self.dcs_alphabet, repr(self.ud))
-class CellBroadcast(AbstractSMS):
- def __init__(self, pdu):
- self.dcs_alphabet = "gsm_default"
- self.dcs_language = None
- self.dcs_language_indication = False
- self.dcs_compressed = False
- self.dcs_mclass = None
+class CellBroadcast(SMS):
+ @classmethod
+ def decode( cls, pdu):
# first convert the string into a bytestream
bytes = [ int( pdu[i:i+2], 16 ) for i in range(0, len(pdu), 2) ]
- self.sn = bytes[0] << 8 | bytes[1]
- self.mid = bytes[2] << 8 | bytes[3]
- self.dcs = bytes[4]
- self.page = bytes[5]
+ cb = cls()
+ cb.sn = bytes[0] << 8 | bytes[1]
+ cb.mid = bytes[2] << 8 | bytes[3]
+ cb.dcs = bytes[4]
+ cb.page = bytes[5]
userdata = "".join( map( chr, bytes[6:] ) )
- if self.dcs_alphabet == "gsm_default":
+ if cb.dcs_alphabet == "gsm_default":
userdata = unpack_sevenbit(bytes[6:])
- if not self.dcs_alphabet is None:
+ if not cb.dcs_alphabet is None:
# \n is the padding character in CB messages so strip it
- self.ud = userdata.decode( self.dcs_alphabet ).strip("\n")
+ cb.ud = userdata.decode( cb.dcs_alphabet ).strip("\n")
else:
- self.ud = userdata
+ cb.ud = userdata
+
+ return cb
+ def __init__(self):
+ self.dcs_alphabet = "gsm_default"
+ self.dcs_language = None
+ self.dcs_language_indication = False
+ self.dcs_compressed = False
+ self.dcs_mclass = None
def _getDCS( self ):
if self.dcs_language_indication is None:
@@ -536,6 +542,10 @@ class CellBroadcast(AbstractSMS):
dcs = property( _getDCS, _setDCS )
+ def pdu( self ):
+ # We don't need to generate the PDU for Cell Broadcasts
+ pass
+
def __repr__(self):
return """CellBroadcast
SN: %i
@@ -609,14 +619,14 @@ if __name__ == "__main__":
def testpdu(pdu, dir):
try:
- sms = decodeSMS(pdu, dir)
+ sms = SMS.decode(pdu, dir)
genpdu = sms.pdu()
if pdu != genpdu:
print "ERROR: Reencoded SMS doesn't match"
print "Orig PDU: ", pdu
print "ReencPDU: ", genpdu
print repr(sms)
- sms = decodeSMS(genpdu, dir)
+ sms = SMS.decode(genpdu, dir)
print repr(sms)
except SMSError, e:
print "%s, PDU was: %s\n" % (e, pdu)
@@ -628,7 +638,7 @@ if __name__ == "__main__":
testpdu(pdu, "MO")
for pdu in pdus_CB:
- cb = CellBroadcast(pdu)
+ cb = CellBroadcast.decode(pdu)
print repr(cb)
# vim: expandtab shiftwidth=4 tabstop=4
diff --git a/framework/subsystems/ogsmd/modems/abstract/mediator.py b/framework/subsystems/ogsmd/modems/abstract/mediator.py
index f80f050..8a9cf24 100644
--- a/framework/subsystems/ogsmd/modems/abstract/mediator.py
+++ b/framework/subsystems/ogsmd/modems/abstract/mediator.py
@@ -769,7 +769,7 @@ class SimRetrieveMessagebook( SimMediator ):
else:
# Now we decode the actual PDU
- sms = ogsmd.gsm.sms.decodeSMS( line, direction)
+ sms = ogsmd.gsm.sms.SMS.decode( line, direction )
result.append( ( index, status, str(sms.oa), sms.ud, sms.featureMap ) )
self._ok( result )
else:
@@ -799,7 +799,7 @@ class SimRetrieveMessage( SimMediator ):
length = int(header.groupdict()["pdulen"])
else:
# Now we decode the actual PDU
- sms = ogsmd.gsm.sms.decodeSMS( line, direction )
+ sms = ogsmd.gsm.sms.SMS.decode( line, direction )
result = ( status, str(sms.oa), sms.ud, sms.featureMap )
self._ok( *result )
@@ -816,7 +816,7 @@ class SimSetServiceCenterNumber( SimMediator ):
class SimStoreMessage( SimMediator ):
#=========================================================================#
def trigger( self ):
- sms = ogsmd.gsm.sms.AbstractSMS("MO")
+ sms = ogsmd.gsm.sms.SMS("MO")
sms.pdu_mti = 1
sms.pid = 0
sms.dcs = 0
@@ -864,7 +864,7 @@ class SimDeleteMessage( SimMediator ):
class SmsSendMessage( SmsMediator ):
#=========================================================================#
def trigger( self ):
- sms = ogsmd.gsm.sms.AbstractSMS("MO")
+ sms = ogsmd.gsm.sms.SMS("MO")
sms.pdu_mti = 1
sms.pid = 0
sms.dcs = 0
diff --git a/framework/subsystems/ogsmd/modems/abstract/unsolicited.py b/framework/subsystems/ogsmd/modems/abstract/unsolicited.py
index 7d799cc..1309724 100644
--- a/framework/subsystems/ogsmd/modems/abstract/unsolicited.py
+++ b/framework/subsystems/ogsmd/modems/abstract/unsolicited.py
@@ -42,7 +42,7 @@ class AbstractUnsolicitedResponseDelegate( object ):
"""
values = safesplit( righthandside, ',' )
if len( values ) == 1:
- cb = ogsmd.gsm.sms.CellBroadcast(pdu)
+ cb = ogsmd.gsm.sms.CellBroadcast.decode(pdu)
sn = cb.sn
channel = cb.mid
dcs = cb.dcs
@@ -74,7 +74,7 @@ class AbstractUnsolicitedResponseDelegate( object ):
header = safesplit( righthandside, ',' )
length = int(header[1])
# Now we decode the actual PDU
- sms = ogsmd.gsm.sms.decodeSMS( pdu, "MT" )
+ sms = ogsmd.gsm.sms.SMS.decode( pdu, "MT" )
self._object.IncomingMessage( str(sms.oa), sms.ud, sms.featureMap )
# +CMTI: "SM",7
--
FSO frameworkd Debian packaging
More information about the pkg-fso-commits mailing list