ET学习笔记和资源收集

| 发布     | 分类 ET学习笔记和资源收集  | 标签 Networking 





长度 1 2 data.length
类型 byte ushort/int16 byte[]
描述 flag opcode data
// flag第一位为1表示这是rpc返回消息,否则交由MessageDispatcher分发



长度 2 size
类型 ushort/int16 List<byte[]>
描述 size packeBuffs
packeBuffs[0] = flag:byte
packeBuffs[1] = opcode:ushort
packeBuffs[2] = data:byte[]

// size = sum(flag, opcode, data)



C2S Connect连接请求

长度 4 4
类型 uint32 uint32
描述 KcpProtocalType.SYN KChannel.Con (Client)

S2C Accept接受请求

长度 4 4 4
类型 uint32 uint32 uint32
描述 KcpProtocalType.ACK KChannel.Con (Client) KChannel.Con (Server)

DisConnect断开连接

长度 4 4 4
类型 uint32 uint32 uint32
描述 KcpProtocalType.FIN KChannel.Con KChannel.RemoteConn

分包消息

比较啰嗦,后面有空再写



  • LocationProxyComponentSystem: 非Location服的 代理,负责ActorComponent向LocationComponent的注册Add、查找Get、移除Remove、锁Lock、解锁UnLock



ActorComponent.AddLocation()

ActorComponentEx

		public static async Task AddLocation(this ActorComponent self)
		{
			await Game.Scene.GetComponent<LocationProxyComponent>().Add(self.Entity.Id);
		}

LocationProxyComponentSystem


		public async Task Add(long key)
		{
			Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.LocationAddress);
			await session.Call(new ObjectAddRequest() { Key = key, AppId = this.AppId });
		}

以下是Location服

ObjectAddRequestHandler

		protected override void Run(Session session, ObjectAddRequest message, Action<ObjectAddResponse> reply)
		{
			ObjectAddResponse response = new ObjectAddResponse();
			try
			{
				Game.Scene.GetComponent<LocationComponent>().Add(message.Key, message.AppId);
				reply(response);
			}
			catch (Exception e)
			{
				ReplyError(response, e, reply);
			}
		}

LocationComponent

		public void Add(long key, int appId)
		{
			this.locations[key] = appId;

			Log.Info($"location add key: {key} appid: {appId}");

			// 更新db
			//await Game.Scene.GetComponent<DBProxyComponent>().Save(new Location(key, address));
		}




ActorComponent.RemoveLocation()

ActorComponentEx


		public static async Task RemoveLocation(this ActorComponent self)
		{
			await Game.Scene.GetComponent<LocationProxyComponent>().Remove(self.Entity.Id);
		}

LocationProxyComponentSystem


		public async Task Remove(long key)
		{
			Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.LocationAddress);
			await session.Call(new ObjectRemoveRequest() { Key = key });
		}

以下是Location服

ObjectRemoveRequestHandler

		protected override void Run(Session session, ObjectRemoveRequest message, Action<ObjectRemoveResponse> reply)
		{
			ObjectRemoveResponse response = new ObjectRemoveResponse();
			try
			{
				Game.Scene.GetComponent<LocationComponent>().Remove(message.Key);
				reply(response);
			}
			catch (Exception e)
			{
				ReplyError(response, e, reply);
			}
		}

LocationComponent


		public void Remove(long key)
		{
			Log.Info($"location remove key: {key}");
			this.locations.Remove(key);
		}




LocationProxyComponentSystem

		
		public async Task Lock(long key, int time = 1000)
		{
			Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.LocationAddress);
			await session.Call(new ObjectLockRequest() { Key = key, LockAppId = this.AppId, Time = time });
		}

以下是Location服

ObjectLockRequestHandler

		protected override void Run(Session session, ObjectLockRequest message, Action<ObjectLockResponse> reply)
		{
			ObjectLockResponse response = new ObjectLockResponse();
			try
			{
				Game.Scene.GetComponent<LocationComponent>().LockAsync(message.Key, message.LockAppId, message.Time);
				reply(response);
			}
			catch (Exception e)
			{
				ReplyError(response, e, reply);
			}
		}

LocationComponent


		

		public Task<bool> LockAsync(long key, int appId, int time)
		{
			if (!this.lockDict.ContainsKey(key))
			{
				this.Lock(key, appId, time);
				return Task.FromResult(true);
			}

			LocationLockTask task = new LocationLockTask(key, appId, time);
			this.AddTask(key, task);
			return task.Task;
		}




LocationProxyComponentSystem

		
		
		public async Task UnLock(long key, int value)
		{
			Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.LocationAddress);
			await session.Call(new ObjectUnLockRequest() { Key = key, UnLockAppId = this.AppId, AppId = value});
		}

以下是Location服

ObjectUnLockRequestHandler

		
		protected override void Run(Session session, ObjectUnLockRequest message, Action<ObjectUnLockResponse> reply)
		{
			ObjectUnLockResponse response = new ObjectUnLockResponse();
			try
			{
				Game.Scene.GetComponent<LocationComponent>().UpdateAndUnLock(message.Key, message.UnLockAppId, message.AppId);
				reply(response);
			}
			catch (Exception e)
			{
				ReplyError(response, e, reply);
			}
		}

LocationComponent


		
		public void UpdateAndUnLock(long key, int unLockAppId, int value)
		{
			int lockAppId = 0;
			this.lockDict.TryGetValue(key, out lockAppId);
			if (lockAppId != unLockAppId)
			{
				Log.Error($"unlock appid is different {lockAppId} {unLockAppId}" );
			}
			Log.Info($"location unlock key: {key} unLockAppId: {unLockAppId} new: {value}");
			this.locations[key] = value;
			this.UnLock(key);
		}




	[ObjectSystem]
	public class ActorProxyStartSystem : StartSystem<ActorProxy>
	{
		public override async void Start(ActorProxy self)
		{
			int appId = await Game.Scene.GetComponent<LocationProxyComponent>().Get(self.Id);
			self.Address = Game.Scene.GetComponent<StartConfigComponent>().Get(appId).GetComponent<InnerConfig>().IPEndPoint;

			self.UpdateAsync();
		}
	}

LocationProxyComponentSystem

		
		
		public async Task<int> Get(long key)
		{
			Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.LocationAddress);
			ObjectGetResponse response = (ObjectGetResponse)await session.Call(new ObjectGetRequest() { Key = key });
			return response.AppId;
		}

以下是Location服

ObjectGetRequestHandler

		protected override async void Run(Session session, ObjectGetRequest message, Action<ObjectGetResponse> reply)
		{
			ObjectGetResponse response = new ObjectGetResponse();
			try
			{
				int appId = await Game.Scene.GetComponent<LocationComponent>().GetAsync(message.Key);
				if (appId == 0)
				{
					response.Error = ErrorCode.ERR_ActorLocationNotFound;
				}
				response.AppId = appId;
				reply(response);
			}
			catch (Exception e)
			{
				ReplyError(response, e, reply);
			}
		}

LocationComponent


		

		public Task<int> GetAsync(long key)
		{
			if (!this.lockDict.ContainsKey(key))
			{
				this.locations.TryGetValue(key, out int location);
				Log.Info($"location get key: {key} {location}");
				return Task.FromResult(location);
			}

			LocationQueryTask task = new LocationQueryTask(key);
			this.AddTask(key, task);
			return task.Task;
		}




Session

this.Network.MessageDispatcher.Dispatch(this, packet);


		private void Run(Packet packet)
		{
			if (packet.Length < Packet.MinSize)
			{
				Log.Error($"message error length < {Packet.MinSize}, ip: {this.RemoteAddress}");
				this.Network.Remove(this.Id);
				return;
			}

			byte flag = packet.Flag();
			ushort opcode = packet.Opcode();

#if !SERVER
			if (OpcodeHelper.IsClientHotfixMessage(opcode))
			{
				this.Network.MessageDispatcher.Dispatch(this, packet);
				return;
			}
#endif

			// flag第一位为1表示这是rpc返回消息,否则交由MessageDispatcher分发
			if ((flag & 0x01) == 0)
			{
				this.Network.MessageDispatcher.Dispatch(this, packet);
				return;
			}
			
			OpcodeTypeComponent opcodeTypeComponent = this.Network.Entity.GetComponent<OpcodeTypeComponent>();
			Type responseType = opcodeTypeComponent.GetType(opcode);
			object message = this.Network.MessagePacker.DeserializeFrom(responseType, packet.Bytes, Packet.Index, packet.Length - Packet.Index);
			//Log.Debug($"recv: {JsonHelper.ToJson(message)}");

			IResponse response = message as IResponse;
			if (response == null)
			{
				throw new Exception($"flag is response, but message is not! {opcode}");
			}
			Action<IResponse> action;
			if (!this.requestCallback.TryGetValue(response.RpcId, out action))
			{
				return;
			}
			this.requestCallback.Remove(response.RpcId);

			action(response);
		}

InnerMessageDispatcher


	/// <summary>
    /// 服务器与服务器执行的消息通信派发
    /// </summary>
	public class InnerMessageDispatcher: IMessageDispatcher
	{
		public void Dispatch(Session session, Packet packet)
		{
			ushort opcode = packet.Opcode();
			Type messageType = Game.Scene.GetComponent<OpcodeTypeComponent>().GetType(opcode);
			IMessage message = (IMessage)session.Network.MessagePacker.DeserializeFrom(messageType, packet.Bytes, Packet.Index, packet.Length - Packet.Index);
			
			// 收到actor消息,放入actor队列
			if (message is IActorMessage iActorMessage)
			{
				Entity entity = Game.Scene.GetComponent<ActorManagerComponent>().Get(iActorMessage.ActorId);
				if (entity == null)
				{
					Log.Warning($"not found actor: {iActorMessage.ActorId}");
					ActorResponse response = new ActorResponse
					{
						Error = ErrorCode.ERR_NotFoundActor,
						RpcId = iActorMessage.RpcId
					};
					session.Reply(response);
					return;
				}
				
				entity.GetComponent<ActorComponent>().Add(new ActorMessageInfo() { Session = session, Message = iActorMessage });
				return;
			}
			
			Game.Scene.GetComponent<MessageDispatherComponent>().Handle(session, new MessageInfo(opcode, message));
		}
	}

ActorComponentEx

	
        /// <summary>
        /// 将消息添加到队列
        /// 如果有处理任务在等待消息,就将该消息从队列里拿出来丢给它执行
        /// </summary>
		public static void Add(this ActorComponent self, ActorMessageInfo info)
		{
			self.queue.Enqueue(info);

			if (self.tcs == null)
			{
				return;
			}

			var t = self.tcs;
			self.tcs = null;
			t.SetResult(self.queue.Dequeue());
		}

ActorComponent 启动消息等待处理循环

ActorComponent.Awake

ActorComponentEx

	[ObjectSystem]
	public class ActorComponentAwakeSystem : AwakeSystem<ActorComponent>
	{
		public override void Awake(ActorComponent self)
		{
			self.entityActorHandler = new CommonEntityActorHandler();
			self.queue = new Queue<ActorMessageInfo>();
			Game.Scene.GetComponent<ActorManagerComponent>().Add(self.Entity);

			self.HandleAsync();
		}
	}

	[ObjectSystem]
	public class ActorComponentAwake1System : AwakeSystem<ActorComponent, IEntityActorHandler>
	{
		public override void Awake(ActorComponent self, IEntityActorHandler iEntityActorHandler)
		{
			self.entityActorHandler = iEntityActorHandler;
			self.queue = new Queue<ActorMessageInfo>();
			Game.Scene.GetComponent<ActorManagerComponent>().Add(self.Entity);

			self.HandleAsync();
		}
	}

ActorComponentEx

		public static async void HandleAsync(this ActorComponent self)
		{
			while (true)
			{
				if (self.IsDisposed)
				{
					return;
				}
				try
				{
					ActorMessageInfo info = await self.GetAsync();
                    // 返回null表示actor已经删除,协程要终止, ActorComponent.Dispose发送的
					if (info.Message == null)
					{
						return;
					}
					await self.entityActorHandler.Handle(info.Session, (Entity)self.Parent, info.Message);
				}
				catch (Exception e)
				{
					Log.Error(e);
				}
			}
		}


		private static Task<ActorMessageInfo> GetAsync(this ActorComponent self)
		{
			if (self.queue.Count > 0)
			{
				return Task.FromResult(self.queue.Dequeue());
			}

			self.tcs = new TaskCompletionSource<ActorMessageInfo>();
			return self.tcs.Task;
		}

IEntityActorHandler 派生类


	/// <summary>
    /// gate session收到的消息直接转发给客户端
    /// </summary>
    public class GateSessionEntityActorHandler : IEntityActorHandler
    {
        public async Task Handle(Session session, Entity entity, IActorMessage actorMessage)
        {
			ActorResponse actorResponse = new ActorResponse
			{
				RpcId = actorMessage.RpcId
			};
			try
	        {
		        // 发送给客户端
		        Session clientSession = entity as Session;
				clientSession.Send(actorMessage);

				session.Reply(actorResponse);
		        await Task.CompletedTask;
	        }
	        catch (Exception e)
	        {
		        actorResponse.Error = ErrorCode.ERR_SessionActorError;
		        actorResponse.Message = $"session actor error {e}";
				session.Reply(actorResponse);
				throw;
	        }
        }
    }

    public class CommonEntityActorHandler : IEntityActorHandler
    {
        public async Task Handle(Session session, Entity entity, IActorMessage actorMessage)
        {
			await Game.Scene.GetComponent<ActorMessageDispatherComponent>().Handle(session, entity, actorMessage);
        }
    }

    /// <summary>
    /// 玩家收到帧同步消息交给帧同步组件处理
    /// </summary>
    public class MapUnitEntityActorHandler : IEntityActorHandler
    {
        public async Task Handle(Session session, Entity entity, IActorMessage actorMessage)
        {
			if (actorMessage is OneFrameMessage aFrameMessage)
            {
				Game.Scene.GetComponent<ServerFrameComponent>().Add(aFrameMessage);

				ActorResponse actorResponse = new ActorResponse
				{
					RpcId = actorMessage.RpcId
				};
				session.Reply(actorResponse);
				return;
            }
            await Game.Scene.GetComponent<ActorMessageDispatherComponent>().Handle(session, entity, actorMessage);
        }
    }


ActorMessageDispatherComponent Actor消息分发组件


		public async Task Handle(Session session, Entity entity, IActorMessage actorRequest)
		{
			if (!this.handlers.TryGetValue(actorRequest.GetType(), out IMActorHandler handler))
			{
				throw new Exception($"not found message handler: {MongoHelper.ToJson(actorRequest)}");
			}
			
			await handler.Handle(session, entity, actorRequest);
		}

IMActorHandler 派生类

AMActorHandler<E, Message>: IMActorHandler where E: Entity where Message : class

Actor_TestHandler

AMActorRpcHandler<E, Request, Response>: IMActorHandler where E: Entity where Request: class, IActorRequest where Response : class, IActorResponse

Actor_TestRequestHandler Actor_TransferHandler C2M_TestActorRequestHandler

上一篇: ForgeNetworkingRemastered 之 FreamStream添加属性并同步
下一篇: 服务器使用AstarPathfindingProject的修改