Files
attack-api/server.js

290 lines
11 KiB
JavaScript
Raw Permalink Normal View History

'use strict';
const express = require('express');
const Database = require('better-sqlite3');
const path = require('path');
const http = require('http');
const { timingSafeEqual } = require('crypto');
// Express application; middleware and routes are registered on it further down.
const app = express();
// Listening port: the PORT env var when it parses to a non-zero number, else 3083.
const requestedPort = Number(process.env.PORT);
const PORT = requestedPort || 3083;
// SQLite database file: the DB_PATH env var, falling back to /data/attacks.db.
const dbLocation = process.env.DB_PATH || '/data/attacks.db';
const DB = new Database(dbLocation);
// ── Database ──────────────────────────────────────────────────────────────────
// WAL journaling with synchronous=NORMAL. A negative cache_size is a size in KiB
// (SQLite convention), so -8000 requests roughly 8 MB of page cache.
DB.pragma('journal_mode = WAL');
DB.pragma('synchronous = NORMAL');
DB.pragma('cache_size = -8000');

// Step 1 — tables. `attacks` stores one row per reported event; `sites` keeps a
// running summary (first/last seen, event count) per reporting site.
DB.exec(`
  CREATE TABLE IF NOT EXISTS attacks (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    received_at INTEGER NOT NULL DEFAULT (unixepoch()),
    site_id TEXT NOT NULL DEFAULT '',
    ip TEXT NOT NULL DEFAULT '',
    attack_type TEXT NOT NULL DEFAULT '',
    rule_desc TEXT NOT NULL DEFAULT '',
    source TEXT NOT NULL DEFAULT '',
    param TEXT NOT NULL DEFAULT '',
    payload TEXT NOT NULL DEFAULT '',
    uri TEXT NOT NULL DEFAULT '',
    method TEXT NOT NULL DEFAULT '',
    user_agent TEXT NOT NULL DEFAULT '',
    country TEXT NOT NULL DEFAULT '',
    asn TEXT NOT NULL DEFAULT ''
  );
  CREATE TABLE IF NOT EXISTS sites (
    site_id TEXT PRIMARY KEY,
    first_seen INTEGER NOT NULL DEFAULT (unixepoch()),
    last_seen INTEGER NOT NULL DEFAULT (unixepoch()),
    event_count INTEGER NOT NULL DEFAULT 0
  );
`);

// Step 2 — column migrations for databases created by an older schema.
// Existing columns are detected up front rather than swallowing every ALTER
// error, so genuine failures (locked/read-only database, disk full) surface at
// startup instead of being silently ignored.
const existingAttackColumns = new Set(
  DB.pragma('table_info(attacks)').map(column => column.name)
);
for (const col of ['country', 'asn', 'param', 'payload', 'uri', 'method', 'user_agent', 'rule_desc', 'source']) {
  if (existingAttackColumns.has(col)) continue;
  DB.exec(`ALTER TABLE attacks ADD COLUMN ${col} TEXT NOT NULL DEFAULT ''`);
}

// Step 3 — indexes. These must come after the migrations: idx_source references
// `source`, which is one of the migrated columns, so creating it first would
// fail with "no such column" on an older database.
DB.exec(`
  CREATE INDEX IF NOT EXISTS idx_recv ON attacks(received_at DESC);
  CREATE INDEX IF NOT EXISTS idx_ip ON attacks(ip);
  CREATE INDEX IF NOT EXISTS idx_site ON attacks(site_id);
  CREATE INDEX IF NOT EXISTS idx_attack_type ON attacks(attack_type);
  CREATE INDEX IF NOT EXISTS idx_source ON attacks(source);
`);
// ── Auth ──────────────────────────────────────────────────────────────────────
// Shared secret for write endpoints. When unset or blank, authentication is disabled.
const API_TOKEN = (process.env.API_TOKEN || '').trim();

/**
 * Express middleware that requires `Authorization: Bearer <API_TOKEN>`.
 *
 * Calls `next()` when no API_TOKEN is configured or when the presented token
 * matches; otherwise responds with 403 and `{ error: 'Forbidden' }`.
 *
 * @param {object} req  Express request (only `req.headers.authorization` is read).
 * @param {object} res  Express response.
 * @param {Function} next  Continuation invoked on success.
 */
function requireToken(req, res, next) {
  if (!API_TOKEN) return next();
  const header = req.headers['authorization'];
  // HTTP auth scheme names are case-insensitive, so accept "bearer"/"BEARER" too.
  const token = (typeof header === 'string' ? header : '').replace(/^Bearer\s+/i, '');
  // Compare fixed-size, zero-padded buffers so timingSafeEqual never throws on a
  // length mismatch and the comparison time does not depend on the token length.
  const a = Buffer.alloc(128); Buffer.from(token, 'utf8').copy(a, 0, 0, 128);
  const b = Buffer.alloc(128); Buffer.from(API_TOKEN, 'utf8').copy(b, 0, 0, 128);
  // The exact-match check only runs once the first 128 bytes agree; it rejects
  // tokens that differ beyond the padded window.
  if (!timingSafeEqual(a, b) || token !== API_TOKEN) {
    return res.status(403).json({ error: 'Forbidden' });
  }
  next();
}
// ── IP geo-enrichment ─────────────────────────────────────────────────────────
// Per-IP lookup bookkeeping keyed by IP string; read and written by enrichIP.
const enrichCache = new Map();
// Stores a looked-up country code and ASN on a single attacks row (by id).
const stmtEnrich = DB.prepare("UPDATE attacks SET country=?, asn=? WHERE id=?");
// IPv4 prefixes that are never globally routable: "this network" (0/8),
// RFC 1918 private ranges, loopback, link-local (169.254/16) and CGNAT (100.64/10).
const PRIVATE_IPV4_RE = /^(?:0\.|10\.|127\.|169\.254\.|192\.168\.|172\.(?:1[6-9]|2\d|3[01])\.|100\.(?:6[4-9]|[7-9]\d|1[01]\d|12[0-7])\.)/;
// IPv6: unspecified (::), loopback (::1), unique-local (fc00::/7), link-local (fe80::/10).
const PRIVATE_IPV6_RE = /^(?:::1?$|f[cd][0-9a-f]{2}:|fe[89ab][0-9a-f]:)/;

/**
 * Tells whether `ip` is a loopback, private or otherwise non-public address,
 * i.e. one for which an external geo lookup is pointless.
 *
 * Matching is case-insensitive, and IPv4-mapped IPv6 addresses
 * (`::ffff:a.b.c.d`) are judged by their embedded IPv4 address.
 *
 * @param {string} ip  Textual IPv4 or IPv6 address.
 * @returns {boolean}  true when the address should not be looked up publicly.
 */
function isPrivateIP(ip) {
  const addr = String(ip)
    .trim()
    .toLowerCase()
    // Unwrap IPv4-mapped IPv6 so "::ffff:10.0.0.1" is classified as 10.0.0.1.
    .replace(/^::ffff:(?=\d{1,3}(?:\.\d{1,3}){3}$)/, '');
  return addr.includes(':') ? PRIVATE_IPV6_RE.test(addr) : PRIVATE_IPV4_RE.test(addr);
}
/**
 * Best-effort geo enrichment: looks up country code and ASN for `ip` at
 * ip-api.com and stores them on attacks row `rowId` via stmtEnrich.
 *
 * Each IP is looked up at most once per hour. A completed lookup is kept in
 * enrichCache and applied directly to further rows from the same IP, so those
 * rows are enriched without another HTTP request. Failures never throw to the
 * caller; a network error or timeout clears the cache entry so the IP can be
 * retried later.
 *
 * @param {number} rowId  id of the attacks row to update.
 * @param {string} ip     Source address recorded on that row.
 */
function enrichIP(rowId, ip) {
  if (!ip || ip === '?' || isPrivateIP(ip)) return;
  const now = Date.now();

  // Nothing else sweeps this cache; drop expired entries once it grows large so
  // a stream of distinct IPs cannot grow it without bound.
  if (enrichCache.size > 10_000) {
    for (const [key, entry] of enrichCache) {
      if (!(entry.expiresAt > now)) enrichCache.delete(key);
    }
  }

  const cached = enrichCache.get(ip);
  if (cached && cached.expiresAt > now) {
    // Lookup already done (or still in flight) within the hour: reuse the result
    // if there is one instead of leaving this row unenriched.
    if (cached.resolved) {
      try {
        stmtEnrich.run(cached.country, cached.asn, rowId);
      } catch (err) {
        console.error('[enrich]', err.message);
      }
    }
    return;
  }

  const entry = { expiresAt: now + 3_600_000, resolved: false, country: '', asn: '' };
  enrichCache.set(ip, entry);
  const forget = () => enrichCache.delete(ip);

  const request = http.get(
    `http://ip-api.com/json/${encodeURIComponent(ip)}?fields=status,countryCode,as`,
    { timeout: 5000 },
    res => {
      let body = '';
      res.setEncoding('utf8');
      res.on('data', chunk => { body += chunk; });
      res.on('error', forget);
      res.on('end', () => {
        let reply;
        try {
          reply = JSON.parse(body);
        } catch {
          return; // non-JSON body (e.g. an error page): leave the row unenriched
        }
        if (!reply || reply.status !== 'success') return;
        entry.country = String(reply.countryCode || '').slice(0, 2);
        entry.asn = String(reply.as || '').slice(0, 50);
        entry.resolved = true;
        try {
          stmtEnrich.run(entry.country, entry.asn, rowId);
        } catch (err) {
          console.error('[enrich]', err.message);
        }
      });
    }
  );
  // The `timeout` option only emits 'timeout'; the request has to be destroyed
  // explicitly, which then surfaces as an 'error' and clears the cache entry.
  request.on('timeout', () => request.destroy());
  request.on('error', forget);
}
// Background enrichment of unenriched rows.
// The scan walks the table with an id cursor, five rows per tick, and wraps
// around at the end. Without the cursor, rows that can never be enriched
// (private addresses, failed lookups) would occupy the LIMIT 5 window forever
// and starve every other row.
const stmtUnenriched = DB.prepare(
  "SELECT id, ip FROM attacks WHERE id > ? AND country='' AND ip != '' AND ip != '?' ORDER BY id ASC LIMIT 5"
);
let enrichCursor = 0;
setInterval(() => {
  const pending = stmtUnenriched.all(enrichCursor);
  // An empty page means the end of the table was reached: restart from the top.
  enrichCursor = pending.length ? pending.at(-1).id : 0;
  for (const row of pending) enrichIP(row.id, row.ip);
}, 20_000);
// ── Rate limiter ──────────────────────────────────────────────────────────────
// Per-key counters: key → { c: hits in the current window, r: window end (ms) }.
const rl = new Map();
// Every 30 s, discard entries whose window has already ended.
setInterval(() => {
  const nowMs = Date.now();
  rl.forEach((entry, key) => {
    if (nowMs > entry.r) rl.delete(key);
  });
}, 30_000);
/**
 * Fixed-window rate check: counts this call against `ip` and reports whether
 * the caller is still within its budget.
 *
 * @param {string} ip   Key to count against.
 * @param {number} max  Calls permitted per window (default 30).
 * @param {number} win  Window length in milliseconds (default 60 000).
 * @returns {boolean}   true while the count for the current window is <= max.
 */
function allowed(ip, max = 30, win = 60_000) {
  const nowMs = Date.now();
  const current = rl.get(ip);
  const needsFreshWindow = !current || nowMs > current.r;
  const bucket = needsFreshWindow ? { c: 0, r: nowMs + win } : current;
  if (needsFreshWindow) rl.set(ip, bucket);
  bucket.c += 1;
  return bucket.c <= max;
}
// ── Stats cache (30s TTL) ─────────────────────────────────────────────────────
// Statements are prepared once at load instead of on every cache miss; the three
// time-window counts share a single statement.
const statsQueries = {
  countAll: DB.prepare('SELECT COUNT(*) n FROM attacks'),
  countSince: DB.prepare('SELECT COUNT(*) n FROM attacks WHERE received_at > ?'),
  countSites: DB.prepare('SELECT COUNT(*) n FROM sites'),
  topAttackTypes: DB.prepare(`
    SELECT attack_type, COUNT(*) hits
    FROM attacks WHERE received_at > ?
    GROUP BY attack_type ORDER BY hits DESC LIMIT 10
  `),
  // MAX() picks a non-empty country/asn when any row of the IP has been
  // enriched; a bare column under GROUP BY may come from an unenriched row.
  topIps: DB.prepare(`
    SELECT ip, MAX(country) country, MAX(asn) asn, COUNT(*) hits
    FROM attacks WHERE received_at > ?
    GROUP BY ip ORDER BY hits DESC LIMIT 10
  `),
  topParams: DB.prepare(`
    SELECT param, COUNT(*) hits
    FROM attacks WHERE received_at > ? AND param != ''
    GROUP BY param ORDER BY hits DESC LIMIT 10
  `),
  topUris: DB.prepare(`
    SELECT uri, COUNT(*) hits
    FROM attacks WHERE received_at > ? AND uri != ''
    GROUP BY uri ORDER BY hits DESC LIMIT 10
  `),
  topSources: DB.prepare(`
    SELECT source, COUNT(*) hits
    FROM attacks WHERE received_at > ? AND source != ''
    GROUP BY source ORDER BY hits DESC LIMIT 8
  `),
  recent: DB.prepare(`
    SELECT received_at, ip, country, attack_type, rule_desc, source, param, payload, method, site_id
    FROM attacks ORDER BY id DESC LIMIT 40
  `),
  hourly: DB.prepare(`
    SELECT (received_at / 3600) * 3600 h, COUNT(*) n
    FROM attacks WHERE received_at > ?
    GROUP BY h ORDER BY h ASC
  `),
};
let _cache = null, _cacheTs = 0;

/**
 * Returns the dashboard statistics object, recomputing it at most every 30 s.
 * The cache is also dropped by whoever sets `_cache = null`.
 *
 * Note that `today` is a rolling 24-hour count, not a calendar day. The top-N
 * lists cover the last 30 days and `hourly` buckets the last 24 hours.
 *
 * @returns {object} Totals, top-N lists, the 40 most recent rows and hourly counts.
 */
function getStats() {
  if (_cache && Date.now() - _cacheTs < 30_000) return _cache;
  const now = Math.floor(Date.now() / 1000);
  const dayAgo = now - 86400;
  const weekAgo = now - 604800;
  const monthAgo = now - 2592000;
  _cache = {
    total: statsQueries.countAll.get().n,
    today: statsQueries.countSince.get(dayAgo).n,
    last_7d: statsQueries.countSince.get(weekAgo).n,
    last_30d: statsQueries.countSince.get(monthAgo).n,
    total_sites: statsQueries.countSites.get().n,
    top_attack_types: statsQueries.topAttackTypes.all(monthAgo),
    top_ips: statsQueries.topIps.all(monthAgo),
    top_params: statsQueries.topParams.all(monthAgo),
    top_uris: statsQueries.topUris.all(monthAgo),
    top_sources: statsQueries.topSources.all(monthAgo),
    recent: statsQueries.recent.all(),
    hourly: statsQueries.hourly.all(dayAgo),
  };
  _cacheTs = Date.now();
  return _cache;
}
// ── SSE live stream ───────────────────────────────────────────────────────────
// Open SSE responses; routes add and remove entries.
const sseClients = new Set();
// Prepared once rather than on every 2 s tick.
const stmtMaxAttackId = DB.prepare('SELECT MAX(id) id FROM attacks');
const stmtAttacksAfter = DB.prepare(
  'SELECT id, received_at, ip, country, attack_type, rule_desc, source, param, payload, method, site_id FROM attacks WHERE id > ? ORDER BY id ASC LIMIT 20'
);
// Highest attack id already broadcast (or skipped while nobody was listening).
let lastId = stmtMaxAttackId.get().id || 0;
setInterval(() => {
  if (!sseClients.size) {
    // Keep the cursor at the head while idle. Otherwise the first client to
    // connect would be fed the whole backlog, 20 rows per tick, before seeing
    // anything live.
    lastId = Math.max(lastId, stmtMaxAttackId.get().id || 0);
    return;
  }
  const rows = stmtAttacksAfter.all(lastId);
  if (!rows.length) return;
  lastId = rows.at(-1).id;
  // One SSE message per tick carrying the batch as a JSON array.
  const msg = `data: ${JSON.stringify(rows)}\n\n`;
  for (const client of sseClients) {
    try {
      client.write(msg);
    } catch {
      // A response that can no longer be written to is dropped from the set.
      sseClients.delete(client);
    }
  }
}, 2000);
// ── Prepared statements ───────────────────────────────────────────────────────
const stmtIns = DB.prepare(`
  INSERT INTO attacks (received_at, site_id, ip, attack_type, rule_desc, source, param, payload, uri, method, user_agent, country, asn)
  VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)
`);
// Upsert of the per-site summary: a new site is inserted, an existing one gets
// its last_seen refreshed and its event_count increased.
const stmtSite = DB.prepare(`
  INSERT INTO sites (site_id, first_seen, last_seen, event_count) VALUES (?,?,?,?)
  ON CONFLICT(site_id) DO UPDATE SET
    last_seen = excluded.last_seen,
    event_count = event_count + excluded.event_count
`);
// Allow-lists for client-supplied classification fields.
const VALID_ATTACK_TYPES = new Set([
  'sqli', 'xss', 'lfi', 'rfi', 'cmdi', 'xxe', 'php_inject', 'ssrf', 'wp_specific', 'other'
]);
const VALID_SOURCES = new Set(['GET', 'POST', 'COOKIE', 'URI', 'UA', 'HEADER', '']);

/**
 * Inserts one batch of reported attacks for `siteId` in a single transaction
 * and updates the site's summary row.
 *
 * Every field is coerced to a string and truncated. `attack_type` and `source`
 * are checked against the allow-lists above (unknown values become 'other' and
 * '' respectively). A missing or unparsable `logged_at` falls back to the
 * current time instead of failing the whole batch, and a null or non-object
 * item is stored as an empty event.
 *
 * @param {string} siteId   Identifier of the reporting site.
 * @param {Array}  attacks  Client-supplied attack records.
 * @returns {Array<{id: number, ip: string}>} Inserted row ids with their IPs,
 *   for asynchronous geo enrichment.
 */
const insertBatch = DB.transaction((siteId, attacks) => {
  const now = Math.floor(Date.now() / 1000);
  const ids = [];
  for (const item of attacks) {
    const a = item !== null && typeof item === 'object' ? item : {};

    // An invalid date yields NaN, which would violate NOT NULL on received_at.
    const parsedTs = a.logged_at ? Math.floor(new Date(a.logged_at).getTime() / 1000) : NaN;
    const ts = Number.isFinite(parsedTs) ? parsedTs : now;

    const ip = String(a.ip || '').trim().slice(0, 45) || '?';

    const rawType = String(a.attack_type || 'other').trim().toLowerCase().slice(0, 50);
    const attackType = VALID_ATTACK_TYPES.has(rawType) ? rawType : 'other';
    const rawSource = String(a.source || '').trim().toUpperCase().slice(0, 20);
    const source = VALID_SOURCES.has(rawSource) ? rawSource : '';

    const r = stmtIns.run(
      ts, siteId, ip,
      attackType,
      String(a.rule_desc || '').slice(0, 255),
      source,
      String(a.param || '').slice(0, 200),
      String(a.payload || '').slice(0, 500),
      String(a.uri || '').slice(0, 500),
      String(a.method || '').slice(0, 10),
      String(a.user_agent || '').slice(0, 300),
      '', '' // country/asn filled async
    );
    ids.push({ id: Number(r.lastInsertRowid), ip });
  }
  stmtSite.run(siteId, now, now, attacks.length);
  return ids;
});
// ── Routes ────────────────────────────────────────────────────────────────────
// JSON request bodies are parsed, capped at 256kb.
const jsonBodyParser = express.json({ limit: '256kb' });
app.use(jsonBodyParser);
// Files in the `public` directory next to this script are served as static assets.
const publicDir = path.join(__dirname, 'public');
app.use(express.static(publicDir));
// POST /api/v1/submit — ingest a batch of attack reports from one site.
// Body: { site_hash: string (>= 8 chars), attacks: array of 1-100 objects }.
// Responses: 200 { ok, received }, 400 on invalid input, 429 when rate limited,
// 500 when the insert fails. Token auth runs first via requireToken.
app.post('/api/v1/submit', requireToken, (req, res) => {
  // NOTE(review): X-Forwarded-For is client-controlled unless a trusted reverse
  // proxy overwrites it, so the rate-limit key can be spoofed by direct callers.
  // Confirm the deployment sits behind such a proxy.
  const clientIP = (req.headers['x-forwarded-for'] || '').split(',')[0].trim()
    || req.socket.remoteAddress || '';
  if (!allowed(clientIP)) return res.status(429).json({ error: 'Rate limit exceeded' });

  const { site_hash, attacks } = req.body || {};
  if (!site_hash || typeof site_hash !== 'string' || site_hash.length < 8) {
    return res.status(400).json({ error: 'Invalid site_hash' });
  }
  if (!Array.isArray(attacks) || !attacks.length || attacks.length > 100) {
    return res.status(400).json({ error: 'attacks must be array of 1-100 items' });
  }
  // Malformed items are a client error; reject them instead of failing with a 500.
  if (attacks.some(item => item === null || typeof item !== 'object')) {
    return res.status(400).json({ error: 'attacks items must be objects' });
  }

  try {
    const ids = insertBatch(site_hash.slice(0, 64), attacks);
    _cache = null; // new data: force the next stats request to recompute
    // Geo lookups run after the response has been handed off.
    setImmediate(() => ids.forEach(({ id, ip }) => enrichIP(id, ip)));
    res.json({ ok: true, received: attacks.length });
  } catch (e) {
    console.error('[submit]', e.message);
    res.status(500).json({ error: 'Internal error' });
  }
});
// GET /api/v1/stats — the aggregated statistics object produced by getStats().
app.get('/api/v1/stats', (_req, res) => {
  const stats = getStats();
  res.json(stats);
});

// GET /api/v1/stream — Server-Sent Events. The response is kept open and
// registered in sseClients until the request closes.
app.get('/api/v1/stream', (req, res) => {
  const sseHeaders = {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no',
  };
  res.writeHead(200, sseHeaders);
  // An initial SSE comment line is written right away.
  res.write(':\n\n');
  sseClients.add(res);
  req.on('close', () => {
    sseClients.delete(res);
  });
});

// GET /api/v1/health — liveness probe with uptime and open SSE connection count.
app.get('/api/v1/health', (_req, res) => {
  const report = { ok: true, uptime: process.uptime(), sse_clients: sseClients.size };
  res.json(report);
});
// Logs the bound port and the database path once the server is accepting connections.
function announceListening() {
  console.log(`[attack-api] listening on :${PORT}`);
  console.log(`[attack-api] db: ${process.env.DB_PATH || '/data/attacks.db'}`);
}

// Bind on all IPv4 interfaces.
app.listen(PORT, '0.0.0.0', announceListening);