From 3b71cc49e61b0fc5d805ae4b99e11f517e02f498 Mon Sep 17 00:00:00 2001 From: Marc Hernandez Date: Sat, 19 Jun 2021 18:25:45 -0700 Subject: [PATCH] Move Distrubuted stuff into my libs --- SharpLib.csproj | 2 + db/Act.cs | 96 +++++++++++++++ db/DB.cs | 223 +++++++++++++++++++++++++++++++++++ db/Processor.cs | 122 +++++++++++++++++++ db/System.cs | 308 ++++++++++++++++++++++++++++++++++++++++++++++++ logging/Log.cs | 122 +++++++++---------- res/Resource.cs | 12 +- 7 files changed, 812 insertions(+), 73 deletions(-) create mode 100644 db/Act.cs create mode 100644 db/DB.cs create mode 100644 db/Processor.cs create mode 100644 db/System.cs diff --git a/SharpLib.csproj b/SharpLib.csproj index 4a2daad..905a906 100644 --- a/SharpLib.csproj +++ b/SharpLib.csproj @@ -17,6 +17,8 @@ + + diff --git a/db/Act.cs b/db/Act.cs new file mode 100644 index 0000000..1403aad --- /dev/null +++ b/db/Act.cs @@ -0,0 +1,96 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Text; + +namespace db +{ + public class Act + { + public Func Fn => m_act; + + + public string DebugInfo { get; private set; } = ""; + public string Path { get; private set; } = ""; + public int Line { get; private set; } = -1; + public string Member { get; private set; } = ""; + + private Act( Func act, string debugInfo = "{unknown_base}", string path = "", int line = -1, string member = "" ) + { + m_act = act; + + DebugInfo = debugInfo; + Path = path; + Line = line; + Member = member; + + //ExtractValue( act ); + } + + static public Act create( Func act, string debugInfo = "{unknown}", [CallerFilePath] string path = "", [CallerLineNumber] int line = -1, [CallerMemberName] string member = "" ) + { + //ExtractValue( act ); + + return new Act( act, debugInfo, path, line, member ); + } + + public static Act create( Func act, T p0, string debugInfo = "{unknown}", [CallerFilePath] string path = "", [CallerLineNumber] int line = -1, [CallerMemberName] string member = "" ) + { + //ExtractValue( act ); + + //return new Act( act ); + + return new Act( () => { return act( p0 ); }, debugInfo, path, line, member ); + } + + // If we're not doing any commit ops we can just use these. + static public Act create( Action act, string debugInfo = "{unknown}", [CallerFilePath] string path = "", [CallerLineNumber] int line = -1, [CallerMemberName] string member = "" ) + { + //ExtractValue( act ); + + return new Act( () => { act(); return CommitResults.Perfect; }, debugInfo, path, line, member ); + } + + public static Act create( Action act, T p0, string debugInfo = "{unknown}", [CallerFilePath] string path = "", [CallerLineNumber] int line = -1, [CallerMemberName] string member = "" ) + { + //ExtractValue( act ); + + //return new Act( act ); + + return new Act( () => { act( p0 ); return CommitResults.Perfect; }, debugInfo, path, line, member ); + } + + + + public static void ExtractValue( Delegate lambda ) + { + var lambdaType = lambda.GetType(); + + var methodType = lambda.Method.GetType(); + + //Nothing here. + //var locals = lambda.Method.GetMethodBody().LocalVariables; + + var targetType = lambda.Target?.GetType(); + + var fields = lambda.Method.DeclaringType?.GetFields + ( + BindingFlags.NonPublic | + BindingFlags.Instance | + BindingFlags.Public | + BindingFlags.Static + ); + //.SingleOrDefault(x => x.Name == variableName); + + //return (TValue)field.GetValue( lambda.Target ); + } + + + + + Func m_act; + + } +} diff --git a/db/DB.cs b/db/DB.cs new file mode 100644 index 0000000..08a38b2 --- /dev/null +++ b/db/DB.cs @@ -0,0 +1,223 @@ +using System; +using System.Collections.Immutable; +using Optional; +using static Optional.OptionExtensions; +using static System.Collections.Immutable.ImmutableInterlocked; + +/* + ???? Should we have an explicit transaction class/ID? + ???? Should we split things into threaded vs action +*/ + +namespace db +{ + + public enum CommitResults + { + Invalid, + Perfect, + Collisions, + } + + public interface IID + { + TS id { get; } + } + + public class DB where T : IID + { + //Current snapshot of the DB. + ImmutableDictionary m_objs = ImmutableDictionary.Empty; + + //List of committed Ids based on when they were committed. + ImmutableList m_committed = ImmutableList.Empty; + + ImmutableDictionary Objects => m_objs; + + // @@@@ TODO This returns an entity that can be changing. It should be a lazy instantiated copy + public Option lookup( TID id ) + { + if( m_objs.TryGetValue( id, out T obj ) ) + { + return obj.Some(); + } + else + { + // LOG + } + + return obj.None(); + } + + public (Tx, Option) checkout( TID id ) + { + var tx = new Tx( m_committed.Count, m_activeTransaction, this ); + + var v = lookup( id ); + + v.Match( t => { + tx.checkout( id ); + }, () => { + } ); + + return (tx, v); + } + + public Tx checkout( TID id, out Option tOut ) + { + var (tx, v) = checkout(id); + + tOut = v; + + return tx; + } + + public Tx checkout() + { + var tx = new Tx( m_committed.Count, m_activeTransaction, this ); + + return tx; + } + + public CommitResults commit( ref Tx co ) + { + co = null; + return commit_internal_single( co ); + } + + public ImmutableDictionary getSnapshot() + { + ImmutableDictionary res = m_objs; + return res; + } + + + internal CommitResults commit_internal_single( Tx tx ) + { + //var collision = false; + + //Check for previously committed things + var start = tx.Start; + + var curCommitted = m_committed; + + foreach( var t in tx.Checkouts ) + { + for( int i = start; i < curCommitted.Count; ++i ) + { + if( !t.id.Equals( curCommitted[i] ) ) { } + else + { + //collision = true; + return CommitResults.Collisions; + } + } + } + + // @@@@ LOCK + lock( m_committed ) + { + TID[] committed = new TID[tx.Checkouts.Count]; + + for( var i = 0; i < tx.Checkouts.Count; ++i ) + { + committed[i] = tx.Checkouts[i].id; + m_objs = m_objs.Add( tx.Checkouts[i].id, tx.Checkouts[i] ); + } + + m_committed = m_committed.AddRange(committed); + + foreach( var v in tx.Adds ) + { + m_objs = m_objs.Add( v.id, v ); + } + + return CommitResults.Perfect; + + } + + } + + + + Option> m_activeTransaction = Option.None>(); + + } + + public enum TxStates + { + Invalid, + Running, + Committed, + } + + + //This only works for a single thread + public class Tx: IDisposable where T : IID + { + internal ImmutableList Checkouts => m_checkouts; + internal TxStates State => m_state; + internal int Start => m_start; + internal ImmutableList Adds => m_adds; + + internal Tx( int start, DB db ) + : + this(start, Option.None>(), db) + { + } + + internal Tx( int start, Option> parentTx, DB db ) + { + m_start = start; + m_parentTx = parentTx; + m_childTx = m_childTx.Add(this); + m_db = db; + m_state = TxStates.Running; + } + + public void Dispose() + { + // Dispose of unmanaged resources. + Dispose( true ); + // Suppress finalization. + GC.SuppressFinalize( this ); + } + + public void Dispose(bool isFromDispose ) + { + if( isFromDispose ) + { + m_db.commit_internal_single( this ); + } + } + + public Option checkout( TID id ) + { + var v = m_db.lookup( id ); + + v.MatchSome( t => { m_checkouts = m_checkouts.Add( t ); } ); + + return v; + } + + public void add( T obj ) + { + m_adds = m_adds.Add(obj); + } + + + int m_start = -1; + DB m_db; + + //Do we need these? Do we need both? + Option> m_parentTx; + ImmutableList> m_childTx = ImmutableList>.Empty; + + TxStates m_state = TxStates.Invalid; + ImmutableList m_checkouts = ImmutableList.Empty; + + // New objects created this pass + ImmutableList m_adds = ImmutableList.Empty; + } + +} diff --git a/db/Processor.cs b/db/Processor.cs new file mode 100644 index 0000000..51cb9be --- /dev/null +++ b/db/Processor.cs @@ -0,0 +1,122 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; + +using Optional.Unsafe; + +namespace db +{ + public enum State + { + Invalid, + Prestartup, + Active, + Waiting, + Stopped, + } + + + public class Processor where T : IID + { + + + public DB DB { get; private set; } + + public System Sys { get; private set; } + + public State State => m_state; + + //public SemaphoreSlim Semaphore { get; private set; } = new SemaphoreSlim( 1 ); + public int Processed => m_processed; + + public Act DebugCurrentAct => m_debugCurrentAct; + + public Processor( DB db, System sys ) + { + DB = db; + Sys= sys; + m_state = State.Prestartup; + } + + public void run() + { + m_state = State.Active; + + + while( Sys.Running ) + { + tick(); + } + + m_state = State.Stopped; + } + + public void tick() + { + var actOpt = Sys.getNextAct(); + + if( !actOpt.HasValue ) + { + //log.trace( $"{Thread.CurrentThread.Name} Processed {m_processed} acts" ); + + /* + m_state = State.Waiting; + Semaphore.Wait(); + + m_state = State.Active; + + m_processed = 0; + */ + + return; + } + + var act = actOpt.ValueOrDefault(); + + m_debugCurrentAct = act; + + // @@@ TODO Put a timer around this and make sure any particular act is shorter than that. Probably 1ms and 5ms. + + act.Fn(); + + ++m_processed; + + } + + /* + public void kick() + { + Semaphore.Release(); + } + */ + + volatile State m_state; + int m_processed = 0; + //volatile string ProcessingDebug = ""; + + Act m_debugCurrentAct = null; + + + + + + + + + } + + + + + + + + + + + + + + +} diff --git a/db/System.cs b/db/System.cs new file mode 100644 index 0000000..1d3194c --- /dev/null +++ b/db/System.cs @@ -0,0 +1,308 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Text; +using System.Threading; +using System.Diagnostics; + +using Optional; +using System.Diagnostics.CodeAnalysis; + +namespace db +{ + + struct TimedAction : IComparable + { + public long when; + public Act act; + + public TimedAction( long when, Act act ) + { + this.when = when; + this.act = act; + } + + public int CompareTo( TimedAction other ) + { + return when.CompareTo( other.when ); + } + + public override bool Equals( object obj ) + { + return obj is TimedAction action && + when == action.when && + EqualityComparer.Default.Equals( act, action.act ); + } + + public override int GetHashCode() + { + var hc = when.GetHashCode() ^ act.GetHashCode(); + return hc; + } + } + + public class SystemCfg : lib.Config + { + public readonly float Cores = 1; + } + + public class System where T : IID + { + //public static System Current => s_system; + + public SemaphoreSlim ActsExist => m_actsExist; + public DB DB { get; private set; } + + public bool Running { get; private set; } + + public System( res.Ref cfg, DB db ) + { + m_cfg = cfg; + DB = db; + + var procCount = Environment.ProcessorCount; + + //Exact comparison + if( m_cfg.res.Cores != 0.0f ) + { + //If its less than 1, then use it as a multiplier + if( m_cfg.res.Cores < 0.0f ) + { + procCount = Environment.ProcessorCount - (int)m_cfg.res.Cores; + } + else if( m_cfg.res.Cores < 1.0f ) + { + procCount = (int) ((float)Environment.ProcessorCount * m_cfg.res.Cores); + } + else + { + procCount = (int)m_cfg.res.Cores; + } + } + + log.info( $"Running {procCount} cores out of a total cores {Environment.ProcessorCount} via a config Cores value of {m_cfg.res.Cores}" ); + + Processor[] procs = new Processor[procCount]; + + for( var i = 0; i < procCount; ++i ) + { + var proc = new Processor( db, this ); + + procs[i] = proc; + } + + m_processors = m_processors.AddRange( procs ); + + Running = true; + + } + + + public void forcedThisTick( Act act ) + { + m_current.Add( act ); + + m_actsExist.Release(); + } + + public void next( Act act ) + { + m_next.Add( act ); + } + + //Most things dont need accurate next frame processing, so split them between the next frame N frames + const double s_variance = 1.0 / 15.0; + + public void future( Act act, double future, double maxVariance = s_variance ) + { + //m_actions.Add( act ); + + var variance = m_rand.NextDouble() * maxVariance; + + var nextTime = future + variance; + + if( nextTime < 1.0 / 60.0 ) + { + next( act ); + return; + } + + var ts = TimeSpan.FromSeconds( nextTime ); + + var tsTicks = ts.Ticks; + + // @@@ TIMING Should we use a fixed time at the front of the frame for this? + var ticks = tsTicks + DateTime.Now.Ticks; + + var ta = new TimedAction( ticks, act ); + + var newFuture = m_futureActions.Add( ta ); + + Interlocked.Exchange( ref m_futureActions, newFuture ); + + } + + public void start() + { + int count = 0; + foreach( var p in m_processors ) + { + var start = new ThreadStart( p.run ); + + var th = new Thread( start ); + th.Name = $"Processor_{count}"; + + th.Start(); + + ++count; + } + } + + public void tick() + { + //Debug.Assert( m_current.IsEmpty ); + + addTimedActions(); + + var current = m_current; + m_current = m_next; + m_next = current; + + while( !m_current.IsEmpty ) + { + m_actsExist.Release(); + } + + + /* + foreach( var proc in m_processors ) + { + //Debug.Assert( proc.State == State.Waiting ); + + proc.kick(); + } + */ + } + + /* + public void wait_blah( int targetMs, int maxMs ) + { + var done = 0; + + var start = DateTime.Now; + var delta = start - start; + + while( done < m_processors.Count && delta.TotalMilliseconds < maxMs ) + { + done = 0; + + foreach( var proc in m_processors ) + { + if( proc.State != State.Active ) + { + ++done; + } + } + + delta = DateTime.Now - start; + } + + if( done != m_processors.Count ) + { + log.warn( $"Processing took significantly too long {delta.TotalSeconds}sec." ); + + foreach( var proc in m_processors ) + { + Act debugAct = proc.DebugCurrentAct; + + if( proc.State == State.Active ) + { + log.warn( $"Proc is still running\n{debugAct.Path}({debugAct.Line}): In method {debugAct.Member}" ); + + // @@@ TODO Should we kill the procedure? Let it continue to run? + } + } + } + + if( delta.TotalMilliseconds > targetMs ) + { + log.warn( $"Missed our target {delta.TotalMilliseconds} framerate." ); + } + + } + //*/ + + public void addTimedActions() + { + var sortedFutureActions = m_futureActions.Sort( ); + + var future = TimeSpan.FromMilliseconds( 33.33333 ); + + var time = DateTime.Now + future; + + foreach( var action in sortedFutureActions ) + { + if( action.when < time.Ticks ) + { + next( action.act ); + + var newActions = m_futureActions.Remove( action ); + + Interlocked.Exchange( ref m_futureActions, newActions ); + + } + else + { + break; + } + } + } + + public void stopRunning() + { + Running = false; + } + + + + + internal Option getNextAct() + { + if( m_current.TryTake( out Act res ) ) + { + return res.Some(); + } + + m_actsExist.Wait(); + + return Option.None(); + } + + res.Ref m_cfg; + + SemaphoreSlim m_actsExist = new SemaphoreSlim(0); + + Random m_rand = new Random(); + + ConcurrentBag m_current = new ConcurrentBag(); + ConcurrentBag m_next = new ConcurrentBag(); + + // @@ TODO Keep an eye on the timing of this. + ImmutableList m_futureActions = ImmutableList.Empty; + + /* + TimedAction[] m_sortedFutureActions = new TimedAction[16 * 1024]; + int m_sfaStart = 0; + int m_sfaEnd = 0; + */ + + + + ImmutableList> m_processors = ImmutableList>.Empty; + + //private static System s_system; + } + + +} diff --git a/logging/Log.cs b/logging/Log.cs index 73d1bed..d46176f 100644 --- a/logging/Log.cs +++ b/logging/Log.cs @@ -85,7 +85,7 @@ static public class log static public void create( string filename ) { - createLog( filename ); + startup( filename ); } @@ -95,7 +95,7 @@ static public class log var evt = CreateLogEvent( LogType.Info, msg, "System", null ); - s_events.Enqueue( evt ); + s_events.Enqueue( evt ); stop(); } @@ -105,19 +105,22 @@ static public class log { var logEvent = new LogEvent( logType, msg, path, line, member, cat, obj ); - static internal ConcurrentQueue s_events = new ConcurrentQueue(); + return logEvent; + } - private Thread m_thread; + static internal ConcurrentQueue s_events = new ConcurrentQueue(); - /* - static public Log log + static private Thread s_thread; + + /* + static public Log log + { + get { - get - { - return s_log; - } + return s_log; } - */ + } + */ @@ -160,7 +163,7 @@ static public class log static object s_lock = new object(); - static public void logBase( string msg, LogType type = LogType.Debug, string path = "", int line = -1, string member = "", string cat = "unk", object obj = null ) + static public void logBase_old( string msg, LogType type = LogType.Debug, string path = "", int line = -1, string member = "", string cat = "unk", object obj = null ) { // @@@@@ TODO Get rid of this lock. var evt = new LogEvent( type, msg, path, line, member, cat, obj ); @@ -171,19 +174,14 @@ static public class log } } - static public void log( string msg, LogType type = LogType.Debug, string path = "", int line = -1, string member = "", string cat = "unk", object obj = null ) - { - var evt = new LogEvent( type, msg, path, line, member, cat, obj ); + static public void logBase( string msg, LogType type = LogType.Debug, string path = "", int line = -1, string member = "", string cat = "unk", object obj = null ) + { + var evt = new LogEvent( type, msg, path, line, member, cat, obj ); + + s_events.Enqueue( evt ); + } - s_events.Enqueue( evt ); - /* - lock( s_log ) - { - s_log.writeToAll( evt ); - } - */ - } static public void logProps( object obj, string header, LogType type = LogType.Debug, string cat = "", string prefix = "", [CallerFilePath] string path = "", [CallerLineNumber] int line = -1, [CallerMemberName] string member = "" ) { var list = refl.GetAllProperties( obj.GetType() ); @@ -195,26 +193,27 @@ static public class log //lock( s_log ) { - var evt = CreateLogEvent( type, header, cat, obj ); + //var evt = CreateLogEvent( type, header, cat, obj ); s_events.Enqueue( evt ); //s_log.writeToAll( evt ); - foreach( var pi in list ) - { - try + foreach( var pi in list ) { - var v = pi.GetValue( obj ); + try + { + var v = pi.GetValue( obj ); - logBase( $"{prefix}{pi.Name} = {v}", type, path, line, member, cat ); - } - catch( Exception ex ) - { - logBase( $"Exception processing {pi.Name} {ex.Message}", LogType.Error, "log" ); + logBase( $"{prefix}{pi.Name} = {v}", type, path, line, member, cat ); + } + catch( Exception ex ) + { + logBase( $"Exception processing {pi.Name} {ex.Message}", LogType.Error, "log" ); + } } + } - } } @@ -233,15 +232,15 @@ static public class log } - private Log( string filename ) - { - var start = new ThreadStart( run ); + static void startup( string filename ) + { + var start = new ThreadStart( run ); - m_thread = new Thread( start ); - m_thread.Start(); + s_thread = new Thread( start ); + s_thread.Start(); - //TODO: Fix this so itll work without a directory. - Directory.CreateDirectory( Path.GetDirectoryName( filename ) ); + //TODO: Fix this so itll work without a directory. + Directory.CreateDirectory( Path.GetDirectoryName( filename ) ); string dir = Path.GetDirectoryName( filename ); @@ -258,50 +257,39 @@ static public class log //Debug.Listeners.Add( this ); - s_events.Enqueue( evt ); + //var evt = CreateLogEvent( LogType.Info, $"startup", "System", null ); - //writeToAll( evt ); - } + //s_events.Enqueue( evt ); - var evt = CreateLogEvent( LogType.Info, msg, "System", null ); + info( $"startup" ); - writeToAll( evt ); } - bool m_running = true; + static bool s_running = true; - void run() + static void run() + { + while( s_running ) { - while( m_running ) + while( s_events.TryDequeue( out var evt ) ) { - while( s_events.TryDequeue( out var evt ) ) - { - writeToAll( evt ); - } - - Thread.Sleep( 0 ); + writeToAll( evt ); } + + Thread.Sleep( 0 ); } - - void stop() - { - m_running = false; - - m_writer.Close(); - m_stream.Close(); - - m_errorWriter.Close(); - m_errorStream.Close(); - - } + } static void stop() { + s_running = false; + s_writer.Close(); s_stream.Close(); s_errorWriter.Close(); s_errorStream.Close(); + } static public void addDelegate( Log_delegate cb ) diff --git a/res/Resource.cs b/res/Resource.cs index 57dad63..68c6fda 100644 --- a/res/Resource.cs +++ b/res/Resource.cs @@ -249,10 +249,10 @@ namespace res if( wr.TryGetTarget( out var v ) ) return v; - lib.Log.info( $"{filename} was in cache, but its been dropped, reloading." ); + log.info( $"{filename} was in cache, but its been dropped, reloading." ); } - lib.Log.warn( $"Block Loading {filename}." ); + log.warn( $"Block Loading {filename}." ); var newV = actualLoad( filename ); @@ -270,7 +270,7 @@ namespace res if( wr.TryGetTarget( out var v ) ) return v; - lib.Log.error( $"{filename} was in cache, but its been dropped, reloading." ); + log.error( $"{filename} was in cache, but its been dropped, reloading." ); } } @@ -293,19 +293,19 @@ namespace res //Done loading if( !ImmutableInterlocked.TryRemove( ref s_loading, filename, out var oldEvt ) ) { - lib.Log.error( $"Error removing loading event for {filename}" ); + log.error( $"Error removing loading event for {filename}" ); } if( alreadyAdded ) { - lib.Log.error( $"Key {filename} already existed, though it shouldnt." ); + log.error( $"Key {filename} already existed, though it shouldnt." ); } return v; } else { - lib.Log.error( $"Loader could not be found for type {typeof( T )}" ); + log.error( $"Loader could not be found for type {typeof( T )}" ); return ResCache.s_default; }