Source code for geeViz.mcp.server

"""
geeViz MCP Server -- execution and introspection tools for Earth Engine via geeViz.

Unlike static doc snippets, this server executes code, inspects live GEE assets,
and dynamically queries API signatures. 21 tools.
"""
from __future__ import annotations

import os
import sys
print('Importing geeViz MCP Server from', __file__)
# ---------------------------------------------------------------------------
# CLI argument parsing (before heavy imports so --help is instant)
# ---------------------------------------------------------------------------
_SANDBOX_ENABLED: bool | None = None  # will be resolved in main()
_audit_user_code_active = False  # True only during user code execution in sandbox

# Parse --sandbox / --no-sandbox early so _help can document them
for _arg in sys.argv[1:]:
    if _arg == "--sandbox":
        _SANDBOX_ENABLED = True
        os.environ["GEEVIZ_SANDBOX"] = "1"
    elif _arg == "--no-sandbox":
        _SANDBOX_ENABLED = False

# ---------------------------------------------------------------------------
# Audit hook — runtime-level defense that cannot be bypassed from Python.
# Catches os.system, subprocess.Popen, open(), exec(), import of blocked
# modules, etc. even when accessed via module attribute traversal
# (e.g. gv.os.system). Only active when --sandbox is set.
# ---------------------------------------------------------------------------
if _SANDBOX_ENABLED:
    _AUDIT_BLOCKED_IMPORTS = frozenset({
        "os", "subprocess", "shutil", "pathlib", "socket", "http",
        "urllib", "requests", "ctypes", "signal", "threading",
        "multiprocessing", "webbrowser", "tempfile", "code", "codeop",
        "pty", "pipes", "resource", "pickle", "shelve", "xmlrpc",
    })
    # Track whether we're inside the server's own init (allow) or user code (block)
    _audit_user_code_active = False

    def _called_from_trusted_lib():
        """True ONLY if the call is from inside trusted library code with NO user
        code frame between the call site and the trusted frame.

        Walks the stack from the syscall site upward:
        - If we hit user code (filename '<mcp>') before any trusted frame → DENY
        - If we hit a trusted frame first → ALLOW
        - stdlib/site-packages frames are skipped (transparent)

        This prevents the exploit where user code passes a callback to a trusted
        function that calls back into user code which tries the syscall — that
        callback's user frame would be above the trusted frame, so it's denied.
        """
        import inspect as _ins
        _trusted_substrings = ("geeViz\\outputLib", "geeViz/outputLib", "kaleido", "plotly")
        try:
            f = _ins.currentframe()
            while f is not None:
                fname = (f.f_code.co_filename or "")
                fname_lower = fname.lower()
                # User code lives in '<mcp>' frames (from exec(compile(..., '<mcp>')))
                if "<mcp>" in fname or "<string>" in fname:
                    return False  # User code is closer than any trusted lib → deny
                if any(s.lower() in fname_lower for s in _trusted_substrings):
                    return True
                f = f.f_back
        except Exception:
            pass
        return False

    def _sandbox_audit_hook(event, args):
        if not _audit_user_code_active:
            return  # Allow server's own operations
        if event == "import":
            mod_name = args[0].split(".")[0] if args[0] else ""
            if mod_name in _AUDIT_BLOCKED_IMPORTS:
                # Allow trusted library code (e.g. kaleido inside cl.save_chart_png)
                if _called_from_trusted_lib():
                    return
                raise ImportError(
                    f"Sandbox: import of '{args[0]}' is blocked. "
                    f"Only Earth Engine, geeViz, and standard data libraries are allowed."
                )
        elif event == "os.system":
            if _called_from_trusted_lib():
                return
            raise PermissionError("Sandbox: os.system() is blocked.")
        elif event == "subprocess.Popen":
            if _called_from_trusted_lib():
                return
            raise PermissionError("Sandbox: subprocess is blocked.")
        elif event in ("os.exec", "os.posix_spawn", "os.spawn"):
            if _called_from_trusted_lib():
                return
            raise PermissionError(f"Sandbox: {event} is blocked.")

    sys.addaudithook(_sandbox_audit_hook)


if len(sys.argv) > 1 and sys.argv[1] in ("-h", "--help"):
    _help = """usage: python -m geeViz.mcp.server [--help] [--sandbox | --no-sandbox]

geeViz MCP Server -- execution and introspection for Earth Engine via geeViz.

Options:
  -h, --help      Show this help and exit.
  --sandbox       Force run_code sandbox ON (block os, open, eval, etc.)
  --no-sandbox    Force run_code sandbox OFF (full Python access)

  Default: sandbox is OFF for stdio transport (local IDE use) and ON for
  streamable-http when binding to non-localhost (remote/cloud deployment).

Environment (optional):
  MCP_TRANSPORT   Transport: "stdio" (default) or "streamable-http"
  MCP_HOST        Host for HTTP (default: 127.0.0.1)
  MCP_PORT        Port for HTTP (default: 8000)
  MCP_PATH        Path for HTTP (default: /mcp)

Tools (12):
  run_code                 Execute Python/GEE code in a persistent REPL namespace
  inspect_asset            Get metadata for any GEE asset (with optional collection filters)
  search_geeviz            Search geeViz modules, functions, classes, dicts, variables, examples, and REPL modules
  map_control              View, export, preview, list layers, clear, or test the geeView map (action=view|export|preview|layers|clear|test_layers)
  save_session             Save run_code history to a .py file or .ipynb notebook
  env_info                 Get versions, REPL namespace, or project info (action=version|namespace|project)
  export_image             Export ee.Image to asset, Drive, or Cloud Storage (destination=asset|drive|cloud)
  search_datasets          Search the GEE dataset catalog by keyword
  manage_asset             Delete, copy, move, create folder, or update ACL (action=delete|copy|move|create|update_acl)
  get_streetview           Get Google Street View imagery at a location for ground-truthing
  geeviz_search_places     Search for places, landmarks, or businesses using Google Places API

Examples:
  python -m geeViz.mcp.server                   # stdio, no sandbox (default)
  python -m geeViz.mcp.server --sandbox          # stdio with sandbox
  MCP_TRANSPORT=streamable-http python -m geeViz.mcp.server  # HTTP, auto-sandbox
  python -m geeViz.mcp --help

See also: geeViz/mcp/README.md
"""
    print(_help, file=sys.stderr)
    sys.exit(0)

import importlib.util

# Path setup: ensure geeViz and package root are on path
_THIS_DIR = os.path.dirname(os.path.abspath(__file__))  # .../geeViz/mcp
_GEEVIZ_DIR = os.path.dirname(_THIS_DIR)               # .../geeViz
_PACKAGE_ROOT = os.path.dirname(_GEEVIZ_DIR)            # .../geeVizBuilder
_EXAMPLES_DIR = os.path.join(_GEEVIZ_DIR, "examples")
sys.path = [p for p in sys.path if not (p.rstrip(os.sep).endswith("mcp") and _GEEVIZ_DIR in (p or ""))]
if _PACKAGE_ROOT not in sys.path:
    sys.path.insert(0, _PACKAGE_ROOT)
if _GEEVIZ_DIR not in sys.path:
    sys.path.append(_GEEVIZ_DIR)


# Load FastMCP from the MCP SDK. When run as python -m geeViz.mcp.server, the name "mcp"
# resolves to this package (geeViz.mcp), so we load the SDK's fastmcp by file from site-packages.
# If mcp is not installed (e.g. during Sphinx doc build), a lightweight stub is used so the
# module can still be imported and @app.tool() decorators pass functions through unchanged.
def _load_fastmcp():
    import site as _site
    for _sp in _site.getsitepackages():
        _origin = os.path.join(_sp, "mcp", "server", "fastmcp.py")
        if os.path.isfile(_origin):
            spec = importlib.util.spec_from_file_location("_geeviz_mcp_sdk_fastmcp", _origin)
            if spec and spec.loader:
                mod = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(mod)
                return mod.FastMCP
    try:
        from mcp.server.fastmcp import FastMCP
        return FastMCP
    except ModuleNotFoundError:
        return None


class _StubFastMCP:
    """Lightweight stand-in when the mcp SDK is not installed.

    Makes @app.tool() a no-op passthrough so functions keep their real
    type and docstrings (important for Sphinx autodoc).
    """

    def __init__(self, *args, **kwargs):
        pass

    def tool(self, **kwargs):
        """Return identity decorator -- the function is unchanged."""
        def _identity(fn):
            return fn
        return _identity

    def resource(self, *args, **kwargs):
        def _identity(fn):
            return fn
        return _identity

    def run(self, **kwargs):
        raise RuntimeError("mcp SDK not installed; install with: pip install mcp")


_FastMCP = _load_fastmcp()
FastMCP = _FastMCP if _FastMCP is not None else _StubFastMCP

# Load ToolAnnotations for hinting read-only / destructive / etc.
try:
    from mcp.types import ToolAnnotations
except ImportError:
    # Stub if mcp SDK not installed
    class ToolAnnotations:
        def __init__(self, **kwargs): pass

# Pre-built annotation sets
_READ_ONLY = ToolAnnotations(readOnlyHint=True, idempotentHint=True)
_READ_ONLY_OPEN = ToolAnnotations(readOnlyHint=True, idempotentHint=True, openWorldHint=True)
_WRITE = ToolAnnotations(readOnlyHint=False, destructiveHint=False)
_WRITE_OPEN = ToolAnnotations(readOnlyHint=False, destructiveHint=False, openWorldHint=True)
_DESTRUCTIVE = ToolAnnotations(readOnlyHint=False, destructiveHint=True, openWorldHint=True)


def _load_mcp_image():
    """Load the Image class from the mcp SDK for returning images from tools.

    IMPORTANT: Try the direct import first so we get the exact same Image class
    that FastMCP uses internally. If we load from file (as a standalone module),
    the class identity differs and FastMCP's isinstance() check fails, causing
    images to not display in clients like Cursor.
    """
    # Preferred: direct import matches FastMCP's own Image class
    try:
        from mcp.server.fastmcp.utilities.types import Image
        return Image
    except (ImportError, ModuleNotFoundError, AttributeError):
        pass
    # Fallback: load from file in site-packages (older SDK layouts)
    import site as _site
    for _sp in _site.getsitepackages():
        _types_path = os.path.join(_sp, "mcp", "server", "fastmcp", "utilities", "types.py")
        if os.path.isfile(_types_path):
            try:
                spec = importlib.util.spec_from_file_location("_geeviz_mcp_types", _types_path)
                if spec and spec.loader:
                    mod = importlib.util.module_from_spec(spec)
                    spec.loader.exec_module(mod)
                    cls = getattr(mod, "Image", None)
                    if cls is not None:
                        return cls
            except Exception:
                pass
    return None


_MCPImage = _load_mcp_image()


def _load_mcp_context():
    """Load the Context class from the MCP SDK for progress reporting in tools.

    Uses the same importlib approach as _load_fastmcp() to avoid name conflicts
    with the geeViz.mcp package.
    """
    # Preferred: direct import matches FastMCP's own Context class
    try:
        from mcp.server.fastmcp import Context
        return Context
    except (ImportError, ModuleNotFoundError, AttributeError):
        pass
    # Fallback: load from file in site-packages
    import site as _site
    for _sp in _site.getsitepackages():
        _origin = os.path.join(_sp, "mcp", "server", "fastmcp", "server.py")
        if os.path.isfile(_origin):
            try:
                spec = importlib.util.spec_from_file_location("_geeviz_mcp_sdk_context", _origin)
                if spec and spec.loader:
                    mod = importlib.util.module_from_spec(spec)
                    spec.loader.exec_module(mod)
                    return mod.Context
            except Exception:
                pass
    return None


_MCPContext = _load_mcp_context()
# Expose as module-level ``Context`` so typing.get_type_hints() can resolve the
# annotation in run_code's signature (required for FastMCP context injection).
Context = _MCPContext

# Load agent instructions from the bundled markdown file.
# These are injected as the MCP server instructions (like a system prompt)
# so every connected client automatically knows how to use the tools.
_INSTRUCTIONS_FILE = os.path.join(_THIS_DIR, "agent-instructions.md")
try:
    with open(_INSTRUCTIONS_FILE, "r", encoding="utf-8") as _f:
        _SERVER_INSTRUCTIONS = _f.read()
    print(f"[geeViz MCP] Loaded instructions: {len(_SERVER_INSTRUCTIONS)} chars, {len(_SERVER_INSTRUCTIONS.split())} words")
except Exception:
    _SERVER_INSTRUCTIONS = None
    print("[geeViz MCP] WARNING: No agent instructions loaded")

app = FastMCP(
    "geeViz",
    instructions=_SERVER_INSTRUCTIONS,
    json_response=True,
) if _FastMCP is not None else _StubFastMCP()

# ---------------------------------------------------------------------------
# Workload-tag plumbing (EE billing attribution)
#
# When the geeViz ADK agent calls a tool, its before_tool_callback stamps
# ``_workload_tag`` into the arguments dict. We pop that off here, call
# ``ee.data.setWorkloadTag(tag)`` for the duration of the tool call, and
# ``ee.data.resetWorkloadTag()`` after — EE's own ContextVar then carries
# the tag into every outbound compute body via the library's existing
# ``_maybe_populate_workload_tag`` path.
#
# Why this hook (ToolManager.call_tool) and not ``app.call_tool``: FastMCP
# captures ``app.call_tool`` as a bound method at ``__init__`` and registers
# that reference with the lowlevel MCP server, so a later monkey-patch on
# ``app.call_tool`` is never reached. ``app._tool_manager.call_tool`` is
# the next layer down and is dereferenced fresh on every dispatch.
#
# Non-agent callers (Claude Code, etc.) just don't send ``_workload_tag``
# and EE's default workload-tag behavior applies — the MCP stays
# client-agnostic.
# ---------------------------------------------------------------------------
import re as _wl_re

_WL_TAG_DISALLOWED = _wl_re.compile(r"[^a-z0-9_\-]")

# ---------------------------------------------------------------------------
# Multi-tenant routing for Earth Engine traffic
#
# When the agent's before_tool_callback stamps ``_tenant`` into a tool's
# arguments, we pop it here and set ``geeViz.eeAuth.client.CURRENT_TENANT``
# for the call duration. The library's ``TenantAwareHttp`` transport
# reads that ContextVar on every outbound EE request and stamps the
# routing header. The proxy in ``run_ui.py`` reads the header, picks the
# right service account from the registry, and signs the request.
#
# This lets ONE Python process serve many tenants concurrently — no
# ``ee.Initialize`` race, no global credential state, no per-tenant
# subprocess. EE's normal SDK behavior is preserved for callers that
# don't send a tenant (Claude Code etc.) — they hit the default SA.
# ---------------------------------------------------------------------------
from geeViz.eeAuth.client import CURRENT_TENANT as _CURRENT_TENANT_CV


def _sanitize_workload_tag(tag: str) -> str:
    """Coerce a tag to EE's accepted character set and 63-char limit.

    EE rule (from ee/_state.py): 1-63 chars, ``[a-z0-9]`` at the ends,
    ``[a-z0-9_-]`` in the middle. No uppercase, no dots, no other punct.
    """
    if not tag:
        return ""
    tag = tag.lower()
    tag = _WL_TAG_DISALLOWED.sub("-", tag)
    tag = _wl_re.sub(r"-{2,}", "-", tag)
    tag = tag.strip("-_")[:63].rstrip("-_")
    return tag


_orig_tool_manager_call_tool = app._tool_manager.call_tool


async def _tool_manager_call_tool_with_workload_tag(
    name, arguments, context=None, convert_result=False
):
    tag = None
    tenant = ""
    if isinstance(arguments, dict):
        raw = arguments.pop("_workload_tag", None)
        if raw:
            tag = _sanitize_workload_tag(str(raw))
        # Tenant is propagated separately from workload tag — proxy reads
        # it from the X-AskTerra-Tenant header set by TenantAwareHttp.
        raw_tenant = arguments.pop("_tenant", None)
        if raw_tenant:
            tenant = str(raw_tenant).strip().lower()

    # ContextVar for tenant: TenantAwareHttp reads it on every outbound
    # EE request and stamps the routing header.
    tenant_token = _CURRENT_TENANT_CV.set(tenant) if tenant else None

    try:
        if not tag:
            return await _orig_tool_manager_call_tool(
                name, arguments, context=context, convert_result=convert_result
            )
        # EE's own setWorkloadTag uses a ContextVar internally, so the tag
        # carries through ``await`` boundaries to whatever EE call user code
        # eventually makes. resetWorkloadTag in ``finally`` keeps the state
        # clean across overlapping tool calls in the same process.
        try:
            import ee.data as _ee_data
            _ee_data.setWorkloadTag(tag)
        except Exception:
            # If EE isn't initialized yet, swallow — the agent's first tool
            # call is usually run_code, which initializes EE before doing
            # any actual compute. Subsequent calls will tag correctly.
            return await _orig_tool_manager_call_tool(
                name, arguments, context=context, convert_result=convert_result
            )
        try:
            return await _orig_tool_manager_call_tool(
                name, arguments, context=context, convert_result=convert_result
            )
        finally:
            try:
                _ee_data.resetWorkloadTag()
            except Exception:
                pass
    finally:
        if tenant_token is not None:
            _CURRENT_TENANT_CV.reset(tenant_token)


app._tool_manager.call_tool = _tool_manager_call_tool_with_workload_tag

# Wrap app.tool() to auto-log every tool invocation
_original_app_tool = app.tool


def _logging_tool_decorator(*dec_args, **dec_kwargs):
    """Replacement for app.tool() that wraps each tool function with logging."""
    original_decorator = _original_app_tool(*dec_args, **dec_kwargs)

    def wrapper(fn):
        import functools
        import inspect as _insp

        @functools.wraps(fn)
        async def logged_fn(*args, **kwargs):
            _log_tool_call(fn.__name__, kwargs)
            try:
                result = fn(*args, **kwargs)
                # Handle both sync and async tool functions
                if _insp.isawaitable(result):
                    result = await result
                _log_tool_call(fn.__name__, kwargs, result=result)
                return result
            except Exception as exc:
                _log_tool_call(fn.__name__, kwargs, error=exc)
                raise

        # If original fn is not async, keep it sync for FastMCP
        if not _insp.iscoroutinefunction(fn):
            @functools.wraps(fn)
            def logged_fn_sync(*args, **kwargs):
                _log_tool_call(fn.__name__, kwargs)
                try:
                    result = fn(*args, **kwargs)
                    _log_tool_call(fn.__name__, kwargs, result=result)
                    return result
                except Exception as exc:
                    _log_tool_call(fn.__name__, kwargs, error=exc)
                    raise
            return original_decorator(logged_fn_sync)

        return original_decorator(logged_fn)

    return wrapper


app.tool = _logging_tool_decorator

# ---------------------------------------------------------------------------
# Tool call logging -- every MCP tool invocation is logged with timestamp,
# tool name, arguments, and status (success/error).
# Log file: <mcp_dir>/logs/tool_calls.log
# ---------------------------------------------------------------------------
import logging as _logging
import datetime as _datetime

_LOG_DIR = os.path.join(_THIS_DIR, "logs")
os.makedirs(_LOG_DIR, exist_ok=True)
_TOOL_LOG_FILE = os.path.join(_LOG_DIR, "tool_calls.log")

_tool_logger = _logging.getLogger("geeViz.mcp.tools")
_tool_logger.setLevel(_logging.DEBUG)
_tool_fh = _logging.FileHandler(_TOOL_LOG_FILE, encoding="utf-8")
_tool_fh.setFormatter(_logging.Formatter("%(message)s"))
_tool_logger.addHandler(_tool_fh)
_log_result_limit = 5000

def _log_tool_call(tool_name: str, args: dict, result=None, error=None):
    """Log a tool invocation to the tool_calls.log file."""
    ts = _datetime.datetime.now().isoformat(timespec="seconds")
    # Truncate large arg values for readability
    clean_args = {}
    for k, v in (args or {}).items():
        s = str(v)
        clean_args[k] = s[:_log_result_limit] + "..." if len(s) > _log_result_limit else s
    entry = {
        "timestamp": ts,
        "tool": tool_name,
        "args": clean_args,
    }
    if error:
        entry["status"] = "ERROR"
        entry["error"] = str(error)[:_log_result_limit]
    else:
        result_str = str(result)
        entry["status"] = "OK"
        entry["result_preview"] = result_str[:_log_result_limit] + "..." if len(result_str) > _log_result_limit else result_str
    import json as _json_log
    _tool_logger.info(_json_log.dumps(entry))


# ---------------------------------------------------------------------------
# Lazy initialization -- defer all geeViz imports until first tool call
# that needs them.  Every geeViz module import triggers robustInitializer()
# at module level, so we must not import at top level.
# ---------------------------------------------------------------------------
import threading
import json

_init_lock = threading.Lock()
_initialized = False

# Dynamic module tree — populated at init time by _build_module_tree()
_MODULE_TREE = {}  # short_name -> {"fq": fully_qualified_path, "mod": module_object}
_MODULE_MAP = {}   # short_name -> fully_qualified_path (backward compat)

# Packages to skip during module discovery
# - mcp: this server itself (circular)
# - examples: importing them triggers EE calls and side effects (use `examples` tool instead)
_SKIP_PACKAGES = {"geeViz.mcp", "geeViz.examples", "geeViz.migrateGEEAssets"}


def _ast_extract_members(filepath):
    """Parse a .py file with AST and extract public members without importing.

    Returns a list of dicts: {name, type, signature, docstring}.
    """
    try:
        with open(filepath, "r", encoding="utf-8") as f:
            source = f.read()
        tree = ast.parse(source)
    except Exception:
        return [], ""

    module_doc = ast.get_docstring(tree) or ""
    members = []

    for node in tree.body:
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            if node.name.startswith("_"):
                continue
            # Build signature from args
            args = node.args
            params = []
            # Positional args (skip 'self' if present)
            all_args = [a.arg for a in args.args]
            defaults = [None] * (len(all_args) - len(args.defaults)) + [ast.dump(d) for d in args.defaults]
            for arg_name, default in zip(all_args, defaults):
                if arg_name == "self":
                    continue
                params.append(f"{arg_name}=..." if default else arg_name)
            if args.vararg:
                params.append(f"*{args.vararg.arg}")
            for kw, default in zip(args.kwonlyargs, args.kw_defaults):
                params.append(f"{kw.arg}=..." if default else kw.arg)
            if args.kwarg:
                params.append(f"**{args.kwarg.arg}")
            sig = f"{node.name}({', '.join(params)})"
            doc = ast.get_docstring(node) or ""
            first_line = doc.split("\n")[0].strip() if doc else ""
            members.append({"name": node.name, "type": "function", "signature": sig, "description": first_line, "docstring": doc})

        elif isinstance(node, ast.ClassDef):
            if node.name.startswith("_"):
                continue
            doc = ast.get_docstring(node) or ""
            first_line = doc.split("\n")[0].strip() if doc else ""
            # Extract public methods
            methods = []
            for item in node.body:
                if isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef)) and not item.name.startswith("_"):
                    methods.append(item.name)
            members.append({"name": node.name, "type": "class", "description": first_line, "docstring": doc, "methods": methods})

        elif isinstance(node, ast.Assign):
            for target in node.targets:
                if isinstance(target, ast.Name) and not target.id.startswith("_"):
                    entry = {"name": target.id, "type": "variable", "description": ""}
                    try:
                        entry["value"] = ast.literal_eval(node.value)
                    except (ValueError, TypeError):
                        # Store source text so the agent can read the expression
                        try:
                            entry["value"] = ast.unparse(node.value)
                        except Exception:
                            pass
                    members.append(entry)

        elif isinstance(node, ast.AnnAssign):
            if isinstance(node.target, ast.Name) and not node.target.id.startswith("_"):
                entry = {"name": node.target.id, "type": "variable", "description": ""}
                if node.value is not None:
                    try:
                        entry["value"] = ast.literal_eval(node.value)
                    except (ValueError, TypeError):
                        try:
                            entry["value"] = ast.unparse(node.value)
                        except Exception:
                            pass
                members.append(entry)

    return members, module_doc


def _list_example_files():
    """Return sorted list of example filenames."""
    if not os.path.isdir(_EXAMPLES_DIR):
        return []
    return sorted(f for f in os.listdir(_EXAMPLES_DIR)
                  if (f.endswith(".py") or f.endswith(".ipynb")) and f != "__init__.py")


def _read_example_source(filename, name, description=""):
    """Read an example file and return JSON with its source."""
    fpath = os.path.join(_EXAMPLES_DIR, filename)
    if filename.endswith(".py"):
        try:
            with open(fpath, "r", encoding="utf-8") as f:
                return json.dumps({"name": name, "file": filename, "type": "python",
                                   "description": description, "source": f.read()})
        except Exception as exc:
            return json.dumps({"error": f"Failed to read {filename}: {exc}"})
    elif filename.endswith(".ipynb"):
        try:
            with open(fpath, "r", encoding="utf-8") as f:
                nb = json.load(f)
            cells = [{"cell_type": c.get("cell_type", ""), "source": "".join(c.get("source", []))}
                     for c in nb.get("cells", []) if "".join(c.get("source", [])).strip()]
            return json.dumps({"name": name, "file": filename, "type": "notebook",
                               "description": description, "cells": cells})
        except Exception as exc:
            return json.dumps({"error": f"Failed to read notebook: {exc}"})
    return json.dumps({"error": f"Unknown file type: {filename}"})


def _build_module_tree():
    """Walk the geeViz package and build a searchable index using AST.

    No modules are imported. Populates ``_MODULE_TREE`` with module paths
    and pre-parsed member indices (names, types, signatures, docstrings).
    Modules are only imported on-demand when live values are needed
    (e.g. dict contents via ``search_geeviz(name=...)``).
    """
    global _MODULE_TREE, _MODULE_MAP
    import pkgutil
    import geeViz

    tree = {}
    fq_map = {}

    for importer, modname, ispkg in pkgutil.walk_packages(
        geeViz.__path__, prefix="geeViz."
    ):
        # Skip excluded packages
        if any(modname == skip or modname.startswith(skip + ".") for skip in _SKIP_PACKAGES):
            continue
        # Skip private modules
        leaf = modname.rsplit(".", 1)[-1]
        if leaf.startswith("_"):
            continue

        # Find the source file without importing
        try:
            spec = importlib.util.find_spec(modname)
            if spec is None or spec.origin is None:
                continue
        except (ModuleNotFoundError, ValueError):
            continue

        # Parse with AST
        members, module_doc = _ast_extract_members(spec.origin)
        first_line = module_doc.split("\n")[0].strip()[:100] if module_doc else ""

        short = leaf
        entry = {"fq": modname, "mod": None, "file": spec.origin,
                 "members": members, "doc": first_line}
        tree[modname] = entry
        if short not in tree:
            tree[short] = entry
        fq_map[short] = modname
        fq_map[modname] = modname

    # --- Index examples (AST parse .py, JSON parse .ipynb) ---
    example_members = []
    for fname in _list_example_files():
        fpath = os.path.join(_EXAMPLES_DIR, fname)
        base = fname.rsplit(".", 1)[0]
        desc = ""
        if fname.endswith(".py"):
            try:
                with open(fpath, "r", encoding="utf-8") as f:
                    source = f.read()
                try:
                    ex_tree = ast.parse(source)
                    doc = ast.get_docstring(ex_tree) or ""
                    desc = doc.split("\n")[0].strip()[:100] if doc else ""
                except SyntaxError:
                    pass
                if not desc:
                    for line in source.split("\n")[:20]:
                        s = line.strip()
                        if s.startswith("#") and len(s) > 2:
                            desc = s.lstrip("#").strip()
                            break
            except Exception:
                pass
        elif fname.endswith(".ipynb"):
            try:
                with open(fpath, "r", encoding="utf-8") as f:
                    nb = json.load(f)
                for cell in nb.get("cells", []):
                    if cell.get("cell_type") == "markdown":
                        src = "".join(cell.get("source", [])).strip()
                        if src:
                            desc = src.split("\n")[0].lstrip("#").strip()[:100]
                            break
            except Exception:
                pass
        example_members.append({"name": base, "type": "example", "description": desc or fname, "file": fname})

    tree["examples"] = {
        "fq": "geeViz.examples", "mod": None, "file": _EXAMPLES_DIR,
        "members": example_members, "doc": "geeViz example scripts and notebooks",
    }
    fq_map["examples"] = "geeViz.examples"

    _MODULE_TREE = tree
    _MODULE_MAP = fq_map
    n_mods = len(set(e["fq"] for e in tree.values()))
    n_examples = len(example_members)
    print(f"[geeViz MCP] Module tree: {n_mods} modules, {n_examples} examples indexed (zero imports)")


def _get_module(entry):
    """Lazy-import a module from a tree entry. Caches the result."""
    if entry["mod"] is None:
        try:
            entry["mod"] = importlib.import_module(entry["fq"])
        except Exception as exc:
            print(f"[geeViz MCP] Failed to import {entry['fq']}: {exc}")
            return None
    return entry["mod"]


def _describe_object(obj, name="", module_name="", max_size=20000):
    """Return a JSON-safe description of any Python object."""
    result = {"name": name}
    if module_name:
        result["module"] = module_name

    if _inspect.ismodule(obj):
        members = [m for m in sorted(dir(obj)) if not m.startswith("_")]
        result["type"] = "module"
        result["docstring"] = (_inspect.getdoc(obj) or "")[:500]
        result["public_members"] = members[:200]
        if len(members) > 200:
            result["note"] = f"Showing 200 of {len(members)} members. Use query= to filter."
        return result

    if _inspect.isclass(obj):
        methods = [m for m in sorted(dir(obj)) if not m.startswith("_") and callable(getattr(obj, m, None))]
        result["type"] = "class"
        result["docstring"] = _inspect.getdoc(obj) or ""
        result["public_methods"] = methods
        try:
            result["constructor"] = f"{name}{_inspect.signature(obj.__init__)}"
        except (ValueError, TypeError):
            pass
        return result

    if callable(obj):
        result["type"] = "function"
        try:
            result["signature"] = f"{name}{_inspect.signature(obj)}"
        except (ValueError, TypeError):
            result["signature"] = f"{name}(...)"
        result["docstring"] = _inspect.getdoc(obj) or ""
        return result

    # Non-callable: dict, list, constant, etc.
    result["type"] = type(obj).__name__

    # Check for ee objects — never call getInfo
    try:
        import ee as _ee
        if isinstance(obj, _ee.ComputedObject):
            result["repr"] = repr(obj)[:500]
            return result
    except Exception:
        pass

    if isinstance(obj, dict):
        serialized = json.dumps(_make_serializable(obj), default=str)
        if len(serialized) > max_size:
            result["keys"] = list(obj.keys())[:100]
            # Show sample of first 3 entries
            sample = {k: _make_serializable(v) for k, v in list(obj.items())[:3]}
            result["sample"] = sample
            result["note"] = f"Large dict ({len(obj)} keys). Showing keys + 3 samples."
        else:
            result["value"] = _make_serializable(obj)
        return result

    if isinstance(obj, (list, tuple)):
        if len(obj) > 50:
            result["length"] = len(obj)
            result["sample"] = _make_serializable(list(obj[:5]))
            result["note"] = f"Large {type(obj).__name__} ({len(obj)} items). Showing first 5."
        else:
            result["value"] = _make_serializable(list(obj))
        return result

    # Scalar or other
    try:
        result["value"] = _make_serializable(obj)
    except Exception:
        result["repr"] = repr(obj)[:500]
    return result

# ---------------------------------------------------------------------------
# Per-session state — isolates REPL namespace, Map, code history, outputs,
# and reports across concurrent users/sessions.
# ---------------------------------------------------------------------------
import threading as _threading

_BASE_OUTPUT_DIR = os.path.join(_THIS_DIR, "generated_outputs")
_BASE_SCRIPT_DIR = os.path.join(_THIS_DIR, "generated_scripts")
_DEFAULT_SESSION_ID = "_default"


class _SessionState:
    """All mutable state scoped to a single session."""
    def __init__(self, session_id: str):
        self.session_id = session_id
        self.namespace: dict = {}
        self.code_history: list[str] = []
        self.current_script_path: str | None = None
        self.active_report = None
        self.initialized = False
        # Per-session output directory
        if session_id == _DEFAULT_SESSION_ID:
            self.output_dir = _BASE_OUTPUT_DIR
            self.script_dir = _BASE_SCRIPT_DIR
        else:
            self.output_dir = os.path.join(_BASE_OUTPUT_DIR, session_id)
            self.script_dir = os.path.join(_BASE_SCRIPT_DIR, session_id)
        # Stdout streaming file
        self.stdout_stream_file = os.path.join(self.output_dir, ".stdout_stream")
        self.stdout_active = False
        # Original Map reference (set during init, used to restore if clobbered)
        self._map_ref = None


_sessions: dict[str, _SessionState] = {}
_sessions_lock = _threading.Lock()


def _get_session(session_id: str | None = None) -> _SessionState:
    """Get or create a session state object. Thread-safe."""
    sid = session_id or _DEFAULT_SESSION_ID
    if sid in _sessions:
        return _sessions[sid]
    with _sessions_lock:
        if sid not in _sessions:
            _sessions[sid] = _SessionState(sid)
        return _sessions[sid]


# Backward-compatible module-level references for non-session-aware code
_output_dir = _BASE_OUTPUT_DIR
_script_dir = _BASE_SCRIPT_DIR


def _load_env():
    """Load .env file from the geeViz package directory into os.environ.

    Parses KEY=VALUE lines (ignoring comments and blank lines).
    Does NOT override existing environment variables.
    Looks for .env in the geeViz root (parent of mcp/).
    """
    env_path = os.path.join(os.path.dirname(_THIS_DIR), ".env")
    if not os.path.isfile(env_path):
        return
    try:
        with open(env_path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                key, _, value = line.partition("=")
                key = key.strip()
                value = value.strip().strip("'\"")
                if key and key not in os.environ:
                    os.environ[key] = value
    except Exception:
        pass

_load_env()


def _init_ee_via_proxy():
    """Initialize Earth Engine to route ALL outbound traffic through the
    agent's ``/ee-api`` proxy, with tenant-aware routing.

    Requires ``EE_PROXY_URL`` env var (e.g. ``http://localhost:8888/ee-api``).
    Delegates to ``geeViz.eeAuth.initialize_via_proxy`` which is the
    canonical client-side init helper for the library.

    The tool wrapper below maintains its OWN ContextVar
    (``_CURRENT_TENANT_CV``) and copies the tenant into
    ``geeViz.eeAuth.client.CURRENT_TENANT`` for each call — that ensures
    the library's ``TenantAwareHttp`` transport sees the right value
    even though the MCP wrapper is what knows about ``_tenant`` args.

    Passes the SA's project explicitly so EE builds API URLs with the
    real consumer project. Without this, ``initialize_via_proxy``
    defaults to ``"ee-proxy-placeholder"`` and EE emits
    ``projects/ee-proxy-placeholder/value:compute`` to upstream which
    404s. Project is sourced from ``GEE_PROJECT`` env var first, then
    by decoding any ``GEE_SERVICE_ACCOUNT_B64`` blob present in env —
    no separate translation step needed.
    """
    proxy_url = os.environ.get("EE_PROXY_URL", "").strip().rstrip("/")
    if not proxy_url:
        return False

    project = os.environ.get("GEE_PROJECT", "").strip()
    if not project:
        b64 = os.environ.get("GEE_SERVICE_ACCOUNT_B64", "")
        if b64:
            import base64 as _b64
            import json as _json
            try:
                project = _json.loads(
                    _b64.b64decode(b64.encode()).decode()
                ).get("project_id") or ""
            except Exception:
                project = ""

    from geeViz.eeAuth import initialize_via_proxy
    return initialize_via_proxy(proxy_url, project=project or None)


def _init_ee_credentials():
    """Initialize Earth Engine credentials for this MCP process.

    Two paths:

    - **Proxy mode** (``EE_PROXY_URL`` set): EE routes all REST calls
      through the agent's ``/ee-api`` proxy with TenantAwareHttp adding
      the tenant header. Used in production (Cloud Run) where the agent
      and MCP share a process. The agent's proxy holds the credentials;
      this subprocess just points ``ee.Initialize`` at the proxy URL.

    - **Direct mode** (no proxy): hand off to
      :func:`geeViz.eeAuth.robust_init`, the same bootstrap geeView /
      external callers use. It runs the full credential discovery —
      ``$GOOGLE_APPLICATION_CREDENTIALS``, the EE persistent file,
      gcloud ADC, ``$GEE_SERVICE_ACCOUNT_B64``, per-tenant SA env vars
      (keyed + keyless), and an ADC fallback for Cloud Run / GKE / AWS
      WIF deployments. Multi-tenant deployments get the in-process
      proxy spun up automatically; single-tenant deployments get a
      direct ``ee.Initialize`` call.

    The direct-mode discovery is owned by ``geeViz.eeAuth.eeCreds`` —
    this function is a thin dispatch, not a credential-handling layer
    of its own.
    """
    if _init_ee_via_proxy():
        return
    from geeViz.eeAuth import robust_init
    robust_init(verbose=False, interactive=False)


def _ensure_ee_initialized():
    """Initialize EE credentials once (global, not per-session)."""
    global _initialized
    if _initialized:
        return
    with _init_lock:
        if _initialized:
            return
        _init_ee_credentials()
        # Import geeViz to trigger ee.Initialize
        import geeViz.geeView  # noqa: F401
        _initialized = True


def _ensure_initialized(session_id: str | None = None):
    """Lazy-initialize EE (global) and populate session namespace. Thread-safe."""
    _ensure_ee_initialized()
    if not _MODULE_TREE:
        _build_module_tree()
    sess = _get_session(session_id)
    if sess.initialized:
        return sess

    import geeViz.geeView as gv
    import geeViz.getImagesLib as gil
    import geeViz.getSummaryAreasLib as sal
    import geeViz.edwLib as edw
    import geeViz.googleMapsLib as gm
    import geeViz.geePalettes as palettes
    from geeViz.outputLib import charts as cl
    from geeViz.outputLib import thumbs as tl
    from geeViz.outputLib import reports as rl
    import ee
    # pandas and numpy are de-facto standard helpers the agent reaches for
    # constantly; pre-load them so `search_geeviz(name="pd.DataFrame.to_markdown")`
    # and `search_geeviz(module="pd", query="...")` resolve without requiring
    # the agent to `import pandas` inside run_code first. Both are already
    # geeViz dependencies (setup.py) and on the sandbox allowlist.
    import pandas as _pd_mod
    import numpy as _np_mod

    # Each session gets its own Map instance for layer isolation
    session_map = gv.mapper() if session_id and session_id != _DEFAULT_SESSION_ID else gv.Map

    # Per-session save_file writes to session's output dir
    def _session_save_file(filename, content, mode="w"):
        return _safe_write_file(filename, content, mode, output_dir=sess.output_dir)

    sess._map_ref = session_map
    sess.namespace.update({
        "ee": ee,
        "Map": session_map,
        "gv": gv,
        "gil": gil,
        "sal": sal,
        "edw": edw,
        "gm": gm,
        "palettes": palettes,
        "cl": cl,
        "tl": tl,
        "rl": rl,
        # Both aliases for each lib so search_geeviz finds them either way.
        "pandas": _pd_mod,
        "pd": _pd_mod,
        "numpy": _np_mod,
        "np": _np_mod,
        "save_file": _session_save_file,
        "__builtins__": _make_safe_builtins(),
    })
    sess.initialized = True
    return sess


def _reset_namespace(session_id: str | None = None):
    """Clear and re-populate a session's REPL namespace. Also resets code history."""
    sess = _get_session(session_id)
    sess.namespace.clear()
    sess.code_history.clear()
    sess.current_script_path = None
    sess.initialized = False
    _ensure_initialized(session_id)


def _save_history_to_file(sess: _SessionState) -> str:
    """Write accumulated code history to a timestamped .py file. Returns the path."""
    import datetime
    os.makedirs(sess.script_dir, exist_ok=True)
    if sess.current_script_path is None:
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        sess.current_script_path = os.path.join(sess.script_dir, f"session_{ts}.py")
    header = (
        "# Auto-generated by geeViz MCP server\n"
        "# Each section below is one run_code call, in order.\n\n"
        "import geeViz.geeView as gv\n"
        "import geeViz.getImagesLib as gil\n"
        "import geeViz.getSummaryAreasLib as sal\n"
        "import geeViz.edwLib as edw\n"
        "import geeViz.googleMapsLib as gm\n"
        "from geeViz.outputLib import charts as cl\n"
        "from geeViz.outputLib import thumbs as tl\n"
        "from geeViz.outputLib import reports as rl\n"
        "ee = gv.ee\n"
        "Map = gv.Map\n\n"
    )
    body = "\n\n".join(
        f"# --- run_code call {i+1} ---\n{block}"
        for i, block in enumerate(sess.code_history)
    )
    with open(sess.current_script_path, "w", encoding="utf-8") as f:
        f.write(header + body + "\n")
    return sess.current_script_path


# ---------------------------------------------------------------------------
# Tool 1: run_code
# ---------------------------------------------------------------------------
import ast
import asyncio
import io
import contextlib
import traceback


# Collection type names that are dangerous to call .getInfo() on without .limit()
_COLLECTION_NAMES = {"ImageCollection", "FeatureCollection"}

# ---------------------------------------------------------------------------
# Security: Tier 1 hardening for run_code
# ---------------------------------------------------------------------------

# Modules that are blocked from import in run_code. These provide OS/network/process
# access that is unnecessary for Earth Engine workflows and dangerous if the server
# is exposed remotely.
_BLOCKED_MODULES = frozenset({
    "os", "sys", "subprocess", "socket", "shutil", "ctypes", "signal",
    "multiprocessing", "threading", "http", "urllib", "requests",
    "pathlib", "tempfile", "glob", "importlib", "code", "codeop",
    "pickle", "shelve", "marshal", "builtins",
})
# Note: ``io`` is NOT blocked. ``io.BytesIO`` / ``io.StringIO`` are
# essential for the matplotlib → PIL → ``save_file`` flow and are pure
# in-memory objects with no filesystem access. The dangerous parts of
# io (``io.open``, ``io.FileIO``) require a file path that the sandbox
# doesn't grant anyway — and ``open`` is already blocked as a builtin.

# Top-level module prefixes that are allowed in import statements.
# Anything not matching these prefixes AND not in _BLOCKED_MODULES gets a warning
# (not a hard block) to avoid breaking legitimate but uncommon imports.
_ALLOWED_MODULE_PREFIXES = (
    "ee", "geeViz", "json", "datetime", "math", "collections",
    "numpy", "np", "pandas", "pd", "plotly", "copy", "re",
    "functools", "itertools", "operator", "statistics",
    "pprint", "textwrap", "string", "decimal", "fractions",
    # Plotting libraries the agent reaches for in exploration.
    # Available iff the agent runtime installs them (see Dockerfile).
    # ``cl.apply_theme(...)`` themes their output to match geeViz charts.
    "matplotlib", "seaborn", "mpl_toolkits", "io",
)

# Per-deployment additions — comma-separated env var. Lets a tenant
# config enable extra libraries (e.g. ``altair``, ``bokeh``, ``sklearn``)
# without forking the code. Each addition is still a security review;
# don't set this casually.
_EXTRA_ALLOWED = tuple(
    p.strip() for p in os.environ.get("MCP_EXTRA_ALLOWED_MODULES", "").split(",")
    if p.strip()
)
if _EXTRA_ALLOWED:
    _ALLOWED_MODULE_PREFIXES = _ALLOWED_MODULE_PREFIXES + _EXTRA_ALLOWED

# Builtins that are blocked from the execution namespace.
_BLOCKED_BUILTINS = frozenset({
    "__import__", "eval", "exec", "compile", "open",
    "breakpoint", "exit", "quit",
    "globals", "locals", "vars",
    "getattr", "setattr", "delattr",
})


def _safe_write_file(filename: str, content: str, mode: str = "w",
                     output_dir: str | None = None) -> str:
    """Write content to a file in the safe output directory.

    Only allows writing to geeViz/mcp/generated_outputs/ (or a session
    subdirectory) to prevent arbitrary file system access.

    Args:
        filename: Just the filename (no directory). e.g. "chart.html"
        content: String content to write.
        mode: Write mode, "w" (text) or "wb" (binary). Default "w".
        output_dir: Override output directory (used for session isolation).

    Returns:
        Full path to the written file.
    """
    _out = output_dir or _output_dir
    safe_name = os.path.basename(filename)
    if not safe_name:
        raise ValueError("filename must not be empty")
    os.makedirs(_out, exist_ok=True)
    full_path = os.path.join(_out, safe_name)
    # Auto-detect mode from content type
    if isinstance(content, bytes):
        mode = "wb"
    elif isinstance(content, str):
        mode = "w"
    kwargs = {"encoding": "utf-8"} if "b" not in mode else {}
    with open(full_path, mode, **kwargs) as f:
        f.write(content)
    return full_path


def _make_safe_builtins() -> dict:
    """Return a copy of __builtins__, optionally with dangerous functions removed.

    When sandbox is disabled (local/stdio use), returns the full builtins dict
    so that run_code has unrestricted Python access.
    """
    import builtins
    if not _SANDBOX_ENABLED:  # False or None (unresolved) → no restrictions
        # No restrictions — full Python access
        return dict(vars(builtins))
    safe = {k: v for k, v in vars(builtins).items() if k not in _BLOCKED_BUILTINS}
    # Provide a safe __import__ that blocks dangerous modules
    def _safe_import(name, *args, **kwargs):
        top = name.split(".")[0]
        if top in _BLOCKED_MODULES:
            raise ImportError(
                f"Import of '{name}' is blocked for security. "
                f"Only Earth Engine, geeViz, and standard data libraries are allowed."
            )
        return __builtins__["__import__"](name, *args, **kwargs) if isinstance(__builtins__, dict) \
            else builtins.__import__(name, *args, **kwargs)
    safe["__import__"] = _safe_import
    return safe


def _check_code_patterns(code: str) -> list[str]:
    """AST analysis: detect risky EE patterns AND blocked security patterns.

    Returns a list of warning/error strings. Strings starting with "BLOCKED:"
    will cause run_code to refuse execution.

    When sandbox is disabled, security checks (import blocking, builtin blocking)
    are skipped — only EE performance warnings are emitted.
    """
    warnings: list[str] = []
    try:
        tree = ast.parse(code)
    except SyntaxError:
        return warnings  # let the executor report syntax errors

    for node in ast.walk(tree):
        if _SANDBOX_ENABLED:
            # --- Security: check imports (sandbox only) ---
            if isinstance(node, ast.Import):
                for alias in node.names:
                    top = alias.name.split(".")[0]
                    if top in _BLOCKED_MODULES:
                        warnings.append(
                            f"BLOCKED: import of '{alias.name}' is not allowed. "
                            f"Only Earth Engine, geeViz, and standard data libraries are permitted."
                        )
            elif isinstance(node, ast.ImportFrom):
                if node.module:
                    top = node.module.split(".")[0]
                    if top in _BLOCKED_MODULES:
                        warnings.append(
                            f"BLOCKED: import from '{node.module}' is not allowed. "
                            f"Only Earth Engine, geeViz, and standard data libraries are permitted."
                        )

            # --- Security: check for dangerous builtin calls (sandbox only) ---
            if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
                if node.func.id in ("eval", "exec", "compile", "open", "breakpoint", "__import__"):
                    warnings.append(
                        f"BLOCKED: call to '{node.func.id}()' is not allowed for security."
                    )

        # --- Batch export blocking (sandbox only): block .start() and task.start() ---
        # Export wrapper functions are allowed (they support start=False),
        # but actually starting tasks is blocked in sandbox mode.
        if _SANDBOX_ENABLED:
            if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute):
                if node.func.attr == "start":
                    warnings.append(
                        "BLOCKED: .start() calls are not allowed in this environment. "
                        "Batch exports (to Assets, Drive, or Cloud Storage) cannot run here. "
                        "Download the code using the Download button and run it locally to execute exports."
                    )

        # --- EE performance: detect .getInfo() calls (always active) ---
        if not (isinstance(node, ast.Call)
                and isinstance(node.func, ast.Attribute)
                and node.func.attr == "getInfo"):
            continue

        # Check if .getInfo() is inside a for/while loop — BLOCKED, not just a warning
        for parent in ast.walk(tree):
            if isinstance(parent, (ast.For, ast.While)):
                for child in ast.walk(parent):
                    if child is node:
                        warnings.append(
                            "BLOCKED: .getInfo() inside a loop is not allowed — it causes "
                            "extreme slowness (one server round-trip per iteration). "
                            "Use server-side operations instead: ee.List, ee.Dictionary, "
                            ".map(), or pass the full collection to cl.summarize_and_chart()."
                        )
                        break

        # Check for .getInfo() on a collection without .limit()
        target = node.func.value
        chain = _get_method_chain(target)
        has_limit = "limit" in chain or "first" in chain
        has_collection = any(name in _COLLECTION_NAMES for name in chain)
        if has_collection and not has_limit:
            warnings.append(
                "Warning: .getInfo() on a collection without .limit() can be very slow. "
                "Consider adding .limit(N) or using .first().getInfo()."
            )

    # Deduplicate while preserving order
    seen: set[str] = set()
    unique: list[str] = []
    for w in warnings:
        if w not in seen:
            seen.add(w)
            unique.append(w)
    return unique


def _get_method_chain(node: ast.AST) -> list[str]:
    """Walk an attribute/call chain and return method/attribute names encountered."""
    names: list[str] = []
    current = node
    while True:
        if isinstance(current, ast.Call):
            current = current.func
        elif isinstance(current, ast.Attribute):
            names.append(current.attr)
            current = current.value
        elif isinstance(current, ast.Name):
            names.append(current.id)
            break
        else:
            break
    return names


def _save_and_clean_result(result_val):
    """Save any binary/HTML outputs to files and return a small, JSON-safe result.

    Walks the result value, saves bytes to .png/.gif files and large HTML
    to .html files in generated_outputs/, then returns a clean dict/string
    with file paths instead of raw data. Guaranteed to be small and
    JSON-serializable.
    """
    if result_val is None:
        return None

    import time as _t
    os.makedirs(_output_dir, exist_ok=True)
    ts = int(_t.time())

    _EXT_MAP = {"image": ".png"}

    def _clean(obj, depth=0):
        if depth > 5:
            return "<nested too deep>"
        if isinstance(obj, (bytes, bytearray)):
            # Save bytes to file, return path
            fname = f"output_{ts}_{id(obj) % 10000}.bin"
            fpath = os.path.join(_output_dir, fname).replace("\\", "/")
            with open(fpath, "wb") as f:
                f.write(obj)
            return f"saved to {fpath}"
        if isinstance(obj, str):
            if len(obj) > 10000 and ("data:image" in obj or "<html" in obj.lower()
                                      or "data:text/html" in obj):
                # Large HTML or data URI — save to file
                fname = f"output_{ts}_{id(obj) % 10000}.html"
                fpath = os.path.join(_output_dir, fname).replace("\\", "/")
                with open(fpath, "w", encoding="utf-8") as f:
                    f.write(obj)
                return f"saved to {fpath}"
            if len(obj) > 5000:
                return obj[:200] + f"... <truncated, {len(obj)} chars>"
            return obj
        if isinstance(obj, dict):
            clean = {}
            for k, v in obj.items():
                # Use known extension for common keys
                if isinstance(v, (bytes, bytearray)) and len(v) > 0:
                    if k == "bytes":
                        # Use sibling "format" key to determine extension
                        fmt = obj.get("format", "png")
                        ext = f".{fmt}"
                    else:
                        ext = _EXT_MAP.get(k, ".bin")
                    fname = f"output_{ts}{ext}"
                    fpath = os.path.join(_output_dir, fname).replace("\\", "/")
                    with open(fpath, "wb") as f:
                        f.write(v)
                    clean[k] = f"saved to {fpath}"
                else:
                    clean[k] = _clean(v, depth + 1)
            return clean
        if isinstance(obj, (list, tuple)):
            items = [_clean(x, depth + 1) for x in obj[:20]]
            if len(obj) > 20:
                items.append(f"... ({len(obj) - 20} more)")
            return items
        # Primitives (int, float, bool, None)
        try:
            json.dumps(obj)
            return obj
        except (TypeError, ValueError):
            return repr(obj)[:500]

    cleaned = _clean(result_val)

    # Final safety check — ensure it's JSON-serializable and not too big
    try:
        s = json.dumps(cleaned)
        if len(s) > 50000:
            return "<result too large after cleaning>"
        return cleaned
    except (TypeError, ValueError):
        return repr(cleaned)[:2000]


class _StreamingStdout(io.StringIO):
    """StringIO that also appends output to a file for cross-process polling."""
    def __init__(self, stream_file: str):
        super().__init__()
        self._stream_file = stream_file

    def write(self, s):
        try:
            with open(self._stream_file, "a", encoding="utf-8") as f:
                f.write(s)
        except Exception:
            pass
        return super().write(s)


[docs] @app.tool(annotations=_WRITE) async def run_code(code: str, timeout: int = 120, reset: bool = False, stream_stdout: bool = False, session_id: str = None, ctx: Context = None) -> str: """Execute Python/GEE code in a persistent REPL namespace (like Jupyter). The namespace persists across calls -- variables set in one call are available in the next. Pre-populated with: ee, Map (gv.Map), gv (geeViz.geeView), gil (geeViz.getImagesLib), sal (geeViz.getSummaryAreasLib), tl (geeViz.outputLib.thumbs), rl (geeViz.outputLib.reports), save_file. **Sandbox mode:** When the server is run with ``--sandbox`` or over HTTP to a non-localhost address, ``open()``, ``os``, ``sys``, ``eval``, etc. are blocked. For local/stdio use (the default), sandbox is OFF and full Python access is available. Use ``save_file(filename, content)`` to write files to the ``generated_outputs/`` directory regardless of sandbox mode. While executing, progress heartbeats are sent every ~10 seconds to keep the MCP client connection alive and inform the agent that the tool is still running. Args: code: Python code to execute. timeout: Max seconds to wait (default 120). On Windows a hung getInfo() cannot be force-killed; the thread continues in background. reset: If True, clear the namespace and re-initialize before executing. stream_stdout: If True, print output is available in real-time via the /stdout polling endpoint. Default False. session_id: Session identifier for namespace isolation. Default None (shared default session). ctx: MCP Context (auto-injected by FastMCP). Used for progress reporting. Returns: JSON with keys: success (bool), stdout, stderr, result, error. """ if reset: _reset_namespace(session_id) sess = _ensure_initialized(session_id) # Strip redundant imports that would clobber pre-populated namespace variables. # The REPL already has ee, Map, gv, gil, sal, cl, tl, rl, gm, palettes, save_file. # Agents frequently write `from geeViz import geeView as Map` which replaces the # mapper instance with the module, breaking Map.addLayer/clearMap/etc. try: _tree = ast.parse(code) _stripped = [] for node in _tree.body: skip = False if isinstance(node, (ast.Import, ast.ImportFrom)): for alias in node.names: bound_name = alias.asname or alias.name.split(".")[-1] if bound_name in ("ee", "Map", "gv", "gil", "sal", "cl", "tl", "rl", "gm", "palettes", "save_file"): skip = True break if not skip: _stripped.append(node) if len(_stripped) < len(_tree.body): _tree.body = _stripped code = ast.unparse(_tree) except SyntaxError: pass # let the actual exec catch it # Static analysis: detect risky and blocked patterns before execution code_warnings = _check_code_patterns(code) # Refuse execution if any BLOCKED patterns were found blocked = [w for w in code_warnings if w.startswith("BLOCKED:")] if blocked: return json.dumps({ "success": False, "stdout": "", "stderr": "\n".join(blocked), "result": None, "error": "Code was blocked by security policy. " + " ".join(blocked), "script_path": None, }) # Set up stdout capture — streaming version appends to a file for polling if stream_stdout: try: os.makedirs(os.path.dirname(sess.stdout_stream_file), exist_ok=True) with open(sess.stdout_stream_file, "w", encoding="utf-8") as f: f.write("") # clear except Exception: pass sess.stdout_active = True stdout_buf = _StreamingStdout(sess.stdout_stream_file) else: stdout_buf = io.StringIO() stderr_buf = io.StringIO() result_holder: list = [None] error_holder: list = [None] # Snapshot output files before execution to detect new/modified ones. # We track both existence and mtime so files overwritten in place # (common with save_file when the agent re-generates an image) are # still reported as outputs. os.makedirs(sess.output_dir, exist_ok=True) _mtimes_before = { f: os.path.getmtime(os.path.join(sess.output_dir, f)) for f in os.listdir(sess.output_dir) if os.path.isfile(os.path.join(sess.output_dir, f)) } _files_before = set(_mtimes_before.keys()) # Save original streams so we can restore them after timeout (redirect_stdout # modifies sys.stdout globally, which would capture the main thread's output # if the exec thread is still running when we time out). _orig_stdout = sys.stdout _orig_stderr = sys.stderr _ns = sess.namespace # capture for closure def _exec(): global _audit_user_code_active try: if _SANDBOX_ENABLED: _audit_user_code_active = True with contextlib.redirect_stdout(stdout_buf), contextlib.redirect_stderr(stderr_buf): tree = ast.parse(code) if tree.body and isinstance(tree.body[-1], ast.Expr): if len(tree.body) > 1: mod = ast.Module(body=tree.body[:-1], type_ignores=[]) exec(compile(mod, "<mcp>", "exec"), _ns) expr = ast.Expression(body=tree.body[-1].value) result_holder[0] = eval(compile(expr, "<mcp>", "eval"), _ns) else: exec(compile(code, "<mcp>", "exec"), _ns) except Exception: error_holder[0] = traceback.format_exc() finally: _audit_user_code_active = False thread = threading.Thread(target=_exec, daemon=True) thread.start() # Heartbeat loop: poll every 1s, timeout only after `timeout` seconds # of *inactivity* (no new stdout/stderr output). Active code that keeps # printing can run indefinitely. elapsed = 0.0 idle_time = 0.0 report_interval = 10 poll_interval = 1 next_report = report_interval last_stdout_len = 0 last_stderr_len = 0 while thread.is_alive() and idle_time < timeout: await asyncio.sleep(min(poll_interval, timeout - idle_time)) elapsed += poll_interval # Check for new output activity cur_stdout_len = stdout_buf.tell() cur_stderr_len = stderr_buf.tell() if cur_stdout_len != last_stdout_len or cur_stderr_len != last_stderr_len: idle_time = 0.0 # reset idle timer on any new output last_stdout_len = cur_stdout_len last_stderr_len = cur_stderr_len else: idle_time += poll_interval if thread.is_alive() and ctx and elapsed >= next_report: next_report += report_interval try: await ctx.report_progress(elapsed, timeout) await ctx.info(f"run_code executing... ({int(elapsed)}s elapsed, {int(idle_time)}s idle / {timeout}s timeout)") except Exception: pass # don't let reporting errors kill the tool # Restore original streams in case the thread's redirect is still active # (happens on timeout when the thread's `with` block hasn't exited yet). sys.stdout = _orig_stdout sys.stderr = _orig_stderr # Guard: restore Map if user code clobbered it # (e.g. `from geeViz import geeView as Map` replaces the mapper instance with the module) import geeViz.geeView as _gv_mod if not isinstance(_ns.get("Map"), _gv_mod.mapper): _ns["Map"] = sess._map_ref # Clear streaming flag if stream_stdout: sess.stdout_active = False # Prepend static analysis warnings to stderr stderr_val = stderr_buf.getvalue() if code_warnings: warning_block = "\n".join(code_warnings) + "\n" stderr_val = warning_block + stderr_val if thread.is_alive(): timeout_hints = ( f"Execution timed out after {int(idle_time)}s of inactivity ({int(elapsed)}s total). Common causes:\n" "- .getInfo() on a large ImageCollection -- use .limit(N) or inspect_asset with date/region filters\n" "- .getInfo() on a high-res Image over a large region -- reduce the region or increase scale\n" "- Complex server-side computation -- break into smaller steps\n" "Note: on Windows, the thread continues in background." ) if elapsed >= 60: timeout_hints += ( "\nHint: the call ran for over 60s with no output. If this was a .getInfo() call, " "consider using inspect_asset with filters, or reduce scale/region size." ) return json.dumps({ "success": False, "code": code, "stdout": stdout_buf.getvalue(), "stderr": stderr_val, "result": None, "error": timeout_hints, }) if error_holder[0]: return json.dumps({ "success": False, "code": code, "stdout": stdout_buf.getvalue(), "stderr": stderr_val, "result": None, "error": error_holder[0], "script_path": None, }) # Success -- record in history and save to file sess.code_history.append(code) script_path = _save_history_to_file(sess) result_val = result_holder[0] # --- Auto-save any binary/HTML outputs and build a clean result --- # This is the ONLY place that handles bytes/large data. # Everything that comes out of here is guaranteed small and JSON-safe. result_str = _save_and_clean_result(result_val) # Detect new or modified output files (from save_file, auto-save, or direct writes) _files_after = [ f for f in os.listdir(sess.output_dir) if os.path.isfile(os.path.join(sess.output_dir, f)) ] _new_files = [] for f in _files_after: fpath = os.path.join(sess.output_dir, f) mt = os.path.getmtime(fpath) if f not in _files_before or mt > _mtimes_before.get(f, 0): _new_files.append(f) _new_files = sorted(_new_files) _IMG_EXTS = {".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp", ".svg"} output_markdown = None if _new_files: md_lines = [] for fname in _new_files: fpath = os.path.join(sess.output_dir, fname).replace("\\", "/") ext = os.path.splitext(fname)[1].lower() label = os.path.splitext(fname)[0].replace("_", " ").replace("-", " ").title() if ext in _IMG_EXTS: md_lines.append(f"![{label}]({fpath})") else: md_lines.append(f"[{label}]({fpath})") output_markdown = "\n".join(md_lines) # Clean stderr — strip noisy library logs (Kaleido, Chromium, http retries) # that bloat the response without useful info for the model _noise_patterns = {"kaleido", "chromium", "browser_async", "_tmpfile", "shutil.rmtree", "TemporaryDirectory", "Conforming", "navigates", "Getting tab", "Got ", "Processing fig", "Sending big command", "Sent big command", "Reloading tab", "Putting tab", "Waiting for all", "Exiting Kaleido", "Cancelling tasks", "Opening browser", "Closing browser", "Temp directory", "Found chromium"} _filtered_stderr = [] for _line in stderr_val.splitlines(): _lower = _line.lower() if any(p in _lower for p in _noise_patterns): continue _filtered_stderr.append(_line) stderr_val = "\n".join(_filtered_stderr).strip() # Clean stdout — strip base64 data URIs and cap size import re as _re stdout_val = stdout_buf.getvalue() stdout_val = _re.sub( r'data:(image|text)/[^;]+;base64,[A-Za-z0-9+/=]{100,}', '<base64 data stripped>', stdout_val, ) if len(stdout_val) > 50000: stdout_val = stdout_val[:50000] + "\n... (truncated)" return json.dumps({ "success": True, "code": code, "stdout": stdout_val, "stderr": stderr_val, "result": result_str, "error": None, "script_path": script_path, "output_markdown": output_markdown, })
# --------------------------------------------------------------------------- # Tool 2: inspect_asset # ---------------------------------------------------------------------------
[docs] @app.tool(annotations=_READ_ONLY_OPEN) def inspect_asset( asset_id: str, start_date: str = "", end_date: str = "", region_var: str = "", session_id: str = None, ) -> str: """Get detailed metadata for any GEE asset (Image, ImageCollection, FeatureCollection, etc.). Returns band names/types, CRS, scale, date range, size, columns, and properties. Uses ee.data.getInfo for fast catalog metadata, then fetches live details with a 10-second timeout per query to avoid hangs on large collections. Args: asset_id: Full Earth Engine asset ID (e.g. "COPERNICUS/S2_SR_HARMONIZED"). start_date: Optional start date filter for ImageCollections (YYYY-MM-DD). end_date: Optional end date filter for ImageCollections (YYYY-MM-DD). region_var: Optional name of an ee.Geometry or ee.FeatureCollection variable in the REPL namespace for spatial filtering (ImageCollections only). Returns: JSON with asset metadata. """ import concurrent.futures import datetime as _dt _TIMEOUT = 10 # seconds per EE query sess = _ensure_initialized(session_id) ee = sess.namespace["ee"] # --- Step 1: Fast catalog metadata (no compute, never hangs) --- try: info = ee.data.getInfo(asset_id) except Exception as exc: return json.dumps({"error": str(exc), "asset_id": asset_id}) if info is None: return json.dumps({"error": f"Asset not found: {asset_id}", "asset_id": asset_id}) asset_type = info.get("type", "UNKNOWN") result: dict = {"asset_id": asset_id, "type": asset_type} # Include catalog-level properties (skip long description HTML) cat_props = info.get("properties", {}) if cat_props: dr = cat_props.get("date_range") if dr and isinstance(dr, list) and len(dr) == 2: try: result["first_date"] = _dt.datetime.utcfromtimestamp(dr[0] / 1000).strftime("%Y-%m-%d") result["last_date"] = _dt.datetime.utcfromtimestamp(dr[1] / 1000).strftime("%Y-%m-%d") except Exception: pass _CATALOG_KEYS = ("title", "provider", "keywords", "tags", "period", "visualization_0_bands", "visualization_0_min", "visualization_0_max", "visualization_0_name", "provider_url") for key in _CATALOG_KEYS: if key in cat_props: result.setdefault("catalog", {})[key] = cat_props[key] # Include column info for FeatureCollections if "columns" in info: result["columns"] = info["columns"] def _getinfo_with_timeout(ee_obj, timeout=_TIMEOUT): """Run ee_obj.getInfo() in a daemon thread with timeout. Returns (result, error).""" import threading _result_box = [None, None] # [value, error] def _run(): try: _result_box[0] = ee_obj.getInfo() except Exception as exc: _result_box[1] = str(exc) t = threading.Thread(target=_run, daemon=True) t.start() t.join(timeout=timeout) if t.is_alive(): return None, "timeout" return _result_box[0], _result_box[1] try: if asset_type in ("IMAGE", "Image"): img_info, err = _getinfo_with_timeout(ee.Image(asset_id)) if img_info and "bands" in img_info: result["bands"] = [ {"name": b.get("id", ""), "data_type": b.get("data_type", {}).get("precision", ""), "crs": b.get("crs", ""), "scale": b.get("crs_transform", [None])[0]} for b in img_info["bands"] ] # Include image properties (class metadata, etc.) if "properties" in img_info: result["properties"] = img_info["properties"] elif err: result["detail_error"] = err elif asset_type in ("IMAGE_COLLECTION", "ImageCollection"): collection = ee.ImageCollection(asset_id) # Apply filters filters_applied = {} if start_date: collection = collection.filterDate(start_date, end_date or "2099-01-01") filters_applied["start_date"] = start_date filters_applied["end_date"] = end_date or "2099-01-01" elif end_date: collection = collection.filterDate("1970-01-01", end_date) filters_applied["start_date"] = "1970-01-01" filters_applied["end_date"] = end_date if region_var: region = sess.namespace.get(region_var) if region is None: return json.dumps({"error": f"Variable {region_var!r} not found in namespace."}) if isinstance(region, ee.FeatureCollection): region = region.geometry() elif not isinstance(region, ee.Geometry): return json.dumps({ "error": f"Variable {region_var!r} is {type(region).__name__}, " "expected ee.Geometry or ee.FeatureCollection.", }) collection = collection.filterBounds(region) filters_applied["region_var"] = region_var if filters_applied: result["filters_applied"] = filters_applied # --- Run queries with individual timeouts --- # Each query runs in its own daemon thread so hangs don't block queries = {} queries["count"] = collection.size() # Date range: use catalog date_range if no filters applied, # otherwise compute from the filtered collection if filters_applied or "first_date" not in result: queries["first_date"] = collection.sort("system:time_start", True).first().date().format("YYYY-MM-dd") queries["last_date"] = collection.sort("system:time_start", False).first().date().format("YYYY-MM-dd") # Band info from first image queries["first_image"] = collection.first() import threading results_map = {} _lock = threading.Lock() def _run_query(key, ee_obj): try: val = ee_obj.getInfo() with _lock: results_map[key] = val except Exception as exc: with _lock: results_map[key] = f"__ERROR__:{exc}" threads = [] for key, ee_obj in queries.items(): t = threading.Thread(target=_run_query, args=(key, ee_obj), daemon=True) t.start() threads.append(t) # Wait up to _TIMEOUT for all threads deadline = __import__("time").time() + _TIMEOUT for t in threads: remaining = max(0.1, deadline - __import__("time").time()) t.join(timeout=remaining) # Mark any that didn't finish for key in queries: if key not in results_map: results_map[key] = "__TIMEOUT__" # Process results count_val = results_map.get("count") if isinstance(count_val, int): result["image_count"] = count_val elif count_val == "__TIMEOUT__": result["image_count"] = "timeout (large collection)" else: result["image_count_error"] = str(count_val) # Dates fd = results_map.get("first_date") ld = results_map.get("last_date") if isinstance(fd, str) and not fd.startswith("__"): result["first_date"] = fd if isinstance(ld, str) and not ld.startswith("__"): result["last_date"] = ld # Bands and sample image properties first_img = results_map.get("first_image") if isinstance(first_img, dict): if "bands" in first_img: result["bands"] = [ {"name": b.get("id", ""), "data_type": b.get("data_type", {}).get("precision", ""), "crs": b.get("crs", ""), "scale": b.get("crs_transform", [None])[0]} for b in first_img["bands"] ] # Include first image's property names (not values — those can be huge) img_props = first_img.get("properties", {}) if img_props: result["image_property_names"] = sorted(img_props.keys()) # Include a few key properties if they exist for k in ("system:time_start", "system:index"): if k in img_props: result.setdefault("sample_image", {})[k] = img_props[k] # If count timed out, note it if count_val == "__TIMEOUT__": result["note"] = "Collection too large to count within timeout." elif asset_type in ("TABLE", "FeatureCollection"): # Try full metadata first, fall back to limited sample fc = ee.FeatureCollection(asset_id) fc_info, err = _getinfo_with_timeout(fc.limit(5), _TIMEOUT) if fc_info: result["asset"] = _strip_coordinates(fc_info) # Get column info if "columns" in info: result["columns"] = info["columns"] elif err == "timeout": # Try even smaller sample fc_info2, err2 = _getinfo_with_timeout(fc.limit(1), _TIMEOUT) if fc_info2: result["asset"] = _strip_coordinates(fc_info2) result["note"] = "Large FeatureCollection; showing 1 sample feature." else: result["detail_error"] = "timeout fetching features" if "columns" in info: result["columns"] = info["columns"] else: result["detail_error"] = err or "unknown error" else: # Folder or other type — return raw info result["info"] = info except Exception as exc: result["detail_error"] = str(exc) # --- Detect thematic/categorical bands --- # If any band has matching {band}_class_values, {band}_class_names, and # {band}_class_palette properties, flag the dataset as thematic and tell # the agent exactly what viz params to use. This eliminates the most # common agent mistake (hardcoding min/max for thematic data). bands = result.get("bands", []) props = result.get("properties", {}) if not props: # For ImageCollections, properties come from the first image info # which was fetched in the queries above. Use locals() safely. try: _first_img = locals().get("results_map", {}).get("first_image") if isinstance(_first_img, dict): props = _first_img.get("properties", {}) except Exception: pass thematic_bands = [] for band in bands: bname = band.get("name", "") if bname and all( "{}_class_{}".format(bname, suffix) in props for suffix in ("values", "names", "palette") ): thematic_bands.append(bname) if thematic_bands: result["thematic_bands"] = thematic_bands # Normalize class properties: some assets store them as comma-separated # strings instead of lists. Convert to lists so downstream code (and the # agent) gets consistent types. def _normalize_prop(v, as_int=False): if v is None or v == "": return [] if isinstance(v, str): parts = [p.strip() for p in v.split(",") if p.strip()] elif isinstance(v, (list, tuple)): parts = list(v) else: return [v] if as_int: out = [] for p in parts: try: out.append(int(p)) except (ValueError, TypeError): try: out.append(int(float(p))) except (ValueError, TypeError): out.append(p) return out return parts normalized_any = False for bname in thematic_bands: for suffix, as_int in (("values", True), ("names", False), ("palette", False)): key = f"{bname}_class_{suffix}" raw = props.get(key) if isinstance(raw, str): normalized_any = True normalized = _normalize_prop(raw, as_int=as_int) props[key] = normalized # Also update the band-level properties (if present) for band in bands: if band.get("name") == bname and "properties" in band: band["properties"][key] = normalized result["viz_recommendation"] = ( "THEMATIC DATA DETECTED — bands {} have class properties. " "You MUST use {{'autoViz': True}} as the viz params when adding " "this layer to the map. Do NOT use min/max. Example: " "Map.addLayer(data, {{'autoViz': True}}, 'Layer Name')" ).format(thematic_bands) if normalized_any: result["viz_recommendation"] += ( " NOTE: This asset stored class properties as comma-separated strings. " "They have been normalized to lists in this response. Use the values shown." ) # Cap response size to prevent token overflow response = json.dumps(result) if len(response) > 100000: # Strip large fields to fit for key in ("asset", "info", "bands"): if key in result and len(json.dumps(result.get(key, ""))) > 20000: result[key] = f"<truncated: {len(json.dumps(result[key]))} chars>" response = json.dumps(result) return response
# --------------------------------------------------------------------------- # Unified search/introspection tool # --------------------------------------------------------------------------- import inspect as _inspect def _resolve_module(name, session_ns=None): """Resolve a name to a container we can list members of. Accepts: - A geeViz module short name in ``_MODULE_TREE`` (``"getImagesLib"``). - A top-level REPL name that points to a module (``"ee"``, ``"pd"``). - A dotted path that traverses attributes from a top-level REPL name (``"ee.ImageCollection"``, ``"ee.Reducer"``, ``"pd.DataFrame"``). Each step uses ``getattr`` so classes count as valid containers — ``dir()`` returns their methods, which is what callers want. Returns ``(short_name, container)`` or ``(None, None)``. """ # Exact match in the geeViz module tree entry = _MODULE_TREE.get(name) if entry: mod = _get_module(entry) if mod is not None: return name, mod if session_ns: # Top-level REPL hit (no dots) if "." not in name: obj = session_ns.get(name) if _inspect.ismodule(obj) or _inspect.isclass(obj): return name, obj return None, None # Dotted path: walk attributes from the head identifier. parts = name.split(".") head = session_ns.get(parts[0]) if head is None: return None, None obj = head for attr in parts[1:]: obj = getattr(obj, attr, None) if obj is None: return None, None # Only accept containers (module/class). Functions/instances have # members too but listing them is usually noise. if _inspect.ismodule(obj) or _inspect.isclass(obj): return name, obj return None, None def _iter_module_members(mod, query="", include_non_callable=True): """Yield (name, obj, kind, first_line, sig) for public members of a module.""" q = query.lower() if query else "" for attr_name in sorted(dir(mod)): if attr_name.startswith("_"): continue obj = getattr(mod, attr_name, None) if obj is None: continue # Skip sub-modules from listing (too noisy) if _inspect.ismodule(obj): continue is_callable = callable(obj) or _inspect.isclass(obj) if not include_non_callable and not is_callable: continue doc = _inspect.getdoc(obj) or "" first_line = doc.split("\n")[0].strip() if doc else "" if q and q not in attr_name.lower() and q not in first_line.lower(): continue if _inspect.isclass(obj): kind = "class" elif callable(obj): kind = "function" else: kind = type(obj).__name__ sig = "" if callable(obj) and not _inspect.isclass(obj): try: sig = f"{attr_name}{_inspect.signature(obj)}" except (ValueError, TypeError): pass yield attr_name, obj, kind, first_line, sig
[docs] @app.tool(annotations=_READ_ONLY) def search_geeviz(query: str = "", name: str = "", module: str = "", session_id: str = None) -> str: """Search geeViz modules, functions, classes, variables, and any REPL module. A unified introspection tool — replaces search_functions and get_reference_data. Can look up functions, classes, dicts, constants, viz params, band mappings, palettes, and more. Args: query: Search term (case-insensitive). Matches against names and first-line docstrings across all geeViz modules. name: Exact name to look up. Accepts bare names (``"vizParamsFalse"``, ``"simpleMask"``) or dotted paths (``"getImagesLib.vizParamsFalse"``, ``"mapper.addLayer"``). Returns full details: signature, docstring for functions; keys/values for dicts; value for constants. module: Module to search or list. Accepts short names (``"getImagesLib"``, ``"charts"``, ``"thumbs"``), full paths (``"geeViz.outputLib.charts"``), or legacy aliases (``"chartingLib"``). Also accepts any module in the REPL namespace (``"ee"``) for on-the-fly lookups. Returns: JSON with results. Shape depends on the query: - No args: list of all discovered modules - module only: all public members (functions, classes, variables) - query only: search results across all modules - name only: detailed description of the named object - name + module: direct lookup within a specific module """ sess = _ensure_initialized(session_id) ns = sess.namespace # --- Direct name lookup --- if name: # Example source lookup: name="examples.CCDCViz" or bare example name _ex_name = name if name.startswith("examples."): _ex_name = name.split(".", 1)[1] ex_entry = _MODULE_TREE.get("examples") if ex_entry: for m in ex_entry.get("members", []): if m["name"] == _ex_name: return _read_example_source(m["file"], m["name"], m.get("description", "")) # Dotted path: split into module/object + attribute if "." in name and not module: parts = name.split(".", 1) mod_name, attr_path = parts[0], parts[1] # Try module tree first _, mod_obj = _resolve_module(mod_name, ns) # Then try REPL namespace (covers Map, gv, gil, etc.) if mod_obj is None and mod_name in ns: mod_obj = ns[mod_name] if mod_obj is not None: try: obj = mod_obj for p in attr_path.split("."): obj = getattr(obj, p) return json.dumps(_describe_object(obj, name=name, module_name=mod_name)) except AttributeError: pass # Fallback: mod_name might be a class inside a module (e.g. "mapper.addLayer") if mod_obj is None: for short, entry in _MODULE_TREE.items(): if short != entry["fq"]: continue for m in entry.get("members", []): if m["type"] == "class" and m["name"] == mod_name: _mod = _get_module(entry) if _mod: cls = getattr(_mod, mod_name, None) if cls: try: obj = cls for p in attr_path.split("."): obj = getattr(obj, p) return json.dumps(_describe_object(obj, name=name, module_name=entry["fq"].rsplit(".", 1)[-1])) except AttributeError: pass # If module specified, look there if module: _, mod_obj = _resolve_module(module, ns) if mod_obj is None: return json.dumps({"error": f"Module {module!r} not found."}) # Try direct attribute obj = getattr(mod_obj, name, None) # geeView mapper fallback if obj is None and module in ("geeView", "geeViz.geeView"): mapper_cls = getattr(mod_obj, "mapper", None) if mapper_cls: obj = getattr(mapper_cls, name, None) if obj is not None: name = f"mapper.{name}" if obj is not None: return json.dumps(_describe_object(obj, name=name, module_name=module)) return json.dumps({"error": f"{name!r} not found in {module}."}) # Bare name — check AST index first (no import needed if value was extracted) for short, entry in _MODULE_TREE.items(): if short != entry["fq"]: continue for m in entry.get("members", []): if m["name"] == name: # If AST captured the value, return it without importing if "value" in m: result = {"name": name, "module": entry["fq"].rsplit(".", 1)[-1], "type": type(m["value"]).__name__, "value": _make_serializable(m["value"])} return json.dumps(result) # For functions/classes, return AST info without importing if m["type"] in ("function", "class"): result = {"name": name, "module": entry["fq"].rsplit(".", 1)[-1], "type": m["type"]} if m.get("signature"): result["signature"] = m["signature"] if m.get("docstring"): result["docstring"] = m["docstring"] elif m.get("description"): result["docstring"] = m["description"] if m.get("methods"): result["methods"] = m["methods"] return json.dumps(result) # Variable without literal value — return source expression if available result = {"name": name, "module": entry["fq"].rsplit(".", 1)[-1], "type": "variable"} if "value" in m: result["value"] = m["value"] return json.dumps(result) # Fallback: geeView mapper methods (not in top-level AST) gv_entry = _MODULE_TREE.get("geeViz.geeView") if gv_entry: # Check mapper class members from AST for m in gv_entry.get("members", []): if m["type"] == "class" and m["name"] == "mapper": if name in m.get("methods", []): mod_obj = _get_module(gv_entry) if mod_obj: mapper_cls = getattr(mod_obj, "mapper", None) if mapper_cls: obj = getattr(mapper_cls, name, None) if obj: return json.dumps(_describe_object(obj, name=f"mapper.{name}", module_name="geeView")) break # Search class methods across all modules (e.g. bare "addTimeLapse" finds mapper.addTimeLapse) for short, entry in _MODULE_TREE.items(): if short != entry["fq"]: continue for m in entry.get("members", []): if m["type"] == "class" and m.get("methods") and name in m["methods"]: # Found as a method — import to get full docstring mod_obj = _get_module(entry) if mod_obj: cls = getattr(mod_obj, m["name"], None) if cls: obj = getattr(cls, name, None) if obj: return json.dumps(_describe_object(obj, name=f"{m['name']}.{name}", module_name=entry["fq"].rsplit(".", 1)[-1])) # Check REPL namespace as last resort if name in ns: return json.dumps(_describe_object(ns[name], name=name, module_name="(REPL namespace)")) return json.dumps({"error": f"{name!r} not found in any geeViz module or REPL namespace."}) # --- Module listing (AST-based, no import) --- if module: tree_entry = _MODULE_TREE.get(module) if tree_entry and "members" in tree_entry: # Use AST index — zero imports q = query.lower() if query else "" results = [] for m in tree_entry["members"]: name_match = not q or q in m["name"].lower() or q in m.get("description", "").lower() if name_match: r = {"name": m["name"], "type": m["type"]} if m.get("description"): r["description"] = m["description"] if m.get("signature"): r["signature"] = m["signature"] if m.get("methods"): r["methods"] = m["methods"] results.append(r) # Also list class methods as individual entries if m["type"] == "class" and m.get("methods"): for meth in m["methods"]: if not q or q in meth.lower(): results.append({"name": f"{m['name']}.{meth}", "type": "method", "description": f"Method of {m['name']}"}) return json.dumps({"module": module, "count": len(results), "results": results}) # Fallback for REPL modules (ee, etc.) — must import/inspect _, mod_obj = _resolve_module(module, ns) if mod_obj is None: return json.dumps({"error": f"Module {module!r} not found."}) results = [] for attr_name, obj, kind, first_line, sig in _iter_module_members(mod_obj, query): r = {"name": attr_name, "type": kind, "description": first_line} if sig: r["signature"] = sig results.append(r) return json.dumps({"module": module, "count": len(results), "results": results}) # --- Search across all modules (AST-based, no import) --- if query: q = query.lower() results = [] seen_fqs = set() for short, entry in _MODULE_TREE.items(): fq = entry["fq"] if fq in seen_fqs: continue seen_fqs.add(fq) mod_short = fq.rsplit(".", 1)[-1] for m in entry.get("members", []): if q not in m["name"].lower() and q not in m.get("description", "").lower(): # Also check class method names if m["type"] == "class" and m.get("methods"): matching_methods = [meth for meth in m["methods"] if q in meth.lower()] for meth in matching_methods: results.append({"module": mod_short, "name": f"{m['name']}.{meth}", "type": "method", "description": f"Method of {m['name']}"}) continue r = {"module": mod_short, "name": m["name"], "type": m["type"]} if m.get("description"): r["description"] = m["description"] if m.get("signature"): r["signature"] = m["signature"] results.append(r) # If it's a class, also include matching methods if m["type"] == "class" and m.get("methods"): for meth in m["methods"]: if q in meth.lower(): results.append({"module": mod_short, "name": f"{m['name']}.{meth}", "type": "method", "description": f"Method of {m['name']}"}) return json.dumps({"query": query, "count": len(results), "results": results}) # --- No args: list all modules (AST-based, no import) --- seen = set() modules = [] for short, entry in sorted(_MODULE_TREE.items()): fq = entry["fq"] if fq in seen or short != fq.rsplit(".", 1)[-1]: continue seen.add(fq) modules.append({"name": short, "full_path": fq, "description": entry.get("doc", "")}) return json.dumps({ "modules": modules, "count": len(modules), "usage": 'Use module="<name>" to list members, query="<term>" to search, name="<object>" for details.', })
# Examples tool removed -- use search_geeviz(module="examples") to list, # search_geeviz(name="CCDCViz") to read source. # --------------------------------------------------------------------------- # Map control (consolidated) # ---------------------------------------------------------------------------
[docs] @app.tool(annotations=_WRITE) def map_control(action: str = "view", open_browser: bool = True, filename: str = "map.html", session_id: str = None): """Control the geeView interactive map. `action="view"` writes the per-session runGeeViz.js to disk and opens `geeView/index.html`. In plain Python this is a `file:///` URL; in notebooks it uses an in-process threaded HTTP server (`http://localhost:<port>/...`) for iframe display. The access token is passed via URL query string. Args: action: Action to perform: - "view" (default): Validates all layers first (runs test_layers internally). If any layer fails, returns the errors without opening the map. If all pass, opens the map and returns the URL. - "layers": List current layers with visibility and viz params. - "layer_names": Quick list of just layer names (lightweight). - "clear": Remove all layers and commands. - "test_layers": Fast validation — calls getMapId() on all layers in parallel. Catches bad bands, invalid viz, computation errors. No browser required. Returns pass/fail per layer. - "preview": Quick visual check — fetches a small grid of EE map tiles for each layer around the current center/zoom and returns them as inline images (one per layer). No browser required. Use this to visually verify layers have data in the right area. Returns a dict of {layer_name: PNG image} plus center and zoom. Optional: "preview,zoom=10,grid=2" to override zoom or grid size. - "export": Validates all layers first (like "view"), then writes a self-contained geeView HTML to ``generated_outputs/{filename}``. If any layer fails, returns errors without exporting. The HTML uses absolute asset paths under ``/geeView/static`` and a ``__GEEVIZ_TOKEN__`` placeholder for the access token. Suitable for chat UIs that serve the HTML themselves and inject a fresh token on load. Use this for chat-embedded maps that should survive session reloads. - "export_layers_json": Bundle every currently-added layer into a JSON file under ``generated_outputs/{filename}``. Use this when the agent is building a CUSTOM HTML dashboard (Leaflet, MapLibre, etc.) and needs the EE layers to be re-mintable. The returned ``refresh_url`` is an endpoint the agent embeds in its HTML; fetching it returns fresh tile URLs for every layer so dashboards survive after EE mapids expire. Handles all the same input types as ``addLayer`` (Image, ImageCollection, Geometry, Feature, FeatureCollection) plus tile-URL layers added via ``Map.addTileLayer``. open_browser: For action="view", whether to open in browser (default True). filename: For action="export", the output filename (saved under ``generated_outputs/``). Defaults to ``map.html``. session_id: Session identifier for namespace isolation. Returns: JSON with action-specific results. """ sess = _ensure_initialized(session_id) Map = sess.namespace["Map"] act = action.lower().strip() # Use the same streaming stdout as run_code so the frontend polls it try: os.makedirs(os.path.dirname(sess.stdout_stream_file), exist_ok=True) with open(sess.stdout_stream_file, "w", encoding="utf-8") as f: f.write("") except Exception: pass # Redirect stdout to the streaming file for live polling by the frontend. # Use _StreamingStdout (same as run_code) so print() calls appear in the UI. _mc_stdout = _StreamingStdout(sess.stdout_stream_file) _mc_orig = sys.stdout sys.stdout = _mc_stdout try: result_json = _map_control_inner(Map, act, sess, open_browser, filename, _mc_stdout) # Inject captured stdout into the response so the frontend shows it stdout_text = _mc_stdout.getvalue().strip() if stdout_text: try: result = json.loads(result_json) result["stdout"] = stdout_text result_json = json.dumps(result) except (json.JSONDecodeError, TypeError): pass return result_json finally: sys.stdout = _mc_orig
def _map_control_inner(Map, act, sess, open_browser, filename, _mc_stdout): """Inner logic for map_control, wrapped so stdout is always restored.""" if act == "view": # --- Pre-flight: validate all layers before opening the map --- n_layers = len(getattr(Map, "idDictList", [])) print(f"Validating {n_layers} layer(s)...") try: test_result = Map.testLayers() failed = [l for l in test_result["layers"] if l["status"] == "error"] passed = [l for l in test_result["layers"] if l["status"] == "ok"] for l in passed: print(f" PASS: {l['name']}") for l in failed: print(f" FAIL: {l['name']}{l.get('error', 'unknown error')[:100]}") if failed: print(f"Validation failed — {len(failed)} layer(s) have errors.") return json.dumps({ "pass": False, "message": f"Map not opened — {len(failed)} layer(s) failed validation. Fix errors and retry.", "layers": test_result["layers"], }) print(f"All {n_layers} layer(s) passed validation.") except Exception as exc: print(f"Validation error: {exc}") return json.dumps({ "pass": False, "message": f"Layer validation raised an exception: {exc}. Map not opened.", }) # If any layer has canAreaChart=True and no turnOn command is already set, # auto-enable area charting instead of the default inspector. try: existing_cmds = list(getattr(Map, "mapCommandList", [])) has_turn_on = any("turnOn" in c for c in existing_cmds) if not has_turn_on: has_area_chart = False for entry in getattr(Map, "idDictList", []): viz_raw = entry.get("viz", "{}") try: viz = json.loads(viz_raw) if isinstance(viz_raw, str) else viz_raw except (json.JSONDecodeError, TypeError): viz = {} if isinstance(viz, dict) and viz.get("canAreaChart"): has_area_chart = True break if has_area_chart: Map.turnOnAutoAreaCharting() except Exception: pass # fall back to default inspector behavior in Map.view() url_buf = io.StringIO() try: with contextlib.redirect_stdout(url_buf): Map.view(open_browser=open_browser, open_iframe=False) except Exception as exc: return json.dumps({"error": str(exc)}) printed = url_buf.getvalue() url = None # Look for a URL on the "geeView URL:" line. Accept both http(s):// # (legacy server mode) and file:/// (srcdoc mode, the new default). for line in printed.splitlines(): line = line.strip() if "geeView URL:" in line: tail = line.split("geeView URL:", 1)[1].strip() if tail: url = tail break # Fallback: find any line starting with http or file if url is None: for line in printed.splitlines(): line = line.strip() if line.startswith(("http://", "https://", "file:///")): url = line break layer_count = len(Map.idDictList) if hasattr(Map, "idDictList") else 0 print(f"Map opened with {layer_count} layer(s).") return json.dumps({ "url": url, "layer_count": layer_count, "message": f"Map opened with {layer_count} layer(s)." if url else "Map.view() ran but no URL was captured.", "raw_output": printed.strip(), }) elif act == "layers": layers = [] for entry in getattr(Map, "idDictList", []): viz_raw = entry.get("viz", "{}") try: viz = json.loads(viz_raw) if isinstance(viz_raw, str) else viz_raw except (json.JSONDecodeError, TypeError): viz = viz_raw layers.append({ "name": entry.get("name", "(unnamed)"), "visible": entry.get("visible", "true"), "function": entry.get("function", ""), "viz": viz, }) commands = list(getattr(Map, "mapCommandList", [])) return json.dumps({"layer_count": len(layers), "layers": layers, "commands": commands}) elif act == "layer_names": names = [entry.get("name", "(unnamed)") for entry in getattr(Map, "idDictList", [])] return json.dumps({"layer_count": len(names), "layer_names": names}) elif act == "clear": try: Map.clearMap() except Exception as exc: return json.dumps({"error": str(exc)}) return json.dumps({"success": True, "message": "Map cleared. All layers and commands removed."}) elif act == "test_layers": try: result = Map.testLayers() except Exception as exc: return json.dumps({"error": str(exc)}) errors = [l for l in result["layers"] if l["status"] == "error"] warnings = [l for l in result["layers"] if l.get("warnings")] parts = [] if errors: parts.append(f"{len(errors)} layer error(s) detected.") if warnings: parts.append(f"{len(warnings)} layer(s) with warnings.") if not parts: parts.append("All layers passed.") return json.dumps({ "pass": result["pass"] and not errors, "message": " ".join(parts), "layers": result["layers"], }) elif act == "preview" or act.startswith("preview,"): # Quick tile-based preview — returns per-layer images the LLM can see grid_size = 3 zoom_override = None if "," in act: # e.g. action="preview,zoom=10,grid=2" for part in act.split(",")[1:]: part = part.strip() if part.startswith("zoom="): try: zoom_override = int(part.split("=")[1]) except ValueError: pass elif part.startswith("grid="): try: grid_size = int(part.split("=")[1]) except ValueError: pass try: result = Map.previewMap(grid_size=grid_size, zoom=zoom_override) except Exception as exc: return json.dumps({"error": f"Preview failed: {exc}"}) layer_images = result.get("layers", {}) n_ok = sum(1 for v in layer_images.values() if v is not None) n_fail = sum(1 for v in layer_images.values() if v is None) center = result.get("center", [0, 0]) z = result.get("zoom", 8) # Save preview images to session output dir import os as _os out = sess.output_dir _os.makedirs(out, exist_ok=True) saved = {} for name, img_bytes in layer_images.items(): if img_bytes is not None: safe = name.replace(" ", "_").replace("/", "_")[:40] fname = f"preview_{safe}.png" with open(_os.path.join(out, fname), "wb") as f: f.write(img_bytes) saved[name] = fname # Build response — JSON summary (visible in UI) + inline images (visible to LLM) summary = { "message": f"Preview generated for {n_ok} layer(s) at zoom {z}.", "center": center, "zoom": z, } if n_fail: summary["failed"] = n_fail if _MCPImage and saved: # Build viz context for each layer so LLM can interpret colors layer_viz = {} for idDict in Map.idDictList: lname = idDict.get("name", "") viz = idDict.get("_viz", {}) if lname in saved: desc_parts = [] if viz.get("bands"): desc_parts.append(f"bands={viz['bands']}") if viz.get("palette"): p = viz["palette"] if isinstance(p, list) and len(p) > 5: desc_parts.append(f"palette=[{p[0]}...{p[-1]}] ({len(p)} colors)") else: desc_parts.append(f"palette={p}") if viz.get("min") is not None: desc_parts.append(f"min={viz['min']}") if viz.get("max") is not None: desc_parts.append(f"max={viz['max']}") lt = viz.get("layerType", "") if "Vector" in lt or "vector" in lt: desc_parts.append("vector") for vk in ("strokeColor", "color", "fillColor", "pointRadius", "width"): if viz.get(vk) is not None: desc_parts.append(f"{vk}={viz[vk]}") if viz.get("autoViz"): desc_parts.append("autoViz=True (thematic/class data)") if idDict.get("_is_mosaic_preview"): desc_parts.append("MOSAIC of time-lapse — single representative tile, not animated") layer_viz[lname] = ", ".join(desc_parts) if desc_parts else "" content_parts = [json.dumps(summary)] for name, fname in saved.items(): # Add layer label + viz context before each image ctx = layer_viz.get(name, "") label = f"Layer: {name}" if ctx: label += f" ({ctx})" content_parts.append(label) fpath = _os.path.join(out, fname) with open(fpath, "rb") as f: content_parts.append(_MCPImage(data=f.read(), format="png")) return content_parts # Fallback: return JSON with file paths summary["files"] = saved return json.dumps(summary) elif act == "export": # --- Pre-flight: validate all layers before exporting --- n_layers = len(getattr(Map, "idDictList", [])) print(f"Validating {n_layers} layer(s)...") try: test_result = Map.testLayers() failed = [l for l in test_result["layers"] if l["status"] == "error"] passed = [l for l in test_result["layers"] if l["status"] == "ok"] for l in passed: print(f" PASS: {l['name']}") for l in failed: print(f" FAIL: {l['name']}{l.get('error', 'unknown error')[:100]}") if failed: print(f"Validation failed — {len(failed)} layer(s) have errors.") return json.dumps({ "pass": False, "message": f"Map not exported — {len(failed)} layer(s) failed validation. Fix errors and retry.", "layers": test_result["layers"], }) print(f"All {n_layers} layer(s) passed. Exporting map...") except Exception as exc: print(f"Validation error: {exc}") return json.dumps({ "pass": False, "message": f"Layer validation raised an exception: {exc}. Map not exported.", }) # Same auto-area-charting fallback as `view` try: existing_cmds = list(getattr(Map, "mapCommandList", [])) has_turn_on = any("turnOn" in c for c in existing_cmds) if not has_turn_on: has_area_chart = False for entry in getattr(Map, "idDictList", []): viz_raw = entry.get("viz", "{}") try: viz = json.loads(viz_raw) if isinstance(viz_raw, str) else viz_raw except (json.JSONDecodeError, TypeError): viz = {} if isinstance(viz, dict) and viz.get("canAreaChart"): has_area_chart = True break if has_area_chart: Map.turnOnAutoAreaCharting() except Exception: pass # Resolve output path under the session's generated_outputs directory. out_path = filename if os.path.isabs(filename) else os.path.join(sess.output_dir, filename) try: written_path = Map.export_html(out_path) except Exception as exc: return json.dumps({"error": str(exc)}) layer_count = len(Map.idDictList) if hasattr(Map, "idDictList") else 0 print(f"Map exported with {layer_count} layer(s) to {os.path.basename(written_path)}.") return json.dumps({ "path": written_path, "layer_count": layer_count, "message": f"Map exported with {layer_count} layer(s) to {os.path.basename(written_path)}.", }) elif act == "export_layers_json": # Serialize all currently-added layers to a JSON file. A separate # /api/dashboard/urls endpoint reads this file at viewing time, # calls getMapId on each layer, and returns fresh tile URLs — so # custom HTML dashboards survive expiration. try: result = Map.exportLayerJson(filename=filename or "dashboard_layers.json", output_dir=sess.output_dir) except Exception as exc: return json.dumps({"error": str(exc)}) # Add the refresh URL the agent should embed in its custom HTML. # sess.session_id is the canonical per-session identifier — `session_id` # is the outer map_control() parameter and isn't in scope inside # _map_control_inner(). sid = sess.session_id or "" # Relative URL — works when the dashboard HTML is served from the # same origin as the agent (i.e., embedded in the agent UI). For # standalone hosting on a different origin, the dashboard would # need an absolute URL and CORS — out of scope for now. result["refresh_url"] = ( f"/api/dashboard/urls?session_id={sid}" f"&file={os.path.basename(result['path'])}" ) result["message"] = ( f"{result['layer_count']} layer(s) saved to {os.path.basename(result['path'])}. " f"Embed refresh_url in your custom HTML to fetch fresh tile URLs." ) return json.dumps(result) else: return json.dumps({"error": f"Unknown action: {action!r}. Use 'view', 'layers', 'layer_names', 'clear', 'test_layers', 'preview', 'export', or 'export_layers_json'."}) # --------------------------------------------------------------------------- # Tool 13: save_session # ---------------------------------------------------------------------------
[docs] @app.tool(annotations=_WRITE) def save_session(filename: str = "", format: str = "py", session_id: str = None) -> str: """Save the accumulated run_code history to a .py script or .ipynb notebook. Args: filename: Optional custom filename (saved in geeViz/mcp/generated_scripts/). If omitted, uses a timestamped default. The correct extension is added automatically based on format. format: Output format -- "py" (default) for a standalone Python script, "ipynb" for a Jupyter notebook. Returns: JSON with the file path and number of code blocks/cells saved. """ sess = _get_session(session_id) if format not in ("py", "ipynb"): return json.dumps({ "error": f"Invalid format: {format!r}. Must be 'py' or 'ipynb'.", }) if not sess.code_history: return json.dumps({ "error": "No code has been executed yet. Use run_code first.", }) if format == "py": if filename: if not filename.endswith(".py"): filename += ".py" os.makedirs(sess.script_dir, exist_ok=True) sess.current_script_path = os.path.join(sess.script_dir, filename) path = _save_history_to_file(sess) return json.dumps({ "success": True, "script_path": path, "code_blocks": len(sess.code_history), "message": f"Saved {len(sess.code_history)} code block(s) to {path}", }) # format == "ipynb" import datetime os.makedirs(sess.script_dir, exist_ok=True) if filename: if not filename.endswith(".ipynb"): filename += ".ipynb" nb_path = os.path.join(sess.script_dir, filename) else: ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") nb_path = os.path.join(sess.script_dir, f"session_{ts}.ipynb") # Build notebook structure (nbformat 4.5) cells = [] # Markdown header cell cells.append({ "cell_type": "markdown", "metadata": {}, "source": [ "# geeViz MCP Session\n", "\n", f"Auto-generated by geeViz MCP server on {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}.\n", ], }) # Import cell cells.append({ "cell_type": "code", "metadata": {}, "source": [ "import geeViz.geeView as gv\n", "import geeViz.getImagesLib as gil\n", "import geeViz.getSummaryAreasLib as sal\n", "from geeViz.outputLib import charts as cl\n", "from geeViz.outputLib import thumbs as tl\n", "from geeViz.outputLib import reports as rl\n", "ee = gv.ee\n", "Map = gv.Map", ], "execution_count": None, "outputs": [], }) # One code cell per run_code call for i, block in enumerate(sess.code_history): lines = block.splitlines(True) # keep line endings # Ensure last line has newline for notebook format if lines and not lines[-1].endswith("\n"): lines[-1] += "\n" cells.append({ "cell_type": "code", "metadata": {}, "source": lines, "execution_count": None, "outputs": [], }) notebook = { "nbformat": 4, "nbformat_minor": 5, "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3", }, "language_info": { "name": "python", "version": sys.version.split()[0], }, }, "cells": cells, } with open(nb_path, "w", encoding="utf-8") as f: json.dump(notebook, f, indent=1, ensure_ascii=False) return json.dumps({ "success": True, "notebook_path": nb_path, "code_cells": len(sess.code_history), "message": f"Saved {len(sess.code_history)} code cell(s) to {nb_path}", })
# --------------------------------------------------------------------------- # Environment info (consolidated) # --------------------------------------------------------------------------- _NAMESPACE_BUILTINS = {"ee", "Map", "gv", "gil"}
[docs] @app.tool(annotations=_READ_ONLY_OPEN) def env_info(action: str = "version", session_id: str = None) -> str: """Get environment information: versions, REPL namespace, or project details. Args: action: What to return: - "version" (default): geeViz, EE, and Python versions. - "namespace": User-defined variables in the REPL (no getInfo calls). - "project": Current EE project ID and root assets. Returns: JSON with action-specific results. """ act = action.lower().strip() if act == "version": import geeViz result = { "geeViz_version": geeViz.__version__, "python_version": sys.version, "platform": sys.platform, } try: import ee result["ee_version"] = ee.__version__ except Exception: result["ee_version"] = "(not available)" return json.dumps(result) elif act == "namespace": sess = _ensure_initialized(session_id) ee = sess.namespace["ee"] entries = [] for name, obj in sorted(sess.namespace.items()): if name.startswith("_") or name in _NAMESPACE_BUILTINS: continue type_name = type(obj).__name__ for ee_type in ("Image", "ImageCollection", "FeatureCollection", "Feature", "Geometry", "Number", "String", "List", "Dictionary", "Filter", "Reducer", "ComputedObject"): if isinstance(obj, getattr(ee, ee_type, type(None))): type_name = f"ee.{ee_type}" break try: r = repr(obj) if len(r) > 2000: r = r[:2000] + "..." except Exception: r = "(repr failed)" entries.append({"name": name, "type": type_name, "repr": r}) return json.dumps({ "count": len(entries), "variables": entries, "note": "Excludes builtins (ee, Map, gv, gil). No getInfo() calls made.", }) elif act == "project": sess = _ensure_initialized(session_id) ee = sess.namespace["ee"] result: dict = {} try: result["project_id"] = ee.data._get_state().cloud_api_user_project except Exception as exc: result["project_id"] = None result["project_error"] = str(exc) if result.get("project_id"): try: root = f"projects/{result['project_id']}/assets" assets_response = ee.data.listAssets({"parent": root}) assets = assets_response.get("assets", []) result["root_assets"] = [ {"id": a.get("id") or a.get("name", ""), "type": a.get("type", "UNKNOWN")} for a in assets[:500] ] result["root_asset_count"] = len(assets) except Exception as exc: result["root_assets"] = [] result["assets_error"] = str(exc) return json.dumps(result) elif act == "reload": # Force-reload all geeViz modules in the running process. # Use this after editing geeViz source files to pick up changes # without restarting the MCP server or ADK session. import importlib reloaded = [] for mod_name in sorted(sys.modules.keys()): if mod_name.startswith("geeViz"): try: importlib.reload(sys.modules[mod_name]) reloaded.append(mod_name) except Exception: pass # Re-initialize the REPL namespace with fresh modules _reset_namespace(session_id) return json.dumps({ "action": "reload", "reloaded_modules": reloaded, "count": len(reloaded), "message": "All geeViz modules reloaded. REPL namespace reset.", }) else: return json.dumps({"error": f"Unknown action: {action!r}. Use 'version', 'namespace', 'project', or 'reload'."})
# --------------------------------------------------------------------------- # --------------------------------------------------------------------------- # View generated output (returns image inline for LLM visual inspection) # --------------------------------------------------------------------------- # NOTE: view_output is registered below ONLY if _MCPImage loaded successfully. # It returns an Image object that MCP clients render inline.
[docs] @app.tool(annotations=_READ_ONLY) def view_output(filename: str, session_id: str = None): """View a generated output file (PNG, GIF, JPEG) as an inline image. Use this to visually inspect charts, thumbnails, previews, or any image file in the generated_outputs directory. The image is returned directly so the LLM can see it. For map previews, first call map_control(action="preview") to generate preview PNGs, then call view_output("preview_Layer_Name.png") to see them. Args: filename: Name of the file in generated_outputs/ (e.g. "chart.png", "preview_Elevation.png"). Just the filename, no directory. session_id: Session identifier for namespace isolation. Returns: The image content (displayed inline by the MCP client), or an error string. """ import os as _os sess = _ensure_initialized(session_id) out_dir = sess.output_dir safe = _os.path.basename(filename) path = _os.path.join(out_dir, safe) # Also check base output dir as fallback if not _os.path.isfile(path): path = _os.path.join(_BASE_OUTPUT_DIR, safe) if not _os.path.isfile(path): return f"File not found: {safe}" ext = _os.path.splitext(safe)[1].lower() fmt_map = {".png": "png", ".gif": "gif", ".jpg": "jpeg", ".jpeg": "jpeg", ".webp": "webp"} fmt = fmt_map.get(ext) if not fmt: return f"Unsupported image format: {ext}. Supported: {', '.join(fmt_map.keys())}" with open(path, "rb") as f: data = f.read() import io from PIL import Image as _PILImage max_dim = 256 # Aggressive — LLM doesn't need pixel-perfect, just a recognizable preview def _shrink_to_jpeg(pil_img): """Downscale and convert to JPEG bytes for minimal context cost.""" if max(pil_img.size) > max_dim: ratio = max_dim / max(pil_img.size) pil_img = pil_img.resize((int(pil_img.width * ratio), int(pil_img.height * ratio)), _PILImage.LANCZOS) if pil_img.mode in ("RGBA", "P", "LA"): bg = _PILImage.new("RGB", pil_img.size, (255, 255, 255)) if pil_img.mode == "P": pil_img = pil_img.convert("RGBA") bg.paste(pil_img, mask=pil_img.split()[-1] if pil_img.mode in ("RGBA", "LA") else None) pil_img = bg elif pil_img.mode != "RGB": pil_img = pil_img.convert("RGB") buf = io.BytesIO() pil_img.save(buf, format="JPEG", quality=70, optimize=True) return buf.getvalue() # GIFs: extract a few key frames as JPEGs if fmt == "gif" and _MCPImage is not None: try: img = _PILImage.open(io.BytesIO(data)) n_frames = getattr(img, "n_frames", 1) if n_frames > 1: max_frames = 6 # Reduced from 8 to keep context smaller step = max(1, n_frames // max_frames) parts = [f"GIF with {n_frames} frames — showing {min(n_frames, max_frames)} sampled frames:"] for i in range(0, n_frames, step): if len([p for p in parts if not isinstance(p, str)]) >= max_frames: break img.seek(i) frame = img.convert("RGBA") jpeg_bytes = _shrink_to_jpeg(frame) parts.append(f"Frame {i + 1}/{n_frames}:") parts.append(_MCPImage(data=jpeg_bytes, format="jpeg")) return parts except Exception: pass # Fall through to static handling # Static images: aggressive downscale to JPEG try: img = _PILImage.open(io.BytesIO(data)) data = _shrink_to_jpeg(img) fmt = "jpeg" except Exception: pass if _MCPImage is not None: return _MCPImage(data=data, format=fmt) # Fallback: return full base64 data URI import base64 as _b64 encoded = _b64.b64encode(data).decode("ascii") return f"data:image/{fmt};base64,{encoded}"
# Export image (consolidated) # ---------------------------------------------------------------------------
[docs] @app.tool(annotations=_WRITE_OPEN) def export_image( destination: str, image_var: str, region_var: str = "", scale: int = 30, crs: str = "EPSG:4326", overwrite: bool = False, asset_id: str = "", pyramiding_policy: str = "mean", output_name: str = "", drive_folder: str = "", bucket: str = "", output_no_data: int = -32768, file_format: str = "GeoTIFF", session_id: str = None, ) -> str: """Export an ee.Image to a GEE asset, Google Drive, or Cloud Storage. Args: destination: Where to export -- "asset", "drive", or "cloud". image_var: Name of the ee.Image variable in the REPL namespace. region_var: Name of an ee.Geometry or ee.FeatureCollection variable for the export region. Required for drive/cloud exports; optional for asset exports (uses image footprint if omitted). scale: Output resolution in meters (default 30). crs: Coordinate reference system (default "EPSG:4326"). overwrite: If True, overwrite existing asset/file (default False). Asset-specific: asset_id: Full destination asset ID (required for destination="asset"). pyramiding_policy: "mean" (default), "mode", "min", "max", "median", "sample". Drive-specific: output_name: Output filename without extension (required for drive/cloud). drive_folder: Google Drive folder name (required for destination="drive"). Cloud Storage-specific: output_name: Output filename without extension (required for drive/cloud). bucket: GCS bucket name (required for destination="cloud"). output_no_data: NoData value (default -32768). file_format: "GeoTIFF" (default) or "TFRecord". Returns: JSON with export status or an error. """ sess = _ensure_initialized(session_id) ee = sess.namespace["ee"] gil = sess.namespace["gil"] dest = destination.lower().strip() if dest not in ("asset", "drive", "cloud"): return json.dumps({"error": f"Unknown destination: {destination!r}. Use 'asset', 'drive', or 'cloud'."}) # Look up image image = sess.namespace.get(image_var) if image is None: return json.dumps({"error": f"Variable {image_var!r} not found in namespace."}) if not isinstance(image, ee.Image): return json.dumps({"error": f"Variable {image_var!r} is {type(image).__name__}, not ee.Image."}) # Look up region region = None if region_var: region = sess.namespace.get(region_var) if region is None: return json.dumps({"error": f"Variable {region_var!r} not found in namespace."}) if isinstance(region, ee.FeatureCollection): region = region.geometry() elif not isinstance(region, ee.Geometry): return json.dumps({"error": f"Variable {region_var!r} is {type(region).__name__}, expected ee.Geometry or ee.FeatureCollection."}) elif dest in ("drive", "cloud"): return json.dumps({"error": f"region_var is required for destination='{dest}'."}) stdout_buf = io.StringIO() try: with contextlib.redirect_stdout(stdout_buf): # In sandbox mode, create task but don't start it _start = not _SANDBOX_ENABLED if dest == "asset": if not asset_id: return json.dumps({"error": "asset_id is required for destination='asset'."}) asset_name = asset_id.split("/")[-1] gil.exportToAssetWrapper( image, asset_name, asset_id, pyramidingPolicyObject={"default": pyramiding_policy}, roi=region, scale=scale, crs=crs, overwrite=overwrite, start=_start, ) elif dest == "drive": if not output_name or not drive_folder: return json.dumps({"error": "output_name and drive_folder are required for destination='drive'."}) gil.exportToDriveWrapper( image, output_name, drive_folder, region, scale, crs, None, output_no_data, start=_start, ) elif dest == "cloud": if not output_name or not bucket: return json.dumps({"error": "output_name and bucket are required for destination='cloud'."}) gil.exportToCloudStorageWrapper( image, output_name, bucket, region, scale, crs, None, output_no_data, file_format, {"cloudOptimized": True}, overwrite, start=_start, ) except Exception as exc: return json.dumps({"error": f"Export failed: {exc}", "stdout": stdout_buf.getvalue()}) if _SANDBOX_ENABLED: return json.dumps({ "success": False, "destination": dest, "message": "Export task was created but NOT started (sandbox mode). " "Download the code and run it locally to execute the export.", }) return json.dumps({ "success": True, "destination": dest, "scale": scale, "crs": crs, "stdout": stdout_buf.getvalue().strip(), "message": f"Export to {dest} started. Use track_tasks() to monitor progress.", })
import urllib.request import urllib.parse import urllib.error # Dataset catalog cache (for search_datasets) _CACHE_DIR = os.path.join(_THIS_DIR, ".cache") _CACHE_META_FILE = os.path.join(_CACHE_DIR, "meta.json") _CACHE_TTL = 1 * 24 * 3600 # 1 day _CATALOG_FILES = { "official": "official_catalog.json", "community": "community_catalog.json", } _CATALOG_URLS = { "official": "https://earthengine-stac.storage.googleapis.com/catalog/catalog.json", "community": "https://raw.githubusercontent.com/samapriya/awesome-gee-community-datasets/master/community_datasets.json", } _cache_lock = threading.Lock() import time as _time def _fetch_catalog(url: str, name: str) -> list[dict] | None: """Fetch a dataset catalog from a URL and return a flat list of dataset dicts. For the official EE STAC catalog (nested 2-level structure), crawls all child catalogs to build a flat index. For the community catalog (already flat), returns as-is. """ import concurrent.futures def _fetch_json(u, timeout=10): req = urllib.request.Request(u, headers={"User-Agent": "geeViz-MCP/1.0"}) with urllib.request.urlopen(req, timeout=timeout) as resp: return json.loads(resp.read().decode("utf-8")) try: data = _fetch_json(url, timeout=15) except Exception: return None # Already a flat list (community catalog) if isinstance(data, list): return data # Nested STAC catalog — crawl child links if isinstance(data, dict) and "links" in data: children = [l["href"] for l in data["links"] if l.get("rel") == "child"] datasets = [] def _crawl_child(child_url): """Fetch a child catalog and extract dataset entries.""" results = [] try: child = _fetch_json(child_url, timeout=8) leaves = [l["href"] for l in child.get("links", []) if l.get("rel") == "child"] # Fetch all leaf datasets in this child catalog for leaf_url in leaves: try: leaf = _fetch_json(leaf_url, timeout=5) entry = { "id": leaf.get("id", ""), "title": leaf.get("title", ""), "type": leaf.get("gee:type", ""), "provider": ", ".join( p.get("name", "") for p in leaf.get("providers", []) ), "tags": ", ".join(leaf.get("keywords", [])), "source": "official", "date_range": "", } # Extract date range from extent ext = leaf.get("extent", {}).get("temporal", {}).get("interval", [[]]) if ext and ext[0]: entry["date_range"] = f"{ext[0][0] or ''} to {ext[0][1] or 'present'}" # STAC URL for get_catalog_info for link in leaf.get("links", []): if link.get("rel") == "self": entry["stac_url"] = link["href"] break results.append(entry) except Exception: pass except Exception: pass return results # Crawl all children in parallel (max 20 threads) with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool: for child_results in pool.map(_crawl_child, children): datasets.extend(child_results) return datasets if datasets else None return None def _read_cache_meta() -> dict: """Read the cache timestamp metadata file.""" if os.path.isfile(_CACHE_META_FILE): try: with open(_CACHE_META_FILE, "r", encoding="utf-8") as f: return json.load(f) except Exception: pass return {} def _write_cache_meta(meta: dict) -> None: """Write the cache timestamp metadata file.""" os.makedirs(_CACHE_DIR, exist_ok=True) with open(_CACHE_META_FILE, "w", encoding="utf-8") as f: json.dump(meta, f) def _get_cached_catalog(name: str) -> list[dict] | None: """Return parsed JSON list for a catalog, fetching/caching as needed. Args: name: "official" or "community" Returns: List of dataset dicts, or None if unavailable. """ with _cache_lock: cache_file = os.path.join(_CACHE_DIR, _CATALOG_FILES[name]) meta = _read_cache_meta() ts_key = f"{name}_ts" now = _time.time() # Check if cache is fresh cached_exists = os.path.isfile(cache_file) cache_fresh = cached_exists and (now - meta.get(ts_key, 0)) < _CACHE_TTL if cache_fresh: try: with open(cache_file, "r", encoding="utf-8") as f: return json.load(f) except Exception: pass # Fall through to fetch # Fetch from remote url = _CATALOG_URLS[name] try: data = _fetch_catalog(url, name) if data: # Cache the normalized result os.makedirs(_CACHE_DIR, exist_ok=True) with open(cache_file, "w", encoding="utf-8") as f: json.dump(data, f) meta[ts_key] = now _write_cache_meta(meta) return data except Exception: # Fetch failed -- use stale cache if available if cached_exists: try: with open(cache_file, "r", encoding="utf-8") as f: return json.load(f) except Exception: pass return None # --------------------------------------------------------------------------- # Tool 20: search_datasets # ---------------------------------------------------------------------------
[docs] @app.tool(annotations=_READ_ONLY) def search_datasets(query: str, source: str = "all", max_results: int = 50) -> str: """Search the GEE dataset catalog by keyword. Searches both the official Earth Engine catalog (~500+ datasets) and the community catalog (~200+ datasets). Uses word-level matching against title, tags, id, and provider fields with relevance scoring. Args: query: Search terms (e.g. "landsat surface reflectance", "DEM", "sentinel fire"). Case-insensitive. source: Which catalog to search: "official", "community", or "all" (default). max_results: Maximum number of results to return (default 10). Returns: JSON list of matching datasets with id, title, type, provider, tags, source, and additional metadata. """ if source not in ("official", "community", "all"): return json.dumps({ "error": f"Invalid source: {source!r}. Must be 'official', 'community', or 'all'.", }) sources_to_search = ( ["official", "community"] if source == "all" else [source] ) # Load catalogs catalogs: dict[str, list[dict]] = {} errors: list[str] = [] for src in sources_to_search: data = _get_cached_catalog(src) if data is not None: catalogs[src] = data else: errors.append(f"Failed to load {src} catalog (no cache available).") if not catalogs: return json.dumps({"error": " ".join(errors)}) # Split query into words for multi-word matching query_words = query.lower().split() if not query_words: return json.dumps({"error": "Empty query."}) # Field weights weights = {"title": 3, "tags": 2, "id": 2, "provider": 1} scored: list[tuple[int, dict]] = [] for src_name, entries in catalogs.items(): for entry in entries: if not isinstance(entry, dict): continue # skip malformed entries # Extract searchable fields title = (entry.get("title") or "").lower() tags = (entry.get("tags") or "").lower() eid = (entry.get("id") or "").lower() provider = (entry.get("provider") or "").lower() fields = {"title": title, "tags": tags, "id": eid, "provider": provider} # Score: sum of (weight × number of query words matched in field) score = 0 for field_name, field_val in fields.items(): for word in query_words: if word in field_val: score += weights[field_name] if score == 0: continue # Build result entry result_entry: dict = { "id": entry.get("id", ""), "title": entry.get("title", ""), "type": entry.get("type", ""), "provider": entry.get("provider", ""), "tags": entry.get("tags", ""), "source": src_name, } if src_name == "official": result_entry["date_range"] = entry.get("date_range", "") # Build STAC URL eid_raw = entry.get("id", "") if eid_raw: parts = eid_raw.split("/") stac_dir = parts[0] stac_file = eid_raw.replace("/", "_") result_entry["stac_url"] = ( f"https://earthengine-stac.storage.googleapis.com/" f"catalog/{stac_dir}/{stac_file}.json" ) else: # Community catalog fields result_entry["thematic_group"] = entry.get("thematic_group", "") result_entry["docs"] = entry.get("docs", "") scored.append((score, result_entry)) # Sort by score descending, then by title alphabetically scored.sort(key=lambda x: (-x[0], x[1].get("title", ""))) results = [entry for _, entry in scored[:max_results]] out: dict = { "query": query, "source": source, "count": len(results), "total_matches": len(scored), "results": results, } if errors: out["warnings"] = errors return json.dumps(out)
import base64 as _base64 # --------------------------------------------------------------------------- # Asset management (consolidated) # ---------------------------------------------------------------------------
[docs] @app.tool(annotations=_DESTRUCTIVE) def manage_asset( action: str, asset_id: str = "", dest_id: str = "", overwrite: bool = False, folder_type: str = "Folder", all_users_can_read: bool = False, readers: str = "", writers: str = "", session_id: str = None, ) -> str: """Manage GEE assets: delete, copy, move, create folders, update permissions. Args: action: Operation to perform: - "delete": Delete a single asset. - "copy": Copy asset_id to dest_id. - "move": Copy asset_id to dest_id, then delete source. - "create": Create a folder or ImageCollection at asset_id. - "update_acl": Update permissions on asset_id. asset_id: Full asset path. Required for all actions. For "create", this is the folder path to create. dest_id: Destination path (required for "copy" and "move"). overwrite: If True, overwrite existing destination (default False). folder_type: For action="create" -- "Folder" (default) or "ImageCollection". all_users_can_read: For action="update_acl" -- make publicly readable. readers: For action="update_acl" -- comma-separated reader emails. writers: For action="update_acl" -- comma-separated writer emails. Returns: JSON confirmation or error. """ sess = _ensure_initialized(session_id) ee = sess.namespace["ee"] import geeViz.assetManagerLib as aml act = action.lower().strip() if not asset_id and act != "create": return json.dumps({"error": "asset_id is required."}) if act == "delete": if not aml.ee_asset_exists(asset_id): return json.dumps({"error": f"Asset not found: {asset_id}"}) try: ee.data.deleteAsset(asset_id) except Exception as exc: return json.dumps({"error": f"Delete failed: {exc}"}) return json.dumps({"success": True, "message": f"Asset {asset_id} deleted."}) elif act in ("copy", "move"): if not dest_id: return json.dumps({"error": f"dest_id is required for action='{act}'."}) if not aml.ee_asset_exists(asset_id): return json.dumps({"error": f"Source asset not found: {asset_id}"}) if aml.ee_asset_exists(dest_id): if overwrite: try: ee.data.deleteAsset(dest_id) except Exception as exc: return json.dumps({"error": f"Failed to delete existing dest: {exc}"}) else: return json.dumps({"error": f"Destination exists: {dest_id}. Set overwrite=True to replace."}) try: ee.data.copyAsset(asset_id, dest_id) except Exception as exc: return json.dumps({"error": f"Copy failed: {exc}"}) if act == "move": try: ee.data.deleteAsset(asset_id) except Exception as exc: return json.dumps({"error": f"Copied to {dest_id} but failed to delete source: {exc}", "dest_id": dest_id}) verb = "moved" if act == "move" else "copied" return json.dumps({"success": True, "message": f"Asset {verb} from {asset_id} to {dest_id}."}) elif act == "create": folder_path = asset_id or dest_id if not folder_path: return json.dumps({"error": "asset_id is required for action='create' (the folder path)."}) if folder_type not in ("Folder", "ImageCollection"): return json.dumps({"error": f"Invalid folder_type: {folder_type!r}. Use 'Folder' or 'ImageCollection'."}) stdout_buf = io.StringIO() try: with contextlib.redirect_stdout(stdout_buf): if folder_type == "ImageCollection": aml.create_image_collection(folder_path) else: aml.create_asset(folder_path, recursive=True) except Exception as exc: return json.dumps({"error": f"Create failed: {exc}", "stdout": stdout_buf.getvalue()}) return json.dumps({"success": True, "message": f"{folder_type} created at {folder_path}.", "stdout": stdout_buf.getvalue().strip()}) elif act == "update_acl": readers_list = [r.strip() for r in readers.split(",") if r.strip()] if readers else [] writers_list = [w.strip() for w in writers.split(",") if w.strip()] if writers else [] stdout_buf = io.StringIO() try: with contextlib.redirect_stdout(stdout_buf): aml.updateACL(asset_id, writers=writers_list, all_users_can_read=all_users_can_read, readers=readers_list) except Exception as exc: return json.dumps({"error": f"ACL update failed: {exc}", "stdout": stdout_buf.getvalue()}) return json.dumps({"success": True, "message": f"Permissions updated for {asset_id}.", "stdout": stdout_buf.getvalue().strip()}) else: return json.dumps({"error": f"Unknown action: {action!r}. Use 'delete', 'copy', 'move', 'create', or 'update_acl'."})
def _strip_coordinates(obj): """Recursively strip GeoJSON coordinates from nested dicts/lists. Replaces ``"coordinates": [...]`` with ``"coordinates": "(stripped)"`` to keep large coordinate arrays out of the LLM context window. """ if isinstance(obj, dict): out = {} for k, v in obj.items(): if k == "coordinates" and isinstance(v, list): out[k] = "(stripped)" else: out[k] = _strip_coordinates(v) return out if isinstance(obj, list): return [_strip_coordinates(v) for v in obj] return obj def _make_serializable(obj): """Recursively convert ee objects to JSON-safe values. GeoJSON coordinates are stripped to avoid injecting huge coordinate arrays into the LLM context. """ if obj is None or isinstance(obj, (str, int, float, bool)): return obj if isinstance(obj, list): return [_make_serializable(v) for v in obj] if isinstance(obj, dict): return {k: _make_serializable(v) for k, v in obj.items()} # ee.Geometry -> type only (coordinates are too large for context) try: import ee as _ee if isinstance(obj, _ee.Geometry): geojson = obj.getInfo() return {"type": geojson.get("type", "Geometry"), "coordinates": "(stripped)"} except Exception: pass # Other ee objects -> repr string return repr(obj) # get_reference_data removed — use search_geeviz(name="vizParamsFalse") instead # --------------------------------------------------------------------------- # USFS Enterprise Data Warehouse (EDW) (consolidated) # ---------------------------------------------------------------------------
[docs] @app.tool(annotations=_READ_ONLY_OPEN) def get_streetview( lon: float, lat: float, headings: str = "0,90,180,270", pitch: float = 0, fov: float = 90, radius: int = 50, source: str = "default", session_id: str = None, ) -> str: """Get Google Street View imagery at a location for ground-truthing. Checks if Street View coverage exists, then fetches static images at the requested headings (compass directions). Returns images inline for visual inspection. Useful for ground-truthing remote sensing analysis — see what a location actually looks like from the ground. Args: lon: Longitude in decimal degrees. lat: Latitude in decimal degrees. headings: Comma-separated compass headings in degrees (0=North, 90=East, 180=South, 270=West). Default "0,90,180,270" (all 4 cardinal directions). pitch: Camera pitch (-90 to 90). 0=horizontal, positive=up. fov: Field of view in degrees (1-120). Lower = more zoom. Default 90. radius: Search radius in meters for nearest panorama. Default 50. source: "default" (all) or "outdoor" (outdoor only). Returns: Metadata (date, location, copyright) and Street View images. Returns error if no imagery exists at the location. """ sess = _ensure_initialized(session_id) import geeViz.googleMapsLib as _gm # Check metadata first (free) try: meta = _gm.streetview_metadata(lon, lat, radius=radius, source=source) except Exception as exc: return json.dumps({"error": f"Street View metadata request failed: {exc}"}) if meta.get("status") != "OK": return json.dumps({ "status": meta.get("status", "UNKNOWN"), "message": f"No Street View imagery at ({lat}, {lon}) within {radius}m.", "tip": "Try increasing the radius or checking a nearby road/trail.", }) # Parse headings heading_list = [float(h.strip()) for h in headings.split(",") if h.strip()] _direction_labels = {0: "N", 45: "NE", 90: "E", 135: "SE", 180: "S", 225: "SW", 270: "W", 315: "NW"} # Fetch images and save to files os.makedirs(sess.output_dir, exist_ok=True) saved_images = [] md_lines = [] for h in heading_list: try: img_bytes = _gm.streetview_image( lon, lat, heading=h, pitch=pitch, fov=fov, radius=radius, source=source, ) if img_bytes: label = _direction_labels.get(int(h) % 360, f"{h}deg") fname = f"streetview_{label}.jpg" fpath = os.path.join(sess.output_dir, fname).replace("\\", "/") with open(fpath, "wb") as f: f.write(img_bytes) saved_images.append({"heading": h, "label": label, "path": fpath, "size": len(img_bytes)}) md_lines.append(f"![Street View {label}]({fpath})") except Exception: pass loc = meta.get("location", {}) return json.dumps({ "status": "OK", "date": meta.get("date"), "location": meta.get("location"), "copyright": meta.get("copyright"), "images_fetched": len(saved_images), "images": saved_images, "output_markdown": "\n".join(md_lines) if md_lines else None, })
[docs] @app.tool(annotations=_READ_ONLY_OPEN) def geeviz_search_places( query: str, lon: float = 0, lat: float = 0, radius: float = 5000, max_results: int = 10, session_id: str = None, ) -> str: """Search for places using the Google Places API. Useful for finding landmarks, businesses, or points of interest near a study area. Can also geocode addresses. Args: query: Search text (e.g. "fire station", "visitor center", "100S 200 E, SLC, UT"). lon: Longitude for location bias (0 = no bias). lat: Latitude for location bias (0 = no bias). radius: Bias radius in meters. Default 5000. max_results: Maximum results (1-20). Default 10. Returns: JSON with matching places (name, address, coordinates, rating, types). """ _ensure_initialized(session_id) import geeViz.googleMapsLib as _gm kwargs: dict[str, Any] = { "query": query, "max_results": max_results, "radius": radius, } if lat != 0 and lon != 0: kwargs["lat"] = lat kwargs["lon"] = lon try: places = _gm.search_places(**kwargs) except Exception as exc: return json.dumps({"error": f"Places search failed: {exc}"}) return json.dumps({ "count": len(places), "places": places, })
# --------------------------------------------------------------------------- # Report tools removed — use rl.Report() in run_code instead. # See agent-instructions.md Key patterns > Reports. # Report tools removed -- use rl.Report() in run_code instead. # Entry point # --------------------------------------------------------------------------- def _eager_init(): """Pre-initialize EE in a background thread so the first tool call is fast.""" import threading def _init(): try: _ensure_initialized() print("EE initialized (background warmup)", file=sys.stderr) except Exception as exc: print(f"Background warmup failed (will retry on first tool call): {exc}", file=sys.stderr) t = threading.Thread(target=_init, daemon=True) t.start() def main() -> None: global _SANDBOX_ENABLED # Pre-warm EE auth so first tool call doesn't stall _eager_init() # stdio is standard for Cursor/IDE integration; use streamable-http for HTTP transport = os.environ.get("MCP_TRANSPORT", "stdio") if transport == "streamable-http": host = os.environ.get("MCP_HOST", "127.0.0.1") port = int(os.environ.get("MCP_PORT", "8000")) path = os.environ.get("MCP_PATH", "/mcp") # Normalize: strip Windows-mangled Git Bash paths and ensure leading / if len(path) > 4 and ":" in path[:3]: # e.g. "C:/Program Files/Git/mcp" path = "/" + path.rsplit("/", 1)[-1] if not path.startswith("/"): path = "/" + path # Resolve sandbox default: ON for non-localhost HTTP, OFF for localhost if _SANDBOX_ENABLED is None: _is_localhost = host in ("127.0.0.1", "localhost", "::1") _SANDBOX_ENABLED = not _is_localhost # FastMCP.run() doesn't accept host/port kwargs; set them on the # settings object directly so uvicorn picks them up. app.settings.host = host app.settings.port = port app.settings.streamable_http_path = path # When binding to 0.0.0.0 (cloud deployment), disable DNS rebinding # protection so external hostnames (e.g. Cloud Run) are accepted. if host == "0.0.0.0": app.settings.transport_security.enable_dns_rebinding_protection = False print(f"MCP server starting at http://{host}:{port}{path} (sandbox={'ON' if _SANDBOX_ENABLED else 'OFF'})", file=sys.stderr) app.run(transport=transport, mount_path=path) else: # stdio transport — default sandbox OFF (local IDE use) if _SANDBOX_ENABLED is None: _SANDBOX_ENABLED = False print(f"MCP server starting (stdio, sandbox={'ON' if _SANDBOX_ENABLED else 'OFF'})", file=sys.stderr) app.run(transport=transport) if __name__ == "__main__": # print(inspect_asset("COPERNICUS/S2_SR_HARMONIZED")) main() # %%