mirror of
https://github.com/space-wizards/RobustToolbox.git
synced 2026-02-15 03:30:53 +01:00
ZSTD game states + other improvements (#2737)
This commit is contained in:
committed by
GitHub
parent
f2fa930edd
commit
588a9e9f63
@@ -28,6 +28,7 @@ using Robust.Shared.Network;
|
||||
using Robust.Shared.Prototypes;
|
||||
using Robust.Shared.Serialization;
|
||||
using Robust.Shared.Serialization.Manager;
|
||||
using Robust.Shared.Threading;
|
||||
using Robust.Shared.Timing;
|
||||
using Robust.Shared.Utility;
|
||||
using YamlDotNet.RepresentationModel;
|
||||
@@ -65,6 +66,7 @@ namespace Robust.Client
|
||||
[Dependency] private readonly IAuthManager _authManager = default!;
|
||||
[Dependency] private readonly IMidiManager _midiManager = default!;
|
||||
[Dependency] private readonly IEyeManager _eyeManager = default!;
|
||||
[Dependency] private readonly IParallelManagerInternal _parallelMgr = default!;
|
||||
|
||||
private IWebViewManagerHook? _webViewHook;
|
||||
|
||||
@@ -326,6 +328,8 @@ namespace Robust.Client
|
||||
|
||||
ProfileOptSetup.Setup(_configurationManager);
|
||||
|
||||
_parallelMgr.Initialize();
|
||||
|
||||
_resourceCache.Initialize(Options.LoadConfigAndUserData ? userDataDir : null);
|
||||
|
||||
var mountOptions = _commandLineArgs != null
|
||||
|
||||
@@ -31,6 +31,7 @@ using Robust.Shared.Network;
|
||||
using Robust.Shared.Prototypes;
|
||||
using Robust.Shared.Serialization;
|
||||
using Robust.Shared.Serialization.Manager;
|
||||
using Robust.Shared.Threading;
|
||||
using Robust.Shared.Timing;
|
||||
using Robust.Shared.Utility;
|
||||
using Serilog.Debugging;
|
||||
@@ -87,6 +88,7 @@ namespace Robust.Server
|
||||
[Dependency] private readonly ILocalizationManagerInternal _loc = default!;
|
||||
[Dependency] private readonly INetConfigurationManager _netCfgMan = default!;
|
||||
[Dependency] private readonly IServerConsoleHost _consoleHost = default!;
|
||||
[Dependency] private readonly IParallelManagerInternal _parallelMgr = default!;
|
||||
|
||||
private readonly Stopwatch _uptimeStopwatch = new();
|
||||
|
||||
@@ -184,6 +186,8 @@ namespace Robust.Server
|
||||
|
||||
ProfileOptSetup.Setup(_config);
|
||||
|
||||
_parallelMgr.Initialize();
|
||||
|
||||
//Sets up Logging
|
||||
_logHandlerFactory = logHandlerFactory;
|
||||
|
||||
|
||||
@@ -1,15 +1,14 @@
|
||||
using System;
|
||||
using System.Buffers;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using JetBrains.Annotations;
|
||||
using Microsoft.CodeAnalysis;
|
||||
using Microsoft.Extensions.ObjectPool;
|
||||
using Robust.Server.GameObjects;
|
||||
using Robust.Server.Player;
|
||||
using Robust.Shared;
|
||||
using Robust.Shared.Configuration;
|
||||
using Robust.Shared.Enums;
|
||||
using Robust.Shared.GameObjects;
|
||||
using Robust.Shared.GameStates;
|
||||
@@ -18,8 +17,10 @@ using Robust.Shared.Log;
|
||||
using Robust.Shared.Map;
|
||||
using Robust.Shared.Network;
|
||||
using Robust.Shared.Network.Messages;
|
||||
using Robust.Shared.Threading;
|
||||
using Robust.Shared.Timing;
|
||||
using Robust.Shared.Utility;
|
||||
using SharpZstd.Interop;
|
||||
|
||||
namespace Robust.Server.GameStates
|
||||
{
|
||||
@@ -40,9 +41,13 @@ namespace Robust.Server.GameStates
|
||||
[Dependency] private readonly INetworkedMapManager _mapManager = default!;
|
||||
[Dependency] private readonly IEntitySystemManager _systemManager = default!;
|
||||
[Dependency] private readonly IServerEntityNetworkManager _entityNetworkManager = default!;
|
||||
[Dependency] private readonly IConfigurationManager _cfg = default!;
|
||||
[Dependency] private readonly IParallelManager _parallelMgr = default!;
|
||||
|
||||
private ISawmill _logger = default!;
|
||||
|
||||
private PvsThreadResources[] _threadResourcesPool = Array.Empty<PvsThreadResources>();
|
||||
|
||||
public ushort TransformNetId { get; set; }
|
||||
|
||||
public void PostInject()
|
||||
@@ -60,6 +65,43 @@ namespace Robust.Server.GameStates
|
||||
_networkManager.Disconnect += HandleClientDisconnect;
|
||||
|
||||
_pvs = EntitySystem.Get<PVSSystem>();
|
||||
|
||||
_parallelMgr.AddAndInvokeParallelCountChanged(ParallelChanged);
|
||||
|
||||
_cfg.OnValueChanged(CVars.NetPVSCompressLevel, _ => UpdateZStdParams(), true);
|
||||
}
|
||||
|
||||
private void ParallelChanged()
|
||||
{
|
||||
foreach (var resource in _threadResourcesPool)
|
||||
{
|
||||
resource.CompressionContext.Dispose();
|
||||
}
|
||||
|
||||
_threadResourcesPool = new PvsThreadResources[_parallelMgr.ParallelProcessCount];
|
||||
for (var i = 0; i < _threadResourcesPool.Length; i++)
|
||||
{
|
||||
ref var res = ref _threadResourcesPool[i];
|
||||
res.CompressionContext = new ZStdCompressionContext();
|
||||
}
|
||||
|
||||
UpdateZStdParams();
|
||||
}
|
||||
|
||||
private void UpdateZStdParams()
|
||||
{
|
||||
var compressionLevel = _cfg.GetCVar(CVars.NetPVSCompressLevel);
|
||||
|
||||
for (var i = 0; i < _threadResourcesPool.Length; i++)
|
||||
{
|
||||
ref var res = ref _threadResourcesPool[i];
|
||||
res.CompressionContext.SetParameter(ZSTD_cParameter.ZSTD_c_compressionLevel, compressionLevel);
|
||||
}
|
||||
}
|
||||
|
||||
private struct PvsThreadResources
|
||||
{
|
||||
public ZStdCompressionContext CompressionContext;
|
||||
}
|
||||
|
||||
private void HandleClientConnected(object? sender, NetChannelArgs e)
|
||||
@@ -142,7 +184,7 @@ namespace Robust.Server.GameStates
|
||||
(chunks, playerChunks, viewerEntities) = _pvs.GetChunks(players);
|
||||
const int ChunkBatchSize = 2;
|
||||
var chunksCount = chunks.Count;
|
||||
var chunkBatches = (int) MathF.Ceiling((float) chunksCount / ChunkBatchSize);
|
||||
var chunkBatches = (int)MathF.Ceiling((float)chunksCount / ChunkBatchSize);
|
||||
chunkCache =
|
||||
new (Dictionary<EntityUid, MetaDataComponent> metadata, RobustTree<EntityUid> tree)?[chunksCount];
|
||||
|
||||
@@ -159,7 +201,8 @@ namespace Robust.Server.GameStates
|
||||
for (var j = start; j < end; ++j)
|
||||
{
|
||||
var (visMask, chunkIndexLocation) = chunks[j];
|
||||
reuse[j] = _pvs.TryCalculateChunk(chunkIndexLocation, visMask, transformQuery, metadataQuery, out var chunk);
|
||||
reuse[j] = _pvs.TryCalculateChunk(chunkIndexLocation, visMask, transformQuery, metadataQuery,
|
||||
out var chunk);
|
||||
chunkCache[j] = chunk;
|
||||
}
|
||||
});
|
||||
@@ -168,33 +211,27 @@ namespace Robust.Server.GameStates
|
||||
ArrayPool<bool>.Shared.Return(reuse);
|
||||
}
|
||||
|
||||
const int BatchSize = 2;
|
||||
var batches = (int) MathF.Ceiling((float) players.Length / BatchSize);
|
||||
|
||||
Parallel.For(0, batches, i =>
|
||||
{
|
||||
var start = i * BatchSize;
|
||||
var end = Math.Min(start + BatchSize, players.Length);
|
||||
|
||||
for (var j = start; j < end; ++j)
|
||||
_parallelMgr.ParallelForWithResources(
|
||||
0, players.Length,
|
||||
_threadResourcesPool,
|
||||
(int i, ref PvsThreadResources resource) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
SendStateUpdate(j);
|
||||
SendStateUpdate(i, ref resource);
|
||||
}
|
||||
catch (Exception e) // Catch EVERY exception
|
||||
{
|
||||
_logger.Log(LogLevel.Error, e, "Caught exception while generating mail.");
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
void SendStateUpdate(int sessionIndex)
|
||||
void SendStateUpdate(int sessionIndex, ref PvsThreadResources resources)
|
||||
{
|
||||
var session = players[sessionIndex];
|
||||
|
||||
// KILL IT WITH FIRE
|
||||
if(mainThread != Thread.CurrentThread)
|
||||
if (mainThread != Thread.CurrentThread)
|
||||
IoCManager.InitThread(new DependencyCollection(parentDeps), true);
|
||||
|
||||
var channel = session.ConnectedClient;
|
||||
@@ -214,13 +251,15 @@ namespace Robust.Server.GameStates
|
||||
// lastAck varies with each client based on lag and such, we can't just make 1 global state and send it to everyone
|
||||
var lastInputCommand = inputSystem.GetLastInputCommand(session);
|
||||
var lastSystemMessage = _entityNetworkManager.GetLastMessageSequence(session);
|
||||
var state = new GameState(lastAck, _gameTiming.CurTick, Math.Max(lastInputCommand, lastSystemMessage), entStates, playerStates, deletions, mapData);
|
||||
var state = new GameState(lastAck, _gameTiming.CurTick, Math.Max(lastInputCommand, lastSystemMessage),
|
||||
entStates, playerStates, deletions, mapData);
|
||||
|
||||
InterlockedHelper.Min(ref oldestAckValue, lastAck.Value);
|
||||
|
||||
// actually send the state
|
||||
var stateUpdateMessage = new MsgState();
|
||||
stateUpdateMessage.State = state;
|
||||
stateUpdateMessage.CompressionContext = resources.CompressionContext;
|
||||
|
||||
// If the state is too big we let Lidgren send it reliably.
|
||||
// This is to avoid a situation where a state is so large that it consistently gets dropped
|
||||
@@ -238,7 +277,7 @@ namespace Robust.Server.GameStates
|
||||
_networkManager.ServerSendMessage(stateUpdateMessage, channel);
|
||||
}
|
||||
|
||||
if(_pvs.CullingEnabled)
|
||||
if (_pvs.CullingEnabled)
|
||||
_pvs.ReturnToPool(playerChunks);
|
||||
_pvs.Cleanup(_playerManager.ServerSessions);
|
||||
var oldestAck = new GameTick(oldestAckValue);
|
||||
|
||||
@@ -136,6 +136,12 @@ namespace Robust.Shared
|
||||
public static readonly CVarDef<int> NetPVSEntityBudget =
|
||||
CVarDef.Create("net.pvs_budget", 50, CVar.ARCHIVE | CVar.REPLICATED);
|
||||
|
||||
/// <summary>
|
||||
/// ZSTD compression level to use when compressing game states.
|
||||
/// </summary>
|
||||
public static readonly CVarDef<int> NetPVSCompressLevel =
|
||||
CVarDef.Create("net.pvs_compress_level", 3, CVar.SERVERONLY);
|
||||
|
||||
/// <summary>
|
||||
/// Log late input messages from clients.
|
||||
/// </summary>
|
||||
@@ -188,6 +194,33 @@ namespace Robust.Shared
|
||||
public static readonly CVarDef<string> NetLidgrenAppIdentifier =
|
||||
CVarDef.Create("net.lidgren_app_identifier", "RobustToolbox");
|
||||
|
||||
#if DEBUG
|
||||
/// <summary>
|
||||
/// Add random fake network loss to all outgoing UDP network packets, as a ratio of how many packets to drop.
|
||||
/// 0 = no packet loss, 1 = all packets dropped
|
||||
/// </summary>
|
||||
public static readonly CVarDef<float> NetFakeLoss = CVarDef.Create("net.fakeloss", 0f, CVar.CHEAT);
|
||||
|
||||
/// <summary>
|
||||
/// Add fake extra delay to all outgoing UDP network packets, in seconds.
|
||||
/// </summary>
|
||||
/// <seealso cref="NetFakeLagRand"/>
|
||||
public static readonly CVarDef<float> NetFakeLagMin = CVarDef.Create("net.fakelagmin", 0f, CVar.CHEAT);
|
||||
|
||||
/// <summary>
|
||||
/// Add fake extra random delay to all outgoing UDP network packets, in seconds.
|
||||
/// The actual delay added for each packet is random between 0 and the specified value.
|
||||
/// </summary>
|
||||
/// <seealso cref="NetFakeLagMin"/>
|
||||
public static readonly CVarDef<float> NetFakeLagRand = CVarDef.Create("net.fakelagrand", 0f, CVar.CHEAT);
|
||||
|
||||
/// <summary>
|
||||
/// Add random fake duplicates to all outgoing UDP network packets, as a ratio of how many packets to duplicate.
|
||||
/// 0 = no packets duplicated, 1 = all packets duplicated.
|
||||
/// </summary>
|
||||
public static readonly CVarDef<float> NetFakeDuplicates = CVarDef.Create("net.fakeduplicates", 0f, CVar.CHEAT);
|
||||
#endif
|
||||
|
||||
/**
|
||||
* SUS
|
||||
*/
|
||||
@@ -218,33 +251,6 @@ namespace Robust.Shared
|
||||
public static readonly CVarDef<int> SysGameThreadPriority =
|
||||
CVarDef.Create("sys.game_thread_priority", (int) ThreadPriority.AboveNormal);
|
||||
|
||||
#if DEBUG
|
||||
/// <summary>
|
||||
/// Add random fake network loss to all outgoing UDP network packets, as a ratio of how many packets to drop.
|
||||
/// 0 = no packet loss, 1 = all packets dropped
|
||||
/// </summary>
|
||||
public static readonly CVarDef<float> NetFakeLoss = CVarDef.Create("net.fakeloss", 0f, CVar.CHEAT);
|
||||
|
||||
/// <summary>
|
||||
/// Add fake extra delay to all outgoing UDP network packets, in seconds.
|
||||
/// </summary>
|
||||
/// <seealso cref="NetFakeLagRand"/>
|
||||
public static readonly CVarDef<float> NetFakeLagMin = CVarDef.Create("net.fakelagmin", 0f, CVar.CHEAT);
|
||||
|
||||
/// <summary>
|
||||
/// Add fake extra random delay to all outgoing UDP network packets, in seconds.
|
||||
/// The actual delay added for each packet is random between 0 and the specified value.
|
||||
/// </summary>
|
||||
/// <seealso cref="NetFakeLagMin"/>
|
||||
public static readonly CVarDef<float> NetFakeLagRand = CVarDef.Create("net.fakelagrand", 0f, CVar.CHEAT);
|
||||
|
||||
/// <summary>
|
||||
/// Add random fake duplicates to all outgoing UDP network packets, as a ratio of how many packets to duplicate.
|
||||
/// 0 = no packets duplicated, 1 = all packets duplicated.
|
||||
/// </summary>
|
||||
public static readonly CVarDef<float> NetFakeDuplicates = CVarDef.Create("net.fakeduplicates", 0f, CVar.CHEAT);
|
||||
#endif
|
||||
|
||||
/*
|
||||
* METRICS
|
||||
*/
|
||||
@@ -1142,5 +1148,15 @@ namespace Robust.Shared
|
||||
public static readonly CVarDef<int> AczManifestCompressLevel =
|
||||
CVarDef.Create("acz.manifest_compress_level", 14, CVar.SERVERONLY);
|
||||
|
||||
/*
|
||||
* THREAD
|
||||
*/
|
||||
|
||||
/// <summary>
|
||||
/// The nominal parallel processing count to use for parallelized operations.
|
||||
/// The default of 0 automatically selects the system's processor count.
|
||||
/// </summary>
|
||||
public static readonly CVarDef<int> ThreadParallelCount =
|
||||
CVarDef.Create("thread.parallel_count", 0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ using Robust.Shared.GameObjects;
|
||||
using Robust.Shared.IoC;
|
||||
using Robust.Shared.Serialization;
|
||||
using Robust.Shared.Timing;
|
||||
using Robust.Shared.Utility;
|
||||
|
||||
#nullable disable
|
||||
|
||||
@@ -52,13 +53,12 @@ namespace Robust.Shared.Network.Messages
|
||||
case EntityMessageType.SystemMessage:
|
||||
{
|
||||
var serializer = IoCManager.Resolve<IRobustSerializer>();
|
||||
using (var stream = new MemoryStream())
|
||||
{
|
||||
serializer.Serialize(stream, SystemMessage);
|
||||
buffer.WriteVariableInt32((int)stream.Length);
|
||||
stream.TryGetBuffer(out var segment);
|
||||
buffer.Write(segment);
|
||||
}
|
||||
var stream = new MemoryStream();
|
||||
|
||||
serializer.Serialize(stream, SystemMessage);
|
||||
|
||||
buffer.WriteVariableInt32((int)stream.Length);
|
||||
buffer.Write(stream.AsSpan());
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -51,8 +51,7 @@ namespace Robust.Shared.Network.Messages
|
||||
serializer.SerializeDirect(memoryStream, Response);
|
||||
|
||||
buffer.WriteVariableInt32((int)memoryStream.Length);
|
||||
memoryStream.TryGetBuffer(out var segment);
|
||||
buffer.Write(segment);
|
||||
buffer.Write(memoryStream.AsSpan());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
using System;
|
||||
using System.Buffers;
|
||||
using System.Diagnostics;
|
||||
using System.IO;
|
||||
using System.IO.Compression;
|
||||
using Lidgren.Network;
|
||||
using Robust.Shared.GameStates;
|
||||
using Robust.Shared.IoC;
|
||||
using Robust.Shared.Log;
|
||||
using Robust.Shared.Serialization;
|
||||
using Robust.Shared.Utility;
|
||||
|
||||
@@ -25,6 +25,7 @@ namespace Robust.Shared.Network.Messages
|
||||
public override MsgGroups MsgGroup => MsgGroups.Entity;
|
||||
|
||||
public GameState State;
|
||||
public ZStdCompressionContext CompressionContext;
|
||||
|
||||
private bool _hasWritten;
|
||||
|
||||
@@ -39,7 +40,7 @@ namespace Robust.Shared.Network.Messages
|
||||
if (compressedLength > 0)
|
||||
{
|
||||
var stream = buffer.ReadAlignedMemory(compressedLength);
|
||||
using var decompressStream = new DeflateStream(stream, CompressionMode.Decompress);
|
||||
using var decompressStream = new ZStdDecompressStream(stream);
|
||||
var decompressedStream = new MemoryStream(uncompressedLength);
|
||||
decompressStream.CopyTo(decompressedStream, uncompressedLength);
|
||||
decompressedStream.Position = 0;
|
||||
@@ -62,35 +63,37 @@ namespace Robust.Shared.Network.Messages
|
||||
public override void WriteToBuffer(NetOutgoingMessage buffer)
|
||||
{
|
||||
var serializer = IoCManager.Resolve<IRobustSerializer>();
|
||||
MemoryStream finalStream;
|
||||
var stateStream = new MemoryStream();
|
||||
serializer.SerializeDirect(stateStream, State);
|
||||
buffer.WriteVariableInt32((int) stateStream.Length);
|
||||
buffer.WriteVariableInt32((int)stateStream.Length);
|
||||
|
||||
// We compress the state.
|
||||
if (stateStream.Length > CompressionThreshold)
|
||||
{
|
||||
var sw = Stopwatch.StartNew();
|
||||
stateStream.Position = 0;
|
||||
var compressedStream = new MemoryStream();
|
||||
using (var deflateStream = new DeflateStream(compressedStream, CompressionMode.Compress, true))
|
||||
{
|
||||
stateStream.CopyTo(deflateStream);
|
||||
}
|
||||
var buf = ArrayPool<byte>.Shared.Rent(ZStd.CompressBound((int)stateStream.Length));
|
||||
var length = CompressionContext.Compress2(buf, stateStream.AsSpan());
|
||||
|
||||
buffer.WriteVariableInt32((int) compressedStream.Length);
|
||||
finalStream = compressedStream;
|
||||
buffer.WriteVariableInt32(length);
|
||||
|
||||
buffer.Write(buf.AsSpan(0, length));
|
||||
|
||||
var elapsed = sw.Elapsed;
|
||||
System.Console.WriteLine(
|
||||
$"From: {State.FromSequence} To: {State.ToSequence} Size: {length} B Before: {stateStream.Length} B time: {elapsed}");
|
||||
|
||||
ArrayPool<byte>.Shared.Return(buf);
|
||||
}
|
||||
// The state is sent as is.
|
||||
else
|
||||
{
|
||||
// 0 means that the state isn't compressed.
|
||||
buffer.WriteVariableInt32(0);
|
||||
finalStream = stateStream;
|
||||
|
||||
buffer.Write(stateStream.AsSpan());
|
||||
}
|
||||
|
||||
finalStream.TryGetBuffer(out var segment);
|
||||
buffer.Write(segment);
|
||||
finalStream.Dispose();
|
||||
|
||||
_hasWritten = false;
|
||||
MsgSize = buffer.LengthBytes;
|
||||
@@ -109,6 +112,7 @@ namespace Robust.Shared.Network.Messages
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
return MsgSize > ReliableThreshold;
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ using System.IO;
|
||||
using Lidgren.Network;
|
||||
using Robust.Shared.IoC;
|
||||
using Robust.Shared.Serialization;
|
||||
using Robust.Shared.Utility;
|
||||
using Robust.Shared.ViewVariables;
|
||||
|
||||
#nullable disable
|
||||
@@ -57,20 +58,17 @@ namespace Robust.Shared.Network.Messages
|
||||
{
|
||||
var serializer = IoCManager.Resolve<IRobustSerializer>();
|
||||
buffer.Write(SessionId);
|
||||
using (var stream = new MemoryStream())
|
||||
{
|
||||
serializer.Serialize(stream, PropertyIndex);
|
||||
buffer.Write((int)stream.Length);
|
||||
stream.TryGetBuffer(out var segment);
|
||||
buffer.Write(segment);
|
||||
}
|
||||
using (var stream = new MemoryStream())
|
||||
{
|
||||
serializer.Serialize(stream, Value);
|
||||
buffer.Write((int)stream.Length);
|
||||
stream.TryGetBuffer(out var segment);
|
||||
buffer.Write(segment);
|
||||
}
|
||||
|
||||
var stream = new MemoryStream();
|
||||
serializer.Serialize(stream, PropertyIndex);
|
||||
buffer.Write((int)stream.Length);
|
||||
buffer.Write(stream.AsSpan());
|
||||
|
||||
stream.Position = 0;
|
||||
serializer.Serialize(stream, Value);
|
||||
buffer.Write((int)stream.Length);
|
||||
buffer.Write(stream.AsSpan());
|
||||
|
||||
buffer.Write(ReinterpretValue);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ using System.IO;
|
||||
using Lidgren.Network;
|
||||
using Robust.Shared.IoC;
|
||||
using Robust.Shared.Serialization;
|
||||
using Robust.Shared.Utility;
|
||||
using Robust.Shared.ViewVariables;
|
||||
|
||||
#nullable disable
|
||||
@@ -39,13 +40,11 @@ namespace Robust.Shared.Network.Messages
|
||||
{
|
||||
buffer.Write(RequestId);
|
||||
var serializer = IoCManager.Resolve<IRobustSerializer>();
|
||||
using (var stream = new MemoryStream())
|
||||
{
|
||||
serializer.Serialize(stream, Blob);
|
||||
buffer.Write((int)stream.Length);
|
||||
stream.TryGetBuffer(out var segment);
|
||||
buffer.Write(segment);
|
||||
}
|
||||
|
||||
var stream = new MemoryStream();
|
||||
serializer.Serialize(stream, Blob);
|
||||
buffer.Write((int)stream.Length);
|
||||
buffer.Write(stream.AsSpan());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ using System.IO;
|
||||
using Lidgren.Network;
|
||||
using Robust.Shared.IoC;
|
||||
using Robust.Shared.Serialization;
|
||||
using Robust.Shared.Utility;
|
||||
using Robust.Shared.ViewVariables;
|
||||
|
||||
#nullable disable
|
||||
@@ -46,13 +47,12 @@ namespace Robust.Shared.Network.Messages
|
||||
buffer.Write(RequestId);
|
||||
buffer.Write(SessionId);
|
||||
var serializer = IoCManager.Resolve<IRobustSerializer>();
|
||||
using (var stream = new MemoryStream())
|
||||
{
|
||||
serializer.Serialize(stream, RequestMeta);
|
||||
buffer.Write((int)stream.Length);
|
||||
stream.TryGetBuffer(out var segment);
|
||||
buffer.Write(segment);
|
||||
}
|
||||
|
||||
var stream = new MemoryStream();
|
||||
serializer.Serialize(stream, RequestMeta);
|
||||
|
||||
buffer.Write((int)stream.Length);
|
||||
buffer.Write(stream.AsSpan());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ using System.IO;
|
||||
using Lidgren.Network;
|
||||
using Robust.Shared.IoC;
|
||||
using Robust.Shared.Serialization;
|
||||
using Robust.Shared.Utility;
|
||||
using Robust.Shared.ViewVariables;
|
||||
|
||||
#nullable disable
|
||||
@@ -40,13 +41,11 @@ namespace Robust.Shared.Network.Messages
|
||||
{
|
||||
buffer.Write(RequestId);
|
||||
var serializer = IoCManager.Resolve<IRobustSerializer>();
|
||||
using (var stream = new MemoryStream())
|
||||
{
|
||||
serializer.Serialize(stream, Selector);
|
||||
buffer.Write((int)stream.Length);
|
||||
stream.TryGetBuffer(out var segment);
|
||||
buffer.Write(segment);
|
||||
}
|
||||
|
||||
var stream = new MemoryStream();
|
||||
serializer.Serialize(stream, Selector);
|
||||
buffer.Write((int)stream.Length);
|
||||
buffer.Write(stream.AsSpan());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ using Robust.Shared.Random;
|
||||
using Robust.Shared.Sandboxing;
|
||||
using Robust.Shared.Serialization;
|
||||
using Robust.Shared.Serialization.Manager;
|
||||
using Robust.Shared.Threading;
|
||||
using Robust.Shared.Timing;
|
||||
|
||||
namespace Robust.Shared
|
||||
@@ -47,6 +48,8 @@ namespace Robust.Shared
|
||||
IoCManager.Register<IManifoldManager, CollisionManager>();
|
||||
IoCManager.Register<IIslandManager, IslandManager>();
|
||||
IoCManager.Register<IVerticesSimplifier, RamerDouglasPeuckerSimplifier>();
|
||||
IoCManager.Register<IParallelManager, ParallelManager>();
|
||||
IoCManager.Register<IParallelManagerInternal, ParallelManager>();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
89
Robust.Shared/Threading/ParallelManager.cs
Normal file
89
Robust.Shared/Threading/ParallelManager.cs
Normal file
@@ -0,0 +1,89 @@
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Robust.Shared.Configuration;
|
||||
using Robust.Shared.IoC;
|
||||
using Robust.Shared.Log;
|
||||
using Robust.Shared.Utility;
|
||||
|
||||
namespace Robust.Shared.Threading;
|
||||
|
||||
public interface IParallelManager
|
||||
{
|
||||
event Action ParallelCountChanged;
|
||||
|
||||
int ParallelProcessCount { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Add the delegate to <see cref="ParallelCountChanged"/> and immediately invoke it.
|
||||
/// </summary>
|
||||
void AddAndInvokeParallelCountChanged(Action changed);
|
||||
}
|
||||
|
||||
internal interface IParallelManagerInternal : IParallelManager
|
||||
{
|
||||
void Initialize();
|
||||
}
|
||||
|
||||
internal sealed class ParallelManager : IParallelManagerInternal
|
||||
{
|
||||
[Dependency] private readonly IConfigurationManager _cfg = default!;
|
||||
|
||||
public event Action? ParallelCountChanged;
|
||||
public int ParallelProcessCount { get; private set; }
|
||||
|
||||
public void Initialize()
|
||||
{
|
||||
_cfg.OnValueChanged(CVars.ThreadParallelCount, UpdateCVar, true);
|
||||
}
|
||||
|
||||
public void AddAndInvokeParallelCountChanged(Action changed)
|
||||
{
|
||||
ParallelCountChanged += changed;
|
||||
changed();
|
||||
}
|
||||
|
||||
private void UpdateCVar(int value)
|
||||
{
|
||||
var oldCount = ParallelProcessCount;
|
||||
ParallelProcessCount = value == 0 ? Environment.ProcessorCount : value;
|
||||
|
||||
if (oldCount != ParallelProcessCount)
|
||||
ParallelCountChanged?.Invoke();
|
||||
}
|
||||
}
|
||||
|
||||
public static class ParallelManagerExt
|
||||
{
|
||||
public delegate void ParallelResourceAction<T>(int i, ref T resource);
|
||||
|
||||
public static void ParallelForWithResources<T>(
|
||||
this IParallelManager manager,
|
||||
int fromInclusive,
|
||||
int toExclusive,
|
||||
T[] resources,
|
||||
ParallelResourceAction<T> action)
|
||||
{
|
||||
var parallelCount = manager.ParallelProcessCount;
|
||||
|
||||
DebugTools.Assert(
|
||||
resources.Length >= parallelCount,
|
||||
"Resources buffer is too small to fit maximum thread count.");
|
||||
|
||||
var threadIndex = 0;
|
||||
|
||||
Parallel.For(
|
||||
fromInclusive, toExclusive,
|
||||
new ParallelOptions { MaxDegreeOfParallelism = parallelCount },
|
||||
() => Interlocked.Increment(ref threadIndex),
|
||||
(i, _, localThreadIdx) =>
|
||||
{
|
||||
ref var resource = ref resources[localThreadIdx];
|
||||
|
||||
action(i, ref resource);
|
||||
|
||||
return localThreadIdx;
|
||||
},
|
||||
_ => { });
|
||||
}
|
||||
}
|
||||
@@ -98,5 +98,28 @@ namespace Robust.Shared.Utility
|
||||
return totalRead;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets the span over the currently filled region of the memory stream, based on its length.
|
||||
/// </summary>
|
||||
public static Span<byte> AsSpan(this MemoryStream ms)
|
||||
{
|
||||
// Let it be forever immortalized that, while I was writing this function,
|
||||
// Julian suggested that I should name it "AssSpan" to test if that would slip through review.
|
||||
|
||||
var buf = ms.GetBuffer();
|
||||
|
||||
return buf.AsSpan(0, (int) ms.Length);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets the memory over the currently filled region of the memory stream, based on its length.
|
||||
/// </summary>
|
||||
public static Memory<byte> AsMemory(this MemoryStream ms)
|
||||
{
|
||||
var buf = ms.GetBuffer();
|
||||
|
||||
return buf.AsMemory(0, (int) ms.Length);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -77,7 +77,7 @@ public sealed unsafe class ZStdCompressionContext : IDisposable
|
||||
ZSTD_CCtx_setParameter(Context, parameter, value);
|
||||
}
|
||||
|
||||
public int Compress(Span<byte> destination, Span<byte> source, int compressionLevel = ZSTD_CLEVEL_DEFAULT)
|
||||
public int Compress(Span<byte> destination, ReadOnlySpan<byte> source, int compressionLevel = ZSTD_CLEVEL_DEFAULT)
|
||||
{
|
||||
CheckDisposed();
|
||||
|
||||
@@ -95,6 +95,23 @@ public sealed unsafe class ZStdCompressionContext : IDisposable
|
||||
}
|
||||
}
|
||||
|
||||
public int Compress2(Span<byte> destination, ReadOnlySpan<byte> source)
|
||||
{
|
||||
CheckDisposed();
|
||||
|
||||
fixed (byte* dst = destination)
|
||||
fixed (byte* src = source)
|
||||
{
|
||||
var ret = ZSTD_compress2(
|
||||
Context,
|
||||
dst, (nuint)destination.Length,
|
||||
src, (nuint)source.Length);
|
||||
|
||||
ZStdException.ThrowIfError(ret);
|
||||
return (int)ret;
|
||||
}
|
||||
}
|
||||
|
||||
~ZStdCompressionContext()
|
||||
{
|
||||
Dispose();
|
||||
|
||||
Reference in New Issue
Block a user