added licensing stuff and free trial timer
This commit is contained in:
223
backend/license_server.py
Normal file
223
backend/license_server.py
Normal file
@ -0,0 +1,223 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
TalkEdit License Server — Stripe webhook + license key generator.
|
||||
|
||||
Usage (development):
|
||||
python backend/license_server.py
|
||||
|
||||
Then create a test license:
|
||||
python backend/license_server.py generate --email test@example.com --tier pro
|
||||
|
||||
This is a minimal server. In production, deploy as a Cloudflare Worker,
|
||||
Vercel function, or a small VPS behind nginx.
|
||||
"""
|
||||
|
||||
import base64
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
import hmac
|
||||
import hashlib
|
||||
|
||||
from nacl.bindings import (
|
||||
crypto_sign_seed_keypair,
|
||||
crypto_sign,
|
||||
crypto_sign_BYTES,
|
||||
)
|
||||
|
||||
# === CONFIGURATION ===
|
||||
|
||||
# The Ed25519 private key (base64-encoded). Keep this secret!
|
||||
# Generate with: python3 -c "import os,base64; print(base64.b64encode(os.urandom(32)).decode())"
|
||||
LICENSE_PRIVATE_KEY_B64 = "ONTdT2Hn367fMlovqulz7WYQPQru7uFa/GaSfjhnR9x7Qoe7uBPNwIFeW4p7A0g05Qj14rvaQ4Mm1u/LzgeEsA=="
|
||||
|
||||
# Stripe webhook secret (set this in production)
|
||||
STRIPE_WEBHOOK_SECRET = os.environ.get("STRIPE_WEBHOOK_SECRET", "")
|
||||
|
||||
# === TIER DEFINITIONS ===
|
||||
|
||||
TIERS = {
|
||||
"pro": {
|
||||
"price_id": "price_pro_monthly", # Replace with your Stripe price ID
|
||||
"features": ["bundled_deps", "auto_updates", "priority_support"],
|
||||
"max_activations": 1,
|
||||
"duration_days": 365,
|
||||
},
|
||||
"business": {
|
||||
"price_id": "price_business_monthly",
|
||||
"features": ["bundled_deps", "auto_updates", "priority_support",
|
||||
"white_label", "audit_logging", "bulk_deployment"],
|
||||
"max_activations": 10,
|
||||
"duration_days": 365,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def generate_license_key(
|
||||
customer_email: str,
|
||||
tier: str = "pro",
|
||||
license_id: str = None,
|
||||
duration_days: int = None,
|
||||
features: list = None,
|
||||
max_activations: int = None,
|
||||
) -> str:
|
||||
"""Generate a signed TalkEdit license key.
|
||||
|
||||
Returns a string like: talkedit_v1_<base64(payload)>.<base64(signature)>
|
||||
"""
|
||||
if license_id is None:
|
||||
license_id = f"lic_{int(time.time())}_{os.urandom(4).hex()}"
|
||||
|
||||
tier_config = TIERS.get(tier, TIERS["pro"])
|
||||
if duration_days is None:
|
||||
duration_days = tier_config["duration_days"]
|
||||
if features is None:
|
||||
features = tier_config["features"]
|
||||
if max_activations is None:
|
||||
max_activations = tier_config["max_activations"]
|
||||
|
||||
now = int(time.time())
|
||||
payload = {
|
||||
"license_id": license_id,
|
||||
"customer_email": customer_email,
|
||||
"tier": tier,
|
||||
"features": features,
|
||||
"issued_at": now,
|
||||
"expires_at": now + duration_days * 86400,
|
||||
"max_activations": max_activations,
|
||||
}
|
||||
|
||||
payload_bytes = json.dumps(payload, separators=(",", ":")).encode("utf-8")
|
||||
|
||||
# Sign with Ed25519
|
||||
seed = base64.b64decode(LICENSE_PRIVATE_KEY_B64)
|
||||
if len(seed) == 64:
|
||||
seed = seed[:32] # First 32 bytes are the actual seed
|
||||
pk, sk = crypto_sign_seed_keypair(seed)
|
||||
signed = crypto_sign(payload_bytes, sk)
|
||||
signature = signed[:crypto_sign_BYTES]
|
||||
|
||||
payload_b64 = base64.b64encode(payload_bytes).decode().rstrip("=")
|
||||
sig_b64 = base64.b64encode(signature).decode().rstrip("=")
|
||||
|
||||
return f"talkedit_v1_{payload_b64}.{sig_b64}"
|
||||
|
||||
|
||||
def verify_stripe_webhook(payload: bytes, sig_header: str) -> dict:
|
||||
"""Verify Stripe webhook signature and return the event."""
|
||||
if not STRIPE_WEBHOOK_SECRET:
|
||||
raise ValueError("STRIPE_WEBHOOK_SECRET not configured")
|
||||
|
||||
# Stripe sends signature in the `stripe-signature` header
|
||||
# Format: t=timestamp,v1=signature
|
||||
parts = {}
|
||||
for item in sig_header.split(","):
|
||||
key, _, value = item.partition("=")
|
||||
parts[key.strip()] = value.strip()
|
||||
|
||||
timestamp = parts.get("t", "")
|
||||
expected_sig = parts.get("v1", "")
|
||||
|
||||
# Compute signature
|
||||
signed_payload = f"{timestamp}.{payload.decode()}".encode()
|
||||
computed_sig = hmac.new(
|
||||
STRIPE_WEBHOOK_SECRET.encode(),
|
||||
signed_payload,
|
||||
hashlib.sha256,
|
||||
).hexdigest()
|
||||
|
||||
if not hmac.compare_digest(computed_sig, expected_sig):
|
||||
raise ValueError("Invalid webhook signature")
|
||||
|
||||
return json.loads(payload)
|
||||
|
||||
|
||||
# === CLI ===
|
||||
|
||||
def main():
|
||||
import sys
|
||||
|
||||
if len(sys.argv) > 1 and sys.argv[1] == "generate":
|
||||
# CLI mode: generate a test license key
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description="Generate TalkEdit license key")
|
||||
parser.add_argument("--email", default="test@example.com")
|
||||
parser.add_argument("--tier", default="pro", choices=["pro", "business"])
|
||||
parser.add_argument("--days", type=int, default=None)
|
||||
args = parser.parse_args(sys.argv[2:])
|
||||
|
||||
key = generate_license_key(
|
||||
customer_email=args.email,
|
||||
tier=args.tier,
|
||||
duration_days=args.days,
|
||||
)
|
||||
print()
|
||||
print("=== TALKEDIT LICENSE KEY ===")
|
||||
print(key)
|
||||
print()
|
||||
print("Paste this into the TalkEdit app to activate.")
|
||||
return
|
||||
|
||||
# Server mode
|
||||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||||
import urllib.parse
|
||||
|
||||
class LicenseHandler(BaseHTTPRequestHandler):
|
||||
def do_POST(self):
|
||||
path = urllib.parse.urlparse(self.path).path
|
||||
|
||||
if path == "/webhook/stripe":
|
||||
content_length = int(self.headers.get("Content-Length", 0))
|
||||
body = self.rfile.read(content_length)
|
||||
sig_header = self.headers.get("Stripe-Signature", "")
|
||||
|
||||
try:
|
||||
event = verify_stripe_webhook(body, sig_header)
|
||||
event_type = event.get("type", "")
|
||||
|
||||
if event_type == "checkout.session.completed":
|
||||
session = event["data"]["object"]
|
||||
email = session.get("customer_email", session.get("customer_details", {}).get("email", "unknown"))
|
||||
tier = "pro" # Map from session["metadata"]["tier"] or line items
|
||||
|
||||
license_key = generate_license_key(
|
||||
customer_email=email,
|
||||
tier=tier,
|
||||
)
|
||||
|
||||
# In production: email the license key to the customer
|
||||
print(f"License generated for {email}: {license_key[:40]}...")
|
||||
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Type", "application/json")
|
||||
self.end_headers()
|
||||
self.wfile.write(json.dumps({"status": "ok"}).encode())
|
||||
else:
|
||||
self.send_response(200)
|
||||
self.end_headers()
|
||||
|
||||
except Exception as e:
|
||||
print(f"Webhook error: {e}")
|
||||
self.send_response(400)
|
||||
self.end_headers()
|
||||
self.wfile.write(str(e).encode())
|
||||
|
||||
else:
|
||||
self.send_response(404)
|
||||
self.end_headers()
|
||||
|
||||
def log_message(self, format, *args):
|
||||
print(f"[license-server] {args}")
|
||||
|
||||
port = int(os.environ.get("PORT", 8643))
|
||||
server = HTTPServer(("0.0.0.0", port), LicenseHandler)
|
||||
print(f"License server listening on http://0.0.0.0:{port}")
|
||||
print(f" POST /webhook/stripe - Stripe webhook")
|
||||
print()
|
||||
print("To generate a test license:")
|
||||
print(f" python {__file__} generate --email you@example.com --tier pro")
|
||||
server.serve_forever()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user