Home Blog CV Projects Patterns Notes Book Colophon Search

Find the IP of a Huawei b525 Router

19 Dec, 2019

I have home broadband using the Huawei b525 4G router.

I'd like to know when its external IP changes so that I can update any DNS records that point to services that run on the network from that router.

I could do that by asking a third-party service what my IP is, for example http://whatismyip.akamai.com.

Another way is to log into the router and inspect its settings.

Here is some code that you can run to do just that. Note that it is based on ideas I found from Hamish McNeish's efforts at https://github.com/jinxo13/HuaweiB525Router so thanks go to Hamish.

I call this file get_ip_from_b525.py. In this version the router's IP is hard-coded to '192.168.8.1', which is the default for the router. You can change it near the end if you've set yours up differently.

# -*- coding: utf-8 -*-

'''
Python 3 script. Doesn't work with Python 2. Based on https://github.com/jinxo13/HuaweiB525Router

MIT License

Copyright (c) 2019 Hamish McNeish
              2019 James Gardner

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
'''

import xml.etree.ElementTree as ET
import sys
from time import sleep
from xml.sax.saxutils import escape
import logging
# logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
import http.cookies
from datetime import datetime, timedelta
import threading
import xml.etree.ElementTree as ET
import inspect
import re
import http.client
import uuid
import hashlib
import hmac
from binascii import hexlify

def generate_nonce():
    """Return a fresh 64-character hexadecimal client-side nonce.

    The value is the hex form of two independently generated random
    (version 4) UUIDs joined together.
    """
    first_half = uuid.uuid4().hex
    second_half = uuid.uuid4().hex
    return first_half + second_half

def get_client_proof(clientnonce, servernonce, password, salt, iterations):
    """Compute the client proof sent to the router (part of the SCRAM login).

    Args:
        clientnonce: Nonce generated on the client side.
        servernonce: Nonce returned by the server; used twice in the
            message that gets signed.
        password: Plain-text password; encoded as UTF-8 before hashing.
        salt: Salt as a hexadecimal string.
        iterations: PBKDF2 iteration count.

    Returns:
        The proof as hex-encoded ``bytes``: the client key XOR-ed byte by
        byte with the signature.
    """
    auth_message = "{},{},{}".format(clientnonce, servernonce, servernonce)

    salted_password = hashlib.pbkdf2_hmac(
        'sha256', password.encode('UTF-8'), bytearray.fromhex(salt), iterations)

    # The literal b'Client Key' is the HMAC key and the salted password is
    # the message; this ordering is kept exactly as the login expects it.
    client_key = hmac.new(
        b'Client Key', msg=salted_password, digestmod=hashlib.sha256).digest()
    stored_key = hashlib.sha256(client_key).digest()
    signature = hmac.new(
        auth_message.encode('utf_8'), msg=stored_key,
        digestmod=hashlib.sha256).digest()

    # Both digests are SHA-256 sized, so zip() pairs every byte.
    proof = bytearray(key_byte ^ sig_byte
                      for key_byte, sig_byte in zip(client_key, signature))
    return hexlify(proof)

def get_param(vals, key, default=None):
    """Return ``vals[key]``, first storing ``default`` there if it is absent.

    Args:
        vals: Mutable mapping of settings, or ``None`` when the caller has
            no settings at all.
        key: Name of the setting to look up.
        default: Value to store and return when ``key`` is missing.

    Returns:
        The value stored under ``key``, or ``default`` when ``vals`` is
        ``None``.

    Note:
        When ``key`` is missing the mapping is modified in place so that it
        afterwards contains ``default`` under ``key``.
    """
    # Callers may pass None to mean "no settings"; a membership test on
    # None would raise TypeError, so answer with the default directly.
    if vals is None:
        return default
    if key not in vals:
        vals[key] = default
    return vals[key]


class XmlObject(object):
    '''A simple object to handle XML object serialisation.

    Subclasses are expected to provide ``getPropertyNames()`` (the element
    names to emit, in order) and ``getValue(name)`` (the value for one
    element); ``buildXML`` calls both.
    '''

    def __init__(self, settings=None):
        '''Store the serialisation flags found in ``settings``.

        Args:
            settings: Optional dict that may contain ``'skip_blanks'`` and
                ``'skip_class_element'``; both default to ``False``.
                Missing keys are added to a supplied dict.
        '''
        # Treat "no settings" as an empty dict so the default argument works.
        if settings is None:
            settings = {}
        self._SKIP_BLANK = self._get_param(settings, 'skip_blanks', False)
        self._SKIP_CLASS_ELEMENT = self._get_param(settings, 'skip_class_element', False)

    @classmethod
    def _get_param(cls, vals, key, default=None):
        '''Look up ``key`` in ``vals`` via the module-level ``get_param``.'''
        return get_param(vals, key, default)

    def buildXmlRequest(self):
        '''Serialise inside a ``<request>`` root element.'''
        return self.buildXML(root='request')

    def buildXmlResponse(self):
        '''Serialise inside a ``<response>`` root element.'''
        return self.buildXML(root='response')

    def buildXmlError(self):
        '''Serialise inside an ``<error>`` root element.'''
        return self.buildXML(root='error')

    def buildXML(self, header=True, root='request'):
        '''Serialise every property as ``<name>value</name>``.

        Args:
            header: When true, emit the XML declaration and wrap the
                properties in the ``root`` element; when false, emit the
                property elements only.
            root: Name of the wrapping element (used only with ``header``).

        Returns:
            The XML document (or fragment) as a single string.
        '''
        result = []
        if header:
            result.append('<?xml version="1.0" encoding="UTF-8"?>')
            result.append('<' + root + '>')
        for prop in self.getPropertyNames():
            value = self.getValue(prop)
            result.extend(['<', prop, '>'])
            # Escape &, < and > so a value such as a user name cannot
            # produce malformed XML.
            result.append(escape(str(value)))
            result.extend(['</', prop, '>'])
        if header:
            result.append('</' + root + '>')
        return ''.join(result)


class CustomXml(XmlObject):
    '''XML object whose properties come from a plain dictionary.'''

    def __init__(self, props, element_name=None):
        '''Keep a copy of ``props`` and remember the element name.

        Args:
            props: Mapping of property name to value; it is copied, so
                later changes by the caller do not affect this object.
            element_name: Name reported by ``getElementName``; defaults to
                the name of this object's class.
        '''
        super().__init__({'skip_class_element': True})
        self.ele_name = (
            self.__class__.__name__ if element_name is None else element_name
        )
        self.vals = props.copy()

    def getPropertyNames(self):
        '''Return the property names as a new list.'''
        return list(self.vals)

    def getValue(self, property):
        '''Return the value stored for ``property``.'''
        return self.vals[property]

    def getElementName(self):
        '''Return the element name chosen at construction time.'''
        return self.ele_name

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Registry filled in by the get_api decorator: maps the name of each decorated
# function to the API path it fetches. The original note says it is used by a
# testFeatures function; no such function is visible in this script.
GET_APIS = {}
#Decorator for GET API functions, populates the GET_APIS dictionary
def get_api(api):
    '''Designate a function as a GET API call.

    The decorated function's own body is never run. Its name is recorded in
    ``GET_APIS`` against ``api``, and it is replaced by a wrapper that
    returns ``inst.router.api(api)`` when its first positional argument is
    a ``RouterObject``, and ``None`` otherwise.
    '''
    def api_decorator(f):
        # Registration happens once, when the decorator is applied.
        GET_APIS[f.__name__] = api

        def decorated_function(*args):
            target = args[0]
            if not issubclass(type(target), RouterObject):
                return None
            return target.router.api(api)

        return decorated_function

    return api_decorator


def post_api(f):
    '''Decorator wrapping an API entry point.

    As written here the wrapper is a plain pass-through: it calls ``f``
    with the same positional and keyword arguments and hands back whatever
    ``f`` returns. Exceptions raised by ``f`` propagate unchanged.
    '''
    def decorated_function(*args, **kwargs):
        outcome = f(*args, **kwargs)
        return outcome

    return decorated_function

class RouterObject(object):
    '''Common base for router modules.

    Holds the owning router and a direct reference to its ``api`` attribute
    so modules can issue calls through either.
    '''

    def __init__(self, router):
        '''Remember ``router`` and its ``api`` attribute.'''
        self.router, self.api = router, router.api


class Device(RouterObject):
    '''Device module: read access to the router's device information.'''

    @property
    @get_api(api='device/information')
    def info(self):
        '''Placeholder for the ``device/information`` GET call.

        This body is never executed: ``get_api`` swaps it for a wrapper
        that returns ``self.router.api('device/information')``.
        '''


class B525Router(object):
    '''B525 implementation.

    Small HTTP client for the router's web API: it logs in using the
    router's SCRAM-style challenge/response and then performs GET requests
    against ``/api/...`` paths. Response bodies are returned as ``bytes``.
    '''
    REQUEST_TOKEN = '__RequestVerificationToken'

    class _Response(object):
        '''Holder for the parts of an HTTP response used by this class.'''

        def __init__(self, status_code, headers, text):
            self.status_code = status_code
            # dict built from the response header pairs
            self.headers = headers
            # raw response body; bytes despite the attribute name
            self.text = text

        def raise_for_status(self):
            '''Raise ``Exception('Bad response')`` unless the status is 200.'''
            if self.status_code != 200:
                raise Exception('Bad response')

    def __init__(self, host):
        '''Create a client for the router reachable at ``host``.

        No network traffic happens here; call ``login`` first.
        '''
        self.client = None
        self.cookie = None
        self.router = host

        self.username = None
        self.__password = None
        self.__rsae = None
        self.__rsan = None
        self.__is_logged_in = False
        self.__lock = threading.Lock()

        self.device = Device(self)

    def login(self, username, password, keepalive=300):
        '''Log in to the router.

        Args:
            username: Router account name.
            password: Plain-text password for that account.
            keepalive: Number of seconds stored as the session timeout.

        Raises:
            RuntimeError: If the router answers a login step with an
                ``<error>`` document.
        '''
        with self.__lock:
            self.__last_login = datetime.now() - timedelta(seconds=keepalive)
            self.username = username
            self.__password = password
            self.__timeout = keepalive
            return self.__login()

    @staticmethod
    def __parse_xml(text, action):
        """ parses a response body, raising RuntimeError if the router sent an <error> document """
        root = ET.fromstring(text)
        if root.tag == 'error':
            raise RuntimeError('%s failed: router returned error code %s'
                               % (action, root.findtext('code')))
        return root

    def __setup_session(self):
        """ gets the url from the server ignoring the response, just to get session cookie set up """
        url = "http://%s/" % self.router
        response = self.__get(url)
        response.raise_for_status()
        # will have to debug this one as without delay here it was throwing
        # a buffering exception on one of the machines
        sleep(1)

    def __get_server_token(self):
        """ retrieves server token """
        url = "http://%s/api/webserver/token" % self.router
        token_response = self.__get(url).text
        root = self.__parse_xml(token_response, 'Token request')
        return root.findall('./token')[0].text

    def __api_challenge(self):
        """ sends the first login step (user name + client nonce) and returns the response """
        self.__setup_session()
        token = self.__get_server_token()
        url = "http://%s/api/user/challenge_login" % self.router
        self.clientnonce = generate_nonce()
        xml = CustomXml({
            'username': self.username,
            'firstnonce': self.clientnonce,
            'mode': 1
            }).buildXML()
        # Only the part of the token after the first 32 characters is sent.
        headers = {'Content-type': 'text/html', self.REQUEST_TOKEN: token[32:]}
        response = self.__post(url=url, data=xml, headers=headers)
        return response

    def __login(self):
        """ logs in to the router using SCRAM method of authentication """
        logger.info('LOGIN for user [%s]', self.username)
        response = self.__api_challenge()
        # Check for an <error> reply first so a rejected challenge is
        # reported clearly instead of as a missing header or element.
        scram_data = self.__parse_xml(response.text, 'Login challenge')
        verification_token = response.headers[self.REQUEST_TOKEN]
        servernonce = scram_data.findall('./servernonce')[0].text
        salt = scram_data.findall('./salt')[0].text
        iterations = int(scram_data.findall('./iterations')[0].text)
        client_proof = get_client_proof(
            self.clientnonce, servernonce, self.__password, salt, iterations
        ).decode('UTF-8')
        login_request = CustomXml({
            'clientproof': client_proof,
            'finalnonce': servernonce}).buildXML()
        headers = {'Content-type': 'application/x-www-form-urlencoded; charset=UTF-8',
                   self.REQUEST_TOKEN: verification_token}
        url = "http://%s/api/user/authentication_login" % self.router
        result = self.__post(url=url, data=login_request, headers=headers)
        # The SCRAM protocol would normally validate the server signatures.
        # We're assuming this is ok, e.g. the router's web UI does:
        #   var serverProof = scram.serverProof(psd, salt, iter, authMsg);
        #   if (ret.response.serversignature == serverProof) {
        #   var publicKeySignature = scram.signature(CryptoJS.enc.Hex.parse(ret.response.rsan), CryptoJS.enc.Hex.parse(serverKey)).toString();
        #   if (ret.response.rsapubkeysignature == publicKeySignature) {
        xml = self.__parse_xml(result.text, 'Login')
        self.__last_login = datetime.now()
        # The RSA values are stored but not used elsewhere in this class,
        # so a reply without them is tolerated (they become None).
        self.__rsae = xml.findtext('.//rsae')
        self.__rsan = xml.findtext('.//rsan')
        self.__is_logged_in = True

    def __request(self, method, url, data, headers):
        """ sends one request on the current connection and returns a _Response """
        logger.debug('------------ REQUEST to %s -------------', url)
        logger.debug('-- HEADERS --')
        logger.debug('%s', headers)
        logger.debug('-------------')
        if data is not None:
            logger.debug('-- DATA --')
            logger.debug('%s', data)
            logger.debug('-------------')

        # Strip "http://<host>" to get the request path.
        path = url[len('http://') + len(self.router):]
        # Copy so the caller's dict is not modified when the cookie is added.
        headers = dict(headers or {})
        if self.cookie:
            headers['Cookie'] = self.cookie.output(header="").strip()
        logger.debug(headers)
        # http.client would encode a str body as ISO-8859-1, but the XML
        # built here declares UTF-8, so encode explicitly.
        if isinstance(data, str):
            data = data.encode('utf-8')
        self.client.request(method, path, data, headers)
        response = self.client.getresponse()
        result = self._Response(
            response.status, dict(response.getheaders()), response.read())
        if result.headers.get('Set-Cookie'):
            self.cookie = http.cookies.SimpleCookie()
            self.cookie.load(result.headers['Set-Cookie'])

        logger.info('%s %s %i', method, url, result.status_code)
        logger.debug('------------ RESPONSE to %s -------------', url)
        logger.debug('-- HEADERS --')
        logger.debug('%s', result.headers)
        logger.debug('-------------')
        logger.debug('-- DATA --')
        logger.debug('%s', result.text)
        logger.debug('-------------')
        return result

    def __post(self, url, data, headers):
        """ POSTs data to url, reusing the connection left by the previous request """
        if self.client is None:
            self.client = http.client.HTTPConnection(self.router)
        return self.__request("POST", url, data, headers)

    def __get(self, url, headers=None):
        """ GETs url on a fresh connection """
        # Every GET uses a new connection; close the previous one first so
        # its socket is released rather than left for garbage collection.
        if self.client is not None:
            self.client.close()
        self.client = http.client.HTTPConnection(self.router)
        return self.__request("GET", url, None, headers)

    @post_api
    def api(self, url, data=None, encrypted=False):
        """ Handles all api calls to the router

        Only GET calls (``data`` of ``None`` or ``''``) are implemented.
        ``url`` is the path below ``/api/``; the response body is returned
        as bytes.

        Raises NotImplementedError when ``data`` is supplied.
        """
        if not (data is None or data == ''):
            raise NotImplementedError(
                'Sending data to the router is not implemented; '
                'only GET API calls are supported')
        verification_token = self.__get_server_token()[32:]
        url = "http://%s/api/%s" % (self.router, url)
        headers = {
            self.REQUEST_TOKEN: verification_token,
            'Content-type': 'application/x-www-form-urlencoded; charset=UTF-8',
        }
        return self.__get(url, headers).text


if __name__ == '__main__':
    # Prints the router's WAN IP address and exits 0, or exits 1 when the
    # address cannot be found in the device information.
    # NOTE: the password is read from the command line, where other local
    # users may be able to see it in the process list.
    import sys
    if len(sys.argv) < 2:
        # sys.exit with a string prints it to stderr and exits with status 1.
        sys.exit('usage: python3 -u get_ip_from_b525.py YOUR_ROUTER_ADMIN_PASSWORD')
    password = sys.argv[1]
    router = B525Router('192.168.8.1')
    router.login(username='admin', password=password)
    response = router.device.info

    open_tag = b'<WanIPAddress>'
    close_tag = b'</WanIPAddress>'
    startpos = response.find(open_tag)
    if startpos != -1:
        value_start = startpos + len(open_tag)
        endpos = response.find(close_tag, value_start)
        # Require the closing tag too; slicing with -1 would print garbage.
        if endpos != -1:
            print(response[value_start:endpos].decode('utf-8'))
            sys.exit(0)
    sys.exit(1)

You can run it like this:

python3 -u get_ip_from_b525.py YOUR_ROUTER_ADMIN_PASSWORD

Comments

Be the first to comment.

Add Comment





Copyright James Gardner 1996-2020 All Rights Reserved. Admin.