2025-04-16 18:54:05 +05:30
|
|
|
from http import HTTPStatus
|
2025-07-29 14:44:16 +05:30
|
|
|
from typing import Callable
|
2025-04-16 18:54:05 +05:30
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
|
import requests
|
|
|
|
|
|
2025-07-29 14:44:16 +05:30
|
|
|
from fixtures import dev, types
|
2025-04-16 18:54:05 +05:30
|
|
|
|
2025-07-29 14:44:16 +05:30
|
|
|
# Credentials of the admin account that the create_user_admin fixture
# registers; exposed as constants so tests can log in with the same identity.
USER_ADMIN_NAME: str = "admin"
USER_ADMIN_EMAIL: str = "admin@integration.test"
USER_ADMIN_PASSWORD: str = "password"
|
2025-04-16 18:54:05 +05:30
|
|
|
|
2025-07-29 14:44:16 +05:30
|
|
|
|
|
|
|
|
@pytest.fixture(name="create_user_admin", scope="package")
def create_user_admin(
    signoz: types.SigNoz, request: pytest.FixtureRequest, pytestconfig: pytest.Config
) -> types.Operation:
    """Register the admin user on the SigNoz instance under test.

    The actual work is delegated to ``dev.wrap``, which receives the pytest
    request/config, the key ``"create_user_admin"``, an empty-operation
    factory and the ``create`` / ``delete`` / ``restore`` callbacks defined
    below. When each callback runs is decided by ``dev.wrap`` (presumably it
    reuses a cached operation between runs -- confirm in ``fixtures.dev``).

    Args:
        signoz: Handle to the running SigNoz instance; its ``"8080"`` host
            config is used to build the register URL.
        request: The pytest fixture request, forwarded to ``dev.wrap``.
        pytestconfig: The pytest config, forwarded to ``dev.wrap``.

    Returns:
        Whatever ``dev.wrap`` returns for this operation.
    """

    def create() -> types.Operation:
        """POST the admin credentials to the register endpoint.

        Raises:
            AssertionError: If the endpoint does not answer with HTTP 200.
        """
        response = requests.post(
            signoz.self.host_configs["8080"].get("/api/v1/register"),
            json={
                "name": USER_ADMIN_NAME,
                "orgId": "",
                "orgName": "",
                "email": USER_ADMIN_EMAIL,
                "password": USER_ADMIN_PASSWORD,
            },
            timeout=5,
        )

        assert response.status_code == HTTPStatus.OK

        return types.Operation(name="create_user_admin")

    def delete(_: types.Operation) -> None:
        """Do nothing: the registered admin user is deliberately left in place."""

    def restore(cache: dict) -> types.Operation:
        """Rebuild the operation from a cached mapping holding its ``name``."""
        return types.Operation(name=cache["name"])

    return dev.wrap(
        request,
        pytestconfig,
        "create_user_admin",
        lambda: types.Operation(name=""),
        create,
        delete,
        restore,
    )
|
2025-04-16 18:54:05 +05:30
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture(name="get_jwt_token", scope="module")
def get_jwt_token(signoz: types.SigNoz) -> Callable[[str, str], str]:
    """Provide a helper that logs a user in and returns the access JWT.

    Args:
        signoz: Handle to the running SigNoz instance; its ``"8080"`` host
            config is used to build the login URL.

    Returns:
        A callable ``(email, password) -> token`` that posts the credentials
        to ``/api/v1/login`` and returns ``data.accessJwt`` from the JSON
        response body.
    """

    def _get_jwt_token(email: str, password: str) -> str:
        login_url = signoz.self.host_configs["8080"].get("/api/v1/login")
        credentials = {
            "email": email,
            "password": password,
        }

        login_response = requests.post(login_url, json=credentials, timeout=5)
        # A failed login fails the calling test immediately.
        assert login_response.status_code == HTTPStatus.OK

        payload = login_response.json()
        return payload["data"]["accessJwt"]

    return _get_jwt_token
|