Compare commits

...

2 Commits

Author SHA1 Message Date
Alessandro dff0d92474 Ignore null values 2021-11-24 23:41:55 +01:00
Alessandro f276b79293 Added prometheus metrics 2021-11-24 21:58:42 +01:00
10 changed files with 257 additions and 48 deletions

View File

@ -1,10 +1,11 @@
{ {
"motd": "Welcome to Soqet v2.0", "motd": "Welcome to Soqet v2.1",
"httpPort": 3004, "httpPort": 3004,
"tcpPort": 25555, "tcpPort": 25555,
"enableWebsocket": true, "enableWebsocket": true,
"enableTCPSocket": true, "enableTCPSocket": true,
"enablePolling": true, "enablePolling": true,
"wildcardChannel": "*", "wildcardChannel": "*",
"pollingInterval": 3600 "pollingInterval": 3600,
"enablePrometheus": true
} }

157
package-lock.json generated
View File

@ -1,26 +1,31 @@
{ {
"name": "soqet", "name": "soqet",
"version": "2.0.0", "version": "2.1.0",
"lockfileVersion": 2, "lockfileVersion": 2,
"requires": true, "requires": true,
"packages": { "packages": {
"": { "": {
"name": "soqet", "name": "soqet",
"version": "2.0.0", "version": "2.1.0",
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"ws": "^7.5.5" "prom-client": "^14.0.1",
"ws": "^7.5.6"
}, },
"devDependencies": { "devDependencies": {
"@types/node": "^14.17.19", "@types/node": "^14.17.34",
"@types/ws": "^7.4.7", "@types/ws": "^7.4.7",
"typescript": "^4.4.3" "typescript": "^4.5.2"
},
"optionalDependencies": {
"bufferutil": "^4.0.5",
"utf-8-validate": "^5.0.7"
} }
}, },
"node_modules/@types/node": { "node_modules/@types/node": {
"version": "14.17.19", "version": "14.17.34",
"resolved": "https://registry.npmjs.org/@types/node/-/node-14.17.19.tgz", "resolved": "https://registry.npmjs.org/@types/node/-/node-14.17.34.tgz",
"integrity": "sha512-jjYI6NkyfXykucU6ELEoT64QyKOdvaA6enOqKtP4xUsGY0X0ZUZz29fUmrTRo+7v7c6TgDu82q3GHHaCEkqZwA==", "integrity": "sha512-USUftMYpmuMzeWobskoPfzDi+vkpe0dvcOBRNOscFrGxVp4jomnRxWuVohgqBow2xyIPC0S3gjxV/5079jhmDg==",
"dev": true "dev": true
}, },
"node_modules/@types/ws": { "node_modules/@types/ws": {
@ -32,10 +37,58 @@
"@types/node": "*" "@types/node": "*"
} }
}, },
"node_modules/bintrees": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/bintrees/-/bintrees-1.0.1.tgz",
"integrity": "sha1-DmVcm5wkNeqraL9AJyJtK1WjRSQ="
},
"node_modules/bufferutil": {
"version": "4.0.5",
"resolved": "https://registry.npmjs.org/bufferutil/-/bufferutil-4.0.5.tgz",
"integrity": "sha512-HTm14iMQKK2FjFLRTM5lAVcyaUzOnqbPtesFIvREgXpJHdQm8bWS+GkQgIkfaBYRHuCnea7w8UVNfwiAQhlr9A==",
"hasInstallScript": true,
"optional": true,
"dependencies": {
"node-gyp-build": "^4.3.0"
},
"engines": {
"node": ">=6.14.2"
}
},
"node_modules/node-gyp-build": {
"version": "4.3.0",
"resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.3.0.tgz",
"integrity": "sha512-iWjXZvmboq0ja1pUGULQBexmxq8CV4xBhX7VDOTbL7ZR4FOowwY/VOtRxBN/yKxmdGoIp4j5ysNT4u3S2pDQ3Q==",
"optional": true,
"bin": {
"node-gyp-build": "bin.js",
"node-gyp-build-optional": "optional.js",
"node-gyp-build-test": "build-test.js"
}
},
"node_modules/prom-client": {
"version": "14.0.1",
"resolved": "https://registry.npmjs.org/prom-client/-/prom-client-14.0.1.tgz",
"integrity": "sha512-HxTArb6fkOntQHoRGvv4qd/BkorjliiuO2uSWC2KC17MUTKYttWdDoXX/vxOhQdkoECEM9BBH0pj2l8G8kev6w==",
"dependencies": {
"tdigest": "^0.1.1"
},
"engines": {
"node": ">=10"
}
},
"node_modules/tdigest": {
"version": "0.1.1",
"resolved": "https://registry.npmjs.org/tdigest/-/tdigest-0.1.1.tgz",
"integrity": "sha1-Ljyyw56kSeVdHmzZEReszKRYgCE=",
"dependencies": {
"bintrees": "1.0.1"
}
},
"node_modules/typescript": { "node_modules/typescript": {
"version": "4.4.3", "version": "4.5.2",
"resolved": "https://registry.npmjs.org/typescript/-/typescript-4.4.3.tgz", "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.5.2.tgz",
"integrity": "sha512-4xfscpisVgqqDfPaJo5vkd+Qd/ItkoagnHpufr+i2QCHBsNYp+G7UAoyFl8aPtx879u38wPV65rZ8qbGZijalA==", "integrity": "sha512-5BlMof9H1yGt0P8/WF+wPNw6GfctgGjXp5hkblpyT+8rkASSmkUKMXrxR0Xg8ThVCi/JnHQiKXeBaEwCeQwMFw==",
"dev": true, "dev": true,
"bin": { "bin": {
"tsc": "bin/tsc", "tsc": "bin/tsc",
@ -45,10 +98,23 @@
"node": ">=4.2.0" "node": ">=4.2.0"
} }
}, },
"node_modules/utf-8-validate": {
"version": "5.0.7",
"resolved": "https://registry.npmjs.org/utf-8-validate/-/utf-8-validate-5.0.7.tgz",
"integrity": "sha512-vLt1O5Pp+flcArHGIyKEQq883nBt8nN8tVBcoL0qUXj2XT1n7p70yGIq2VK98I5FdZ1YHc0wk/koOnHjnXWk1Q==",
"hasInstallScript": true,
"optional": true,
"dependencies": {
"node-gyp-build": "^4.3.0"
},
"engines": {
"node": ">=6.14.2"
}
},
"node_modules/ws": { "node_modules/ws": {
"version": "7.5.5", "version": "7.5.6",
"resolved": "https://registry.npmjs.org/ws/-/ws-7.5.5.tgz", "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.6.tgz",
"integrity": "sha512-BAkMFcAzl8as1G/hArkxOxq3G7pjUqQ3gzYbLL0/5zNkph70e+lCoxBGnm6AW1+/aiNeV4fnKqZ8m4GZewmH2w==", "integrity": "sha512-6GLgCqo2cy2A2rjCNFlxQS6ZljG/coZfZXclldI8FB/1G3CCI36Zd8xy2HrFVACi8tfk5XrgLQEk+P0Tnz9UcA==",
"engines": { "engines": {
"node": ">=8.3.0" "node": ">=8.3.0"
}, },
@ -68,9 +134,9 @@
}, },
"dependencies": { "dependencies": {
"@types/node": { "@types/node": {
"version": "14.17.19", "version": "14.17.34",
"resolved": "https://registry.npmjs.org/@types/node/-/node-14.17.19.tgz", "resolved": "https://registry.npmjs.org/@types/node/-/node-14.17.34.tgz",
"integrity": "sha512-jjYI6NkyfXykucU6ELEoT64QyKOdvaA6enOqKtP4xUsGY0X0ZUZz29fUmrTRo+7v7c6TgDu82q3GHHaCEkqZwA==", "integrity": "sha512-USUftMYpmuMzeWobskoPfzDi+vkpe0dvcOBRNOscFrGxVp4jomnRxWuVohgqBow2xyIPC0S3gjxV/5079jhmDg==",
"dev": true "dev": true
}, },
"@types/ws": { "@types/ws": {
@ -82,16 +148,61 @@
"@types/node": "*" "@types/node": "*"
} }
}, },
"bintrees": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/bintrees/-/bintrees-1.0.1.tgz",
"integrity": "sha1-DmVcm5wkNeqraL9AJyJtK1WjRSQ="
},
"bufferutil": {
"version": "4.0.5",
"resolved": "https://registry.npmjs.org/bufferutil/-/bufferutil-4.0.5.tgz",
"integrity": "sha512-HTm14iMQKK2FjFLRTM5lAVcyaUzOnqbPtesFIvREgXpJHdQm8bWS+GkQgIkfaBYRHuCnea7w8UVNfwiAQhlr9A==",
"optional": true,
"requires": {
"node-gyp-build": "^4.3.0"
}
},
"node-gyp-build": {
"version": "4.3.0",
"resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.3.0.tgz",
"integrity": "sha512-iWjXZvmboq0ja1pUGULQBexmxq8CV4xBhX7VDOTbL7ZR4FOowwY/VOtRxBN/yKxmdGoIp4j5ysNT4u3S2pDQ3Q==",
"optional": true
},
"prom-client": {
"version": "14.0.1",
"resolved": "https://registry.npmjs.org/prom-client/-/prom-client-14.0.1.tgz",
"integrity": "sha512-HxTArb6fkOntQHoRGvv4qd/BkorjliiuO2uSWC2KC17MUTKYttWdDoXX/vxOhQdkoECEM9BBH0pj2l8G8kev6w==",
"requires": {
"tdigest": "^0.1.1"
}
},
"tdigest": {
"version": "0.1.1",
"resolved": "https://registry.npmjs.org/tdigest/-/tdigest-0.1.1.tgz",
"integrity": "sha1-Ljyyw56kSeVdHmzZEReszKRYgCE=",
"requires": {
"bintrees": "1.0.1"
}
},
"typescript": { "typescript": {
"version": "4.4.3", "version": "4.5.2",
"resolved": "https://registry.npmjs.org/typescript/-/typescript-4.4.3.tgz", "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.5.2.tgz",
"integrity": "sha512-4xfscpisVgqqDfPaJo5vkd+Qd/ItkoagnHpufr+i2QCHBsNYp+G7UAoyFl8aPtx879u38wPV65rZ8qbGZijalA==", "integrity": "sha512-5BlMof9H1yGt0P8/WF+wPNw6GfctgGjXp5hkblpyT+8rkASSmkUKMXrxR0Xg8ThVCi/JnHQiKXeBaEwCeQwMFw==",
"dev": true "dev": true
}, },
"utf-8-validate": {
"version": "5.0.7",
"resolved": "https://registry.npmjs.org/utf-8-validate/-/utf-8-validate-5.0.7.tgz",
"integrity": "sha512-vLt1O5Pp+flcArHGIyKEQq883nBt8nN8tVBcoL0qUXj2XT1n7p70yGIq2VK98I5FdZ1YHc0wk/koOnHjnXWk1Q==",
"optional": true,
"requires": {
"node-gyp-build": "^4.3.0"
}
},
"ws": { "ws": {
"version": "7.5.5", "version": "7.5.6",
"resolved": "https://registry.npmjs.org/ws/-/ws-7.5.5.tgz", "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.6.tgz",
"integrity": "sha512-BAkMFcAzl8as1G/hArkxOxq3G7pjUqQ3gzYbLL0/5zNkph70e+lCoxBGnm6AW1+/aiNeV4fnKqZ8m4GZewmH2w==", "integrity": "sha512-6GLgCqo2cy2A2rjCNFlxQS6ZljG/coZfZXclldI8FB/1G3CCI36Zd8xy2HrFVACi8tfk5XrgLQEk+P0Tnz9UcA==",
"requires": {} "requires": {}
} }
} }

View File

@ -1,6 +1,6 @@
{ {
"name": "soqet", "name": "soqet",
"version": "2.0.0", "version": "2.1.0",
"description": "", "description": "",
"main": "index.js", "main": "index.js",
"scripts": { "scripts": {
@ -11,11 +11,16 @@
"author": "AlexDevs", "author": "AlexDevs",
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"ws": "^7.5.5" "prom-client": "^14.0.1",
"ws": "^7.5.6"
}, },
"devDependencies": { "devDependencies": {
"@types/node": "^14.17.19", "@types/node": "^14.17.34",
"@types/ws": "^7.4.7", "@types/ws": "^7.4.7",
"typescript": "^4.4.3" "typescript": "^4.5.2"
},
"optionalDependencies": {
"bufferutil": "^4.0.5",
"utf-8-validate": "^5.0.7"
} }
} }

View File

@ -1,6 +1,7 @@
import http from "http"; import http from "http";
import crypto from "crypto"; import crypto from "crypto";
import Timeout = NodeJS.Timeout; import Timeout = NodeJS.Timeout;
import Prometheus from "./prometheus";
export interface Client { export interface Client {
uuid: string, uuid: string,
@ -9,7 +10,7 @@ export interface Client {
channelsAmount: number, channelsAmount: number,
send: (data: any) => void, send: (data: any) => void,
guest: boolean, guest: boolean,
ip?: string,
socket?: any, socket?: any,
} }
@ -31,6 +32,8 @@ export interface Server {
httpServer: http.Server, httpServer: http.Server,
prometheus: Prometheus,
config: { config: {
motd: string, motd: string,
httpPort: number, httpPort: number,
@ -40,6 +43,7 @@ export interface Server {
enablePolling: boolean, enablePolling: boolean,
wildcardChannel: string, wildcardChannel: string,
pollingInterval: number, pollingInterval: number,
enablePrometheus: boolean,
}, },
log: (...data: any) => void, log: (...data: any) => void,

View File

@ -9,6 +9,8 @@ import websocket from "./protocols/websocket";
import tcpsocket from "./protocols/tcpsocket"; import tcpsocket from "./protocols/tcpsocket";
import pollingsocket from "./protocols/pollingsocket"; import pollingsocket from "./protocols/pollingsocket";
import Prometheus from "./prometheus";
const config = require("../config.json"); const config = require("../config.json");
const pack = require("../package.json"); const pack = require("../package.json");
@ -30,7 +32,10 @@ function openChannel(sessionId: string, channel: string | number) {
} }
// Create channel if it does not exist // Create channel if it does not exist
if (!server.channels[channel]) server.channels[channel] = []; if (!server.channels[channel]) {
server.channels[channel] = [];
server.prometheus.openChannelsGauge.inc();
}
// Add client to channel // Add client to channel
let channelArray = server.channels[channel]; let channelArray = server.channels[channel];
@ -88,6 +93,7 @@ function closeChannel(sessionId: string, channel: string | number) {
if (channelArray.length === 0) { if (channelArray.length === 0) {
delete server.channels[channel]; delete server.channels[channel];
server.prometheus.openChannelsGauge.dec();
} }
let client: Client = server.clients[sessionId]; let client: Client = server.clients[sessionId];
@ -123,6 +129,10 @@ function transmitMessage(sessionId: string, channel: string | number, message: a
meta.channel = channel; meta.channel = channel;
meta.guest = client.guest; meta.guest = client.guest;
server.prometheus.messagesTrafficCounter.labels('incoming').inc();
server.prometheus.clientIdMessagesCounter.labels(client.uuid).inc();
server.prometheus.clientIPMessagesCounter.labels(client.ip || "localhost").inc();
// Send message to the channel // Send message to the channel
if (server.channels[channel]) { if (server.channels[channel]) {
@ -130,12 +140,15 @@ function transmitMessage(sessionId: string, channel: string | number, message: a
channelArray.forEach(recipientId => { channelArray.forEach(recipientId => {
if (recipientId === sessionId) return; if (recipientId === sessionId) return;
server.clients[recipientId].send({ server.clients[recipientId]?.send({
type: "message", type: "message",
channel: channel, channel: channel,
message: message, message: message,
meta: meta, meta: meta,
}) })
server.prometheus.messagesTrafficCounter.labels('outgoing').inc();
server.prometheus.channelMessagesCounter.labels(channel.toString()).inc();
}) })
} }
@ -144,12 +157,13 @@ function transmitMessage(sessionId: string, channel: string | number, message: a
let wildcardChannelArray = server.channels[config.wildcardChannel]; let wildcardChannelArray = server.channels[config.wildcardChannel];
if (wildcardChannelArray) { if (wildcardChannelArray) {
wildcardChannelArray.forEach(recipientId => { wildcardChannelArray.forEach(recipientId => {
server.clients[recipientId].send({ server.clients[recipientId]?.send({
type: "message", type: "message",
channel: config.wildcardChannel, channel: config.wildcardChannel,
message: message, message: message,
meta: meta, meta: meta,
}) })
server.prometheus.messagesTrafficCounter.labels('outgoing').inc();
}) })
} }
@ -182,10 +196,14 @@ function authenticateClient(sessionId: string, token: string) {
} }
function destroyClient(sessionId: string) { function destroyClient(sessionId: string) {
let channels = server.clients[sessionId].channels; let client = server.clients[sessionId]
let channels = client.channels;
for (let channelName in channels) { for (let channelName in channels) {
closeChannel(sessionId, channelName); closeChannel(sessionId, channelName);
} }
server.prometheus.clientCountGauge.dec()
server.prometheus.clientIdMessagesCounter.remove(client.uuid)
server.prometheus.clientIPMessagesCounter.remove(client.ip || "localhost")
delete server.clients[sessionId]; delete server.clients[sessionId];
} }
@ -201,6 +219,8 @@ function buildClient(send: (data: any) => void, token?: string): Client {
server.clients[client.sessionId] = client; server.clients[client.sessionId] = client;
server.prometheus.clientCountGauge.inc()
if (token) { if (token) {
authenticateClient(client.sessionId, token); authenticateClient(client.sessionId, token);
} }
@ -288,6 +308,7 @@ const server: Server = {
channels: {}, channels: {},
config: config, config: config,
httpServer: http.createServer(), httpServer: http.createServer(),
prometheus: new Prometheus(),
log: function (...data) { log: function (...data) {
console.log(...data); console.log(...data);
}, },

44
src/prometheus.ts Normal file
View File

@ -0,0 +1,44 @@
import client from "prom-client";
export default class Prometheus {
client = client;
prefix = "soqet_"
openChannelsGauge = new client.Gauge({
name: this.prefix + "open_channels",
help: "Amount of open channels"
});
clientCountGauge = new client.Gauge({
name: this.prefix + "client_count",
help: "Amount of connected clients"
})
messagesTrafficCounter = new client.Counter({
name: this.prefix + "messages_traffic",
help: "Counter of incoming and outcoming messages",
labelNames: ["side"]
})
channelMessagesCounter = new client.Counter({
name: this.prefix + "channel_messages",
help: "Amount of messages by channel",
labelNames: ["channel_name"]
});
clientIdMessagesCounter = new client.Counter({
name: this.prefix + "client_id_messages",
help: "Amount of messages by client id",
labelNames: ["client_id"]
})
clientIPMessagesCounter = new client.Counter({
name: this.prefix + "client_ip_messages",
help: "Amount of messages by client IP",
labelNames: ["client_ip"]
})
constructor() {
client.collectDefaultMetrics({ prefix: this.prefix })
}
}

View File

@ -76,10 +76,7 @@ let app = {
export default function run(srv: common.Server) { export default function run(srv: common.Server) {
server = srv; server = srv;
if (!server.config.enablePolling) {
console.log("HTTP Long Polling is disabled!");
return;
}
routers(srv, app); routers(srv, app);

View File

@ -12,6 +12,23 @@ export default function routes(server: common.Server, app: any) {
res.end(); res.end();
}); });
if (server.config.enablePrometheus) {
app.get("/metrics", async function (req: Request, res: Response) {
try {
res.setHeader('Content-Type', server.prometheus.client.register.contentType);
res.end(await server.prometheus.client.register.metrics());
} catch (e: any) {
console.error(e);
res.status(500).end(e.message);
}
})
}
if (!server.config.enablePolling) {
console.log("HTTP Long Polling is disabled!");
return;
}
app.get("/api/connect", function (req: Request, res: Response) { app.get("/api/connect", function (req: Request, res: Response) {
let query = req.query; let query = req.query;
let queue: any[] = []; let queue: any[] = [];
@ -21,6 +38,9 @@ export default function routes(server: common.Server, app: any) {
} }
let client: common.PollingClient = server.buildClient(send, query.token) as common.PollingClient; let client: common.PollingClient = server.buildClient(send, query.token) as common.PollingClient;
try {
client.ip = req.headers['x-forwarded-for']?.toString() || req.socket.remoteAddress;
} catch (_) { }
client.pollingQueue = queue; client.pollingQueue = queue;
client.pollingToken = "$" + common.randomString(63); client.pollingToken = "$" + common.randomString(63);

View File

@ -17,6 +17,7 @@ export default function run(server: common.Server) {
} }
let client = server.buildClient(send); let client = server.buildClient(send);
client.ip = socket.remoteAddress;
client.socket = socket; client.socket = socket;
let pingInterval = setInterval(function () { let pingInterval = setInterval(function () {
@ -48,6 +49,7 @@ export default function run(server: common.Server) {
socket.on("close", () => { socket.on("close", () => {
server.log("[TCP Close]", client.sessionId); server.log("[TCP Close]", client.sessionId);
server.destroyClient(client.sessionId)
clearInterval(pingInterval); clearInterval(pingInterval);
}) })

View File

@ -1,5 +1,7 @@
import * as common from "../common"; import * as common from "../common";
import WebSocket from "ws"; import WebSocket from "ws";
import { IncomingMessage } from "http";
import { Socket } from "net";
export default function run(server: common.Server): void { export default function run(server: common.Server): void {
if (!server.config.enableWebsocket) { if (!server.config.enableWebsocket) {
@ -16,13 +18,13 @@ export default function run(server: common.Server): void {
return; return;
} }
server.httpServer.on("upgrade", (req: any, socket: any, head: any) => { server.httpServer.on("upgrade", (req: IncomingMessage, socket: Socket, head: Buffer) => {
wss.handleUpgrade(req, socket, head, ws => { wss.handleUpgrade(req, socket, head, ws => {
wss.emit("connection", ws); wss.emit("connection", ws, req);
}) })
}); });
wss.on("connection", (ws) => { wss.on("connection", (ws, req) => {
function send (data: any) { function send (data: any) {
if (ws.readyState === WebSocket.OPEN) { if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(data)); ws.send(JSON.stringify(data));
@ -31,6 +33,7 @@ export default function run(server: common.Server): void {
let client = server.buildClient(send); let client = server.buildClient(send);
client.socket = ws; client.socket = ws;
client.ip = req.headers['x-forwarded-for']?.toString() || req.socket.remoteAddress;
let pingInterval = setInterval(function () { let pingInterval = setInterval(function () {
send({ send({
@ -60,6 +63,7 @@ export default function run(server: common.Server): void {
ws.on("close", (code, reason) => { ws.on("close", (code, reason) => {
server.log("[WS Close]", client.sessionId, code, reason); server.log("[WS Close]", client.sessionId, code, reason);
server.destroyClient(client.sessionId)
clearInterval(pingInterval); clearInterval(pingInterval);
}) })