Skip to content

Commit 5a5cca6

Browse files
authored
Merge branch 'main' into feat/anonymous-totp
2 parents 3ad290d + 741989e commit 5a5cca6

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+527
-265
lines changed

.github/workflows/update-api-schema.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
name: graphql
1+
name: update-api-schema
22

33
on:
44
pull_request:
55
paths:
66
- 'src/ai/backend/manager/models/**'
77
- 'src/ai/backend/manager/api/**'
8+
- 'VERSION'
89

910
jobs:
1011
api-updated:

changes/4492.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix the service-discovery heartbeat to re-register the service when it is dead (no longer registered)

changes/4497.feature.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Refactor event dispatcher and handlers directory structure

changes/4505.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix missing log output of GraphQL top-level query fields by improving graphene's resolver info object usage

changes/4509.feature.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add stage package to support deterministic step-by-step execution

changes/4510.refactor.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Refactor `keypair_preparation` from a classmethod of the Graphene class to a utility function to decouple logic from GraphQL

scripts/release.sh

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,6 @@ pants check ::
2525
# Update VERSION file
2626
echo $TARGET_VERSION > VERSION
2727

28-
# Update the documentations
29-
./backend.ai mgr api dump-gql-schema --output docs/manager/graphql-reference/schema.graphql
30-
./backend.ai mgr api dump-openapi --output docs/manager/rest-reference/openapi.json
31-
3228
# Update the changelog
3329
LOCKSET=towncrier/$(yq '.python.interpreter_constraints[0] | split("==") | .[1]' pants.toml) ./py -m towncrier
3430

src/ai/backend/common/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ python_distribution(
4343
"src/ai/backend/common/plugin:src",
4444
"src/ai/backend/common/service_discovery:src",
4545
"src/ai/backend/common/service_discovery/etcd_discovery:src",
46+
"src/ai/backend/common/stage:src",
4647
"src/ai/backend/common/web/session:src", # not auto-inferred
4748
"!!stubs/trafaret:stubs",
4849
],

src/ai/backend/common/bgtask/bgtask.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ async def _observe_bgtask(
331331
operation=ErrorOperation.EXECUTE,
332332
error_detail=ErrorDetail.INTERNAL_ERROR,
333333
)
334-
log.error("Task {} ({}): unhandled error", task_id, task_name)
334+
log.error("Task {} ({}): unhandled error: {}", task_id, task_name, e)
335335
return BgtaskFailedEvent(task_id, repr(e))
336336
finally:
337337
duration = time.perf_counter() - start_time

src/ai/backend/common/service_discovery/etcd_discovery/service_discovery.py

Lines changed: 7 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -23,49 +23,21 @@ def __init__(self, args: ETCDServiceDiscoveryArgs) -> None:
2323
self._etcd = args.etcd
2424
self._prefix = args.prefix
2525

26-
async def register(self, service: ServiceMetadata) -> None:
27-
"""
28-
Register a service in the service discovery.
29-
:param service: Service metadata.
30-
"""
31-
32-
prefix = self._service_prefix(service.service_group, service.id)
33-
await self._etcd.put_prefix(prefix, service.to_dict(), scope=ConfigScopes.GLOBAL)
26+
async def register(self, service_meta: ServiceMetadata) -> None:
27+
prefix = self._service_prefix(service_meta.service_group, service_meta.id)
28+
await self._etcd.put_prefix(prefix, service_meta.to_dict(), scope=ConfigScopes.GLOBAL)
3429

3530
async def unregister(self, service_group: str, service_id: uuid.UUID) -> None:
36-
"""
37-
Unregister a service from the service discovery.
38-
:param service_group: Name of the service group.
39-
:param service_id: UUID of the service.
40-
"""
41-
4231
await self._etcd.delete_prefix(
4332
self._service_prefix(service_group, service_id), scope=ConfigScopes.GLOBAL
4433
)
4534

46-
async def heartbeat(self, service_group: str, service_id: uuid.UUID) -> None:
47-
"""
48-
Send a heartbeat to the service discovery.
49-
:param service_group: Name of the service group.
50-
:param service_id: UUID of the service.
51-
"""
52-
53-
prefix = self._service_prefix(service_group, service_id)
54-
raw_service_info = await self._etcd.get_prefix(prefix, scope=ConfigScopes.GLOBAL)
55-
if not raw_service_info:
56-
raise ValueError(f"Service with ID {service_id} not found.")
57-
58-
service_dict = cast(dict[str, Any], raw_service_info)
59-
service = ServiceMetadata.from_dict(service_dict)
60-
service.health_status.update_heartbeat()
61-
await self._etcd.put_prefix(prefix, service.to_dict(), scope=ConfigScopes.GLOBAL)
35+
async def heartbeat(self, service_meta: ServiceMetadata) -> None:
36+
service_meta.health_status.update_heartbeat()
37+
prefix = self._service_prefix(service_meta.service_group, service_meta.id)
38+
await self._etcd.put_prefix(prefix, service_meta.to_dict(), scope=ConfigScopes.GLOBAL)
6239

6340
async def discover(self) -> Sequence[ServiceMetadata]:
64-
"""
65-
Discover all services in the service discovery.
66-
:return: List of service metadata.
67-
"""
68-
6941
raw_service_groups = await self._etcd.get_prefix(self._prefix, scope=ConfigScopes.GLOBAL)
7042
services = []
7143
if not raw_service_groups:
@@ -77,12 +49,6 @@ async def discover(self) -> Sequence[ServiceMetadata]:
7749
return services
7850

7951
async def get_service_group(self, service_group: str) -> Sequence[ServiceMetadata]:
80-
"""
81-
Get services by group.
82-
:param service_group: Name of the service group.
83-
:return: List of service metadata.
84-
"""
85-
8652
service_group_prefix = self._service_group_prefix(service_group)
8753
raw_service_configs = await self._etcd.get_prefix(
8854
service_group_prefix, scope=ConfigScopes.GLOBAL
@@ -96,12 +62,6 @@ async def get_service_group(self, service_group: str) -> Sequence[ServiceMetadat
9662
return services
9763

9864
async def get_service(self, service_group: str, service_id: uuid.UUID) -> ServiceMetadata:
99-
"""
100-
Get service address by ID.
101-
:param service_id: UUID of the service.
102-
:return: Service metadata.
103-
"""
104-
10565
service_prefix = self._service_prefix(service_group, service_id)
10666
raw_service_config = await self._etcd.get_prefix(service_prefix, scope=ConfigScopes.GLOBAL)
10767
if not raw_service_config:

src/ai/backend/common/service_discovery/service_discovery.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,13 @@ class ServiceMetadata(BaseModel):
6161
"""
6262

6363
id: uuid.UUID = Field(default_factory=uuid.uuid4)
64-
display_name: str
65-
service_group: str
66-
version: str
67-
endpoint: ServiceEndpoint
68-
health_status: HealthStatus = Field(default_factory=HealthStatus)
64+
display_name: str = Field(..., description="Display name of the service")
65+
service_group: str = Field(..., description="Name of the service group (manager, agent, etc.)")
66+
version: str = Field(..., description="Version of the service")
67+
endpoint: ServiceEndpoint = Field(..., description="Endpoint of the service")
68+
health_status: HealthStatus = Field(
69+
default_factory=HealthStatus, description="Health status of the service"
70+
)
6971

7072
@classmethod
7173
def from_dict(cls, data: dict[str, Any]) -> Self:
@@ -86,10 +88,10 @@ class ServiceDiscovery(ABC):
8688
"""
8789

8890
@abstractmethod
89-
async def register(self, service: ServiceMetadata) -> None:
91+
async def register(self, service_meta: ServiceMetadata) -> None:
9092
"""
9193
Register a service.
92-
:param service: Service metadata.
94+
:param service_meta: Service metadata.
9395
"""
9496
raise NotImplementedError
9597

@@ -103,11 +105,11 @@ async def unregister(self, service_group: str, service_id: uuid.UUID) -> None:
103105
raise NotImplementedError
104106

105107
@abstractmethod
106-
async def heartbeat(self, service_group: str, service_id: uuid.UUID) -> None:
108+
async def heartbeat(self, service_meta: ServiceMetadata) -> None:
107109
"""
108110
Send a heartbeat to the service discovery.
109-
:param service_group: Name of the service group.
110-
:param service_id: UUID of the service.
111+
When a service is not ready in the service discovery, it registers it.
112+
:param service_meta: Service metadata.
111113
"""
112114
raise NotImplementedError
113115

@@ -210,8 +212,7 @@ async def _run_service_loop(self) -> None:
210212
while not self._closed:
211213
try:
212214
await self._service_discovery.heartbeat(
213-
service_group=self._metadata.service_group,
214-
service_id=self._metadata.id,
215+
service_meta=self._metadata,
215216
)
216217
except Exception as e:
217218
log.error("Error sending heartbeat: {}", e)

src/ai/backend/common/stage/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
python_sources(name="src")

src/ai/backend/common/stage/__init__.py

Whitespace-only changes.

src/ai/backend/common/stage/types.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
import asyncio
2+
import logging
3+
from abc import ABC, abstractmethod
4+
from typing import Generic, Optional, TypeVar
5+
6+
from ai.backend.logging.utils import BraceStyleAdapter
7+
8+
# Module logger. BraceStyleAdapter messages use `{}` placeholders (as in the
# other log calls in this codebase), not printf-style `%s`.
log = BraceStyleAdapter(logging.getLogger(__spec__.name))
9+
10+
11+
# Type of the spec produced by a SpecGenerator and consumed by a Provisioner / Stage.
TSpec = TypeVar("TSpec")
12+
13+
14+
class SpecGenerator(ABC, Generic[TSpec]):
    """
    Abstract source of the spec that a stage is built from.

    Defines the spec for the stage. Once every field of the spec is ready,
    a provisioner can use the spec to set up the stage.
    """

    @abstractmethod
    async def wait_for_spec(self) -> TSpec:
        """
        Block until the spec is ready, then return it.
        """
        raise NotImplementedError()
28+
29+
30+
# Type of the resource a Provisioner creates from a spec and a Stage hands out.
TResource = TypeVar("TResource")
31+
32+
33+
class Provisioner(ABC, Generic[TSpec, TResource]):
    """
    Abstract base for the provisioners used by stages.

    A provisioner turns a spec into a resource in `setup()` and tears the
    stage down again, given that resource, in `teardown()`.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """
        The name of this provisioner.
        """
        raise NotImplementedError()

    @abstractmethod
    async def setup(self, spec: TSpec) -> TResource:
        """
        Set up the lifecycle stage from `spec` and return the resulting resource.
        """
        raise NotImplementedError()

    @abstractmethod
    async def teardown(self, resource: TResource) -> None:
        """
        Tear down the lifecycle stage that owns `resource`.
        """
        raise NotImplementedError()
59+
60+
61+
class Stage(ABC, Generic[TSpec, TResource]):
    """
    Abstract lifecycle stage.

    A stage is set up from a spec generator, exposes its resource through
    `wait_for_resource()`, and is finally torn down with `teardown()`.
    """

    @abstractmethod
    async def setup(self, spec_generator: SpecGenerator[TSpec]) -> None:
        """
        Set up the lifecycle stage using the spec from `spec_generator`.
        """
        raise NotImplementedError()

    @abstractmethod
    async def wait_for_resource(self) -> TResource:
        """
        Block until the stage's resource is ready, then return it.
        """
        raise NotImplementedError()

    @abstractmethod
    async def teardown(self) -> None:
        """
        Tear down the lifecycle stage.
        """
        raise NotImplementedError()
82+
83+
84+
class ProvisionStage(Stage[TSpec, TResource]):
    """
    A stage that provisions a resource.

    This stage is used to provision a resource using a provisioner.
    It waits for the spec to be ready and then uses the provisioner to set up the resource.
    Failures during setup are logged and recorded instead of being raised; they
    surface to callers of `wait_for_resource()` as a `RuntimeError` chained to
    the original exception.
    """

    _provisioner: Provisioner[TSpec, TResource]
    _resource: Optional[TResource]
    _setup_completed: asyncio.Event
    # Exception raised while waiting for the spec or provisioning, if any.
    _setup_error: Optional[BaseException]

    def __init__(self, provisioner: Provisioner[TSpec, TResource]) -> None:
        self._provisioner = provisioner
        self._resource = None
        self._setup_completed = asyncio.Event()
        self._setup_error = None

    async def setup(self, spec_generator: SpecGenerator[TSpec]) -> None:
        """
        Sets up the lifecycle stage.

        Waits for the spec from `spec_generator`, then provisions the resource
        with the provisioner. Errors are logged and stored rather than raised,
        and the completion event is always set, so `wait_for_resource()` never
        blocks forever because of a failed setup.
        """
        try:
            # Waiting for the spec must be inside the try block: if it fails,
            # the completion event still has to be set to release waiters.
            spec = await spec_generator.wait_for_spec()
            self._resource = await self._provisioner.setup(spec)
        except Exception as e:
            self._setup_error = e
            # `log` is a brace-style adapter: use `{}` placeholders, not `%s`,
            # otherwise the exception text is dropped from the message.
            log.exception("Failed to setup resource: {}", e)
        finally:
            self._setup_completed.set()

    async def wait_for_resource(self) -> TResource:
        """
        Waits until `setup()` has finished and returns the provisioned resource.

        :raises RuntimeError: If no resource is available (setup failed, or the
            stage was already torn down). When setup failed with an exception,
            that exception is chained as the cause.
        """
        await self._setup_completed.wait()
        if self._resource is None:
            raise RuntimeError("Resource setup failed") from self._setup_error
        return self._resource

    async def teardown(self) -> None:
        """
        Tears down the lifecycle stage.

        Does nothing if no resource was provisioned; otherwise hands the
        resource to the provisioner's `teardown()` and forgets it.
        """
        if self._resource is None:
            return
        await self._provisioner.teardown(self._resource)
        self._resource = None

src/ai/backend/manager/api/admin.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@
4141

4242
class GQLLoggingMiddleware:
4343
def resolve(self, next, root, info: graphene.ResolveInfo, **args) -> Any:
44-
graph_ctx: GraphQueryContext = info.context
45-
if len(info.path) == 1:
44+
if info.path.prev is None: # indicates the root query
45+
graph_ctx = info.context
4646
log.info(
4747
"ADMIN.GQL (ak:{}, {}:{}, op:{})",
4848
graph_ctx.access_key,

src/ai/backend/manager/api/context.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
)
1616
from ai.backend.manager.config.provider import ManagerConfigProvider
1717
from ai.backend.manager.plugin.network import NetworkPluginContext
18+
from ai.backend.manager.scheduler.dispatcher import SchedulerDispatcher
1819
from ai.backend.manager.service.base import ServicesContext
1920
from ai.backend.manager.services.processors import Processors
2021

@@ -65,6 +66,7 @@ class RootContext(BaseContext):
6566

6667
registry: AgentRegistry
6768
agent_cache: AgentRPCCache
69+
scheduler_dispatcher: SchedulerDispatcher
6870

6971
error_monitor: ErrorPluginContext
7072
stats_monitor: StatsPluginContext

src/ai/backend/manager/config/unified.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1604,7 +1604,7 @@ class IdleCheckerConfig(BaseModel):
16041604
Enabled idle checkers.
16051605
Comma-separated list of checker names.
16061606
""",
1607-
examples=["timeout", "utilization"],
1607+
examples=["network_timeout", "utilization"],
16081608
)
16091609
app_streaming_packet_timeout: TimeDuration = Field(
16101610
default=_TimeDurationPydanticAnnotation.time_duration_validator("5m"),
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
python_sources(name="src")
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from dataclasses import dataclass
2+
3+
4+
@dataclass
class KeyPairCreator:
    """
    Plain data holder for keypair attributes (presumably the values used when
    creating a keypair -- confirm against callers).
    """

    # Active flag of the keypair.
    is_active: bool
    # Admin flag of the keypair.
    is_admin: bool
    # Resource policy reference (presumably the policy name -- confirm).
    resource_policy: str
    # Rate limit setting (units not shown here).
    rate_limit: int

0 commit comments

Comments
 (0)