179 lines
4.9 KiB
Python
179 lines
4.9 KiB
Python
import os
|
|
import re
|
|
from random import choice, choices, randrange
|
|
from urllib.parse import urlparse
|
|
|
|
from locust import HttpLocust, TaskSequence, seq_task
|
|
|
|
from pyquery import PyQuery as pq
|
|
|
|
# Credentials passed as the ``auth`` pair on the dev login request.
# Each falls back to an empty string when its environment variable is unset.
USERNAME = os.environ.get("ATAT_BA_USERNAME", "")
PASSWORD = os.environ.get("ATAT_BA_PASSWORD", "")

# True (the default) turns SSL certificate verification off, for targets
# with bad certificates; any value other than "true" (case-insensitive)
# leaves verification enabled.
DISABLE_VERIFY = os.environ.get("DISABLE_VERIFY", "true").lower() == "true"

# Alphabet that random name suffixes for created entities are drawn from.
LETTERS = "qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM1234567890"  # pragma: allowlist secret

# Each value is compared against randrange(0, 100), so it is the percent
# chance of creating a new entity even when existing ones are available.
NEW_PORTFOLIO_CHANCE = 10
NEW_APPLICATION_CHANCE = 10
|
|
|
|
|
|
def login(l):
    """Log the simulated user in through the dev login endpoint.

    Sends a GET to ``/login-dev`` on ``l.client`` with the module-level
    USERNAME/PASSWORD pair supplied as the ``auth`` argument.
    """
    credentials = (USERNAME, PASSWORD)
    l.client.get("/login-dev", auth=credentials)
|
|
|
|
|
|
def logout(l):
    """End the simulated user's session by requesting ``/logout``."""
    session = l.client
    session.get("/logout")
|
|
|
|
|
|
def get_csrf_token(response):
    """Return the CSRF token found in an HTML response.

    The body (``response.text``) is parsed with PyQuery and the value of
    the element with id ``csrf_token`` is returned.
    """
    document = pq(response.text)
    token_field = document("#csrf_token")
    return token_field.val()
|
|
|
|
|
|
# Captures the UUID (version-4 layout, case-insensitive) that follows a
# "portfolios/" or "applications/" segment. Because the leading ".*" is
# greedy, a path containing several such segments yields the LAST id.
# Raw string: "\/" is not a valid Python string escape, so the non-raw form
# triggers an invalid-escape warning on modern interpreters.
_ENTITY_ID_MATCHER = re.compile(
    r".*\/?(?:portfolios|applications)\/([0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}).*",
    re.I,
)


def extract_id(path):
    """Return the portfolio or application id embedded in a path or URL.

    Args:
        path: A URL or path string such as ``/portfolios/<uuid>/applications``.

    Returns:
        The matched UUID string, exactly as it appears in ``path``.

    Raises:
        AssertionError: If ``path`` contains no ``portfolios/<uuid>`` or
            ``applications/<uuid>`` segment.
    """
    entity_id_match = _ENTITY_ID_MATCHER.match(path)
    if entity_id_match is None:
        # Raised explicitly instead of via ``assert`` so the check is not
        # stripped under ``python -O`` (which would silently return None).
        # The exception type and message are unchanged for existing callers.
        raise AssertionError(f"Could not find id in {path}")
    return entity_id_match.group(1)
|
|
|
|
|
|
def get_portfolios(l):
    """Refresh the portfolio links known to this simulated user.

    Loads ``/home``, collects the ``href`` of every ``.sidenav__link``
    element, and stores the resulting list on ``l.locust.portfolio_links``.
    A portfolio is created first (and its link appended) when the page
    listed none, or at random with a NEW_PORTFOLIO_CHANCE-in-100 chance.
    """
    home_page = pq(l.client.get("/home").text)
    links = [anchor.attr("href") for anchor in home_page(".sidenav__link").items()]

    # The roll is made on every call, whether or not links were found.
    roll = randrange(0, 100)
    if not links or roll < NEW_PORTFOLIO_CHANCE:
        links.append(create_portfolio(l))

    l.locust.portfolio_links = links
|
|
|
|
|
|
def get_portfolio(l):
    """Open one known portfolio at random and record its application links.

    A link is drawn from ``l.locust.portfolio_links`` and fetched; the
    ``href`` of every application anchor on the page is collected. When at
    least one is found, the list is stored in the app registry under the
    portfolio's id.
    """
    chosen_link = choice(l.locust.portfolio_links)
    page = pq(l.client.get(chosen_link).text)

    app_anchors = page(".portfolio-applications .accordion__header-text a")
    found_links = [anchor.attr("href") for anchor in app_anchors.items()]
    if not found_links:
        # Nothing to register for a portfolio without applications.
        return

    update_app_registry(l, extract_id(chosen_link), found_links)
|
|
|
|
|
|
def update_app_registry(l, portfolio_id, app_links):
    """Store ``app_links`` as the known application links of a portfolio.

    The registry is a dict on ``l.locust.app_links`` keyed by portfolio id;
    it is created on first use. Any list previously stored for
    ``portfolio_id`` is replaced, not extended.
    """
    owner = l.locust
    try:
        registry = owner.app_links
    except AttributeError:
        # First call for this user: start with an empty registry.
        registry = owner.app_links = {}
    registry[portfolio_id] = app_links
|
|
|
|
|
|
def get_app(l):
    """Visit a known application, or create a new one.

    A new application is created when no application link is known yet, or
    at random with a NEW_APPLICATION_CHANCE-in-100 chance. It is created in
    a randomly chosen known portfolio, and its settings link becomes that
    portfolio's sole entry in the app registry. Otherwise the picked
    application link is fetched.
    """
    existing_link = pick_app(l)
    # The roll is made on every call, even when no link was picked.
    roll = randrange(0, 100)

    if existing_link is None or roll < NEW_APPLICATION_CHANCE:
        portfolio_id = extract_id(choice(l.locust.portfolio_links))
        created_link = create_new_app(l, portfolio_id)
        update_app_registry(l, portfolio_id, [created_link])
    else:
        l.client.get(existing_link)
|
|
|
|
|
|
def pick_app(l):
    """Return a random known application link, or None if none is known.

    One portfolio's link list is drawn at random from the registry on
    ``l.locust.app_links``, then one link is drawn from that list.
    """
    if not hasattr(l.locust, "app_links"):
        return None

    registry = l.locust.app_links
    if len(registry) == 0:
        return None

    portfolio_apps = choice(list(registry.values()))
    return choice(portfolio_apps)
|
|
|
|
|
|
def create_new_app(l, portfolio_id):
    """Create an application with four environments in a portfolio.

    Runs the two-step creation flow: fetch the new-application form, post
    the name/description step, then post the environments step for the
    application id taken from the first response's URL.

    Args:
        l: Task set providing ``client`` and ``parent.host``.
        portfolio_id: Id of the portfolio that will own the application.

    Returns:
        The settings path of the new application,
        ``/applications/<application_id>/settings``.
    """
    form_url = f"/portfolios/{portfolio_id}/applications/new"
    form_page = l.client.get(form_url)

    # Step 1: name and description, with a random 5-character name suffix.
    suffix = "".join(choices(LETTERS, k=5))
    app_form = {
        "name": f"Load Test Created - {suffix}",
        "description": "Description",
        "csrf_token": get_csrf_token(form_page),
    }
    step_one = l.client.post(
        form_url,
        app_form,
        headers={"Referer": l.parent.host + form_url},
    )
    application_id = extract_id(step_one.url)

    # Step 2: environments; the CSRF token comes from the step-1 response.
    env_form = {
        "environment_names-0": "Development",
        "environment_names-1": "Testing",
        "environment_names-2": "Staging",
        "environment_names-3": "Production",
        "csrf_token": get_csrf_token(step_one),
    }
    step_two_path = f"/applications/{application_id}/new/step_2"
    l.client.post(
        step_two_path + f"?portfolio_id={portfolio_id}",
        env_form,
        # The Referer deliberately omits the query string.
        headers={"Referer": l.parent.host + step_two_path},
    )

    return f"/applications/{application_id}/settings"
|
|
|
|
|
|
def create_portfolio(l):
    """Create a portfolio and return the path of the resulting page.

    Fetches ``/portfolios/new`` for its CSRF token, then posts a portfolio
    with a random 5-character name suffix to ``/portfolios``.

    Returns:
        The path component of the URL of the POST response.
    """
    form_page = l.client.get("/portfolios/new")

    suffix = "".join(choices(LETTERS, k=5))
    form_data = {
        "name": f"Load Test Created - {suffix}",
        "defense_component": "army",
        "description": "Test",
        "csrf_token": get_csrf_token(form_page),
    }

    created = l.client.post(
        "/portfolios",
        form_data,
        headers={"Referer": l.parent.host + "/portfolios"},
    )

    # Only the path is kept, so the link is independent of the host.
    created_url = urlparse(created.url)
    return created_url.path
|
|
|
|
|
|
class UserBehavior(TaskSequence):
    """Task sequence for one simulated user.

    Tasks are numbered with ``seq_task``: list portfolios (1), open a
    portfolio (2), then visit or create an application (3).
    """

    def on_start(self):
        """Apply the SSL verification setting to the client, then log in."""
        self.client.verify = not DISABLE_VERIFY
        login(self)

    @seq_task(1)
    def portfolios(self):
        """Refresh the list of portfolio links, sometimes creating one."""
        get_portfolios(self)

    @seq_task(2)
    def pick_a_portfolio(self):
        """Open a random known portfolio and record its applications."""
        get_portfolio(self)

    @seq_task(3)
    def pick_an_app(self):
        """Visit a random known application or create a new one."""
        get_app(self)

    def on_stop(self):
        """Log the simulated user out."""
        logout(self)
|
|
|
|
|
|
class WebsiteUser(HttpLocust):
    """Locust user class that runs the UserBehavior task sequence."""

    # Tasks executed by each simulated user.
    task_set = UserBehavior

    # Lower and upper bounds on the wait between tasks.
    # NOTE(review): presumably milliseconds, per Locust's legacy
    # min_wait/max_wait API; confirm against the installed version.
    min_wait, max_wait = 3000, 9000
|
|
|
|
|
|
if __name__ == "__main__":
    # Executed directly rather than through the locust runner: spin up a
    # single locust in this process.
    single_user = WebsiteUser()
    single_user.run()
|