"""
geeViz MCP Server -- execution and introspection tools for Earth Engine via geeViz.
Unlike static doc snippets, this server executes code, inspects live GEE assets,
and dynamically queries API signatures. 27 tools.
"""
from __future__ import annotations
import os
import sys
# ---------------------------------------------------------------------------
# CLI argument parsing (before heavy imports so --help is instant)
# ---------------------------------------------------------------------------
# Tri-state sandbox switch: True/False when forced via CLI flag; None means
# "resolve later" based on transport/host (documented in the help text below).
_SANDBOX_ENABLED: bool | None = None # will be resolved in main()
# Parse --sandbox / --no-sandbox early so _help can document them
# (all args are scanned; if both flags are present, the last one wins)
for _arg in sys.argv[1:]:
    if _arg == "--sandbox":
        _SANDBOX_ENABLED = True
    elif _arg == "--no-sandbox":
        _SANDBOX_ENABLED = False
# NOTE(review): help is only honored as the FIRST argument, so e.g.
# "--sandbox --help" starts the server instead of printing help — confirm intended.
# NOTE(review): the header below says "Tools (27)" but fewer tools are listed;
# verify the count against the tools actually registered on the server.
if len(sys.argv) > 1 and sys.argv[1] in ("-h", "--help"):
    _help = """usage: python -m geeViz.mcp.server [--help] [--sandbox | --no-sandbox]
geeViz MCP Server -- execution and introspection for Earth Engine via geeViz.
Options:
-h, --help Show this help and exit.
--sandbox Force run_code sandbox ON (block os, open, eval, etc.)
--no-sandbox Force run_code sandbox OFF (full Python access)
Default: sandbox is OFF for stdio transport (local IDE use) and ON for
streamable-http when binding to non-localhost (remote/cloud deployment).
Environment (optional):
MCP_TRANSPORT Transport: "stdio" (default) or "streamable-http"
MCP_HOST Host for HTTP (default: 127.0.0.1)
MCP_PORT Port for HTTP (default: 8000)
MCP_PATH Path for HTTP (default: /mcp)
Tools (27):
run_code Execute Python/GEE code in a persistent REPL namespace
inspect_asset Get metadata for any GEE asset (with optional collection filters)
get_api_reference Look up function signatures and docstrings
search_functions Search/list functions across geeViz modules
examples List or read geeViz example scripts (action=list|get)
list_assets List assets in a GEE folder
track_tasks Get status of recent EE tasks
map_control View, list layers, or clear the geeView map (action=view|layers|clear)
save_session Save run_code history to a .py file or .ipynb notebook
env_info Get versions, REPL namespace, or project info (action=version|namespace|project)
export_image Export ee.Image to asset, Drive, or Cloud Storage (destination=asset|drive|cloud)
search_datasets Search the GEE dataset catalog by keyword
get_catalog_info Get detailed STAC metadata for a GEE dataset
cancel_tasks Cancel running/ready EE tasks (all or by name)
manage_asset Delete, copy, move, create folder, or update ACL (action=delete|copy|move|create|update_acl)
get_reference_data Look up reference dicts (band mappings, viz params, collection IDs, etc.)
get_streetview Get Google Street View imagery at a location for ground-truthing
search_places Search for places, landmarks, or businesses using Google Places API
create_report Create a new report (title, theme, layout, tone)
add_report_section Add a section to the active report (ee.Image/IC + geometry)
generate_report Generate the report (HTML, Markdown, or PDF)
get_report_status Check active report status and section list
clear_report Discard the active report
Examples:
python -m geeViz.mcp.server # stdio, no sandbox (default)
python -m geeViz.mcp.server --sandbox # stdio with sandbox
MCP_TRANSPORT=streamable-http python -m geeViz.mcp.server # HTTP, auto-sandbox
python -m geeViz.mcp --help
See also: geeViz/mcp/README.md
"""
    # Help goes to stderr so stdout stays clean for the stdio JSON-RPC transport.
    print(_help, file=sys.stderr)
    sys.exit(0)
import importlib.util
# Path setup: ensure geeViz and package root are on path
_THIS_DIR = os.path.dirname(os.path.abspath(__file__)) # .../geeViz/mcp
_GEEVIZ_DIR = os.path.dirname(_THIS_DIR) # .../geeViz
_PACKAGE_ROOT = os.path.dirname(_GEEVIZ_DIR) # .../geeVizBuilder
_EXAMPLES_DIR = os.path.join(_GEEVIZ_DIR, "examples")
# Remove any sys.path entry that points at this mcp/ directory itself, so that
# a bare "import mcp" resolves to the installed SDK rather than to this package.
sys.path = [p for p in sys.path if not (p.rstrip(os.sep).endswith("mcp") and _GEEVIZ_DIR in (p or ""))]
if _PACKAGE_ROOT not in sys.path:
    sys.path.insert(0, _PACKAGE_ROOT)
if _GEEVIZ_DIR not in sys.path:
    sys.path.append(_GEEVIZ_DIR)
# Load FastMCP from the MCP SDK. When run as python -m geeViz.mcp.server, the name "mcp"
# resolves to this package (geeViz.mcp), so we load the SDK's fastmcp by file from site-packages.
# If mcp is not installed (e.g. during Sphinx doc build), a lightweight stub is used so the
# module can still be imported and @app.tool() decorators pass functions through unchanged.
def _load_fastmcp():
import site as _site
for _sp in _site.getsitepackages():
_origin = os.path.join(_sp, "mcp", "server", "fastmcp.py")
if os.path.isfile(_origin):
spec = importlib.util.spec_from_file_location("_geeviz_mcp_sdk_fastmcp", _origin)
if spec and spec.loader:
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod.FastMCP
try:
from mcp.server.fastmcp import FastMCP
return FastMCP
except ModuleNotFoundError:
return None
class _StubFastMCP:
"""Lightweight stand-in when the mcp SDK is not installed.
Makes @app.tool() a no-op passthrough so functions keep their real
type and docstrings (important for Sphinx autodoc).
"""
def __init__(self, *args, **kwargs):
pass
def tool(self, **kwargs):
"""Return identity decorator -- the function is unchanged."""
def _identity(fn):
return fn
return _identity
def resource(self, *args, **kwargs):
def _identity(fn):
return fn
return _identity
def run(self, **kwargs):
raise RuntimeError("mcp SDK not installed; install with: pip install mcp")
# Resolve the real FastMCP if the SDK is installed; otherwise fall back to the stub.
_FastMCP = _load_fastmcp()
FastMCP = _FastMCP if _FastMCP is not None else _StubFastMCP
# Load ToolAnnotations for hinting read-only / destructive / etc.
try:
    from mcp.types import ToolAnnotations
except ImportError:
    # Stub if mcp SDK not installed
    class ToolAnnotations:
        def __init__(self, **kwargs): pass
# Pre-built annotation sets shared by the @app.tool() registrations below:
# read-only (with/without open-world access), writing-but-not-destructive,
# and destructive operations.
_READ_ONLY = ToolAnnotations(readOnlyHint=True, idempotentHint=True)
_READ_ONLY_OPEN = ToolAnnotations(readOnlyHint=True, idempotentHint=True, openWorldHint=True)
_WRITE = ToolAnnotations(readOnlyHint=False, destructiveHint=False)
_WRITE_OPEN = ToolAnnotations(readOnlyHint=False, destructiveHint=False, openWorldHint=True)
_DESTRUCTIVE = ToolAnnotations(readOnlyHint=False, destructiveHint=True, openWorldHint=True)
def _load_mcp_image():
"""Load the Image class from the mcp SDK for returning images from tools.
IMPORTANT: Try the direct import first so we get the exact same Image class
that FastMCP uses internally. If we load from file (as a standalone module),
the class identity differs and FastMCP's isinstance() check fails, causing
images to not display in clients like Cursor.
"""
# Preferred: direct import matches FastMCP's own Image class
try:
from mcp.server.fastmcp.utilities.types import Image
return Image
except (ImportError, ModuleNotFoundError, AttributeError):
pass
# Fallback: load from file in site-packages (older SDK layouts)
import site as _site
for _sp in _site.getsitepackages():
_types_path = os.path.join(_sp, "mcp", "server", "fastmcp", "utilities", "types.py")
if os.path.isfile(_types_path):
try:
spec = importlib.util.spec_from_file_location("_geeviz_mcp_types", _types_path)
if spec and spec.loader:
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
cls = getattr(mod, "Image", None)
if cls is not None:
return cls
except Exception:
pass
return None
_MCPImage = _load_mcp_image()
def _load_mcp_context():
"""Load the Context class from the MCP SDK for progress reporting in tools.
Uses the same importlib approach as _load_fastmcp() to avoid name conflicts
with the geeViz.mcp package.
"""
# Preferred: direct import matches FastMCP's own Context class
try:
from mcp.server.fastmcp import Context
return Context
except (ImportError, ModuleNotFoundError, AttributeError):
pass
# Fallback: load from file in site-packages
import site as _site
for _sp in _site.getsitepackages():
_origin = os.path.join(_sp, "mcp", "server", "fastmcp", "server.py")
if os.path.isfile(_origin):
try:
spec = importlib.util.spec_from_file_location("_geeviz_mcp_sdk_context", _origin)
if spec and spec.loader:
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod.Context
except Exception:
pass
return None
_MCPContext = _load_mcp_context()
# Expose as module-level ``Context`` so typing.get_type_hints() can resolve the
# annotation in run_code's signature (required for FastMCP context injection).
Context = _MCPContext
# Load agent instructions from the bundled markdown file.
# These are injected as the MCP server instructions (like a system prompt)
# so every connected client automatically knows how to use the tools.
_INSTRUCTIONS_FILE = os.path.join(_THIS_DIR, "agent-instructions.md")
try:
    with open(_INSTRUCTIONS_FILE, "r", encoding="utf-8") as _f:
        _SERVER_INSTRUCTIONS = _f.read()
    # Diagnostics go to stderr: with the stdio transport, stdout carries the
    # JSON-RPC frames and any stray print() there corrupts the protocol.
    # (Matches the other diagnostics in this file, e.g. --help and EE init.)
    print(
        f"[geeViz MCP] Loaded instructions: {len(_SERVER_INSTRUCTIONS)} chars, "
        f"{len(_SERVER_INSTRUCTIONS.split())} words",
        file=sys.stderr,
    )
except Exception:
    _SERVER_INSTRUCTIONS = None
    print("[geeViz MCP] WARNING: No agent instructions loaded", file=sys.stderr)
# Real FastMCP gets full kwargs; the stub takes no meaningful configuration.
app = FastMCP(
    "geeViz",
    instructions=_SERVER_INSTRUCTIONS,
    json_response=True,
) if _FastMCP is not None else _StubFastMCP()
# Wrap app.tool() to auto-log every tool invocation
_original_app_tool = app.tool
def _logging_tool_decorator(*dec_args, **dec_kwargs):
    """Replacement for app.tool() that wraps each tool function with logging.

    Called like the original decorator factory (``@app.tool(annotations=...)``);
    forwards the decorator arguments to the real FastMCP decorator and wraps
    the tool function so every call is recorded via _log_tool_call() on entry,
    on success (with the result), and on exception (re-raised unchanged).

    NOTE(review): only keyword arguments are logged; positional args (if
    FastMCP ever passes any) would be invisible in the log — confirm all tool
    calls arrive as kwargs.
    """
    original_decorator = _original_app_tool(*dec_args, **dec_kwargs)
    def wrapper(fn):
        import functools
        import inspect as _insp
        # Async wrapper: used when fn is a coroutine function so FastMCP
        # still sees an async callable.
        @functools.wraps(fn)
        async def logged_fn(*args, **kwargs):
            _log_tool_call(fn.__name__, kwargs)
            try:
                result = fn(*args, **kwargs)
                # Handle both sync and async tool functions
                if _insp.isawaitable(result):
                    result = await result
                _log_tool_call(fn.__name__, kwargs, result=result)
                return result
            except Exception as exc:
                _log_tool_call(fn.__name__, kwargs, error=exc)
                raise
        # If original fn is not async, keep it sync for FastMCP
        # (wrapping a sync tool in a coroutine would change how FastMCP
        # schedules it).
        if not _insp.iscoroutinefunction(fn):
            @functools.wraps(fn)
            def logged_fn_sync(*args, **kwargs):
                _log_tool_call(fn.__name__, kwargs)
                try:
                    result = fn(*args, **kwargs)
                    _log_tool_call(fn.__name__, kwargs, result=result)
                    return result
                except Exception as exc:
                    _log_tool_call(fn.__name__, kwargs, error=exc)
                    raise
            return original_decorator(logged_fn_sync)
        return original_decorator(logged_fn)
    return wrapper
# From here on, every @app.tool(...) registration below goes through the
# logging wrapper.
app.tool = _logging_tool_decorator
# ---------------------------------------------------------------------------
# Tool call logging -- every MCP tool invocation is logged with timestamp,
# tool name, arguments, and status (success/error).
# Log file: <mcp_dir>/logs/tool_calls.log
# ---------------------------------------------------------------------------
import logging as _logging
import datetime as _datetime
_LOG_DIR = os.path.join(_THIS_DIR, "logs")
os.makedirs(_LOG_DIR, exist_ok=True)
_TOOL_LOG_FILE = os.path.join(_LOG_DIR, "tool_calls.log")
_tool_logger = _logging.getLogger("geeViz.mcp.tools")
_tool_logger.setLevel(_logging.DEBUG)
# One raw-message file handler; entries are single-line JSON (see _log_tool_call).
# NOTE(review): the handler is attached at import time; re-executing this
# module (e.g. importlib.reload) would attach a duplicate handler and
# double-write every entry — confirm reload is never done.
_tool_fh = _logging.FileHandler(_TOOL_LOG_FILE, encoding="utf-8")
_tool_fh.setFormatter(_logging.Formatter("%(message)s"))
_tool_logger.addHandler(_tool_fh)
# Max characters kept per logged arg value / result preview / error message.
_log_result_limit = 5000
def _log_tool_call(tool_name: str, args: dict, result=None, error=None):
    """Append one single-line JSON record of a tool invocation to tool_calls.log.

    Records timestamp, tool name, (truncated) arguments, and either an ERROR
    status with the error text or an OK status with a truncated result preview.
    """
    import json as _json_log

    def _clip(text: str) -> str:
        # Truncate large values for readability, marking the cut with "...".
        if len(text) > _log_result_limit:
            return text[:_log_result_limit] + "..."
        return text

    entry = {
        "timestamp": _datetime.datetime.now().isoformat(timespec="seconds"),
        "tool": tool_name,
        "args": {key: _clip(str(value)) for key, value in (args or {}).items()},
    }
    if error:
        entry["status"] = "ERROR"
        entry["error"] = str(error)[:_log_result_limit]
    else:
        entry["status"] = "OK"
        entry["result_preview"] = _clip(str(result))
    _tool_logger.info(_json_log.dumps(entry))
# ---------------------------------------------------------------------------
# Lazy initialization -- defer all geeViz imports until first tool call
# that needs them. Every geeViz module import triggers robustInitializer()
# at module level, so we must not import at top level.
# ---------------------------------------------------------------------------
import threading
import json
# Guards the one-time EE/geeViz initialization (see _ensure_initialized).
_init_lock = threading.Lock()
_initialized = False
# Module short-name -> fully qualified import path
_MODULE_MAP = {
    "geeView": "geeViz.geeView",
    "getImagesLib": "geeViz.getImagesLib",
    "changeDetectionLib": "geeViz.changeDetectionLib",
    "gee2Pandas": "geeViz.gee2Pandas",
    "assetManagerLib": "geeViz.assetManagerLib",
    "taskManagerLib": "geeViz.taskManagerLib",
    "foliumView": "geeViz.foliumView",
    "phEEnoViz": "geeViz.phEEnoViz",
    "cloudStorageManagerLib": "geeViz.cloudStorageManagerLib",
    "chartingLib": "geeViz.outputLib.charts",
    "thumbLib": "geeViz.outputLib.thumbs",
    "reportLib": "geeViz.outputLib.reports",
    "getSummaryAreasLib": "geeViz.getSummaryAreasLib",
    "edwLib": "geeViz.edwLib",
    "googleMapsLib": "geeViz.googleMapsLib",
}
# Persistent REPL namespace for run_code
_namespace: dict = {}
# Code history for save_session
_code_history: list[str] = []
# Where session scripts and tool-generated output files are written.
_script_dir = os.path.join(_THIS_DIR, "generated_scripts")
_output_dir = os.path.join(_THIS_DIR, "generated_outputs")
# Path of the current session's accumulated script (set on first save).
_current_script_path: str | None = None
def _init_ee_credentials():
    """Initialize Earth Engine with service account or default credentials.

    Resolution order:
      1. ``GEE_SERVICE_ACCOUNT_KEY`` env var -- path to a JSON key file
      2. ``GEE_SERVICE_ACCOUNT_KEY_JSON`` env var -- inline JSON key string
      3. otherwise do nothing here; geeViz falls back to Application Default
         Credentials (user login, attached service account on GCE/Cloud Run,
         etc.) when geeViz.geeView runs ee.Initialize() on import.

    The ``GEE_PROJECT`` env var sets the EE project for billing/quotas.
    """
    import ee

    project = os.environ.get("GEE_PROJECT")
    key_path = os.environ.get("GEE_SERVICE_ACCOUNT_KEY")
    key_json = os.environ.get("GEE_SERVICE_ACCOUNT_KEY_JSON")

    if key_path and os.path.isfile(key_path):
        # Service-account key file on disk.
        import json
        with open(key_path) as f:
            key_data = json.load(f)
        creds = ee.ServiceAccountCredentials(key_data["client_email"], key_file=key_path)
        ee.Initialize(credentials=creds, project=project)
        print(f"EE initialized with service account: {key_data['client_email']}"
              f" (project={project or 'default'})", file=sys.stderr)
        return

    if key_json:
        # Inline JSON key (for container secrets / env injection). The EE
        # client wants a file path, so spill the key to a temp file briefly.
        import json
        import tempfile
        key_data = json.loads(key_json)
        tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False)
        json.dump(key_data, tmp)
        tmp.close()
        creds = ee.ServiceAccountCredentials(key_data["client_email"], key_file=tmp.name)
        ee.Initialize(credentials=creds, project=project)
        # Remove the key file as soon as EE has consumed it.
        os.unlink(tmp.name)
        print(f"EE initialized with service account (inline key): "
              f"{key_data['client_email']}", file=sys.stderr)
def _ensure_initialized():
    """Lazy-initialize EE and populate the REPL namespace. Thread-safe.

    Uses double-checked locking: the fast path returns without taking the
    lock once initialization has completed; the second check inside the lock
    prevents two threads from both initializing.
    """
    global _initialized
    if _initialized:
        return
    with _init_lock:
        if _initialized:
            return
        # Initialize EE credentials before importing geeViz
        # (geeViz.geeView calls ee.Initialize on import)
        _init_ee_credentials()
        import geeViz.geeView as gv
        import geeViz.getImagesLib as gil
        import geeViz.getSummaryAreasLib as sal
        import geeViz.edwLib as edw
        import geeViz.googleMapsLib as gm
        from geeViz.outputLib import thumbs as tl
        from geeViz.outputLib import reports as rl
        import ee
        # Seed the persistent run_code namespace with the standard aliases
        # documented in run_code's docstring, plus the safe file writer and
        # a (possibly restricted) builtins dict.
        _namespace.update({
            "ee": ee,
            "Map": gv.Map,
            "gv": gv,
            "gil": gil,
            "sal": sal,
            "edw": edw,
            "gm": gm,
            "tl": tl,
            "rl": rl,
            "save_file": _safe_write_file,
            "__builtins__": _make_safe_builtins(),
        })
        _initialized = True
def _reset_namespace():
    """Wipe all REPL state (namespace, history, session script) and rebuild it."""
    global _initialized, _current_script_path
    _code_history.clear()
    _namespace.clear()
    _current_script_path = None
    # Force a full re-initialization on the next line.
    _initialized = False
    _ensure_initialized()
def _save_history_to_file() -> str:
    """Persist the accumulated run_code history as a timestamped .py script.

    The same session file is reused across calls (created on first use);
    its full path is returned.
    """
    global _current_script_path
    import datetime

    os.makedirs(_script_dir, exist_ok=True)
    if _current_script_path is None:
        stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        _current_script_path = os.path.join(_script_dir, f"session_{stamp}.py")

    # Header mirrors the aliases pre-loaded into the run_code namespace so the
    # saved script is runnable standalone.
    header_lines = [
        "# Auto-generated by geeViz MCP server",
        "# Each section below is one run_code call, in order.",
        "",
        "import geeViz.geeView as gv",
        "import geeViz.getImagesLib as gil",
        "import geeViz.getSummaryAreasLib as sal",
        "import geeViz.edwLib as edw",
        "import geeViz.googleMapsLib as gm",
        "from geeViz.outputLib import thumbs as tl",
        "from geeViz.outputLib import reports as rl",
        "ee = gv.ee",
        "Map = gv.Map",
        "",
        "",
    ]
    sections = "\n\n".join(
        f"# --- run_code call {idx+1} ---\n{snippet}"
        for idx, snippet in enumerate(_code_history)
    )
    with open(_current_script_path, "w", encoding="utf-8") as out:
        out.write("\n".join(header_lines) + sections + "\n")
    return _current_script_path
# ---------------------------------------------------------------------------
# Tool 1: run_code
# ---------------------------------------------------------------------------
import ast
import asyncio
import io
import contextlib
import traceback
# Collection type names that are dangerous to call .getInfo() on without .limit()
_COLLECTION_NAMES = {"ImageCollection", "FeatureCollection"}
# ---------------------------------------------------------------------------
# Security: Tier 1 hardening for run_code
# ---------------------------------------------------------------------------
# Modules that are blocked from import in run_code. These provide OS/network/process
# access that is unnecessary for Earth Engine workflows and dangerous if the server
# is exposed remotely.
_BLOCKED_MODULES = frozenset({
    "os", "sys", "subprocess", "socket", "shutil", "ctypes", "signal",
    "multiprocessing", "threading", "http", "urllib", "requests",
    "pathlib", "tempfile", "glob", "io", "importlib", "code", "codeop",
    "pickle", "shelve", "marshal", "builtins",
})
# Top-level module prefixes that are allowed in import statements.
# Anything not matching these prefixes AND not in _BLOCKED_MODULES gets a warning
# (not a hard block) to avoid breaking legitimate but uncommon imports.
# NOTE(review): this tuple is not referenced anywhere in the visible code —
# confirm the warn-on-unknown-prefix behavior described above is actually
# implemented, or remove the constant.
_ALLOWED_MODULE_PREFIXES = (
    "ee", "geeViz", "json", "datetime", "math", "collections",
    "numpy", "np", "pandas", "pd", "plotly", "copy", "re",
    "functools", "itertools", "operator", "statistics",
    "pprint", "textwrap", "string", "decimal", "fractions",
)
# Builtins that are blocked from the execution namespace.
_BLOCKED_BUILTINS = frozenset({
    "__import__", "eval", "exec", "compile", "open",
    "breakpoint", "exit", "quit",
    "globals", "locals", "vars",
    "getattr", "setattr", "delattr",
})
def _safe_write_file(filename: str, content, mode: str = "w") -> str:
    """Write content to a file in the safe output directory.

    Only allows writing to geeViz/mcp/generated_outputs/ to prevent
    arbitrary file system access. Any directory components in *filename*
    are stripped.

    Args:
        filename: Just the filename (no directory). e.g. "chart.html"
        content: Content to write -- a str for text modes, bytes for binary
            modes such as "wb".
        mode: Write mode, "w" (text) or "wb" (binary). Default "w".

    Returns:
        Full path to the written file.

    Raises:
        ValueError: If the filename reduces to an empty name or to the
            "." / ".." path placeholders.
    """
    # Strip any path components — only allow bare filenames
    safe_name = os.path.basename(filename)
    # basename() lets "." and ".." through, which name directories (and ".."
    # would point outside the safe dir) — reject them along with empty names.
    if not safe_name or safe_name in (".", ".."):
        raise ValueError("filename must be a plain, non-empty file name")
    os.makedirs(_output_dir, exist_ok=True)
    full_path = os.path.join(_output_dir, safe_name)
    with open(full_path, mode) as f:
        f.write(content)
    return full_path
def _make_safe_builtins() -> dict:
    """Build the ``__builtins__`` dict installed into the run_code namespace.

    Outside sandbox mode this is simply a full copy of the builtins module
    (unrestricted Python access for local/stdio use). In sandbox mode the
    names in _BLOCKED_BUILTINS are stripped and ``__import__`` is swapped for
    a gatekeeper that refuses the modules in _BLOCKED_MODULES.
    """
    import builtins

    full = dict(vars(builtins))
    if not _SANDBOX_ENABLED:  # False or None (unresolved) → no restrictions
        return full

    restricted = {name: obj for name, obj in full.items() if name not in _BLOCKED_BUILTINS}

    def _guarded_import(name, *args, **kwargs):
        # Match on the top-level package so "os.path" etc. are caught too.
        if name.split(".")[0] in _BLOCKED_MODULES:
            raise ImportError(
                f"Import of '{name}' is blocked for security. "
                f"Only Earth Engine, geeViz, and standard data libraries are allowed."
            )
        # __builtins__ may be a module or a dict depending on execution context.
        if isinstance(__builtins__, dict):
            real_import = __builtins__["__import__"]
        else:
            real_import = builtins.__import__
        return real_import(name, *args, **kwargs)

    restricted["__import__"] = _guarded_import
    return restricted
def _check_code_patterns(code: str) -> list[str]:
    """AST analysis: detect risky EE patterns AND blocked security patterns.

    Returns a list of warning/error strings. Strings starting with "BLOCKED:"
    will cause run_code to refuse execution.

    When sandbox is disabled, security checks (import blocking, builtin blocking)
    are skipped — only EE performance warnings are emitted.
    """
    warnings: list[str] = []
    try:
        tree = ast.parse(code)
    except SyntaxError:
        return warnings  # let the executor report syntax errors

    # Precompute the set of nodes nested inside any for/while loop ONCE.
    # (Previously the whole tree was re-walked for every .getInfo() node,
    # which was quadratic and emitted duplicate warnings that only the final
    # dedup pass cleaned up.)
    nodes_in_loops: set[int] = set()
    for outer in ast.walk(tree):
        if isinstance(outer, (ast.For, ast.While)):
            for inner in ast.walk(outer):
                nodes_in_loops.add(id(inner))

    for node in ast.walk(tree):
        if _SANDBOX_ENABLED:
            # --- Security: check imports (sandbox only) ---
            if isinstance(node, ast.Import):
                for alias in node.names:
                    if alias.name.split(".")[0] in _BLOCKED_MODULES:
                        warnings.append(
                            f"BLOCKED: import of '{alias.name}' is not allowed. "
                            f"Only Earth Engine, geeViz, and standard data libraries are permitted."
                        )
            elif isinstance(node, ast.ImportFrom):
                if node.module and node.module.split(".")[0] in _BLOCKED_MODULES:
                    warnings.append(
                        f"BLOCKED: import from '{node.module}' is not allowed. "
                        f"Only Earth Engine, geeViz, and standard data libraries are permitted."
                    )
            # --- Security: check for dangerous builtin calls (sandbox only) ---
            if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
                if node.func.id in ("eval", "exec", "compile", "open", "breakpoint", "__import__"):
                    warnings.append(
                        f"BLOCKED: call to '{node.func.id}()' is not allowed for security."
                    )
        # --- EE performance: detect .getInfo() calls (always active) ---
        if not (isinstance(node, ast.Call)
                and isinstance(node.func, ast.Attribute)
                and node.func.attr == "getInfo"):
            continue
        # Check if .getInfo() is inside a for/while loop
        if id(node) in nodes_in_loops:
            warnings.append(
                "Warning: .getInfo() inside a loop can be very slow. "
                "Consider gathering results server-side with ee.List or ee.Dictionary."
            )
        # Check for .getInfo() on a collection without .limit()
        chain = _get_method_chain(node.func.value)
        has_limit = "limit" in chain or "first" in chain
        has_collection = any(name in _COLLECTION_NAMES for name in chain)
        if has_collection and not has_limit:
            warnings.append(
                "Warning: .getInfo() on a collection without .limit() can be very slow. "
                "Consider adding .limit(N) or using .first().getInfo()."
            )
    # Deduplicate while preserving order (the same warning can fire for
    # multiple nodes in one snippet).
    seen: set[str] = set()
    unique: list[str] = []
    for w in warnings:
        if w not in seen:
            seen.add(w)
            unique.append(w)
    return unique
def _get_method_chain(node: ast.AST) -> list[str]:
"""Walk an attribute/call chain and return method/attribute names encountered."""
names: list[str] = []
current = node
while True:
if isinstance(current, ast.Call):
current = current.func
elif isinstance(current, ast.Attribute):
names.append(current.attr)
current = current.value
elif isinstance(current, ast.Name):
names.append(current.id)
break
else:
break
return names
# (stray "[docs]" Sphinx HTML link artifact removed -- not part of the source)
@app.tool(annotations=_WRITE)
async def run_code(code: str, timeout: int = 120, reset: bool = False, ctx: Context = None) -> str:
    """Execute Python/GEE code in a persistent REPL namespace (like Jupyter).

    The namespace persists across calls -- variables set in one call are
    available in the next. Pre-populated with: ee, Map (gv.Map), gv
    (geeViz.geeView), gil (geeViz.getImagesLib), sal
    (geeViz.getSummaryAreasLib), tl (geeViz.outputLib.thumbs), rl (geeViz.outputLib.reports),
    save_file.

    **Sandbox mode:** When the server is run with ``--sandbox`` or over HTTP
    to a non-localhost address, ``open()``, ``os``, ``sys``, ``eval``, etc.
    are blocked. For local/stdio use (the default), sandbox is OFF and full
    Python access is available. Use ``save_file(filename, content)`` to write
    files to the ``generated_outputs/`` directory regardless of sandbox mode.

    While executing, progress heartbeats are sent every ~10 seconds to keep the
    MCP client connection alive and inform the agent that the tool is still running.

    Args:
        code: Python code to execute.
        timeout: Max seconds to wait (default 120). On Windows a hung
            getInfo() cannot be force-killed; the thread continues
            in background.
        reset: If True, clear the namespace and re-initialize before
            executing.
        ctx: MCP Context (auto-injected by FastMCP). Used for progress reporting.

    Returns:
        JSON with keys: success (bool), stdout, stderr, result, error.
    """
    if reset:
        _reset_namespace()
    else:
        _ensure_initialized()
    # Static analysis: detect risky and blocked patterns before execution
    code_warnings = _check_code_patterns(code)
    # Refuse execution if any BLOCKED patterns were found
    blocked = [w for w in code_warnings if w.startswith("BLOCKED:")]
    if blocked:
        return json.dumps({
            "success": False,
            "stdout": "",
            "stderr": "\n".join(blocked),
            "result": None,
            "error": "Code was blocked by security policy. " + " ".join(blocked),
            "script_path": None,
        })
    stdout_buf = io.StringIO()
    stderr_buf = io.StringIO()
    # Single-element lists let the worker thread hand values back to this
    # coroutine without extra synchronization.
    result_holder: list = [None]
    error_holder: list = [None]
    # Save original streams so we can restore them after timeout (redirect_stdout
    # modifies sys.stdout globally, which would capture the main thread's output
    # if the exec thread is still running when we time out).
    _orig_stdout = sys.stdout
    _orig_stderr = sys.stderr
    def _exec():
        # Worker executed in a daemon thread so a hung EE call can't block
        # the event loop; a daemon thread also won't prevent interpreter exit.
        try:
            with contextlib.redirect_stdout(stdout_buf), contextlib.redirect_stderr(stderr_buf):
                # Try to detect if the last statement is an expression
                tree = ast.parse(code)
                if tree.body and isinstance(tree.body[-1], ast.Expr):
                    # Execute everything except the last statement
                    if len(tree.body) > 1:
                        mod = ast.Module(body=tree.body[:-1], type_ignores=[])
                        exec(compile(mod, "<mcp>", "exec"), _namespace)
                    # Eval the last expression to capture its value
                    # (mirrors Jupyter's display of the final expression)
                    expr = ast.Expression(body=tree.body[-1].value)
                    result_holder[0] = eval(compile(expr, "<mcp>", "eval"), _namespace)
                else:
                    exec(compile(code, "<mcp>", "exec"), _namespace)
        except Exception:
            error_holder[0] = traceback.format_exc()
    thread = threading.Thread(target=_exec, daemon=True)
    thread.start()
    # Heartbeat loop: poll every 1s, timeout only after `timeout` seconds
    # of *inactivity* (no new stdout/stderr output). Active code that keeps
    # printing can run indefinitely.
    elapsed = 0.0
    idle_time = 0.0
    report_interval = 10
    poll_interval = 1
    next_report = report_interval
    last_stdout_len = 0
    last_stderr_len = 0
    while thread.is_alive() and idle_time < timeout:
        await asyncio.sleep(min(poll_interval, timeout - idle_time))
        # NOTE(review): elapsed advances by poll_interval even when the sleep
        # above was shortened, so the reported elapsed time can slightly
        # overstate wall time near the deadline — confirm acceptable.
        elapsed += poll_interval
        # Check for new output activity
        # NOTE(review): tell() is read here while the worker thread writes the
        # same StringIO — assumed safe enough for a monotone progress check
        # under the GIL; verify no stricter guarantee is needed.
        cur_stdout_len = stdout_buf.tell()
        cur_stderr_len = stderr_buf.tell()
        if cur_stdout_len != last_stdout_len or cur_stderr_len != last_stderr_len:
            idle_time = 0.0 # reset idle timer on any new output
            last_stdout_len = cur_stdout_len
            last_stderr_len = cur_stderr_len
        else:
            idle_time += poll_interval
        if thread.is_alive() and ctx and elapsed >= next_report:
            next_report += report_interval
            try:
                await ctx.report_progress(elapsed, timeout)
                await ctx.info(f"run_code executing... ({int(elapsed)}s elapsed, {int(idle_time)}s idle / {timeout}s timeout)")
            except Exception:
                pass # don't let reporting errors kill the tool
    # Restore original streams in case the thread's redirect is still active
    # (happens on timeout when the thread's `with` block hasn't exited yet).
    sys.stdout = _orig_stdout
    sys.stderr = _orig_stderr
    # Prepend static analysis warnings to stderr
    stderr_val = stderr_buf.getvalue()
    if code_warnings:
        warning_block = "\n".join(code_warnings) + "\n"
        stderr_val = warning_block + stderr_val
    if thread.is_alive():
        # Timed out: the worker is still running (daemon thread keeps going
        # in the background); report what we have plus troubleshooting hints.
        timeout_hints = (
            f"Execution timed out after {int(idle_time)}s of inactivity ({int(elapsed)}s total). Common causes:\n"
            "- .getInfo() on a large ImageCollection -- use .limit(N) or inspect_asset with date/region filters\n"
            "- .getInfo() on a high-res Image over a large region -- reduce the region or increase scale\n"
            "- Complex server-side computation -- break into smaller steps\n"
            "Note: on Windows, the thread continues in background."
        )
        if elapsed >= 60:
            timeout_hints += (
                "\nHint: the call ran for over 60s with no output. If this was a .getInfo() call, "
                "consider using inspect_asset with filters, or reduce scale/region size."
            )
        # NOTE(review): this return omits the "script_path" key that the other
        # return payloads include — confirm clients tolerate the missing key.
        return json.dumps({
            "success": False,
            "stdout": stdout_buf.getvalue(),
            "stderr": stderr_val,
            "result": None,
            "error": timeout_hints,
        })
    if error_holder[0]:
        return json.dumps({
            "success": False,
            "stdout": stdout_buf.getvalue(),
            "stderr": stderr_val,
            "result": None,
            "error": error_holder[0],
            "script_path": None,
        })
    # Success -- record in history and save to file
    _code_history.append(code)
    script_path = _save_history_to_file()
    result_val = result_holder[0]
    # Make result JSON-serializable
    result_str = None
    if result_val is not None:
        try:
            json.dumps(result_val)
            result_str = result_val
        except (TypeError, ValueError):
            # Non-serializable values (ee objects, etc.) fall back to repr().
            result_str = repr(result_val)
    return json.dumps({
        "success": True,
        "stdout": stdout_buf.getvalue(),
        "stderr": stderr_val,
        "result": result_str,
        "error": None,
        "script_path": script_path,
    })
# ---------------------------------------------------------------------------
# Tool 2: inspect_asset
# ---------------------------------------------------------------------------
# (stray "[docs]" Sphinx HTML link artifact removed -- not part of the source)
@app.tool(annotations=_READ_ONLY_OPEN)
def inspect_asset(
asset_id: str,
start_date: str = "",
end_date: str = "",
region_var: str = "",
) -> str:
"""Get detailed metadata for any GEE asset (Image, ImageCollection, FeatureCollection, etc.).
Returns band names/types, CRS, scale, date range, size, columns, and
properties. Uses ee.data.getInfo for fast catalog metadata, then fetches
live details with a 10-second timeout per query to avoid hangs on large
collections.
Args:
asset_id: Full Earth Engine asset ID (e.g. "COPERNICUS/S2_SR_HARMONIZED").
start_date: Optional start date filter for ImageCollections (YYYY-MM-DD).
end_date: Optional end date filter for ImageCollections (YYYY-MM-DD).
region_var: Optional name of an ee.Geometry or ee.FeatureCollection
variable in the REPL namespace for spatial filtering
(ImageCollections only).
Returns:
JSON with asset metadata.
"""
import concurrent.futures
import datetime as _dt
_TIMEOUT = 10 # seconds per EE query
_ensure_initialized()
ee = _namespace["ee"]
# --- Step 1: Fast catalog metadata (no compute, never hangs) ---
try:
info = ee.data.getInfo(asset_id)
except Exception as exc:
return json.dumps({"error": str(exc), "asset_id": asset_id})
if info is None:
return json.dumps({"error": f"Asset not found: {asset_id}", "asset_id": asset_id})
asset_type = info.get("type", "UNKNOWN")
result: dict = {"asset_id": asset_id, "type": asset_type}
# Include catalog-level properties (skip long description HTML)
cat_props = info.get("properties", {})
if cat_props:
dr = cat_props.get("date_range")
if dr and isinstance(dr, list) and len(dr) == 2:
try:
result["first_date"] = _dt.datetime.utcfromtimestamp(dr[0] / 1000).strftime("%Y-%m-%d")
result["last_date"] = _dt.datetime.utcfromtimestamp(dr[1] / 1000).strftime("%Y-%m-%d")
except Exception:
pass
_CATALOG_KEYS = ("title", "provider", "keywords", "tags", "period",
"visualization_0_bands", "visualization_0_min",
"visualization_0_max", "visualization_0_name",
"provider_url")
for key in _CATALOG_KEYS:
if key in cat_props:
result.setdefault("catalog", {})[key] = cat_props[key]
# Include column info for FeatureCollections
if "columns" in info:
result["columns"] = info["columns"]
def _getinfo_with_timeout(ee_obj, timeout=_TIMEOUT):
"""Run ee_obj.getInfo() in a daemon thread with timeout. Returns (result, error)."""
import threading
_result_box = [None, None] # [value, error]
def _run():
try:
_result_box[0] = ee_obj.getInfo()
except Exception as exc:
_result_box[1] = str(exc)
t = threading.Thread(target=_run, daemon=True)
t.start()
t.join(timeout=timeout)
if t.is_alive():
return None, "timeout"
return _result_box[0], _result_box[1]
try:
if asset_type in ("IMAGE", "Image"):
img_info, err = _getinfo_with_timeout(ee.Image(asset_id))
if img_info and "bands" in img_info:
result["bands"] = [
{"name": b.get("id", ""), "data_type": b.get("data_type", {}).get("precision", ""),
"crs": b.get("crs", ""), "scale": b.get("crs_transform", [None])[0]}
for b in img_info["bands"]
]
# Include image properties (class metadata, etc.)
if "properties" in img_info:
result["properties"] = img_info["properties"]
elif err:
result["detail_error"] = err
elif asset_type in ("IMAGE_COLLECTION", "ImageCollection"):
collection = ee.ImageCollection(asset_id)
# Apply filters
filters_applied = {}
if start_date:
collection = collection.filterDate(start_date, end_date or "2099-01-01")
filters_applied["start_date"] = start_date
filters_applied["end_date"] = end_date or "2099-01-01"
elif end_date:
collection = collection.filterDate("1970-01-01", end_date)
filters_applied["start_date"] = "1970-01-01"
filters_applied["end_date"] = end_date
if region_var:
region = _namespace.get(region_var)
if region is None:
return json.dumps({"error": f"Variable {region_var!r} not found in namespace."})
if isinstance(region, ee.FeatureCollection):
region = region.geometry()
elif not isinstance(region, ee.Geometry):
return json.dumps({
"error": f"Variable {region_var!r} is {type(region).__name__}, "
"expected ee.Geometry or ee.FeatureCollection.",
})
collection = collection.filterBounds(region)
filters_applied["region_var"] = region_var
if filters_applied:
result["filters_applied"] = filters_applied
# --- Run queries with individual timeouts ---
# Each query runs in its own daemon thread so hangs don't block
queries = {}
queries["count"] = collection.size()
# Date range: use catalog date_range if no filters applied,
# otherwise compute from the filtered collection
if filters_applied or "first_date" not in result:
queries["first_date"] = collection.sort("system:time_start", True).first().date().format("YYYY-MM-dd")
queries["last_date"] = collection.sort("system:time_start", False).first().date().format("YYYY-MM-dd")
# Band info from first image
queries["first_image"] = collection.first()
import threading
results_map = {}
_lock = threading.Lock()
def _run_query(key, ee_obj):
try:
val = ee_obj.getInfo()
with _lock:
results_map[key] = val
except Exception as exc:
with _lock:
results_map[key] = f"__ERROR__:{exc}"
threads = []
for key, ee_obj in queries.items():
t = threading.Thread(target=_run_query, args=(key, ee_obj), daemon=True)
t.start()
threads.append(t)
# Wait up to _TIMEOUT for all threads
deadline = __import__("time").time() + _TIMEOUT
for t in threads:
remaining = max(0.1, deadline - __import__("time").time())
t.join(timeout=remaining)
# Mark any that didn't finish
for key in queries:
if key not in results_map:
results_map[key] = "__TIMEOUT__"
# Process results
count_val = results_map.get("count")
if isinstance(count_val, int):
result["image_count"] = count_val
elif count_val == "__TIMEOUT__":
result["image_count"] = "timeout (large collection)"
else:
result["image_count_error"] = str(count_val)
# Dates
fd = results_map.get("first_date")
ld = results_map.get("last_date")
if isinstance(fd, str) and not fd.startswith("__"):
result["first_date"] = fd
if isinstance(ld, str) and not ld.startswith("__"):
result["last_date"] = ld
# Bands and sample image properties
first_img = results_map.get("first_image")
if isinstance(first_img, dict):
if "bands" in first_img:
result["bands"] = [
{"name": b.get("id", ""), "data_type": b.get("data_type", {}).get("precision", ""),
"crs": b.get("crs", ""), "scale": b.get("crs_transform", [None])[0]}
for b in first_img["bands"]
]
# Include first image's property names (not values — those can be huge)
img_props = first_img.get("properties", {})
if img_props:
result["image_property_names"] = sorted(img_props.keys())
# Include a few key properties if they exist
for k in ("system:time_start", "system:index"):
if k in img_props:
result.setdefault("sample_image", {})[k] = img_props[k]
# If count timed out, note it
if count_val == "__TIMEOUT__":
result["note"] = "Collection too large to count within timeout."
elif asset_type in ("TABLE", "FeatureCollection"):
# Try full metadata first, fall back to limited sample
fc = ee.FeatureCollection(asset_id)
fc_info, err = _getinfo_with_timeout(fc.limit(5), _TIMEOUT)
if fc_info:
result["asset"] = _strip_coordinates(fc_info)
# Get column info
if "columns" in info:
result["columns"] = info["columns"]
elif err == "timeout":
# Try even smaller sample
fc_info2, err2 = _getinfo_with_timeout(fc.limit(1), _TIMEOUT)
if fc_info2:
result["asset"] = _strip_coordinates(fc_info2)
result["note"] = "Large FeatureCollection; showing 1 sample feature."
else:
result["detail_error"] = "timeout fetching features"
if "columns" in info:
result["columns"] = info["columns"]
else:
result["detail_error"] = err or "unknown error"
else:
# Folder or other type — return raw info
result["info"] = info
except Exception as exc:
result["detail_error"] = str(exc)
return json.dumps(result)
# ---------------------------------------------------------------------------
# Tool 3: get_api_reference
# ---------------------------------------------------------------------------
import inspect as _inspect
[docs]
@app.tool(annotations=_READ_ONLY)
def get_api_reference(module: str, function_name: str = "") -> str:
    """Return the signature and docstring of a geeViz function, class, or module.

    Resolution uses Python's inspect module against the installed package, so
    the output always reflects the running code rather than static docs.

    Args:
        module: Short module name. One of: geeView, getImagesLib,
            changeDetectionLib, gee2Pandas, assetManagerLib,
            taskManagerLib, foliumView, phEEnoViz, cloudStorageManagerLib,
            chartingLib, thumbLib, reportLib, getSummaryAreasLib, edwLib.
        function_name: Optional function or class name within the module.
            Dotted names (e.g. "mapper.addLayer") are resolved attribute by
            attribute. If omitted, returns the module-level docstring.

    Returns:
        JSON string with signature and docstring text, or an error message.
    """
    _ensure_initialized()
    fq = _MODULE_MAP.get(module)
    if not fq:
        return json.dumps({
            "error": f"Unknown module: {module!r}. Valid modules: {', '.join(sorted(_MODULE_MAP))}",
        })
    try:
        mod = importlib.import_module(fq)
    except Exception as exc:
        return json.dumps({"error": f"Failed to import {fq}: {exc}"})
    if not function_name:
        return json.dumps({
            "module": module,
            "docstring": _inspect.getdoc(mod) or "(no module docstring)",
        })
    # Walk the attribute chain so dotted names like "mapper.addLayer" resolve.
    parts = function_name.split(".")
    obj = mod
    for part in parts:
        obj = getattr(obj, part, None)
        if obj is None:
            break
    # Fallback: a bare name in geeView may actually be a mapper method.
    if obj is None and module == "geeView" and len(parts) == 1:
        mapper_cls = getattr(mod, "mapper", None)
        if mapper_cls:
            obj = getattr(mapper_cls, function_name, None)
            if obj is not None:
                # Rewrite so the output shows where it actually lives.
                function_name = f"mapper.{function_name}"
    if obj is None:
        # Still unresolved -- offer mapper-method suggestions when plausible.
        hint = ""
        if module == "geeView":
            mapper_cls = getattr(mod, "mapper", None)
            if mapper_cls:
                candidates = [m for m in dir(mapper_cls)
                              if not m.startswith("_")
                              and function_name.lower() in m.lower()]
                if candidates:
                    hint = f" Did you mean: {', '.join('mapper.' + m for m in candidates)}?"
        return json.dumps({"error": f"{function_name!r} not found in {module}.{hint}"})
    # Classes: report the docstring plus the list of public callables.
    if _inspect.isclass(obj):
        public_methods = [attr for attr in dir(obj)
                          if not attr.startswith("_")
                          and callable(getattr(obj, attr, None))]
        return json.dumps({
            "module": module,
            "name": function_name,
            "type": "class",
            "docstring": _inspect.getdoc(obj) or "(no docstring)",
            "public_methods": public_methods,
        })
    # Plain function or other callable.
    try:
        sig = str(_inspect.signature(obj))
    except (ValueError, TypeError):
        # Builtins and some C-level callables expose no signature.
        sig = "(signature unavailable)"
    return json.dumps({
        "module": module,
        "name": function_name,
        "signature": f"{function_name}{sig}",
        "docstring": _inspect.getdoc(obj) or "(no docstring)",
    })
# ---------------------------------------------------------------------------
# Tool 4: search_functions
# ---------------------------------------------------------------------------
[docs]
@app.tool(annotations=_READ_ONLY)
def search_functions(query: str = "", module: str = "") -> str:
    """Search for functions across geeViz modules, or list functions in a specific module.

    Combines search and listing into one tool:

    - query only -> search all modules for matching functions (by name or docstring)
    - module only -> list all public functions in that module
    - both -> search within a specific module
    - neither -> return list of available modules with usage hint

    Args:
        query: Search term (case-insensitive). Matched against function names
            and the first line of their docstrings.
        module: Short module name to restrict search to a single module.
            Valid names: geeView, getImagesLib, changeDetectionLib,
            gee2Pandas, assetManagerLib, taskManagerLib, foliumView,
            phEEnoViz, cloudStorageManagerLib, chartingLib,
            getSummaryAreasLib.

    Returns:
        JSON with matching functions. Each entry has module, name, type, description.
    """
    _ensure_initialized()
    # Neither query nor module -- list available modules
    if not query and not module:
        return json.dumps({
            "modules": sorted(_MODULE_MAP.keys()),
            "usage": (
                'Pass module="<name>" to list all functions in a module, '
                'query="<term>" to search across all modules, '
                "or both to search within a specific module."
            ),
        })
    # Determine which modules to search
    if module:
        fq = _MODULE_MAP.get(module)
        if not fq:
            return json.dumps({
                "error": f"Unknown module: {module!r}. Valid modules: {', '.join(sorted(_MODULE_MAP))}",
            })
        modules_to_search = {module: fq}
    else:
        modules_to_search = _MODULE_MAP
    q = query.lower() if query else ""
    results = []
    for short_name, fq_name in modules_to_search.items():
        try:
            mod = importlib.import_module(fq_name)
        except Exception:
            # Skip modules that fail to import (e.g. missing optional deps)
            # rather than failing the whole search.
            continue
        for name in sorted(dir(mod)):
            # Only public callables/classes are indexed.
            if name.startswith("_"):
                continue
            obj = getattr(mod, name, None)
            if obj is None or not (callable(obj) or _inspect.isclass(obj)):
                continue
            doc = _inspect.getdoc(obj) or ""
            first_line = doc.split("\n")[0].strip() if doc else "(no description)"
            # A hit requires the query term in either the name or the
            # docstring's first line (both lowercased).
            if q and q not in name.lower() and q not in first_line.lower():
                continue
            kind = "class" if _inspect.isclass(obj) else "function"
            results.append({
                "module": short_name,
                "name": name,
                "type": kind,
                "description": first_line,
            })
        # For geeView, also include mapper class methods
        # (they're the primary map API and live on the class, not the module).
        if short_name == "geeView":
            mapper_cls = getattr(mod, "mapper", None)
            if mapper_cls and _inspect.isclass(mapper_cls):
                for mname in sorted(dir(mapper_cls)):
                    if mname.startswith("_"):
                        continue
                    mobj = getattr(mapper_cls, mname, None)
                    if not callable(mobj):
                        continue
                    doc = _inspect.getdoc(mobj) or ""
                    first_line = doc.split("\n")[0].strip() if doc else "(no description)"
                    if q and q not in mname.lower() and q not in first_line.lower():
                        continue
                    # Prefix with "mapper." so results are copy-pasteable.
                    results.append({
                        "module": short_name,
                        "name": f"mapper.{mname}",
                        "type": "method",
                        "description": first_line,
                    })
    return json.dumps({"query": query, "module": module, "count": len(results), "results": results})
# ---------------------------------------------------------------------------
# Examples (consolidated)
# ---------------------------------------------------------------------------
[docs]
@app.tool(annotations=_READ_ONLY)
def examples(action: str = "list", name: str = "", filter: str = "") -> str:
    """List or read geeViz example scripts.

    Args:
        action: "list" (default) to list available examples, or
            "get" to read the source of a specific example.
        name: For action="get", the example name (with or without extension).
        filter: For action="list", optional substring filter (case-insensitive).

    Returns:
        For "list": JSON list of {name, description} objects.
        For "get": The example source code.
    """
    act = action.lower().strip()
    if act == "get":
        if not name:
            return json.dumps({"error": "Provide 'name' for action='get'."})
        # Strip a known extension so lookup can try both formats below.
        base = name
        for ext in (".py", ".ipynb"):
            if base.endswith(ext):
                base = base[:-len(ext)]
                break
        py_path = os.path.join(_EXAMPLES_DIR, base + ".py")
        nb_path = os.path.join(_EXAMPLES_DIR, base + ".ipynb")
        # The .py version wins when both exist.
        if os.path.isfile(py_path):
            with open(py_path, "r", encoding="utf-8") as f:
                return json.dumps({"example": base + ".py", "type": "python", "source": f.read()})
        if os.path.isfile(nb_path):
            try:
                with open(nb_path, "r", encoding="utf-8") as f:
                    nb = json.load(f)
                # Flatten cells to {cell_type, source}; drop whitespace-only cells.
                cells = [{"cell_type": c.get("cell_type", ""), "source": "".join(c.get("source", []))}
                         for c in nb.get("cells", []) if "".join(c.get("source", [])).strip()]
                return json.dumps({"example": base + ".ipynb", "type": "notebook", "cells": cells})
            except Exception as exc:
                return json.dumps({"error": f"Failed to read notebook: {exc}"})
        # Neither file exists -- return valid names to help the caller.
        available = _list_example_files()
        return json.dumps({"error": f"Example not found: {name!r}", "available_examples": available})
    # action == "list"
    files = _list_example_files()
    results = []
    for fname in files:
        if filter and filter.lower() not in fname.lower():
            continue
        fpath = os.path.join(_EXAMPLES_DIR, fname)
        desc = ""
        if fname.endswith(".py"):
            try:
                # Read only the first 20 lines -- enough for a module docstring
                # or header comment without loading whole example files.
                with open(fpath, "r", encoding="utf-8") as f:
                    lines = [f.readline() for _ in range(20)]
                text = "".join(lines)
                try:
                    # First choice: the module docstring (first statement).
                    tree = ast.parse(text)
                    if tree.body and isinstance(tree.body[0], ast.Expr) and isinstance(tree.body[0].value, ast.Constant):
                        desc = str(tree.body[0].value.value).split("\n")[0].strip()
                except SyntaxError:
                    # A truncated 20-line prefix may not parse; fall through.
                    pass
                if not desc:
                    # Fallback: first non-trivial '#' comment line.
                    for line in lines:
                        s = line.strip()
                        if s.startswith("#") and len(s) > 2:
                            desc = s.lstrip("#").strip()
                            break
            except Exception:
                pass
        elif fname.endswith(".ipynb"):
            try:
                with open(fpath, "r", encoding="utf-8") as f:
                    nb = json.load(f)
                # Use the first line of the first markdown cell as description.
                for cell in nb.get("cells", []):
                    if cell.get("cell_type") == "markdown":
                        source = "".join(cell.get("source", [])).strip()
                        if source:
                            desc = source.split("\n")[0].lstrip("#").strip()
                        break
            except Exception:
                pass
        results.append({"name": fname, "description": desc or "(no description)"})
    return json.dumps({"count": len(results), "examples": results})
def _list_example_files() -> list[str]:
    """Return the sorted names of .py/.ipynb examples in _EXAMPLES_DIR.

    Returns an empty list when the directory does not exist; __init__.py
    is never treated as an example.
    """
    if not os.path.isdir(_EXAMPLES_DIR):
        return []
    names: list[str] = []
    for entry in os.listdir(_EXAMPLES_DIR):
        if entry == "__init__.py":
            continue
        if entry.endswith((".py", ".ipynb")):
            names.append(entry)
    names.sort()
    return names
# ---------------------------------------------------------------------------
# Tool 7: list_assets
# ---------------------------------------------------------------------------
[docs]
@app.tool(annotations=_READ_ONLY_OPEN)
def list_assets(folder: str) -> str:
    """List assets in a GEE folder or collection.

    Args:
        folder: Full asset path (e.g. "projects/my-project/assets/my-folder").

    Returns:
        JSON list of {id, type, sizeBytes} for each asset (max 2000).
    """
    # Fix: the docstring previously claimed "max 200" while the code caps
    # the listing at 2000 entries; the docs now match the behavior.
    _ensure_initialized()
    ee = _namespace["ee"]
    try:
        result = ee.data.listAssets({"parent": folder})
    except Exception as exc:
        return json.dumps({"error": str(exc), "folder": folder})
    assets = result.get("assets", [])
    # NOTE(review): listAssets can paginate via nextPageToken; only the first
    # page is examined here -- confirm whether pagination is needed for very
    # large folders.
    entries = [
        {
            "id": a.get("id") or a.get("name", ""),
            "type": a.get("type", "UNKNOWN"),
            "sizeBytes": a.get("sizeBytes"),
        }
        for a in assets[:2000]
    ]
    out: dict = {"folder": folder, "count": len(entries), "assets": entries}
    if len(assets) > 2000:
        out["note"] = f"Showing 2000 of {len(assets)} assets. Narrow your query for the rest."
    return json.dumps(out)
# ---------------------------------------------------------------------------
# Tool 8: track_tasks
# ---------------------------------------------------------------------------
[docs]
@app.tool(annotations=_READ_ONLY_OPEN)
def track_tasks(name_filter: str = "") -> str:
    """Get status of recent Earth Engine tasks.

    Scans the 500 most recent tasks returned by ee.data.getTaskList() and
    returns every one of those that matches the filter.

    Args:
        name_filter: Optional case-insensitive filter on task description.

    Returns:
        JSON list of tasks with description, state, type, start time,
        update time, and error message (from the 500 most recent tasks).
    """
    # Fix: the docstring previously claimed "max 50" results, but the code
    # scans the first 500 tasks and applies no 50-item cap; docs corrected.
    _ensure_initialized()
    ee = _namespace["ee"]
    try:
        tasks = ee.data.getTaskList()
    except Exception as exc:
        return json.dumps({"error": str(exc)})
    entries = []
    # NOTE(review): getTaskList ordering is assumed newest-first -- confirm
    # against the earthengine-api docs before relying on "recent".
    for t in tasks[:500]:
        desc = t.get("description", "")
        if name_filter and name_filter.lower() not in desc.lower():
            continue
        entries.append({
            "description": desc,
            "state": t.get("state", "UNKNOWN"),
            "task_type": t.get("task_type", ""),
            "start_timestamp_ms": t.get("start_timestamp_ms"),
            "update_timestamp_ms": t.get("update_timestamp_ms"),
            "error_message": t.get("error_message", ""),
        })
    return json.dumps({"count": len(entries), "tasks": entries})
# ---------------------------------------------------------------------------
# Map control (consolidated)
# ---------------------------------------------------------------------------
[docs]
@app.tool(annotations=_WRITE)
def map_control(action: str = "view", open_browser: bool = True) -> str:
    """Control the geeView interactive map.

    Args:
        action: Action to perform:

            - "view" (default): Open the map and return the URL.
            - "layers": List current layers, visibility, and viz params.
            - "clear": Remove all layers and commands.
        open_browser: For action="view", whether to open in browser (default True).

    Returns:
        JSON with action-specific results.
    """
    _ensure_initialized()
    Map = _namespace["Map"]
    act = action.lower().strip()
    if act == "view":
        # Map.view() prints its URL to stdout; capture it so it can be
        # parsed back out instead of being lost to the server console.
        url_buf = io.StringIO()
        try:
            with contextlib.redirect_stdout(url_buf):
                # With no queued map commands, enable the inspector so the
                # served map is interactive by default.
                if Map.mapCommandList == []:
                    Map.turnOnInspector()
                Map.view(open_browser=open_browser, open_iframe=False)
        except Exception as exc:
            return json.dumps({"error": str(exc)})
        printed = url_buf.getvalue()
        url = None
        for line in printed.splitlines():
            line = line.strip()
            # Prefer the "geeView URL: http..." line (has full token)
            if "geeView URL:" in line:
                idx = line.find("http")
                if idx >= 0:
                    url = line[idx:]
                break
        # Fallback: find any line starting with http
        if url is None:
            for line in printed.splitlines():
                line = line.strip()
                if line.startswith("http"):
                    url = line
                    break
        layer_count = len(Map.idDictList) if hasattr(Map, "idDictList") else 0
        return json.dumps({
            "url": url,
            "layer_count": layer_count,
            "message": f"Map opened with {layer_count} layer(s)." if url else "Map.view() ran but no URL was captured.",
            "raw_output": printed.strip(),
        })
    elif act == "layers":
        layers = []
        for entry in getattr(Map, "idDictList", []):
            # viz params are stored as a JSON string; decode for readability,
            # falling back to the raw value when it isn't valid JSON.
            viz_raw = entry.get("viz", "{}")
            try:
                viz = json.loads(viz_raw) if isinstance(viz_raw, str) else viz_raw
            except (json.JSONDecodeError, TypeError):
                viz = viz_raw
            layers.append({
                "name": entry.get("name", "(unnamed)"),
                "visible": entry.get("visible", "true"),
                "function": entry.get("function", ""),
                "viz": viz,
            })
        commands = list(getattr(Map, "mapCommandList", []))
        return json.dumps({"layer_count": len(layers), "layers": layers, "commands": commands})
    elif act == "clear":
        try:
            Map.clearMap()
        except Exception as exc:
            return json.dumps({"error": str(exc)})
        return json.dumps({"success": True, "message": "Map cleared. All layers and commands removed."})
    else:
        return json.dumps({"error": f"Unknown action: {action!r}. Use 'view', 'layers', or 'clear'."})
# ---------------------------------------------------------------------------
# Tool 13: save_session
# ---------------------------------------------------------------------------
[docs]
@app.tool(annotations=_WRITE)
def save_session(filename: str = "", format: str = "py") -> str:
    """Save the accumulated run_code history to a .py script or .ipynb notebook.

    Args:
        filename: Optional custom filename (saved in geeViz/mcp/generated_scripts/).
            If omitted, uses a timestamped default. The correct extension
            is added automatically based on format.
        format: Output format -- "py" (default) for a standalone Python script,
            "ipynb" for a Jupyter notebook.

    Returns:
        JSON with the file path and number of code blocks/cells saved.
    """
    if format not in ("py", "ipynb"):
        return json.dumps({
            "error": f"Invalid format: {format!r}. Must be 'py' or 'ipynb'.",
        })
    if not _code_history:
        return json.dumps({
            "error": "No code has been executed yet. Use run_code first.",
        })
    if format == "py":
        # A custom filename retargets the module-level path consumed by
        # _save_history_to_file(); otherwise its current default is kept.
        global _current_script_path
        if filename:
            if not filename.endswith(".py"):
                filename += ".py"
            os.makedirs(_script_dir, exist_ok=True)
            _current_script_path = os.path.join(_script_dir, filename)
        path = _save_history_to_file()
        return json.dumps({
            "success": True,
            "script_path": path,
            "code_blocks": len(_code_history),
            "message": f"Saved {len(_code_history)} code block(s) to {path}",
        })
    # format == "ipynb"
    import datetime
    os.makedirs(_script_dir, exist_ok=True)
    if filename:
        if not filename.endswith(".ipynb"):
            filename += ".ipynb"
        nb_path = os.path.join(_script_dir, filename)
    else:
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        nb_path = os.path.join(_script_dir, f"session_{ts}.ipynb")
    # Build notebook structure (nbformat 4.5)
    cells = []
    # Markdown header cell
    cells.append({
        "cell_type": "markdown",
        "metadata": {},
        "source": [
            "# geeViz MCP Session\n",
            "\n",
            f"Auto-generated by geeViz MCP server on {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}.\n",
        ],
    })
    # Import cell -- mirrors the names preloaded into the MCP REPL namespace
    # so the notebook is runnable standalone.
    cells.append({
        "cell_type": "code",
        "metadata": {},
        "source": [
            "import geeViz.geeView as gv\n",
            "import geeViz.getImagesLib as gil\n",
            "import geeViz.getSummaryAreasLib as sal\n",
            "from geeViz.outputLib import thumbs as tl\n",
            "from geeViz.outputLib import reports as rl\n",
            "ee = gv.ee\n",
            "Map = gv.Map",
        ],
        "execution_count": None,
        "outputs": [],
    })
    # One code cell per run_code call
    for i, block in enumerate(_code_history):
        lines = block.splitlines(True)  # keep line endings
        # Ensure last line has newline for notebook format
        if lines and not lines[-1].endswith("\n"):
            lines[-1] += "\n"
        cells.append({
            "cell_type": "code",
            "metadata": {},
            "source": lines,
            "execution_count": None,
            "outputs": [],
        })
    notebook = {
        "nbformat": 4,
        "nbformat_minor": 5,
        "metadata": {
            "kernelspec": {
                "display_name": "Python 3",
                "language": "python",
                "name": "python3",
            },
            "language_info": {
                "name": "python",
                "version": sys.version.split()[0],
            },
        },
        "cells": cells,
    }
    with open(nb_path, "w", encoding="utf-8") as f:
        json.dump(notebook, f, indent=1, ensure_ascii=False)
    return json.dumps({
        "success": True,
        "notebook_path": nb_path,
        "code_cells": len(_code_history),
        "message": f"Saved {len(_code_history)} code cell(s) to {nb_path}",
    })
# ---------------------------------------------------------------------------
# Environment info (consolidated)
# ---------------------------------------------------------------------------
_NAMESPACE_BUILTINS = {"ee", "Map", "gv", "gil"}
[docs]
@app.tool(annotations=_READ_ONLY_OPEN)
def env_info(action: str = "version") -> str:
    """Get environment information: versions, REPL namespace, or project details.

    Args:
        action: What to return:

            - "version" (default): geeViz, EE, and Python versions.
            - "namespace": User-defined variables in the REPL (no getInfo calls).
            - "project": Current EE project ID and root assets.

    Returns:
        JSON with action-specific results.
    """
    act = action.lower().strip()
    if act == "version":
        import geeViz
        result = {
            "geeViz_version": geeViz.__version__,
            "python_version": sys.version,
            "platform": sys.platform,
        }
        try:
            import ee
            result["ee_version"] = ee.__version__
        except Exception:
            result["ee_version"] = "(not available)"
        return json.dumps(result)
    elif act == "namespace":
        _ensure_initialized()
        ee = _namespace["ee"]
        entries = []
        for name, obj in sorted(_namespace.items()):
            if name.startswith("_") or name in _NAMESPACE_BUILTINS:
                continue
            type_name = type(obj).__name__
            # Label EE objects with their ee.* type. Fix: the previous
            # getattr(ee, ee_type, type(None)) fallback meant a None-valued
            # variable would match the first ee attribute missing from ee
            # (isinstance(None, type(None)) is True); skip missing classes
            # explicitly instead.
            for ee_type in ("Image", "ImageCollection", "FeatureCollection",
                            "Feature", "Geometry", "Number", "String",
                            "List", "Dictionary", "Filter", "Reducer",
                            "ComputedObject"):
                ee_cls = getattr(ee, ee_type, None)
                if ee_cls is not None and isinstance(obj, ee_cls):
                    type_name = f"ee.{ee_type}"
                    break
            try:
                # Truncate huge reprs so the JSON payload stays manageable.
                r = repr(obj)
                if len(r) > 2000:
                    r = r[:2000] + "..."
            except Exception:
                r = "(repr failed)"
            entries.append({"name": name, "type": type_name, "repr": r})
        return json.dumps({
            "count": len(entries), "variables": entries,
            "note": "Excludes builtins (ee, Map, gv, gil). No getInfo() calls made.",
        })
    elif act == "project":
        _ensure_initialized()
        ee = _namespace["ee"]
        result: dict = {}
        try:
            # NOTE(review): ee.data._get_state() is a private earthengine-api
            # interface and may change between releases -- confirm stability.
            result["project_id"] = ee.data._get_state().cloud_api_user_project
        except Exception as exc:
            result["project_id"] = None
            result["project_error"] = str(exc)
        if result.get("project_id"):
            try:
                root = f"projects/{result['project_id']}/assets"
                assets_response = ee.data.listAssets({"parent": root})
                assets = assets_response.get("assets", [])
                result["root_assets"] = [
                    {"id": a.get("id") or a.get("name", ""), "type": a.get("type", "UNKNOWN")}
                    for a in assets[:500]
                ]
                result["root_asset_count"] = len(assets)
            except Exception as exc:
                result["root_assets"] = []
                result["assets_error"] = str(exc)
        return json.dumps(result)
    else:
        return json.dumps({"error": f"Unknown action: {action!r}. Use 'version', 'namespace', or 'project'."})
# ---------------------------------------------------------------------------
# Export image (consolidated)
# ---------------------------------------------------------------------------
[docs]
@app.tool(annotations=_WRITE_OPEN)
def export_image(
    destination: str,
    image_var: str,
    region_var: str = "",
    scale: int = 30,
    crs: str = "EPSG:4326",
    overwrite: bool = False,
    asset_id: str = "",
    pyramiding_policy: str = "mean",
    output_name: str = "",
    drive_folder: str = "",
    bucket: str = "",
    output_no_data: int = -32768,
    file_format: str = "GeoTIFF",
) -> str:
    """Export an ee.Image to a GEE asset, Google Drive, or Cloud Storage.

    Args:
        destination: Where to export -- "asset", "drive", or "cloud".
        image_var: Name of the ee.Image variable in the REPL namespace.
        region_var: Name of an ee.Geometry or ee.FeatureCollection variable
            for the export region. Required for drive/cloud exports;
            optional for asset exports (uses image footprint if omitted).
        scale: Output resolution in meters (default 30).
        crs: Coordinate reference system (default "EPSG:4326").
        overwrite: If True, overwrite existing asset/file (default False).

    Asset-specific:
        asset_id: Full destination asset ID (required for destination="asset").
        pyramiding_policy: "mean" (default), "mode", "min", "max", "median", "sample".

    Drive-specific:
        output_name: Output filename without extension (required for drive/cloud).
        drive_folder: Google Drive folder name (required for destination="drive").

    Cloud Storage-specific:
        output_name: Output filename without extension (required for drive/cloud).
        bucket: GCS bucket name (required for destination="cloud").
        output_no_data: NoData value (default -32768).
        file_format: "GeoTIFF" (default) or "TFRecord".

    Returns:
        JSON with export status or an error.
    """
    _ensure_initialized()
    ee = _namespace["ee"]
    gil = _namespace["gil"]
    dest = destination.lower().strip()
    if dest not in ("asset", "drive", "cloud"):
        return json.dumps({"error": f"Unknown destination: {destination!r}. Use 'asset', 'drive', or 'cloud'."})
    # Look up image
    image = _namespace.get(image_var)
    if image is None:
        return json.dumps({"error": f"Variable {image_var!r} not found in namespace."})
    if not isinstance(image, ee.Image):
        return json.dumps({"error": f"Variable {image_var!r} is {type(image).__name__}, not ee.Image."})
    # Look up region (FeatureCollections are reduced to their geometry)
    region = None
    if region_var:
        region = _namespace.get(region_var)
        if region is None:
            return json.dumps({"error": f"Variable {region_var!r} not found in namespace."})
        if isinstance(region, ee.FeatureCollection):
            region = region.geometry()
        elif not isinstance(region, ee.Geometry):
            return json.dumps({"error": f"Variable {region_var!r} is {type(region).__name__}, expected ee.Geometry or ee.FeatureCollection."})
    elif dest in ("drive", "cloud"):
        return json.dumps({"error": f"region_var is required for destination='{dest}'."})
    # Capture whatever the wrappers print so it can be returned to the caller.
    stdout_buf = io.StringIO()
    try:
        with contextlib.redirect_stdout(stdout_buf):
            if dest == "asset":
                if not asset_id:
                    return json.dumps({"error": "asset_id is required for destination='asset'."})
                # The task name is the last path segment of the asset ID.
                asset_name = asset_id.split("/")[-1]
                gil.exportToAssetWrapper(
                    image, asset_name, asset_id,
                    pyramidingPolicyObject={"default": pyramiding_policy},
                    roi=region, scale=scale, crs=crs, overwrite=overwrite,
                )
            elif dest == "drive":
                if not output_name or not drive_folder:
                    return json.dumps({"error": "output_name and drive_folder are required for destination='drive'."})
                # NOTE(review): positional argument order must match the
                # gil.exportToDriveWrapper signature -- confirm on upgrades.
                gil.exportToDriveWrapper(
                    image, output_name, drive_folder,
                    region, scale, crs, None, output_no_data,
                )
            elif dest == "cloud":
                if not output_name or not bucket:
                    return json.dumps({"error": "output_name and bucket are required for destination='cloud'."})
                # NOTE(review): same positional-order caveat as the Drive call.
                gil.exportToCloudStorageWrapper(
                    image, output_name, bucket,
                    region, scale, crs, None, output_no_data,
                    file_format, {"cloudOptimized": True}, overwrite,
                )
    except Exception as exc:
        return json.dumps({"error": f"Export failed: {exc}", "stdout": stdout_buf.getvalue()})
    return json.dumps({
        "success": True,
        "destination": dest,
        "scale": scale,
        "crs": crs,
        "stdout": stdout_buf.getvalue().strip(),
        "message": f"Export to {dest} started. Use track_tasks() to monitor progress.",
    })
import urllib.request
import urllib.parse
import urllib.error
def _read_cache_meta() -> dict:
    """Return the cache timestamp metadata, or {} if missing or unreadable."""
    try:
        with open(_CACHE_META_FILE, "r", encoding="utf-8") as fh:
            return json.load(fh)
    except Exception:
        # Missing file, bad JSON, or any I/O error all mean "no metadata".
        return {}
def _write_cache_meta(meta: dict) -> None:
    """Persist cache timestamp metadata, creating the cache dir if needed."""
    os.makedirs(_CACHE_DIR, exist_ok=True)
    with open(_CACHE_META_FILE, "w", encoding="utf-8") as fh:
        json.dump(meta, fh)
def _get_cached_catalog(name: str) -> list[dict] | None:
    """Return parsed JSON list for a catalog, fetching/caching as needed.

    Order of attempts: fresh on-disk cache -> remote fetch (which refreshes
    the cache) -> stale on-disk cache -> None.

    Args:
        name: "official" or "community"

    Returns:
        List of dataset dicts, or None if unavailable.
    """
    # Serialize cache reads/writes -- tools may be invoked concurrently.
    with _cache_lock:
        cache_file = os.path.join(_CACHE_DIR, _CATALOG_FILES[name])
        meta = _read_cache_meta()
        ts_key = f"{name}_ts"
        now = _time.time()
        # Check if cache is fresh (written within _CACHE_TTL seconds)
        cached_exists = os.path.isfile(cache_file)
        cache_fresh = cached_exists and (now - meta.get(ts_key, 0)) < _CACHE_TTL
        if cache_fresh:
            try:
                with open(cache_file, "r", encoding="utf-8") as f:
                    return json.load(f)
            except Exception:
                pass  # Fall through to fetch
        # Fetch from remote
        url = _CATALOG_URLS[name]
        try:
            req = urllib.request.Request(url, headers={"User-Agent": "geeViz-MCP/1.0"})
            with urllib.request.urlopen(req, timeout=15) as resp:
                raw = resp.read().decode("utf-8")
            # Parse before caching so a corrupt download is never cached.
            data = json.loads(raw)
            # Cache the result
            os.makedirs(_CACHE_DIR, exist_ok=True)
            with open(cache_file, "w", encoding="utf-8") as f:
                f.write(raw)
            meta[ts_key] = now
            _write_cache_meta(meta)
            return data
        except Exception:
            # Fetch failed -- use stale cache if available
            if cached_exists:
                try:
                    with open(cache_file, "r", encoding="utf-8") as f:
                        return json.load(f)
                except Exception:
                    pass
            return None
# ---------------------------------------------------------------------------
# Tool 20: search_datasets
# ---------------------------------------------------------------------------
@app.tool(annotations=_READ_ONLY)
def search_datasets(query: str, source: str = "all", max_results: int = 50) -> str:
    """Search the GEE dataset catalog by keyword.

    Searches both the official Earth Engine catalog (~500+ datasets) and
    the community catalog (~200+ datasets). Uses word-level matching
    against title, tags, id, and provider fields with relevance scoring.

    Args:
        query: Search terms (e.g. "landsat surface reflectance", "DEM",
            "sentinel fire"). Case-insensitive.
        source: Which catalog to search: "official", "community", or
            "all" (default).
        max_results: Maximum number of results to return (default 50).

    Returns:
        JSON list of matching datasets with id, title, type, provider,
        tags, source, and additional metadata.
    """
    if source not in ("official", "community", "all"):
        return json.dumps({
            "error": f"Invalid source: {source!r}. Must be 'official', 'community', or 'all'.",
        })
    # A negative max_results would silently slice from the wrong end; clamp it.
    max_results = max(0, max_results)
    sources_to_search = (
        ["official", "community"] if source == "all"
        else [source]
    )
    # Load catalogs (disk-cached; see _get_cached_catalog)
    catalogs: dict[str, list[dict]] = {}
    errors: list[str] = []
    for src in sources_to_search:
        data = _get_cached_catalog(src)
        if data is not None:
            catalogs[src] = data
        else:
            errors.append(f"Failed to load {src} catalog (no cache available).")
    if not catalogs:
        return json.dumps({"error": " ".join(errors)})
    # Split query into words for multi-word matching
    query_words = query.lower().split()
    if not query_words:
        return json.dumps({"error": "Empty query."})
    # Field weights: title matches count most, provider least.
    weights = {"title": 3, "tags": 2, "id": 2, "provider": 1}
    scored: list[tuple[int, dict]] = []
    for src_name, entries in catalogs.items():
        for entry in entries:
            # Extract searchable fields. Tags may arrive as a list in some
            # catalog versions -- normalize to a comma-joined string so
            # .lower() below cannot raise (TODO confirm against live catalog).
            raw_tags = entry.get("tags") or ""
            if isinstance(raw_tags, list):
                raw_tags = ",".join(str(t) for t in raw_tags)
            fields = {
                "title": (entry.get("title") or "").lower(),
                "tags": raw_tags.lower(),
                "id": (entry.get("id") or "").lower(),
                "provider": (entry.get("provider") or "").lower(),
            }
            # Score: sum of (weight x number of query words matched in field)
            score = 0
            for field_name, field_val in fields.items():
                for word in query_words:
                    if word in field_val:
                        score += weights[field_name]
            if score == 0:
                continue
            # Build result entry
            result_entry: dict = {
                "id": entry.get("id", ""),
                "title": entry.get("title", ""),
                "type": entry.get("type", ""),
                "provider": entry.get("provider", ""),
                "tags": entry.get("tags", ""),
                "source": src_name,
            }
            if src_name == "official":
                result_entry["date_range"] = entry.get("date_range", "")
                # Build STAC URL: first path segment is the directory, the
                # full ID with "/" -> "_" is the filename.
                eid_raw = entry.get("id", "")
                if eid_raw:
                    parts = eid_raw.split("/")
                    stac_dir = parts[0]
                    stac_file = eid_raw.replace("/", "_")
                    result_entry["stac_url"] = (
                        f"https://earthengine-stac.storage.googleapis.com/"
                        f"catalog/{stac_dir}/{stac_file}.json"
                    )
            else:
                # Community catalog fields
                result_entry["thematic_group"] = entry.get("thematic_group", "")
                result_entry["docs"] = entry.get("docs", "")
            scored.append((score, result_entry))
    # Sort by score descending, then by title alphabetically
    scored.sort(key=lambda x: (-x[0], x[1].get("title", "")))
    results = [entry for _, entry in scored[:max_results]]
    out: dict = {
        "query": query,
        "source": source,
        "count": len(results),
        "total_matches": len(scored),
        "results": results,
    }
    if errors:
        out["warnings"] = errors
    return json.dumps(out)
# ---------------------------------------------------------------------------
# Tool 21: get_catalog_info
# ---------------------------------------------------------------------------
@app.tool(annotations=_READ_ONLY_OPEN)
def get_catalog_info(dataset_id: str) -> str:
    """Get detailed STAC metadata for a GEE dataset.

    Fetches the full STAC JSON record from earthengine-stac.storage.googleapis.com
    and returns it as-is. The record includes bands (with classes, wavelengths,
    scale/offset), description, temporal/spatial extent, keywords, license,
    visualization parameters, provider info, and links.

    This is the "drill down" companion to search_datasets -- use
    search_datasets to find datasets, then get_catalog_info for full details.

    Only works for official GEE datasets (STAC records don't exist for
    community datasets). For community datasets, use inspect_asset instead.

    Args:
        dataset_id: Full GEE dataset ID (e.g. "LANDSAT/LC09/C02/T1_L2").

    Returns:
        The full STAC JSON record for the dataset, or an error message.
    """
    # Fix: str.split("/") never returns an empty list ("" -> [""]), so the old
    # `if not parts` guard could never fire; validate the ID string itself.
    dataset_id = dataset_id.strip()
    if not dataset_id:
        return json.dumps({"error": "Empty dataset_id."})
    # Build STAC URL: first segment is directory, full ID with / -> _ is filename
    stac_dir = dataset_id.split("/")[0]
    stac_file = dataset_id.replace("/", "_")
    stac_url = (
        f"https://earthengine-stac.storage.googleapis.com/"
        f"catalog/{stac_dir}/{stac_file}.json"
    )
    try:
        req = urllib.request.Request(stac_url, headers={"User-Agent": "geeViz-MCP/1.0"})
        with urllib.request.urlopen(req, timeout=15) as resp:
            stac = json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as exc:
        if exc.code == 404:
            return json.dumps({
                "error": f"No STAC record found for {dataset_id!r}. "
                "This may be a community dataset -- try inspect_asset instead.",
                "dataset_id": dataset_id,
                "stac_url": stac_url,
            })
        return json.dumps({
            "error": f"HTTP {exc.code} fetching STAC record: {exc.reason}",
            "stac_url": stac_url,
        })
    except Exception as exc:
        return json.dumps({"error": f"Failed to fetch STAC record: {exc}", "stac_url": stac_url})
    # Return the full STAC record as-is
    return json.dumps(stac)
import base64 as _base64
def cancel_tasks(name_filter: str = "") -> str:
    """Cancel running and ready Earth Engine tasks.

    When name_filter is non-empty, only tasks whose description contains the
    filter string are cancelled; otherwise ALL ready/running tasks are.
    Delegates the actual cancellation to geeViz's taskManagerLib.

    Args:
        name_filter: Optional substring filter on task descriptions. Empty
            means cancel every ready/running task.

    Returns:
        JSON with pre-cancellation task counts and cancellation status.
    """
    _ensure_initialized()
    import geeViz.taskManagerLib as tml
    # Snapshot counts before cancelling so the caller can see what was pending.
    tasks_before = tml.getTasks()
    n_ready = len(tasks_before.get("ready", []))
    n_running = len(tasks_before.get("running", []))
    captured = io.StringIO()
    try:
        with contextlib.redirect_stdout(captured):
            if name_filter:
                tml.cancelByName(name_filter)
            else:
                tml.batchCancel()
    except Exception as exc:
        return json.dumps({
            "error": f"Cancel failed: {exc}",
            "stdout": captured.getvalue(),
        })
    return json.dumps({
        "success": True,
        "name_filter": name_filter or "(all)",
        "ready_before": n_ready,
        "running_before": n_running,
        "stdout": captured.getvalue().strip(),
        "message": "Task cancellation completed.",
    })
# ---------------------------------------------------------------------------
# Asset management (consolidated)
# ---------------------------------------------------------------------------
@app.tool(annotations=_DESTRUCTIVE)
def manage_asset(
    action: str,
    asset_id: str = "",
    dest_id: str = "",
    overwrite: bool = False,
    folder_type: str = "Folder",
    all_users_can_read: bool = False,
    readers: str = "",
    writers: str = "",
) -> str:
    """Manage GEE assets: delete, copy, move, create folders, update permissions.

    Args:
        action: Operation to perform:
            - "delete": Delete a single asset.
            - "copy": Copy asset_id to dest_id.
            - "move": Copy asset_id to dest_id, then delete source.
            - "create": Create a folder or ImageCollection at asset_id.
            - "update_acl": Update permissions on asset_id.
        asset_id: Full asset path. Required for all actions.
            For "create", this is the folder path to create.
        dest_id: Destination path (required for "copy" and "move").
        overwrite: If True, overwrite existing destination (default False).
        folder_type: For action="create" -- "Folder" (default) or "ImageCollection".
        all_users_can_read: For action="update_acl" -- make publicly readable.
        readers: For action="update_acl" -- comma-separated reader emails.
        writers: For action="update_acl" -- comma-separated writer emails.

    Returns:
        JSON confirmation or error.
    """
    _ensure_initialized()
    ee = _namespace["ee"]
    import geeViz.assetManagerLib as aml

    def _err(message: str, **extra) -> str:
        # JSON error payload; extras (stdout, dest_id) come after the message.
        payload = {"error": message}
        payload.update(extra)
        return json.dumps(payload)

    def _ok(message: str, **extra) -> str:
        payload = {"success": True, "message": message}
        payload.update(extra)
        return json.dumps(payload)

    act = action.lower().strip()
    if not asset_id and act != "create":
        return _err("asset_id is required.")

    if act == "delete":
        if not aml.ee_asset_exists(asset_id):
            return _err(f"Asset not found: {asset_id}")
        try:
            ee.data.deleteAsset(asset_id)
        except Exception as exc:
            return _err(f"Delete failed: {exc}")
        return _ok(f"Asset {asset_id} deleted.")

    if act in ("copy", "move"):
        if not dest_id:
            return _err(f"dest_id is required for action='{act}'.")
        if not aml.ee_asset_exists(asset_id):
            return _err(f"Source asset not found: {asset_id}")
        if aml.ee_asset_exists(dest_id):
            if not overwrite:
                return _err(f"Destination exists: {dest_id}. Set overwrite=True to replace.")
            try:
                ee.data.deleteAsset(dest_id)
            except Exception as exc:
                return _err(f"Failed to delete existing dest: {exc}")
        try:
            ee.data.copyAsset(asset_id, dest_id)
        except Exception as exc:
            return _err(f"Copy failed: {exc}")
        if act == "move":
            try:
                ee.data.deleteAsset(asset_id)
            except Exception as exc:
                return _err(f"Copied to {dest_id} but failed to delete source: {exc}", dest_id=dest_id)
        verb = "moved" if act == "move" else "copied"
        return _ok(f"Asset {verb} from {asset_id} to {dest_id}.")

    if act == "create":
        # "create" accepts the path in either parameter for convenience.
        folder_path = asset_id or dest_id
        if not folder_path:
            return _err("asset_id is required for action='create' (the folder path).")
        if folder_type not in ("Folder", "ImageCollection"):
            return _err(f"Invalid folder_type: {folder_type!r}. Use 'Folder' or 'ImageCollection'.")
        buf = io.StringIO()
        try:
            with contextlib.redirect_stdout(buf):
                if folder_type == "ImageCollection":
                    aml.create_image_collection(folder_path)
                else:
                    aml.create_asset(folder_path, recursive=True)
        except Exception as exc:
            return _err(f"Create failed: {exc}", stdout=buf.getvalue())
        return _ok(f"{folder_type} created at {folder_path}.", stdout=buf.getvalue().strip())

    if act == "update_acl":
        readers_list = [r.strip() for r in readers.split(",") if r.strip()]
        writers_list = [w.strip() for w in writers.split(",") if w.strip()]
        buf = io.StringIO()
        try:
            with contextlib.redirect_stdout(buf):
                aml.updateACL(asset_id, writers=writers_list, all_users_can_read=all_users_can_read, readers=readers_list)
        except Exception as exc:
            return _err(f"ACL update failed: {exc}", stdout=buf.getvalue())
        return _ok(f"Permissions updated for {asset_id}.", stdout=buf.getvalue().strip())

    return _err(f"Unknown action: {action!r}. Use 'delete', 'copy', 'move', 'create', or 'update_acl'.")
def _strip_coordinates(obj):
    """Recursively replace GeoJSON coordinate arrays in nested dicts/lists.

    Any ``"coordinates"`` key whose value is a list becomes the placeholder
    string ``"(stripped)"``, keeping huge coordinate arrays out of the LLM
    context window. All other values pass through unchanged.
    """
    if isinstance(obj, list):
        return [_strip_coordinates(item) for item in obj]
    if isinstance(obj, dict):
        return {
            key: "(stripped)" if key == "coordinates" and isinstance(val, list)
            else _strip_coordinates(val)
            for key, val in obj.items()
        }
    return obj
def _make_serializable(obj):
    """Recursively convert values (including ee objects) to JSON-safe ones.

    GeoJSON ``"coordinates"`` lists are replaced with ``"(stripped)"`` to
    avoid injecting huge coordinate arrays into the LLM context -- both for
    plain-dict GeoJSON and for ee.Geometry objects.
    """
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return obj
    if isinstance(obj, list):
        return [_make_serializable(v) for v in obj]
    if isinstance(obj, dict):
        # Fix: the docstring promised coordinate stripping, but raw GeoJSON
        # dicts previously passed through untouched; strip them here too.
        return {
            k: "(stripped)" if k == "coordinates" and isinstance(v, list)
            else _make_serializable(v)
            for k, v in obj.items()
        }
    # ee.Geometry -> type only (coordinates are too large for context)
    try:
        import ee as _ee
        if isinstance(obj, _ee.Geometry):
            geojson = obj.getInfo()
            return {"type": geojson.get("type", "Geometry"), "coordinates": "(stripped)"}
    except Exception:
        pass
    # Any other object (ee or otherwise) -> repr string
    return repr(obj)
@app.tool(annotations=_READ_ONLY)
def get_reference_data(name: str = "") -> str:
    """Look up geeViz reference dictionaries (band mappings, collection IDs, viz params, etc.).

    Args:
        name: Name of the reference dict to retrieve.
            Pass "" (empty) to list all available dicts with descriptions.

    Returns:
        JSON string with the dict contents or listing of available dicts.
    """
    _ensure_initialized()
    if not name:
        # Listing mode: summarize every registered reference dict.
        listing = [
            {"name": key, "description": val["description"]}
            for key, val in _REFERENCE_DATA.items()
        ]
        return json.dumps({
            "available": listing,
            "count": len(listing),
            "usage": 'Call get_reference_data(name="<dict_name>") to retrieve contents.',
            "note": "getImagesLib has additional module-level objects not listed here; use run_code to access them.",
        })
    # Lookup mode
    entry = _REFERENCE_DATA.get(name)
    if entry is None:
        return json.dumps({
            "error": f"Unknown reference dict: {name!r}",
            "available": sorted(_REFERENCE_DATA.keys()),
        })
    try:
        import geeViz.getImagesLib as gil
        data = _make_serializable(getattr(gil, entry["attr"]))
        return json.dumps({"name": name, "description": entry["description"], "data": data})
    except Exception as exc:
        return json.dumps({"error": f"Failed to read {name}: {exc}"})
# ---------------------------------------------------------------------------
# USFS Enterprise Data Warehouse (EDW) (consolidated)
# ---------------------------------------------------------------------------
def get_streetview(
    lon: float,
    lat: float,
    headings: str = "0,90,180,270",
    pitch: float = 0,
    fov: float = 90,
    radius: int = 50,
    source: str = "default",
) -> str | list:
    """Get Google Street View imagery at a location for ground-truthing.

    Checks if Street View coverage exists, then fetches static images
    at the requested headings (compass directions). Returns images
    inline (a list of text + image content parts) when the MCP image type
    is available, otherwise a JSON summary string.

    Useful for ground-truthing remote sensing analysis -- see what a
    location actually looks like from the ground.

    Args:
        lon: Longitude in decimal degrees.
        lat: Latitude in decimal degrees.
        headings: Comma-separated compass headings in degrees
            (0=North, 90=East, 180=South, 270=West).
            Default "0,90,180,270" (all 4 cardinal directions).
        pitch: Camera pitch (-90 to 90). 0=horizontal, positive=up.
        fov: Field of view in degrees (1-120). Lower = more zoom.
            Default 90.
        radius: Search radius in meters for nearest panorama. Default 50.
        source: "default" (all) or "outdoor" (outdoor only).

    Returns:
        Metadata (date, location, copyright) and Street View images.
        Returns error if no imagery exists at the location.
    """
    _ensure_initialized()
    import geeViz.googleMapsLib as _gm
    # Check metadata first (the metadata endpoint is free; image requests are billed)
    try:
        meta = _gm.streetview_metadata(lon, lat, radius=radius, source=source)
    except Exception as exc:
        return json.dumps({"error": f"Street View metadata request failed: {exc}"})
    if meta.get("status") != "OK":
        return json.dumps({
            "status": meta.get("status", "UNKNOWN"),
            "message": f"No Street View imagery at ({lat}, {lon}) within {radius}m.",
            "tip": "Try increasing the radius or checking a nearby road/trail.",
        })
    # Parse headings
    heading_list = [float(h.strip()) for h in headings.split(",") if h.strip()]
    _direction_labels = {0: "N", 45: "NE", 90: "E", 135: "SE",
                         180: "S", 225: "SW", 270: "W", 315: "NW"}
    # Fix: fetch each heading ONCE and keep the bytes. Previously the images
    # were fetched a second time for the inline return, doubling billed calls.
    images = []
    for h in heading_list:
        try:
            img_bytes = _gm.streetview_image(
                lon, lat, heading=h, pitch=pitch, fov=fov,
                radius=radius, source=source,
            )
        except Exception:
            continue
        if img_bytes:
            label = _direction_labels.get(int(h) % 360, f"{h}°")
            images.append({"heading": h, "label": label, "size": len(img_bytes), "bytes": img_bytes})
    # Build text response
    loc = meta.get("location", {})
    text_parts = [
        f"**Street View** at ({loc.get('lat', lat):.5f}, {loc.get('lng', lon):.5f})",
        f"**Date:** {meta.get('date', 'unknown')}",
        f"**Copyright:** {meta.get('copyright', '')}",
        f"**Images:** {len(images)} of {len(heading_list)} headings fetched",
    ]
    # Return images inline when the MCP image content type is available.
    if _MCPImage is not None and images:
        result_parts = []
        for rec in images:
            result_parts.append(f"**{rec['label']} ({rec['heading']}°)**")
            result_parts.append(_MCPImage(data=rec["bytes"], format="jpeg"))
        return ["\n".join(text_parts)] + result_parts
    # Fallback: JSON summary without the raw bytes.
    return json.dumps({
        "status": "OK",
        "date": meta.get("date"),
        "location": meta.get("location"),
        "copyright": meta.get("copyright"),
        "images_fetched": len(images),
        "images": [{k: v for k, v in rec.items() if k != "bytes"} for rec in images],
        "tip": "Use gm.streetview_html(lon, lat) in run_code for inline display.",
    })
@app.tool(annotations=_READ_ONLY_OPEN)
def search_places(
    query: str,
    lon: float = 0,
    lat: float = 0,
    radius: float = 5000,
    max_results: int = 10,
) -> str:
    """Search for places using the Google Places API.

    Handy for locating landmarks, businesses, or points of interest near a
    study area; also geocodes street addresses.

    Args:
        query: Search text (e.g. "fire station", "visitor center",
            "4240 S Olympic Way, SLC, UT").
        lon: Longitude for location bias (0 = no bias).
        lat: Latitude for location bias (0 = no bias).
        radius: Bias radius in meters. Default 5000.
        max_results: Maximum results (1-20). Default 10.

    Returns:
        JSON with matching places (name, address, coordinates, rating, types).
    """
    _ensure_initialized()
    import geeViz.googleMapsLib as _gm
    kwargs: dict[str, Any] = {
        "query": query,
        "max_results": max_results,
        "radius": radius,
    }
    # Apply a location bias only when both coordinates are non-zero.
    if lat != 0 and lon != 0:
        kwargs.update(lat=lat, lon=lon)
    try:
        places = _gm.search_places(**kwargs)
    except Exception as exc:
        return json.dumps({"error": f"Places search failed: {exc}"})
    return json.dumps({"count": len(places), "places": places})
# ---------------------------------------------------------------------------
# Report tools
# ---------------------------------------------------------------------------
# Global report instance — persists across tool calls.
# None until create_report() constructs one; reset to None by clear_report().
_active_report = None
@app.tool(annotations=_WRITE)
def create_report(
    title: str = "Report",
    theme: str = "dark",
    layout: str = "report",
    tone: str = "neutral",
    header_text: str = "",
    prompt: str = "",
) -> str:
    """Create (or reset) a report. Must be called before add_report_section.

    Builds a fresh Report object that persists across MCP calls; any
    previously active report is discarded.

    Args:
        title: Report title.
        theme: "dark" (default) or "light".
        layout: "report" (portrait, vertical) or "poster" (landscape grid).
        tone: "neutral" (default), "informative", "technical", or custom tone.
        header_text: Introductory text below the title.
        prompt: Additional guidance for the executive summary LLM narrative.

    Returns:
        Confirmation with the report title and settings.
    """
    _ensure_initialized()
    global _active_report
    from geeViz.outputLib import reports as _reports
    # Empty strings mean "unset" for the optional text fields.
    _active_report = _reports.Report(
        title=title,
        theme=theme,
        layout=layout,
        tone=tone,
        header_text=header_text or None,
        prompt=prompt or None,
    )
    return json.dumps({
        "success": True,
        "message": f"Report '{title}' created ({theme} theme, {layout} layout, {tone} tone).",
        "tip": "Use add_report_section to add sections, then generate_report to produce output.",
    })
@app.tool(annotations=_WRITE)
def add_report_section(
    ee_obj_var: str,
    geometry_var: str,
    title: str = "Section",
    prompt: str = "",
    thumb_format: str = "png",
    band_names: str = "",
    scale: int = 30,
    chart_types: str = "",
    basemap: str = "",
    burn_in_geometry: bool = False,
    geometry_outline_color: str = "",
    geometry_fill_color: str = "",
    transition_periods: str = "",
    sankey_band_name: str = "",
    feature_label: str = "",
    area_format: str = "Percentage",
    date_format: str = "YYYY",
    reducer: str = "",
    generate_table: bool = True,
    generate_chart: bool = True,
) -> str:
    """Add a section to the active report.

    Each section analyses one ee.Image or ee.ImageCollection over a geometry.
    The report automatically generates a thumbnail, data table, chart, and
    LLM narrative for each section.

    Args:
        ee_obj_var: Name of an ee.Image or ee.ImageCollection variable in the
            REPL namespace.
        geometry_var: Name of an ee.Geometry, ee.Feature, or ee.FeatureCollection
            variable in the REPL namespace.
        title: Section heading.
        prompt: Optional per-section guidance for the LLM narrative.
        thumb_format: "png" (static), "gif" (animated), "filmstrip" (grid),
            or "none" (no thumbnail). Default "png".
        band_names: Comma-separated band names (auto-detected if empty).
        scale: Pixel scale in meters (default 30).
        chart_types: Comma-separated list of chart types to produce (0-3).
            Valid types: "bar", "line+markers", "donut", "scatter",
            "sankey", "stacked_bar", "stacked_line+markers".
            When "sankey" is included, transition_periods and
            sankey_band_name are used for that chart.
            Leave empty to auto-detect a single chart type.
            Examples: "sankey,line+markers", "bar,donut", "sankey".
        basemap: Basemap preset for thumbnail (e.g. "esri-satellite").
        burn_in_geometry: Burn study area boundary onto the thumbnail.
        geometry_outline_color: Boundary outline color (e.g. "white", "red").
        geometry_fill_color: Boundary fill color with alpha (e.g. "FFFFFF33").
        transition_periods: JSON list of year pairs for Sankey
            (e.g. "[[1985,2000],[2000,2024]]").
        sankey_band_name: Band name for Sankey (auto-detected if empty).
        feature_label: Property for per-feature labels (FeatureCollection).
        area_format: "Percentage" (default), "Hectares", "Acres", "Pixels".
        date_format: EE date format (default "YYYY").
        reducer: Override reducer ("mean", "first", "mode", etc.).
        generate_table: Include a data table (default True).
        generate_chart: Include a chart (default True).

    Returns:
        Confirmation with the section index and title.
    """
    _ensure_initialized()
    global _active_report
    ee = _namespace["ee"]
    if _active_report is None:
        return json.dumps({"error": "No active report. Call create_report first."})
    # Resolve EE objects from namespace
    ee_obj = _namespace.get(ee_obj_var)
    if ee_obj is None:
        return json.dumps({"error": f"Variable '{ee_obj_var}' not found in REPL namespace."})
    geom = _namespace.get(geometry_var)
    if geom is None:
        return json.dumps({"error": f"Variable '{geometry_var}' not found in REPL namespace."})
    # Build kwargs
    kwargs = {"scale": scale, "area_format": area_format, "date_format": date_format}
    # Parse chart_types -- comma-separated list
    ct_list = [c.strip() for c in chart_types.split(",") if c.strip()] if chart_types else []
    if band_names:
        kwargs["band_names"] = [b.strip() for b in band_names.split(",")]
    if basemap:
        kwargs["basemap"] = basemap
    if burn_in_geometry:
        kwargs["burn_in_geometry"] = True
    if geometry_outline_color:
        kwargs["geometry_outline_color"] = geometry_outline_color
    if geometry_fill_color:
        kwargs["geometry_fill_color"] = geometry_fill_color
    if feature_label:
        kwargs["feature_label"] = feature_label
    if reducer:
        _reducer_map = {
            "first": ee.Reducer.first(),
            "mean": ee.Reducer.mean(),
            "median": ee.Reducer.median(),
            "min": ee.Reducer.min(),
            "max": ee.Reducer.max(),
            "sum": ee.Reducer.sum(),
            "mode": ee.Reducer.mode(),
            "stdDev": ee.Reducer.stdDev(),
            "count": ee.Reducer.count(),
        }
        # Fix: an unrecognized name previously slipped through as
        # kwargs["reducer"] = None via dict.get(); reject it explicitly.
        red = _reducer_map.get(reducer.strip())
        if red is None:
            return json.dumps({
                "error": f"Unknown reducer: {reducer!r}.",
                "valid_reducers": sorted(_reducer_map.keys()),
            })
        kwargs["reducer"] = red
    if transition_periods:
        try:
            kwargs["transition_periods"] = json.loads(transition_periods)
        except json.JSONDecodeError:
            return json.dumps({"error": f"Invalid transition_periods JSON: {transition_periods}"})
    if sankey_band_name:
        kwargs["sankey_band_name"] = sankey_band_name
    tf = thumb_format.lower().strip() if thumb_format else "png"
    if tf == "none":
        tf = None
    try:
        _active_report.add_section(
            ee_obj=ee_obj,
            geometry=geom,
            title=title,
            prompt=prompt or None,
            generate_table=generate_table,
            generate_chart=generate_chart,
            thumb_format=tf,
            chart_types=ct_list if ct_list else None,
            **kwargs,
        )
    except Exception as exc:
        return json.dumps({"error": f"Failed to add section: {exc}"})
    n = len(_active_report._sections)
    return json.dumps({
        "success": True,
        "message": f"Section {n} '{title}' added to report '{_active_report.title}'.",
        "total_sections": n,
        "tip": "Add more sections or call generate_report to produce the output.",
    })
@app.tool(annotations=_WRITE_OPEN)
def generate_report(
    format: str = "html",
    output_filename: str = "",
) -> str:
    """Generate the report from all added sections.

    Runs all EE computations (thumbnails, charts, tables) and LLM narratives
    in parallel, then renders the final output. This may take 30-120 seconds
    depending on the number of sections.

    Args:
        format: Output format -- "html" (interactive charts, default),
            "md" (markdown text only), or "pdf" (static images).
        output_filename: Filename for the output (saved to generated_outputs/).
            Auto-generated if empty.

    Returns:
        The file path of the generated report, plus a metadata summary.
    """
    _ensure_initialized()
    global _active_report
    if _active_report is None:
        return json.dumps({"error": "No active report. Call create_report first."})
    if not _active_report._sections:
        return json.dumps({"error": "Report has no sections. Call add_report_section first."})
    fmt = format.lower().strip()
    if fmt not in ("html", "md", "pdf"):
        return json.dumps({"error": f"Invalid format '{format}'. Use 'html', 'md', or 'pdf'."})
    # Determine output path
    os.makedirs(_output_dir, exist_ok=True)
    if output_filename:
        out_path = os.path.join(_output_dir, output_filename)
    else:
        # Auto-name: sanitized title + unix timestamp. Uses the module-level
        # _time alias for consistency (no need for a local time import).
        ts = int(_time.time())
        ext = {"html": ".html", "md": ".md", "pdf": ".pdf"}[fmt]
        safe_title = "".join(c if c.isalnum() or c in " _-" else "_" for c in _active_report.title)[:40].strip()
        out_path = os.path.join(_output_dir, f"report_{safe_title}_{ts}{ext}")
    try:
        # Return value was never used; the report is written to out_path.
        _active_report.generate(format=fmt, output_path=out_path)
    except Exception as exc:
        return json.dumps({"error": f"Report generation failed: {exc}"})
    # Build metadata
    try:
        meta_df = _active_report.metadata()
        meta_md = meta_df.to_markdown(index=False)
    except Exception:
        meta_md = "(metadata unavailable)"
    return json.dumps({
        "success": True,
        "format": fmt,
        "output_path": out_path,
        "sections": len(_active_report._sections),
        "metadata": meta_md,
        "tip": f"Report saved to {out_path}",
    })
@app.tool(annotations=_READ_ONLY)
def get_report_status() -> str:
    """Describe the active report: title, theme, layout, tone, and sections.

    Returns:
        JSON report status, or a message if no report is active.
    """
    global _active_report
    if _active_report is None:
        return json.dumps({
            "active": False,
            "message": "No active report. Call create_report to start one.",
        })
    sections = [
        {
            "index": idx,
            "title": sec.title,
            "thumb_format": sec.thumb_format,
            "generate_table": sec.generate_table,
            "generate_chart": sec.generate_chart,
        }
        for idx, sec in enumerate(_active_report._sections, start=1)
    ]
    return json.dumps({
        "active": True,
        "title": _active_report.title,
        "theme": _active_report.theme,
        "layout": _active_report.layout,
        "tone": _active_report.tone,
        "section_count": len(_active_report._sections),
        "sections": sections,
    })
@app.tool(annotations=_DESTRUCTIVE)
def clear_report() -> str:
    """Discard the active report and all its sections.

    Returns:
        Confirmation that the report was cleared.
    """
    global _active_report
    # Capture the title before dropping the reference.
    title = _active_report.title if _active_report else None
    _active_report = None
    if title:
        return json.dumps({"success": True, "message": f"Report '{title}' cleared."})
    return json.dumps({"success": True, "message": "No active report to clear."})
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def _eager_init():
    """Kick off EE initialization in a daemon thread so the first tool call is fast."""
    import threading

    def _warmup():
        try:
            _ensure_initialized()
            print("EE initialized (background warmup)", file=sys.stderr)
        except Exception as exc:
            # Non-fatal: initialization is retried lazily by the tools.
            print(f"Background warmup failed (will retry on first tool call): {exc}", file=sys.stderr)

    threading.Thread(target=_warmup, daemon=True).start()
def main() -> None:
    """Start the MCP server on the transport selected via MCP_TRANSPORT."""
    global _SANDBOX_ENABLED
    # Pre-warm EE auth so the first tool call doesn't stall.
    _eager_init()
    # stdio is standard for Cursor/IDE integration; use streamable-http for HTTP.
    transport = os.environ.get("MCP_TRANSPORT", "stdio")
    if transport != "streamable-http":
        # stdio transport — default sandbox OFF (local IDE use).
        if _SANDBOX_ENABLED is None:
            _SANDBOX_ENABLED = False
        print(f"MCP server starting (stdio, sandbox={'ON' if _SANDBOX_ENABLED else 'OFF'})", file=sys.stderr)
        app.run(transport=transport)
        return
    host = os.environ.get("MCP_HOST", "127.0.0.1")
    port = int(os.environ.get("MCP_PORT", "8000"))
    path = os.environ.get("MCP_PATH", "/mcp")
    # Normalize: strip Windows-mangled Git Bash paths and ensure leading /
    if len(path) > 4 and ":" in path[:3]:  # e.g. "C:/Program Files/Git/mcp"
        path = "/" + path.rsplit("/", 1)[-1]
    if not path.startswith("/"):
        path = "/" + path
    # Resolve sandbox default: ON for non-localhost HTTP, OFF for localhost.
    if _SANDBOX_ENABLED is None:
        _SANDBOX_ENABLED = host not in ("127.0.0.1", "localhost", "::1")
    # FastMCP.run() doesn't accept host/port kwargs; set them on the
    # settings object directly so uvicorn picks them up.
    app.settings.host = host
    app.settings.port = port
    app.settings.streamable_http_path = path
    # When binding to 0.0.0.0 (cloud deployment), disable DNS rebinding
    # protection so external hostnames (e.g. Cloud Run) are accepted.
    if host == "0.0.0.0":
        app.settings.transport_security.enable_dns_rebinding_protection = False
    print(f"MCP server starting at http://{host}:{port}{path} (sandbox={'ON' if _SANDBOX_ENABLED else 'OFF'})", file=sys.stderr)
    app.run(transport=transport, mount_path=path)
if __name__ == "__main__":
    # Manual smoke test (uncomment to run a tool directly, without an MCP client):
    # print(inspect_asset("COPERNICUS/S2_SR_HARMONIZED"))
    main()
# %%