diff --git a/Robust.Client/GameController/GameController.cs b/Robust.Client/GameController/GameController.cs index 2411eb7f9..1e21ebc13 100644 --- a/Robust.Client/GameController/GameController.cs +++ b/Robust.Client/GameController/GameController.cs @@ -28,6 +28,7 @@ using Robust.Shared.Network; using Robust.Shared.Prototypes; using Robust.Shared.Serialization; using Robust.Shared.Serialization.Manager; +using Robust.Shared.Threading; using Robust.Shared.Timing; using Robust.Shared.Utility; using YamlDotNet.RepresentationModel; @@ -65,6 +66,7 @@ namespace Robust.Client [Dependency] private readonly IAuthManager _authManager = default!; [Dependency] private readonly IMidiManager _midiManager = default!; [Dependency] private readonly IEyeManager _eyeManager = default!; + [Dependency] private readonly IParallelManagerInternal _parallelMgr = default!; private IWebViewManagerHook? _webViewHook; @@ -326,6 +328,8 @@ namespace Robust.Client ProfileOptSetup.Setup(_configurationManager); + _parallelMgr.Initialize(); + _resourceCache.Initialize(Options.LoadConfigAndUserData ? userDataDir : null); var mountOptions = _commandLineArgs != null diff --git a/Robust.Server/BaseServer.cs b/Robust.Server/BaseServer.cs index f1a18ed8d..13e53c8f8 100644 --- a/Robust.Server/BaseServer.cs +++ b/Robust.Server/BaseServer.cs @@ -31,6 +31,7 @@ using Robust.Shared.Network; using Robust.Shared.Prototypes; using Robust.Shared.Serialization; using Robust.Shared.Serialization.Manager; +using Robust.Shared.Threading; using Robust.Shared.Timing; using Robust.Shared.Utility; using Serilog.Debugging; @@ -87,6 +88,7 @@ namespace Robust.Server [Dependency] private readonly ILocalizationManagerInternal _loc = default!; [Dependency] private readonly INetConfigurationManager _netCfgMan = default!; [Dependency] private readonly IServerConsoleHost _consoleHost = default!; + [Dependency] private readonly IParallelManagerInternal _parallelMgr = default!; private readonly Stopwatch _uptimeStopwatch = new(); @@ -184,6 +186,8 @@ namespace Robust.Server ProfileOptSetup.Setup(_config); + _parallelMgr.Initialize(); + //Sets up Logging _logHandlerFactory = logHandlerFactory; diff --git a/Robust.Server/GameStates/ServerGameStateManager.cs b/Robust.Server/GameStates/ServerGameStateManager.cs index 895dce5ca..a87e4eb1c 100644 --- a/Robust.Server/GameStates/ServerGameStateManager.cs +++ b/Robust.Server/GameStates/ServerGameStateManager.cs @@ -1,15 +1,14 @@ using System; using System.Buffers; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using JetBrains.Annotations; -using Microsoft.CodeAnalysis; -using Microsoft.Extensions.ObjectPool; using Robust.Server.GameObjects; using Robust.Server.Player; +using Robust.Shared; +using Robust.Shared.Configuration; using Robust.Shared.Enums; using Robust.Shared.GameObjects; using Robust.Shared.GameStates; @@ -18,8 +17,10 @@ using Robust.Shared.Log; using Robust.Shared.Map; using Robust.Shared.Network; using Robust.Shared.Network.Messages; +using Robust.Shared.Threading; using Robust.Shared.Timing; using Robust.Shared.Utility; +using SharpZstd.Interop; namespace Robust.Server.GameStates { @@ -40,9 +41,13 @@ namespace Robust.Server.GameStates [Dependency] private readonly INetworkedMapManager _mapManager = default!; [Dependency] private readonly IEntitySystemManager _systemManager = default!; [Dependency] private readonly IServerEntityNetworkManager _entityNetworkManager = default!; + [Dependency] private readonly IConfigurationManager _cfg = default!; + [Dependency] private readonly IParallelManager _parallelMgr = default!; private ISawmill _logger = default!; + private PvsThreadResources[] _threadResourcesPool = Array.Empty(); + public ushort TransformNetId { get; set; } public void PostInject() @@ -60,6 +65,43 @@ namespace Robust.Server.GameStates _networkManager.Disconnect += HandleClientDisconnect; _pvs = EntitySystem.Get(); + + _parallelMgr.AddAndInvokeParallelCountChanged(ParallelChanged); + + _cfg.OnValueChanged(CVars.NetPVSCompressLevel, _ => UpdateZStdParams(), true); + } + + private void ParallelChanged() + { + foreach (var resource in _threadResourcesPool) + { + resource.CompressionContext.Dispose(); + } + + _threadResourcesPool = new PvsThreadResources[_parallelMgr.ParallelProcessCount]; + for (var i = 0; i < _threadResourcesPool.Length; i++) + { + ref var res = ref _threadResourcesPool[i]; + res.CompressionContext = new ZStdCompressionContext(); + } + + UpdateZStdParams(); + } + + private void UpdateZStdParams() + { + var compressionLevel = _cfg.GetCVar(CVars.NetPVSCompressLevel); + + for (var i = 0; i < _threadResourcesPool.Length; i++) + { + ref var res = ref _threadResourcesPool[i]; + res.CompressionContext.SetParameter(ZSTD_cParameter.ZSTD_c_compressionLevel, compressionLevel); + } + } + + private struct PvsThreadResources + { + public ZStdCompressionContext CompressionContext; } private void HandleClientConnected(object? sender, NetChannelArgs e) @@ -142,7 +184,7 @@ namespace Robust.Server.GameStates (chunks, playerChunks, viewerEntities) = _pvs.GetChunks(players); const int ChunkBatchSize = 2; var chunksCount = chunks.Count; - var chunkBatches = (int) MathF.Ceiling((float) chunksCount / ChunkBatchSize); + var chunkBatches = (int)MathF.Ceiling((float)chunksCount / ChunkBatchSize); chunkCache = new (Dictionary metadata, RobustTree tree)?[chunksCount]; @@ -159,7 +201,8 @@ namespace Robust.Server.GameStates for (var j = start; j < end; ++j) { var (visMask, chunkIndexLocation) = chunks[j]; - reuse[j] = _pvs.TryCalculateChunk(chunkIndexLocation, visMask, transformQuery, metadataQuery, out var chunk); + reuse[j] = _pvs.TryCalculateChunk(chunkIndexLocation, visMask, transformQuery, metadataQuery, + out var chunk); chunkCache[j] = chunk; } }); @@ -168,33 +211,27 @@ namespace Robust.Server.GameStates ArrayPool.Shared.Return(reuse); } - const int BatchSize = 2; - var batches = (int) MathF.Ceiling((float) players.Length / BatchSize); - - Parallel.For(0, batches, i => - { - var start = i * BatchSize; - var end = Math.Min(start + BatchSize, players.Length); - - for (var j = start; j < end; ++j) + _parallelMgr.ParallelForWithResources( + 0, players.Length, + _threadResourcesPool, + (int i, ref PvsThreadResources resource) => { try { - SendStateUpdate(j); + SendStateUpdate(i, ref resource); } catch (Exception e) // Catch EVERY exception { _logger.Log(LogLevel.Error, e, "Caught exception while generating mail."); } - } - }); + }); - void SendStateUpdate(int sessionIndex) + void SendStateUpdate(int sessionIndex, ref PvsThreadResources resources) { var session = players[sessionIndex]; // KILL IT WITH FIRE - if(mainThread != Thread.CurrentThread) + if (mainThread != Thread.CurrentThread) IoCManager.InitThread(new DependencyCollection(parentDeps), true); var channel = session.ConnectedClient; @@ -214,13 +251,15 @@ namespace Robust.Server.GameStates // lastAck varies with each client based on lag and such, we can't just make 1 global state and send it to everyone var lastInputCommand = inputSystem.GetLastInputCommand(session); var lastSystemMessage = _entityNetworkManager.GetLastMessageSequence(session); - var state = new GameState(lastAck, _gameTiming.CurTick, Math.Max(lastInputCommand, lastSystemMessage), entStates, playerStates, deletions, mapData); + var state = new GameState(lastAck, _gameTiming.CurTick, Math.Max(lastInputCommand, lastSystemMessage), + entStates, playerStates, deletions, mapData); InterlockedHelper.Min(ref oldestAckValue, lastAck.Value); // actually send the state var stateUpdateMessage = new MsgState(); stateUpdateMessage.State = state; + stateUpdateMessage.CompressionContext = resources.CompressionContext; // If the state is too big we let Lidgren send it reliably. // This is to avoid a situation where a state is so large that it consistently gets dropped @@ -238,7 +277,7 @@ namespace Robust.Server.GameStates _networkManager.ServerSendMessage(stateUpdateMessage, channel); } - if(_pvs.CullingEnabled) + if (_pvs.CullingEnabled) _pvs.ReturnToPool(playerChunks); _pvs.Cleanup(_playerManager.ServerSessions); var oldestAck = new GameTick(oldestAckValue); diff --git a/Robust.Shared/CVars.cs b/Robust.Shared/CVars.cs index c53925528..d6f7d0cd6 100644 --- a/Robust.Shared/CVars.cs +++ b/Robust.Shared/CVars.cs @@ -136,6 +136,12 @@ namespace Robust.Shared public static readonly CVarDef NetPVSEntityBudget = CVarDef.Create("net.pvs_budget", 50, CVar.ARCHIVE | CVar.REPLICATED); + /// + /// ZSTD compression level to use when compressing game states. + /// + public static readonly CVarDef NetPVSCompressLevel = + CVarDef.Create("net.pvs_compress_level", 3, CVar.SERVERONLY); + /// /// Log late input messages from clients. /// @@ -188,6 +194,33 @@ namespace Robust.Shared public static readonly CVarDef NetLidgrenAppIdentifier = CVarDef.Create("net.lidgren_app_identifier", "RobustToolbox"); +#if DEBUG + /// + /// Add random fake network loss to all outgoing UDP network packets, as a ratio of how many packets to drop. + /// 0 = no packet loss, 1 = all packets dropped + /// + public static readonly CVarDef NetFakeLoss = CVarDef.Create("net.fakeloss", 0f, CVar.CHEAT); + + /// + /// Add fake extra delay to all outgoing UDP network packets, in seconds. + /// + /// + public static readonly CVarDef NetFakeLagMin = CVarDef.Create("net.fakelagmin", 0f, CVar.CHEAT); + + /// + /// Add fake extra random delay to all outgoing UDP network packets, in seconds. + /// The actual delay added for each packet is random between 0 and the specified value. + /// + /// + public static readonly CVarDef NetFakeLagRand = CVarDef.Create("net.fakelagrand", 0f, CVar.CHEAT); + + /// + /// Add random fake duplicates to all outgoing UDP network packets, as a ratio of how many packets to duplicate. + /// 0 = no packets duplicated, 1 = all packets duplicated. + /// + public static readonly CVarDef NetFakeDuplicates = CVarDef.Create("net.fakeduplicates", 0f, CVar.CHEAT); +#endif + /** * SUS */ @@ -218,33 +251,6 @@ namespace Robust.Shared public static readonly CVarDef SysGameThreadPriority = CVarDef.Create("sys.game_thread_priority", (int) ThreadPriority.AboveNormal); -#if DEBUG - /// - /// Add random fake network loss to all outgoing UDP network packets, as a ratio of how many packets to drop. - /// 0 = no packet loss, 1 = all packets dropped - /// - public static readonly CVarDef NetFakeLoss = CVarDef.Create("net.fakeloss", 0f, CVar.CHEAT); - - /// - /// Add fake extra delay to all outgoing UDP network packets, in seconds. - /// - /// - public static readonly CVarDef NetFakeLagMin = CVarDef.Create("net.fakelagmin", 0f, CVar.CHEAT); - - /// - /// Add fake extra random delay to all outgoing UDP network packets, in seconds. - /// The actual delay added for each packet is random between 0 and the specified value. - /// - /// - public static readonly CVarDef NetFakeLagRand = CVarDef.Create("net.fakelagrand", 0f, CVar.CHEAT); - - /// - /// Add random fake duplicates to all outgoing UDP network packets, as a ratio of how many packets to duplicate. - /// 0 = no packets duplicated, 1 = all packets duplicated. - /// - public static readonly CVarDef NetFakeDuplicates = CVarDef.Create("net.fakeduplicates", 0f, CVar.CHEAT); -#endif - /* * METRICS */ @@ -1142,5 +1148,15 @@ namespace Robust.Shared public static readonly CVarDef AczManifestCompressLevel = CVarDef.Create("acz.manifest_compress_level", 14, CVar.SERVERONLY); + /* + * THREAD + */ + + /// + /// The nominal parallel processing count to use for parallelized operations. + /// The default of 0 automatically selects the system's processor count. + /// + public static readonly CVarDef ThreadParallelCount = + CVarDef.Create("thread.parallel_count", 0); } } diff --git a/Robust.Shared/Network/Messages/MsgEntity.cs b/Robust.Shared/Network/Messages/MsgEntity.cs index 315385269..56af86027 100644 --- a/Robust.Shared/Network/Messages/MsgEntity.cs +++ b/Robust.Shared/Network/Messages/MsgEntity.cs @@ -5,6 +5,7 @@ using Robust.Shared.GameObjects; using Robust.Shared.IoC; using Robust.Shared.Serialization; using Robust.Shared.Timing; +using Robust.Shared.Utility; #nullable disable @@ -52,13 +53,12 @@ namespace Robust.Shared.Network.Messages case EntityMessageType.SystemMessage: { var serializer = IoCManager.Resolve(); - using (var stream = new MemoryStream()) - { - serializer.Serialize(stream, SystemMessage); - buffer.WriteVariableInt32((int)stream.Length); - stream.TryGetBuffer(out var segment); - buffer.Write(segment); - } + var stream = new MemoryStream(); + + serializer.Serialize(stream, SystemMessage); + + buffer.WriteVariableInt32((int)stream.Length); + buffer.Write(stream.AsSpan()); } break; } diff --git a/Robust.Shared/Network/Messages/MsgScriptResponse.cs b/Robust.Shared/Network/Messages/MsgScriptResponse.cs index d5e9a4ed1..0b744df6b 100644 --- a/Robust.Shared/Network/Messages/MsgScriptResponse.cs +++ b/Robust.Shared/Network/Messages/MsgScriptResponse.cs @@ -51,8 +51,7 @@ namespace Robust.Shared.Network.Messages serializer.SerializeDirect(memoryStream, Response); buffer.WriteVariableInt32((int)memoryStream.Length); - memoryStream.TryGetBuffer(out var segment); - buffer.Write(segment); + buffer.Write(memoryStream.AsSpan()); } } } diff --git a/Robust.Shared/Network/Messages/MsgState.cs b/Robust.Shared/Network/Messages/MsgState.cs index 5501fd42c..36bfe9230 100644 --- a/Robust.Shared/Network/Messages/MsgState.cs +++ b/Robust.Shared/Network/Messages/MsgState.cs @@ -1,10 +1,10 @@ using System; +using System.Buffers; +using System.Diagnostics; using System.IO; -using System.IO.Compression; using Lidgren.Network; using Robust.Shared.GameStates; using Robust.Shared.IoC; -using Robust.Shared.Log; using Robust.Shared.Serialization; using Robust.Shared.Utility; @@ -25,6 +25,7 @@ namespace Robust.Shared.Network.Messages public override MsgGroups MsgGroup => MsgGroups.Entity; public GameState State; + public ZStdCompressionContext CompressionContext; private bool _hasWritten; @@ -39,7 +40,7 @@ namespace Robust.Shared.Network.Messages if (compressedLength > 0) { var stream = buffer.ReadAlignedMemory(compressedLength); - using var decompressStream = new DeflateStream(stream, CompressionMode.Decompress); + using var decompressStream = new ZStdDecompressStream(stream); var decompressedStream = new MemoryStream(uncompressedLength); decompressStream.CopyTo(decompressedStream, uncompressedLength); decompressedStream.Position = 0; @@ -62,35 +63,37 @@ namespace Robust.Shared.Network.Messages public override void WriteToBuffer(NetOutgoingMessage buffer) { var serializer = IoCManager.Resolve(); - MemoryStream finalStream; var stateStream = new MemoryStream(); serializer.SerializeDirect(stateStream, State); - buffer.WriteVariableInt32((int) stateStream.Length); + buffer.WriteVariableInt32((int)stateStream.Length); // We compress the state. if (stateStream.Length > CompressionThreshold) { + var sw = Stopwatch.StartNew(); stateStream.Position = 0; - var compressedStream = new MemoryStream(); - using (var deflateStream = new DeflateStream(compressedStream, CompressionMode.Compress, true)) - { - stateStream.CopyTo(deflateStream); - } + var buf = ArrayPool.Shared.Rent(ZStd.CompressBound((int)stateStream.Length)); + var length = CompressionContext.Compress2(buf, stateStream.AsSpan()); - buffer.WriteVariableInt32((int) compressedStream.Length); - finalStream = compressedStream; + buffer.WriteVariableInt32(length); + + buffer.Write(buf.AsSpan(0, length)); + + var elapsed = sw.Elapsed; + System.Console.WriteLine( + $"From: {State.FromSequence} To: {State.ToSequence} Size: {length} B Before: {stateStream.Length} B time: {elapsed}"); + + ArrayPool.Shared.Return(buf); } // The state is sent as is. else { // 0 means that the state isn't compressed. buffer.WriteVariableInt32(0); - finalStream = stateStream; + + buffer.Write(stateStream.AsSpan()); } - finalStream.TryGetBuffer(out var segment); - buffer.Write(segment); - finalStream.Dispose(); _hasWritten = false; MsgSize = buffer.LengthBytes; @@ -109,6 +112,7 @@ namespace Robust.Shared.Network.Messages { return true; } + return MsgSize > ReliableThreshold; } diff --git a/Robust.Shared/Network/Messages/MsgViewVariablesModifyRemote.cs b/Robust.Shared/Network/Messages/MsgViewVariablesModifyRemote.cs index 2538c98ea..f4119496e 100644 --- a/Robust.Shared/Network/Messages/MsgViewVariablesModifyRemote.cs +++ b/Robust.Shared/Network/Messages/MsgViewVariablesModifyRemote.cs @@ -2,6 +2,7 @@ using System.IO; using Lidgren.Network; using Robust.Shared.IoC; using Robust.Shared.Serialization; +using Robust.Shared.Utility; using Robust.Shared.ViewVariables; #nullable disable @@ -57,20 +58,17 @@ namespace Robust.Shared.Network.Messages { var serializer = IoCManager.Resolve(); buffer.Write(SessionId); - using (var stream = new MemoryStream()) - { - serializer.Serialize(stream, PropertyIndex); - buffer.Write((int)stream.Length); - stream.TryGetBuffer(out var segment); - buffer.Write(segment); - } - using (var stream = new MemoryStream()) - { - serializer.Serialize(stream, Value); - buffer.Write((int)stream.Length); - stream.TryGetBuffer(out var segment); - buffer.Write(segment); - } + + var stream = new MemoryStream(); + serializer.Serialize(stream, PropertyIndex); + buffer.Write((int)stream.Length); + buffer.Write(stream.AsSpan()); + + stream.Position = 0; + serializer.Serialize(stream, Value); + buffer.Write((int)stream.Length); + buffer.Write(stream.AsSpan()); + buffer.Write(ReinterpretValue); } } diff --git a/Robust.Shared/Network/Messages/MsgViewVariablesRemoteData.cs b/Robust.Shared/Network/Messages/MsgViewVariablesRemoteData.cs index b6b0fca7b..bdae3520f 100644 --- a/Robust.Shared/Network/Messages/MsgViewVariablesRemoteData.cs +++ b/Robust.Shared/Network/Messages/MsgViewVariablesRemoteData.cs @@ -2,6 +2,7 @@ using System.IO; using Lidgren.Network; using Robust.Shared.IoC; using Robust.Shared.Serialization; +using Robust.Shared.Utility; using Robust.Shared.ViewVariables; #nullable disable @@ -39,13 +40,11 @@ namespace Robust.Shared.Network.Messages { buffer.Write(RequestId); var serializer = IoCManager.Resolve(); - using (var stream = new MemoryStream()) - { - serializer.Serialize(stream, Blob); - buffer.Write((int)stream.Length); - stream.TryGetBuffer(out var segment); - buffer.Write(segment); - } + + var stream = new MemoryStream(); + serializer.Serialize(stream, Blob); + buffer.Write((int)stream.Length); + buffer.Write(stream.AsSpan()); } } } diff --git a/Robust.Shared/Network/Messages/MsgViewVariablesReqData.cs b/Robust.Shared/Network/Messages/MsgViewVariablesReqData.cs index 9a51d48b2..e448b984e 100644 --- a/Robust.Shared/Network/Messages/MsgViewVariablesReqData.cs +++ b/Robust.Shared/Network/Messages/MsgViewVariablesReqData.cs @@ -2,6 +2,7 @@ using System.IO; using Lidgren.Network; using Robust.Shared.IoC; using Robust.Shared.Serialization; +using Robust.Shared.Utility; using Robust.Shared.ViewVariables; #nullable disable @@ -46,13 +47,12 @@ namespace Robust.Shared.Network.Messages buffer.Write(RequestId); buffer.Write(SessionId); var serializer = IoCManager.Resolve(); - using (var stream = new MemoryStream()) - { - serializer.Serialize(stream, RequestMeta); - buffer.Write((int)stream.Length); - stream.TryGetBuffer(out var segment); - buffer.Write(segment); - } + + var stream = new MemoryStream(); + serializer.Serialize(stream, RequestMeta); + + buffer.Write((int)stream.Length); + buffer.Write(stream.AsSpan()); } } } diff --git a/Robust.Shared/Network/Messages/MsgViewVariablesReqSession.cs b/Robust.Shared/Network/Messages/MsgViewVariablesReqSession.cs index 97e154328..780b9b53b 100644 --- a/Robust.Shared/Network/Messages/MsgViewVariablesReqSession.cs +++ b/Robust.Shared/Network/Messages/MsgViewVariablesReqSession.cs @@ -2,6 +2,7 @@ using System.IO; using Lidgren.Network; using Robust.Shared.IoC; using Robust.Shared.Serialization; +using Robust.Shared.Utility; using Robust.Shared.ViewVariables; #nullable disable @@ -40,13 +41,11 @@ namespace Robust.Shared.Network.Messages { buffer.Write(RequestId); var serializer = IoCManager.Resolve(); - using (var stream = new MemoryStream()) - { - serializer.Serialize(stream, Selector); - buffer.Write((int)stream.Length); - stream.TryGetBuffer(out var segment); - buffer.Write(segment); - } + + var stream = new MemoryStream(); + serializer.Serialize(stream, Selector); + buffer.Write((int)stream.Length); + buffer.Write(stream.AsSpan()); } } } diff --git a/Robust.Shared/SharedIoC.cs b/Robust.Shared/SharedIoC.cs index 996d9e9d2..7d7313490 100644 --- a/Robust.Shared/SharedIoC.cs +++ b/Robust.Shared/SharedIoC.cs @@ -14,6 +14,7 @@ using Robust.Shared.Random; using Robust.Shared.Sandboxing; using Robust.Shared.Serialization; using Robust.Shared.Serialization.Manager; +using Robust.Shared.Threading; using Robust.Shared.Timing; namespace Robust.Shared @@ -47,6 +48,8 @@ namespace Robust.Shared IoCManager.Register(); IoCManager.Register(); IoCManager.Register(); + IoCManager.Register(); + IoCManager.Register(); } } } diff --git a/Robust.Shared/Threading/ParallelManager.cs b/Robust.Shared/Threading/ParallelManager.cs new file mode 100644 index 000000000..f2bcfb145 --- /dev/null +++ b/Robust.Shared/Threading/ParallelManager.cs @@ -0,0 +1,89 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Robust.Shared.Configuration; +using Robust.Shared.IoC; +using Robust.Shared.Log; +using Robust.Shared.Utility; + +namespace Robust.Shared.Threading; + +public interface IParallelManager +{ + event Action ParallelCountChanged; + + int ParallelProcessCount { get; } + + /// + /// Add the delegate to and immediately invoke it. + /// + void AddAndInvokeParallelCountChanged(Action changed); +} + +internal interface IParallelManagerInternal : IParallelManager +{ + void Initialize(); +} + +internal sealed class ParallelManager : IParallelManagerInternal +{ + [Dependency] private readonly IConfigurationManager _cfg = default!; + + public event Action? ParallelCountChanged; + public int ParallelProcessCount { get; private set; } + + public void Initialize() + { + _cfg.OnValueChanged(CVars.ThreadParallelCount, UpdateCVar, true); + } + + public void AddAndInvokeParallelCountChanged(Action changed) + { + ParallelCountChanged += changed; + changed(); + } + + private void UpdateCVar(int value) + { + var oldCount = ParallelProcessCount; + ParallelProcessCount = value == 0 ? Environment.ProcessorCount : value; + + if (oldCount != ParallelProcessCount) + ParallelCountChanged?.Invoke(); + } +} + +public static class ParallelManagerExt +{ + public delegate void ParallelResourceAction(int i, ref T resource); + + public static void ParallelForWithResources( + this IParallelManager manager, + int fromInclusive, + int toExclusive, + T[] resources, + ParallelResourceAction action) + { + var parallelCount = manager.ParallelProcessCount; + + DebugTools.Assert( + resources.Length >= parallelCount, + "Resources buffer is too small to fit maximum thread count."); + + var threadIndex = 0; + + Parallel.For( + fromInclusive, toExclusive, + new ParallelOptions { MaxDegreeOfParallelism = parallelCount }, + () => Interlocked.Increment(ref threadIndex), + (i, _, localThreadIdx) => + { + ref var resource = ref resources[localThreadIdx]; + + action(i, ref resource); + + return localThreadIdx; + }, + _ => { }); + } +} diff --git a/Robust.Shared/Utility/StreamExt.cs b/Robust.Shared/Utility/StreamExt.cs index afc748d02..22d0ad2ac 100644 --- a/Robust.Shared/Utility/StreamExt.cs +++ b/Robust.Shared/Utility/StreamExt.cs @@ -98,5 +98,28 @@ namespace Robust.Shared.Utility return totalRead; } } + + /// + /// Gets the span over the currently filled region of the memory stream, based on its length. + /// + public static Span AsSpan(this MemoryStream ms) + { + // Let it be forever immortalized that, while I was writing this function, + // Julian suggested that I should name it "AssSpan" to test if that would slip through review. + + var buf = ms.GetBuffer(); + + return buf.AsSpan(0, (int) ms.Length); + } + + /// + /// Gets the memory over the currently filled region of the memory stream, based on its length. + /// + public static Memory AsMemory(this MemoryStream ms) + { + var buf = ms.GetBuffer(); + + return buf.AsMemory(0, (int) ms.Length); + } } } diff --git a/Robust.Shared/Utility/ZStd.cs b/Robust.Shared/Utility/ZStd.cs index 974fc227f..75f437911 100644 --- a/Robust.Shared/Utility/ZStd.cs +++ b/Robust.Shared/Utility/ZStd.cs @@ -77,7 +77,7 @@ public sealed unsafe class ZStdCompressionContext : IDisposable ZSTD_CCtx_setParameter(Context, parameter, value); } - public int Compress(Span destination, Span source, int compressionLevel = ZSTD_CLEVEL_DEFAULT) + public int Compress(Span destination, ReadOnlySpan source, int compressionLevel = ZSTD_CLEVEL_DEFAULT) { CheckDisposed(); @@ -95,6 +95,23 @@ public sealed unsafe class ZStdCompressionContext : IDisposable } } + public int Compress2(Span destination, ReadOnlySpan source) + { + CheckDisposed(); + + fixed (byte* dst = destination) + fixed (byte* src = source) + { + var ret = ZSTD_compress2( + Context, + dst, (nuint)destination.Length, + src, (nuint)source.Length); + + ZStdException.ThrowIfError(ret); + return (int)ret; + } + } + ~ZStdCompressionContext() { Dispose();