[pkg-fso-commits] [SCM] FSO frameworkd Debian packaging branch, master, updated. milestone4-368-g700ab82

Daniel Willmann daniel at totalueberwachung.de
Mon Feb 2 18:51:18 UTC 2009


The following commit has been merged in the master branch:
commit bb64b62e42d743ce09765ddd35fed8878503d4aa
Merge: 08a0cd7fb96950a8be203f1dc0aa1bdea5db2543 d8a55311c5d29bb2946c76e8445748e04049eb1a
Author: Daniel Willmann <daniel at totalueberwachung.de>
Date:   Tue Nov 18 00:02:57 2008 +0100

    Merge branch 'stabilization/milestone4'

diff --combined framework/subsystems/ogsmd/gsm/sms.py
index e33f388,716e12b..fc1fd1b
--- a/framework/subsystems/ogsmd/gsm/sms.py
+++ b/framework/subsystems/ogsmd/gsm/sms.py
@@@ -15,11 -15,8 +15,11 @@@ from ogsmd.gsm.convert import 
  from ogsmd.gsm.const import CB_PDU_DCS_LANGUAGE
  import math
  
 +class SMSError(Exception):
 +    pass
 +
  #    ** Dekodieren
 -#    smsobject = decodeSMS( pdu )
 +#    smsobject = SMS.decode( pdu )
  #    print "von", smsobject.sender(), "um", smsobject.arrivalTime(), "via" smsobject.serviceCenter(), ...
  #    print "uses charset", smsobject.charset(), ...
  #    ** Enkodieren
@@@ -30,6 -27,114 +30,6 @@@
  # * SmsMessage (repraesentiert eine -- moeglicherweise Multipart -- Nachricht)
  # * Weitere, fuer spezifische SMS-Typen (Status Report) eigene Klassen? Ggfs. zu komplex.
  
 -def decodeSMS( pdu, direction ):
 -    # first convert the string into a bytestream
 -    bytes = [ int( pdu[i:i+2], 16 ) for i in range(0, len(pdu), 2) ]
 -
 -    sms = AbstractSMS( direction )
 -
 -    offset = 0
 -    # SCA - Service Center address
 -    sca_len = bytes[offset]
 -    offset += 1
 -    if sca_len > 0:
 -        sms.sca = PDUAddress( *decodePDUNumber( bytes[offset:offset+sca_len] ) )
 -    else:
 -        sms.sca = False
 -
 -    offset += sca_len
 -    # PDU type
 -    pdu_type = bytes[offset]
 -
 -    sms.pdu_mti = pdu_type & 0x03
 -    sms.pdu_rp = pdu_type & 0x80 != 0
 -    sms.pdu_udhi = pdu_type & 0x40 != 0
 -    sms.pdu_srr = pdu_type & 0x20 != 0
 -    sms.pdu_sri = sms.pdu_srr
 -    sms.pdu_vpf =  (pdu_type & 0x18)>>3
 -    sms.pdu_rd = pdu_type & 0x04 != 0
 -    sms.pdu_mms = sms.pdu_rd
 -
 -    offset += 1
 -    if sms.pdu_mti == 1:
 -        # MR - Message Reference
 -        sms.mr = bytes[offset]
 -        offset += 1
 -
 -    # OA/DA - Originating or Destination Address
 -    # WARNING, the length is coded in digits of the number, not in octets occupied!
 -    oa_len = 1 + (bytes[offset] + 1) / 2
 -    offset += 1
 -    sms.oa = PDUAddress( *decodePDUNumber( bytes[offset:offset+oa_len] ) )
 -    sms.da = sms.oa
 -
 -    offset += oa_len
 -    # PID - Protocol identifier
 -    sms.pid = bytes[offset]
 -
 -    offset += 1
 -    # DCS - Data Coding Scheme
 -    sms.dcs = bytes[offset]
 -
 -    offset += 1
 -    if sms.pdu_mti == 0:
 -        # SCTS - Service Centre Time Stamp
 -        sms.scts = decodePDUTime( bytes[offset:offset+7] )
 -        offset += 7
 -    else:
 -        # VP - Validity Period FIXME
 -        if sms.pdu_vpf == 2:
 -            # Relative
 -            sms.vp = bytes[offset]
 -            offset += 1
 -        elif sms.pdu_vpf == 3:
 -            # Absolute
 -            sms.vp = decodePDUTime( bytes[offset:offset+7] )
 -            offset += 7
 -
 -    # UD - User Data
 -    ud_len = bytes[offset]
 -    offset += 1
 -    parse_userdata( sms, ud_len, bytes[offset:] )
 -    return sms
 -
 -def parse_userdata( sms, ud_len, bytes ):
 -    offset = 0
 -    sms.udh = {}
 -    if sms.pdu_udhi:
 -        # Decode the headers
 -        udh_len =  bytes[offset]
 -        offset += 1
 -        while offset < udh_len:
 -            # Information Element
 -            iei = bytes[offset]
 -            offset += 1
 -            ie_len = bytes[offset]
 -            offset += 1
 -            ie_data = bytes[offset:offset+ie_len]
 -            offset += ie_len
 -            # FIXME
 -            sms.udh[iei] = ie_data
 -
 -    # User Data FIXME
 -    # We need to look at the DCS in order to be able to decide what
 -    # to use here
 -
 -    # We need to lose the padding bits before the start of the
 -    # seven-bit packed data, which means we need to figure out how
 -    # many there are...
 -    # See the diagram on page 58 of GSM_03.40_6.0.0.pdf.
 -
 -    userdata = "".join( map( chr, bytes[offset:] ) )
 -    if sms.dcs_alphabet == "gsm_default":
 -        padding_size = ((7 * ud_len) - (8 * (offset))) % 7
 -        userdata = unpack_sevenbit(bytes[offset:], padding_size)
 -        septets = ud_len - int( math.ceil( (offset*8)/7.0 ) )
 -        userdata = userdata[:septets]
 -
 -    if not sms.dcs_alphabet is None:
 -        sms.ud = userdata.decode( sms.dcs_alphabet )
 -
  class PDUAddress:
      @classmethod
      def guess( cls, number ):
@@@ -44,25 -149,6 +44,25 @@@
              number = number
              ntype = 5
          return cls( ntype, 1, number )
 +
 +    @classmethod
 +    def decode( cls, bs ):
 +        num_type = ( bs[0] & 0x70)  >> 4
 +        num_plan = ( bs[0] & 0x0F )
 +        number = bs[1:]
 +        if number == []:
 +            number = ""
 +        elif num_type == 5:
 +            number = unpack_sevenbit( number )
 +            number = number.decode( "gsm_default" )
 +        else:
 +            number = bcd_decode( number )
 +            # Every occurence of the padding semi-octet should be removed
 +            number = number.replace( "f", "" )
 +            # Decode special "digits"
 +            number = number.translate( PDUADDR_DEC_TRANS )
 +        return cls( num_type, num_plan, number )
 +
      def __init__( self, type, dialplan, number ):
          self.type = type
          self.dialplan = dialplan
@@@ -73,97 -159,7 +73,97 @@@
              prefix = "+"
          return prefix + self.number
  
 -class AbstractSMS(object):
 +    def pdu( self ):
 +        if self.type == 5:
 +            number = self.number.encode("gsm_default")
 +            enc = pack_sevenbit(number)
 +            length = len(enc)*2
 +            if (len(self.number)*7)%8 <= 4:
 +                length -= 1
 +        else:
 +            # Encode special "digits"
 +            number = self.number.translate(PDUADDR_ENC_TRANS)
 +            enc = bcd_encode(number)
 +            length = len(number)
 +        return flatten( [length, 0x80 | self.type << 4 | self.dialplan, enc] )
 +
 +
 +class SMS(object):
 +    @classmethod
 +    def decode( cls, pdu, direction ):
 +        # first convert the string into a bytestream
 +        try:
 +            bytes = [ int( pdu[i:i+2], 16 ) for i in range(0, len(pdu), 2) ]
 +        except ValueError:
 +            raise SMSError, "PDU malformed"
 +
 +        sms = cls( direction )
 +
 +        offset = 0
 +        # SCA - Service Center address
 +        sca_len = bytes[offset]
 +        offset += 1
 +        if sca_len > 0:
 +            sms.sca = PDUAddress.decode( bytes[offset:offset+sca_len] )
 +        else:
 +            sms.sca = False
 +
 +        offset += sca_len
 +        # PDU type
 +        pdu_type = bytes[offset]
 +
 +        sms.pdu_mti = pdu_type & 0x03
 +        sms.pdu_rp = pdu_type & 0x80 != 0
 +        sms.pdu_udhi = pdu_type & 0x40 != 0
 +        sms.pdu_srr = pdu_type & 0x20 != 0
 +        sms.pdu_sri = sms.pdu_srr
 +        sms.pdu_vpf =  (pdu_type & 0x18)>>3
 +        sms.pdu_rd = pdu_type & 0x04 != 0
 +        sms.pdu_mms = sms.pdu_rd
 +
 +        offset += 1
 +        if sms.pdu_mti == 1:
 +            # MR - Message Reference
 +            sms.mr = bytes[offset]
 +            offset += 1
 +
 +        # OA/DA - Originating or Destination Address
 +        # WARNING, the length is coded in digits of the number, not in octets occupied!
 +        oa_len = 1 + (bytes[offset] + 1) / 2
 +        offset += 1
 +        sms.oa = PDUAddress.decode( bytes[offset:offset+oa_len] )
 +        sms.da = sms.oa
 +
 +        offset += oa_len
 +        # PID - Protocol identifier
 +        sms.pid = bytes[offset]
 +
 +        offset += 1
 +        # DCS - Data Coding Scheme
 +        sms.dcs = bytes[offset]
 +
 +        offset += 1
 +        if sms.pdu_mti == 0:
 +            # SCTS - Service Centre Time Stamp
 +            sms.scts = decodePDUTime( bytes[offset:offset+7] )
 +            offset += 7
 +        else:
 +            # VP - Validity Period FIXME
 +            if sms.pdu_vpf == 2:
 +                # Relative
 +                sms.vp = bytes[offset]
 +                offset += 1
 +            elif sms.pdu_vpf == 3:
 +                # Absolute
 +                sms.vp = decodePDUTime( bytes[offset:offset+7] )
 +                offset += 7
 +
 +        # UD - User Data
 +        ud_len = bytes[offset]
 +        offset += 1
 +        sms._parse_userdata( ud_len, bytes[offset:] )
 +        return sms
 +
      def __init__( self, direction ):
          self.direction = direction
          self.sca = False
@@@ -175,6 -171,7 +175,7 @@@
          self.pdu_rd = False
          self.pdu_mms = False
          self.udh = {}
+         self.ud = ""
          self.mr = 0
          self.pid = 0
          self.dcs_alphabet = "gsm_default"
@@@ -184,45 -181,6 +185,45 @@@
          self.dcs_mwi_type = None
          self.dcs_mclass = None
  
 +    def _parse_userdata( self, ud_len, bytes ):
 +        offset = 0
 +        self.udh = {}
 +        if self.pdu_udhi:
 +            # Decode the headers
 +            udh_len =  bytes[offset]
 +            offset += 1
 +            while offset < udh_len:
 +                # Information Element
 +                iei = bytes[offset]
 +                offset += 1
 +                ie_len = bytes[offset]
 +                offset += 1
 +                ie_data = bytes[offset:offset+ie_len]
 +                offset += ie_len
 +                # FIXME
 +                self.udh[iei] = ie_data
 +
 +        # User Data FIXME
 +        # We need to look at the DCS in order to be able to decide what
 +        # to use here
 +
 +        # We need to lose the padding bits before the start of the
 +        # seven-bit packed data, which means we need to figure out how
 +        # many there are...
 +        # See the diagram on page 58 of GSM_03.40_6.0.0.pdf.
 +
 +        userdata = "".join( map( chr, bytes[offset:] ) )
 +        if self.dcs_alphabet == "gsm_default":
 +            padding_size = ((7 * ud_len) - (8 * (offset))) % 7
 +            userdata = unpack_sevenbit(bytes[offset:], padding_size)
 +            septets = ud_len - int( math.ceil( (offset*8)/7.0 ) )
 +            userdata = userdata[:septets]
 +
 +        if not self.dcs_alphabet is None:
 +            self.ud = userdata.decode( self.dcs_alphabet )
 +        else:
 +            self.ud = userdata
 +
      def _getDCS( self ):
          # TODO throw exceptions on invalid combinations
          if self.dcs_mwi_type is None:
@@@ -294,27 -252,10 +295,27 @@@
  
      dcs = property( _getDCS, _setDCS )
  
 -    def _getFeatureMap( self ):
 -        map = {}
 -        map["direction"] = self.direction
 +    def _getType( self ):
          if self.direction == "MT":
 +            map = TP_MTI_INCOMING
 +        elif self.direction == "MO":
 +            map = TP_MTI_OUTGOING
 +        return map[self.pdu_mti]
 +
 +    def _setType( self, smstype ):
 +        if TP_MTI_INCOMING.has_key(smstype):
 +            self.direction = "MT"
 +            self.pdu_mti = TP_MTI_INCOMING[smstype]
 +        elif TP_MTI_OUTGOING.has_key(smstype):
 +            self.direction = "MO"
 +            self.pdu_mti = TP_MTI_OUTGOING[smstype]
 +
 +    type = property( _getType, _setType )
 +
 +    def _getProperties( self ):
 +        map = {}
 +        map["type"] = self.type
 +        if self.type == "sms-deliver":
              # FIXME Return correct time with timezoneinfo
              map["timestamp"] = self.scts[0].ctime() + " %+05i" % (self.scts[1]*100)
          if 0 in self.udh:
@@@ -328,15 -269,15 +329,15 @@@
  
          return map
  
 -    def _setFeatureMap( self, featureMap ):
 -        for k,v in featureMap.items():
 +    def _setProperties( self, properties ):
 +        for k,v in properties.items():
              if k == "csm_id":
 -                if "csm_num" in featureMap and "csm_seq" in featureMap:
 -                    self.udh[0] = [ v, featureMap["csm_num"], featureMap["csm_seq"] ]
 +                if "csm_num" in properties and "csm_seq" in properties:
 +                    self.udh[0] = [ v, properties["csm_num"], properties["csm_seq"] ]
              if k == "port":
                  self.udh[4] = [v]
  
 -    featureMap = property( _getFeatureMap, _setFeatureMap )
 +    properties = property( _getProperties, _setProperties )
  
      def _getUdhi( self ):
          return self.udh
@@@ -349,10 -290,11 +350,10 @@@
      def pdu( self ):
          pdubytes = []
          if self.sca:
 -            scabcd = bcd_encode( self.sca.number )
 -            pdubytes.append( len(scabcd) + 1 )
 -            pdubytes.append( 0x80 | (self.sca.type << 4) | self.sca.dialplan )
 +            scabcd = self.sca.pdu()
 +            # SCA has non-standard length
 +            scabcd[0] = len( scabcd ) - 1
              pdubytes.extend( scabcd )
 -            # FIXME This won't work with alphanumeric "numbers"
          else:
              pdubytes.append( 0 )
  
@@@ -374,7 -316,7 +375,7 @@@
          if self.pdu_mti == 1:
              pdubytes.append( self.mr )
  
 -        pdubytes.extend( encodePDUNumber(self.oa) )
 +        pdubytes.extend( self.oa.pdu() )
  
          pdubytes.append( self.pid )
  
@@@ -429,60 -371,51 +430,60 @@@
  
      def serviceCenter( self ):
          pass
 -    def repr( self ):
 +    def __repr__( self ):
          if self.pdu_mti == 0:
 -            return """AbstractSMS:
 +            return """MT SMS:
  ServiceCenter: %s
  TimeStamp: %s
 -PID: %i
 +PID: 0x%x
  DCS: 0x%x
  Number: %s
  Headers: %s
 +Alphabet: %s
  Message: %s
 -""" % (self.sca, self.scts, self.pid, self.dcs, self.oa, self.udh, self.ud)
 +""" % (self.sca, self.scts, self.pid, self.dcs, self.oa, self.udh, self.dcs_alphabet, repr(self.ud))
          else:
 -            return """AbstractSMS:
 +            return """MO SMS:
  ServiceCenter: %s
  Valid: %s
 -PID: %i
 +PID: 0x%x
  DCS: 0x%x
  Number: %s
  Headers: %s
 +Alphabet: %s
  Message: %s
 -""" % (self.sca, self.pdu_vpf, self.pid, self.dcs, self.oa, self.udh, self.ud)
 +""" % (self.sca, self.pdu_vpf, self.pid, self.dcs, self.oa, self.udh, self.dcs_alphabet, repr(self.ud))
  
 -class CellBroadcast(AbstractSMS):
 -    def __init__(self, pdu):
 -        self.dcs_alphabet = "gsm_default"
 -        self.dcs_language = None
 -        self.dcs_language_indication = False
 -        self.dcs_compressed = False
 -        self.dcs_mclass = None
 +class CellBroadcast(SMS):
 +    @classmethod
 +    def decode( cls, pdu):
          # first convert the string into a bytestream
          bytes = [ int( pdu[i:i+2], 16 ) for i in range(0, len(pdu), 2) ]
  
 -        self.sn = bytes[0] << 8 | bytes[1]
 -        self.mid = bytes[2] << 8 | bytes[3]
 -        self.dcs = bytes[4]
 -        self.page = bytes[5]
 +        cb = cls()
 +        cb.sn = bytes[0] << 8 | bytes[1]
 +        cb.mid = bytes[2] << 8 | bytes[3]
 +        cb.dcs = bytes[4]
 +        cb.page = bytes[5]
  
          userdata = "".join( map( chr, bytes[6:] ) )
 -        if self.dcs_alphabet == "gsm_default":
 +        if cb.dcs_alphabet == "gsm_default":
              userdata = unpack_sevenbit(bytes[6:])
  
 -        if not self.dcs_alphabet is None:
 +        if not cb.dcs_alphabet is None:
              # \n is the padding character in CB messages so strip it
 -            self.ud = userdata.decode( self.dcs_alphabet ).strip("\n")
 +            cb.ud = userdata.decode( cb.dcs_alphabet ).strip("\n")
 +        else:
 +            cb.ud = userdata
 +
 +        return cb
  
 +    def __init__(self):
 +        self.dcs_alphabet = "gsm_default"
 +        self.dcs_language = None
 +        self.dcs_language_indication = False
 +        self.dcs_compressed = False
 +        self.dcs_mclass = None
  
      def _getDCS( self ):
          if self.dcs_language_indication is None:
@@@ -559,32 -492,16 +560,32 @@@
  
      dcs = property( _getDCS, _setDCS )
  
 -    def repr(self):
 +    def pdu( self ):
 +        # We don't need to generate the PDU for Cell Broadcasts
 +        pass
 +
 +    def __repr__(self):
          return """CellBroadcast
  SN: %i
  MID: %i
  Page: %i
  Alphabet: %s
  Language: %s
 -Message: %s""" % (self.sn, self.mid, self.page, self.dcs_alphabet, self.dcs_language, self.ud)
 +Message: %s""" % (self.sn, self.mid, self.page, self.dcs_alphabet, self.dcs_language, repr(self.ud))
  
  if __name__ == "__main__":
 +    import sys
 +    #============================================================================#
 +    def readFromFile( path ):
 +    #============================================================================#
 +        try:
 +            value = open( path, 'r' ).read().strip()
 +        except IOError, e:
 +            print( "(could not read from '%s': %s)" % ( path, e ) )
 +            return "N/A"
 +        else:
 +            return value
 +
      pdus_MT = [
      "0791448720900253040C914497035290960000500151614414400DD4F29C9E769F41E17338ED06",
      "0791448720003023440C91449703529096000050015132532240A00500037A020190E9339A9D3EA3E920FA1B1466B341E472193E079DD3EE73D85DA7EB41E7B41C1407C1CBF43228CC26E3416137390F3AABCFEAB3FAAC3EABCFEAB3FAAC3EABCFEAB3FAAC3EABCFEAB3FADC3EB7CFED73FBDC3EBF5D4416D9457411596457137D87B7E16438194E86BBCF6D16D9055D429548A28BE822BA882E6370196C2A8950E291E822BA88",
@@@ -607,55 -524,34 +608,55 @@@
      "0791889663000009040C91889671342752000080908171153223282073788E4EBFDD2B1CCE96C3E16AB6592E67D32944ECF7780D9A8FE5E5B25BA468B514",
      "0791889663000019040C918896138188020008809091907405238050B38A0A606F003F767E842C734E91D190017D664F6076846D3B52D590FD958B8DD15169500B67084E86FF0C4F605831540D4E8655CEFF0173FE572853EA898150B34E00500B7A7A767D7C218A0A52300030003900330031002D003100380031003900330030514D8CBB5831540DFF0C6A5F67035C31662F4F6076845594FF01",
      "07918896532430280406918816880000809042215024235FC3309B0D42BEDB6590380F22A7C3ECB4FB0CE2AD7C20DEF85D77D3E579D0F84D2E836839900FC403C1D16F7719E47E837CA01D681866B341ECF738CC06A9EB733A889C0EB341ECF738CC06C1D16F7719E47EBB00",
 +    "07914140279505F74404D011002000800190819234000704010200018000",
 +    "07914140279505F74404D011002000800190913285000704010200028000",
 +    "07914140279505F74404D011002000800190320243000704010200038000",
 +    "0791947106004034040C9194713900303341008011311265854059D6B75B076A86D36CF11BEF024DD365103A2C2EBB413390BB5C2F839CE1315A9E1EA3E96537C805D2D6DBA0A0585E3797DDA0FB1ECD2EBB41D37419244ED3E965906845CBC56EB9190C069BCD6622",
 +    "0791947106004034040C9194713900303341008011312270804059D6B75B076A86D36CF11BEF024DD365103A2C2EBB413490BB5C2F839CE1315A9E1EA3E96537C805D2D6DBA0A0585E3797DDA0FB1ECD2EBB41D37419244ED3E965906845CBC56EB9190C069BCD6622",
 +
      ]
 +
      pdus_MO = [
      "07910447946400F011000A9270042079330000AA0161",
      "079194710716000001310C919491103246570000061B1EBD3CA703",
      ]
  
 +    pdus_ACKPDU = [
 +    "010080110191146140",
 +    "010080112102618040",
 +    ]
 +
      pdus_CB = [
      "001000DD001133DAED46ABD56AB5186CD668341A8D46A3D168341A8D46A3D168341A8D46A3D168341A8D46A3D168341A8D46A3D168341A8D46A3D168341A8D46A3D168341A8D46A3D168341A8D46A3D168341A8D46A3D100",
      ]
  
 +    if len(sys.argv) == 3:
 +        pdus_MT = readFromFile(sys.argv[1]).split("\n")
 +        pdus_MO = readFromFile(sys.argv[2]).split("\n")
 +
 +
      def testpdu(pdu, dir):
 -        sms = decodeSMS(pdu, dir)
 -        genpdu = sms.pdu()
 -        if pdu != genpdu:
 -            print "ERROR: Reencoded SMS doesn't match"
 -            print "Orig PDU: ", pdu
 -            print "ReencPDU: ", genpdu
 -            print sms.repr()
 -            sms = decodeSMS(genpdu, dir)
 -        print sms.repr()
 +        try:
 +            sms = SMS.decode(pdu, dir)
 +            genpdu = sms.pdu()
 +            if pdu != genpdu:
 +                print "ERROR: Reencoded SMS doesn't match"
 +                print "Orig PDU: ", pdu
 +                print "ReencPDU: ", genpdu
 +                print repr(sms)
 +                sms = SMS.decode(genpdu, dir)
 +            print repr(sms)
 +        except SMSError, e:
 +            print "%s, PDU was: %s\n" % (e, pdu)
  
      for pdu in pdus_MT:
          testpdu(pdu, "MT")
 +
      for pdu in pdus_MO:
          testpdu(pdu, "MO")
  
      for pdu in pdus_CB:
 -        cb = CellBroadcast(pdu)
 -        print cb.repr()
 +        cb = CellBroadcast.decode(pdu)
 +        print repr(cb)
  
  # vim: expandtab shiftwidth=4 tabstop=4

-- 
FSO frameworkd Debian packaging



More information about the pkg-fso-commits mailing list