Pracując w notatnikach Microsoft Fabric potrzebujemy czasem sięgać po jakąś informację do małych tabel (np. konfiguracyjnych). Uruchamianie do tego procesu Spark nie zawsze jest najlepszym rozwiązaniem, szczególnie w przypadku, gdy zależy nam na czasie.
Jako oczywiste rozwiązanie nasuwa się dostęp do tych tabel przez SQL Endpoint oraz pyODBC. Jednak czy da się to zrobić bez podawania użytkownika i hasła, korzystając z połączenia, które przecież mamy w notatniku?
Okazuje się, że tak! A to dzięki pakietowi mssparkutils.
Załóżmy, że mamy w obszarze roboczym lakehouse o nazwie lh_SQLConnectionTest, a w nim tabelę d_calendar zawierającą kalendarz od 2000-01-01 do 2024-12-31.
Zasymulujemy częste odpytywanie tej tabeli pytając w pętli o kolejne 30 dni:
from datetime import date, timedelta vStart = date(2024,7,1) for vDay in [d for d in (vStart + timedelta(n) for n in range(30))]: vSql = "select id from d_calendar where date = '{day}'".format(day = vDay) vRes = spark.sql(vSql).collect()[0][0]
Jak widać wykonanie tego skryptu trwa ok 17. sekund:
Aby podłączyć się do SQL Endpoint musimy znać jego adres. Możemy go sprawdzić w ustawieniach naszego lakehouse:
Następnie korzystając z mssparkutils.credentials.getToken, możemy uzyskać token bieżącego połączenia i delikatnie go przerabiając połączyć się do SQL Enpoint przy pomocy pyODBC:
import struct from itertools import chain, repeat import pyodbc sqlEndpoint = '*****.datawarehouse.fabric.microsoft.com' connectionString = f"Driver={{ODBC Driver 18 for SQL Server}};Server={sqlEndpoint},1433;Encrypt=Yes;TrustServerCertificate=No" token_object = mssparkutils.credentials.getToken("pbi") encoded_bytes = bytes(chain.from_iterable(zip(bytes(token_object, "UTF-8"), repeat(0)))) token_bytes = struct.pack("<i", len(encoded_bytes)) + encoded_bytes attrs_before = {1256: token_bytes} connection = pyodbc.connect(connectionString, attrs_before=attrs_before)
Wykonanie tego samego zapytania przy użyciu pyODBC może wyglądać następująco:
from datetime import date, timedelta cursor = connection.cursor() vStart = date(2024,7,1) for vDay in [d for d in (vStart + timedelta(n) for n in range(30))]: vSql = "select id from lh_SQLConnectionTest.dbo.d_calendar where date = '{day}'".format(day = vDay) cursor.execute(vSql) vWyn = cursor.fetchall()[0][0]
Zwróć uwagę na konieczność podania pełnej nazwy tabeli z nazwą lakehouse i schematem.
Wykonanie tego skryptu trwa już tylko 5 sekund.
Jeśli chcesz poeksperymentować, wrzuć do swojego Fabrica załączony notatnik (nie zapomnij tylko podpiąć swój lakehouse do notatnika).
Napisałem tam klasę użytkową obsługującą połączenie SQL Endpoint, którą możesz wykorzystać w swoim projekcie.
Miłego eksperymentowania 😊