[pkg-fso-commits] [SCM] FSO frameworkd Debian packaging branch, master, updated. milestone4-368-g700ab82

Daniel Willmann daniel at totalueberwachung.de
Mon Feb 2 18:51:16 UTC 2009


The following commit has been merged into the master branch:
commit 9b3f9054f35886f1d930c6b6199b35542d2cebab
Author: Daniel Willmann <daniel at totalueberwachung.de>
Date:   Fri Nov 14 03:44:19 2008 +0100

    ogsmd: Rename AbstractSMS -> SMS, make decodeSMS a class method

diff --git a/framework/subsystems/ogsmd/gsm/sms.py b/framework/subsystems/ogsmd/gsm/sms.py
index 35a96e2..622120a 100644
--- a/framework/subsystems/ogsmd/gsm/sms.py
+++ b/framework/subsystems/ogsmd/gsm/sms.py
@@ -19,7 +19,7 @@ class SMSError(Exception):
     pass
 
 #    ** Dekodieren
-#    smsobject = decodeSMS( pdu )
+#    smsobject = SMS.decode( pdu )
 #    print "von", smsobject.sender(), "um", smsobject.arrivalTime(), "via" smsobject.serviceCenter(), ...
 #    print "uses charset", smsobject.charset(), ...
 #    ** Enkodieren
@@ -30,119 +30,6 @@ class SMSError(Exception):
 # * SmsMessage (repraesentiert eine -- moeglicherweise Multipart -- Nachricht)
 # * Weitere, fuer spezifische SMS-Typen (Status Report) eigene Klassen? Ggfs. zu komplex.
 
-def decodeSMS( pdu, direction ):
-    # first convert the string into a bytestream
-    try:
-        bytes = [ int( pdu[i:i+2], 16 ) for i in range(0, len(pdu), 2) ]
-    except ValueError:
-        raise SMSError, "PDU malformed"
-
-    sms = AbstractSMS( direction )
-
-    offset = 0
-    # SCA - Service Center address
-    sca_len = bytes[offset]
-    offset += 1
-    if sca_len > 0:
-        sms.sca = PDUAddress.decode( bytes[offset:offset+sca_len] )
-    else:
-        sms.sca = False
-
-    offset += sca_len
-    # PDU type
-    pdu_type = bytes[offset]
-
-    sms.pdu_mti = pdu_type & 0x03
-    sms.pdu_rp = pdu_type & 0x80 != 0
-    sms.pdu_udhi = pdu_type & 0x40 != 0
-    sms.pdu_srr = pdu_type & 0x20 != 0
-    sms.pdu_sri = sms.pdu_srr
-    sms.pdu_vpf =  (pdu_type & 0x18)>>3
-    sms.pdu_rd = pdu_type & 0x04 != 0
-    sms.pdu_mms = sms.pdu_rd
-
-    offset += 1
-    if sms.pdu_mti == 1:
-        # MR - Message Reference
-        sms.mr = bytes[offset]
-        offset += 1
-
-    # OA/DA - Originating or Destination Address
-    # WARNING, the length is coded in digits of the number, not in octets occupied!
-    oa_len = 1 + (bytes[offset] + 1) / 2
-    offset += 1
-    sms.oa = PDUAddress.decode( bytes[offset:offset+oa_len] )
-    sms.da = sms.oa
-
-    offset += oa_len
-    # PID - Protocol identifier
-    sms.pid = bytes[offset]
-
-    offset += 1
-    # DCS - Data Coding Scheme
-    sms.dcs = bytes[offset]
-
-    offset += 1
-    if sms.pdu_mti == 0:
-        # SCTS - Service Centre Time Stamp
-        sms.scts = decodePDUTime( bytes[offset:offset+7] )
-        offset += 7
-    else:
-        # VP - Validity Period FIXME
-        if sms.pdu_vpf == 2:
-            # Relative
-            sms.vp = bytes[offset]
-            offset += 1
-        elif sms.pdu_vpf == 3:
-            # Absolute
-            sms.vp = decodePDUTime( bytes[offset:offset+7] )
-            offset += 7
-
-    # UD - User Data
-    ud_len = bytes[offset]
-    offset += 1
-    parse_userdata( sms, ud_len, bytes[offset:] )
-    return sms
-
-def parse_userdata( sms, ud_len, bytes ):
-    offset = 0
-    sms.udh = {}
-    if sms.pdu_udhi:
-        # Decode the headers
-        udh_len =  bytes[offset]
-        offset += 1
-        while offset < udh_len:
-            # Information Element
-            iei = bytes[offset]
-            offset += 1
-            ie_len = bytes[offset]
-            offset += 1
-            ie_data = bytes[offset:offset+ie_len]
-            offset += ie_len
-            # FIXME
-            sms.udh[iei] = ie_data
-
-    # User Data FIXME
-    # We need to look at the DCS in order to be able to decide what
-    # to use here
-
-    # We need to lose the padding bits before the start of the
-    # seven-bit packed data, which means we need to figure out how
-    # many there are...
-    # See the diagram on page 58 of GSM_03.40_6.0.0.pdf.
-
-    userdata = "".join( map( chr, bytes[offset:] ) )
-    if sms.dcs_alphabet == "gsm_default":
-        padding_size = ((7 * ud_len) - (8 * (offset))) % 7
-        userdata = unpack_sevenbit(bytes[offset:], padding_size)
-        septets = ud_len - int( math.ceil( (offset*8)/7.0 ) )
-        userdata = userdata[:septets]
-
-    if not sms.dcs_alphabet is None:
-        sms.ud = userdata.decode( sms.dcs_alphabet )
-    else:
-        sms.ud = userdata
-
 class PDUAddress:
     @classmethod
     def guess( cls, number ):
@@ -201,7 +88,82 @@ class PDUAddress:
         return flatten( [length, 0x80 | self.type << 4 | self.dialplan, enc] )
 
 
-class AbstractSMS(object):
+class SMS(object):
+    @classmethod
+    def decode( cls, pdu, direction ):
+        # first convert the string into a bytestream
+        try:
+            bytes = [ int( pdu[i:i+2], 16 ) for i in range(0, len(pdu), 2) ]
+        except ValueError:
+            raise SMSError, "PDU malformed"
+
+        sms = cls( direction )
+
+        offset = 0
+        # SCA - Service Center address
+        sca_len = bytes[offset]
+        offset += 1
+        if sca_len > 0:
+            sms.sca = PDUAddress.decode( bytes[offset:offset+sca_len] )
+        else:
+            sms.sca = False
+
+        offset += sca_len
+        # PDU type
+        pdu_type = bytes[offset]
+
+        sms.pdu_mti = pdu_type & 0x03
+        sms.pdu_rp = pdu_type & 0x80 != 0
+        sms.pdu_udhi = pdu_type & 0x40 != 0
+        sms.pdu_srr = pdu_type & 0x20 != 0
+        sms.pdu_sri = sms.pdu_srr
+        sms.pdu_vpf =  (pdu_type & 0x18)>>3
+        sms.pdu_rd = pdu_type & 0x04 != 0
+        sms.pdu_mms = sms.pdu_rd
+
+        offset += 1
+        if sms.pdu_mti == 1:
+            # MR - Message Reference
+            sms.mr = bytes[offset]
+            offset += 1
+
+        # OA/DA - Originating or Destination Address
+        # WARNING, the length is coded in digits of the number, not in octets occupied!
+        oa_len = 1 + (bytes[offset] + 1) / 2
+        offset += 1
+        sms.oa = PDUAddress.decode( bytes[offset:offset+oa_len] )
+        sms.da = sms.oa
+
+        offset += oa_len
+        # PID - Protocol identifier
+        sms.pid = bytes[offset]
+
+        offset += 1
+        # DCS - Data Coding Scheme
+        sms.dcs = bytes[offset]
+
+        offset += 1
+        if sms.pdu_mti == 0:
+            # SCTS - Service Centre Time Stamp
+            sms.scts = decodePDUTime( bytes[offset:offset+7] )
+            offset += 7
+        else:
+            # VP - Validity Period FIXME
+            if sms.pdu_vpf == 2:
+                # Relative
+                sms.vp = bytes[offset]
+                offset += 1
+            elif sms.pdu_vpf == 3:
+                # Absolute
+                sms.vp = decodePDUTime( bytes[offset:offset+7] )
+                offset += 7
+
+        # UD - User Data
+        ud_len = bytes[offset]
+        offset += 1
+        sms._parse_userdata( ud_len, bytes[offset:] )
+        return sms
+
     def __init__( self, direction ):
         self.direction = direction
         self.sca = False
@@ -222,6 +184,45 @@ class AbstractSMS(object):
         self.dcs_mwi_type = None
         self.dcs_mclass = None
 
+    def _parse_userdata( self, ud_len, bytes ):
+        offset = 0
+        self.udh = {}
+        if self.pdu_udhi:
+            # Decode the headers
+            udh_len =  bytes[offset]
+            offset += 1
+            while offset < udh_len:
+                # Information Element
+                iei = bytes[offset]
+                offset += 1
+                ie_len = bytes[offset]
+                offset += 1
+                ie_data = bytes[offset:offset+ie_len]
+                offset += ie_len
+                # FIXME
+                self.udh[iei] = ie_data
+
+        # User Data FIXME
+        # We need to look at the DCS in order to be able to decide what
+        # to use here
+
+        # We need to lose the padding bits before the start of the
+        # seven-bit packed data, which means we need to figure out how
+        # many there are...
+        # See the diagram on page 58 of GSM_03.40_6.0.0.pdf.
+
+        userdata = "".join( map( chr, bytes[offset:] ) )
+        if self.dcs_alphabet == "gsm_default":
+            padding_size = ((7 * ud_len) - (8 * (offset))) % 7
+            userdata = unpack_sevenbit(bytes[offset:], padding_size)
+            septets = ud_len - int( math.ceil( (offset*8)/7.0 ) )
+            userdata = userdata[:septets]
+
+        if not self.dcs_alphabet is None:
+            self.ud = userdata.decode( self.dcs_alphabet )
+        else:
+            self.ud = userdata
+
     def _getDCS( self ):
         # TODO throw exceptions on invalid combinations
         if self.dcs_mwi_type is None:
@@ -435,31 +436,36 @@ Alphabet: %s
 Message: %s
 """ % (self.sca, self.pdu_vpf, self.pid, self.dcs, self.oa, self.udh, self.dcs_alphabet, repr(self.ud))
 
-class CellBroadcast(AbstractSMS):
-    def __init__(self, pdu):
-        self.dcs_alphabet = "gsm_default"
-        self.dcs_language = None
-        self.dcs_language_indication = False
-        self.dcs_compressed = False
-        self.dcs_mclass = None
+class CellBroadcast(SMS):
+    @classmethod
+    def decode( cls, pdu):
         # first convert the string into a bytestream
         bytes = [ int( pdu[i:i+2], 16 ) for i in range(0, len(pdu), 2) ]
 
-        self.sn = bytes[0] << 8 | bytes[1]
-        self.mid = bytes[2] << 8 | bytes[3]
-        self.dcs = bytes[4]
-        self.page = bytes[5]
+        cb = cls()
+        cb.sn = bytes[0] << 8 | bytes[1]
+        cb.mid = bytes[2] << 8 | bytes[3]
+        cb.dcs = bytes[4]
+        cb.page = bytes[5]
 
         userdata = "".join( map( chr, bytes[6:] ) )
-        if self.dcs_alphabet == "gsm_default":
+        if cb.dcs_alphabet == "gsm_default":
             userdata = unpack_sevenbit(bytes[6:])
 
-        if not self.dcs_alphabet is None:
+        if not cb.dcs_alphabet is None:
             # \n is the padding character in CB messages so strip it
-            self.ud = userdata.decode( self.dcs_alphabet ).strip("\n")
+            cb.ud = userdata.decode( cb.dcs_alphabet ).strip("\n")
         else:
-            self.ud = userdata
+            cb.ud = userdata
+
+        return cb
 
+    def __init__(self):
+        self.dcs_alphabet = "gsm_default"
+        self.dcs_language = None
+        self.dcs_language_indication = False
+        self.dcs_compressed = False
+        self.dcs_mclass = None
 
     def _getDCS( self ):
         if self.dcs_language_indication is None:
@@ -536,6 +542,10 @@ class CellBroadcast(AbstractSMS):
 
     dcs = property( _getDCS, _setDCS )
 
+    def pdu( self ):
+        # We don't need to generate the PDU for Cell Broadcasts
+        pass
+
     def __repr__(self):
         return """CellBroadcast
 SN: %i
@@ -609,14 +619,14 @@ if __name__ == "__main__":
 
     def testpdu(pdu, dir):
         try:
-            sms = decodeSMS(pdu, dir)
+            sms = SMS.decode(pdu, dir)
             genpdu = sms.pdu()
             if pdu != genpdu:
                 print "ERROR: Reencoded SMS doesn't match"
                 print "Orig PDU: ", pdu
                 print "ReencPDU: ", genpdu
                 print repr(sms)
-                sms = decodeSMS(genpdu, dir)
+                sms = SMS.decode(genpdu, dir)
             print repr(sms)
         except SMSError, e:
             print "%s, PDU was: %s\n" % (e, pdu)
@@ -628,7 +638,7 @@ if __name__ == "__main__":
         testpdu(pdu, "MO")
 
     for pdu in pdus_CB:
-        cb = CellBroadcast(pdu)
+        cb = CellBroadcast.decode(pdu)
         print repr(cb)
 
 # vim: expandtab shiftwidth=4 tabstop=4
diff --git a/framework/subsystems/ogsmd/modems/abstract/mediator.py b/framework/subsystems/ogsmd/modems/abstract/mediator.py
index f80f050..8a9cf24 100644
--- a/framework/subsystems/ogsmd/modems/abstract/mediator.py
+++ b/framework/subsystems/ogsmd/modems/abstract/mediator.py
@@ -769,7 +769,7 @@ class SimRetrieveMessagebook( SimMediator ):
                 else:
                     # Now we decode the actual PDU
 
-                    sms = ogsmd.gsm.sms.decodeSMS( line, direction)
+                    sms = ogsmd.gsm.sms.SMS.decode( line, direction )
                     result.append( ( index, status, str(sms.oa), sms.ud, sms.featureMap ) )
             self._ok( result )
         else:
@@ -799,7 +799,7 @@ class SimRetrieveMessage( SimMediator ):
                     length = int(header.groupdict()["pdulen"])
                 else:
                     # Now we decode the actual PDU
-                    sms = ogsmd.gsm.sms.decodeSMS( line, direction )
+                    sms = ogsmd.gsm.sms.SMS.decode( line, direction )
                     result = ( status, str(sms.oa), sms.ud, sms.featureMap )
 
             self._ok( *result )
@@ -816,7 +816,7 @@ class SimSetServiceCenterNumber( SimMediator ):
 class SimStoreMessage( SimMediator ):
 #=========================================================================#
     def trigger( self ):
-        sms = ogsmd.gsm.sms.AbstractSMS("MO")
+        sms = ogsmd.gsm.sms.SMS("MO")
         sms.pdu_mti = 1
         sms.pid = 0
         sms.dcs = 0
@@ -864,7 +864,7 @@ class SimDeleteMessage( SimMediator ):
 class SmsSendMessage( SmsMediator ):
 #=========================================================================#
     def trigger( self ):
-        sms = ogsmd.gsm.sms.AbstractSMS("MO")
+        sms = ogsmd.gsm.sms.SMS("MO")
         sms.pdu_mti = 1
         sms.pid = 0
         sms.dcs = 0
diff --git a/framework/subsystems/ogsmd/modems/abstract/unsolicited.py b/framework/subsystems/ogsmd/modems/abstract/unsolicited.py
index 7d799cc..1309724 100644
--- a/framework/subsystems/ogsmd/modems/abstract/unsolicited.py
+++ b/framework/subsystems/ogsmd/modems/abstract/unsolicited.py
@@ -42,7 +42,7 @@ class AbstractUnsolicitedResponseDelegate( object ):
         """
         values = safesplit( righthandside, ',' )
         if len( values ) == 1:
-            cb = ogsmd.gsm.sms.CellBroadcast(pdu)
+            cb = ogsmd.gsm.sms.CellBroadcast.decode(pdu)
             sn = cb.sn
             channel = cb.mid
             dcs = cb.dcs
@@ -74,7 +74,7 @@ class AbstractUnsolicitedResponseDelegate( object ):
         header = safesplit( righthandside, ',' )
         length = int(header[1])
         # Now we decode the actual PDU
-        sms = ogsmd.gsm.sms.decodeSMS( pdu, "MT" )
+        sms = ogsmd.gsm.sms.SMS.decode( pdu, "MT" )
         self._object.IncomingMessage( str(sms.oa), sms.ud, sms.featureMap )
 
     # +CMTI: "SM",7

-- 
FSO frameworkd Debian packaging



More information about the pkg-fso-commits mailing list