Pool MsgState streams (#4582)

This commit is contained in:
metalgearsloth
2023-11-28 19:10:30 +11:00
committed by GitHub
parent 4fd9b2bc3b
commit 3ffef625ec
14 changed files with 91 additions and 27 deletions

View File

@@ -0,0 +1,46 @@
using System;
using System.IO;
using Microsoft.IO;
using Robust.Shared.Utility;
namespace Robust.Shared.GameObjects;
/// <summary>
/// Generic memory manager for engine use.
/// </summary>
internal sealed class RobustMemoryManager
{
// Let's be real this is a bandaid for pooling bullshit at an engine level and I don't know what
// good memory management looks like for PVS or the RobustSerializer.
private static readonly RecyclableMemoryStreamManager MemStreamManager = new()
{
ThrowExceptionOnToArray = true,
};
public RobustMemoryManager()
{
MemStreamManager.StreamDoubleDisposed += (sender, args) =>
throw new InvalidOperationException("Found double disposed stream.");
MemStreamManager.StreamFinalized += (sender, args) =>
throw new InvalidOperationException("Stream finalized but not disposed indicating a leak");
MemStreamManager.StreamOverCapacity += (sender, args) =>
throw new InvalidOperationException("Stream over memory capacity");
}
public static MemoryStream GetMemoryStream()
{
var stream = MemStreamManager.GetStream("RobustMemoryManager");
DebugTools.Assert(stream.Position == 0);
return stream;
}
public static MemoryStream GetMemoryStream(int length)
{
var stream = MemStreamManager.GetStream("RobustMemoryManager", length);
DebugTools.Assert(stream.Position == 0);
return stream;
}
}

View File

@@ -1,5 +1,6 @@
using System.IO;
using Lidgren.Network;
using Robust.Shared.GameObjects;
using Robust.Shared.Serialization;
using Robust.Shared.Utility;
@@ -17,7 +18,8 @@ namespace Robust.Shared.Network.Messages
public override void ReadFromBuffer(NetIncomingMessage buffer, IRobustSerializer serializer)
{
int length = buffer.ReadVariableInt32();
using var stream = buffer.ReadAlignedMemory(length);
using var stream = RobustMemoryManager.GetMemoryStream(length);
buffer.ReadAlignedMemory(stream, length);
Text = serializer.Deserialize<FormattedMessage>(stream);
}

View File

@@ -33,8 +33,9 @@ namespace Robust.Shared.Network.Messages
{
case EntityMessageType.SystemMessage:
{
int length = buffer.ReadVariableInt32();
using var stream = buffer.ReadAlignedMemory(length);
var length = buffer.ReadVariableInt32();
using var stream = RobustMemoryManager.GetMemoryStream(length);
buffer.ReadAlignedMemory(stream, length);
SystemMessage = serializer.Deserialize<EntityEventArgs>(stream);
}
break;

View File

@@ -1,5 +1,6 @@
using System.IO;
using Lidgren.Network;
using Robust.Shared.GameObjects;
using Robust.Shared.IoC;
using Robust.Shared.Serialization;
using Robust.Shared.Utility;
@@ -28,7 +29,8 @@ namespace Robust.Shared.Network.Messages
{
buffer.ReadPadBits();
var length = buffer.ReadVariableInt32();
using var stream = buffer.ReadAlignedMemory(length);
using var stream = RobustMemoryManager.GetMemoryStream(length);
buffer.ReadAlignedMemory(stream, length);
serializer.DeserializeDirect(stream, out Echo);
serializer.DeserializeDirect(stream, out Response);
}

View File

@@ -1,10 +1,9 @@
using System;
using System.Buffers;
using System.Diagnostics;
using System.IO;
using Lidgren.Network;
using Robust.Shared.GameObjects;
using Robust.Shared.GameStates;
using Robust.Shared.IoC;
using Robust.Shared.Serialization;
using Robust.Shared.Utility;
@@ -38,29 +37,30 @@ namespace Robust.Shared.Network.Messages
// State is compressed.
if (compressedLength > 0)
{
var stream = buffer.ReadAlignedMemory(compressedLength);
var stream = RobustMemoryManager.GetMemoryStream(compressedLength);
buffer.ReadAlignedMemory(stream, compressedLength);
using var decompressStream = new ZStdDecompressStream(stream);
var decompressedStream = new MemoryStream(uncompressedLength);
decompressStream.CopyTo(decompressedStream, uncompressedLength);
decompressedStream.Position = 0;
finalStream = decompressedStream;
finalStream = RobustMemoryManager.GetMemoryStream(uncompressedLength);
finalStream.SetLength(uncompressedLength);
decompressStream.CopyTo(finalStream, uncompressedLength);
finalStream.Position = 0;
}
// State is uncompressed.
else
{
var stream = buffer.ReadAlignedMemory(uncompressedLength);
finalStream = stream;
finalStream = RobustMemoryManager.GetMemoryStream(uncompressedLength);
buffer.ReadAlignedMemory(finalStream, uncompressedLength);
}
serializer.DeserializeDirect(finalStream, out State);
finalStream.Dispose();
State.PayloadSize = uncompressedLength;
finalStream.Dispose();
}
public override void WriteToBuffer(NetOutgoingMessage buffer, IRobustSerializer serializer)
{
var stateStream = new MemoryStream();
using var stateStream = RobustMemoryManager.GetMemoryStream();
serializer.SerializeDirect(stateStream, State);
buffer.WriteVariableInt32((int)stateStream.Length);
@@ -87,7 +87,6 @@ namespace Robust.Shared.Network.Messages
{
// 0 means that the state isn't compressed.
buffer.WriteVariableInt32(0);
buffer.Write(stateStream.AsSpan());
}

View File

@@ -1,5 +1,6 @@
using System.IO;
using Lidgren.Network;
using Robust.Shared.GameObjects;
using Robust.Shared.IoC;
using Robust.Shared.Serialization;
using Robust.Shared.Utility;
@@ -42,12 +43,14 @@ namespace Robust.Shared.Network.Messages
SessionId = buffer.ReadUInt32();
{
var length = buffer.ReadInt32();
using var stream = buffer.ReadAlignedMemory(length);
using var stream = RobustMemoryManager.GetMemoryStream(length);
buffer.ReadAlignedMemory(stream, length);
PropertyIndex = serializer.Deserialize<object[]>(stream);
}
{
var length = buffer.ReadInt32();
using var stream = buffer.ReadAlignedMemory(length);
using var stream = RobustMemoryManager.GetMemoryStream(length);
buffer.ReadAlignedMemory(stream, length);
Value = serializer.Deserialize(stream);
}
ReinterpretValue = buffer.ReadBoolean();

View File

@@ -1,5 +1,6 @@
using System.IO;
using Lidgren.Network;
using Robust.Shared.GameObjects;
using Robust.Shared.IoC;
using Robust.Shared.Serialization;
using Robust.Shared.Utility;
@@ -31,7 +32,8 @@ namespace Robust.Shared.Network.Messages
{
RequestId = buffer.ReadUInt32();
var length = buffer.ReadInt32();
using var stream = buffer.ReadAlignedMemory(length);
using var stream = RobustMemoryManager.GetMemoryStream(length);
buffer.ReadAlignedMemory(stream, length);
Blob = serializer.Deserialize<ViewVariablesBlob>(stream);
}

View File

@@ -1,5 +1,6 @@
using System.IO;
using Lidgren.Network;
using Robust.Shared.GameObjects;
using Robust.Shared.IoC;
using Robust.Shared.Serialization;
using Robust.Shared.Utility;
@@ -37,7 +38,8 @@ namespace Robust.Shared.Network.Messages
RequestId = buffer.ReadUInt32();
SessionId = buffer.ReadUInt32();
var length = buffer.ReadInt32();
using var stream = buffer.ReadAlignedMemory(length);
using var stream = RobustMemoryManager.GetMemoryStream(length);
buffer.ReadAlignedMemory(stream, length);
RequestMeta = serializer.Deserialize<ViewVariablesRequest>(stream);
}

View File

@@ -1,5 +1,6 @@
using System.IO;
using Lidgren.Network;
using Robust.Shared.GameObjects;
using Robust.Shared.IoC;
using Robust.Shared.Serialization;
using Robust.Shared.Utility;
@@ -32,7 +33,8 @@ namespace Robust.Shared.Network.Messages
{
RequestId = buffer.ReadUInt32();
var length = buffer.ReadInt32();
using var stream = buffer.ReadAlignedMemory(length);
using var stream = RobustMemoryManager.GetMemoryStream(length);
buffer.ReadAlignedMemory(stream, length);
Selector = serializer.Deserialize<ViewVariablesObjectSelector>(stream);
}

View File

@@ -6,6 +6,7 @@ using Robust.Shared.GameObjects;
using Robust.Shared.Map;
using Robust.Shared.Maths;
using Robust.Shared.Timing;
using Robust.Shared.Utility;
namespace Robust.Shared.Network
{
@@ -96,16 +97,17 @@ namespace Robust.Shared.Network
/// <exception cref="ArgumentException">
/// Thrown if the current read position of the message is not byte-aligned.
/// </exception>
public static MemoryStream ReadAlignedMemory(this NetIncomingMessage message, int length)
public static void ReadAlignedMemory(this NetIncomingMessage message, MemoryStream memStream, int length)
{
if ((message.Position & 7) != 0)
{
throw new ArgumentException("Read position in message must be byte-aligned", nameof(message));
}
var stream = new MemoryStream(message.Data, message.PositionInBytes, length, false);
DebugTools.Assert(memStream.Position == 0);
memStream.Write(message.Data, message.PositionInBytes, length);
memStream.Position = 0;
message.Position += length * 8;
return stream;
}
public static TimeSpan ReadTimeSpan(this NetIncomingMessage message)

View File

@@ -9,6 +9,7 @@
<PackageReference Include="JetBrains.Annotations" Version="2021.3.0" PrivateAssets="All" />
<PackageReference Include="Microsoft.Extensions.ObjectPool" Version="6.0.2" />
<PackageReference Include="Microsoft.ILVerification" Version="6.0.0" PrivateAssets="compile" />
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="2.3.2" />
<PackageReference Include="Nett" Version="0.15.0" PrivateAssets="compile" />
<PackageReference Include="NVorbis" Version="0.10.1" PrivateAssets="compile" />
<PackageReference Include="Pidgin" Version="2.5.0" />

View File

@@ -49,6 +49,7 @@ namespace Robust.Shared
deps.Register<IParallelManagerInternal, ParallelManager>();
deps.Register<ToolshedManager>();
deps.Register<HttpClientHolder>();
deps.Register<RobustMemoryManager>();
}
}
}

View File

@@ -1,6 +1,7 @@
using System;
using System.IO;
using Lidgren.Network;
using Robust.Shared.GameObjects;
using Robust.Shared.IoC;
using Robust.Shared.Network;
using Robust.Shared.Serialization;
@@ -156,7 +157,8 @@ internal sealed class MsgViewVariablesListPathReq : MsgViewVariablesPathReq
{
base.ReadFromBuffer(buffer, serializer);
var length = buffer.ReadInt32();
using var stream = buffer.ReadAlignedMemory(length);
using var stream = RobustMemoryManager.GetMemoryStream(length);
buffer.ReadAlignedMemory(stream, length);
Options = serializer.Deserialize<VVListPathOptions>(stream);
}

View File

@@ -71,7 +71,6 @@ namespace Robust.UnitTesting.Shared.Timing
public void TestCancellation()
{
var timerManager = IoCManager.Resolve<ITimerManager>();
var taskManager = IoCManager.Resolve<ITaskManager>();
var cts = new CancellationTokenSource();
var ran = false;