Source code for HiLinkAPI

import logging
from threading import Thread
import requests
import xmltodict
import uuid
import base64
import hashlib
import binascii
import hmac
import time
from binascii import hexlify
from collections import OrderedDict
from bs4 import BeautifulSoup
from datetime import datetime


[docs]class hilinkException(Exception): """ HiLink API exception :param modemname: Unique name of the modem will be used in raising an exceptions to identify the respective modem :param message: Error message body for the raising exception :type modemname: string :type message: string """ def __init__(self, modemname, message): self.message = message self.modemname = modemname def __str__(self): return "Huawei HLink API Error ({}) : {}".format(self.modemname, self.message)
[docs]class webui(Thread): """ This class facilitate to open, authenticate and operate functions of supported Huawei HiLink modems. :param modemname: A uniquely identifiable name for the modem will useful when debugging or tracing with logs :param host: IP address of the modem :param username: Username if authentication required :param password: Password if authentication required :param logger: Logger object if using already configured :class:`logging.Logger` :type url: string :type host: string :type username: string, defaults to None :type password: string, defaults to None :type logger: :class:`logging.Logger`, defaults to None """ errorCodes = { # System errors 100002: "ERROR_SYSTEM_NO_SUPPORT", 100003: "ERROR_SYSTEM_NO_RIGHTS", 100004: "ERROR_BUSY", # Authentication errors 108001: "ERROR_LOGIN_USERNAME_WRONG", 108002: "ERROR_LOGIN_PASSWORD_WRONG", 108003: "ERROR_LOGIN_ALREADY_LOGIN", 108005: "ERROR_LOGIN_TOO_MANY_USERS_LOGINED", 108006: "ERROR_LOGIN_USERNAME_OR_PASSWORD_ERROR", 108007: "ERROR_LOGIN_TOO_MANY_TIMES", 108008: "MODIFYPASSWORD_ERROR", 108009: "ERROR_LOGIN_IN_DEFFERENT_DEVICES", 108010: "ERROR_LOGIN_FREQUENTLY_LOGIN", # SIM errors 101001: "ERROR_NO_SIM_CARD_OR_INVALID_SIM_CARD", 101002: "ERROR_CHECK_SIM_CARD_PIN_LOCK", 101003: "ERROR_CHECK_SIM_CARD_PUN_LOCK", 101004: "ERROR_CHECK_SIM_CARD_CAN_UNUSEABLE", 101005: "ERROR_ENABLE_PIN_FAILED", 101006: "ERROR_DISABLE_PIN_FAILED", 101007: "ERROR_UNLOCK_PIN_FAILED", 101008: "ERROR_DISABLE_AUTO_PIN_FAILED", 101009: "ERROR_ENABLE_AUTO_PIN_FAILED", 103002: "ERROR_DEVICE_PIN_VALIDATE_FAILED", 103003: "ERROR_DEVICE_PIN_MODIFFY_FAILED", 103004: "ERROR_DEVICE_PUK_MODIFFY_FAILED", 103008: "ERROR_DEVICE_SIM_CARD_BUSY", 103009: "ERROR_DEVICE_SIM_LOCK_INPUT_ERROR", 103011: "ERROR_DEVICE_PUK_DEAD_LOCK", # Network errors 112001: "ERROR_SET_NET_MODE_AND_BAND_WHEN_DAILUP_FAILED", 112002: "ERROR_SET_NET_SEARCH_MODE_WHEN_DAILUP_FAILED", 112003: "ERROR_SET_NET_MODE_AND_BAND_FAILED", 112005: "ERROR_NET_REGISTER_NET_FAILED", 112008: "ERROR_NET_SIM_CARD_NOT_READY_STATUS", # Session errors 125001: "ERROR_WRONG_TOKEN", 125002: "ERROR_WRONG_SESSION", 125003: "ERROR_WRONG_SESSION_TOKEN", } def __init__(self, modemname, host, username=None, password=None, logger=None): """ Initialize webui """ self._modemname = modemname self._host = host # assign empty strings if none self._username = username if username is not None else "" self._password = password if password is not None else "" # initialize logger if not provided if logger is None: self.logger = logging.getLogger() else: self.logger = logger # build http host URL self._httpHost = f"http://{self._host}" # timeout for a HTTP call (seconds) self._HTTPcallTimeOut = 10 # variables required for webui session self._sessionId = None self._RequestVerificationToken = None # Authenticaion required or not self._loginRequired = False #### WebUI variables#### self._sessionId = None self._RequestVerificationToken = None self._deviceClassify = None self._deviceName = None self._loginState = False # Logged in to the session or not self._webuiversion = None # Has to be 10 or 17/21 # webui initialization succeeded or not self._isWebUIInitialized = False # in an operation or not self._inOperation = False # session refresh interval in seconds self._sessionRefreshInterval = 10 # session refreshed after an operation self._sessionRefreshed = False # Last operation ended time self._lastOperationEndedTime = None # valid session or not (not logged in) self._validSession = False # login wait time self._loginWaitTime = 0 # active error code self._activeErrorCode = 0 # initialize thread stop self._stopped = True # thread stopped self._isStopped = True # network modes # LTE=3, WCDMA=2, GSM=1 network modes self._netModePrimary = 3 self._netModeSecondary = 2 ############################### # device info self._deviceName = None self._imei = None self._serial = None self._imsi = None self._iccid = None self._supportedModes = None self._hwversion = None self._swversion = None self._webui = None # connection info self._workmode = None self._wanIP = None self._networkName = None # data connection info self._roamingEnabled = False self._maxIdleTimeOut = 0 ######### Initialize ########### # Initialize the thread Thread.__init__(self)
[docs] def start(self): """ This method will start the thread. """ # initialize variables and webui self._stopped = False self._isStopped = False self.initialize() # Thread start Thread.start(self)
[docs] def stop(self): """ This method will initialize thread stop. """ self._stopped = True
[docs] def isStopped(self): """ This method will return successfully stopped or not. :return: Return deinited or not :rtype: boolean """ return self._isStopped
[docs] def setCredentials(self, username, password): """ This method will set/update username and password for authentication after initializing. :param username: Username if authentication required :param password: Password if authentication required :type username: string, defaults to None :type password: string, defaults to None """ self._username = username self._password = password
[docs] def processHTTPHeaders(self, response): """ This method will retrieve *SessionID* from cookies and *__RequestVerificationToken* from HTTP headers. This method has to be called after each :class:`requests.get` or :class:`requests.post` as mismatch with *SessionID* or *__RequestVerificationToken* between API(webui) and HTTP request call leading to return errors in API calls. :param response: Response object from :class:`requests.get` or :class:`requests.post` :type response: :class:`requests.Response` """ if 'SessionID' in response.cookies: self._sessionId = response.cookies['SessionID'] self.logger.debug(f"Updating SessionID = {self._sessionId}") headers = OrderedDict(response.headers) if '__RequestVerificationToken' in headers: self.logger.debug("Updating RequestVerificationToken = {}".format(self._RequestVerificationToken)) self._RequestVerificationToken = None self._RequestVerificationToken = headers['__RequestVerificationToken'].split("#")[0] self.logger.debug("Updated RequestVerificationToken to = {}".format(self._RequestVerificationToken))
[docs] def buildCookies(self): """ This method will build a dictionary object containing *SessionID* which is provided as cookies to HTTP requests. Each call of :meth:`~httpGet` and :meth:`~httpPost` will generate default cookies set if cookies not provided in parameters. :return: Return a dictionary containing cookies :rtype: dictionary """ cookies = None if self._sessionId: cookies = { 'SessionID': self._sessionId } # return return cookies
[docs] def httpGet(self, endpoint, cookies=None, headers=None): """ Call an API end point using a HTTP GET request. If :attr:`cookies` are not provided (when defaulted to None) will build cookies by calling :meth:`~buildCookies`. At the end of each call :meth:`~processHTTPHeaders` will call to retrieve *SessionID* and *__RequestVerificationToken*. :param endpoint: API end point (eg:- /api/device/information) :param cookies: cookies, defaults to None :param headers: HTTP headers, defaults to None :type endpoint: string :type postBody: string :type cookies: dictionary :type headers: dictionary :return: Return the HTTP response as a requests.Response :rtype: :class:`requests.Response` """ if(cookies == None): cookies = self.buildCookies() # request try: _response = requests.get(f"{self._httpHost}{endpoint}", cookies=cookies, timeout=self._HTTPcallTimeOut) self.processHTTPHeaders(_response) return _response except Exception as e: self.logger.error(e) raise hilinkException(self._modemname, f"Calling {self._httpHost}{endpoint} failed")
[docs] def httpPost(self, endpoint, postBody, cookies=None, headers=None): """ Call an API end point using a HTTP POST request. If :attr:`cookies` are not provided (when defaulted to None) will build cookies by calling :meth:`~buildCookies`. At the end of each call :meth:`~processHTTPHeaders` will call to retrieve *SessionID* and *__RequestVerificationToken*. :param endpoint: API end point (eg:- /api/user/authentication_login) :param postBody: HTTP body :param cookies: cookies :param headers: HTTP headers :type endpoint: string :type postBody: string :type cookies: dictionary :type headers: dictionary :return: Return the HTTP response as a requests.Response :rtype: :class:`requests.Response` """ if(cookies == None): cookies = self.buildCookies() # request try: _response = requests.post(f"{self._httpHost}{endpoint}", data=postBody, cookies=cookies, headers=headers, timeout=self._HTTPcallTimeOut) self.processHTTPHeaders(_response) return _response except Exception as e: self.logger.error(e) raise hilinkException(self._modemname, f"Calling {self._httpHost}{endpoint} failed")
[docs] def initialize(self): """ Calling this method will initialize API calls by * Initialize session and fetch *SessionID* * Request initial *__RequestVerificationToken* * Query authentication required or not * Identify webUI version """ self._sessionId = None self._RequestVerificationToken = None self._webuiversion = None self._deviceClassify = None self._deviceName = None # Initialize session try: self.httpGet(endpoint="/") self._isWebUIInitialized = True except Exception as e: # WebUI initialization failed self._isWebUIInitialized = False self.logger.error(e) # get request verification token # first webUI 10 or 21 try: self.logger.debug(f"Trying for webUI version 10") response = self.httpGet("/api/webserver/token") tokenJson = xmltodict.parse(response.text) if "response" in tokenJson: loginToken = tokenJson['response']['token'] self.logger.debug(f"Got new loginToken = {tokenJson}") size = len(loginToken) self._RequestVerificationToken = loginToken[(size - 32):(size)] self.logger.debug(f"Got new RequestVerificationToken = {self._RequestVerificationToken}") # set supporting webUI version is 10 as default self._webuiversion = 10 # check if it's 21 response = self.httpGet("/api/device/basic_information") tokenJson21Check = xmltodict.parse(response.text) if "response" in tokenJson21Check: # valid response if "WebUIVersion" in tokenJson21Check["response"]: if "21." in tokenJson21Check["response"]["WebUIVersion"]: self._webuiversion = 21 except: self._RequestVerificationToken = None self._webuiversion = None # If haven't fetched try for webUI version 17 if self._RequestVerificationToken is None: self.logger.debug(f"Trying for webUI version 17") response = self.httpGet("/html/home.html") soup = BeautifulSoup(response.text, "html.parser") meta = soup.head.meta if meta is not None: self._RequestVerificationToken = meta.get("content", None) self.logger.debug(f"Got new RequestVerificationToken = {self._RequestVerificationToken}") if self._RequestVerificationToken is None: raise hilinkException(self._modemname, "Failed to get a request verification token") else: self._webuiversion = 17 # End of request verification token fetching and webUI version self.logger.info(f"Huawei webUI version = {self._webuiversion}") ############################################################### # Get basic device info try: headers = {'X-Requested-With':'XMLHttpRequest'} response = self.httpGet("/api/device/basic_information", headers=headers) deviceInfo = xmltodict.parse(response.text) if "response" in deviceInfo: self._deviceClassify = deviceInfo['response']['classify'] self._deviceName = deviceInfo['response']['devicename'] else: self.sessionErrorCheck(deviceInfo) raise hilinkException(self._modemname, "Failed to get device info") except Exception as e: self.logger.error(e) raise hilinkException(self._modemname, "Failed to get device info") ############################################################### ################## Authentication required check ############## # common API endpoint for webui version 10,17 & 21 try: response = self.httpGet("/api/user/hilink_login") hilinkLogin = xmltodict.parse(response.text) if "response" in hilinkLogin: if int(hilinkLogin['response']['hilink_login']) == 0: # wingles always comes with authentication even hilink_login==0 if str(self._deviceClassify).upper() == "WINGLE" or str(self._deviceClassify).upper() == "MOBILE-WIFI": self._loginRequired = True else: self._loginRequired = False elif int(hilinkLogin['response']['hilink_login']) == 1: self._loginRequired = True else: self.sessionErrorCheck(hilinkLogin) raise hilinkException(self._modemname, "Invalid response while getting user hilink state") except Exception as e: self.logger.error(e) raise hilinkException(self._modemname, "Failed to get user login state")
#############Authentication required check end#################
[docs] def sessionErrorCheck(self, responseDict): """ This method will use to validate error responses """ self.logger.error(responseDict) # check for errors & if exist set as active error code if "error" in responseDict: try: self._activeErrorCode = int(responseDict["error"]["code"]) if self._activeErrorCode in self.errorCodes: self.logger.error(f"{self._activeErrorCode} -- {self.errorCodes[self._activeErrorCode]}") else: self.logger.error(f"Unidentified error code - {self._activeErrorCode}") ######################################################## ####### Try to recover identified / known errors ####### # Wrong session token if self._activeErrorCode == 125003: self.logger.info(f"Re-initializing webui due to wrong session token error") # re-initialize, validate and start self.initialize() self.validateSession() self.logger.info(f"Re-initialization of webui due to wrong session token error completed") ###### End of recovering identified / known errors ##### ######################################################## except Exception as e: self.logger.error("Error code extraction failed") self.logger.error(e)
[docs] def resetActiveErrorCode(self): """ This method will reset active error code """ self._activeErrorCode = 0
[docs] def login_b64_sha256(self, data): """ This method will used to SHA256 hashing and base64 encoding for WebUI version 10.x.x authentication in :meth:`~login_WebUI10`. :param data: Data to hash and encode :type data: string :return: Return hashed and encoded data string :rtype: string """ s256 = hashlib.sha256() s256.update(data.encode('utf-8')) dg = s256.digest() hs256 = binascii.hexlify(dg) return base64.urlsafe_b64encode(hs256).decode('utf-8', 'ignore')
[docs] def validateSession(self): """ This method will validate session * Check if a valid authenticated session * If authentication required will login :return: Return valid session or not :rtype: boolean """ # wait if in an operation while self._inOperation: time.sleep(0.5) #******fine tune the wait****** # update in an operation self._inOperation = True ##################################### ######### Login state check ######### response = self.httpGet("/api/user/state-login") stateLogin = xmltodict.parse(response.text) if "response" in stateLogin: self._loginState = True if int(stateLogin['response']['State']) == 0 else False self._passwordType = stateLogin['response']['password_type'] self._loginWaitTime = int(stateLogin['response']['remainwaittime']) if 'remainwaittime' in stateLogin['response'] else 0 # in response lockstatus=1 if locked # update active error code if has a login wait time if self._loginWaitTime > 0: self._activeErrorCode = 108007 else: self.logger.error("Invalid response while getting user login state") ###### Login state check end ######## ##################################### ###### Login if required ############ # In webui 10 self._loginState is -1 even login not enabled so hilink_state check is also required # print(f"Login required = {self._loginRequired} \t LoginState = {self._loginState} \t time = {datetime.now()}") if not self._loginState and self._loginRequired: # check for login wait time if self._loginWaitTime <= 0: # invalidate session self._validSession = False # check username and password have provided if self._username is not None and self._password is not None: # login for webui 17 & 21 if self._webuiversion in (17, 21): self.logger.debug(f"Login initiated fot WebUI version 17 & 21") self.logger.debug(f"Having session ID = {self._sessionId}") self.logger.debug(f"Having request verification token = {self._RequestVerificationToken}") # generate password value # base64encode(SHA256(name + base64encode(SHA256($('#password').val())) + g_requestVerificationToken[0])); passwd_string = f"{self._password}" s256 = hashlib.sha256() s256.update(passwd_string.encode('utf-8')) dg = s256.digest() hs256 = binascii.hexlify(dg) hassed_password = base64.urlsafe_b64encode(hs256).decode('utf-8', 'ignore') s2562 = hashlib.sha256() s2562.update(f"{self._username}{hassed_password}{self._RequestVerificationToken}".encode('utf-8')) dg2 = s2562.digest() hs2562 = binascii.hexlify(dg2) hashed_username_password = base64.urlsafe_b64encode(hs2562).decode('utf-8', 'ignore') xml_body = f""" <?xml version="1.0" encoding="UTF-8"?> <request> <Username>{self._username}</Username> <Password>{hashed_username_password}</Password> <password_type>{self._passwordType}</password_type> </request> """.replace("b\'", "").replace("\'", "") # challenge headers headers = { 'X-Requested-With':'XMLHttpRequest', '__RequestVerificationToken': self._RequestVerificationToken } # challenge_login response = self.httpPost("/api/user/login", xml_body, cookies=None, headers=headers) loginResponse = xmltodict.parse(response.text) # validate login & session if "response" in loginResponse: if loginResponse['response'] == "OK": self._validSession = True # reset if theres any active error self.resetActiveErrorCode() else: self.sessionErrorCheck(loginResponse) self.logger.error(f"Login failed -- {response.text}") else: self.logger.error(f"Login failed -- {response.text}") # validate login & session end # else webui 10 login as default else: # login to webui 10 self.logger.debug(f"Login initiated fot WebUI version 10") # grab the verification token self.logger.debug("Querying for token") response = self.httpGet("/api/webserver/token") tokenJson = xmltodict.parse(response.text) self.logger.debug(response.text) if "response" in tokenJson: _tmpToken = tokenJson['response']['token'] self._RequestVerificationToken = _tmpToken[len(_tmpToken) - 32:len(_tmpToken)] # log self.logger.debug(f"Having session ID = {self._sessionId}") self.logger.debug(f"Having request verification token = {self._RequestVerificationToken}") # generate password value password_value = self.login_b64_sha256(self._username + self.login_b64_sha256(self._password) + self._RequestVerificationToken) # challenge login client_nonce = uuid.uuid4().hex + uuid.uuid4().hex # generate request XML body xml_body = """ <?xml version="1.0" encoding="UTF-8"?> <request> <username>{}</username> <firstnonce>{}</firstnonce> <mode>1</mode> </request> """.format(self._username, client_nonce) # challenge headers headers = { 'X-Requested-With':'XMLHttpRequest', '__RequestVerificationToken': self._RequestVerificationToken } # challenge_login response = self.httpPost("/api/user/challenge_login", xml_body, cookies=None, headers=headers) challangeDict = xmltodict.parse(response.text) self.logger.debug("Login challangeDict") # check for response if 'response' in challangeDict: salt = challangeDict['response']['salt'] server_nonce = challangeDict['response']['servernonce'] iterations = int(challangeDict['response']['iterations']) # authenticate login msg = "%s,%s,%s" % (client_nonce, server_nonce, server_nonce) salted_pass = hashlib.pbkdf2_hmac('sha256', bytearray(self._password.encode('utf-8')), bytearray.fromhex(salt), iterations) client_key = hmac.new(b'Client Key', msg=salted_pass, digestmod=hashlib.sha256) stored_key = hashlib.sha256() stored_key.update(client_key.digest()) signature = hmac.new(msg.encode('utf_8'), msg=stored_key.digest(), digestmod=hashlib.sha256) client_key_digest = client_key.digest() signature_digest = signature.digest() client_proof = bytearray() i = 0 while i < client_key.digest_size: val = ord(client_key_digest[i:i + 1]) ^ ord(signature_digest[i:i + 1]) client_proof.append(val) i = i + 1 HexClientProof = hexlify(client_proof) xml_body = """ <?xml version="1.0" encoding="UTF-8"?> <request> <clientproof>{}</clientproof> <finalnonce>{}</finalnonce> </request> """.format(HexClientProof, server_nonce).replace("b\'", "").replace("\'", "") # login headers headers = { 'X-Requested-With':'XMLHttpRequest', '__RequestVerificationToken': self._RequestVerificationToken } response = self.httpPost("/api/user/authentication_login", xml_body, cookies=None, headers=headers) loginResponse = xmltodict.parse(response.text) # validate login & session if "response" in loginResponse: self._validSession = True # reset if theres any active error self.resetActiveErrorCode() else: self.sessionErrorCheck(loginResponse) self.logger.error(f"Login failed -- {response.text}") else: self.sessionErrorCheck(challangeDict) self.logger.error("Invalid response for Login challageDict") else: self.logger.error("Username & password are mandatory") # login waittime is available have to wait else: self.logger.error(f"Login wait time is available {self._loginWaitTime} minutes") # login not required else: # validate session self._validSession = True ############################## # update in an operation self._inOperation = False # update session refreshed self._sessionRefreshed = True # return session validation return self._validSession
[docs] def run(self): """ This is the overriding method for :class:threading.Thread.run() * Check login state of the session * Perform login when required """ # init session refreshed self._lastSessionRefreshed = 0 # set default stopped into false self._isStopped = False #if webUI successfully initialized start the thread if self._isWebUIInitialized: # if not stop initialized while not self._stopped: if time.time() >= (self._lastSessionRefreshed + self.getSessionRefreshInteval()): # validate session self.validateSession() # reset last session refreshed self._lastSessionRefreshed = time.time() # 0.5 second delay in loop time.sleep(0.5) ####### Loop delay ########### # at the end of termination mark as stopped self._isStopped = True
#################################################### ###################### Query methods ############### ####################################################
[docs] def queryDeviceInfo(self): """ This method will query device information and update existing. If session need a refresh :meth:`~validateSession` before calling device information API end point. :return: Return querying device info succeed or not :rtype: boolean """ # if session is not refreshed validate and refresh session again if not self._sessionRefreshed: self.validateSession() # wait if in an operation while self._inOperation: time.sleep(0.5) # if session is valid query device info if self._validSession: try: ######### query device info ########## headers = {'X-Requested-With':'XMLHttpRequest'} response = self.httpGet("/api/device/information", headers=headers) deviceInfo = xmltodict.parse(response.text) if "response" in deviceInfo: self._deviceClassify = deviceInfo['response']['Classify'] self._deviceName = deviceInfo['response']['DeviceName'] self._workmode = deviceInfo['response']['workmode'] if "Imei" in deviceInfo['response']: self._imei = deviceInfo['response']['Imei'] if "SerialNumber" in deviceInfo['response']: self._serial = deviceInfo['response']['SerialNumber'] if "Imsi" in deviceInfo['response']: self._imsi = deviceInfo['response']['Imsi'] if "Iccid" in deviceInfo['response']: self._iccid = deviceInfo['response']['Iccid'] if "supportmode" in deviceInfo['response']: try: self._supportedModes = deviceInfo['response']['supportmode'].split("|") except: self._supportedModes = [] if "HardwareVersion" in deviceInfo['response']: self._hwversion = deviceInfo['response']['HardwareVersion'] if "SoftwareVersion" in deviceInfo['response']: self._swversion = deviceInfo['response']['SoftwareVersion'] if "WebUIVersion" in deviceInfo['response']: self._webui = deviceInfo['response']['WebUIVersion'] # invalidate refresh self._sessionRefreshed = False # reset if theres any active error self.resetActiveErrorCode() # return success return True else: self.sessionErrorCheck(deviceInfo) self._sessionRefreshed = False return False ####### query device info end ######## except Exception as e: # invalidate refresh self._sessionRefreshed = False self.logger.error(e) self.logger.error(f"{self._modemname} Failed to get device info") return False else: # invalidate refresh self._sessionRefreshed = False self.logger.error(f"{self._modemname} Failed to get device info") return False
[docs] def querySupportedNetworkMethods(self): """ This method will query supported network modes If session need a refresh :meth:`~validateSession` before calling device information API end point. :return: Return querying supported network modes succeeded or not :rtype: boolean """ # if session is not refreshed validate and refresh session again if not self._sessionRefreshed: self.validateSession() # wait if in an operation while self._inOperation: time.sleep(0.5) # if session is valid query wan ip info if self._validSession: #### Query supported network modes #### headers = {'X-Requested-With':'XMLHttpRequest'} response = self.httpGet("/config/network/networkmode.xml", headers=headers) supportedNetModes = xmltodict.parse(response.text) print(response.text) # print(supportedNetModes) else: # invalidate refresh self._sessionRefreshed = False self.logger.error(f"{self._modemname} Failed to get supported network modes") return False
[docs] def queryWANIP(self): """ This method will query WAN IP from the carrier network and update existing. If session need a refresh :meth:`~validateSession` before calling device information API end point. Separate API end points will be called as per the WebUI version. :return: Return querying WAN IP succeed or not :rtype: boolean """ # if session is not refreshed validate and refresh session again if not self._sessionRefreshed: self.validateSession() # wait if in an operation while self._inOperation: time.sleep(0.5) # if session is valid query wan ip info if self._validSession: # Make WAN IP None self._wanIP = None try: ######### query WAN IP info ########## headers = {'X-Requested-With':'XMLHttpRequest'} # API endpoint is defer relavant to webui version if self._webuiversion in (10, 21): wanIPAPIEndPoint = "/api/device/information" else: # webui version 17 wanIPAPIEndPoint = "/api/monitoring/status" response = self.httpGet(wanIPAPIEndPoint, headers=headers) wanIPInfo = xmltodict.parse(response.text) if "response" in wanIPInfo: if "WanIPAddress" in wanIPInfo['response']: self._wanIP = wanIPInfo['response']['WanIPAddress'] # invalidate refresh self._sessionRefreshed = False # reset if theres any active error self.resetActiveErrorCode() # return success return True else: self.sessionErrorCheck(wanIPInfo) self._sessionRefreshed = False return False ####### query WAN IP info end ######## except Exception as e: # invalidate refresh self._sessionRefreshed = False self.logger.error(e) self.logger.error(f"{self._modemname} Failed to get WAN IP info") return False else: # invalidate refresh self._sessionRefreshed = False self.logger.error(f"{self._modemname} Failed to get WAN IP info") return False
[docs] def queryDataConnection(self): """ This method will query following data connection properties and update existing. * Data roaming enabled or disabled * Max connection idle timeout If session need a refresh :meth:`~validateSession` before calling device information API end point. :return: Return querying data connection properties succeed or not :rtype: boolean """ # if session is not refreshed validate and refresh session again if not self._sessionRefreshed: self.validateSession() # wait if in an operation while self._inOperation: time.sleep(0.5) # if session is valid query data connection info if self._validSession: try: headers = {'X-Requested-With':'XMLHttpRequest'} response = self.httpGet("/api/dialup/connection", headers=headers) dataConnectionInfo = xmltodict.parse(response.text) if "response" in dataConnectionInfo: self._roamingEnabled = True if int(dataConnectionInfo["response"]["RoamAutoConnectEnable"]) == 1 else False self._maxIdleTimeOut = int(dataConnectionInfo["response"]["MaxIdelTime"]) else: self.sessionErrorCheck(dataConnectionInfo) self._sessionRefreshed = False # invalidate refresh self._sessionRefreshed = False # reset if theres any active error self.resetActiveErrorCode() # return success return True ####### query data connection info end ######## except Exception as e: # invalidate refresh self._sessionRefreshed = False self.logger.error(e) self.logger.error(f"{self._modemname} Failed to get data connection configuration info") return False else: # invalidate refresh self._sessionRefreshed = False self.logger.error(f"{self._modemname} Failed to get data connection configuration info") return False
[docs] def queryNetwork(self): """ This method will query network name of the carrier network and update existing. If session need a refresh :meth:`~validateSession` before calling device information API end point. :return: Return querying network succeed or not :rtype: boolean """ # if session is not refreshed validate and refresh session again if not self._sessionRefreshed: self.validateSession() # wait if in an operation while self._inOperation: time.sleep(0.5) # if session is valid query network info if self._validSession: try: headers = {'X-Requested-With':'XMLHttpRequest'} response = self.httpGet("/api/net/current-plmn", headers=headers) connectionInfo = xmltodict.parse(response.text) if "response" in connectionInfo: self._networkName = connectionInfo["response"]["FullName"] else: self.sessionErrorCheck(connectionInfo) self._sessionRefreshed = False # invalidate refresh self._sessionRefreshed = False # reset if theres any active error self.resetActiveErrorCode() # return success return True ####### query network info end ######## except Exception as e: # invalidate refresh self._sessionRefreshed = False self.logger.error(e) self.logger.error(f"{self._modemname} Failed to get Network info") return False else: # invalidate refresh self._sessionRefreshed = False self.logger.error(f"{self._modemname} Failed to get Network info") return False
################################################### ################# Set methods #####################
[docs] def setNetwokModes(self, primary="LTE", secondary="WCDMA"): """ Set primary and secondary network modes respectively with :attr:`primary` and :attr:`secondary`. :param primary: Either "LTE","WCDMA" or "GSM" as primary network mode :param secondary: Either "LTE","WCDMA" or "GSM" as secondary network mode :type primary: String :type secondary: String :return: Return network mode configuration success or not :rtype: bool """ modes = {"LTE":3, "WCDMA":2, "GSM":1, "AUTO":0} # primary if primary in modes: self._netModePrimary = modes[primary] else: return False # secondary if secondary in modes: self._netModeSecondary = modes[secondary] else: return False # if both went fine return True as success return True
[docs] def setSessionRefreshInteval(self, interval): """ This method will set the session refresh interval while in idle without any operation. :param interval: Session refresh interval in seconds :type interval: int """ self._sessionRefreshInterval = interval
################################################### ################# Get methods ##################### ###################################################
[docs] def getLoginRequired(self): """ This method will return either login/authentication required or not which will be updated after calling :meth:`~initialize`. :return: Return login required or not :rtype: bool """ return self._loginRequired
[docs] def getWebUIVersion(self): """ This method will return WebUI version either *10*, *17* or *21*. :return: Return WebUI version :rtype: int """ return self._webuiversion
[docs] def getValidSession(self): """ This method will return if the session is valid for querying and operations or not :return: Return a valid session or not :rtype: boolean """ return self._validSession
[docs] def getSessionRefreshInteval(self): """ This method will return the session refresh interval while in idle without any operation. Use :meth:`~setSessionRefreshInteval` :return: Session refresh interval in seconds :rtype: int """ return self._sessionRefreshInterval
[docs] def getActiveError(self): """ This method will return if theres any active error code else none :return: Return {"errorcode":<code>,"error":"<error message>"} :rtype: dictionary """ error = None if self._activeErrorCode > 0: if self._activeErrorCode in self.errorCodes: error = { "errorcode":self._activeErrorCode, "error":str(self.errorCodes[self._activeErrorCode]) } else: error = { "errorcode":self._activeErrorCode, "error":"ERROR_NOT_IDENTIFIED" } ########### return error
[docs] def getKnownErrors(self): """ This method will return all known errors :return: Return {"<error code>":"<error message>"...} :rtype: dictionary """ return self.errorCodes
[docs] def getLoginWaitTime(self): """ This method will return waittime for next login attemp If session need a refresh :meth:`~validateSession` before calling device information API end point. :return: Login wait time :rtype: int """ return self._loginWaitTime
[docs] def getDeviceName(self): """ This method will return the device name *(model)* from API end point */api/device/information*. :return: Return device name :rtype: string """ return self._deviceName
[docs] def getDeviceInfo(self): """ This method will return following device info as a dictionary. These information have get update by calling :meth:`~queryDeviceInfo` (Required only one time as these are constants) #. *devicename* - Device name #. *serial* - Modem serial number #. *imei* - IMEI number of the modem #. *imsi* - IMSI number #. *iccid* - ICCID of the SIM #. *modes* - Supported network modes (LTE,WCDMA,GSM) #. *hwversion* - Hardware version of the modem #. *swversion* - Software version of the modem #. *webui* - WebUI version of the modem :return: Device information as a dictionary :rtype: dictionary """ return { "devicename":self._deviceName, "serial":self._serial, "imei":self._imei, "imsi":self._imsi, "iccid":self._iccid, "modes":self._supportedModes, "workmode":self._workmode, "hwversion":self._hwversion, "swversion":self._swversion, "webui":self._webui }
[docs] def getWANIP(self): """ This method will return the WAN IP. WAN IP can update by calling :meth:`~queryWANIP` and call this when after a possible WAN IP change like, #. Connection switch on/off event after calling :meth:`~switchConnection`. #. Switching between LTE and WCDMA even after calling :meth:`~switchLTE`. :return: Return WAN IP :rtype: string """ return self._wanIP
[docs] def getNetwork(self): """ This method will return the name of Carrier Network. Carrier Network name can update by calling :meth:`~queryNetwork` and required only to call one time after the first time after a possible WAN IP change like, #. Connection switch on/off event after calling :meth:`~switchConnection`. #. Switching between LTE and WCDMA even after calling :meth:`~switchLTE`. :return: Return network name :rtype: string """ return self._networkName
[docs] def getWorkmode(self): """ This method will return the work mode. Mandatory to update by device work mode by calling :meth:`~queryDeviceInfo` prior to call this method :return: Return network name :rtype: string """ return self._networkName
[docs] def getNetwokModes(self): """ Get primary and secondary network modes :return: {"primary":<<Primary networn mode>>,"secondary":<<Secondary networn mode>>} :rtype: dictionary """ modes = {3:"LTE", 2:"WCDMA", 1:"GSM", 0:"AUTO"} return {"primary":modes[self._netModePrimary], "secondary":modes[self._netModeSecondary]}
[docs] def getDataRoaming(self): """ This method will return either data roaming enabled or disabled. Mandatory to update by data configuration info by calling :meth:`~queryDataConnection` prior to call this method :return: Return data roaming enabled or not :rtype: bool """ return self._roamingEnabled
[docs] def getMaxIdleTime(self): """ This method will return max idle time out of the data connection. Mandatory to update by data configuration info by calling :meth:`~queryDataConnection` prior to call this method :return: Return max data connection idle time :rtype: int """ return self._maxIdleTimeOut
######################################### ######## Connection manage ############## #########################################
[docs] def switchConnection(self, status=True): """ Switch on or off data connection based on :attr:`status`. :param status: Either set status of the connection On or Off :type status: bool :return: Return either requested connection switching succeeded or failed :rtype: bool """ # if session is not refreshed validate and refresh session again if not self._sessionRefreshed: self.validateSession() # wait if in an operation while self._inOperation: time.sleep(0.5) # if session is valid start switching data connection if self._validSession: try: # data switch dataSwitch = "1" if status else "0" xml_body = f""" <?xml version="1.0" encoding="UTF-8"?> <request> <dataswitch>{dataSwitch}</dataswitch> </request> """ headers = { 'X-Requested-With':'XMLHttpRequest', '__RequestVerificationToken': self._RequestVerificationToken } # call switch self.logger.info(f"Switching data connection status = {dataSwitch}") response = self.httpPost("/api/dialup/mobile-dataswitch", xml_body, cookies=None, headers=headers) dataswitchInfo = xmltodict.parse(response.text) if "response" in dataswitchInfo: self.logger.info(f"Switched data connection status = {dataSwitch}") # invalidate refresh self._sessionRefreshed = False # reset if theres any active error self.resetActiveErrorCode() # return success return True else: self._sessionRefreshed = False self.sessionErrorCheck(dataswitchInfo) self.logger.error(f"Switching data connection to status = {dataSwitch} failed") # Return failed return False ####### switching data connection end ######## except Exception as e: # invalidate refresh self._sessionRefreshed = False self.logger.error(e) self.logger.error(f"{self._modemname} Failed to switch data connection") return False else: # invalidate refresh self._sessionRefreshed = False self.logger.error(f"{self._modemname} Failed to switch connection") return False
[docs] def switchNetworMode(self, primary=True): """ Switch network between primary and secondary network modes based on :attr:`primary`. If :attr:`primary` is **True** network mode switch to the primary or else to the secondary. Primary network mode and secondary network mode can be set using :meth:`~setNetwokModes` :param primary: Primary network mode or secondary network mode :type primary: bool :return: Return either requested network mode switching succeeded or failed :rtype: bool """ # if session is not refreshed validate and refresh session again if not self._sessionRefreshed: self.validateSession() # wait if in an operation while self._inOperation: time.sleep(0.5) # if session is valid start switching network mode if self._validSession: try: # decide network mode # http://192.168.10.1/config/network/networkmode.xml # 0=auto 1=2G 2=3G 3=4G NetworkMode = self._netModePrimary if primary else self._netModeSecondary NetworkBand = "" LTEBand = "" # fetch LTE bands headers = {'X-Requested-With':'XMLHttpRequest'} response = self.httpGet("/api/net/net-mode", headers=headers) netModeInfo = xmltodict.parse(response.text) if "response" in netModeInfo: NetworkBand = netModeInfo['response']['NetworkBand'] LTEBand = netModeInfo['response']['LTEBand'] # proceed switching xml_body = f""" <?xml version="1.0" encoding="UTF-8"?> <request> <NetworkMode>0{NetworkMode}</NetworkMode> <NetworkBand>{NetworkBand}</NetworkBand> <LTEBand>{LTEBand}</LTEBand> </request> """ headers = { 'X-Requested-With':'XMLHttpRequest', '__RequestVerificationToken': self._RequestVerificationToken } self.logger.info(f"Switching network mode = 0{NetworkMode} - NetworkBand={NetworkBand} LTEBand-{LTEBand}") response = self.httpPost("/api/net/net-mode", xml_body, cookies=None, headers=headers) netmodeSwitchInfo = xmltodict.parse(response.text) if "response" in netmodeSwitchInfo: self.logger.info(f"Switched network mode = 0{NetworkMode} - NetworkBand={NetworkBand} LTEBand-{LTEBand}") # invalidate refresh self._sessionRefreshed = False # reset if theres any active error self.resetActiveErrorCode() # return success return True else: self._sessionRefreshed = False self.sessionErrorCheck(netmodeSwitchInfo) self.logger.error(f"Switching network mode = 0{NetworkMode} failed") # Return failed return False # band configurations fetching failed else: self._sessionRefreshed = False self.logger.error(f"{self._modemname} Failed to fetch network bands from /api/net/net-mode") return False ####### switching network mode end ######## except Exception as e: # invalidate refresh self._sessionRefreshed = False self.logger.error(e) self.logger.error(f"{self._modemname} Failed to switch network mode") return False else: # invalidate refresh self._sessionRefreshed = False self.logger.error(f"{self._modemname} Failed to switch network mode") return False
[docs] def configureDataConnection(self, roaming=True, maxIdleTime=0): """ This method will configure data connection properties * Switch on or off data roaming based on :attr:`roaming`. * Set max idle time out for the data connection * Rest of configurations keep as defaults :param roaming: Either set data roaming enabled or disabled :param maxIdleTime: Maximum idle timeout for the data connection :type roaming: bool :type maxIdleTime: int, Default to 0 = Disabled :return: Return either data connection configuration succeeded or failed :rtype: bool """ # if session is not refreshed validate and refresh session again if not self._sessionRefreshed: self.validateSession() # wait if in an operation while self._inOperation: time.sleep(0.5) # if session is valid start configuring data connection if self._validSession: try: # roaming dataRoaming = "1" if roaming else "0" xml_body = f""" <?xml version="1.0" encoding="UTF-8"?> <request> <RoamAutoConnectEnable>{dataRoaming}</RoamAutoConnectEnable> <MaxIdelTime>{maxIdleTime}</MaxIdelTime> <ConnectMode>0</ConnectMode> <MTU>1500</MTU> <auto_dial_switch>1</auto_dial_switch> <pdp_always_on>0</pdp_always_on> </request> """ headers = { 'X-Requested-With':'XMLHttpRequest', '__RequestVerificationToken': self._RequestVerificationToken } # call api self.logger.info(f"Configuring data connection Roaming = {dataRoaming}, Max idle timeout = {maxIdleTime}(sec)") response = self.httpPost("/api/dialup/connection", xml_body, cookies=None, headers=headers) dataswitchInfo = xmltodict.parse(response.text) if "response" in dataswitchInfo: self.logger.info(f"Configured data connection Roaming = {dataRoaming}, Max idle timeout = {maxIdleTime}(sec)") # invalidate refresh self._sessionRefreshed = False # reset if theres any active error self.resetActiveErrorCode() # return success return True else: self._sessionRefreshed = False self.sessionErrorCheck(dataswitchInfo) self.logger.error(f"Configuring data connection Roaming = {dataRoaming}, Max idle timeout = {maxIdleTime}(sec)failed") # Return failed return False ####### configuring data connection end ######## except Exception as e: # invalidate refresh self._sessionRefreshed = False self.logger.error(e) self.logger.error(f"{self._modemname} Failed to configure data connection") return False else: # invalidate refresh self._sessionRefreshed = False self.logger.error(f"{self._modemname} Failed to configure connection") return False
################################################ ########## Modem management #################### ################################################
[docs] def reboot(self): """ Reboot the modem. Return only reboot initiation success or not only as reboot doesnot returns any. :return: Return reboot initiation success or not :rtype: bool """ # if session is not refreshed validate and refresh session again if not self._sessionRefreshed: self.validateSession() # wait if in an operation while self._inOperation: time.sleep(0.5) # if session is valid start rebooting if self._validSession: try: xml_body = f""" <?xml version="1.0" encoding="UTF-8"?> <request> <Control>1</Control> </request> """ headers = { 'X-Requested-With':'XMLHttpRequest', '__RequestVerificationToken': self._RequestVerificationToken } # call reboot # Use requests.post as no return and trigger of request timeout error cookies = self.buildCookies() try: requests.post(f"{self._httpHost}/api/device/control", data=xml_body, cookies=cookies, headers=headers, timeout=3) except Exception as e: self.logger.debug(f"Reboot call exception found - {e}") #return return True ####### rebooting end ######## except Exception as e: # invalidate refresh self._sessionRefreshed = False self.logger.error(e) self.logger.error(f"{self._modemname} Failed to initiating reboot") return False else: # invalidate refresh self._sessionRefreshed = False self.logger.error(f"{self._modemname} Failed to initiating reboot") return False
[docs] def switchDHCPIPBlock(self, gateway): """ This methos will change DHCP IP block and gateway(modem) IP based on provided :attr:`gateway`. All existing connections will drop and probably a soft reboot will performed after calling this method. Use only gateway in 192.168.X.1 format (eg:- 192.168.2.1, 192.168.20.1). So the DHCP offering IP block will be 192.168.x.100-192.168.x.199. :param gateway: Gateway or IP of the modem :type gateway: string """ # if session is not refreshed validate and refresh session again if not self._sessionRefreshed: self.validateSession() # wait if in an operation while self._inOperation: time.sleep(0.5) # if session is valid start switching DHCP IP block if self._validSession: try: xml_body = f""" <?xml version="1.0" encoding="UTF-8"?> <request> <DhcpIPAddress>{gateway}</DhcpIPAddress> <DhcpLanNetmask>255.255.255.0</DhcpLanNetmask> <DhcpStatus>1</DhcpStatus> <DhcpStartIPAddress>{gateway}00</DhcpStartIPAddress> <DhcpEndIPAddress>{gateway}99</DhcpEndIPAddress> <DhcpLeaseTime>86400</DhcpLeaseTime> <DnsStatus>1</DnsStatus> <PrimaryDns>{gateway}</PrimaryDns> <SecondaryDns>{gateway}</SecondaryDns> </request> """ headers = { 'X-Requested-With':'XMLHttpRequest', '__RequestVerificationToken': self._RequestVerificationToken } # call switch self.logger.info(f"Gateway IP changing into {gateway}") self.httpPost("/api/dhcp/settings", xml_body, cookies=None, headers=headers) # No response & immediatly modem reboot with new IP block # Therefore need to stop self.stop() self.logger.info(f"Stop the thread due to gateway change") ####### switching DHCP IP block end ######## except Exception as e: # invalidate refresh self._sessionRefreshed = False self.logger.error(e) self.logger.error(f"{self._modemname} Failed to switch DHCP IP block") else: # invalidate refresh self._sessionRefreshed = False self.logger.error(f"{self._modemname} Failed to switch DHCP IP block")