High Level

PortGap Explained

Consider a scenario where data needs to be sent from Service (A) to Service (B). In addition to sending data to Service (B), Service (A) needs to persist some data locally in the same process. An example of this could be a product ordering service (A), that saves an order locally and then sends a message (command) to a shipping/distribution service (B).

The expected behavior would be to create both the order and the shipping instruction together, or not at all. It would be extremely undesirable if an order is recorded in (A), but the shipping instruction fails to be communicated to (B) and is never retried or managed.


In an environment where both services (A) & (B) utilize a common messaging framework, the integration could be handled by the messaging framework - PortGap doesn't play a role here.

In the event that the shipping service (B) is a 3rd party, or at least not utilizing a reliable messaging framework with (A), then the integration would need to be handled manually. PortGap fulfills this requirement.

Persistence

PortGap focuses on reliable delivery. To achieve this, messages are persisted first, before they are actually sent. It is important to note that persisting messages does incur some performance cost.

Send Behavior

PortGap utilizes two sending queues, and each queue has its own dedicated worker thread. When there is nothing to dequeue, each worker thread will back-off from polling their respective queues. 

A future iteration of PortGap will allow the configuration of multiple worker threads per queue - improving processing throughput.


  • The primary 'send queue' has its messages dequeued and sent based on priority and order of insertion. 
  • The second 'send-delay queue' has its messages sent based on a 'date/time'

Failure

By default, if a message fails to send, then it is moved to the delay queue with a retry 'date/time'. Once the send attempts exceed a configured limit, the message sending is considered 'expired'. Once the sending of a message has expired, it can then be: 

1) placed back in the send queue (relies on a priority value to send),

2) placed back in the delay queue (relies on a date/time to send), or 

3) discarded permanently.


Retry policies can be configured on different message types using data annotations. This type of behavior is known as 'at-least-once' sending.


PortGap supports 'at-most-once' sending. This means that no matter what, a message will never be sent more than once. In some instances, it is possible that a message is never sent at all.

Receiving

We are in the process of building in Receiving functionality. This will allow your application/service to receive an external message and persist it straight away. The processing of this message will then occur in a separate process, without the external sender being temporally coupled to the processing of the message by the receiver.

NuGet

The PortGap package is available on Nuget: Terralect.PortGap


Other useful supporting packages for PortGap are:

Terralect.PortGap.Microsoft.DependencyInjection (DI for PortGap to register its own services)

Terralect.PortGap.Newtonsoft (serialization/deserialization) of messages)

Terralect.PortGap.Serilog (logging)

Terralect.PortGap.SqlClient (persistence of messages to a SQL DB)


You can implement your own instances of these supporting packages, but we're currently using these for our own projects at the moment.


The sample ZIP at the bottom of this page demonstrates how these packages are utilized.

THE CODE

* Download Sample *

We have created a downloadable sample application that demonstrates the fundamentals of PortGap. It is available for download at the bottom of this page.

Demo Application

For demonstration purposes, we've created a console application that will function as our background service/endpoint. Our demo application is running on .NET Core, but you don't have to. To manage the lifecycle of the application we've used the 'Microsoft.Extensions.Hosting' package. This code is based on our downloadable sample application.


See the below code for the scaffolding of our demo application:

    

    class Program

    {

        static async Task Main(string[] args)

        {

            var host = new HostBuilder()

                .ConfigureServices((hostContext, serviceCollection) =>

                {

                    ConfigurePortGap(serviceCollection);

                    serviceCollection.AddHostedService<LocalService>();

                })

                .Build();

            await host.RunAsync();

        }


        static void ConfigurePortGap(IServiceCollection serviceCollection)

        {

            //We'll be adding the PortGap config here

        }

    }

    public class LocalService : IHostedService

    {

        public Task StartAsync(CancellationToken cancellationToken)

        {

            Console.WriteLine("Started");

            return Task.CompletedTask;

        }


        public Task StopAsync(CancellationToken cancellationToken)

        {

            Console.WriteLine("Stopped");

            return Task.CompletedTask;

        }

    }

Essential PortGap Configuration

Dependency Injection:

We've created a dependency injection proxy package that uses 'Microsoft.Extensions.DependencyInjection'. This assists PortGap in registering necessary classes that are instantiated during operation. Install the 'Terralect.PortGap.Microsoft.DependencyInjection' package.


Serialization:

PortGap requires a serializer to convert messages into byte arrays, and vice versa. We're going to use our Newtonsoft JSON package for this. Install 'Terralect.PortGap.Newtonsoft'. 

* You can easily implement your own.


Logging:

PortGap also requires a logger, so that internal information and errors can be written to output. We're using 'Terralect.PortGap.Serilog'

* You can easily implement your own.


Persistence:

To get started with the PortGap related configuration, you'll need to install the PortGap NuGet package 'Terralect.PortGap'. 

You'll then need to decide on what persistence technology you would like to use. We're currently supporting Microsoft SQL Server persistence using the SqlClient library. Install the 'Terralect.PortGap.SqlClient' package.

When using the 'Terralect.PortGap.SqlClient' package, the minimum necessary configuration should include the database connection string.


The ConfigurePortGap() method now looks as follows:


    static void ConfigurePortGap(IServiceCollection serviceCollection)

    {

        var portGapService = PortGapSetup.Configure(serviceCollection.CreateProxy(), config =>

        {

                config.RegisterSerializer<NewtonsoftJsonProxy>();

                config.RegisterLogger<SerilogProxy>();

                config.RegisterPersistence<SqlPersistence>(persistenceConfig =>

                {

                        persistenceConfig.Database(DbConnectionString);

                });

        });

        serviceCollection.AddSingleton(portGapService);

    }


NOTE: In the sample code above, we've decided to register the portGapService object instance as a Singleton, so that we can inject it back into the LocalService.cs during the application start. You don't have to do it this way. You COULD call "portGapService.Start(cancellationToken);" at this point, but be sure to have all other application dependency injection registrations completed first.


NOTE: You only need the reference to 'portGapService' on a service/worker endpoint, as this is how you control the Start() and Stop() of the PortGap workers. If an application is made up of a Web API AND a running service, you likely won't have PortGap workers running on the Web API, but rather on the service/worker endpoint. If your application consists of ONLY a web API, then having PortGap workers running on the website itself could prove problematic if the website is not perpetually running (IIS application pools can idle/sleep when not in use).


If you've decided to use the 'Terralect.PortGap.SqlClient' package, then some OPTIONAL settings are available to you:


    persistenceConfig.Database(DbConnectionString, settings =>

    {

        settings.SetSchema(x); //Default is 'dbo'

        settings.SetPayloadVarbinarySize(x); //Default is varbinary(max)

        settings.SetSendQueueName(x); //Default is PortGapSend

        settings.SetDelayQueueName(x); //Default is PortGapDelay

        settings.SetSentQueueName(x); //Default is PortGapSent

        settings.SetCorrelationIdMaxLength(x); //Default is 32 (GUID without hyphens)

        settings.SetDelayInformationMaxLength(x); //Default is 4096

    });


######

NOTE: You can implement your own persistence for PortGap, provided you implement the necessary interfaces. To get started you would need to implement the IPersistenceRegistration interface, then implement and register your own 'accessor' and 'scope':

    

    public class YourPersistence : IPersistenceRegistration

    {

        public void RegisterTypes(PersistenceTypeRegister persistenceTypeRegister)

        {

            persistenceTypeRegister.PersistenceAccessor<YourDbAccessor>();

            persistenceTypeRegister.PersistenceScope<YourDbScope>();

        }

    }

######

Messages

Now that the configuration is done, you can start working on the message contracts that PortGap will use for sending.

In our demo app, we'll create a message that represents the instruction payload for an order shipping service:

    

    public class ShipOrderMessage : IMessage

    {

        public string CorrelationId { get; set; }

        public int UserId { get; set; }

        public string ItemName { get; set; }

        public int Quantity { get; set; }

        public string Destination { get; set; }

        public DateTimeOffset CreatedAt { get; set; }

    }


All messages should implement IMessage. This interface requires that you have a 'string CorrelationId' property. Leaving this CorrelationId empty will cause PortGap to allocate a CorrelationId automatically. By default, PortGap generates a sequential GUID without any hyphens (eg. "ce950dfa4fdf4dc1a3d7aafe03c6ed48")

 * You need to ensure that your serializer can serialize/deserialize your messages. For simplicity, we have used Automatic Properties.

 * The CorrelationId property assists both senders and receivers in identifying messages that have been encountered before (message idempotency).


You may choose to set the CorrelationId yourself:


    ...

    public string CorrelationId { get; set; }

    public ShipOrderMessage(string orderId, int userId, string itemName, int quantity, string destination, DateTimeOffset createdAt)

    {

        CorrelationId = orderId;

    ...


Once you have defined the contract for a message, you can also set a custom retry policy:


    [Retry(IntervalType.Second, 3, 5, 2)]

    public class ShipOrderMessage : IMessage

    {

    ...


The [Retry(IntervalType.Second, 3, 5, 2)] data annotation indicates to PortGap that if this messages fails, it should retry with an interval of 3 seconds, up to 5 times, with an exponential back-off factor of 2. 


Retry is calculated as: Math.Pow(exponentialFactor, failCount - 1) * interval

In the above example it will retry in 3 sec, 6 sec, 12 sec, 24 sec, and then 48 sec.

The default retry policy for all messages is: [Retry(IntervalType.Second, 1, 3, 2)]


In certain scenarios, you may want to ensure a message is sent 'at most once'. If a message completely fails to send, it will not retry. This could mean that messages never get sent. To achieve this, PortGap will actually permanently remove the message from the respective send queue prior to sending, and then attempt to send. At-most-once delivery of a message is indicated as follows:


    [AtMostOnce(true)]

    public class ShipOrderMessage : IMessage

    {

    ...


You define the priority of a message to be sent, per message type. By default all messages have a priority of 1. If a second message type has a priority of 2, then it will only be sent once all priority 1 messages have been sent. Setting a custom priority of a message is done as follows:


    [Priority(1)]

    public class ShipOrderMessage : IMessage

    {

    ...

Queue to Send

To send a message, or rather queue up a message to be sent, you need to make use of the IMessageSender. Simply inject this interface into the class responsible for sending, and call the SendMessage() method. See the below code sample:


    public class ShippingIntegration : IShippingIntegration

    {

        private readonly IMessageSender messageSender;


        public ShippingIntegration(IMessageSender messageSender)

        {

            this.messageSender = messageSender;

        }


        public void ShipOrder(int userId, string itemName, short quantity, string destination)

        {

            //Places the message in the queue for sending

            messageSender.SendMessage(new ShipOrderMessage

            {

                UserId = userId,

                ItemName = itemName,

                Destination = destination,

                Quantity = quantity,

                CreatedAt = DateTimeOffset.Now,

            });

        }

    }

The SendMessage() method also takes a SendOptions() parameter that allows you to override the default priority for the message, or provide a specific date/time that you would like the message to send:


    messageSender.SendMessage(new SampleMessage("Hello"), new SendOptions(x));

Senders

Next you'll need to implement the integration between the local service and the recipient, as this is specific to each application. Start by implementing the interface 'IHandleSending<>':


    public class MessageSender : IHandleSending<ShipOrderMessage>

    {

        public SendResult Send(ShipOrderMessage message)

        {

            //This is where you perform the integration with the recipient of the message.

            //This could be an HTTP call, FTP file drop, pass message on to RabbitMQ etc.

            return new SendResult(); //An empty send result indicates a success

        }

    }


To indicate to PortGap that an error took place, you would need to return an error message in SendResult():


    public SendResult Send(ShipOrderMessage message)

    {

        if (DidFailToSend())

        {

            //If an error occurs when sending your message to the recipient, return SendResult("error") with some error message

            //* You could also add custom application logic in here (logging, alerts, etc.)

            return new SendResult("THIS IS WHERE YOU'LL PUT IN SOME INFORMATIONAL ERROR MESSAGE");

        }


        if (DidFailToSend())

        {

            //Another example of how an error is indicated back to PortGap

            //Here we override the default retry policy of this message, and retry in 5 minutes

            return new SendResult("ERROR", TimeSpan.FromMinutes(5));

        }

        return new SendResult(); //SUCCESS

    }


If an unhandled exception is thrown in your Send() method, then PortGap will use the message of the exception to indicate the error. It is better to handle exceptions yourself and return a SendResult(err_message), so that you can detail the error message more accurately.

Send Expiry

After each failed attempt to send a message, PortGap increments an internal counter. This counter determines the retry time interval based on the [Retry] policy (or the default, if one is not specified). Once the counter exceeds the maximum number of send attempts for that message, PortGap considers the sending 'expired'.


To handle the expiry of message sending, you need to implement the IHandleSendExpiry<> interface:


    public class SendExpiryHandler : IHandleSendExpiry<ShipOrderMessage>

    {

        public ISendExpiryAction HandleSendExpiry(ShipOrderMessage message, string errorMessage)

        {

            //Return the desired behaviour

        }

    }


This handler will automatically be invoked when the send expiry occurs. At this point you can choose to implement your behaviour, but you still need to indicate back to PortGap what you want it to do next. Currently there are 3 options available:


1) Discard Message: Permanently removes the message from sending (message may never be sent)

2) Resend Message: Place the message back in the primary send queue, which then dequeues to send again based on the message priority.

3) Retry Message: Places the message in the delay send queue (if it isn't there already), and retries at a specified point in time.


The below code sample shows these 3 send expiry actions:


    public class SendExpiryHandler : IHandleSendExpiry<ShipOrderMessage>

    {

        public ISendExpiryAction HandleSendExpiry(ShipOrderMessage message, string errorMessage)

        {

            if (DontCareIfMessageGetsDelivered)

                return new DiscardMessage();


            if (ShouldPutMessageBackIntoPrimarySendQueue)

                return new ResendMessage();


            if (KeepMessageInDelayQueue)

                return new RetryMessage(DateTimeOffset.Now.AddDays(1), false);

            

            return new ResendMessage();

        }


        private bool DontCareIfMessageGetsDelivered => RandomBool.Value();

        private bool ShouldPutMessageBackIntoPrimarySendQueue => RandomBool.Value();

        private bool KeepMessageInDelayQueue => RandomBool.Value();

    }


If no expiry handler has been created, then PortGap automatically performs the 'Retry' expiry action. PortGap will continue to retry sending the message at the maximum retry interval for that message. The internal send attempt 'counter' will stay fixed/capped at the maximum retry count. For example, if the maximum send attempts for a message is 3, then the internal counter will not exceed 3, irrespective of the number of actual retries that have been performed.

Persistence

If a PortGap message needs to be sent and your application requires some data to be persisted locally at the same time, then you would most likely want these two actions/writes to be performed in the same database transaction. When making use of the 'Terralect.PortGap.SqlPersistence' package, you can gain access to the current database connection + transaction using the SqlDatabaseScope object.


The SqlDatabaseScope is registered as Scoped can be injected into the current class, or created using a Service Provider. You can then pass on your existing DbConnection and DbTransaction to the SqlDatabaseScope using the sqlDatabaseScope.AttachExistingTransaction() method. PortGap will then make use of this DB connection when you call SendMessage().


Below is a very primitive example to demonstrate this:


    using (var context = new LocalDbContext(connectionStrings.DefaultConnection))

    {

        using (var transaction = context.Database.BeginTransaction())

        {

            sqlDatabaseScope.AttachExistingTransaction(context.Database.GetDbConnection(), transaction.GetDbTransaction());

            //Make changes to your DB

            //Send a message via PortGap


            context.SaveChanges();

            transaction.Commit();

        }

    }


There are many approaches for sharing the same DB connection between your application and PortGap. The below code shows how you could new up a DB connection, pass it into the scoped SqlDatabaseScope, and pass it into a newly created scoped container called DatabaseAccessor. The DatabaseAccessor could then contain a newly created EF DbContext that also shares the same DB connection:


    public class ScopedServiceProvider

    {

        private readonly IServiceProvider serviceProvider;

        private readonly ConnectionStrings connectionStrings;


        public ScopedServiceProvider(IServiceProvider serviceProvider, ConnectionStrings connectionStrings)

        {

            this.serviceProvider = serviceProvider;

            this.connectionStrings = connectionStrings;

        }


        public void RunInScope<TService>(Action<TService> serviceDelegate)

        {

            var newScopedProvider = serviceProvider.CreateScope().ServiceProvider;

            using (var connection = new SqlConnection(connectionStrings.DefaultConnection))

            {

                connection.Open();

                using (var transaction = connection.BeginTransaction())

                {

                    var portGapDbScope = (SqlDatabaseScope)newScopedProvider.GetService(typeof(SqlDatabaseScope));

                    using (var dbContext = new LocalDbContext(connection))

                    {

                        //Pass the existing DB connection + transaction onto PortGap

                        portGapDbScope.AttachExistingTransaction(connection, transaction);

                        //Pass the existing DB connection + transaction onto EF DbContext

                        dbContext.Database.UseTransaction(transaction);

                        //Attached the LocalDbContext into a DatabaseAccessor, so that it (DatabaseAccessor+LocalDbContext) can be injected later

                        var dbAccessor = (DatabaseAccessor)newScopedProvider.GetService(typeof(DatabaseAccessor));

                        dbAccessor.AssignExistingDbContext(dbContext);


                        var serviceInstance = newScopedProvider.GetService(typeof(TService));

                        serviceDelegate((TService) serviceInstance);

                        dbContext.SaveChanges();

                    }

                    transaction.Commit();

                }

            }

        }        

    }

    //This class would need to be registered as scoped, and injected into the class that contains your local DB access

    public class DatabaseAccessor

    {

        private LocalDbContext localDbContext;

        public void AssignExistingDbContext(LocalDbContext value)

        {

            localDbContext = value;

        }

        internal LocalDbContext LocalDbContext => localDbContext;

    }

    //Making use of above code:

    scopedServiceProvider.RunInScope<IPlaceOrderService>(service =>

    {

        service.PlaceOrder(555, "Apple", 3);

    });


Making use of a Pipes & Filters pattern could be useful in creating the shared connection in one filter, before calling into the next filter to perform the application logic + local persistence.

Preserve Messages Sent

  • Once a message has been sent, it is removed from its respective send queue. If you want to have a record of messages sent, you can enable this in the configuration of PortGap:


    var portGapService = PortGapSetup.Configure(serviceCollection.CreateProxy(), config =>

    {

        config.PreserveSentMessages();

    ...

Adding the above configuration will then indicate to the Persistence layer to copy the sent message to a 'sent' location prior to deleting sent messages.

To access the sent messages, you can make use of the IPersistenceAccessor or write your own accessors. This interface currently gives you access to all the persistence related calls used by PortGap. The methods relating to sent messages are:


    ushort DiscardSentMessages(ushort batchSize, DiscardSentParameters discardParameters);

    void ResendSentMessage(object recordId);

    List<MessageSentData> FetchSentMessages(ushort batchSize, FetchSentParameters fetchParameters);