MessageQueue 구현하기


메세지큐(MessageQueue) 가 무엇인지 모두 잘 아실거라고 생각합니다.
여기서 제가 말하는 메세지 큐란 별도의 메시지를 처리하는 Queue 가 있고, 이는 주기적으로 계속 쌓여있는 메세지를 순차적으로 수행하는 큐입니다.

큐에 메세지가 들어오면, 수행하고...
없으면 대기하고 있다가, 새로운 메세지가 들어오면, 그때서 또 다시 메세지를 수행하는 작업이 진행됩니다.

코드는 아래와 같습니다.
설명은 생략합니다.

[ICommand]


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ThreadTdd
{
    public interface ICommand
    {
        void Execute();
    }
}
 





[MessageQueue]


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;


namespace ThreadTdd
{
    public class MessageQueue : IDisposable
    {
        private Queue<ICommand> CommandList = new Queue<ICommand>();
        private ManualResetEvent Controller = new ManualResetEvent(false);
        private Thread QueueThread = null;
        private Object QueueLockObj = new object();
        public bool IsAlive { get; private set; }

        //--------------------------------
        // 생성자
        //--------------------------------
        public MessageQueue() {
            this.QueueThread = new Thread(this.ExecuteMessage) {
                IsBackground = true, Name = "MessageQueue"
            };
            this.IsAlive = true;
            this.QueueThread.Start();
        }



        //--------------------------------
        // Command 를 메세지큐에 넣음
        //--------------------------------
        public void Enqueue(ICommand command) {
            // Queue 에 Command 를 Enqueue
            this.CommandList.Enqueue(command);

            // 대기상태에 빠져 있을지 모르니, Thread 를 동작하게 만듦
            this.Controller.Set();
        }



        //--------------------------------
        // Queue 에 쌓여있는 Command 를 수행
        //--------------------------------
        private void ExecuteMessage() {
            try {
                while (IsAlive) {
                    // Queue 에 있는 모든 Command 를 수행
                    while (this.CommandList.Count > 0) {
                        ICommand command = this.CommandList.Dequeue();
                        command.Execute();
                    }
                    // 다 수행하고 나면, 대기상태로 진입
                    this.Controller.Reset();
                    this.Controller.WaitOne(Timeout.Infinite);
                }
            } catch (ThreadAbortException) { }
        }



        //----------------------
        // 리소스 해제
        //----------------------
        public void Dispose() {
            this.IsAlive = false;
            this.Controller.Set();
            this.QueueThread.Abort();
        }
    }
}
 




참고로, Thread 에 Safe 하도록 만들기 위해서는
Enqueue 하는 부분과 Queue check 하는 부분을 동기화 해야 합니다.

궁금하신게 있으면, 댓글을 달아주세요 :)


  • 데드라 2018.05.21 17:49

    main 쪽에서 사용하는 예제도 함께 올려주시면 안될까요?

  • 이름없는개발자 2018.12.19 17:18

    덕분에 수신버퍼로 사용하는 큐를 잘 구현했습니다. 감사합니다.