Testing the UBNT EdgeRouter Web Interface

Hello everyone! I’ve been trying to find security-related bugs in the UBNT EdgeRouter web interface. I set up a proxy with ZAP, looked at the web requests, and found a POST request being made to the URL with the string {"data":{"scenario":".Port_Forwarding","action":"load"}}. I’m trying to run commix against the URL, but I keep getting a 403 error back. I’m testing this as the “operator” user, which only has read-level access and can’t change the load parameter.

python commix.py --force-ssl --url="" --data='{"data":{"scenario":".Port_Forwarding","action":"load"}}' --cookie="PHPSESSID=c1405d73a76242dcac20ec2a4c117d76; X-CSRF-TOKEN=9fbebb63d405dd77d8d9f648a440d3d4508d3ff5c6045d7a364beffbfeb0741f; ip_address_top_user_option=total_bytes; beaker.session.id=c1405d73a76242dcac20ec2a4c117d76" --host="" --user-agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12; rv:61.0) Gecko/20100101 Firefox/61.0" --referer=""

Here is part of the source code for the UBNT EdgeRouter Interface.

import cgi
import json
import logging
import tempfile

from bottle import default_app, request

from . import api_command, dispatch_request, get_post_content, APIError
from ..utils import get_parent_structure, require_acl

# Module-level logger registered under the name "edge_api".
logger = logging.getLogger("edge_api")

def register_urls(app, prefix):
    """Register every API endpoint on ``app`` under the URL ``prefix``.

    Each route is ``prefix`` plus a fixed suffix, bound through ``app.get``
    or ``app.post`` to the handler passed as ``callback``.

    NOTE(review): in the listing this was taken from, the ``auth.json``,
    ``config/restore.json`` and ``upgrade.json`` registrations ended in a
    dangling comma with no closing parenthesis, so further keyword
    arguments were presumably lost. They are registered here with
    ``callback`` only -- confirm against the full source.
    """
    # Authentication
    app.post(prefix + "/auth.json", callback=auth)
    # Config API
    app.get(prefix + "/get.json", callback=get)
    app.get(prefix + "/partial.json", callback=partial)
    app.get(prefix + "/data.json", callback=data)
    app.post(prefix + "/set.json", callback=set_)
    app.post(prefix + "/delete.json", callback=delete)
    app.post(prefix + "/batch.json", callback=batch)
    app.get(prefix + "/heartbeat.json", callback=heartbeat)
    # Misc device operations
    # The fixed get-support-file route is registered ahead of the <op>
    # wildcard route below.
    app.post(prefix + "/operation/get-support-file.json", callback=get_support_file)
    app.post(prefix + "/operation/<op>.json", callback=operation)
    # Config backup/restore
    app.get(prefix + "/config/save.json", callback=config_save)
    app.post(prefix + "/config/restore.json", callback=config_restore)
    # Device upgrade
    app.post(prefix + "/upgrade.json", callback=upgrade)
    # Wizard
    app.post(prefix + "/setup.json", callback=wizard_setup)
    app.post(prefix + "/feature.json", callback=wizard_feature)
    # Config tree
    app.get(prefix + "/getcfg.json", callback=getcfg)
    # ONUs
    app.post(prefix + "/onu/upgrade.json", callback=onu_upgrade)
    app.post(prefix + "/onu/reboot.json", callback=onu_reboot)

def get():
    """Return the operations dict for the ``get.json`` endpoint.

    The result has a single ``'GET'`` key whose value maps each of the
    section names listed below to an empty string.
    """
    operations = {
        'GET': {
            'firewall': "",
            'interfaces': "",
            'layer2': "",
            'service': "",
            'system': "",
            'vpn': "",
            'protocols': "",
            'traffic-control': "",
            'onu-list': "",
            'onu-profiles': "",
            'onu-policies': "",
        },
    }
    return operations

def set_():
    """Build the operations dict for the ``set.json`` endpoint.

    The posted content becomes the ``'SET'`` operation, and ``'GET'`` is
    filled with ``get_parent_structure`` of that same content.
    """
    operations = {
        'SET': get_post_content(),
    }
    operations['GET'] = get_parent_structure(operations['SET'])
    return operations

def delete():
    """Build the operations dict for the ``delete.json`` endpoint.

    The posted content becomes the ``'DELETE'`` operation, and ``'GET'`` is
    filled with ``get_parent_structure`` of that same content.
    """
    operations = {
        'DELETE': get_post_content(),
    }
    operations['GET'] = get_parent_structure(operations['DELETE'])
    return operations

def batch():
    # The posted content is used directly as the operations dict. When it
    # has no truthy "GET" entry, one is derived from the "SET" entry via
    # get_parent_structure, or set to {} when there is no "SET" entry.
    operations = get_post_content()
    if not operations.get("GET"):
        operations['GET'] = get_parent_structure(operations.get("SET")) \
            if operations.get("SET") else {}
        # NOTE(review): the branch below has no body in this listing -- a
        # line appears to be missing (presumably it folds the "DELETE"
        # entry into "GET"); confirm against the full source.
        if operations.get('DELETE'):
    return operations

def partial(struct=None):
    """Return a ``GET`` operation built from a JSON-encoded structure.

    ``struct`` is used when truthy; otherwise the ``struct`` query
    parameter is read, defaulting to ``"{}"``.
    """
    raw = struct if struct else request.GET.get("struct", "{}")
    return {'GET': json.loads(raw)}

def getcfg():
    """Return a ``GETCFG`` operation listing every ``node[]`` query value."""
    nodes = request.GET.getall("node[]")
    return {'GETCFG': nodes}

@require_acl(authenticated_only=False, admin_only=False)
def auth(username=None, password=None):
    """Return an ``AUTH`` operation carrying the supplied credentials.

    Each argument that is missing or falsy is read from the POST form field
    of the same name, defaulting to an empty string.
    """
    if not username:
        username = request.POST.get("username", "")
    if not password:
        password = request.POST.get("password", "")
    return {'AUTH': {'username': username, 'password': password}}

def data():
    """Return a ``GETDATA`` operation for the ``data`` query parameter."""
    requested = request.GET.get("data")
    return {'GETDATA': requested}

def heartbeat():
    """Dispatch a ``PING`` and report whether it succeeded.

    ``PING`` is the truthiness of the response's ``success`` entry;
    ``SESSION`` is always ``True``.
    """
    reply = dispatch_request(PING=None)
    ping_ok = bool(reply['success'])
    return {'PING': ping_ok, 'SESSION': True}

def operation(op):
    """Return an ``OPERATION`` request for the operation named ``op``."""
    return {'OPERATION': {'op': op}}

def get_support_file():
    """Dispatch the ``get-support-file`` operation and return its response.

    The path reported in the response is stored in the Beaker session under
    ``support_file``.
    """
    result = dispatch_request(OPERATION={'op': "get-support-file"})
    beaker_session = request.environ['beaker.session']
    beaker_session['support_file'] = result['OPERATION']['path']
    return result

def config_save():
    """Dispatch a ``CONFIG`` save action and return its response.

    The path reported in the response is stored in the Beaker session under
    ``config_backup_file``.
    """
    result = dispatch_request(CONFIG={'action': "save"})
    beaker_session = request.environ['beaker.session']
    beaker_session['config_backup_file'] = result['CONFIG']['path']
    return result

def config_restore():
    # Dispatch a CONFIG "restore" action for an uploaded file ("qqfile") and
    # return the response. Raises APIError when the upload is missing or
    # when the response carries a CONFIG error.
    # NOTE(review): the indentation below drops from 8 to 4 columns with no
    # enclosing statement visible, and nothing visible writes the upload to
    # `path` -- lines (presumably a try/finally and a save step) appear to
    # be missing from this listing; confirm against the full source.
        qqfile = request.files.get("qqfile")
        if not qqfile:
            raise APIError("Missing file upload.")
        # NOTE(review): tempfile.mktemp() only returns a name and is
        # documented as deprecated because of the race between picking the
        # name and creating the file.
        path = tempfile.mktemp()
        # Drop the parsed form data stored on the request.
        del request['_cgi.FieldStorage']

    response = dispatch_request(CONFIG=dict(action="restore", path=path))
    # A missing CONFIG entry counts as "no error".
    error = response.get('CONFIG', {}).get('error')
    if error:
        raise APIError(error)

    return response

def upgrade_handle_upload():
    """Read the uploaded upgrade file and return the payload describing it.

    While the upload is read, the app config has ``apply_upload_patches``
    set to ``True`` and ``delete_uploaded_tempfile`` set to ``False``. On
    the way out -- whether the upload was found or not -- the parsed form
    data is dropped from the request and the two settings are set to
    ``False`` and ``True`` respectively.

    NOTE(review): the ``try``/``finally`` lines were absent from the listing
    this was taken from (the body was over-indented and the cleanup sat
    after the ``return``); they are restored here -- confirm against the
    full source.

    Returns:
        dict with ``path`` (the name of the uploaded file object) and
        ``showChangelog=True``.

    Raises:
        APIError: if the request carries no ``qqfile`` upload.
    """
    app = default_app()
    try:
        app.config['apply_upload_patches'] = True
        app.config['delete_uploaded_tempfile'] = False
        qqfile = request.files.get("qqfile")
        if not qqfile:
            raise APIError("Missing file upload.")
        path = qqfile.file.name
        return dict(path=path, showChangelog=True)
    finally:
        # Cleanup must run on both the success and the error path.
        del request['_cgi.FieldStorage']
        app.config['apply_upload_patches'] = False
        app.config['delete_uploaded_tempfile'] = True

def upgrade():
    """Dispatch an ``UPGRADE`` request and return the response.

    With an ``action`` query parameter, the payload is that action plus the
    path stored in the session under ``upgrade_file``. Without one, the
    request is treated as the file upload itself and the payload comes from
    ``upgrade_handle_upload``.

    On success of an upload, the uploaded path is saved to the session so a
    later ``action`` request can refer to it. A ``changelog`` in the
    response is HTML-escaped before being returned.

    Raises:
        APIError: if the response carries an ``UPGRADE`` error.
    """
    action = request.GET.get("action")
    session = request.environ['beaker.session']
    if action:
        payload = dict(action=action, path=session['upgrade_file'])
    else:
        # Without this branch the upload handler would always run and
        # overwrite the action payload built above.
        payload = upgrade_handle_upload()
    response = dispatch_request(UPGRADE=payload)
    error = response.get('UPGRADE', {}).get('error')
    if error:
        raise APIError(error)
    elif not action:
        # save path to session only if file was posted and it did not fail
        session['upgrade_file'] = payload['path']
    if response.get("UPGRADE", {}).get("changelog"):
        # NOTE(review): cgi.escape was removed in Python 3.8; html.escape is
        # the replacement but additionally escapes single quotes.
        escaped = cgi.escape(response['UPGRADE']['changelog'], quote=True)
        response['UPGRADE']['changelog'] = escaped
    return response

def wizard_setup():
    """Return a ``SETUP`` operation from the ``data`` entry of the POST body."""
    body = get_post_content()
    return {'SETUP': body.get("data")}

def check_wizard_feature_admin_only(session):
    """
    Check if the called action should be performed as admin_only.
    Currently operator has only the "load" action allowed.

    Returns ``False`` when the session level is ``"admin"`` or when the
    posted ``data.action`` is ``"load"``; returns ``True`` otherwise.
    """
    if session['level'] == "admin":
        return False
    else:
        post_content = get_post_content()
        # A missing or null "data" entry is treated as "no action", which
        # falls through to the admin-only answer below.
        data = post_content.get('data') or {}
        if data.get('action', "") == "load":
            return False
    return True

def wizard_feature():
    """Return a ``FEATURE`` operation from the ``data`` entry of the POST body."""
    body = get_post_content()
    return {'FEATURE': body.get("data")}

def onu_upgrade():
    # Build an UPGRADE_ONU operation from an uploaded image ("qqfile") and
    # the JSON-encoded "onu_list" entry of the POST body (default "[]").
    # Raises APIError when the upload is missing.
    # NOTE(review): this listing looks incomplete -- the upload section is
    # indented with no enclosing statement, nothing visible writes the
    # upload to `path` despite the "Saving ..." log line, and the final
    # return is cut off mid-call. Confirm against the full source.
    post_content = get_post_content()
    onu_list = json.loads(post_content.get("onu_list", "[]"))

        qqfile = request.files.get("qqfile")
        if not qqfile:
            raise APIError("Missing file upload.")
        # NOTE(review): tempfile.mktemp() only returns a name and is
        # documented as deprecated because of the race between picking the
        # name and creating the file.
        path = tempfile.mktemp()
        logger.info("Saving ONU upgrade image to %s", path)
        # Drop the parsed form data stored on the request.
        del request['_cgi.FieldStorage']

    # NOTE(review): `onu_list` is parsed above but unused in what is
    # visible; presumably it belongs in the truncated call below.
    return dict(UPGRADE_ONU=dict(path=path,

def onu_reboot():
    """Return a ``REBOOT_ONU`` operation holding a dict copy of the POST body."""
    body = dict(get_post_content())
    return {'REBOOT_ONU': body}

Hi @Greg64,
I haven’t used the tool you are using, but I guess the 403 error is caused by sending an unauthenticated request or something like that. It looks like you added the cookies to the request, though. Maybe you have to tell your tool the Content-Type of the request? I guess it has to be application/json, which may not be the default Content-Type.