Autogenerera SSIS-paket som läser från Twitter med hjälp av Biml och en Scriptkomponent

By | april 19, 2016

Jag har tidigare skrivit en artikel om hur du kan autogenerera SSIS-paket för att läsa in textfiler i din databas med hjälp av Biml. Det var bara ett exempel på hur du kan spara tid genom att autogenerera SSIS-paket. Det går även att läsa från andra, mer avancerade datakällor, genom att autogenerera en script-komponent. Denna artikel handlar om hur du autogenererar en scriptkomponent för Twitter, men samma metod kan användas för att läsa från andra mer avancerade datakällor med hjälp av en scriptkomponent.

Artikeln förutsätter att du använder antingen BIDS Helper (ett gratis verktyg) eller MIST (en kommersiell produkt från Varigence).

Vårt mål

Målet för denna artikel är att autogenerera ett SSIS-paket som ser ut som nedan:

TwitterBimlDemo1

TwitterBimlDemo2

SSIS-paketet skall:

  1. Trunkera en databastabell
  2. Läsa från Twitter
  3. Skriva till databastabellen

Att autogenerera SSIS-paket som läser från Twitter är kanske inte det mest användbara (även om det är ett bra exempel), men samma metod att använd en scriptkomponent kan användas för många andra intressanta datakällor. Att kunna autogenerera scriptkomponenter är därför mycket användbart.

Twitter API

Twitter har flera API:er, bland annat ett ”Search API”. Detta API är öppet för vem som helst att använda (men kräver ett konto). Det är också relativt enkelt att använda.

Autentisering

Det finns flera olika sätt att autentisera sig mot API:et. Jag har valt att använda Application-only authentication, men det finns också andra alternativ såsom att autentisera sig som en specifik användare istället.

Autentiseringen kräver att du har en registrerad applikation. Du kan enkelt och gratis registrera en applikation hos Twitter här. När du har registrerat en Twitterapp så får du en Consumer Key och en Consumer Secret.

Twitter App Consumer Key Secret

För att autentisera dig skickar du din Consumer Key och din Consumer Secret, enligt protokollet OAuth, och får tillbaka en Bearer Token. Denna Bearer Token måste du sedan inkludera i alla anrop till Twitters Search API.

Twitter Search API

Att använda Search API är enkelt. Du skickar dina sökparametrar i en URL och får tillbaka sökresultatet i JSON-format. Exempel:

https://api.twitter.com/1.1/search/tweets.json?q=%23mssql

söker på inlägg som är taggade med #mssql (notera att %23 betyder #).

När du anropar Search API, så glöm inte att inkludera en autentiseringsheader:

Authorization: Bearer AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA%2FAAAAAAAAAAAAAAAAAAAA%3DAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA

I .NET kan du lägga till den så här:

request.Headers.Add("Authorization", "Bearer " + token);

Sökresultatet kan då se ut så här:

"{\"statuses\":[{\"created_at\":\"Tue Apr 19 05:06:27 +0000 2016\",\"id\":722290241235730432,\"id_str\":\"722290241235730432\",\"text\":\"Build ISA QA Trade 2015.01_20160419.1 succeeded for New Tech in 186 minutes #TFS #ContinuosIntegration #MSSQL #SSIS\",...

JSON parser

Jag har skrivit en egen enkel JSON-parser. Den är inte tänkt att användas i produktionsmiljöer, så om du skall använda mitt exempel i produktion är det bäst att titta på andra alternativ.

Generering av SSIS-paket

Vi kommer nu att fortsätta exemplet med att:

  1. Skapa databastabell
  2. Skapa ett SSIS-projekt (i Visual Studio/BIDS Helper) eller ett projekt i MIST
  3. Lägg till Biml i ditt projekt
  4. Generera SSIS-paketet

Skapa databastabell

Jag har döpt min testdatabas till JohanBimlDemo. Du kan naturligtvis döpa den till något annat, om du ändrar namnet på alla ställen i mina exempel.

Här är SQL-koden för att skapa tabellen:

USE JohanBimlDemo
GO
CREATE TABLE [dbo].[Twitter]
(
[IdStr] [nvarchar](50) NOT NULL PRIMARY KEY CLUSTERED,
[Text] [nvarchar](250) NOT NULL,
[Username] [nvarchar](50) NOT NULL,
[CreatedAt] [nvarchar](50) NOT NULL
);

Skapa ett projekt

Om du använder BIDS Helper så är det bara att skapa ett SSIS-projekt som vanligt i Visual Studio. Välj New Project… -> Business Intelligence -> Integration Services Project.

Om du istället använder MIST så skapar du istället där ett projekt som vanligt.

Biml

Jag har lagt in min kompletta Bimlkod längst ned i denna artikel. Detta eftersom den är såpass lång.

För att lägga till den i BIDS Helper / Visual Studio skapar du en ny Biml fil och kopierar in Bimlkoden.

TwitterBimlDemo3

Generera SSIS-paket

Innan du kan testa att läsa från Twitter måste du fylla i din autentiseringsinformation. Leta upp de här raderna i början av Bimlkoden och fyll i med dina egna uppgifter.

<Variable Name="ConsumerKey" DataType="String" >Enter your Consumer Key here</Variable>
<Variable Name="ConsumerSecret" DataType="String">Enter your Consumer Secret here</Variable>
<Variable Name="TwitterQuery" DataType="String">/1.1/search/tweets.json?q=%23mssql</Variable>

Om du använder BIDS Helper så autogenererar du sedan ditt SSIS-paket genom att högerklicka på din Bimlfil och välja ”Generate SSIS Packages”.

I MIST autogenererar du ditt SSIS-paket genom att välja ”Build”.

Sedan är det färdigt att köra!

Hur det fungerar

Nu kommer vi till den viktigaste delen av artikeln, hur allt egentligen fungerar.

Först i källkoden finns en Connection och några Variables. Under Tasks finns sedan en ExecuteSQL, som utför trunkeringen av tabellen. Sedan börjar det viktigaste, under Dataflow.

ScriptComponentSource

En ScriptComponentSource använder sig av ett ScriptComponentProject för att definiera själva scriptet.

Det går att definiera ett ScriptComponentProject både ”globalt” i Biml (där samma källkod kan delas av flera ScriptComponentSources) eller ”inline” direkt under en ScriptComponentSource, så att den endast gäller en specifik ScriptComponentSource. Jag har valt ”inline”-alternativet.

ScriptComponentProject förklaras nog bäst genom att titta på mitt exempel.

  • Observera att jag har lagt ett ScriptComponentProject inuti ett annat ScriptComponentProject. Av någon anledning kräver Biml att man gör det.
  • Det är viktigt att du lägger till rätt AssemblyReferences. I mitt exempel ser du vilka som brukar behöva finnas med.
  • Sedan behöver du lägga till minst en OutputBuffer, där du skickar ditt resultat. Jag har tagit med fyra fält från Twitter, men det går naturligtvis enkelt att utöka till fler.
  • Sedan behövs en ReadOnlyVariables (eller ReadWriteVariables), för att variablerna skall bli tillgängliga i scriptet.
  • Sedan kommer dina källkodsfiler. Normalt är det två filer som skall skapas:
    • AssemblyInfo.cs, innehåller viss metadata.
    • ScriptMain.cs, innehåller din källkod.

Den viktigaste funktionen i ScriptMain är CreateNewOutputRows. Här autentiserar den sig och hämtar sedan sökresultatet.

var accessToken = getAccessToken(consumerKey, consumerSecret);
var jsontext = callSearchApi(queryString, accessToken);
var jObj = new JObject(jsontext);

Sökresultatet skrivs sedan till vår OutputBuffer så här.

foreach (JObject status in (JArray)jObj["statuses"])
{
// Reading a few fields. More Twitter fields can of course be read the same way.
var id_str = status["id_str"].ToString();
var text = status["text"].ToString();
var user = (JObject)status["user"];
var username = user["name"].ToString();
var created_at = status["created_at"].ToString();

// Send to output buffer
Output0Buffer.AddRow();
Output0Buffer.IdStr = id_str;
Output0Buffer.Text = text;
Output0Buffer.Username = username;
Output0Buffer.CreatedAt = created_at;
}

Output0Buffer.SetEndOfRowset();

OleDbDestination

Inga konstigheter med denna. Vi specificerar JohanBimlDemo som ConnectionName och tabellen [dbo].[Twitter] som ExternalTableOutput.

Källkod i Biml

<Biml xmlns="http://schemas.varigence.com/biml.xsd">
 <Annotations>
 <Annotation>
 TwitterScriptTaskDemo.biml
 Biml Demo using script task for reading from Twitter
 Written by Johan Åhlén 2016
 </Annotation>
 <Annotation>
 Table script:
 CREATE TABLE [dbo].[Twitter]
 (
 [IdStr] [nvarchar](50) NOT NULL PRIMARY KEY CLUSTERED,
 [Text] [nvarchar](250) NOT NULL,
 [Username] [nvarchar](50) NOT NULL,
 [CreatedAt] [nvarchar](50) NOT NULL
 );
 </Annotation>
 </Annotations>
 <Connections>
 <Connection Name="JohanBimlDemoConnection" ConnectionString="Provider=SQLNCLI11;Data Source=localhost;Integrated Security=SSPI;Initial Catalog=JohanBimlDemo" />
 </Connections>
 <Packages>
 <Package Name="TwitterBimlDemo" ConstraintMode="Linear" ProtectionLevel="EncryptSensitiveWithUserKey">
 <Variables>
 <!-- Enter your Twitter API keys here... -->
 <Variable Name="ConsumerKey" DataType="String" >Enter your Consumer Key</Variable>
 <Variable Name="ConsumerSecret" DataType="String">Enter your Consumer Secret</Variable>
 <Variable Name="TwitterQuery" DataType="String">/1.1/search/tweets.json?q=%23mssql</Variable>
 </Variables>
 <Tasks>
 <ExecuteSQL Name ="Truncate" ConnectionName="JohanBimlDemoConnection" ResultSet="None">
 <DirectInput>TRUNCATE TABLE [dbo].[Twitter]</DirectInput>
 </ExecuteSQL>
 <Dataflow Name ="Read from Twitter">
 <Transformations>
 <ScriptComponentSource Name="Twitter">
 <ScriptComponentProject>
 <ScriptComponentProject ProjectCoreName="TwitterScript" Name="TwitterScript">
 <AssemblyReferences>
 <AssemblyReference AssemblyPath="System" />
 <AssemblyReference AssemblyPath="System.Data" />
 <AssemblyReference AssemblyPath="System.Windows.Forms" />
 <AssemblyReference AssemblyPath="System.Xml" />
 <AssemblyReference AssemblyPath="Microsoft.SqlServer.DTSPipelineWrap" />
 <AssemblyReference AssemblyPath="Microsoft.SqlServer.DTSRuntimeWrap" />
 <AssemblyReference AssemblyPath="Microsoft.SqlServer.PipelineHost" />
 <AssemblyReference AssemblyPath="Microsoft.SqlServer.TxScript" />
 </AssemblyReferences>
 <OutputBuffers>
 <OutputBuffer Name="Output0" IsSynchronous="false">
 <Columns>
 <Column Name="IdStr" DataType ="String" Length="50"></Column>
 <Column Name="Text" DataType ="String" Length="250"></Column>
 <Column Name="Username" DataType ="String" Length="50"></Column>
 <Column Name="CreatedAt" DataType ="String" Length="50"></Column>
 </Columns>
 </OutputBuffer>
 </OutputBuffers>
 <ReadOnlyVariables>
 <Variable VariableName="ConsumerKey" DataType="String" Namespace="User"/>
 <Variable VariableName="ConsumerSecret" DataType="String" Namespace="User"/>
 <Variable VariableName="TwitterQuery" DataType="String" Namespace="User"/> 
 </ReadOnlyVariables>
 <Files>
 <File Path="AssemblyInfo.cs">
 using System.Reflection;
 using System.Runtime.CompilerServices;
 //
 // General Information about an assembly is controlled through the following
 // set of attributes. Change these attribute values to modify the information
 // associated with an assembly.
 //
 [assembly: AssemblyTitle("TwitterScript.csproj")]
 [assembly: AssemblyDescription("")]
 [assembly: AssemblyConfiguration("")]
 [assembly: AssemblyCompany("")]
 [assembly: AssemblyProduct("TwitterScript.csproj")]
 [assembly: AssemblyCopyright("Copyright @ Johan Åhlén 2016")]
 [assembly: AssemblyTrademark("")]
 [assembly: AssemblyCulture("")]
 //
 // Version information for an assembly consists of the following four values:
 //
 // Major Version
 // Minor Version
 // Build Number
 // Revision
 //
 // You can specify all the values or you can default the Revision and Build Numbers
 // by using the '*' as shown below:
 [assembly: AssemblyVersion("1.0.*")]
 </File>
 <File Path="ScriptMain.cs">
 using System;
 using System.Data;
 using System.Text;
 using System.Text.RegularExpressions;
 using System.Net;
 using System.Collections.Generic;
 using System.IO;
 using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
 using Microsoft.SqlServer.Dts.Runtime.Wrapper;

 [Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
 public class ScriptMain : UserComponent
 {
 public override void CreateNewOutputRows()
 {
 var consumerKey = Variables.ConsumerKey;
 var consumerSecret = Variables.ConsumerSecret;
 var queryString = Variables.TwitterQuery;

 var accessToken = getAccessToken(consumerKey, consumerSecret);
 var jsontext = callSearchApi(queryString, accessToken);
 var jObj = new JObject(jsontext);

 foreach (JObject status in (JArray)jObj["statuses"])
 {
 // Reading a few fields. More Twitter fields can of course be read the same way.
 var id_str = status["id_str"].ToString();
 var text = status["text"].ToString();
 var user = (JObject)status["user"];
 var username = user["name"].ToString();
 var created_at = status["created_at"].ToString();

 // Send to output buffer
 Output0Buffer.AddRow();
 Output0Buffer.IdStr = id_str;
 Output0Buffer.Text = text;
 Output0Buffer.Username = username;
 Output0Buffer.CreatedAt = created_at;
 }

 Output0Buffer.SetEndOfRowset();
 }

 private string callSearchApi(string queryString, string accessToken)
 {
 // Prepare request
 var searchRequest = WebRequest.Create("https://api.twitter.com" + queryString);
 searchRequest.Method = "GET";
 searchRequest.UseDefaultCredentials = false;
 searchRequest.Headers.Add("Authorization", "Bearer " + accessToken);

 // Get the response
 var searchResponse = searchRequest.GetResponse();

 // Parse the response
 using (var responseStream = searchResponse.GetResponseStream())
 {
 var reader = new StreamReader(responseStream);
 var responseBody = reader.ReadToEnd();
 responseStream.Close();

 return responseBody;
 }
 }

 private string getAccessToken(string consumerKey, string consumerSecret)
 {
 // Prepare request
 var oauthString = Convert.ToBase64String(Encoding.ASCII.GetBytes(consumerKey + ":" + consumerSecret));
 var oauthRequest = WebRequest.Create("https://api.twitter.com/oauth2/token");
 oauthRequest.Method = "POST";
 oauthRequest.UseDefaultCredentials = false;
 oauthRequest.Headers.Add("Authorization", "Basic " + oauthString);
 oauthRequest.ContentType = "application/x-www-form-urlencoded;charset=UTF-8";
 var postContent = Encoding.ASCII.GetBytes("grant_type=client_credentials");
 oauthRequest.ContentLength = postContent.Length;
 using (var requestStream = oauthRequest.GetRequestStream())
 {
 requestStream.Write(postContent, 0, postContent.Length);
 requestStream.Close();
 }

 // Get the response
 var oauthResponse = oauthRequest.GetResponse();

 // Parse the response
 using (var responseStream = oauthResponse.GetResponseStream())
 {
 var reader = new StreamReader(responseStream);
 var responseBody = reader.ReadToEnd();
 responseStream.Close();

 var regex = new Regex("\"access_token\":\"(.+?)\"");
 var match = regex.Match(responseBody);
 var access_token = match.Groups[1].Value;

 return access_token;
 }
 }
 }
 
 // Using a very simple json implementation. If used for production, it should be replaced with a better json library.
 public class JArray : List&lt;object&gt;
 {
 internal void Parse(string jsontext, ref int i)
 {
 i = JToken.LookFor(jsontext, i, '[', true);
 i++;

 // Loop through members
 do
 {
 // Check for end of array
 if (jsontext[i] == ']')
 {
 i++;
 return;
 }

 // Read object
 var obj = JToken.Parse(jsontext, ref i);

 this.Add(obj);

 // Skip whitespace
 i = JToken.SkipWhitespace(jsontext, i);

 // Check if end of array or another member
 var ch = jsontext[i++];
 if (ch == ']')
 break;
 else if (ch == ',')
 continue;
 else
 throw new Exception("JArray.Parse: expected ] or ,");
 }
 while (true);
 }

 public JArray(string srctext)
 : base()
 {
 var i = 0;
 Parse(srctext, ref i);
 }

 internal JArray(string jsontext, ref int i)
 : base()
 {
 Parse(jsontext, ref i);
 }
 }

 public static class JToken
 {
 internal static int LookFor(string jsontext, int startpos, char searchCh, bool skipWhitespace = false)
 {
 var i = jsontext.IndexOf(searchCh, skipWhitespace ? SkipWhitespace(jsontext, startpos) : startpos);
 if (i == -1)
 throw new Exception("JToken.LookFor: can't find character " + searchCh);

 return i;
 }

 internal static int LookForUnescaped(string jsontext, int startpos, char searchCh)
 {
 var i = startpos;
 while (i &lt; jsontext.Length)
 {
 if (jsontext[i] == searchCh)
 {
 if (i == 0 || jsontext[i - 1] != '\\')
 return i;
 }

 i++;
 }

 throw new Exception("JToken.LookForUnescaped: can't find character " + searchCh);
 }

 internal static int SkipWhitespace(string jsontext, int startpos)
 {
 for (var i = startpos; i &lt; jsontext.Length; i++)
 if (!char.IsWhiteSpace(jsontext[i]))
 return i;

 throw new Exception("JToken.SkipWhitespace: end of string reached");
 }


 public static object Parse(string jsontext, ref int i)
 {
 i = SkipWhitespace(jsontext, i);

 if (jsontext[i] == '"')
 return ReadStr(jsontext, ref i);
 else if (jsontext[i] == '{')
 return ReadJObject(jsontext, ref i);
 else if (jsontext[i] == '[')
 return ReadArray(jsontext, ref i);
 else if (jsontext[i] == 'n')
 return ReadNull(jsontext, ref i);
 else if (char.IsLetter(jsontext[i]))
 return ReadBool(jsontext, ref i);
 else if (char.IsDigit(jsontext[i]) || jsontext[i] == '-')
 return ReadNumber(jsontext, ref i);
 else
 throw new Exception("JToken.Parse: unknown object type");
 }

 internal static object ReadNull(string jsontext, ref int i)
 {
 if (jsontext.Length &gt;= i + 4)
 {
 if (jsontext.Substring(i,4) == "null")
 {
 i += 4;
 return null;
 }
 }
 throw new Exception("JToken.ReadNull: unknown object type");
 }

 internal static string ReadStr(string jsontext, ref int i)
 {
 var j = LookForUnescaped(jsontext, i + 1, '"');
 var str = jsontext.Substring(i + 1, j - i - 1);
 i = j + 1;
 return str;
 }

 internal static JObject ReadJObject(string jsontext, ref int i)
 {
 var jobj = new JObject(jsontext, ref i);
 return jobj;
 }

 internal static JArray ReadArray(string jsontext, ref int i)
 {
 var jarr = new JArray(jsontext, ref i);
 return jarr;
 }

 internal static bool ReadBool(string jsontext, ref int i)
 {
 if (jsontext.Substring(i, 4) == "true")
 {
 i += 4;
 return true;
 }
 else if (jsontext.Substring(i, 5) == "false")
 {
 i += 5;
 return false;
 }
 else
 throw new Exception("JToken.ReadBool: Unknown value");
 }

 internal static double ReadNumber(string jsontext, ref int i)
 {
 var digitsstr = "";
 if (jsontext[i] == '-')
 {
 digitsstr += jsontext[i++];
 }
 while (char.IsNumber(jsontext[i]) || jsontext[i] == '.')
 {
 digitsstr += jsontext[i];
 i++;
 }

 return Convert.ToDouble(digitsstr, System.Globalization.NumberFormatInfo.InvariantInfo);
 }
 }

 public class JObject
 {
 private SortedDictionary &lt;string, object&gt; contents = new SortedDictionary&lt;string, object&gt;();

 private JObject()
 {
 }

 internal void Parse(string jsontext, ref int i)
 {
 i = JToken.LookFor(jsontext, i, '{', true);
 i++;

 // Loop through members
 do
 {
 // Look for label start (")
 i = JToken.LookFor(jsontext, i, '"', true);

 // Look for label end (")
 var j = JToken.LookFor(jsontext, i + 1, '"');

 // Assign label
 var label = jsontext.Substring(i + 1, j - i - 1);

 // Look for colon
 i = JToken.LookFor(jsontext, j + 1, ':', true);

 // Move to next character and skip whitespace
 i = JToken.SkipWhitespace(jsontext, i + 1);

 // Read token
 var token = JToken.Parse(jsontext, ref i);

 contents.Add(label, token);

 // Skip whitespace
 i = JToken.SkipWhitespace(jsontext, i);

 // Check if end of object or another member
 var ch = jsontext[i++];
 if (ch == '}')
 break;
 else if (ch == ',')
 continue;
 else
 throw new Exception("JObject.Parse: expected } or ,");
 }
 while (true);
 }

 public JObject(string jsontext)
 {
 var i = 0;
 Parse(jsontext, ref i);
 }

 internal JObject(string jsontext, ref int i)
 {
 Parse(jsontext, ref i);
 }

 public object this[string name]
 {
 get
 {
 return contents[name];
 }
 }
 }
 </File>
 </Files>
 </ScriptComponentProject>
 </ScriptComponentProject>
 </ScriptComponentSource>
 <OleDbDestination Name="Database" ConnectionName="JohanBimlDemoConnection">
 <ExternalTableOutput Table="[dbo].[Twitter]"></ExternalTableOutput>
 </OleDbDestination>
 </Transformations>
 </Dataflow>
 </Tasks>
 </Package>
 </Packages>
</Biml>