[pkg-fso-commits] [SCM] FSO frameworkd Debian packaging branch, master, updated. milestone4-368-g700ab82
Daniel Willmann
daniel at totalueberwachung.de
Mon Feb 2 18:51:18 UTC 2009
The following commit has been merged in the master branch:
commit bb64b62e42d743ce09765ddd35fed8878503d4aa
Merge: 08a0cd7fb96950a8be203f1dc0aa1bdea5db2543 d8a55311c5d29bb2946c76e8445748e04049eb1a
Author: Daniel Willmann <daniel at totalueberwachung.de>
Date: Tue Nov 18 00:02:57 2008 +0100
Merge branch 'stabilization/milestone4'
diff --combined framework/subsystems/ogsmd/gsm/sms.py
index e33f388,716e12b..fc1fd1b
--- a/framework/subsystems/ogsmd/gsm/sms.py
+++ b/framework/subsystems/ogsmd/gsm/sms.py
@@@ -15,11 -15,8 +15,11 @@@ from ogsmd.gsm.convert import
from ogsmd.gsm.const import CB_PDU_DCS_LANGUAGE
import math
+class SMSError(Exception):
+ pass
+
# ** Dekodieren
-# smsobject = decodeSMS( pdu )
+# smsobject = SMS.decode( pdu )
# print "von", smsobject.sender(), "um", smsobject.arrivalTime(), "via", smsobject.serviceCenter(), ...
# print "uses charset", smsobject.charset(), ...
# ** Enkodieren
@@@ -30,6 -27,114 +30,6 @@@
# * SmsMessage (repraesentiert eine -- moeglicherweise Multipart -- Nachricht)
# * Weitere, fuer spezifische SMS-Typen (Status Report) eigene Klassen? Ggfs. zu komplex.
-def decodeSMS( pdu, direction ):
- # first convert the string into a bytestream
- bytes = [ int( pdu[i:i+2], 16 ) for i in range(0, len(pdu), 2) ]
-
- sms = AbstractSMS( direction )
-
- offset = 0
- # SCA - Service Center address
- sca_len = bytes[offset]
- offset += 1
- if sca_len > 0:
- sms.sca = PDUAddress( *decodePDUNumber( bytes[offset:offset+sca_len] ) )
- else:
- sms.sca = False
-
- offset += sca_len
- # PDU type
- pdu_type = bytes[offset]
-
- sms.pdu_mti = pdu_type & 0x03
- sms.pdu_rp = pdu_type & 0x80 != 0
- sms.pdu_udhi = pdu_type & 0x40 != 0
- sms.pdu_srr = pdu_type & 0x20 != 0
- sms.pdu_sri = sms.pdu_srr
- sms.pdu_vpf = (pdu_type & 0x18)>>3
- sms.pdu_rd = pdu_type & 0x04 != 0
- sms.pdu_mms = sms.pdu_rd
-
- offset += 1
- if sms.pdu_mti == 1:
- # MR - Message Reference
- sms.mr = bytes[offset]
- offset += 1
-
- # OA/DA - Originating or Destination Address
- # WARNING, the length is coded in digits of the number, not in octets occupied!
- oa_len = 1 + (bytes[offset] + 1) / 2
- offset += 1
- sms.oa = PDUAddress( *decodePDUNumber( bytes[offset:offset+oa_len] ) )
- sms.da = sms.oa
-
- offset += oa_len
- # PID - Protocol identifier
- sms.pid = bytes[offset]
-
- offset += 1
- # DCS - Data Coding Scheme
- sms.dcs = bytes[offset]
-
- offset += 1
- if sms.pdu_mti == 0:
- # SCTS - Service Centre Time Stamp
- sms.scts = decodePDUTime( bytes[offset:offset+7] )
- offset += 7
- else:
- # VP - Validity Period FIXME
- if sms.pdu_vpf == 2:
- # Relative
- sms.vp = bytes[offset]
- offset += 1
- elif sms.pdu_vpf == 3:
- # Absolute
- sms.vp = decodePDUTime( bytes[offset:offset+7] )
- offset += 7
-
- # UD - User Data
- ud_len = bytes[offset]
- offset += 1
- parse_userdata( sms, ud_len, bytes[offset:] )
- return sms
-
-def parse_userdata( sms, ud_len, bytes ):
- offset = 0
- sms.udh = {}
- if sms.pdu_udhi:
- # Decode the headers
- udh_len = bytes[offset]
- offset += 1
- while offset < udh_len:
- # Information Element
- iei = bytes[offset]
- offset += 1
- ie_len = bytes[offset]
- offset += 1
- ie_data = bytes[offset:offset+ie_len]
- offset += ie_len
- # FIXME
- sms.udh[iei] = ie_data
-
- # User Data FIXME
- # We need to look at the DCS in order to be able to decide what
- # to use here
-
- # We need to lose the padding bits before the start of the
- # seven-bit packed data, which means we need to figure out how
- # many there are...
- # See the diagram on page 58 of GSM_03.40_6.0.0.pdf.
-
- userdata = "".join( map( chr, bytes[offset:] ) )
- if sms.dcs_alphabet == "gsm_default":
- padding_size = ((7 * ud_len) - (8 * (offset))) % 7
- userdata = unpack_sevenbit(bytes[offset:], padding_size)
- septets = ud_len - int( math.ceil( (offset*8)/7.0 ) )
- userdata = userdata[:septets]
-
- if not sms.dcs_alphabet is None:
- sms.ud = userdata.decode( sms.dcs_alphabet )
-
class PDUAddress:
@classmethod
def guess( cls, number ):
@@@ -44,25 -149,6 +44,25 @@@
number = number
ntype = 5
return cls( ntype, 1, number )
+
+ @classmethod
+ def decode( cls, bs ):
+ num_type = ( bs[0] & 0x70) >> 4
+ num_plan = ( bs[0] & 0x0F )
+ number = bs[1:]
+ if number == []:
+ number = ""
+ elif num_type == 5:
+ number = unpack_sevenbit( number )
+ number = number.decode( "gsm_default" )
+ else:
+ number = bcd_decode( number )
+ # Every occurrence of the padding semi-octet should be removed
+ number = number.replace( "f", "" )
+ # Decode special "digits"
+ number = number.translate( PDUADDR_DEC_TRANS )
+ return cls( num_type, num_plan, number )
+
def __init__( self, type, dialplan, number ):
self.type = type
self.dialplan = dialplan
@@@ -73,97 -159,7 +73,97 @@@
prefix = "+"
return prefix + self.number
-class AbstractSMS(object):
+ def pdu( self ):
+ if self.type == 5:
+ number = self.number.encode("gsm_default")
+ enc = pack_sevenbit(number)
+ length = len(enc)*2
+ if (len(self.number)*7)%8 <= 4:
+ length -= 1
+ else:
+ # Encode special "digits"
+ number = self.number.translate(PDUADDR_ENC_TRANS)
+ enc = bcd_encode(number)
+ length = len(number)
+ return flatten( [length, 0x80 | self.type << 4 | self.dialplan, enc] )
+
+
+class SMS(object):
+ @classmethod
+ def decode( cls, pdu, direction ):
+ # first convert the string into a bytestream
+ try:
+ bytes = [ int( pdu[i:i+2], 16 ) for i in range(0, len(pdu), 2) ]
+ except ValueError:
+ raise SMSError, "PDU malformed"
+
+ sms = cls( direction )
+
+ offset = 0
+ # SCA - Service Center address
+ sca_len = bytes[offset]
+ offset += 1
+ if sca_len > 0:
+ sms.sca = PDUAddress.decode( bytes[offset:offset+sca_len] )
+ else:
+ sms.sca = False
+
+ offset += sca_len
+ # PDU type
+ pdu_type = bytes[offset]
+
+ sms.pdu_mti = pdu_type & 0x03
+ sms.pdu_rp = pdu_type & 0x80 != 0
+ sms.pdu_udhi = pdu_type & 0x40 != 0
+ sms.pdu_srr = pdu_type & 0x20 != 0
+ sms.pdu_sri = sms.pdu_srr
+ sms.pdu_vpf = (pdu_type & 0x18)>>3
+ sms.pdu_rd = pdu_type & 0x04 != 0
+ sms.pdu_mms = sms.pdu_rd
+
+ offset += 1
+ if sms.pdu_mti == 1:
+ # MR - Message Reference
+ sms.mr = bytes[offset]
+ offset += 1
+
+ # OA/DA - Originating or Destination Address
+ # WARNING, the length is coded in digits of the number, not in octets occupied!
+ oa_len = 1 + (bytes[offset] + 1) / 2
+ offset += 1
+ sms.oa = PDUAddress.decode( bytes[offset:offset+oa_len] )
+ sms.da = sms.oa
+
+ offset += oa_len
+ # PID - Protocol identifier
+ sms.pid = bytes[offset]
+
+ offset += 1
+ # DCS - Data Coding Scheme
+ sms.dcs = bytes[offset]
+
+ offset += 1
+ if sms.pdu_mti == 0:
+ # SCTS - Service Centre Time Stamp
+ sms.scts = decodePDUTime( bytes[offset:offset+7] )
+ offset += 7
+ else:
+ # VP - Validity Period FIXME
+ if sms.pdu_vpf == 2:
+ # Relative
+ sms.vp = bytes[offset]
+ offset += 1
+ elif sms.pdu_vpf == 3:
+ # Absolute
+ sms.vp = decodePDUTime( bytes[offset:offset+7] )
+ offset += 7
+
+ # UD - User Data
+ ud_len = bytes[offset]
+ offset += 1
+ sms._parse_userdata( ud_len, bytes[offset:] )
+ return sms
+
def __init__( self, direction ):
self.direction = direction
self.sca = False
@@@ -175,6 -171,7 +175,7 @@@
self.pdu_rd = False
self.pdu_mms = False
self.udh = {}
+ self.ud = ""
self.mr = 0
self.pid = 0
self.dcs_alphabet = "gsm_default"
@@@ -184,45 -181,6 +185,45 @@@
self.dcs_mwi_type = None
self.dcs_mclass = None
+ def _parse_userdata( self, ud_len, bytes ):
+ offset = 0
+ self.udh = {}
+ if self.pdu_udhi:
+ # Decode the headers
+ udh_len = bytes[offset]
+ offset += 1
+ while offset < udh_len:
+ # Information Element
+ iei = bytes[offset]
+ offset += 1
+ ie_len = bytes[offset]
+ offset += 1
+ ie_data = bytes[offset:offset+ie_len]
+ offset += ie_len
+ # FIXME
+ self.udh[iei] = ie_data
+
+ # User Data FIXME
+ # We need to look at the DCS in order to be able to decide what
+ # to use here
+
+ # We need to lose the padding bits before the start of the
+ # seven-bit packed data, which means we need to figure out how
+ # many there are...
+ # See the diagram on page 58 of GSM_03.40_6.0.0.pdf.
+
+ userdata = "".join( map( chr, bytes[offset:] ) )
+ if self.dcs_alphabet == "gsm_default":
+ padding_size = ((7 * ud_len) - (8 * (offset))) % 7
+ userdata = unpack_sevenbit(bytes[offset:], padding_size)
+ septets = ud_len - int( math.ceil( (offset*8)/7.0 ) )
+ userdata = userdata[:septets]
+
+ if not self.dcs_alphabet is None:
+ self.ud = userdata.decode( self.dcs_alphabet )
+ else:
+ self.ud = userdata
+
def _getDCS( self ):
# TODO throw exceptions on invalid combinations
if self.dcs_mwi_type is None:
@@@ -294,27 -252,10 +295,27 @@@
dcs = property( _getDCS, _setDCS )
- def _getFeatureMap( self ):
- map = {}
- map["direction"] = self.direction
+ def _getType( self ):
if self.direction == "MT":
+ map = TP_MTI_INCOMING
+ elif self.direction == "MO":
+ map = TP_MTI_OUTGOING
+ return map[self.pdu_mti]
+
+ def _setType( self, smstype ):
+ if TP_MTI_INCOMING.has_key(smstype):
+ self.direction = "MT"
+ self.pdu_mti = TP_MTI_INCOMING[smstype]
+ elif TP_MTI_OUTGOING.has_key(smstype):
+ self.direction = "MO"
+ self.pdu_mti = TP_MTI_OUTGOING[smstype]
+
+ type = property( _getType, _setType )
+
+ def _getProperties( self ):
+ map = {}
+ map["type"] = self.type
+ if self.type == "sms-deliver":
# FIXME Return correct time with timezoneinfo
map["timestamp"] = self.scts[0].ctime() + " %+05i" % (self.scts[1]*100)
if 0 in self.udh:
@@@ -328,15 -269,15 +329,15 @@@
return map
- def _setFeatureMap( self, featureMap ):
- for k,v in featureMap.items():
+ def _setProperties( self, properties ):
+ for k,v in properties.items():
if k == "csm_id":
- if "csm_num" in featureMap and "csm_seq" in featureMap:
- self.udh[0] = [ v, featureMap["csm_num"], featureMap["csm_seq"] ]
+ if "csm_num" in properties and "csm_seq" in properties:
+ self.udh[0] = [ v, properties["csm_num"], properties["csm_seq"] ]
if k == "port":
self.udh[4] = [v]
- featureMap = property( _getFeatureMap, _setFeatureMap )
+ properties = property( _getProperties, _setProperties )
def _getUdhi( self ):
return self.udh
@@@ -349,10 -290,11 +350,10 @@@
def pdu( self ):
pdubytes = []
if self.sca:
- scabcd = bcd_encode( self.sca.number )
- pdubytes.append( len(scabcd) + 1 )
- pdubytes.append( 0x80 | (self.sca.type << 4) | self.sca.dialplan )
+ scabcd = self.sca.pdu()
+ # Unlike OA/DA (length counted in digits), the SCA length byte counts the octets that follow it
+ scabcd[0] = len( scabcd ) - 1
pdubytes.extend( scabcd )
- # FIXME This won't work with alphanumeric "numbers"
else:
pdubytes.append( 0 )
@@@ -374,7 -316,7 +375,7 @@@
if self.pdu_mti == 1:
pdubytes.append( self.mr )
- pdubytes.extend( encodePDUNumber(self.oa) )
+ pdubytes.extend( self.oa.pdu() )
pdubytes.append( self.pid )
@@@ -429,60 -371,51 +430,60 @@@
def serviceCenter( self ):
pass
- def repr( self ):
+ def __repr__( self ):
if self.pdu_mti == 0:
- return """AbstractSMS:
+ return """MT SMS:
ServiceCenter: %s
TimeStamp: %s
-PID: %i
+PID: 0x%x
DCS: 0x%x
Number: %s
Headers: %s
+Alphabet: %s
Message: %s
-""" % (self.sca, self.scts, self.pid, self.dcs, self.oa, self.udh, self.ud)
+""" % (self.sca, self.scts, self.pid, self.dcs, self.oa, self.udh, self.dcs_alphabet, repr(self.ud))
else:
- return """AbstractSMS:
+ return """MO SMS:
ServiceCenter: %s
Valid: %s
-PID: %i
+PID: 0x%x
DCS: 0x%x
Number: %s
Headers: %s
+Alphabet: %s
Message: %s
-""" % (self.sca, self.pdu_vpf, self.pid, self.dcs, self.oa, self.udh, self.ud)
+""" % (self.sca, self.pdu_vpf, self.pid, self.dcs, self.oa, self.udh, self.dcs_alphabet, repr(self.ud))
-class CellBroadcast(AbstractSMS):
- def __init__(self, pdu):
- self.dcs_alphabet = "gsm_default"
- self.dcs_language = None
- self.dcs_language_indication = False
- self.dcs_compressed = False
- self.dcs_mclass = None
+class CellBroadcast(SMS):
+ @classmethod
+ def decode( cls, pdu):
# first convert the string into a bytestream
bytes = [ int( pdu[i:i+2], 16 ) for i in range(0, len(pdu), 2) ]
- self.sn = bytes[0] << 8 | bytes[1]
- self.mid = bytes[2] << 8 | bytes[3]
- self.dcs = bytes[4]
- self.page = bytes[5]
+ cb = cls()
+ cb.sn = bytes[0] << 8 | bytes[1]
+ cb.mid = bytes[2] << 8 | bytes[3]
+ cb.dcs = bytes[4]
+ cb.page = bytes[5]
userdata = "".join( map( chr, bytes[6:] ) )
- if self.dcs_alphabet == "gsm_default":
+ if cb.dcs_alphabet == "gsm_default":
userdata = unpack_sevenbit(bytes[6:])
- if not self.dcs_alphabet is None:
+ if not cb.dcs_alphabet is None:
# \n is the padding character in CB messages so strip it
- self.ud = userdata.decode( self.dcs_alphabet ).strip("\n")
+ cb.ud = userdata.decode( cb.dcs_alphabet ).strip("\n")
+ else:
+ cb.ud = userdata
+
+ return cb
+ def __init__(self):
+ self.dcs_alphabet = "gsm_default"
+ self.dcs_language = None
+ self.dcs_language_indication = False
+ self.dcs_compressed = False
+ self.dcs_mclass = None
def _getDCS( self ):
if self.dcs_language_indication is None:
@@@ -559,32 -492,16 +560,32 @@@
dcs = property( _getDCS, _setDCS )
- def repr(self):
+ def pdu( self ):
+ # We don't need to generate the PDU for Cell Broadcasts
+ pass
+
+ def __repr__(self):
return """CellBroadcast
SN: %i
MID: %i
Page: %i
Alphabet: %s
Language: %s
-Message: %s""" % (self.sn, self.mid, self.page, self.dcs_alphabet, self.dcs_language, self.ud)
+Message: %s""" % (self.sn, self.mid, self.page, self.dcs_alphabet, self.dcs_language, repr(self.ud))
if __name__ == "__main__":
+ import sys
+ #============================================================================#
+ def readFromFile( path ):
+ #============================================================================#
+ try:
+ value = open( path, 'r' ).read().strip()
+ except IOError, e:
+ print( "(could not read from '%s': %s)" % ( path, e ) )
+ return "N/A"
+ else:
+ return value
+
pdus_MT = [
"0791448720900253040C914497035290960000500151614414400DD4F29C9E769F41E17338ED06",
"0791448720003023440C91449703529096000050015132532240A00500037A020190E9339A9D3EA3E920FA1B1466B341E472193E079DD3EE73D85DA7EB41E7B41C1407C1CBF43228CC26E3416137390F3AABCFEAB3FAAC3EABCFEAB3FAAC3EABCFEAB3FAAC3EABCFEAB3FADC3EB7CFED73FBDC3EBF5D4416D9457411596457137D87B7E16438194E86BBCF6D16D9055D429548A28BE822BA882E6370196C2A8950E291E822BA88",
@@@ -607,55 -524,34 +608,55 @@@
"0791889663000009040C91889671342752000080908171153223282073788E4EBFDD2B1CCE96C3E16AB6592E67D32944ECF7780D9A8FE5E5B25BA468B514",
"0791889663000019040C918896138188020008809091907405238050B38A0A606F003F767E842C734E91D190017D664F6076846D3B52D590FD958B8DD15169500B67084E86FF0C4F605831540D4E8655CEFF0173FE572853EA898150B34E00500B7A7A767D7C218A0A52300030003900330031002D003100380031003900330030514D8CBB5831540DFF0C6A5F67035C31662F4F6076845594FF01",
"07918896532430280406918816880000809042215024235FC3309B0D42BEDB6590380F22A7C3ECB4FB0CE2AD7C20DEF85D77D3E579D0F84D2E836839900FC403C1D16F7719E47E837CA01D681866B341ECF738CC06A9EB733A889C0EB341ECF738CC06C1D16F7719E47EBB00",
+ "07914140279505F74404D011002000800190819234000704010200018000",
+ "07914140279505F74404D011002000800190913285000704010200028000",
+ "07914140279505F74404D011002000800190320243000704010200038000",
+ "0791947106004034040C9194713900303341008011311265854059D6B75B076A86D36CF11BEF024DD365103A2C2EBB413390BB5C2F839CE1315A9E1EA3E96537C805D2D6DBA0A0585E3797DDA0FB1ECD2EBB41D37419244ED3E965906845CBC56EB9190C069BCD6622",
+ "0791947106004034040C9194713900303341008011312270804059D6B75B076A86D36CF11BEF024DD365103A2C2EBB413490BB5C2F839CE1315A9E1EA3E96537C805D2D6DBA0A0585E3797DDA0FB1ECD2EBB41D37419244ED3E965906845CBC56EB9190C069BCD6622",
+
]
+
pdus_MO = [
"07910447946400F011000A9270042079330000AA0161",
"079194710716000001310C919491103246570000061B1EBD3CA703",
]
+ pdus_ACKPDU = [
+ "010080110191146140",
+ "010080112102618040",
+ ]
+
pdus_CB = [
"001000DD001133DAED46ABD56AB5186CD668341A8D46A3D168341A8D46A3D168341A8D46A3D168341A8D46A3D168341A8D46A3D168341A8D46A3D168341A8D46A3D168341A8D46A3D168341A8D46A3D168341A8D46A3D100",
]
+ if len(sys.argv) == 3:
+ pdus_MT = readFromFile(sys.argv[1]).split("\n")
+ pdus_MO = readFromFile(sys.argv[2]).split("\n")
+
+
def testpdu(pdu, dir):
- sms = decodeSMS(pdu, dir)
- genpdu = sms.pdu()
- if pdu != genpdu:
- print "ERROR: Reencoded SMS doesn't match"
- print "Orig PDU: ", pdu
- print "ReencPDU: ", genpdu
- print sms.repr()
- sms = decodeSMS(genpdu, dir)
- print sms.repr()
+ try:
+ sms = SMS.decode(pdu, dir)
+ genpdu = sms.pdu()
+ if pdu != genpdu:
+ print "ERROR: Reencoded SMS doesn't match"
+ print "Orig PDU: ", pdu
+ print "ReencPDU: ", genpdu
+ print repr(sms)
+ sms = SMS.decode(genpdu, dir)
+ print repr(sms)
+ except SMSError, e:
+ print "%s, PDU was: %s\n" % (e, pdu)
for pdu in pdus_MT:
testpdu(pdu, "MT")
+
for pdu in pdus_MO:
testpdu(pdu, "MO")
for pdu in pdus_CB:
- cb = CellBroadcast(pdu)
- print cb.repr()
+ cb = CellBroadcast.decode(pdu)
+ print repr(cb)
# vim: expandtab shiftwidth=4 tabstop=4
--
FSO frameworkd Debian packaging
More information about the pkg-fso-commits
mailing list