/* Copyright (c) 2020, 2023, The Unified Company. This code is part of Unify. This program is free software; you can redistribute it and/or modify it under the terms of the ESA Software Community License - Strong Copyleft, https://unifyjs.org */ import WebSocket, {WebSocketServer} from 'ws'; import database from '../server/database.js'; import querySQL from '../unify/querySQL.js'; import readline from 'readline'; import socketMessage from '../unify/socketMessage.js'; import fs from 'fs'; var clusterClientID = 0; //database.createTable( "sync_client" ); database.addColumn( "original_id", "INTEGER", "sync_client" ); database.addColumn( "server_id", "INTEGER", "sync_client" ); database.addColumn( "queryString", "TEXT", "sync_client" ); database.addColumn( "columnValues", "TEXT", "sync_client" ); database.addColumn( "operation", "TEXT", "sync_client" ); database.addColumn( "clusterClientID", "INTEGER", "sync_client" ); database.addColumn( "synced", "BOOLEAN", "sync_client" ); export default class syncClient{ clusterClientID = 5; syncServerAddress = "ws://localhost:4422"; mode = "sync"; constructor( mode ) { if( mode ) { this.mode = mode; } this.connect(); } emptyTable() { database.database.prepare( "DELETE from sync_client" ).run() } hearthBeat() { //if( this.socket.readyState != 0 ) //console.log("perform hearthBeat"); var message = new socketMessage(); message.type = "add_sqlite_operation"; message.unsynced = this.getUnsynced(); message.clusterClientID = this.clusterClientID; message.lastID = this.getLastID(); //console.log("getUnsynced", this.getUnsynced()); var unsynced = this.getUnsynced() if(unsynced.length != 0) { console.log( unsynced ); console.log( "unsynced", unsynced.length ); } if( this.socket ) { this.socket.send( JSON.stringify( message ) ); } var that = this; setTimeout(function() { that.hearthBeat(); }, 1000); } async connect(){ this.socket = new WebSocket( this.syncServerAddress, { perMessageDeflate: false }); var syncClient = this; this.socket.on('error', function open( error ) { syncClient.onError(); }); this.socket.on('close', function open( error ) { syncClient.onClose(); }); this.socket.on('open', function open( socket ) { syncClient.onOpen(); }); this.socket.on('message', function message( data ) { syncClient.onMessage( data ); }); } getLastID() { var query = new querySQL( ); query.type = "select"; query.table = "sync_client"; query.order = ["server_id Desc LIMIT 1"]; query.find( "clusterClientID", this.clusterClientID , "!=" ); query.sync = false; var result = database.query( query ); if( result.length == 0 ) { return 0; } return result[0].server_id; } getUnsynced() { var query = new querySQL( ); query.type = "select"; query.table = "sync_client"; query.order = ["id"];// Desc LIMIT 1 query.find( "clusterClientID", this.clusterClientID , "=" ); query.find( "synced", false ); query.sync = false; var result = database.query( query ); return result; } sendMessage( queryString, inputQuery, original_id ) { // !!! console.log("syncClient: send message is performed"); if( !original_id ) { return true; } var query = new querySQL( ); query.type = "insert"; query.table = "sync_client"; query.setValue( "original_id", original_id ); query.setValue( "queryString", queryString ); query.setValue( "columnValues", JSON.stringify( inputQuery.getValues() ) ); query.setValue( "clusterClientID", this.clusterClientID ); query.setValue( "operation", inputQuery.type ); query.setValue( "synced", false ); query.sync = false; var result = database.query( query ); } onError() { } onClose() { //console.log('Socket is closed. Reconnect will be attempted in 1 second.', this.mode); switch( this.mode ) { case "download_database": return true; break; default: var that = this; setTimeout(function() { that.connect(); }, 3000); } } onOpen() { switch( this.mode ) { case "sync": this.hearthBeat(); break; case "download_database": var message = new socketMessage(); message.type = "download_database"; console.log("Downloading most recent database from Sync server, Please wait..."); this.socket.send( JSON.stringify( message ) ); break; } } onMessage( data ) { var parseData = JSON.parse( data ); switch( parseData.name ) { case "connected": //clusterClientID = parseData.clientID; break; case "upload_database": //console.log("binary", parseData.binary); console.log("Recieved most recent binary of database, Now writing to disk..."); fs.writeFileSync( "./framework/db/uploaded_db.sqlite", Buffer.from( parseData.binary.data ) ); this.socket.close() break; case "updateSyncClient": // update synced rows var synced = parseData.synced; for (var i = 0; i < synced.length; i++) { var row = synced[i]; var query = new querySQL( ); query.type = "update"; query.table = "sync_client"; query.setValue( "synced", true ); //query.setValue( "server_id", row.server_id ); query.find( "id", row.id, "=" ); query.sync = false; database.query( query ); } // console.log("set synced of previouse added rows to true: ", synced.length); // add new rows var rows = parseData.rows; //console.log("adding missing rows: ", rows.length); for (var i = 0; i < rows.length; i++) { var row = rows[i]; //console.log(row); // -- // perform real query here var columnArray = JSON.parse( row.columnValues ); var query = new querySQL( ); query.type = row.operation; for (var a = 0; a < columnArray.length; a++) { var column = new Array(); column.name = false; column.value = columnArray[a]; query.values.push( column ); } query.sync = false; database.executeQuery( row.queryString, query ); // --- var query = new querySQL( ); query.type = "insert"; query.table = "sync_client"; query.setValue( "original_id", row.original_id ); query.setValue( "queryString", row.queryString ); query.setValue( "columnValues", row.columnValues ); query.setValue( "operation", row.operation ); query.setValue( "clusterClientID", row.clusterClientID ); query.setValue( "server_id", row.id ); query.setValue( "synced", true ); query.sync = false; var result = database.query( query ); } break; } } } /* var syncClientManager = new syncClient(); readline.emitKeypressEvents( process.stdin ); process.stdin.setRawMode(true); process.stdin.on('keypress', (str, key) => { //console.log(str) //console.log(key) console.log(key); if(key.name == "c") { process.exit(); } var exampleQuery = new querySQL( ); exampleQuery.type = "insert"; exampleQuery.table = "comment"; exampleQuery.setValue( "author_id", 1123 ); var insert = database.query( exampleQuery ); var queryString = database.query( exampleQuery, false ); syncClientManager.sendMessage( queryString, exampleQuery, insert.lastInsertRowid ); }) */