"""Pytest fixtures for inserting synthetic logs into ClickHouse.

Mirrors the rows written by the Go exporter: the main logs table plus its
resource, tag-attribute, and key side tables.
"""

import datetime
import json
from abc import ABC
from typing import Any, Callable, Generator, List

import numpy as np
import pytest
from ksuid import KsuidMs

from fixtures import types
from fixtures.fingerprint import LogsOrTracesFingerprint


class LogsResource(ABC):
    labels: str
    fingerprint: str
    seen_at_ts_bucket_start: np.int64

    def __init__(
        self,
        labels: dict[str, str],
        fingerprint: str,
        seen_at_ts_bucket_start: np.int64,
    ) -> None:
        # Serialize with compact separators: ClickHouse treats {"a": "b"}
        # differently from {"a":"b"} and cannot run its JSON functions on the
        # whitespace-padded form.
        self.labels = json.dumps(labels, separators=(",", ":"))
        self.fingerprint = fingerprint
        self.seen_at_ts_bucket_start = seen_at_ts_bucket_start

    def np_arr(self) -> np.ndarray:
        # The two trailing np.uint64 values fill the additional columns the
        # current logs_v2_resource schema expects.
        return np.array(
            [
                self.labels,
                self.fingerprint,
                self.seen_at_ts_bucket_start,
                np.uint64(10),
                np.uint64(15),
            ]
        )
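

# For reference: json.dumps({"a": "b"}) produces '{"a": "b"}' (with a space),
# while separators=(",", ":") produces the compact '{"a":"b"}' that
# ClickHouse's JSON functions can work with.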


class LogsResourceOrAttributeKeys(ABC):
    name: str
    datatype: str

    def __init__(self, name: str, datatype: str) -> None:
        self.name = name
        self.datatype = datatype

    def np_arr(self) -> np.ndarray:
        return np.array([self.name, self.datatype])


class LogsTagAttributes(ABC):
    unix_milli: np.int64
    tag_key: str
    tag_type: str
    tag_data_type: str
    string_value: str
    number_value: np.float64

    def __init__(
        self,
        timestamp: datetime.datetime,
        tag_key: str,
        tag_type: str,
        tag_data_type: str,
        string_value: str | None,
        number_value: np.float64 | None,
    ) -> None:
        # Store the timestamp as epoch milliseconds.
        self.unix_milli = np.int64(int(timestamp.timestamp() * 1e3))
        self.tag_key = tag_key
        self.tag_type = tag_type
        self.tag_data_type = tag_data_type
        self.string_value = string_value or ""
        self.number_value = number_value

    def np_arr(self) -> np.ndarray:
        return np.array(
            [
                self.unix_milli,
                self.tag_key,
                self.tag_type,
                self.tag_data_type,
                self.string_value,
                self.number_value,
            ]
        )


class Logs(ABC):
    ts_bucket_start: np.uint64
    resource_fingerprint: str
    timestamp: np.uint64
    observed_timestamp: np.uint64
    id: str
    trace_id: str
    span_id: str
    trace_flags: np.uint32
    severity_text: str
    severity_number: np.uint8
    body: str
    attributes_string: dict[str, str]
    attributes_number: dict[str, np.float64]
    attributes_bool: dict[str, bool]
    resources_string: dict[str, str]
    scope_name: str
    scope_version: str
    scope_string: dict[str, str]

    resource: List[LogsResource]
    tag_attributes: List[LogsTagAttributes]
    resource_keys: List[LogsResourceOrAttributeKeys]
    attribute_keys: List[LogsResourceOrAttributeKeys]

    def __init__(
        self,
        timestamp: datetime.datetime | None = None,
        resources: dict[str, Any] | None = None,
        attributes: dict[str, Any] | None = None,
        body: str = "default body",
        severity_text: str = "INFO",
        trace_id: str = "",
        span_id: str = "",
        trace_flags: np.uint32 = 0,
        scope_name: str = "",
        scope_version: str = "",
        scope_attributes: dict[str, str] | None = None,
    ) -> None:
        # Default arguments are evaluated once at import time, so resolve the
        # "now" timestamp and the mutable dict defaults per call instead.
        if timestamp is None:
            timestamp = datetime.datetime.now()
        resources = resources if resources is not None else {}
        attributes = attributes if attributes is not None else {}
        scope_attributes = scope_attributes if scope_attributes is not None else {}

        self.tag_attributes = []
        self.attribute_keys = []
        self.resource_keys = []

        # Convert timestamp to uint64 nanoseconds
        self.timestamp = np.uint64(int(timestamp.timestamp() * 1e9))
        self.observed_timestamp = self.timestamp

        # Calculate ts_bucket_start by rounding down to the nearest 30-minute
        # interval (equivalent to epoch_seconds - epoch_seconds % 1800).
        bucket_minute = 0 if timestamp.minute < 30 else 30
        bucket_start = timestamp.replace(minute=bucket_minute, second=0, microsecond=0)
        self.ts_bucket_start = np.uint64(int(bucket_start.timestamp()))
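        # For example, a timestamp of 14:44:16 falls in the 14:30:00 bucket,
        # while 14:12:05 falls in the 14:00:00 bucket.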

        # Generate a ksuid from the timestamp so ids sort by time
        self.id = str(KsuidMs(datetime=timestamp))

        # Initialize trace fields
        self.trace_id = trace_id
        self.span_id = span_id
        self.trace_flags = trace_flags

        # Set severity fields
        self.severity_text = severity_text
        self.severity_number = self._get_severity_number(severity_text)

        # Set body
        self.body = body

        # Process resources: every resource attribute is stored as a string
        # and mirrored into tag_attributes and resource_keys.
        self.resources_string = {k: str(v) for k, v in resources.items()}
        for k, v in self.resources_string.items():
            self.tag_attributes.append(
                LogsTagAttributes(
                    timestamp=timestamp,
                    tag_key=k,
                    tag_type="resource",
                    tag_data_type="string",
                    string_value=v,
                    number_value=None,
                )
            )
            self.resource_keys.append(
                LogsResourceOrAttributeKeys(name=k, datatype="string")
            )

        # Calculate resource fingerprint
        self.resource_fingerprint = LogsOrTracesFingerprint(
            self.resources_string
        ).calculate()

        # Process attributes by type; each attribute is also mirrored into
        # tag_attributes and attribute_keys with a matching datatype.
        self.attributes_string = {}
        self.attributes_number = {}
        self.attributes_bool = {}

        for k, v in attributes.items():
            # Check bool first: bool is a subclass of int, so the int/float
            # branch would otherwise swallow it.
            if isinstance(v, bool):
                self.attributes_bool[k] = v
                self.tag_attributes.append(
                    LogsTagAttributes(
                        timestamp=timestamp,
                        tag_key=k,
                        tag_type="tag",
                        tag_data_type="bool",
                        string_value=None,
                        number_value=None,
                    )
                )
                self.attribute_keys.append(
                    LogsResourceOrAttributeKeys(name=k, datatype="bool")
                )
            elif isinstance(v, (int, float)):
                self.attributes_number[k] = np.float64(v)
                self.tag_attributes.append(
                    LogsTagAttributes(
                        timestamp=timestamp,
                        tag_key=k,
                        tag_type="tag",
                        tag_data_type="float64",
                        string_value=None,
                        number_value=np.float64(v),
                    )
                )
                self.attribute_keys.append(
                    LogsResourceOrAttributeKeys(name=k, datatype="float64")
                )
            else:
                self.attributes_string[k] = str(v)
                self.tag_attributes.append(
                    LogsTagAttributes(
                        timestamp=timestamp,
                        tag_key=k,
                        tag_type="tag",
                        tag_data_type="string",
                        string_value=str(v),
                        number_value=None,
                    )
                )
                self.attribute_keys.append(
                    LogsResourceOrAttributeKeys(name=k, datatype="string")
                )

        # Initialize scope fields
        self.scope_name = scope_name
        self.scope_version = scope_version
        self.scope_string = {k: str(v) for k, v in scope_attributes.items()}
        for k, v in self.scope_string.items():
            self.tag_attributes.append(
                LogsTagAttributes(
                    timestamp=timestamp,
                    tag_key=k,
                    tag_type="scope",
                    tag_data_type="string",
                    string_value=v,
                    number_value=None,
                )
            )

        self.resource = [
            LogsResource(
                labels=self.resources_string,
                fingerprint=self.resource_fingerprint,
                seen_at_ts_bucket_start=self.ts_bucket_start,
            )
        ]

        # Log fields (severity)
        self.tag_attributes.append(
            LogsTagAttributes(
                timestamp=timestamp,
                tag_key="severity_text",
                tag_type="logfield",
                tag_data_type="string",
                string_value=self.severity_text,
                number_value=None,
            )
        )
        self.attribute_keys.append(
            LogsResourceOrAttributeKeys(name="severity_text", datatype="string")
        )

        self.tag_attributes.append(
            LogsTagAttributes(
                timestamp=timestamp,
                tag_key="severity_number",
                tag_type="logfield",
                tag_data_type="float64",
                string_value=None,
                number_value=float(self.severity_number),
            )
        )
        self.attribute_keys.append(
            LogsResourceOrAttributeKeys(name="severity_number", datatype="float64")
        )

    def _get_severity_number(self, severity_text: str) -> np.uint8:
        """Convert severity text to its numeric value (OpenTelemetry SeverityNumber)."""
        severity_map = {
            "TRACE": 1,
            "DEBUG": 5,
            "INFO": 9,
            "WARN": 13,
            "ERROR": 17,
            "FATAL": 21,
        }
        return np.uint8(severity_map.get(severity_text.upper(), 9))  # default to INFO
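    # For example, _get_severity_number("error") returns 17, while an unknown
    # value such as "NOTICE" falls back to 9 (INFO).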

    def np_arr(self) -> np.ndarray:
        """Return log data as a numpy array for database insertion."""
        return np.array(
            [
                self.ts_bucket_start,
                self.resource_fingerprint,
                self.timestamp,
                self.observed_timestamp,
                self.id,
                self.trace_id,
                self.span_id,
                self.trace_flags,
                self.severity_text,
                self.severity_number,
                self.body,
                self.attributes_string,
                self.attributes_number,
                self.attributes_bool,
                self.resources_string,
                self.scope_name,
                self.scope_version,
                self.scope_string,
                # The two trailing np.uint64 values and the repeated
                # resources_string fill the additional columns the current
                # logs_v2 schema expects.
                np.uint64(10),
                np.uint64(15),
                self.resources_string,
            ]
        )
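

# A minimal construction sketch (the resource/attribute names are
# illustrative):
#
#     log = Logs(
#         resources={"service.name": "payment"},
#         attributes={"http.status_code": 502, "retry": True, "route": "/charge"},
#         severity_text="ERROR",
#         body="charge failed",
#     )
#
# yields attributes_number == {"http.status_code": 502.0},
# attributes_bool == {"retry": True}, attributes_string == {"route": "/charge"},
# and severity_number == 17.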


@pytest.fixture(name="insert_logs", scope="function")
def insert_logs(
    clickhouse: types.TestContainerClickhouse,
) -> Generator[Callable[[List[Logs]], None], Any, None]:
    def _insert_logs(logs: List[Logs]) -> None:
        """
        Insert logs into ClickHouse tables following the same logic as the Go exporter.

        This function handles insertion into multiple tables:
        - distributed_logs_v2 (main logs table)
        - distributed_logs_v2_resource (resource fingerprints)
        - distributed_tag_attributes_v2 (tag attributes)
        - distributed_logs_attribute_keys (attribute keys)
        - distributed_logs_resource_keys (resource keys)
        """
        resources: List[LogsResource] = []
        for log in logs:
            resources.extend(log.resource)

        if len(resources) > 0:
            clickhouse.conn.insert(
                database="signoz_logs",
                table="distributed_logs_v2_resource",
                data=[resource.np_arr() for resource in resources],
            )

        tag_attributes: List[LogsTagAttributes] = []
        for log in logs:
            tag_attributes.extend(log.tag_attributes)

        if len(tag_attributes) > 0:
            clickhouse.conn.insert(
                database="signoz_logs",
                table="distributed_tag_attributes_v2",
                data=[tag_attribute.np_arr() for tag_attribute in tag_attributes],
            )

        attribute_keys: List[LogsResourceOrAttributeKeys] = []
        for log in logs:
            attribute_keys.extend(log.attribute_keys)

        if len(attribute_keys) > 0:
            clickhouse.conn.insert(
                database="signoz_logs",
                table="distributed_logs_attribute_keys",
                data=[attribute_key.np_arr() for attribute_key in attribute_keys],
            )

        resource_keys: List[LogsResourceOrAttributeKeys] = []
        for log in logs:
            resource_keys.extend(log.resource_keys)

        if len(resource_keys) > 0:
            clickhouse.conn.insert(
                database="signoz_logs",
                table="distributed_logs_resource_keys",
                data=[resource_key.np_arr() for resource_key in resource_keys],
            )

        clickhouse.conn.insert(
            database="signoz_logs",
            table="distributed_logs_v2",
            data=[log.np_arr() for log in logs],
        )

    yield _insert_logs

    # Teardown: truncate the underlying local tables across the cluster so
    # every test starts from a clean slate.
    clickhouse.conn.query(
        f"TRUNCATE TABLE signoz_logs.logs_v2 ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
    )
    clickhouse.conn.query(
        f"TRUNCATE TABLE signoz_logs.logs_v2_resource ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
    )
    clickhouse.conn.query(
        f"TRUNCATE TABLE signoz_logs.tag_attributes_v2 ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
    )
    clickhouse.conn.query(
        f"TRUNCATE TABLE signoz_logs.logs_attribute_keys ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
    )
    clickhouse.conn.query(
        f"TRUNCATE TABLE signoz_logs.logs_resource_keys ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
    )
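

# A minimal usage sketch of the fixture above (illustrative; the query API
# with `result_rows` assumes a clickhouse-connect style client, matching the
# conn.query/conn.insert calls used in this module):
#
#     def test_ingests_error_log(insert_logs, clickhouse) -> None:
#         insert_logs(
#             [
#                 Logs(
#                     body="charge failed",
#                     severity_text="ERROR",
#                     resources={"service.name": "payment"},
#                     attributes={"http.status_code": 502, "retry": True},
#                 )
#             ]
#         )
#         rows = clickhouse.conn.query(
#             "SELECT count() FROM signoz_logs.distributed_logs_v2"
#         ).result_rows
#         assert rows[0][0] == 1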