maimai_py.api

  1import logging
  2import os
  3from importlib.util import find_spec
  4from logging import warning
  5from typing import Annotated, Any, Callable, Literal, Optional, Union
  6from urllib.parse import unquote, urlparse
  7
  8from pydantic import PydanticUndefinedAnnotation
  9
 10from maimai_py.maimai import MaimaiClient, MaimaiClientMultithreading, MaimaiPlates, MaimaiSongs, _UnsetSentinel
 11from maimai_py.models import *
 12from maimai_py.providers import *
 13from maimai_py.providers.hybrid import HybridProvider
 14
 15PlateAttrs = Literal["remained", "cleared", "played", "all"]
 16
 17
 18def xstr(s: Optional[str]) -> str:
 19    return "" if s is None else str(s).lower()
 20
 21
 22def istr(i: Optional[list]) -> str:
 23    return "" if i is None else "".join(i).lower()
 24
 25
 26def pagination(page_size, page, data):
 27    total_pages = (len(data) + page_size - 1) // page_size
 28    if page < 1 or page > total_pages:
 29        return []
 30
 31    start = (page - 1) * page_size
 32    end = page * page_size
 33    return data[start:end]
 34
 35
 36def get_filters(functions: dict[Any, Callable[..., bool]]):
 37    union = [flag for cond, flag in functions.items() if cond is not None]
 38    filter = lambda obj: all([flag(obj) for flag in union])
 39    return filter
 40
 41
 42if find_spec("fastapi"):
 43    from fastapi import APIRouter, Depends, FastAPI, Query, Request
 44    from fastapi.openapi.utils import get_openapi
 45    from fastapi.responses import JSONResponse
 46
 47    class MaimaiRoutes:
 48        _client: MaimaiClient
 49        _with_curves: bool
 50
 51        _lxns_token: Optional[str] = None
 52        _divingfish_token: Optional[str] = None
 53        _arcade_proxy: Optional[str] = None
 54
 55        def __init__(
 56            self,
 57            client: MaimaiClient,
 58            lxns_token: Optional[str] = None,
 59            divingfish_token: Optional[str] = None,
 60            arcade_proxy: Optional[str] = None,
 61            with_curves: bool = False,
 62        ):
 63            self._client = client
 64            self._lxns_token = lxns_token
 65            self._divingfish_token = divingfish_token
 66            self._arcade_proxy = arcade_proxy
 67            self._with_curves = with_curves
 68
 69        def _dep_lxns_player(self, credentials: Optional[str] = None, friend_code: Optional[int] = None, qq: Optional[int] = None):
 70            return PlayerIdentifier(credentials=credentials, qq=qq, friend_code=friend_code)
 71
 72        def _dep_divingfish_player(self, username: Optional[str] = None, credentials: Optional[str] = None, qq: Optional[int] = None):
 73            return PlayerIdentifier(qq=qq, credentials=credentials, username=username)
 74
 75        def _dep_arcade_player(self, credentials: str):
 76            return PlayerIdentifier(credentials=credentials)
 77
 78        def _dep_divingfish(self) -> IProvider:
 79            return DivingFishProvider(developer_token=self._divingfish_token)
 80
 81        def _dep_lxns(self) -> IProvider:
 82            return LXNSProvider(developer_token=self._lxns_token)
 83
 84        def _dep_arcade(self) -> IProvider:
 85            return ArcadeProvider(http_proxy=self._arcade_proxy)
 86
 87        def _dep_hybrid(self) -> IProvider:
 88            return HybridProvider()
 89
 90        def get_router(self, dep_provider: Callable, dep_player: Optional[Callable] = None, skip_base: bool = True) -> APIRouter:
 91            router = APIRouter()
 92
 93            def try_add_route(func: Callable, router: APIRouter, dep_provider: Callable):
 94                provider_type = func.__annotations__.get("provider")
 95                if provider_type and isinstance(dep_provider(), provider_type):
 96                    method = "GET" if "get_" in func.__name__ else "POST"
 97                    response_model = func.__annotations__.get("return")
 98                    router.add_api_route(
 99                        f"/{func.__name__.split('_')[-1]}",
100                        func,
101                        name=f"{func.__name__}",
102                        methods=[method],
103                        response_model=response_model,
104                        description=func.__doc__,
105                    )
106
107            async def _get_songs(
108                id: Optional[int] = None,
109                title: Optional[str] = None,
110                artist: Optional[str] = None,
111                genre: Optional[Genre] = None,
112                bpm: Optional[int] = None,
113                map: Optional[str] = None,
114                version: Optional[int] = None,
115                type: Optional[SongType] = None,
116                level: Optional[str] = None,
117                versions: Optional[Version] = None,
118                keywords: Optional[str] = None,
119                page: int = Query(1, ge=1),
120                page_size: int = Query(100, ge=1),
121                provider: ISongProvider = Depends(dep_provider),
122            ) -> list[Song]:
123                curve_provider = DivingFishProvider(developer_token=self._divingfish_token) if self._with_curves else None
124                maimai_songs: MaimaiSongs = await self._client.songs(provider=provider, curve_provider=curve_provider)
125                type_func: Callable[[Song], bool] = lambda song: song.get_difficulties(type) != []  # type: ignore
126                level_func: Callable[[Song], bool] = lambda song: any([diff.level == level for diff in song.get_difficulties()])
127                versions_func: Callable[[Song], bool] = lambda song: versions.value <= song.version < all_versions[all_versions.index(versions) + 1].value  # type: ignore
128                keywords_func: Callable[[Song], bool] = lambda song: xstr(keywords) in xstr(song.title) + xstr(song.artist) + istr(song.aliases)
129                songs = await maimai_songs.filter(id=id, title=title, artist=artist, genre=genre, bpm=bpm, map=map, version=version)
130                filters = get_filters({type: type_func, level: level_func, versions: versions_func, keywords: keywords_func})
131                result = [song for song in songs if filters(song)]
132                return pagination(page_size, page, result)
133
134            async def _get_icons(
135                id: Optional[int] = None,
136                name: Optional[str] = None,
137                description: Optional[str] = None,
138                genre: Optional[str] = None,
139                keywords: Optional[str] = None,
140                page: int = Query(1, ge=1),
141                page_size: int = Query(100, ge=1),
142                provider: IItemListProvider = Depends(dep_provider),
143            ) -> list[PlayerIcon]:
144                items = await self._client.items(PlayerIcon, provider=provider)
145                if id is not None:
146                    return [item] if (item := await items.by_id(id)) else []
147                keyword_func = lambda icon: xstr(keywords) in (xstr(icon.name) + xstr(icon.description) + xstr(icon.genre))
148                filters = get_filters({keywords: keyword_func})
149                result = [x for x in await items.filter(name=name, description=description, genre=genre) if filters(x)]
150                return pagination(page_size, page, result)
151
152            async def _get_nameplates(
153                id: Optional[int] = None,
154                name: Optional[str] = None,
155                description: Optional[str] = None,
156                genre: Optional[str] = None,
157                keywords: Optional[str] = None,
158                page: int = Query(1, ge=1),
159                page_size: int = Query(100, ge=1),
160                provider: IItemListProvider = Depends(dep_provider),
161            ) -> list[PlayerNamePlate]:
162                items = await self._client.items(PlayerNamePlate, provider=provider)
163                if id is not None:
164                    return [item] if (item := await items.by_id(id)) else []
165                keyword_func = lambda icon: xstr(keywords) in (xstr(icon.name) + xstr(icon.description) + xstr(icon.genre))
166                filters = get_filters({keywords: keyword_func})
167                result = [x for x in await items.filter(name=name, description=description, genre=genre) if filters(x)]
168                return pagination(page_size, page, result)
169
170            async def _get_frames(
171                id: Optional[int] = None,
172                name: Optional[str] = None,
173                description: Optional[str] = None,
174                genre: Optional[str] = None,
175                keywords: Optional[str] = None,
176                page: int = Query(1, ge=1),
177                page_size: int = Query(100, ge=1),
178                provider: IItemListProvider = Depends(dep_provider),
179            ) -> list[PlayerFrame]:
180                items = await self._client.items(PlayerFrame, provider=provider)
181                if id is not None:
182                    return [item] if (item := await items.by_id(id)) else []
183                keyword_func = lambda icon: xstr(keywords) in (xstr(icon.name) + xstr(icon.description) + xstr(icon.genre))
184                filters = get_filters({keywords: keyword_func})
185                result = [x for x in await items.filter(name=name, description=description, genre=genre) if filters(x)]
186                return pagination(page_size, page, result)
187
188            async def _get_trophies(
189                id: Optional[int] = None,
190                name: Optional[str] = None,
191                color: Optional[str] = None,
192                keywords: Optional[str] = None,
193                page: int = Query(1, ge=1),
194                page_size: int = Query(100, ge=1),
195                provider: IItemListProvider = Depends(dep_provider),
196            ) -> list[PlayerTrophy]:
197                items = await self._client.items(PlayerTrophy, provider=provider)
198                if id is not None:
199                    return [item] if (item := await items.by_id(id)) else []
200                keyword_func = lambda icon: xstr(keywords) in (xstr(icon.name) + xstr(icon.color))
201                filters = get_filters({keywords: keyword_func})
202                result = [x for x in await items.filter(name=name, color=color) if filters(x)]
203                return pagination(page_size, page, result)
204
205            async def _get_charas(
206                id: Optional[int] = None,
207                name: Optional[str] = None,
208                keywords: Optional[str] = None,
209                page: int = Query(1, ge=1),
210                page_size: int = Query(100, ge=1),
211                provider: IItemListProvider = Depends(dep_provider),
212            ) -> list[PlayerChara]:
213                items = await self._client.items(PlayerChara, provider=provider)
214                if id is not None:
215                    return [item] if (item := await items.by_id(id)) else []
216                keyword_func = lambda chara: xstr(keywords) in xstr(chara.name)
217                filters = get_filters({keywords: keyword_func})
218                result = [x for x in await items.filter(name=name) if filters(x)]
219                return pagination(page_size, page, result)
220
221            async def _get_partners(
222                id: Optional[int] = None,
223                name: Optional[str] = None,
224                keywords: Optional[str] = None,
225                page: int = Query(1, ge=1),
226                page_size: int = Query(100, ge=1),
227                provider: IItemListProvider = Depends(dep_provider),
228            ) -> list[PlayerPartner]:
229                items = await self._client.items(PlayerPartner, provider=provider)
230                if id is not None:
231                    return [item] if (item := await items.by_id(id)) else []
232                keyword_func = lambda partner: xstr(keywords) in xstr(partner.name)
233                filters = get_filters({keywords: keyword_func})
234                result = [x for x in await items.filter(name=name) if filters(x)]
235                return pagination(page_size, page, result)
236
237            async def _get_areas(
238                lang: Literal["ja", "zh"] = "ja",
239                id: Optional[str] = None,
240                name: Optional[str] = None,
241                keywords: Optional[str] = None,
242                page: int = Query(1, ge=1),
243                page_size: int = Query(100, ge=1),
244                provider: IAreaProvider = Depends(dep_provider),
245            ) -> list[Area]:
246                areas = await self._client.areas(lang, provider=provider)
247                if id is not None:
248                    return [area] if (area := await areas.by_id(id)) else []
249                if name is not None:
250                    return [area] if (area := await areas.by_name(name)) else []
251                keyword_func = lambda area: xstr(keywords) in (xstr(area.name) + xstr(area.comment))
252                filters = get_filters({keywords: keyword_func})
253                result = [x for x in await areas.get_all() if filters(x)]
254                return pagination(page_size, page, result)
255
256            async def _get_scores(
257                provider: IScoreProvider = Depends(dep_provider),
258                player: PlayerIdentifier = Depends(dep_player),
259            ) -> list[ScoreExtend]:
260                scores = await self._client.scores(player, provider=provider)
261                return scores.scores
262
263            async def _get_regions(
264                provider: IRegionProvider = Depends(dep_provider),
265                player: PlayerIdentifier = Depends(dep_player),
266            ) -> list[PlayerRegion]:
267                return await self._client.regions(player, provider=provider)
268
269            async def _get_players(
270                provider: IPlayerProvider = Depends(dep_provider),
271                player: PlayerIdentifier = Depends(dep_player),
272            ) -> Union[Player, DivingFishPlayer, LXNSPlayer, ArcadePlayer]:
273                return await self._client.players(player, provider=provider)
274
275            async def _get_bests(
276                provider: IScoreProvider = Depends(dep_provider),
277                player: PlayerIdentifier = Depends(dep_player),
278            ) -> PlayerBests:
279                maimai_scores = await self._client.bests(player, provider=provider)
280                return maimai_scores.get_player_bests()
281
282            async def _post_scores(
283                scores: list[Score],
284                provider: IScoreUpdateProvider = Depends(dep_provider),
285                player: PlayerIdentifier = Depends(dep_player),
286            ) -> None:
287                await self._client.updates(player, scores, provider=provider)
288
289            async def _get_plates(
290                plate: str,
291                attr: Literal["remained", "cleared", "played", "all"] = "remained",
292                provider: IScoreProvider = Depends(dep_provider),
293                player: PlayerIdentifier = Depends(dep_player),
294            ) -> list[PlateObject]:
295                plates: MaimaiPlates = await self._client.plates(player, plate, provider=provider)
296                return await getattr(plates, f"get_{attr}")()
297
298            async def _get_minfo(
299                id: Optional[int] = None,
300                title: Optional[str] = None,
301                keywords: Optional[str] = None,
302                provider: IScoreProvider = Depends(dep_provider),
303                player: PlayerIdentifier = Depends(dep_player),
304            ) -> Optional[PlayerSong]:
305                song_trait = id if id is not None else title if title is not None else keywords if keywords is not None else None
306                identifier = None if player._is_empty() else player
307                if song_trait is not None:
308                    return await self._client.minfo(song_trait, identifier, provider=provider)
309
310            async def _get_identifiers(
311                code: str,
312                provider: IPlayerIdentifierProvider = Depends(dep_provider),
313            ) -> PlayerIdentifier:
314                return await self._client.identifiers(code, provider=provider)
315
316            bases: list[Callable] = [_get_songs, _get_icons, _get_nameplates, _get_frames, _get_trophies, _get_charas, _get_partners, _get_areas]
317            players: list[Callable] = [_get_scores, _get_regions, _get_players, _get_bests, _post_scores, _get_plates, _get_minfo, _get_identifiers]
318
319            all = players + (bases if not skip_base else [])
320            try:
321                [try_add_route(func, router, dep_provider) for func in all]
322            except PydanticUndefinedAnnotation:
323                warning(
324                    "Current pydantic version does not support maimai.py API annotations"
325                    "MaimaiRoutes may not work properly."
326                    "Please upgrade pydantic to 2.7+."
327                )
328
329            return router
330
331
332if all([find_spec(p) for p in ["fastapi", "uvicorn", "typer"]]):
333    import typer
334    import uvicorn
335    from fastapi import APIRouter, Depends, FastAPI, Request
336    from fastapi.openapi.utils import get_openapi
337    from fastapi.responses import JSONResponse
338
339    # prepare for ASGI app
340    asgi_app = FastAPI(title="maimai.py API", description="The definitive python wrapper for MaimaiCN related development.")
341    routes = MaimaiRoutes(MaimaiClientMultithreading())  # type: ignore
342
343    # register routes and middlewares
344    asgi_app.include_router(routes.get_router(routes._dep_hybrid, skip_base=False), tags=["base"])
345    asgi_app.include_router(routes.get_router(routes._dep_divingfish, routes._dep_divingfish_player), prefix="/divingfish", tags=["divingfish"])
346    asgi_app.include_router(routes.get_router(routes._dep_lxns, routes._dep_lxns_player), prefix="/lxns", tags=["lxns"])
347    asgi_app.include_router(routes.get_router(routes._dep_arcade, routes._dep_arcade_player), prefix="/arcade", tags=["arcade"])
348
349    def main(
350        host: Annotated[str, typer.Option(help="The host address to bind to.")] = "127.0.0.1",
351        port: Annotated[int, typer.Option(help="The port number to bind to.")] = 8000,
352        redis: Annotated[Optional[str], typer.Option(help="Redis server address, for example: redis://localhost:6379/0.")] = None,
353        lxns_token: Annotated[Optional[str], typer.Option(help="LXNS developer token for LXNS API.")] = None,
354        divingfish_token: Annotated[Optional[str], typer.Option(help="DivingFish developer token for DivingFish API.")] = None,
355        arcade_proxy: Annotated[Optional[str], typer.Option(help="HTTP proxy for Arcade API.")] = None,
356        with_curves: Annotated[bool, typer.Option(help="Whether to fetch curves from Divingfish.")] = False,
357    ):
358        # prepare for redis cache backend
359        redis_backend = UNSET
360        if redis and find_spec("redis"):
361            from aiocache import RedisCache
362            from aiocache.serializers import PickleSerializer
363
364            redis_url = urlparse(redis)
365            redis_backend = RedisCache(
366                serializer=PickleSerializer(),
367                endpoint=unquote(redis_url.hostname or "localhost"),
368                port=redis_url.port or 6379,
369                password=redis_url.password,
370                db=int(unquote(redis_url.path).replace("/", "")),
371            )
372
373        # override the default maimai.py client
374        routes._client._cache = routes._client._cache if isinstance(redis_backend, _UnsetSentinel) else redis_backend
375        routes._lxns_token = lxns_token or os.environ.get("LXNS_DEVELOPER_TOKEN")
376        routes._divingfish_token = divingfish_token or os.environ.get("DIVINGFISH_DEVELOPER_TOKEN")
377        routes._arcade_proxy = arcade_proxy
378        routes._with_curves = with_curves
379
380        @asgi_app.exception_handler(MaimaiPyError)
381        async def exception_handler(request: Request, exc: MaimaiPyError):
382            return JSONResponse(
383                status_code=400,
384                content={"message": f"Oops! There goes a maimai.py error {exc}.", "details": repr(exc)},
385            )
386
387        @asgi_app.get("/", include_in_schema=False)
388        async def root():
389            return {"message": "Hello, maimai.py! Check /docs for more information."}
390
391        @asgi_app.on_event("startup")
392        async def startup_event():
393            if routes._with_curves:
394                curve_provider = DivingFishProvider(developer_token=routes._divingfish_token)
395                logging.info("with_curves is enabled, pre-fetching curves from DivingFish.")
396                await routes._client.songs(provider=HybridProvider(), curve_provider=curve_provider)
397
398        # run the ASGI app with uvicorn
399        uvicorn.run(asgi_app, host=host, port=port)
400
401    def openapi():
402        specs = get_openapi(
403            title=asgi_app.title,
404            version=asgi_app.version,
405            openapi_version=asgi_app.openapi_version,
406            description=asgi_app.description,
407            routes=asgi_app.routes,
408        )
409        with open(f"openapi.json", "w") as f:
410            json.dump(specs, f)
411
412    if __name__ == "__main__":
413        typer.run(main)
414
415
416if find_spec("maimai_ffi") and find_spec("nuitka"):
417    import json
418
419    import cryptography
420    import cryptography.fernet
421    import cryptography.hazmat.backends
422    import cryptography.hazmat.primitives.ciphers
423    import maimai_ffi
424    import maimai_ffi.model
425    import maimai_ffi.request
426    import redis
PlateAttrs = typing.Literal['remained', 'cleared', 'played', 'all']
def xstr(s: Optional[str]) -> str:
19def xstr(s: Optional[str]) -> str:
20    return "" if s is None else str(s).lower()
def istr(i: Optional[list]) -> str:
23def istr(i: Optional[list]) -> str:
24    return "" if i is None else "".join(i).lower()
def pagination(page_size, page, data):
27def pagination(page_size, page, data):
28    total_pages = (len(data) + page_size - 1) // page_size
29    if page < 1 or page > total_pages:
30        return []
31
32    start = (page - 1) * page_size
33    end = page * page_size
34    return data[start:end]
def get_filters(functions: dict[typing.Any, typing.Callable[..., bool]]):
37def get_filters(functions: dict[Any, Callable[..., bool]]):
38    union = [flag for cond, flag in functions.items() if cond is not None]
39    filter = lambda obj: all([flag(obj) for flag in union])
40    return filter
class MaimaiRoutes:
 48    class MaimaiRoutes:
 49        _client: MaimaiClient
 50        _with_curves: bool
 51
 52        _lxns_token: Optional[str] = None
 53        _divingfish_token: Optional[str] = None
 54        _arcade_proxy: Optional[str] = None
 55
 56        def __init__(
 57            self,
 58            client: MaimaiClient,
 59            lxns_token: Optional[str] = None,
 60            divingfish_token: Optional[str] = None,
 61            arcade_proxy: Optional[str] = None,
 62            with_curves: bool = False,
 63        ):
 64            self._client = client
 65            self._lxns_token = lxns_token
 66            self._divingfish_token = divingfish_token
 67            self._arcade_proxy = arcade_proxy
 68            self._with_curves = with_curves
 69
 70        def _dep_lxns_player(self, credentials: Optional[str] = None, friend_code: Optional[int] = None, qq: Optional[int] = None):
 71            return PlayerIdentifier(credentials=credentials, qq=qq, friend_code=friend_code)
 72
 73        def _dep_divingfish_player(self, username: Optional[str] = None, credentials: Optional[str] = None, qq: Optional[int] = None):
 74            return PlayerIdentifier(qq=qq, credentials=credentials, username=username)
 75
 76        def _dep_arcade_player(self, credentials: str):
 77            return PlayerIdentifier(credentials=credentials)
 78
 79        def _dep_divingfish(self) -> IProvider:
 80            return DivingFishProvider(developer_token=self._divingfish_token)
 81
 82        def _dep_lxns(self) -> IProvider:
 83            return LXNSProvider(developer_token=self._lxns_token)
 84
 85        def _dep_arcade(self) -> IProvider:
 86            return ArcadeProvider(http_proxy=self._arcade_proxy)
 87
 88        def _dep_hybrid(self) -> IProvider:
 89            return HybridProvider()
 90
 91        def get_router(self, dep_provider: Callable, dep_player: Optional[Callable] = None, skip_base: bool = True) -> APIRouter:
 92            router = APIRouter()
 93
 94            def try_add_route(func: Callable, router: APIRouter, dep_provider: Callable):
 95                provider_type = func.__annotations__.get("provider")
 96                if provider_type and isinstance(dep_provider(), provider_type):
 97                    method = "GET" if "get_" in func.__name__ else "POST"
 98                    response_model = func.__annotations__.get("return")
 99                    router.add_api_route(
100                        f"/{func.__name__.split('_')[-1]}",
101                        func,
102                        name=f"{func.__name__}",
103                        methods=[method],
104                        response_model=response_model,
105                        description=func.__doc__,
106                    )
107
108            async def _get_songs(
109                id: Optional[int] = None,
110                title: Optional[str] = None,
111                artist: Optional[str] = None,
112                genre: Optional[Genre] = None,
113                bpm: Optional[int] = None,
114                map: Optional[str] = None,
115                version: Optional[int] = None,
116                type: Optional[SongType] = None,
117                level: Optional[str] = None,
118                versions: Optional[Version] = None,
119                keywords: Optional[str] = None,
120                page: int = Query(1, ge=1),
121                page_size: int = Query(100, ge=1),
122                provider: ISongProvider = Depends(dep_provider),
123            ) -> list[Song]:
124                curve_provider = DivingFishProvider(developer_token=self._divingfish_token) if self._with_curves else None
125                maimai_songs: MaimaiSongs = await self._client.songs(provider=provider, curve_provider=curve_provider)
126                type_func: Callable[[Song], bool] = lambda song: song.get_difficulties(type) != []  # type: ignore
127                level_func: Callable[[Song], bool] = lambda song: any([diff.level == level for diff in song.get_difficulties()])
128                versions_func: Callable[[Song], bool] = lambda song: versions.value <= song.version < all_versions[all_versions.index(versions) + 1].value  # type: ignore
129                keywords_func: Callable[[Song], bool] = lambda song: xstr(keywords) in xstr(song.title) + xstr(song.artist) + istr(song.aliases)
130                songs = await maimai_songs.filter(id=id, title=title, artist=artist, genre=genre, bpm=bpm, map=map, version=version)
131                filters = get_filters({type: type_func, level: level_func, versions: versions_func, keywords: keywords_func})
132                result = [song for song in songs if filters(song)]
133                return pagination(page_size, page, result)
134
135            async def _get_icons(
136                id: Optional[int] = None,
137                name: Optional[str] = None,
138                description: Optional[str] = None,
139                genre: Optional[str] = None,
140                keywords: Optional[str] = None,
141                page: int = Query(1, ge=1),
142                page_size: int = Query(100, ge=1),
143                provider: IItemListProvider = Depends(dep_provider),
144            ) -> list[PlayerIcon]:
145                items = await self._client.items(PlayerIcon, provider=provider)
146                if id is not None:
147                    return [item] if (item := await items.by_id(id)) else []
148                keyword_func = lambda icon: xstr(keywords) in (xstr(icon.name) + xstr(icon.description) + xstr(icon.genre))
149                filters = get_filters({keywords: keyword_func})
150                result = [x for x in await items.filter(name=name, description=description, genre=genre) if filters(x)]
151                return pagination(page_size, page, result)
152
153            async def _get_nameplates(
154                id: Optional[int] = None,
155                name: Optional[str] = None,
156                description: Optional[str] = None,
157                genre: Optional[str] = None,
158                keywords: Optional[str] = None,
159                page: int = Query(1, ge=1),
160                page_size: int = Query(100, ge=1),
161                provider: IItemListProvider = Depends(dep_provider),
162            ) -> list[PlayerNamePlate]:
163                items = await self._client.items(PlayerNamePlate, provider=provider)
164                if id is not None:
165                    return [item] if (item := await items.by_id(id)) else []
166                keyword_func = lambda icon: xstr(keywords) in (xstr(icon.name) + xstr(icon.description) + xstr(icon.genre))
167                filters = get_filters({keywords: keyword_func})
168                result = [x for x in await items.filter(name=name, description=description, genre=genre) if filters(x)]
169                return pagination(page_size, page, result)
170
171            async def _get_frames(
172                id: Optional[int] = None,
173                name: Optional[str] = None,
174                description: Optional[str] = None,
175                genre: Optional[str] = None,
176                keywords: Optional[str] = None,
177                page: int = Query(1, ge=1),
178                page_size: int = Query(100, ge=1),
179                provider: IItemListProvider = Depends(dep_provider),
180            ) -> list[PlayerFrame]:
181                items = await self._client.items(PlayerFrame, provider=provider)
182                if id is not None:
183                    return [item] if (item := await items.by_id(id)) else []
184                keyword_func = lambda icon: xstr(keywords) in (xstr(icon.name) + xstr(icon.description) + xstr(icon.genre))
185                filters = get_filters({keywords: keyword_func})
186                result = [x for x in await items.filter(name=name, description=description, genre=genre) if filters(x)]
187                return pagination(page_size, page, result)
188
189            async def _get_trophies(
190                id: Optional[int] = None,
191                name: Optional[str] = None,
192                color: Optional[str] = None,
193                keywords: Optional[str] = None,
194                page: int = Query(1, ge=1),
195                page_size: int = Query(100, ge=1),
196                provider: IItemListProvider = Depends(dep_provider),
197            ) -> list[PlayerTrophy]:
198                items = await self._client.items(PlayerTrophy, provider=provider)
199                if id is not None:
200                    return [item] if (item := await items.by_id(id)) else []
201                keyword_func = lambda icon: xstr(keywords) in (xstr(icon.name) + xstr(icon.color))
202                filters = get_filters({keywords: keyword_func})
203                result = [x for x in await items.filter(name=name, color=color) if filters(x)]
204                return pagination(page_size, page, result)
205
206            async def _get_charas(
207                id: Optional[int] = None,
208                name: Optional[str] = None,
209                keywords: Optional[str] = None,
210                page: int = Query(1, ge=1),
211                page_size: int = Query(100, ge=1),
212                provider: IItemListProvider = Depends(dep_provider),
213            ) -> list[PlayerChara]:
214                items = await self._client.items(PlayerChara, provider=provider)
215                if id is not None:
216                    return [item] if (item := await items.by_id(id)) else []
217                keyword_func = lambda chara: xstr(keywords) in xstr(chara.name)
218                filters = get_filters({keywords: keyword_func})
219                result = [x for x in await items.filter(name=name) if filters(x)]
220                return pagination(page_size, page, result)
221
222            async def _get_partners(
223                id: Optional[int] = None,
224                name: Optional[str] = None,
225                keywords: Optional[str] = None,
226                page: int = Query(1, ge=1),
227                page_size: int = Query(100, ge=1),
228                provider: IItemListProvider = Depends(dep_provider),
229            ) -> list[PlayerPartner]:
230                items = await self._client.items(PlayerPartner, provider=provider)
231                if id is not None:
232                    return [item] if (item := await items.by_id(id)) else []
233                keyword_func = lambda partner: xstr(keywords) in xstr(partner.name)
234                filters = get_filters({keywords: keyword_func})
235                result = [x for x in await items.filter(name=name) if filters(x)]
236                return pagination(page_size, page, result)
237
238            async def _get_areas(
239                lang: Literal["ja", "zh"] = "ja",
240                id: Optional[str] = None,
241                name: Optional[str] = None,
242                keywords: Optional[str] = None,
243                page: int = Query(1, ge=1),
244                page_size: int = Query(100, ge=1),
245                provider: IAreaProvider = Depends(dep_provider),
246            ) -> list[Area]:
247                areas = await self._client.areas(lang, provider=provider)
248                if id is not None:
249                    return [area] if (area := await areas.by_id(id)) else []
250                if name is not None:
251                    return [area] if (area := await areas.by_name(name)) else []
252                keyword_func = lambda area: xstr(keywords) in (xstr(area.name) + xstr(area.comment))
253                filters = get_filters({keywords: keyword_func})
254                result = [x for x in await areas.get_all() if filters(x)]
255                return pagination(page_size, page, result)
256
257            async def _get_scores(
258                provider: IScoreProvider = Depends(dep_provider),
259                player: PlayerIdentifier = Depends(dep_player),
260            ) -> list[ScoreExtend]:
261                scores = await self._client.scores(player, provider=provider)
262                return scores.scores
263
264            async def _get_regions(
265                provider: IRegionProvider = Depends(dep_provider),
266                player: PlayerIdentifier = Depends(dep_player),
267            ) -> list[PlayerRegion]:
268                return await self._client.regions(player, provider=provider)
269
270            async def _get_players(
271                provider: IPlayerProvider = Depends(dep_provider),
272                player: PlayerIdentifier = Depends(dep_player),
273            ) -> Union[Player, DivingFishPlayer, LXNSPlayer, ArcadePlayer]:
274                return await self._client.players(player, provider=provider)
275
276            async def _get_bests(
277                provider: IScoreProvider = Depends(dep_provider),
278                player: PlayerIdentifier = Depends(dep_player),
279            ) -> PlayerBests:
280                maimai_scores = await self._client.bests(player, provider=provider)
281                return maimai_scores.get_player_bests()
282
283            async def _post_scores(
284                scores: list[Score],
285                provider: IScoreUpdateProvider = Depends(dep_provider),
286                player: PlayerIdentifier = Depends(dep_player),
287            ) -> None:
288                await self._client.updates(player, scores, provider=provider)
289
290            async def _get_plates(
291                plate: str,
292                attr: Literal["remained", "cleared", "played", "all"] = "remained",
293                provider: IScoreProvider = Depends(dep_provider),
294                player: PlayerIdentifier = Depends(dep_player),
295            ) -> list[PlateObject]:
296                plates: MaimaiPlates = await self._client.plates(player, plate, provider=provider)
297                return await getattr(plates, f"get_{attr}")()
298
299            async def _get_minfo(
300                id: Optional[int] = None,
301                title: Optional[str] = None,
302                keywords: Optional[str] = None,
303                provider: IScoreProvider = Depends(dep_provider),
304                player: PlayerIdentifier = Depends(dep_player),
305            ) -> Optional[PlayerSong]:
306                song_trait = id if id is not None else title if title is not None else keywords if keywords is not None else None
307                identifier = None if player._is_empty() else player
308                if song_trait is not None:
309                    return await self._client.minfo(song_trait, identifier, provider=provider)
310
311            async def _get_identifiers(
312                code: str,
313                provider: IPlayerIdentifierProvider = Depends(dep_provider),
314            ) -> PlayerIdentifier:
315                return await self._client.identifiers(code, provider=provider)
316
317            bases: list[Callable] = [_get_songs, _get_icons, _get_nameplates, _get_frames, _get_trophies, _get_charas, _get_partners, _get_areas]
318            players: list[Callable] = [_get_scores, _get_regions, _get_players, _get_bests, _post_scores, _get_plates, _get_minfo, _get_identifiers]
319
320            all = players + (bases if not skip_base else [])
321            try:
322                [try_add_route(func, router, dep_provider) for func in all]
323            except PydanticUndefinedAnnotation:
324                warning(
325                    "Current pydantic version does not support maimai.py API annotations"
326                    "MaimaiRoutes may not work properly."
327                    "Please upgrade pydantic to 2.7+."
328                )
329
330            return router
MaimaiRoutes( client: maimai_py.maimai.MaimaiClient, lxns_token: Optional[str] = None, divingfish_token: Optional[str] = None, arcade_proxy: Optional[str] = None, with_curves: bool = False)
56        def __init__(
57            self,
58            client: MaimaiClient,
59            lxns_token: Optional[str] = None,
60            divingfish_token: Optional[str] = None,
61            arcade_proxy: Optional[str] = None,
62            with_curves: bool = False,
63        ):
64            self._client = client
65            self._lxns_token = lxns_token
66            self._divingfish_token = divingfish_token
67            self._arcade_proxy = arcade_proxy
68            self._with_curves = with_curves
def get_router( self, dep_provider: Callable, dep_player: Optional[Callable] = None, skip_base: bool = True) -> fastapi.routing.APIRouter:
 91        def get_router(self, dep_provider: Callable, dep_player: Optional[Callable] = None, skip_base: bool = True) -> APIRouter:
 92            router = APIRouter()
 93
 94            def try_add_route(func: Callable, router: APIRouter, dep_provider: Callable):
 95                provider_type = func.__annotations__.get("provider")
 96                if provider_type and isinstance(dep_provider(), provider_type):
 97                    method = "GET" if "get_" in func.__name__ else "POST"
 98                    response_model = func.__annotations__.get("return")
 99                    router.add_api_route(
100                        f"/{func.__name__.split('_')[-1]}",
101                        func,
102                        name=f"{func.__name__}",
103                        methods=[method],
104                        response_model=response_model,
105                        description=func.__doc__,
106                    )
107
108            async def _get_songs(
109                id: Optional[int] = None,
110                title: Optional[str] = None,
111                artist: Optional[str] = None,
112                genre: Optional[Genre] = None,
113                bpm: Optional[int] = None,
114                map: Optional[str] = None,
115                version: Optional[int] = None,
116                type: Optional[SongType] = None,
117                level: Optional[str] = None,
118                versions: Optional[Version] = None,
119                keywords: Optional[str] = None,
120                page: int = Query(1, ge=1),
121                page_size: int = Query(100, ge=1),
122                provider: ISongProvider = Depends(dep_provider),
123            ) -> list[Song]:
124                curve_provider = DivingFishProvider(developer_token=self._divingfish_token) if self._with_curves else None
125                maimai_songs: MaimaiSongs = await self._client.songs(provider=provider, curve_provider=curve_provider)
126                type_func: Callable[[Song], bool] = lambda song: song.get_difficulties(type) != []  # type: ignore
127                level_func: Callable[[Song], bool] = lambda song: any([diff.level == level for diff in song.get_difficulties()])
128                versions_func: Callable[[Song], bool] = lambda song: versions.value <= song.version < all_versions[all_versions.index(versions) + 1].value  # type: ignore
129                keywords_func: Callable[[Song], bool] = lambda song: xstr(keywords) in xstr(song.title) + xstr(song.artist) + istr(song.aliases)
130                songs = await maimai_songs.filter(id=id, title=title, artist=artist, genre=genre, bpm=bpm, map=map, version=version)
131                filters = get_filters({type: type_func, level: level_func, versions: versions_func, keywords: keywords_func})
132                result = [song for song in songs if filters(song)]
133                return pagination(page_size, page, result)
134
135            async def _get_icons(
136                id: Optional[int] = None,
137                name: Optional[str] = None,
138                description: Optional[str] = None,
139                genre: Optional[str] = None,
140                keywords: Optional[str] = None,
141                page: int = Query(1, ge=1),
142                page_size: int = Query(100, ge=1),
143                provider: IItemListProvider = Depends(dep_provider),
144            ) -> list[PlayerIcon]:
145                items = await self._client.items(PlayerIcon, provider=provider)
146                if id is not None:
147                    return [item] if (item := await items.by_id(id)) else []
148                keyword_func = lambda icon: xstr(keywords) in (xstr(icon.name) + xstr(icon.description) + xstr(icon.genre))
149                filters = get_filters({keywords: keyword_func})
150                result = [x for x in await items.filter(name=name, description=description, genre=genre) if filters(x)]
151                return pagination(page_size, page, result)
152
153            async def _get_nameplates(
154                id: Optional[int] = None,
155                name: Optional[str] = None,
156                description: Optional[str] = None,
157                genre: Optional[str] = None,
158                keywords: Optional[str] = None,
159                page: int = Query(1, ge=1),
160                page_size: int = Query(100, ge=1),
161                provider: IItemListProvider = Depends(dep_provider),
162            ) -> list[PlayerNamePlate]:
163                items = await self._client.items(PlayerNamePlate, provider=provider)
164                if id is not None:
165                    return [item] if (item := await items.by_id(id)) else []
166                keyword_func = lambda icon: xstr(keywords) in (xstr(icon.name) + xstr(icon.description) + xstr(icon.genre))
167                filters = get_filters({keywords: keyword_func})
168                result = [x for x in await items.filter(name=name, description=description, genre=genre) if filters(x)]
169                return pagination(page_size, page, result)
170
171            async def _get_frames(
172                id: Optional[int] = None,
173                name: Optional[str] = None,
174                description: Optional[str] = None,
175                genre: Optional[str] = None,
176                keywords: Optional[str] = None,
177                page: int = Query(1, ge=1),
178                page_size: int = Query(100, ge=1),
179                provider: IItemListProvider = Depends(dep_provider),
180            ) -> list[PlayerFrame]:
181                items = await self._client.items(PlayerFrame, provider=provider)
182                if id is not None:
183                    return [item] if (item := await items.by_id(id)) else []
184                keyword_func = lambda icon: xstr(keywords) in (xstr(icon.name) + xstr(icon.description) + xstr(icon.genre))
185                filters = get_filters({keywords: keyword_func})
186                result = [x for x in await items.filter(name=name, description=description, genre=genre) if filters(x)]
187                return pagination(page_size, page, result)
188
189            async def _get_trophies(
190                id: Optional[int] = None,
191                name: Optional[str] = None,
192                color: Optional[str] = None,
193                keywords: Optional[str] = None,
194                page: int = Query(1, ge=1),
195                page_size: int = Query(100, ge=1),
196                provider: IItemListProvider = Depends(dep_provider),
197            ) -> list[PlayerTrophy]:
198                items = await self._client.items(PlayerTrophy, provider=provider)
199                if id is not None:
200                    return [item] if (item := await items.by_id(id)) else []
201                keyword_func = lambda icon: xstr(keywords) in (xstr(icon.name) + xstr(icon.color))
202                filters = get_filters({keywords: keyword_func})
203                result = [x for x in await items.filter(name=name, color=color) if filters(x)]
204                return pagination(page_size, page, result)
205
206            async def _get_charas(
207                id: Optional[int] = None,
208                name: Optional[str] = None,
209                keywords: Optional[str] = None,
210                page: int = Query(1, ge=1),
211                page_size: int = Query(100, ge=1),
212                provider: IItemListProvider = Depends(dep_provider),
213            ) -> list[PlayerChara]:
214                items = await self._client.items(PlayerChara, provider=provider)
215                if id is not None:
216                    return [item] if (item := await items.by_id(id)) else []
217                keyword_func = lambda chara: xstr(keywords) in xstr(chara.name)
218                filters = get_filters({keywords: keyword_func})
219                result = [x for x in await items.filter(name=name) if filters(x)]
220                return pagination(page_size, page, result)
221
222            async def _get_partners(
223                id: Optional[int] = None,
224                name: Optional[str] = None,
225                keywords: Optional[str] = None,
226                page: int = Query(1, ge=1),
227                page_size: int = Query(100, ge=1),
228                provider: IItemListProvider = Depends(dep_provider),
229            ) -> list[PlayerPartner]:
230                items = await self._client.items(PlayerPartner, provider=provider)
231                if id is not None:
232                    return [item] if (item := await items.by_id(id)) else []
233                keyword_func = lambda partner: xstr(keywords) in xstr(partner.name)
234                filters = get_filters({keywords: keyword_func})
235                result = [x for x in await items.filter(name=name) if filters(x)]
236                return pagination(page_size, page, result)
237
238            async def _get_areas(
239                lang: Literal["ja", "zh"] = "ja",
240                id: Optional[str] = None,
241                name: Optional[str] = None,
242                keywords: Optional[str] = None,
243                page: int = Query(1, ge=1),
244                page_size: int = Query(100, ge=1),
245                provider: IAreaProvider = Depends(dep_provider),
246            ) -> list[Area]:
247                areas = await self._client.areas(lang, provider=provider)
248                if id is not None:
249                    return [area] if (area := await areas.by_id(id)) else []
250                if name is not None:
251                    return [area] if (area := await areas.by_name(name)) else []
252                keyword_func = lambda area: xstr(keywords) in (xstr(area.name) + xstr(area.comment))
253                filters = get_filters({keywords: keyword_func})
254                result = [x for x in await areas.get_all() if filters(x)]
255                return pagination(page_size, page, result)
256
257            async def _get_scores(
258                provider: IScoreProvider = Depends(dep_provider),
259                player: PlayerIdentifier = Depends(dep_player),
260            ) -> list[ScoreExtend]:
261                scores = await self._client.scores(player, provider=provider)
262                return scores.scores
263
264            async def _get_regions(
265                provider: IRegionProvider = Depends(dep_provider),
266                player: PlayerIdentifier = Depends(dep_player),
267            ) -> list[PlayerRegion]:
268                return await self._client.regions(player, provider=provider)
269
270            async def _get_players(
271                provider: IPlayerProvider = Depends(dep_provider),
272                player: PlayerIdentifier = Depends(dep_player),
273            ) -> Union[Player, DivingFishPlayer, LXNSPlayer, ArcadePlayer]:
274                return await self._client.players(player, provider=provider)
275
276            async def _get_bests(
277                provider: IScoreProvider = Depends(dep_provider),
278                player: PlayerIdentifier = Depends(dep_player),
279            ) -> PlayerBests:
280                maimai_scores = await self._client.bests(player, provider=provider)
281                return maimai_scores.get_player_bests()
282
283            async def _post_scores(
284                scores: list[Score],
285                provider: IScoreUpdateProvider = Depends(dep_provider),
286                player: PlayerIdentifier = Depends(dep_player),
287            ) -> None:
288                await self._client.updates(player, scores, provider=provider)
289
290            async def _get_plates(
291                plate: str,
292                attr: Literal["remained", "cleared", "played", "all"] = "remained",
293                provider: IScoreProvider = Depends(dep_provider),
294                player: PlayerIdentifier = Depends(dep_player),
295            ) -> list[PlateObject]:
296                plates: MaimaiPlates = await self._client.plates(player, plate, provider=provider)
297                return await getattr(plates, f"get_{attr}")()
298
299            async def _get_minfo(
300                id: Optional[int] = None,
301                title: Optional[str] = None,
302                keywords: Optional[str] = None,
303                provider: IScoreProvider = Depends(dep_provider),
304                player: PlayerIdentifier = Depends(dep_player),
305            ) -> Optional[PlayerSong]:
306                song_trait = id if id is not None else title if title is not None else keywords if keywords is not None else None
307                identifier = None if player._is_empty() else player
308                if song_trait is not None:
309                    return await self._client.minfo(song_trait, identifier, provider=provider)
310
311            async def _get_identifiers(
312                code: str,
313                provider: IPlayerIdentifierProvider = Depends(dep_provider),
314            ) -> PlayerIdentifier:
315                return await self._client.identifiers(code, provider=provider)
316
317            bases: list[Callable] = [_get_songs, _get_icons, _get_nameplates, _get_frames, _get_trophies, _get_charas, _get_partners, _get_areas]
318            players: list[Callable] = [_get_scores, _get_regions, _get_players, _get_bests, _post_scores, _get_plates, _get_minfo, _get_identifiers]
319
320            all = players + (bases if not skip_base else [])
321            try:
322                [try_add_route(func, router, dep_provider) for func in all]
323            except PydanticUndefinedAnnotation:
324                warning(
325                    "Current pydantic version does not support maimai.py API annotations"
326                    "MaimaiRoutes may not work properly."
327                    "Please upgrade pydantic to 2.7+."
328                )
329
330            return router
routes = <MaimaiRoutes object>
def main( host: Annotated[str, <typer.models.OptionInfo object>] = '127.0.0.1', port: Annotated[int, <typer.models.OptionInfo object>] = 8000, redis: Annotated[Optional[str], <typer.models.OptionInfo object>] = None, lxns_token: Annotated[Optional[str], <typer.models.OptionInfo object>] = None, divingfish_token: Annotated[Optional[str], <typer.models.OptionInfo object>] = None, arcade_proxy: Annotated[Optional[str], <typer.models.OptionInfo object>] = None, with_curves: Annotated[bool, <typer.models.OptionInfo object>] = False):
350    def main(
351        host: Annotated[str, typer.Option(help="The host address to bind to.")] = "127.0.0.1",
352        port: Annotated[int, typer.Option(help="The port number to bind to.")] = 8000,
353        redis: Annotated[Optional[str], typer.Option(help="Redis server address, for example: redis://localhost:6379/0.")] = None,
354        lxns_token: Annotated[Optional[str], typer.Option(help="LXNS developer token for LXNS API.")] = None,
355        divingfish_token: Annotated[Optional[str], typer.Option(help="DivingFish developer token for DivingFish API.")] = None,
356        arcade_proxy: Annotated[Optional[str], typer.Option(help="HTTP proxy for Arcade API.")] = None,
357        with_curves: Annotated[bool, typer.Option(help="Whether to fetch curves from Divingfish.")] = False,
358    ):
359        # prepare for redis cache backend
360        redis_backend = UNSET
361        if redis and find_spec("redis"):
362            from aiocache import RedisCache
363            from aiocache.serializers import PickleSerializer
364
365            redis_url = urlparse(redis)
366            redis_backend = RedisCache(
367                serializer=PickleSerializer(),
368                endpoint=unquote(redis_url.hostname or "localhost"),
369                port=redis_url.port or 6379,
370                password=redis_url.password,
371                db=int(unquote(redis_url.path).replace("/", "")),
372            )
373
374        # override the default maimai.py client
375        routes._client._cache = routes._client._cache if isinstance(redis_backend, _UnsetSentinel) else redis_backend
376        routes._lxns_token = lxns_token or os.environ.get("LXNS_DEVELOPER_TOKEN")
377        routes._divingfish_token = divingfish_token or os.environ.get("DIVINGFISH_DEVELOPER_TOKEN")
378        routes._arcade_proxy = arcade_proxy
379        routes._with_curves = with_curves
380
381        @asgi_app.exception_handler(MaimaiPyError)
382        async def exception_handler(request: Request, exc: MaimaiPyError):
383            return JSONResponse(
384                status_code=400,
385                content={"message": f"Oops! There goes a maimai.py error {exc}.", "details": repr(exc)},
386            )
387
388        @asgi_app.get("/", include_in_schema=False)
389        async def root():
390            return {"message": "Hello, maimai.py! Check /docs for more information."}
391
392        @asgi_app.on_event("startup")
393        async def startup_event():
394            if routes._with_curves:
395                curve_provider = DivingFishProvider(developer_token=routes._divingfish_token)
396                logging.info("with_curves is enabled, pre-fetching curves from DivingFish.")
397                await routes._client.songs(provider=HybridProvider(), curve_provider=curve_provider)
398
399        # run the ASGI app with uvicorn
400        uvicorn.run(asgi_app, host=host, port=port)
def openapi():
402    def openapi():
403        specs = get_openapi(
404            title=asgi_app.title,
405            version=asgi_app.version,
406            openapi_version=asgi_app.openapi_version,
407            description=asgi_app.description,
408            routes=asgi_app.routes,
409        )
410        with open(f"openapi.json", "w") as f:
411            json.dump(specs, f)