Skip to content

Commit 602c990

Browse files
Abel Milashclaude
andcommitted
Rebase async implementation on GA API branch (PR #175)
- Reset async-phase2 to refactoring branch (which is on PR #175 tip) - Restore all async implementation: aio/ client, HTTP, batch, OData, relationships, upload, query builder, fetchxml, operations, examples, tests - Re-export multipart helpers from _batch.py for PR #175 test compatibility - Restore asyncio_mode = "auto" in pyproject.toml - All 2166 tests passing Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent a2f5f7d commit 602c990

63 files changed

Lines changed: 17963 additions & 32 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.azdo/ci-pr.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ extends:
4343
- script: |
4444
python -m pip install --upgrade pip
4545
python -m pip install flake8 black build diff-cover
46-
python -m pip install -e .[dev]
46+
python -m pip install -e .[dev,async]
4747
displayName: 'Install dependencies'
4848
4949
- script: |

.github/workflows/python-package.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ jobs:
3030
run: |
3131
python -m pip install --upgrade pip
3232
python -m pip install flake8 black build diff-cover
33-
python -m pip install -e .[dev]
33+
python -m pip install -e .[dev,async]
3434
3535
- name: Check format with black
3636
run: |

examples/aio/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# Copyright (c) Microsoft Corporation.
2+
# Licensed under the MIT license.

examples/aio/_auth.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# Copyright (c) Microsoft Corporation.
2+
# Licensed under the MIT license.
3+
4+
"""
5+
Async credential helper for the async example scripts.
6+
7+
azure-identity's InteractiveBrowserCredential is only available in the sync
8+
namespace (azure.identity), not the async one (azure.identity.aio). This
9+
module wraps the sync credential so it satisfies the AsyncTokenCredential
10+
protocol required by AsyncDataverseClient.
11+
12+
Usage::
13+
14+
from _auth import AsyncInteractiveBrowserCredential
15+
16+
credential = AsyncInteractiveBrowserCredential()
17+
try:
18+
async with AsyncDataverseClient(org_url, credential) as client:
19+
...
20+
finally:
21+
await credential.close()
22+
"""
23+
24+
import asyncio
25+
from concurrent.futures import ThreadPoolExecutor
26+
27+
from azure.identity import InteractiveBrowserCredential
28+
29+
30+
class AsyncInteractiveBrowserCredential:
31+
"""
32+
Async wrapper around the sync InteractiveBrowserCredential.
33+
34+
get_token() is dispatched to a dedicated thread so the event loop stays
35+
free during the browser popup / token exchange. Subsequent calls hit the
36+
in-process token cache and return almost immediately.
37+
"""
38+
39+
def __init__(self, **kwargs):
40+
self._credential = InteractiveBrowserCredential(**kwargs)
41+
self._executor = ThreadPoolExecutor(max_workers=1)
42+
43+
async def get_token(self, *scopes, **kwargs):
44+
loop = asyncio.get_running_loop()
45+
return await loop.run_in_executor(
46+
self._executor,
47+
lambda: self._credential.get_token(*scopes, **kwargs),
48+
)
49+
50+
async def close(self):
51+
self._executor.shutdown(wait=False)
52+
53+
async def __aenter__(self):
54+
return self
55+
56+
async def __aexit__(self, *_):
57+
await self.close()

examples/aio/advanced/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# Copyright (c) Microsoft Corporation.
2+
# Licensed under the MIT license.
Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
#!/usr/bin/env python3
2+
# Copyright (c) Microsoft Corporation.
3+
# Licensed under the MIT license.
4+
5+
"""
6+
PowerPlatform Dataverse Client - Async Alternate Keys & Upsert Example
7+
8+
Async equivalent of examples/advanced/alternate_keys_upsert.py.
9+
10+
Demonstrates the full workflow of creating alternate keys and using
11+
them for upsert operations:
12+
1. Create a custom table with columns
13+
2. Define an alternate key on a column
14+
3. Wait for the key index to become Active
15+
4. Upsert records using the alternate key
16+
5. Verify records were created/updated correctly
17+
6. Clean up
18+
19+
Prerequisites:
20+
pip install PowerPlatform-Dataverse-Client
21+
pip install azure-identity
22+
"""
23+
24+
import asyncio
25+
import sys
26+
27+
from PowerPlatform.Dataverse.aio.async_client import AsyncDataverseClient
28+
from PowerPlatform.Dataverse.models.upsert import UpsertItem
29+
from pathlib import Path
30+
31+
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
32+
from _auth import AsyncInteractiveBrowserCredential
33+
34+
# --- Config ---
35+
TABLE_NAME = "new_AltKeyDemo"
36+
KEY_COLUMN = "new_externalid"
37+
KEY_NAME = "new_ExternalIdKey"
38+
BACKOFF_DELAYS = (0, 3, 10, 20, 35)
39+
40+
41+
# --- Helpers ---
42+
async def backoff(coro_fn, *, delays=BACKOFF_DELAYS):
43+
"""Retry *coro_fn* with exponential-ish backoff on any exception."""
44+
last = None
45+
total_delay = 0
46+
attempts = 0
47+
for d in delays:
48+
if d:
49+
await asyncio.sleep(d)
50+
total_delay += d
51+
attempts += 1
52+
try:
53+
result = await coro_fn()
54+
if attempts > 1:
55+
retry_count = attempts - 1
56+
print(f" [INFO] Backoff succeeded after {retry_count} retry(s); " f"waited {total_delay}s total.")
57+
return result
58+
except Exception as ex: # noqa: BLE001
59+
last = ex
60+
continue
61+
if last:
62+
if attempts:
63+
retry_count = max(attempts - 1, 0)
64+
print(f" [WARN] Backoff exhausted after {retry_count} retry(s); " f"waited {total_delay}s total.")
65+
raise last
66+
67+
68+
async def wait_for_key_active(client, table, key_name, max_wait=120):
69+
"""Poll get_alternate_keys until the key status is Active."""
70+
import time
71+
72+
start = time.time()
73+
while time.time() - start < max_wait:
74+
keys = await client.tables.get_alternate_keys(table)
75+
for k in keys:
76+
if k.schema_name == key_name:
77+
print(f" Key status: {k.status}")
78+
if k.status == "Active":
79+
return k
80+
if k.status == "Failed":
81+
raise RuntimeError(f"Alternate key index failed: {k.schema_name}")
82+
await asyncio.sleep(5)
83+
raise TimeoutError(f"Key {key_name} did not become Active within {max_wait}s")
84+
85+
86+
# --- Main ---
87+
async def main():
88+
"""Run the async alternate-keys & upsert E2E walkthrough."""
89+
print("PowerPlatform Dataverse Client - Async Alternate Keys & Upsert Example")
90+
print("=" * 70)
91+
print("This script demonstrates:")
92+
print(" - Creating a custom table with columns")
93+
print(" - Defining an alternate key on a column")
94+
print(" - Waiting for the key index to become Active")
95+
print(" - Upserting records via alternate key (create + update)")
96+
print(" - Verifying records and listing keys")
97+
print(" - Cleaning up (delete key, delete table)")
98+
print("=" * 70)
99+
100+
entered = input("Enter Dataverse org URL (e.g. https://yourorg.crm.dynamics.com): ").strip()
101+
if not entered:
102+
print("No URL entered; exiting.")
103+
sys.exit(1)
104+
105+
base_url = entered.rstrip("/")
106+
credential = AsyncInteractiveBrowserCredential()
107+
try:
108+
async with AsyncDataverseClient(base_url, credential) as client:
109+
110+
# ------------------------------------------------------------------
111+
# Step 1: Create table (skip if already exists)
112+
# ------------------------------------------------------------------
113+
print("\n1. Creating table...")
114+
table_info = await client.tables.get(TABLE_NAME)
115+
if table_info:
116+
print(f" Table already exists: {TABLE_NAME} (skipped)")
117+
else:
118+
table_info = await backoff(
119+
lambda: client.tables.create(
120+
TABLE_NAME,
121+
columns={
122+
KEY_COLUMN: "string",
123+
"new_ProductName": "string",
124+
"new_Price": "decimal",
125+
},
126+
)
127+
)
128+
print(f" Created: {table_info.get('table_schema_name', TABLE_NAME)}")
129+
await asyncio.sleep(10) # Wait for metadata propagation
130+
131+
# ------------------------------------------------------------------
132+
# Step 2: Create alternate key (skip if already exists)
133+
# ------------------------------------------------------------------
134+
print("\n2. Creating alternate key...")
135+
existing_keys = await client.tables.get_alternate_keys(TABLE_NAME)
136+
existing_key = next((k for k in existing_keys if k.schema_name == KEY_NAME), None)
137+
if existing_key:
138+
print(f" Alternate key already exists: {KEY_NAME} (skipped)")
139+
else:
140+
key_info = await backoff(
141+
lambda: client.tables.create_alternate_key(TABLE_NAME, KEY_NAME, [KEY_COLUMN.lower()])
142+
)
143+
print(f" Key created: {key_info.schema_name} (id={key_info.metadata_id})")
144+
145+
# ------------------------------------------------------------------
146+
# Step 3: Wait for key to become Active
147+
# ------------------------------------------------------------------
148+
print("\n3. Waiting for key index to become Active...")
149+
active_key = await wait_for_key_active(client, TABLE_NAME, KEY_NAME)
150+
print(f" Key is Active: {active_key.schema_name}")
151+
152+
# ------------------------------------------------------------------
153+
# Step 4: Upsert records (creates new)
154+
# ------------------------------------------------------------------
155+
print("\n4a. Upsert single record (PATCH, creates new)...")
156+
await client.records.upsert(
157+
TABLE_NAME,
158+
[
159+
UpsertItem(
160+
alternate_key={KEY_COLUMN.lower(): "EXT-001"},
161+
record={"new_productname": "Widget A", "new_price": 9.99},
162+
),
163+
],
164+
)
165+
print(" Upserted EXT-001 (single)")
166+
167+
print("\n4b. Upsert second record (single PATCH)...")
168+
await client.records.upsert(
169+
TABLE_NAME,
170+
[
171+
UpsertItem(
172+
alternate_key={KEY_COLUMN.lower(): "EXT-002"},
173+
record={"new_productname": "Widget B", "new_price": 19.99},
174+
),
175+
],
176+
)
177+
print(" Upserted EXT-002 (single)")
178+
179+
print("\n4c. Upsert multiple records (UpsertMultiple bulk)...")
180+
await client.records.upsert(
181+
TABLE_NAME,
182+
[
183+
UpsertItem(
184+
alternate_key={KEY_COLUMN.lower(): "EXT-003"},
185+
record={"new_productname": "Widget C", "new_price": 29.99},
186+
),
187+
UpsertItem(
188+
alternate_key={KEY_COLUMN.lower(): "EXT-004"},
189+
record={"new_productname": "Widget D", "new_price": 39.99},
190+
),
191+
],
192+
)
193+
print(" Upserted EXT-003, EXT-004 (bulk)")
194+
195+
# ------------------------------------------------------------------
196+
# Step 5a: Upsert single update (PATCH, record exists)
197+
# ------------------------------------------------------------------
198+
print("\n5a. Upsert single record (update existing via PATCH)...")
199+
await client.records.upsert(
200+
TABLE_NAME,
201+
[
202+
UpsertItem(
203+
alternate_key={KEY_COLUMN.lower(): "EXT-001"},
204+
record={"new_productname": "Widget A v2", "new_price": 12.99},
205+
),
206+
],
207+
)
208+
print(" Updated EXT-001 (single)")
209+
210+
# ------------------------------------------------------------------
211+
# Step 5b: Upsert multiple update (UpsertMultiple, records exist)
212+
# ------------------------------------------------------------------
213+
print("\n5b. Upsert multiple records (update existing via UpsertMultiple)...")
214+
await client.records.upsert(
215+
TABLE_NAME,
216+
[
217+
UpsertItem(
218+
alternate_key={KEY_COLUMN.lower(): "EXT-003"},
219+
record={"new_productname": "Widget C v2", "new_price": 31.99},
220+
),
221+
UpsertItem(
222+
alternate_key={KEY_COLUMN.lower(): "EXT-004"},
223+
record={"new_productname": "Widget D v2", "new_price": 41.99},
224+
),
225+
],
226+
)
227+
print(" Updated EXT-003, EXT-004 (bulk)")
228+
229+
# ------------------------------------------------------------------
230+
# Step 6: Verify
231+
# ------------------------------------------------------------------
232+
print("\n6. Verifying records...")
233+
async for record in client.records.list_pages(
234+
TABLE_NAME,
235+
select=["new_productname", "new_price", KEY_COLUMN.lower()],
236+
):
237+
for item in record:
238+
ext_id = item.get(KEY_COLUMN.lower(), "?")
239+
name = item.get("new_productname", "?")
240+
price = item.get("new_price", "?")
241+
print(f" {ext_id}: {name} @ ${price}")
242+
243+
# ------------------------------------------------------------------
244+
# Step 7: List alternate keys
245+
# ------------------------------------------------------------------
246+
print("\n7. Listing alternate keys...")
247+
keys = await client.tables.get_alternate_keys(TABLE_NAME)
248+
for k in keys:
249+
print(f" {k.schema_name}: columns={k.key_attributes}, status={k.status}")
250+
251+
# ------------------------------------------------------------------
252+
# Step 8: Cleanup
253+
# ------------------------------------------------------------------
254+
cleanup = input("\n8. Delete table and cleanup? (Y/n): ").strip() or "y"
255+
if cleanup.lower() in ("y", "yes"):
256+
try:
257+
# Delete alternate key first
258+
for k in keys:
259+
await client.tables.delete_alternate_key(TABLE_NAME, k.metadata_id)
260+
print(f" Deleted key: {k.schema_name}")
261+
await asyncio.sleep(5)
262+
await backoff(lambda: client.tables.delete(TABLE_NAME))
263+
print(f" Deleted table: {TABLE_NAME}")
264+
except Exception as e: # noqa: BLE001
265+
print(f" Cleanup error: {e}")
266+
else:
267+
print(" Table kept for inspection.")
268+
finally:
269+
await credential.close()
270+
271+
print("\nDone.")
272+
273+
274+
if __name__ == "__main__":
275+
asyncio.run(main())

0 commit comments

Comments
 (0)