Move Distrubuted stuff into my libs

This commit is contained in:
Marc Hernandez 2021-06-19 18:25:45 -07:00
parent 2acdb68828
commit 3b71cc49e6
7 changed files with 812 additions and 73 deletions

View File

@ -17,6 +17,8 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Optional" Version="4.0.0" />
<PackageReference Include="Optional.Async" Version="1.3.0" />
<PackageReference Include="System.Collections.Immutable" Version="1.6.0" />
<PackageReference Include="System.ValueTuple" Version="4.5.0" />
</ItemGroup>

96
db/Act.cs Normal file
View File

@ -0,0 +1,96 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Text;
namespace db
{
public class Act
{
public Func<CommitResults> Fn => m_act;
public string DebugInfo { get; private set; } = "";
public string Path { get; private set; } = "";
public int Line { get; private set; } = -1;
public string Member { get; private set; } = "";
private Act( Func<CommitResults> act, string debugInfo = "{unknown_base}", string path = "", int line = -1, string member = "" )
{
m_act = act;
DebugInfo = debugInfo;
Path = path;
Line = line;
Member = member;
//ExtractValue( act );
}
static public Act create( Func<CommitResults> act, string debugInfo = "{unknown}", [CallerFilePath] string path = "", [CallerLineNumber] int line = -1, [CallerMemberName] string member = "" )
{
//ExtractValue( act );
return new Act( act, debugInfo, path, line, member );
}
public static Act create<T>( Func<T, CommitResults> act, T p0, string debugInfo = "{unknown}", [CallerFilePath] string path = "", [CallerLineNumber] int line = -1, [CallerMemberName] string member = "" )
{
//ExtractValue( act );
//return new Act( act );
return new Act( () => { return act( p0 ); }, debugInfo, path, line, member );
}
// If we're not doing any commit ops we can just use these.
static public Act create( Action act, string debugInfo = "{unknown}", [CallerFilePath] string path = "", [CallerLineNumber] int line = -1, [CallerMemberName] string member = "" )
{
//ExtractValue( act );
return new Act( () => { act(); return CommitResults.Perfect; }, debugInfo, path, line, member );
}
public static Act create<T>( Action<T> act, T p0, string debugInfo = "{unknown}", [CallerFilePath] string path = "", [CallerLineNumber] int line = -1, [CallerMemberName] string member = "" )
{
//ExtractValue( act );
//return new Act( act );
return new Act( () => { act( p0 ); return CommitResults.Perfect; }, debugInfo, path, line, member );
}
public static void ExtractValue( Delegate lambda )
{
var lambdaType = lambda.GetType();
var methodType = lambda.Method.GetType();
//Nothing here.
//var locals = lambda.Method.GetMethodBody().LocalVariables;
var targetType = lambda.Target?.GetType();
var fields = lambda.Method.DeclaringType?.GetFields
(
BindingFlags.NonPublic |
BindingFlags.Instance |
BindingFlags.Public |
BindingFlags.Static
);
//.SingleOrDefault(x => x.Name == variableName);
//return (TValue)field.GetValue( lambda.Target );
}
Func<CommitResults> m_act;
}
}

223
db/DB.cs Normal file
View File

@ -0,0 +1,223 @@
using System;
using System.Collections.Immutable;
using Optional;
using static Optional.OptionExtensions;
using static System.Collections.Immutable.ImmutableInterlocked;
/*
???? Should we have an explicit transaction class/ID?
???? Should we split things into threaded vs action
*/
namespace db
{
public enum CommitResults
{
Invalid,
Perfect,
Collisions,
}
public interface IID<TS>
{
TS id { get; }
}
public class DB<TID, T> where T : IID<TID>
{
//Current snapshot of the DB.
ImmutableDictionary<TID, T> m_objs = ImmutableDictionary<TID, T>.Empty;
//List of committed Ids based on when they were committed.
ImmutableList<TID> m_committed = ImmutableList<TID>.Empty;
ImmutableDictionary<TID, T> Objects => m_objs;
// @@@@ TODO This returns an entity that can be changing. It should be a lazy instantiated copy
public Option<T> lookup( TID id )
{
if( m_objs.TryGetValue( id, out T obj ) )
{
return obj.Some();
}
else
{
// LOG
}
return obj.None();
}
public (Tx<TID, T>, Option<T>) checkout( TID id )
{
var tx = new Tx<TID, T>( m_committed.Count, m_activeTransaction, this );
var v = lookup( id );
v.Match( t => {
tx.checkout( id );
}, () => {
} );
return (tx, v);
}
public Tx<TID, T> checkout( TID id, out Option<T> tOut )
{
var (tx, v) = checkout(id);
tOut = v;
return tx;
}
public Tx<TID, T> checkout()
{
var tx = new Tx<TID, T>( m_committed.Count, m_activeTransaction, this );
return tx;
}
public CommitResults commit( ref Tx<TID, T> co )
{
co = null;
return commit_internal_single( co );
}
public ImmutableDictionary<TID, T> getSnapshot()
{
ImmutableDictionary<TID, T> res = m_objs;
return res;
}
internal CommitResults commit_internal_single( Tx<TID, T> tx )
{
//var collision = false;
//Check for previously committed things
var start = tx.Start;
var curCommitted = m_committed;
foreach( var t in tx.Checkouts )
{
for( int i = start; i < curCommitted.Count; ++i )
{
if( !t.id.Equals( curCommitted[i] ) ) { }
else
{
//collision = true;
return CommitResults.Collisions;
}
}
}
// @@@@ LOCK
lock( m_committed )
{
TID[] committed = new TID[tx.Checkouts.Count];
for( var i = 0; i < tx.Checkouts.Count; ++i )
{
committed[i] = tx.Checkouts[i].id;
m_objs = m_objs.Add( tx.Checkouts[i].id, tx.Checkouts[i] );
}
m_committed = m_committed.AddRange(committed);
foreach( var v in tx.Adds )
{
m_objs = m_objs.Add( v.id, v );
}
return CommitResults.Perfect;
}
}
Option<Tx<TID, T>> m_activeTransaction = Option.None<Tx<TID, T>>();
}
public enum TxStates
{
Invalid,
Running,
Committed,
}
//This only works for a single thread
public class Tx<TID, T>: IDisposable where T : IID<TID>
{
internal ImmutableList<T> Checkouts => m_checkouts;
internal TxStates State => m_state;
internal int Start => m_start;
internal ImmutableList<T> Adds => m_adds;
internal Tx( int start, DB<TID, T> db )
:
this(start, Option.None<Tx<TID, T>>(), db)
{
}
internal Tx( int start, Option<Tx<TID, T>> parentTx, DB<TID, T> db )
{
m_start = start;
m_parentTx = parentTx;
m_childTx = m_childTx.Add(this);
m_db = db;
m_state = TxStates.Running;
}
public void Dispose()
{
// Dispose of unmanaged resources.
Dispose( true );
// Suppress finalization.
GC.SuppressFinalize( this );
}
public void Dispose(bool isFromDispose )
{
if( isFromDispose )
{
m_db.commit_internal_single( this );
}
}
public Option<T> checkout( TID id )
{
var v = m_db.lookup( id );
v.MatchSome( t => { m_checkouts = m_checkouts.Add( t ); } );
return v;
}
public void add( T obj )
{
m_adds = m_adds.Add(obj);
}
int m_start = -1;
DB<TID, T> m_db;
//Do we need these? Do we need both?
Option<Tx<TID, T>> m_parentTx;
ImmutableList<Tx<TID, T>> m_childTx = ImmutableList<Tx<TID, T>>.Empty;
TxStates m_state = TxStates.Invalid;
ImmutableList<T> m_checkouts = ImmutableList<T>.Empty;
// New objects created this pass
ImmutableList<T> m_adds = ImmutableList<T>.Empty;
}
}

122
db/Processor.cs Normal file
View File

@ -0,0 +1,122 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using Optional.Unsafe;
namespace db
{
public enum State
{
Invalid,
Prestartup,
Active,
Waiting,
Stopped,
}
public class Processor<TID, T> where T : IID<TID>
{
public DB<TID, T> DB { get; private set; }
public System<TID, T> Sys { get; private set; }
public State State => m_state;
//public SemaphoreSlim Semaphore { get; private set; } = new SemaphoreSlim( 1 );
public int Processed => m_processed;
public Act DebugCurrentAct => m_debugCurrentAct;
public Processor( DB<TID, T> db, System<TID, T> sys )
{
DB = db;
Sys= sys;
m_state = State.Prestartup;
}
public void run()
{
m_state = State.Active;
while( Sys.Running )
{
tick();
}
m_state = State.Stopped;
}
public void tick()
{
var actOpt = Sys.getNextAct();
if( !actOpt.HasValue )
{
//log.trace( $"{Thread.CurrentThread.Name} Processed {m_processed} acts" );
/*
m_state = State.Waiting;
Semaphore.Wait();
m_state = State.Active;
m_processed = 0;
*/
return;
}
var act = actOpt.ValueOrDefault();
m_debugCurrentAct = act;
// @@@ TODO Put a timer around this and make sure any particular act is shorter than that. Probably 1ms and 5ms.
act.Fn();
++m_processed;
}
/*
public void kick()
{
Semaphore.Release();
}
*/
volatile State m_state;
int m_processed = 0;
//volatile string ProcessingDebug = "";
Act m_debugCurrentAct = null;
}
}

308
db/System.cs Normal file
View File

@ -0,0 +1,308 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Text;
using System.Threading;
using System.Diagnostics;
using Optional;
using System.Diagnostics.CodeAnalysis;
namespace db
{
struct TimedAction : IComparable<TimedAction>
{
public long when;
public Act act;
public TimedAction( long when, Act act )
{
this.when = when;
this.act = act;
}
public int CompareTo( TimedAction other )
{
return when.CompareTo( other.when );
}
public override bool Equals( object obj )
{
return obj is TimedAction action &&
when == action.when &&
EqualityComparer<Act>.Default.Equals( act, action.act );
}
public override int GetHashCode()
{
var hc = when.GetHashCode() ^ act.GetHashCode();
return hc;
}
}
public class SystemCfg : lib.Config
{
public readonly float Cores = 1;
}
public class System<TID, T> where T : IID<TID>
{
//public static System Current => s_system;
public SemaphoreSlim ActsExist => m_actsExist;
public DB<TID, T> DB { get; private set; }
public bool Running { get; private set; }
public System( res.Ref<SystemCfg> cfg, DB<TID, T> db )
{
m_cfg = cfg;
DB = db;
var procCount = Environment.ProcessorCount;
//Exact comparison
if( m_cfg.res.Cores != 0.0f )
{
//If its less than 1, then use it as a multiplier
if( m_cfg.res.Cores < 0.0f )
{
procCount = Environment.ProcessorCount - (int)m_cfg.res.Cores;
}
else if( m_cfg.res.Cores < 1.0f )
{
procCount = (int) ((float)Environment.ProcessorCount * m_cfg.res.Cores);
}
else
{
procCount = (int)m_cfg.res.Cores;
}
}
log.info( $"Running {procCount} cores out of a total cores {Environment.ProcessorCount} via a config Cores value of {m_cfg.res.Cores}" );
Processor<TID, T>[] procs = new Processor<TID, T>[procCount];
for( var i = 0; i < procCount; ++i )
{
var proc = new Processor<TID, T>( db, this );
procs[i] = proc;
}
m_processors = m_processors.AddRange( procs );
Running = true;
}
public void forcedThisTick( Act act )
{
m_current.Add( act );
m_actsExist.Release();
}
public void next( Act act )
{
m_next.Add( act );
}
//Most things dont need accurate next frame processing, so split them between the next frame N frames
const double s_variance = 1.0 / 15.0;
public void future( Act act, double future, double maxVariance = s_variance )
{
//m_actions.Add( act );
var variance = m_rand.NextDouble() * maxVariance;
var nextTime = future + variance;
if( nextTime < 1.0 / 60.0 )
{
next( act );
return;
}
var ts = TimeSpan.FromSeconds( nextTime );
var tsTicks = ts.Ticks;
// @@@ TIMING Should we use a fixed time at the front of the frame for this?
var ticks = tsTicks + DateTime.Now.Ticks;
var ta = new TimedAction( ticks, act );
var newFuture = m_futureActions.Add( ta );
Interlocked.Exchange( ref m_futureActions, newFuture );
}
public void start()
{
int count = 0;
foreach( var p in m_processors )
{
var start = new ThreadStart( p.run );
var th = new Thread( start );
th.Name = $"Processor_{count}";
th.Start();
++count;
}
}
public void tick()
{
//Debug.Assert( m_current.IsEmpty );
addTimedActions();
var current = m_current;
m_current = m_next;
m_next = current;
while( !m_current.IsEmpty )
{
m_actsExist.Release();
}
/*
foreach( var proc in m_processors )
{
//Debug.Assert( proc.State == State.Waiting );
proc.kick();
}
*/
}
/*
public void wait_blah( int targetMs, int maxMs )
{
var done = 0;
var start = DateTime.Now;
var delta = start - start;
while( done < m_processors.Count && delta.TotalMilliseconds < maxMs )
{
done = 0;
foreach( var proc in m_processors )
{
if( proc.State != State.Active )
{
++done;
}
}
delta = DateTime.Now - start;
}
if( done != m_processors.Count )
{
log.warn( $"Processing took significantly too long {delta.TotalSeconds}sec." );
foreach( var proc in m_processors )
{
Act debugAct = proc.DebugCurrentAct;
if( proc.State == State.Active )
{
log.warn( $"Proc is still running\n{debugAct.Path}({debugAct.Line}): In method {debugAct.Member}" );
// @@@ TODO Should we kill the procedure? Let it continue to run?
}
}
}
if( delta.TotalMilliseconds > targetMs )
{
log.warn( $"Missed our target {delta.TotalMilliseconds} framerate." );
}
}
//*/
public void addTimedActions()
{
var sortedFutureActions = m_futureActions.Sort( );
var future = TimeSpan.FromMilliseconds( 33.33333 );
var time = DateTime.Now + future;
foreach( var action in sortedFutureActions )
{
if( action.when < time.Ticks )
{
next( action.act );
var newActions = m_futureActions.Remove( action );
Interlocked.Exchange( ref m_futureActions, newActions );
}
else
{
break;
}
}
}
public void stopRunning()
{
Running = false;
}
internal Option<Act> getNextAct()
{
if( m_current.TryTake( out Act res ) )
{
return res.Some();
}
m_actsExist.Wait();
return Option.None<Act>();
}
res.Ref<SystemCfg> m_cfg;
SemaphoreSlim m_actsExist = new SemaphoreSlim(0);
Random m_rand = new Random();
ConcurrentBag<Act> m_current = new ConcurrentBag<Act>();
ConcurrentBag<Act> m_next = new ConcurrentBag<Act>();
// @@ TODO Keep an eye on the timing of this.
ImmutableList<TimedAction> m_futureActions = ImmutableList<TimedAction>.Empty;
/*
TimedAction[] m_sortedFutureActions = new TimedAction[16 * 1024];
int m_sfaStart = 0;
int m_sfaEnd = 0;
*/
ImmutableList<Processor<TID, T>> m_processors = ImmutableList<Processor<TID, T>>.Empty;
//private static System s_system;
}
}

View File

@ -85,7 +85,7 @@ static public class log
static public void create( string filename )
{
createLog( filename );
startup( filename );
}
@ -105,9 +105,12 @@ static public class log
{
var logEvent = new LogEvent( logType, msg, path, line, member, cat, obj );
return logEvent;
}
static internal ConcurrentQueue<LogEvent> s_events = new ConcurrentQueue<LogEvent>();
private Thread m_thread;
static private Thread s_thread;
/*
static public Log log
@ -160,7 +163,7 @@ static public class log
static object s_lock = new object();
static public void logBase( string msg, LogType type = LogType.Debug, string path = "", int line = -1, string member = "", string cat = "unk", object obj = null )
static public void logBase_old( string msg, LogType type = LogType.Debug, string path = "", int line = -1, string member = "", string cat = "unk", object obj = null )
{
// @@@@@ TODO Get rid of this lock.
var evt = new LogEvent( type, msg, path, line, member, cat, obj );
@ -171,19 +174,14 @@ static public class log
}
}
static public void log( string msg, LogType type = LogType.Debug, string path = "", int line = -1, string member = "", string cat = "unk", object obj = null )
static public void logBase( string msg, LogType type = LogType.Debug, string path = "", int line = -1, string member = "", string cat = "unk", object obj = null )
{
var evt = new LogEvent( type, msg, path, line, member, cat, obj );
s_events.Enqueue( evt );
}
/*
lock( s_log )
{
s_log.writeToAll( evt );
}
*/
}
static public void logProps( object obj, string header, LogType type = LogType.Debug, string cat = "", string prefix = "", [CallerFilePath] string path = "", [CallerLineNumber] int line = -1, [CallerMemberName] string member = "" )
{
var list = refl.GetAllProperties( obj.GetType() );
@ -195,7 +193,7 @@ static public class log
//lock( s_log )
{
var evt = CreateLogEvent( type, header, cat, obj );
//var evt = CreateLogEvent( type, header, cat, obj );
s_events.Enqueue( evt );
@ -217,6 +215,7 @@ static public class log
}
}
}
//This might seem a little odd, but the intent is that usually you wont need to set notExpectedValue.
static public void expected<T>( T value, string falseString, string trueString = "", T notExpectedValue = default( T ) )
@ -233,12 +232,12 @@ static public class log
}
private Log( string filename )
static void startup( string filename )
{
var start = new ThreadStart( run );
m_thread = new Thread( start );
m_thread.Start();
s_thread = new Thread( start );
s_thread.Start();
//TODO: Fix this so itll work without a directory.
Directory.CreateDirectory( Path.GetDirectoryName( filename ) );
@ -258,21 +257,19 @@ static public class log
//Debug.Listeners.Add( this );
s_events.Enqueue( evt );
//var evt = CreateLogEvent( LogType.Info, $"startup", "System", null );
//s_events.Enqueue( evt );
info( $"startup" );
//writeToAll( evt );
}
var evt = CreateLogEvent( LogType.Info, msg, "System", null );
static bool s_running = true;
writeToAll( evt );
}
bool m_running = true;
void run()
static void run()
{
while( m_running )
while( s_running )
{
while( s_events.TryDequeue( out var evt ) )
{
@ -283,25 +280,16 @@ static public class log
}
}
void stop()
{
m_running = false;
m_writer.Close();
m_stream.Close();
m_errorWriter.Close();
m_errorStream.Close();
}
static void stop()
{
s_running = false;
s_writer.Close();
s_stream.Close();
s_errorWriter.Close();
s_errorStream.Close();
}
static public void addDelegate( Log_delegate cb )

View File

@ -249,10 +249,10 @@ namespace res
if( wr.TryGetTarget( out var v ) )
return v;
lib.Log.info( $"{filename} was in cache, but its been dropped, reloading." );
log.info( $"{filename} was in cache, but its been dropped, reloading." );
}
lib.Log.warn( $"Block Loading {filename}." );
log.warn( $"Block Loading {filename}." );
var newV = actualLoad<T>( filename );
@ -270,7 +270,7 @@ namespace res
if( wr.TryGetTarget( out var v ) )
return v;
lib.Log.error( $"{filename} was in cache, but its been dropped, reloading." );
log.error( $"{filename} was in cache, but its been dropped, reloading." );
}
}
@ -293,19 +293,19 @@ namespace res
//Done loading
if( !ImmutableInterlocked.TryRemove( ref s_loading, filename, out var oldEvt ) )
{
lib.Log.error( $"Error removing loading event for {filename}" );
log.error( $"Error removing loading event for {filename}" );
}
if( alreadyAdded )
{
lib.Log.error( $"Key {filename} already existed, though it shouldnt." );
log.error( $"Key {filename} already existed, though it shouldnt." );
}
return v;
}
else
{
lib.Log.error( $"Loader could not be found for type {typeof( T )}" );
log.error( $"Loader could not be found for type {typeof( T )}" );
return ResCache<T>.s_default;
}