import datetime
import json
from abc import ABC
from typing import Any, Callable, Generator, List, Optional

import numpy as np
import pytest
from ksuid import KsuidMs

from fixtures import types
from fixtures.fingerprint import LogsOrTracesFingerprint


class LogsResource(ABC):
    labels: str
    fingerprint: str
    seen_at_ts_bucket_start: np.int64

    def __init__(
        self,
        labels: dict[str, str],
        fingerprint: str,
        seen_at_ts_bucket_start: np.int64,
    ) -> None:
        # Serialize without spaces: ClickHouse treats {"a": "b"} differently
        # from {"a":"b"}, and its JSON functions fail on the spaced form.
        self.labels = json.dumps(labels, separators=(",", ":"))
        self.fingerprint = fingerprint
        self.seen_at_ts_bucket_start = seen_at_ts_bucket_start

    def np_arr(self) -> np.ndarray:
        return np.array([self.labels, self.fingerprint, self.seen_at_ts_bucket_start])
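
# A minimal illustration of the separator issue noted above (plain json
# module, no ClickHouse needed):
#
#   json.dumps({"a": "b"})                        -> '{"a": "b"}'
#   json.dumps({"a": "b"}, separators=(",", ":")) -> '{"a":"b"}'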


class LogsResourceOrAttributeKeys(ABC):
    name: str
    datatype: str

    def __init__(self, name: str, datatype: str) -> None:
        self.name = name
        self.datatype = datatype

    def np_arr(self) -> np.ndarray:
        return np.array([self.name, self.datatype])
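
# Each key row is just (name, datatype); a float attribute named
# "http.status_code", for example, produces
# np.array(["http.status_code", "float64"]).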


class LogsTagAttributes(ABC):
    unix_milli: np.int64
    tag_key: str
    tag_type: str
    tag_data_type: str
    string_value: str
    number_value: Optional[np.float64]

    def __init__(
        self,
        timestamp: datetime.datetime,
        tag_key: str,
        tag_type: str,
        tag_data_type: str,
        string_value: Optional[str],
        number_value: Optional[np.float64],
    ) -> None:
        self.unix_milli = np.int64(int(timestamp.timestamp() * 1e3))
        self.tag_key = tag_key
        self.tag_type = tag_type
        self.tag_data_type = tag_data_type
        self.string_value = string_value or ""
        self.number_value = number_value

    def np_arr(self) -> np.ndarray:
        return np.array(
            [
                self.unix_milli,
                self.tag_key,
                self.tag_type,
                self.tag_data_type,
                self.string_value,
                self.number_value,
            ]
        )
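
# The millisecond conversion above, worked through on a concrete value
# (illustrative only): datetime.datetime(2024, 1, 1,
# tzinfo=datetime.timezone.utc).timestamp() is 1704067200.0, so unix_milli
# becomes np.int64(1704067200000).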


class Logs(ABC):
    ts_bucket_start: np.uint64
    resource_fingerprint: str
    timestamp: np.uint64
    observed_timestamp: np.uint64
    id: str
    trace_id: str
    span_id: str
    trace_flags: np.uint32
    severity_text: str
    severity_number: np.uint8
    body: str
    attributes_string: dict[str, str]
    attributes_number: dict[str, np.float64]
    attributes_bool: dict[str, bool]
    resources_string: dict[str, str]
    scope_name: str
    scope_version: str
    scope_string: dict[str, str]
    resource: List[LogsResource]
    tag_attributes: List[LogsTagAttributes]
    resource_keys: List[LogsResourceOrAttributeKeys]
    attribute_keys: List[LogsResourceOrAttributeKeys]

    def __init__(
        self,
        timestamp: Optional[datetime.datetime] = None,
        resources: Optional[dict[str, Any]] = None,
        attributes: Optional[dict[str, Any]] = None,
        body: str = "default body",
        severity_text: str = "INFO",
        trace_id: str = "",
        span_id: str = "",
        trace_flags: np.uint32 = 0,
        scope_name: str = "",
        scope_version: str = "",
        scope_attributes: Optional[dict[str, str]] = None,
    ) -> None:
        # Resolve defaults at call time: a datetime.datetime.now() default
        # would be frozen at import time, and shared mutable {} defaults
        # would leak state between instances.
        timestamp = timestamp if timestamp is not None else datetime.datetime.now()
        resources = resources if resources is not None else {}
        attributes = attributes if attributes is not None else {}
        scope_attributes = scope_attributes if scope_attributes is not None else {}

        self.tag_attributes = []
        self.attribute_keys = []
        self.resource_keys = []
        # Convert timestamp to uint64 nanoseconds
        self.timestamp = np.uint64(int(timestamp.timestamp() * 1e9))
        self.observed_timestamp = self.timestamp
        # Calculate ts_bucket_start by rounding the timestamp down to the
        # nearest 30-minute interval.
        bucket_minute = 0 if timestamp.minute < 30 else 30
        bucket_start = timestamp.replace(minute=bucket_minute, second=0, microsecond=0)
        self.ts_bucket_start = np.uint64(int(bucket_start.timestamp()))
        # Generate a ksuid from the timestamp so ids sort by time
        self.id = str(KsuidMs(datetime=timestamp))
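        # Example of the bucketing above: a log at 12:47:05 falls in the
        # bucket starting at 12:30:00; one at 12:14:59 in the 12:00:00 bucket.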
        # Initialize trace fields
        self.trace_id = trace_id
        self.span_id = span_id
        self.trace_flags = trace_flags
        # Set severity fields
        self.severity_text = severity_text
        self.severity_number = self._get_severity_number(severity_text)
        # Set body
        self.body = body
        # Resources are stored as strings and also emitted as tag attributes
        # and resource keys
        self.resources_string = {k: str(v) for k, v in resources.items()}
        for k, v in self.resources_string.items():
            self.tag_attributes.append(
                LogsTagAttributes(
                    timestamp=timestamp,
                    tag_key=k,
                    tag_type="resource",
                    tag_data_type="string",
                    string_value=v,
                    number_value=None,
                )
            )
            self.resource_keys.append(
                LogsResourceOrAttributeKeys(name=k, datatype="string")
            )
        # Calculate the resource fingerprint from the resource labels
        self.resource_fingerprint = LogsOrTracesFingerprint(
            self.resources_string
        ).calculate()
        # Partition attributes by type: bool first (bool is a subclass of
        # int, so this check must precede the numeric one), then numbers,
        # then everything else as strings
        self.attributes_string = {}
        self.attributes_number = {}
        self.attributes_bool = {}
        for k, v in attributes.items():
            if isinstance(v, bool):
                self.attributes_bool[k] = v
                self.tag_attributes.append(
                    LogsTagAttributes(
                        timestamp=timestamp,
                        tag_key=k,
                        tag_type="tag",
                        tag_data_type="bool",
                        string_value=None,
                        number_value=None,
                    )
                )
                self.attribute_keys.append(
                    LogsResourceOrAttributeKeys(name=k, datatype="bool")
                )
            elif isinstance(v, (int, float)):
                self.attributes_number[k] = np.float64(v)
                self.tag_attributes.append(
                    LogsTagAttributes(
                        timestamp=timestamp,
                        tag_key=k,
                        tag_type="tag",
                        tag_data_type="float64",
                        string_value=None,
                        number_value=np.float64(v),
                    )
                )
                self.attribute_keys.append(
                    LogsResourceOrAttributeKeys(name=k, datatype="float64")
                )
            else:
                self.attributes_string[k] = str(v)
                self.tag_attributes.append(
                    LogsTagAttributes(
                        timestamp=timestamp,
                        tag_key=k,
                        tag_type="tag",
                        tag_data_type="string",
                        string_value=str(v),
                        number_value=None,
                    )
                )
                self.attribute_keys.append(
                    LogsResourceOrAttributeKeys(name=k, datatype="string")
                )
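        # Example split: attributes={"status": 200, "ok": True, "msg": "hi"}
        # -> attributes_number={"status": 200.0}, attributes_bool={"ok": True},
        #    attributes_string={"msg": "hi"}.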
        # Initialize scope fields
        self.scope_name = scope_name
        self.scope_version = scope_version
        self.scope_string = {k: str(v) for k, v in scope_attributes.items()}
        for k, v in self.scope_string.items():
            self.tag_attributes.append(
                LogsTagAttributes(
                    timestamp=timestamp,
                    tag_key=k,
                    tag_type="scope",
                    tag_data_type="string",
                    string_value=v,
                    number_value=None,
                )
            )
        self.resource = [
            LogsResource(
                labels=self.resources_string,
                fingerprint=self.resource_fingerprint,
                seen_at_ts_bucket_start=self.ts_bucket_start,
            )
        ]
        # Log fields (severity) are indexed as tag attributes too
        self.tag_attributes.append(
            LogsTagAttributes(
                timestamp=timestamp,
                tag_key="severity_text",
                tag_type="logfield",
                tag_data_type="string",
                string_value=self.severity_text,
                number_value=None,
            )
        )
        self.attribute_keys.append(
            LogsResourceOrAttributeKeys(name="severity_text", datatype="string")
        )
        self.tag_attributes.append(
            LogsTagAttributes(
                timestamp=timestamp,
                tag_key="severity_number",
                tag_type="logfield",
                tag_data_type="float64",
                string_value=None,
                number_value=float(self.severity_number),
            )
        )
        self.attribute_keys.append(
            LogsResourceOrAttributeKeys(name="severity_number", datatype="float64")
        )

    def _get_severity_number(self, severity_text: str) -> np.uint8:
        """Map severity text to its OpenTelemetry severity number."""
        severity_map = {
            "TRACE": 1,
            "DEBUG": 5,
            "INFO": 9,
            "WARN": 13,
            "ERROR": 17,
            "FATAL": 21,
        }
        return np.uint8(severity_map.get(severity_text.upper(), 9))  # Default to INFO
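    # For example, _get_severity_number("warn") returns np.uint8(13) (the
    # lookup is case-insensitive via .upper()), and an unrecognized value
    # such as "NOTICE" falls back to 9 (INFO).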

    def np_arr(self) -> np.ndarray:
        """Return log data as a numpy array for database insertion."""
        return np.array(
            [
                self.ts_bucket_start,
                self.resource_fingerprint,
                self.timestamp,
                self.observed_timestamp,
                self.id,
                self.trace_id,
                self.span_id,
                self.trace_flags,
                self.severity_text,
                self.severity_number,
                self.body,
                self.attributes_string,
                self.attributes_number,
                self.attributes_bool,
                self.resources_string,
                self.scope_name,
                self.scope_version,
                self.scope_string,
            ]
        )
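    # Note: the fixture below passes these rows positionally (no column
    # names), so the element order above must match the column order of the
    # distributed_logs_v2 table (ts_bucket_start first, scope_string last).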


@pytest.fixture(name="insert_logs", scope="function")
def insert_logs(
    clickhouse: types.TestContainerClickhouse,
) -> Generator[Callable[[List[Logs]], None], Any, None]:
    def _insert_logs(logs: List[Logs]) -> None:
        """
        Insert logs into ClickHouse tables following the same logic as the Go
        exporter. This function handles insertion into multiple tables:
        - distributed_logs_v2 (main logs table)
        - distributed_logs_v2_resource (resource fingerprints)
        - distributed_tag_attributes_v2 (tag attributes)
        - distributed_logs_attribute_keys (attribute keys)
        - distributed_logs_resource_keys (resource keys)
        """
        resources: List[LogsResource] = []
        for log in logs:
            resources.extend(log.resource)
        if resources:
            clickhouse.conn.insert(
                database="signoz_logs",
                table="distributed_logs_v2_resource",
                data=[resource.np_arr() for resource in resources],
            )

        tag_attributes: List[LogsTagAttributes] = []
        for log in logs:
            tag_attributes.extend(log.tag_attributes)
        if tag_attributes:
            clickhouse.conn.insert(
                database="signoz_logs",
                table="distributed_tag_attributes_v2",
                data=[tag_attribute.np_arr() for tag_attribute in tag_attributes],
            )

        attribute_keys: List[LogsResourceOrAttributeKeys] = []
        for log in logs:
            attribute_keys.extend(log.attribute_keys)
        if attribute_keys:
            clickhouse.conn.insert(
                database="signoz_logs",
                table="distributed_logs_attribute_keys",
                data=[attribute_key.np_arr() for attribute_key in attribute_keys],
            )

        resource_keys: List[LogsResourceOrAttributeKeys] = []
        for log in logs:
            resource_keys.extend(log.resource_keys)
        if resource_keys:
            clickhouse.conn.insert(
                database="signoz_logs",
                table="distributed_logs_resource_keys",
                data=[resource_key.np_arr() for resource_key in resource_keys],
            )

        clickhouse.conn.insert(
            database="signoz_logs",
            table="distributed_logs_v2",
            data=[log.np_arr() for log in logs],
        )

    yield _insert_logs
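
    # Teardown: truncate the local tables that back the distributed_* tables
    # written above, on every node in the cluster, so each test starts from
    # an empty store.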
    cluster = clickhouse.env["SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER"]
    for table in (
        "logs_v2",
        "logs_v2_resource",
        "tag_attributes_v2",
        "logs_attribute_keys",
        "logs_resource_keys",
    ):
        clickhouse.conn.query(
            f"TRUNCATE TABLE signoz_logs.{table} ON CLUSTER '{cluster}' SYNC"
        )
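

# A minimal usage sketch (hypothetical test; assumes the `clickhouse` fixture
# from fixtures.types is available to the test session):
#
#   def test_insert_single_log(insert_logs) -> None:
#       insert_logs(
#           [
#               Logs(
#                   resources={"service.name": "checkout"},
#                   attributes={"http.status_code": 200, "cache.hit": True},
#                   body="request completed",
#                   severity_text="WARN",
#               )
#           ]
#       )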