Files
Unify/framework/server/socketManagerServer.js
2025-12-25 11:16:59 +01:00

1119 lines
20 KiB
JavaScript

/*
Copyright (c) 2020, 2023, The Unified Company.
This code is part of Unify.
This program is free software; you can redistribute it and/or modify
it under the terms of the ESA Software Community License - Strong Copyleft LICENSE,
as published by the ESA.
See the ESA Software Community License - Strong Copyleft LICENSE, for more details.
https://unifyjs.org
*/
import socketMessage from '../unify/socketMessage.js';
import uws from '../node_modules/uws/uws.js';
import http from 'http';
import https from 'https';
import socketClient from './socketClient.js';
import objectManager from './objectManager.js';
import Console from '../server/console.js';
import webConsole from '../unify/console.js';
import tools from '../unify/tools.js';
import deepclone from '../unify/clonedeep.js';
import applications from '../configs/applications.js';
import unify from '../unify/unify.js';
import defaultObject from '../unify/defaultObject.js';
import path from 'path';
import groupsFlat from './themeGroups.js';
import moduleLoader from '../server/moduleLoader.js';
import imports from './imports.js';
import { RateLimiterMemory, BurstyRateLimiter } from '../node_modules/node-rate-limiter-flexible/index.js';
// Process-wide flags and the pre-allocated "cache slot" promises.
//
// Slot N's promise is settled by calling global.cacheResolve[ N ], which
// socketManager.createCache() does once the environment for that slot
// has been stored in global.applicationCache.
global.isListening = false;
global.cachePromise = [];
global.cacheResolve = [];

for ( let slot = 0; slot < 1000000; slot++ ) {

	// The executor runs synchronously, so the resolver is stored for the
	// same slot index the promise is stored under.
	global.cachePromise[ slot ] = new Promise( ( resolve ) => {

		global.cacheResolve[ slot ] = resolve;

	} );

}
/**
 * Server-side WebSocket manager.
 *
 * Owns the uWebSockets.js listener, rate-limits incoming connections,
 * registers / recycles socketClient instances and pre-builds per-client
 * application environments ("cache objects") in global.applicationCache.
 *
 * The instance publishes itself as global.socketManager from its constructor.
 */
export default class socketManager {

	// Next user id handed out to a registering client.
	userID = 0;

	port = 4437;

	host = "localhost";

	// Number of environments to pre-build; overwritten in create().
	cacheTreshold = 100;

	// Number of environments built per createCache() batch.
	cacheSize = 50;

	// Index of the next cache slot to fill.
	cachePointer = 0;

	// Delay in ms between two environment builds; overwritten in create().
	cacheDelay = 30;

	// True while a createCache() batch is running.
	cacheBusy = false;

	blockedRemoteAddresses = [];

	// Sparse array indexed by user id (entries are deleted on recycle).
	clients = [];

	// Array used as a dictionary: session key -> socketClient.
	sessionClients = [];

	objectManager = new objectManager();

	moduleLoader = new moduleLoader();

	clientID = 0;

	socket;

	core;

	hotSwapping;

	applicationID;

	applicationURLArray;

	httpsServer;

	application;

	/**
	 * @param {number} [port] - Port to listen on; the class default is kept when falsy.
	 */
	constructor( port ) {

		if( port ) {

			this.port = port;

		}

		global.socketManager = this;

		this.initializeLimiter();

	}

	/**
	 * Consumes 2 points from the per-address rate limiter.
	 *
	 * @param {string} remoteAddress - Key to rate-limit on.
	 * @returns {Promise<boolean>} true when the connection is allowed,
	 *          false when the limiter rejected the consume call.
	 */
	async checkRateLimiter( remoteAddress ) {

		console.log("remoteAddress", remoteAddress);

		// Start the call outside the try block so that a synchronous failure
		// (e.g. a missing limiter) still surfaces as an error, not as "denied".
		const pending = this.rateLimiter.consume( remoteAddress, 2 );

		let allow = true;

		try {

			await pending;

		} catch( rejection ) {

			allow = false;

		}

		if( !allow && global.mode === "development" ) {

			console.log("To many connections from same ip. Connection rejected.");

		}

		return allow;

	}

	/**
	 * Creates the rate limiters.
	 * Only this.rateLimiter is consulted by checkRateLimiter(); this.burstyLimiter
	 * is created but not referenced anywhere else in this class.
	 */
	initializeLimiter() {

		this.burstyLimiter = new BurstyRateLimiter(
			new RateLimiterMemory({
				points: 50,
				duration: 1,
			}),
			new RateLimiterMemory({
				points: 150,
				duration: 3,
			})
		);

		this.rateLimiter = new RateLimiterMemory({
			points: 100,
			duration: 1,
		});

	}

	/**
	 * @param {*} applicationID - Key into the `imports` table used by createCache().
	 */
	setApplicationID( applicationID ) {

		this.applicationID = applicationID;

	}

	/**
	 * @param {*} application - Application reference handed to every new client.
	 */
	setApplication( application ) {

		this.application = application;

	}

	/**
	 * Configures cache settings from the globals (mode, maxClients,
	 * cacheBuildSpeed), starts the (SSL or plain) socket server on this.port
	 * and kicks off the first cache batch.
	 */
	create() {

		this.cacheDelay = ( 10 - global.cacheBuildSpeed ) * 20;

		if( global.mode === "development" ) {

			this.cacheTreshold = 1;

			console.log( "Max number of clients ", global.maxClients, ",But set to", this.cacheTreshold, "Because the server is running in developer mode." );

		} else {

			console.log( "Max number of clients ", global.maxClients );

			this.cacheTreshold = global.maxClients;

		}

		console.log( "Cache build speed ", global.cacheBuildSpeed, "(" + this.cacheDelay + "ms delay per client)" );

		if( !global.isListening ) {

			this.fileUpdater();

			global.isListening = true;

		}

		const behavior = this.createSocketBehavior();

		const socketServer = global.ssl
			? uws.SSLApp({
				key_file_name: './sslCertificates/privkey.pem',
				cert_file_name: './sslCertificates/fullchain.pem'
			})
			: uws.App();

		this.socket = socketServer.ws( '/*', behavior ).listen( this.port, ( token ) => {

			// A falsy token means the port could not be bound.
			if( token ) {

				console.log(`Listening to port ${this.port}`);

			} else {

				console.log(`Failed to listen to port ${this.port}`);

			}

		});

		if( global.ssl ) {

			console.log("started Secure socket server: ", this.port);

		} else {

			console.log("socket server started on port: " , this.port );

		}

		global.applicationCache = [];

		this.createCache( this.cacheSize );

		this.createEventListeners();

	}

	/**
	 * Builds the behavior object (settings plus open / message / close
	 * handlers) that create() passes to the uWebSockets.js `ws()` route.
	 *
	 * @returns {object} The behavior object; it is also stored on every
	 *          accepted socket as `ws.socket`.
	 */
	createSocketBehavior() {

		const manager = this;

		const behavior = {

			compression: 0,

			maxPayloadLength: 16 * 1024 * 1024,

			idleTimeout: 60,

			// The ws handle offers, among others: close, end, send, ping,
			// publish, subscribe, unsubscribe, getRemoteAddress,
			// getRemoteAddressAsText, getBufferedAmount and getUserData.
			open: async function( ws, req ) {

				const addressBuffer = ws.getRemoteAddressAsText();

				const remoteAddress = new TextDecoder().decode( new Int8Array( addressBuffer ) );

				const allow = await manager.checkRateLimiter( remoteAddress );

				if( !allow ) {

					console.log("Connection Rejected.");

					ws.close();

					return true;

				}

				console.log("Connection Accepted.");

				ws.socket = behavior;

				manager.connection( ws );

			},

			message: async function( ws, buf ) {

				// Decode before the first await so the payload is copied into
				// a string while the handler is still running synchronously.
				const message = new TextDecoder().decode( new Int8Array( buf ) );

				// Only set when this frame created the client; later frames
				// pass undefined as the second argument, as before.
				let client;

				if( !ws.client ) {

					client = await manager.connectionCallback( ws, message );

					ws.client = client;

				}

				if( !ws.client ) {

					return false;

				}

				ws.client.message( message, client );

			},

			// The client is destroyed immediately on close; no grace timer is active.
			close: ( ws, code, message ) => {

				const client = ws.client;

				if( client ) {

					client.destroy();

				}

			}

		};

		return behavior;

	}

	/**
	 * Builds `how_many` application environments starting at cachePointer,
	 * stores them in global.applicationCache and settles the matching
	 * global.cacheResolve slot. Does nothing while another batch is running.
	 * Schedules the next batch (not awaited) while cachePointer is below
	 * cacheTreshold.
	 *
	 * NOTE(review): recycleClient() also changes cachePointer and pushes onto
	 * global.applicationCache; if it runs while this loop is awaiting, the loop
	 * bounds and slot indexes shift. Behaviour kept as-is - confirm intent.
	 *
	 * @param {number} how_many - Number of environments to build in this batch.
	 * @returns {Promise<void>}
	 */
	async createCache( how_many ) {

		if( this.cacheBusy ) {

			return;

		}

		this.cacheBusy = true;

		try {

			const defaultInstance = new defaultObject();

			const startTime = Date.now();

			// Methods of the default object that must never be copied onto cached objects.
			const skippedMethods = new Set( [ '', 'setJoinID', 'reset', 'isOrExtendsClass', 'hasClass', 'getTableName', 'delete', 'appendToSelector', 'agregateDefaultObject', '__defineGetter__', '__defineSetter__', '__lookupGetter__', '__lookupSetter__', 'valueOf', 'constructor', 'toLocaleString', 'toString' ] );

			const methods = Array.from( tools.getAllFuncs( defaultInstance ) )
				.filter( ( method ) => !skippedMethods.has( method ) );

			for( let i = this.cachePointer; i < this.cachePointer + how_many; i++ ) {

				const cacheObject = {
					client: {
						classObjects: [],
						objects: []
					}
				};

				cacheObject.application = new imports[ this.applicationID ]();

				global.core.collectObjects( cacheObject.application, cacheObject.client );

				for( const object of cacheObject.client.objects ) {

					// Fill in default methods the object does not define itself.
					for( const method of methods ) {

						if( !object[ method ] ) {

							object[ method ] = defaultInstance[ method ];

						}

					}

					global.core.prepareObject( object, cacheObject.client );

					global.core.tableManager.parse( object );

				}

				// Spread the build over time instead of blocking the event loop.
				await tools.delay( this.cacheDelay );

				global.applicationCache[ i ] = cacheObject;

				const resolveSlot = global.cacheResolve[ i ];

				// Slots beyond the pre-allocated resolver range have no waiter to notify.
				if( typeof resolveSlot === "function" ) {

					resolveSlot();

				}

			}

			console.log("\n\n\n\n");

			console.log("cached ", how_many, "client Environments.")

			console.log("Total cached ", Object.keys(global.applicationCache).length, "client Environments.")

			console.log("On Cluster ", global.clusterID);

			console.log("in ", (Date.now() - startTime ) / 1000, "Seconds" );

			console.log("from ", this.cachePointer, "to", this.cachePointer + how_many);

			console.log("\n\n");

			this.cachePointer = this.cachePointer + how_many;

		} finally {

			// Always release the flag, otherwise one failed build would
			// block every later batch.
			this.cacheBusy = false;

		}

		if( this.cachePointer < this.cacheTreshold ) {

			this.createCache( this.cacheSize );

		}

	}

	/**
	 * Currently a no-op: connection handling is wired through the behavior
	 * object passed to uWebSockets.js in create().
	 */
	createEventListeners() {

	}

	/**
	 * Resets the cached environment of a user so it can be reused, moves it
	 * to the end of global.applicationCache and forgets the client.
	 * Unknown ids (e.g. a client that was already recycled) are tolerated.
	 *
	 * @param {number} userID - Id of the client to recycle.
	 */
	recycleClient( userID ) {

		const client = this.clients[ userID ];

		if( client && client.sessionKey ) {

			delete this.sessionClients[ client.sessionKey ];

		}

		const cacheObject = global.applicationCache[ userID ];

		if( cacheObject ) {

			for( const object of cacheObject.client.objects ) {

				object.value = false;

				object.id = 0;

				if( object.type === "renderCollection" ) {

					object.empty( cacheObject.client );

				}

				object.user = false;

			}

			delete global.applicationCache[ userID ];

			global.applicationCache.push( cacheObject );

			this.cachePointer++;

			console.log("recycled cacheObject", global.applicationCache.length);

		}

		delete this.clients[ userID ];

	}

	/**
	 * Recycles the client's environment and removes its objects from the
	 * object manager.
	 *
	 * @param {number} userID - Id of the client to remove.
	 */
	removeClient( userID ) {

		console.log( "RemoveClient with id", userID);

		console.log( "Clients:", Object.keys( this.clients ) );

		this.recycleClient( userID );

		this.objectManager.removeObjectsByClientID( userID );

		console.log( "Clients:", Object.keys( this.clients ) );

	}

	/**
	 * Looks up a client by session key.
	 *
	 * The key comes from the peer, and sessionClients is an Array used as a
	 * dictionary, so only own properties that hold an object are accepted.
	 * This keeps keys such as "push" or "length" from resolving to Array
	 * internals.
	 *
	 * @param {string} sessionKey - Session key sent by the peer.
	 * @returns {object|undefined} The registered client, or undefined.
	 */
	getClientBySessionKey( sessionKey ) {

		console.log( "getClientBySessionKey", Object.keys(this.sessionClients), sessionKey);

		const isRegistered = Object.prototype.hasOwnProperty.call( this.sessionClients, sessionKey );

		if( !isRegistered ) {

			return undefined;

		}

		const client = this.sessionClients[ sessionKey ];

		if( client === null || typeof client !== "object" ) {

			return undefined;

		}

		return client;

	}

	/**
	 * Creates a socketClient for `id`, waits for its createSocket() and
	 * registers it by id and by session key. Triggers another cache batch
	 * when `id` reaches cachePointer - cacheTreshold.
	 *
	 * @param {number} id - User id for the new client.
	 * @param {object} socket - The accepted socket handle.
	 * @returns {Promise<object>} The new client.
	 */
	async addClient( id, socket ) {

		const newClient = new socketClient( this.port );

		newClient.userID = id;

		newClient.socket = socket;

		newClient.socketManager = this;

		newClient.core = this.core;

		newClient.application = this.application;

		await newClient.createSocket( socket );

		this.clients[ id ] = newClient;

		this.sessionClients[ newClient.sessionKey ] = newClient;

		if( id >= this.cachePointer - this.cacheTreshold ) {

			this.createCache( this.cacheSize );

		}

		return newClient;

	}

	/**
	 * @returns {object} The "connected" promise message sent to every new socket.
	 */
	createInitiationMessage() {

		const message = new socketMessage();

		message.id = 0;

		message.data = "connected";

		message.type = "promise";

		return message;

	}

	/**
	 * Registers a new client for the socket.
	 *
	 * @param {object} socket - The accepted socket handle.
	 * @param {object} message - The parsed "registerClient" message (unused here).
	 * @param {number} [userID] - Id reserved by the caller; defaults to the
	 *        current this.userID, which this method does not increment.
	 * @returns {Promise<object>} The new client.
	 */
	async registerClient( socket, message, userID = this.userID ) {

		const client = await this.addClient( userID, socket );

		console.log( "Client connected: ", userID );

		return client;

	}

	/**
	 * Attaches a new socket to an existing client and confirms it to the peer.
	 *
	 * @param {object} client - The existing client.
	 * @param {object} socket - The new socket handle.
	 * @param {object} message - The parsed "reconnectClient" message.
	 */
	reEstablishConnection( client, socket, message ) {

		client.socket = socket;

		if( client.destroyTimerCallback ) {

			clearTimeout( client.destroyTimer );

			console.log( "destroyTimerCallback cleared." );

		}

		client.createEventListeners();

		console.log( "Client found, Socket re-established.", client.id );

		const object = {};

		object.data = "Connection re-established";

		object.sessionKey = client.sessionKey;

		const returnMessage = client.createValidMessage( message, object );

		client.send( returnMessage );

	}

	/**
	 * Re-attaches the socket to the client whose session key is in
	 * message.object; unknown keys are ignored.
	 *
	 * NOTE(review): the client is not returned, so the message handler never
	 * sets ws.client for a reconnected socket - confirm that
	 * client.createEventListeners() takes care of message delivery.
	 *
	 * @param {object} socket - The new socket handle.
	 * @param {object} message - The parsed "reconnectClient" message.
	 * @returns {Promise<void>}
	 */
	async reconnectClient( socket, message ) {

		console.log( "reconnectClient", "Cluster number:", global.clusterID, message.object );

		const client = this.getClientBySessionKey( message.object );

		if( client ) {

			this.reEstablishConnection( client, socket, message );

		}

	}

	/**
	 * Handles the first frame(s) of a socket that has no client yet.
	 *
	 * @param {object} socket - The socket handle.
	 * @param {string} json - Raw frame text from the peer (untrusted).
	 * @returns {Promise<object|undefined>} The new client for a
	 *          "registerClient" frame, otherwise undefined.
	 */
	async connectionCallback( socket, json ) {

		let message;

		try {

			message = tools.serializeJSON( tools.serializeString( json ) );

		} catch( error ) {

			// A malformed frame from a peer must not become an unhandled rejection.
			console.log( "Ignoring unparsable socket message.", error );

			return;

		}

		if( !message || typeof message !== "object" ) {

			return;

		}

		switch( message.type ) {

			case "registerClient":

				// Reserve the id before awaiting, so two concurrent
				// registrations can never share one user id.
				return this.registerClient( socket, message, this.userID++ );

			case "reconnectClient":

				this.reconnectClient( socket, message );

				break;

			default:

		}

	}

	/**
	 * Greets a freshly accepted socket with the initiation message.
	 *
	 * @param {object} socket - The accepted socket handle.
	 */
	connection( socket ) {

		// Restore console.log when a replacement was stored on global.consoleLog.
		if( global.consoleLog ) {

			console.log = global.consoleLog;

		}

		const message = this.createInitiationMessage();

		socket.send( JSON.stringify( message ) );

		if( global.mode === "development" ) {

			console.log( "Number connected clients: ", Object.keys( this.clients ).length );

		}

	}

	/**
	 * Forwards up to three log parameters to every connected client.
	 */
	log( parameter1, parameter2, parameter3 ) {

		const parameters = webConsole.parseParameters( parameter1, parameter2, parameter3 );

		this.logClients( parameters );

	}

	/**
	 * Calls logClient( parameters ) on every registered client.
	 *
	 * @param {*} parameters - Value produced by webConsole.parseParameters().
	 */
	logClients( parameters ) {

		for( const client of this.clients ) {

			// clients is sparse: recycled ids leave holes behind.
			if( !client ) {

				continue;

			}

			client.logClient( parameters );

		}

	}

	/**
	 * Instantiates the hot-swapping module with the client list.
	 *
	 * NOTE(review): on a constructor error this calls fileUpdater() again,
	 * which re-imports and retries; a constructor that keeps throwing appears
	 * to retry indefinitely - confirm.
	 *
	 * @param {object} hotSwapping - Imported module namespace (uses its default export).
	 */
	instantiateHotSwapping( hotSwapping ) {

		if( !hotSwapping ) {

			return;

		}

		try {

			this.hotSwapping = new hotSwapping.default( this.clients );

		} catch( error ) {

			console.log("An error was made,", error);

			this.fileUpdater();

		}

	}

	/**
	 * Loads ./hotSwapping.js in development mode (never on android).
	 *
	 * @returns {Promise<void>}
	 */
	async fileUpdater() {

		if( process.platform != "android" && global.mode == "development" ) {

			const hotSwapping = await import( /* webpackIgnore: true */ "./hotSwapping.js" );

			this.instantiateHotSwapping( hotSwapping );

		}

	}

}