Boosting SSIS Performance By Replacing Lookup With Script Transform

We recently had a situation where we wanted to load 20 million rows from one database server (on-premises) to another (in the cloud), with the SSIS package running on the source server. For privacy reasons, we had to transform some of the columns (age and postal code) during the load, which we solved using lookups against a couple of dimension tables at the destination. Normally, I would load the data into a staging table at the destination and then perform all lookups when moving it from the staging table to the fact table, but in this case it wasn’t acceptable to store sensitive data in the staging table.

This worked fine during testing, but not at all in production, where we had to do an initial load of about 20 million rows. The package could only load 10,000 rows every 8 minutes, so the whole load would have taken 11 days. Not surprisingly, the connection dropped after a couple of days.
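The 11-day estimate follows directly from the observed rate:

$$
\frac{20{,}000{,}000 \text{ rows}}{10{,}000 \text{ rows}} \times 8 \text{ min} = 16{,}000 \text{ min} \approx 266.7 \text{ h} \approx 11.1 \text{ days}
$$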

The reason was that, although the lookups used (partial) caching, the package repeatedly queried the destination database with queries like this one:

exec sp_executesql N'select * from [Dim_FbfPostnummer] 
 where [Postnummer] = @P1 
 and ([FrånOchMed] <= @P2 or [FrånOchMed] is null) 
 and (@P3 < Till or Till is null)',N'@P1 char(5),@P2 datetime2(3),@P3 datetime2(3)','79495','2007-07-02 15:09:01','2007-07-02 15:09:01' 

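I suspect that, because the date parameters were different on almost every row, SSIS could not make use of its cache, so nearly every lookup became a separate round trip to the cloud database. We had to find another solution.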
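To illustrate the suspected problem, here is a simplified model. It is not how SSIS works internally; it only assumes that a partial cache stores results under the exact parameter values sent to the database, and counts one database round trip per cache miss. The names PartialCacheModel and CacheDemo.CountMisses are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of an on-demand lookup cache. It is NOT how SSIS is
// implemented internally; it only assumes that results are cached under the
// exact parameter values that were sent to the database.
public class PartialCacheModel<TKey>
{
    private readonly HashSet<TKey> _seen = new HashSet<TKey>();

    // Number of lookups that had to go to the database.
    public int Misses { get; private set; }

    // Returns true on a cache hit; a miss counts as one database round trip.
    public bool Lookup(TKey key)
    {
        if (_seen.Add(key))
        {
            Misses++;
            return false;
        }
        return true;
    }
}

public static class CacheDemo
{
    // Simulates 'rowCount' rows spread over 'zipCount' postal codes, where every
    // row carries its own timestamp, and counts misses for two cache keys.
    public static (int ByZipAndDate, int ByZipOnly) CountMisses(int rowCount, int zipCount)
    {
        var byZipAndDate = new PartialCacheModel<(string, DateTime)>();
        var byZipOnly = new PartialCacheModel<string>();
        var start = new DateTime(2007, 7, 2, 15, 9, 1);

        for (int i = 0; i < rowCount; i++)
        {
            string zip = (10000 + i % zipCount).ToString();
            DateTime signed = start.AddSeconds(i); // a new timestamp on every row
            byZipAndDate.Lookup((zip, signed));
            byZipOnly.Lookup(zip);
        }
        return (byZipAndDate.Misses, byZipOnly.Misses);
    }
}
```

For example, 1,000 rows spread over 50 postal codes, each row with its own timestamp, give 1,000 misses when the cache key includes the date but only 50 when the key is the postal code alone. Caching per postal code and resolving the date range in memory is the idea behind the script components in this post.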

After some thinking and searching, I found help on Matt Masson’s blog on MSDN: http://blogs.msdn.com/b/mattm/archive/2008/11/25/lookup-pattern-range-lookups.aspx

I adapted some of his code. The first lookup was fairly simple: look up an age group from an age (in years). There should always be a match, and the dimension is static (not slowly changing). The only real difficulty was how to use our existing OLE DB connection manager from the script, and for that I found another helpful blog post by Matt Masson: http://blogs.msdn.com/b/mattm/archive/2008/08/22/accessing-oledb-connection-managers-in-a-script.aspx

Here is the code for this lookup script. The script project needs a reference to the Microsoft.SqlServer.ManagedDTS assembly.

/* Microsoft SQL Server Integration Services Script Component
*  Write scripts using Microsoft Visual C# 2008.
*  ScriptMain is the entry point class of the script.*/

using System;
using System.Data;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using System.Collections.Generic;
using System.Data.OleDb;

public struct AgeGroup
{
    public int DWKey;
    public int From;
    public int To;

    public AgeGroup(int dwKey, int from, int to)
    {
        this.DWKey = dwKey;
        this.From = from;
        this.To = to;
    }
}

[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
    OleDbConnection _connection = null;
    List<AgeGroup> _cache = new List<AgeGroup>();
    AgeGroup _default;
    const string Query = "SELECT DWKey, FrånOchMedÅr, TillÅr FROM Dim_Åldersgrupp";

    public override void PreExecute()
    {
        base.PreExecute();

        // Get the underlying OleDbConnection from the package's OLE DB connection manager
        // (the technique from Matt Masson's blog post mentioned above).
        Microsoft.SqlServer.Dts.Runtime.ConnectionManager cm = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(Connections.DataWarehouse);
        IDTSConnectionManagerDatabaseParameters100 cmParams = cm.InnerObject as IDTSConnectionManagerDatabaseParameters100;
        _connection = cmParams.GetConnectionForSchema() as OleDbConnection;

        // Load the whole (small) age group dimension into memory once, before any rows arrive.
        FillCache();
    }

    // Reads every age group. The row with NULL in both bounds is the "unknown" member;
    // all other rows are stored as half-open ranges [From, To), where NULL means unbounded.
    private void FillCache()
    {
        using (var command = new OleDbCommand(Query, _connection))
        using (var reader = command.ExecuteReader())
        {
            while (reader.Read())
            {
                if (reader.IsDBNull(1) && reader.IsDBNull(2))
                    _default = new AgeGroup(reader.GetInt32(0), int.MinValue, int.MaxValue);
                else
                {
                    int from = reader.IsDBNull(1) ? int.MinValue : reader.GetInt32(1);
                    int to = reader.IsDBNull(2) ? int.MaxValue : reader.GetInt32(2);
                    _cache.Add(new AgeGroup(reader.GetInt32(0), from, to));
                }
            }
        }
    }

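    public override void Input0_ProcessInputRow(Input0Buffer Row)
    {
        if (Row.AgeInYearsInt_IsNull)
        {
            Row.Åldersgrupp = _default.DWKey;
            return;
        }

        // List<T>.Find returns default(AgeGroup), i.e. DWKey = 0, when nothing matches,
        // so use FindIndex and fall back to the "unknown" member instead.
        int age = Row.AgeInYearsInt;
        int index = _cache.FindIndex(item => item.From <= age && age < item.To);
        Row.Åldersgrupp = index >= 0 ? _cache[index].DWKey : _default.DWKey;
    }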
}
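The age lookup above scans the list linearly, which is fine for a handful of age groups. For a larger range dimension, one possible alternative (my own variation, not taken from Matt Masson's post) is to sort the non-overlapping ranges by their lower bound and use binary search. The standalone sketch below uses the hypothetical types AgeRange and AgeRangeLookup, assumes the ranges do not overlap, falls back to an "unknown" key when an age is outside every range, and can be tested without SSIS.

```csharp
using System;

// Same shape as the AgeGroup struct above: a half-open range [From, To).
public struct AgeRange
{
    public int DWKey;
    public int From;
    public int To;

    public AgeRange(int dwKey, int from, int to)
    {
        DWKey = dwKey;
        From = from;
        To = to;
    }
}

// Hypothetical helper: binary search over non-overlapping ranges sorted by From.
public class AgeRangeLookup
{
    private readonly AgeRange[] _ranges;
    private readonly int _defaultKey;

    public AgeRangeLookup(AgeRange[] ranges, int defaultKey)
    {
        // Copy so the caller's array is not reordered, then sort by lower bound.
        _ranges = (AgeRange[])ranges.Clone();
        Array.Sort(_ranges, (a, b) => a.From.CompareTo(b.From));
        _defaultKey = defaultKey;
    }

    public int Find(int? age)
    {
        if (age == null)
            return _defaultKey;

        int value = age.Value;
        int lo = 0, hi = _ranges.Length - 1;
        while (lo <= hi)
        {
            int mid = lo + (hi - lo) / 2;
            if (value < _ranges[mid].From)
                hi = mid - 1;            // the matching range, if any, is further left
            else if (value >= _ranges[mid].To)
                lo = mid + 1;            // the matching range, if any, is further right
            else
                return _ranges[mid].DWKey;
        }
        return _defaultKey;              // in a gap or outside all ranges
    }
}
```

For example, with the ranges [0, 18), [18, 65) and [65, int.MaxValue) and -1 as the unknown key, Find(18) returns the key of [18, 65) and Find(-5) returns -1.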

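The second lookup was a little harder, because this dimension is slowly changing. For an incoming postal code and date, we must look up the version of the dimension row that was valid at that date, because one important attribute, population, varies from year to year. In addition, rows without a match follow a different path in the data flow, so this transform needs two outputs (LookupMatch and LookupNoMatch in the code below). Both outputs are synchronous with the input and share the same ExclusionGroup, which is what makes SSIS generate the DirectRowToLookupMatch() and DirectRowToLookupNoMatch() methods.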

Here is the code I wrote:

/* Microsoft SQL Server Integration Services Script Component
*  Write scripts using Microsoft Visual C# 2008.
*  ScriptMain is the entry point class of the script.*/

using System;
using System.Data;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using System.Collections.Generic;
using System.Data.OleDb;

public class Postnummer
{
    public int DWKey;
    public DateTime From;
    public DateTime To;
    public int? Befolkning;

    public Postnummer(int dwKey, DateTime from, DateTime to, int? befolkning)
    {
        this.DWKey = dwKey;
        this.From = from;
        this.To = to;
        this.Befolkning = befolkning;
    }
}

[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
    OleDbConnection _connection = null;
    Dictionary<string, List<Postnummer>> _cache = new Dictionary<string, List<Postnummer>>();
    const string Query = "SELECT DWKey, Postnummer, FrånOchMed, Till, Befolkning FROM Dim_FbfPostnummer";

    public override void PreExecute()
    {
        base.PreExecute();

        // Get the underlying OleDbConnection from the package's OLE DB connection manager.
        Microsoft.SqlServer.Dts.Runtime.ConnectionManager cm = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(Connections.DataWarehouse);
        IDTSConnectionManagerDatabaseParameters100 cmParams = cm.InnerObject as IDTSConnectionManagerDatabaseParameters100;
        _connection = cmParams.GetConnectionForSchema() as OleDbConnection;

        // Load all versions of the postal code dimension into memory before any rows arrive.
        FillCache();
    }

    // Loads every version of every postal code, grouped by postal code.
    // NULL validity dates mean "open-ended" and are mapped to DateTime.MinValue/MaxValue.
    private void FillCache()
    {
        using (var command = new OleDbCommand(Query, _connection))
        using (var reader = command.ExecuteReader())
        {
            int rows = 0;
            while (reader.Read())
            {
                int dwKey = reader.GetInt32(0);
                string pnr = reader.GetString(1);
                DateTime from = reader.IsDBNull(2) ? DateTime.MinValue : reader.GetDateTime(2);
                DateTime to = reader.IsDBNull(3) ? DateTime.MaxValue : reader.GetDateTime(3);
                int? befolkning = reader.IsDBNull(4) ? null : (int?)reader.GetInt32(4);
                var postnummer = new Postnummer(dwKey, from, to, befolkning);

                // One dictionary probe per row instead of ContainsKey followed by the indexer.
                List<Postnummer> list;
                if (!_cache.TryGetValue(pnr, out list))
                {
                    list = new List<Postnummer>();
                    _cache.Add(pnr, list);
                }
                list.Add(postnummer);
                rows++;
            }
            // Record the number of cached dimension rows in the package log.
            Log("Cache initialized.", rows, null);
        }
    }

    public override void Input0_ProcessInputRow(Input0Buffer Row)
    {
        // Find the dimension version that is valid for this postal code at the signing time.
        // Check the _IsNull flags first: reading a NULL buffer column throws instead of returning null.
        Postnummer postnummer = null;
        List<Postnummer> versions;
        if (!Row.NationalRegZipCode_IsNull && !Row.SignedTimeRounded_IsNull
            && _cache.TryGetValue(Row.NationalRegZipCode, out versions))
        {
            DateTime signed = Row.SignedTimeRounded;
            postnummer = versions.Find(item => item.From <= signed && signed < item.To);
        }

        if (postnummer == null)
        {
            // No match: clear the lookup columns and send the row to the "no match" output.
            Row.FbfPostnummerDWKey_IsNull = true;
            Row.FbfPostnummerBefolkning_IsNull = true;
            Row.DirectRowToLookupNoMatch();
        }
        else
        {
            Row.FbfPostnummerDWKey = postnummer.DWKey;
            if (postnummer.Befolkning == null)
                Row.FbfPostnummerBefolkning_IsNull = true;
            else
                Row.FbfPostnummerBefolkning = postnummer.Befolkning.Value;
            Row.DirectRowToLookupMatch();
        }
    }

}
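Because ScriptMain depends on generated SSIS types (Input0Buffer, Connections), its range logic is awkward to test outside a package. The following is a minimal standalone sketch of the same idea, assuming the dimension rows have already been read into memory: versions are grouped per postal code in a Dictionary, NULL validity dates become DateTime.MinValue/MaxValue, and a version matches when From <= date < To. PostalVersion and PostalVersionCache are hypothetical names, and a null result corresponds to the LookupNoMatch path.

```csharp
using System;
using System.Collections.Generic;

// One version of a postal code dimension row, valid on [From, To).
public class PostalVersion
{
    public int DWKey;
    public DateTime From;
    public DateTime To;
    public int? Population;

    public PostalVersion(int dwKey, DateTime? from, DateTime? to, int? population)
    {
        DWKey = dwKey;
        // Open-ended validity (NULL in the dimension) maps to the extreme dates.
        From = from ?? DateTime.MinValue;
        To = to ?? DateTime.MaxValue;
        Population = population;
    }
}

public class PostalVersionCache
{
    private readonly Dictionary<string, List<PostalVersion>> _cache =
        new Dictionary<string, List<PostalVersion>>();

    // Adds one version under its postal code, creating the list on first use.
    public void Add(string postalCode, PostalVersion version)
    {
        if (!_cache.TryGetValue(postalCode, out var list))
        {
            list = new List<PostalVersion>();
            _cache.Add(postalCode, list);
        }
        list.Add(version);
    }

    // Returns the version valid at 'when', or null when there is no match.
    public PostalVersion Find(string postalCode, DateTime when)
    {
        if (postalCode == null || !_cache.TryGetValue(postalCode, out var list))
            return null;
        return list.Find(v => v.From <= when && when < v.To);
    }
}
```

Each row costs one dictionary probe plus a short scan over that postal code's versions, so the cost no longer depends on how many distinct dates arrive.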