Pracując w notatnikach Microsoft Fabric potrzebujemy czasem sięgać po jakąś informację do małych tabel (np. konfiguracyjnych). Uruchamianie do tego procesu Spark nie zawsze jest najlepszym rozwiązaniem, szczególnie w przypadku, gdy zależy nam na czasie. 

Jako oczywiste rozwiązanie nasuwa się dostęp do tych tabel przez SQL Endpoint oraz pyODBC. Jednak czy da się to zrobić bez podawania użytkownika i hasła, korzystając z połączenia, które przecież mamy w notatniku? 

Okazuje się, że tak! A to dzięki pakietowi mssparkutils. 

Załóżmy, że mamy w obszarze roboczym lakehouse o nazwie lh_SQLConnectionTest, a w nim tabelę d_calendar zawierającą kalendarz od 2000-01-01 do 2024-12-31. 

Zasymulujemy częste odpytywanie tej tabeli pytając w pętli o kolejne 30 dni:

from datetime import date, timedelta 
vStart = date(2024,7,1) 
for vDay in [d for d in (vStart + timedelta(n) for n in range(30))]: 
    vSql = "select id from d_calendar where date = '{day}'".format(day = vDay) 
    vRes = spark.sql(vSql).collect()[0][0] 

Jak widać wykonanie tego skryptu trwa ok 17. sekund:

Aby podłączyć się do SQL Endpoint musimy znać jego adres. Możemy go sprawdzić w ustawieniach naszego lakehouse: 

Następnie korzystając z mssparkutils.credentials.getToken, możemy uzyskać token bieżącego połączenia i delikatnie go przerabiając połączyć się do SQL Enpoint przy pomocy pyODBC: 

import struct 
from itertools import chain, repeat 
import pyodbc 
sqlEndpoint = '*****.datawarehouse.fabric.microsoft.com' 
connectionString = f"Driver={{ODBC Driver 18 for SQL Server}};Server={sqlEndpoint},1433;Encrypt=Yes;TrustServerCertificate=No" 
token_object = mssparkutils.credentials.getToken("pbi") 
encoded_bytes = bytes(chain.from_iterable(zip(bytes(token_object, "UTF-8"), repeat(0)))) 
token_bytes = struct.pack("<i", len(encoded_bytes)) + encoded_bytes 
attrs_before = {1256: token_bytes} 
connection = pyodbc.connect(connectionString, attrs_before=attrs_before) 

Wykonanie tego samego zapytania przy użyciu pyODBC może wyglądać następująco:

from datetime import date, timedelta 
cursor = connection.cursor() 
vStart = date(2024,7,1) 
for vDay in [d for d in (vStart + timedelta(n) for n in range(30))]: 
    vSql = "select id from lh_SQLConnectionTest.dbo.d_calendar where date = '{day}'".format(day = vDay) 
    cursor.execute(vSql) 
    vWyn = cursor.fetchall()[0][0] 

Zwróć uwagę na konieczność podania pełnej nazwy tabeli z nazwą lakehouse i schematem. 

Wykonanie tego skryptu trwa już tylko 5 sekund. 

Jeśli chcesz poeksperymentować, wrzuć do swojego Fabrica załączony notatnik (nie zapomnij tylko podpiąć swój lakehouse do notatnika).  

Napisałem tam klasę użytkową obsługującą połączenie SQL Endpoint, którą możesz wykorzystać w swoim projekcie. 

Miłego eksperymentowania 😊 

pl_PL