ai

CircleCI Orbs Data Pipeline ETL — สร้าง ETL

CircleCI Orbs Data Pipeline ETL — สร้าง ETL

CircleCI Orbs คืออะไรและใช้สร้าง ETL Pipeline อย่างไร

CircleCI Orbs Data Pipeline ETL — สร้าง ETL

CircleCI Orbs เป็น reusable packages ของ CircleCI configuration ที่รวม jobs, commands และ executors ไว้ด้วยกัน เหมือน libraries สำหรับ CI/CD pipelines สามารถ share และ reuse ข้าม projects ได้ ลดการเขียน config ซ้ำซ้อน

ETL (Extract, Transform, Load) Pipeline คือกระบวนการดึงข้อมูลจาก sources ต่างๆ แปลงข้อมูลให้อยู่ในรูปแบบที่ต้องการ และ load เข้า destination (data warehouse, data lake) การใช้ CircleCI สำหรับ ETL ให้ข้อดีคือ scheduled pipelines สำหรับ batch ETL, version control สำหรับ ETL code, automated testing สำหรับ data quality และ monitoring/alerting เมื่อ pipeline fail

Orbs ที่มีประโยชน์สำหรับ ETL ได้แก่ circleci/python สำหรับ Python-based ETL, circleci/aws-cli สำหรับ AWS services (S3, Redshift), circleci/gcp-cli สำหรับ Google Cloud (BigQuery), circleci/slack สำหรับ notifications และ custom orbs ที่สร้างเองสำหรับ ETL-specific tasks

ข้อดีของ CircleCI เมื่อเทียบกับ Airflow สำหรับ simple ETL คือ ไม่ต้อง manage infrastructure, built-in scheduling, Docker support ดี, parallel execution และ caching ที่ช่วยเร่ง pipeline

เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน Us Kub คืออะไร — ข้อมูลครบถ้วน 2026

ติดตั้งและตั้งค่า CircleCI สำหรับ Data Pipeline

เริ่มต้นใช้ CircleCI สำหรับ ETL

# === CircleCI Configuration ===

# .circleci/config.yml



version: 2.1



# ใช้ Orbs

orbs:

  python: circleci/python@2.1

  aws-cli: circleci/aws-cli@4.1

  slack: circleci/slack@4.12



# Parameters สำหรับ scheduled runs

parameters:

  run-schedule:

    type: boolean

    default: false

  etl-type:

    type: string

    default: "full"



# Executors

executors:

  etl-executor:

    docker:

      - image: cimg/python:3.11

    resource_class: large

    environment:

      PYTHONUNBUFFERED: "1"

      ETL_ENV: "production"



# Commands (reusable steps)

commands:

  setup-etl:

    description: "Setup ETL environment"

    steps:

      - checkout

      - python/install-packages:

          pkg-manager: pip

          pip-dependency-file: requirements.txt

      - run:

          name: Install additional tools

          command: |

            pip install awscli boto3 pandas pyarrow sqlalchemy

            pip install great-expectations dbt-core



  validate-data:

    description: "Validate data quality"

    parameters:

      source:

        type: string

    steps:

      - run:

          name: "Validate << parameters.source >> data"

          command: |

            python scripts/validate.py --source << parameters.source >>



  notify-status:

    description: "Send notification"

    parameters:

      status:

        type: string

    steps:

      - slack/notify:

          event: << parameters.status >>

          template: basic_success_1



# Jobs

jobs:

  extract:

    executor: etl-executor

    steps:

      - setup-etl

      - aws-cli/setup

      - run:

          name: Extract data from sources

          command: |

            python etl/extract.py \

              --date $(date +%Y-%m-%d) \

              --type << pipeline.parameters.etl-type >>

          no_output_timeout: 30m

      - persist_to_workspace:

          root: data

          paths: ["raw/*"]



  transform:

    executor: etl-executor

    steps:

      - setup-etl

      - attach_workspace:

          at: data

      - run:

          name: Transform data

          command: |

            python etl/transform.py --input data/raw/ --output data/transformed/

      - validate-data:

          source: "transformed"

      - persist_to_workspace:

          root: data

          paths: ["transformed/*"]



  load:

    executor: etl-executor

    steps:

      - setup-etl

      - aws-cli/setup

      - attach_workspace:

          at: data

      - run:

          name: Load data to warehouse

          command: |

            python etl/load.py --input data/transformed/ --target warehouse

      - notify-status:

          status: pass



# Workflows

workflows:

  daily-etl:

    when: << pipeline.parameters.run-schedule >>

    jobs:

      - extract

      - transform:

          requires: [extract]

      - load:

          requires: [transform]



  manual-etl:

    unless: << pipeline.parameters.run-schedule >>

    jobs:

      - extract

      - transform:

          requires: [extract]

      - load:

          requires: [transform]

          filters:

            branches:

              only: main

สร้าง Custom Orb สำหรับ ETL

สร้าง reusable ETL Orb

แนะนำเพิ่มเติม — คอร์สเทรด Forex ที่ iCafeForex

# === Custom ETL Orb ===

# orb.yml — etl-toolkit orb



version: 2.1



description: "ETL Toolkit Orb for data pipeline operations"



executors:

  python-etl:

    docker:

      - image: cimg/python:3.11

    resource_class: << parameters.resource_class >>

    parameters:

      resource_class:

        type: string

        default: medium



commands:

  extract-csv:

    description: "Extract data from CSV files"

    parameters:

      source_path:

        type: string

      output_path:

        type: string

        default: "/tmp/extracted"

    steps:

      - run:

          name: "Extract CSV data"

          command: |

            mkdir -p << parameters.output_path >>

            python3 << include(scripts/extract_csv.py) >> \

              --source << parameters.source_path >> \

              --output << parameters.output_path >>



  extract-api:

    description: "Extract data from REST API"

    parameters:

      api_url:

        type: string

      auth_token_env:

        type: env_var_name

        default: API_TOKEN

      output_path:

        type: string

    steps:

      - run:

          name: "Extract API data"

          command: |

            python3 -c "

            import requests, json, os

            url = '<< parameters.api_url >>'

            token = os.environ.get('<< parameters.auth_token_env >>')

            headers = {'Authorization': f'Bearer {token}'} if token else {}

            

            resp = requests.get(url, headers=headers, timeout=60)

            resp.raise_for_status()

            

            output = '<< parameters.output_path >>'

            os.makedirs(os.path.dirname(output), exist_ok=True)

            with open(output, 'w') as f:

                json.dump(resp.json(), f)

            print(f'Extracted {len(resp.json())} records to {output}')

            "



  extract-s3:

    description: "Extract data from S3"

    parameters:

      bucket:

        type: string

      prefix:

        type: string

      output_path:

        type: string

    steps:

      - run:

          name: "Extract S3 data"

          command: |

            aws s3 sync \

              s3://<< parameters.bucket >>/<< parameters.prefix >> \

              << parameters.output_path >> \

              --exclude "*.tmp"

            echo "Downloaded from s3://<< parameters.bucket >>/<< parameters.prefix >>"



  transform-pandas:

    description: "Transform data with pandas"

    parameters:

      script:

        type: string

      input_path:

        type: string

      output_path:

        type: string

    steps:

      - run:

          name: "Transform with pandas"

          command: |

            python3 << parameters.script >> \

              --input << parameters.input_path >> \

              --output << parameters.output_path >>



  load-warehouse:

    description: "Load data to data warehouse"

    parameters:

      target:

        type: enum

        enum: ["redshift", "bigquery", "snowflake", "postgres"]

      connection_string_env:

        type: env_var_name

      table_name:

        type: string

      input_path:

        type: string

    steps:

      - run:

          name: "Load to << parameters.target >>"

          command: |

            python3 -c "

            import pandas as pd

            from sqlalchemy import create_engine

            import os

            

            conn = os.environ['<< parameters.connection_string_env >>']

            engine = create_engine(conn)

            

            df = pd.read_parquet('<< parameters.input_path >>')

            df.to_sql('<< parameters.table_name >>', engine, if_exists='append', index=False)

            print(f'Loaded {len(df)} rows to << parameters.table_name >>')

            "



  validate-quality:

    description: "Validate data quality"

    parameters:

      expectations_path:

        type: string

      data_path:

        type: string

    steps:

      - run:

          name: "Validate data quality"

          command: |

            python3 scripts/validate_quality.py \

              --expectations << parameters.expectations_path >> \

              --data << parameters.data_path >>



jobs:

  etl-pipeline:

    executor: python-etl

    parameters:

      extract_script:

        type: string

      transform_script:

        type: string

      load_target:

        type: string

    steps:

      - checkout

      - run:

          name: Install dependencies

          command: pip install -r requirements.txt

      - run:

          name: Extract

          command: python << parameters.extract_script >>

      - run:

          name: Transform

          command: python << parameters.transform_script >>

      - run:

          name: Load

          command: python << parameters.load_target >>



# Publish: circleci orb publish orb.yml myorg/etl-toolkit@1.0.0

ออกแบบ ETL Pipeline ด้วย CircleCI Workflows

CircleCI Orbs Data Pipeline ETL — สร้าง ETL

Python ETL scripts สำหรับ pipeline

#!/usr/bin/env python3

# etl/extract.py — Data Extraction Module

import argparse

import json

import logging

import os

from datetime import datetime, timedelta

from pathlib import Path

import requests

import boto3

import pandas as pd



logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

logger = logging.getLogger("etl.extract")



class DataExtractor:

    def __init__(self, output_dir="data/raw"):

        self.output_dir = Path(output_dir)

        self.output_dir.mkdir(parents=True, exist_ok=True)

    

    def extract_api(self, url, headers=None, params=None):

        logger.info(f"Extracting from API: {url}")

        resp = requests.get(url, headers=headers, params=params, timeout=120)

        resp.raise_for_status()

        

        data = resp.json()

        output_path = self.output_dir / f"api_{datetime.now().strftime('%Y%m%d_%H%M')}.json"

        

        with open(output_path, "w") as f:

            json.dump(data, f)

        

        logger.info(f"Extracted {len(data) if isinstance(data, list) else 1} records -> {output_path}")

        return output_path

    

    def extract_s3(self, bucket, prefix, date_str=None):

        logger.info(f"Extracting from S3: s3://{bucket}/{prefix}")

        s3 = boto3.client("s3")

        

        if date_str:

            prefix = f"{prefix}/{date_str}"

        

        paginator = s3.get_paginator("list_objects_v2")

        files_downloaded = 0

        

        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):

            for obj in page.get("Contents", []):

                key = obj["Key"]

                local_path = self.output_dir / Path(key).name

                s3.download_file(bucket, key, str(local_path))

                files_downloaded += 1

        

        logger.info(f"Downloaded {files_downloaded} files from S3")

        return files_downloaded

    

    def extract_database(self, connection_string, query, output_name="db_extract"):

        from sqlalchemy import create_engine

        

        logger.info(f"Extracting from database...")

        engine = create_engine(connection_string)

        

        df = pd.read_sql(query, engine)

        output_path = self.output_dir / f"{output_name}_{datetime.now().strftime('%Y%m%d')}.parquet"

        df.to_parquet(output_path, index=False)

        

        logger.info(f"Extracted {len(df)} rows -> {output_path}")

        return output_path



# etl/transform.py — Data Transformation Module

class DataTransformer:

    def __init__(self, input_dir="data/raw", output_dir="data/transformed"):

        self.input_dir = Path(input_dir)

        self.output_dir = Path(output_dir)

        self.output_dir.mkdir(parents=True, exist_ok=True)

    

    def transform(self, input_file, transformations=None):

        logger.info(f"Transforming: {input_file}")

        

        if str(input_file).endswith(".json"):

            df = pd.read_json(input_file)

        elif str(input_file).endswith(".parquet"):

            df = pd.read_parquet(input_file)

        elif str(input_file).endswith(".csv"):

            df = pd.read_csv(input_file)

        else:

            raise ValueError(f"Unsupported format: {input_file}")

        

        initial_rows = len(df)

        

        # Standard transformations

        df = df.drop_duplicates()

        df = df.dropna(subset=df.columns[:3])

        

        for col in df.select_dtypes(include=["object"]).columns:

            df[col] = df[col].str.strip()

        

        # Custom transformations

        if transformations:

            for t in transformations:

                df = t(df)

        

        output_path = self.output_dir / f"transformed_{Path(input_file).stem}.parquet"

        df.to_parquet(output_path, index=False)

        

        logger.info(f"Transformed: {initial_rows} -> {len(df)} rows -> {output_path}")

        return output_path



# etl/load.py — Data Loading Module  

class DataLoader:

    def __init__(self, target_type="postgres"):

        self.target_type = target_type

    

    def load(self, input_path, table_name, connection_string=None):

        logger.info(f"Loading {input_path} to {table_name}")

        

        df = pd.read_parquet(input_path)

        

        if self.target_type in ("postgres", "redshift", "snowflake"):

            from sqlalchemy import create_engine

            conn = connection_string or os.environ.get("DATABASE_URL")

            engine = create_engine(conn)

            df.to_sql(table_name, engine, if_exists="append", index=False, method="multi")

        

        elif self.target_type == "s3":

            s3_path = connection_string or os.environ.get("S3_OUTPUT_PATH")

            df.to_parquet(f"{s3_path}/{table_name}/{datetime.now().strftime('%Y%m%d')}.parquet")

        

        logger.info(f"Loaded {len(df)} rows to {table_name}")

        return len(df)



if __name__ == "__main__":

    parser = argparse.ArgumentParser()

    parser.add_argument("--date", default=datetime.now().strftime("%Y-%m-%d"))

    parser.add_argument("--type", default="full")

    args = parser.parse_args()

    

    extractor = DataExtractor()

    logger.info(f"Starting extraction for {args.date} (type: {args.type})")

Testing และ Data Validation ใน Pipeline

ทดสอบ ETL pipeline และ validate data quality

เนื้อหาเกี่ยวข้อง — อ่านต่อ: affective domain คือ

#!/usr/bin/env python3

# tests/test_etl.py — ETL Pipeline Tests

import pytest

import pandas as pd

import json

from pathlib import Path

from datetime import datetime



class TestExtraction:

    def test_api_extraction_returns_data(self):

        # Mock API response

        sample_data = [

            {"id": 1, "name": "Product A", "price": 100},

            {"id": 2, "name": "Product B", "price": 200},

        ]

        df = pd.DataFrame(sample_data)

        assert len(df) > 0

        assert "id" in df.columns

    

    def test_csv_extraction_handles_encoding(self):

        test_csv = "id, name, value\n1, test,100\n2, ทดสอบ,200\n"

        from io import StringIO

        df = pd.read_csv(StringIO(test_csv))

        assert len(df) == 2

        assert df.iloc[1]["name"] == "ทดสอบ"

    

    def test_extraction_handles_empty_response(self):

        df = pd.DataFrame([])

        assert len(df) == 0



class TestTransformation:

    def test_dedup_removes_duplicates(self):

        data = [

            {"id": 1, "value": "a"},

            {"id": 1, "value": "a"},

            {"id": 2, "value": "b"},

        ]

        df = pd.DataFrame(data).drop_duplicates()

        assert len(df) == 2

    

    def test_null_handling(self):

        data = [

            {"id": 1, "name": "A", "value": 100},

            {"id": 2, "name": None, "value": 200},

            {"id": 3, "name": "C", "value": None},

        ]

        df = pd.DataFrame(data)

        df_clean = df.dropna(subset=["name"])

        assert len(df_clean) == 2

    

    def test_string_trimming(self):

        data = [{"name": "  hello  "}, {"name": "world  "}]

        df = pd.DataFrame(data)

        df["name"] = df["name"].str.strip()

        assert df.iloc[0]["name"] == "hello"

        assert df.iloc[1]["name"] == "world"

    

    def test_type_conversion(self):

        data = [{"date": "2024-01-15", "amount": "100.50"}]

        df = pd.DataFrame(data)

        df["date"] = pd.to_datetime(df["date"])

        df["amount"] = df["amount"].astype(float)

        assert df["amount"].dtype == float



class TestDataQuality:

    def test_no_null_primary_keys(self):

        data = [{"id": 1}, {"id": 2}, {"id": 3}]

        df = pd.DataFrame(data)

        assert df["id"].notna().all()

    

    def test_unique_primary_keys(self):

        data = [{"id": 1}, {"id": 2}, {"id": 3}]

        df = pd.DataFrame(data)

        assert df["id"].is_unique

    

    def test_values_in_range(self):

        data = [{"price": 10}, {"price": 50}, {"price": 100}]

        df = pd.DataFrame(data)

        assert (df["price"] >= 0).all()

        assert (df["price"] <= 10000).all()

    

    def test_date_not_future(self):

        data = [{"date": "2024-01-15"}, {"date": "2024-06-01"}]

        df = pd.DataFrame(data)

        df["date"] = pd.to_datetime(df["date"])

        assert (df["date"] <= pd.Timestamp.now()).all()

    

    def test_row_count_within_expected(self):

        data = list(range(100))

        df = pd.DataFrame({"id": data})

        min_expected = 50

        max_expected = 200

        assert min_expected <= len(df) <= max_expected



# scripts/validate_quality.py — Great Expectations validation

# import great_expectations as gx

# 

# context = gx.get_context()

# 

# validator = context.sources.pandas_default.read_parquet(

#     "data/transformed/output.parquet"

# )

# 

# validator.expect_column_values_to_not_be_null("id")

# validator.expect_column_values_to_be_unique("id")

# validator.expect_column_values_to_be_between("price", min_value=0, max_value=10000)

# validator.expect_table_row_count_to_be_between(min_value=100, max_value=1000000)

# 

# results = validator.validate()

# if not results.success:

#     print("Data quality check FAILED!")

#     for r in results.results:

#         if not r.success:

#             print(f"  FAIL: {r.expectation_config.expectation_type}")

#     exit(1)

# print("Data quality check PASSED!")



# Run tests: pytest tests/test_etl.py -v

Monitoring และ Alerting สำหรับ ETL Jobs

ระบบ monitoring สำหรับ ETL pipeline

#!/usr/bin/env python3

# monitoring/etl_monitor.py — ETL Pipeline Monitoring

import requests

import json

import os

import logging

from datetime import datetime, timedelta

from typing import Dict, List



logging.basicConfig(level=logging.INFO)

logger = logging.getLogger("etl_monitor")



class CircleCIMonitor:

    def __init__(self, token=None, org_slug=None):

        self.token = token or os.environ.get("CIRCLECI_TOKEN")

        self.org_slug = org_slug or os.environ.get("CIRCLECI_ORG")

        self.base_url = "https://circleci.com/api/v2"

        self.headers = {"Circle-Token": self.token}

    

    def get_pipeline_runs(self, project_slug, branch="main", limit=20):

        resp = requests.get(

            f"{self.base_url}/project/{project_slug}/pipeline",

            headers=self.headers,

            params={"branch": branch},

        )

        resp.raise_for_status()

        return resp.json().get("items", [])[:limit]

    

    def get_workflow_status(self, pipeline_id):

        resp = requests.get(

            f"{self.base_url}/pipeline/{pipeline_id}/workflow",

            headers=self.headers,

        )

        resp.raise_for_status()

        return resp.json().get("items", [])

    

    def get_job_details(self, workflow_id):

        resp = requests.get(

            f"{self.base_url}/workflow/{workflow_id}/job",

            headers=self.headers,

        )

        resp.raise_for_status()

        return resp.json().get("items", [])

    

    def check_etl_health(self, project_slug):

        pipelines = self.get_pipeline_runs(project_slug, limit=5)

        

        results = []

        for pipeline in pipelines:

            workflows = self.get_workflow_status(pipeline["id"])

            

            for wf in workflows:

                results.append({

                    "pipeline_id": pipeline["id"],

                    "workflow": wf["name"],

                    "status": wf["status"],

                    "created_at": wf["created_at"],

                    "stopped_at": wf.get("stopped_at"),

                })

        

        failed = [r for r in results if r["status"] == "failed"]

        success_rate = (len(results) - len(failed)) / len(results) * 100 if results else 0

        

        health = {

            "total_runs": len(results),

            "failed": len(failed),

            "success_rate": round(success_rate, 1),

            "last_status": results[0]["status"] if results else "unknown",

            "last_run": results[0]["created_at"] if results else None,

            "healthy": len(failed) == 0,

        }

        

        if not health["healthy"]:

            self._send_alert(health, failed)

        

        return health

    

    def _send_alert(self, health, failed_runs):

        webhook = os.environ.get("SLACK_WEBHOOK")

        if not webhook:

            return

        

        message = (

            f"*ETL Pipeline Alert*\n"

            f"Success Rate: {health['success_rate']}%\n"

            f"Failed Runs: {len(failed_runs)}\n"

            f"Last Status: {health['last_status']}\n"

        )

        

        for run in failed_runs[:3]:

            message += f"\n- {run['workflow']} failed at {run['created_at']}"

        

        requests.post(webhook, json={"text": message}, timeout=10)

        logger.warning(f"Alert sent: {len(failed_runs)} failed ETL runs")

    

    def generate_report(self, project_slug, days=7):

        pipelines = self.get_pipeline_runs(project_slug, limit=50)

        

        daily_stats = {}

        

        for pipeline in pipelines:

            date = pipeline["created_at"][:10]

            if date not in daily_stats:

                daily_stats[date] = {"total": 0, "success": 0, "failed": 0}

            

            workflows = self.get_workflow_status(pipeline["id"])

            for wf in workflows:

                daily_stats[date]["total"] += 1

                if wf["status"] == "success":

                    daily_stats[date]["success"] += 1

                elif wf["status"] == "failed":

                    daily_stats[date]["failed"] += 1

        

        return {

            "period_days": days,

            "daily_stats": daily_stats,

            "generated_at": datetime.utcnow().isoformat(),

        }



# monitor = CircleCIMonitor()

# health = monitor.check_etl_health("gh/myorg/etl-pipeline")

# print(json.dumps(health, indent=2))

FAQ คำถามที่พบบ่อย

Q: CircleCI Orbs กับ GitHub Actions reusable workflows ต่างกันอย่างไร?

A: CircleCI Orbs เป็น full packages ที่รวม executors, commands และ jobs ไว้ด้วยกัน publish ผ่าน Orb Registry มี versioning ชัดเจน (semver) GitHub Actions reusable workflows เป็น workflow files ที่ reference ข้าม repos ได้ มี Marketplace สำหรับ individual actions Orbs มี structure ที่ rigorous กว่า เหมาะสำหรับ complex reusable components Actions มี ecosystem ใหญ่กว่าและเข้าถึงง่ายกว่า

แนะนำเพิ่มเติม — ดูสัญญาณเทรดที่ XM Signal

เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน Passive Income Ideas — ไอเดียสร้างรายได้แบบ

Q: CircleCI เหมาะกับ ETL ขนาดไหน?

A: CircleCI เหมาะสำหรับ batch ETL ขนาดเล็กถึงกลาง (ข้อมูล GB-level) ที่รัน scheduled (hourly, daily) resource class ใหญ่สุดมี 128GB RAM และ 64 vCPUs เพียงพอสำหรับ processing datasets หลาย GB ข้อจำกัดคือ max job runtime 5 ชั่วโมง (สำหรับ performance plan) สำหรับ ETL ขนาดใหญ่ (TB-level) หรือ real-time streaming ควรใช้ dedicated tools เช่น Airflow, Spark, Flink

Q: จะ schedule ETL pipeline บน CircleCI อย่างไร?

A: ใช้ Scheduled Pipelines ผ่าน CircleCI UI หรือ API กำหนด cron expression เช่น 0 2 * * * สำหรับทุกวัน 02:00 UTC ใช้ pipeline parameters เพื่อแยก scheduled runs จาก manual runs ข้อดีคือ trigger pipeline ด้วย specific parameters ได้ ดู schedule status ใน dashboard และ manage schedules ผ่าน API

เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: Image Segmentation Hexagonal Architecture

Q: วิธี handle secrets ใน ETL pipeline?

A: ใช้ CircleCI Environment Variables (Project Settings -> Environment Variables) สำหรับ database credentials, API keys, cloud credentials ใช้ Contexts สำหรับ share secrets ข้าม projects ห้าม hardcode secrets ใน config.yml ใช้ OIDC tokens สำหรับ AWS/GCP แทน static credentials เมื่อเป็นไปได้ Rotate secrets เป็นประจำ และ audit access logs

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง