diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index 9794e8c5a..5cafb6076 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -35,11 +35,13 @@ END TEMPLATE--> ### Breaking changes -*None yet* +* Made types & methods related to `SharedNetworkResourceManager` internals `internal`. ### New features -*None yet* +* Added a new "high-bandwidth transfer" subsystem accessible via `ITransferManager`. Requires server-side setup with new CVars to get full benefit. +* Added `NetMessage.SequenceChannel`. +* Added `INetChannel.CanSendImmediately`. ### Bugfixes @@ -47,6 +49,7 @@ END TEMPLATE--> ### Other +* Resource uploads/downloads now use the new high-bandwidth transfer system. * `DebugTools.AssertNotNull()` has been marked with `[NotNull]`, making C# nullable analysis recognize it. ### Internal diff --git a/Robust.Client/ClientIoC.cs b/Robust.Client/ClientIoC.cs index 47061d5b7..3df848450 100644 --- a/Robust.Client/ClientIoC.cs +++ b/Robust.Client/ClientIoC.cs @@ -13,6 +13,7 @@ using Robust.Client.HWId; using Robust.Client.Input; using Robust.Client.Localization; using Robust.Client.Map; +using Robust.Client.Network.Transfer; using Robust.Client.Placement; using Robust.Client.Player; using Robust.Client.Profiling; @@ -41,6 +42,7 @@ using Robust.Shared.IoC; using Robust.Shared.Localization; using Robust.Shared.Map; using Robust.Shared.Network; +using Robust.Shared.Network.Transfer; using Robust.Shared.Physics; using Robust.Shared.Player; using Robust.Shared.Prototypes; @@ -174,6 +176,8 @@ namespace Robust.Client deps.Register(); deps.Register(); deps.Register(); + deps.Register(); + deps.Register(); } } } diff --git a/Robust.Client/GameController/GameController.cs b/Robust.Client/GameController/GameController.cs index 02cbb5cde..14f5770d1 100644 --- a/Robust.Client/GameController/GameController.cs +++ b/Robust.Client/GameController/GameController.cs @@ -11,6 +11,7 @@ using Robust.Client.GameObjects; using Robust.Client.GameStates; using Robust.Client.Graphics; using Robust.Client.Input; +using Robust.Client.Network.Transfer; using Robust.Client.Placement; using Robust.Client.Replays.Loading; using Robust.Client.Replays.Playback; @@ -35,6 +36,7 @@ using Robust.Shared.Localization; using Robust.Shared.Log; using Robust.Shared.Map; using Robust.Shared.Network; +using Robust.Shared.Network.Transfer; using Robust.Shared.Profiling; using Robust.Shared.Prototypes; using Robust.Shared.Reflection; @@ -98,6 +100,8 @@ namespace Robust.Client [Dependency] private readonly ILocalizationManager _loc = default!; [Dependency] private readonly ISystemFontManagerInternal _systemFontManager = default!; [Dependency] private readonly LoadingScreenManager _loadscr = default!; + [Dependency] private readonly ITransferManager _transfer = default!; + [Dependency] private readonly ClientTransferTestManager _transferTest = default!; private IWebViewManagerHook? _webViewHook; @@ -201,7 +205,14 @@ namespace Robust.Client _logManager.GetSawmill("res")); _loadscr.LoadingStep(_resourceCache.PreloadTextures, "Texture preload"); - _loadscr.LoadingStep(() => { _networkManager.Initialize(false); }, _networkManager); + _loadscr.LoadingStep(() => + { + _networkManager.Initialize(false); + _transfer.Initialize(); + _transferTest.Initialize(); + }, + _networkManager); + _loadscr.LoadingStep(_configurationManager.SetupNetworking, _configurationManager); _loadscr.LoadingStep(_serializer.Initialize, _serializer); _loadscr.LoadingStep(_inputManager.Initialize, _inputManager); @@ -685,6 +696,11 @@ namespace Robust.Client _modLoader.BroadcastUpdate(ModUpdateLevel.FramePostEngine, frameEventArgs); } + using (_prof.Group("Transfer")) + { + _transfer.FrameUpdate(); + } + _audio.FlushALDisposeQueues(); } diff --git a/Robust.Client/Network/Transfer/ClientTransferImplWebSocket.cs b/Robust.Client/Network/Transfer/ClientTransferImplWebSocket.cs new file mode 100644 index 000000000..8b03c0af5 --- /dev/null +++ b/Robust.Client/Network/Transfer/ClientTransferImplWebSocket.cs @@ -0,0 +1,42 @@ +using System; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Tasks; +using Robust.Shared.Log; +using Robust.Shared.Network; +using Robust.Shared.Network.Transfer; + +namespace Robust.Client.Network.Transfer; + +internal sealed class ClientTransferImplWebSocket : TransferImplWebSocket +{ + private readonly (string EndpointUrl, byte[] Key) _info; + + public ClientTransferImplWebSocket( + (string EndpointUrl, byte[] Key) info, + ISawmill sawmill, + BaseTransferManager parent, + INetChannel channel) + : base(sawmill, parent, channel) + { + _info = info; + } + + public override async Task ClientInit(CancellationToken cancel) + { + var clientWs = new ClientWebSocket(); + clientWs.Options.SetRequestHeader(KeyHeaderName, Convert.ToBase64String(_info.Key)); + clientWs.Options.SetRequestHeader(UserIdHeaderName, Channel.UserId.ToString()); + + await clientWs.ConnectAsync(new Uri(_info.EndpointUrl), cancel); + + WebSocket = clientWs; + + ReadThread(); + } + + public override Task ServerInit() + { + throw new NotSupportedException(); + } +} diff --git a/Robust.Client/Network/Transfer/ClientTransferManager.cs b/Robust.Client/Network/Transfer/ClientTransferManager.cs new file mode 100644 index 000000000..97e690531 --- /dev/null +++ b/Robust.Client/Network/Transfer/ClientTransferManager.cs @@ -0,0 +1,79 @@ +using System; +using System.IO; +using System.Threading.Tasks; +using Robust.Shared.Asynchronous; +using Robust.Shared.Log; +using Robust.Shared.Network; +using Robust.Shared.Network.Messages.Transfer; +using Robust.Shared.Network.Transfer; + +namespace Robust.Client.Network.Transfer; + +internal sealed class ClientTransferManager : BaseTransferManager, ITransferManager +{ + private readonly IClientNetManager _netManager; + private BaseTransferImpl? _transferImpl; + + public event Action? ClientHandshakeComplete; + + internal ClientTransferManager( + IClientNetManager netManager, + ILogManager logManager, + ITaskManager taskManager) + : base(logManager, NetMessageAccept.Client, taskManager) + { + _netManager = netManager; + } + + public Stream StartTransfer(INetChannel channel, TransferStartInfo startInfo) + { + if (_transferImpl == null) + throw new InvalidOperationException("Not connected yet!"); + + if (channel != _netManager.ServerChannel) + throw new InvalidOperationException("Invalid channel!"); + + return _transferImpl.StartTransfer(startInfo); + } + + public void Initialize() + { + _netManager.RegisterNetMessage(RxTransferInit, NetMessageAccept.Client | NetMessageAccept.Handshake); + _netManager.RegisterNetMessage(); + _netManager.RegisterNetMessage(RxTransferData, NetMessageAccept.Client | NetMessageAccept.Handshake); + } + + private async void RxTransferInit(MsgTransferInit message) + { + BaseTransferImpl impl; + if (message.HttpInfo is { } httpInfo) + { + impl = new ClientTransferImplWebSocket(httpInfo, Sawmill, this, message.MsgChannel); + } + else + { + impl = new TransferImplLidgren(Sawmill, message.MsgChannel, this, _netManager); + } + + _transferImpl = impl; + await _transferImpl.ClientInit(default); + + ClientHandshakeComplete?.Invoke(); + } + + private void RxTransferData(MsgTransferData message) + { + if (_transferImpl is not TransferImplLidgren lidgren) + { + message.MsgChannel.Disconnect("Not lidgren"); + return; + } + + lidgren.ReceiveData(message); + } + + public Task ServerHandshake(INetChannel channel) + { + throw new NotSupportedException(); + } +} diff --git a/Robust.Client/Network/Transfer/ClientTransferTestManager.cs b/Robust.Client/Network/Transfer/ClientTransferTestManager.cs new file mode 100644 index 000000000..72b1240c6 --- /dev/null +++ b/Robust.Client/Network/Transfer/ClientTransferTestManager.cs @@ -0,0 +1,14 @@ +using Robust.Shared.Log; +using Robust.Shared.Network; +using Robust.Shared.Network.Transfer; + +namespace Robust.Client.Network.Transfer; + +internal sealed class ClientTransferTestManager(ITransferManager manager, ILogManager logManager) + : TransferTestManager(manager, logManager) +{ + protected override bool PermissionCheck(INetChannel channel) + { + return true; + } +} diff --git a/Robust.Client/Replays/Loading/ReplayLoadManager.Checkpoints.cs b/Robust.Client/Replays/Loading/ReplayLoadManager.Checkpoints.cs index 8c3d3bf29..85991865d 100644 --- a/Robust.Client/Replays/Loading/ReplayLoadManager.Checkpoints.cs +++ b/Robust.Client/Replays/Loading/ReplayLoadManager.Checkpoints.cs @@ -279,10 +279,7 @@ public sealed partial class ReplayLoadManager var path = resUpload.RelativePath.Clean().ToRelativePath(); if (uploadedFiles.Add(path) && !_netResMan.FileExists(path)) { - _netMan.DispatchLocalNetMessage(new NetworkResourceUploadMessage - { - RelativePath = path, Data = resUpload.Data - }); + _netResMan.StoreFile(path, resUpload.Data); message.Messages.RemoveSwap(i); break; } diff --git a/Robust.Client/Upload/Commands/UploadFileCommand.cs b/Robust.Client/Upload/Commands/UploadFileCommand.cs index 4c9d7ed88..bfa0a51a3 100644 --- a/Robust.Client/Upload/Commands/UploadFileCommand.cs +++ b/Robust.Client/Upload/Commands/UploadFileCommand.cs @@ -4,7 +4,6 @@ using Robust.Shared; using Robust.Shared.Configuration; using Robust.Shared.Console; using Robust.Shared.IoC; -using Robust.Shared.Network; using Robust.Shared.Upload; using Robust.Shared.Utility; @@ -14,7 +13,7 @@ public sealed class UploadFileCommand : IConsoleCommand { [Dependency] private readonly IConfigurationManager _cfgManager = default!; [Dependency] private readonly IFileDialogManager _dialog = default!; - [Dependency] private readonly INetManager _netManager = default!; + [Dependency] private readonly NetworkResourceManager _netRes = default!; public string Command => "uploadfile"; public string Description => "Uploads a resource to the server."; @@ -55,12 +54,6 @@ public sealed class UploadFileCommand : IConsoleCommand var data = file.CopyToArray(); - var msg = new NetworkResourceUploadMessage - { - RelativePath = path, - Data = data - }; - - _netManager.ClientSendMessage(msg); + _netRes.UploadResources([(path, data)]); } } diff --git a/Robust.Client/Upload/Commands/UploadFolderCommand.cs b/Robust.Client/Upload/Commands/UploadFolderCommand.cs index 6e4779af8..bd7b67e6e 100644 --- a/Robust.Client/Upload/Commands/UploadFolderCommand.cs +++ b/Robust.Client/Upload/Commands/UploadFolderCommand.cs @@ -1,4 +1,4 @@ -using System; +using System.Collections.Generic; using System.IO; using Robust.Shared; using Robust.Shared.Configuration; @@ -6,7 +6,6 @@ using Robust.Shared.Console; using Robust.Shared.ContentPack; using Robust.Shared.IoC; using Robust.Shared.Localization; -using Robust.Shared.Network; using Robust.Shared.Upload; using Robust.Shared.Utility; @@ -14,9 +13,9 @@ namespace Robust.Client.Upload.Commands; public sealed class UploadFolderCommand : IConsoleCommand { - [Dependency] private IResourceManager _resourceManager = default!; - [Dependency] private IConfigurationManager _configManager = default!; - [Dependency] private INetManager _netMan = default!; + [Dependency] private readonly IResourceManager _resourceManager = default!; + [Dependency] private readonly IConfigurationManager _configManager = default!; + [Dependency] private readonly NetworkResourceManager _netRes = default!; public string Command => "uploadfolder"; public string Description => Loc.GetString("uploadfolder-command-description"); @@ -50,6 +49,7 @@ public sealed class UploadFolderCommand : IConsoleCommand } //Grab all files in specified folder and upload them + var files = new List<(ResPath Relative, byte[] Data)>(); foreach (var filepath in _resourceManager.UserData.Find($"{folderPath.ToRelativePath()}/").files ) { await using var filestream = _resourceManager.UserData.Open(filepath, FileMode.Open); @@ -63,17 +63,14 @@ public sealed class UploadFolderCommand : IConsoleCommand var data = filestream.CopyToArray(); - var msg = new NetworkResourceUploadMessage - { - RelativePath = filepath.RelativeTo(BaseUploadFolderPath), - Data = data - }; + files.Add((filepath.RelativeTo(BaseUploadFolderPath), data)); - _netMan.ClientSendMessage(msg); fileCount++; } } + _netRes.UploadResources(files); + shell.WriteLine( Loc.GetString("uploadfolder-command-success",("fileCount",fileCount))); } } diff --git a/Robust.Client/Upload/NetworkResourceManager.cs b/Robust.Client/Upload/NetworkResourceManager.cs index 72159ebdf..0d053fcdc 100644 --- a/Robust.Client/Upload/NetworkResourceManager.cs +++ b/Robust.Client/Upload/NetworkResourceManager.cs @@ -1,5 +1,11 @@ +using System; +using System.Buffers.Binary; +using System.Collections.Generic; using Robust.Shared.IoC; +using Robust.Shared.Network; +using Robust.Shared.Network.Transfer; using Robust.Shared.Upload; +using Robust.Shared.Utility; namespace Robust.Client.Upload; @@ -7,10 +13,44 @@ public sealed class NetworkResourceManager : SharedNetworkResourceManager { [Dependency] private readonly IBaseClient _client = default!; - public override void Initialize() + internal override void Initialize() { base.Initialize(); + _client.RunLevelChanged += OnLevelChanged; + + TransferManager.RegisterTransferMessage(TransferKeyNetworkUpload); + TransferManager.RegisterTransferMessage(TransferKeyNetworkDownload, ReceiveDownload); + + NetManager.RegisterNetMessage(); + } + + private async void ReceiveDownload(TransferReceivedEvent transfer) + { + Sawmill.Debug("Receiving file download transfer!"); + + await using var stream = transfer.DataStream; + + try + { + var ackKeyBytes = new byte[4]; + await stream.ReadExactlyAsync(ackKeyBytes); + var ackKey = BinaryPrimitives.ReadInt32LittleEndian(ackKeyBytes); + + await IngestFileStream(stream); + + if (ackKey != 0) + { + NetManager.ClientSendMessage(new NetworkResourceAckMessage + { + Key = ackKey + }); + } + } + catch (Exception e) + { + Sawmill.Error($"Error while downloading transfer resources: {e}"); + } } private void OnLevelChanged(object? sender, RunLevelChangedEventArgs e) @@ -27,4 +67,20 @@ public sealed class NetworkResourceManager : SharedNetworkResourceManager { ContentRoot.Clear(); } + + internal async void UploadResources(List<(ResPath Relative, byte[] Data)> files) + { + var clientNet = (IClientNetManager)NetManager; + if (clientNet.ServerChannel is not { } channel) + throw new InvalidOperationException("Not connected to server!"); + + await using var transfer = TransferManager.StartTransfer( + channel, + new TransferStartInfo + { + MessageKey = TransferKeyNetworkUpload, + }); + + await WriteFileStream(transfer, files); + } } diff --git a/Robust.Server.Testing/RobustServerSimulation.cs b/Robust.Server.Testing/RobustServerSimulation.cs index 2495fc679..66eb92d86 100644 --- a/Robust.Server.Testing/RobustServerSimulation.cs +++ b/Robust.Server.Testing/RobustServerSimulation.cs @@ -9,11 +9,13 @@ using Robust.Server.Debugging; using Robust.Server.GameObjects; using Robust.Server.GameStates; using Robust.Server.Localization; +using Robust.Server.Network.Transfer; using Robust.Server.Physics; using Robust.Server.Player; using Robust.Server.Prototypes; using Robust.Server.Reflection; using Robust.Server.Replays; +using Robust.Server.ServerStatus; using Robust.Shared; using Robust.Shared.Asynchronous; using Robust.Shared.Configuration; @@ -28,6 +30,7 @@ using Robust.Shared.Log; using Robust.Shared.Map; using Robust.Shared.Map.Components; using Robust.Shared.Network; +using Robust.Shared.Network.Transfer; using Robust.Shared.Physics; using Robust.Shared.Physics.Collision; using Robust.Shared.Physics.Components; @@ -198,6 +201,9 @@ namespace Robust.UnitTesting.Server container.Register(); container.Register(); container.Register(); + container.Register(); + container.Register(); + container.Register(); var realReflection = new ServerReflectionManager(); realReflection.LoadAssemblies(new List(2) @@ -262,7 +268,6 @@ namespace Robust.UnitTesting.Server // I just wanted to load pvs system container.Register(); - container.Register(); // god help you if you actually need to test pvs functions container.RegisterInstance(new Mock().Object); container.RegisterInstance(new Mock().Object); diff --git a/Robust.Server/BaseServer.cs b/Robust.Server/BaseServer.cs index f6e70299d..6fdaadc19 100644 --- a/Robust.Server/BaseServer.cs +++ b/Robust.Server/BaseServer.cs @@ -9,6 +9,7 @@ using Robust.Server.DataMetrics; using Robust.Server.GameObjects; using Robust.Server.GameStates; using Robust.Server.Log; +using Robust.Server.Network.Transfer; using Robust.Server.Placement; using Robust.Server.Player; using Robust.Server.Scripting; @@ -29,6 +30,7 @@ using Robust.Shared.Localization; using Robust.Shared.Log; using Robust.Shared.Map; using Robust.Shared.Network; +using Robust.Shared.Network.Transfer; using Robust.Shared.Player; using Robust.Shared.Profiling; using Robust.Shared.Prototypes; @@ -107,6 +109,8 @@ namespace Robust.Server [Dependency] private readonly UploadedContentManager _uploadedContMan = default!; [Dependency] private readonly NetworkResourceManager _netResMan = default!; [Dependency] private readonly IReflectionManager _refMan = default!; + [Dependency] private readonly ITransferManager _transfer = default!; + [Dependency] private readonly ServerTransferTestManager _transferTest = default!; private readonly Stopwatch _uptimeStopwatch = new(); @@ -293,6 +297,9 @@ namespace Robust.Server return true; } + _transfer.Initialize(); + _transferTest.Initialize(); + var dataDir = Options.LoadConfigAndUserData ? _commandLineArgs?.DataDir ?? PathHelpers.ExecutableRelativeFile("data") : null; @@ -773,6 +780,8 @@ namespace Robust.Server _modLoader.BroadcastUpdate(ModUpdateLevel.FramePostEngine, frameEventArgs); + _transfer.FrameUpdate(); + _metricsManager.FrameUpdate(); } diff --git a/Robust.Server/Network/Transfer/ServerTransferImplWebSocket.cs b/Robust.Server/Network/Transfer/ServerTransferImplWebSocket.cs new file mode 100644 index 000000000..641b960f3 --- /dev/null +++ b/Robust.Server/Network/Transfer/ServerTransferImplWebSocket.cs @@ -0,0 +1,122 @@ +using System; +using System.Net; +using System.Security.Cryptography; +using System.Threading; +using System.Threading.Tasks; +using Robust.Server.ServerStatus; +using Robust.Shared; +using Robust.Shared.Configuration; +using Robust.Shared.Log; +using Robust.Shared.Network; +using Robust.Shared.Network.Messages.Transfer; +using Robust.Shared.Network.Transfer; +using Robust.Shared.Utility; + +namespace Robust.Server.Network.Transfer; + +internal sealed class ServerTransferImplWebSocket : TransferImplWebSocket +{ + private readonly IConfigurationManager _cfg; + private readonly INetManager _netManager; + private readonly SemaphoreSlim _apiLock = new(1, 1); + private readonly TaskCompletionSource _connectTcs = new(); + + // To authenticate the client doing the HTTP request, + // we ask that they provide a key we gave them via Lidgren traffic. + public byte[]? Key; + + public ServerTransferImplWebSocket( + ISawmill sawmill, + BaseTransferManager parent, + IConfigurationManager cfg, + INetManager netManager, + INetChannel channel) + : base(sawmill, parent, channel) + { + _cfg = cfg; + _netManager = netManager; + } + + public override Task ServerInit() + { + Key = RandomNumberGenerator.GetBytes(RandomKeyBytes); + + var uriBuilder = new UriBuilder(string.Concat( + _cfg.GetCVar(CVars.TransferHttpEndpoint).TrimEnd("/"), + ServerTransferManager.TransferApiUrl)); + + uriBuilder.Scheme = uriBuilder.Scheme switch + { + "http" => "ws", + "https" => "wss", + _ => throw new InvalidOperationException($"Invalid API endpoint scheme: {uriBuilder.Scheme}") + }; + + var url = uriBuilder.ToString(); + + Sawmill.Verbose($"Transfer API URL is '{url}'"); + + var initMsg = new MsgTransferInit(); + initMsg.HttpInfo = (url, Key); + + _netManager.ServerSendMessage(initMsg, Channel); + + return _connectTcs.Task; + } + + public override Task ClientInit(CancellationToken cancel) + { + throw new NotSupportedException(); + } + + public async Task HandleApiRequest(NetUserId userId, IStatusHandlerContext context) + { + using var _ = await _apiLock.WaitGuardAsync(); + + if (Key == null) + { + Sawmill.Warning($"HTTP request failed: UserID '{userId}' tried to connect twice"); + await context.RespondErrorAsync(HttpStatusCode.BadRequest); + return; + } + + if (!context.RequestHeaders.TryGetValue(KeyHeaderName, out var keyValue) || keyValue is not [{ } keyValueStr]) + { + await context.RespondErrorAsync(HttpStatusCode.BadRequest); + return; + } + + var buf = new byte[RandomKeyBytes]; + + if (!Convert.TryFromBase64String(keyValueStr, buf, out var written) || written != RandomKeyBytes) + { + Sawmill.Verbose("HTTP request failed: key is not valid base64 or wrong length"); + await context.RespondErrorAsync(HttpStatusCode.BadRequest); + return; + } + + if (!CryptographicOperations.FixedTimeEquals(buf, Key)) + { + Sawmill.Warning("HTTP request failed: key is wrong"); + await context.RespondErrorAsync(HttpStatusCode.Unauthorized); + return; + } + + Sawmill.Debug("Client connect to transfer WS channel: {UserId}", userId); + + WebSocket = await context.AcceptWebSocketAsync(); + + // We've connected. + // Clear key so this can't be reconnected to. + Key = null; + + _connectTcs.TrySetResult(); + + ReadThread(); + } + + public override void Dispose() + { + _connectTcs.TrySetCanceled(); + } +} diff --git a/Robust.Server/Network/Transfer/ServerTransferManager.cs b/Robust.Server/Network/Transfer/ServerTransferManager.cs new file mode 100644 index 000000000..d06867ca8 --- /dev/null +++ b/Robust.Server/Network/Transfer/ServerTransferManager.cs @@ -0,0 +1,171 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Net; +using System.Threading.Tasks; +using Robust.Server.ServerStatus; +using Robust.Shared; +using Robust.Shared.Asynchronous; +using Robust.Shared.Configuration; +using Robust.Shared.Log; +using Robust.Shared.Network; +using Robust.Shared.Network.Messages.Transfer; +using Robust.Shared.Network.Transfer; + +namespace Robust.Server.Network.Transfer; + +internal sealed class ServerTransferManager : BaseTransferManager, ITransferManager +{ + internal const string TransferApiUrl = "/rt_transfer_init"; + + private readonly IConfigurationManager _cfg; + private readonly IStatusHost _statusHost; + private readonly IServerNetManager _netManager; + + private readonly Dictionary _onlinePlayers = new(); + + internal ServerTransferManager(IConfigurationManager cfg, IStatusHost statusHost, IServerNetManager netManager, ILogManager logManager, ITaskManager taskManager) + : base(logManager, NetMessageAccept.Server, taskManager) + { + _cfg = cfg; + _statusHost = statusHost; + _netManager = netManager; + } + + public void Initialize() + { + _netManager.RegisterNetMessage(); + _netManager.RegisterNetMessage(RxTransferData, NetMessageAccept.Server | NetMessageAccept.Handshake); + _netManager.RegisterNetMessage(RxTransferAckInit, NetMessageAccept.Server | NetMessageAccept.Handshake); + + _statusHost.AddHandler(HandleRequest); + + _netManager.Disconnect += NetManagerOnDisconnect; + } + + private void RxTransferData(MsgTransferData message) + { + if (!_onlinePlayers.TryGetValue(message.MsgChannel.UserId, out var player) + || player.Impl is not TransferImplLidgren lidgren) + { + message.MsgChannel.Disconnect("Not lidgren"); + return; + } + + lidgren.ReceiveData(message); + } + + private void RxTransferAckInit(MsgTransferAckInit message) + { + if (!_onlinePlayers.TryGetValue(message.MsgChannel.UserId, out var player) + || player.Impl is not TransferImplLidgren lidgren) + { + message.MsgChannel.Disconnect("Not lidgren"); + return; + } + + lidgren.ReceiveInitAck(); + } + + public Stream StartTransfer(INetChannel channel, TransferStartInfo startInfo) + { + if (!_onlinePlayers.TryGetValue(channel.UserId, out var player)) + throw new InvalidOperationException("Player is not connected yet!"); + + return player.Impl.StartTransfer(startInfo); + } + + private async Task HandleRequest(IStatusHandlerContext context) + { + if (context.Url.AbsolutePath != TransferApiUrl) + return false; + + if (!context.IsWebSocketRequest) + { + Sawmill.Verbose("HTTP request failed: not a websocket request"); + await context.RespondErrorAsync(HttpStatusCode.BadRequest); + return true; + } + + if (!context.RequestHeaders.TryGetValue(TransferImplWebSocket.UserIdHeaderName, out var userIdValue) + || userIdValue.Count != 1) + { + Sawmill.Verbose("HTTP request failed: missing RT-UserId"); + await context.RespondErrorAsync(HttpStatusCode.BadRequest); + return true; + } + + if (!Guid.TryParse(userIdValue[0], out var userId)) + { + Sawmill.Verbose($"HTTP request failed: UserID '{userId}' invalid"); + await context.RespondErrorAsync(HttpStatusCode.BadRequest); + return true; + } + + if (!_onlinePlayers.TryGetValue(new NetUserId(userId), out var player)) + { + Sawmill.Warning($"HTTP request failed: UserID '{userId}' not online"); + await context.RespondErrorAsync(HttpStatusCode.BadRequest); + return true; + } + + if (player.Impl is not ServerTransferImplWebSocket serverWs) + { + Sawmill.Warning($"HTTP request failed: UserID '{userId}' is not using websocket transfer"); + await context.RespondErrorAsync(HttpStatusCode.Unauthorized); + return true; + } + + await serverWs.HandleApiRequest(new NetUserId(userId), context); + return true; + } + + public async Task ServerHandshake(INetChannel channel) + { + if (_onlinePlayers.ContainsKey(channel.UserId)) + throw new InvalidOperationException("We already have data for this user??"); + + var transferHttpEnabled = _cfg.GetCVar(CVars.TransferHttp); + + BaseTransferImpl impl; + if (transferHttpEnabled) + { + impl = new ServerTransferImplWebSocket(Sawmill, this, _cfg, _netManager, channel); + } + else + { + impl = new TransferImplLidgren(Sawmill, channel, this, _netManager); + } + + impl.MaxChannelCount = _cfg.GetCVar(CVars.TransferStreamLimit); + + var datum = new Player + { + Impl = impl, + }; + + _onlinePlayers.Add(channel.UserId, datum); + + await impl.ServerInit(); + } + + public event Action ClientHandshakeComplete + { + add { } + remove { } + } + + private void NetManagerOnDisconnect(object? sender, NetDisconnectedArgs e) + { + if (!_onlinePlayers.Remove(e.Channel.UserId, out var player)) + return; + + Sawmill.Debug("Cleaning up connection for channel {Player} that disconnected", e.Channel); + player.Impl.Dispose(); + } + + private sealed class Player + { + public required BaseTransferImpl Impl; + } +} diff --git a/Robust.Server/Network/Transfer/ServerTransferTestManager.cs b/Robust.Server/Network/Transfer/ServerTransferTestManager.cs new file mode 100644 index 000000000..02c34abd8 --- /dev/null +++ b/Robust.Server/Network/Transfer/ServerTransferTestManager.cs @@ -0,0 +1,23 @@ +using Robust.Server.Console; +using Robust.Server.Player; +using Robust.Shared.Log; +using Robust.Shared.Network; +using Robust.Shared.Network.Transfer; + +namespace Robust.Server.Network.Transfer; + +internal sealed class ServerTransferTestManager( + ITransferManager manager, + ILogManager logManager, + IConGroupController controller, + IPlayerManager playerManager) + : TransferTestManager(manager, logManager) +{ + protected override bool PermissionCheck(INetChannel channel) + { + if (!playerManager.TryGetSessionByChannel(channel, out var session)) + return false; + + return controller.CanCommand(session, TransferTestCommand.CommandKey); + } +} diff --git a/Robust.Server/Player/IPlayerManager.cs b/Robust.Server/Player/IPlayerManager.cs index 0f53202bf..c6bc4a8c4 100644 --- a/Robust.Server/Player/IPlayerManager.cs +++ b/Robust.Server/Player/IPlayerManager.cs @@ -1,4 +1,5 @@ using Robust.Shared.Input; +using Robust.Shared.Network; using Robust.Shared.Player; namespace Robust.Server.Player; @@ -10,4 +11,6 @@ namespace Robust.Server.Player; public interface IPlayerManager : ISharedPlayerManager { BoundKeyMap KeyMap { get; } + + internal void MarkPlayerResourcesSent(INetChannel channel); } diff --git a/Robust.Server/Player/PlayerManager.cs b/Robust.Server/Player/PlayerManager.cs index 823706bce..c5fde767b 100644 --- a/Robust.Server/Player/PlayerManager.cs +++ b/Robust.Server/Player/PlayerManager.cs @@ -120,13 +120,34 @@ namespace Robust.Server.Player private void HandlePlayerListReq(MsgPlayerListReq message) { var channel = message.MsgChannel; + var session = (CommonSession) GetSessionByChannel(channel); + session.InitialPlayerListReqDone = true; + + if (!session.InitialResourcesDone) + return; + + SendPlayerList(channel, session); + } + + public void MarkPlayerResourcesSent(INetChannel channel) + { + var session = (CommonSession) GetSessionByChannel(channel); + session.InitialResourcesDone = true; + + if (!session.InitialPlayerListReqDone) + return; + + SendPlayerList(channel, session); + } + + private void SendPlayerList(INetChannel channel, CommonSession session) + { var players = Sessions; var netMsg = new MsgPlayerList(); // client session is complete, set their status accordingly. // This is done before the packet is built, so that the client // can see themselves Connected. - var session = GetSessionByChannel(channel); session.ConnectedTime = DateTime.UtcNow; SetStatus(session, SessionStatus.Connected); diff --git a/Robust.Server/ServerIoC.cs b/Robust.Server/ServerIoC.cs index 382596902..2c26ca5df 100644 --- a/Robust.Server/ServerIoC.cs +++ b/Robust.Server/ServerIoC.cs @@ -5,6 +5,7 @@ using Robust.Server.DataMetrics; using Robust.Server.GameObjects; using Robust.Server.GameStates; using Robust.Server.Localization; +using Robust.Server.Network.Transfer; using Robust.Server.Placement; using Robust.Server.Player; using Robust.Server.Prototypes; @@ -25,6 +26,7 @@ using Robust.Shared.IoC; using Robust.Shared.Localization; using Robust.Shared.Map; using Robust.Shared.Network; +using Robust.Shared.Network.Transfer; using Robust.Shared.Player; using Robust.Shared.Prototypes; using Robust.Shared.Reflection; @@ -102,6 +104,8 @@ namespace Robust.Server deps.Register(); deps.Register(); deps.Register(); + deps.Register(); + deps.Register(); } } } diff --git a/Robust.Server/ServerStatus/IStatusHandlerContext.cs b/Robust.Server/ServerStatus/IStatusHandlerContext.cs index 932ce152b..3459e197d 100644 --- a/Robust.Server/ServerStatus/IStatusHandlerContext.cs +++ b/Robust.Server/ServerStatus/IStatusHandlerContext.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.IO; using System.Net; using System.Net.Http; +using System.Net.WebSockets; using System.Threading.Tasks; using Microsoft.Extensions.Primitives; @@ -25,6 +26,8 @@ namespace Robust.Server.ServerStatus IDictionary ResponseHeaders { get; } bool KeepAlive { get; set; } + bool IsWebSocketRequest { get; } + Task RequestBodyJsonAsync(); Task RespondNoContentAsync(); @@ -54,5 +57,7 @@ namespace Robust.Server.ServerStatus Task RespondJsonAsync(object jsonData, HttpStatusCode code = HttpStatusCode.OK); Task RespondStreamAsync(HttpStatusCode code = HttpStatusCode.OK); + + Task AcceptWebSocketAsync(); } } diff --git a/Robust.Server/ServerStatus/StatusHost.cs b/Robust.Server/ServerStatus/StatusHost.cs index 7c8d2fbd0..aa4cfab9d 100644 --- a/Robust.Server/ServerStatus/StatusHost.cs +++ b/Robust.Server/ServerStatus/StatusHost.cs @@ -14,6 +14,7 @@ using System.Linq; using System.Net; using System.Net.Http; using System.Net.Mime; +using System.Net.WebSockets; using System.Text.Json; using System.Text.Json.Nodes; using System.Text.Json.Serialization; @@ -242,6 +243,7 @@ namespace Robust.Server.ServerStatus public Uri Url => _context.Request.Url!; public bool IsGetLike => RequestMethod == HttpMethod.Head || RequestMethod == HttpMethod.Get; public IReadOnlyDictionary RequestHeaders { get; } + public bool IsWebSocketRequest => _context.Request.IsWebSocketRequest; public bool KeepAlive { @@ -353,6 +355,12 @@ namespace Robust.Server.ServerStatus return Task.FromResult(_context.Response.OutputStream); } + public async Task AcceptWebSocketAsync() + { + var context = await _context.AcceptWebSocketAsync(null); + return context.WebSocket; + } + private void RespondShared() { foreach (var (header, value) in _responseHeaders) diff --git a/Robust.Server/Upload/NetworkResourceManager.cs b/Robust.Server/Upload/NetworkResourceManager.cs index e3fbe3824..c52458770 100644 --- a/Robust.Server/Upload/NetworkResourceManager.cs +++ b/Robust.Server/Upload/NetworkResourceManager.cs @@ -1,77 +1,153 @@ using System; +using System.Buffers.Binary; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; using Robust.Server.Console; using Robust.Server.Player; using Robust.Shared; using Robust.Shared.Configuration; using Robust.Shared.IoC; using Robust.Shared.Network; +using Robust.Shared.Network.Transfer; using Robust.Shared.Player; using Robust.Shared.Upload; +using Robust.Shared.Utility; using Robust.Shared.ViewVariables; namespace Robust.Server.Upload; +public sealed class NetworkResourcesUploadedEvent +{ + public ICommonSession Session { get; } + public ImmutableArray<(ResPath Relative, byte[] Data)> Files { get; } + + internal NetworkResourcesUploadedEvent(ICommonSession session, ImmutableArray<(ResPath, byte[])> files) + { + Session = session; + Files = files; + } +} + public sealed class NetworkResourceManager : SharedNetworkResourceManager { + internal const int AckInitial = 1; + [Dependency] private readonly IPlayerManager _playerManager = default!; [Dependency] private readonly IServerNetManager _serverNetManager = default!; [Dependency] private readonly IConfigurationManager _cfgManager = default!; [Dependency] private readonly IConGroupController _controller = default!; + [Obsolete("Use ResourcesUploaded instead")] public event Action? OnResourceUploaded; + public event Action? ResourcesUploaded; [ViewVariables] public bool Enabled { get; private set; } = true; [ViewVariables] public float SizeLimit { get; private set; } - public override void Initialize() + internal event Action? AckReceived; + + internal override void Initialize() { base.Initialize(); + TransferManager.RegisterTransferMessage(TransferKeyNetworkDownload); + TransferManager.RegisterTransferMessage(TransferKeyNetworkUpload, ReceiveUpload); + _cfgManager.OnValueChanged(CVars.ResourceUploadingEnabled, value => Enabled = value, true); _cfgManager.OnValueChanged(CVars.ResourceUploadingLimitMb, value => SizeLimit = value, true); + + _serverNetManager.RegisterNetMessage(RxAck); } - /// - /// Callback for when a client attempts to upload a resource. - /// - /// - /// - protected override void ResourceUploadMsg(NetworkResourceUploadMessage msg) + private void RxAck(NetworkResourceAckMessage message) + { + AckReceived?.Invoke(message.MsgChannel, message.Key); + } + + private async void ReceiveUpload(TransferReceivedEvent transfer) { // Do not allow uploading any new resources if it has been disabled. // Note: Any resources uploaded before being disabled will still be kept and sent. if (!Enabled) + { + transfer.Channel.Disconnect("Resource upload not enabled."); return; + } - if (!_playerManager.TryGetSessionByChannel(msg.MsgChannel, out var session)) + if (!_playerManager.TryGetSessionByChannel(transfer.Channel, out var session)) + { + transfer.Channel.Disconnect("Not in-game"); return; + } if (!_controller.CanCommand(session, "uploadfile")) + { + transfer.Channel.Disconnect("Not authorized"); return; + } - // Ensure the data is under the current size limit, if it's currently enabled. - if (SizeLimit > 0f && msg.Data.Length * BytesToMegabytes > SizeLimit) - return; + Sawmill.Verbose("Ingesting file uploads from {Session}", session); - base.ResourceUploadMsg(msg); + List<(ResPath Relative, byte[] Data)> ingested; + await using (var stream = transfer.DataStream) + { + ingested = await IngestFileStream(stream); + } + + Sawmill.Verbose("Ingesting file uploads complete, distributing..."); - // Now we broadcast the message! foreach (var channel in _serverNetManager.Channels) { - channel.SendMessage(msg); + SendToPlayer(channel, ingested); } - OnResourceUploaded?.Invoke(session, msg); +#pragma warning disable CS0618 // Type or member is obsolete + if (OnResourceUploaded != null) + { + foreach (var (relative, data) in ingested) + { + OnResourceUploaded?.Invoke(session, new NetworkResourceUploadMessage + { + MsgChannel = session.Channel, + Data = data, + RelativePath = relative + }); + } + } +#pragma warning restore CS0618 // Type or member is obsolete + + ResourcesUploaded?.Invoke(new NetworkResourcesUploadedEvent(session, [..ingested])); } - internal void SendToNewUser(INetChannel channel) + protected override void ValidateUpload(uint size) { - foreach (var (path, data) in ContentRoot.GetAllFiles()) - { - var msg = new NetworkResourceUploadMessage(); - msg.RelativePath = path; - msg.Data = data; - channel.SendMessage(msg); - } + if (SizeLimit > 0f && size * BytesToMegabytes > SizeLimit) + throw new Exception("File upload too large!"); + } + + internal bool SendToNewUser(INetChannel channel) + { + var allFiles = ContentRoot.GetAllFiles().ToList(); + if (allFiles.Count == 0) + return false; + + SendToPlayer(channel, allFiles, AckInitial); + return true; + } + + private async void SendToPlayer(INetChannel channel, List<(ResPath Relative, byte[] Data)> files, int ack = 0) + { + await using var stream = TransferManager.StartTransfer(channel, + new TransferStartInfo + { + MessageKey = TransferKeyNetworkDownload + }); + + var ackBytes = new byte[4]; + BinaryPrimitives.WriteInt32LittleEndian(ackBytes, ack); + await stream.WriteAsync(ackBytes); + + await WriteFileStream(stream, files); } } diff --git a/Robust.Server/Upload/UploadedContentManager.cs b/Robust.Server/Upload/UploadedContentManager.cs index 3bea743a7..69621121f 100644 --- a/Robust.Server/Upload/UploadedContentManager.cs +++ b/Robust.Server/Upload/UploadedContentManager.cs @@ -1,4 +1,5 @@ -using Robust.Shared.IoC; +using Robust.Server.Player; +using Robust.Shared.IoC; using Robust.Shared.Network; namespace Robust.Server.Upload; @@ -9,20 +10,36 @@ namespace Robust.Server.Upload; internal sealed class UploadedContentManager { [Dependency] private readonly IServerNetManager _netManager = default!; + [Dependency] private readonly IPlayerManager _playerManager = default!; [Dependency] private readonly GamePrototypeLoadManager _prototypeLoadManager = default!; [Dependency] private readonly NetworkResourceManager _networkResourceManager = default!; public void Initialize() { _netManager.Connected += NetManagerOnConnected; + _networkResourceManager.AckReceived += OnAckReceived; + } + + private void OnAckReceived(INetChannel channel, int ack) + { + if (ack != NetworkResourceManager.AckInitial) + return; + + ResourcesReady(channel); } private void NetManagerOnConnected(object? sender, NetChannelArgs e) { // This just shells out to the other managers, ensuring they are ordered properly. // Resources must be done before prototypes. - // Note: both net messages sent here are on the same group and are therefore ordered. - _networkResourceManager.SendToNewUser(e.Channel); - _prototypeLoadManager.SendToNewUser(e.Channel); + var sentAny = _networkResourceManager.SendToNewUser(e.Channel); + if (!sentAny) + ResourcesReady(e.Channel); + } + + private void ResourcesReady(INetChannel channel) + { + _prototypeLoadManager.SendToNewUser(channel); + _playerManager.MarkPlayerResourcesSent(channel); } } diff --git a/Robust.Shared.IntegrationTests/GameObjects/EntityState_Tests.cs b/Robust.Shared.IntegrationTests/GameObjects/EntityState_Tests.cs index e042a1aa5..ffdd79718 100644 --- a/Robust.Shared.IntegrationTests/GameObjects/EntityState_Tests.cs +++ b/Robust.Shared.IntegrationTests/GameObjects/EntityState_Tests.cs @@ -2,9 +2,14 @@ using System; using System.IO; using Moq; using NUnit.Framework; +using Robust.Server; using Robust.Server.Configuration; +using Robust.Server.Network.Transfer; +using Robust.Server.Player; using Robust.Server.Reflection; using Robust.Server.Serialization; +using Robust.Server.ServerStatus; +using Robust.Shared.Asynchronous; using Robust.Shared.Configuration; using Robust.Shared.ContentPack; using Robust.Shared.GameObjects; @@ -13,6 +18,7 @@ using Robust.Shared.Log; using Robust.Shared.Map; using Robust.Shared.Map.Components; using Robust.Shared.Network; +using Robust.Shared.Network.Transfer; using Robust.Shared.Profiling; using Robust.Shared.Reflection; using Robust.Shared.Replays; @@ -43,6 +49,7 @@ namespace Robust.UnitTesting.Shared.GameObjects container.Register(); container.Register(); container.Register(); + container.RegisterInstance(Mock.Of()); container.Register(); container.RegisterInstance(new Mock().Object); container.BuildGraph(); diff --git a/Robust.Shared/CVars.cs b/Robust.Shared/CVars.cs index c987fbf9f..1635a948b 100644 --- a/Robust.Shared/CVars.cs +++ b/Robust.Shared/CVars.cs @@ -406,6 +406,40 @@ namespace Robust.Shared public static readonly CVarDef NetHWId = CVarDef.Create("net.hwid", true, CVar.SERVERONLY); + /** + * TRANSFER + */ + + /// + /// If true, enable the WebSocket-based high bandwidth transfer channel. + /// + /// + /// + /// If set, must be set to the API address of the server, + /// and you must ensure your reverse proxy (if you have one) is configured to allow WebSocket connections. + /// + /// + /// The transfer channel has no additional encryption layer. Unless your API is exposed behind HTTPS, + /// traffic over the channel will not be encrypted, and you are discouraged from enabling it. + /// + /// + public static readonly CVarDef TransferHttp = + CVarDef.Create("transfer.http", false, CVar.SERVERONLY); + + /// + /// The base HTTP URL of the game server, used for the high-bandwidth transfer channel. + /// + public static readonly CVarDef TransferHttpEndpoint = + CVarDef.Create("transfer.http_endpoint", "http://localhost:1212/", CVar.SERVERONLY); + + /// + /// Amount of concurrent client->server transfer streams allowed. + /// + /// + /// Clients will be disconnected if they exceed this limit. + /// + public static readonly CVarDef TransferStreamLimit = + CVarDef.Create("transfer.stream_limit", 10, CVar.SERVERONLY); /** * SUS diff --git a/Robust.Shared/Console/Commands/DumpStringTableCommand.cs b/Robust.Shared/Console/Commands/DumpStringTableCommand.cs new file mode 100644 index 000000000..190a52f22 --- /dev/null +++ b/Robust.Shared/Console/Commands/DumpStringTableCommand.cs @@ -0,0 +1,23 @@ +using System.Linq; +using Robust.Shared.IoC; +using Robust.Shared.Network; + +namespace Robust.Shared.Console.Commands; + +internal sealed class DumpStringTableCommand : IConsoleCommand +{ + [Dependency] private readonly INetManager _netManager = default!; + + public string Command => "net_dumpstringtable"; + public string Description => ""; + public string Help => ""; + + public void Execute(IConsoleShell shell, string argStr, string[] args) + { + var netMgr = (NetManager)_netManager; + foreach (var (k, v) in netMgr.StringTable.Strings.OrderBy(x => x.Key)) + { + shell.WriteLine($"{k}: {v}"); + } + } +} diff --git a/Robust.Shared/Network/INetChannel.cs b/Robust.Shared/Network/INetChannel.cs index 387a26c4c..a025b3763 100644 --- a/Robust.Shared/Network/INetChannel.cs +++ b/Robust.Shared/Network/INetChannel.cs @@ -1,5 +1,6 @@ using System; using System.Net; +using Lidgren.Network; using Robust.Shared.ViewVariables; namespace Robust.Shared.Network @@ -102,5 +103,13 @@ namespace Robust.Shared.Network /// Reason why it was disconnected. /// If false, we ghost the remote client and don't tell them they got disconnected properly. void Disconnect(string reason, bool sendBye); + + /// + /// Check whether the networking layer has space to immediately send a message with the given parameters. + /// + /// + /// If this returns true, messages may still be sent, but they will be queued until there is space available. + /// + bool CanSendImmediately(NetDeliveryMethod method, int sequenceChannel); } } diff --git a/Robust.Shared/Network/Messages/Transfer/MsgTransferAckInit.cs b/Robust.Shared/Network/Messages/Transfer/MsgTransferAckInit.cs new file mode 100644 index 000000000..e93589a22 --- /dev/null +++ b/Robust.Shared/Network/Messages/Transfer/MsgTransferAckInit.cs @@ -0,0 +1,19 @@ +using Lidgren.Network; +using Robust.Shared.Serialization; + +namespace Robust.Shared.Network.Messages.Transfer; + +internal sealed class MsgTransferAckInit : NetMessage +{ + public override NetDeliveryMethod DeliveryMethod => NetDeliveryMethod.ReliableOrdered; + + public override void ReadFromBuffer(NetIncomingMessage buffer, IRobustSerializer serializer) + { + // No data needed. + } + + public override void WriteToBuffer(NetOutgoingMessage buffer, IRobustSerializer serializer) + { + // No data needed. + } +} diff --git a/Robust.Shared/Network/Messages/Transfer/MsgTransferData.cs b/Robust.Shared/Network/Messages/Transfer/MsgTransferData.cs new file mode 100644 index 000000000..fb46abe6f --- /dev/null +++ b/Robust.Shared/Network/Messages/Transfer/MsgTransferData.cs @@ -0,0 +1,36 @@ +using System; +using System.Buffers; +using Lidgren.Network; +using Robust.Shared.Network.Transfer; +using Robust.Shared.Serialization; + +namespace Robust.Shared.Network.Messages.Transfer; + +internal sealed class MsgTransferData : NetMessage +{ + internal const NetDeliveryMethod Method = NetDeliveryMethod.ReliableOrdered; + internal const int Channel = SequenceChannels.Transfer; + + public override NetDeliveryMethod DeliveryMethod => Method; + public override int SequenceChannel => Channel; + + public ArraySegment Data; + + public override void ReadFromBuffer(NetIncomingMessage buffer, IRobustSerializer serializer) + { + var length = buffer.ReadVariableInt32(); + if (length > BaseTransferImpl.BufferSize) + throw new Exception("Buffer size is too large"); + + var arr = ArrayPool.Shared.Rent(length); + buffer.ReadBytes(arr, 0, length); + + Data = new ArraySegment(arr, 0, length); + } + + public override void WriteToBuffer(NetOutgoingMessage buffer, IRobustSerializer serializer) + { + buffer.WriteVariableInt32(Data.Count); + buffer.Write(Data.AsSpan()); + } +} diff --git a/Robust.Shared/Network/Messages/Transfer/MsgTransferInit.cs b/Robust.Shared/Network/Messages/Transfer/MsgTransferInit.cs new file mode 100644 index 000000000..5d0f7e3b3 --- /dev/null +++ b/Robust.Shared/Network/Messages/Transfer/MsgTransferInit.cs @@ -0,0 +1,44 @@ +using Lidgren.Network; +using Robust.Shared.Network.Transfer; +using Robust.Shared.Serialization; + +namespace Robust.Shared.Network.Messages.Transfer; + +internal sealed class MsgTransferInit : NetMessage +{ + public (string EndpointUrl, byte[] Key)? HttpInfo; + + public override NetDeliveryMethod DeliveryMethod => NetDeliveryMethod.ReliableOrdered; + + public override void ReadFromBuffer(NetIncomingMessage buffer, IRobustSerializer serializer) + { + var httpAvailable = buffer.ReadBoolean(); + if (!httpAvailable) + { + HttpInfo = null; + return; + } + + buffer.SkipPadBits(); + var endpoint = buffer.ReadString(); + var key = buffer.ReadBytes(TransferImplWebSocket.RandomKeyBytes); + + HttpInfo = (endpoint, key); + } + + public override void WriteToBuffer(NetOutgoingMessage buffer, IRobustSerializer serializer) + { + if (HttpInfo is null) + { + buffer.Write(false); + return; + } + + buffer.Write(true); + buffer.WritePadBits(); + + var (ep, key) = HttpInfo.Value; + buffer.Write(ep); + buffer.Write(key); + } +} diff --git a/Robust.Shared/Network/NetManager.NetChannel.cs b/Robust.Shared/Network/NetManager.NetChannel.cs index 676864dcb..3e74be5f2 100644 --- a/Robust.Shared/Network/NetManager.NetChannel.cs +++ b/Robust.Shared/Network/NetManager.NetChannel.cs @@ -102,6 +102,11 @@ namespace Robust.Shared.Network _connection.Disconnect(reason, sendBye); } + public bool CanSendImmediately(NetDeliveryMethod method, int sequenceChannel) + { + return _connection.CanSendImmediately(method, sequenceChannel); + } + public override string ToString() { return $"{ConnectionId}/{UserId}"; diff --git a/Robust.Shared/Network/NetManager.Send.cs b/Robust.Shared/Network/NetManager.Send.cs index d01bcd3e6..3da72909e 100644 --- a/Robust.Shared/Network/NetManager.Send.cs +++ b/Robust.Shared/Network/NetManager.Send.cs @@ -64,6 +64,7 @@ public sealed partial class NetManager var packet = BuildMessage(message, channel.Connection.Peer); var method = message.DeliveryMethod; + var seqChannel = message.SequenceChannel; LogSend(message, method, packet); @@ -71,6 +72,7 @@ public sealed partial class NetManager { Message = packet, Method = method, + SequenceChannel = seqChannel, Owner = this, RobustMessage = message, }; @@ -114,7 +116,7 @@ public sealed partial class NetManager { channel.Encryption?.Encrypt(item.Message); - var result = channel.Connection.Peer.SendMessage(item.Message, channel.Connection, item.Method); + var result = channel.Connection.Peer.SendMessage(item.Message, channel.Connection, item.Method, item.SequenceChannel); if (result is not (NetSendResult.Sent or NetSendResult.Queued)) { // Logging stack trace here won't be useful as it'll likely be thread pooled on production scenarios. @@ -127,6 +129,7 @@ public sealed partial class NetManager { public required NetOutgoingMessage Message; public required NetDeliveryMethod Method; + public required int SequenceChannel; public required NetMessage RobustMessage; public required NetManager Owner; } diff --git a/Robust.Shared/Network/NetManager.cs b/Robust.Shared/Network/NetManager.cs index 7d9474fc9..e7c93cc2c 100644 --- a/Robust.Shared/Network/NetManager.cs +++ b/Robust.Shared/Network/NetManager.cs @@ -12,6 +12,7 @@ using Prometheus; using Robust.Shared.Configuration; using Robust.Shared.IoC; using Robust.Shared.Log; +using Robust.Shared.Network.Transfer; using Robust.Shared.Player; using Robust.Shared.Profiling; using Robust.Shared.Serialization; @@ -112,6 +113,7 @@ namespace Robust.Shared.Network [Dependency] private readonly ProfManager _prof = default!; [Dependency] private readonly HttpClientHolder _http = default!; [Dependency] private readonly IHWId _hwId = default!; + [Dependency] private readonly ITransferManager _transfer = default!; /// /// Holds lookup table for NetMessage.Id -> NetMessage.Type @@ -140,6 +142,9 @@ namespace Robust.Shared.Network private ISawmill _loggerPacket = default!; private ISawmill _authLogger = default!; + private bool _clientSerializerComplete; + private bool _clientTransferComplete; + /// public int Port => _config.GetCVar(CVars.NetPort); @@ -147,6 +152,8 @@ namespace Robust.Shared.Network public IReadOnlyDictionary MessageBandwidthUsage => _bandwidthUsage; + internal StringTable StringTable => _strings; + /// public bool IsServer { get; private set; } @@ -271,6 +278,7 @@ namespace Robust.Shared.Network _strings.Initialize(() => { _logger.Info("Message string table loaded."); }, UpdateNetMessageFunctions); _serializer.ClientHandshakeComplete += OnSerializerOnClientHandshakeComplete; + _transfer.ClientHandshakeComplete += OnTransferOnClientHandshakeComplete; _initialized = true; @@ -304,7 +312,21 @@ namespace Robust.Shared.Network private void OnSerializerOnClientHandshakeComplete() { _logger.Info("Client completed serializer handshake."); - OnConnected(ServerChannelImpl!); + _clientSerializerComplete = true; + ClientCheckSwitchToConnected(); + } + + private void OnTransferOnClientHandshakeComplete() + { + _logger.Info("Client completed transfer handshake."); + _clientTransferComplete = true; + ClientCheckSwitchToConnected(); + } + + private void ClientCheckSwitchToConnected() + { + if (_clientSerializerComplete && _clientTransferComplete) + OnConnected(ServerChannelImpl!); } private void SynchronizeNetTime() @@ -427,6 +449,9 @@ namespace Robust.Shared.Network _cancelConnectTokenSource?.Cancel(); ClientConnectState = ClientConnectionState.NotConnecting; + + _clientSerializerComplete = false; + _clientTransferComplete = false; } /// @@ -805,7 +830,9 @@ namespace Robust.Shared.Network try { - await _serializer.Handshake(channel); + await Task.WhenAll( + _serializer.Handshake(channel), + _transfer.ServerHandshake(channel)); } catch (TaskCanceledException) { diff --git a/Robust.Shared/Network/NetMessage.cs b/Robust.Shared/Network/NetMessage.cs index 036b0ac0e..83b0f805c 100644 --- a/Robust.Shared/Network/NetMessage.cs +++ b/Robust.Shared/Network/NetMessage.cs @@ -105,5 +105,13 @@ namespace Robust.Shared.Network } } } + + /// + /// The lidgren sequence channel to send this message on. + /// + /// + /// Channels 16 and higher are reserved for internal RT usage. + /// + public virtual int SequenceChannel => 0; } } diff --git a/Robust.Shared/Network/SequenceChannels.cs b/Robust.Shared/Network/SequenceChannels.cs new file mode 100644 index 000000000..872b13fa2 --- /dev/null +++ b/Robust.Shared/Network/SequenceChannels.cs @@ -0,0 +1,8 @@ +namespace Robust.Shared.Network; + +internal static class SequenceChannels +{ + public const int EngineBase = 16; + + public const int Transfer = EngineBase; +} diff --git a/Robust.Shared/Network/StringTable.cs b/Robust.Shared/Network/StringTable.cs index e292ba49d..d6d52ecff 100644 --- a/Robust.Shared/Network/StringTable.cs +++ b/Robust.Shared/Network/StringTable.cs @@ -38,6 +38,8 @@ namespace Robust.Shared.Network private InitCallback? _callback; private StringTableUpdateCallback? _updateCallback; + internal Dictionary Strings => _strings; + public ISawmill Sawmill = default!; /// diff --git a/Robust.Shared/Network/Transfer/BaseTransferImpl.cs b/Robust.Shared/Network/Transfer/BaseTransferImpl.cs new file mode 100644 index 000000000..6aa3270e5 --- /dev/null +++ b/Robust.Shared/Network/Transfer/BaseTransferImpl.cs @@ -0,0 +1,436 @@ +using System; +using System.Buffers; +using System.Buffers.Binary; +using System.Collections.Generic; +using System.IO; +using System.Net; +using System.Text; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Robust.Shared.Log; +using Robust.Shared.Utility; + +namespace Robust.Shared.Network.Transfer; + +internal abstract class BaseTransferImpl(ISawmill sawmill, BaseTransferManager parent, INetChannel channel) : IDisposable +{ + // Custom framing format is as follows. + //
+ // uint8 opcode + // uint8 flags + // int64 transfer ID + // [if start message]: + // uint8 key length + // byte[] key + // + // just the fucking data lol + + internal const int BufferSize = 16384; + internal const int MaxKeySize = 96; + internal const int MaxHeaderSize = 128; + + protected readonly INetChannel Channel = channel; + protected readonly ISawmill Sawmill = sawmill; + + protected long OutgoingIdCounter; + public int MaxChannelCount = int.MaxValue; + + private readonly Dictionary>> _receivingChannels = []; + + private readonly SemaphoreSlim _socketSemaphore = new(1, 1); + internal readonly BaseTransferManager Parent = parent; + + public abstract Task ServerInit(); + public abstract Task ClientInit(CancellationToken cancel); + public abstract Stream StartTransfer(TransferStartInfo startInfo); + + protected abstract bool BoundedChannel { get; } + + private void TransferReceived(string key, ChannelReader> reader) + { + if (_receivingChannels.Count >= MaxChannelCount) + { + Sawmill.Warning($"Disconnecting client {Channel} for breaching max channel count of {_receivingChannels}"); + Channel.Disconnect("Reached max transfer channel count"); + return; + } + + // var stream = new ReceiveStream(reader); + // Parent.TransferReceived(key, Channel, stream); + } + + protected void HandleHeaderReceived( + ReadOnlyMemory data, + out TransferFlags flags, + out long transferId, + out ChannelWriter> channel) + { + ParseHeader(data.Span, out flags, out transferId, out var key); + + if (!_receivingChannels.TryGetValue(transferId, out channel!)) + { + if ((flags & TransferFlags.Start) == 0) + throw new ProtocolViolationException($"Received data for unknown transfer {transferId}"); + + DebugTools.Assert(key != null); + + Sawmill.Verbose($"Starting transfer stream {transferId} with key {key}"); + + var fullChannel = BoundedChannel + ? System.Threading.Channels.Channel.CreateBounded>( + new BoundedChannelOptions(4) + { + SingleReader = true, + SingleWriter = true + }) + : System.Threading.Channels.Channel.CreateUnbounded>(new UnboundedChannelOptions + { + SingleReader = true, + SingleWriter = true + }); + + channel = fullChannel.Writer; + _receivingChannels.Add(transferId, channel); + + TransferReceived(key, fullChannel.Reader); + } + } + + protected void HandlePostData(TransferFlags flags, long transferId, ChannelWriter> channel) + { + if ((flags & TransferFlags.Finish) != 0) + { + Sawmill.Verbose($"Finishing transfer stream {transferId}"); + + channel.Complete(); + _receivingChannels.Remove(transferId); + } + } + + private static void ParseHeader( + ReadOnlySpan buf, + out TransferFlags flags, + out long transferId, + out string? key) + { + flags = (TransferFlags)buf[1]; + transferId = BinaryPrimitives.ReadInt64LittleEndian(buf[2..10]); + + if ((flags & TransferFlags.Start) != 0) + { + var keyLength = buf[10]; + key = Encoding.UTF8.GetString(buf.Slice(11, keyLength)); + } + else + { + key = null; + } + } + + private sealed class ReceiveStream : SaneStream + { + private readonly ChannelReader> _bufferChannel; + + private ArraySegment _currentBuffer; + + public override bool CanRead => true; + + public ReceiveStream(ChannelReader> bufferChannel) + { + _bufferChannel = bufferChannel; + } + + public override int Read(Span buffer) + { + var read = 0; + var remainingSpan = buffer; + + while (remainingSpan.Length > 0) + { + if (_currentBuffer.Array == null || _currentBuffer.Count <= 0) + { + if (_currentBuffer.Array != null) + { + ArrayPool.Shared.Return(_currentBuffer.Array); + _currentBuffer = default; + } + + if (!_bufferChannel.TryRead(out _currentBuffer)) + { + // Only block if we haven't read any bytes yet. + if (read > 0 || !ReadNewBufferSync()) + return read; + } + } + + DebugTools.Assert(_currentBuffer.Array != null); + + var remainingBuffer = _currentBuffer.Count; + var thisRead = Math.Min(remainingSpan.Length, remainingBuffer); + + _currentBuffer.AsSpan(0, thisRead).CopyTo(remainingSpan); + remainingSpan = remainingSpan[thisRead..]; + _currentBuffer = _currentBuffer[thisRead..]; + read += thisRead; + } + + return read; + } + + public override async ValueTask ReadAsync( + Memory buffer, + CancellationToken cancellationToken = default) + { + var read = 0; + var remainingSpan = buffer; + + while (remainingSpan.Length > 0) + { + if (_currentBuffer.Array == null || _currentBuffer.Count <= 0) + { + if (_currentBuffer.Array != null) + { + ArrayPool.Shared.Return(_currentBuffer.Array); + _currentBuffer = default; + } + + if (!_bufferChannel.TryRead(out _currentBuffer)) + { + // Only block if we haven't read any bytes yet. + if (read > 0 || !await ReadNewBufferAsync()) + return read; + } + } + + DebugTools.Assert(_currentBuffer.Array != null); + + var remainingBuffer = _currentBuffer.Count; + var thisRead = Math.Min(remainingSpan.Length, remainingBuffer); + + _currentBuffer.AsMemory(0, thisRead).CopyTo(remainingSpan); + remainingSpan = remainingSpan[thisRead..]; + _currentBuffer = _currentBuffer[thisRead..]; + read += thisRead; + } + + return read; + } + + private bool ReadNewBufferSync() + { + DebugTools.Assert(_currentBuffer.Array == null); + + var waitToRead = _bufferChannel.WaitToReadAsync(); +#pragma warning disable RA0004 + var waitToReadResult = waitToRead.AsTask().Result; +#pragma warning restore RA0004 + if (!waitToReadResult) + return false; + + return _bufferChannel.TryRead(out _currentBuffer); + } + + private async Task ReadNewBufferAsync() + { + DebugTools.Assert(_currentBuffer.Array == null); + + var waitToRead = await _bufferChannel.WaitToReadAsync(); + if (!waitToRead) + return false; + + return _bufferChannel.TryRead(out _currentBuffer); + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + + if (disposing && _currentBuffer.Array != null) + ArrayPool.Shared.Return(_currentBuffer.Array); + } + } + + protected abstract class ChunkedSendStream : SaneStream + { + protected readonly BaseTransferImpl Parent; + private readonly long _id; + private readonly string _key; + + private readonly byte[] _headerBuffer; + private readonly byte[] _dataBuffer; + private bool _isFirstTransmission = true; + private int _bufferPos; + + public override bool CanWrite => true; + + public ChunkedSendStream(BaseTransferImpl parent, long id, string key) + { + // This just has to be < buffer size & < ushort.MaxValue + // (when accounting for UTF-8 possibly being more code units than UTF-16) + if (Encoding.UTF8.GetByteCount(key) > MaxKeySize) + throw new ArgumentException("Key too long"); + + Parent = parent; + _id = id; + _key = key; + + _headerBuffer = ArrayPool.Shared.Rent(MaxHeaderSize); + _dataBuffer = ArrayPool.Shared.Rent(BufferSize); + } + + public override void Write(ReadOnlySpan buffer) + { + while (buffer.Length > 0) + { + var remainingBufferSpace = _dataBuffer.AsSpan(_bufferPos); + var thisChunk = Math.Min(remainingBufferSpace.Length, buffer.Length); + var thisSpan = buffer[..thisChunk]; + + thisSpan.CopyTo(remainingBufferSpace); + _bufferPos += thisChunk; + + if (_bufferPos == _dataBuffer.Length) + Flush(); + + buffer = buffer[thisChunk..]; + } + } + + public override async ValueTask WriteAsync( + ReadOnlyMemory buffer, + CancellationToken cancellationToken = default) + { + while (buffer.Length > 0) + { + var remainingBufferSpace = _dataBuffer.AsSpan(_bufferPos); + var thisChunk = Math.Min(remainingBufferSpace.Length, buffer.Length); + var thisSpan = buffer[..thisChunk]; + + thisSpan.Span.CopyTo(remainingBufferSpace); + _bufferPos += thisChunk; + + if (_bufferPos == _dataBuffer.Length) + await FlushAsync(cancellationToken).ConfigureAwait(false); + + buffer = buffer[thisChunk..]; + } + } + + public override void Flush() + { + FlushAsync().Wait(); + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + await FlushAsync(finish: false, cancellationToken).ConfigureAwait(false); + } + + private async ValueTask FlushAsync(bool finish, CancellationToken cancel = default) + { + var headerLength = 10; + + var opcode = Opcode.Transfer; + var flags = TransferFlags.None; + if (_isFirstTransmission) + flags |= TransferFlags.Start; + if (_bufferPos > 0) + flags |= TransferFlags.HasData; + if (finish) + flags |= TransferFlags.Finish; + + if (flags == TransferFlags.None) + { + // Nothing to flush, whatsoever. + return; + } + + _headerBuffer[0] = (byte)opcode; + _headerBuffer[1] = (byte)flags; + BinaryPrimitives.WriteInt64LittleEndian(_headerBuffer.AsSpan(2..10), _id); + + if (_isFirstTransmission) + { + var written = Encoding.UTF8.GetBytes(_key, _headerBuffer.AsSpan(11..)); + DebugTools.Assert(written < byte.MaxValue); + _headerBuffer[10] = (byte)written; + + headerLength += 1; + headerLength += written; + } + + // Send. + using (await Parent._socketSemaphore.WaitGuardAsync().ConfigureAwait(false)) + { + await SendChunkAsync( + new ArraySegment(_headerBuffer, 0, headerLength), + cancel) + .ConfigureAwait(false); + + if (_bufferPos > 0) + { + await SendChunkAsync( + new ArraySegment(_dataBuffer, 0, _bufferPos), + cancel) + .ConfigureAwait(false); + + _bufferPos = 0; + } + } + + _isFirstTransmission = false; + } + + protected abstract ValueTask SendChunkAsync( + ArraySegment buffer, + CancellationToken cancellationToken); + + protected override void Dispose(bool disposing) + { + FlushAsync(finish: true).AsTask().Wait(); + DisposeCore(); + } + + public override async ValueTask DisposeAsync() + { + GC.SuppressFinalize(this); + await FlushAsync(finish: true).ConfigureAwait(false); + DisposeCore(); + } + + private void DisposeCore() + { + ArrayPool.Shared.Return(_dataBuffer); + ArrayPool.Shared.Return(_headerBuffer); + } + + ~ChunkedSendStream() + { + // Have to do this so the stream isn't permanently hanging on the receiving side. + FlushAsync(finish: true).AsTask().Wait(); + } + } + + public virtual void Dispose() + { + foreach (var channel in _receivingChannels.Values) + { + channel.Complete(); + } + } + + protected enum Opcode : byte + { + Transfer = 0, + } + + [Flags] + protected enum TransferFlags : byte + { + None = 0, + Start = 1 << 0, + Finish = 1 << 1, + HasData = 1 << 2, + } +} diff --git a/Robust.Shared/Network/Transfer/BaseTransferManager.Lidgren.cs b/Robust.Shared/Network/Transfer/BaseTransferManager.Lidgren.cs new file mode 100644 index 000000000..f9db301b4 --- /dev/null +++ b/Robust.Shared/Network/Transfer/BaseTransferManager.Lidgren.cs @@ -0,0 +1,67 @@ +using System; +using System.Collections.Generic; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Robust.Shared.Collections; +using Robust.Shared.Network.Messages.Transfer; + +namespace Robust.Shared.Network.Transfer; + +internal abstract partial class BaseTransferManager +{ + private readonly Lock _waitingSendChannelLock = new(); + private readonly Dictionary _waitingSendChannels = []; + private ValueList<(INetChannel, TaskCompletionSource)> _sendChannelQueue; + + public void FrameUpdate() + { + lock (_waitingSendChannelLock) + { + foreach (var (channel, tcs) in _waitingSendChannels) + { + if (!channel.IsConnected || SendCheck(channel)) + _sendChannelQueue.Add((channel, tcs)); + } + + // Remove BEFORE dispatching any TCSes, so we don't try to add to the list from a callback. + foreach (var (channel, _) in _sendChannelQueue) + { + _waitingSendChannels.Remove(channel); + } + } + + foreach (var (channel, tcs) in _sendChannelQueue) + { + if (!channel.IsConnected) + tcs.TrySetException(new NetChannelClosedException("Channel closed")); + else + tcs.TrySetResult(); + } + + _sendChannelQueue.Clear(); + } + + public async ValueTask WaitToSend(INetChannel channel) + { + if (SendCheck(channel)) + return; + + TaskCompletionSource tcs; + lock (_waitingSendChannelLock) + { + ref var tcsSlot = ref CollectionsMarshal.GetValueRefOrAddDefault(_waitingSendChannels, channel, out _); + tcsSlot ??= new TaskCompletionSource(); + tcs = tcsSlot; + } + + await tcs.Task; + } + + private static bool SendCheck(INetChannel channel) + { + return channel.CanSendImmediately(MsgTransferData.Method, MsgTransferData.Channel); + } + + private sealed class NetChannelClosedException(string message) : Exception(message); +} diff --git a/Robust.Shared/Network/Transfer/BaseTransferManager.cs b/Robust.Shared/Network/Transfer/BaseTransferManager.cs new file mode 100644 index 000000000..55a95bc6e --- /dev/null +++ b/Robust.Shared/Network/Transfer/BaseTransferManager.cs @@ -0,0 +1,79 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Runtime.InteropServices; +using Prometheus; +using Robust.Shared.Asynchronous; +using Robust.Shared.Log; + +namespace Robust.Shared.Network.Transfer; + +internal abstract partial class BaseTransferManager +{ + internal static readonly Counter SentDataMetrics = Metrics.CreateCounter( + "robust_transfer_sent_bytes", + "Number of bytes sent via the transfer system"); + + internal static readonly Counter ReceivedDataMetrics = Metrics.CreateCounter( + "robust_transfer_received_bytes", + "Number of bytes received via the transfer system"); + + private readonly NetMessageAccept _side; + private readonly ITaskManager _taskManager; + + protected readonly Dictionary RegisteredKeys = []; + protected readonly ISawmill Sawmill; + + private protected BaseTransferManager( + ILogManager logManager, + NetMessageAccept side, + ITaskManager taskManager) + { + _side = side; + _taskManager = taskManager; + Sawmill = logManager.GetSawmill("net.transfer"); + } + + public void RegisterTransferMessage( + string key, + Action? rxCallback = null, + NetMessageAccept accept = NetMessageAccept.Both) + { + if ((accept & ~NetMessageAccept.Both) != 0) + throw new ArgumentException("Invalid accept given: must be client, server, or both"); + + ref var slot = ref CollectionsMarshal.GetValueRefOrAddDefault(RegisteredKeys, key, out var exists); + if (exists) + throw new InvalidOperationException($"Key '{key}' was already registered!"); + + slot = new RegisteredKey(); + + if ((accept & _side) > 0) + slot.Callback = rxCallback; + } + + internal void TransferReceived(string key, INetChannel channel, Stream stream) + { + if (!RegisteredKeys.TryGetValue(key, out var registered)) + throw new Exception($"Unknown key: {key}"); + + if (registered.Callback == null) + throw new Exception($"Key is send-only: {key}"); + + _taskManager.RunOnMainThread(() => + { + registered.Callback(new TransferReceivedEvent(key, channel, stream)); + }); + } + + protected void CheckRegistered(TransferStartInfo info) + { + if (!RegisteredKeys.ContainsKey(info.MessageKey)) + throw new ArgumentException($"Key is not registered: {info.MessageKey}"); + } + + protected sealed class RegisteredKey + { + public Action? Callback; + } +} diff --git a/Robust.Shared/Network/Transfer/ITransferManager.cs b/Robust.Shared/Network/Transfer/ITransferManager.cs new file mode 100644 index 000000000..901cf33e9 --- /dev/null +++ b/Robust.Shared/Network/Transfer/ITransferManager.cs @@ -0,0 +1,135 @@ +using System; +using System.IO; +using System.Threading.Tasks; + +namespace Robust.Shared.Network.Transfer; + +/// +/// API for high-bandwidth asynchronous data transfers between client and server. +/// +/// +/// +/// Due to technical limitations of our normal networking layer, it is not possible to send high volumes of traffic +/// over it. can avoid this limitation by using a secondary WebSocket-based channel +/// for these transfers. +/// +/// +/// The high-bandwidth channel is not available by default and must be configured by the server via +/// . If enabled, clients will connect to the channel when connecting to the server. +/// +/// +/// While the methods on themselves are not thread safe, +/// it is safe to read and write from created streams from multiple threads (one per stream). +/// +/// +[NotContentImplementable] +public interface ITransferManager +{ + /// + /// Start a transfer to a channel. + /// + /// The channel to send data to. + /// Additional info to start the transfer. + /// + /// A stream that can be written to send data. + /// This stream may employ buffering, flush or close the stream to ensure data is sent immediately. + /// + /// + /// Thrown if the provided transfer key was not registered with . + /// + Stream StartTransfer(INetChannel channel, TransferStartInfo startInfo); + + /// + /// Register a transfer stream key for sending and/or receiving. + /// + /// The name of the stream to register. + /// + /// Callback to be run when the stream is received. + /// If null, this stream may not be received on this side of the network. + /// + /// + /// Which sides of the network this stream is accepted on. + /// Useful in shared code where passing separately may be annoying. + /// + void RegisterTransferMessage( + string key, + Action? rxCallback = null, + NetMessageAccept accept = NetMessageAccept.Both); + + // Engine API. + + internal void Initialize(); + internal void FrameUpdate(); + internal Task ServerHandshake(INetChannel channel); + internal event Action ClientHandshakeComplete; +} + +/// +/// Extension methods for . +/// +public static class TransferManagerExt +{ + /// + /// Start a transfer to a channel. + /// + /// The manager to start the transfer with. + /// The channel to send data to. + /// Key to start transfer for. + /// + /// A stream that can be written to send data. + /// This stream may employ buffering, flush or close the stream to ensure data is sent immediately. + /// + /// + /// Thrown if the provided transfer key was not registered with . + /// + public static Stream StartTransfer(this ITransferManager manager, INetChannel channel, string key) + { + return manager.StartTransfer(channel, + new TransferStartInfo + { + MessageKey = key + }); + } +} + +/// +/// Information used to start a transfer stream. +/// +public sealed class TransferStartInfo +{ + /// + /// The key to start the transfer for. This uniquely identifies a "use case" and must be registered in advance. + /// + public required string MessageKey; +} + +/// +/// Event data raised when a new transfer stream is received. +/// +public sealed class TransferReceivedEvent +{ + /// + /// The key being transferred for. + /// + public readonly string Key; + + /// + /// A stream that can be used to read the received data. + /// + /// + /// Users should drain this stream as quickly as possible, as failing to do so may stall the entire transfer system. + /// + public readonly Stream DataStream; + + /// + /// The net channel that is sending the data. + /// + public readonly INetChannel Channel; + + internal TransferReceivedEvent(string key, INetChannel channel, Stream stream) + { + Key = key; + DataStream = stream; + Channel = channel; + } +} diff --git a/Robust.Shared/Network/Transfer/TransferImplLidgren.cs b/Robust.Shared/Network/Transfer/TransferImplLidgren.cs new file mode 100644 index 000000000..d5f6518d6 --- /dev/null +++ b/Robust.Shared/Network/Transfer/TransferImplLidgren.cs @@ -0,0 +1,116 @@ +using System; +using System.Buffers; +using System.IO; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Robust.Shared.Log; +using Robust.Shared.Network.Messages.Transfer; +using Robust.Shared.Utility; + +namespace Robust.Shared.Network.Transfer; + +internal sealed class TransferImplLidgren( + ISawmill sawmill, + INetChannel channel, + BaseTransferManager transferManager, + INetManager netManager) : BaseTransferImpl(sawmill, transferManager, channel) +{ + private TaskCompletionSource? _serverInitTcs; + + private (TransferFlags Flags, long TransferId, ChannelWriter> Channel)? _parsedHeader; + + public override Task ServerInit() + { + var initMsg = new MsgTransferInit(); + + netManager.ServerSendMessage(initMsg, Channel); + + _serverInitTcs = new TaskCompletionSource(); + return _serverInitTcs.Task; + } + + public override Task ClientInit(CancellationToken cancel) + { + var initMsg = new MsgTransferAckInit(); + + netManager.ClientSendMessage(initMsg); + + return Task.CompletedTask; + } + + public override Stream StartTransfer(TransferStartInfo startInfo) + { + var id = Interlocked.Increment(ref OutgoingIdCounter); + + return new SendStream(Channel, this, id, startInfo.MessageKey); + } + + // We can't meaningfully communicate backpressure into Lidgren so this is our only option. + protected override bool BoundedChannel => false; + + public void ReceiveInitAck() + { + _serverInitTcs?.TrySetResult(); + } + + public void ReceiveData(MsgTransferData data) + { + DebugTools.Assert(data.Data.Array != null); + + BaseTransferManager.ReceivedDataMetrics.Inc(data.Data.Count); + + // Header message + if (!_parsedHeader.HasValue) + { + HandleHeaderReceived(data.Data, out var flags, out var transferId, out var channel); + ArrayPool.Shared.Return(data.Data.Array); + + if ((flags & TransferFlags.HasData) == 0) + HandlePostData(flags, transferId, channel); + else + _parsedHeader = (flags, transferId, channel); + + return; + } + + // Data message + + { + var (flags, transferId, channel) = _parsedHeader.Value; + + _parsedHeader = null; + + channel.WriteAsync(data.Data).AsTask().Wait(); + + HandlePostData(flags, transferId, channel); + } + } + + private sealed class SendStream : ChunkedSendStream + { + private readonly INetChannel _channel; + + public SendStream(INetChannel channel, TransferImplLidgren parent, long id, string key) : base(parent, id, key) + { + _channel = channel; + } + + protected override async ValueTask SendChunkAsync(ArraySegment buffer, CancellationToken cancellationToken) + { + if (!_channel.IsConnected) + throw new InvalidOperationException("Channel is disconnected"); + + BaseTransferManager.SentDataMetrics.Inc(buffer.Count); + + await Parent.Parent.WaitToSend(_channel); + + var msgData = new MsgTransferData + { + Data = buffer + }; + + _channel.SendMessage(msgData); + } + } +} diff --git a/Robust.Shared/Network/Transfer/TransferImplWebSocket.cs b/Robust.Shared/Network/Transfer/TransferImplWebSocket.cs new file mode 100644 index 000000000..ca43b5a34 --- /dev/null +++ b/Robust.Shared/Network/Transfer/TransferImplWebSocket.cs @@ -0,0 +1,136 @@ +using System; +using System.Buffers; +using System.IO; +using System.Net; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Robust.Shared.Log; +using Robust.Shared.Utility; + +namespace Robust.Shared.Network.Transfer; + +#pragma warning disable RA0004 // Task.Result + +internal abstract class TransferImplWebSocket : BaseTransferImpl +{ + internal const string KeyHeaderName = "RT-Key"; + internal const string UserIdHeaderName = "RT-UserId"; + + internal const int RandomKeyBytes = 32; + + private readonly byte[] _headerBuffer = ArrayPool.Shared.Rent(MaxHeaderSize); + + private readonly CancellationTokenSource _readCancel = new(); + + public WebSocket? WebSocket; + + protected TransferImplWebSocket(ISawmill sawmill, BaseTransferManager parent, INetChannel channel) + : base(sawmill, parent, channel) + { + } + + protected override bool BoundedChannel => true; + + public override Stream StartTransfer(TransferStartInfo startInfo) + { + if (WebSocket == null) + throw new InvalidOperationException("Player not connected yet"); + + var id = Interlocked.Increment(ref OutgoingIdCounter); + + return new SendStream(this, id, startInfo.MessageKey); + } + + protected async void ReadThread() + { + DebugTools.Assert(WebSocket != null); + + try + { + var cancel = _readCancel.Token; + while (!cancel.IsCancellationRequested) + { + var receiveResult = await WebSocket + .ReceiveAsync(_headerBuffer.AsMemory(), cancel) + .ConfigureAwait(false); + + BaseTransferManager.ReceivedDataMetrics.Inc(receiveResult.Count); + + if (!receiveResult.EndOfMessage) + throw new ProtocolViolationException("Header did not fit in one receive"); + + if (receiveResult.MessageType != WebSocketMessageType.Binary) + throw new ProtocolViolationException("Data must be binary!"); + + // Parse received data. + var receivedData = _headerBuffer.AsMemory(0, receiveResult.Count); + HandleHeaderReceived(receivedData, out var flags, out var transferId, out var channel); + + if ((flags & TransferFlags.HasData) != 0) + await ReceiveTransferData(WebSocket, channel, cancel).ConfigureAwait(false); + + HandlePostData(flags, transferId, channel); + } + } + catch (Exception e) + { + Sawmill.Error($"Error reading transfer socket: {e}"); + Channel.Disconnect("Error in transfer socket"); + } + } + + private sealed class SendStream : ChunkedSendStream + { + public SendStream(TransferImplWebSocket parent, long id, string key) : base(parent, id, key) + { + } + + protected override async ValueTask SendChunkAsync(ArraySegment buffer, CancellationToken cancel) + { + var ws = ((TransferImplWebSocket)Parent).WebSocket!; + + BaseTransferManager.SentDataMetrics.Inc(buffer.Count); + + await ws.SendAsync( + buffer, + WebSocketMessageType.Binary, + endOfMessage: true, + cancel) + .ConfigureAwait(false); + } + } + + private static async ValueTask ReceiveTransferData( + WebSocket ws, + ChannelWriter> channel, + CancellationToken cancel) + { + while (!cancel.IsCancellationRequested) + { + var buf = ArrayPool.Shared.Rent(BufferSize); + var result = await ws.ReceiveAsync(buf.AsMemory(), cancel).ConfigureAwait(false); + + BaseTransferManager.ReceivedDataMetrics.Inc(result.Count); + + if (result.MessageType != WebSocketMessageType.Binary) + throw new ProtocolViolationException("Data must be binary!"); + + await channel.WriteAsync(new ArraySegment(buf, 0, result.Count), cancel).ConfigureAwait(false); + + if (result.EndOfMessage) + break; + } + } + + public override void Dispose() + { + base.Dispose(); + + WebSocket?.Dispose(); + _readCancel.Cancel(); + + ArrayPool.Shared.Return(_headerBuffer); + } +} diff --git a/Robust.Shared/Network/Transfer/TransferTest.cs b/Robust.Shared/Network/Transfer/TransferTest.cs new file mode 100644 index 000000000..83e37cd75 --- /dev/null +++ b/Robust.Shared/Network/Transfer/TransferTest.cs @@ -0,0 +1,81 @@ +using System; +using Robust.Shared.Console; +using Robust.Shared.IoC; +using Robust.Shared.Log; +using Robust.Shared.Utility; + +namespace Robust.Shared.Network.Transfer; + +internal sealed class TransferTestCommand : IConsoleCommand +{ + internal const string CommandKey = "transfer_test"; + + [Dependency] private readonly ITransferManager _transferManager = null!; + + public string Command => CommandKey; + public string Description => ""; + public string Help => "Usage: transfer_test "; + + public async void Execute(IConsoleShell shell, string argStr, string[] args) + { + if (shell.Player?.Channel is not { } channel) + { + shell.WriteError("You do not have a channel"); + return; + } + + var bufferCount = 1024; + if (args.Length >= 1) + bufferCount = Parse.Int32(args[0]); + + await using var stream = _transferManager.StartTransfer(channel, + new TransferStartInfo + { + MessageKey = TransferTestManager.Key, + }); + + var buffer = new byte[16384]; + for (var i = 0; i < bufferCount; i++) + { + await stream.WriteAsync(buffer).ConfigureAwait(false); + } + } +} + +internal abstract class TransferTestManager(ITransferManager manager, ILogManager logManager) +{ + private readonly ISawmill _sawmill = logManager.GetSawmill("net.transfer.test"); + + internal const string Key = nameof(TransferTestManager); + + public void Initialize() + { + manager.RegisterTransferMessage(Key, RxCallback); + } + + // ReSharper disable once AsyncVoidMethod + private async void RxCallback(TransferReceivedEvent receive) + { + if (!PermissionCheck(receive.Channel)) + { + receive.Channel.Disconnect("Not allowed"); + return; + } + + _sawmill.Info("Receiving debug transfer"); + + var buffer = new byte[16384]; + var totalRead = 0L; + while (true) + { + var read = await receive.DataStream.ReadAsync(buffer.AsMemory()).ConfigureAwait(false); + totalRead += read; + if (read == 0) + break; + } + + _sawmill.Info($"Debug transfer complete for {ByteHelpers.FormatKibibytes(totalRead)} bytes"); + } + + protected abstract bool PermissionCheck(INetChannel channel); +} diff --git a/Robust.Shared/Player/CommonSession.cs b/Robust.Shared/Player/CommonSession.cs index 78166dac3..36612eca0 100644 --- a/Robust.Shared/Player/CommonSession.cs +++ b/Robust.Shared/Player/CommonSession.cs @@ -50,6 +50,11 @@ internal sealed class CommonSession : ICommonSessionInternal [ViewVariables] public LoginType AuthType => Channel?.AuthType ?? default; + [ViewVariables] + public bool InitialPlayerListReqDone; + [ViewVariables] + public bool InitialResourcesDone; + public override string ToString() => Name; public CommonSession(NetUserId user, string name, SessionData data) diff --git a/Robust.Shared/Player/DummySession.cs b/Robust.Shared/Player/DummySession.cs index f56adaf05..84dbe2f8b 100644 --- a/Robust.Shared/Player/DummySession.cs +++ b/Robust.Shared/Player/DummySession.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Generic; using System.Collections.Immutable; using System.Net; +using Lidgren.Network; using Robust.Shared.Enums; using Robust.Shared.GameObjects; using Robust.Shared.GameStates; @@ -124,4 +125,9 @@ internal sealed class DummyChannel(DummySession session) : INetChannel { throw new NotImplementedException(); } + + public bool CanSendImmediately(NetDeliveryMethod method, int sequenceChannel) + { + return true; + } } diff --git a/Robust.Shared/Upload/NetworkResourceAckMessage.cs b/Robust.Shared/Upload/NetworkResourceAckMessage.cs new file mode 100644 index 000000000..ccd073c52 --- /dev/null +++ b/Robust.Shared/Upload/NetworkResourceAckMessage.cs @@ -0,0 +1,25 @@ +using Lidgren.Network; +using Robust.Shared.Network; +using Robust.Shared.Serialization; + +namespace Robust.Shared.Upload; + +/// +/// Sent client -> server to acknowledge completion of a file upload. +/// +internal sealed class NetworkResourceAckMessage : NetMessage +{ + public override MsgGroups MsgGroup => MsgGroups.String; + + public int Key; + + public override void ReadFromBuffer(NetIncomingMessage buffer, IRobustSerializer serializer) + { + Key = buffer.ReadInt32(); + } + + public override void WriteToBuffer(NetOutgoingMessage buffer, IRobustSerializer serializer) + { + buffer.Write(Key); + } +} diff --git a/Robust.Shared/Upload/NetworkResourceUploadMessage.cs b/Robust.Shared/Upload/NetworkResourceUploadMessage.cs index 0e7c62ff3..8432ca80a 100644 --- a/Robust.Shared/Upload/NetworkResourceUploadMessage.cs +++ b/Robust.Shared/Upload/NetworkResourceUploadMessage.cs @@ -6,6 +6,7 @@ using Robust.Shared.Utility; namespace Robust.Shared.Upload; +[Obsolete("The engine no longer uses this message")] public sealed class NetworkResourceUploadMessage : NetMessage { public override MsgGroups MsgGroup => MsgGroups.String; diff --git a/Robust.Shared/Upload/SharedNetworkResourceManager.cs b/Robust.Shared/Upload/SharedNetworkResourceManager.cs index a94ba028a..84fac8f16 100644 --- a/Robust.Shared/Upload/SharedNetworkResourceManager.cs +++ b/Robust.Shared/Upload/SharedNetworkResourceManager.cs @@ -1,8 +1,15 @@ using System; +using System.Buffers.Binary; using System.Collections.Generic; +using System.IO; +using System.Text; +using System.Threading.Tasks; +using Robust.Shared.Asynchronous; using Robust.Shared.ContentPack; using Robust.Shared.IoC; +using Robust.Shared.Log; using Robust.Shared.Network; +using Robust.Shared.Network.Transfer; using Robust.Shared.Replays; using Robust.Shared.Serialization; using Robust.Shared.Serialization.Markdown.Mapping; @@ -14,11 +21,26 @@ namespace Robust.Shared.Upload; /// Manager that allows resources to be added at runtime by admins. /// They will be sent to all clients automatically. ///
-public abstract class SharedNetworkResourceManager : IDisposable +public abstract class SharedNetworkResourceManager : IDisposable, IPostInjectInit { - [Dependency] private readonly INetManager _netManager = default!; + /// + /// Transfer key for client -> server uploads by privileged clients. + /// + internal const string TransferKeyNetworkUpload = "TransferKeyNetworkUpload"; + + /// + /// Transfer key for server -> client downloads + /// + internal const string TransferKeyNetworkDownload = "TransferKeyNetworkDownload"; + [Dependency] private readonly IReplayRecordingManager _replay = default!; + [Dependency] protected readonly INetManager NetManager = default!; [Dependency] protected readonly IResourceManager ResourceManager = default!; + [Dependency] protected readonly ITransferManager TransferManager = default!; + [Dependency] protected readonly ILogManager LogManager = default!; + [Dependency] private readonly ITaskManager _taskManager = default!; + + protected ISawmill Sawmill = default!; public const double BytesToMegabytes = 0.000001d; @@ -32,10 +54,8 @@ public abstract class SharedNetworkResourceManager : IDisposable public bool FileExists(ResPath path) => ContentRoot.FileExists(path); - public virtual void Initialize() + internal virtual void Initialize() { - _netManager.RegisterNetMessage(ResourceUploadMsg); - // Add our content root to the resource manager. ResourceManager.AddRoot(Prefix, ContentRoot); _replay.RecordingStarted += OnStartReplayRecording; @@ -50,23 +70,111 @@ public abstract class SharedNetworkResourceManager : IDisposable } } - protected virtual void ResourceUploadMsg(NetworkResourceUploadMessage msg) + protected internal void StoreFile(ResPath path, byte[] data) { - ContentRoot.AddOrUpdateFile(msg.RelativePath, msg.Data); - _replay.RecordReplayMessage(new ReplayResourceUploadMsg { RelativePath = msg.RelativePath, Data = msg.Data }); + ContentRoot.AddOrUpdateFile(path, data); + _replay.RecordReplayMessage(new ReplayResourceUploadMsg { RelativePath = path, Data = data }); } + private async IAsyncEnumerable<(ResPath Relative, byte[] Data)> ReadTransferStream(Stream stream) + { + var lengthBytes = new byte[4]; + var continueByte = new byte[1]; + + while (true) + { + await stream.ReadExactlyAsync(lengthBytes).ConfigureAwait(false); + var pathLength = BinaryPrimitives.ReadUInt32LittleEndian(lengthBytes); + + await stream.ReadExactlyAsync(lengthBytes).ConfigureAwait(false); + var dataLength = BinaryPrimitives.ReadUInt32LittleEndian(lengthBytes); + + ValidateUpload(dataLength); + + var pathData = new byte[pathLength]; + await stream.ReadExactlyAsync(pathData).ConfigureAwait(false); + var data = new byte[dataLength]; + await stream.ReadExactlyAsync(data).ConfigureAwait(false); + + var path = new ResPath(Encoding.UTF8.GetString(pathData)); + yield return (path, data); + + await stream.ReadExactlyAsync(continueByte).ConfigureAwait(false); + if (continueByte[0] == 0) + break; + } + } + + protected virtual void ValidateUpload(uint size) + { + } + + protected async Task> IngestFileStream(Stream stream) + { + var list = new List<(ResPath Relative, byte[] Data)>(); + + await foreach (var (relative, data) in ReadTransferStream(stream).ConfigureAwait(false)) + { + Sawmill.Verbose($"Storing uploaded file: {relative} ({ByteHelpers.FormatBytes(data.Length)})"); + _taskManager.RunOnMainThread(() => + { + StoreFile(relative, data); + }); + list.Add((relative, data)); + } + + return list; + } + + internal static async Task WriteFileStream(Stream stream, IEnumerable<(ResPath Relative, byte[] Data)> files) + { + var lengthBytes = new byte[4]; + var continueByte = new byte[1]; + + var first = true; + + foreach (var (relative, data) in files) + { + if (!first) + { + continueByte[0] = 1; + await stream.WriteAsync(continueByte).ConfigureAwait(false); + } + + first = false; + + BinaryPrimitives.WriteUInt32LittleEndian(lengthBytes, (uint)Encoding.UTF8.GetByteCount(relative.CanonPath)); + await stream.WriteAsync(lengthBytes).ConfigureAwait(false); + + BinaryPrimitives.WriteUInt32LittleEndian(lengthBytes, (uint)data.Length); + await stream.WriteAsync(lengthBytes).ConfigureAwait(false); + + await stream.WriteAsync(Encoding.UTF8.GetBytes(relative.CanonPath)).ConfigureAwait(false); + await stream.WriteAsync(data).ConfigureAwait(false); + } + + continueByte[0] = 0; + await stream.WriteAsync(continueByte).ConfigureAwait(false); + } + +#pragma warning disable CA1816 // Not adding a finalizer... public void Dispose() +#pragma warning restore CA1816 { // This is called automatically when the IoCManager's dependency collection is cleared. // MemoryContentRoot uses a ReaderWriterLockSlim, which we need to dispose of. ContentRoot.Dispose(); } - [Serializable, NetSerializable] - public sealed class ReplayResourceUploadMsg + void IPostInjectInit.PostInject() { - public byte[] Data = default!; - public ResPath RelativePath = default!; + Sawmill = LogManager.GetSawmill("netres"); + } + + [Serializable, NetSerializable] + internal sealed class ReplayResourceUploadMsg + { + public required byte[] Data; + public required ResPath RelativePath; } } diff --git a/Robust.Shared/Utility/SaneStream.cs b/Robust.Shared/Utility/SaneStream.cs new file mode 100644 index 000000000..b1616cf84 --- /dev/null +++ b/Robust.Shared/Utility/SaneStream.cs @@ -0,0 +1,57 @@ +using System; +using System.IO; + +namespace Robust.Shared.Utility; + +/// +/// Base class for that has everything stubbed out in a "not supported" fashion, +/// so you don't need to implement a dozen overloads yourself. +/// +internal abstract class SaneStream : Stream +{ + public override void Flush() + { + throw new NotSupportedException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + return Read(buffer.AsSpan(offset, count)); + } + + public override int Read(Span buffer) + { + throw new NotSupportedException(); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + Write(buffer.AsSpan(offset, count)); + } + + public override void Write(ReadOnlySpan buffer) + { + throw new NotSupportedException(); + } + + public override bool CanRead => false; + public override bool CanSeek => false; + public override bool CanWrite => false; + public override long Length => throw new NotSupportedException(); + + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } +} diff --git a/Robust.UnitTesting/RobustIntegrationTest.NetManager.cs b/Robust.UnitTesting/RobustIntegrationTest.NetManager.cs index e830ae152..4b4d41a67 100644 --- a/Robust.UnitTesting/RobustIntegrationTest.NetManager.cs +++ b/Robust.UnitTesting/RobustIntegrationTest.NetManager.cs @@ -496,6 +496,11 @@ namespace Robust.UnitTesting // Don't handle bye sending in here I guess. Disconnect(reason); } + + public bool CanSendImmediately(NetDeliveryMethod method, int sequenceChannel) + { + return true; + } } private sealed class ConnectMessage