본문 바로가기
유니티3D[Unity3D]

M2MQTT for Unity 비동기 통신(Async-Await)

by 은유지니 2023. 5. 8.

https://github.com/Eunji-new/M2MqttUnity

 

GitHub - Eunji-new/M2MqttUnity: M2MQTT for Unity_비동기 소켓통신으로 변경_미연결 시, 자동 재연결

M2MQTT for Unity_비동기 소켓통신으로 변경_미연결 시, 자동 재연결. Contribute to Eunji-new/M2MqttUnity development by creating an account on GitHub.

github.com

 

기존 M2MQTT for Unity 오픈소스의 동기프로그래밍 단점을 보완한 버전입니다.

  • 소켓통신을 비동기(BeginConnect)로 변경하였습니다.
  • 이는 비동기 통신으로 소켓 ip 연결 중에 시스템이 정지하지 않고 정상작동합니다.
  • 연결이 끊겼을 경우, 자동으로 재연결하는 로직을 추가하였습니다.

 


재연결 하는 부분(M2MqttUnity.cs)

 - 코루틴을 이용하여 연결 실패 or 끊김 시, disconnect하고 5초후 재연결 시도

protected override void OnConnectionFailed(string errorMessage)
{
    AddUiMessage("CONNECTION FAILED! " + errorMessage);
    StartCoroutine(CoReConnect());
}

protected override void OnDisconnected()
{
    AddUiMessage("Disconnected.");
}

protected override void OnConnectionLost()
{
    AddUiMessage("CONNECTION LOST!");
    StartCoroutine(CoReConnect());
}

IEnumerator CoReConnect()
{
    Disconnect(); //재연결하기전 DisConnect 해줘야 연결관련 변수들이 초기화됨. 초기화 하지 않으면 연결 끊어짐 감지를 제대로 못함.
    yield return new WaitForSeconds(5.0f);
    Debug.Log($"{brokerAddress}로 Connect 재시도");
    Connect();
}

 

비동기 구현 1(MqttNetworkChannel.cs)

- socket의 BeginConnect함수로 비동기 소켓통신을 구현합니다.

- Callback함수를 받을때 IsEndConnection을 true로 변경하며, IsEndConnection이 true가 될때까지 기다립니다.

public async Task Connect()
{
    this.socket = new Socket(this.remoteIpAddress.GetAddressFamily(), SocketType.Stream, ProtocolType.Tcp);
    // try connection to the broker
    // 소켓통신 동기 방식 - 기존
    // this.socket.Connect(new IPEndPoint(this.remoteIpAddress, this.remotePort)); 
    //소켓통신 비동기 방식 - 변경
    this.socket.BeginConnect(new IPEndPoint(this.remoteIpAddress, this.remotePort), new AsyncCallback(ConnectionCallback), this.socket);
    //1초에 한번씩 콜백함수 끝났는지 검사
    while(!IsEndConnection){
        await Task.Delay(1000);
        if(IsEndConnection)
        {
            break;
        }
    }
    IsEndConnection = false;
    /* ...생략... */
}

bool IsEndConnection = false;
public void ConnectionCallback(IAsyncResult IAR)
{
    IsEndConnection = true;
}

비동기 구현 2(MqttClient.cs)

- await this.channel.Connect; 에서 위 소켓 BeginConnect가 콜백함수를 실행할때까지 기다리게 됩니다.

public async Task<byte> Connect(string clientId,
            string username,
            string password, /* ... 생략 ... */)
        {
            // create CONNECT message
            /* ... 생략 ... */

            try
            {
                // connect to the broker
                //channel의 connect함수 끝날때까지 기다림
                await this.channel.Connect();
            }
            catch (Exception ex)
            {
                throw new MqttConnectionException("Exception connecting to the broker", ex);
            }

            /* ... 생략 ... */
            
            MqttMsgConnack connack;
            //예외처리 구문 추가
            try{
                connack = (MqttMsgConnack)this.SendReceive(connect);
            }
            catch(Exception ex)
            {
                return 0;
            }
            // if connection accepted, start keep alive timer and 
            if (connack.ReturnCode == MqttMsgConnack.CONN_ACCEPTED)
            {
                /* ... 생략 ... */
				
                //IsConnected 값으로 통신 결과 판단
                this.IsConnected = true;
            }
            return connack.ReturnCode;
        }

비동기 구현 3(MqttClient.cs)

- 비동기 구현2의 Connect함수가 끝날때까지 기다린다음에 Callback 전송(M2MqttUnityClient로 전송)

public delegate void Callback();

public async void ConnectCallback(string clientId,
    string username,
    string password, Callback callback)
{
    //Connect함수 끝날때까지 기다리고 콜백 생성
    await this.Connect(clientId, username, password, false, MqttMsgConnect.QOS_LEVEL_AT_MOST_ONCE, false, null, null, true, MqttMsgConnect.KEEP_ALIVE_PERIOD_DEFAULT);
    callback();
}

비동기 구현 4(M2MqttUnityClient.cs)

- MqttClient에서 연결이 끝나면 Callback 함수를 받아서 연결 상태에 받게 결과 출력

private IEnumerator DoConnect()
        {
            // wait for the given delay
            yield return new WaitForSecondsRealtime(connectionDelay / 1000f);

            /* ... 생략 ... */

            //Connect 콜백으로 소켓 연결 끝날때 콜백받아옴.
            client.ConnectCallback(clientId, mqttUserName, mqttPassword, () => {
                Debug.Log("client.Connect end");
                
                if(client == null){
                    OnConnectionFailed("CONNECTION FAILED!");
                    return;
                }
                if (client.IsConnected)
                {
                    client.ConnectionClosed += OnMqttConnectionClosed;
                    // register to message received 
                    client.MqttMsgPublishReceived += OnMqttMessageReceived;
                    mqttClientConnected = true;
                    OnConnected();

                }
                else
                {
                    OnConnectionFailed("CONNECTION FAILED!");
                }
            });

        }