Hello everyone! I’ve been trying to find security related bugs in the UBNT EdgeRouter web interface. I setup a proxy with ZAP and looked at the web requests and found a POST request being made to the URL https://192.168.1.1/api/edge/feature.json
with the string {"data":{"scenario":".Port_Forwarding","action":"load"}}
. I’m trying to run commix against the URL but I keep getting error 403 back. I’m testing this as the “operator” user which only has read level access and can’t change the load
parameter.
python commix.py --force-ssl --url="https://192.168.1.1/api/edge/feature.json" --data='{"data":{"scenario":".Port_Forwarding","action":"load"}}' --cookie="PHPSESSID=c1405d73a76242dcac20ec2a4c117d76; X-CSRF-TOKEN=9fbebb63d405dd77d8d9f648a440d3d4508d3ff5c6045d7a364beffbfeb0741f; ip_address_top_user_option=total_bytes; beaker.session.id=c1405d73a76242dcac20ec2a4c117d76" --host="192.168.1.1" --user-agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12; rv:61.0) Gecko/20100101 Firefox/61.0" --referer="https://192.168.1.1/"
Here is part of the source code for the UBNT EdgeRouter Interface.
import cgi
import json
import logging
import tempfile
from bottle import default_app, request
from . import api_command, dispatch_request, get_post_content, APIError
from ..utils import get_parent_structure, require_acl
logger = logging.getLogger("edge_api")
def register_urls(app, prefix):
# Authentication
app.post(prefix + "/auth.json", callback=auth,
csrf_exempt=True)
# Config API
app.get(prefix + "/get.json", callback=get)
app.get(prefix + "/partial.json", callback=partial)
app.get(prefix + "/data.json", callback=data)
app.post(prefix + "/set.json", callback=set_)
app.post(prefix + "/delete.json", callback=delete)
app.post(prefix + "/batch.json", callback=batch)
app.get(prefix + "/heartbeat.json", callback=heartbeat)
# Misc device operations
app.post(prefix + "/operation/get-support-file.json", callback=get_support_file)
app.post(prefix + "/operation/<op>.json", callback=operation)
# Config backup/restore
app.get(prefix + "/config/save.json", callback=config_save)
app.post(prefix + "/config/restore.json", callback=config_restore,
csrf_token_in_query=True)
# Device upgrade
app.post(prefix + "/upgrade.json", callback=upgrade,
csrf_token_in_query=True)
# Wizard
app.post(prefix + "/setup.json", callback=wizard_setup)
app.post(prefix + "/feature.json", callback=wizard_feature)
# Config tree
app.get(prefix + "/getcfg.json", callback=getcfg)
# ONUs
app.post(prefix + "/onu/upgrade.json", callback=onu_upgrade)
app.post(prefix + "/onu/reboot.json", callback=onu_reboot)
@require_acl(admin_only=False)
@api_command
def get():
operations = {
'GET': {
'firewall': "",
'interfaces': "",
'layer2': "",
'service': "",
'system': "",
'vpn': "",
'protocols': "",
'traffic-control': "",
'onu-list': "",
'onu-profiles': "",
'onu-policies': "",
}
}
return operations
@require_acl
@api_command
def set_():
operations = {
'SET': get_post_content()
}
operations['GET'] = get_parent_structure(operations['SET'])
return operations
@require_acl
@api_command
def delete():
operations = {
'DELETE': get_post_content()
}
operations['GET'] = get_parent_structure(operations['DELETE'])
return operations
@require_acl
@api_command
def batch():
operations = get_post_content()
if not operations.get("GET"):
operations['GET'] = get_parent_structure(operations.get("SET")) \
if operations.get("SET") else {}
if operations.get('DELETE'):
operations['GET'].update(get_parent_structure(operations['DELETE']))
return operations
@require_acl
@api_command
def partial(struct=None):
return dict(GET=json.loads(struct or request.GET.get("struct", "{}")))
@require_acl
@api_command
def getcfg():
return dict(GETCFG=request.GET.getall("node[]"))
@require_acl(authenticated_only=False, admin_only=False)
@api_command
def auth(username=None, password=None):
username = username or request.POST.get("username", "")
password = password or request.POST.get("password", "")
return dict(AUTH=dict(username=username, password=password))
@require_acl(admin_only=False)
@api_command
def data():
return dict(GETDATA=request.GET.get("data"))
@require_acl(admin_only=False)
def heartbeat():
response = dispatch_request(PING=None)
return dict(PING=bool(response['success']), SESSION=True)
@require_acl
@api_command
def operation(op):
return dict(OPERATION=dict(op=op))
@require_acl
def get_support_file():
response = dispatch_request(OPERATION=dict(op="get-support-file"))
session = request.environ['beaker.session']
session['support_file'] = response['OPERATION']['path']
session.save()
return response
@require_acl
def config_save():
response = dispatch_request(CONFIG=dict(action="save"))
session = request.environ['beaker.session']
session['config_backup_file'] = response['CONFIG']['path']
session.save()
return response
@require_acl
def config_restore():
try:
qqfile = request.files.get("qqfile")
if not qqfile:
raise APIError("Missing file upload.")
path = tempfile.mktemp()
qqfile.save(path)
finally:
del request['_cgi.FieldStorage']
request.body.close()
response = dispatch_request(CONFIG=dict(action="restore", path=path))
error = response.get('CONFIG', {}).get('error')
if error:
raise APIError(error)
return response
def upgrade_handle_upload():
app = default_app()
try:
app.config['apply_upload_patches'] = True
app.config['delete_uploaded_tempfile'] = False
qqfile = request.files.get("qqfile")
if not qqfile:
raise APIError("Missing file upload.")
path = qqfile.file.name
return dict(path=path, showChangelog=True)
finally:
del request['_cgi.FieldStorage']
request.body.close()
app.config['apply_upload_patches'] = False
app.config['delete_uploaded_tempfile'] = True
@require_acl
def upgrade():
action = request.GET.get("action")
session = request.environ['beaker.session']
if action:
payload = dict(action=action, path=session['upgrade_file'])
else:
payload = upgrade_handle_upload()
response = dispatch_request(UPGRADE=payload)
error = response.get('UPGRADE', {}).get('error')
if error:
raise APIError(error)
elif not action:
# save path to session only if file was posted and it did not fail
session['upgrade_file'] = payload['path']
session.save()
if response.get("UPGRADE", {}).get("changelog"):
escaped = cgi.escape(response['UPGRADE']['changelog'], quote=True)
response['UPGRADE']['changelog'] = escaped
return response
@require_acl
@api_command
def wizard_setup():
post_content = get_post_content()
return dict(SETUP=post_content.get("data"))
def check_wizard_feature_admin_only(session):
"""
Check if the called action should be performed as admin_only.
Currently operator has only the "load" action allowed.
"""
if session['level'] == "admin":
return False
else:
post_content = get_post_content()
if post_content.get('data', {}).get('action', "") == "load":
return False
return True
@require_acl(admin_only=check_wizard_feature_admin_only)
@api_command
def wizard_feature():
post_content = get_post_content()
return dict(FEATURE=post_content.get("data"))
@require_acl
@api_command
def onu_upgrade():
post_content = get_post_content()
onu_list = json.loads(post_content.get("onu_list", "[]"))
try:
qqfile = request.files.get("qqfile")
if not qqfile:
raise APIError("Missing file upload.")
path = tempfile.mktemp()
logger.info("Saving ONU upgrade image to %s", path)
qqfile.save(path)
finally:
del request['_cgi.FieldStorage']
request.body.close()
return dict(UPGRADE_ONU=dict(path=path,
onu_list=onu_list))
@require_acl
@api_command
def onu_reboot():
return dict(REBOOT_ONU=dict(get_post_content()))