19 Dec, 2019
I have home broadband using the Huawei b525 4G router.
I'd like to know when its external IP changes so that I can update any DNS records that point to services that run on the network from that router.
I could do that by asking a third-party service what my IP is, for example http://whatismyip.akamai.com.
Another way is to log into the router and inspect its settings.
Here is some code that you can run to do just that. Note that it is based on ideas I found from Hamish McNeish's efforts at https://github.com/jinxo13/HuaweiB525Router so thanks go to Hamish.
I call this file get_ip_from_b525.py. In this version the router's IP is hard-coded to '192.168.8.1', which is the default for the router. You can change it near the end if you've set yours up differently.
# -*- coding: utf-8 -*-
'''
Python 3 script. Doesn't work with Python 2. Based on https://github.com/jinxo13/HuaweiB525Router
MIT License
Copyright (c) 2019 Hamish McNeish
2019 James Gardner
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
'''
import xml.etree.ElementTree as ET
import sys
from time import sleep
from xml.sax.saxutils import escape
import logging
# logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
import http.cookies
from datetime import datetime, timedelta
import threading
import xml.etree.ElementTree as ET
import inspect
import re
import http.client
import uuid
import hashlib
import hmac
from binascii import hexlify
def generate_nonce():
    """Return a fresh client-side nonce.

    The nonce is the hex form of two independent random UUID4 values
    joined together, i.e. a 64-character lowercase hexadecimal string.
    """
    return ''.join(uuid.uuid4().hex for _ in range(2))
def get_client_proof(clientnonce, servernonce, password, salt, iterations):
    """Compute the client proof sent to the router (part of the SCRAM algorithm).

    Steps, exactly as performed below:

    1. Derive a salted password with PBKDF2-HMAC-SHA256 from *password*
       (UTF-8 encoded), the hex-encoded *salt* and *iterations*.
    2. HMAC-SHA256 the salted password with the key ``b'Client Key'`` to
       obtain the client key.
    3. SHA-256 the client key to obtain the stored key.
    4. HMAC-SHA256 the stored key, keyed with the message
       ``"<clientnonce>,<servernonce>,<servernonce>"``, to obtain the
       signature.
    5. XOR client key and signature byte by byte.

    Returns:
        bytes: the XOR result as lowercase hexadecimal ASCII.
    """
    auth_message = "{},{},{}".format(clientnonce, servernonce, servernonce)

    salted_password = hashlib.pbkdf2_hmac(
        'sha256',
        password.encode('UTF-8'),
        bytearray.fromhex(salt),
        iterations,
    )
    client_key = hmac.new(
        b'Client Key', msg=salted_password, digestmod=hashlib.sha256
    ).digest()
    stored_key = hashlib.sha256(client_key).digest()
    signature = hmac.new(
        auth_message.encode('utf_8'), msg=stored_key, digestmod=hashlib.sha256
    ).digest()

    # Both digests are SHA-256 sized, so zip() pairs every byte.
    proof = bytearray(key_byte ^ sig_byte
                      for key_byte, sig_byte in zip(client_key, signature))
    return hexlify(proof)
def get_param(vals, key, default=None):
    """Look up *key* in *vals*, falling back to *default*.

    When *vals* is a mapping that lacks *key*, *default* is stored under
    *key* before being returned, so the mapping is mutated (this matches
    the historical behaviour callers may rely on).

    Args:
        vals: a mutable mapping, or ``None`` when no settings were given.
        key: the key to look up.
        default: value used (and stored) when *key* is missing.

    Returns:
        ``vals[key]``, or *default* when *vals* is ``None``.
    """
    # A missing settings mapping used to raise TypeError on ``key in None``.
    if vals is None:
        return default
    if key not in vals:
        vals[key] = default
    return vals[key]
class XmlObject(object):
    '''A simple object to handle XML object serialisation.

    ``buildXML`` calls ``self.getPropertyNames()`` and ``self.getValue(name)``,
    which this class does not define; subclasses must provide them.
    '''

    def __init__(self, settings=None):
        '''Store the serialisation flags found in *settings*.

        Args:
            settings: optional dict that may contain ``'skip_blanks'`` and
                ``'skip_class_element'``; both default to ``False``.
                ``None`` (the default) means "no settings".
        '''
        # Both flags are only stored here; nothing in this class reads them.
        self._SKIP_BLANK = self._get_param(settings, 'skip_blanks', False)
        self._SKIP_CLASS_ELEMENT = self._get_param(settings, 'skip_class_element', False)

    @classmethod
    def _get_param(cls, vals, key, default=None):
        '''Return ``vals[key]`` via ``get_param``, or *default* if *vals* is None.'''
        # Guard here so that XmlObject() with no settings does not crash on
        # a membership test against None.
        if vals is None:
            return default
        return get_param(vals, key, default)

    def buildXmlRequest(self):
        '''Serialise with a ``<request>`` root element.'''
        return self.buildXML(root='request')

    def buildXmlResponse(self):
        '''Serialise with a ``<response>`` root element.'''
        return self.buildXML(root='response')

    def buildXmlError(self):
        '''Serialise with an ``<error>`` root element.'''
        return self.buildXML(root='error')

    def buildXML(self, header=True, root='request'):
        '''Serialise every property as ``<name>value</name>``.

        Args:
            header: when true, emit the XML declaration and wrap the
                properties in a ``<root>`` element; when false, emit only
                the property elements.
            root: name of the wrapping element.

        Returns:
            str: the concatenated XML. Values are inserted with ``str()``
            and are NOT XML-escaped.
        '''
        result = []
        if header:
            result.append('<?xml version="1.0" encoding="UTF-8"?>')
            result.append('<' + root + '>')
        for prop in self.getPropertyNames():
            value = self.getValue(prop)
            result.extend(['<', prop, '>'])
            result.append(str(value))
            result.extend(['</', prop, '>'])
        if header:
            result.append('</' + root + '>')
        return ''.join(result)
class CustomXml(XmlObject):
    '''XmlObject whose properties come from a plain mapping.

    The mapping passed in is copied, so later changes to the caller's
    object do not affect the serialised output.
    '''

    def __init__(self, props, element_name=None):
        '''Copy *props* and remember the element name.

        Args:
            props: mapping of property name to value; must offer ``copy()``.
            element_name: optional element name; the class name is used
                when it is ``None``.
        '''
        super().__init__({'skip_class_element': True})
        self.ele_name = (
            self.__class__.__name__ if element_name is None else element_name
        )
        self.vals = props.copy()

    def getPropertyNames(self):
        '''Return the property names as a new list.'''
        return [*self.vals.keys()]

    def getValue(self, property):
        '''Return the value stored under *property*.'''
        return self.vals[property]

    def getElementName(self):
        '''Return the element name chosen at construction time.'''
        return self.ele_name
# Module-level logger; debug output is emitted only if the application
# configures logging (see the commented-out basicConfig call near the imports).
logger = logging.getLogger(__name__)
# Dictionary to hold all GET APIs, keyed by the decorated function's name and
# populated by the get_api decorator. The original note says it is used by a
# testFeatures function, which is not defined in this listing.
GET_APIS = {}
# Decorator for GET API functions, populates the GET_APIS dictionary
def get_api(api):
    '''Designate function as a GET API call.

    The decorated function's own body is never executed. Calling the
    wrapper instead performs ``inst.router.api(api)`` on the first
    positional argument, provided that argument is a ``RouterObject``.

    Args:
        api: API path relative to ``/api/`` (e.g. ``'device/information'``).

    Returns:
        A decorator. The wrapper it produces returns the router's response,
        or ``None`` when the first argument is not a ``RouterObject``.
    '''
    # Local import: functools is not among this file's top-level imports.
    from functools import wraps

    def api_decorator(f):
        # Register at decoration time so the API table is complete on import.
        GET_APIS[f.__name__] = api

        @wraps(f)  # keep __name__/__doc__ of the decorated function
        def decorated_function(*args):
            inst = args[0]
            if issubclass(type(inst), RouterObject):
                return inst.router.api(api)
            # Not a router module: nothing to call.
            return None
        return decorated_function
    return api_decorator
def post_api(f):
    '''Pass-through decorator for API entry points.

    The wrapper forwards all positional and keyword arguments to *f* and
    returns its result unchanged. No error handling or conversion of
    errors to an XML response is performed here, so exceptions raised by
    *f* propagate to the caller.
    '''
    # Local import: functools is not among this file's top-level imports.
    from functools import wraps

    @wraps(f)  # keep __name__/__doc__ of the decorated function
    def decorated_function(*args, **kwargs):
        return f(*args, **kwargs)
    return decorated_function
class RouterObject:
    '''Parent for all router modules.

    Keeps a reference to the owning router and exposes the router's
    ``api`` attribute directly on the module.
    '''

    def __init__(self, router):
        '''Bind this module to *router* (any object with an ``api`` attribute).'''
        self.router, self.api = router, router.api
class Device(RouterObject):
    '''Device module'''

    @property
    @get_api(api='device/information')
    def info(self):
        '''Registered with get_api for 'device/information'; this body is a placeholder.'''
class B525Router(object):
    '''B525 implementation.

    Talks to the router's HTTP API using ``http.client``: sets up a session
    cookie, logs in with the challenge/response exchange implemented in
    ``__login``, and then issues API calls through ``api``.
    '''

    # Name of the header carrying the router's request verification token.
    REQUEST_TOKEN = '__RequestVerificationToken'

    class _Result(object):
        '''Small response holder shared by the GET and POST helpers.

        Attributes are ``status_code`` (int), ``headers`` (dict built from
        the response headers) and ``text`` (the raw body as *bytes*).
        '''

        def __init__(self, status_code, headers, text):
            self.status_code = status_code
            self.headers = headers
            self.text = text

        def raise_for_status(self):
            '''Raise ``Exception('Bad response')`` unless the status is 200.'''
            if self.status_code != 200:
                raise Exception('Bad response')

    def __init__(self, host):
        '''Create a client for the router reachable at *host* (no I/O yet).'''
        self.client = None
        self.cookie = None
        self.router = host
        self.username = None
        self.clientnonce = None
        self.__password = None
        self.__rsae = None
        self.__rsan = None
        self.__is_logged_in = False
        self.__last_login = None
        self.__timeout = None
        self.__lock = threading.Lock()
        self.device = Device(self)

    def login(self, username, password, keepalive=300):
        '''Store the credentials and log in to the router.

        Args:
            username: router account name.
            password: router account password.
            keepalive: number of seconds stored as the session timeout.

        Returns:
            Whatever ``__login`` returns (currently ``None``).
        '''
        with self.__lock:
            self.__last_login = datetime.now() - timedelta(seconds=keepalive)
            self.username = username
            self.__password = password
            self.__timeout = keepalive
            return self.__login()

    def __setup_session(self):
        """ gets the url from the server ignoring the response, just to get session cookie set up """
        url = "http://%s/" % self.router
        # __get opens the connection itself, so none is created here.
        response = self.__get(url)
        response.raise_for_status()
        # will have to debug this one as without delay here it was throwing
        # a buffering exception on one of the machines
        sleep(1)

    def __get_server_token(self):
        """ retrieves server token """
        url = "http://%s/api/webserver/token" % self.router
        token_response = self.__get(url).text
        root = ET.fromstring(token_response)
        return root.findall('./token')[0].text

    def __api_challenge(self):
        '''Send the first login message (username + client nonce).

        Returns:
            The ``_Result`` of the ``challenge_login`` POST.
        '''
        self.__setup_session()
        token = self.__get_server_token()
        url = "http://%s/api/user/challenge_login" % self.router
        self.clientnonce = generate_nonce()
        xml = CustomXml({
            'username': self.username,
            'firstnonce': self.clientnonce,
            'mode': 1
        }).buildXML()
        # Only the part of the token after its first 32 characters is sent.
        # NOTE(review): inherited from the upstream project; confirm why.
        headers = {'Content-type': 'text/html', self.REQUEST_TOKEN: token[32:]}
        return self.__post(url=url, data=xml, headers=headers)

    def __login(self):
        """ logs in to the router using SCRAM method of authentication """
        logger.info('LOGIN for user [%s]', self.username)
        response = self.__api_challenge()
        verification_token = response.headers[self.REQUEST_TOKEN]
        scram_data = ET.fromstring(response.text)
        servernonce = scram_data.findall('./servernonce')[0].text
        salt = scram_data.findall('./salt')[0].text
        iterations = int(scram_data.findall('./iterations')[0].text)
        client_proof = get_client_proof(
            self.clientnonce, servernonce, self.__password, salt, iterations
        ).decode('UTF-8')
        login_request = CustomXml({
            'clientproof': client_proof,
            'finalnonce': servernonce}).buildXML()
        headers = {'Content-type': 'application/x-www-form-urlencoded; charset=UTF-8',
                   self.REQUEST_TOKEN: verification_token}
        url = "http://%s/api/user/authentication_login" % self.router
        result = self.__post(url=url, data=login_request, headers=headers)
        # The value is not used afterwards; the lookup is kept so that a
        # response without the token header still raises KeyError as before.
        verification_token = result.headers[self.REQUEST_TOKEN]
        self.__last_login = datetime.now()
        # The SCRAM protocol would normally validate the server signatures.
        # We're assuming this is ok, e.g. the router's own web client does:
        #   var serverProof = scram.serverProof(psd, salt, iter, authMsg);
        #   if (ret.response.serversignature == serverProof) {
        #     var publicKeySignature = scram.signature(
        #         CryptoJS.enc.Hex.parse(ret.response.rsan),
        #         CryptoJS.enc.Hex.parse(serverKey)).toString();
        #     if (ret.response.rsapubkeysignature == publicKeySignature) {
        xml = ET.fromstring(result.text)
        self.__rsae = xml.find('.//rsae').text
        self.__rsan = xml.find('.//rsan').text
        self.__is_logged_in = True

    def __request(self, method, url, data, headers):
        '''Send one request on ``self.client`` and wrap the response.

        Shared implementation behind ``__get`` and ``__post``. It adds the
        session cookie, records any ``Set-Cookie`` from the response, and
        logs both directions at debug level.

        Args:
            method: ``'GET'`` or ``'POST'``.
            url: absolute ``http://<router>/...`` URL.
            data: request body (``str``/``bytes``) or ``None``.
            headers: optional dict of request headers; it is not modified.

        Returns:
            _Result: status code, header dict and raw body bytes.
        '''
        logger.debug('------------ REQUEST to %s -------------', url)
        logger.debug('-- HEADERS --')
        logger.debug('%s', headers)
        logger.debug('-------------')
        if data is not None:
            logger.debug('-- DATA --')
            logger.debug('%s', data)
            logger.debug('-------------')
        # http.client wants the path only, so strip "http://<router>".
        path = url[len('http://') + len(self.router):]
        # Copy so the caller's dict is not changed when the cookie is added.
        headers = dict(headers or {})
        if self.cookie:
            headers['Cookie'] = self.cookie.output(header="").strip()
        logger.debug(headers)
        if isinstance(data, str):
            # http.client would encode str bodies as ISO-8859-1; the XML
            # built here declares UTF-8, so encode explicitly.
            data = data.encode('utf-8')
        self.client.request(method, path, data, headers)
        response = self.client.getresponse()
        result = self._Result(
            response.status, dict(response.getheaders()), response.read())
        if result.headers.get('Set-Cookie'):
            self.cookie = http.cookies.SimpleCookie()
            self.cookie.load(result.headers['Set-Cookie'])
        logger.info('%s %s %i', method, url, result.status_code)
        logger.debug('------------ RESPONSE to %s -------------', url)
        logger.debug('-- HEADERS --')
        logger.debug('%s', result.headers)
        logger.debug('-------------')
        logger.debug('-- DATA --')
        logger.debug('%s', result.text)
        logger.debug('-------------')
        return result

    def __post(self, url, data, headers):
        '''POST *data* to *url* on the current connection.'''
        if self.client is None:
            self.client = http.client.HTTPConnection(self.router)
        return self.__request('POST', url, data, headers)

    def __get(self, url, headers=None):
        '''GET *url* on a fresh connection.'''
        # A new connection is opened for every GET (as before); close the
        # previous one first so sockets are not leaked.
        if self.client is not None:
            self.client.close()
        self.client = http.client.HTTPConnection(self.router)
        return self.__request('GET', url, None, headers)

    @post_api
    def api(self, url, data=None, encrypted=False):
        """ Handles all api calls to the router

        Args:
            url: API path relative to ``/api/``.
            data: optional request body; when ``None`` or empty a GET is
                issued, otherwise the body is POSTed.
            encrypted: request-body encryption is not implemented in this
                script; combining it with *data* raises NotImplementedError.

        Returns:
            bytes: the raw response body.
        """
        verification_token = self.__get_server_token()[32:]
        url = "http://%s/api/%s" % (self.router, url)
        headers = {
            self.REQUEST_TOKEN: verification_token,
            'Content-type': 'application/x-www-form-urlencoded; charset=UTF-8',
        }
        if data is None or data == '':
            return self.__get(url, headers).text
        if encrypted:
            raise NotImplementedError(
                'Encrypted API requests are not supported by this script')
        # Previously a call with data sent nothing and produced no response.
        return self.__post(url, data, headers).text
def extract_wan_ip(response):
    '''Return the text inside ``<WanIPAddress>...</WanIPAddress>``.

    Args:
        response: raw response body as bytes (may be empty or ``None``).

    Returns:
        str: the decoded element content (possibly empty), or ``None``
        when the opening or the closing tag is missing.
    '''
    if not response:
        return None
    open_tag = b'<WanIPAddress>'
    close_tag = b'</WanIPAddress>'
    startpos = response.find(open_tag)
    if startpos == -1:
        return None
    startpos += len(open_tag)
    # Search after the opening tag; a missing closing tag must not be
    # used as a -1 slice bound.
    endpos = response.find(close_tag, startpos)
    if endpos == -1:
        return None
    return response[startpos:endpos].decode('utf-8')


def main(argv):
    '''Log in to the router and print its WAN IP address.

    Args:
        argv: command-line argument list; ``argv[1]`` is the admin password.

    Returns:
        int: 0 when the address was printed, 1 when it was not found in
        the response, 2 when the password argument is missing.
    '''
    if len(argv) < 2:
        print('usage: %s YOUR_ROUTER_ADMIN_PASSWORD' % argv[0], file=sys.stderr)
        return 2
    # NOTE: a password given on the command line is visible to other users
    # of the machine via the process list.
    password = argv[1]
    router = B525Router('192.168.8.1')
    router.login(username='admin', password=password)
    wan_ip = extract_wan_ip(router.device.info)
    if wan_ip is None:
        return 1
    print(wan_ip)
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))
You can run it like this:
python3 -u get_ip_from_b525.py YOUR_ROUTER_ADMIN_PASSWORD
Be the first to comment.
Copyright James Gardner 1996-2020 All Rights Reserved. Admin.