it

BigQuery Scheduled Query Distributed System —

BigQuery Scheduled Query Distributed System —

BigQuery Scheduled Queries คืออะไร

BigQuery Scheduled Query Distributed System —

BigQuery Scheduled Queries เป็น feature ที่ให้ตั้ง SQL queries ให้รันอัตโนมัติตาม schedule ที่กำหนด เช่น ทุกชั่วโมง ทุกวัน หรือทุกสัปดาห์ ใช้สำหรับ ETL processes, data aggregation, reporting tables และ data warehouse maintenance โดยไม่ต้องสร้าง infrastructure เพิ่มเติม

ข้อดีของ BigQuery Scheduled Queries ได้แก่ serverless ไม่ต้องจัดการ infrastructure, built-in retry logic สำหรับ failed queries, parameterized queries ด้วย @run_time และ @run_date, email notifications เมื่อ query fail, integration กับ BigQuery Data Transfer Service และ IAM-based access control

Distributed System ใน context นี้หมายถึงการออกแบบ data pipeline ที่กระจาย workload ออกเป็นหลาย scheduled queries ที่ทำงานประสานกัน แทนที่จะรัน single massive query ที่ใช้เวลานานและเสี่ยง timeout ช่วยให้ pipeline reliable, maintainable และ cost-efficient มากขึ้น

Use cases ได้แก่ daily/hourly data aggregation, materialized view refreshes, data quality checks, cross-dataset data synchronization, reporting table preparation และ data retention/cleanup

เนื้อหาเกี่ยวข้อง — React Bind คืออะไร — คู่มือโปรแกรมมิ่ง 2026

ตั้งค่า Scheduled Queries ใน BigQuery

วิธีสร้างและจัดการ Scheduled Queries

# === BigQuery Scheduled Queries Setup ===



# 1. Using gcloud CLI

# ===================================



# ติดตั้ง gcloud

# curl https://sdk.cloud.google.com | bash

# gcloud init



# สร้าง Scheduled Query

bq mk --transfer_config \

    --project_id=my-project \

    --data_source=scheduled_query \

    --target_dataset=analytics \

    --display_name="Daily User Aggregation" \

    --schedule="every 24 hours" \

    --params='{

        "query": "INSERT INTO `my-project.analytics.daily_users` SELECT DATE(event_timestamp) AS event_date, COUNT(DISTINCT user_id) AS unique_users, COUNT(*) AS total_events, COUNTIF(event_name = \"purchase\") AS purchases FROM `my-project.raw.events` WHERE DATE(event_timestamp) = DATE_SUB(@run_date, INTERVAL 1 DAY) GROUP BY 1",

        "destination_table_name_template": "",

        "write_disposition": "WRITE_APPEND"

    }'



# List scheduled queries

bq ls --transfer_config --transfer_location=us



# Check run history

bq ls --transfer_run --transfer_location=us \

    --run_attempt=LATEST \

    projects/my-project/locations/us/transferConfigs/TRANSFER_ID



# 2. Using SQL directly in BigQuery Console

# ===================================



-- Daily Sales Aggregation

-- Schedule: Every day at 06:00 UTC

-- Destination: analytics.daily_sales



SELECT

    DATE(order_timestamp) AS order_date,

    product_category,

    COUNT(*) AS order_count,

    SUM(amount) AS total_revenue,

    AVG(amount) AS avg_order_value,

    COUNT(DISTINCT customer_id) AS unique_customers

FROM `my-project.raw.orders`

WHERE DATE(order_timestamp) = DATE_SUB(@run_date, INTERVAL 1 DAY)

GROUP BY 1, 2;



-- Hourly Metrics Rollup

-- Schedule: Every hour

SELECT

    TIMESTAMP_TRUNC(@run_time, HOUR) AS metric_hour,

    service_name,

    COUNT(*) AS request_count,

    AVG(latency_ms) AS avg_latency,

    APPROX_QUANTILES(latency_ms, 100)[OFFSET(95)] AS p95_latency,

    COUNTIF(status_code >= 500) AS error_count,

    SAFE_DIVIDE(COUNTIF(status_code >= 500), COUNT(*)) * 100 AS error_rate_pct

FROM `my-project.logs.requests`

WHERE timestamp >= TIMESTAMP_SUB(@run_time, INTERVAL 1 HOUR)

  AND timestamp < @run_time

GROUP BY 1, 2;



-- Weekly Data Quality Check

-- Schedule: Every Monday at 00:00 UTC

SELECT

    'orders' AS table_name,

    COUNT(*) AS total_rows,

    COUNTIF(customer_id IS NULL) AS null_customer_ids,

    COUNTIF(amount <= 0) AS invalid_amounts,

    COUNT(DISTINCT DATE(order_timestamp)) AS distinct_dates,

    MIN(order_timestamp) AS earliest_record,

    MAX(order_timestamp) AS latest_record,

    CURRENT_TIMESTAMP() AS check_timestamp

FROM `my-project.raw.orders`

WHERE DATE(order_timestamp) >= DATE_SUB(@run_date, INTERVAL 7 DAY);

สร้าง Distributed Data Pipeline

ออกแบบ pipeline แบบ distributed

แนะนำเพิ่มเติม — XM Signal

# === Distributed Pipeline Architecture ===



# Pipeline Stages (each is a separate scheduled query):

#

# Stage 1: Raw Data Ingestion (every hour)

#   └── Extract from source tables, light transformation

#

# Stage 2: Data Cleansing (every hour, 15 min after Stage 1)

#   └── Remove duplicates, fix data types, handle nulls

#

# Stage 3: Aggregation (every hour, 30 min after Stage 1)

#   └── Pre-aggregate for common query patterns

#

# Stage 4: Reporting Tables (daily at 07:00)

#   └── Build final reporting tables from aggregations

#

# Stage 5: Data Quality (daily at 08:00)

#   └── Validate data quality, alert on issues



# === Stage 1: Raw Data Extraction ===

-- Schedule: every 1 hour

-- Name: stage1_extract_events



CREATE OR REPLACE TABLE `project.staging.events_hourly`

PARTITION BY DATE(event_timestamp)

CLUSTER BY user_id, event_name

AS

SELECT

    event_id,

    user_id,

    event_name,

    event_timestamp,

    PARSE_JSON(event_params) AS params,

    device_type,

    country,

    session_id,

    CURRENT_TIMESTAMP() AS processed_at

FROM `project.raw.events_stream`

WHERE event_timestamp >= TIMESTAMP_SUB(@run_time, INTERVAL 2 HOUR)

  AND event_timestamp < @run_time;



# === Stage 2: Data Cleansing ===

-- Schedule: every 1 hour (offset 15 min)

-- Name: stage2_cleanse



INSERT INTO `project.clean.events`

SELECT DISTINCT

    event_id,

    user_id,

    LOWER(TRIM(event_name)) AS event_name,

    event_timestamp,

    params,

    COALESCE(device_type, 'unknown') AS device_type,

    COALESCE(country, 'unknown') AS country,

    session_id,

    processed_at

FROM `project.staging.events_hourly`

WHERE event_id NOT IN (

    SELECT event_id FROM `project.clean.events`

    WHERE DATE(event_timestamp) >= DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY)

);



# === Stage 3: Hourly Aggregation ===

-- Schedule: every 1 hour (offset 30 min)

-- Name: stage3_aggregate



MERGE `project.analytics.hourly_metrics` AS target

USING (

    SELECT

        TIMESTAMP_TRUNC(event_timestamp, HOUR) AS hour,

        event_name,

        country,

        device_type,

        COUNT(*) AS event_count,

        COUNT(DISTINCT user_id) AS unique_users,

        COUNT(DISTINCT session_id) AS sessions

    FROM `project.clean.events`

    WHERE event_timestamp >= TIMESTAMP_SUB(@run_time, INTERVAL 2 HOUR)

      AND event_timestamp < @run_time

    GROUP BY 1, 2, 3, 4

) AS source

ON target.hour = source.hour

    AND target.event_name = source.event_name

    AND target.country = source.country

    AND target.device_type = source.device_type

WHEN MATCHED THEN

    UPDATE SET

        event_count = source.event_count,

        unique_users = source.unique_users,

        sessions = source.sessions

WHEN NOT MATCHED THEN

    INSERT (hour, event_name, country, device_type, event_count, unique_users, sessions)

    VALUES (source.hour, source.event_name, source.country, source.device_type,

            source.event_count, source.unique_users, source.sessions);



# === Stage 4: Daily Reporting ===

-- Schedule: daily at 07:00 UTC

-- Name: stage4_daily_report



CREATE OR REPLACE TABLE `project.reporting.daily_dashboard` AS

SELECT

    DATE(hour) AS report_date,

    SUM(event_count) AS total_events,

    SUM(unique_users) AS total_users,

    SUM(sessions) AS total_sessions,

    SUM(IF(event_name = 'purchase', event_count, 0)) AS purchases,

    SAFE_DIVIDE(

        SUM(IF(event_name = 'purchase', unique_users, 0)),

        SUM(unique_users)

    ) * 100 AS conversion_rate_pct,

    ARRAY_AGG(STRUCT(country, SUM(unique_users) AS users) ORDER BY SUM(unique_users) DESC LIMIT 10) AS top_countries

FROM `project.analytics.hourly_metrics`

WHERE DATE(hour) >= DATE_SUB(@run_date, INTERVAL 30 DAY)

GROUP BY 1

ORDER BY 1 DESC;

Orchestration ด้วย Python และ Cloud Functions

BigQuery Scheduled Query Distributed System —

จัดการ scheduled queries ด้วย Python

#!/usr/bin/env python3

# bq_orchestrator.py — BigQuery Pipeline Orchestration

from google.cloud import bigquery

from google.cloud import bigquery_datatransfer

import json

import logging

from datetime import datetime, timedelta

from typing import Dict, List, Optional



logging.basicConfig(level=logging.INFO)

logger = logging.getLogger("bq_orchestrator")



class BigQueryOrchestrator:

    def __init__(self, project_id):

        self.project_id = project_id

        self.bq_client = bigquery.Client(project=project_id)

        self.transfer_client = bigquery_datatransfer.DataTransferServiceClient()

        self.parent = f"projects/{project_id}/locations/us"

    

    def create_scheduled_query(self, name, query, schedule,

                                dataset="analytics",

                                destination_table=None,

                                write_disposition="WRITE_APPEND"):

        config = bigquery_datatransfer.TransferConfig(

            display_name=name,

            data_source_id="scheduled_query",

            destination_dataset_id=dataset,

            schedule=schedule,

            params={

                "query": query,

                "write_disposition": write_disposition,

            },

        )

        

        if destination_table:

            config.params["destination_table_name_template"] = destination_table

        

        result = self.transfer_client.create_transfer_config(

            parent=self.parent,

            transfer_config=config,

        )

        

        logger.info(f"Created scheduled query: {name} -> {result.name}")

        return result.name

    

    def list_scheduled_queries(self):

        configs = self.transfer_client.list_transfer_configs(parent=self.parent)

        

        queries = []

        for config in configs:

            if config.data_source_id == "scheduled_query":

                queries.append({

                    "name": config.display_name,

                    "config_id": config.name,

                    "schedule": config.schedule,

                    "state": config.state.name,

                    "dataset": config.destination_dataset_id,

                    "update_time": config.update_time.isoformat() if config.update_time else None,

                })

        

        return queries

    

    def get_run_history(self, config_id, limit=10):

        runs = self.transfer_client.list_transfer_runs(parent=config_id)

        

        history = []

        for i, run in enumerate(runs):

            if i >= limit:

                break

            history.append({

                "run_id": run.name,

                "state": run.state.name,

                "start_time": run.start_time.isoformat() if run.start_time else None,

                "end_time": run.end_time.isoformat() if run.end_time else None,

                "error": run.error_status.message if run.error_status else None,

            })

        

        return history

    

    def run_query(self, query, destination_table=None, dry_run=False):

        job_config = bigquery.QueryJobConfig(dry_run=dry_run)

        

        if destination_table:

            job_config.destination = destination_table

            job_config.write_disposition = "WRITE_TRUNCATE"

        

        job = self.bq_client.query(query, job_config=job_config)

        

        if dry_run:

            return {

                "bytes_processed": job.total_bytes_processed,

                "estimated_cost_usd": job.total_bytes_processed / 1e12 * 6.25,

            }

        

        result = job.result()

        

        return {

            "job_id": job.job_id,

            "rows_affected": job.num_dml_affected_rows or result.total_rows,

            "bytes_processed": job.total_bytes_processed,

            "slot_ms": job.slot_millis,

            "duration_s": (job.ended - job.started).total_seconds(),

        }

    

    def check_table_freshness(self, dataset, table):

        table_ref = f"{self.project_id}.{dataset}.{table}"

        tbl = self.bq_client.get_table(table_ref)

        

        modified = tbl.modified

        age_hours = (datetime.utcnow() - modified.replace(tzinfo=None)).total_seconds() / 3600

        

        return {

            "table": table_ref,

            "last_modified": modified.isoformat(),

            "age_hours": round(age_hours, 1),

            "num_rows": tbl.num_rows,

            "size_bytes": tbl.num_bytes,

            "is_stale": age_hours > 25,

        }

    

    def pipeline_health_check(self, pipeline_queries):

        health = {"timestamp": datetime.utcnow().isoformat(), "queries": []}

        

        for q in pipeline_queries:

            runs = self.get_run_history(q["config_id"], limit=5)

            

            recent_failures = sum(1 for r in runs if r["state"] == "FAILED")

            last_success = next(

                (r for r in runs if r["state"] == "SUCCEEDED"), None

            )

            

            health["queries"].append({

                "name": q["name"],

                "recent_failures": recent_failures,

                "last_success": last_success["end_time"] if last_success else None,

                "status": "healthy" if recent_failures == 0 else "degraded",

            })

        

        health["overall"] = (

            "healthy" if all(q["status"] == "healthy" for q in health["queries"])

            else "degraded"

        )

        

        return health



# orchestrator = BigQueryOrchestrator("my-project")

# queries = orchestrator.list_scheduled_queries()

# for q in queries:

#     print(f"{q['name']}: {q['schedule']} ({q['state']})")

# health = orchestrator.pipeline_health_check(queries)

Monitoring และ Error Handling

Monitor pipeline และจัดการ errors

เนื้อหาเกี่ยวข้อง — อ่านต่อ: Segment Routing SSL TLS Certificate

#!/usr/bin/env python3

# pipeline_monitor.py — BigQuery Pipeline Monitoring

from google.cloud import bigquery

from google.cloud import monitoring_v3

import json

import logging

from datetime import datetime, timedelta



logging.basicConfig(level=logging.INFO)

logger = logging.getLogger("monitor")



class PipelineMonitor:

    def __init__(self, project_id):

        self.project_id = project_id

        self.bq_client = bigquery.Client(project=project_id)

    

    def check_job_status(self, hours=24):

        query = f"""

        SELECT

            job_id,

            creation_time,

            end_time,

            state,

            total_bytes_processed,

            total_slot_ms,

            error_result.reason AS error_reason,

            error_result.message AS error_message,

            TIMESTAMP_DIFF(end_time, creation_time, SECOND) AS duration_s

        FROM `{self.project_id}.region-us.INFORMATION_SCHEMA.JOBS`

        WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours} HOUR)

            AND job_type = 'QUERY'

        ORDER BY creation_time DESC

        LIMIT 100

        """

        

        results = self.bq_client.query(query).result()

        

        jobs = []

        for row in results:

            jobs.append({

                "job_id": row.job_id,

                "state": row.state,

                "duration_s": row.duration_s,

                "bytes_processed": row.total_bytes_processed,

                "error": row.error_message,

            })

        

        return jobs

    

    def get_cost_summary(self, days=30):

        query = f"""

        SELECT

            DATE(creation_time) AS query_date,

            COUNT(*) AS query_count,

            SUM(total_bytes_processed) AS total_bytes,

            SUM(total_bytes_processed) / POW(10, 12) * 6.25 AS estimated_cost_usd,

            AVG(TIMESTAMP_DIFF(end_time, creation_time, SECOND)) AS avg_duration_s,

            SUM(total_slot_ms) / 1000 AS total_slot_seconds

        FROM `{self.project_id}.region-us.INFORMATION_SCHEMA.JOBS`

        WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {days} DAY)

            AND job_type = 'QUERY'

            AND state = 'DONE'

        GROUP BY 1

        ORDER BY 1 DESC

        """

        

        results = self.bq_client.query(query).result()

        

        daily_costs = []

        for row in results:

            daily_costs.append({

                "date": row.query_date.isoformat(),

                "queries": row.query_count,

                "bytes_processed_tb": round(row.total_bytes / 1e12, 3),

                "cost_usd": round(row.estimated_cost_usd, 2),

                "avg_duration_s": round(row.avg_duration_s, 1),

            })

        

        total_cost = sum(d["cost_usd"] for d in daily_costs)

        

        return {

            "period_days": days,

            "total_cost_usd": round(total_cost, 2),

            "avg_daily_cost_usd": round(total_cost / max(len(daily_costs), 1), 2),

            "daily_breakdown": daily_costs[:7],

        }

    

    def find_expensive_queries(self, days=7, limit=10):

        query = f"""

        SELECT

            job_id,

            user_email,

            creation_time,

            total_bytes_processed,

            total_bytes_processed / POW(10, 12) * 6.25 AS cost_usd,

            total_slot_ms / 1000 AS slot_seconds,

            TIMESTAMP_DIFF(end_time, creation_time, SECOND) AS duration_s,

            SUBSTR(query, 1, 200) AS query_preview

        FROM `{self.project_id}.region-us.INFORMATION_SCHEMA.JOBS`

        WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {days} DAY)

            AND job_type = 'QUERY'

            AND state = 'DONE'

        ORDER BY total_bytes_processed DESC

        LIMIT {limit}

        """

        

        results = self.bq_client.query(query).result()

        

        expensive = []

        for row in results:

            expensive.append({

                "job_id": row.job_id,

                "user": row.user_email,

                "cost_usd": round(row.cost_usd, 4),

                "duration_s": row.duration_s,

                "query_preview": row.query_preview,

            })

        

        return expensive

    

    def generate_alert(self, check_type, details):

        alert = {

            "timestamp": datetime.utcnow().isoformat(),

            "project": self.project_id,

            "type": check_type,

            "details": details,

        }

        

        logger.warning(f"ALERT: {check_type} - {json.dumps(details)}")

        return alert



# monitor = PipelineMonitor("my-project")

# jobs = monitor.check_job_status(hours=24)

# failed = [j for j in jobs if j["state"] != "DONE"]

# if failed:

#     print(f"Failed jobs: {len(failed)}")

# costs = monitor.get_cost_summary(days=30)

# print(f"Monthly cost: ")

Cost Optimization และ Best Practices

ลดค่าใช้จ่ายและ best practices

# === BigQuery Cost Optimization ===



# 1. Partitioning and Clustering

# ===================================

-- Always partition time-series tables

CREATE TABLE `project.analytics.events` (

    event_id STRING,

    user_id STRING,

    event_name STRING,

    event_timestamp TIMESTAMP,

    properties JSON

)

PARTITION BY DATE(event_timestamp)

CLUSTER BY user_id, event_name

OPTIONS (

    partition_expiration_days = 365,

    require_partition_filter = true  -- Force partition filter in queries

);



# 2. Materialized Views (auto-refreshed)

# ===================================

CREATE MATERIALIZED VIEW `project.analytics.hourly_summary`

PARTITION BY DATE(event_hour)

CLUSTER BY event_name

AS

SELECT

    TIMESTAMP_TRUNC(event_timestamp, HOUR) AS event_hour,

    event_name,

    COUNT(*) AS event_count,

    COUNT(DISTINCT user_id) AS unique_users

FROM `project.analytics.events`

GROUP BY 1, 2;



-- BigQuery automatically uses materialized views

-- when it can answer queries from them (query rewrite)



# 3. Cost Control Best Practices

# ===================================



# Set custom cost controls per project

# bq update --default_table_expiration 7776000 my_dataset  # 90 days

# bq update --max_bytes_billed 1000000000000 my_dataset    # 1TB max



# Use LIMIT with caution (still scans all data)

# Use SELECT specific columns instead of SELECT *

# Use approximate functions: APPROX_COUNT_DISTINCT, APPROX_QUANTILES

# Use BI Engine for repeated dashboard queries



# 4. Scheduled Query Optimization

# ===================================

# - Use incremental processing (WHERE date = @run_date)

# - Avoid full table scans in scheduled queries

# - Use MERGE for upsert instead of DELETE + INSERT

# - Set appropriate schedule (don't run more often than needed)

# - Use dry_run to estimate costs before scheduling



# 5. Slot Reservations for Predictable Cost

# ===================================

# On-demand: $6.25/TB scanned (unpredictable)

# Flat-rate: $2,000/month for 100 slots (predictable)

# Editions: Autoscale with commitment discounts



# Break-even: ~320TB/month

# If scanning > 320TB/month, flat-rate is cheaper



# 6. Data Lifecycle Management

# ===================================

# Set partition expiration for auto-cleanup

ALTER TABLE `project.analytics.events`

SET OPTIONS (partition_expiration_days = 180);



# Archive old data to Cloud Storage

EXPORT DATA OPTIONS (

    uri = 'gs://my-bucket/archive/events_*.parquet',

    format = 'PARQUET',

    overwrite = true

) AS

SELECT * FROM `project.analytics.events`

WHERE DATE(event_timestamp) < DATE_SUB(CURRENT_DATE(), INTERVAL 180 DAY);



# Delete archived data

DELETE FROM `project.analytics.events`

WHERE DATE(event_timestamp) < DATE_SUB(CURRENT_DATE(), INTERVAL 180 DAY);



echo "BigQuery optimization complete"

FAQ คำถามที่พบบ่อย

Q: Scheduled Query มี timeout ไหม?

A: BigQuery queries มี default timeout 6 ชั่วโมง สำหรับ scheduled queries ก็เช่นกัน ถ้า query ใช้เวลานานกว่า 6 ชั่วโมงจะ fail ทางแก้คือแบ่ง query เป็นหลาย stages ที่เล็กลง ใช้ incremental processing แทน full table scan ใช้ partitioning และ clustering เพื่อลด data scanned และ optimize query ด้วย execution plan

แนะนำเพิ่มเติม — เรียนเทรดกับ iCafeForex

เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: wifi ไม่อั้นคือ — ข้อมูลครบถ้วน 2026

Q: Scheduled Queries รัน fail แล้วจะเป็นอย่างไร?

A: BigQuery จะ retry อัตโนมัติตาม retry policy ที่ตั้งไว้ ส่ง email notification ไปยัง owner ของ scheduled query บันทึก error ใน run history ที่ดูได้ผ่าน Console หรือ API ถ้า fail ซ้ำหลายครั้ง BigQuery อาจ disable scheduled query สามารถตั้ง Cloud Monitoring alerts สำหรับ transfer failures เพื่อ notify ผ่าน Slack/PagerDuty

Q: ใช้ Scheduled Queries แทน Airflow ได้ไหม?

A: ได้สำหรับ simple SQL-based pipelines ที่ไม่ต้องการ complex dependencies Scheduled Queries เหมาะสำหรับ SQL transformations ที่ independent หรือ sequential ไม่เหมาะเมื่อต้องการ complex DAGs, conditional logic, non-SQL tasks (เช่น API calls, file processing), cross-service orchestration สำหรับ pipelines ซับซ้อน ใช้ Airflow/Prefect/Cloud Composer ที่เรียก BigQuery เป็น task

เนื้อหาเกี่ยวข้อง — ดูเพิ่มเติมเรื่อง Nginx Plus Tech Conference 2026

Q: ค่าใช้จ่ายของ Scheduled Queries เป็นอย่างไร?

A: ไม่มีค่าใช้จ่ายเพิ่มเติมสำหรับ scheduling ตัว scheduling เอง ค่าใช้จ่ายคิดจาก data scanned ตาม BigQuery pricing ปกติ ($6.25/TB on-demand) ดังนั้นถ้า query scan 100GB ต่อครั้ง รันวันละ 1 ครั้ง = 3TB/เดือน = $18.75/เดือน วิธีลดค่าใช้จ่าย ใช้ partitioning (scan เฉพาะ partition ที่ต้องการ), clustering, materialized views และ incremental processing

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง