Implementing a Custom BizTalk Adapter as a Custom WCF Channel Part 2 – Receive

(Please, first read part 1 if you haven’t already done so.)

The receive side is more complex than the send side. In short, the TransportBindingElement builds a ChannelListener, which in turns creates an InputChannel. You might think that the InputChannel (or ChannelListener) would call the framework when it has received a new message, but it is in part the other way around. Messages are put in a queue by the ChannelListener when they are received. The framework calls BeginTryReceive on the InputChannel, where the channel waits until a message is available in the queue, and when one becomes available it issues a call-back to the framework. The framework then calls EndTryReceive, which returns the message.

Class Interaction Sequence

Here is a sequence diagram illustrating the calls when the receive location is enabled:

image

When a file is received, the FileSystemWatcher notifies FileChannelListener which creates a message and enqueues it. FileReceiveChannel, which is waiting for a message to be available in the queue, is notified and issues a callback to notify the framework, which calls EndTryReceive to get the message:

image

When the receive location is disabled, the channel listener and channel are closed:

image

Code

Here is the code for the channel listener:

#region using
using System;
using System.IO;
using System.Xml;
using System.Diagnostics;
using System.Collections.Generic;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Threading;
using Microsoft.ServiceModel.Samples;
#endregion

internal class FileChannelListener : ChannelListenerBase<IInputChannel>
{
    private string filename;
    private bool useXmlWrapper;
    private Uri uri;
    private InputQueue<IInputChannel> channelQueue;
    private FileReceiveChannel currentChannel;
    private FileSystemWatcher fsWatcher;

    internal FileChannelListener(FileBindingElement bindingElement, BindingContext context)
        : base(context.Binding)
    {
        Debug.WriteLine("Constructor called", GetType().FullName);
        Debug.WriteLine("Uri=" + context.ListenUriBaseAddress.ToString() + context.ListenUriRelativeAddress.ToString(), GetType().FullName);
        Debug.WriteLine("Filename=" + bindingElement.FileName, GetType().FullName);
        Debug.WriteLine("UseXmlWrapper=" + bindingElement.UseXmlWrapper, GetType().FullName);

        this.uri = new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);
        this.filename = bindingElement.FileName;
        this.useXmlWrapper = bindingElement.UseXmlWrapper;
        this.channelQueue = new InputQueue<IInputChannel>();
        this.fsWatcher = new FileSystemWatcher();
    }

    #region CommunicationObject Members

    /// <summary>
    /// Open the listener for use. Start listening for new files.
    /// </summary>
    protected override void OnOpen(TimeSpan timeout)
    {
        Debug.WriteLine("OnOpen called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        // Check if it is a valid directory
        string dir = Path.GetDirectoryName(filename);
        if (!Directory.Exists(dir))
            throw new ApplicationException(string.Format("Directory {0} does not exist.", dir));
        // Check if there are files there already and if so, handle them.
        string filemask = Path.GetFileName(filename);
        string[] existingFiles = Directory.GetFiles(dir, filemask);
        foreach (string f in existingFiles)
        {
            OnReceive(this, new FileSystemEventArgs(WatcherChangeTypes.Created, Path.GetDirectoryName(f), Path.GetFileName(f)));
        }
        // Set up the file system watcher so that we get notified when new files arrive.
        fsWatcher.Path = dir;
        fsWatcher.Filter = filemask;
        fsWatcher.Created += new FileSystemEventHandler(this.OnReceive);
        fsWatcher.EnableRaisingEvents = true;
    }

    protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
    {
        Debug.WriteLine("OnBeginOpen called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        throw new NotImplementedException();
    }

    protected override void OnEndOpen(IAsyncResult result)
    {
        Debug.WriteLine("OnEndOpen called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        throw new NotImplementedException();
    }

    /// <summary>
    /// Shutdown gracefully
    /// </summary>
    protected override void OnClose(TimeSpan timeout)
    {
        Debug.WriteLine("OnClose called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        lock (this.ThisLock)
        {
            fsWatcher.EnableRaisingEvents = false;
            fsWatcher = null;
            this.channelQueue.Close();
        }
    }

    protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
    {
        Debug.WriteLine("OnBeginClose called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        throw new NotImplementedException();
    }

    protected override void OnEndClose(IAsyncResult result)
    {
        Debug.WriteLine("OnEndClose called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        throw new NotImplementedException();
    }

    /// <summary>
    /// Shutdown ungracefully
    /// </summary>
    protected override void OnAbort()
    {
        Debug.WriteLine("OnAbort called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        // Abort can be called at anytime, so we can’t assume that we’ve been Opened successfully.
        lock (this.ThisLock)
        {
            if (fsWatcher != null)
            {
                fsWatcher.EnableRaisingEvents = false;
                fsWatcher = null;
            }
            this.channelQueue.Close();
        }
    }

    #endregion

    #region ChannelListenerBase Members

    //Synchronously returns a channel that is attached to this listener.
    protected override IInputChannel OnAcceptChannel(TimeSpan timeout)
    {
        Debug.WriteLine("OnAcceptChannel called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");

        if (!this.IsDisposed)
        {
            this.EnsureChannelAvailable();
        }

        IInputChannel channel;
        if (this.channelQueue.Dequeue(timeout, out channel))
        {
            return channel;
        }
        else
        {
            throw new TimeoutException();
        }
    }

    protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
    {
        Debug.WriteLine("OnBeginAcceptChannel called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        if (!this.IsDisposed)
        {
            this.EnsureChannelAvailable();
        }

        Debug.WriteLine("OnBeginAcceptChannel returning", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        return this.channelQueue.BeginDequeue(timeout, callback, state);
    }

    protected override IInputChannel OnEndAcceptChannel(IAsyncResult result)
    {
        Debug.WriteLine("OnEndAcceptChannel called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        IInputChannel channel;
        if (this.channelQueue.EndDequeue(result, out channel))
        {
            return channel;
        }
        else
        {
            throw new TimeoutException();
        }
    }

    protected override bool OnWaitForChannel(TimeSpan timeout)
    {
        Debug.WriteLine("OnWaitForChannel called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        throw new NotImplementedException();
    }

    protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
    {
        Debug.WriteLine("OnBeginWaitForChannel called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        throw new NotImplementedException();
    }

    protected override bool OnEndWaitForChannel(IAsyncResult result)
    {
        Debug.WriteLine("OnEndWaitForChannel called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        throw new NotImplementedException();
    }
    public override Uri Uri
    {
        get
        {
            return this.uri;
        }
    }
    #endregion

    #region Private members

    /// <summary>
    /// Called when there is a new file to dispatch.
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    private void OnReceive(object sender, FileSystemEventArgs e)
    {
        Debug.WriteLine("OnRecieve called. Filename=" + e.FullPath, GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");

        // We can be notified before the file is closed.
        // Therefore, try to rename the file in a loop until it succeeds.
        bool retry = true;
        int retryInterval = 1;
        do
        {
            try
            {
                File.Move(e.FullPath, e.FullPath + ".wip");
                retry = false;
            }
            catch (IOException ex)
            {
                Debug.WriteLine(retryInterval + " " + ex.Message, GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
                if (retryInterval < 30000) // Give up after 30 s
                {
                    Thread.Sleep(retryInterval);
                    retryInterval = retryInterval * 2;
                }
                else
                    throw ex;
            }
        }
        while (retry);

        // Since we cannot close and delete the file until the stream has been read,
        // create a buffered copy of the message.
        byte[] buffer = File.ReadAllBytes(e.FullPath + ".wip");
        Message message;
        if (useXmlWrapper)
            message = Message.CreateMessage(MessageVersion.Default, "Dummy", buffer);
        else
            message = Message.CreateMessage(MessageVersion.Default, "Dummy", XmlReader.Create(new MemoryStream(buffer)));
        message.Headers.To = uri;
        Dispatch(message);
        File.Delete(e.FullPath + ".wip");

        Debug.WriteLine("OnRecieve returning", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
    }

    /// <summary>
    /// Dispatch an incoming message to the waiting channel.
    /// </summary>
    private void Dispatch(Message message)
    {
        FileReceiveChannel newChannel;
        bool channelCreated = CreateOrRetrieveChannel(out newChannel);
        newChannel.Dispatch(message);
        if (channelCreated)
        {
            //Hand the channel off to whomever is waiting for AcceptChannel()
            //to complete
            this.channelQueue.EnqueueAndDispatch(newChannel);
        }
    }

    /// <summary>
    /// Guarantees that channel is attached to this listener.
    /// </summary>
    private void EnsureChannelAvailable()
    {
        FileReceiveChannel newChannel;
        bool channelCreated = CreateOrRetrieveChannel(out newChannel);

        if (channelCreated)
        {
            this.channelQueue.EnqueueAndDispatch(newChannel);
        }
    }

    /// <summary>
    /// If there is a channel attached to this listener, returns it.
    /// Othwerwise, creates a new channel.
    /// </summary>
    /// <param name="newChannel"></param>
    /// <returns></returns>
    private bool CreateOrRetrieveChannel(out FileReceiveChannel newChannel)
    {
        bool channelCreated = false;
        if ((newChannel = currentChannel) == null)
        {
            newChannel = new FileReceiveChannel(this);
            newChannel.Closed += new EventHandler(this.OnChannelClosed);
            currentChannel = newChannel;
            channelCreated = true;
        }

        return channelCreated;
    }

    /// <summary>
    /// Called when the channel has been closed.
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="args"></param>
    private void OnChannelClosed(object sender, EventArgs args)
    {
        FileReceiveChannel channel = (FileReceiveChannel)sender;
        if (channel == this.currentChannel)
        {
            this.currentChannel = null;
        }
    }

    #endregion
}

And here is the code for the input channel:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.ServiceModel;
using System.ServiceModel.Channels;
using Microsoft.ServiceModel.Samples;
using System.Threading;

internal class FileReceiveChannel : ChannelBase, IInputChannel
{
    private InputQueue<Message> messageQueue;

    internal FileReceiveChannel(FileChannelListener listener)
        : base(listener)
    {
        Debug.WriteLine("Constructor called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        this.messageQueue = new InputQueue<Message>();
    }

    #region CommunicationObject members

    protected override void OnOpen(TimeSpan timeout)
    {
        Debug.WriteLine("OnOpen called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
    }

    protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
    {
        Debug.WriteLine("OnBeginOpen called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        throw new NotImplementedException();
    }

    protected override void OnEndOpen(IAsyncResult result)
    {
        Debug.WriteLine("OnEndOpen called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        throw new NotImplementedException();
    }

    //Closes the channel gracefully during normal conditions.
    protected override void OnClose(TimeSpan timeout)
    {
        Debug.WriteLine("OnClose called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        this.messageQueue.Close();
    }

    //Closes the channel ungracefully during error conditions.
    protected override void OnAbort()
    {
        Debug.WriteLine("OnAbort called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        this.messageQueue.Close();
    }

    protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
    {
        Debug.WriteLine("OnBeginClose called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        throw new NotImplementedException();
    }

    protected override void OnEndClose(IAsyncResult result)
    {
        Debug.WriteLine("OnEndClose called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        throw new NotImplementedException();
    }

    #endregion

    #region IInputChannel members

    public EndpointAddress LocalAddress
    {
        get
        {
            throw new NotImplementedException();
        }
    }

    public Message Receive()
    {
        Debug.WriteLine("Receive called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        throw new NotImplementedException();
    }

    public Message Receive(TimeSpan timeout)
    {
        Debug.WriteLine("Receive(timeout) called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        throw new NotImplementedException();
    }

    public IAsyncResult BeginReceive(AsyncCallback callback, object state)
    {
        Debug.WriteLine("BeginReceive called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        throw new NotImplementedException();
    }

    public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
    {
        Debug.WriteLine("BeginReceive(timeout) called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        throw new NotImplementedException();
    }

    public Message EndReceive(IAsyncResult result)
    {
        Debug.WriteLine("EndReceive called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        throw new NotImplementedException();
    }

    public bool TryReceive(TimeSpan timeout, out Message message)
    {
        Debug.WriteLine("TryReceive called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        return this.messageQueue.Dequeue(timeout, out message);
    }

    public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
    {
        Debug.WriteLine("BeginTryReceive called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        IAsyncResult result = this.messageQueue.BeginDequeue(timeout, callback, state);
        Debug.WriteLine("BeginTryReceive returning", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        return result;
    }

    public bool EndTryReceive(IAsyncResult result, out Message message)
    {
        Debug.WriteLine("EndTryReceive called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        bool ok = this.messageQueue.EndDequeue(result, out message);
        Debug.WriteLine("EndTryReceive returning", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        return ok;
    }

    public bool WaitForMessage(TimeSpan timeout)
    {
        Debug.WriteLine("WaitForMessage called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        throw new NotImplementedException();
    }

    public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
    {
        Debug.WriteLine("BeginWaitForMessage called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        throw new NotImplementedException();
    }

    public bool EndWaitForMessage(IAsyncResult result)
    {
        Debug.WriteLine("EndWaitForMessage called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        throw new NotImplementedException();
    }

    #endregion

    #region Private methods

    //Hands the message off to other components higher up the
    //channel stack that have previously called BeginReceive()
    //and are waiting for messages to arrive on this channel.
    internal void Dispatch(Message message)
    {
        Debug.WriteLine("Dispatch called", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
        this.messageQueue.EnqueueAndDispatch(message);
        Debug.WriteLine("Dispatch returning", GetType().FullName + "(" + Thread.CurrentThread.ManagedThreadId + ")");
    }

    #endregion
}

Configure BizTalk

Create a new receive location, and choose transport type WCF-Custom. Press the Configure button. On the General tab, type an address, e.g. net.file://localhost/test.svc.

On the Binding tab, choose customBinding. Then remove existing binding elements and insert WCFFileAdapter. Specify the input file mask.

image

Testing

In part 3 of this series, I’m going to demonstrate how to test the channels without BizTalk.

Advertisements

One thought on “Implementing a Custom BizTalk Adapter as a Custom WCF Channel Part 2 – Receive”

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s