Compare commits

...

2 Commits

Author SHA1 Message Date
Alessandro dff0d92474 Ignore null values 2021-11-24 23:41:55 +01:00
Alessandro f276b79293 Added prometheus metrics 2021-11-24 21:58:42 +01:00
10 changed files with 257 additions and 48 deletions

View File

@ -1,10 +1,11 @@
{
"motd": "Welcome to Soqet v2.0",
"motd": "Welcome to Soqet v2.1",
"httpPort": 3004,
"tcpPort": 25555,
"enableWebsocket": true,
"enableTCPSocket": true,
"enablePolling": true,
"wildcardChannel": "*",
"pollingInterval": 3600
"pollingInterval": 3600,
"enablePrometheus": true
}

157
package-lock.json generated
View File

@ -1,26 +1,31 @@
{
"name": "soqet",
"version": "2.0.0",
"version": "2.1.0",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "soqet",
"version": "2.0.0",
"version": "2.1.0",
"license": "MIT",
"dependencies": {
"ws": "^7.5.5"
"prom-client": "^14.0.1",
"ws": "^7.5.6"
},
"devDependencies": {
"@types/node": "^14.17.19",
"@types/node": "^14.17.34",
"@types/ws": "^7.4.7",
"typescript": "^4.4.3"
"typescript": "^4.5.2"
},
"optionalDependencies": {
"bufferutil": "^4.0.5",
"utf-8-validate": "^5.0.7"
}
},
"node_modules/@types/node": {
"version": "14.17.19",
"resolved": "https://registry.npmjs.org/@types/node/-/node-14.17.19.tgz",
"integrity": "sha512-jjYI6NkyfXykucU6ELEoT64QyKOdvaA6enOqKtP4xUsGY0X0ZUZz29fUmrTRo+7v7c6TgDu82q3GHHaCEkqZwA==",
"version": "14.17.34",
"resolved": "https://registry.npmjs.org/@types/node/-/node-14.17.34.tgz",
"integrity": "sha512-USUftMYpmuMzeWobskoPfzDi+vkpe0dvcOBRNOscFrGxVp4jomnRxWuVohgqBow2xyIPC0S3gjxV/5079jhmDg==",
"dev": true
},
"node_modules/@types/ws": {
@ -32,10 +37,58 @@
"@types/node": "*"
}
},
"node_modules/bintrees": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/bintrees/-/bintrees-1.0.1.tgz",
"integrity": "sha1-DmVcm5wkNeqraL9AJyJtK1WjRSQ="
},
"node_modules/bufferutil": {
"version": "4.0.5",
"resolved": "https://registry.npmjs.org/bufferutil/-/bufferutil-4.0.5.tgz",
"integrity": "sha512-HTm14iMQKK2FjFLRTM5lAVcyaUzOnqbPtesFIvREgXpJHdQm8bWS+GkQgIkfaBYRHuCnea7w8UVNfwiAQhlr9A==",
"hasInstallScript": true,
"optional": true,
"dependencies": {
"node-gyp-build": "^4.3.0"
},
"engines": {
"node": ">=6.14.2"
}
},
"node_modules/node-gyp-build": {
"version": "4.3.0",
"resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.3.0.tgz",
"integrity": "sha512-iWjXZvmboq0ja1pUGULQBexmxq8CV4xBhX7VDOTbL7ZR4FOowwY/VOtRxBN/yKxmdGoIp4j5ysNT4u3S2pDQ3Q==",
"optional": true,
"bin": {
"node-gyp-build": "bin.js",
"node-gyp-build-optional": "optional.js",
"node-gyp-build-test": "build-test.js"
}
},
"node_modules/prom-client": {
"version": "14.0.1",
"resolved": "https://registry.npmjs.org/prom-client/-/prom-client-14.0.1.tgz",
"integrity": "sha512-HxTArb6fkOntQHoRGvv4qd/BkorjliiuO2uSWC2KC17MUTKYttWdDoXX/vxOhQdkoECEM9BBH0pj2l8G8kev6w==",
"dependencies": {
"tdigest": "^0.1.1"
},
"engines": {
"node": ">=10"
}
},
"node_modules/tdigest": {
"version": "0.1.1",
"resolved": "https://registry.npmjs.org/tdigest/-/tdigest-0.1.1.tgz",
"integrity": "sha1-Ljyyw56kSeVdHmzZEReszKRYgCE=",
"dependencies": {
"bintrees": "1.0.1"
}
},
"node_modules/typescript": {
"version": "4.4.3",
"resolved": "https://registry.npmjs.org/typescript/-/typescript-4.4.3.tgz",
"integrity": "sha512-4xfscpisVgqqDfPaJo5vkd+Qd/ItkoagnHpufr+i2QCHBsNYp+G7UAoyFl8aPtx879u38wPV65rZ8qbGZijalA==",
"version": "4.5.2",
"resolved": "https://registry.npmjs.org/typescript/-/typescript-4.5.2.tgz",
"integrity": "sha512-5BlMof9H1yGt0P8/WF+wPNw6GfctgGjXp5hkblpyT+8rkASSmkUKMXrxR0Xg8ThVCi/JnHQiKXeBaEwCeQwMFw==",
"dev": true,
"bin": {
"tsc": "bin/tsc",
@ -45,10 +98,23 @@
"node": ">=4.2.0"
}
},
"node_modules/utf-8-validate": {
"version": "5.0.7",
"resolved": "https://registry.npmjs.org/utf-8-validate/-/utf-8-validate-5.0.7.tgz",
"integrity": "sha512-vLt1O5Pp+flcArHGIyKEQq883nBt8nN8tVBcoL0qUXj2XT1n7p70yGIq2VK98I5FdZ1YHc0wk/koOnHjnXWk1Q==",
"hasInstallScript": true,
"optional": true,
"dependencies": {
"node-gyp-build": "^4.3.0"
},
"engines": {
"node": ">=6.14.2"
}
},
"node_modules/ws": {
"version": "7.5.5",
"resolved": "https://registry.npmjs.org/ws/-/ws-7.5.5.tgz",
"integrity": "sha512-BAkMFcAzl8as1G/hArkxOxq3G7pjUqQ3gzYbLL0/5zNkph70e+lCoxBGnm6AW1+/aiNeV4fnKqZ8m4GZewmH2w==",
"version": "7.5.6",
"resolved": "https://registry.npmjs.org/ws/-/ws-7.5.6.tgz",
"integrity": "sha512-6GLgCqo2cy2A2rjCNFlxQS6ZljG/coZfZXclldI8FB/1G3CCI36Zd8xy2HrFVACi8tfk5XrgLQEk+P0Tnz9UcA==",
"engines": {
"node": ">=8.3.0"
},
@ -68,9 +134,9 @@
},
"dependencies": {
"@types/node": {
"version": "14.17.19",
"resolved": "https://registry.npmjs.org/@types/node/-/node-14.17.19.tgz",
"integrity": "sha512-jjYI6NkyfXykucU6ELEoT64QyKOdvaA6enOqKtP4xUsGY0X0ZUZz29fUmrTRo+7v7c6TgDu82q3GHHaCEkqZwA==",
"version": "14.17.34",
"resolved": "https://registry.npmjs.org/@types/node/-/node-14.17.34.tgz",
"integrity": "sha512-USUftMYpmuMzeWobskoPfzDi+vkpe0dvcOBRNOscFrGxVp4jomnRxWuVohgqBow2xyIPC0S3gjxV/5079jhmDg==",
"dev": true
},
"@types/ws": {
@ -82,16 +148,61 @@
"@types/node": "*"
}
},
"bintrees": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/bintrees/-/bintrees-1.0.1.tgz",
"integrity": "sha1-DmVcm5wkNeqraL9AJyJtK1WjRSQ="
},
"bufferutil": {
"version": "4.0.5",
"resolved": "https://registry.npmjs.org/bufferutil/-/bufferutil-4.0.5.tgz",
"integrity": "sha512-HTm14iMQKK2FjFLRTM5lAVcyaUzOnqbPtesFIvREgXpJHdQm8bWS+GkQgIkfaBYRHuCnea7w8UVNfwiAQhlr9A==",
"optional": true,
"requires": {
"node-gyp-build": "^4.3.0"
}
},
"node-gyp-build": {
"version": "4.3.0",
"resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.3.0.tgz",
"integrity": "sha512-iWjXZvmboq0ja1pUGULQBexmxq8CV4xBhX7VDOTbL7ZR4FOowwY/VOtRxBN/yKxmdGoIp4j5ysNT4u3S2pDQ3Q==",
"optional": true
},
"prom-client": {
"version": "14.0.1",
"resolved": "https://registry.npmjs.org/prom-client/-/prom-client-14.0.1.tgz",
"integrity": "sha512-HxTArb6fkOntQHoRGvv4qd/BkorjliiuO2uSWC2KC17MUTKYttWdDoXX/vxOhQdkoECEM9BBH0pj2l8G8kev6w==",
"requires": {
"tdigest": "^0.1.1"
}
},
"tdigest": {
"version": "0.1.1",
"resolved": "https://registry.npmjs.org/tdigest/-/tdigest-0.1.1.tgz",
"integrity": "sha1-Ljyyw56kSeVdHmzZEReszKRYgCE=",
"requires": {
"bintrees": "1.0.1"
}
},
"typescript": {
"version": "4.4.3",
"resolved": "https://registry.npmjs.org/typescript/-/typescript-4.4.3.tgz",
"integrity": "sha512-4xfscpisVgqqDfPaJo5vkd+Qd/ItkoagnHpufr+i2QCHBsNYp+G7UAoyFl8aPtx879u38wPV65rZ8qbGZijalA==",
"version": "4.5.2",
"resolved": "https://registry.npmjs.org/typescript/-/typescript-4.5.2.tgz",
"integrity": "sha512-5BlMof9H1yGt0P8/WF+wPNw6GfctgGjXp5hkblpyT+8rkASSmkUKMXrxR0Xg8ThVCi/JnHQiKXeBaEwCeQwMFw==",
"dev": true
},
"utf-8-validate": {
"version": "5.0.7",
"resolved": "https://registry.npmjs.org/utf-8-validate/-/utf-8-validate-5.0.7.tgz",
"integrity": "sha512-vLt1O5Pp+flcArHGIyKEQq883nBt8nN8tVBcoL0qUXj2XT1n7p70yGIq2VK98I5FdZ1YHc0wk/koOnHjnXWk1Q==",
"optional": true,
"requires": {
"node-gyp-build": "^4.3.0"
}
},
"ws": {
"version": "7.5.5",
"resolved": "https://registry.npmjs.org/ws/-/ws-7.5.5.tgz",
"integrity": "sha512-BAkMFcAzl8as1G/hArkxOxq3G7pjUqQ3gzYbLL0/5zNkph70e+lCoxBGnm6AW1+/aiNeV4fnKqZ8m4GZewmH2w==",
"version": "7.5.6",
"resolved": "https://registry.npmjs.org/ws/-/ws-7.5.6.tgz",
"integrity": "sha512-6GLgCqo2cy2A2rjCNFlxQS6ZljG/coZfZXclldI8FB/1G3CCI36Zd8xy2HrFVACi8tfk5XrgLQEk+P0Tnz9UcA==",
"requires": {}
}
}

View File

@ -1,6 +1,6 @@
{
"name": "soqet",
"version": "2.0.0",
"version": "2.1.0",
"description": "",
"main": "index.js",
"scripts": {
@ -11,11 +11,16 @@
"author": "AlexDevs",
"license": "MIT",
"dependencies": {
"ws": "^7.5.5"
"prom-client": "^14.0.1",
"ws": "^7.5.6"
},
"devDependencies": {
"@types/node": "^14.17.19",
"@types/node": "^14.17.34",
"@types/ws": "^7.4.7",
"typescript": "^4.4.3"
"typescript": "^4.5.2"
},
"optionalDependencies": {
"bufferutil": "^4.0.5",
"utf-8-validate": "^5.0.7"
}
}

View File

@ -1,6 +1,7 @@
import http from "http";
import crypto from "crypto";
import Timeout = NodeJS.Timeout;
import Prometheus from "./prometheus";
export interface Client {
uuid: string,
@ -9,7 +10,7 @@ export interface Client {
channelsAmount: number,
send: (data: any) => void,
guest: boolean,
ip?: string,
socket?: any,
}
@ -31,6 +32,8 @@ export interface Server {
httpServer: http.Server,
prometheus: Prometheus,
config: {
motd: string,
httpPort: number,
@ -40,6 +43,7 @@ export interface Server {
enablePolling: boolean,
wildcardChannel: string,
pollingInterval: number,
enablePrometheus: boolean,
},
log: (...data: any) => void,

View File

@ -9,6 +9,8 @@ import websocket from "./protocols/websocket";
import tcpsocket from "./protocols/tcpsocket";
import pollingsocket from "./protocols/pollingsocket";
import Prometheus from "./prometheus";
const config = require("../config.json");
const pack = require("../package.json");
@ -30,7 +32,10 @@ function openChannel(sessionId: string, channel: string | number) {
}
// Create channel if it does not exist
if (!server.channels[channel]) server.channels[channel] = [];
if (!server.channels[channel]) {
server.channels[channel] = [];
server.prometheus.openChannelsGauge.inc();
}
// Add client to channel
let channelArray = server.channels[channel];
@ -88,6 +93,7 @@ function closeChannel(sessionId: string, channel: string | number) {
if (channelArray.length === 0) {
delete server.channels[channel];
server.prometheus.openChannelsGauge.dec();
}
let client: Client = server.clients[sessionId];
@ -123,6 +129,10 @@ function transmitMessage(sessionId: string, channel: string | number, message: a
meta.channel = channel;
meta.guest = client.guest;
server.prometheus.messagesTrafficCounter.labels('incoming').inc();
server.prometheus.clientIdMessagesCounter.labels(client.uuid).inc();
server.prometheus.clientIPMessagesCounter.labels(client.ip || "localhost").inc();
// Send message to the channel
if (server.channels[channel]) {
@ -130,12 +140,15 @@ function transmitMessage(sessionId: string, channel: string | number, message: a
channelArray.forEach(recipientId => {
if (recipientId === sessionId) return;
server.clients[recipientId].send({
server.clients[recipientId]?.send({
type: "message",
channel: channel,
message: message,
meta: meta,
})
server.prometheus.messagesTrafficCounter.labels('outgoing').inc();
server.prometheus.channelMessagesCounter.labels(channel.toString()).inc();
})
}
@ -144,12 +157,13 @@ function transmitMessage(sessionId: string, channel: string | number, message: a
let wildcardChannelArray = server.channels[config.wildcardChannel];
if (wildcardChannelArray) {
wildcardChannelArray.forEach(recipientId => {
server.clients[recipientId].send({
server.clients[recipientId]?.send({
type: "message",
channel: config.wildcardChannel,
message: message,
meta: meta,
})
server.prometheus.messagesTrafficCounter.labels('outgoing').inc();
})
}
@ -182,10 +196,14 @@ function authenticateClient(sessionId: string, token: string) {
}
function destroyClient(sessionId: string) {
let channels = server.clients[sessionId].channels;
let client = server.clients[sessionId]
let channels = client.channels;
for (let channelName in channels) {
closeChannel(sessionId, channelName);
}
server.prometheus.clientCountGauge.dec()
server.prometheus.clientIdMessagesCounter.remove(client.uuid)
server.prometheus.clientIPMessagesCounter.remove(client.ip || "localhost")
delete server.clients[sessionId];
}
@ -201,6 +219,8 @@ function buildClient(send: (data: any) => void, token?: string): Client {
server.clients[client.sessionId] = client;
server.prometheus.clientCountGauge.inc()
if (token) {
authenticateClient(client.sessionId, token);
}
@ -288,6 +308,7 @@ const server: Server = {
channels: {},
config: config,
httpServer: http.createServer(),
prometheus: new Prometheus(),
log: function (...data) {
console.log(...data);
},

44
src/prometheus.ts Normal file
View File

@ -0,0 +1,44 @@
import client from "prom-client";
export default class Prometheus {
client = client;
prefix = "soqet_"
openChannelsGauge = new client.Gauge({
name: this.prefix + "open_channels",
help: "Amount of open channels"
});
clientCountGauge = new client.Gauge({
name: this.prefix + "client_count",
help: "Amount of connected clients"
})
messagesTrafficCounter = new client.Counter({
name: this.prefix + "messages_traffic",
help: "Counter of incoming and outcoming messages",
labelNames: ["side"]
})
channelMessagesCounter = new client.Counter({
name: this.prefix + "channel_messages",
help: "Amount of messages by channel",
labelNames: ["channel_name"]
});
clientIdMessagesCounter = new client.Counter({
name: this.prefix + "client_id_messages",
help: "Amount of messages by client id",
labelNames: ["client_id"]
})
clientIPMessagesCounter = new client.Counter({
name: this.prefix + "client_ip_messages",
help: "Amount of messages by client IP",
labelNames: ["client_ip"]
})
constructor() {
client.collectDefaultMetrics({ prefix: this.prefix })
}
}

View File

@ -76,10 +76,7 @@ let app = {
export default function run(srv: common.Server) {
server = srv;
if (!server.config.enablePolling) {
console.log("HTTP Long Polling is disabled!");
return;
}
routers(srv, app);

View File

@ -12,6 +12,23 @@ export default function routes(server: common.Server, app: any) {
res.end();
});
if (server.config.enablePrometheus) {
app.get("/metrics", async function (req: Request, res: Response) {
try {
res.setHeader('Content-Type', server.prometheus.client.register.contentType);
res.end(await server.prometheus.client.register.metrics());
} catch (e: any) {
console.error(e);
res.status(500).end(e.message);
}
})
}
if (!server.config.enablePolling) {
console.log("HTTP Long Polling is disabled!");
return;
}
app.get("/api/connect", function (req: Request, res: Response) {
let query = req.query;
let queue: any[] = [];
@ -21,6 +38,9 @@ export default function routes(server: common.Server, app: any) {
}
let client: common.PollingClient = server.buildClient(send, query.token) as common.PollingClient;
try {
client.ip = req.headers['x-forwarded-for']?.toString() || req.socket.remoteAddress;
} catch (_) { }
client.pollingQueue = queue;
client.pollingToken = "$" + common.randomString(63);

View File

@ -17,6 +17,7 @@ export default function run(server: common.Server) {
}
let client = server.buildClient(send);
client.ip = socket.remoteAddress;
client.socket = socket;
let pingInterval = setInterval(function () {
@ -48,6 +49,7 @@ export default function run(server: common.Server) {
socket.on("close", () => {
server.log("[TCP Close]", client.sessionId);
server.destroyClient(client.sessionId)
clearInterval(pingInterval);
})

View File

@ -1,5 +1,7 @@
import * as common from "../common";
import WebSocket from "ws";
import { IncomingMessage } from "http";
import { Socket } from "net";
export default function run(server: common.Server): void {
if (!server.config.enableWebsocket) {
@ -16,13 +18,13 @@ export default function run(server: common.Server): void {
return;
}
server.httpServer.on("upgrade", (req: any, socket: any, head: any) => {
server.httpServer.on("upgrade", (req: IncomingMessage, socket: Socket, head: Buffer) => {
wss.handleUpgrade(req, socket, head, ws => {
wss.emit("connection", ws);
wss.emit("connection", ws, req);
})
});
wss.on("connection", (ws) => {
wss.on("connection", (ws, req) => {
function send (data: any) {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(data));
@ -31,6 +33,7 @@ export default function run(server: common.Server): void {
let client = server.buildClient(send);
client.socket = ws;
client.ip = req.headers['x-forwarded-for']?.toString() || req.socket.remoteAddress;
let pingInterval = setInterval(function () {
send({
@ -60,6 +63,7 @@ export default function run(server: common.Server): void {
ws.on("close", (code, reason) => {
server.log("[WS Close]", client.sessionId, code, reason);
server.destroyClient(client.sessionId)
clearInterval(pingInterval);
})